feat: optimized dependency logic

This commit is contained in:
Marvin Zhang
2024-11-05 18:21:52 +08:00
parent 6af282058d
commit a0989d36db
13 changed files with 375 additions and 133 deletions

View File

@@ -2,6 +2,9 @@ package client
import (
"encoding/json"
"reflect"
"sync"
"github.com/crawlab-team/crawlab/core/grpc/client"
"github.com/crawlab-team/crawlab/core/interfaces"
nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
@@ -9,8 +12,6 @@ import (
"github.com/crawlab-team/crawlab/grpc"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"reflect"
"sync"
)
var (
@@ -255,11 +256,6 @@ func (svc *ModelService[T]) InsertOne(model T) (id primitive.ObjectID, err error
return primitive.NilObjectID, err
}
return deserialize[primitive.ObjectID](res)
//idStr, err := deserialize[string](res)
//if err != nil {
// return primitive.NilObjectID, err
//}
//return primitive.ObjectIDFromHex(idStr)
}
func (svc *ModelService[T]) InsertMany(models []T) (ids []primitive.ObjectID, err error) {
@@ -280,6 +276,30 @@ func (svc *ModelService[T]) InsertMany(models []T) (ids []primitive.ObjectID, er
return deserialize[[]primitive.ObjectID](res)
}
func (svc *ModelService[T]) UpsertOne(query bson.M, model T) (id primitive.ObjectID, err error) {
ctx, cancel := svc.c.Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return primitive.NilObjectID, err
}
modelData, err := json.Marshal(model)
if err != nil {
return primitive.NilObjectID, err
}
res, err := svc.c.ModelBaseServiceClient.UpsertOne(ctx, &grpc.ModelServiceUpsertOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Model: modelData,
})
if err != nil {
return primitive.NilObjectID, err
}
return deserialize[primitive.ObjectID](res)
}
func (svc *ModelService[T]) Count(query bson.M) (total int, err error) {
ctx, cancel := svc.c.Context()
defer cancel()

View File

@@ -1,7 +1,10 @@
package models
import "go.mongodb.org/mongo-driver/bson/primitive"
type DependencyLog struct {
any `collection:"dependency_logs"`
BaseModel[DependencyLog] `bson:",inline"`
Content string `json:"content" bson:"content"`
DependencyId primitive.ObjectID `json:"dependency_id" bson:"dependency_id"`
Content string `json:"content" bson:"content"`
}

View File

@@ -3,10 +3,11 @@ package service
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/mongo/options"
"reflect"
"sync"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/db/mongo"
"go.mongodb.org/mongo-driver/bson"
@@ -249,6 +250,44 @@ func (svc *ModelService[T]) InsertManyContext(ctx context.Context, models []T) (
}
return ids, nil
}
func (svc *ModelService[T]) UpsertOne(query bson.M, model T) (id primitive.ObjectID, err error) {
opts := options.ReplaceOptions{}
opts.SetUpsert(true)
result, err := svc.col.GetCollection().ReplaceOne(svc.col.GetContext(), query, model, &opts)
if err != nil {
return primitive.NilObjectID, err
}
if result.UpsertedID != nil {
// If document was inserted
return result.UpsertedID.(primitive.ObjectID), nil
}
// If document was updated, get its ID from the model
m := any(&model).(interfaces.Model)
return m.GetId(), nil
}
func (svc *ModelService[T]) UpsertOneContext(ctx context.Context, query bson.M, model T) (id primitive.ObjectID, err error) {
opts := options.ReplaceOptions{}
opts.SetUpsert(true)
result, err := svc.col.GetCollection().ReplaceOne(ctx, query, model, &opts)
if err != nil {
return primitive.NilObjectID, err
}
if result.UpsertedID != nil {
// If document was inserted
return result.UpsertedID.(primitive.ObjectID), nil
}
// If document was updated, get its ID from the query or model
if id, ok := query["_id"].(primitive.ObjectID); ok {
return id, nil
}
m := any(&model).(interfaces.Model)
return m.GetId(), nil
}
func (svc *ModelService[T]) Count(query bson.M) (total int, err error) {
return svc.col.Count(query)