diff --git a/core/grpc/client/client.go b/core/grpc/client/client.go index 956fb8ea..272d2747 100644 --- a/core/grpc/client/client.go +++ b/core/grpc/client/client.go @@ -3,6 +3,7 @@ package client import ( "context" "fmt" + "strings" "sync" "time" @@ -22,10 +23,10 @@ const ( maxFailures = 5 cbResetTime = 2 * time.Minute cbHalfOpenRetryInterval = 30 * time.Second - healthCheckInterval = 30 * time.Second + healthCheckInterval = 2 * time.Minute // Reduced frequency from 30 seconds stateMonitorInterval = 5 * time.Second registrationCheckInterval = 100 * time.Millisecond - idleGracePeriod = 30 * time.Second + idleGracePeriod = 2 * time.Minute // Increased from 30 seconds connectionTimeout = 30 * time.Second defaultClientTimeout = 5 * time.Second ) @@ -108,7 +109,9 @@ type GrpcClient struct { registeredMux sync.RWMutex // Health monitoring - healthClient grpc_health_v1.HealthClient + healthClient grpc_health_v1.HealthClient + healthCheckEnabled bool + healthCheckMux sync.RWMutex } func (c *GrpcClient) Start() { @@ -190,6 +193,9 @@ func (c *GrpcClient) register() { c.metricClient = grpc2.NewMetricServiceClient(c.conn) c.healthClient = grpc_health_v1.NewHealthClient(c.conn) + // Enable health checks by default for new connections + c.setHealthCheckEnabled(true) + // Mark as registered c.setRegistered(true) } @@ -251,10 +257,14 @@ func (c *GrpcClient) checkAndHandleStateChange(idleStartTime *time.Time) { current := c.conn.GetState() if previous == current { - // Handle prolonged IDLE state + // Handle prolonged IDLE state - but be more lenient if current == connectivity.Idle && !idleStartTime.IsZero() && time.Since(*idleStartTime) > idleGracePeriod { - c.triggerReconnection("prolonged IDLE state") + c.Debugf("connection has been IDLE for %v, checking if reconnection is needed", time.Since(*idleStartTime)) + // Only reconnect if we can't make a simple call + if !c.testConnection() { + c.triggerReconnection("prolonged IDLE state with failed connection test") + } *idleStartTime = time.Time{} } return @@ -265,10 +275,18 @@ func (c *GrpcClient) checkAndHandleStateChange(idleStartTime *time.Time) { c.Infof("connection state: %s -> %s", previous, current) switch current { - case connectivity.TransientFailure, connectivity.Shutdown: + case connectivity.TransientFailure: c.setRegistered(false) + c.Warnf("connection in transient failure, will attempt reconnection") c.triggerReconnection(fmt.Sprintf("state change to %s", current)) + case connectivity.Shutdown: + c.setRegistered(false) + if !c.stopped { + c.Errorf("connection shutdown unexpectedly") + c.triggerReconnection(fmt.Sprintf("state change to %s", current)) + } + case connectivity.Idle: if previous == connectivity.Ready { *idleStartTime = time.Now() @@ -321,6 +339,36 @@ func (c *GrpcClient) IsRegistered() bool { return c.registered } +func (c *GrpcClient) setHealthCheckEnabled(enabled bool) { + c.healthCheckMux.Lock() + defer c.healthCheckMux.Unlock() + c.healthCheckEnabled = enabled +} + +func (c *GrpcClient) isHealthCheckEnabled() bool { + c.healthCheckMux.RLock() + defer c.healthCheckMux.RUnlock() + return c.healthCheckEnabled +} + +func (c *GrpcClient) testConnection() bool { + if !c.IsReady() || !c.IsRegistered() { + return false + } + + // Try a simple health check if available, otherwise just check connection state + if c.isHealthCheckEnabled() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + _, err := c.healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{}) + return err == nil + } + + // If health checks are disabled, just verify the connection state + return c.conn != nil && c.conn.GetState() == connectivity.Ready +} + func (c *GrpcClient) WaitForRegistered() { ticker := time.NewTicker(registrationCheckInterval) defer ticker.Stop() @@ -761,7 +809,7 @@ func (c *GrpcClient) startHealthMonitor() { } func (c *GrpcClient) performHealthCheck() { - if !c.IsReady() || !c.IsRegistered() { + if !c.IsReady() || !c.IsRegistered() || !c.isHealthCheckEnabled() { return } @@ -771,6 +819,14 @@ func (c *GrpcClient) performHealthCheck() { _, err := c.healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{}) if err != nil { + // Check if the error is due to unimplemented health service + if strings.Contains(err.Error(), "Unimplemented") && strings.Contains(err.Error(), "grpc.health.v1.Health") { + c.Warnf("health service not implemented on server, disabling health checks") + c.setHealthCheckEnabled(false) + // Don't trigger reconnection for unimplemented health service + return + } + c.Warnf("health check failed: %v", err) c.triggerReconnection("health check failure") } else { @@ -780,11 +836,12 @@ func (c *GrpcClient) performHealthCheck() { func newGrpcClient() (c *GrpcClient) { client := &GrpcClient{ - address: utils.GetGrpcAddress(), - timeout: 10 * time.Second, - stop: make(chan struct{}), - Logger: utils.NewLogger("GrpcClient"), - state: connectivity.Idle, + address: utils.GetGrpcAddress(), + timeout: 10 * time.Second, + stop: make(chan struct{}), + Logger: utils.NewLogger("GrpcClient"), + state: connectivity.Idle, + healthCheckEnabled: true, } return client