From 9f251f3ebed4a1c549a00b5022d74247ad21fb8a Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Fri, 27 Jun 2025 13:50:21 +0800 Subject: [PATCH] fix: enhance task cancellation logic with graceful termination and stuck task cleanup --- core/controllers/task.go | 4 +- core/task/handler/runner.go | 52 +++++++++++++--- core/task/handler/service.go | 110 ++++++++++++++++++++++++++++++++- core/task/scheduler/service.go | 67 +++++++++++++++----- core/utils/task.go | 1 + 5 files changed, 206 insertions(+), 28 deletions(-) diff --git a/core/controllers/task.go b/core/controllers/task.go index 6199ce02..2fedc784 100644 --- a/core/controllers/task.go +++ b/core/controllers/task.go @@ -312,8 +312,8 @@ func PostTaskCancel(c *gin.Context, params *PostTaskCancelParams) (response *Voi return GetErrorVoidResponse(err) } - // validate - if !utils.IsCancellable(t.Status) { + // validate - only check if task is cancellable when force is false + if !params.Force && !utils.IsCancellable(t.Status) { return GetErrorVoidResponse(errors.New("task is not cancellable")) } diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index aa90d5a8..567ed203 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -274,30 +274,66 @@ func (r *Runner) Run() (err error) { // Cancel terminates the running task. If force is true, the process will be killed immediately // without waiting for graceful shutdown. func (r *Runner) Cancel(force bool) (err error) { + r.Debugf("attempting to cancel task (force: %v)", force) + // Signal goroutines to stop r.cancel() - // Kill process - r.Debugf("attempt to kill process[%d]", r.pid) - err = utils.KillProcess(r.cmd, force) - if err != nil { - r.Warnf("kill process error: %v", err) + // If force is not requested, try graceful termination first + if !force { + r.Debugf("attempting graceful termination of process[%d]", r.pid) + if err = utils.KillProcess(r.cmd, false); err != nil { + r.Warnf("graceful termination failed: %v, escalating to force", err) + force = true + } else { + // Wait for graceful termination with shorter timeout + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + r.Warnf("graceful termination timeout, escalating to force") + force = true + goto forceKill + case <-ticker.C: + if !utils.ProcessIdExists(r.pid) { + r.Debugf("process[%d] terminated gracefully", r.pid) + return nil + } + } + } + } } - // Create a context with timeout +forceKill: + if force { + r.Debugf("force killing process[%d]", r.pid) + if err = utils.KillProcess(r.cmd, true); err != nil { + r.Errorf("force kill failed: %v", err) + return err + } + } + + // Wait for process to be killed with timeout ctx, cancel := context.WithTimeout(context.Background(), r.svc.GetCancelTimeout()) defer cancel() - // Wait for process to be killed with context ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for { select { case <-ctx.Done(): - return fmt.Errorf("timeout waiting for task to stop") + r.Errorf("timeout waiting for task to stop after %v", r.svc.GetCancelTimeout()) + // At this point, process might be completely stuck, log and return error + return fmt.Errorf("task cancellation timeout: process may be stuck") case <-ticker.C: if !utils.ProcessIdExists(r.pid) { + r.Debugf("process[%d] terminated successfully", r.pid) return nil } } diff --git a/core/task/handler/service.go b/core/task/handler/service.go index 56653b76..459d7710 100644 --- a/core/task/handler/service.go +++ b/core/task/handler/service.go @@ -64,6 +64,9 @@ func (svc *Service) Start() { go svc.fetchAndRunTasks() svc.Infof("Task handler service started") + + // Start the stuck task cleanup routine + svc.startStuckTaskCleanup() } func (svc *Service) Stop() { @@ -604,11 +607,39 @@ func (svc *Service) handleCancel(msg *grpc.TaskServiceSubscribeResponse, taskId func (svc *Service) cancelTask(taskId primitive.ObjectID, force bool) (err error) { r, err := svc.getRunner(taskId) if err != nil { - return err + // Runner not found, task might already be finished + svc.Warnf("runner not found for task[%s]: %v", taskId.Hex(), err) + return nil } - if err := r.Cancel(force); err != nil { - return err + + // Attempt cancellation with timeout + cancelCtx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second) + defer cancelFunc() + + cancelDone := make(chan error, 1) + go func() { + cancelDone <- r.Cancel(force) + }() + + select { + case err = <-cancelDone: + if err != nil { + svc.Errorf("failed to cancel task[%s]: %v", taskId.Hex(), err) + // If cancellation failed and force is not set, try force cancellation + if !force { + svc.Warnf("escalating to force cancellation for task[%s]", taskId.Hex()) + return svc.cancelTask(taskId, true) + } + return err + } + svc.Infof("task[%s] cancelled successfully", taskId.Hex()) + case <-cancelCtx.Done(): + svc.Errorf("timeout cancelling task[%s], removing runner from pool", taskId.Hex()) + // Remove runner from pool to prevent further issues + svc.runners.Delete(taskId) + return fmt.Errorf("task cancellation timeout") } + return nil } @@ -656,6 +687,79 @@ func (svc *Service) stopAllRunners() { } } +func (svc *Service) startStuckTaskCleanup() { + go func() { + ticker := time.NewTicker(5 * time.Minute) // Check every 5 minutes + defer ticker.Stop() + + for { + select { + case <-svc.ctx.Done(): + svc.Debugf("stuck task cleanup routine shutting down") + return + case <-ticker.C: + svc.checkAndCleanupStuckTasks() + } + } + }() +} + +// checkAndCleanupStuckTasks checks for tasks that have been trying to cancel for too long +func (svc *Service) checkAndCleanupStuckTasks() { + defer func() { + if r := recover(); r != nil { + svc.Errorf("panic in stuck task cleanup: %v", r) + } + }() + + var stuckTasks []primitive.ObjectID + + // Check all running tasks + svc.runners.Range(func(key, value interface{}) bool { + taskId, ok := key.(primitive.ObjectID) + if !ok { + return true + } + + // Get task from database to check its state + t, err := svc.GetTaskById(taskId) + if err != nil { + svc.Errorf("failed to get task[%s] during stuck cleanup: %v", taskId.Hex(), err) + return true + } + + // Check if task has been in cancelling state too long (15+ minutes) + if t.Status == constants.TaskStatusCancelled && time.Since(t.UpdatedAt) > 15*time.Minute { + svc.Warnf("detected stuck cancelled task[%s], will force cleanup", taskId.Hex()) + stuckTasks = append(stuckTasks, taskId) + } + + return true + }) + + // Force cleanup stuck tasks + for _, taskId := range stuckTasks { + svc.Infof("force cleaning up stuck task[%s]", taskId.Hex()) + + // Remove from runners map + svc.runners.Delete(taskId) + + // Update task status to indicate it was force cleaned + t, err := svc.GetTaskById(taskId) + if err == nil { + t.Status = constants.TaskStatusCancelled + t.Error = "Task was stuck in cancelling state and was force cleaned up" + if updateErr := svc.UpdateTask(t); updateErr != nil { + svc.Errorf("failed to update stuck task[%s] status: %v", taskId.Hex(), updateErr) + } + } + } + + if len(stuckTasks) > 0 { + svc.Infof("cleaned up %d stuck tasks", len(stuckTasks)) + } +} + func newTaskHandlerService() *Service { // service svc := &Service{ diff --git a/core/task/scheduler/service.go b/core/task/scheduler/service.go index c8e46522..8f75337d 100644 --- a/core/task/scheduler/service.go +++ b/core/task/scheduler/service.go @@ -1,8 +1,12 @@ package scheduler import ( + "context" errors2 "errors" "fmt" + "sync" + "time" + "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/grpc/server" "github.com/crawlab-team/crawlab/core/interfaces" @@ -14,8 +18,6 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" mongo2 "go.mongodb.org/mongo-driver/mongo" - "sync" - "time" ) type Service struct { @@ -116,22 +118,57 @@ func (svc *Service) cancelOnWorker(t *models.Task, by primitive.ObjectID, force // get subscribe stream stream, ok := svc.svr.TaskSvr.GetSubscribeStream(t.Id) if !ok { - err := fmt.Errorf("stream not found for task (%s)", t.Id.Hex()) - svc.Errorf(err.Error()) - t.Status = constants.TaskStatusAbnormal - t.Error = err.Error() + svc.Warnf("stream not found for task (%s), task may already be finished or connection lost", t.Id.Hex()) + // Task might have finished or connection lost, mark as cancelled anyway + t.Status = constants.TaskStatusCancelled + t.Error = "cancel signal could not be delivered - stream not found" return svc.SaveTask(t, by) } - // send cancel request - err = stream.Send(&grpc.TaskServiceSubscribeResponse{ - Code: grpc.TaskServiceSubscribeCode_CANCEL, - TaskId: t.Id.Hex(), - Force: force, - }) - if err != nil { - svc.Errorf("failed to send cancel task (%s) request to worker: %v", t.Id.Hex(), err) - return err + // send cancel request with timeout + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Create a channel to handle the send operation + sendDone := make(chan error, 1) + go func() { + err := stream.Send(&grpc.TaskServiceSubscribeResponse{ + Code: grpc.TaskServiceSubscribeCode_CANCEL, + TaskId: t.Id.Hex(), + Force: force, + }) + sendDone <- err + }() + + select { + case err = <-sendDone: + if err != nil { + svc.Errorf("failed to send cancel task (%s) request to worker: %v", t.Id.Hex(), err) + // If sending failed, still mark task as cancelled to avoid stuck state + t.Status = constants.TaskStatusCancelled + t.Error = fmt.Sprintf("cancel signal delivery failed: %v", err) + return svc.SaveTask(t, by) + } + svc.Infof("cancel signal sent for task (%s) with force=%v", t.Id.Hex(), force) + case <-ctx.Done(): + svc.Errorf("timeout sending cancel request for task (%s)", t.Id.Hex()) + // Mark as cancelled even if signal couldn't be delivered + t.Status = constants.TaskStatusCancelled + t.Error = "cancel signal delivery timeout" + return svc.SaveTask(t, by) + } + + // For force cancellation, wait a bit and verify cancellation + if force { + time.Sleep(5 * time.Second) + // Re-fetch task to check current status + currentTask, fetchErr := service.NewModelService[models.Task]().GetById(t.Id) + if fetchErr == nil && currentTask.Status == constants.TaskStatusRunning { + svc.Warnf("task (%s) still running after force cancel, marking as cancelled", t.Id.Hex()) + currentTask.Status = constants.TaskStatusCancelled + currentTask.Error = "forced cancellation - task was unresponsive" + return svc.SaveTask(currentTask, by) + } } return nil diff --git a/core/utils/task.go b/core/utils/task.go index cd06bfeb..9e4d8a9d 100644 --- a/core/utils/task.go +++ b/core/utils/task.go @@ -7,6 +7,7 @@ import ( func IsCancellable(status string) bool { switch status { case constants.TaskStatusPending, + constants.TaskStatusAssigned, constants.TaskStatusRunning: return true default: