From e221e3c640cf50fe593b3c7b5ba1c7e016425787 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Fri, 12 Sep 2025 18:16:52 +0800 Subject: [PATCH] feat: enhance gRPC client handling with improved reconnection logic and monitoring --- core/grpc/client/client.go | 91 ++++++++++++++++++++++++++--- core/node/service/master_service.go | 47 ++++++++++++++- core/node/service/worker_service.go | 54 ++++++++++++++--- 3 files changed, 177 insertions(+), 15 deletions(-) diff --git a/core/grpc/client/client.go b/core/grpc/client/client.go index 587d864d..3cbad922 100644 --- a/core/grpc/client/client.go +++ b/core/grpc/client/client.go @@ -28,7 +28,8 @@ const ( registrationCheckInterval = 100 * time.Millisecond idleGracePeriod = 2 * time.Minute // Increased from 30 seconds connectionTimeout = 30 * time.Second - defaultClientTimeout = 5 * time.Second + defaultClientTimeout = 15 * time.Second // Increased from 5s for better reconnection handling + reconnectionClientTimeout = 60 * time.Second // Extended timeout during reconnection scenarios ) // Circuit breaker states @@ -257,7 +258,7 @@ func (c *GrpcClient) monitorState() { var ( idleStartTime = time.Time{} - ticker = time.NewTicker(stateMonitorInterval) // Reduce frequency + ticker = time.NewTicker(stateMonitorInterval) ) defer ticker.Stop() @@ -310,9 +311,12 @@ func (c *GrpcClient) checkAndHandleStateChange(idleStartTime *time.Time) { case connectivity.Shutdown: c.setRegistered(false) + c.Warnf("connection state changed to SHUTDOWN - stopped flag: %v", c.stopped) if !c.stopped { c.Errorf("connection shutdown unexpectedly") c.triggerReconnection(fmt.Sprintf("state change to %s", current)) + } else { + c.Debugf("connection shutdown expected (client stopped)") } case connectivity.Idle: @@ -418,35 +422,75 @@ func (c *GrpcClient) WaitForRegistered() { // These methods will wait for registration to complete or return an error if the client is stopped func (c *GrpcClient) GetNodeClient() (grpc2.NodeServiceClient, error) { - ctx, cancel := context.WithTimeout(context.Background(), defaultClientTimeout) + // Use longer timeout during reconnection scenarios + timeout := defaultClientTimeout + c.reconnectMux.Lock() + if c.reconnecting { + timeout = reconnectionClientTimeout + } + c.reconnectMux.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() return c.GetNodeClientWithContext(ctx) } func (c *GrpcClient) GetTaskClient() (grpc2.TaskServiceClient, error) { - ctx, cancel := context.WithTimeout(context.Background(), defaultClientTimeout) + // Use longer timeout during reconnection scenarios + timeout := defaultClientTimeout + c.reconnectMux.Lock() + if c.reconnecting { + timeout = reconnectionClientTimeout + } + c.reconnectMux.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() return c.GetTaskClientWithContext(ctx) } func (c *GrpcClient) GetModelBaseServiceClient() (grpc2.ModelBaseServiceClient, error) { - ctx, cancel := context.WithTimeout(context.Background(), defaultClientTimeout) + // Use longer timeout during reconnection scenarios + timeout := defaultClientTimeout + c.reconnectMux.Lock() + if c.reconnecting { + timeout = reconnectionClientTimeout + } + c.reconnectMux.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() return c.GetModelBaseServiceClientWithContext(ctx) } func (c *GrpcClient) GetDependencyClient() (grpc2.DependencyServiceClient, error) { - ctx, cancel := context.WithTimeout(context.Background(), defaultClientTimeout) + 
// Use longer timeout during reconnection scenarios + timeout := defaultClientTimeout + c.reconnectMux.Lock() + if c.reconnecting { + timeout = reconnectionClientTimeout + } + c.reconnectMux.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() return c.GetDependencyClientWithContext(ctx) } func (c *GrpcClient) GetMetricClient() (grpc2.MetricServiceClient, error) { - ctx, cancel := context.WithTimeout(context.Background(), defaultClientTimeout) + // Use longer timeout during reconnection scenarios + timeout := defaultClientTimeout + c.reconnectMux.Lock() + if c.reconnecting { + timeout = reconnectionClientTimeout + } + c.reconnectMux.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() return c.GetMetricClientWithContext(ctx) @@ -586,6 +630,11 @@ func (c *GrpcClient) getClientWithContext(ctx context.Context, getter func() int return getter(), nil } + // Check if we're reconnecting to provide better error context + c.reconnectMux.Lock() + isReconnecting := c.reconnecting + c.reconnectMux.Unlock() + // Wait for registration with context ticker := time.NewTicker(registrationCheckInterval) defer ticker.Stop() @@ -593,6 +642,9 @@ func (c *GrpcClient) getClientWithContext(ctx context.Context, getter func() int for { select { case <-ctx.Done(): + if isReconnecting { + return nil, fmt.Errorf("context cancelled while waiting for %s client registration during reconnection (this is normal during network restoration)", clientType) + } return nil, fmt.Errorf("context cancelled while waiting for %s client registration", clientType) case <-c.stop: return nil, fmt.Errorf("client stopped while waiting for %s client registration", clientType) @@ -881,6 +933,7 @@ func newGrpcClient() (c *GrpcClient) { var _client *GrpcClient var _clientOnce sync.Once +var _clientMux sync.Mutex func GetGrpcClient() *GrpcClient { _clientOnce.Do(func() { @@ -889,3 +942,27 @@ func GetGrpcClient() *GrpcClient { }) return _client } + +// ResetGrpcClient creates a completely new gRPC client instance +// This is needed when the client gets stuck and needs to be fully restarted +func ResetGrpcClient() *GrpcClient { + _clientMux.Lock() + defer _clientMux.Unlock() + + // Stop the old client if it exists + if _client != nil { + _client.Stop() + } + + // Reset the sync.Once so we can create a new client + _clientOnce = sync.Once{} + _client = nil + + // Create and start the new client + _clientOnce.Do(func() { + _client = newGrpcClient() + go _client.Start() + }) + + return _client +} diff --git a/core/node/service/master_service.go b/core/node/service/master_service.go index 31aa4c1f..67a7de7c 100644 --- a/core/node/service/master_service.go +++ b/core/node/service/master_service.go @@ -8,6 +8,7 @@ import ( "github.com/apex/log" "github.com/cenkalti/backoff/v4" "github.com/crawlab-team/crawlab/core/constants" + "github.com/crawlab-team/crawlab/core/grpc/client" "github.com/crawlab-team/crawlab/core/grpc/server" "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/models/common" @@ -108,12 +109,15 @@ func (svc *MasterService) startMonitoring() { ticker := time.NewTicker(svc.monitorInterval) for { - // monitor + // monitor worker nodes err := svc.monitor() if err != nil { svc.Errorf("master[%s] monitor error: %v", svc.cfgSvc.GetNodeKey(), err) } + // monitor gRPC client health on master + svc.monitorGrpcClientHealth() + // wait <-ticker.C } @@ -207,6 +211,9 @@ func (svc *MasterService) monitor() (err error) { return } + // 
if both subscribe and ping succeed, ensure node is marked as online + go svc.setWorkerNodeOnline(n) + // handle reconnection - reconcile disconnected tasks go svc.taskReconciliationSvc.HandleNodeReconnection(n) @@ -236,6 +243,32 @@ func (svc *MasterService) setWorkerNodeOffline(node *models.Node) { svc.sendNotification(node) } +func (svc *MasterService) setWorkerNodeOnline(node *models.Node) { + // Only update if the node is currently offline + if node.Status == constants.NodeStatusOnline { + return + } + + oldStatus := node.Status + node.Status = constants.NodeStatusOnline + node.Active = true + node.ActiveAt = time.Now() + err := backoff.Retry(func() error { + return service.NewModelService[models.Node]().ReplaceById(node.Id, *node) + }, backoff.WithMaxRetries(backoff.NewConstantBackOff(1*time.Second), 3)) + if err != nil { + svc.Errorf("failed to set worker node[%s] online: %v", node.Key, err) + return + } + + svc.Infof("worker node[%s] status changed from '%s' to 'online'", node.Key, oldStatus) + + // send notification if status changed + if utils.IsPro() && oldStatus != constants.NodeStatusOnline { + svc.sendNotification(node) + } +} + func (svc *MasterService) subscribeNode(n *models.Node) (ok bool) { _, ok = svc.server.NodeSvr.GetSubscribeStream(n.Id) return ok @@ -277,6 +310,18 @@ func (svc *MasterService) sendMasterStatusNotification(oldStatus, newStatus stri go notification.GetNotificationService().SendNodeNotification(node) } +func (svc *MasterService) monitorGrpcClientHealth() { + grpcClient := client.GetGrpcClient() + + // Check if gRPC client is in a bad state + if !grpcClient.IsReady() && grpcClient.IsClosed() { + svc.Warnf("master node gRPC client is in SHUTDOWN state, forcing FULL RESET") + // Reset the gRPC client to get a fresh instance + client.ResetGrpcClient() + svc.Infof("master node gRPC client has been reset") + } +} + func newMasterService() *MasterService { cfgSvc := config.GetNodeConfigService() server := server.GetGrpcServer() diff --git a/core/node/service/worker_service.go b/core/node/service/worker_service.go index 23c9215e..7f1ef92e 100644 --- a/core/node/service/worker_service.go +++ b/core/node/service/worker_service.go @@ -217,8 +217,8 @@ func (svc *WorkerService) subscribe() { // Configure exponential backoff b := backoff.NewExponentialBackOff() b.InitialInterval = 1 * time.Second - b.MaxInterval = 1 * time.Minute - b.MaxElapsedTime = 10 * time.Minute + b.MaxInterval = 30 * time.Second // Reduced from 1 minute + b.MaxElapsedTime = 0 * time.Minute // Never give up b.Multiplier = 2.0 for { @@ -232,7 +232,43 @@ func (svc *WorkerService) subscribe() { // Use backoff for connection attempts operation := func() error { svc.Debugf("attempting to subscribe to master") - nodeClient, err := client.GetGrpcClient().GetNodeClient() + + // Wait for gRPC client to be ready and registered after reconnection + grpcClient := client.GetGrpcClient() + + waitStart := time.Now() + checkCount := 0 + for !grpcClient.IsReadyAndRegistered() { + select { + case <-svc.ctx.Done(): + return svc.ctx.Err() + case <-time.After(500 * time.Millisecond): + checkCount++ + // Log periodically while waiting + if checkCount%20 == 0 { // Every 10 seconds + svc.Warnf("still waiting for gRPC client (%.1fs)", time.Since(waitStart).Seconds()) + + // Force a reconnection attempt if we've been waiting too long + if time.Since(waitStart) > 15*time.Second { + svc.Warnf("forcing gRPC client reset due to prolonged wait") + grpcClient = client.ResetGrpcClient() + waitStart = time.Now() + checkCount = 0 + 
} + + // Check if client is in SHUTDOWN state and force restart + if !grpcClient.IsReady() && grpcClient.IsClosed() { + svc.Warnf("gRPC client is in SHUTDOWN state, forcing reset") + grpcClient = client.ResetGrpcClient() + waitStart = time.Now() + checkCount = 0 + } + } + } + } + svc.Debugf("gRPC client is ready and registered after %.1fs", time.Since(waitStart).Seconds()) + + nodeClient, err := grpcClient.GetNodeClient() if err != nil { svc.Errorf("failed to get node client: %v", err) return err @@ -246,7 +282,7 @@ func (svc *WorkerService) subscribe() { svc.Errorf("failed to subscribe to master: %v", err) return err } - svc.Debugf("subscribed to master") + svc.Infof("successfully subscribed to master") // Handle messages for { @@ -284,17 +320,21 @@ func (svc *WorkerService) subscribe() { if err != nil { if svc.ctx.Err() != nil { // Context was cancelled, exit gracefully - svc.Debugf("subscription retry cancelled due to context") + svc.Infof("subscription retry cancelled due to context") return } - svc.Errorf("subscription failed after max retries: %v", err) + svc.Errorf("subscription attempt failed: %v", err) + // Reset backoff for next attempt + b.Reset() + } else { + svc.Debugf("subscription completed successfully") } // Wait before attempting to reconnect, but respect context cancellation select { case <-svc.ctx.Done(): return - case <-time.After(time.Second): + case <-time.After(2 * time.Second): } } }
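
Notes on the changes above (sketches, not part of the patch):

The reconnection-aware timeout selection added to GetNodeClient, GetTaskClient, GetModelBaseServiceClient, GetDependencyClient and GetMetricClient repeats the same lock/check/unlock block five times. A minimal sketch of collapsing it into one helper, assuming the reconnectMux and reconnecting fields and the two timeout constants from the hunks above (the helper name clientTimeout is hypothetical):

    // clientTimeout picks the per-call timeout, extending it while a
    // reconnection is in progress. Hypothetical helper, not in this patch.
    func (c *GrpcClient) clientTimeout() time.Duration {
        c.reconnectMux.Lock()
        defer c.reconnectMux.Unlock()
        if c.reconnecting {
            return reconnectionClientTimeout
        }
        return defaultClientTimeout
    }

    // Each accessor could then reduce to, for example:
    func (c *GrpcClient) GetNodeClient() (grpc2.NodeServiceClient, error) {
        ctx, cancel := context.WithTimeout(context.Background(), c.clientTimeout())
        defer cancel()
        return c.GetNodeClientWithContext(ctx)
    }

Separately, ResetGrpcClient reassigns _clientOnce and _client under _clientMux, while GetGrpcClient still goes through the old sync.Once without taking that mutex, so a GetGrpcClient call that overlaps a reset races on both package-level variables. A minimal sketch of a reset-safe singleton using only the package-level _client, _clientMux, newGrpcClient, Start and Stop shown above, dropping sync.Once in favour of the existing mutex:

    // GetGrpcClient lazily creates the client under _clientMux, so it stays
    // consistent even while ResetGrpcClient is replacing the instance.
    func GetGrpcClient() *GrpcClient {
        _clientMux.Lock()
        defer _clientMux.Unlock()
        if _client == nil {
            _client = newGrpcClient()
            go _client.Start()
        }
        return _client
    }

    // ResetGrpcClient stops the old instance and swaps in a fresh one under
    // the same mutex, so callers never observe a half-reset singleton.
    func ResetGrpcClient() *GrpcClient {
        _clientMux.Lock()
        defer _clientMux.Unlock()
        if _client != nil {
            _client.Stop()
        }
        _client = newGrpcClient()
        go _client.Start()
        return _client
    }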