Files
crawlab/core/ds/kafka.go

117 lines
2.6 KiB
Go

package ds
import (
"github.com/cenkalti/backoff/v4"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/entity"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/db/generic"
"github.com/crawlab-team/crawlab/trace"
"github.com/segmentio/kafka-go"
"go.mongodb.org/mongo-driver/bson/primitive"
"time"
)
type KafkaService struct {
// dependencies
modelSvc service.ModelService
// internals
dc *models.DataCollection // models.DataCollection
ds *models.DataSource // models.DataSource
c *kafka.Conn // kafka.Conn
rb backoff.BackOff
t time.Time
}
func (svc *KafkaService) Insert(records ...interface{}) (err error) {
var messages []kafka.Message
for _, r := range records {
switch r.(type) {
case entity.Result:
d := r.(entity.Result)
messages = append(messages, kafka.Message{
Topic: svc.ds.Database,
Key: []byte(d.GetTaskId().Hex()),
Value: d.Bytes(),
})
}
}
_, err = svc.c.WriteMessages(messages...)
if err != nil {
return trace.TraceError(err)
}
return nil
}
func (svc *KafkaService) List(query generic.ListQuery, opts *generic.ListOptions) (results []interface{}, err error) {
// N/A
return nil, nil
}
func (svc *KafkaService) Count(query generic.ListQuery) (n int, err error) {
// N/A
return 0, nil
}
func NewDataSourceKafkaService(colId primitive.ObjectID, dsId primitive.ObjectID) (svc2 interfaces.ResultService, err error) {
// service
svc := &KafkaService{}
// dependency injection
svc.modelSvc, err = service.GetService()
if err != nil {
return nil, err
}
// data source
if dsId.IsZero() {
svc.ds = &models.DataSource{}
} else {
svc.ds, err = svc.modelSvc.GetDataSourceById(dsId)
if err != nil {
return nil, err
}
}
// data source defaults
if svc.ds.Host == "" {
svc.ds.Host = constants.DefaultHost
}
if svc.ds.Port == 0 {
svc.ds.Port = constants.DefaultKafkaPort
}
// data source password
pwd, err := svc.modelSvc.GetPasswordById(svc.ds.Id)
if err == nil {
svc.ds.Password, err = utils.DecryptAES(pwd.Password)
if err != nil {
return nil, err
}
}
// data collection
svc.dc, err = svc.modelSvc.GetDataCollectionById(colId)
if err != nil {
return nil, err
}
return svc, nil
}
func (svc *KafkaService) Index(fields []string) {
// TODO: implement me
}
func (svc *KafkaService) SetTime(t time.Time) {
svc.t = t
}
func (svc *KafkaService) GetTime() (t time.Time) {
return svc.t
}