From 4f52936ad88a817b1e4bf9115f54894b41096c59 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Tue, 8 Oct 2024 18:41:36 +0800 Subject: [PATCH] refactor: integrated database services into task data insert --- core/controllers/spider_v2.go | 60 +--------- core/controllers/task_v2.go | 44 +++---- core/{ => database}/entity/database.go | 0 .../interfaces/database_registry_service.go | 11 ++ core/database/interfaces/database_service.go | 27 +++++ core/database/registry_service.go | 15 +++ core/grpc/server/task_server_v2.go | 2 +- core/models/models/v2/spider_v2.go | 4 +- core/result/service.go | 8 +- core/result/test/base.go | 76 ------------ core/result/test/service_test.go | 67 ----------- core/task/stats/service_v2.go | 111 +++++++++++++----- 12 files changed, 161 insertions(+), 264 deletions(-) rename core/{ => database}/entity/database.go (100%) create mode 100644 core/database/interfaces/database_registry_service.go create mode 100644 core/database/interfaces/database_service.go create mode 100644 core/database/registry_service.go delete mode 100644 core/result/test/base.go delete mode 100644 core/result/test/service_test.go diff --git a/core/controllers/spider_v2.go b/core/controllers/spider_v2.go index bcd0b5aa..76d27038 100644 --- a/core/controllers/spider_v2.go +++ b/core/controllers/spider_v2.go @@ -3,7 +3,6 @@ package controllers import ( "errors" "github.com/apex/log" - "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/fs" "github.com/crawlab-team/crawlab/core/interfaces" models2 "github.com/crawlab-team/crawlab/core/models/models/v2" @@ -48,8 +47,8 @@ func GetSpiderById(c *gin.Context) { } } - // data collection - if !s.ColId.IsZero() { + // data collection (compatible to old version) # TODO: remove in the future + if s.ColName == "" && !s.ColId.IsZero() { col, err := service.NewModelServiceV2[models2.DataCollectionV2]().GetById(s.ColId) if err != nil { if !errors.Is(err, mongo2.ErrNoDocuments) { @@ -252,12 +251,6 @@ 
func PostSpider(c *gin.Context) { return } - // upsert data collection - if err := upsertSpiderDataCollection(&s); err != nil { - HandleErrorInternalServerError(c, err) - return - } - // user u := GetUserFromContextV2(c) @@ -311,12 +304,6 @@ func PutSpiderById(c *gin.Context) { return } - // upsert data collection - if err := upsertSpiderDataCollection(&s); err != nil { - HandleErrorInternalServerError(c, err) - return - } - u := GetUserFromContextV2(c) modelSvc := service.NewModelServiceV2[models2.SpiderV2]() @@ -773,49 +760,6 @@ func getSpiderFsSvcById(id primitive.ObjectID) (svc interfaces.FsServiceV2, err return getSpiderFsSvc(s) } -func upsertSpiderDataCollection(s *models2.SpiderV2) (err error) { - modelSvc := service.NewModelServiceV2[models2.DataCollectionV2]() - if s.ColId.IsZero() { - // validate - if s.ColName == "" { - return errors.New("data collection name is required") - } - // no id - dc, err := modelSvc.GetOne(bson.M{"name": s.ColName}, nil) - if err != nil { - if errors.Is(err, mongo2.ErrNoDocuments) { - // not exists, add new - dc = &models2.DataCollectionV2{Name: s.ColName} - dcId, err := modelSvc.InsertOne(*dc) - if err != nil { - return err - } - dc.SetId(dcId) - } else { - // error - return err - } - } - s.ColId = dc.Id - - // create index - _ = mongo.GetMongoCol(dc.Name).CreateIndex(mongo2.IndexModel{Keys: bson.M{constants.TaskKey: 1}}) - _ = mongo.GetMongoCol(dc.Name).CreateIndex(mongo2.IndexModel{Keys: bson.M{constants.HashKey: 1}}) - } else { - // with id - dc, err := modelSvc.GetById(s.ColId) - if err != nil { - return err - } - s.ColId = dc.Id - } - return nil -} - -func UpsertSpiderDataCollection(s *models2.SpiderV2) (err error) { - return upsertSpiderDataCollection(s) -} - func getSpiderRootPath(c *gin.Context) (rootPath string, err error) { // spider id id, err := primitive.ObjectIDFromHex(c.Param("id")) diff --git a/core/controllers/task_v2.go b/core/controllers/task_v2.go index 5284210a..4334d008 100644 --- 
a/core/controllers/task_v2.go +++ b/core/controllers/task_v2.go @@ -5,7 +5,7 @@ import ( log2 "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/interfaces" - models2 "github.com/crawlab-team/crawlab/core/models/models/v2" + "github.com/crawlab-team/crawlab/core/models/models/v2" "github.com/crawlab-team/crawlab/core/models/service" "github.com/crawlab-team/crawlab/core/result" "github.com/crawlab-team/crawlab/core/spider/admin" @@ -34,7 +34,7 @@ func GetTaskById(c *gin.Context) { } // task - t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id) + t, err := service.NewModelServiceV2[models.TaskV2]().GetById(id) if errors.Is(err, mongo2.ErrNoDocuments) { HandleErrorNotFound(c, err) return @@ -45,7 +45,7 @@ func GetTaskById(c *gin.Context) { } // spider - t.Spider, _ = service.NewModelServiceV2[models2.SpiderV2]().GetById(t.SpiderId) + t.Spider, _ = service.NewModelServiceV2[models.SpiderV2]().GetById(t.SpiderId) // skip if task status is pending if t.Status == constants.TaskStatusPending { @@ -54,7 +54,7 @@ func GetTaskById(c *gin.Context) { } // task stat - t.Stat, _ = service.NewModelServiceV2[models2.TaskStatV2]().GetById(id) + t.Stat, _ = service.NewModelServiceV2[models.TaskStatV2]().GetById(id) HandleSuccessWithData(c, t) } @@ -62,7 +62,7 @@ func GetTaskById(c *gin.Context) { func GetTaskList(c *gin.Context) { withStats := c.Query("stats") if withStats == "" { - NewControllerV2[models2.TaskV2]().GetList(c) + NewControllerV2[models.TaskV2]().GetList(c) return } @@ -72,7 +72,7 @@ func GetTaskList(c *gin.Context) { sort := MustGetSortOption(c) // get tasks - tasks, err := service.NewModelServiceV2[models2.TaskV2]().GetMany(query, &mongo.FindOptions{ + tasks, err := service.NewModelServiceV2[models.TaskV2]().GetMany(query, &mongo.FindOptions{ Sort: sort, Skip: pagination.Size * (pagination.Page - 1), Limit: pagination.Size, @@ -101,14 +101,14 @@ func GetTaskList(c *gin.Context) { } // total 
count - total, err := service.NewModelServiceV2[models2.TaskV2]().Count(query) + total, err := service.NewModelServiceV2[models.TaskV2]().Count(query) if err != nil { HandleErrorInternalServerError(c, err) return } // stat list - stats, err := service.NewModelServiceV2[models2.TaskStatV2]().GetMany(bson.M{ + stats, err := service.NewModelServiceV2[models.TaskStatV2]().GetMany(bson.M{ "_id": bson.M{ "$in": taskIds, }, @@ -119,13 +119,13 @@ func GetTaskList(c *gin.Context) { } // cache stat list to dict - statsDict := map[primitive.ObjectID]models2.TaskStatV2{} + statsDict := map[primitive.ObjectID]models.TaskStatV2{} for _, s := range stats { statsDict[s.Id] = s } // spider list - spiders, err := service.NewModelServiceV2[models2.SpiderV2]().GetMany(bson.M{ + spiders, err := service.NewModelServiceV2[models.SpiderV2]().GetMany(bson.M{ "_id": bson.M{ "$in": spiderIds, }, @@ -136,7 +136,7 @@ func GetTaskList(c *gin.Context) { } // cache spider list to dict - spiderDict := map[primitive.ObjectID]models2.SpiderV2{} + spiderDict := map[primitive.ObjectID]models.SpiderV2{} for _, s := range spiders { spiderDict[s.Id] = s } @@ -170,22 +170,22 @@ func DeleteTaskById(c *gin.Context) { // delete in db if err := mongo.RunTransaction(func(context mongo2.SessionContext) (err error) { // delete task - _, err = service.NewModelServiceV2[models2.TaskV2]().GetById(id) + _, err = service.NewModelServiceV2[models.TaskV2]().GetById(id) if err != nil { return err } - err = service.NewModelServiceV2[models2.TaskV2]().DeleteById(id) + err = service.NewModelServiceV2[models.TaskV2]().DeleteById(id) if err != nil { return err } // delete task stat - _, err = service.NewModelServiceV2[models2.TaskStatV2]().GetById(id) + _, err = service.NewModelServiceV2[models.TaskStatV2]().GetById(id) if err != nil { log2.Warnf("delete task stat error: %s", err.Error()) return nil } - err = service.NewModelServiceV2[models2.TaskStatV2]().DeleteById(id) + err = 
service.NewModelServiceV2[models.TaskStatV2]().DeleteById(id) if err != nil { log2.Warnf("delete task stat error: %s", err.Error()) return nil @@ -217,7 +217,7 @@ func DeleteList(c *gin.Context) { if err := mongo.RunTransaction(func(context mongo2.SessionContext) error { // delete tasks - if err := service.NewModelServiceV2[models2.TaskV2]().DeleteMany(bson.M{ + if err := service.NewModelServiceV2[models.TaskV2]().DeleteMany(bson.M{ "_id": bson.M{ "$in": payload.Ids, }, @@ -226,7 +226,7 @@ func DeleteList(c *gin.Context) { } // delete task stats - if err := service.NewModelServiceV2[models2.TaskV2]().DeleteMany(bson.M{ + if err := service.NewModelServiceV2[models.TaskV2]().DeleteMany(bson.M{ "_id": bson.M{ "$in": payload.Ids, }, @@ -261,7 +261,7 @@ func DeleteList(c *gin.Context) { func PostTaskRun(c *gin.Context) { // task - var t models2.TaskV2 + var t models.TaskV2 if err := c.ShouldBindJSON(&t); err != nil { HandleErrorBadRequest(c, err) return @@ -274,7 +274,7 @@ func PostTaskRun(c *gin.Context) { } // spider - s, err := service.NewModelServiceV2[models2.SpiderV2]().GetById(t.SpiderId) + s, err := service.NewModelServiceV2[models.SpiderV2]().GetById(t.SpiderId) if err != nil { HandleErrorInternalServerError(c, err) return @@ -319,7 +319,7 @@ func PostTaskRestart(c *gin.Context) { } // task - t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id) + t, err := service.NewModelServiceV2[models.TaskV2]().GetById(id) if err != nil { HandleErrorInternalServerError(c, err) return @@ -363,7 +363,7 @@ func PostTaskCancel(c *gin.Context) { } // task - t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id) + t, err := service.NewModelServiceV2[models.TaskV2]().GetById(id) if err != nil { HandleErrorInternalServerError(c, err) return @@ -446,7 +446,7 @@ func GetTaskData(c *gin.Context) { } // task - t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id) + t, err := service.NewModelServiceV2[models.TaskV2]().GetById(id) if err != nil { 
HandleErrorInternalServerError(c, err) return diff --git a/core/entity/database.go b/core/database/entity/database.go similarity index 100% rename from core/entity/database.go rename to core/database/entity/database.go diff --git a/core/database/interfaces/database_registry_service.go b/core/database/interfaces/database_registry_service.go new file mode 100644 index 00000000..5fc66682 --- /dev/null +++ b/core/database/interfaces/database_registry_service.go @@ -0,0 +1,11 @@ +package interfaces + +import ( + "go.mongodb.org/mongo-driver/bson/primitive" +) + +type DatabaseRegistryService interface { + Start() + CheckStatus() + GetDatabaseService(id primitive.ObjectID) (res DatabaseService, err error) +} diff --git a/core/database/interfaces/database_service.go b/core/database/interfaces/database_service.go new file mode 100644 index 00000000..621829c2 --- /dev/null +++ b/core/database/interfaces/database_service.go @@ -0,0 +1,27 @@ +package interfaces + +import ( + "github.com/crawlab-team/crawlab/core/database/entity" + "github.com/crawlab-team/crawlab/core/models/models/v2" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +type DatabaseService interface { + TestConnection(id primitive.ObjectID) (err error) + GetMetadata(id primitive.ObjectID) (m *entity.DatabaseMetadata, err error) + GetMetadataAllDb(id primitive.ObjectID) (m *entity.DatabaseMetadata, err error) + CreateDatabase(id primitive.ObjectID, databaseName string) (err error) + DropDatabase(id primitive.ObjectID, databaseName string) (err error) + GetTableMetadata(id primitive.ObjectID, databaseName, tableName string) (table *entity.DatabaseTable, err error) + CreateTable(id primitive.ObjectID, databaseName string, table *entity.DatabaseTable) (err error) + ModifyTable(id primitive.ObjectID, databaseName string, table *entity.DatabaseTable) (err error) + DropTable(id primitive.ObjectID, databaseName, tableName string) (err error) + RenameTable(id primitive.ObjectID, databaseName, oldTableName, 
newTableName string) (err error) + GetColumnTypes(query string) (types []string) + ReadRows(id primitive.ObjectID, databaseName, tableName string, filter map[string]interface{}, skip, limit int) ([]map[string]interface{}, int64, error) + CreateRow(id primitive.ObjectID, databaseName, tableName string, row map[string]interface{}) error + UpdateRow(id primitive.ObjectID, databaseName, tableName string, filter map[string]interface{}, update map[string]interface{}) error + DeleteRow(id primitive.ObjectID, databaseName, tableName string, filter map[string]interface{}) error + Query(id primitive.ObjectID, databaseName, query string) (results *entity.DatabaseQueryResults, err error) + GetCurrentMetric(id primitive.ObjectID) (m *models.DatabaseMetricV2, err error) +} diff --git a/core/database/registry_service.go b/core/database/registry_service.go new file mode 100644 index 00000000..e05b7809 --- /dev/null +++ b/core/database/registry_service.go @@ -0,0 +1,15 @@ +package database + +import ( + "github.com/crawlab-team/crawlab/core/database/interfaces" +) + +var serviceInstance interfaces.DatabaseRegistryService + +func SetDatabaseRegistryService(svc interfaces.DatabaseRegistryService) { + serviceInstance = svc +} + +func GetDatabaseRegistryService() interfaces.DatabaseRegistryService { + return serviceInstance +} diff --git a/core/grpc/server/task_server_v2.go b/core/grpc/server/task_server_v2.go index 3ae0ddb0..050a5cd1 100644 --- a/core/grpc/server/task_server_v2.go +++ b/core/grpc/server/task_server_v2.go @@ -214,7 +214,7 @@ func (svr TaskServerV2) handleInsertData(msg *grpc.StreamMessage) (err error) { if err != nil { return err } - var records []interface{} + var records []map[string]interface{} for _, d := range data.Records { res, ok := d[constants.TaskKey] if ok { diff --git a/core/models/models/v2/spider_v2.go b/core/models/models/v2/spider_v2.go index 5e91baf4..b743ede2 100644 --- a/core/models/models/v2/spider_v2.go +++ b/core/models/models/v2/spider_v2.go @@ 
-8,8 +8,8 @@ type SpiderV2 struct { any `collection:"spiders"` BaseModelV2[SpiderV2] `bson:",inline"` Name string `json:"name" bson:"name"` // spider name - ColId primitive.ObjectID `json:"col_id" bson:"col_id"` // data collection id - ColName string `json:"col_name,omitempty" bson:"-"` // data collection name + ColId primitive.ObjectID `json:"col_id" bson:"col_id"` // data collection id (deprecated) # TODO: remove this field in the future + ColName string `json:"col_name,omitempty" bson:"col_name"` // data collection name DataSourceId primitive.ObjectID `json:"data_source_id" bson:"data_source_id"` // data source id DataSource *DatabaseV2 `json:"data_source,omitempty" bson:"-"` // data source Description string `json:"description" bson:"description"` // description diff --git a/core/result/service.go b/core/result/service.go index d1e94685..5497f3a6 100644 --- a/core/result/service.go +++ b/core/result/service.go @@ -38,7 +38,7 @@ func NewResultService(registryKey string, s *models.Spider) (svc2 interfaces.Res var store = sync.Map{} -func GetResultService(spiderId primitive.ObjectID, opts ...Option) (svc2 interfaces.ResultService, err error) { +func GetResultService(spiderId primitive.ObjectID) (svc2 interfaces.ResultService, err error) { // model service modelSvc, err := service.GetService() if err != nil { @@ -51,12 +51,6 @@ func GetResultService(spiderId primitive.ObjectID, opts ...Option) (svc2 interfa return nil, trace.TraceError(err) } - // apply options - _opts := &Options{} - for _, opt := range opts { - opt(_opts) - } - // store key storeKey := s.ColId.Hex() + ":" + s.DataSourceId.Hex() diff --git a/core/result/test/base.go b/core/result/test/base.go deleted file mode 100644 index a224fa29..00000000 --- a/core/result/test/base.go +++ /dev/null @@ -1,76 +0,0 @@ -package test - -import ( - "github.com/crawlab-team/crawlab/core/interfaces" - "github.com/crawlab-team/crawlab/core/models/delegate" - "github.com/crawlab-team/crawlab/core/models/models" - 
"github.com/crawlab-team/crawlab/core/models/service" - "github.com/crawlab-team/crawlab/core/result" - "github.com/crawlab-team/crawlab/db/mongo" - "go.uber.org/dig" - "testing" -) - -func init() { - T = NewTest() -} - -var T *Test - -type Test struct { - // dependencies - modelSvc service.ModelService - resultSvc interfaces.ResultService - - // test data - TestColName string - TestCol *mongo.Col - TestDc *models.DataCollection -} - -func (t *Test) Setup(t2 *testing.T) { - t2.Cleanup(t.Cleanup) -} - -func (t *Test) Cleanup() { - _ = t.modelSvc.DropAll() -} - -func NewTest() *Test { - var err error - - // test - t := &Test{ - TestColName: "test_results", - } - - // dependency injection - c := dig.New() - if err := c.Provide(service.NewService); err != nil { - panic(err) - } - if err := c.Invoke(func( - modelSvc service.ModelService, - ) { - t.modelSvc = modelSvc - }); err != nil { - panic(err) - } - - // data collection - t.TestDc = &models.DataCollection{ - Name: t.TestColName, - } - if err := delegate.NewModelDelegate(t.TestDc).Add(); err != nil { - panic(err) - } - t.TestCol = mongo.GetMongoCol(t.TestColName) - - // result service - t.resultSvc, err = result.GetResultService(t.TestDc.GetId()) - if err != nil { - panic(err) - } - - return t -} diff --git a/core/result/test/service_test.go b/core/result/test/service_test.go deleted file mode 100644 index b44faf4e..00000000 --- a/core/result/test/service_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package test - -import ( - "github.com/crawlab-team/crawlab/core/models/models" - "github.com/stretchr/testify/require" - "testing" -) - -func TestResultService_GetList(t *testing.T) { - var err error - T.Setup(t) - - n := 1000 - var docs []interface{} - for i := 0; i < n; i++ { - d := &models.Result{ - "i": i, - } - docs = append(docs, d) - } - _, err = T.TestCol.InsertMany(docs) - require.Nil(t, err) - - // get all - results, err := T.resultSvc.List(nil, nil) - require.Nil(t, err) - require.Equal(t, n, len(results)) - - 
//query := bson.M{ - // "i": bson.M{ - // "$lt": n / 2, - // }, - //} - //results, err = T.resultSvc.List(query, nil) - //require.Nil(t, err) - //require.Equal(t, n/2, len(results)) -} - -func TestResultService_Count(t *testing.T) { - var err error - T.Setup(t) - - n := 1000 - var docs []interface{} - for i := 0; i < n; i++ { - d := &models.Result{ - "i": i, - } - docs = append(docs, d) - } - _, err = T.TestCol.InsertMany(docs) - require.Nil(t, err) - - // get all - total, err := T.resultSvc.Count(nil) - require.Nil(t, err) - require.Equal(t, n, total) - - //query := bson.M{ - // "i": bson.M{ - // "$lt": n / 2, - // }, - //} - //total, err = T.resultSvc.Count(query) - //require.Nil(t, err) - //require.Equal(t, n/2, total) -} diff --git a/core/task/stats/service_v2.go b/core/task/stats/service_v2.go index 0637e1e1..d5a7d73a 100644 --- a/core/task/stats/service_v2.go +++ b/core/task/stats/service_v2.go @@ -1,12 +1,16 @@ package stats import ( + log2 "github.com/apex/log" + "github.com/crawlab-team/crawlab/core/database" + interfaces2 "github.com/crawlab-team/crawlab/core/database/interfaces" "github.com/crawlab-team/crawlab/core/interfaces" models2 "github.com/crawlab-team/crawlab/core/models/models/v2" "github.com/crawlab-team/crawlab/core/models/service" nodeconfig "github.com/crawlab-team/crawlab/core/node/config" - "github.com/crawlab-team/crawlab/core/result" "github.com/crawlab-team/crawlab/core/task/log" + "github.com/crawlab-team/crawlab/core/utils" + "github.com/crawlab-team/crawlab/db/mongo" "github.com/crawlab-team/crawlab/trace" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -14,15 +18,23 @@ import ( "time" ) +type databaseServiceItem struct { + taskId primitive.ObjectID + dbId primitive.ObjectID + dbSvc interfaces2.DatabaseService + tableName string + time time.Time +} + type ServiceV2 struct { // dependencies nodeCfgSvc interfaces.NodeConfigService // internals - mu sync.Mutex - resultServices sync.Map - rsTtl 
time.Duration - logDriver log.Driver + mu sync.Mutex + databaseServiceItems map[string]*databaseServiceItem + databaseServiceTll time.Duration + logDriver log.Driver } func (svc *ServiceV2) Init() (err error) { @@ -30,15 +42,39 @@ func (svc *ServiceV2) Init() (err error) { return nil } -func (svc *ServiceV2) InsertData(id primitive.ObjectID, records ...interface{}) (err error) { - resultSvc, err := svc.getResultService(id) +func (svc *ServiceV2) InsertData(taskId primitive.ObjectID, records ...map[string]interface{}) (err error) { + count := 0 + + item, err := svc.getDatabaseServiceItem(taskId) if err != nil { return err } - if err := resultSvc.Insert(records...); err != nil { - return err + dbId := item.dbId + dbSvc := item.dbSvc + tableName := item.tableName + if utils.IsPro() && dbSvc != nil { + for _, record := range records { + if err := dbSvc.CreateRow(dbId, "", tableName, record); err != nil { + log2.Errorf("failed to insert data: %v", err) + continue + } + count++ + } + } else { + var records2 []interface{} + for _, record := range records { + records2 = append(records2, record) + } + _, err = mongo.GetMongoCol(tableName).InsertMany(records2) + if err != nil { + log2.Errorf("failed to insert data: %v", err) + return err + } + count = len(records) } - go svc.updateTaskStats(id, len(records)) + + go svc.updateTaskStats(taskId, count) + return nil } @@ -46,38 +82,52 @@ func (svc *ServiceV2) InsertLogs(id primitive.ObjectID, logs ...string) (err err return svc.logDriver.WriteLines(id.Hex(), logs) } -func (svc *ServiceV2) getResultService(id primitive.ObjectID) (resultSvc interfaces.ResultService, err error) { +func (svc *ServiceV2) getDatabaseServiceItem(taskId primitive.ObjectID) (item *databaseServiceItem, err error) { // atomic operation svc.mu.Lock() defer svc.mu.Unlock() // attempt to get from cache - res, _ := svc.resultServices.Load(id.Hex()) - if res != nil { + item, ok := svc.databaseServiceItems[taskId.Hex()] + if ok { // hit in cache - resultSvc, ok 
:= res.(interfaces.ResultService) - resultSvc.SetTime(time.Now()) - if ok { - return resultSvc, nil - } + item.time = time.Now() + return item, nil } // task - t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id) + t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(taskId) if err != nil { return nil, err } - // result service - resultSvc, err = result.GetResultService(t.SpiderId) + // spider + s, err := service.NewModelServiceV2[models2.SpiderV2]().GetById(t.SpiderId) if err != nil { return nil, err } + // database service + var dbSvc interfaces2.DatabaseService + if utils.IsPro() { + if dbRegSvc := database.GetDatabaseRegistryService(); dbRegSvc != nil { + dbSvc, err = dbRegSvc.GetDatabaseService(s.DataSourceId) + if err != nil { + return nil, err + } + } + } + // store in cache - svc.resultServices.Store(id.Hex(), resultSvc) + svc.databaseServiceItems[taskId.Hex()] = &databaseServiceItem{ + taskId: taskId, + dbId: s.DataSourceId, + dbSvc: dbSvc, + tableName: s.ColName, + time: time.Now(), + } - return resultSvc, nil + return svc.databaseServiceItems[taskId.Hex()], nil } func (svc *ServiceV2) updateTaskStats(id primitive.ObjectID, resultCount int) { @@ -96,13 +146,11 @@ func (svc *ServiceV2) cleanup() { // atomic operation svc.mu.Lock() - svc.resultServices.Range(func(key, value interface{}) bool { - rs := value.(interfaces.ResultService) - if time.Now().After(rs.GetTime().Add(svc.rsTtl)) { - svc.resultServices.Delete(key) + for k, v := range svc.databaseServiceItems { + if time.Now().After(v.time.Add(svc.databaseServiceTll)) { + delete(svc.databaseServiceItems, k) } - return true - }) + } svc.mu.Unlock() @@ -113,8 +161,9 @@ func (svc *ServiceV2) cleanup() { func NewTaskStatsServiceV2() (svc2 *ServiceV2, err error) { // service svc := &ServiceV2{ - mu: sync.Mutex{}, - resultServices: sync.Map{}, + mu: sync.Mutex{}, + databaseServiceItems: map[string]*databaseServiceItem{}, + databaseServiceTll: 10 * time.Minute, } svc.nodeCfgSvc = nodeconfig.GetNodeConfigService()