This commit is contained in:
陈景阳
2019-09-25 14:38:46 +08:00
parent f3fd19d198
commit fe221ffd6f
5 changed files with 76 additions and 9 deletions

View File

@@ -1,11 +1,18 @@
package constants
// Task status values stored in a task's status field.
const (
	// StatusPending marks a task that is waiting to be scheduled.
	StatusPending string = "pending"
	// StatusRunning marks a task that is currently executing.
	StatusRunning string = "running"
	// StatusFinished marks a task that has completed.
	StatusFinished string = "finished"
	// StatusError marks a task that ended with an error.
	StatusError string = "error"
	// StatusCancelled marks a task that was cancelled.
	StatusCancelled string = "cancelled"
	// StatusAbnormal marks a task that was terminated abnormally because
	// its node restarted.
	StatusAbnormal string = "abnormal"
)
const (

View File

@@ -25,6 +25,7 @@ type Task struct {
WaitDuration float64 `json:"wait_duration" bson:"wait_duration"`
RuntimeDuration float64 `json:"runtime_duration" bson:"runtime_duration"`
TotalDuration float64 `json:"total_duration" bson:"total_duration"`
Pid int `json:"pid" bson:"pid"`
// 前端数据
SpiderName string `json:"spider_name"`
@@ -191,6 +192,7 @@ func RemoveTask(id string) error {
return nil
}
// 删除task by spider_id
func RemoveTaskBySpiderId(id bson.ObjectId) error {
tasks, err := GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts")
if err != nil {
@@ -206,6 +208,7 @@ func RemoveTaskBySpiderId(id bson.ObjectId) error {
return nil
}
// task 总数
func GetTaskCount(query interface{}) (int, error) {
s, c := database.GetCol("tasks")
defer s.Close()
@@ -308,6 +311,7 @@ func GetDailyTaskStats(query bson.M) ([]TaskDailyItem, error) {
return dailyItems, nil
}
// 更新task的结果数
func UpdateTaskResultCount(id string) (err error) {
// 获取任务
task, err := GetTask(id)
@@ -343,3 +347,25 @@ func UpdateTaskResultCount(id string) (err error) {
}
return nil
}
// UpdateTaskToAbnormal switches the status of every task in the "tasks"
// collection that belongs to the given node and is still recorded as running
// to constants.StatusAbnormal.
//
// It returns the error reported by the bulk update, after logging it together
// with the node id, or nil when the update succeeds.
func UpdateTaskToAbnormal(nodeId bson.ObjectId) error {
	sess, col := database.GetCol("tasks")
	defer sess.Close()

	// Only tasks of this node that are still flagged as running are touched.
	runningOnNode := bson.M{"node_id": nodeId, "status": constants.StatusRunning}
	markAbnormal := bson.M{"$set": bson.M{"status": constants.StatusAbnormal}}

	if _, err := col.UpdateAll(runningOnNode, markAbnormal); err != nil {
		log.Errorf("update task to abnormal error: %s, node_id : %s", err.Error(), nodeId.Hex())
		debug.PrintStack()
		return err
	}
	return nil
}

View File

@@ -2,7 +2,11 @@ package msg_handler
import (
"crawlab/constants"
"crawlab/model"
"crawlab/utils"
"github.com/apex/log"
"runtime/debug"
"time"
)
type Task struct {
@@ -12,6 +16,17 @@ type Task struct {
// Handle processes a task-cancel message.
//
// When utils.TaskExecChanMap yields a channel for the task id, the cancel
// signal is sent on it exactly once. When no channel is available (the node
// may have been restarted, so the channel can no longer be found), the task
// record is loaded, marked as cancelled with the current time as its finish
// time, and saved.
//
// It returns an error if the task cannot be loaded or saved, and nil
// otherwise.
func (t *Task) Handle() error {
	ch := utils.TaskExecChanMap.ChanBlocked(t.msg.TaskId)
	if ch != nil {
		// Guarded send: sending on a nil channel would block forever.
		ch <- constants.TaskCancel
		return nil
	}

	// No channel to signal: update the stored task record directly.
	task, err := model.GetTask(t.msg.TaskId)
	if err != nil {
		// Do not save a zero-value task when the lookup failed.
		log.Errorf("get task error: %s", err.Error())
		debug.PrintStack()
		return err
	}

	task.Status = constants.StatusCancelled
	task.FinishTs = time.Now()
	if err := task.Save(); err != nil {
		log.Errorf("cancel task error: %s", err.Error())
		debug.PrintStack()
		return err
	}
	return nil
}

View File

@@ -328,6 +328,12 @@ func InitNodeService() error {
}
}
// 更新在当前节点执行的任务状态为abnormal
if err := model.UpdateTaskToAbnormal(node.Id); err != nil {
debug.PrintStack()
return err
}
c.Start()
return nil
}

View File

@@ -159,13 +159,26 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e
}
}()
// 开始执行
if err := cmd.Run(); err != nil {
HandleTaskError(t, err)
// 异步启动进程
if err := cmd.Start(); err != nil {
log.Errorf("start spider error:{}", err.Error())
debug.PrintStack()
return err
}
// 保存pid到task
t.Pid = cmd.Process.Pid
if err := t.Save(); err != nil {
log.Errorf("save task pid error: %s", err.Error())
debug.PrintStack()
return err
}
// 同步等待进程完成
if err := cmd.Wait(); err != nil {
log.Errorf("wait process finish error: %s", err.Error())
debug.PrintStack()
return err
}
ch <- constants.TaskFinish
return nil
}