refactor: updated task related grpc proto

This commit is contained in:
Marvin Zhang
2024-10-29 17:59:39 +08:00
parent 3aa32a14ad
commit 789f71fd80
9 changed files with 61 additions and 64 deletions

View File

@@ -365,6 +365,10 @@ func PostTaskRestart(c *gin.Context) {
}
func PostTaskCancel(c *gin.Context) {
type Payload struct {
Force bool `json:"force,omitempty"`
}
// id
id, err := primitive.ObjectIDFromHex(c.Param("id"))
if err != nil {
@@ -372,6 +376,13 @@ func PostTaskCancel(c *gin.Context) {
return
}
// payload
var p Payload
if err := c.ShouldBindJSON(&p); err != nil {
HandleErrorBadRequest(c, err)
return
}
// task
t, err := service.NewModelServiceV2[models.TaskV2]().GetById(id)
if err != nil {
@@ -393,7 +404,7 @@ func PostTaskCancel(c *gin.Context) {
HandleErrorInternalServerError(c, err)
return
}
if err := schedulerSvc.Cancel(id, u.Id); err != nil {
if err := schedulerSvc.Cancel(id, u.Id, p.Force); err != nil {
HandleErrorInternalServerError(c, err)
return
}

View File

@@ -8,7 +8,7 @@ import (
type TaskRunner interface {
Init() (err error)
Run() (err error)
Cancel() (err error)
Cancel(force bool) (err error)
SetSubscribeTimeout(timeout time.Duration)
GetTaskId() (id primitive.ObjectID)
CleanUp() (err error)

View File

@@ -166,11 +166,11 @@ func (r *RunnerV2) Run() (err error) {
return err
}
func (r *RunnerV2) Cancel() (err error) {
func (r *RunnerV2) Cancel(force bool) (err error) {
// kill process
opts := &sys_exec.KillProcessOptions{
Timeout: r.svc.GetCancelTimeout(),
Force: true,
Force: force,
}
if err := sys_exec.KillProcess(r.cmd, opts); err != nil {
return err

View File

@@ -66,12 +66,12 @@ func (svc *ServiceV2) Reset() {
defer svc.mu.Unlock()
}
func (svc *ServiceV2) Cancel(taskId primitive.ObjectID) (err error) {
func (svc *ServiceV2) Cancel(taskId primitive.ObjectID, force bool) (err error) {
r, err := svc.getRunner(taskId)
if err != nil {
return err
}
if err := r.Cancel(); err != nil {
if err := r.Cancel(force); err != nil {
return err
}
return nil

View File

@@ -12,7 +12,7 @@ import (
nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
"github.com/crawlab-team/crawlab/core/task/handler"
"github.com/crawlab-team/crawlab/core/utils"
grpc "github.com/crawlab-team/crawlab/grpc"
"github.com/crawlab-team/crawlab/grpc"
"github.com/crawlab-team/crawlab/trace"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -80,7 +80,7 @@ func (svc *ServiceV2) Enqueue(t *models2.TaskV2, by primitive.ObjectID) (t2 *mod
return t, nil
}
func (svc *ServiceV2) Cancel(id primitive.ObjectID, by primitive.ObjectID) (err error) {
func (svc *ServiceV2) Cancel(id primitive.ObjectID, by primitive.ObjectID, force bool) (err error) {
// task
t, err := service.NewModelServiceV2[models2.TaskV2]().GetById(id)
if err != nil {
@@ -119,7 +119,7 @@ func (svc *ServiceV2) Cancel(id primitive.ObjectID, by primitive.ObjectID) (err
if isMasterTask {
// cancel task on master
if err := svc.handlerSvc.Cancel(id); err != nil {
if err := svc.handlerSvc.Cancel(id, force); err != nil {
return trace.TraceError(err)
}
// cancel success

View File

@@ -1,22 +0,0 @@
syntax = "proto3";
import "entity/request.proto";
import "entity/response.proto";
package grpc;
option go_package = ".;grpc";
service ModelBaseService {
rpc GetById(Request) returns (Response){};
rpc Get(Request) returns (Response){};
rpc GetList(Request) returns (Response){};
rpc DeleteById(Request) returns (Response){};
rpc Delete(Request) returns (Response){};
rpc DeleteList(Request) returns (Response){};
rpc ForceDeleteList(Request) returns (Response){};
rpc UpdateById(Request) returns (Response){};
rpc Update(Request) returns (Response){};
rpc UpdateDoc(Request) returns (Response){};
rpc Insert(Request) returns (Response){};
rpc Count(Request) returns (Response){};
}

View File

@@ -1,14 +0,0 @@
syntax = "proto3";
import "entity/plugin_request.proto";
import "entity/response.proto";
import "entity/stream_message.proto";
package grpc;
option go_package = ".;grpc";
service PluginService {
rpc Register(PluginRequest) returns (Response){};
rpc Subscribe(PluginRequest) returns (stream StreamMessage){};
rpc Poll(stream StreamMessage) returns (stream StreamMessage){};
}

View File

@@ -0,0 +1,41 @@
syntax = "proto3";
import "entity/request.proto";
import "entity/response.proto";
import "entity/stream_message.proto";
package grpc;
option go_package = ".;grpc";
message TaskSchedulerServiceConnectRequest {
string node_key = 1;
}
enum TaskSchedulerServiceCode {
CANCEL = 0;
}
message TaskSchedulerServiceConnectResponse {
TaskSchedulerServiceCode code = 1;
string task_id = 2;
bool force = 3;
}
message TaskSchedulerServiceFetchTaskRequest {
string node_key = 1;
}
message TaskSchedulerServiceFetchTaskResponse {
string task_id = 2;
}
message TaskServiceSendNotificationRequest {
string node_key = 1;
string task_id = 2;
}
service TaskSchedulerService {
rpc Connect(stream TaskSchedulerServiceConnectRequest) returns (TaskSchedulerServiceConnectResponse){};
rpc FetchTask(TaskSchedulerServiceFetchTaskRequest) returns (TaskSchedulerServiceFetchTaskResponse){};
rpc SendNotification(TaskServiceSendNotificationRequest) returns (Response){};
}

View File

@@ -1,19 +0,0 @@
syntax = "proto3";
import "entity/request.proto";
import "entity/response.proto";
import "entity/stream_message.proto";
package grpc;
option go_package = ".;grpc";
message TaskServiceSendNotificationRequest {
string node_key = 1;
string task_id = 2;
}
service TaskService {
rpc Subscribe(stream StreamMessage) returns (Response){};
rpc Fetch(Request) returns (Response){};
rpc SendNotification(TaskServiceSendNotificationRequest) returns (Response){};
}