Files
crawlab/core/models/client/model_service_v2.go
2024-10-20 17:55:57 +08:00

358 lines
8.5 KiB
Go

package client
import (
"encoding/json"
"github.com/crawlab-team/crawlab/core/grpc/client"
"github.com/crawlab-team/crawlab/core/interfaces"
nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
"github.com/crawlab-team/crawlab/db/mongo"
"github.com/crawlab-team/crawlab/grpc"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"reflect"
"sync"
)
var (
instanceMap = make(map[string]interface{})
onceMap = make(map[string]*sync.Once)
mu sync.Mutex
)
type ModelServiceV2[T any] struct {
cfg interfaces.NodeConfigService
c *client.GrpcClientV2
modelType string
}
func (svc *ModelServiceV2[T]) GetById(id primitive.ObjectID) (model *T, err error) {
ctx, cancel := svc.c.Context()
defer cancel()
res, err := svc.c.ModelBaseServiceV2Client.GetById(ctx, &grpc.ModelServiceV2GetByIdRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Id: id.Hex(),
})
if err != nil {
return nil, err
}
return svc.deserializeOne(res)
}
func (svc *ModelServiceV2[T]) GetOne(query bson.M, options *mongo.FindOptions) (model *T, err error) {
ctx, cancel := svc.c.Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return nil, err
}
findOptionsData, err := json.Marshal(options)
if err != nil {
return nil, err
}
res, err := svc.c.ModelBaseServiceV2Client.GetOne(ctx, &grpc.ModelServiceV2GetOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
FindOptions: findOptionsData,
})
if err != nil {
return nil, err
}
return svc.deserializeOne(res)
}
func (svc *ModelServiceV2[T]) GetMany(query bson.M, options *mongo.FindOptions) (models []T, err error) {
ctx, cancel := svc.c.Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return nil, err
}
findOptionsData, err := json.Marshal(options)
if err != nil {
return nil, err
}
res, err := svc.c.ModelBaseServiceV2Client.GetMany(ctx, &grpc.ModelServiceV2GetManyRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
FindOptions: findOptionsData,
})
if err != nil {
return nil, err
}
return svc.deserializeMany(res)
}
func (svc *ModelServiceV2[T]) DeleteById(id primitive.ObjectID) (err error) {
ctx, cancel := svc.c.Context()
defer cancel()
_, err = svc.c.ModelBaseServiceV2Client.DeleteById(ctx, &grpc.ModelServiceV2DeleteByIdRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Id: id.Hex(),
})
if err != nil {
return err
}
return nil
}
func (svc *ModelServiceV2[T]) DeleteOne(query bson.M) (err error) {
ctx, cancel := svc.c.Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return err
}
_, err = svc.c.ModelBaseServiceV2Client.DeleteOne(ctx, &grpc.ModelServiceV2DeleteOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
})
if err != nil {
return err
}
return nil
}
func (svc *ModelServiceV2[T]) DeleteMany(query bson.M) (err error) {
ctx, cancel := svc.c.Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return err
}
_, err = svc.c.ModelBaseServiceV2Client.DeleteMany(ctx, &grpc.ModelServiceV2DeleteManyRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
})
if err != nil {
return err
}
return nil
}
func (svc *ModelServiceV2[T]) UpdateById(id primitive.ObjectID, update bson.M) (err error) {
ctx, cancel := svc.c.Context()
defer cancel()
updateData, err := json.Marshal(update)
if err != nil {
return err
}
_, err = svc.c.ModelBaseServiceV2Client.UpdateById(ctx, &grpc.ModelServiceV2UpdateByIdRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Id: id.Hex(),
Update: updateData,
})
if err != nil {
return err
}
return nil
}
func (svc *ModelServiceV2[T]) UpdateOne(query bson.M, update bson.M) (err error) {
ctx, cancel := svc.c.Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return err
}
updateData, err := json.Marshal(update)
if err != nil {
return err
}
_, err = svc.c.ModelBaseServiceV2Client.UpdateOne(ctx, &grpc.ModelServiceV2UpdateOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Update: updateData,
})
if err != nil {
return err
}
return nil
}
func (svc *ModelServiceV2[T]) UpdateMany(query bson.M, update bson.M) (err error) {
ctx, cancel := svc.c.Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return err
}
updateData, err := json.Marshal(update)
if err != nil {
return err
}
_, err = svc.c.ModelBaseServiceV2Client.UpdateMany(ctx, &grpc.ModelServiceV2UpdateManyRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Update: updateData,
})
return nil
}
func (svc *ModelServiceV2[T]) ReplaceById(id primitive.ObjectID, model T) (err error) {
ctx, cancel := svc.c.Context()
defer cancel()
modelData, err := json.Marshal(model)
if err != nil {
return err
}
_, err = svc.c.ModelBaseServiceV2Client.ReplaceById(ctx, &grpc.ModelServiceV2ReplaceByIdRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Id: id.Hex(),
Model: modelData,
})
if err != nil {
return err
}
return nil
}
func (svc *ModelServiceV2[T]) ReplaceOne(query bson.M, model T) (err error) {
ctx, cancel := svc.c.Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return err
}
modelData, err := json.Marshal(model)
if err != nil {
return err
}
_, err = svc.c.ModelBaseServiceV2Client.ReplaceOne(ctx, &grpc.ModelServiceV2ReplaceOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Model: modelData,
})
if err != nil {
return err
}
return nil
}
func (svc *ModelServiceV2[T]) InsertOne(model T) (id primitive.ObjectID, err error) {
ctx, cancel := svc.c.Context()
defer cancel()
modelData, err := json.Marshal(model)
if err != nil {
return primitive.NilObjectID, err
}
res, err := svc.c.ModelBaseServiceV2Client.InsertOne(ctx, &grpc.ModelServiceV2InsertOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Model: modelData,
})
if err != nil {
return primitive.NilObjectID, err
}
return deserialize[primitive.ObjectID](res)
//idStr, err := deserialize[string](res)
//if err != nil {
// return primitive.NilObjectID, err
//}
//return primitive.ObjectIDFromHex(idStr)
}
func (svc *ModelServiceV2[T]) InsertMany(models []T) (ids []primitive.ObjectID, err error) {
ctx, cancel := svc.c.Context()
defer cancel()
modelsData, err := json.Marshal(models)
if err != nil {
return nil, err
}
res, err := svc.c.ModelBaseServiceV2Client.InsertMany(ctx, &grpc.ModelServiceV2InsertManyRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Models: modelsData,
})
if err != nil {
return nil, err
}
return deserialize[[]primitive.ObjectID](res)
}
func (svc *ModelServiceV2[T]) Count(query bson.M) (total int, err error) {
ctx, cancel := svc.c.Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return 0, err
}
res, err := svc.c.ModelBaseServiceV2Client.Count(ctx, &grpc.ModelServiceV2CountRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
})
if err != nil {
return 0, err
}
return deserialize[int](res)
}
func (svc *ModelServiceV2[T]) GetCol() (col *mongo.Col) {
return nil
}
func (svc *ModelServiceV2[T]) deserializeOne(res *grpc.Response) (result *T, err error) {
r, err := deserialize[T](res)
if err != nil {
return nil, err
}
return &r, err
}
func (svc *ModelServiceV2[T]) deserializeMany(res *grpc.Response) (results []T, err error) {
return deserialize[[]T](res)
}
func deserialize[T any](res *grpc.Response) (result T, err error) {
err = json.Unmarshal(res.Data, &result)
if err != nil {
return result, err
}
return result, nil
}
func NewModelServiceV2[T any]() *ModelServiceV2[T] {
mu.Lock()
defer mu.Unlock()
var v T
t := reflect.TypeOf(v)
typeName := t.Name()
if _, exists := onceMap[typeName]; !exists {
onceMap[typeName] = new(sync.Once)
}
var instance *ModelServiceV2[T]
c := client.GetGrpcClientV2()
if !c.IsStarted() {
err := c.Start()
if err != nil {
panic(err)
}
}
onceMap[typeName].Do(func() {
instance = &ModelServiceV2[T]{
cfg: nodeconfig.GetNodeConfigService(),
c: c,
modelType: typeName,
}
instanceMap[typeName] = instance
})
return instanceMap[typeName].(*ModelServiceV2[T])
}