mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-22 17:31:03 +01:00
202 lines
4.3 KiB
Go
202 lines
4.3 KiB
Go
package utils
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/cenkalti/backoff/v4"
|
|
"github.com/crawlab-team/crawlab/core/constants"
|
|
"github.com/crawlab-team/crawlab/core/models/models"
|
|
models2 "github.com/crawlab-team/crawlab/core/models/models/v2"
|
|
"github.com/crawlab-team/crawlab/db/generic"
|
|
"github.com/crawlab-team/crawlab/trace"
|
|
"github.com/elastic/go-elasticsearch/v8"
|
|
"github.com/elastic/go-elasticsearch/v8/esapi"
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
"time"
|
|
)
|
|
|
|
func GetElasticsearchClient(ds *models.DataSource) (c *elasticsearch.Client, err error) {
|
|
return getElasticsearchClient(context.Background(), ds)
|
|
}
|
|
|
|
func GetElasticsearchClientWithTimeout(ds *models.DataSource, timeout time.Duration) (c *elasticsearch.Client, err error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
return getElasticsearchClient(ctx, ds)
|
|
}
|
|
|
|
func getElasticsearchClient(ctx context.Context, ds *models.DataSource) (c *elasticsearch.Client, err error) {
|
|
// normalize settings
|
|
host := ds.Host
|
|
port := ds.Port
|
|
if ds.Host == "" {
|
|
host = constants.DefaultHost
|
|
}
|
|
if ds.Port == 0 {
|
|
port = constants.DefaultElasticsearchPort
|
|
}
|
|
|
|
// es hosts
|
|
addresses := []string{
|
|
fmt.Sprintf("http://%s:%s", host, port),
|
|
}
|
|
|
|
// retry backoff
|
|
rb := backoff.NewExponentialBackOff()
|
|
|
|
// es client options
|
|
cfg := elasticsearch.Config{
|
|
Addresses: addresses,
|
|
Username: ds.Username,
|
|
Password: ds.Password,
|
|
RetryBackoff: func(i int) time.Duration {
|
|
if i == 1 {
|
|
rb.Reset()
|
|
}
|
|
return rb.NextBackOff()
|
|
},
|
|
}
|
|
|
|
// es client
|
|
done := make(chan struct{})
|
|
go func() {
|
|
c, err = elasticsearch.NewClient(cfg)
|
|
if err != nil {
|
|
return
|
|
}
|
|
var res *esapi.Response
|
|
res, err = c.Info()
|
|
fmt.Println(res)
|
|
close(done)
|
|
}()
|
|
|
|
// wait for done
|
|
select {
|
|
case <-ctx.Done():
|
|
if ctx.Err() != nil {
|
|
err = ctx.Err()
|
|
}
|
|
case <-done:
|
|
}
|
|
|
|
return c, err
|
|
}
|
|
|
|
func GetElasticsearchClientWithTimeoutV2(ds *models2.DatabaseV2, timeout time.Duration) (c *elasticsearch.Client, err error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
return getElasticsearchClientV2(ctx, ds)
|
|
}
|
|
|
|
func getElasticsearchClientV2(ctx context.Context, ds *models2.DatabaseV2) (c *elasticsearch.Client, err error) {
|
|
// normalize settings
|
|
host := ds.Host
|
|
port := ds.Port
|
|
if ds.Host == "" {
|
|
host = constants.DefaultHost
|
|
}
|
|
if ds.Port == 0 {
|
|
port = constants.DefaultElasticsearchPort
|
|
}
|
|
|
|
// es hosts
|
|
addresses := []string{
|
|
fmt.Sprintf("http://%s:%s", host, port),
|
|
}
|
|
|
|
// retry backoff
|
|
rb := backoff.NewExponentialBackOff()
|
|
|
|
// es client options
|
|
cfg := elasticsearch.Config{
|
|
Addresses: addresses,
|
|
Username: ds.Username,
|
|
Password: ds.Password,
|
|
RetryBackoff: func(i int) time.Duration {
|
|
if i == 1 {
|
|
rb.Reset()
|
|
}
|
|
return rb.NextBackOff()
|
|
},
|
|
}
|
|
|
|
// es client
|
|
done := make(chan struct{})
|
|
go func() {
|
|
c, err = elasticsearch.NewClient(cfg)
|
|
if err != nil {
|
|
return
|
|
}
|
|
var res *esapi.Response
|
|
res, err = c.Info()
|
|
fmt.Println(res)
|
|
close(done)
|
|
}()
|
|
|
|
// wait for done
|
|
select {
|
|
case <-ctx.Done():
|
|
if ctx.Err() != nil {
|
|
err = ctx.Err()
|
|
}
|
|
case <-done:
|
|
}
|
|
|
|
return c, err
|
|
}
|
|
|
|
func GetElasticsearchQuery(query generic.ListQuery) (buf *bytes.Buffer) {
|
|
q := map[string]interface{}{}
|
|
if len(query) > 0 {
|
|
match := getElasticsearchQueryMatch(query)
|
|
q["query"] = map[string]interface{}{
|
|
"match": match,
|
|
}
|
|
}
|
|
buf = &bytes.Buffer{}
|
|
if err := json.NewEncoder(buf).Encode(q); err != nil {
|
|
trace.PrintError(err)
|
|
}
|
|
return buf
|
|
}
|
|
|
|
func GetElasticsearchQueryWithOptions(query generic.ListQuery, opts *generic.ListOptions) (buf *bytes.Buffer) {
|
|
q := map[string]interface{}{
|
|
"size": opts.Limit,
|
|
"from": opts.Skip,
|
|
// TODO: sort
|
|
}
|
|
if len(query) > 0 {
|
|
match := getElasticsearchQueryMatch(query)
|
|
q["query"] = map[string]interface{}{
|
|
"match": match,
|
|
}
|
|
}
|
|
buf = &bytes.Buffer{}
|
|
if err := json.NewEncoder(buf).Encode(q); err != nil {
|
|
trace.PrintError(err)
|
|
}
|
|
return buf
|
|
}
|
|
|
|
func getElasticsearchQueryMatch(query generic.ListQuery) (match map[string]interface{}) {
|
|
match = map[string]interface{}{}
|
|
for _, c := range query {
|
|
switch c.Value.(type) {
|
|
case primitive.ObjectID:
|
|
c.Value = c.Value.(primitive.ObjectID).Hex()
|
|
}
|
|
switch c.Op {
|
|
case generic.OpEqual:
|
|
match[c.Key] = c.Value
|
|
default:
|
|
match[c.Key] = map[string]interface{}{
|
|
c.Op: c.Value,
|
|
}
|
|
}
|
|
}
|
|
return match
|
|
}
|