mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
feat: added performance monitoring for database
This commit is contained in:
@@ -202,4 +202,33 @@ func CreateIndexesV2() {
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// databases
|
||||
mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DatabaseV2{})).MustCreateIndexes([]mongo2.IndexModel{
|
||||
{
|
||||
Keys: bson.D{
|
||||
{"data_source_id", 1},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// database metrics
|
||||
mongo.GetMongoCol(service.GetCollectionNameByInstance(models2.DatabaseMetricV2{})).MustCreateIndexes([]mongo2.IndexModel{
|
||||
{
|
||||
Keys: bson.D{
|
||||
{"created_ts", -1},
|
||||
},
|
||||
Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30),
|
||||
},
|
||||
{
|
||||
Keys: bson.D{
|
||||
{"database_id", 1},
|
||||
},
|
||||
},
|
||||
{
|
||||
Keys: bson.D{
|
||||
{"type", 1},
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
23
core/models/models/v2/database_metric_v2.go
Normal file
23
core/models/models/v2/database_metric_v2.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package models
|
||||
|
||||
import "go.mongodb.org/mongo-driver/bson/primitive"
|
||||
|
||||
type DatabaseMetricV2 struct {
|
||||
any `collection:"database_metrics"`
|
||||
BaseModelV2[DatabaseMetricV2] `bson:",inline"`
|
||||
DatabaseId primitive.ObjectID `json:"database_id" bson:"database_id"`
|
||||
TotalMemory uint64 `json:"total_memory" bson:"total_memory"`
|
||||
AvailableMemory uint64 `json:"available_memory" bson:"available_memory"`
|
||||
UsedMemory uint64 `json:"used_memory" bson:"used_memory"`
|
||||
UsedMemoryPercent float32 `json:"used_memory_percent" bson:"used_memory_percent"`
|
||||
TotalDisk uint64 `json:"total_disk" bson:"total_disk"`
|
||||
AvailableDisk uint64 `json:"available_disk" bson:"available_disk"`
|
||||
UsedDisk uint64 `json:"used_disk" bson:"used_disk"`
|
||||
UsedDiskPercent float32 `json:"used_disk_percent" bson:"used_disk_percent"`
|
||||
Connections int `json:"connections" bson:"connections"`
|
||||
QueryPerSecond float64 `json:"query_per_second" bson:"query_per_second"`
|
||||
TotalQuery uint64 `json:"total_query,omitempty" bson:"total_query,omitempty"`
|
||||
CacheHitRatio float64 `json:"cache_hit_ratio" bson:"cache_hit_ratio"`
|
||||
ReplicationLag float64 `json:"replication_lag" bson:"replication_lag"`
|
||||
LockWaitTime float64 `json:"lock_wait_time" bson:"lock_wait_time"`
|
||||
}
|
||||
@@ -1,13 +1,16 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
"github.com/crawlab-team/crawlab/db/mongo"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"reflect"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -30,6 +33,15 @@ func (svc *ModelServiceV2[T]) GetById(id primitive.ObjectID) (model *T, err erro
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) GetByIdContext(ctx context.Context, id primitive.ObjectID) (model *T, err error) {
|
||||
var result T
|
||||
err = svc.col.GetCollection().FindOne(ctx, bson.M{"_id": id}).Decode(&result)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) GetOne(query bson.M, options *mongo.FindOptions) (model *T, err error) {
|
||||
var result T
|
||||
err = svc.col.Find(query, options).One(&result)
|
||||
@@ -39,6 +51,25 @@ func (svc *ModelServiceV2[T]) GetOne(query bson.M, options *mongo.FindOptions) (
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) GetOneContext(ctx context.Context, query bson.M, opts *mongo.FindOptions) (model *T, err error) {
|
||||
var result T
|
||||
_opts := &options.FindOneOptions{}
|
||||
if opts != nil {
|
||||
if opts.Skip != 0 {
|
||||
skipInt64 := int64(opts.Skip)
|
||||
_opts.Skip = &skipInt64
|
||||
}
|
||||
if opts.Sort != nil {
|
||||
_opts.Sort = opts.Sort
|
||||
}
|
||||
}
|
||||
err = svc.col.GetCollection().FindOne(ctx, query, _opts).Decode(&result)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) GetMany(query bson.M, options *mongo.FindOptions) (models []T, err error) {
|
||||
var result []T
|
||||
err = svc.col.Find(query, options).All(&result)
|
||||
@@ -48,44 +79,115 @@ func (svc *ModelServiceV2[T]) GetMany(query bson.M, options *mongo.FindOptions)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) GetManyContext(ctx context.Context, query bson.M, opts *mongo.FindOptions) (models []T, err error) {
|
||||
var result []T
|
||||
_opts := &options.FindOptions{}
|
||||
if opts != nil {
|
||||
if opts.Skip != 0 {
|
||||
skipInt64 := int64(opts.Skip)
|
||||
_opts.Skip = &skipInt64
|
||||
}
|
||||
if opts.Limit != 0 {
|
||||
limitInt64 := int64(opts.Limit)
|
||||
_opts.Limit = &limitInt64
|
||||
}
|
||||
if opts.Sort != nil {
|
||||
_opts.Sort = opts.Sort
|
||||
}
|
||||
}
|
||||
cur, err := svc.col.GetCollection().Find(ctx, query, _opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer cur.Close(ctx)
|
||||
for cur.Next(ctx) {
|
||||
var model T
|
||||
if err := cur.Decode(&model); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, model)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) DeleteById(id primitive.ObjectID) (err error) {
|
||||
return svc.col.DeleteId(id)
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) DeleteByIdContext(ctx context.Context, id primitive.ObjectID) (err error) {
|
||||
_, err = svc.col.GetCollection().DeleteOne(ctx, bson.M{"_id": id})
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) DeleteOne(query bson.M) (err error) {
|
||||
_, err = svc.col.GetCollection().DeleteOne(svc.col.GetContext(), query)
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) DeleteOneContext(ctx context.Context, query bson.M) (err error) {
|
||||
_, err = svc.col.GetCollection().DeleteOne(ctx, query)
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) DeleteMany(query bson.M) (err error) {
|
||||
_, err = svc.col.GetCollection().DeleteMany(svc.col.GetContext(), query, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) DeleteManyContext(ctx context.Context, query bson.M) (err error) {
|
||||
_, err = svc.col.GetCollection().DeleteMany(ctx, query, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) UpdateById(id primitive.ObjectID, update bson.M) (err error) {
|
||||
return svc.col.UpdateId(id, update)
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) UpdateByIdContext(ctx context.Context, id primitive.ObjectID, update bson.M) (err error) {
|
||||
_, err = svc.col.GetCollection().UpdateOne(ctx, bson.M{"_id": id}, update)
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) UpdateOne(query bson.M, update bson.M) (err error) {
|
||||
_, err = svc.col.GetCollection().UpdateOne(svc.col.GetContext(), query, update)
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) UpdateOneContext(ctx context.Context, query bson.M, update bson.M) (err error) {
|
||||
_, err = svc.col.GetCollection().UpdateOne(ctx, query, update)
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) UpdateMany(query bson.M, update bson.M) (err error) {
|
||||
_, err = svc.col.GetCollection().UpdateMany(svc.col.GetContext(), query, update)
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) UpdateManyContext(ctx context.Context, query bson.M, update bson.M) (err error) {
|
||||
_, err = svc.col.GetCollection().UpdateMany(ctx, query, update)
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) ReplaceById(id primitive.ObjectID, model T) (err error) {
|
||||
_, err = svc.col.GetCollection().ReplaceOne(svc.col.GetContext(), bson.M{"_id": id}, model)
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) ReplaceByIdContext(ctx context.Context, id primitive.ObjectID, model T) (err error) {
|
||||
_, err = svc.col.GetCollection().ReplaceOne(ctx, bson.M{"_id": id}, model)
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) ReplaceOne(query bson.M, model T) (err error) {
|
||||
_, err = svc.col.GetCollection().ReplaceOne(svc.col.GetContext(), query, model)
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) ReplaceOneContext(ctx context.Context, query bson.M, model T) (err error) {
|
||||
_, err = svc.col.GetCollection().ReplaceOne(ctx, query, model)
|
||||
return err
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) InsertOne(model T) (id primitive.ObjectID, err error) {
|
||||
m := any(&model).(interfaces.Model)
|
||||
if m.GetId().IsZero() {
|
||||
@@ -98,6 +200,18 @@ func (svc *ModelServiceV2[T]) InsertOne(model T) (id primitive.ObjectID, err err
|
||||
return res.InsertedID.(primitive.ObjectID), nil
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) InsertOneContext(ctx context.Context, model T) (id primitive.ObjectID, err error) {
|
||||
m := any(&model).(interfaces.Model)
|
||||
if m.GetId().IsZero() {
|
||||
m.SetId(primitive.NewObjectID())
|
||||
}
|
||||
res, err := svc.col.GetCollection().InsertOne(ctx, m)
|
||||
if err != nil {
|
||||
return primitive.NilObjectID, err
|
||||
}
|
||||
return res.InsertedID.(primitive.ObjectID), nil
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) InsertMany(models []T) (ids []primitive.ObjectID, err error) {
|
||||
var _models []any
|
||||
for _, model := range models {
|
||||
@@ -117,6 +231,25 @@ func (svc *ModelServiceV2[T]) InsertMany(models []T) (ids []primitive.ObjectID,
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) InsertManyContext(ctx context.Context, models []T) (ids []primitive.ObjectID, err error) {
|
||||
var _models []any
|
||||
for _, model := range models {
|
||||
m := any(&model).(interfaces.Model)
|
||||
if m.GetId().IsZero() {
|
||||
m.SetId(primitive.NewObjectID())
|
||||
}
|
||||
_models = append(_models, m)
|
||||
}
|
||||
res, err := svc.col.GetCollection().InsertMany(ctx, _models)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, v := range res.InsertedIDs {
|
||||
ids = append(ids, v.(primitive.ObjectID))
|
||||
}
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
func (svc *ModelServiceV2[T]) Count(query bson.M) (total int, err error) {
|
||||
return svc.col.Count(query)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user