Files
crawlab/core/models/client/model_service.go

429 lines
11 KiB
Go

package client
import (
"encoding/json"
"fmt"
"reflect"
"sync"
"github.com/crawlab-team/crawlab/core/grpc/client"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/mongo"
nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
"github.com/crawlab-team/crawlab/grpc"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
var (
instanceMap = make(map[string]interface{})
onceMap = make(map[string]*sync.Once)
mu sync.Mutex
)
type ModelService[T any] struct {
cfg interfaces.NodeConfigService
modelType string
}
func (svc *ModelService[T]) GetById(id primitive.ObjectID) (model *T, err error) {
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
modelClient, err := client.GetGrpcClient().GetModelBaseServiceClient()
if err != nil {
return nil, fmt.Errorf("failed to get model base service client: %v", err)
}
res, err := modelClient.GetById(ctx, &grpc.ModelServiceGetByIdRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Id: id.Hex(),
})
if err != nil {
return nil, err
}
return svc.deserializeOne(res)
}
func (svc *ModelService[T]) GetOne(query bson.M, options *mongo.FindOptions) (model *T, err error) {
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return nil, err
}
findOptionsData, err := json.Marshal(options)
if err != nil {
return nil, err
}
modelClient, err := client.GetGrpcClient().GetModelBaseServiceClient()
if err != nil {
return nil, fmt.Errorf("failed to get model base service client: %v", err)
}
res, err := modelClient.GetOne(ctx, &grpc.ModelServiceGetOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
FindOptions: findOptionsData,
})
if err != nil {
return nil, err
}
return svc.deserializeOne(res)
}
func (svc *ModelService[T]) GetMany(query bson.M, options *mongo.FindOptions) (models []T, err error) {
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return nil, err
}
findOptionsData, err := json.Marshal(options)
if err != nil {
return nil, err
}
modelClient, err := client.GetGrpcClient().GetModelBaseServiceClient()
if err != nil {
return nil, fmt.Errorf("failed to get model base service client: %v", err)
}
res, err := modelClient.GetMany(ctx, &grpc.ModelServiceGetManyRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
FindOptions: findOptionsData,
})
if err != nil {
return nil, err
}
return svc.deserializeMany(res)
}
func (svc *ModelService[T]) DeleteById(id primitive.ObjectID) (err error) {
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
modelClient, err := client.GetGrpcClient().GetModelBaseServiceClient()
if err != nil {
return fmt.Errorf("failed to get model base service client: %v", err)
}
_, err = modelClient.DeleteById(ctx, &grpc.ModelServiceDeleteByIdRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Id: id.Hex(),
})
if err != nil {
return err
}
return nil
}
func (svc *ModelService[T]) DeleteOne(query bson.M) (err error) {
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return err
}
modelClient, err := client.GetGrpcClient().GetModelBaseServiceClient()
if err != nil {
return fmt.Errorf("failed to get model base service client: %v", err)
}
_, err = modelClient.DeleteOne(ctx, &grpc.ModelServiceDeleteOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
})
if err != nil {
return err
}
return nil
}
func (svc *ModelService[T]) DeleteMany(query bson.M) (err error) {
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return err
}
modelClient, err := client.GetGrpcClient().GetModelBaseServiceClient()
if err != nil {
return fmt.Errorf("failed to get model base service client: %v", err)
}
_, err = modelClient.DeleteMany(ctx, &grpc.ModelServiceDeleteManyRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
})
if err != nil {
return err
}
return nil
}
func (svc *ModelService[T]) UpdateById(id primitive.ObjectID, update bson.M) (err error) {
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
updateData, err := json.Marshal(update)
if err != nil {
return err
}
modelClient, err := client.GetGrpcClient().GetModelBaseServiceClient()
if err != nil {
return fmt.Errorf("failed to get model base service client: %v", err)
}
_, err = modelClient.UpdateById(ctx, &grpc.ModelServiceUpdateByIdRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Id: id.Hex(),
Update: updateData,
})
if err != nil {
return err
}
return nil
}
func (svc *ModelService[T]) UpdateOne(query bson.M, update bson.M) (err error) {
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return err
}
updateData, err := json.Marshal(update)
if err != nil {
return err
}
modelClient, err := client.GetGrpcClient().GetModelBaseServiceClient()
if err != nil {
return fmt.Errorf("failed to get model base service client: %v", err)
}
_, err = modelClient.UpdateOne(ctx, &grpc.ModelServiceUpdateOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Update: updateData,
})
if err != nil {
return err
}
return nil
}
func (svc *ModelService[T]) UpdateMany(query bson.M, update bson.M) (err error) {
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return err
}
updateData, err := json.Marshal(update)
if err != nil {
return err
}
modelClient, err := client.GetGrpcClient().GetModelBaseServiceClient()
if err != nil {
return fmt.Errorf("failed to get model base service client: %v", err)
}
_, err = modelClient.UpdateMany(ctx, &grpc.ModelServiceUpdateManyRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Update: updateData,
})
return nil
}
func (svc *ModelService[T]) ReplaceById(id primitive.ObjectID, model T) (err error) {
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
modelData, err := json.Marshal(model)
if err != nil {
return err
}
modelClient, err := client.GetGrpcClient().GetModelBaseServiceClient()
if err != nil {
return fmt.Errorf("failed to get model base service client: %v", err)
}
_, err = modelClient.ReplaceById(ctx, &grpc.ModelServiceReplaceByIdRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Id: id.Hex(),
Model: modelData,
})
if err != nil {
return err
}
return nil
}
func (svc *ModelService[T]) ReplaceOne(query bson.M, model T) (err error) {
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return err
}
modelData, err := json.Marshal(model)
if err != nil {
return err
}
modelClient, err := client.GetGrpcClient().GetModelBaseServiceClient()
if err != nil {
return fmt.Errorf("failed to get model base service client: %v", err)
}
_, err = modelClient.ReplaceOne(ctx, &grpc.ModelServiceReplaceOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Model: modelData,
})
if err != nil {
return err
}
return nil
}
func (svc *ModelService[T]) InsertOne(model T) (id primitive.ObjectID, err error) {
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
modelData, err := json.Marshal(model)
if err != nil {
return primitive.NilObjectID, err
}
modelClient, err := client.GetGrpcClient().GetModelBaseServiceClient()
if err != nil {
return primitive.NilObjectID, fmt.Errorf("failed to get model base service client: %v", err)
}
res, err := modelClient.InsertOne(ctx, &grpc.ModelServiceInsertOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Model: modelData,
})
if err != nil {
return primitive.NilObjectID, err
}
return deserialize[primitive.ObjectID](res)
}
func (svc *ModelService[T]) InsertMany(models []T) (ids []primitive.ObjectID, err error) {
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
modelsData, err := json.Marshal(models)
if err != nil {
return nil, err
}
modelClient, err := client.GetGrpcClient().GetModelBaseServiceClient()
if err != nil {
return nil, fmt.Errorf("failed to get model base service client: %v", err)
}
res, err := modelClient.InsertMany(ctx, &grpc.ModelServiceInsertManyRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Models: modelsData,
})
if err != nil {
return nil, err
}
return deserialize[[]primitive.ObjectID](res)
}
func (svc *ModelService[T]) UpsertOne(query bson.M, model T) (id primitive.ObjectID, err error) {
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return primitive.NilObjectID, err
}
modelData, err := json.Marshal(model)
if err != nil {
return primitive.NilObjectID, err
}
modelClient, err := client.GetGrpcClient().GetModelBaseServiceClient()
if err != nil {
return primitive.NilObjectID, fmt.Errorf("failed to get model base service client: %v", err)
}
res, err := modelClient.UpsertOne(ctx, &grpc.ModelServiceUpsertOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Model: modelData,
})
if err != nil {
return primitive.NilObjectID, err
}
return deserialize[primitive.ObjectID](res)
}
func (svc *ModelService[T]) Count(query bson.M) (total int, err error) {
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return 0, err
}
modelClient, err := client.GetGrpcClient().GetModelBaseServiceClient()
if err != nil {
return 0, fmt.Errorf("failed to get model base service client: %v", err)
}
res, err := modelClient.Count(ctx, &grpc.ModelServiceCountRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
})
if err != nil {
return 0, err
}
return deserialize[int](res)
}
func (svc *ModelService[T]) GetCol() (col *mongo.Col) {
return nil
}
func (svc *ModelService[T]) deserializeOne(res *grpc.Response) (result *T, err error) {
r, err := deserialize[T](res)
if err != nil {
return nil, err
}
return &r, err
}
func (svc *ModelService[T]) deserializeMany(res *grpc.Response) (results []T, err error) {
return deserialize[[]T](res)
}
func deserialize[T any](res *grpc.Response) (result T, err error) {
err = json.Unmarshal(res.Data, &result)
if err != nil {
return result, err
}
return result, nil
}
func NewModelService[T any]() *ModelService[T] {
mu.Lock()
defer mu.Unlock()
var v T
t := reflect.TypeOf(v)
typeName := t.Name()
if _, exists := onceMap[typeName]; !exists {
onceMap[typeName] = new(sync.Once)
}
var instance *ModelService[T]
onceMap[typeName].Do(func() {
instance = &ModelService[T]{
cfg: nodeconfig.GetNodeConfigService(),
modelType: typeName,
}
instanceMap[typeName] = instance
})
return instanceMap[typeName].(*ModelService[T])
}