From 53afb0064e55ded619fa2cf6f5f762f640515ac2 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Thu, 31 Oct 2024 12:59:58 +0800 Subject: [PATCH] refactor: code cleanup --- core/controllers/base_file_v2.go | 4 +- core/controllers/base_v2.go | 8 +- core/controllers/schedule_v2.go | 6 +- core/controllers/setting_v2.go | 4 +- core/controllers/spider_v2.go | 8 +- core/controllers/task_v2.go | 6 +- core/controllers/token_v2.go | 2 +- core/controllers/user_v2.go | 8 +- core/controllers/utils_context.go | 15 +- core/entity/grpc_subscribe.go | 26 -- core/entity/task.go | 11 - core/errors/grpc.go | 2 - core/fs/service_v2.go | 2 +- core/grpc/client/client.go | 182 +++++++++++ core/grpc/client/client_v2.go | 308 ------------------ core/grpc/client/options.go | 13 - core/grpc/server/{server_v2.go => server.go} | 88 ++--- core/grpc/server/task_service_server.go | 5 +- core/interfaces/controller_params.go | 6 - core/interfaces/data_source_service.go | 14 - core/interfaces/event_data.go | 6 - core/interfaces/event_service.go | 9 - .../{fs_service_v2.go => fs_service.go} | 2 +- core/interfaces/fs_service_options.go | 21 -- core/interfaces/grpc_base.go | 9 - core/interfaces/grpc_base_service_params.go | 5 - core/interfaces/grpc_client.go | 30 -- .../grpc_client_model_base_service.go | 7 - core/interfaces/grpc_client_model_delegate.go | 7 - .../grpc_client_model_environment_service.go | 14 - .../grpc_client_model_node_service.go | 15 - core/interfaces/grpc_client_model_service.go | 6 - .../grpc_client_model_spider_service.go | 14 - .../grpc_client_model_task_service.go | 14 - .../grpc_client_model_task_stat_service.go | 14 - core/interfaces/grpc_client_pool.go | 9 - .../grpc_model_base_service_message.go | 7 - core/interfaces/grpc_model_binder.go | 5 - .../interfaces/grpc_model_delegate_message.go | 8 - core/interfaces/grpc_model_list_binder.go | 5 - core/interfaces/grpc_server.go | 16 - core/interfaces/grpc_stream.go | 12 - core/interfaces/grpc_subscribe.go | 7 - core/interfaces/i18n_service.go | 6 - core/interfaces/injectable.go | 5 - core/interfaces/list.go | 5 - core/interfaces/model.go | 86 ----- core/interfaces/model_artifact.go | 12 - core/interfaces/model_artifact_sys.go | 21 -- core/interfaces/model_base_service.go | 28 -- core/interfaces/model_binder.go | 6 - core/interfaces/model_delegate.go | 22 -- core/interfaces/model_environment.go | 9 - core/interfaces/model_extra_value.go | 17 - core/interfaces/model_git.go | 18 - core/interfaces/model_list_binder.go | 6 - core/interfaces/model_node.go | 22 -- core/interfaces/model_node_delegate.go | 10 - core/interfaces/model_permission.go | 14 - core/interfaces/model_result.go | 11 - core/interfaces/model_role.go | 6 - core/interfaces/model_schedule.go | 28 -- core/interfaces/model_service_v2.go | 25 -- core/interfaces/model_spider.go | 24 -- core/interfaces/model_tag.go | 8 - core/interfaces/model_task.go | 23 -- core/interfaces/model_task_stat.go | 23 -- core/interfaces/model_user.go | 9 - core/interfaces/model_user_group.go | 6 - core/interfaces/node_master_service.go | 12 - core/interfaces/node_service_option.go | 4 - core/interfaces/node_worker_service.go | 10 - core/interfaces/options.go | 1 - core/interfaces/process_daemon.go | 17 - core/interfaces/provide.go | 3 - core/interfaces/result_service_mongo.go | 15 - core/interfaces/schedule_service.go | 23 -- core/interfaces/spider_admin_service.go | 22 -- core/interfaces/task_base_service.go | 11 - core/interfaces/task_handler_service.go | 60 ---- core/interfaces/task_hook_service.go | 6 - core/interfaces/task_scheduler_service.go | 16 - core/interfaces/task_stats_service.go | 9 - core/interfaces/test.go | 8 - core/interfaces/translation.go | 5 - core/interfaces/user_service.go | 19 -- core/interfaces/user_service_options.go | 13 - core/interfaces/with_address.go | 6 - core/interfaces/with_model_id.go | 6 - core/models/client/model_service_v2.go | 4 +- core/models/client/model_service_v2_test.go | 28 +- core/node/service/master_service.go | 4 +- core/node/service/worker_service.go | 14 +- core/task/handler/runner_v2.go | 10 +- .../handler/{service_v2.go => service.go} | 113 ++++--- core/task/scheduler/service_v2.go | 76 +++-- core/task/stats/{service_v2.go => service.go} | 24 +- 97 files changed, 388 insertions(+), 1561 deletions(-) delete mode 100644 core/entity/grpc_subscribe.go delete mode 100644 core/entity/task.go create mode 100644 core/grpc/client/client.go delete mode 100644 core/grpc/client/client_v2.go delete mode 100644 core/grpc/client/options.go rename core/grpc/server/{server_v2.go => server.go} (60%) delete mode 100644 core/interfaces/controller_params.go delete mode 100644 core/interfaces/data_source_service.go delete mode 100644 core/interfaces/event_data.go delete mode 100644 core/interfaces/event_service.go rename core/interfaces/{fs_service_v2.go => fs_service.go} (93%) delete mode 100644 core/interfaces/fs_service_options.go delete mode 100644 core/interfaces/grpc_base.go delete mode 100644 core/interfaces/grpc_base_service_params.go delete mode 100644 core/interfaces/grpc_client.go delete mode 100644 core/interfaces/grpc_client_model_base_service.go delete mode 100644 core/interfaces/grpc_client_model_delegate.go delete mode 100644 core/interfaces/grpc_client_model_environment_service.go delete mode 100644 core/interfaces/grpc_client_model_node_service.go delete mode 100644 core/interfaces/grpc_client_model_service.go delete mode 100644 core/interfaces/grpc_client_model_spider_service.go delete mode 100644 core/interfaces/grpc_client_model_task_service.go delete mode 100644 core/interfaces/grpc_client_model_task_stat_service.go delete mode 100644 core/interfaces/grpc_client_pool.go delete mode 100644 core/interfaces/grpc_model_base_service_message.go delete mode 100644 core/interfaces/grpc_model_binder.go delete mode 100644 core/interfaces/grpc_model_delegate_message.go delete mode 100644 core/interfaces/grpc_model_list_binder.go delete mode 100644 core/interfaces/grpc_server.go delete mode 100644 core/interfaces/grpc_stream.go delete mode 100644 core/interfaces/grpc_subscribe.go delete mode 100644 core/interfaces/i18n_service.go delete mode 100644 core/interfaces/injectable.go delete mode 100644 core/interfaces/list.go delete mode 100644 core/interfaces/model_artifact.go delete mode 100644 core/interfaces/model_artifact_sys.go delete mode 100644 core/interfaces/model_base_service.go delete mode 100644 core/interfaces/model_binder.go delete mode 100644 core/interfaces/model_delegate.go delete mode 100644 core/interfaces/model_environment.go delete mode 100644 core/interfaces/model_extra_value.go delete mode 100644 core/interfaces/model_git.go delete mode 100644 core/interfaces/model_list_binder.go delete mode 100644 core/interfaces/model_node.go delete mode 100644 core/interfaces/model_node_delegate.go delete mode 100644 core/interfaces/model_permission.go delete mode 100644 core/interfaces/model_result.go delete mode 100644 core/interfaces/model_role.go delete mode 100644 core/interfaces/model_schedule.go delete mode 100644 core/interfaces/model_service_v2.go delete mode 100644 core/interfaces/model_spider.go delete mode 100644 core/interfaces/model_tag.go delete mode 100644 core/interfaces/model_task.go delete mode 100644 core/interfaces/model_task_stat.go delete mode 100644 core/interfaces/model_user.go delete mode 100644 core/interfaces/model_user_group.go delete mode 100644 core/interfaces/node_master_service.go delete mode 100644 core/interfaces/node_service_option.go delete mode 100644 core/interfaces/node_worker_service.go delete mode 100644 core/interfaces/options.go delete mode 100644 core/interfaces/process_daemon.go delete mode 100644 core/interfaces/provide.go delete mode 100644 core/interfaces/result_service_mongo.go delete mode 100644 core/interfaces/schedule_service.go delete mode 100644 core/interfaces/spider_admin_service.go delete mode 100644 core/interfaces/task_base_service.go delete mode 100644 core/interfaces/task_handler_service.go delete mode 100644 core/interfaces/task_hook_service.go delete mode 100644 core/interfaces/task_scheduler_service.go delete mode 100644 core/interfaces/task_stats_service.go delete mode 100644 core/interfaces/test.go delete mode 100644 core/interfaces/translation.go delete mode 100644 core/interfaces/user_service.go delete mode 100644 core/interfaces/user_service_options.go delete mode 100644 core/interfaces/with_address.go delete mode 100644 core/interfaces/with_model_id.go rename core/task/handler/{service_v2.go => service.go} (75%) rename core/task/stats/{service_v2.go => service.go} (84%) diff --git a/core/controllers/base_file_v2.go b/core/controllers/base_file_v2.go index 5ea63d9a..7fd671b7 100644 --- a/core/controllers/base_file_v2.go +++ b/core/controllers/base_file_v2.go @@ -291,11 +291,11 @@ func PostBaseFileExport(rootPath string, c *gin.Context) { c.File(zipFilePath) } -func GetBaseFileFsSvc(rootPath string) (svc interfaces.FsServiceV2, err error) { +func GetBaseFileFsSvc(rootPath string) (svc interfaces.FsService, err error) { return getBaseFileFsSvc(rootPath) } -func getBaseFileFsSvc(rootPath string) (svc interfaces.FsServiceV2, err error) { +func getBaseFileFsSvc(rootPath string) (svc interfaces.FsService, err error) { workspacePath := viper.GetString("workspace") fsSvc := fs.NewFsServiceV2(filepath.Join(workspacePath, rootPath)) diff --git a/core/controllers/base_v2.go b/core/controllers/base_v2.go index 82130a3c..1e3e1407 100644 --- a/core/controllers/base_v2.go +++ b/core/controllers/base_v2.go @@ -56,8 +56,8 @@ func (ctr *BaseControllerV2[T]) Post(c *gin.Context) { HandleErrorBadRequest(c, err) return } - u := GetUserFromContextV2(c) - m := any(&model).(interfaces.ModelV2) + u := GetUserFromContext(c) + m := any(&model).(interfaces.Model) m.SetId(primitive.NewObjectID()) m.SetCreated(u.Id) m.SetUpdated(u.Id) @@ -90,8 +90,8 @@ func (ctr *BaseControllerV2[T]) PutById(c *gin.Context) { return } - u := GetUserFromContextV2(c) - m := any(&model).(interfaces.ModelV2) + u := GetUserFromContext(c) + m := any(&model).(interfaces.Model) m.SetUpdated(u.Id) if err := ctr.modelSvc.ReplaceById(id, model); err != nil { diff --git a/core/controllers/schedule_v2.go b/core/controllers/schedule_v2.go index 5be720ac..ffd1c3e3 100644 --- a/core/controllers/schedule_v2.go +++ b/core/controllers/schedule_v2.go @@ -16,7 +16,7 @@ func PostSchedule(c *gin.Context) { return } - u := GetUserFromContextV2(c) + u := GetUserFromContext(c) modelSvc := service.NewModelServiceV2[models.ScheduleV2]() @@ -73,7 +73,7 @@ func PutScheduleById(c *gin.Context) { return } - u := GetUserFromContextV2(c) + u := GetUserFromContext(c) if s.Enabled { if err := scheduleSvc.Enable(s, u.Id); err != nil { @@ -115,7 +115,7 @@ func postScheduleEnableDisableFunc(isEnable bool) func(c *gin.Context) { HandleErrorInternalServerError(c, err) return } - u := GetUserFromContextV2(c) + u := GetUserFromContext(c) if isEnable { err = svc.Enable(*s, u.Id) } else { diff --git a/core/controllers/setting_v2.go b/core/controllers/setting_v2.go index c4be44cc..e7abca56 100644 --- a/core/controllers/setting_v2.go +++ b/core/controllers/setting_v2.go @@ -42,7 +42,7 @@ func PostSetting(c *gin.Context) { s.Key = key } - u := GetUserFromContextV2(c) + u := GetUserFromContext(c) s.SetCreated(u.Id) s.SetUpdated(u.Id) @@ -77,7 +77,7 @@ func PutSetting(c *gin.Context) { return } - u := GetUserFromContextV2(c) + u := GetUserFromContext(c) // save _s.Value = s.Value diff --git a/core/controllers/spider_v2.go b/core/controllers/spider_v2.go index fd78153a..9a8e6ca7 100644 --- a/core/controllers/spider_v2.go +++ b/core/controllers/spider_v2.go @@ -254,7 +254,7 @@ func PostSpider(c *gin.Context) { } // user - u := GetUserFromContextV2(c) + u := GetUserFromContext(c) // add s.SetCreated(u.Id) @@ -306,7 +306,7 @@ func PutSpiderById(c *gin.Context) { return } - u := GetUserFromContextV2(c) + u := GetUserFromContext(c) modelSvc := service.NewModelServiceV2[models2.SpiderV2]() @@ -708,14 +708,14 @@ func GetSpiderResults(c *gin.Context) { HandleSuccessWithListData(c, results, total) } -func getSpiderFsSvc(s *models2.SpiderV2) (svc interfaces.FsServiceV2, err error) { +func getSpiderFsSvc(s *models2.SpiderV2) (svc interfaces.FsService, err error) { workspacePath := viper.GetString("workspace") fsSvc := fs.NewFsServiceV2(filepath.Join(workspacePath, s.Id.Hex())) return fsSvc, nil } -func getSpiderFsSvcById(id primitive.ObjectID) (svc interfaces.FsServiceV2, err error) { +func getSpiderFsSvcById(id primitive.ObjectID) (svc interfaces.FsService, err error) { s, err := service.NewModelServiceV2[models2.SpiderV2]().GetById(id) if err != nil { log.Errorf("failed to get spider: %s", err.Error()) diff --git a/core/controllers/task_v2.go b/core/controllers/task_v2.go index f6890646..829ad385 100644 --- a/core/controllers/task_v2.go +++ b/core/controllers/task_v2.go @@ -300,7 +300,7 @@ func PostTaskRun(c *gin.Context) { } // user - if u := GetUserFromContextV2(c); u != nil { + if u := GetUserFromContext(c); u != nil { opts.UserId = u.Id } @@ -345,7 +345,7 @@ func PostTaskRestart(c *gin.Context) { } // user - if u := GetUserFromContextV2(c); u != nil { + if u := GetUserFromContext(c); u != nil { opts.UserId = u.Id } @@ -396,7 +396,7 @@ func PostTaskCancel(c *gin.Context) { return } - u := GetUserFromContextV2(c) + u := GetUserFromContext(c) // cancel schedulerSvc, err := scheduler.GetTaskSchedulerServiceV2() diff --git a/core/controllers/token_v2.go b/core/controllers/token_v2.go index b66f873b..ae85cff1 100644 --- a/core/controllers/token_v2.go +++ b/core/controllers/token_v2.go @@ -18,7 +18,7 @@ func PostToken(c *gin.Context) { HandleErrorInternalServerError(c, err) return } - u := GetUserFromContextV2(c) + u := GetUserFromContext(c) t.SetCreated(u.Id) t.SetUpdated(u.Id) t.Token, err = svc.MakeToken(u) diff --git a/core/controllers/user_v2.go b/core/controllers/user_v2.go index 152dcb35..b91a69ba 100644 --- a/core/controllers/user_v2.go +++ b/core/controllers/user_v2.go @@ -19,7 +19,7 @@ func PostUser(c *gin.Context) { HandleErrorBadRequest(c, err) return } - u := GetUserFromContextV2(c) + u := GetUserFromContext(c) model := models.UserV2{ Username: payload.Username, Password: utils.EncryptMd5(payload.Password), @@ -62,7 +62,7 @@ func PostUserChangePassword(c *gin.Context) { } // get user - u := GetUserFromContextV2(c) + u := GetUserFromContext(c) modelSvc := service.NewModelServiceV2[models.UserV2]() // update password @@ -83,7 +83,7 @@ func PostUserChangePassword(c *gin.Context) { } func GetUserMe(c *gin.Context) { - u := GetUserFromContextV2(c) + u := GetUserFromContext(c) _u, err := service.NewModelServiceV2[models.UserV2]().GetById(u.Id) if err != nil { HandleErrorInternalServerError(c, err) @@ -101,7 +101,7 @@ func PutUserById(c *gin.Context) { } // get user - u := GetUserFromContextV2(c) + u := GetUserFromContext(c) modelSvc := service.NewModelServiceV2[models.UserV2]() diff --git a/core/controllers/utils_context.go b/core/controllers/utils_context.go index 00e2cc9c..08f55255 100644 --- a/core/controllers/utils_context.go +++ b/core/controllers/utils_context.go @@ -2,24 +2,11 @@ package controllers import ( "github.com/crawlab-team/crawlab/core/constants" - "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/models/models/v2" "github.com/gin-gonic/gin" ) -func GetUserFromContext(c *gin.Context) (u interfaces.User) { - value, ok := c.Get(constants.UserContextKey) - if !ok { - return nil - } - u, ok = value.(interfaces.User) - if !ok { - return nil - } - return u -} - -func GetUserFromContextV2(c *gin.Context) (u *models.UserV2) { +func GetUserFromContext(c *gin.Context) (u *models.UserV2) { value, ok := c.Get(constants.UserContextKey) if !ok { return nil diff --git a/core/entity/grpc_subscribe.go b/core/entity/grpc_subscribe.go deleted file mode 100644 index 79ae46ee..00000000 --- a/core/entity/grpc_subscribe.go +++ /dev/null @@ -1,26 +0,0 @@ -package entity - -import ( - "github.com/crawlab-team/crawlab/core/interfaces" -) - -type GrpcSubscribe struct { - Stream interfaces.GrpcStream - Finished chan bool -} - -func (sub *GrpcSubscribe) GetStream() interfaces.GrpcStream { - return sub.Stream -} - -func (sub *GrpcSubscribe) GetStreamBidirectional() interfaces.GrpcStreamBidirectional { - stream, ok := sub.Stream.(interfaces.GrpcStreamBidirectional) - if !ok { - return nil - } - return stream -} - -func (sub *GrpcSubscribe) GetFinished() chan bool { - return sub.Finished -} diff --git a/core/entity/task.go b/core/entity/task.go deleted file mode 100644 index 15aebfd0..00000000 --- a/core/entity/task.go +++ /dev/null @@ -1,11 +0,0 @@ -package entity - -import ( - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type StreamMessageTaskData struct { - TaskId primitive.ObjectID `json:"task_id"` - Records []Result `json:"data"` - Logs []string `json:"logs"` -} diff --git a/core/errors/grpc.go b/core/errors/grpc.go index 196d5eaf..c1975c0e 100644 --- a/core/errors/grpc.go +++ b/core/errors/grpc.go @@ -8,7 +8,5 @@ var ( ErrorGrpcClientFailedToStart = NewGrpcError("client failed to start") ErrorGrpcServerFailedToListen = NewGrpcError("server failed to listen") ErrorGrpcServerFailedToServe = NewGrpcError("server failed to serve") - ErrorGrpcNotAllowed = NewGrpcError("not allowed") - ErrorGrpcSubscribeNotExists = NewGrpcError("subscribe not exists") ErrorGrpcUnauthorized = NewGrpcError("unauthorized") ) diff --git a/core/fs/service_v2.go b/core/fs/service_v2.go index 918a3593..efdd9a0b 100644 --- a/core/fs/service_v2.go +++ b/core/fs/service_v2.go @@ -170,7 +170,7 @@ func (svc *ServiceV2) Export() (resultPath string, err error) { return zipFilePath, nil } -func NewFsServiceV2(path string) (svc interfaces.FsServiceV2) { +func NewFsServiceV2(path string) (svc interfaces.FsService) { return &ServiceV2{ rootPath: path, skipNames: []string{".git"}, diff --git a/core/grpc/client/client.go b/core/grpc/client/client.go new file mode 100644 index 00000000..9d3c9165 --- /dev/null +++ b/core/grpc/client/client.go @@ -0,0 +1,182 @@ +package client + +import ( + "context" + "encoding/json" + "sync" + "time" + + "github.com/apex/log" + "github.com/cenkalti/backoff/v4" + "github.com/crawlab-team/crawlab/core/constants" + "github.com/crawlab-team/crawlab/core/entity" + "github.com/crawlab-team/crawlab/core/grpc/middlewares" + "github.com/crawlab-team/crawlab/core/interfaces" + nodeconfig "github.com/crawlab-team/crawlab/core/node/config" + "github.com/crawlab-team/crawlab/core/utils" + grpc2 "github.com/crawlab-team/crawlab/grpc" + "github.com/spf13/viper" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" +) + +type GrpcClient struct { + // dependencies + nodeCfgSvc interfaces.NodeConfigService + + // settings + address interfaces.Address + timeout time.Duration + + // internals + conn *grpc.ClientConn + err error + once sync.Once + stopped bool + stop chan struct{} + + // clients + NodeClient grpc2.NodeServiceClient + TaskClient grpc2.TaskServiceClient + ModelBaseServiceV2Client grpc2.ModelBaseServiceV2Client + DependencyClient grpc2.DependencyServiceV2Client + MetricClient grpc2.MetricServiceV2Client +} + +func (c *GrpcClient) Start() (err error) { + c.once.Do(func() { + // connect + err = c.connect() + if err != nil { + return + } + + // register rpc services + c.register() + }) + + return err +} + +func (c *GrpcClient) Stop() (err error) { + // set stopped flag + c.stopped = true + c.stop <- struct{}{} + log.Infof("[GrpcClient] stopped") + + // skip if connection is nil + if c.conn == nil { + return nil + } + + // grpc server address + address := c.address.String() + + // close connection + if err := c.conn.Close(); err != nil { + return err + } + log.Infof("grpc client disconnected from %s", address) + + return nil +} + +func (c *GrpcClient) register() { + c.NodeClient = grpc2.NewNodeServiceClient(c.conn) + c.ModelBaseServiceV2Client = grpc2.NewModelBaseServiceV2Client(c.conn) + c.TaskClient = grpc2.NewTaskServiceClient(c.conn) + c.DependencyClient = grpc2.NewDependencyServiceV2Client(c.conn) + c.MetricClient = grpc2.NewMetricServiceV2Client(c.conn) +} + +func (c *GrpcClient) Context() (ctx context.Context, cancel context.CancelFunc) { + return context.WithTimeout(context.Background(), c.timeout) +} + +func (c *GrpcClient) IsStarted() (res bool) { + return c.conn != nil +} + +func (c *GrpcClient) IsClosed() (res bool) { + if c.conn != nil { + return c.conn.GetState() == connectivity.Shutdown + } + return false +} + +func (c *GrpcClient) getRequestData(d interface{}) (data []byte) { + if d == nil { + return data + } + switch d.(type) { + case []byte: + data = d.([]byte) + default: + var err error + data, err = json.Marshal(d) + if err != nil { + panic(err) + } + } + return data +} + +func (c *GrpcClient) connect() (err error) { + op := func() error { + // grpc server address + address := c.address.String() + + // connection options + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithChainUnaryInterceptor(middlewares.GetAuthTokenUnaryChainInterceptor(c.nodeCfgSvc)), + grpc.WithChainStreamInterceptor(middlewares.GetAuthTokenStreamChainInterceptor(c.nodeCfgSvc)), + } + + // create new client connection + c.conn, err = grpc.NewClient(address, opts...) + if err != nil { + log.Errorf("[GrpcClient] grpc client failed to connect to %s: %v", address, err) + return err + } + + log.Infof("[GrpcClient] grpc client connected to %s", address) + + return nil + } + return backoff.RetryNotify(op, backoff.NewExponentialBackOff(), utils.BackoffErrorNotify("grpc client connect")) +} + +func newGrpcClient() (c *GrpcClient) { + client := &GrpcClient{ + address: entity.NewAddress(&entity.AddressOptions{ + Host: constants.DefaultGrpcClientRemoteHost, + Port: constants.DefaultGrpcClientRemotePort, + }), + timeout: 10 * time.Second, + stop: make(chan struct{}), + } + client.nodeCfgSvc = nodeconfig.GetNodeConfigService() + + if viper.GetString("grpc.address") != "" { + address, err := entity.NewAddressFromString(viper.GetString("grpc.address")) + if err != nil { + log.Errorf("failed to parse grpc address: %s", viper.GetString("grpc.address")) + panic(err) + } + client.address = address + } + + return client +} + +var clientV2 *GrpcClient +var clientV2Once sync.Once + +func GetGrpcClient() *GrpcClient { + clientV2Once.Do(func() { + clientV2 = newGrpcClient() + }) + return clientV2 +} diff --git a/core/grpc/client/client_v2.go b/core/grpc/client/client_v2.go deleted file mode 100644 index 47bfa301..00000000 --- a/core/grpc/client/client_v2.go +++ /dev/null @@ -1,308 +0,0 @@ -package client - -import ( - "context" - "encoding/json" - "github.com/apex/log" - "github.com/cenkalti/backoff/v4" - "github.com/crawlab-team/crawlab/core/constants" - "github.com/crawlab-team/crawlab/core/entity" - "github.com/crawlab-team/crawlab/core/errors" - "github.com/crawlab-team/crawlab/core/grpc/middlewares" - "github.com/crawlab-team/crawlab/core/interfaces" - nodeconfig "github.com/crawlab-team/crawlab/core/node/config" - "github.com/crawlab-team/crawlab/core/utils" - grpc2 "github.com/crawlab-team/crawlab/grpc" - "github.com/crawlab-team/crawlab/trace" - "github.com/spf13/viper" - "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials/insecure" - "io" - "sync" - "time" -) - -type GrpcClientV2 struct { - // dependencies - nodeCfgSvc interfaces.NodeConfigService - - // settings - address interfaces.Address - timeout time.Duration - - // internals - conn *grpc.ClientConn - stream grpc2.NodeService_SubscribeClient - msgCh chan *grpc2.StreamMessage - err error - once sync.Once - stopped bool - stop chan struct{} - - // clients - NodeClient grpc2.NodeServiceClient - TaskClient grpc2.TaskServiceClient - ModelBaseServiceV2Client grpc2.ModelBaseServiceV2Client - DependenciesClient grpc2.DependenciesServiceV2Client - MetricsClient grpc2.MetricsServiceV2Client -} - -func (c *GrpcClientV2) Start() (err error) { - c.once.Do(func() { - // connect - err = c.connect() - if err != nil { - return - } - - // register rpc services - c.Register() - - // subscribe - err = c.subscribe() - if err != nil { - return - } - - // handle stream message - go c.handleStreamMessage() - }) - - return err -} - -func (c *GrpcClientV2) Stop() (err error) { - // set stopped flag - c.stopped = true - c.stop <- struct{}{} - log.Infof("[GrpcClient] stopped") - - // skip if connection is nil - if c.conn == nil { - return nil - } - - // grpc server address - address := c.address.String() - - // unsubscribe - if err := c.unsubscribe(); err != nil { - return err - } - log.Infof("grpc client unsubscribed from %s", address) - - // close connection - if err := c.conn.Close(); err != nil { - return err - } - log.Infof("grpc client disconnected from %s", address) - - return nil -} - -func (c *GrpcClientV2) Register() { - c.NodeClient = grpc2.NewNodeServiceClient(c.conn) - c.ModelBaseServiceV2Client = grpc2.NewModelBaseServiceV2Client(c.conn) - c.TaskClient = grpc2.NewTaskServiceClient(c.conn) - c.DependenciesClient = grpc2.NewDependenciesServiceV2Client(c.conn) - c.MetricsClient = grpc2.NewMetricsServiceV2Client(c.conn) -} - -func (c *GrpcClientV2) Context() (ctx context.Context, cancel context.CancelFunc) { - return context.WithTimeout(context.Background(), c.timeout) -} - -func (c *GrpcClientV2) NewRequest(d interface{}) (req *grpc2.Request) { - return &grpc2.Request{ - NodeKey: c.nodeCfgSvc.GetNodeKey(), - Data: c.getRequestData(d), - } -} - -func (c *GrpcClientV2) IsStarted() (res bool) { - return c.conn != nil -} - -func (c *GrpcClientV2) IsClosed() (res bool) { - if c.conn != nil { - return c.conn.GetState() == connectivity.Shutdown - } - return false -} - -func (c *GrpcClientV2) GetMessageChannel() (msgCh chan *grpc2.StreamMessage) { - return c.msgCh -} - -func (c *GrpcClientV2) getRequestData(d interface{}) (data []byte) { - if d == nil { - return data - } - switch d.(type) { - case []byte: - data = d.([]byte) - default: - var err error - data, err = json.Marshal(d) - if err != nil { - panic(err) - } - } - return data -} - -func (c *GrpcClientV2) unsubscribe() (err error) { - req := c.NewRequest(&entity.NodeInfo{ - Key: c.nodeCfgSvc.GetNodeKey(), - IsMaster: false, - }) - if _, err = c.NodeClient.Unsubscribe(context.Background(), req); err != nil { - return trace.TraceError(err) - } - return nil -} - -func (c *GrpcClientV2) connect() (err error) { - op := func() error { - // grpc server address - address := c.address.String() - - // timeout context - ctx, cancel := context.WithTimeout(context.Background(), c.timeout) - defer cancel() - - // connection - // TODO: configure dial options - var opts []grpc.DialOption - opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) - opts = append(opts, grpc.WithBlock()) - opts = append(opts, grpc.WithChainUnaryInterceptor(middlewares.GetAuthTokenUnaryChainInterceptor(c.nodeCfgSvc))) - opts = append(opts, grpc.WithChainStreamInterceptor(middlewares.GetAuthTokenStreamChainInterceptor(c.nodeCfgSvc))) - c.conn, err = grpc.DialContext(ctx, address, opts...) - if err != nil { - _ = trace.TraceError(err) - return errors.ErrorGrpcClientFailedToStart - } - log.Infof("[GrpcClient] grpc client connected to %s", address) - - return nil - } - return backoff.RetryNotify(op, backoff.NewExponentialBackOff(), utils.BackoffErrorNotify("grpc client connect")) -} - -func (c *GrpcClientV2) subscribe() (err error) { - op := func() error { - // skip if stopped - if c.stopped { - return nil - } - - // request - req := c.NewRequest(&entity.NodeInfo{ - Key: c.nodeCfgSvc.GetNodeKey(), - IsMaster: false, - }) - - // timeout context - ctx, cancel := context.WithTimeout(context.Background(), c.timeout) - defer cancel() - - // subscribe - c.stream, err = c.NodeClient.Subscribe(ctx, req) - if err != nil { - return trace.TraceError(err) - } - - // log - log.Infof("[GrpcClient] grpc client subscribed to remote server") - - return nil - } - - return backoff.RetryNotify(op, backoff.NewExponentialBackOff(), utils.BackoffErrorNotify("grpc client subscribe")) -} - -func (c *GrpcClientV2) handleStreamMessage() { - log.Infof("[GrpcClient] start handling stream message...") - for { - select { - case <-c.stop: - return - - default: - // resubscribe if stream is set to nil - if c.stream == nil { - if err := c.subscribe(); err != nil { - log.Errorf("subscribe") - return - } - } - - // receive stream message - msg, err := c.stream.Recv() - log.Debugf("[GrpcClient] received message: %v", msg) - if err != nil { - // set error - c.err = err - - // end - if err == io.EOF { - log.Infof("[GrpcClient] received EOF signal, disconnecting") - return - } - - // connection closed - if c.IsClosed() { - return - } - - // error - trace.PrintError(err) - c.stream = nil - time.Sleep(1 * time.Second) - continue - } - - // send stream message to channel - c.msgCh <- msg - - // reset error - c.err = nil - } - } -} - -func newGrpcClientV2() (c *GrpcClientV2) { - client := &GrpcClientV2{ - address: entity.NewAddress(&entity.AddressOptions{ - Host: constants.DefaultGrpcClientRemoteHost, - Port: constants.DefaultGrpcClientRemotePort, - }), - timeout: 10 * time.Second, - stop: make(chan struct{}), - msgCh: make(chan *grpc2.StreamMessage), - } - client.nodeCfgSvc = nodeconfig.GetNodeConfigService() - - if viper.GetString("grpc.address") != "" { - address, err := entity.NewAddressFromString(viper.GetString("grpc.address")) - if err != nil { - log.Errorf("failed to parse grpc address: %s", viper.GetString("grpc.address")) - panic(err) - } - client.address = address - } - - return client -} - -var clientV2 *GrpcClientV2 -var clientV2Once sync.Once - -func GetGrpcClientV2() *GrpcClientV2 { - clientV2Once.Do(func() { - clientV2 = newGrpcClientV2() - }) - return clientV2 -} diff --git a/core/grpc/client/options.go b/core/grpc/client/options.go deleted file mode 100644 index 8936e657..00000000 --- a/core/grpc/client/options.go +++ /dev/null @@ -1,13 +0,0 @@ -package client - -import ( - "github.com/crawlab-team/crawlab/core/interfaces" -) - -type Option func(client interfaces.GrpcClient) - -func WithAddress(address interfaces.Address) Option { - return func(c interfaces.GrpcClient) { - c.SetAddress(address) - } -} diff --git a/core/grpc/server/server_v2.go b/core/grpc/server/server.go similarity index 60% rename from core/grpc/server/server_v2.go rename to core/grpc/server/server.go index 6c43c474..9dea9db1 100644 --- a/core/grpc/server/server_v2.go +++ b/core/grpc/server/server.go @@ -5,25 +5,17 @@ import ( "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/entity" - "github.com/crawlab-team/crawlab/core/errors" "github.com/crawlab-team/crawlab/core/grpc/middlewares" "github.com/crawlab-team/crawlab/core/interfaces" nodeconfig "github.com/crawlab-team/crawlab/core/node/config" grpc2 "github.com/crawlab-team/crawlab/grpc" - "github.com/crawlab-team/crawlab/trace" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth" - grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpcauth "github.com/grpc-ecosystem/go-grpc-middleware/auth" + grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" errors2 "github.com/pkg/errors" "github.com/spf13/viper" "google.golang.org/grpc" "net" - "sync" -) - -var ( - subsV2 = map[string]interfaces.GrpcSubscribe{} - mutexSubsV2 = &sync.Mutex{} ) type GrpcServer struct { @@ -57,7 +49,7 @@ func (svr *GrpcServer) SetConfigPath(path string) { func (svr *GrpcServer) Init() (err error) { // register - if err := svr.Register(); err != nil { + if err := svr.register(); err != nil { return err } @@ -71,10 +63,10 @@ func (svr *GrpcServer) Start() (err error) { // listener svr.l, err = net.Listen("tcp", address) if err != nil { - _ = trace.TraceError(err) - return errors.ErrorGrpcServerFailedToListen + log.Errorf("[GrpcServer] failed to listen: %v", err) + return err } - log.Infof("grpc server listens to %s", address) + log.Infof("[GrpcServer] grpc server listens to %s", address) // start grpc server go func() { @@ -82,8 +74,7 @@ func (svr *GrpcServer) Start() (err error) { if errors2.Is(err, grpc.ErrServerStopped) { return } - trace.PrintError(err) - log.Error(errors.ErrorGrpcServerFailedToServe.Error()) + log.Errorf("[GrpcServer] failed to serve: %v", err) } }() @@ -97,23 +88,23 @@ func (svr *GrpcServer) Stop() (err error) { } // graceful stop - log.Infof("grpc server stopping...") + log.Infof("[GrpcServer] grpc server stopping...") svr.svr.Stop() // close listener - log.Infof("grpc server closing listener...") + log.Infof("[GrpcServer] grpc server closing listener...") _ = svr.l.Close() // mark as stopped svr.stopped = true // log - log.Infof("grpc server stopped") + log.Infof("[GrpcServer] grpc server stopped") return nil } -func (svr *GrpcServer) Register() (err error) { +func (svr *GrpcServer) register() (err error) { grpc2.RegisterNodeServiceServer(svr.svr, *svr.NodeSvr) grpc2.RegisterModelBaseServiceV2Server(svr.svr, *svr.ModelBaseServiceSvr) grpc2.RegisterTaskServiceServer(svr.svr, *svr.TaskSvr) @@ -124,42 +115,11 @@ func (svr *GrpcServer) Register() (err error) { } func (svr *GrpcServer) recoveryHandlerFunc(p interface{}) (err error) { - err = errors.NewError(errors.ErrorPrefixGrpc, fmt.Sprintf("%v", p)) - trace.PrintError(err) - return err + log.Errorf("[GrpcServer] recovered from panic: %v", p) + return fmt.Errorf("recovered from panic: %v", p) } -func (svr *GrpcServer) SetAddress(address interfaces.Address) { - -} - -func (svr *GrpcServer) GetSubscribe(key string) (sub interfaces.GrpcSubscribe, err error) { - mutexSubsV2.Lock() - defer mutexSubsV2.Unlock() - sub, ok := subsV2[key] - if !ok { - return nil, errors.ErrorGrpcSubscribeNotExists - } - return sub, nil -} - -func (svr *GrpcServer) SetSubscribe(key string, sub interfaces.GrpcSubscribe) { - mutexSubsV2.Lock() - defer mutexSubsV2.Unlock() - subsV2[key] = sub -} - -func (svr *GrpcServer) DeleteSubscribe(key string) { - mutexSubsV2.Lock() - defer mutexSubsV2.Unlock() - delete(subsV2, key) -} - -func (svr *GrpcServer) IsStopped() (res bool) { - return svr.stopped -} - -func NewGrpcServerV2() (svr *GrpcServer, err error) { +func NewGrpcServer() (svr *GrpcServer, err error) { // server svr = &GrpcServer{ address: entity.NewAddress(&entity.AddressOptions{ @@ -190,19 +150,19 @@ func NewGrpcServerV2() (svr *GrpcServer, err error) { svr.MetricSvr = GetMetricsServerV2() // recovery options - recoveryOpts := []grpc_recovery.Option{ - grpc_recovery.WithRecoveryHandler(svr.recoveryHandlerFunc), + recoveryOpts := []grpcrecovery.Option{ + grpcrecovery.WithRecoveryHandler(svr.recoveryHandlerFunc), } // grpc server svr.svr = grpc.NewServer( - grpc_middleware.WithUnaryServerChain( - grpc_recovery.UnaryServerInterceptor(recoveryOpts...), - grpc_auth.UnaryServerInterceptor(middlewares.GetAuthTokenFunc(svr.nodeCfgSvc)), + grpcmiddleware.WithUnaryServerChain( + grpcrecovery.UnaryServerInterceptor(recoveryOpts...), + grpcauth.UnaryServerInterceptor(middlewares.GetAuthTokenFunc(svr.nodeCfgSvc)), ), - grpc_middleware.WithStreamServerChain( - grpc_recovery.StreamServerInterceptor(recoveryOpts...), - grpc_auth.StreamServerInterceptor(middlewares.GetAuthTokenFunc(svr.nodeCfgSvc)), + grpcmiddleware.WithStreamServerChain( + grpcrecovery.StreamServerInterceptor(recoveryOpts...), + grpcauth.StreamServerInterceptor(middlewares.GetAuthTokenFunc(svr.nodeCfgSvc)), ), ) @@ -220,7 +180,7 @@ func GetGrpcServerV2() (svr *GrpcServer, err error) { if _serverV2 != nil { return _serverV2, nil } - _serverV2, err = NewGrpcServerV2() + _serverV2, err = NewGrpcServer() if err != nil { return nil, err } diff --git a/core/grpc/server/task_service_server.go b/core/grpc/server/task_service_server.go index bb326894..f2230786 100644 --- a/core/grpc/server/task_service_server.go +++ b/core/grpc/server/task_service_server.go @@ -31,11 +31,10 @@ type TaskServiceServer struct { // dependencies cfgSvc interfaces.NodeConfigService - statsSvc *stats.ServiceV2 + statsSvc *stats.Service // internals - server interfaces.GrpcServer - subs map[primitive.ObjectID]grpc.TaskService_SubscribeServer + subs map[primitive.ObjectID]grpc.TaskService_SubscribeServer } func (svr TaskServiceServer) Subscribe(req *grpc.TaskServiceSubscribeRequest, stream grpc.TaskService_SubscribeServer) (err error) { diff --git a/core/interfaces/controller_params.go b/core/interfaces/controller_params.go deleted file mode 100644 index 986af892..00000000 --- a/core/interfaces/controller_params.go +++ /dev/null @@ -1,6 +0,0 @@ -package interfaces - -type ControllerParams interface { - IsZero() (ok bool) - IsDefault() (ok bool) -} diff --git a/core/interfaces/data_source_service.go b/core/interfaces/data_source_service.go deleted file mode 100644 index f5309dfa..00000000 --- a/core/interfaces/data_source_service.go +++ /dev/null @@ -1,14 +0,0 @@ -package interfaces - -import ( - "go.mongodb.org/mongo-driver/bson/primitive" - "time" -) - -type DataSourceService interface { - ChangePassword(id primitive.ObjectID, password string) (err error) - Monitor() - CheckStatus(id primitive.ObjectID) (err error) - SetTimeout(duration time.Duration) - SetMonitorInterval(duration time.Duration) -} diff --git a/core/interfaces/event_data.go b/core/interfaces/event_data.go deleted file mode 100644 index 12f790ea..00000000 --- a/core/interfaces/event_data.go +++ /dev/null @@ -1,6 +0,0 @@ -package interfaces - -type EventData interface { - GetEvent() string - GetData() interface{} -} diff --git a/core/interfaces/event_service.go b/core/interfaces/event_service.go deleted file mode 100644 index 35bf0505..00000000 --- a/core/interfaces/event_service.go +++ /dev/null @@ -1,9 +0,0 @@ -package interfaces - -type EventFn func(data ...interface{}) (err error) - -type EventService interface { - Register(key, include, exclude string, ch *chan EventData) - Unregister(key string) - SendEvent(eventName string, data ...interface{}) -} diff --git a/core/interfaces/fs_service_v2.go b/core/interfaces/fs_service.go similarity index 93% rename from core/interfaces/fs_service_v2.go rename to core/interfaces/fs_service.go index 533a9d6b..5121e027 100644 --- a/core/interfaces/fs_service_v2.go +++ b/core/interfaces/fs_service.go @@ -1,6 +1,6 @@ package interfaces -type FsServiceV2 interface { +type FsService interface { List(path string) (files []FsFileInfo, err error) GetFile(path string) (data []byte, err error) GetFileInfo(path string) (file FsFileInfo, err error) diff --git a/core/interfaces/fs_service_options.go b/core/interfaces/fs_service_options.go deleted file mode 100644 index fa50f8b3..00000000 --- a/core/interfaces/fs_service_options.go +++ /dev/null @@ -1,21 +0,0 @@ -package interfaces - -type ServiceCrudOptions struct { - IsAbsolute bool // whether the path is absolute - OnlyFromWorkspace bool // whether only sync from workspace - NotSyncToWorkspace bool // whether not sync to workspace -} - -type ServiceCrudOption func(o *ServiceCrudOptions) - -func WithOnlyFromWorkspace() ServiceCrudOption { - return func(o *ServiceCrudOptions) { - o.OnlyFromWorkspace = true - } -} - -func WithNotSyncToWorkspace() ServiceCrudOption { - return func(o *ServiceCrudOptions) { - o.NotSyncToWorkspace = true - } -} diff --git a/core/interfaces/grpc_base.go b/core/interfaces/grpc_base.go deleted file mode 100644 index 4ab5f546..00000000 --- a/core/interfaces/grpc_base.go +++ /dev/null @@ -1,9 +0,0 @@ -package interfaces - -type GrpcBase interface { - WithConfigPath - Init() (err error) - Start() (err error) - Stop() (err error) - Register() (err error) -} diff --git a/core/interfaces/grpc_base_service_params.go b/core/interfaces/grpc_base_service_params.go deleted file mode 100644 index 239c1b6c..00000000 --- a/core/interfaces/grpc_base_service_params.go +++ /dev/null @@ -1,5 +0,0 @@ -package interfaces - -type GrpcBaseServiceParams interface { - Entity -} diff --git a/core/interfaces/grpc_client.go b/core/interfaces/grpc_client.go deleted file mode 100644 index 12fa2e90..00000000 --- a/core/interfaces/grpc_client.go +++ /dev/null @@ -1,30 +0,0 @@ -package interfaces - -import ( - "context" - grpc "github.com/crawlab-team/crawlab/grpc" - "time" -) - -type GrpcClient interface { - GrpcBase - WithConfigPath - GetModelDelegateClient() grpc.ModelDelegateClient - GetModelBaseServiceClient() grpc.ModelBaseServiceClient - GetNodeClient() grpc.NodeServiceClient - GetTaskClient() grpc.TaskServiceClient - GetMessageClient() grpc.MessageServiceClient - SetAddress(Address) - SetTimeout(time.Duration) - SetSubscribeType(string) - SetHandleMessage(bool) - Context() (context.Context, context.CancelFunc) - NewRequest(interface{}) *grpc.Request - GetMessageChannel() chan *grpc.StreamMessage - Restart() error - NewModelBaseServiceRequest(ModelId, GrpcBaseServiceParams) (*grpc.Request, error) - IsStarted() bool - IsClosed() bool - Err() error - GetStream() grpc.NodeService_SubscribeClient -} diff --git a/core/interfaces/grpc_client_model_base_service.go b/core/interfaces/grpc_client_model_base_service.go deleted file mode 100644 index 87b702e4..00000000 --- a/core/interfaces/grpc_client_model_base_service.go +++ /dev/null @@ -1,7 +0,0 @@ -package interfaces - -type GrpcClientModelBaseService interface { - WithModelId - WithConfigPath - ModelBaseService -} diff --git a/core/interfaces/grpc_client_model_delegate.go b/core/interfaces/grpc_client_model_delegate.go deleted file mode 100644 index b45e9e2c..00000000 --- a/core/interfaces/grpc_client_model_delegate.go +++ /dev/null @@ -1,7 +0,0 @@ -package interfaces - -type GrpcClientModelDelegate interface { - ModelDelegate - WithConfigPath - Close() error -} diff --git a/core/interfaces/grpc_client_model_environment_service.go b/core/interfaces/grpc_client_model_environment_service.go deleted file mode 100644 index d8276d69..00000000 --- a/core/interfaces/grpc_client_model_environment_service.go +++ /dev/null @@ -1,14 +0,0 @@ -package interfaces - -import ( - "github.com/crawlab-team/crawlab/db/mongo" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type GrpcClientModelEnvironmentService interface { - ModelBaseService - GetEnvironmentById(id primitive.ObjectID) (s Environment, err error) - GetEnvironment(query bson.M, opts *mongo.FindOptions) (s Environment, err error) - GetEnvironmentList(query bson.M, opts *mongo.FindOptions) (res []Environment, err error) -} diff --git a/core/interfaces/grpc_client_model_node_service.go b/core/interfaces/grpc_client_model_node_service.go deleted file mode 100644 index e3168941..00000000 --- a/core/interfaces/grpc_client_model_node_service.go +++ /dev/null @@ -1,15 +0,0 @@ -package interfaces - -import ( - "github.com/crawlab-team/crawlab/db/mongo" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type GrpcClientModelNodeService interface { - ModelBaseService - GetNodeById(id primitive.ObjectID) (n Node, err error) - GetNode(query bson.M, opts *mongo.FindOptions) (n Node, err error) - GetNodeByKey(key string) (n Node, err error) - GetNodeList(query bson.M, opts *mongo.FindOptions) (res []Node, err error) -} diff --git a/core/interfaces/grpc_client_model_service.go b/core/interfaces/grpc_client_model_service.go deleted file mode 100644 index 9e79c96f..00000000 --- a/core/interfaces/grpc_client_model_service.go +++ /dev/null @@ -1,6 +0,0 @@ -package interfaces - -type GrpcClientModelService interface { - WithConfigPath - NewBaseServiceDelegate(id ModelId) (GrpcClientModelBaseService, error) -} diff --git a/core/interfaces/grpc_client_model_spider_service.go b/core/interfaces/grpc_client_model_spider_service.go deleted file mode 100644 index de0a993c..00000000 --- a/core/interfaces/grpc_client_model_spider_service.go +++ /dev/null @@ -1,14 +0,0 @@ -package interfaces - -import ( - "github.com/crawlab-team/crawlab/db/mongo" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type GrpcClientModelSpiderService interface { - ModelBaseService - GetSpiderById(id primitive.ObjectID) (s Spider, err error) - GetSpider(query bson.M, opts *mongo.FindOptions) (s Spider, err error) - GetSpiderList(query bson.M, opts *mongo.FindOptions) (res []Spider, err error) -} diff --git a/core/interfaces/grpc_client_model_task_service.go b/core/interfaces/grpc_client_model_task_service.go deleted file mode 100644 index cabeb81c..00000000 --- a/core/interfaces/grpc_client_model_task_service.go +++ /dev/null @@ -1,14 +0,0 @@ -package interfaces - -import ( - "github.com/crawlab-team/crawlab/db/mongo" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type GrpcClientModelTaskService interface { - ModelBaseService - GetTaskById(id primitive.ObjectID) (s Task, err error) - GetTask(query bson.M, opts *mongo.FindOptions) (s Task, err error) - GetTaskList(query bson.M, opts *mongo.FindOptions) (res []Task, err error) -} diff --git a/core/interfaces/grpc_client_model_task_stat_service.go b/core/interfaces/grpc_client_model_task_stat_service.go deleted file mode 100644 index ded7ee19..00000000 --- a/core/interfaces/grpc_client_model_task_stat_service.go +++ /dev/null @@ -1,14 +0,0 @@ -package interfaces - -import ( - "github.com/crawlab-team/crawlab/db/mongo" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type GrpcClientModelTaskStatService interface { - ModelBaseService - GetTaskStatById(id primitive.ObjectID) (s TaskStat, err error) - GetTaskStat(query bson.M, opts *mongo.FindOptions) (s TaskStat, err error) - GetTaskStatList(query bson.M, opts *mongo.FindOptions) (res []TaskStat, err error) -} diff --git a/core/interfaces/grpc_client_pool.go b/core/interfaces/grpc_client_pool.go deleted file mode 100644 index 7294daf0..00000000 --- a/core/interfaces/grpc_client_pool.go +++ /dev/null @@ -1,9 +0,0 @@ -package interfaces - -type GrpcClientPool interface { - WithConfigPath - Init() error - NewClient() error - GetClient() (GrpcClient, error) - SetSize(int) -} diff --git a/core/interfaces/grpc_model_base_service_message.go b/core/interfaces/grpc_model_base_service_message.go deleted file mode 100644 index 3df2d296..00000000 --- a/core/interfaces/grpc_model_base_service_message.go +++ /dev/null @@ -1,7 +0,0 @@ -package interfaces - -type GrpcModelBaseServiceMessage interface { - GetModelId() ModelId - GetData() []byte - ToBytes() (data []byte) -} diff --git a/core/interfaces/grpc_model_binder.go b/core/interfaces/grpc_model_binder.go deleted file mode 100644 index 695b370b..00000000 --- a/core/interfaces/grpc_model_binder.go +++ /dev/null @@ -1,5 +0,0 @@ -package interfaces - -type GrpcModelBinder interface { - ModelBinder -} diff --git a/core/interfaces/grpc_model_delegate_message.go b/core/interfaces/grpc_model_delegate_message.go deleted file mode 100644 index dd24e129..00000000 --- a/core/interfaces/grpc_model_delegate_message.go +++ /dev/null @@ -1,8 +0,0 @@ -package interfaces - -type GrpcModelDelegateMessage interface { - GetModelId() ModelId - GetMethod() ModelDelegateMethod - GetData() []byte - ToBytes() (data []byte) -} diff --git a/core/interfaces/grpc_model_list_binder.go b/core/interfaces/grpc_model_list_binder.go deleted file mode 100644 index ed96892d..00000000 --- a/core/interfaces/grpc_model_list_binder.go +++ /dev/null @@ -1,5 +0,0 @@ -package interfaces - -type GrpcModelListBinder interface { - ModelListBinder -} diff --git a/core/interfaces/grpc_server.go b/core/interfaces/grpc_server.go deleted file mode 100644 index 440a7b3c..00000000 --- a/core/interfaces/grpc_server.go +++ /dev/null @@ -1,16 +0,0 @@ -package interfaces - -import ( - grpc "github.com/crawlab-team/crawlab/grpc" -) - -type GrpcServer interface { - GrpcBase - SetAddress(Address) - GetSubscribe(key string) (sub GrpcSubscribe, err error) - SetSubscribe(key string, sub GrpcSubscribe) - DeleteSubscribe(key string) - SendStreamMessage(key string, code grpc.StreamMessageCode) (err error) - SendStreamMessageWithData(nodeKey string, code grpc.StreamMessageCode, d interface{}) (err error) - IsStopped() (res bool) -} diff --git a/core/interfaces/grpc_stream.go b/core/interfaces/grpc_stream.go deleted file mode 100644 index 3b87e726..00000000 --- a/core/interfaces/grpc_stream.go +++ /dev/null @@ -1,12 +0,0 @@ -package interfaces - -import grpc "github.com/crawlab-team/crawlab/grpc" - -type GrpcStream interface { - Send(msg *grpc.StreamMessage) (err error) -} - -type GrpcStreamBidirectional interface { - GrpcStream - Recv() (msg *grpc.StreamMessage, err error) -} diff --git a/core/interfaces/grpc_subscribe.go b/core/interfaces/grpc_subscribe.go deleted file mode 100644 index fcbb9277..00000000 --- a/core/interfaces/grpc_subscribe.go +++ /dev/null @@ -1,7 +0,0 @@ -package interfaces - -type GrpcSubscribe interface { - GetStream() GrpcStream - GetStreamBidirectional() GrpcStreamBidirectional - GetFinished() chan bool -} diff --git a/core/interfaces/i18n_service.go b/core/interfaces/i18n_service.go deleted file mode 100644 index 9436092a..00000000 --- a/core/interfaces/i18n_service.go +++ /dev/null @@ -1,6 +0,0 @@ -package interfaces - -type I18nService interface { - AddTranslations(t []Translation) - GetTranslations() (t []Translation) -} diff --git a/core/interfaces/injectable.go b/core/interfaces/injectable.go deleted file mode 100644 index 4ec79ad5..00000000 --- a/core/interfaces/injectable.go +++ /dev/null @@ -1,5 +0,0 @@ -package interfaces - -type Injectable interface { - Inject() error -} diff --git a/core/interfaces/list.go b/core/interfaces/list.go deleted file mode 100644 index 7c5c08aa..00000000 --- a/core/interfaces/list.go +++ /dev/null @@ -1,5 +0,0 @@ -package interfaces - -type List interface { - GetModels() (res []Model) -} diff --git a/core/interfaces/model.go b/core/interfaces/model.go index bcbc6f7b..ed7ec822 100644 --- a/core/interfaces/model.go +++ b/core/interfaces/model.go @@ -7,92 +7,6 @@ import ( type Model interface { GetId() (id primitive.ObjectID) SetId(id primitive.ObjectID) -} - -type ModelV2 interface { - GetId() (id primitive.ObjectID) - SetId(id primitive.ObjectID) SetCreated(by primitive.ObjectID) SetUpdated(by primitive.ObjectID) } - -type ModelId int - -const ( - ModelIdArtifact = iota - ModelIdTag - ModelIdNode - ModelIdProject - ModelIdSpider - ModelIdTask - ModelIdJob - ModelIdSchedule - ModelIdUser - ModelIdSetting - ModelIdToken - ModelIdVariable - ModelIdTaskQueue - ModelIdTaskStat - ModelIdSpiderStat - ModelIdDataSource - ModelIdDataCollection - ModelIdResult - ModelIdPassword - ModelIdExtraValue - ModelIdGit - ModelIdRole - ModelIdUserRole - ModelIdPermission - ModelIdRolePermission - ModelIdEnvironment - ModelIdDependencySetting -) - -const ( - ModelColNameArtifact = "artifacts" - ModelColNameTag = "tags" - ModelColNameNode = "nodes" - ModelColNameProject = "projects" - ModelColNameSpider = "spiders" - ModelColNameTask = "tasks" - ModelColNameJob = "jobs" - ModelColNameSchedule = "schedules" - ModelColNameUser = "users" - ModelColNameSetting = "settings" - ModelColNameToken = "tokens" - ModelColNameVariable = "variables" - ModelColNameTaskQueue = "task_queue" - ModelColNameTaskStat = "task_stats" - ModelColNameSpiderStat = "spider_stats" - ModelColNameDataSource = "data_sources" - ModelColNameDataCollection = "data_collections" - ModelColNamePasswords = "passwords" - ModelColNameExtraValues = "extra_values" - ModelColNameGit = "gits" - ModelColNameRole = "roles" - ModelColNameUserRole = "user_roles" - ModelColNamePermission = "permissions" - ModelColNameRolePermission = "role_permissions" - ModelColNameEnvironment = "environments" - ModelColNameDependencySetting = "dependency_settings" -) - -type ModelWithTags interface { - Model - SetTags(tags []Tag) - GetTags() (tags []Tag) -} - -type ModelWithNameDescription interface { - Model - GetName() (name string) - SetName(name string) - GetDescription() (description string) - SetDescription(description string) -} - -type ModelWithKey interface { - Model - GetKey() (key string) - SetKey(key string) -} diff --git a/core/interfaces/model_artifact.go b/core/interfaces/model_artifact.go deleted file mode 100644 index 297c0853..00000000 --- a/core/interfaces/model_artifact.go +++ /dev/null @@ -1,12 +0,0 @@ -package interfaces - -import "go.mongodb.org/mongo-driver/bson/primitive" - -type ModelArtifact interface { - Model - GetSys() (sys ModelArtifactSys) - GetTagIds() (ids []primitive.ObjectID) - SetTagIds(ids []primitive.ObjectID) - SetObj(obj Model) - SetDel(del bool) -} diff --git a/core/interfaces/model_artifact_sys.go b/core/interfaces/model_artifact_sys.go deleted file mode 100644 index 3c2e1011..00000000 --- a/core/interfaces/model_artifact_sys.go +++ /dev/null @@ -1,21 +0,0 @@ -package interfaces - -import ( - "go.mongodb.org/mongo-driver/bson/primitive" - "time" -) - -type ModelArtifactSys interface { - GetCreateTs() time.Time - SetCreateTs(ts time.Time) - GetUpdateTs() time.Time - SetUpdateTs(ts time.Time) - GetDeleteTs() time.Time - SetDeleteTs(ts time.Time) - GetCreateUid() primitive.ObjectID - SetCreateUid(id primitive.ObjectID) - GetUpdateUid() primitive.ObjectID - SetUpdateUid(id primitive.ObjectID) - GetDeleteUid() primitive.ObjectID - SetDeleteUid(id primitive.ObjectID) -} diff --git a/core/interfaces/model_base_service.go b/core/interfaces/model_base_service.go deleted file mode 100644 index f3062b5e..00000000 --- a/core/interfaces/model_base_service.go +++ /dev/null @@ -1,28 +0,0 @@ -package interfaces - -import ( - "github.com/crawlab-team/crawlab/db/mongo" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type ModelBaseService interface { - GetModelId() (id ModelId) - SetModelId(id ModelId) - GetById(id primitive.ObjectID) (res Model, err error) - Get(query bson.M, opts *mongo.FindOptions) (res Model, err error) - GetList(query bson.M, opts *mongo.FindOptions) (res List, err error) - DeleteById(id primitive.ObjectID, args ...interface{}) (err error) - Delete(query bson.M, args ...interface{}) (err error) - DeleteList(query bson.M, args ...interface{}) (err error) - ForceDeleteList(query bson.M, args ...interface{}) (err error) - UpdateById(id primitive.ObjectID, update bson.M, args ...interface{}) (err error) - Update(query bson.M, update bson.M, fields []string, args ...interface{}) (err error) - UpdateDoc(query bson.M, doc Model, fields []string, args ...interface{}) (err error) - Insert(u User, docs ...interface{}) (err error) - Count(query bson.M) (total int, err error) -} - -type ModelService interface { - GetBaseService(id ModelId) (svc ModelBaseService) -} diff --git a/core/interfaces/model_binder.go b/core/interfaces/model_binder.go deleted file mode 100644 index 1fda6672..00000000 --- a/core/interfaces/model_binder.go +++ /dev/null @@ -1,6 +0,0 @@ -package interfaces - -type ModelBinder interface { - Bind() (res Model, err error) - Process(d Model) (res Model, err error) -} diff --git a/core/interfaces/model_delegate.go b/core/interfaces/model_delegate.go deleted file mode 100644 index 5224d579..00000000 --- a/core/interfaces/model_delegate.go +++ /dev/null @@ -1,22 +0,0 @@ -package interfaces - -type ModelDelegateMethod string - -type ModelDelegate interface { - Add() error - Save() error - Delete() error - GetArtifact() (ModelArtifact, error) - GetModel() Model - Refresh() error - ToBytes(interface{}) ([]byte, error) -} - -const ( - ModelDelegateMethodAdd = "add" - ModelDelegateMethodSave = "save" - ModelDelegateMethodDelete = "delete" - ModelDelegateMethodGetArtifact = "get-artifact" - ModelDelegateMethodRefresh = "refresh" - ModelDelegateMethodChange = "change" -) diff --git a/core/interfaces/model_environment.go b/core/interfaces/model_environment.go deleted file mode 100644 index 121d7d0d..00000000 --- a/core/interfaces/model_environment.go +++ /dev/null @@ -1,9 +0,0 @@ -package interfaces - -type Environment interface { - Model - GetKey() (key string) - SetKey(key string) - GetValue() (value string) - SetValue(value string) -} diff --git a/core/interfaces/model_extra_value.go b/core/interfaces/model_extra_value.go deleted file mode 100644 index a3889976..00000000 --- a/core/interfaces/model_extra_value.go +++ /dev/null @@ -1,17 +0,0 @@ -package interfaces - -import ( - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type ExtraValue interface { - Model - GetValue() (v interface{}) - SetValue(v interface{}) - GetObjectId() (oid primitive.ObjectID) - SetObjectId(oid primitive.ObjectID) - GetModel() (m string) - SetModel(m string) - GetType() (t string) - SetType(t string) -} diff --git a/core/interfaces/model_git.go b/core/interfaces/model_git.go deleted file mode 100644 index e24eb06c..00000000 --- a/core/interfaces/model_git.go +++ /dev/null @@ -1,18 +0,0 @@ -package interfaces - -// Git interface -type Git interface { - Model - GetUrl() (url string) - SetUrl(url string) - GetAuthType() (authType string) - SetAuthType(authType string) - GetUsername() (username string) - SetUsername(username string) - GetPassword() (password string) - SetPassword(password string) - GetCurrentBranch() (currentBranch string) - SetCurrentBranch(currentBranch string) - GetAutoPull() (autoPull bool) - SetAutoPull(autoPull bool) -} diff --git a/core/interfaces/model_list_binder.go b/core/interfaces/model_list_binder.go deleted file mode 100644 index d05ba94c..00000000 --- a/core/interfaces/model_list_binder.go +++ /dev/null @@ -1,6 +0,0 @@ -package interfaces - -type ModelListBinder interface { - Bind() (l List, err error) - Process(d interface{}) (l List, err error) -} diff --git a/core/interfaces/model_node.go b/core/interfaces/model_node.go deleted file mode 100644 index bf904660..00000000 --- a/core/interfaces/model_node.go +++ /dev/null @@ -1,22 +0,0 @@ -package interfaces - -import "time" - -type Node interface { - ModelWithNameDescription - GetKey() (key string) - GetIsMaster() (ok bool) - GetActive() (active bool) - SetActive(active bool) - SetActiveTs(activeTs time.Time) - GetStatus() (status string) - SetStatus(status string) - GetEnabled() (enabled bool) - SetEnabled(enabled bool) - GetAvailableRunners() (runners int) - SetAvailableRunners(runners int) - GetMaxRunners() (runners int) - SetMaxRunners(runners int) - IncrementAvailableRunners() - DecrementAvailableRunners() -} diff --git a/core/interfaces/model_node_delegate.go b/core/interfaces/model_node_delegate.go deleted file mode 100644 index 11272fbb..00000000 --- a/core/interfaces/model_node_delegate.go +++ /dev/null @@ -1,10 +0,0 @@ -package interfaces - -import "time" - -type ModelNodeDelegate interface { - ModelDelegate - UpdateStatus(active bool, activeTs *time.Time, status string) (err error) - UpdateStatusOnline() (err error) - UpdateStatusOffline() (err error) -} diff --git a/core/interfaces/model_permission.go b/core/interfaces/model_permission.go deleted file mode 100644 index 99b4e746..00000000 --- a/core/interfaces/model_permission.go +++ /dev/null @@ -1,14 +0,0 @@ -package interfaces - -type Permission interface { - ModelWithKey - ModelWithNameDescription - GetType() (t string) - SetType(t string) - GetTarget() (target []string) - SetTarget(target []string) - GetAllow() (allow []string) - SetAllow(allow []string) - GetDeny() (deny []string) - SetDeny(deny []string) -} diff --git a/core/interfaces/model_result.go b/core/interfaces/model_result.go deleted file mode 100644 index 91e349e3..00000000 --- a/core/interfaces/model_result.go +++ /dev/null @@ -1,11 +0,0 @@ -package interfaces - -import "go.mongodb.org/mongo-driver/bson/primitive" - -type Result interface { - Value() map[string]interface{} - SetValue(key string, value interface{}) - GetValue(key string) (value interface{}) - GetTaskId() (id primitive.ObjectID) - SetTaskId(id primitive.ObjectID) -} diff --git a/core/interfaces/model_role.go b/core/interfaces/model_role.go deleted file mode 100644 index efcc085e..00000000 --- a/core/interfaces/model_role.go +++ /dev/null @@ -1,6 +0,0 @@ -package interfaces - -type Role interface { - ModelWithKey - ModelWithNameDescription -} diff --git a/core/interfaces/model_schedule.go b/core/interfaces/model_schedule.go deleted file mode 100644 index ec805472..00000000 --- a/core/interfaces/model_schedule.go +++ /dev/null @@ -1,28 +0,0 @@ -package interfaces - -import ( - "github.com/robfig/cron/v3" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type Schedule interface { - Model - GetEnabled() (enabled bool) - SetEnabled(enabled bool) - GetEntryId() (id cron.EntryID) - SetEntryId(id cron.EntryID) - GetCron() (c string) - SetCron(c string) - GetSpiderId() (id primitive.ObjectID) - SetSpiderId(id primitive.ObjectID) - GetMode() (mode string) - SetMode(mode string) - GetNodeIds() (ids []primitive.ObjectID) - SetNodeIds(ids []primitive.ObjectID) - GetCmd() (cmd string) - SetCmd(cmd string) - GetParam() (param string) - SetParam(param string) - GetPriority() (p int) - SetPriority(p int) -} diff --git a/core/interfaces/model_service_v2.go b/core/interfaces/model_service_v2.go deleted file mode 100644 index 723de953..00000000 --- a/core/interfaces/model_service_v2.go +++ /dev/null @@ -1,25 +0,0 @@ -package interfaces - -import ( - "github.com/crawlab-team/crawlab/db/mongo" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type ModelServiceV2[T any] interface { - GetById(id primitive.ObjectID) (model *T, err error) - Get(query bson.M, options *mongo.FindOptions) (model *T, err error) - GetList(query bson.M, options *mongo.FindOptions) (models []T, err error) - DeleteById(id primitive.ObjectID) (err error) - Delete(query bson.M) (err error) - DeleteList(query bson.M) (err error) - UpdateById(id primitive.ObjectID, update bson.M) (err error) - UpdateOne(query bson.M, update bson.M) (err error) - UpdateMany(query bson.M, update bson.M) (err error) - ReplaceById(id primitive.ObjectID, model T) (err error) - Replace(query bson.M, model T) (err error) - InsertOne(model T) (id primitive.ObjectID, err error) - InsertMany(models []T) (ids []primitive.ObjectID, err error) - Count(query bson.M) (total int, err error) - GetCol() (col *mongo.Col) -} diff --git a/core/interfaces/model_spider.go b/core/interfaces/model_spider.go deleted file mode 100644 index c6cfe416..00000000 --- a/core/interfaces/model_spider.go +++ /dev/null @@ -1,24 +0,0 @@ -package interfaces - -import "go.mongodb.org/mongo-driver/bson/primitive" - -type Spider interface { - ModelWithNameDescription - GetType() (ty string) - GetMode() (mode string) - SetMode(mode string) - GetNodeIds() (ids []primitive.ObjectID) - SetNodeIds(ids []primitive.ObjectID) - GetCmd() (cmd string) - SetCmd(cmd string) - GetParam() (param string) - SetParam(param string) - GetPriority() (p int) - SetPriority(p int) - GetColId() (id primitive.ObjectID) - SetColId(id primitive.ObjectID) - GetIncrementalSync() (incrementalSync bool) - SetIncrementalSync(incrementalSync bool) - GetAutoInstall() (autoInstall bool) - SetAutoInstall(autoInstall bool) -} diff --git a/core/interfaces/model_tag.go b/core/interfaces/model_tag.go deleted file mode 100644 index 5afd89ba..00000000 --- a/core/interfaces/model_tag.go +++ /dev/null @@ -1,8 +0,0 @@ -package interfaces - -type Tag interface { - Model - GetName() string - GetColor() string - SetCol(string) -} diff --git a/core/interfaces/model_task.go b/core/interfaces/model_task.go deleted file mode 100644 index 7eb07a79..00000000 --- a/core/interfaces/model_task.go +++ /dev/null @@ -1,23 +0,0 @@ -package interfaces - -import "go.mongodb.org/mongo-driver/bson/primitive" - -type Task interface { - Model - GetNodeId() (id primitive.ObjectID) - SetNodeId(id primitive.ObjectID) - GetNodeIds() (ids []primitive.ObjectID) - GetStatus() (status string) - SetStatus(status string) - GetError() (error string) - SetError(error string) - GetPid() (pid int) - SetPid(pid int) - GetSpiderId() (id primitive.ObjectID) - GetType() (ty string) - GetCmd() (cmd string) - GetParam() (param string) - GetPriority() (p int) - GetUserId() (id primitive.ObjectID) - SetUserId(id primitive.ObjectID) -} diff --git a/core/interfaces/model_task_stat.go b/core/interfaces/model_task_stat.go deleted file mode 100644 index 7ddce156..00000000 --- a/core/interfaces/model_task_stat.go +++ /dev/null @@ -1,23 +0,0 @@ -package interfaces - -import "time" - -type TaskStat interface { - Model - GetCreateTs() (ts time.Time) - SetCreateTs(ts time.Time) - GetStartTs() (ts time.Time) - SetStartTs(ts time.Time) - GetEndTs() (ts time.Time) - SetEndTs(ts time.Time) - GetWaitDuration() (d int64) - SetWaitDuration(d int64) - GetRuntimeDuration() (d int64) - SetRuntimeDuration(d int64) - GetTotalDuration() (d int64) - SetTotalDuration(d int64) - GetResultCount() (c int64) - SetResultCount(c int64) - GetErrorLogCount() (c int64) - SetErrorLogCount(c int64) -} diff --git a/core/interfaces/model_user.go b/core/interfaces/model_user.go deleted file mode 100644 index d8f86c7c..00000000 --- a/core/interfaces/model_user.go +++ /dev/null @@ -1,9 +0,0 @@ -package interfaces - -type User interface { - Model - GetUsername() (name string) - GetPassword() (p string) - GetRole() (r string) - GetEmail() (email string) -} diff --git a/core/interfaces/model_user_group.go b/core/interfaces/model_user_group.go deleted file mode 100644 index f3930a1e..00000000 --- a/core/interfaces/model_user_group.go +++ /dev/null @@ -1,6 +0,0 @@ -package interfaces - -type UserGroup interface { - Model - GetUsers() (users []User, err error) -} diff --git a/core/interfaces/node_master_service.go b/core/interfaces/node_master_service.go deleted file mode 100644 index 8c9ab572..00000000 --- a/core/interfaces/node_master_service.go +++ /dev/null @@ -1,12 +0,0 @@ -package interfaces - -import ( - "time" -) - -type NodeMasterService interface { - NodeService - Monitor() - SetMonitorInterval(duration time.Duration) - Register() error -} diff --git a/core/interfaces/node_service_option.go b/core/interfaces/node_service_option.go deleted file mode 100644 index 0402b349..00000000 --- a/core/interfaces/node_service_option.go +++ /dev/null @@ -1,4 +0,0 @@ -package interfaces - -type NodeServiceOption interface { -} diff --git a/core/interfaces/node_worker_service.go b/core/interfaces/node_worker_service.go deleted file mode 100644 index 15f136e8..00000000 --- a/core/interfaces/node_worker_service.go +++ /dev/null @@ -1,10 +0,0 @@ -package interfaces - -import "time" - -type NodeWorkerService interface { - NodeService - Register() - ReportStatus() - SetHeartbeatInterval(duration time.Duration) -} diff --git a/core/interfaces/options.go b/core/interfaces/options.go deleted file mode 100644 index 08badf2f..00000000 --- a/core/interfaces/options.go +++ /dev/null @@ -1 +0,0 @@ -package interfaces diff --git a/core/interfaces/process_daemon.go b/core/interfaces/process_daemon.go deleted file mode 100644 index aea65df7..00000000 --- a/core/interfaces/process_daemon.go +++ /dev/null @@ -1,17 +0,0 @@ -package interfaces - -import ( - "os/exec" - "time" -) - -type ProcessDaemon interface { - Start() (err error) - Stop() - GetMaxErrors() (maxErrors int) - SetMaxErrors(maxErrors int) - GetExitTimeout() (timeout time.Duration) - SetExitTimeout(timeout time.Duration) - GetCmd() (cmd *exec.Cmd) - GetCh() (ch chan int) -} diff --git a/core/interfaces/provide.go b/core/interfaces/provide.go deleted file mode 100644 index e8392948..00000000 --- a/core/interfaces/provide.go +++ /dev/null @@ -1,3 +0,0 @@ -package interfaces - -type Provide func(env string) diff --git a/core/interfaces/result_service_mongo.go b/core/interfaces/result_service_mongo.go deleted file mode 100644 index dcca1c5b..00000000 --- a/core/interfaces/result_service_mongo.go +++ /dev/null @@ -1,15 +0,0 @@ -package interfaces - -import ( - "github.com/crawlab-team/crawlab/db/mongo" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type ResultServiceMongo interface { - GetId() (id primitive.ObjectID) - SetId(id primitive.ObjectID) - List(query bson.M, opts *mongo.FindOptions) (results []Result, err error) - Count(query bson.M) (total int, err error) - Insert(docs ...interface{}) (err error) -} diff --git a/core/interfaces/schedule_service.go b/core/interfaces/schedule_service.go deleted file mode 100644 index 9e535dba..00000000 --- a/core/interfaces/schedule_service.go +++ /dev/null @@ -1,23 +0,0 @@ -package interfaces - -import ( - "github.com/robfig/cron/v3" - "time" -) - -type ScheduleService interface { - WithConfigPath - Module - GetLocation() (loc *time.Location) - SetLocation(loc *time.Location) - GetDelay() (delay bool) - SetDelay(delay bool) - GetSkip() (skip bool) - SetSkip(skip bool) - GetUpdateInterval() (interval time.Duration) - SetUpdateInterval(interval time.Duration) - Enable(s Schedule, args ...interface{}) (err error) - Disable(s Schedule, args ...interface{}) (err error) - Update() - GetCron() (c *cron.Cron) -} diff --git a/core/interfaces/spider_admin_service.go b/core/interfaces/spider_admin_service.go deleted file mode 100644 index d410e5cb..00000000 --- a/core/interfaces/spider_admin_service.go +++ /dev/null @@ -1,22 +0,0 @@ -package interfaces - -import ( - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type SpiderAdminService interface { - WithConfigPath - Start() (err error) - // Schedule a new task of the spider - Schedule(id primitive.ObjectID, opts *SpiderRunOptions) (taskIds []primitive.ObjectID, err error) - // Clone the spider - Clone(id primitive.ObjectID, opts *SpiderCloneOptions) (err error) - // Delete the spider - Delete(id primitive.ObjectID) (err error) - // SyncGit syncs all git repositories - SyncGit() (err error) - // SyncGitOne syncs one git repository - SyncGitOne(g Git) (err error) - // Export exports the spider and return zip file path - Export(id primitive.ObjectID) (filePath string, err error) -} diff --git a/core/interfaces/task_base_service.go b/core/interfaces/task_base_service.go deleted file mode 100644 index 95161316..00000000 --- a/core/interfaces/task_base_service.go +++ /dev/null @@ -1,11 +0,0 @@ -package interfaces - -import "go.mongodb.org/mongo-driver/bson/primitive" - -type TaskBaseService interface { - WithConfigPath - Module - SaveTask(t Task, status string) (err error) - IsStopped() (res bool) - GetQueue(nodeId primitive.ObjectID) (queue string) -} diff --git a/core/interfaces/task_handler_service.go b/core/interfaces/task_handler_service.go deleted file mode 100644 index 382764ce..00000000 --- a/core/interfaces/task_handler_service.go +++ /dev/null @@ -1,60 +0,0 @@ -package interfaces - -import ( - "go.mongodb.org/mongo-driver/bson/primitive" - "time" -) - -type TaskHandlerService interface { - TaskBaseService - // Run task and execute locally - Run(taskId primitive.ObjectID) (err error) - // Cancel task locally - Cancel(taskId primitive.ObjectID) (err error) - // Fetch tasks and run - Fetch() - // ReportStatus periodically report handler status to master - ReportStatus() - // Reset reset internals to default - Reset() - // IsSyncLocked whether the given task is locked for files sync - IsSyncLocked(path string) (ok bool) - // LockSync lock files sync for given task - LockSync(path string) - // UnlockSync unlock files sync for given task - UnlockSync(path string) - // GetExitWatchDuration get max runners - GetExitWatchDuration() (duration time.Duration) - // SetExitWatchDuration set max runners - SetExitWatchDuration(duration time.Duration) - // GetFetchInterval get report interval - GetFetchInterval() (interval time.Duration) - // SetFetchInterval set report interval - SetFetchInterval(interval time.Duration) - // GetReportInterval get report interval - GetReportInterval() (interval time.Duration) - // SetReportInterval set report interval - SetReportInterval(interval time.Duration) - // GetCancelTimeout get report interval - GetCancelTimeout() (timeout time.Duration) - // SetCancelTimeout set report interval - SetCancelTimeout(timeout time.Duration) - // GetModelService get model service - GetModelService() (modelSvc GrpcClientModelService) - // GetModelSpiderService get model spider service - GetModelSpiderService() (modelSpiderSvc GrpcClientModelSpiderService) - // GetModelTaskService get model task service - GetModelTaskService() (modelTaskSvc GrpcClientModelTaskService) - // GetModelTaskStatService get model task stat service - GetModelTaskStatService() (modelTaskStatSvc GrpcClientModelTaskStatService) - // GetModelEnvironmentService get model environment service - GetModelEnvironmentService() (modelEnvironmentSvc GrpcClientModelEnvironmentService) - // GetNodeConfigService get node config service - GetNodeConfigService() (cfgSvc NodeConfigService) - // GetCurrentNode get node of the handler - GetCurrentNode() (n Node, err error) - // GetTaskById get task by id - GetTaskById(id primitive.ObjectID) (t Task, err error) - // GetSpiderById get task by id - GetSpiderById(id primitive.ObjectID) (t Spider, err error) -} diff --git a/core/interfaces/task_hook_service.go b/core/interfaces/task_hook_service.go deleted file mode 100644 index f7c18082..00000000 --- a/core/interfaces/task_hook_service.go +++ /dev/null @@ -1,6 +0,0 @@ -package interfaces - -type TaskHookService interface { - PreActions(Task, Spider, FsServiceV2, TaskHandlerService) (err error) - PostActions(Task, Spider, FsServiceV2, TaskHandlerService) (err error) -} diff --git a/core/interfaces/task_scheduler_service.go b/core/interfaces/task_scheduler_service.go deleted file mode 100644 index b287440f..00000000 --- a/core/interfaces/task_scheduler_service.go +++ /dev/null @@ -1,16 +0,0 @@ -package interfaces - -import ( - "go.mongodb.org/mongo-driver/bson/primitive" - "time" -) - -type TaskSchedulerService interface { - TaskBaseService - // Enqueue task into the task queue - Enqueue(t Task) (t2 Task, err error) - // Cancel task to corresponding node - Cancel(id primitive.ObjectID, args ...interface{}) (err error) - // SetInterval set the interval or duration between two adjacent fetches - SetInterval(interval time.Duration) -} diff --git a/core/interfaces/task_stats_service.go b/core/interfaces/task_stats_service.go deleted file mode 100644 index 9d1604ac..00000000 --- a/core/interfaces/task_stats_service.go +++ /dev/null @@ -1,9 +0,0 @@ -package interfaces - -import "go.mongodb.org/mongo-driver/bson/primitive" - -type TaskStatsService interface { - TaskBaseService - InsertData(id primitive.ObjectID, records ...interface{}) (err error) - InsertLogs(id primitive.ObjectID, logs ...string) (err error) -} diff --git a/core/interfaces/test.go b/core/interfaces/test.go deleted file mode 100644 index bc107b42..00000000 --- a/core/interfaces/test.go +++ /dev/null @@ -1,8 +0,0 @@ -package interfaces - -import "testing" - -type Test interface { - Setup(*testing.T) - Cleanup() -} diff --git a/core/interfaces/translation.go b/core/interfaces/translation.go deleted file mode 100644 index c5c4c6b6..00000000 --- a/core/interfaces/translation.go +++ /dev/null @@ -1,5 +0,0 @@ -package interfaces - -type Translation interface { - GetLang() (l string) -} diff --git a/core/interfaces/user_service.go b/core/interfaces/user_service.go deleted file mode 100644 index d0495234..00000000 --- a/core/interfaces/user_service.go +++ /dev/null @@ -1,19 +0,0 @@ -package interfaces - -import ( - "github.com/gin-gonic/gin" - "github.com/golang-jwt/jwt/v5" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -type UserService interface { - Init() (err error) - SetJwtSecret(secret string) - SetJwtSigningMethod(method jwt.SigningMethod) - Create(opts *UserCreateOptions, args ...interface{}) (err error) - Login(opts *UserLoginOptions) (token string, u User, err error) - CheckToken(token string) (u User, err error) - ChangePassword(id primitive.ObjectID, password string, args ...interface{}) (err error) - MakeToken(user User) (tokenStr string, err error) - GetCurrentUser(c *gin.Context) (u User, err error) -} diff --git a/core/interfaces/user_service_options.go b/core/interfaces/user_service_options.go deleted file mode 100644 index abf29774..00000000 --- a/core/interfaces/user_service_options.go +++ /dev/null @@ -1,13 +0,0 @@ -package interfaces - -type UserCreateOptions struct { - Username string - Password string - Email string - Role string -} - -type UserLoginOptions struct { - Username string - Password string -} diff --git a/core/interfaces/with_address.go b/core/interfaces/with_address.go deleted file mode 100644 index 805eff18..00000000 --- a/core/interfaces/with_address.go +++ /dev/null @@ -1,6 +0,0 @@ -package interfaces - -type WithAddress interface { - GetAddress() (address Address) - SetAddress(address Address) -} diff --git a/core/interfaces/with_model_id.go b/core/interfaces/with_model_id.go deleted file mode 100644 index f2f59f8c..00000000 --- a/core/interfaces/with_model_id.go +++ /dev/null @@ -1,6 +0,0 @@ -package interfaces - -type WithModelId interface { - GetModelId() (id ModelId) - SetModelId(id ModelId) -} diff --git a/core/models/client/model_service_v2.go b/core/models/client/model_service_v2.go index 2077e3e9..4d5d236e 100644 --- a/core/models/client/model_service_v2.go +++ b/core/models/client/model_service_v2.go @@ -21,7 +21,7 @@ var ( type ModelServiceV2[T any] struct { cfg interfaces.NodeConfigService - c *client.GrpcClientV2 + c *client.GrpcClient modelType string } @@ -336,7 +336,7 @@ func NewModelServiceV2[T any]() *ModelServiceV2[T] { var instance *ModelServiceV2[T] - c := client.GetGrpcClientV2() + c := client.GetGrpcClient() if !c.IsStarted() { err := c.Start() if err != nil { diff --git a/core/models/client/model_service_v2_test.go b/core/models/client/model_service_v2_test.go index f8a4b2f2..85469653 100644 --- a/core/models/client/model_service_v2_test.go +++ b/core/models/client/model_service_v2_test.go @@ -46,7 +46,7 @@ func stopSvr(svr *server.GrpcServer) { func TestModelServiceV2_GetById(t *testing.T) { setupTestDB() defer teardownTestDB() - svr, err := server.NewGrpcServerV2() + svr, err := server.NewGrpcServer() require.Nil(t, err) go startSvr(svr) defer stopSvr(svr) @@ -74,7 +74,7 @@ func TestModelServiceV2_GetById(t *testing.T) { func TestModelServiceV2_GetOne(t *testing.T) { setupTestDB() defer teardownTestDB() - svr, err := server.NewGrpcServerV2() + svr, err := server.NewGrpcServer() require.Nil(t, err) go startSvr(svr) defer stopSvr(svr) @@ -102,7 +102,7 @@ func TestModelServiceV2_GetOne(t *testing.T) { func TestModelServiceV2_GetMany(t *testing.T) { setupTestDB() defer teardownTestDB() - svr, err := server.NewGrpcServerV2() + svr, err := server.NewGrpcServer() require.Nil(t, err) go startSvr(svr) defer stopSvr(svr) @@ -131,7 +131,7 @@ func TestModelServiceV2_GetMany(t *testing.T) { func TestModelServiceV2_DeleteById(t *testing.T) { setupTestDB() defer teardownTestDB() - svr, err := server.NewGrpcServerV2() + svr, err := server.NewGrpcServer() require.Nil(t, err) go startSvr(svr) defer stopSvr(svr) @@ -161,7 +161,7 @@ func TestModelServiceV2_DeleteById(t *testing.T) { func TestModelServiceV2_DeleteOne(t *testing.T) { setupTestDB() defer teardownTestDB() - svr, err := server.NewGrpcServerV2() + svr, err := server.NewGrpcServer() require.Nil(t, err) go startSvr(svr) defer stopSvr(svr) @@ -191,7 +191,7 @@ func TestModelServiceV2_DeleteOne(t *testing.T) { func TestModelServiceV2_DeleteMany(t *testing.T) { setupTestDB() defer teardownTestDB() - svr, err := server.NewGrpcServerV2() + svr, err := server.NewGrpcServer() require.Nil(t, err) go startSvr(svr) defer stopSvr(svr) @@ -221,7 +221,7 @@ func TestModelServiceV2_DeleteMany(t *testing.T) { func TestModelServiceV2_UpdateById(t *testing.T) { setupTestDB() defer teardownTestDB() - svr, err := server.NewGrpcServerV2() + svr, err := server.NewGrpcServer() require.Nil(t, err) go startSvr(svr) defer stopSvr(svr) @@ -251,7 +251,7 @@ func TestModelServiceV2_UpdateById(t *testing.T) { func TestModelServiceV2_UpdateOne(t *testing.T) { setupTestDB() defer teardownTestDB() - svr, err := server.NewGrpcServerV2() + svr, err := server.NewGrpcServer() require.Nil(t, err) go startSvr(svr) defer stopSvr(svr) @@ -281,7 +281,7 @@ func TestModelServiceV2_UpdateOne(t *testing.T) { func TestModelServiceV2_UpdateMany(t *testing.T) { setupTestDB() defer teardownTestDB() - svr, err := server.NewGrpcServerV2() + svr, err := server.NewGrpcServer() require.Nil(t, err) go startSvr(svr) defer stopSvr(svr) @@ -315,7 +315,7 @@ func TestModelServiceV2_UpdateMany(t *testing.T) { func TestModelServiceV2_ReplaceById(t *testing.T) { setupTestDB() defer teardownTestDB() - svr, err := server.NewGrpcServerV2() + svr, err := server.NewGrpcServer() require.Nil(t, err) go startSvr(svr) defer stopSvr(svr) @@ -346,7 +346,7 @@ func TestModelServiceV2_ReplaceById(t *testing.T) { func TestModelServiceV2_ReplaceOne(t *testing.T) { setupTestDB() defer teardownTestDB() - svr, err := server.NewGrpcServerV2() + svr, err := server.NewGrpcServer() require.Nil(t, err) go startSvr(svr) defer stopSvr(svr) @@ -377,7 +377,7 @@ func TestModelServiceV2_ReplaceOne(t *testing.T) { func TestModelServiceV2_InsertOne(t *testing.T) { setupTestDB() defer teardownTestDB() - svr, err := server.NewGrpcServerV2() + svr, err := server.NewGrpcServer() require.Nil(t, err) go startSvr(svr) defer stopSvr(svr) @@ -401,7 +401,7 @@ func TestModelServiceV2_InsertOne(t *testing.T) { func TestModelServiceV2_InsertMany(t *testing.T) { setupTestDB() defer teardownTestDB() - svr, err := server.NewGrpcServerV2() + svr, err := server.NewGrpcServer() require.Nil(t, err) go startSvr(svr) defer stopSvr(svr) @@ -428,7 +428,7 @@ func TestModelServiceV2_InsertMany(t *testing.T) { func TestModelServiceV2_Count(t *testing.T) { setupTestDB() defer teardownTestDB() - svr, err := server.NewGrpcServerV2() + svr, err := server.NewGrpcServer() require.Nil(t, err) go startSvr(svr) defer stopSvr(svr) diff --git a/core/node/service/master_service.go b/core/node/service/master_service.go index b444ab3f..fa6736a5 100644 --- a/core/node/service/master_service.go +++ b/core/node/service/master_service.go @@ -32,7 +32,7 @@ type MasterService struct { cfgSvc interfaces.NodeConfigService server *server.GrpcServer schedulerSvc *scheduler.ServiceV2 - handlerSvc *handler.ServiceV2 + handlerSvc *handler.Service scheduleSvc *schedule.ServiceV2 systemSvc *system.ServiceV2 @@ -345,7 +345,7 @@ func newMasterServiceV2() (res *MasterService, err error) { } // handler service - svc.handlerSvc, err = handler.GetTaskHandlerServiceV2() + svc.handlerSvc, err = handler.GetTaskHandlerService() if err != nil { return nil, err } diff --git a/core/node/service/worker_service.go b/core/node/service/worker_service.go index 6b674aa2..3d26c892 100644 --- a/core/node/service/worker_service.go +++ b/core/node/service/worker_service.go @@ -23,8 +23,8 @@ import ( type WorkerService struct { // dependencies cfgSvc interfaces.NodeConfigService - client *client.GrpcClientV2 - handlerSvc *handler.ServiceV2 + client *client.GrpcClient + handlerSvc *handler.Service // settings cfgPath string @@ -188,20 +188,14 @@ func newWorkerService() (res *WorkerService, err error) { heartbeatInterval: 15 * time.Second, } - // dependency options - var clientOpts []client.Option - if svc.address != nil { - clientOpts = append(clientOpts, client.WithAddress(svc.address)) - } - // node config service svc.cfgSvc = nodeconfig.GetNodeConfigService() // grpc client - svc.client = client.GetGrpcClientV2() + svc.client = client.GetGrpcClient() // handler service - svc.handlerSvc, err = handler.GetTaskHandlerServiceV2() + svc.handlerSvc, err = handler.GetTaskHandlerService() if err != nil { return nil, err } diff --git a/core/task/handler/runner_v2.go b/core/task/handler/runner_v2.go index d273ad81..09f74d90 100644 --- a/core/task/handler/runner_v2.go +++ b/core/task/handler/runner_v2.go @@ -35,8 +35,8 @@ import ( type RunnerV2 struct { // dependencies - svc *ServiceV2 // task handler service - fsSvc interfaces.FsServiceV2 // task fs service + svc *Service // task handler service + fsSvc interfaces.FsService // task fs service // settings subscribeTimeout time.Duration @@ -51,7 +51,7 @@ type RunnerV2 struct { ch chan constants.TaskSignal // channel to communicate between Service and RunnerV2 err error // standard process error cwd string // working directory - c *client2.GrpcClientV2 // grpc client + c *client2.GrpcClient // grpc client conn grpc.TaskService_ConnectClient // grpc task service stream client // log internals @@ -668,7 +668,7 @@ func (r *RunnerV2) configureCwd() { } } -func NewTaskRunnerV2(id primitive.ObjectID, svc *ServiceV2) (r2 *RunnerV2, err error) { +func NewTaskRunnerV2(id primitive.ObjectID, svc *Service) (r2 *RunnerV2, err error) { // validate options if id.IsZero() { return nil, constants.ErrInvalidOptions @@ -700,7 +700,7 @@ func NewTaskRunnerV2(id primitive.ObjectID, svc *ServiceV2) (r2 *RunnerV2, err e r.fsSvc = fs.NewFsServiceV2(filepath.Join(viper.GetString("workspace"), r.s.Id.Hex())) // grpc client - r.c = client2.GetGrpcClientV2() + r.c = client2.GetGrpcClient() // initialize task runner if err := r.Init(); err != nil { diff --git a/core/task/handler/service_v2.go b/core/task/handler/service.go similarity index 75% rename from core/task/handler/service_v2.go rename to core/task/handler/service.go index 245faf1f..e02c7c3a 100644 --- a/core/task/handler/service_v2.go +++ b/core/task/handler/service.go @@ -22,10 +22,10 @@ import ( "time" ) -type ServiceV2 struct { +type Service struct { // dependencies cfgSvc interfaces.NodeConfigService - c *grpcclient.GrpcClientV2 // grpc client + c *grpcclient.GrpcClient // grpc client // settings //maxRunners int @@ -42,7 +42,7 @@ type ServiceV2 struct { syncLocks sync.Map // files sync locks map of task runners } -func (svc *ServiceV2) Start() { +func (svc *Service) Start() { // Initialize gRPC if not started if !svc.c.IsStarted() { err := svc.c.Start() @@ -55,26 +55,19 @@ func (svc *ServiceV2) Start() { go svc.FetchAndRunTasks() } -func (svc *ServiceV2) Stop() { +func (svc *Service) Stop() { svc.stopped = true } -func (svc *ServiceV2) Run(taskId primitive.ObjectID) (err error) { +func (svc *Service) Run(taskId primitive.ObjectID) (err error) { return svc.runTask(taskId) } -func (svc *ServiceV2) Cancel(taskId primitive.ObjectID, force bool) (err error) { - r, err := svc.getRunner(taskId) - if err != nil { - return err - } - if err := r.Cancel(force); err != nil { - return err - } - return nil +func (svc *Service) Cancel(taskId primitive.ObjectID, force bool) (err error) { + return svc.cancelTask(taskId, force) } -func (svc *ServiceV2) FetchAndRunTasks() { +func (svc *Service) FetchAndRunTasks() { ticker := time.NewTicker(svc.fetchInterval) for { if svc.stopped { @@ -126,7 +119,7 @@ func (svc *ServiceV2) FetchAndRunTasks() { } } -func (svc *ServiceV2) ReportStatus() { +func (svc *Service) ReportStatus() { ticker := time.NewTicker(svc.reportInterval) for { if svc.stopped { @@ -143,19 +136,19 @@ func (svc *ServiceV2) ReportStatus() { } } -func (svc *ServiceV2) GetExitWatchDuration() (duration time.Duration) { +func (svc *Service) GetExitWatchDuration() (duration time.Duration) { return svc.exitWatchDuration } -func (svc *ServiceV2) GetCancelTimeout() (timeout time.Duration) { +func (svc *Service) GetCancelTimeout() (timeout time.Duration) { return svc.cancelTimeout } -func (svc *ServiceV2) GetNodeConfigService() (cfgSvc interfaces.NodeConfigService) { +func (svc *Service) GetNodeConfigService() (cfgSvc interfaces.NodeConfigService) { return svc.cfgSvc } -func (svc *ServiceV2) GetCurrentNode() (n *models2.NodeV2, err error) { +func (svc *Service) GetCurrentNode() (n *models2.NodeV2, err error) { // node key nodeKey := svc.cfgSvc.GetNodeKey() @@ -172,7 +165,7 @@ func (svc *ServiceV2) GetCurrentNode() (n *models2.NodeV2, err error) { return n, nil } -func (svc *ServiceV2) GetTaskById(id primitive.ObjectID) (t *models2.TaskV2, err error) { +func (svc *Service) GetTaskById(id primitive.ObjectID) (t *models2.TaskV2, err error) { if svc.cfgSvc.IsMaster() { t, err = service.NewModelServiceV2[models2.TaskV2]().GetById(id) } else { @@ -185,7 +178,7 @@ func (svc *ServiceV2) GetTaskById(id primitive.ObjectID) (t *models2.TaskV2, err return t, nil } -func (svc *ServiceV2) GetSpiderById(id primitive.ObjectID) (s *models2.SpiderV2, err error) { +func (svc *Service) GetSpiderById(id primitive.ObjectID) (s *models2.SpiderV2, err error) { if svc.cfgSvc.IsMaster() { s, err = service.NewModelServiceV2[models2.SpiderV2]().GetById(id) } else { @@ -198,7 +191,7 @@ func (svc *ServiceV2) GetSpiderById(id primitive.ObjectID) (s *models2.SpiderV2, return s, nil } -func (svc *ServiceV2) getRunnerCount() (count int) { +func (svc *Service) getRunnerCount() (count int) { n, err := svc.GetCurrentNode() if err != nil { trace.PrintError(err) @@ -224,7 +217,7 @@ func (svc *ServiceV2) getRunnerCount() (count int) { return count } -func (svc *ServiceV2) getRunner(taskId primitive.ObjectID) (r interfaces.TaskRunner, err error) { +func (svc *Service) getRunner(taskId primitive.ObjectID) (r interfaces.TaskRunner, err error) { log.Debugf("[TaskHandlerService] getRunner: taskId[%v]", taskId) v, ok := svc.runners.Load(taskId) if !ok { @@ -239,17 +232,17 @@ func (svc *ServiceV2) getRunner(taskId primitive.ObjectID) (r interfaces.TaskRun return r, nil } -func (svc *ServiceV2) addRunner(taskId primitive.ObjectID, r interfaces.TaskRunner) { +func (svc *Service) addRunner(taskId primitive.ObjectID, r interfaces.TaskRunner) { log.Debugf("[TaskHandlerService] addRunner: taskId[%v]", taskId) svc.runners.Store(taskId, r) } -func (svc *ServiceV2) deleteRunner(taskId primitive.ObjectID) { +func (svc *Service) deleteRunner(taskId primitive.ObjectID) { log.Debugf("[TaskHandlerService] deleteRunner: taskId[%v]", taskId) svc.runners.Delete(taskId) } -func (svc *ServiceV2) reportStatus() (err error) { +func (svc *Service) reportStatus() (err error) { // current node n, err := svc.GetCurrentNode() if err != nil { @@ -276,10 +269,12 @@ func (svc *ServiceV2) reportStatus() (err error) { return nil } -func (svc *ServiceV2) fetchTask() (tid primitive.ObjectID, err error) { +func (svc *Service) fetchTask() (tid primitive.ObjectID, err error) { ctx, cancel := context.WithTimeout(context.Background(), svc.fetchTimeout) defer cancel() - res, err := svc.c.TaskClient.FetchTask(ctx, svc.c.NewRequest(nil)) + res, err := svc.c.TaskClient.FetchTask(ctx, &grpc.TaskServiceFetchTaskRequest{ + NodeKey: svc.cfgSvc.GetNodeKey(), + }) if err != nil { return primitive.NilObjectID, fmt.Errorf("fetchTask task error: %v", err) } @@ -291,7 +286,7 @@ func (svc *ServiceV2) fetchTask() (tid primitive.ObjectID, err error) { return tid, nil } -func (svc *ServiceV2) runTask(taskId primitive.ObjectID) (err error) { +func (svc *Service) runTask(taskId primitive.ObjectID) (err error) { // attempt to get runner from pool _, ok := svc.runners.Load(taskId) if ok { @@ -347,7 +342,7 @@ func (svc *ServiceV2) runTask(taskId primitive.ObjectID) (err error) { return nil } -func (svc *ServiceV2) subscribeTask(taskId primitive.ObjectID) (stream grpc.TaskService_SubscribeClient, err error) { +func (svc *Service) subscribeTask(taskId primitive.ObjectID) (stream grpc.TaskService_SubscribeClient, err error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() req := &grpc.TaskServiceSubscribeRequest{ @@ -361,13 +356,13 @@ func (svc *ServiceV2) subscribeTask(taskId primitive.ObjectID) (stream grpc.Task return stream, nil } -func (svc *ServiceV2) handleStreamMessages(id primitive.ObjectID, stream grpc.TaskService_SubscribeClient, stopCh chan struct{}) { +func (svc *Service) handleStreamMessages(taskId primitive.ObjectID, stream grpc.TaskService_SubscribeClient, stopCh chan struct{}) { for { select { case <-stopCh: err := stream.CloseSend() if err != nil { - log.Errorf("task[%s] failed to close stream: %v", id.Hex(), err) + log.Errorf("task[%s] failed to close stream: %v", taskId.Hex(), err) return } return @@ -375,28 +370,52 @@ func (svc *ServiceV2) handleStreamMessages(id primitive.ObjectID, stream grpc.Ta msg, err := stream.Recv() if err != nil { if errors.Is(err, io.EOF) { + log.Infof("task[%s] received EOF, stream closed", taskId.Hex()) return } - log.Errorf("task[%s] stream error: %v", id.Hex(), err) + log.Errorf("task[%s] stream error: %v", taskId.Hex(), err) continue } switch msg.Code { case grpc.TaskServiceSubscribeCode_CANCEL: - log.Infof("task[%s] received cancel signal", id.Hex()) - go func() { - if err := svc.Cancel(id, true); err != nil { - log.Errorf("task[%s] failed to cancel: %v", id.Hex(), err) - } - log.Infof("task[%s] cancelled", id.Hex()) - }() + log.Infof("task[%s] received cancel signal", taskId.Hex()) + go svc.handleCancel(msg, taskId) } } } } -func newTaskHandlerServiceV2() (svc2 *ServiceV2, err error) { +func (svc *Service) handleCancel(msg *grpc.TaskServiceSubscribeResponse, taskId primitive.ObjectID) { + // validate task id + if msg.TaskId != taskId.Hex() { + log.Errorf("task[%s] received cancel signal for another task[%s]", taskId.Hex(), msg.TaskId) + return + } + + // cancel task + err := svc.cancelTask(taskId, msg.Force) + if err != nil { + log.Errorf("task[%s] failed to cancel: %v", taskId.Hex(), err) + return + } + + log.Infof("task[%s] cancelled", taskId.Hex()) +} + +func (svc *Service) cancelTask(taskId primitive.ObjectID, force bool) (err error) { + r, err := svc.getRunner(taskId) + if err != nil { + return err + } + if err := r.Cancel(force); err != nil { + return err + } + return nil +} + +func newTaskHandlerService() (svc2 *Service, err error) { // service - svc := &ServiceV2{ + svc := &Service{ exitWatchDuration: 60 * time.Second, fetchInterval: 1 * time.Second, fetchTimeout: 15 * time.Second, @@ -410,22 +429,22 @@ func newTaskHandlerServiceV2() (svc2 *ServiceV2, err error) { svc.cfgSvc = nodeconfig.GetNodeConfigService() // grpc client - svc.c = grpcclient.GetGrpcClientV2() + svc.c = grpcclient.GetGrpcClient() log.Debugf("[NewTaskHandlerService] svc[cfgPath: %s]", svc.cfgSvc.GetConfigPath()) return svc, nil } -var _serviceV2 *ServiceV2 +var _serviceV2 *Service var _serviceV2Once = new(sync.Once) -func GetTaskHandlerServiceV2() (svr *ServiceV2, err error) { +func GetTaskHandlerService() (svr *Service, err error) { if _serviceV2 != nil { return _serviceV2, nil } _serviceV2Once.Do(func() { - _serviceV2, err = newTaskHandlerServiceV2() + _serviceV2, err = newTaskHandlerService() if err != nil { log.Errorf("failed to create task handler service: %v", err) } diff --git a/core/task/scheduler/service_v2.go b/core/task/scheduler/service_v2.go index b2bf6b18..a97760d9 100644 --- a/core/task/scheduler/service_v2.go +++ b/core/task/scheduler/service_v2.go @@ -2,6 +2,7 @@ package scheduler import ( errors2 "errors" + "fmt" "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/errors" @@ -24,7 +25,7 @@ type ServiceV2 struct { // dependencies nodeCfgSvc interfaces.NodeConfigService svr *server.GrpcServer - handlerSvc *handler.ServiceV2 + handlerSvc *handler.Service // settings interval time.Duration @@ -80,25 +81,23 @@ func (svc *ServiceV2) Enqueue(t *models2.TaskV2, by primitive.ObjectID) (t2 *mod return t, nil } -func (svc *ServiceV2) Cancel(id primitive.ObjectID, by primitive.ObjectID, force bool) (err error) { +func (svc *ServiceV2) Cancel(id, by primitive.ObjectID, force bool) (err error) { // task t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id) if err != nil { - return trace.TraceError(err) + log.Errorf("task not found: %s", id.Hex()) + return err } // initial status initialStatus := t.Status - // set task status as "cancelled" - t.Status = constants.TaskStatusCancelled - _ = svc.SaveTask(t, by) - // set status of pending tasks as "cancelled" and remove from task item queue if initialStatus == constants.TaskStatusPending { // remove from task item queue if err := service.NewModelServiceV2[models2.TaskQueueItemV2]().DeleteById(t.Id); err != nil { - return trace.TraceError(err) + log.Errorf("failed to delete task queue item: %s", t.Id.Hex()) + return err } return nil } @@ -106,34 +105,57 @@ func (svc *ServiceV2) Cancel(id primitive.ObjectID, by primitive.ObjectID, force // whether task is running on master node isMasterTask, err := svc.isMasterNode(t) if err != nil { - // when error, force status being set as "cancelled" - t.Status = constants.TaskStatusCancelled + err := fmt.Errorf("failed to check if task is running on master node: %s", t.Id.Hex()) + t.Status = constants.TaskStatusAbnormal + t.Error = err.Error() return svc.SaveTask(t, by) } - // node - n, err := service.NewModelServiceV2[models2.NodeV2]().GetById(t.NodeId) - if err != nil { - return trace.TraceError(err) - } - if isMasterTask { // cancel task on master - if err := svc.handlerSvc.Cancel(id, force); err != nil { - return trace.TraceError(err) - } - // cancel success - return nil + return svc.cancelOnMaster(t, by, force) } else { // send to cancel task on worker nodes - if err := svc.svr.SendStreamMessageWithData("node:"+n.Key, grpc.StreamMessageCode_CANCEL_TASK, t); err != nil { - return trace.TraceError(err) - } - // cancel success - return nil + return svc.cancelOnWorker(t, by, force) } } +func (svc *ServiceV2) cancelOnMaster(t *models2.TaskV2, by primitive.ObjectID, force bool) (err error) { + if err := svc.handlerSvc.Cancel(t.Id, force); err != nil { + log.Errorf("failed to cancel task on master: %s", t.Id.Hex()) + return err + } + + // set task status as "cancelled" + t.Status = constants.TaskStatusCancelled + return svc.SaveTask(t, by) +} + +func (svc *ServiceV2) cancelOnWorker(t *models2.TaskV2, by primitive.ObjectID, force bool) (err error) { + // get subscribe stream + stream, ok := svc.svr.TaskSvr.GetSubscribeStream(t.Id) + if !ok { + err := fmt.Errorf("stream not found for task: %s", t.Id.Hex()) + log.Errorf(err.Error()) + t.Status = constants.TaskStatusAbnormal + t.Error = err.Error() + return svc.SaveTask(t, by) + } + + // send cancel request + err = stream.Send(&grpc.TaskServiceSubscribeResponse{ + Code: grpc.TaskServiceSubscribeCode_CANCEL, + TaskId: t.Id.Hex(), + Force: force, + }) + if err != nil { + log.Errorf("failed to send cancel request to worker: %s", t.Id.Hex()) + return err + } + + return nil +} + func (svc *ServiceV2) SetInterval(interval time.Duration) { svc.interval = interval } @@ -244,7 +266,7 @@ func NewTaskSchedulerServiceV2() (svc2 *ServiceV2, err error) { log.Errorf("failed to get grpc server: %v", err) return nil, err } - svc.handlerSvc, err = handler.GetTaskHandlerServiceV2() + svc.handlerSvc, err = handler.GetTaskHandlerService() if err != nil { log.Errorf("failed to get task handler service: %v", err) return nil, err diff --git a/core/task/stats/service_v2.go b/core/task/stats/service.go similarity index 84% rename from core/task/stats/service_v2.go rename to core/task/stats/service.go index 731ed195..f88633b6 100644 --- a/core/task/stats/service_v2.go +++ b/core/task/stats/service.go @@ -28,7 +28,7 @@ type databaseServiceItem struct { time time.Time } -type ServiceV2 struct { +type Service struct { // dependencies nodeCfgSvc interfaces.NodeConfigService @@ -39,12 +39,12 @@ type ServiceV2 struct { logDriver log.Driver } -func (svc *ServiceV2) Init() (err error) { +func (svc *Service) Init() (err error) { go svc.cleanup() return nil } -func (svc *ServiceV2) InsertData(taskId primitive.ObjectID, records ...map[string]interface{}) (err error) { +func (svc *Service) InsertData(taskId primitive.ObjectID, records ...map[string]interface{}) (err error) { count := 0 item, err := svc.getDatabaseServiceItem(taskId) @@ -80,11 +80,11 @@ func (svc *ServiceV2) InsertData(taskId primitive.ObjectID, records ...map[strin return nil } -func (svc *ServiceV2) InsertLogs(id primitive.ObjectID, logs ...string) (err error) { +func (svc *Service) InsertLogs(id primitive.ObjectID, logs ...string) (err error) { return svc.logDriver.WriteLines(id.Hex(), logs) } -func (svc *ServiceV2) getDatabaseServiceItem(taskId primitive.ObjectID) (item *databaseServiceItem, err error) { +func (svc *Service) getDatabaseServiceItem(taskId primitive.ObjectID) (item *databaseServiceItem, err error) { // atomic operation svc.mu.Lock() defer svc.mu.Unlock() @@ -136,7 +136,7 @@ func (svc *ServiceV2) getDatabaseServiceItem(taskId primitive.ObjectID) (item *d return item, nil } -func (svc *ServiceV2) updateTaskStats(id primitive.ObjectID, resultCount int) { +func (svc *Service) updateTaskStats(id primitive.ObjectID, resultCount int) { err := service.NewModelServiceV2[models2.TaskStatV2]().UpdateById(id, bson.M{ "$inc": bson.M{ "result_count": resultCount, @@ -147,7 +147,7 @@ func (svc *ServiceV2) updateTaskStats(id primitive.ObjectID, resultCount int) { } } -func (svc *ServiceV2) cleanup() { +func (svc *Service) cleanup() { for { // atomic operation svc.mu.Lock() @@ -164,7 +164,7 @@ func (svc *ServiceV2) cleanup() { } } -func (svc *ServiceV2) normalizeRecord(item *databaseServiceItem, record map[string]interface{}) (res map[string]interface{}) { +func (svc *Service) normalizeRecord(item *databaseServiceItem, record map[string]interface{}) (res map[string]interface{}) { res = record // set task id @@ -176,9 +176,9 @@ func (svc *ServiceV2) normalizeRecord(item *databaseServiceItem, record map[stri return res } -func NewTaskStatsServiceV2() (svc2 *ServiceV2, err error) { +func NewTaskStatsServiceV2() (svc2 *Service, err error) { // service - svc := &ServiceV2{ + svc := &Service{ mu: sync.Mutex{}, databaseServiceItems: map[string]*databaseServiceItem{}, databaseServiceTll: 10 * time.Minute, @@ -195,9 +195,9 @@ func NewTaskStatsServiceV2() (svc2 *ServiceV2, err error) { return svc, nil } -var _serviceV2 *ServiceV2 +var _serviceV2 *Service -func GetTaskStatsServiceV2() (svr *ServiceV2, err error) { +func GetTaskStatsServiceV2() (svr *Service, err error) { if _serviceV2 != nil { return _serviceV2, nil }