From 789f71fd8035cf3b2b6e6665786d0f48b6a2cdac Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Tue, 29 Oct 2024 17:59:39 +0800 Subject: [PATCH] refactor: updated task related grpc proto --- core/controllers/task_v2.go | 13 +++++- core/interfaces/task_runner.go | 2 +- core/task/handler/runner_v2.go | 4 +- core/task/handler/service_v2.go | 4 +- core/task/scheduler/service_v2.go | 6 +-- grpc/proto/services/model_base_service.proto | 22 ---------- grpc/proto/services/plugin_service.proto | 14 ------- .../services/task_scheduler_service.proto | 41 +++++++++++++++++++ grpc/proto/services/task_service.proto | 19 --------- 9 files changed, 61 insertions(+), 64 deletions(-) delete mode 100644 grpc/proto/services/model_base_service.proto delete mode 100644 grpc/proto/services/plugin_service.proto create mode 100644 grpc/proto/services/task_scheduler_service.proto delete mode 100644 grpc/proto/services/task_service.proto diff --git a/core/controllers/task_v2.go b/core/controllers/task_v2.go index 2b3e16a5..f6890646 100644 --- a/core/controllers/task_v2.go +++ b/core/controllers/task_v2.go @@ -365,6 +365,10 @@ func PostTaskRestart(c *gin.Context) { } func PostTaskCancel(c *gin.Context) { + type Payload struct { + Force bool `json:"force,omitempty"` + } + // id id, err := primitive.ObjectIDFromHex(c.Param("id")) if err != nil { @@ -372,6 +376,13 @@ func PostTaskCancel(c *gin.Context) { return } + // payload + var p Payload + if err := c.ShouldBindJSON(&p); err != nil { + HandleErrorBadRequest(c, err) + return + } + // task t, err := service.NewModelServiceV2[models.TaskV2]().GetById(id) if err != nil { @@ -393,7 +404,7 @@ func PostTaskCancel(c *gin.Context) { HandleErrorInternalServerError(c, err) return } - if err := schedulerSvc.Cancel(id, u.Id); err != nil { + if err := schedulerSvc.Cancel(id, u.Id, p.Force); err != nil { HandleErrorInternalServerError(c, err) return } diff --git a/core/interfaces/task_runner.go b/core/interfaces/task_runner.go index 4194764c..90714e04 100644 --- a/core/interfaces/task_runner.go +++ b/core/interfaces/task_runner.go @@ -8,7 +8,7 @@ import ( type TaskRunner interface { Init() (err error) Run() (err error) - Cancel() (err error) + Cancel(force bool) (err error) SetSubscribeTimeout(timeout time.Duration) GetTaskId() (id primitive.ObjectID) CleanUp() (err error) diff --git a/core/task/handler/runner_v2.go b/core/task/handler/runner_v2.go index 8690aa6e..bb61c5f4 100644 --- a/core/task/handler/runner_v2.go +++ b/core/task/handler/runner_v2.go @@ -166,11 +166,11 @@ func (r *RunnerV2) Run() (err error) { return err } -func (r *RunnerV2) Cancel() (err error) { +func (r *RunnerV2) Cancel(force bool) (err error) { // kill process opts := &sys_exec.KillProcessOptions{ Timeout: r.svc.GetCancelTimeout(), - Force: true, + Force: force, } if err := sys_exec.KillProcess(r.cmd, opts); err != nil { return err diff --git a/core/task/handler/service_v2.go b/core/task/handler/service_v2.go index 5d1c583c..763fa622 100644 --- a/core/task/handler/service_v2.go +++ b/core/task/handler/service_v2.go @@ -66,12 +66,12 @@ func (svc *ServiceV2) Reset() { defer svc.mu.Unlock() } -func (svc *ServiceV2) Cancel(taskId primitive.ObjectID) (err error) { +func (svc *ServiceV2) Cancel(taskId primitive.ObjectID, force bool) (err error) { r, err := svc.getRunner(taskId) if err != nil { return err } - if err := r.Cancel(); err != nil { + if err := r.Cancel(force); err != nil { return err } return nil diff --git a/core/task/scheduler/service_v2.go b/core/task/scheduler/service_v2.go index 18c71245..65cf6449 100644 --- a/core/task/scheduler/service_v2.go +++ b/core/task/scheduler/service_v2.go @@ -12,7 +12,7 @@ import ( nodeconfig "github.com/crawlab-team/crawlab/core/node/config" "github.com/crawlab-team/crawlab/core/task/handler" "github.com/crawlab-team/crawlab/core/utils" - grpc "github.com/crawlab-team/crawlab/grpc" + "github.com/crawlab-team/crawlab/grpc" "github.com/crawlab-team/crawlab/trace" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -80,7 +80,7 @@ func (svc *ServiceV2) Enqueue(t *models2.TaskV2, by primitive.ObjectID) (t2 *mod return t, nil } -func (svc *ServiceV2) Cancel(id primitive.ObjectID, by primitive.ObjectID) (err error) { +func (svc *ServiceV2) Cancel(id primitive.ObjectID, by primitive.ObjectID, force bool) (err error) { // task t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id) if err != nil { @@ -119,7 +119,7 @@ func (svc *ServiceV2) Cancel(id primitive.ObjectID, by primitive.ObjectID) (err if isMasterTask { // cancel task on master - if err := svc.handlerSvc.Cancel(id); err != nil { + if err := svc.handlerSvc.Cancel(id, force); err != nil { return trace.TraceError(err) } // cancel success diff --git a/grpc/proto/services/model_base_service.proto b/grpc/proto/services/model_base_service.proto deleted file mode 100644 index 0836659a..00000000 --- a/grpc/proto/services/model_base_service.proto +++ /dev/null @@ -1,22 +0,0 @@ -syntax = "proto3"; - -import "entity/request.proto"; -import "entity/response.proto"; - -package grpc; -option go_package = ".;grpc"; - -service ModelBaseService { - rpc GetById(Request) returns (Response){}; - rpc Get(Request) returns (Response){}; - rpc GetList(Request) returns (Response){}; - rpc DeleteById(Request) returns (Response){}; - rpc Delete(Request) returns (Response){}; - rpc DeleteList(Request) returns (Response){}; - rpc ForceDeleteList(Request) returns (Response){}; - rpc UpdateById(Request) returns (Response){}; - rpc Update(Request) returns (Response){}; - rpc UpdateDoc(Request) returns (Response){}; - rpc Insert(Request) returns (Response){}; - rpc Count(Request) returns (Response){}; -} diff --git a/grpc/proto/services/plugin_service.proto b/grpc/proto/services/plugin_service.proto deleted file mode 100644 index 5e41977e..00000000 --- a/grpc/proto/services/plugin_service.proto +++ /dev/null @@ -1,14 +0,0 @@ -syntax = "proto3"; - -import "entity/plugin_request.proto"; -import "entity/response.proto"; -import "entity/stream_message.proto"; - -package grpc; -option go_package = ".;grpc"; - -service PluginService { - rpc Register(PluginRequest) returns (Response){}; - rpc Subscribe(PluginRequest) returns (stream StreamMessage){}; - rpc Poll(stream StreamMessage) returns (stream StreamMessage){}; -} diff --git a/grpc/proto/services/task_scheduler_service.proto b/grpc/proto/services/task_scheduler_service.proto new file mode 100644 index 00000000..3f12edbf --- /dev/null +++ b/grpc/proto/services/task_scheduler_service.proto @@ -0,0 +1,41 @@ +syntax = "proto3"; + +import "entity/request.proto"; +import "entity/response.proto"; +import "entity/stream_message.proto"; + +package grpc; +option go_package = ".;grpc"; + +message TaskSchedulerServiceConnectRequest { + string node_key = 1; +} + +enum TaskSchedulerServiceCode { + CANCEL = 0; +} + +message TaskSchedulerServiceConnectResponse { + TaskSchedulerServiceCode code = 1; + string task_id = 2; + bool force = 3; +} + +message TaskSchedulerServiceFetchTaskRequest { + string node_key = 1; +} + +message TaskSchedulerServiceFetchTaskResponse { + string task_id = 2; +} + +message TaskServiceSendNotificationRequest { + string node_key = 1; + string task_id = 2; +} + +service TaskSchedulerService { + rpc Connect(stream TaskSchedulerServiceConnectRequest) returns (TaskSchedulerServiceConnectResponse){}; + rpc FetchTask(TaskSchedulerServiceFetchTaskRequest) returns (TaskSchedulerServiceFetchTaskResponse){}; + rpc SendNotification(TaskServiceSendNotificationRequest) returns (Response){}; +} diff --git a/grpc/proto/services/task_service.proto b/grpc/proto/services/task_service.proto deleted file mode 100644 index 6b67caf0..00000000 --- a/grpc/proto/services/task_service.proto +++ /dev/null @@ -1,19 +0,0 @@ -syntax = "proto3"; - -import "entity/request.proto"; -import "entity/response.proto"; -import "entity/stream_message.proto"; - -package grpc; -option go_package = ".;grpc"; - -message TaskServiceSendNotificationRequest { - string node_key = 1; - string task_id = 2; -} - -service TaskService { - rpc Subscribe(stream StreamMessage) returns (Response){}; - rpc Fetch(Request) returns (Response){}; - rpc SendNotification(TaskServiceSendNotificationRequest) returns (Response){}; -}