From 2dfc66743b948705f3385e4dc5dbf93313bcb9a8 Mon Sep 17 00:00:00 2001
From: Marvin Zhang
Date: Mon, 20 Oct 2025 13:01:10 +0800
Subject: [PATCH] fix(grpc/client,node/task/handler): add RetryWithBackoff, stabilize reconnection, and retry gRPC ops

- add RetryWithBackoff helper to grpc client for exponential retry with backoff and reconnection-aware handling
- increase reconnectionClientTimeout to 90s and introduce connectionStabilizationDelay; wait briefly after reconnection to avoid immediate flapping
- refresh reconnection flag while waiting for client registration and improve cancellation message
- replace direct heartbeat RPC with RetryWithBackoff in WorkerService (use extended timeout)
- use RetryWithBackoff for worker node status updates in task handler and propagate errors
---
 core/grpc/client/client.go          | 88 +++++++++++++++++++++++++----
 core/node/service/worker_service.go | 27 +++++----
 core/task/handler/service.go        | 20 +++++--
 3 files changed, 109 insertions(+), 26 deletions(-)

diff --git a/core/grpc/client/client.go b/core/grpc/client/client.go
index 010e2a6a..f68f3e09 100644
--- a/core/grpc/client/client.go
+++ b/core/grpc/client/client.go
@@ -20,16 +20,17 @@ import (
 
 // Circuit breaker constants
 const (
-	maxFailures               = 5
-	cbResetTime               = 2 * time.Minute
-	cbHalfOpenRetryInterval   = 30 * time.Second
-	healthCheckInterval       = 2 * time.Minute // Reduced frequency from 30 seconds
-	stateMonitorInterval      = 5 * time.Second
-	registrationCheckInterval = 100 * time.Millisecond
-	idleGracePeriod           = 2 * time.Minute // Increased from 30 seconds
-	connectionTimeout         = 30 * time.Second
-	defaultClientTimeout      = 15 * time.Second // Increased from 5s for better reconnection handling
-	reconnectionClientTimeout = 60 * time.Second // Extended timeout during reconnection scenarios
+	maxFailures                  = 5
+	cbResetTime                  = 2 * time.Minute
+	cbHalfOpenRetryInterval      = 30 * time.Second
+	healthCheckInterval          = 2 * time.Minute // Reduced frequency from 30 seconds
+	stateMonitorInterval         = 5 * time.Second
+	registrationCheckInterval    = 100 * time.Millisecond
+	idleGracePeriod              = 2 * time.Minute // Increased from 30 seconds
+	connectionTimeout            = 30 * time.Second
+	defaultClientTimeout         = 15 * time.Second // Increased from 5s for better reconnection handling
+	reconnectionClientTimeout    = 90 * time.Second // Extended timeout during reconnection scenarios (must be > worker reset timeout)
+	connectionStabilizationDelay = 2 * time.Second  // Wait after reconnection before declaring success
 )
 
 // Circuit breaker states
@@ -49,6 +50,54 @@ func min(a, b int) int {
 	return b
 }
 
+// RetryWithBackoff retries an operation up to maxAttempts times with exponential backoff.
+// It detects "reconnection in progress" errors and retries appropriately.
+// Returns the last error if all attempts fail, or nil on success.
+func RetryWithBackoff(ctx context.Context, operation func() error, maxAttempts int, logger interfaces.Logger, operationName string) error {
+	var lastErr error
+	for attempt := 0; attempt < maxAttempts; attempt++ {
+		if attempt > 0 {
+			// Exponential backoff: 1s, 2s, 4s, ...
+			backoffDelay := time.Duration(1<<uint(attempt-1)) * time.Second
+			select {
+			case <-ctx.Done():
+				return fmt.Errorf("%s cancelled while waiting to retry: %v", operationName, ctx.Err())
+			case <-time.After(backoffDelay):
+			}
+		}
+
+		err := operation()
+		if err == nil {
+			if attempt > 0 && logger != nil {
+				logger.Infof("%s succeeded after %d attempts", operationName, attempt+1)
+			}
+			return nil
+		}
+
+		lastErr = err
+		// Check if error indicates reconnection in progress
+		if strings.Contains(err.Error(), "reconnection in progress") {
+			if logger != nil {
+				logger.Debugf("%s waiting for reconnection (attempt %d/%d): %v", operationName, attempt+1, maxAttempts, err)
+			}
+			continue
+		}
+
+		if logger != nil {
+			logger.Debugf("%s failed (attempt %d/%d): %v", operationName, attempt+1, maxAttempts, err)
+		}
+	}
+
+	return fmt.Errorf("%s failed after %d attempts: %v", operationName, maxAttempts, lastErr)
+}
+
 // GrpcClient provides a robust gRPC client with connection management and client registration.
 //
 // The client handles connection lifecycle and ensures that gRPC service clients are properly
@@ -695,12 +744,17 @@ func (c *GrpcClient) getClientWithContext(ctx context.Context, getter func() int
 		select {
 		case <-ctx.Done():
 			if isReconnecting {
-				return nil, fmt.Errorf("context cancelled while waiting for %s client registration during reconnection (this is normal during network restoration)", clientType)
+				return nil, fmt.Errorf("context cancelled while waiting for %s client registration: reconnection in progress, retry recommended", clientType)
 			}
 			return nil, fmt.Errorf("context cancelled while waiting for %s client registration", clientType)
 		case <-c.stop:
 			return nil, fmt.Errorf("client stopped while waiting for %s client registration", clientType)
 		case <-ticker.C:
+			// Update reconnection status in case it changed
+			c.reconnectMux.Lock()
+			isReconnecting = c.reconnecting
+			c.reconnectMux.Unlock()
+
 			if c.IsRegistered() {
 				return getter(), nil
 			}
@@ -771,6 +825,18 @@ func (c *GrpcClient) executeReconnection() {
 	} else {
 		c.recordSuccess()
 		c.Infof("reconnection successful - connection state: %s, registered: %v", c.getState(), c.IsRegistered())
+
+		// Stabilization: wait a moment to ensure connection is truly stable
+		// This prevents immediate flapping if the network is still unstable
+		c.Debugf("stabilizing connection for %v", connectionStabilizationDelay)
+		time.Sleep(connectionStabilizationDelay)
+
+		// Verify connection is still stable after delay
+		if c.conn != nil && c.conn.GetState() == connectivity.Ready {
+			c.Infof("connection stabilization successful")
+		} else {
+			c.Warnf("connection became unstable during stabilization")
+		}
 	}
 }
 
diff --git a/core/node/service/worker_service.go b/core/node/service/worker_service.go
index 658e7a51..6eb97ae0 100644
--- a/core/node/service/worker_service.go
+++ b/core/node/service/worker_service.go
@@ -338,18 +338,25 @@ func (svc *WorkerService) subscribe() {
 }
 
 func (svc *WorkerService) sendHeartbeat() {
-	ctx, cancel := context.WithTimeout(svc.ctx, svc.heartbeatInterval)
+	// Use extended timeout to allow for reconnection scenarios
+	ctx, cancel := context.WithTimeout(svc.ctx, 30*time.Second)
 	defer cancel()
-	nodeClient, err := client.GetGrpcClient().GetNodeClient()
+
+	// Retry up to 3 times with exponential backoff for reconnection scenarios
+	err := client.RetryWithBackoff(ctx, func() error {
+		nodeClient, err := client.GetGrpcClient().GetNodeClient()
+		if err != nil {
+			return err
+		}
+
+		_, err = nodeClient.SendHeartbeat(ctx, &grpc.NodeServiceSendHeartbeatRequest{
+			NodeKey: svc.cfgSvc.GetNodeKey(),
+		})
+		return err
+	}, 3, svc.Logger, "heartbeat")
+
 	if err != nil {
-		svc.Errorf("failed to get node client: %v", err)
-		return
-	}
-	_, err = nodeClient.SendHeartbeat(ctx, &grpc.NodeServiceSendHeartbeatRequest{
-		NodeKey: svc.cfgSvc.GetNodeKey(),
-	})
-	if err != nil {
-		svc.Errorf("failed to send heartbeat to master: %v", err)
+		svc.Errorf("failed to send heartbeat: %v", err)
 	}
 }
 
diff --git a/core/task/handler/service.go b/core/task/handler/service.go
index 03f6c9d1..0598b1a7 100644
--- a/core/task/handler/service.go
+++ b/core/task/handler/service.go
@@ -438,15 +438,25 @@ func (svc *Service) updateNodeStatus() (err error) {
 	currentGoroutines := runtime.NumGoroutine()
 	svc.Debugf("Node status update - runners: %d, goroutines: %d", n.CurrentRunners, currentGoroutines)
 
-	// save node
+	// save node with retry for reconnection scenarios
 	n.SetUpdated(n.CreatedBy)
 	if svc.cfgSvc.IsMaster() {
 		err = service.NewModelService[models.Node]().ReplaceById(n.Id, *n)
+		if err != nil {
+			return err
+		}
 	} else {
-		err = client.NewModelService[models.Node]().ReplaceById(n.Id, *n)
-	}
-	if err != nil {
-		return err
+		// Worker node: use gRPC with retry logic
+		ctx, cancel := context.WithTimeout(svc.ctx, 30*time.Second)
+		defer cancel()
+
+		err = grpcclient.RetryWithBackoff(ctx, func() error {
+			return client.NewModelService[models.Node]().ReplaceById(n.Id, *n)
+		}, 3, svc.Logger, "node status update")
+
+		if err != nil {
+			return err
+		}
 	}
 
 	return nil
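
Note for reviewers: the sketch below shows the general call shape this helper is meant to encourage, mirroring the heartbeat and node-status call sites above (a 30s budget for the whole retry sequence, 3 attempts). It is illustrative only: callWithRetry is a hypothetical wrapper, not code from this patch, and passing a nil logger relies on the helper's logger != nil checks. With the 1s/2s doubling schedule, 3 attempts add at most 3s of backoff on top of the attempts themselves.

	// Hypothetical example in the same client package; not part of this patch.
	package client

	import (
		"context"
		"time"
	)

	// callWithRetry wraps any gRPC-backed operation the way the heartbeat and
	// node-status call sites do: a 30s overall budget and up to 3 attempts with
	// exponential backoff. "op" stands in for a real RPC call.
	func callWithRetry(parent context.Context, op func(ctx context.Context) error) error {
		ctx, cancel := context.WithTimeout(parent, 30*time.Second)
		defer cancel()

		// A nil logger is acceptable: RetryWithBackoff only logs when one is provided.
		return RetryWithBackoff(ctx, func() error { return op(ctx) }, 3, nil, "example operation")
	}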
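A hypothetical test file (retry_test.go, not included in this patch) could pin down the documented contract: success returns nil, an error mentioning "reconnection in progress" is retried, and attempts stop after maxAttempts. Because the backoff sleeps are real, this sketch takes roughly 3s to run.

	// Hypothetical retry_test.go in the client package; not part of this patch.
	package client

	import (
		"context"
		"errors"
		"testing"
	)

	func TestRetryWithBackoffEventualSuccess(t *testing.T) {
		calls := 0
		op := func() error {
			calls++
			if calls < 3 {
				// Simulate the transient state surfaced while a reconnection is underway.
				return errors.New("reconnection in progress")
			}
			return nil
		}
		// nil logger is fine: the helper only logs when a logger is provided.
		if err := RetryWithBackoff(context.Background(), op, 3, nil, "test op"); err != nil {
			t.Fatalf("expected success on third attempt, got %v", err)
		}
		if calls != 3 {
			t.Fatalf("expected 3 attempts, got %d", calls)
		}
	}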