Mirror of https://github.com/crawlab-team/crawlab.git (synced 2026-01-21 17:21:09 +01:00)
fix(grpc/client,node/task/handler): add RetryWithBackoff, stabilize reconnection, and retry gRPC ops
- add RetryWithBackoff helper to grpc client for exponential retry with backoff and reconnection-aware handling
- increase reconnectionClientTimeout to 90s and introduce connectionStabilizationDelay; wait briefly after reconnection to avoid immediate flapping
- refresh reconnection flag while waiting for client registration and improve cancellation message
- replace direct heartbeat RPC with RetryWithBackoff in WorkerService (use extended timeout)
- use RetryWithBackoff for worker node status updates in task handler and propagate errors
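The pattern this commit applies to the heartbeat and node-status calls can be illustrated with a minimal standalone sketch (retryWithBackoff and sendHeartbeat below are local stand-ins for illustration, not the crawlab APIs): wrap the gRPC operation in a closure, allow up to three attempts with exponential backoff, and give the surrounding context an extended deadline.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// retryWithBackoff mirrors the shape of the helper added in this commit:
// run the operation, and before each retry wait 1s, 2s, 4s, ...
// unless the context expires first.
func retryWithBackoff(ctx context.Context, op func() error, maxAttempts int, name string) error {
    var lastErr error
    for attempt := 0; attempt < maxAttempts; attempt++ {
        if attempt > 0 {
            delay := time.Duration(1<<uint(attempt-1)) * time.Second
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(delay):
            }
        }
        if lastErr = op(); lastErr == nil {
            return nil
        }
        fmt.Printf("%s failed (attempt %d/%d): %v\n", name, attempt+1, maxAttempts, lastErr)
    }
    return fmt.Errorf("%s failed after %d attempts: %v", name, maxAttempts, lastErr)
}

func main() {
    // Extended deadline, mirroring the 30s used for heartbeat and node status updates.
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    calls := 0
    // Stand-in for a gRPC call that only succeeds once reconnection has finished.
    sendHeartbeat := func() error {
        calls++
        if calls < 3 {
            return errors.New("reconnection in progress")
        }
        return nil
    }

    if err := retryWithBackoff(ctx, sendHeartbeat, 3, "heartbeat"); err != nil {
        fmt.Println("giving up:", err)
        return
    }
    fmt.Printf("heartbeat succeeded after %d attempts\n", calls)
}

With three attempts the sketch waits at most 1s + 2s between calls, which fits comfortably inside the 30-second context deadline.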
@@ -20,16 +20,17 @@ import (
 // Circuit breaker constants
 const (
     maxFailures = 5
     cbResetTime = 2 * time.Minute
     cbHalfOpenRetryInterval = 30 * time.Second
     healthCheckInterval = 2 * time.Minute // Reduced frequency from 30 seconds
     stateMonitorInterval = 5 * time.Second
     registrationCheckInterval = 100 * time.Millisecond
     idleGracePeriod = 2 * time.Minute // Increased from 30 seconds
     connectionTimeout = 30 * time.Second
     defaultClientTimeout = 15 * time.Second // Increased from 5s for better reconnection handling
-    reconnectionClientTimeout = 60 * time.Second // Extended timeout during reconnection scenarios
+    reconnectionClientTimeout = 90 * time.Second // Extended timeout during reconnection scenarios (must be > worker reset timeout)
+    connectionStabilizationDelay = 2 * time.Second // Wait after reconnection before declaring success
 )

 // Circuit breaker states
@@ -49,6 +50,54 @@ func min(a, b int) int {
     return b
 }

+// RetryWithBackoff retries an operation up to maxAttempts times with exponential backoff.
+// It detects "reconnection in progress" errors and retries appropriately.
+// Returns the last error if all attempts fail, or nil on success.
+func RetryWithBackoff(ctx context.Context, operation func() error, maxAttempts int, logger interfaces.Logger, operationName string) error {
+    var lastErr error
+    for attempt := 0; attempt < maxAttempts; attempt++ {
+        if attempt > 0 {
+            // Exponential backoff: 1s, 2s, 4s, ...
+            backoffDelay := time.Duration(1<<uint(attempt-1)) * time.Second
+            if logger != nil {
+                logger.Debugf("retrying %s after %v (attempt %d/%d)", operationName, backoffDelay, attempt+1, maxAttempts)
+            }
+
+            select {
+            case <-ctx.Done():
+                if logger != nil {
+                    logger.Debugf("%s retry cancelled due to context", operationName)
+                }
+                return ctx.Err()
+            case <-time.After(backoffDelay):
+            }
+        }
+
+        err := operation()
+        if err == nil {
+            if attempt > 0 && logger != nil {
+                logger.Infof("%s succeeded after %d attempts", operationName, attempt+1)
+            }
+            return nil
+        }
+
+        lastErr = err
+        // Check if error indicates reconnection in progress
+        if strings.Contains(err.Error(), "reconnection in progress") {
+            if logger != nil {
+                logger.Debugf("%s waiting for reconnection (attempt %d/%d): %v", operationName, attempt+1, maxAttempts, err)
+            }
+            continue
+        }
+
+        if logger != nil {
+            logger.Debugf("%s failed (attempt %d/%d): %v", operationName, attempt+1, maxAttempts, err)
+        }
+    }
+
+    return fmt.Errorf("%s failed after %d attempts: %v", operationName, maxAttempts, lastErr)
+}
+
 // GrpcClient provides a robust gRPC client with connection management and client registration.
 //
 // The client handles connection lifecycle and ensures that gRPC service clients are properly
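For clarity, the backoff formula above, time.Duration(1<<uint(attempt-1)) * time.Second, yields waits of 1s, 2s, 4s, ... before the second and subsequent attempts. A tiny standalone check (maxAttempts = 5 is arbitrary here, chosen only to print a few steps):

package main

import (
    "fmt"
    "time"
)

func main() {
    const maxAttempts = 5 // illustrative only
    for attempt := 1; attempt < maxAttempts; attempt++ {
        // Same formula RetryWithBackoff evaluates before each retry.
        delay := time.Duration(1<<uint(attempt-1)) * time.Second
        fmt.Printf("wait before attempt %d: %v\n", attempt+1, delay)
    }
}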
@@ -695,12 +744,17 @@ func (c *GrpcClient) getClientWithContext(ctx context.Context, getter func() int
     select {
     case <-ctx.Done():
         if isReconnecting {
-            return nil, fmt.Errorf("context cancelled while waiting for %s client registration during reconnection (this is normal during network restoration)", clientType)
+            return nil, fmt.Errorf("context cancelled while waiting for %s client registration: reconnection in progress, retry recommended", clientType)
         }
         return nil, fmt.Errorf("context cancelled while waiting for %s client registration", clientType)
     case <-c.stop:
         return nil, fmt.Errorf("client stopped while waiting for %s client registration", clientType)
     case <-ticker.C:
+        // Update reconnection status in case it changed
+        c.reconnectMux.Lock()
+        isReconnecting = c.reconnecting
+        c.reconnectMux.Unlock()
+
         if c.IsRegistered() {
             return getter(), nil
         }
@@ -771,6 +825,18 @@ func (c *GrpcClient) executeReconnection() {
     } else {
         c.recordSuccess()
         c.Infof("reconnection successful - connection state: %s, registered: %v", c.getState(), c.IsRegistered())
+
+        // Stabilization: wait a moment to ensure connection is truly stable
+        // This prevents immediate flapping if the network is still unstable
+        c.Debugf("stabilizing connection for %v", connectionStabilizationDelay)
+        time.Sleep(connectionStabilizationDelay)
+
+        // Verify connection is still stable after delay
+        if c.conn != nil && c.conn.GetState() == connectivity.Ready {
+            c.Infof("connection stabilization successful")
+        } else {
+            c.Warnf("connection became unstable during stabilization")
+        }
     }
 }
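The stabilization step above sleeps for connectionStabilizationDelay and then re-checks the channel state before declaring the reconnection successful. A minimal standalone sketch of that check using grpc-go's connectivity API (the target address, port, and helper name are illustrative assumptions, not crawlab code):

package main

import (
    "fmt"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/connectivity"
    "google.golang.org/grpc/credentials/insecure"
)

// stabilizationDelay mirrors connectionStabilizationDelay in the diff above.
const stabilizationDelay = 2 * time.Second

// isStableAfterDelay waits briefly after a reported reconnection and then
// re-checks the channel state, so a link that flaps again right away is
// not declared healthy.
func isStableAfterDelay(conn *grpc.ClientConn) bool {
    time.Sleep(stabilizationDelay)
    return conn != nil && conn.GetState() == connectivity.Ready
}

func main() {
    // Placeholder target; any reachable gRPC server address would do.
    conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        fmt.Println("dial failed:", err)
        return
    }
    defer conn.Close()

    conn.Connect() // start connecting eagerly instead of on the first RPC
    if isStableAfterDelay(conn) {
        fmt.Println("connection stabilization successful")
    } else {
        fmt.Println("connection became unstable during stabilization")
    }
}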
@@ -338,18 +338,25 @@ func (svc *WorkerService) subscribe() {
 }

 func (svc *WorkerService) sendHeartbeat() {
-    ctx, cancel := context.WithTimeout(svc.ctx, svc.heartbeatInterval)
+    // Use extended timeout to allow for reconnection scenarios
+    ctx, cancel := context.WithTimeout(svc.ctx, 30*time.Second)
     defer cancel()
-    nodeClient, err := client.GetGrpcClient().GetNodeClient()
-    if err != nil {
-        svc.Errorf("failed to get node client: %v", err)
-        return
-    }
-    _, err = nodeClient.SendHeartbeat(ctx, &grpc.NodeServiceSendHeartbeatRequest{
-        NodeKey: svc.cfgSvc.GetNodeKey(),
-    })
+
+    // Retry up to 3 times with exponential backoff for reconnection scenarios
+    err := client.RetryWithBackoff(ctx, func() error {
+        nodeClient, err := client.GetGrpcClient().GetNodeClient()
+        if err != nil {
+            return err
+        }
+
+        _, err = nodeClient.SendHeartbeat(ctx, &grpc.NodeServiceSendHeartbeatRequest{
+            NodeKey: svc.cfgSvc.GetNodeKey(),
+        })
+        return err
+    }, 3, svc.Logger, "heartbeat")
+
     if err != nil {
-        svc.Errorf("failed to send heartbeat to master: %v", err)
+        svc.Errorf("failed to send heartbeat: %v", err)
     }
 }
@@ -438,15 +438,25 @@ func (svc *Service) updateNodeStatus() (err error) {
     currentGoroutines := runtime.NumGoroutine()
     svc.Debugf("Node status update - runners: %d, goroutines: %d", n.CurrentRunners, currentGoroutines)

-    // save node
+    // save node with retry for reconnection scenarios
     n.SetUpdated(n.CreatedBy)
     if svc.cfgSvc.IsMaster() {
         err = service.NewModelService[models.Node]().ReplaceById(n.Id, *n)
+        if err != nil {
+            return err
+        }
     } else {
-        err = client.NewModelService[models.Node]().ReplaceById(n.Id, *n)
-    }
-    if err != nil {
-        return err
+        // Worker node: use gRPC with retry logic
+        ctx, cancel := context.WithTimeout(svc.ctx, 30*time.Second)
+        defer cancel()
+
+        err = grpcclient.RetryWithBackoff(ctx, func() error {
+            return client.NewModelService[models.Node]().ReplaceById(n.Id, *n)
+        }, 3, svc.Logger, "node status update")
+
+        if err != nil {
+            return err
+        }
     }

     return nil