From 138bed5c05bfa59f65c8b778f7b97b9babeedc08 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Tue, 21 Oct 2025 21:26:57 +0800 Subject: [PATCH] fix(grpc/client): wait for full reconnection readiness before clearing reconnecting flag - add maxReconnectionWait and reconnectionCheckInterval constants for reconnection readiness polling - introduce waitForFullReconnectionReady() to verify: connection READY, clients registered, and ability to obtain critical service clients (model/task) within short timeouts - ensure reconnecting flag is cleared immediately on reconnection failure and only cleared after full readiness checks on success - improve logging around reconnection stabilization and readiness checks --- core/grpc/client/client.go | 85 +++++++++++++++++++++++++++++++++++--- 1 file changed, 80 insertions(+), 5 deletions(-) diff --git a/core/grpc/client/client.go b/core/grpc/client/client.go index 8c62afd7..c09cb7f6 100644 --- a/core/grpc/client/client.go +++ b/core/grpc/client/client.go @@ -31,6 +31,8 @@ const ( defaultClientTimeout = 15 * time.Second // Increased from 5s for better reconnection handling reconnectionClientTimeout = 90 * time.Second // Extended timeout during reconnection scenarios (must be > worker reset timeout) connectionStabilizationDelay = 2 * time.Second // Wait after reconnection before declaring success + maxReconnectionWait = 30 * time.Second // Maximum time to wait for full reconnection completion + reconnectionCheckInterval = 500 * time.Millisecond ) // Circuit breaker states @@ -806,11 +808,8 @@ func (c *GrpcClient) executeReconnection() { c.reconnecting = true c.reconnectMux.Unlock() - defer func() { - c.reconnectMux.Lock() - c.reconnecting = false - c.reconnectMux.Unlock() - }() + // Don't use defer to clear reconnecting flag - we need to control the timing precisely + // to ensure all dependent services have completed registration before clearing it c.Infof("executing reconnection to %s (current state: %s)", c.address, c.getState()) @@ -818,6 +817,11 @@ func (c *GrpcClient) executeReconnection() { c.Errorf("reconnection failed: %v", err) c.recordFailure() + // Clear reconnecting flag on failure + c.reconnectMux.Lock() + c.reconnecting = false + c.reconnectMux.Unlock() + // Exponential backoff before allowing next attempt backoffDuration := c.calculateBackoff() c.Warnf("will retry reconnection after %v backoff", backoffDuration) @@ -837,9 +841,80 @@ func (c *GrpcClient) executeReconnection() { } else { c.Warnf("connection became unstable during stabilization") } + + // Wait for full registration and service readiness before clearing reconnecting flag + // This ensures all dependent services can successfully get their clients with extended timeout + if c.waitForFullReconnectionReady() { + c.Infof("reconnection fully complete, all services ready") + } else { + c.Warnf("reconnection completed but some checks didn't pass within timeout") + } + + // Now it's safe to clear the reconnecting flag + c.reconnectMux.Lock() + c.reconnecting = false + c.reconnectMux.Unlock() + c.Infof("resuming normal operation mode") } } +// waitForFullReconnectionReady waits for the client to be fully ready after reconnection +// by verifying all critical service clients can be successfully obtained. +// This ensures dependent services won't fail with "context cancelled" errors. +func (c *GrpcClient) waitForFullReconnectionReady() bool { + c.Debugf("waiting for full reconnection readiness (max %v)", maxReconnectionWait) + startTime := time.Now() + checkCount := 0 + + for time.Since(startTime) < maxReconnectionWait { + checkCount++ + + // Check 1: Connection must be in READY state + if c.conn == nil || c.conn.GetState() != connectivity.Ready { + c.Debugf("check %d: connection not ready (state: %v)", checkCount, c.getState()) + time.Sleep(reconnectionCheckInterval) + continue + } + + // Check 2: Client must be registered + if !c.IsRegistered() { + c.Debugf("check %d: client not yet registered", checkCount) + time.Sleep(reconnectionCheckInterval) + continue + } + + // Check 3: Verify we can actually get service clients without timeout + // This is the critical check that ensures dependent services won't fail + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + + // Try to get model base service client (most commonly used by TaskHandler) + _, err1 := c.GetModelBaseServiceClientWithContext(ctx) + + // Try to get task client (used for task operations) + _, err2 := c.GetTaskClientWithContext(ctx) + + cancel() + + if err1 != nil || err2 != nil { + c.Debugf("check %d: service clients not ready (model: %v, task: %v)", + checkCount, err1, err2) + time.Sleep(reconnectionCheckInterval) + continue + } + + // All checks passed! + elapsed := time.Since(startTime) + c.Infof("full reconnection readiness achieved after %v (%d checks)", elapsed, checkCount) + return true + } + + // Timeout reached + elapsed := time.Since(startTime) + c.Warnf("reconnection readiness checks did not complete within %v (elapsed: %v, checks: %d)", + maxReconnectionWait, elapsed, checkCount) + return false +} + // Enhanced circuit breaker methods func (c *GrpcClient) getCircuitBreakerState() circuitBreakerState { c.cbMux.RLock()