From fda7e56e1ad57eee1c041c23bb4e2ba1bdf1c7b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Wed, 25 Sep 2019 16:19:58 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=97=A5=E5=BF=97=E6=89=93?= =?UTF-8?q?=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/database/pubsub.go | 1 - backend/services/msg_handler/msg_task.go | 15 +++++++++++---- backend/services/task.go | 9 ++++++++- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/backend/database/pubsub.go b/backend/database/pubsub.go index 8487df11..0eb8639b 100644 --- a/backend/database/pubsub.go +++ b/backend/database/pubsub.go @@ -33,7 +33,6 @@ func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...s done <- fmt.Errorf("redis pubsub receive err: %v", msg) return case redis.Message: - fmt.Println(msg) if err := consume(msg); err != nil { fmt.Printf("redis pubsub consume message err: %v", err) continue diff --git a/backend/services/msg_handler/msg_task.go b/backend/services/msg_handler/msg_task.go index 087217a8..5f120f80 100644 --- a/backend/services/msg_handler/msg_task.go +++ b/backend/services/msg_handler/msg_task.go @@ -14,16 +14,23 @@ type Task struct { } func (t *Task) Handle() error { + log.Infof("received cancel task msg, task_id: %s", t.msg.TaskId) // 取消任务 ch := utils.TaskExecChanMap.ChanBlocked(t.msg.TaskId) if ch != nil { ch <- constants.TaskCancel } else { + log.Infof("chan is empty, update status to abnormal") // 节点可能被重启,找不到chan - t, _ := model.GetTask(t.msg.TaskId) - t.Status = constants.StatusCancelled - t.FinishTs = time.Now() - if err := t.Save(); err != nil { + task, err := model.GetTask(t.msg.TaskId) + if err != nil { + log.Errorf("task not found, task_id: %s", t.msg.TaskId) + debug.PrintStack() + return err + } + task.Status = constants.StatusAbnormal + task.FinishTs = time.Now() + if err := task.Save(); err != nil { debug.PrintStack() log.Infof("cancel task error: %s", err.Error()) } diff --git a/backend/services/task.go b/backend/services/task.go index 216bdfb7..bcd5cd2e 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -465,6 +465,9 @@ func CancelTask(id string) (err error) { return err } + log.Infof("current node id is: %s", node.Id.Hex()) + log.Infof("task node id is: %s", task.NodeId.Hex()) + if node.Id == task.NodeId { // 任务节点为主节点 @@ -474,7 +477,11 @@ func CancelTask(id string) (err error) { // 发出取消进程信号 ch <- constants.TaskCancel } else { - model. + if err := model.UpdateTaskToAbnormal(node.Id); err != nil { + log.Errorf("update task to abnormal : {}", err.Error()) + debug.PrintStack() + return + } } } else {