diff --git a/core/grpc/client/client.go b/core/grpc/client/client.go
index 8ae6eb1f..73458cef 100644
--- a/core/grpc/client/client.go
+++ b/core/grpc/client/client.go
@@ -14,6 +14,7 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/keepalive"
 )
 
 type GrpcClient struct {
@@ -40,9 +41,15 @@ type GrpcClient struct {
 	MetricClient grpc2.MetricServiceClient
 
 	// Add new fields for state management
-	state     connectivity.State
-	stateMux  sync.RWMutex
-	reconnect chan struct{}
+	state     connectivity.State
+	stateMux  sync.RWMutex
+	reconnect chan struct{}
+
+	// Circuit breaker fields
+	failureCount   int
+	lastFailure    time.Time
+	circuitBreaker bool
+	cbMux          sync.RWMutex
 }
 
 func (c *GrpcClient) Start() {
@@ -53,15 +60,12 @@ func (c *GrpcClient) Start() {
 
 		// start state monitor
 		go c.monitorState()
 
-		// connect
+		// connect (this will also register services)
 		err := c.connect()
 		if err != nil {
 			c.Fatalf("failed to connect: %v", err)
 			return
 		}
-
-		// register rpc services
-		c.register()
 	})
 }
@@ -127,6 +131,9 @@ func (c *GrpcClient) IsClosed() (res bool) {
 }
 
 func (c *GrpcClient) monitorState() {
+	idleStartTime := time.Time{}
+	idleGracePeriod := 30 * time.Second // Allow IDLE state for 30 seconds before considering it a problem
+
 	for {
 		select {
 		case <-c.stop:
@@ -144,18 +151,45 @@ func (c *GrpcClient) monitorState() {
 			c.setState(current)
 			c.Infof("state changed from %s to %s", previous, current)
 
-			// Trigger reconnect if connection is lost or becomes idle from ready state
-			if current == connectivity.TransientFailure ||
-				current == connectivity.Shutdown ||
-				(previous == connectivity.Ready && current == connectivity.Idle) {
-				select {
-				case c.reconnect <- struct{}{}:
-					c.Infof("triggering reconnection due to state change to %s", current)
-				default:
+			// Handle state transitions more intelligently
+			switch current {
+			case connectivity.TransientFailure, connectivity.Shutdown:
+				// Always reconnect on actual failures, but respect circuit breaker
+				if !c.isCircuitBreakerOpen() {
+					select {
+					case c.reconnect <- struct{}{}:
+						c.Infof("triggering reconnection due to state change to %s", current)
+					default:
+					}
+				} else {
+					c.Debugf("circuit breaker open, not triggering reconnection for state %s", current)
 				}
+			case connectivity.Idle:
+				if previous == connectivity.Ready {
+					// Start grace period timer for IDLE state
+					idleStartTime = time.Now()
+					c.Debugf("connection went IDLE, starting grace period")
+				}
+			case connectivity.Ready:
+				// Reset idle timer when connection becomes ready
+				idleStartTime = time.Time{}
+				// Record successful connection
+				c.recordSuccess()
 			}
 		}
 
+		// Check if IDLE state has exceeded grace period
+		if current == connectivity.Idle && !idleStartTime.IsZero() &&
+			time.Since(idleStartTime) > idleGracePeriod && !c.isCircuitBreakerOpen() {
+			c.Warnf("connection has been IDLE for %v, triggering reconnection", time.Since(idleStartTime))
+			select {
+			case c.reconnect <- struct{}{}:
+				c.Infof("triggering reconnection due to prolonged IDLE state")
+			default:
+			}
+			idleStartTime = time.Time{} // Reset timer to avoid repeated reconnections
+		}
+
 		time.Sleep(time.Second)
 	}
 }
@@ -174,18 +208,32 @@ func (c *GrpcClient) getState() connectivity.State {
 }
 
 func (c *GrpcClient) connect() (err error) {
-	// Start reconnection loop
+	// Start reconnection loop with proper cleanup
 	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				c.Errorf("reconnection loop panic: %v", r)
+			}
+		}()
+
 		for {
 			select {
 			case <-c.stop:
+ c.Debugf("reconnection loop stopping") return case <-c.reconnect: - if !c.stopped { + if !c.stopped && !c.isCircuitBreakerOpen() { c.Infof("attempting to reconnect to %s", c.address) if err := c.doConnect(); err != nil { c.Errorf("reconnection failed: %v", err) + c.recordFailure() + // Add a brief delay before allowing next reconnection attempt + time.Sleep(2 * time.Second) + } else { + c.recordSuccess() } + } else if c.isCircuitBreakerOpen() { + c.Debugf("circuit breaker is open, skipping reconnection attempt") } } } @@ -194,19 +242,68 @@ func (c *GrpcClient) connect() (err error) { return c.doConnect() } +// Circuit breaker methods +func (c *GrpcClient) isCircuitBreakerOpen() bool { + c.cbMux.RLock() + defer c.cbMux.RUnlock() + + // Circuit breaker opens after 5 consecutive failures + if c.failureCount >= 5 { + // Auto-recover after 1 minute + if time.Since(c.lastFailure) > 1*time.Minute { + return false + } + return true + } + return false +} + +func (c *GrpcClient) recordFailure() { + c.cbMux.Lock() + defer c.cbMux.Unlock() + c.failureCount++ + c.lastFailure = time.Now() + if c.failureCount >= 5 { + c.Warnf("circuit breaker opened after %d consecutive failures", c.failureCount) + } +} + +func (c *GrpcClient) recordSuccess() { + c.cbMux.Lock() + defer c.cbMux.Unlock() + if c.failureCount > 0 { + c.Infof("connection restored, resetting circuit breaker (was %d failures)", c.failureCount) + } + c.failureCount = 0 + c.lastFailure = time.Time{} +} + func (c *GrpcClient) doConnect() (err error) { op := func() error { - // connection options + // Close existing connection if any + if c.conn != nil { + if err := c.conn.Close(); err != nil { + c.Debugf("failed to close existing connection: %v", err) + } + } + + // connection options with better settings for stability opts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithChainUnaryInterceptor(middlewares.GetGrpcClientAuthTokenUnaryChainInterceptor()), grpc.WithChainStreamInterceptor(middlewares.GetGrpcClientAuthTokenStreamChainInterceptor()), + // Add keep-alive settings to maintain connection health + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 30 * time.Second, // Send ping every 30 seconds + Timeout: 5 * time.Second, // Wait 5 seconds for ping response + PermitWithoutStream: true, // Send pings even without active streams + }), } // create new client connection c.conn, err = grpc.NewClient(c.address, opts...) 
 		if err != nil {
-			c.Errorf("failed to connect to %s: %v", c.address, err)
+			c.Errorf("failed to create connection to %s: %v", c.address, err)
 			return err
 		}
 
@@ -214,26 +311,51 @@ func (c *GrpcClient) doConnect() (err error) {
 		// connect
 		c.Infof("connecting to %s", c.address)
 		c.conn.Connect()
-		// wait for connection to be ready
-		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		// wait for connection to be ready, bounded by a 15-second timeout
+		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
 		defer cancel()
-		ok := c.conn.WaitForStateChange(ctx, connectivity.Ready)
-		if !ok {
-			return fmt.Errorf("failed to connect to %s: timed out", c.address)
+
+		// Wait for state to change from connecting
+		for c.conn.GetState() == connectivity.Connecting {
+			if !c.conn.WaitForStateChange(ctx, connectivity.Connecting) {
+				return fmt.Errorf("failed to connect to %s: connection timeout", c.address)
+			}
+		}
+
+		// Check final state
+		state := c.conn.GetState()
+		if state != connectivity.Ready {
+			return fmt.Errorf("failed to connect to %s: final state is %s", c.address, state)
 		}
 
 		// success
 		c.Infof("connected to %s", c.address)
+
+		// Register services after successful connection
+		c.register()
 		return nil
 	}
 
+	// Configure backoff with more reasonable settings
 	b := backoff.NewExponentialBackOff()
-	b.InitialInterval = 5 * time.Second
-	b.MaxElapsedTime = 10 * time.Minute
+	b.InitialInterval = 1 * time.Second // Start with shorter interval
+	b.MaxInterval = 30 * time.Second    // Cap the max interval
+	b.MaxElapsedTime = 5 * time.Minute  // Reduce max retry time
+	b.Multiplier = 1.5                  // Gentler exponential growth
+
	n := func(err error, duration time.Duration) {
 		c.Errorf("failed to connect to %s: %v, retrying in %s", c.address, err, duration)
 	}
-	return backoff.RetryNotify(op, b, n)
+
+	err = backoff.RetryNotify(op, b, n)
+	if err != nil {
+		c.recordFailure()
+		return err
+	}
+
+	c.recordSuccess()
+	return nil
 }
 
 func newGrpcClient() (c *GrpcClient) {
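
Reviewer note: the circuit breaker added here is implicit rather than stateful — isCircuitBreakerOpen simply starts returning false once a minute has passed since lastFailure, with no explicit half-open probe. Below is a minimal standalone sketch of that same logic in isolation, using the same thresholds (5 consecutive failures, 1-minute cooldown), which can be dropped into a scratch test to verify the behavior; the breaker type and its method names are illustrative, not part of this change.

package main

import (
	"fmt"
	"sync"
	"time"
)

// breaker mirrors the failureCount/lastFailure fields added to GrpcClient.
type breaker struct {
	mu           sync.RWMutex
	failureCount int
	lastFailure  time.Time
}

// open reports whether reconnection attempts should be skipped: 5 or more
// consecutive failures, auto-recovering 1 minute after the last failure.
func (b *breaker) open() bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.failureCount >= 5 && time.Since(b.lastFailure) <= time.Minute
}

// fail records a failed connection attempt.
func (b *breaker) fail() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failureCount++
	b.lastFailure = time.Now()
}

// succeed resets the breaker after a successful connection.
func (b *breaker) succeed() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failureCount = 0
	b.lastFailure = time.Time{}
}

func main() {
	var b breaker
	for i := 0; i < 5; i++ {
		b.fail()
	}
	fmt.Println(b.open()) // true: threshold reached, still within the cooldown
	b.succeed()
	fmt.Println(b.open()) // false: a single success fully resets the count
}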
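A second note on the new keepalive settings: a 30-second ping with PermitWithoutStream: true is more aggressive than the gRPC server-side enforcement defaults (MinTime of 5 minutes, pings without active streams not permitted), and a server enforcing those defaults answers over-frequent pings with GOAWAY (ENHANCE_YOUR_CALM) and closes the connection — feeding the very reconnect loop this change is trying to calm. If we control the server, its enforcement policy should be relaxed to match; a minimal sketch under that assumption (the listener address and setup are illustrative, not from this PR):

package main

import (
	"log"
	"net"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("listen: %v", err)
	}

	srv := grpc.NewServer(
		// MinTime is set slightly below the client's 30s ping interval to
		// tolerate jitter, and PermitWithoutStream mirrors the client setting
		// so idle-connection pings are not counted as bad-client strikes.
		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			MinTime:             25 * time.Second,
			PermitWithoutStream: true,
		}),
	)

	if err := srv.Serve(lis); err != nil {
		log.Fatalf("serve: %v", err)
	}
}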