refactor: updated index related code

This commit is contained in:
Marvin Zhang
2024-10-29 13:18:57 +08:00
parent abaaa9b7f7
commit 7cabe4b6ac
5 changed files with 220 additions and 284 deletions

View File

@@ -1,282 +0,0 @@
package common
import (
"fmt"
"github.com/apex/log"
models2 "github.com/crawlab-team/crawlab/core/models/models/v2"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/db/mongo"
"go.mongodb.org/mongo-driver/bson"
mongo2 "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func CreateIndexesV2() {
// Helper function to recreate indexes only if different
recreateIndexes := func(col *mongo.Col, desiredIndexes []mongo2.IndexModel) {
cur, err := col.GetCollection().Indexes().List(col.GetContext())
if err != nil {
log.Errorf("error listing indexes: %v", err)
return
}
var existingIndexes []bson.M
err = cur.All(col.GetContext(), &existingIndexes)
if err != nil {
log.Errorf("error listing indexes: %v", err)
return
}
// Compare and recreate only if different
needsUpdate := false
existingKeys := make(map[string]bool)
// Skip _id index when comparing
for _, idx := range existingIndexes {
if name, ok := idx["name"].(string); ok && name != "_id_" {
key := idx["key"].(bson.M)
keyStr := fmt.Sprintf("%v", key)
existingKeys[keyStr] = true
}
}
// Check if desired indexes exist
for _, idx := range desiredIndexes {
keyStr := fmt.Sprintf("%v", idx.Keys)
if !existingKeys[keyStr] {
needsUpdate = true
break
}
}
if needsUpdate {
// Drop all existing indexes (except _id)
err := col.DeleteAllIndexes()
if err != nil {
log.Errorf("error dropping indexes: %v", err)
}
// Create new indexes
col.MustCreateIndexes(desiredIndexes)
log.Infof("recreated indexes for collection: %s", col.GetCollection().Name())
}
}
// nodes
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.NodeV2{})), []mongo2.IndexModel{
{Keys: bson.M{"key": 1}}, // key
{Keys: bson.M{"name": 1}}, // name
{Keys: bson.M{"is_master": 1}}, // is_master
{Keys: bson.M{"status": 1}}, // status
{Keys: bson.M{"enabled": 1}}, // enabled
{Keys: bson.M{"active": 1}}, // active
})
// projects
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.ProjectV2{})), []mongo2.IndexModel{
{Keys: bson.M{"name": 1}},
})
// spiders
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.SpiderV2{})), []mongo2.IndexModel{
{Keys: bson.M{"name": 1}},
{Keys: bson.M{"type": 1}},
{Keys: bson.M{"col_id": 1}},
{Keys: bson.M{"project_id": 1}},
})
// tasks
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.TaskV2{})), []mongo2.IndexModel{
{Keys: bson.M{"spider_id": 1}},
{Keys: bson.M{"status": 1}},
{Keys: bson.M{"node_id": 1}},
{Keys: bson.M{"schedule_id": 1}},
{Keys: bson.M{"type": 1}},
{Keys: bson.M{"mode": 1}},
{Keys: bson.M{"priority": 1}},
{Keys: bson.M{"parent_id": 1}},
{Keys: bson.M{"has_sub": 1}},
{Keys: bson.M{"created_ts": -1}, Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30)},
})
// task stats
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.TaskStatV2{})), []mongo2.IndexModel{
{Keys: bson.M{"created_ts": -1}, Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30)},
})
// schedules
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.ScheduleV2{})), []mongo2.IndexModel{
{Keys: bson.M{"name": 1}},
{Keys: bson.M{"spider_id": 1}},
{Keys: bson.M{"enabled": 1}},
})
// users
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.UserV2{})), []mongo2.IndexModel{
{Keys: bson.M{"username": 1}},
{Keys: bson.M{"role": 1}},
{Keys: bson.M{"email": 1}},
})
// settings
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.SettingV2{})), []mongo2.IndexModel{
{Keys: bson.D{{"key", 1}}, Options: options.Index().SetUnique(true)},
})
// tokens
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.TokenV2{})), []mongo2.IndexModel{
{Keys: bson.M{"name": 1}},
})
// data sources
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DatabaseV2{})), []mongo2.IndexModel{
{Keys: bson.M{"name": 1}},
})
// data collections
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DataCollectionV2{})), []mongo2.IndexModel{
{Keys: bson.M{"name": 1}},
})
// roles
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.RoleV2{})), []mongo2.IndexModel{
{Keys: bson.D{{"key", 1}}, Options: options.Index().SetUnique(true)},
})
// user role relations
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.UserRoleV2{})), []mongo2.IndexModel{
{Keys: bson.D{{"user_id", 1}, {"role_id", 1}}, Options: options.Index().SetUnique(true)},
{Keys: bson.D{{"role_id", 1}, {"user_id", 1}}, Options: options.Index().SetUnique(true)},
})
// permissions
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.PermissionV2{})), []mongo2.IndexModel{
{Keys: bson.D{{"key", 1}}, Options: options.Index().SetUnique(true)},
})
// role permission relations
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.RolePermissionV2{})), []mongo2.IndexModel{
{Keys: bson.D{{"role_id", 1}, {"permission_id", 1}}, Options: options.Index().SetUnique(true)},
{Keys: bson.D{{"permission_id", 1}, {"role_id", 1}}, Options: options.Index().SetUnique(true)},
})
// dependencies
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DependencyV2{})), []mongo2.IndexModel{
{
Keys: bson.D{
{"type", 1},
{"node_id", 1},
{"name", 1},
},
Options: (&options.IndexOptions{}).SetUnique(true),
},
})
// dependency settings
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DependencySettingV2{})), []mongo2.IndexModel{
{
Keys: bson.D{
{"type", 1},
{"node_id", 1},
{"name", 1},
},
Options: (&options.IndexOptions{}).SetUnique(true),
},
})
// dependency logs
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DependencyLogV2{})), []mongo2.IndexModel{
{
Keys: bson.D{{"task_id", 1}},
},
{
Keys: bson.D{{"update_ts", 1}},
Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24),
},
})
// dependency tasks
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DependencyTaskV2{})), []mongo2.IndexModel{
{
Keys: bson.D{
{"update_ts", 1},
},
Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24),
},
})
// metrics
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.MetricV2{})), []mongo2.IndexModel{
{
Keys: bson.D{
{"created_ts", -1},
},
Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30),
},
{
Keys: bson.D{
{"node_id", 1},
},
},
{
Keys: bson.D{
{"type", 1},
},
},
})
// notification requests
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.NotificationRequestV2{})), []mongo2.IndexModel{
{
Keys: bson.D{
{"created_ts", -1},
},
Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 7),
},
{
Keys: bson.D{
{"channel_id", 1},
},
},
{
Keys: bson.D{
{"setting_id", 1},
},
},
{
Keys: bson.D{
{"status", 1},
},
},
})
// databases
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DatabaseV2{})), []mongo2.IndexModel{
{
Keys: bson.D{
{"data_source_id", 1},
},
},
})
// database metrics
recreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DatabaseMetricV2{})), []mongo2.IndexModel{
{
Keys: bson.D{
{"created_ts", -1},
},
Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30),
},
{
Keys: bson.D{
{"database_id", 1},
},
},
{
Keys: bson.D{
{"type", 1},
},
},
})
}

View File

@@ -0,0 +1,59 @@
package common
import (
"fmt"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/db/mongo"
"go.mongodb.org/mongo-driver/bson"
mongo2 "go.mongodb.org/mongo-driver/mongo"
)
func RecreateIndexes(col *mongo.Col, desiredIndexes []mongo2.IndexModel) {
cur, err := col.GetCollection().Indexes().List(col.GetContext())
if err != nil {
log.Errorf("error listing indexes: %v", err)
return
}
var existingIndexes []bson.M
err = cur.All(col.GetContext(), &existingIndexes)
if err != nil {
log.Errorf("error listing indexes: %v", err)
return
}
// Compare and recreate only if different
needsUpdate := false
existingKeys := make(map[string]bool)
// Skip _id index when comparing
for _, idx := range existingIndexes {
if name, ok := idx["name"].(string); ok && name != "_id_" {
key := idx["key"].(bson.M)
keyStr := fmt.Sprintf("%v", key)
existingKeys[keyStr] = true
}
}
// Check if desired indexes exist
for _, idx := range desiredIndexes {
keyStr := fmt.Sprintf("%v", idx.Keys)
if !existingKeys[keyStr] {
needsUpdate = true
break
}
}
if needsUpdate {
// Drop all existing indexes (except _id)
err := col.DeleteAllIndexes()
if err != nil {
log.Errorf("error dropping indexes: %v", err)
}
// Create new indexes
col.MustCreateIndexes(desiredIndexes)
log.Infof("recreated indexes for collection: %s", col.GetCollection().Name())
}
}

View File

@@ -0,0 +1,77 @@
package common
import (
"fmt"
"testing"
"github.com/crawlab-team/crawlab/db/mongo"
"github.com/stretchr/testify/assert"
"go.mongodb.org/mongo-driver/bson"
mongo2 "go.mongodb.org/mongo-driver/mongo"
)
func TestRecreateIndexes(t *testing.T) {
// Setup test collection
testCol := mongo.GetMongoCol("test_collection")
defer func() {
_ = testCol.GetCollection().Drop(testCol.GetContext())
}()
// Test cases
tests := []struct {
name string
desiredIndexes []mongo2.IndexModel
expectedCount int64
}{
{
name: "Create new indexes",
desiredIndexes: []mongo2.IndexModel{
{Keys: bson.M{"field1": 1}},
{Keys: bson.M{"field2": -1}},
},
expectedCount: 3, // Including _id index
},
{
name: "Update existing indexes",
desiredIndexes: []mongo2.IndexModel{
{Keys: bson.M{"field1": 1}},
{Keys: bson.M{"field3": 1}},
},
expectedCount: 3, // Including _id index
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Execute RecreateIndexes
RecreateIndexes(testCol, tt.desiredIndexes)
// Verify indexes
cur, err := testCol.GetCollection().Indexes().List(testCol.GetContext())
assert.NoError(t, err)
var indexes []bson.M
err = cur.All(testCol.GetContext(), &indexes)
assert.NoError(t, err)
// Check total number of indexes (including _id)
assert.Equal(t, tt.expectedCount, int64(len(indexes)))
// Verify each desired index exists
for _, desiredIdx := range tt.desiredIndexes {
found := false
desiredKeyStr := fmt.Sprintf("%v", desiredIdx.Keys)
for _, existingIdx := range indexes {
if existingIdx["name"].(string) != "_id_" {
key := existingIdx["key"].(bson.M)
if fmt.Sprintf("%v", key) == desiredKeyStr {
found = true
break
}
}
}
assert.True(t, found, "Index not found: %v", desiredIdx.Keys)
}
})
}
}

View File

@@ -0,0 +1,83 @@
package common
import (
models2 "github.com/crawlab-team/crawlab/core/models/models/v2"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/db/mongo"
"go.mongodb.org/mongo-driver/bson"
mongo2 "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func InitIndexes() {
// nodes
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.NodeV2{})), []mongo2.IndexModel{
{Keys: bson.M{"key": 1}}, // key
{Keys: bson.M{"name": 1}}, // name
{Keys: bson.M{"is_master": 1}}, // is_master
{Keys: bson.M{"status": 1}}, // status
{Keys: bson.M{"enabled": 1}}, // enabled
{Keys: bson.M{"active": 1}}, // active
})
// projects
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.ProjectV2{})), []mongo2.IndexModel{
{Keys: bson.M{"name": 1}},
})
// spiders
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.SpiderV2{})), []mongo2.IndexModel{
{Keys: bson.M{"name": 1}},
{Keys: bson.M{"type": 1}},
{Keys: bson.M{"col_id": 1}},
{Keys: bson.M{"project_id": 1}},
})
// tasks
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.TaskV2{})), []mongo2.IndexModel{
{Keys: bson.M{"spider_id": 1}},
{Keys: bson.M{"status": 1}},
{Keys: bson.M{"node_id": 1}},
{Keys: bson.M{"schedule_id": 1}},
{Keys: bson.M{"type": 1}},
{Keys: bson.M{"mode": 1}},
{Keys: bson.M{"priority": 1}},
{Keys: bson.M{"parent_id": 1}},
{Keys: bson.M{"has_sub": 1}},
{Keys: bson.M{"created_ts": -1}, Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30)},
})
// task stats
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.TaskStatV2{})), []mongo2.IndexModel{
{Keys: bson.M{"created_ts": -1}, Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30)},
})
// schedules
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.ScheduleV2{})), []mongo2.IndexModel{
{Keys: bson.M{"name": 1}},
{Keys: bson.M{"spider_id": 1}},
{Keys: bson.M{"enabled": 1}},
})
// users
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.UserV2{})), []mongo2.IndexModel{
{Keys: bson.M{"username": 1}},
{Keys: bson.M{"role": 1}},
{Keys: bson.M{"email": 1}},
})
// settings
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.SettingV2{})), []mongo2.IndexModel{
{Keys: bson.M{"key": 1}, Options: options.Index().SetUnique(true)},
})
// tokens
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.TokenV2{})), []mongo2.IndexModel{
{Keys: bson.M{"name": 1}},
})
// data collections
RecreateIndexes(mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DataCollectionV2{})), []mongo2.IndexModel{
{Keys: bson.M{"name": 1}},
})
}

View File

@@ -50,7 +50,7 @@ func (svc *MasterServiceV2) Init() (err error) {
func (svc *MasterServiceV2) Start() {
// create indexes
common.CreateIndexesV2()
common.InitIndexes()
// start grpc server
if err := svc.server.Start(); err != nil {
@@ -390,5 +390,4 @@ func GetMasterServiceV2() (res *MasterServiceV2, err error) {
}
})
return masterServiceV2, err
}