refactor: integrated database services into task data insert

This commit is contained in:
Marvin Zhang
2024-10-08 18:41:36 +08:00
parent 7fb9fc7d9f
commit 4652d27e0a
12 changed files with 161 additions and 264 deletions

View File

@@ -1,12 +1,16 @@
package stats
import (
log2 "github.com/apex/log"
"github.com/crawlab-team/crawlab/core/database"
interfaces2 "github.com/crawlab-team/crawlab/core/database/interfaces"
"github.com/crawlab-team/crawlab/core/interfaces"
models2 "github.com/crawlab-team/crawlab/core/models/models/v2"
"github.com/crawlab-team/crawlab/core/models/service"
nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
"github.com/crawlab-team/crawlab/core/result"
"github.com/crawlab-team/crawlab/core/task/log"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/db/mongo"
"github.com/crawlab-team/crawlab/trace"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -14,15 +18,23 @@ import (
"time"
)
// databaseServiceItem is one cached entry describing where the results of a
// single task are written.
type databaseServiceItem struct {
	// taskId is the task this entry belongs to; dbId is the data source ID
	// copied from the task's spider.
	taskId, dbId primitive.ObjectID

	// dbSvc is the database service resolved for dbId. It stays nil when no
	// service was resolved, in which case results go to the Mongo collection.
	dbSvc interfaces2.DatabaseService

	// tableName is the spider's collection/table name used as insert target.
	tableName string

	// time is the last time the entry was created or served from the cache;
	// cleanup compares it against the configured lifetime.
	time time.Time
}
// ServiceV2 stores task results and task logs. It keeps a per-task cache of
// the database target that results are written to.
type ServiceV2 struct {
	// dependencies
	nodeCfgSvc interfaces.NodeConfigService

	// internals

	// mu serialises access to the per-task caches below.
	mu sync.Mutex

	// NOTE(review): resultServices and rsTtl look like the previous cache that
	// databaseServiceItems replaces. They are kept because lines elsewhere in
	// this file still reference them; confirm before removing.
	resultServices sync.Map
	rsTtl          time.Duration

	// databaseServiceItems caches insert targets, keyed by the task ID in hex.
	databaseServiceItems map[string]*databaseServiceItem

	// databaseServiceTll is how long a cache entry may stay unused before
	// cleanup drops it.
	// NOTE(review): "Tll" is presumably a typo for "Ttl"; the name is left
	// unchanged because other code refers to it.
	databaseServiceTll time.Duration

	// logDriver receives the log lines written through InsertLogs.
	logDriver log.Driver
}
func (svc *ServiceV2) Init() (err error) {
@@ -30,15 +42,39 @@ func (svc *ServiceV2) Init() (err error) {
return nil
}
func (svc *ServiceV2) InsertData(id primitive.ObjectID, records ...interface{}) (err error) {
resultSvc, err := svc.getResultService(id)
func (svc *ServiceV2) InsertData(taskId primitive.ObjectID, records ...map[string]interface{}) (err error) {
count := 0
item, err := svc.getDatabaseServiceItem(taskId)
if err != nil {
return err
}
if err := resultSvc.Insert(records...); err != nil {
return err
dbId := item.dbId
dbSvc := item.dbSvc
tableName := item.tableName
if utils.IsPro() && dbSvc != nil {
for _, record := range records {
if err := dbSvc.CreateRow(dbId, "", tableName, record); err != nil {
log2.Errorf("failed to insert data: %v", err)
continue
}
count++
}
} else {
var records2 []interface{}
for _, record := range records {
records2 = append(records2, record)
}
_, err = mongo.GetMongoCol(tableName).InsertMany(records2)
if err != nil {
log2.Errorf("failed to insert data: %v", err)
return err
}
count = len(records)
}
go svc.updateTaskStats(id, len(records))
go svc.updateTaskStats(taskId, count)
return nil
}
@@ -46,38 +82,52 @@ func (svc *ServiceV2) InsertLogs(id primitive.ObjectID, logs ...string) (err err
return svc.logDriver.WriteLines(id.Hex(), logs)
}
func (svc *ServiceV2) getResultService(id primitive.ObjectID) (resultSvc interfaces.ResultService, err error) {
func (svc *ServiceV2) getDatabaseServiceItem(taskId primitive.ObjectID) (item *databaseServiceItem, err error) {
// atomic operation
svc.mu.Lock()
defer svc.mu.Unlock()
// attempt to get from cache
res, _ := svc.resultServices.Load(id.Hex())
if res != nil {
item, ok := svc.databaseServiceItems[taskId.Hex()]
if ok {
// hit in cache
resultSvc, ok := res.(interfaces.ResultService)
resultSvc.SetTime(time.Now())
if ok {
return resultSvc, nil
}
item.time = time.Now()
return item, nil
}
// task
t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id)
t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(taskId)
if err != nil {
return nil, err
}
// result service
resultSvc, err = result.GetResultService(t.SpiderId)
// spider
s, err := service.NewModelServiceV2[models2.SpiderV2]().GetById(t.SpiderId)
if err != nil {
return nil, err
}
// database service
var dbSvc interfaces2.DatabaseService
if utils.IsPro() {
if dbRegSvc := database.GetDatabaseRegistryService(); dbRegSvc != nil {
dbSvc, err = dbRegSvc.GetDatabaseService(s.DataSourceId)
if err != nil {
return nil, err
}
}
}
// store in cache
svc.resultServices.Store(id.Hex(), resultSvc)
svc.databaseServiceItems[taskId.Hex()] = &databaseServiceItem{
taskId: taskId,
dbId: s.DataSourceId,
dbSvc: dbSvc,
tableName: s.ColName,
time: time.Now(),
}
return resultSvc, nil
return item, nil
}
func (svc *ServiceV2) updateTaskStats(id primitive.ObjectID, resultCount int) {
@@ -96,13 +146,11 @@ func (svc *ServiceV2) cleanup() {
// atomic operation
svc.mu.Lock()
svc.resultServices.Range(func(key, value interface{}) bool {
rs := value.(interfaces.ResultService)
if time.Now().After(rs.GetTime().Add(svc.rsTtl)) {
svc.resultServices.Delete(key)
for k, v := range svc.databaseServiceItems {
if time.Now().After(v.time.Add(svc.databaseServiceTll)) {
delete(svc.databaseServiceItems, k)
}
return true
})
}
svc.mu.Unlock()
@@ -113,8 +161,9 @@ func (svc *ServiceV2) cleanup() {
func NewTaskStatsServiceV2() (svc2 *ServiceV2, err error) {
// service
svc := &ServiceV2{
mu: sync.Mutex{},
resultServices: sync.Map{},
mu: sync.Mutex{},
databaseServiceItems: map[string]*databaseServiceItem{},
databaseServiceTll: 10 * time.Minute,
}
svc.nodeCfgSvc = nodeconfig.GetNodeConfigService()