feat: Update gRPC service definitions and implement CheckProcess method

- Downgraded protoc-gen-go-grpc and protoc versions for compatibility.
- Added CheckProcess method to TaskService with corresponding request and response types.
- Updated Subscribe and Connect methods to use new generic client stream types.
- Refactored server and client implementations for Subscribe and Connect methods.
- Ensured backward compatibility by maintaining existing method signatures where applicable.
- Added necessary handler for CheckProcess in the service descriptor.
This commit is contained in:
Marvin Zhang
2025-09-17 10:37:03 +08:00
parent c6834e9964
commit 8c2c23d9b6
17 changed files with 2046 additions and 600 deletions

View File

@@ -73,6 +73,9 @@ func (svc *MasterService) Start() {
// start monitoring worker nodes
go svc.startMonitoring()
// start task reconciliation service for periodic status checks
go svc.taskReconciliationSvc.StartPeriodicReconciliation()
// start task handler
go svc.taskHandlerSvc.Start()

View File

@@ -1,6 +1,7 @@
package service
import (
"context"
"fmt"
"sync"
"time"
@@ -160,13 +161,32 @@ func (svc *TaskReconciliationService) queryProcessStatusFromWorker(node *models.
// requestProcessStatusFromWorker sends a status query request to the worker node
func (svc *TaskReconciliationService) requestProcessStatusFromWorker(nodeStream grpc.NodeService_SubscribeServer, task *models.Task, timeout time.Duration) (string, error) {
// TODO: Implement actual gRPC call to worker to check process status
// This would require extending the gRPC protocol to support process status queries
// For now, we'll use the existing heuristics but with improved logic
// Check if task has a valid PID
if task.Pid <= 0 {
return svc.inferProcessStatusFromLocalState(task, false)
}
// As a placeholder, we'll use the improved heuristic detection
_, hasActiveStream := svc.server.TaskSvr.GetSubscribeStream(task.Id)
return svc.inferProcessStatusFromLocalState(task, hasActiveStream)
// Get the node for this task
node, err := service.NewModelService[models.Node]().GetById(task.NodeId)
if err != nil {
svc.Warnf("failed to get node[%s] for task[%s]: %v", task.NodeId.Hex(), task.Id.Hex(), err)
_, hasActiveStream := svc.server.TaskSvr.GetSubscribeStream(task.Id)
return svc.inferProcessStatusFromLocalState(task, hasActiveStream)
}
// Attempt to query worker directly (future implementation)
// This will return an error until worker discovery infrastructure is built
workerStatus, err := svc.queryWorkerProcessStatus(node, task, timeout)
if err != nil {
svc.Debugf("direct worker query not available, falling back to heuristics: %v", err)
// Fallback to heuristic detection
_, hasActiveStream := svc.server.TaskSvr.GetSubscribeStream(task.Id)
return svc.inferProcessStatusFromLocalState(task, hasActiveStream)
}
svc.Infof("successfully queried worker process status for task[%s]: %s", task.Id.Hex(), workerStatus)
return workerStatus, nil
}
// inferProcessStatusFromLocalState uses local information to infer process status
@@ -226,6 +246,92 @@ func (svc *TaskReconciliationService) checkFinalTaskState(task *models.Task) str
}
}
// mapProcessStatusToTaskStatus converts gRPC process status to task status
func (svc *TaskReconciliationService) mapProcessStatusToTaskStatus(processStatus grpc.ProcessStatus, exitCode int32, task *models.Task) string {
switch processStatus {
case grpc.ProcessStatus_PROCESS_RUNNING:
return constants.TaskStatusRunning
case grpc.ProcessStatus_PROCESS_FINISHED:
// Process finished - check exit code to determine success or failure
if exitCode == 0 {
return constants.TaskStatusFinished
}
return constants.TaskStatusError
case grpc.ProcessStatus_PROCESS_ERROR:
return constants.TaskStatusError
case grpc.ProcessStatus_PROCESS_NOT_FOUND:
// Process not found - could mean it finished and was cleaned up
// Check if task was recently active to determine likely outcome
if time.Since(task.UpdatedAt) < 5*time.Minute {
// Recently active task with missing process - likely completed
if task.Error != "" {
return constants.TaskStatusError
}
return constants.TaskStatusFinished
}
// Old task with missing process - probably error
return constants.TaskStatusError
case grpc.ProcessStatus_PROCESS_ZOMBIE:
// Zombie process indicates abnormal termination
return constants.TaskStatusError
case grpc.ProcessStatus_PROCESS_UNKNOWN:
fallthrough
default:
// Unknown status - use heuristic detection
_, hasActiveStream := svc.server.TaskSvr.GetSubscribeStream(task.Id)
status, _ := svc.inferProcessStatusFromLocalState(task, hasActiveStream)
return status
}
}
// createWorkerClient creates a gRPC client connection to a worker node
// This is a placeholder for future implementation when worker discovery is available
func (svc *TaskReconciliationService) createWorkerClient(node *models.Node) (grpc.TaskServiceClient, error) {
// TODO: Implement worker node discovery and connection
// This would require:
// 1. Worker nodes to register their gRPC server endpoints
// 2. A service discovery mechanism
// 3. Connection pooling and management
//
// For now, return an error to indicate this functionality is not yet available
return nil, fmt.Errorf("direct worker client connections not yet implemented - need worker discovery infrastructure")
}
// queryWorkerProcessStatus attempts to query a worker node directly for process status
// This demonstrates the intended future architecture for worker communication
func (svc *TaskReconciliationService) queryWorkerProcessStatus(node *models.Node, task *models.Task, timeout time.Duration) (string, error) {
// This is the intended implementation once worker discovery is available
// 1. Create gRPC client to worker
client, err := svc.createWorkerClient(node)
if err != nil {
return "", fmt.Errorf("failed to create worker client: %w", err)
}
// 2. Create timeout context
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// 3. Send process status request
req := &grpc.TaskServiceCheckProcessRequest{
TaskId: task.Id.Hex(),
Pid: int32(task.Pid),
}
resp, err := client.CheckProcess(ctx, req)
if err != nil {
return "", fmt.Errorf("worker process status query failed: %w", err)
}
// 4. Convert process status to task status
taskStatus := svc.mapProcessStatusToTaskStatus(resp.Status, resp.ExitCode, task)
svc.Infof("worker reported process status for task[%s]: process_status=%s, exit_code=%d, mapped_to=%s",
task.Id.Hex(), resp.Status.String(), resp.ExitCode, taskStatus)
return taskStatus, nil
}
// syncTaskStatusWithProcess ensures task status matches the actual process status
func (svc *TaskReconciliationService) syncTaskStatusWithProcess(task *models.Task, actualProcessStatus string) (string, error) {
// If the actual process status differs from the database status, we need to sync
@@ -528,7 +634,9 @@ var taskReconciliationServiceOnce sync.Once
func GetTaskReconciliationService() *TaskReconciliationService {
taskReconciliationServiceOnce.Do(func() {
taskReconciliationService = NewTaskReconciliationService(nil) // Will be set by the master service
// Get the server from gRPC server singleton
grpcServer := server.GetGrpcServer()
taskReconciliationService = NewTaskReconciliationService(grpcServer)
})
return taskReconciliationService
}