From 893cb3cb8a9f3f45a92322abf4fdb0b0095d8789 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Wed, 22 Oct 2025 22:03:16 +0800 Subject: [PATCH] fix(grpc/server): mark node active on Subscribe and notify on status change Optimistically mark node as online/active and persist ActiveAt when a node subscribes, logging success or warning on failure. Send a node notification in Pro mode if the status changed. Also tidy import ordering. --- core/grpc/server/node_service_server.go | 26 ++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/core/grpc/server/node_service_server.go b/core/grpc/server/node_service_server.go index be53406e..d7e512b8 100644 --- a/core/grpc/server/node_service_server.go +++ b/core/grpc/server/node_service_server.go @@ -2,6 +2,9 @@ package server import ( "context" + "sync" + "time" + "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/errors" "github.com/crawlab-team/crawlab/core/interfaces" @@ -15,8 +18,6 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" - "sync" - "time" ) var nodeServiceMutex = sync.Mutex{} @@ -122,12 +123,31 @@ func (svr NodeServiceServer) Subscribe(request *grpc.NodeServiceSubscribeRequest return err } + // Optimistically mark node as active when subscription succeeds + // This provides fast recovery after reconnection while master monitor + // will verify health and revert if the connection is unstable + oldStatus := node.Status + node.Status = constants.NodeStatusOnline + node.Active = true + node.ActiveAt = time.Now() + err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node) + if err != nil { + svr.Warnf("failed to update node status on subscribe: %v", err) + // Continue anyway - monitor will fix it + } else { + svr.Debugf("marked node[%s] as active on subscription (status: %s -> %s)", + request.NodeKey, oldStatus, node.Status) + } + // subscribe nodeServiceMutex.Lock() svr.subs[node.Id] = stream nodeServiceMutex.Unlock() - // TODO: send notification + // send notification if status changed + if utils.IsPro() && oldStatus != node.Status { + go notification.GetNotificationService().SendNodeNotification(node) + } // wait for stream to close <-stream.Context().Done()