From fe221ffd6f71f3531644bcc157e8c32f5068c206 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Wed, 25 Sep 2019 14:38:46 +0800 Subject: [PATCH] fix --- backend/constants/task.go | 15 ++++++++++---- backend/model/task.go | 26 ++++++++++++++++++++++++ backend/services/msg_handler/msg_task.go | 17 +++++++++++++++- backend/services/node.go | 6 ++++++ backend/services/task.go | 21 +++++++++++++++---- 5 files changed, 76 insertions(+), 9 deletions(-) diff --git a/backend/constants/task.go b/backend/constants/task.go index 5eeee967..b6fb615c 100644 --- a/backend/constants/task.go +++ b/backend/constants/task.go @@ -1,11 +1,18 @@ package constants const ( - StatusPending string = "pending" - StatusRunning string = "running" - StatusFinished string = "finished" - StatusError string = "error" + // 调度中 + StatusPending string = "pending" + // 运行中 + StatusRunning string = "running" + // 已完成 + StatusFinished string = "finished" + // 错误 + StatusError string = "error" + // 取消 StatusCancelled string = "cancelled" + // 节点重启导致的异常终止 + StatusAbnormal string = "abnormal" ) const ( diff --git a/backend/model/task.go b/backend/model/task.go index 177edccb..f568b7fe 100644 --- a/backend/model/task.go +++ b/backend/model/task.go @@ -25,6 +25,7 @@ type Task struct { WaitDuration float64 `json:"wait_duration" bson:"wait_duration"` RuntimeDuration float64 `json:"runtime_duration" bson:"runtime_duration"` TotalDuration float64 `json:"total_duration" bson:"total_duration"` + Pid int `json:"pid" bson:"pid"` // 前端数据 SpiderName string `json:"spider_name"` @@ -191,6 +192,7 @@ func RemoveTask(id string) error { return nil } +// 删除task by spider_id func RemoveTaskBySpiderId(id bson.ObjectId) error { tasks, err := GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts") if err != nil { @@ -206,6 +208,7 @@ func RemoveTaskBySpiderId(id bson.ObjectId) error { return nil } +// task 总数 func GetTaskCount(query interface{}) (int, error) { s, c := database.GetCol("tasks") defer s.Close() @@ -308,6 +311,7 @@ func GetDailyTaskStats(query bson.M) ([]TaskDailyItem, error) { return dailyItems, nil } +// 更新task的结果数 func UpdateTaskResultCount(id string) (err error) { // 获取任务 task, err := GetTask(id) @@ -343,3 +347,25 @@ func UpdateTaskResultCount(id string) (err error) { } return nil } + +func UpdateTaskToAbnormal(nodeId bson.ObjectId) error { + s, c := database.GetCol("tasks") + defer s.Close() + + selector := bson.M{ + "node_id": nodeId, + "status": constants.StatusRunning, + } + update := bson.M{ + "$set": bson.M{ + "status": constants.StatusAbnormal, + }, + } + _, err := c.UpdateAll(selector, update) + if err != nil { + log.Errorf("update task to abnormal error: %s, node_id : %s", err.Error(), nodeId.Hex()) + debug.PrintStack() + return err + } + return nil +} diff --git a/backend/services/msg_handler/msg_task.go b/backend/services/msg_handler/msg_task.go index 1d218264..087217a8 100644 --- a/backend/services/msg_handler/msg_task.go +++ b/backend/services/msg_handler/msg_task.go @@ -2,7 +2,11 @@ package msg_handler import ( "crawlab/constants" + "crawlab/model" "crawlab/utils" + "github.com/apex/log" + "runtime/debug" + "time" ) type Task struct { @@ -12,6 +16,17 @@ type Task struct { func (t *Task) Handle() error { // 取消任务 ch := utils.TaskExecChanMap.ChanBlocked(t.msg.TaskId) - ch <- constants.TaskCancel + if ch != nil { + ch <- constants.TaskCancel + } else { + // 节点可能被重启,找不到chan + t, _ := model.GetTask(t.msg.TaskId) + t.Status = constants.StatusCancelled + t.FinishTs = time.Now() + if err := t.Save(); err != nil { + debug.PrintStack() + log.Infof("cancel task error: %s", err.Error()) + } + } return nil } diff --git a/backend/services/node.go b/backend/services/node.go index 44fa3905..5526cb01 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -328,6 +328,12 @@ func InitNodeService() error { } } + // 更新在当前节点执行的任务状态为:abnormal + if err := model.UpdateTaskToAbnormal(node.Id); err != nil { + debug.PrintStack() + return err + } + c.Start() return nil } diff --git a/backend/services/task.go b/backend/services/task.go index 5f3a4d07..53c96b6b 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -159,13 +159,26 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e } }() - // 开始执行 - if err := cmd.Run(); err != nil { - HandleTaskError(t, err) + // 异步启动进程 + if err := cmd.Start(); err != nil { + log.Errorf("start spider error:{}", err.Error()) + debug.PrintStack() + return err + } + // 保存pid到task + t.Pid = cmd.Process.Pid + if err := t.Save(); err != nil { + log.Errorf("save task pid error: %s", err.Error()) + debug.PrintStack() + return err + } + // 同步等待进程完成 + if err := cmd.Wait(); err != nil { + log.Errorf("wait process finish error: %s", err.Error()) + debug.PrintStack() return err } ch <- constants.TaskFinish - return nil }