From c6834e99645cc9c4353323ca68bf9ff09c987634 Mon Sep 17 00:00:00 2001
From: Marvin Zhang
Date: Wed, 17 Sep 2025 10:18:13 +0800
Subject: [PATCH] feat: enhance task reconciliation logic with improved status
 handling and error messaging

---
 .../service/task_reconciliation_service.go   | 338 ++++++++++++++++--
 1 file changed, 307 insertions(+), 31 deletions(-)

diff --git a/core/node/service/task_reconciliation_service.go b/core/node/service/task_reconciliation_service.go
index 066cbc5d..1f795278 100644
--- a/core/node/service/task_reconciliation_service.go
+++ b/core/node/service/task_reconciliation_service.go
@@ -1,7 +1,7 @@
 package service
 
 import (
-	"context"
+	"fmt"
 	"sync"
 	"time"
 
@@ -88,9 +88,13 @@ func (svc *TaskReconciliationService) HandleNodeReconnection(node *models.Node)
 		actualStatus, err := svc.GetActualTaskStatusFromWorker(node, &task)
 		if err != nil {
 			svc.Warnf("failed to get actual status for task[%s] from reconnected node[%s]: %v", task.Id.Hex(), node.Key, err)
-			// If we can't determine the actual status, mark as abnormal after reconnection failure
-			task.Status = constants.TaskStatusAbnormal
-			task.Error = "Could not reconcile task status after node reconnection"
+			// If we can't determine the actual status, keep the current status and add a note
+			// Don't assume abnormal - we simply don't have enough information
+			if task.Error == "" {
+				task.Error = "Unable to verify task status after node reconnection - status may be stale"
+			}
+			// Skip status update since we don't know the actual state
+			continue
 		} else {
 			// Update with actual status from worker
 			task.Status = actualStatus
@@ -117,45 +121,247 @@
 
 // GetActualTaskStatusFromWorker queries the worker node to get the actual status of a task
 func (svc *TaskReconciliationService) GetActualTaskStatusFromWorker(node *models.Node, task *models.Task) (status string, err error) {
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
-
+	// First, try to get the actual process status from the worker
+	actualProcessStatus, err := svc.queryProcessStatusFromWorker(node, task)
+	if err != nil {
+		svc.Warnf("failed to query process status from worker node[%s] for task[%s]: %v", node.Key, task.Id.Hex(), err)
+		// Fall back to heuristic detection
+		return svc.detectTaskStatusFromHeuristics(task)
+	}
+
+	// Synchronize task status with actual process status
+	return svc.syncTaskStatusWithProcess(task, actualProcessStatus)
+}
+
+// queryProcessStatusFromWorker directly queries the worker node for the actual process status
+func (svc *TaskReconciliationService) queryProcessStatusFromWorker(node *models.Node, task *models.Task) (processStatus string, err error) {
 	// Check if there's an active stream for this task
 	_, hasActiveStream := svc.server.TaskSvr.GetSubscribeStream(task.Id)
 
 	// Check if the node is still connected via subscription
 	nodeStream, nodeConnected := svc.server.NodeSvr.GetSubscribeStream(node.Id)
 	if !nodeConnected {
-		svc.Warnf("node[%s] is not connected, using fallback detection for task[%s]", node.Key, task.Id.Hex())
-		return svc.inferTaskStatusFromStream(task.Id, hasActiveStream), nil
+		return svc.inferProcessStatusFromLocalState(task, hasActiveStream)
 	}
 
-	// Try to get more accurate status by checking recent task activity
-	actualStatus, err := svc.detectTaskStatusFromActivity(task, hasActiveStream)
-	if err != nil {
-		svc.Warnf("failed to detect task status from activity for task[%s]: %v", task.Id.Hex(), err)
-		return svc.inferTaskStatusFromStream(task.Id, hasActiveStream), nil
+	// Query the worker for actual process status
+	if nodeStream != nil && task.Pid > 0 {
+		// Send a process status query to the worker
+		actualStatus, err := svc.requestProcessStatusFromWorker(nodeStream, task, 5*time.Second)
+		if err != nil {
+			svc.Warnf("failed to get process status from worker: %v", err)
+			return svc.inferProcessStatusFromLocalState(task, hasActiveStream)
+		}
+		return actualStatus, nil
 	}
 
-	// Ping the node to verify it's responsive
-	if nodeStream != nil {
-		select {
-		case <-ctx.Done():
-			svc.Warnf("timeout while pinging node[%s] for task[%s]", node.Key, task.Id.Hex())
-			return svc.inferTaskStatusFromStream(task.Id, hasActiveStream), nil
-		default:
-			// Send a heartbeat to verify node responsiveness
-			err := nodeStream.Send(&grpc.NodeServiceSubscribeResponse{
-				Code: grpc.NodeServiceSubscribeCode_HEARTBEAT,
-			})
-			if err != nil {
-				svc.Warnf("failed to ping node[%s] for task status check: %v", node.Key, err)
-				return svc.inferTaskStatusFromStream(task.Id, hasActiveStream), nil
-			}
-		}
-	}
-
-	return actualStatus, nil
+	return svc.inferProcessStatusFromLocalState(task, hasActiveStream)
+}
+
+// requestProcessStatusFromWorker sends a status query request to the worker node
+func (svc *TaskReconciliationService) requestProcessStatusFromWorker(nodeStream grpc.NodeService_SubscribeServer, task *models.Task, timeout time.Duration) (string, error) {
+	// TODO: Implement actual gRPC call to worker to check process status
+	// This would require extending the gRPC protocol to support process status queries
+	// For now, we'll use the existing heuristics but with improved logic
+
+	// As a placeholder, we'll use the improved heuristic detection
+	_, hasActiveStream := svc.server.TaskSvr.GetSubscribeStream(task.Id)
+	return svc.inferProcessStatusFromLocalState(task, hasActiveStream)
+}
+
+// inferProcessStatusFromLocalState uses local information to infer process status
+func (svc *TaskReconciliationService) inferProcessStatusFromLocalState(task *models.Task, hasActiveStream bool) (string, error) {
+	// Check if task has been updated recently (within last 30 seconds)
+	isRecentlyUpdated := time.Since(task.UpdatedAt) < 30*time.Second
+
+	switch {
+	case hasActiveStream && isRecentlyUpdated:
+		// Active stream and recent updates = likely running
+		return constants.TaskStatusRunning, nil
+
+	case !hasActiveStream && isRecentlyUpdated:
+		// No stream but recent updates = likely just finished
+		if task.Error != "" {
+			return constants.TaskStatusError, nil
+		}
+		return constants.TaskStatusFinished, nil
+
+	case !hasActiveStream && !isRecentlyUpdated:
+		// No stream and stale = process likely finished or failed
+		return svc.checkFinalTaskState(task), nil
+
+	case hasActiveStream && !isRecentlyUpdated:
+		// Stream exists but no recent updates - could be a long-running task
+		// Don't assume abnormal - the task might be legitimately running without frequent updates
+		return constants.TaskStatusRunning, nil
+
+	default:
+		// Fallback
+		return constants.TaskStatusError, nil
+	}
+}
+
+// checkFinalTaskState determines the final state of a task without active streams
+func (svc *TaskReconciliationService) checkFinalTaskState(task *models.Task) string {
+	// Check the current task status and error state
+	switch task.Status {
+	case constants.TaskStatusFinished, constants.TaskStatusError, constants.TaskStatusCancelled, constants.TaskStatusAbnormal:
+		// Already in a final state
+		return task.Status
+	case constants.TaskStatusRunning:
+		// Running status but no stream = process likely completed
+		if task.Error != "" {
+			return constants.TaskStatusError
+		}
+		return constants.TaskStatusFinished
+	case constants.TaskStatusPending, constants.TaskStatusAssigned:
+		// Never started running but lost connection
+		return constants.TaskStatusError
+	case constants.TaskStatusNodeDisconnected:
+		// Task is marked as disconnected - keep this status since we can't determine final state
+		// Don't assume abnormal until we can actually verify the process state
+		return constants.TaskStatusNodeDisconnected
+	default:
+		return constants.TaskStatusError
+	}
+}
+
+// syncTaskStatusWithProcess ensures task status matches the actual process status
+func (svc *TaskReconciliationService) syncTaskStatusWithProcess(task *models.Task, actualProcessStatus string) (string, error) {
+	// If the actual process status differs from the database status, we need to sync
+	if task.Status != actualProcessStatus {
+		svc.Infof("syncing task[%s] status from '%s' to '%s' based on actual process status",
+			task.Id.Hex(), task.Status, actualProcessStatus)
+
+		// Update the task status in the database to match reality
+		err := svc.updateTaskStatusReliably(task, actualProcessStatus)
+		if err != nil {
+			svc.Errorf("failed to sync task[%s] status: %v", task.Id.Hex(), err)
+			return task.Status, err // Return original status if sync fails
+		}
+	}
+
+	return actualProcessStatus, nil
+}
+
+// updateTaskStatusReliably updates task status with retry logic and validation
+func (svc *TaskReconciliationService) updateTaskStatusReliably(task *models.Task, newStatus string) error {
+	// Update task with the new status
+	task.Status = newStatus
+
+	// Add appropriate error message for certain status transitions
+	switch newStatus {
+	case constants.TaskStatusError:
+		if task.Error == "" {
+			task.Error = "Task status synchronized from actual process state"
+		}
+	case constants.TaskStatusFinished:
+		// Clear error message for successfully completed tasks
+		task.Error = ""
+	case constants.TaskStatusAbnormal:
+		if task.Error == "" {
+			task.Error = "Task marked as abnormal during status reconciliation"
+		}
+	case constants.TaskStatusNodeDisconnected:
+		// Don't modify error message for disconnected status - keep existing context
+		// The disconnect reason should already be in the error field
+	}
+
+	// Update with retry logic
+	return backoff.Retry(func() error {
+		return service.NewModelService[models.Task]().ReplaceById(task.Id, *task)
+	}, backoff.WithMaxRetries(backoff.NewConstantBackOff(500*time.Millisecond), 3))
+}
+
+// detectTaskStatusFromHeuristics provides fallback detection when worker communication fails
+func (svc *TaskReconciliationService) detectTaskStatusFromHeuristics(task *models.Task) (string, error) {
+	// Use improved heuristic detection
+	_, hasActiveStream := svc.server.TaskSvr.GetSubscribeStream(task.Id)
+	return svc.inferProcessStatusFromLocalState(task, hasActiveStream)
+}
+
+// StartPeriodicReconciliation starts a background service to periodically reconcile task status
+func (svc *TaskReconciliationService) StartPeriodicReconciliation() {
+	go svc.runPeriodicReconciliation()
+}
+
+// runPeriodicReconciliation periodically checks and reconciles task status with actual process status
+func (svc *TaskReconciliationService) runPeriodicReconciliation() {
+	ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
+	defer ticker.Stop()
+
+	for range ticker.C {
+		err := svc.reconcileRunningTasks()
+		if err != nil {
+			svc.Errorf("failed to reconcile running tasks: %v", err)
+		}
+	}
+}
+
+// reconcileRunningTasks finds all running tasks and reconciles their status with actual process status
+func (svc *TaskReconciliationService) reconcileRunningTasks() error {
+	// Find all tasks that might need reconciliation
+	query := bson.M{
+		"status": bson.M{
+			"$in": []string{
+				constants.TaskStatusRunning,
+				constants.TaskStatusNodeDisconnected,
+			},
+		},
+	}
+
+	tasks, err := service.NewModelService[models.Task]().GetMany(query, nil)
+	if err != nil {
+		return err
+	}
+
+	svc.Debugf("found %d tasks to reconcile", len(tasks))
+
+	for _, task := range tasks {
+		err := svc.reconcileTaskStatus(&task)
+		if err != nil {
+			svc.Errorf("failed to reconcile task[%s]: %v", task.Id.Hex(), err)
+		}
+	}
+
+	return nil
+}
+
+// reconcileTaskStatus reconciles a single task's status with its actual process status
+func (svc *TaskReconciliationService) reconcileTaskStatus(task *models.Task) error {
+	// Get the node for this task
+	node, err := service.NewModelService[models.Node]().GetById(task.NodeId)
+	if err != nil {
+		svc.Warnf("failed to get node[%s] for task[%s]: %v", task.NodeId.Hex(), task.Id.Hex(), err)
+		return err
+	}
+
+	// Get actual status from worker
+	actualStatus, err := svc.GetActualTaskStatusFromWorker(node, task)
+	if err != nil {
+		svc.Warnf("failed to get actual status for task[%s]: %v", task.Id.Hex(), err)
+		// Don't change the status if we can't determine the actual state
+		// This is more honest than making assumptions
+		return err
+	}
+
+	// If status changed, update it
+	if actualStatus != task.Status {
+		svc.Infof("reconciling task[%s] status from '%s' to '%s'", task.Id.Hex(), task.Status, actualStatus)
+		return svc.updateTaskStatusReliably(task, actualStatus)
+	}
+
+	return nil
+}
+
+// ForceReconcileTask forces reconciliation of a specific task (useful for manual intervention)
+func (svc *TaskReconciliationService) ForceReconcileTask(taskId primitive.ObjectID) error {
+	task, err := service.NewModelService[models.Task]().GetById(taskId)
+	if err != nil {
+		return fmt.Errorf("failed to get task[%s]: %w", taskId.Hex(), err)
+	}
+
+	return svc.reconcileTaskStatus(task)
 }
 
 // detectTaskStatusFromActivity analyzes task activity to determine its actual status
@@ -193,14 +399,25 @@ func (svc *TaskReconciliationService) checkTaskCompletion(task *models.Task) str
 	switch latestTask.Status {
 	case constants.TaskStatusFinished, constants.TaskStatusError, constants.TaskStatusCancelled:
 		return latestTask.Status
+	case constants.TaskStatusAbnormal:
+		// Abnormal status is also final - keep it
+		return latestTask.Status
 	case constants.TaskStatusRunning:
-		// Task still shows as running but has no active stream - likely finished
+		// Task shows as running but has no active stream - need to determine actual status
 		if latestTask.Error != "" {
 			return constants.TaskStatusError
 		}
 		return constants.TaskStatusFinished
+	case constants.TaskStatusPending, constants.TaskStatusAssigned:
+		// Tasks that never started running but lost connection - mark as error
+		return constants.TaskStatusError
+	case constants.TaskStatusNodeDisconnected:
+		// Node disconnected status should be handled by reconnection logic
+		// Keep the disconnected status since we don't know the actual final state
+		return constants.TaskStatusNodeDisconnected
 	default:
-		// Unknown or intermediate status
+		// Unknown status - mark as error
+		svc.Warnf("task[%s] has unknown status: %s", task.Id.Hex(), latestTask.Status)
 		return constants.TaskStatusError
 	}
 }
@@ -246,6 +463,65 @@ func NewTaskReconciliationService(server *server.GrpcServer) *TaskReconciliation
 	}
 }
 
+// ValidateTaskStatus ensures task status is consistent with actual process state
+func (svc *TaskReconciliationService) ValidateTaskStatus(task *models.Task) error {
+	if task == nil {
+		return fmt.Errorf("task is nil")
+	}
+
+	// Get the node for this task
+	node, err := service.NewModelService[models.Node]().GetById(task.NodeId)
+	if err != nil {
+		return fmt.Errorf("failed to get node for task: %w", err)
+	}
+
+	// Get actual status
+	actualStatus, err := svc.GetActualTaskStatusFromWorker(node, task)
+	if err != nil {
+		return fmt.Errorf("failed to get actual task status: %w", err)
+	}
+
+	// If status is inconsistent, log it and optionally fix it
+	if actualStatus != task.Status {
+		svc.Warnf("task[%s] status inconsistency detected: database='%s', actual='%s'",
+			task.Id.Hex(), task.Status, actualStatus)
+
+		// Optionally auto-correct the status
+		return svc.updateTaskStatusReliably(task, actualStatus)
+	}
+
+	return nil
+}
+
+// IsTaskStatusFinal returns true if the task status represents a final state
+func (svc *TaskReconciliationService) IsTaskStatusFinal(status string) bool {
+	switch status {
+	case constants.TaskStatusFinished, constants.TaskStatusError, constants.TaskStatusCancelled, constants.TaskStatusAbnormal:
+		return true
+	default:
+		return false
+	}
+}
+
+// ShouldReconcileTask determines if a task needs status reconciliation
+func (svc *TaskReconciliationService) ShouldReconcileTask(task *models.Task) bool {
+	// Don't reconcile tasks in final states unless they're very old and might be stuck
+	if svc.IsTaskStatusFinal(task.Status) {
+		return false
+	}
+
+	// Always reconcile running or disconnected tasks
+	switch task.Status {
+	case constants.TaskStatusRunning, constants.TaskStatusNodeDisconnected:
+		return true
+	case constants.TaskStatusPending, constants.TaskStatusAssigned:
+		// Reconcile if task has been pending/assigned for too long
+		return time.Since(task.CreatedAt) > 10*time.Minute
+	default:
+		return false
+	}
+}
+
 // Singleton pattern
 var taskReconciliationService *TaskReconciliationService
 var taskReconciliationServiceOnce sync.Once
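
Note (reviewer sketch, not part of the patch above): requestProcessStatusFromWorker is still a placeholder - the TODO defers the real gRPC process-status query until the protocol supports it. Once such a query exists, the worker side could answer it with a plain PID liveness probe. The snippet below is a minimal, hypothetical sketch using only the Go standard library; the package name and the isProcessAlive helper are illustrative assumptions, not existing Crawlab APIs, and the check is Unix-only.

package worker

import (
	"errors"
	"os"
	"syscall"
)

// isProcessAlive reports whether a process with the given PID is still running.
// Signal 0 delivers no signal; it only checks that the process exists and can be signalled.
func isProcessAlive(pid int) bool {
	if pid <= 0 {
		return false
	}
	proc, err := os.FindProcess(pid) // on Unix this never fails, even for dead PIDs
	if err != nil {
		return false
	}
	err = proc.Signal(syscall.Signal(0))
	if err == nil {
		return true
	}
	// EPERM means the process exists but belongs to another user.
	return errors.Is(err, syscall.EPERM)
}

A worker-side handler could map a true result to the task's current running status and a false result to a finished or errored state before replying, which would let queryProcessStatusFromWorker rely on real process data instead of the update-time heuristics in inferProcessStatusFromLocalState.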