From 13e8da9cdacff088d69bcbb694c3b353715cd601 Mon Sep 17 00:00:00 2001 From: marvzhang Date: Fri, 3 Jul 2020 17:28:07 +0800 Subject: [PATCH] fixed https://github.com/crawlab-team/crawlab/issues/787 --- backend/constants/rpc.go | 1 + backend/services/msg_handler/handler.go | 22 ++++----- backend/services/rpc/base.go | 2 + backend/services/rpc/cancel_task.go | 63 +++++++++++++++++++++++++ backend/services/task.go | 31 ++---------- backend/utils/chan.go | 7 +++ 6 files changed, 88 insertions(+), 38 deletions(-) create mode 100644 backend/services/rpc/cancel_task.go diff --git a/backend/constants/rpc.go b/backend/constants/rpc.go index d94eaf75..c7380110 100644 --- a/backend/constants/rpc.go +++ b/backend/constants/rpc.go @@ -6,4 +6,5 @@ const ( RpcUninstallDep = "uninstall_dep" RpcGetInstalledDepList = "get_installed_dep_list" RpcGetLang = "get_lang" + RpcCancelTask = "cancel_task" ) diff --git a/backend/services/msg_handler/handler.go b/backend/services/msg_handler/handler.go index bd98fce6..bee4113c 100644 --- a/backend/services/msg_handler/handler.go +++ b/backend/services/msg_handler/handler.go @@ -12,17 +12,17 @@ type Handler interface { func GetMsgHandler(msg entity.NodeMessage) Handler { log.Debugf("received msg , type is : %s", msg.Type) - if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog { - // 日志相关 - return &Log{ - msg: msg, - } - } else if msg.Type == constants.MsgTypeCancelTask { - // 任务相关 - return &Task{ - msg: msg, - } - } else if msg.Type == constants.MsgTypeGetSystemInfo { + //if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog { + // // 日志相关 + // return &Log{ + // msg: msg, + // } + //} else if msg.Type == constants.MsgTypeCancelTask { + // // 任务相关 + // return &Task{ + // msg: msg, + // } + if msg.Type == constants.MsgTypeGetSystemInfo { // 系统信息相关 return &SystemInfo{ msg: msg, diff --git a/backend/services/rpc/base.go b/backend/services/rpc/base.go index 369c62f4..1609be16 100644 --- a/backend/services/rpc/base.go +++ b/backend/services/rpc/base.go @@ -74,6 +74,8 @@ func GetService(msg entity.RpcMessage) Service { return &GetLangService{msg: msg} case constants.RpcGetInstalledDepList: return &GetInstalledDepsService{msg: msg} + case constants.RpcCancelTask: + return &CancelTaskService{msg: msg} } return nil } diff --git a/backend/services/rpc/cancel_task.go b/backend/services/rpc/cancel_task.go new file mode 100644 index 00000000..0e1d4617 --- /dev/null +++ b/backend/services/rpc/cancel_task.go @@ -0,0 +1,63 @@ +package rpc + +import ( + "crawlab/constants" + "crawlab/entity" + "crawlab/model" + "crawlab/utils" + "errors" + "fmt" + "github.com/globalsign/mgo/bson" +) + +type CancelTaskService struct { + msg entity.RpcMessage +} + +func (s *CancelTaskService) ServerHandle() (entity.RpcMessage, error) { + taskId := utils.GetRpcParam("task_id", s.msg.Params) + nodeId := utils.GetRpcParam("node_id", s.msg.Params) + if err := CancelTaskLocal(taskId, nodeId); err != nil { + s.msg.Error = err.Error() + return s.msg, err + } + s.msg.Result = "success" + return s.msg, nil +} + +func (s *CancelTaskService) ClientHandle() (o interface{}, err error) { + // 发起 RPC 请求,获取服务端数据 + _, err = ClientFunc(s.msg)() + if err != nil { + return + } + + return +} + +func CancelTaskLocal(taskId string, nodeId string) error { + if !utils.TaskExecChanMap.HasChanKey(taskId) { + _ = model.UpdateTaskToAbnormal(bson.ObjectIdHex(nodeId)) + return errors.New(fmt.Sprintf("task id (%s) does not exist", taskId)) + } + ch := utils.TaskExecChanMap.ChanBlocked(taskId) + ch <- constants.TaskCancel + return nil +} + +func CancelTaskRemote(taskId string, nodeId string) (err error) { + params := make(map[string]string) + params["task_id"] = taskId + params["node_id"] = nodeId + s := GetService(entity.RpcMessage{ + NodeId: nodeId, + Method: constants.RpcCancelTask, + Params: params, + Timeout: 60, + }) + _, err = s.ClientHandle() + if err != nil { + return + } + return +} diff --git a/backend/services/task.go b/backend/services/task.go index 50c58f92..d40c3f7b 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -4,11 +4,11 @@ import ( "bufio" "crawlab/constants" "crawlab/database" - "crawlab/entity" "crawlab/lib/cron" "crawlab/model" "crawlab/services/local_node" "crawlab/services/notification" + "crawlab/services/rpc" "crawlab/services/spider_handler" "crawlab/utils" "encoding/json" @@ -395,7 +395,6 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider, u // 起一个goroutine来监控进程 ch := utils.TaskExecChanMap.ChanBlocked(t.Id) - go FinishOrCancelTask(ch, cmd, s, t) // kill的时候,可以kill所有的子进程 @@ -788,34 +787,12 @@ func CancelTask(id string) (err error) { if node.Id == task.NodeId { // 任务节点为主节点 - - // 获取任务执行频道 - ch := utils.TaskExecChanMap.ChanBlocked(id) - if ch != nil { - // 发出取消进程信号 - ch <- constants.TaskCancel - } else { - if err := model.UpdateTaskToAbnormal(node.Id); err != nil { - log.Errorf("update task to abnormal : {}", err.Error()) - debug.PrintStack() - return err - } + if err := rpc.CancelTaskLocal(task.Id, task.NodeId.Hex()); err != nil { + return err } } else { // 任务节点为工作节点 - - // 序列化消息 - msg := entity.NodeMessage{ - Type: constants.MsgTypeCancelTask, - TaskId: id, - } - msgBytes, err := json.Marshal(&msg) - if err != nil { - return err - } - - // 发布消息 - if _, err := database.RedisClient.Publish("nodes:"+task.NodeId.Hex(), utils.BytesToString(msgBytes)); err != nil { + if err := rpc.CancelTaskRemote(task.Id, task.NodeId.Hex()); err != nil { return err } } diff --git a/backend/utils/chan.go b/backend/utils/chan.go index 7fb4cea3..c0144340 100644 --- a/backend/utils/chan.go +++ b/backend/utils/chan.go @@ -31,3 +31,10 @@ func (cm *ChanMap) ChanBlocked(key string) chan string { cm.m.Store(key, ch) return ch } + +func (cm *ChanMap) HasChanKey(key string) bool { + if _, ok := cm.m.Load(key); ok { + return true + } + return false +}