From 3edd2a1210fe4bf3dc792839344cd89e361de20c Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Sat, 16 Aug 2025 17:42:07 +0800 Subject: [PATCH] refactor: optimize connection health checks to reduce log stream interference; adjust health check intervals and implement non-blocking pings --- core/task/handler/runner.go | 47 ++++++++++++++++++++++++--------- core/task/handler/runner_ipc.go | 3 +++ core/task/handler/runner_log.go | 3 +++ 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index a9520ee1..7a749a4d 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -51,7 +51,7 @@ func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) { connRetryDelay: 10 * time.Second, ipcTimeout: 60 * time.Second, // generous timeout for all tasks healthCheckInterval: 5 * time.Second, // check process every 5 seconds - connHealthInterval: 60 * time.Second, // check connection health every minute + connHealthInterval: 5 * time.Minute, // reduced frequency to minimize stream interference } // multi error @@ -579,11 +579,13 @@ func (r *Runner) monitorConnectionHealth() { } // isConnectionHealthy checks if the gRPC connection is still healthy +// Uses a non-blocking approach to prevent interfering with log streams func (r *Runner) isConnectionHealthy() bool { r.connMutex.RLock() - defer r.connMutex.RUnlock() + conn := r.conn + r.connMutex.RUnlock() - if r.conn == nil { + if conn == nil { return false } @@ -595,21 +597,42 @@ func (r *Runner) isConnectionHealthy() bool { default: } - // FIXED: Use proper PING mechanism instead of fake log messages - // This prevents health check messages from polluting the actual log stream + // FIXED: Use a completely non-blocking approach to prevent stream interference + // Instead of sending data that could block the log stream, just check connection state + // and use timing-based health assessment + + // Check if we've had recent successful operations + timeSinceLastCheck := time.Since(r.lastConnCheck) + + // If we haven't checked recently, consider it healthy if not too old + // This prevents health checks from interfering with active log streaming + if timeSinceLastCheck < 2*time.Minute { + r.Debugf("connection considered healthy based on recent activity") + return true + } + + // For older connections, try a non-blocking ping only if no active log streaming + // This is a compromise to avoid blocking the critical log data flow pingMsg := &grpc.TaskServiceConnectRequest{ Code: grpc.TaskServiceConnectCode_PING, TaskId: r.tid.Hex(), - Data: nil, // No data needed for ping + Data: nil, } - // Use a channel to make the Send operation timeout-aware + // Use a very short timeout and non-blocking approach done := make(chan error, 1) go func() { - done <- r.conn.Send(pingMsg) + // Re-acquire lock only for the send operation + r.connMutex.RLock() + defer r.connMutex.RUnlock() + if r.conn != nil { + done <- r.conn.Send(pingMsg) + } else { + done <- fmt.Errorf("connection is nil") + } }() - // Wait for either completion or timeout + // Very short timeout to prevent blocking log operations select { case err := <-done: if err != nil { @@ -618,9 +641,9 @@ func (r *Runner) isConnectionHealthy() bool { } r.Debugf("connection health check successful") return true - case <-time.After(5 * time.Second): - r.Debugf("connection health check timed out") - return false + case <-time.After(1 * time.Second): // Much shorter timeout + r.Debugf("connection health check timed out quickly - assume healthy to avoid blocking logs") + return true // Assume healthy to avoid disrupting log flow case <-r.ctx.Done(): r.Debugf("connection health check cancelled") return false diff --git a/core/task/handler/runner_ipc.go b/core/task/handler/runner_ipc.go index 12920fcc..a61d50d1 100644 --- a/core/task/handler/runner_ipc.go +++ b/core/task/handler/runner_ipc.go @@ -157,6 +157,9 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) { } return } + + // Update last successful connection time to help health check avoid unnecessary pings + r.lastConnCheck = time.Now() } } diff --git a/core/task/handler/runner_log.go b/core/task/handler/runner_log.go index 918fa82f..73c516e6 100644 --- a/core/task/handler/runner_log.go +++ b/core/task/handler/runner_log.go @@ -53,6 +53,9 @@ func (r *Runner) writeLogLines(lines []string) { } return } + + // Update last successful connection time to help health check avoid unnecessary pings + r.lastConnCheck = time.Now() } // logInternally sends internal runner logs to the same logging system as the task