From 92046a8c2e9b34c44ab56dc8e23fec6c4e3d553b Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Fri, 27 Jun 2025 14:02:24 +0800 Subject: [PATCH] fix: improve task cancellation and connection health check logic with timeout handling --- core/task/handler/runner.go | 65 ++++++++++++++++++++++++++++++++++--- 1 file changed, 60 insertions(+), 5 deletions(-) diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index 567ed203..7b3e2a16 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -279,6 +279,24 @@ func (r *Runner) Cancel(force bool) (err error) { // Signal goroutines to stop r.cancel() + // Stop health check ticker immediately to prevent interference + if r.connHealthTicker != nil { + r.connHealthTicker.Stop() + r.Debugf("stopped connection health ticker") + } + + // Close gRPC connection to stop health check messages + r.connMutex.Lock() + if r.conn != nil { + _ = r.conn.CloseSend() + r.conn = nil + r.Debugf("closed gRPC connection to stop health checks") + } + r.connMutex.Unlock() + + // Wait a moment for background goroutines to respond to cancellation signal + time.Sleep(100 * time.Millisecond) + // If force is not requested, try graceful termination first if !force { r.Debugf("attempting graceful termination of process[%d]", r.pid) @@ -334,6 +352,19 @@ forceKill: case <-ticker.C: if !utils.ProcessIdExists(r.pid) { r.Debugf("process[%d] terminated successfully", r.pid) + // Wait for background goroutines to finish with timeout + done := make(chan struct{}) + go func() { + r.wg.Wait() + close(done) + }() + + select { + case <-done: + r.Debugf("all background goroutines stopped") + case <-time.After(5 * time.Second): + r.Warnf("some background goroutines did not stop within timeout") + } return nil } } @@ -906,7 +937,16 @@ func (r *Runner) isConnectionHealthy() bool { if r.conn == nil { return false } - // Try to send a ping-like message to test connection + + // Check if context is already cancelled - don't do health checks during cancellation + select { + case <-r.ctx.Done(): + r.Debugf("skipping health check - task is being cancelled") + return false + default: + } + + // Try to send a ping-like message to test connection with timeout // Use a simple log message as ping since PING code doesn't exist testMsg := &grpc.TaskServiceConnectRequest{ Code: grpc.TaskServiceConnectCode_INSERT_LOGS, @@ -914,12 +954,27 @@ func (r *Runner) isConnectionHealthy() bool { Data: []byte(`["[HEALTH CHECK] connection test"]`), } - if err := r.conn.Send(testMsg); err != nil { - r.Debugf("connection health check failed: %v", err) + // Use a channel to make the Send operation timeout-aware + done := make(chan error, 1) + go func() { + done <- r.conn.Send(testMsg) + }() + + // Wait for either completion or timeout + select { + case err := <-done: + if err != nil { + r.Debugf("connection health check failed: %v", err) + return false + } + return true + case <-time.After(5 * time.Second): + r.Debugf("connection health check timed out") + return false + case <-r.ctx.Done(): + r.Debugf("connection health check cancelled") return false } - - return true } // reconnectWithRetry attempts to reconnect to the gRPC service with exponential backoff