添加日志打印

This commit is contained in:
陈景阳
2019-09-25 16:19:58 +08:00
parent 82999225de
commit fda7e56e1a
3 changed files with 19 additions and 6 deletions

View File

@@ -33,7 +33,6 @@ func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...s
done <- fmt.Errorf("redis pubsub receive err: %v", msg)
return
case redis.Message:
fmt.Println(msg)
if err := consume(msg); err != nil {
fmt.Printf("redis pubsub consume message err: %v", err)
continue

View File

@@ -14,16 +14,23 @@ type Task struct {
}
func (t *Task) Handle() error {
log.Infof("received cancel task msg, task_id: %s", t.msg.TaskId)
// 取消任务
ch := utils.TaskExecChanMap.ChanBlocked(t.msg.TaskId)
if ch != nil {
ch <- constants.TaskCancel
} else {
log.Infof("chan is empty, update status to abnormal")
// 节点可能被重启找不到chan
t, _ := model.GetTask(t.msg.TaskId)
t.Status = constants.StatusCancelled
t.FinishTs = time.Now()
if err := t.Save(); err != nil {
task, err := model.GetTask(t.msg.TaskId)
if err != nil {
log.Errorf("task not found, task_id: %s", t.msg.TaskId)
debug.PrintStack()
return err
}
task.Status = constants.StatusAbnormal
task.FinishTs = time.Now()
if err := task.Save(); err != nil {
debug.PrintStack()
log.Infof("cancel task error: %s", err.Error())
}

View File

@@ -465,6 +465,9 @@ func CancelTask(id string) (err error) {
return err
}
log.Infof("current node id is: %s", node.Id.Hex())
log.Infof("task node id is: %s", task.NodeId.Hex())
if node.Id == task.NodeId {
// 任务节点为主节点
@@ -474,7 +477,11 @@ func CancelTask(id string) (err error) {
// 发出取消进程信号
ch <- constants.TaskCancel
} else {
model.
if err := model.UpdateTaskToAbnormal(node.Id); err != nil {
log.Errorf("update task to abnormal : {}", err.Error())
debug.PrintStack()
return
}
}
} else {