From b8e62c7b6b1362ec45b156088728643f4561a8c3 Mon Sep 17 00:00:00 2001
From: Marvin Zhang
Date: Thu, 23 Oct 2025 14:49:30 +0800
Subject: [PATCH] fix(node/master): add failure counter and grace period to
 node health checks; increase monitor interval

---
 core/node/service/master_service.go | 52 +++++++++++++++++++++--------
 1 file changed, 39 insertions(+), 13 deletions(-)

diff --git a/core/node/service/master_service.go b/core/node/service/master_service.go
index 1e2b06f5..2187df79 100644
--- a/core/node/service/master_service.go
+++ b/core/node/service/master_service.go
@@ -42,6 +42,10 @@ type MasterService struct {
 	// settings
 	monitorInterval time.Duration
 
+	// node health tracking - grace period before marking offline
+	nodeFailureCount map[primitive.ObjectID]int
+	nodeFailureMux   sync.RWMutex
+
 	// internals
 	interfaces.Logger
 }
@@ -200,21 +204,27 @@ func (svc *MasterService) monitor() (err error) {
 		go func(n *models.Node) {
 			defer wg.Done()
 
-			// subscribe
-			ok := svc.subscribeNode(n)
-			if !ok {
-				go svc.setWorkerNodeOffline(n)
+			// Check node health: subscribe and ping
+			subscribeOk := svc.subscribeNode(n)
+			pingOk := svc.pingNodeClient(n)
+			healthCheckPassed := subscribeOk && pingOk
+
+			if !healthCheckPassed {
+				// Health check failed - increment failure counter
+				failures := svc.incrementNodeFailureCount(n.Id)
+				svc.Debugf("worker node[%s] health check failed (failures: %d)", n.Key, failures)
+
+				// Only mark offline after consecutive failures (grace period)
+				// This prevents flapping during brief reconnection windows
+				if failures >= 2 {
+					svc.Infof("worker node[%s] failed %d consecutive health checks, marking offline", n.Key, failures)
+					go svc.setWorkerNodeOffline(n)
+				}
 				return
 			}
 
-			// ping client
-			ok = svc.pingNodeClient(n)
-			if !ok {
-				go svc.setWorkerNodeOffline(n)
-				return
-			}
-
-			// if both subscribe and ping succeed, ensure node is marked as online
+			// Health check passed - reset failure counter and mark online
+			svc.resetNodeFailureCount(n.Id)
 			go svc.setWorkerNodeOnline(n)
 
 			// handle reconnection - reconcile disconnected tasks
@@ -329,13 +339,29 @@ func (svc *MasterService) monitorGrpcClientHealth() {
 	}
 }
 
+// incrementNodeFailureCount increments the failure counter for a node and returns the new count
+func (svc *MasterService) incrementNodeFailureCount(nodeId primitive.ObjectID) int {
+	svc.nodeFailureMux.Lock()
+	defer svc.nodeFailureMux.Unlock()
+	svc.nodeFailureCount[nodeId]++
+	return svc.nodeFailureCount[nodeId]
+}
+
+// resetNodeFailureCount resets the failure counter for a node to zero
+func (svc *MasterService) resetNodeFailureCount(nodeId primitive.ObjectID) {
+	svc.nodeFailureMux.Lock()
+	defer svc.nodeFailureMux.Unlock()
+	delete(svc.nodeFailureCount, nodeId)
+}
+
 func newMasterService() *MasterService {
 	cfgSvc := config.GetNodeConfigService()
 	server := server.GetGrpcServer()
 
 	return &MasterService{
 		cfgSvc:           cfgSvc,
-		monitorInterval:  15 * time.Second,
+		monitorInterval:  20 * time.Second, // Increased from 15s to give more reconnection time
+		nodeFailureCount: make(map[primitive.ObjectID]int),
 		server:           server,
 		taskSchedulerSvc: scheduler.GetTaskSchedulerService(),
 		taskHandlerSvc:   handler.GetTaskHandlerService(),
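
--
Reviewer note: below is a minimal, self-contained sketch of the
consecutive-failure / grace-period pattern this patch introduces, for anyone
evaluating the approach in isolation. The healthTracker type, its methods,
and the "node-a" key are illustrative stand-ins, not names from the Crawlab
codebase; the patch itself keys the counter map by primitive.ObjectID and
stores it directly on MasterService.

package main

import (
	"fmt"
	"sync"
)

// healthTracker counts consecutive failed health checks per node key and
// reports when a node crosses the offline threshold. A mutex guards the map
// because the monitor loop checks nodes from concurrent goroutines.
// (Hypothetical type for illustration only.)
type healthTracker struct {
	mu        sync.Mutex
	failures  map[string]int
	threshold int
}

func newHealthTracker(threshold int) *healthTracker {
	return &healthTracker{failures: make(map[string]int), threshold: threshold}
}

// fail records one failed check and returns the new count plus whether the
// node should now be marked offline.
func (t *healthTracker) fail(key string) (int, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.failures[key]++
	n := t.failures[key]
	return n, n >= t.threshold
}

// ok clears the counter after a passing check, so only consecutive failures
// count toward the threshold - a brief reconnection blip never accumulates.
func (t *healthTracker) ok(key string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.failures, key)
}

func main() {
	t := newHealthTracker(2) // mirrors the hardcoded `failures >= 2` in the patch

	// First failure: still inside the grace period, node stays online.
	if n, offline := t.fail("node-a"); !offline {
		fmt.Printf("node-a failures=%d, within grace period\n", n)
	}

	// A passing check resets the counter, like resetNodeFailureCount does.
	t.ok("node-a")

	// Two consecutive failures cross the threshold and trigger offline.
	t.fail("node-a")
	if n, offline := t.fail("node-a"); offline {
		fmt.Printf("node-a failures=%d, marking offline\n", n)
	}
}

One design note: the sketch takes the threshold as a constructor parameter,
whereas the patch hardcodes `failures >= 2` at the call site; lifting it into
a field or a named constant next to monitorInterval would make the grace
period tunable alongside the 20s monitor interval.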