diff --git a/core/grpc/server/node_service_server.go b/core/grpc/server/node_service_server.go index be53406e..d7e512b8 100644 --- a/core/grpc/server/node_service_server.go +++ b/core/grpc/server/node_service_server.go @@ -2,6 +2,9 @@ package server import ( "context" + "sync" + "time" + "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/errors" "github.com/crawlab-team/crawlab/core/interfaces" @@ -15,8 +18,6 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" - "sync" - "time" ) var nodeServiceMutex = sync.Mutex{} @@ -122,12 +123,31 @@ func (svr NodeServiceServer) Subscribe(request *grpc.NodeServiceSubscribeRequest return err } + // Optimistically mark node as active when subscription succeeds + // This provides fast recovery after reconnection while master monitor + // will verify health and revert if the connection is unstable + oldStatus := node.Status + node.Status = constants.NodeStatusOnline + node.Active = true + node.ActiveAt = time.Now() + err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node) + if err != nil { + svr.Warnf("failed to update node status on subscribe: %v", err) + // Continue anyway - monitor will fix it + } else { + svr.Debugf("marked node[%s] as active on subscription (status: %s -> %s)", + request.NodeKey, oldStatus, node.Status) + } + // subscribe nodeServiceMutex.Lock() svr.subs[node.Id] = stream nodeServiceMutex.Unlock() - // TODO: send notification + // send notification if status changed + if utils.IsPro() && oldStatus != node.Status { + go notification.GetNotificationService().SendNodeNotification(node) + } // wait for stream to close <-stream.Context().Done()