diff --git a/backend/database/pubsub.go b/backend/database/pubsub.go index 8487df11..0eb8639b 100644 --- a/backend/database/pubsub.go +++ b/backend/database/pubsub.go @@ -33,7 +33,6 @@ func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...s done <- fmt.Errorf("redis pubsub receive err: %v", msg) return case redis.Message: - fmt.Println(msg) if err := consume(msg); err != nil { fmt.Printf("redis pubsub consume message err: %v", err) continue diff --git a/backend/services/msg_handler/msg_task.go b/backend/services/msg_handler/msg_task.go index 087217a8..5f120f80 100644 --- a/backend/services/msg_handler/msg_task.go +++ b/backend/services/msg_handler/msg_task.go @@ -14,16 +14,23 @@ type Task struct { } func (t *Task) Handle() error { + log.Infof("received cancel task msg, task_id: %s", t.msg.TaskId) // 取消任务 ch := utils.TaskExecChanMap.ChanBlocked(t.msg.TaskId) if ch != nil { ch <- constants.TaskCancel } else { + log.Infof("chan is empty, update status to abnormal") // 节点可能被重启,找不到chan - t, _ := model.GetTask(t.msg.TaskId) - t.Status = constants.StatusCancelled - t.FinishTs = time.Now() - if err := t.Save(); err != nil { + task, err := model.GetTask(t.msg.TaskId) + if err != nil { + log.Errorf("task not found, task_id: %s", t.msg.TaskId) + debug.PrintStack() + return err + } + task.Status = constants.StatusAbnormal + task.FinishTs = time.Now() + if err := task.Save(); err != nil { debug.PrintStack() log.Infof("cancel task error: %s", err.Error()) } diff --git a/backend/services/task.go b/backend/services/task.go index 216bdfb7..bcd5cd2e 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -465,6 +465,9 @@ func CancelTask(id string) (err error) { return err } + log.Infof("current node id is: %s", node.Id.Hex()) + log.Infof("task node id is: %s", task.NodeId.Hex()) + if node.Id == task.NodeId { // 任务节点为主节点 @@ -474,7 +477,11 @@ func CancelTask(id string) (err error) { // 发出取消进程信号 ch <- constants.TaskCancel } else { - model. + if err := model.UpdateTaskToAbnormal(node.Id); err != nil { + log.Errorf("update task to abnormal : {}", err.Error()) + debug.PrintStack() + return + } } } else {