Mirror of https://github.com/crawlab-team/crawlab.git, synced 2026-01-21 17:21:09 +01:00
fix: enhance gRPC client connection management with circuit breaker and keep-alive settings
@@ -14,6 +14,7 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/keepalive"
 )
 
 type GrpcClient struct {
@@ -40,9 +41,15 @@ type GrpcClient struct {
 	MetricClient grpc2.MetricServiceClient
 
 	// Add new fields for state management
-	state     connectivity.State
-	stateMux  sync.RWMutex
-	reconnect chan struct{}
+	state     connectivity.State
+	stateMux  sync.RWMutex
+	reconnect chan struct{}
+
+	// Circuit breaker fields
+	failureCount   int
+	lastFailure    time.Time
+	circuitBreaker bool
+	cbMux          sync.RWMutex
 }
 
 func (c *GrpcClient) Start() {
@@ -53,15 +60,12 @@ func (c *GrpcClient) Start() {
 		// start state monitor
 		go c.monitorState()
 
-		// connect
+		// connect (this will also register services)
 		err := c.connect()
 		if err != nil {
 			c.Fatalf("failed to connect: %v", err)
 			return
 		}
-
-		// register rpc services
-		c.register()
 	})
 }
@@ -127,6 +131,9 @@ func (c *GrpcClient) IsClosed() (res bool) {
 }
 
 func (c *GrpcClient) monitorState() {
+	idleStartTime := time.Time{}
+	idleGracePeriod := 30 * time.Second // Allow IDLE state for 30 seconds before considering it a problem
+
 	for {
 		select {
 		case <-c.stop:
@@ -144,18 +151,45 @@
 			c.setState(current)
 			c.Infof("state changed from %s to %s", previous, current)
 
-			// Trigger reconnect if connection is lost or becomes idle from ready state
-			if current == connectivity.TransientFailure ||
-				current == connectivity.Shutdown ||
-				(previous == connectivity.Ready && current == connectivity.Idle) {
-				select {
-				case c.reconnect <- struct{}{}:
-					c.Infof("triggering reconnection due to state change to %s", current)
-				default:
+			// Handle state transitions more intelligently
+			switch current {
+			case connectivity.TransientFailure, connectivity.Shutdown:
+				// Always reconnect on actual failures, but respect circuit breaker
+				if !c.isCircuitBreakerOpen() {
+					select {
+					case c.reconnect <- struct{}{}:
+						c.Infof("triggering reconnection due to state change to %s", current)
+					default:
+					}
+				} else {
+					c.Debugf("circuit breaker open, not triggering reconnection for state %s", current)
+				}
+			case connectivity.Idle:
+				if previous == connectivity.Ready {
+					// Start grace period timer for IDLE state
+					idleStartTime = time.Now()
+					c.Debugf("connection went IDLE, starting grace period")
+				}
+			case connectivity.Ready:
+				// Reset idle timer when connection becomes ready
+				idleStartTime = time.Time{}
+				// Record successful connection
+				c.recordSuccess()
 			}
 		}
 
+		// Check if IDLE state has exceeded grace period
+		if current == connectivity.Idle && !idleStartTime.IsZero() &&
+			time.Since(idleStartTime) > idleGracePeriod && !c.isCircuitBreakerOpen() {
+			c.Warnf("connection has been IDLE for %v, triggering reconnection", time.Since(idleStartTime))
+			select {
+			case c.reconnect <- struct{}{}:
+				c.Infof("triggering reconnection due to prolonged IDLE state")
+			default:
+			}
+			idleStartTime = time.Time{} // Reset timer to avoid repeated reconnections
+		}
+
 		time.Sleep(time.Second)
 	}
 }
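As background for the monitor loop above, here is a minimal, self-contained sketch of the gRPC connectivity polling pattern it relies on (GetState plus WaitForStateChange). The address, the 10-second demo duration, and the plain log calls are placeholder assumptions, not Crawlab's actual wiring.

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
)

// watchState logs every connectivity state change until the connection shuts down.
func watchState(conn *grpc.ClientConn) {
	for {
		current := conn.GetState()
		if current == connectivity.Shutdown {
			return
		}
		// WaitForStateChange returns false only if the context expires before the state leaves `current`.
		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
		changed := conn.WaitForStateChange(ctx, current)
		cancel()
		if changed {
			log.Printf("state changed from %s to %s", current, conn.GetState())
		}
	}
}

func main() {
	// Placeholder address; the real address comes from the node configuration.
	conn, err := grpc.NewClient("localhost:9666",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	go watchState(conn)

	conn.Connect() // leave idle mode and start connecting
	time.Sleep(10 * time.Second)
	_ = conn.Close() // moves the state to SHUTDOWN, which stops the watcher
	time.Sleep(time.Second)
}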
@@ -174,18 +208,32 @@ func (c *GrpcClient) getState() connectivity.State {
 }
 
 func (c *GrpcClient) connect() (err error) {
-	// Start reconnection loop
+	// Start reconnection loop with proper cleanup
 	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				c.Errorf("reconnection loop panic: %v", r)
+			}
+		}()
+
 		for {
 			select {
 			case <-c.stop:
+				c.Debugf("reconnection loop stopping")
 				return
 			case <-c.reconnect:
-				if !c.stopped {
+				if !c.stopped && !c.isCircuitBreakerOpen() {
 					c.Infof("attempting to reconnect to %s", c.address)
 					if err := c.doConnect(); err != nil {
 						c.Errorf("reconnection failed: %v", err)
+						c.recordFailure()
+						// Add a brief delay before allowing next reconnection attempt
+						time.Sleep(2 * time.Second)
+					} else {
+						c.recordSuccess()
 					}
+				} else if c.isCircuitBreakerOpen() {
+					c.Debugf("circuit breaker is open, skipping reconnection attempt")
 				}
 			}
 		}
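The reconnection loop above consumes a trigger channel that the state monitor writes to with a non-blocking send. Below is a minimal sketch of that pattern on its own; the buffer size of 1 is an assumption, since the channel is created in code outside this diff.

package main

import "fmt"

func main() {
	// Assumed buffered with capacity 1 so a single pending trigger can be queued.
	reconnect := make(chan struct{}, 1)

	trigger := func() {
		select {
		case reconnect <- struct{}{}:
			fmt.Println("reconnect queued")
		default:
			// A trigger is already pending (or nobody is listening);
			// drop it rather than block the state monitor.
			fmt.Println("reconnect already pending, dropped")
		}
	}

	trigger() // queued
	trigger() // coalesced with the pending trigger

	<-reconnect // the reconnection goroutine would receive here
	fmt.Println("reconnect handled")
}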
@@ -194,19 +242,68 @@ func (c *GrpcClient) connect() (err error) {
 	return c.doConnect()
 }
 
+// Circuit breaker methods
+func (c *GrpcClient) isCircuitBreakerOpen() bool {
+	c.cbMux.RLock()
+	defer c.cbMux.RUnlock()
+
+	// Circuit breaker opens after 5 consecutive failures
+	if c.failureCount >= 5 {
+		// Auto-recover after 1 minute
+		if time.Since(c.lastFailure) > 1*time.Minute {
+			return false
+		}
+		return true
+	}
+	return false
+}
+
+func (c *GrpcClient) recordFailure() {
+	c.cbMux.Lock()
+	defer c.cbMux.Unlock()
+	c.failureCount++
+	c.lastFailure = time.Now()
+	if c.failureCount >= 5 {
+		c.Warnf("circuit breaker opened after %d consecutive failures", c.failureCount)
+	}
+}
+
+func (c *GrpcClient) recordSuccess() {
+	c.cbMux.Lock()
+	defer c.cbMux.Unlock()
+	if c.failureCount > 0 {
+		c.Infof("connection restored, resetting circuit breaker (was %d failures)", c.failureCount)
+	}
+	c.failureCount = 0
+	c.lastFailure = time.Time{}
+}
+
 func (c *GrpcClient) doConnect() (err error) {
 	op := func() error {
-		// connection options
+		// Close existing connection if any
+		if c.conn != nil {
+			if err := c.conn.Close(); err != nil {
+				c.Debugf("failed to close existing connection: %v", err)
+			}
+		}
+
+		// connection options with better settings for stability
 		opts := []grpc.DialOption{
 			grpc.WithTransportCredentials(insecure.NewCredentials()),
 			grpc.WithChainUnaryInterceptor(middlewares.GetGrpcClientAuthTokenUnaryChainInterceptor()),
 			grpc.WithChainStreamInterceptor(middlewares.GetGrpcClientAuthTokenStreamChainInterceptor()),
+			// Add keep-alive settings to maintain connection health
+			grpc.WithKeepaliveParams(keepalive.ClientParameters{
+				Time:                30 * time.Second, // Send ping every 30 seconds
+				Timeout:             5 * time.Second,  // Wait 5 seconds for ping response
+				PermitWithoutStream: true,             // Send pings even without active streams
+			}),
 		}
 
 		// create new client connection
 		c.conn, err = grpc.NewClient(c.address, opts...)
 		if err != nil {
-			c.Errorf("failed to connect to %s: %v", c.address, err)
+			c.Errorf("failed to create connection to %s: %v", c.address, err)
 			return err
 		}
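The three methods added above implement a simple counter-based breaker with two states, closed and open; there is no half-open probe, recovery relies solely on the one-minute timeout. A standalone sketch of the same idea follows, using the patch's thresholds (5 consecutive failures, 1-minute auto-recovery); the type and method names are illustrative, not part of Crawlab.

package main

import (
	"fmt"
	"sync"
	"time"
)

type breaker struct {
	mu           sync.RWMutex
	failureCount int
	lastFailure  time.Time
}

// open reports whether calls should currently be rejected.
func (b *breaker) open() bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	if b.failureCount >= 5 {
		// Auto-recover once the last failure is older than one minute.
		return time.Since(b.lastFailure) <= time.Minute
	}
	return false
}

func (b *breaker) failure() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failureCount++
	b.lastFailure = time.Now()
}

func (b *breaker) success() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failureCount = 0
	b.lastFailure = time.Time{}
}

func main() {
	var b breaker
	for i := 0; i < 5; i++ {
		b.failure()
	}
	fmt.Println("open after 5 failures:", b.open()) // true
	b.success()
	fmt.Println("open after a success:", b.open()) // false
}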
@@ -214,26 +311,51 @@ func (c *GrpcClient) doConnect() (err error) {
 		c.Infof("connecting to %s", c.address)
 		c.conn.Connect()
 
-		// wait for connection to be ready
-		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		// wait for connection to be ready with shorter timeout for faster failure detection
+		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
 		defer cancel()
-		ok := c.conn.WaitForStateChange(ctx, connectivity.Ready)
-		if !ok {
-			return fmt.Errorf("failed to connect to %s: timed out", c.address)
+
+		// Wait for state to change from connecting
+		for c.conn.GetState() == connectivity.Connecting {
+			if !c.conn.WaitForStateChange(ctx, connectivity.Connecting) {
+				return fmt.Errorf("failed to connect to %s: connection timeout", c.address)
+			}
+		}
+
+		// Check final state
+		state := c.conn.GetState()
+		if state != connectivity.Ready {
+			return fmt.Errorf("failed to connect to %s: final state is %s", c.address, state)
 		}
 
 		// success
 		c.Infof("connected to %s", c.address)
 
+		// Register services after successful connection
+		c.register()
+
 		return nil
 	}
 
+	// Configure backoff with more reasonable settings
 	b := backoff.NewExponentialBackOff()
-	b.InitialInterval = 5 * time.Second
-	b.MaxElapsedTime = 10 * time.Minute
+	b.InitialInterval = 1 * time.Second // Start with shorter interval
+	b.MaxInterval = 30 * time.Second    // Cap the max interval
+	b.MaxElapsedTime = 5 * time.Minute  // Reduce max retry time
+	b.Multiplier = 1.5                  // Gentler exponential growth
 
 	n := func(err error, duration time.Duration) {
 		c.Errorf("failed to connect to %s: %v, retrying in %s", c.address, err, duration)
 	}
-	return backoff.RetryNotify(op, b, n)
+
+	err = backoff.RetryNotify(op, b, n)
+	if err != nil {
+		c.recordFailure()
+		return err
+	}
+
+	c.recordSuccess()
+	return nil
 }
 
 func newGrpcClient() (c *GrpcClient) {
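For the dial-and-retry portion, here is a minimal sketch that combines the keep-alive parameters used in the patch with a retry wrapper in the style of backoff.RetryNotify. It assumes github.com/cenkalti/backoff/v4 is the backoff package imported by this file (the calls NewExponentialBackOff and RetryNotify match its API); the target address and log calls are placeholders.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/cenkalti/backoff/v4"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/keepalive"
)

// dial creates a client connection and retries with exponential backoff until it is READY.
func dial(addr string) (*grpc.ClientConn, error) {
	var conn *grpc.ClientConn

	op := func() error {
		var err error
		conn, err = grpc.NewClient(addr,
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithKeepaliveParams(keepalive.ClientParameters{
				Time:                30 * time.Second, // ping every 30s
				Timeout:             5 * time.Second,  // wait 5s for the ping ack
				PermitWithoutStream: true,             // ping even when no streams are open
			}),
		)
		if err != nil {
			return err
		}
		conn.Connect()
		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
		defer cancel()
		for state := conn.GetState(); state != connectivity.Ready; state = conn.GetState() {
			if !conn.WaitForStateChange(ctx, state) {
				_ = conn.Close() // avoid leaking the attempt before the next retry
				return fmt.Errorf("dial %s: timed out in state %s", addr, state)
			}
		}
		return nil
	}

	b := backoff.NewExponentialBackOff()
	b.InitialInterval = time.Second
	b.MaxInterval = 30 * time.Second
	b.MaxElapsedTime = 5 * time.Minute
	b.Multiplier = 1.5

	notify := func(err error, next time.Duration) {
		log.Printf("connect failed: %v, retrying in %s", err, next)
	}
	if err := backoff.RetryNotify(op, b, notify); err != nil {
		return nil, err
	}
	return conn, nil
}

func main() {
	conn, err := dial("localhost:9666")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	log.Println("connected")
}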