From 333dfd44c0273e2e53a80cd465f3e5e9345a3859 Mon Sep 17 00:00:00 2001
From: Marvin Zhang
Date: Fri, 12 Sep 2025 13:55:44 +0800
Subject: [PATCH] refactor: implement circuit breaker for log connections to
 prevent flooding during failures

---
 core/task/handler/runner.go     | 21 ++++++++++
 core/task/handler/runner_log.go | 74 ++++++++++++++++++++++++++++++++-
 2 files changed, 93 insertions(+), 2 deletions(-)

diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go
index a741db34..1d94b434 100644
--- a/core/task/handler/runner.go
+++ b/core/task/handler/runner.go
@@ -52,6 +52,9 @@ func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) {
 		ipcTimeout:          60 * time.Second, // generous timeout for all tasks
 		healthCheckInterval: 5 * time.Second,  // check process every 5 seconds
 		connHealthInterval:  60 * time.Second, // check connection health every minute
+		// initialize circuit breaker for log connections
+		logConnHealthy:         true,
+		logCircuitOpenDuration: 30 * time.Second, // keep circuit open for 30 seconds after failures
 	}
 
 	// multi error
@@ -133,6 +136,14 @@ type Runner struct {
 	connRetryDelay  time.Duration // delay between connection retries
 	resourceCleanup *time.Ticker  // periodic resource cleanup
 
+	// circuit breaker for log connections to prevent cascading failures
+	logConnHealthy         bool          // tracks if log connection is healthy
+	logConnMutex           sync.RWMutex  // mutex for log connection health state
+	lastLogSendFailure     time.Time     // last time log send failed
+	logCircuitOpenTime     time.Time     // when circuit breaker was opened
+	logFailureCount        int           // consecutive log send failures
+	logCircuitOpenDuration time.Duration // how long to keep circuit open after failures
+
 	// configurable timeouts for robust task execution
 	ipcTimeout          time.Duration // timeout for IPC operations
 	healthCheckInterval time.Duration // interval for health checks
@@ -668,6 +679,16 @@ func (r *Runner) reconnectWithRetry() error {
 			r.lastConnCheck = time.Now()
 			r.connRetryAttempts = 0
 			r.Infof("successfully reconnected to task service after %d attempts", attempt+1)
+
+			// Reset log circuit breaker when connection is restored
+			r.logConnMutex.Lock()
+			if !r.logConnHealthy {
+				r.logConnHealthy = true
+				r.logFailureCount = 0
+				r.Logger.Info("log circuit breaker reset after successful reconnection")
+			}
+			r.logConnMutex.Unlock()
+
 			return nil
 		}
 
diff --git a/core/task/handler/runner_log.go b/core/task/handler/runner_log.go
index 918fa82f..e3bb8c79 100644
--- a/core/task/handler/runner_log.go
+++ b/core/task/handler/runner_log.go
@@ -18,6 +18,12 @@ func (r *Runner) writeLogLines(lines []string) {
 	default:
 	}
 
+	// Check circuit breaker for log connections
+	if !r.isLogCircuitClosed() {
+		// Circuit is open, don't attempt to send logs to prevent flooding
+		return
+	}
+
 	// Use connection with mutex for thread safety
 	r.connMutex.RLock()
 	conn := r.conn
@@ -26,6 +32,7 @@
 	// Check if connection is available
 	if conn == nil {
 		r.Debugf("no connection available for sending log lines")
+		r.recordLogFailure()
 		return
 	}
 
@@ -47,12 +54,16 @@
 		case <-r.ctx.Done():
 			return
 		default:
-			r.Errorf("error sending log lines: %v", err)
+			// Record failure and open circuit breaker if needed
+			r.recordLogFailure()
 			// Mark connection as unhealthy for reconnection
 			r.lastConnCheck = time.Time{}
 		}
 		return
 	}
+
+	// Success - reset circuit breaker
+	r.recordLogSuccess()
 }
 
 // logInternally sends internal runner logs to the same logging system as the task
@@ -68,7 +79,8 @@ func (r *Runner) logInternally(level string, message string) {
 
 	// Send to the same log system as task logs
 	// Only send if context is not cancelled and connection is available
-	if r.conn != nil {
+	// AND circuit breaker allows it (prevents cascading log failures)
+	if r.conn != nil && r.isLogCircuitClosed() {
 		select {
 		case <-r.ctx.Done():
 			// Context cancelled, don't send logs
@@ -129,3 +141,61 @@ func (r *Runner) Debugf(format string, args ...interface{}) {
 	msg := fmt.Sprintf(format, args...)
 	r.logInternally("DEBUG", msg)
 }
+
+// Circuit breaker methods for log connection management
+
+// isLogCircuitClosed checks if the circuit breaker allows log sending
+func (r *Runner) isLogCircuitClosed() bool {
+	r.logConnMutex.RLock()
+	defer r.logConnMutex.RUnlock()
+
+	// If circuit was opened due to failures, check if enough time has passed to retry
+	if !r.logConnHealthy {
+		if time.Since(r.logCircuitOpenTime) > r.logCircuitOpenDuration {
+			// Time to retry - close the circuit
+			r.logConnMutex.RUnlock()
+			r.logConnMutex.Lock()
+			r.logConnHealthy = true
+			r.logFailureCount = 0
+			r.logConnMutex.Unlock()
+			r.logConnMutex.RLock()
+			return true
+		}
+		return false
+	}
+
+	return true
+}
+
+// recordLogFailure records a log sending failure and opens circuit if threshold reached
+func (r *Runner) recordLogFailure() {
+	r.logConnMutex.Lock()
+	defer r.logConnMutex.Unlock()
+
+	r.logFailureCount++
+	r.lastLogSendFailure = time.Now()
+
+	// Open circuit breaker after 3 consecutive failures to prevent log flooding
+	if r.logFailureCount >= 3 && r.logConnHealthy {
+		r.logConnHealthy = false
+		r.logCircuitOpenTime = time.Now()
+		// Log this through standard logger only (not through writeLogLines to avoid recursion)
+		r.Logger.Warn(fmt.Sprintf("log circuit breaker opened after %d failures, suppressing log sends for %v",
+			r.logFailureCount, r.logCircuitOpenDuration))
+	}
+}
+
+// recordLogSuccess records a successful log send and resets the circuit breaker
+func (r *Runner) recordLogSuccess() {
+	r.logConnMutex.Lock()
+	defer r.logConnMutex.Unlock()
+
+	if !r.logConnHealthy || r.logFailureCount > 0 {
+		// Circuit was open or had failures, now closing it
+		if !r.logConnHealthy {
+			r.Logger.Info("log circuit breaker closed - connection restored")
+		}
+		r.logConnHealthy = true
+		r.logFailureCount = 0
+	}
+}
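
Reading aid, not part of the patch: the diff above implements a simple breaker around log sends. It opens after three consecutive send failures, stays open for logCircuitOpenDuration, closes again once the cooldown elapses or a send or reconnection succeeds. The standalone sketch below restates that cycle under assumed names (logBreaker, Allow, Failure, Success, threshold); it uses a single sync.Mutex rather than the patch's RLock-to-Lock upgrade, which is a simplification for illustration, not the patched code.

// circuitbreaker_sketch.go - minimal illustration of the open/cooldown/close cycle.
package main

import (
	"fmt"
	"sync"
	"time"
)

// logBreaker is a hypothetical, self-contained version of the breaker state
// kept on Runner in the patch (healthy flag, failure count, open time, cooldown).
type logBreaker struct {
	mu           sync.Mutex
	healthy      bool
	failures     int
	openedAt     time.Time
	openDuration time.Duration
	threshold    int
}

// Allow reports whether a log send should be attempted. Once the cooldown has
// elapsed, the breaker closes so the next send can probe the connection again.
func (b *logBreaker) Allow() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if !b.healthy && time.Since(b.openedAt) > b.openDuration {
		b.healthy = true
		b.failures = 0
	}
	return b.healthy
}

// Failure counts a failed send and opens the breaker at the threshold.
func (b *logBreaker) Failure() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failures++
	if b.failures >= b.threshold && b.healthy {
		b.healthy = false
		b.openedAt = time.Now()
	}
}

// Success resets the breaker after a send goes through.
func (b *logBreaker) Success() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.healthy = true
	b.failures = 0
}

func main() {
	// Same defaults as the patch: 3 consecutive failures, 30-second cooldown.
	b := &logBreaker{healthy: true, threshold: 3, openDuration: 30 * time.Second}
	for i := 0; i < 5; i++ {
		if b.Allow() {
			b.Failure() // simulate a send that keeps failing
		}
	}
	// After the third failure the breaker opens and further sends are suppressed.
	fmt.Println("sends allowed while open:", b.Allow()) // false until the cooldown passes
}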