From 851097dc59c1eaf0a729085f5a660554fe9b45b9 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Mon, 27 Oct 2025 16:26:32 +0800 Subject: [PATCH] fix(node/service): implement worker client wrapper using local TaskServiceServer Implement createWorkerClient to return a workerTaskClient when a node has an active gRPC stream. Add workerTaskClient that wraps the server.TaskServiceServer, stubs other client RPCs and forwards CheckProcess calls directly to server.CheckProcess for querying worker process status. --- .../service/task_reconciliation_service.go | 51 ++++++++++++++++++- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/core/node/service/task_reconciliation_service.go b/core/node/service/task_reconciliation_service.go index 07eecbdb..683d213a 100644 --- a/core/node/service/task_reconciliation_service.go +++ b/core/node/service/task_reconciliation_service.go @@ -18,6 +18,7 @@ import ( "github.com/crawlab-team/crawlab/grpc" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" + grpc2 "google.golang.org/grpc" ) const ( @@ -356,8 +357,54 @@ func (svc *TaskReconciliationService) mapProcessStatusToTaskStatus(processStatus } func (svc *TaskReconciliationService) createWorkerClient(node *models.Node) (grpc.TaskServiceClient, error) { - // TODO: Implement worker discovery and gRPC client creation - return nil, fmt.Errorf("worker client not yet implemented - need service discovery") + // Check if we have an active gRPC stream to this node + // This indicates the node is connected and can receive requests + _, hasStream := svc.server.NodeSvr.GetSubscribeStream(node.Id) + if !hasStream { + return nil, fmt.Errorf("node[%s] not connected via gRPC stream", node.Key) + } + + // Use the existing gRPC server's task service server + // The master node has a TaskServiceServer that can handle CheckProcess requests + // We'll use it through the server's registered service + taskClient := &workerTaskClient{ + server: svc.server.TaskSvr, + nodeId: node.Id, + logger: svc.Logger, + } + + return taskClient, nil +} + +// workerTaskClient wraps the TaskServiceServer to provide TaskServiceClient interface +// This allows the reconciliation service to query worker process status through the gRPC server +type workerTaskClient struct { + server *server.TaskServiceServer + nodeId primitive.ObjectID + logger interfaces.Logger +} + +func (c *workerTaskClient) Subscribe(ctx context.Context, in *grpc.TaskServiceSubscribeRequest, opts ...grpc2.CallOption) (grpc2.ServerStreamingClient[grpc.TaskServiceSubscribeResponse], error) { + return nil, fmt.Errorf("Subscribe not implemented for worker task client") +} + +func (c *workerTaskClient) Connect(ctx context.Context, opts ...grpc2.CallOption) (grpc2.BidiStreamingClient[grpc.TaskServiceConnectRequest, grpc.TaskServiceConnectResponse], error) { + return nil, fmt.Errorf("Connect not implemented for worker task client") +} + +func (c *workerTaskClient) FetchTask(ctx context.Context, in *grpc.TaskServiceFetchTaskRequest, opts ...grpc2.CallOption) (*grpc.TaskServiceFetchTaskResponse, error) { + return nil, fmt.Errorf("FetchTask not implemented for worker task client") +} + +func (c *workerTaskClient) SendNotification(ctx context.Context, in *grpc.TaskServiceSendNotificationRequest, opts ...grpc2.CallOption) (*grpc.TaskServiceSendNotificationResponse, error) { + return nil, fmt.Errorf("SendNotification not implemented for worker task client") +} + +func (c *workerTaskClient) CheckProcess(ctx context.Context, in *grpc.TaskServiceCheckProcessRequest, opts ...grpc2.CallOption) (*grpc.TaskServiceCheckProcessResponse, error) { + // Call the server's CheckProcess method directly + // This works because all gRPC requests from workers go through the same server + c.logger.Debugf("checking process for task[%s] on node[%s]", in.TaskId, c.nodeId.Hex()) + return c.server.CheckProcess(ctx, in) } // ============================================================================