marvzhang
2020-07-03 17:28:07 +08:00
parent 14ab3b9208
commit 13e8da9cda
6 changed files with 88 additions and 38 deletions

View File

@@ -6,4 +6,5 @@ const (
RpcUninstallDep = "uninstall_dep"
RpcGetInstalledDepList = "get_installed_dep_list"
RpcGetLang = "get_lang"
RpcCancelTask = "cancel_task"
)

View File

@@ -12,17 +12,17 @@ type Handler interface {
func GetMsgHandler(msg entity.NodeMessage) Handler {
log.Debugf("received msg , type is : %s", msg.Type)
if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog {
// 日志相关
return &Log{
msg: msg,
}
} else if msg.Type == constants.MsgTypeCancelTask {
// 任务相关
return &Task{
msg: msg,
}
} else if msg.Type == constants.MsgTypeGetSystemInfo {
//if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog {
// // 日志相关
// return &Log{
// msg: msg,
// }
//} else if msg.Type == constants.MsgTypeCancelTask {
// // 任务相关
// return &Task{
// msg: msg,
// }
if msg.Type == constants.MsgTypeGetSystemInfo {
// 系统信息相关
return &SystemInfo{
msg: msg,

View File

@@ -74,6 +74,8 @@ func GetService(msg entity.RpcMessage) Service {
return &GetLangService{msg: msg}
case constants.RpcGetInstalledDepList:
return &GetInstalledDepsService{msg: msg}
case constants.RpcCancelTask:
return &CancelTaskService{msg: msg}
}
return nil
}

View File

@@ -0,0 +1,63 @@
package rpc
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
"errors"
"fmt"
"github.com/globalsign/mgo/bson"
)
type CancelTaskService struct {
msg entity.RpcMessage
}
func (s *CancelTaskService) ServerHandle() (entity.RpcMessage, error) {
taskId := utils.GetRpcParam("task_id", s.msg.Params)
nodeId := utils.GetRpcParam("node_id", s.msg.Params)
if err := CancelTaskLocal(taskId, nodeId); err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
s.msg.Result = "success"
return s.msg, nil
}
func (s *CancelTaskService) ClientHandle() (o interface{}, err error) {
// 发起 RPC 请求,获取服务端数据
_, err = ClientFunc(s.msg)()
if err != nil {
return
}
return
}
func CancelTaskLocal(taskId string, nodeId string) error {
if !utils.TaskExecChanMap.HasChanKey(taskId) {
_ = model.UpdateTaskToAbnormal(bson.ObjectIdHex(nodeId))
return errors.New(fmt.Sprintf("task id (%s) does not exist", taskId))
}
ch := utils.TaskExecChanMap.ChanBlocked(taskId)
ch <- constants.TaskCancel
return nil
}
func CancelTaskRemote(taskId string, nodeId string) (err error) {
params := make(map[string]string)
params["task_id"] = taskId
params["node_id"] = nodeId
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcCancelTask,
Params: params,
Timeout: 60,
})
_, err = s.ClientHandle()
if err != nil {
return
}
return
}

View File

@@ -4,11 +4,11 @@ import (
"bufio"
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/local_node"
"crawlab/services/notification"
"crawlab/services/rpc"
"crawlab/services/spider_handler"
"crawlab/utils"
"encoding/json"
@@ -395,7 +395,6 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider, u
// 起一个goroutine来监控进程
ch := utils.TaskExecChanMap.ChanBlocked(t.Id)
go FinishOrCancelTask(ch, cmd, s, t)
// kill的时候可以kill所有的子进程
@@ -788,34 +787,12 @@ func CancelTask(id string) (err error) {
if node.Id == task.NodeId {
// 任务节点为主节点
// 获取任务执行频道
ch := utils.TaskExecChanMap.ChanBlocked(id)
if ch != nil {
// 发出取消进程信号
ch <- constants.TaskCancel
} else {
if err := model.UpdateTaskToAbnormal(node.Id); err != nil {
log.Errorf("update task to abnormal : {}", err.Error())
debug.PrintStack()
return err
}
if err := rpc.CancelTaskLocal(task.Id, task.NodeId.Hex()); err != nil {
return err
}
} else {
// 任务节点为工作节点
// 序列化消息
msg := entity.NodeMessage{
Type: constants.MsgTypeCancelTask,
TaskId: id,
}
msgBytes, err := json.Marshal(&msg)
if err != nil {
return err
}
// 发布消息
if _, err := database.RedisClient.Publish("nodes:"+task.NodeId.Hex(), utils.BytesToString(msgBytes)); err != nil {
if err := rpc.CancelTaskRemote(task.Id, task.NodeId.Hex()); err != nil {
return err
}
}

View File

@@ -31,3 +31,10 @@ func (cm *ChanMap) ChanBlocked(key string) chan string {
cm.m.Store(key, ch)
return ch
}
func (cm *ChanMap) HasChanKey(key string) bool {
if _, ok := cm.m.Load(key); ok {
return true
}
return false
}