From e15c3c9b4f96dd0a2006895263ae1450640cba31 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Sat, 5 Oct 2024 14:46:38 +0800 Subject: [PATCH] feat: added performance monitoring for database --- .../server/model_base_service_v2_server.go | 1 + core/models/common/index_service_v2.go | 29 ++++ core/models/models/v2/database_metric_v2.go | 23 +++ core/models/service/base_service_v2.go | 137 +++++++++++++++++- 4 files changed, 188 insertions(+), 2 deletions(-) create mode 100644 core/models/models/v2/database_metric_v2.go diff --git a/core/grpc/server/model_base_service_v2_server.go b/core/grpc/server/model_base_service_v2_server.go index d067e11c..d695d1de 100644 --- a/core/grpc/server/model_base_service_v2_server.go +++ b/core/grpc/server/model_base_service_v2_server.go @@ -19,6 +19,7 @@ var ( *new(models2.TestModelV2), *new(models2.DataCollectionV2), *new(models2.DatabaseV2), + *new(models2.DatabaseMetricV2), *new(models2.DependencyV2), *new(models2.DependencyLogV2), *new(models2.DependencySettingV2), diff --git a/core/models/common/index_service_v2.go b/core/models/common/index_service_v2.go index 20c278f2..2247649f 100644 --- a/core/models/common/index_service_v2.go +++ b/core/models/common/index_service_v2.go @@ -202,4 +202,33 @@ func CreateIndexesV2() { }, }, }) + + // databases + mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DatabaseV2{})).MustCreateIndexes([]mongo2.IndexModel{ + { + Keys: bson.D{ + {"data_source_id", 1}, + }, + }, + }) + + // database metrics + mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DatabaseMetricV2{})).MustCreateIndexes([]mongo2.IndexModel{ + { + Keys: bson.D{ + {"created_ts", -1}, + }, + Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30), + }, + { + Keys: bson.D{ + {"database_id", 1}, + }, + }, + { + Keys: bson.D{ + {"type", 1}, + }, + }, + }) } diff --git a/core/models/models/v2/database_metric_v2.go b/core/models/models/v2/database_metric_v2.go new file mode 100644 index 00000000..147c76d7 --- /dev/null +++ b/core/models/models/v2/database_metric_v2.go @@ -0,0 +1,23 @@ +package models + +import "go.mongodb.org/mongo-driver/bson/primitive" + +type DatabaseMetricV2 struct { + any `collection:"database_metrics"` + BaseModelV2[DatabaseMetricV2] `bson:",inline"` + DatabaseId primitive.ObjectID `json:"database_id" bson:"database_id"` + TotalMemory uint64 `json:"total_memory" bson:"total_memory"` + AvailableMemory uint64 `json:"available_memory" bson:"available_memory"` + UsedMemory uint64 `json:"used_memory" bson:"used_memory"` + UsedMemoryPercent float32 `json:"used_memory_percent" bson:"used_memory_percent"` + TotalDisk uint64 `json:"total_disk" bson:"total_disk"` + AvailableDisk uint64 `json:"available_disk" bson:"available_disk"` + UsedDisk uint64 `json:"used_disk" bson:"used_disk"` + UsedDiskPercent float32 `json:"used_disk_percent" bson:"used_disk_percent"` + Connections int `json:"connections" bson:"connections"` + QueryPerSecond float64 `json:"query_per_second" bson:"query_per_second"` + TotalQuery uint64 `json:"total_query,omitempty" bson:"total_query,omitempty"` + CacheHitRatio float64 `json:"cache_hit_ratio" bson:"cache_hit_ratio"` + ReplicationLag float64 `json:"replication_lag" bson:"replication_lag"` + LockWaitTime float64 `json:"lock_wait_time" bson:"lock_wait_time"` +} diff --git a/core/models/service/base_service_v2.go b/core/models/service/base_service_v2.go index 9ebe4c91..3d03a28a 100644 --- a/core/models/service/base_service_v2.go +++ b/core/models/service/base_service_v2.go @@ -1,13 +1,16 @@ package service import ( + "context" "fmt" + "go.mongodb.org/mongo-driver/mongo/options" + "reflect" + "sync" + "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/db/mongo" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" - "reflect" - "sync" ) var ( @@ -30,6 +33,15 @@ func (svc *ModelServiceV2[T]) GetById(id primitive.ObjectID) (model *T, err erro return &result, nil } +func (svc *ModelServiceV2[T]) GetByIdContext(ctx context.Context, id primitive.ObjectID) (model *T, err error) { + var result T + err = svc.col.GetCollection().FindOne(ctx, bson.M{"_id": id}).Decode(&result) + if err != nil { + return nil, err + } + return &result, nil +} + func (svc *ModelServiceV2[T]) GetOne(query bson.M, options *mongo.FindOptions) (model *T, err error) { var result T err = svc.col.Find(query, options).One(&result) @@ -39,6 +51,25 @@ func (svc *ModelServiceV2[T]) GetOne(query bson.M, options *mongo.FindOptions) ( return &result, nil } +func (svc *ModelServiceV2[T]) GetOneContext(ctx context.Context, query bson.M, opts *mongo.FindOptions) (model *T, err error) { + var result T + _opts := &options.FindOneOptions{} + if opts != nil { + if opts.Skip != 0 { + skipInt64 := int64(opts.Skip) + _opts.Skip = &skipInt64 + } + if opts.Sort != nil { + _opts.Sort = opts.Sort + } + } + err = svc.col.GetCollection().FindOne(ctx, query, _opts).Decode(&result) + if err != nil { + return nil, err + } + return &result, nil +} + func (svc *ModelServiceV2[T]) GetMany(query bson.M, options *mongo.FindOptions) (models []T, err error) { var result []T err = svc.col.Find(query, options).All(&result) @@ -48,44 +79,115 @@ func (svc *ModelServiceV2[T]) GetMany(query bson.M, options *mongo.FindOptions) return result, nil } +func (svc *ModelServiceV2[T]) GetManyContext(ctx context.Context, query bson.M, opts *mongo.FindOptions) (models []T, err error) { + var result []T + _opts := &options.FindOptions{} + if opts != nil { + if opts.Skip != 0 { + skipInt64 := int64(opts.Skip) + _opts.Skip = &skipInt64 + } + if opts.Limit != 0 { + limitInt64 := int64(opts.Limit) + _opts.Limit = &limitInt64 + } + if opts.Sort != nil { + _opts.Sort = opts.Sort + } + } + cur, err := svc.col.GetCollection().Find(ctx, query, _opts) + if err != nil { + return nil, err + } + defer cur.Close(ctx) + for cur.Next(ctx) { + var model T + if err := cur.Decode(&model); err != nil { + return nil, err + } + result = append(result, model) + } + return result, nil +} + func (svc *ModelServiceV2[T]) DeleteById(id primitive.ObjectID) (err error) { return svc.col.DeleteId(id) } +func (svc *ModelServiceV2[T]) DeleteByIdContext(ctx context.Context, id primitive.ObjectID) (err error) { + _, err = svc.col.GetCollection().DeleteOne(ctx, bson.M{"_id": id}) + return err +} + func (svc *ModelServiceV2[T]) DeleteOne(query bson.M) (err error) { _, err = svc.col.GetCollection().DeleteOne(svc.col.GetContext(), query) return err } +func (svc *ModelServiceV2[T]) DeleteOneContext(ctx context.Context, query bson.M) (err error) { + _, err = svc.col.GetCollection().DeleteOne(ctx, query) + return err +} + func (svc *ModelServiceV2[T]) DeleteMany(query bson.M) (err error) { _, err = svc.col.GetCollection().DeleteMany(svc.col.GetContext(), query, nil) return err } +func (svc *ModelServiceV2[T]) DeleteManyContext(ctx context.Context, query bson.M) (err error) { + _, err = svc.col.GetCollection().DeleteMany(ctx, query, nil) + return err +} + func (svc *ModelServiceV2[T]) UpdateById(id primitive.ObjectID, update bson.M) (err error) { return svc.col.UpdateId(id, update) } +func (svc *ModelServiceV2[T]) UpdateByIdContext(ctx context.Context, id primitive.ObjectID, update bson.M) (err error) { + _, err = svc.col.GetCollection().UpdateOne(ctx, bson.M{"_id": id}, update) + return err +} + func (svc *ModelServiceV2[T]) UpdateOne(query bson.M, update bson.M) (err error) { _, err = svc.col.GetCollection().UpdateOne(svc.col.GetContext(), query, update) return err } +func (svc *ModelServiceV2[T]) UpdateOneContext(ctx context.Context, query bson.M, update bson.M) (err error) { + _, err = svc.col.GetCollection().UpdateOne(ctx, query, update) + return err +} + func (svc *ModelServiceV2[T]) UpdateMany(query bson.M, update bson.M) (err error) { _, err = svc.col.GetCollection().UpdateMany(svc.col.GetContext(), query, update) return err } +func (svc *ModelServiceV2[T]) UpdateManyContext(ctx context.Context, query bson.M, update bson.M) (err error) { + _, err = svc.col.GetCollection().UpdateMany(ctx, query, update) + return err +} + func (svc *ModelServiceV2[T]) ReplaceById(id primitive.ObjectID, model T) (err error) { _, err = svc.col.GetCollection().ReplaceOne(svc.col.GetContext(), bson.M{"_id": id}, model) return err } +func (svc *ModelServiceV2[T]) ReplaceByIdContext(ctx context.Context, id primitive.ObjectID, model T) (err error) { + _, err = svc.col.GetCollection().ReplaceOne(ctx, bson.M{"_id": id}, model) + return err +} + func (svc *ModelServiceV2[T]) ReplaceOne(query bson.M, model T) (err error) { _, err = svc.col.GetCollection().ReplaceOne(svc.col.GetContext(), query, model) return err } +func (svc *ModelServiceV2[T]) ReplaceOneContext(ctx context.Context, query bson.M, model T) (err error) { + _, err = svc.col.GetCollection().ReplaceOne(ctx, query, model) + return err +} + func (svc *ModelServiceV2[T]) InsertOne(model T) (id primitive.ObjectID, err error) { m := any(&model).(interfaces.Model) if m.GetId().IsZero() { @@ -98,6 +200,18 @@ func (svc *ModelServiceV2[T]) InsertOne(model T) (id primitive.ObjectID, err err return res.InsertedID.(primitive.ObjectID), nil } +func (svc *ModelServiceV2[T]) InsertOneContext(ctx context.Context, model T) (id primitive.ObjectID, err error) { + m := any(&model).(interfaces.Model) + if m.GetId().IsZero() { + m.SetId(primitive.NewObjectID()) + } + res, err := svc.col.GetCollection().InsertOne(ctx, m) + if err != nil { + return primitive.NilObjectID, err + } + return res.InsertedID.(primitive.ObjectID), nil +} + func (svc *ModelServiceV2[T]) InsertMany(models []T) (ids []primitive.ObjectID, err error) { var _models []any for _, model := range models { @@ -117,6 +231,25 @@ func (svc *ModelServiceV2[T]) InsertMany(models []T) (ids []primitive.ObjectID, return ids, nil } +func (svc *ModelServiceV2[T]) InsertManyContext(ctx context.Context, models []T) (ids []primitive.ObjectID, err error) { + var _models []any + for _, model := range models { + m := any(&model).(interfaces.Model) + if m.GetId().IsZero() { + m.SetId(primitive.NewObjectID()) + } + _models = append(_models, m) + } + res, err := svc.col.GetCollection().InsertMany(ctx, _models) + if err != nil { + return nil, err + } + for _, v := range res.InsertedIDs { + ids = append(ids, v.(primitive.ObjectID)) + } + return ids, nil +} + func (svc *ModelServiceV2[T]) Count(query bson.M) (total int, err error) { return svc.col.Count(query) }