Files
crawlab/core/grpc/server/node_service_server.go
Marvin Zhang 893cb3cb8a fix(grpc/server): mark node active on Subscribe and notify on status change
Optimistically mark node as online/active and persist ActiveAt when a node
subscribes, logging success or warning on failure. Send a node notification
in Pro mode if the status changed. Also tidy import ordering.
2025-10-22 22:03:16 +08:00

188 lines
5.4 KiB
Go

package server
import (
"context"
"sync"
"time"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/errors"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/crawlab-team/crawlab/core/models/service"
nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
"github.com/crawlab-team/crawlab/core/notification"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/grpc"
errors2 "github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
)
// nodeServiceMutex serializes access to the subs map of NodeServiceServer.
// It lives at package level, so it is shared by every copy of the
// value-receiver server.
var nodeServiceMutex sync.Mutex
// NodeServiceServer is the gRPC node service handler. Its methods let nodes
// register themselves, send heartbeats and open a subscription stream, and
// let other code look up the stream that belongs to a given node.
type NodeServiceServer struct {
// Embedded generated stub for the node service.
grpc.UnimplementedNodeServiceServer
// dependencies
// cfgSvc is the node configuration service, set by newNodeServiceServer.
cfgSvc interfaces.NodeConfigService
// internals
// subs maps a node id to the subscribe stream currently registered for it.
// It is read and written while holding nodeServiceMutex.
subs map[primitive.ObjectID]grpc.NodeService_SubscribeServer
// Embedded logger providing Infof/Warnf/Errorf/Debugf.
interfaces.Logger
}
// Register handles a registration request sent from a handler/worker node to
// the master.
//
// The node is looked up by req.NodeKey. An existing node is marked online and
// active with a fresh ActiveAt and written back; an unknown key results in a
// new, enabled node record built from the request. An empty node key or any
// database failure is reported through HandleError; on success the node is
// returned through HandleSuccessWithData.
func (svr NodeServiceServer) Register(_ context.Context, req *grpc.NodeServiceRegisterRequest) (res *grpc.Response, err error) {
	// a node key is mandatory
	if req.NodeKey == "" {
		return HandleError(errors.ErrorModelMissingRequiredData)
	}

	// look the node up by its key
	node, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": req.NodeKey}, nil)

	switch {
	case err == nil:
		// known node: refresh its liveness fields and persist them
		node.Status = constants.NodeStatusOnline
		node.Active = true
		node.ActiveAt = time.Now()
		if err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node); err != nil {
			return HandleError(err)
		}
		svr.Infof("updated worker[%s] in db. id: %s", req.NodeKey, node.Id.Hex())

	case errors2.Is(err, mongo.ErrNoDocuments):
		// unknown node: create a fresh record from the request
		node = &models.Node{
			Key:        req.NodeKey,
			Name:       req.NodeName,
			Status:     constants.NodeStatusOnline,
			Active:     true,
			ActiveAt:   time.Now(),
			Enabled:    true,
			MaxRunners: int(req.MaxRunners),
		}
		node.SetCreated(primitive.NilObjectID)
		node.SetUpdated(primitive.NilObjectID)
		if node.Id, err = service.NewModelService[models.Node]().InsertOne(*node); err != nil {
			return HandleError(err)
		}
		svr.Infof("added worker[%s] in db. id: %s", req.NodeKey, node.Id.Hex())

	default:
		// any other lookup failure is returned to the caller
		return HandleError(err)
	}

	svr.Infof("master registered worker[%s]", req.NodeKey)
	return HandleSuccessWithData(node)
}
// SendHeartbeat handles a heartbeat sent from a worker to the master.
//
// The node identified by req.NodeKey is marked online and active with a fresh
// ActiveAt and written back to the database. A missing node is reported as
// errors.ErrorNodeNotExists; other database failures are passed to HandleError
// unchanged. In Pro mode a node notification is dispatched asynchronously when
// the stored status differed from the new one.
func (svr NodeServiceServer) SendHeartbeat(_ context.Context, req *grpc.NodeServiceSendHeartbeatRequest) (res *grpc.Response, err error) {
	// look the node up by its key
	node, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": req.NodeKey}, nil)
	if err != nil {
		if !errors2.Is(err, mongo.ErrNoDocuments) {
			return HandleError(err)
		}
		return HandleError(errors.ErrorNodeNotExists)
	}

	// remember the status before it is overwritten
	previousStatus := node.Status

	// refresh liveness fields and persist them
	node.Status = constants.NodeStatusOnline
	node.Active = true
	node.ActiveAt = time.Now()
	if err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node); err != nil {
		return HandleError(err)
	}

	// notify only in Pro mode and only when the status actually changed
	if utils.IsPro() && previousStatus != node.Status {
		go notification.GetNotificationService().SendNodeNotification(node)
	}

	return HandleSuccessWithData(node)
}
// Subscribe registers the server-side stream of a node and blocks until that
// stream's context is done.
//
// The node is looked up by request.NodeKey; a lookup failure is logged and
// returned. The node is then optimistically marked online/active (a failure to
// persist this is only logged), the stream is stored in svr.subs under the
// node id, and, in Pro mode, a node notification is sent asynchronously if the
// status changed. When the stream's context is done the subscription is
// removed and nil is returned.
func (svr NodeServiceServer) Subscribe(request *grpc.NodeServiceSubscribeRequest, stream grpc.NodeService_SubscribeServer) (err error) {
	svr.Infof("master received subscribe request from node[%s]", request.NodeKey)

	// find in db
	node, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": request.NodeKey}, nil)
	if err != nil {
		svr.Errorf("error getting node: %v", err)
		return err
	}

	// Optimistically mark node as active when subscription succeeds
	// This provides fast recovery after reconnection while master monitor
	// will verify health and revert if the connection is unstable
	oldStatus := node.Status
	node.Status = constants.NodeStatusOnline
	node.Active = true
	node.ActiveAt = time.Now()
	// The update error is deliberately kept out of the named result: it is
	// non-fatal and must not leak into the value returned to gRPC.
	if updateErr := service.NewModelService[models.Node]().ReplaceById(node.Id, *node); updateErr != nil {
		svr.Warnf("failed to update node status on subscribe: %v", updateErr)
		// Continue anyway - monitor will fix it
	} else {
		svr.Debugf("marked node[%s] as active on subscription (status: %s -> %s)",
			request.NodeKey, oldStatus, node.Status)
	}

	// subscribe (replaces any stream previously registered for this node)
	nodeServiceMutex.Lock()
	svr.subs[node.Id] = stream
	nodeServiceMutex.Unlock()

	// send notification if status changed
	if utils.IsPro() && oldStatus != node.Status {
		go notification.GetNotificationService().SendNodeNotification(node)
	}

	// wait for stream to close
	<-stream.Context().Done()

	// unsubscribe, but only if the registered stream is still ours. If the
	// node reconnected before this (older) stream finished, a newer Subscribe
	// call has already replaced the entry and deleting it here would drop the
	// live subscription.
	// NOTE(review): the equality check assumes the stream's dynamic type is
	// comparable (presumably a pointer created by the generated gRPC handler);
	// confirm if custom stream wrappers are passed in.
	nodeServiceMutex.Lock()
	if current, ok := svr.subs[node.Id]; ok && current == stream {
		delete(svr.subs, node.Id)
	}
	nodeServiceMutex.Unlock()

	svr.Infof("master unsubscribed from node[%s]", request.NodeKey)
	return nil
}
// GetSubscribeStream looks up the subscribe stream registered for nodeId.
// ok is false when no stream is stored for that node. The lookup is performed
// while holding nodeServiceMutex.
func (svr NodeServiceServer) GetSubscribeStream(nodeId primitive.ObjectID) (stream grpc.NodeService_SubscribeServer, ok bool) {
	nodeServiceMutex.Lock()
	stream, ok = svr.subs[nodeId]
	nodeServiceMutex.Unlock()

	return stream, ok
}
// newNodeServiceServer builds a NodeServiceServer with the node config
// service, an empty subscription map and a logger named
// "GrpcNodeServiceServer".
func newNodeServiceServer() *NodeServiceServer {
	svr := new(NodeServiceServer)
	svr.cfgSvc = nodeconfig.GetNodeConfigService()
	svr.subs = make(map[primitive.ObjectID]grpc.NodeService_SubscribeServer)
	svr.Logger = utils.NewLogger("GrpcNodeServiceServer")
	return svr
}
// Package-level singleton state for the node service server.
var (
	// nodeSvr holds the shared instance once it has been created.
	nodeSvr *NodeServiceServer
	// nodeSvrOnce guarantees nodeSvr is initialized at most once.
	nodeSvrOnce sync.Once
)

// GetNodeServiceServer returns the shared NodeServiceServer, creating it via
// newNodeServiceServer on the first call.
func GetNodeServiceServer() *NodeServiceServer {
	nodeSvrOnce.Do(func() { nodeSvr = newNodeServiceServer() })
	return nodeSvr
}