fix: goroutine cleanup and error handling during shutdown

This commit is contained in:
Marvin Zhang
2025-06-11 22:30:37 +08:00
parent ca069cb570
commit 622dce51c3

View File

@@ -200,11 +200,34 @@ func (r *Runner) Run() (err error) {
// Ensure cleanup when Run() exits
defer func() {
_ = r.conn.CloseSend() // Close gRPC connection
r.cancel() // Cancel context to stop all goroutines
r.wg.Wait() // Wait for all goroutines to finish
close(r.done) // Signal that everything is done
close(r.ipcChan) // Close IPC channel
// 1. Signal all goroutines to stop
r.cancel()
// 2. Wait for all goroutines to finish with timeout
done := make(chan struct{})
go func() {
r.wg.Wait()
close(done)
}()
select {
case <-done:
// All goroutines finished normally
case <-time.After(5 * time.Second):
// Timeout waiting for goroutines, proceed with cleanup
r.Warnf("timeout waiting for goroutines to finish, proceeding with cleanup")
}
// 3. Close gRPC connection once goroutines have stopped or the wait above timed out
if r.conn != nil {
_ = r.conn.CloseSend()
}
// 4. Close channels last (goroutines may still be running if the wait above timed out)
close(r.done)
if r.ipcChan != nil {
close(r.ipcChan)
}
}()
// wait for process to finish
@@ -749,6 +772,18 @@ func (r *Runner) initConnection() (err error) {
// writeLogLines marshals log lines to JSON and sends them to the task service
func (r *Runner) writeLogLines(lines []string) {
// Skip sending if the context has already been cancelled (shutdown in progress)
select {
case <-r.ctx.Done():
return
default:
}
// Check if connection is available
if r.conn == nil {
return
}
linesBytes, err := json.Marshal(lines)
if err != nil {
r.Errorf("error marshaling log lines: %v", err)
@@ -760,7 +795,13 @@ func (r *Runner) writeLogLines(lines []string) {
Data: linesBytes,
}
if err := r.conn.Send(msg); err != nil {
r.Errorf("error sending log lines: %v", err)
// Don't log errors if context is cancelled (expected during shutdown)
select {
case <-r.ctx.Done():
return
default:
r.Errorf("error sending log lines: %v", err)
}
return
}
}
@@ -896,6 +937,9 @@ func (r *Runner) configureCwd() {
// handleIPC processes incoming IPC messages from the child process
// Messages are converted to JSON and written to the child process's stdin
func (r *Runner) handleIPC() {
r.wg.Add(1)
defer r.wg.Done()
for msg := range r.ipcChan {
// Convert message to JSON
jsonData, err := json.Marshal(msg)
@@ -1028,6 +1072,13 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) {
return
}
// Skip sending if the context has already been cancelled (shutdown in progress)
select {
case <-r.ctx.Done():
return
default:
}
// Validate connection
if r.conn == nil {
r.Errorf("gRPC connection not initialized")
@@ -1050,9 +1101,17 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) {
case <-ctx.Done():
r.Errorf("timeout sending IPC message")
return
case <-r.ctx.Done():
return
default:
if err := r.conn.Send(grpcMsg); err != nil {
r.Errorf("error sending IPC message: %v", err)
// Don't log errors if context is cancelled (expected during shutdown)
select {
case <-r.ctx.Done():
return
default:
r.Errorf("error sending IPC message: %v", err)
}
return
}
}
@@ -1153,8 +1212,14 @@ func (r *Runner) logInternally(level string, message string) {
internalLog := fmt.Sprintf("%s [%s] [%s] %s", level, timestamp, "Crawlab", message)
// Send to the same log system as task logs
// Only send if context is not cancelled and connection is available
if r.conn != nil {
go r.writeLogLines([]string{internalLog})
select {
case <-r.ctx.Done():
// Context cancelled, don't send logs
default:
go r.writeLogLines([]string{internalLog})
}
}
// Also log through the standard logger