From 622dce51c35f5fd85f72d1ae3ce6c7dc7f3afdfb Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Wed, 11 Jun 2025 22:30:37 +0800 Subject: [PATCH] fix: goroutine cleanup and error handling during shutdown --- core/task/handler/runner.go | 81 +++++++++++++++++++++++++++++++++---- 1 file changed, 73 insertions(+), 8 deletions(-) diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index 8a7a8182..d30dca18 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -200,11 +200,34 @@ func (r *Runner) Run() (err error) { // Ensure cleanup when Run() exits defer func() { - _ = r.conn.CloseSend() // Close gRPC connection - r.cancel() // Cancel context to stop all goroutines - r.wg.Wait() // Wait for all goroutines to finish - close(r.done) // Signal that everything is done - close(r.ipcChan) // Close IPC channel + // 1. Signal all goroutines to stop + r.cancel() + + // 2. Wait for all goroutines to finish with timeout + done := make(chan struct{}) + go func() { + r.wg.Wait() + close(done) + }() + + select { + case <-done: + // All goroutines finished normally + case <-time.After(5 * time.Second): + // Timeout waiting for goroutines, proceed with cleanup + r.Warnf("timeout waiting for goroutines to finish, proceeding with cleanup") + } + + // 3. Close gRPC connection after all goroutines have stopped + if r.conn != nil { + _ = r.conn.CloseSend() + } + + // 4. Close channels after everything has stopped + close(r.done) + if r.ipcChan != nil { + close(r.ipcChan) + } }() // wait for process to finish @@ -749,6 +772,18 @@ func (r *Runner) initConnection() (err error) { // writeLogLines marshals log lines to JSON and sends them to the task service func (r *Runner) writeLogLines(lines []string) { + // Check if context is cancelled or connection is closed + select { + case <-r.ctx.Done(): + return + default: + } + + // Check if connection is available + if r.conn == nil { + return + } + linesBytes, err := json.Marshal(lines) if err != nil { r.Errorf("error marshaling log lines: %v", err) @@ -760,7 +795,13 @@ func (r *Runner) writeLogLines(lines []string) { Data: linesBytes, } if err := r.conn.Send(msg); err != nil { - r.Errorf("error sending log lines: %v", err) + // Don't log errors if context is cancelled (expected during shutdown) + select { + case <-r.ctx.Done(): + return + default: + r.Errorf("error sending log lines: %v", err) + } return } } @@ -896,6 +937,9 @@ func (r *Runner) configureCwd() { // handleIPC processes incoming IPC messages from the child process // Messages are converted to JSON and written to the child process's stdin func (r *Runner) handleIPC() { + r.wg.Add(1) + defer r.wg.Done() + for msg := range r.ipcChan { // Convert message to JSON jsonData, err := json.Marshal(msg) @@ -1028,6 +1072,13 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) { return } + // Check if context is cancelled or connection is closed + select { + case <-r.ctx.Done(): + return + default: + } + // Validate connection if r.conn == nil { r.Errorf("gRPC connection not initialized") @@ -1050,9 +1101,17 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) { case <-ctx.Done(): r.Errorf("timeout sending IPC message") return + case <-r.ctx.Done(): + return default: if err := r.conn.Send(grpcMsg); err != nil { - r.Errorf("error sending IPC message: %v", err) + // Don't log errors if context is cancelled (expected during shutdown) + select { + case <-r.ctx.Done(): + return + default: + r.Errorf("error sending IPC message: %v", err) + } return } } @@ -1153,8 +1212,14 @@ func (r *Runner) logInternally(level string, message string) { internalLog := fmt.Sprintf("%s [%s] [%s] %s", level, timestamp, "Crawlab", message) // Send to the same log system as task logs + // Only send if context is not cancelled and connection is available if r.conn != nil { - go r.writeLogLines([]string{internalLog}) + select { + case <-r.ctx.Done(): + // Context cancelled, don't send logs + default: + go r.writeLogLines([]string{internalLog}) + } } // Also log through the standard logger