From 7cabe4b6ac6ea0b42b2f1429e1a6fbc0a29bf49f Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Tue, 29 Oct 2024 13:18:57 +0800 Subject: [PATCH] refactor: updated index related code --- core/models/common/index_service_v2.go | 282 ------------------------- core/models/common/index_utils.go | 59 ++++++ core/models/common/index_utils_test.go | 77 +++++++ core/models/common/init.go | 83 ++++++++ core/node/service/master_service_v2.go | 3 +- 5 files changed, 220 insertions(+), 284 deletions(-) delete mode 100644 core/models/common/index_service_v2.go create mode 100644 core/models/common/index_utils.go create mode 100644 core/models/common/index_utils_test.go create mode 100644 core/models/common/init.go diff --git a/core/models/common/index_service_v2.go b/core/models/common/index_service_v2.go deleted file mode 100644 index 6c9eace3..00000000 --- a/core/models/common/index_service_v2.go +++ /dev/null @@ -1,282 +0,0 @@ -package common - -import ( - "fmt" - - "github.com/apex/log" - models2 "github.com/crawlab-team/crawlab/core/models/models/v2" - "github.com/crawlab-team/crawlab/core/models/service" - "github.com/crawlab-team/crawlab/db/mongo" - "go.mongodb.org/mongo-driver/bson" - mongo2 "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" -) - -func CreateIndexesV2() { - // Helper function to recreate indexes only if different - recreateIndexes := func(col *mongo.Col, desiredIndexes []mongo2.IndexModel) { - cur, err := col.GetCollection().Indexes().List(col.GetContext()) - if err != nil { - log.Errorf("error listing indexes: %v", err) - return - } - - var existingIndexes []bson.M - err = cur.All(col.GetContext(), &existingIndexes) - if err != nil { - log.Errorf("error listing indexes: %v", err) - return - } - - // Compare and recreate only if different - needsUpdate := false - existingKeys := make(map[string]bool) - - // Skip _id index when comparing - for _, idx := range existingIndexes { - if name, ok := idx["name"].(string); ok && name != "_id_" { - key := idx["key"].(bson.M) - keyStr := fmt.Sprintf("%v", key) - existingKeys[keyStr] = true - } - } - - // Check if desired indexes exist - for _, idx := range desiredIndexes { - keyStr := fmt.Sprintf("%v", idx.Keys) - if !existingKeys[keyStr] { - needsUpdate = true - break - } - } - - if needsUpdate { - // Drop all existing indexes (except _id) - err := col.DeleteAllIndexes() - if err != nil { - log.Errorf("error dropping indexes: %v", err) - } - - // Create new indexes - col.MustCreateIndexes(desiredIndexes) - log.Infof("recreated indexes for collection: %s", col.GetCollection().Name()) - } - } - - // nodes - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.NodeV2{})), []mongo2.IndexModel{ - {Keys: bson.M{"key": 1}}, // key - {Keys: bson.M{"name": 1}}, // name - {Keys: bson.M{"is_master": 1}}, // is_master - {Keys: bson.M{"status": 1}}, // status - {Keys: bson.M{"enabled": 1}}, // enabled - {Keys: bson.M{"active": 1}}, // active - }) - - // projects - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.ProjectV2{})), []mongo2.IndexModel{ - {Keys: bson.M{"name": 1}}, - }) - - // spiders - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.SpiderV2{})), []mongo2.IndexModel{ - {Keys: bson.M{"name": 1}}, - {Keys: bson.M{"type": 1}}, - {Keys: bson.M{"col_id": 1}}, - {Keys: bson.M{"project_id": 1}}, - }) - - // tasks - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.TaskV2{})), []mongo2.IndexModel{ - {Keys: bson.M{"spider_id": 1}}, - {Keys: bson.M{"status": 1}}, - {Keys: bson.M{"node_id": 1}}, - {Keys: bson.M{"schedule_id": 1}}, - {Keys: bson.M{"type": 1}}, - {Keys: bson.M{"mode": 1}}, - {Keys: bson.M{"priority": 1}}, - {Keys: bson.M{"parent_id": 1}}, - {Keys: bson.M{"has_sub": 1}}, - {Keys: bson.M{"created_ts": -1}, Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30)}, - }) - - // task stats - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.TaskStatV2{})), []mongo2.IndexModel{ - {Keys: bson.M{"created_ts": -1}, Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30)}, - }) - - // schedules - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.ScheduleV2{})), []mongo2.IndexModel{ - {Keys: bson.M{"name": 1}}, - {Keys: bson.M{"spider_id": 1}}, - {Keys: bson.M{"enabled": 1}}, - }) - - // users - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.UserV2{})), []mongo2.IndexModel{ - {Keys: bson.M{"username": 1}}, - {Keys: bson.M{"role": 1}}, - {Keys: bson.M{"email": 1}}, - }) - - // settings - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.SettingV2{})), []mongo2.IndexModel{ - {Keys: bson.D{{"key", 1}}, Options: options.Index().SetUnique(true)}, - }) - - // tokens - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.TokenV2{})), []mongo2.IndexModel{ - {Keys: bson.M{"name": 1}}, - }) - - // data sources - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DatabaseV2{})), []mongo2.IndexModel{ - {Keys: bson.M{"name": 1}}, - }) - - // data collections - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DataCollectionV2{})), []mongo2.IndexModel{ - {Keys: bson.M{"name": 1}}, - }) - - // roles - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.RoleV2{})), []mongo2.IndexModel{ - {Keys: bson.D{{"key", 1}}, Options: options.Index().SetUnique(true)}, - }) - - // user role relations - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.UserRoleV2{})), []mongo2.IndexModel{ - {Keys: bson.D{{"user_id", 1}, {"role_id", 1}}, Options: options.Index().SetUnique(true)}, - {Keys: bson.D{{"role_id", 1}, {"user_id", 1}}, Options: options.Index().SetUnique(true)}, - }) - - // permissions - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.PermissionV2{})), []mongo2.IndexModel{ - {Keys: bson.D{{"key", 1}}, Options: options.Index().SetUnique(true)}, - }) - - // role permission relations - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.RolePermissionV2{})), []mongo2.IndexModel{ - {Keys: bson.D{{"role_id", 1}, {"permission_id", 1}}, Options: options.Index().SetUnique(true)}, - {Keys: bson.D{{"permission_id", 1}, {"role_id", 1}}, Options: options.Index().SetUnique(true)}, - }) - - // dependencies - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DependencyV2{})), []mongo2.IndexModel{ - { - Keys: bson.D{ - {"type", 1}, - {"node_id", 1}, - {"name", 1}, - }, - Options: (&options.IndexOptions{}).SetUnique(true), - }, - }) - - // dependency settings - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DependencySettingV2{})), []mongo2.IndexModel{ - { - Keys: bson.D{ - {"type", 1}, - {"node_id", 1}, - {"name", 1}, - }, - Options: (&options.IndexOptions{}).SetUnique(true), - }, - }) - - // dependency logs - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DependencyLogV2{})), []mongo2.IndexModel{ - { - Keys: bson.D{{"task_id", 1}}, - }, - { - Keys: bson.D{{"update_ts", 1}}, - Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24), - }, - }) - - // dependency tasks - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DependencyTaskV2{})), []mongo2.IndexModel{ - { - Keys: bson.D{ - {"update_ts", 1}, - }, - Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24), - }, - }) - - // metrics - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.MetricV2{})), []mongo2.IndexModel{ - { - Keys: bson.D{ - {"created_ts", -1}, - }, - Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30), - }, - { - Keys: bson.D{ - {"node_id", 1}, - }, - }, - { - Keys: bson.D{ - {"type", 1}, - }, - }, - }) - - // notification requests - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.NotificationRequestV2{})), []mongo2.IndexModel{ - { - Keys: bson.D{ - {"created_ts", -1}, - }, - Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 7), - }, - { - Keys: bson.D{ - {"channel_id", 1}, - }, - }, - { - Keys: bson.D{ - {"setting_id", 1}, - }, - }, - { - Keys: bson.D{ - {"status", 1}, - }, - }, - }) - - // databases - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DatabaseV2{})), []mongo2.IndexModel{ - { - Keys: bson.D{ - {"data_source_id", 1}, - }, - }, - }) - - // database metrics - recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DatabaseMetricV2{})), []mongo2.IndexModel{ - { - Keys: bson.D{ - {"created_ts", -1}, - }, - Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30), - }, - { - Keys: bson.D{ - {"database_id", 1}, - }, - }, - { - Keys: bson.D{ - {"type", 1}, - }, - }, - }) -} diff --git a/core/models/common/index_utils.go b/core/models/common/index_utils.go new file mode 100644 index 00000000..1d54fa1f --- /dev/null +++ b/core/models/common/index_utils.go @@ -0,0 +1,59 @@ +package common + +import ( + "fmt" + + "github.com/apex/log" + "github.com/crawlab-team/crawlab/db/mongo" + "go.mongodb.org/mongo-driver/bson" + mongo2 "go.mongodb.org/mongo-driver/mongo" +) + +func RecreateIndexes(col *mongo.Col, desiredIndexes []mongo2.IndexModel) { + cur, err := col.GetCollection().Indexes().List(col.GetContext()) + if err != nil { + log.Errorf("error listing indexes: %v", err) + return + } + + var existingIndexes []bson.M + err = cur.All(col.GetContext(), &existingIndexes) + if err != nil { + log.Errorf("error listing indexes: %v", err) + return + } + + // Compare and recreate only if different + needsUpdate := false + existingKeys := make(map[string]bool) + + // Skip _id index when comparing + for _, idx := range existingIndexes { + if name, ok := idx["name"].(string); ok && name != "_id_" { + key := idx["key"].(bson.M) + keyStr := fmt.Sprintf("%v", key) + existingKeys[keyStr] = true + } + } + + // Check if desired indexes exist + for _, idx := range desiredIndexes { + keyStr := fmt.Sprintf("%v", idx.Keys) + if !existingKeys[keyStr] { + needsUpdate = true + break + } + } + + if needsUpdate { + // Drop all existing indexes (except _id) + err := col.DeleteAllIndexes() + if err != nil { + log.Errorf("error dropping indexes: %v", err) + } + + // Create new indexes + col.MustCreateIndexes(desiredIndexes) + log.Infof("recreated indexes for collection: %s", col.GetCollection().Name()) + } +} diff --git a/core/models/common/index_utils_test.go b/core/models/common/index_utils_test.go new file mode 100644 index 00000000..e49f68b0 --- /dev/null +++ b/core/models/common/index_utils_test.go @@ -0,0 +1,77 @@ +package common + +import ( + "fmt" + "testing" + + "github.com/crawlab-team/crawlab/db/mongo" + "github.com/stretchr/testify/assert" + "go.mongodb.org/mongo-driver/bson" + mongo2 "go.mongodb.org/mongo-driver/mongo" +) + +func TestRecreateIndexes(t *testing.T) { + // Setup test collection + testCol := mongo.GetMongoCol("test_collection") + defer func() { + _ = testCol.GetCollection().Drop(testCol.GetContext()) + }() + + // Test cases + tests := []struct { + name string + desiredIndexes []mongo2.IndexModel + expectedCount int64 + }{ + { + name: "Create new indexes", + desiredIndexes: []mongo2.IndexModel{ + {Keys: bson.M{"field1": 1}}, + {Keys: bson.M{"field2": -1}}, + }, + expectedCount: 3, // Including _id index + }, + { + name: "Update existing indexes", + desiredIndexes: []mongo2.IndexModel{ + {Keys: bson.M{"field1": 1}}, + {Keys: bson.M{"field3": 1}}, + }, + expectedCount: 3, // Including _id index + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Execute RecreateIndexes + RecreateIndexes(testCol, tt.desiredIndexes) + + // Verify indexes + cur, err := testCol.GetCollection().Indexes().List(testCol.GetContext()) + assert.NoError(t, err) + + var indexes []bson.M + err = cur.All(testCol.GetContext(), &indexes) + assert.NoError(t, err) + + // Check total number of indexes (including _id) + assert.Equal(t, tt.expectedCount, int64(len(indexes))) + + // Verify each desired index exists + for _, desiredIdx := range tt.desiredIndexes { + found := false + desiredKeyStr := fmt.Sprintf("%v", desiredIdx.Keys) + for _, existingIdx := range indexes { + if existingIdx["name"].(string) != "_id_" { + key := existingIdx["key"].(bson.M) + if fmt.Sprintf("%v", key) == desiredKeyStr { + found = true + break + } + } + } + assert.True(t, found, "Index not found: %v", desiredIdx.Keys) + } + }) + } +} diff --git a/core/models/common/init.go b/core/models/common/init.go new file mode 100644 index 00000000..69fa4348 --- /dev/null +++ b/core/models/common/init.go @@ -0,0 +1,83 @@ +package common + +import ( + models2 "github.com/crawlab-team/crawlab/core/models/models/v2" + "github.com/crawlab-team/crawlab/core/models/service" + "github.com/crawlab-team/crawlab/db/mongo" + "go.mongodb.org/mongo-driver/bson" + mongo2 "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func InitIndexes() { + // nodes + RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.NodeV2{})), []mongo2.IndexModel{ + {Keys: bson.M{"key": 1}}, // key + {Keys: bson.M{"name": 1}}, // name + {Keys: bson.M{"is_master": 1}}, // is_master + {Keys: bson.M{"status": 1}}, // status + {Keys: bson.M{"enabled": 1}}, // enabled + {Keys: bson.M{"active": 1}}, // active + }) + + // projects + RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.ProjectV2{})), []mongo2.IndexModel{ + {Keys: bson.M{"name": 1}}, + }) + + // spiders + RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.SpiderV2{})), []mongo2.IndexModel{ + {Keys: bson.M{"name": 1}}, + {Keys: bson.M{"type": 1}}, + {Keys: bson.M{"col_id": 1}}, + {Keys: bson.M{"project_id": 1}}, + }) + + // tasks + RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.TaskV2{})), []mongo2.IndexModel{ + {Keys: bson.M{"spider_id": 1}}, + {Keys: bson.M{"status": 1}}, + {Keys: bson.M{"node_id": 1}}, + {Keys: bson.M{"schedule_id": 1}}, + {Keys: bson.M{"type": 1}}, + {Keys: bson.M{"mode": 1}}, + {Keys: bson.M{"priority": 1}}, + {Keys: bson.M{"parent_id": 1}}, + {Keys: bson.M{"has_sub": 1}}, + {Keys: bson.M{"created_ts": -1}, Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30)}, + }) + + // task stats + RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.TaskStatV2{})), []mongo2.IndexModel{ + {Keys: bson.M{"created_ts": -1}, Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30)}, + }) + + // schedules + RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.ScheduleV2{})), []mongo2.IndexModel{ + {Keys: bson.M{"name": 1}}, + {Keys: bson.M{"spider_id": 1}}, + {Keys: bson.M{"enabled": 1}}, + }) + + // users + RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.UserV2{})), []mongo2.IndexModel{ + {Keys: bson.M{"username": 1}}, + {Keys: bson.M{"role": 1}}, + {Keys: bson.M{"email": 1}}, + }) + + // settings + RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.SettingV2{})), []mongo2.IndexModel{ + {Keys: bson.M{"key": 1}, Options: options.Index().SetUnique(true)}, + }) + + // tokens + RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.TokenV2{})), []mongo2.IndexModel{ + {Keys: bson.M{"name": 1}}, + }) + + // data collections + RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DataCollectionV2{})), []mongo2.IndexModel{ + {Keys: bson.M{"name": 1}}, + }) +} diff --git a/core/node/service/master_service_v2.go b/core/node/service/master_service_v2.go index 651abaae..610829a8 100644 --- a/core/node/service/master_service_v2.go +++ b/core/node/service/master_service_v2.go @@ -50,7 +50,7 @@ func (svc *MasterServiceV2) Init() (err error) { func (svc *MasterServiceV2) Start() { // create indexes - common.CreateIndexesV2() + common.InitIndexes() // start grpc server if err := svc.server.Start(); err != nil { @@ -390,5 +390,4 @@ func GetMasterServiceV2() (res *MasterServiceV2, err error) { } }) return masterServiceV2, err - }