mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
feat: enhance gRPC client with health check functionality and improved connection handling
This commit is contained in:
@@ -3,6 +3,7 @@ package client
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -22,10 +23,10 @@ const (
|
||||
maxFailures = 5
|
||||
cbResetTime = 2 * time.Minute
|
||||
cbHalfOpenRetryInterval = 30 * time.Second
|
||||
healthCheckInterval = 30 * time.Second
|
||||
healthCheckInterval = 2 * time.Minute // Reduced frequency from 30 seconds
|
||||
stateMonitorInterval = 5 * time.Second
|
||||
registrationCheckInterval = 100 * time.Millisecond
|
||||
idleGracePeriod = 30 * time.Second
|
||||
idleGracePeriod = 2 * time.Minute // Increased from 30 seconds
|
||||
connectionTimeout = 30 * time.Second
|
||||
defaultClientTimeout = 5 * time.Second
|
||||
)
|
||||
@@ -108,7 +109,9 @@ type GrpcClient struct {
|
||||
registeredMux sync.RWMutex
|
||||
|
||||
// Health monitoring
|
||||
healthClient grpc_health_v1.HealthClient
|
||||
healthClient grpc_health_v1.HealthClient
|
||||
healthCheckEnabled bool
|
||||
healthCheckMux sync.RWMutex
|
||||
}
|
||||
|
||||
func (c *GrpcClient) Start() {
|
||||
@@ -190,6 +193,9 @@ func (c *GrpcClient) register() {
|
||||
c.metricClient = grpc2.NewMetricServiceClient(c.conn)
|
||||
c.healthClient = grpc_health_v1.NewHealthClient(c.conn)
|
||||
|
||||
// Enable health checks by default for new connections
|
||||
c.setHealthCheckEnabled(true)
|
||||
|
||||
// Mark as registered
|
||||
c.setRegistered(true)
|
||||
}
|
||||
@@ -251,10 +257,14 @@ func (c *GrpcClient) checkAndHandleStateChange(idleStartTime *time.Time) {
|
||||
current := c.conn.GetState()
|
||||
|
||||
if previous == current {
|
||||
// Handle prolonged IDLE state
|
||||
// Handle prolonged IDLE state - but be more lenient
|
||||
if current == connectivity.Idle && !idleStartTime.IsZero() &&
|
||||
time.Since(*idleStartTime) > idleGracePeriod {
|
||||
c.triggerReconnection("prolonged IDLE state")
|
||||
c.Debugf("connection has been IDLE for %v, checking if reconnection is needed", time.Since(*idleStartTime))
|
||||
// Only reconnect if we can't make a simple call
|
||||
if !c.testConnection() {
|
||||
c.triggerReconnection("prolonged IDLE state with failed connection test")
|
||||
}
|
||||
*idleStartTime = time.Time{}
|
||||
}
|
||||
return
|
||||
@@ -265,10 +275,18 @@ func (c *GrpcClient) checkAndHandleStateChange(idleStartTime *time.Time) {
|
||||
c.Infof("connection state: %s -> %s", previous, current)
|
||||
|
||||
switch current {
|
||||
case connectivity.TransientFailure, connectivity.Shutdown:
|
||||
case connectivity.TransientFailure:
|
||||
c.setRegistered(false)
|
||||
c.Warnf("connection in transient failure, will attempt reconnection")
|
||||
c.triggerReconnection(fmt.Sprintf("state change to %s", current))
|
||||
|
||||
case connectivity.Shutdown:
|
||||
c.setRegistered(false)
|
||||
if !c.stopped {
|
||||
c.Errorf("connection shutdown unexpectedly")
|
||||
c.triggerReconnection(fmt.Sprintf("state change to %s", current))
|
||||
}
|
||||
|
||||
case connectivity.Idle:
|
||||
if previous == connectivity.Ready {
|
||||
*idleStartTime = time.Now()
|
||||
@@ -321,6 +339,36 @@ func (c *GrpcClient) IsRegistered() bool {
|
||||
return c.registered
|
||||
}
|
||||
|
||||
func (c *GrpcClient) setHealthCheckEnabled(enabled bool) {
|
||||
c.healthCheckMux.Lock()
|
||||
defer c.healthCheckMux.Unlock()
|
||||
c.healthCheckEnabled = enabled
|
||||
}
|
||||
|
||||
func (c *GrpcClient) isHealthCheckEnabled() bool {
|
||||
c.healthCheckMux.RLock()
|
||||
defer c.healthCheckMux.RUnlock()
|
||||
return c.healthCheckEnabled
|
||||
}
|
||||
|
||||
func (c *GrpcClient) testConnection() bool {
|
||||
if !c.IsReady() || !c.IsRegistered() {
|
||||
return false
|
||||
}
|
||||
|
||||
// Try a simple health check if available, otherwise just check connection state
|
||||
if c.isHealthCheckEnabled() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_, err := c.healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// If health checks are disabled, just verify the connection state
|
||||
return c.conn != nil && c.conn.GetState() == connectivity.Ready
|
||||
}
|
||||
|
||||
func (c *GrpcClient) WaitForRegistered() {
|
||||
ticker := time.NewTicker(registrationCheckInterval)
|
||||
defer ticker.Stop()
|
||||
@@ -761,7 +809,7 @@ func (c *GrpcClient) startHealthMonitor() {
|
||||
}
|
||||
|
||||
func (c *GrpcClient) performHealthCheck() {
|
||||
if !c.IsReady() || !c.IsRegistered() {
|
||||
if !c.IsReady() || !c.IsRegistered() || !c.isHealthCheckEnabled() {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -771,6 +819,14 @@ func (c *GrpcClient) performHealthCheck() {
|
||||
_, err := c.healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
|
||||
|
||||
if err != nil {
|
||||
// Check if the error is due to unimplemented health service
|
||||
if strings.Contains(err.Error(), "Unimplemented") && strings.Contains(err.Error(), "grpc.health.v1.Health") {
|
||||
c.Warnf("health service not implemented on server, disabling health checks")
|
||||
c.setHealthCheckEnabled(false)
|
||||
// Don't trigger reconnection for unimplemented health service
|
||||
return
|
||||
}
|
||||
|
||||
c.Warnf("health check failed: %v", err)
|
||||
c.triggerReconnection("health check failure")
|
||||
} else {
|
||||
@@ -780,11 +836,12 @@ func (c *GrpcClient) performHealthCheck() {
|
||||
|
||||
func newGrpcClient() (c *GrpcClient) {
|
||||
client := &GrpcClient{
|
||||
address: utils.GetGrpcAddress(),
|
||||
timeout: 10 * time.Second,
|
||||
stop: make(chan struct{}),
|
||||
Logger: utils.NewLogger("GrpcClient"),
|
||||
state: connectivity.Idle,
|
||||
address: utils.GetGrpcAddress(),
|
||||
timeout: 10 * time.Second,
|
||||
stop: make(chan struct{}),
|
||||
Logger: utils.NewLogger("GrpcClient"),
|
||||
state: connectivity.Idle,
|
||||
healthCheckEnabled: true,
|
||||
}
|
||||
|
||||
return client
|
||||
|
||||
Reference in New Issue
Block a user