From 49165b21652522a3fc74a9dd61cfd95aa9ca8179 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Mon, 20 Oct 2025 10:54:32 +0800 Subject: [PATCH] refactor(node): reorganize task reconciliation, prioritize worker cache, add periodic cleanup - Move and document reconciliation constants and add sectioned organization/comments. - Split large monolithic logic into smaller functions: - reconcileDisconnectedTasks / reconcileDisconnectedTask - reconcileAbandonedAssignedTasks - reconcileStalePendingTasks / handleStalePendingTask - getActualTaskStatus / getStatusFromWorkerCache / triggerWorkerStatusSync - queryProcessStatus / requestProcessStatusFromWorker / mapProcessStatusToTaskStatus - findTasksByStatus / markTaskDisconnected / findAvailableNodeForTask - updateTaskStatus / saveTask / shouldMarkTaskAbnormal / markTaskAbnormal - Add periodic background workers: - StartPeriodicReconciliation -> runPeriodicReconciliation to reconcile running/disconnected tasks - runPeriodicAssignedTaskCleanup -> cleanupStuckAssignedTasks to detect and recover stuck assigned tasks - Prioritize worker-side cached status and attempt sync from task runner before querying worker processes. - Introduce a placeholder createWorkerClient for future gRPC worker discovery/invocation. - Replace ad-hoc DB updates with saveTask using retry/backoff and centralize status update logic. - Improve logging and error messages, and tighten conditions for marking tasks abnormal. This refactor clarifies responsibilities, improves reliability of status updates, and prepares the codebase for future worker gRPC integration. --- .../service/task_reconciliation_service.go | 856 ++++++++++-------- 1 file changed, 455 insertions(+), 401 deletions(-) diff --git a/core/node/service/task_reconciliation_service.go b/core/node/service/task_reconciliation_service.go index 61f82e4a..07eecbdb 100644 --- a/core/node/service/task_reconciliation_service.go +++ b/core/node/service/task_reconciliation_service.go @@ -20,316 +20,295 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" ) -// TaskReconciliationService handles task status reconciliation for node disconnection scenarios +const ( + staleReconciliationThreshold = 15 * time.Minute + assignedTaskTimeout = 10 * time.Minute + pendingTaskTimeout = 30 * time.Minute + reconciliationInterval = 30 * time.Second + assignedCleanupInterval = 2 * time.Minute +) + +// TaskReconciliationService handles task status reconciliation for node disconnection scenarios. +// It ensures task statuses remain accurate even when nodes go offline/online unexpectedly. type TaskReconciliationService struct { server *server.GrpcServer - taskHandlerSvc *handler.Service // access to task handlers and their status caches + taskHandlerSvc *handler.Service interfaces.Logger } -const staleReconciliationThreshold = 15 * time.Minute +// ============================================================================ +// Section 1: Public API - Node Lifecycle Events +// ============================================================================ -// HandleTasksForOfflineNode updates all running tasks on an offline node to abnormal status +// HandleTasksForOfflineNode marks all running tasks as disconnected when node goes offline func (svc *TaskReconciliationService) HandleTasksForOfflineNode(node *models.Node) { - // Find all running tasks on the offline node - query := bson.M{ - "node_id": node.Id, - "status": constants.TaskStatusRunning, - } - - runningTasks, err := service.NewModelService[models.Task]().GetMany(query, nil) - if err != nil { - svc.Errorf("failed to get running tasks for offline node[%s]: %v", node.Key, err) - return - } - + runningTasks := svc.findTasksByStatus(node.Id, constants.TaskStatusRunning) if len(runningTasks) == 0 { svc.Debugf("no running tasks found for offline node[%s]", node.Key) return } - svc.Infof("updating %d running tasks to abnormal status for offline node[%s]", len(runningTasks), node.Key) - - // Update each task status to node_disconnected (recoverable) + svc.Infof("marking %d running tasks as disconnected for offline node[%s]", len(runningTasks), node.Key) for _, task := range runningTasks { - task.Status = constants.TaskStatusNodeDisconnected - task.Error = "Task temporarily disconnected due to worker node offline" - task.SetUpdated(primitive.NilObjectID) - - // Update the task in database - err := backoff.Retry(func() error { - return service.NewModelService[models.Task]().ReplaceById(task.Id, task) - }, backoff.WithMaxRetries(backoff.NewConstantBackOff(500*time.Millisecond), 3)) - - if err != nil { - svc.Errorf("failed to update task[%s] status for offline node[%s]: %v", task.Id.Hex(), node.Key, err) - } else { - svc.Debugf("updated task[%s] status to abnormal for offline node[%s]", task.Id.Hex(), node.Key) - } + svc.markTaskDisconnected(&task, node.Key) } } -// triggerWorkerStatusSync triggers synchronization of cached status from worker to database -func (svc *TaskReconciliationService) triggerWorkerStatusSync(task *models.Task) error { - // Check if we have access to task handler service (only on worker nodes) - if svc.taskHandlerSvc == nil { - return fmt.Errorf("task handler service not available - not on worker node") +// HandleNodeReconnection reconciles tasks when node comes back online +func (svc *TaskReconciliationService) HandleNodeReconnection(node *models.Node) { + svc.reconcileDisconnectedTasks(node) + svc.reconcileAbandonedAssignedTasks(node) + svc.reconcileStalePendingTasks(node) +} + +// StartPeriodicReconciliation starts background goroutines for periodic task reconciliation +func (svc *TaskReconciliationService) StartPeriodicReconciliation() { + go svc.runPeriodicReconciliation() + go svc.runPeriodicAssignedTaskCleanup() +} + +// ForceReconcileTask forces reconciliation of a specific task (for manual intervention) +func (svc *TaskReconciliationService) ForceReconcileTask(taskId primitive.ObjectID) error { + task, err := service.NewModelService[models.Task]().GetById(taskId) + if err != nil { + return fmt.Errorf("failed to get task[%s]: %w", taskId.Hex(), err) + } + return svc.reconcileTaskStatus(task) +} + +// ValidateTaskStatus ensures task status is consistent with actual process state +func (svc *TaskReconciliationService) ValidateTaskStatus(task *models.Task) error { + if task == nil { + return fmt.Errorf("task is nil") } - // Get the task runner for this task - taskRunner := svc.taskHandlerSvc.GetTaskRunner(task.Id) - if taskRunner == nil { - return fmt.Errorf("no active task runner found for task %s", task.Id.Hex()) + node, err := service.NewModelService[models.Node]().GetById(task.NodeId) + if err != nil { + return fmt.Errorf("failed to get node for task: %w", err) } - // Cast to concrete Runner type to access status cache methods - runner, ok := taskRunner.(*handler.Runner) - if !ok { - return fmt.Errorf("task runner is not of expected type for task %s", task.Id.Hex()) + actualStatus, err := svc.getActualTaskStatus(node, task) + if err != nil { + return fmt.Errorf("failed to get actual task status: %w", err) } - // Trigger sync of pending status updates - if err := runner.SyncPendingStatusUpdates(); err != nil { - return fmt.Errorf("failed to sync pending status updates: %w", err) + if actualStatus != task.Status { + svc.Warnf("task[%s] status inconsistency: database='%s', actual='%s'", + task.Id.Hex(), task.Status, actualStatus) + return svc.updateTaskStatus(task, actualStatus) } - svc.Infof("successfully triggered status sync for task[%s]", task.Id.Hex()) return nil } -// HandleNodeReconnection reconciles tasks that were marked as disconnected when the node comes back online -// Now leverages worker-side status cache for more accurate reconciliation -func (svc *TaskReconciliationService) HandleNodeReconnection(node *models.Node) { - // Find all disconnected tasks on this node - query := bson.M{ - "node_id": node.Id, - "status": constants.TaskStatusNodeDisconnected, +// ============================================================================ +// Section 2: Reconciliation Logic - Node Reconnection Handlers +// ============================================================================ + +func (svc *TaskReconciliationService) reconcileDisconnectedTasks(node *models.Node) { + tasks := svc.findTasksByStatus(node.Id, constants.TaskStatusNodeDisconnected) + if len(tasks) == 0 { + svc.Debugf("no disconnected tasks found for node[%s]", node.Key) + return } - disconnectedTasks, err := service.NewModelService[models.Task]().GetMany(query, nil) + svc.Infof("reconciling %d disconnected tasks for node[%s]", len(tasks), node.Key) + for _, task := range tasks { + svc.reconcileDisconnectedTask(node, &task) + } +} + +func (svc *TaskReconciliationService) reconcileDisconnectedTask(node *models.Node, task *models.Task) { + // Try to sync from worker cache first + _ = svc.triggerWorkerStatusSync(task) + + actualStatus, err := svc.getActualTaskStatus(node, task) if err != nil { - svc.Errorf("failed to get disconnected tasks for reconnected node[%s]: %v", node.Key, err) + svc.Warnf("cannot determine actual status for task[%s]: %v", task.Id.Hex(), err) + if task.Error == "" { + task.Error = "Unable to verify task status after node reconnection" + } return } - if len(disconnectedTasks) == 0 { - svc.Debugf("no disconnected tasks found for reconnected node[%s]", node.Key) + // Update with actual status + task.Status = actualStatus + if actualStatus == constants.TaskStatusFinished { + task.Error = "" + } else if actualStatus == constants.TaskStatusError { + task.Error = "Task encountered an error during node disconnection" + } + + if err := svc.saveTask(task); err != nil { + svc.Errorf("failed to save reconciled task[%s]: %v", task.Id.Hex(), err) + } else { + svc.Infof("reconciled task[%s] from disconnected to '%s'", task.Id.Hex(), actualStatus) + } +} + +func (svc *TaskReconciliationService) reconcileAbandonedAssignedTasks(node *models.Node) { + tasks := svc.findTasksByStatus(node.Id, constants.TaskStatusAssigned) + if len(tasks) == 0 { return } - svc.Infof("reconciling %d disconnected tasks for reconnected node[%s]", len(disconnectedTasks), node.Key) - - // For each disconnected task, try to get its actual status from the worker node - for _, task := range disconnectedTasks { - // First, try to trigger status sync from worker cache if we're on the worker node - if err := svc.triggerWorkerStatusSync(&task); err != nil { - svc.Debugf("could not trigger worker status sync for task[%s]: %v", task.Id.Hex(), err) - } - - actualStatus, err := svc.GetActualTaskStatusFromWorker(node, &task) - if err != nil { - svc.Warnf("failed to get actual status for task[%s] from reconnected node[%s]: %v", task.Id.Hex(), node.Key, err) - // If we can't determine the actual status, keep the current status and add a note - // Don't assume abnormal - we simply don't have enough information - if task.Error == "" { - task.Error = "Unable to verify task status after node reconnection - status may be stale" - } - // Skip status update since we don't know the actual state - continue - } else { - // Update with actual status from worker - task.Status = actualStatus - switch actualStatus { - case constants.TaskStatusFinished: - task.Error = "" // Clear error message for successfully completed tasks - case constants.TaskStatusError: - task.Error = "Task encountered an error during node disconnection" - } - } - - // Update the task in database - task.SetUpdated(primitive.NilObjectID) - err = backoff.Retry(func() error { - return service.NewModelService[models.Task]().ReplaceById(task.Id, task) - }, backoff.WithMaxRetries(backoff.NewConstantBackOff(500*time.Millisecond), 3)) - - if err != nil { - svc.Errorf("failed to update reconciled task[%s] status for node[%s]: %v", task.Id.Hex(), node.Key, err) - } else { - svc.Infof("reconciled task[%s] status from 'node_disconnected' to '%s' for node[%s]", task.Id.Hex(), task.Status, node.Key) + svc.Infof("resetting %d abandoned assigned tasks for node[%s]", len(tasks), node.Key) + for _, task := range tasks { + task.Status = constants.TaskStatusPending + task.Error = "Task reset from 'assigned' to 'pending' after node reconnection" + if err := svc.saveTask(&task); err != nil { + svc.Errorf("failed to reset assigned task[%s]: %v", task.Id.Hex(), err) } } } -// GetActualTaskStatusFromWorker queries the worker node to get the actual status of a task -// Now prioritizes worker-side status cache over heuristics -func (svc *TaskReconciliationService) GetActualTaskStatusFromWorker(node *models.Node, task *models.Task) (status string, err error) { - // First priority: get status from worker-side task runner cache - cachedStatus, err := svc.getStatusFromWorkerCache(task) - if err == nil && cachedStatus != "" { +func (svc *TaskReconciliationService) reconcileStalePendingTasks(node *models.Node) { + tasks := svc.findTasksByStatus(node.Id, constants.TaskStatusPending) + if len(tasks) == 0 { + return + } + + staleCount := 0 + for _, task := range tasks { + if time.Since(task.CreatedAt) > pendingTaskTimeout { + staleCount++ + svc.handleStalePendingTask(node, &task) + } + } + + if staleCount > 0 { + svc.Infof("handled %d stale pending tasks for node[%s]", staleCount, node.Key) + } +} + +func (svc *TaskReconciliationService) handleStalePendingTask(node *models.Node, task *models.Task) { + svc.Warnf("task[%s] pending for %v, attempting recovery", task.Id.Hex(), time.Since(task.CreatedAt)) + + originalNodeKey := node.Key + originalNodeId := task.NodeId + + availableNode, err := svc.findAvailableNodeForTask(task) + if err == nil && availableNode != nil { + // Re-assign to available node + task.NodeId = availableNode.Id + task.Error = fmt.Sprintf("Pending for %v on node %s (%s), re-assigned to %s", + time.Since(task.CreatedAt), originalNodeKey, originalNodeId.Hex(), availableNode.Key) + svc.Infof("re-assigned stale task[%s] to node[%s]", task.Id.Hex(), availableNode.Key) + } else { + // Mark as abnormal + task.Status = constants.TaskStatusAbnormal + task.Error = fmt.Sprintf("Pending for %v on node %s (%s), no available nodes for re-assignment", + time.Since(task.CreatedAt), originalNodeKey, originalNodeId.Hex()) + svc.Warnf("marked stale task[%s] as abnormal", task.Id.Hex()) + } + + if err := svc.saveTask(task); err != nil { + svc.Errorf("failed to handle stale pending task[%s]: %v", task.Id.Hex(), err) + } +} + +// ============================================================================ +// Section 3: Status Querying - Get Actual Task Status from Worker +// ============================================================================ + +func (svc *TaskReconciliationService) getActualTaskStatus(node *models.Node, task *models.Task) (string, error) { + // Priority 1: Worker cache (most accurate) + if cachedStatus, err := svc.getStatusFromWorkerCache(task); err == nil && cachedStatus != "" { svc.Debugf("retrieved cached status for task[%s]: %s", task.Id.Hex(), cachedStatus) return cachedStatus, nil } - // Second priority: query process status from worker - actualProcessStatus, err := svc.queryProcessStatusFromWorker(node, task) + // Priority 2: Query process status from worker + processStatus, err := svc.queryProcessStatus(node, task) if err != nil { - svc.Warnf("failed to query process status from worker node[%s] for task[%s]: %v", node.Key, task.Id.Hex(), err) - // Return error instead of falling back to unreliable heuristics - return "", fmt.Errorf("unable to determine actual task status: %w", err) + return "", fmt.Errorf("unable to determine task status: %w", err) } - // Synchronize task status with actual process status - return svc.syncTaskStatusWithProcess(task, actualProcessStatus) + return processStatus, nil } -// getStatusFromWorkerCache retrieves task status from worker-side task runner cache func (svc *TaskReconciliationService) getStatusFromWorkerCache(task *models.Task) (string, error) { - // Check if we have access to task handler service (only on worker nodes) if svc.taskHandlerSvc == nil { - return "", fmt.Errorf("task handler service not available - not on worker node") + return "", fmt.Errorf("task handler service not available") } - // Get the task runner for this task taskRunner := svc.taskHandlerSvc.GetTaskRunner(task.Id) if taskRunner == nil { - return "", fmt.Errorf("no active task runner found for task %s", task.Id.Hex()) + return "", fmt.Errorf("no active task runner") } - // Cast to concrete Runner type to access status cache methods runner, ok := taskRunner.(*handler.Runner) if !ok { - return "", fmt.Errorf("task runner is not of expected type for task %s", task.Id.Hex()) + return "", fmt.Errorf("unexpected task runner type") } - // Get cached status from the runner cachedSnapshot := runner.GetCachedTaskStatus() if cachedSnapshot == nil { - return "", fmt.Errorf("no cached status available for task %s", task.Id.Hex()) + return "", fmt.Errorf("no cached status available") } - svc.Infof("retrieved cached status for task[%s]: %s (cached at %v)", - task.Id.Hex(), cachedSnapshot.Status, cachedSnapshot.Timestamp) return cachedSnapshot.Status, nil } -// queryProcessStatusFromWorker directly queries the worker node for the actual process status -func (svc *TaskReconciliationService) queryProcessStatusFromWorker(node *models.Node, task *models.Task) (processStatus string, err error) { - // Check if there's an active stream for this task - _, hasActiveStream := svc.server.TaskSvr.GetSubscribeStream(task.Id) +func (svc *TaskReconciliationService) triggerWorkerStatusSync(task *models.Task) error { + if svc.taskHandlerSvc == nil { + return fmt.Errorf("task handler service not available") + } - // Check if the node is still connected via subscription + taskRunner := svc.taskHandlerSvc.GetTaskRunner(task.Id) + if taskRunner == nil { + return fmt.Errorf("no active task runner") + } + + runner, ok := taskRunner.(*handler.Runner) + if !ok { + return fmt.Errorf("unexpected task runner type") + } + + if err := runner.SyncPendingStatusUpdates(); err != nil { + return fmt.Errorf("failed to sync: %w", err) + } + + svc.Infof("triggered status sync for task[%s]", task.Id.Hex()) + return nil +} + +func (svc *TaskReconciliationService) queryProcessStatus(node *models.Node, task *models.Task) (string, error) { + // Check task stream exists + _, hasActiveStream := svc.server.TaskSvr.GetSubscribeStream(task.Id) + if hasActiveStream { + return constants.TaskStatusRunning, nil + } + + // Check node connection nodeStream, nodeConnected := svc.server.NodeSvr.GetSubscribeStream(node.Id) if !nodeConnected { - return "", fmt.Errorf("node[%s] is not connected", node.Key) + return "", fmt.Errorf("node not connected") } - // Query the worker for actual process status + // Query worker for process status if nodeStream != nil && task.Pid > 0 { - // Send a process status query to the worker - actualStatus, err := svc.requestProcessStatusFromWorker(nodeStream, task, 5*time.Second) - if err != nil { - return "", fmt.Errorf("failed to get process status from worker: %w", err) - } - return actualStatus, nil + return svc.requestProcessStatusFromWorker(node, task, 5*time.Second) } - // If we can't query the worker directly, return error - if hasActiveStream { - return constants.TaskStatusRunning, nil // Task likely still running if stream exists - } - return "", fmt.Errorf("unable to determine process status for task[%s] on node[%s]", task.Id.Hex(), node.Key) + return "", fmt.Errorf("unable to determine process status") } -// requestProcessStatusFromWorker sends a status query request to the worker node -func (svc *TaskReconciliationService) requestProcessStatusFromWorker(nodeStream grpc.NodeService_SubscribeServer, task *models.Task, timeout time.Duration) (string, error) { - // Check if task has a valid PID +func (svc *TaskReconciliationService) requestProcessStatusFromWorker(node *models.Node, task *models.Task, timeout time.Duration) (string, error) { if task.Pid <= 0 { - return "", fmt.Errorf("task[%s] has invalid PID: %d", task.Id.Hex(), task.Pid) + return "", fmt.Errorf("invalid PID: %d", task.Pid) } - // Get the node for this task - node, err := service.NewModelService[models.Node]().GetById(task.NodeId) - if err != nil { - return "", fmt.Errorf("failed to get node[%s] for task[%s]: %w", task.NodeId.Hex(), task.Id.Hex(), err) - } - - // Attempt to query worker directly - workerStatus, err := svc.queryWorkerProcessStatus(node, task, timeout) - if err != nil { - return "", fmt.Errorf("worker process status query failed: %w", err) - } - - svc.Infof("successfully queried worker process status for task[%s]: %s", task.Id.Hex(), workerStatus) - return workerStatus, nil -} - -// mapProcessStatusToTaskStatus converts gRPC process status to task status -func (svc *TaskReconciliationService) mapProcessStatusToTaskStatus(processStatus grpc.ProcessStatus, exitCode int32, task *models.Task) string { - switch processStatus { - case grpc.ProcessStatus_PROCESS_RUNNING: - return constants.TaskStatusRunning - case grpc.ProcessStatus_PROCESS_FINISHED: - // Process finished - check exit code to determine success or failure - if exitCode == 0 { - return constants.TaskStatusFinished - } - return constants.TaskStatusError - case grpc.ProcessStatus_PROCESS_ERROR: - return constants.TaskStatusError - case grpc.ProcessStatus_PROCESS_NOT_FOUND: - // Process not found - could mean it finished and was cleaned up - // Check if task was recently active to determine likely outcome - if time.Since(task.UpdatedAt) < 5*time.Minute { - // Recently active task with missing process - likely completed - if task.Error != "" { - return constants.TaskStatusError - } - return constants.TaskStatusFinished - } - // Old task with missing process - probably error - return constants.TaskStatusError - case grpc.ProcessStatus_PROCESS_ZOMBIE: - // Zombie process indicates abnormal termination - return constants.TaskStatusError - case grpc.ProcessStatus_PROCESS_UNKNOWN: - fallthrough - default: - // Unknown status - return error instead of using heuristics - svc.Warnf("unknown process status %v for task[%s]", processStatus, task.Id.Hex()) - return constants.TaskStatusError - } -} - -// createWorkerClient creates a gRPC client connection to a worker node -// This is a placeholder for future implementation when worker discovery is available -func (svc *TaskReconciliationService) createWorkerClient(node *models.Node) (grpc.TaskServiceClient, error) { - // TODO: Implement worker node discovery and connection - // This would require: - // 1. Worker nodes to register their gRPC server endpoints - // 2. A service discovery mechanism - // 3. Connection pooling and management - // - // For now, return an error to indicate this functionality is not yet available - return nil, fmt.Errorf("direct worker client connections not yet implemented - need worker discovery infrastructure") -} - -// queryWorkerProcessStatus attempts to query a worker node directly for process status -// This demonstrates the intended future architecture for worker communication -func (svc *TaskReconciliationService) queryWorkerProcessStatus(node *models.Node, task *models.Task, timeout time.Duration) (string, error) { - // This is the intended implementation once worker discovery is available - - // 1. Create gRPC client to worker + // TODO: Implement actual gRPC call to worker + // For now, this is a placeholder that demonstrates the intended architecture client, err := svc.createWorkerClient(node) if err != nil { return "", fmt.Errorf("failed to create worker client: %w", err) } - // 2. Create timeout context ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - // 3. Send process status request req := &grpc.TaskServiceCheckProcessRequest{ TaskId: task.Id.Hex(), Pid: int32(task.Pid), @@ -337,73 +316,237 @@ func (svc *TaskReconciliationService) queryWorkerProcessStatus(node *models.Node resp, err := client.CheckProcess(ctx, req) if err != nil { - return "", fmt.Errorf("worker process status query failed: %w", err) + return "", fmt.Errorf("worker query failed: %w", err) } - // 4. Convert process status to task status taskStatus := svc.mapProcessStatusToTaskStatus(resp.Status, resp.ExitCode, task) - - svc.Infof("worker reported process status for task[%s]: process_status=%s, exit_code=%d, mapped_to=%s", - task.Id.Hex(), resp.Status.String(), resp.ExitCode, taskStatus) + svc.Infof("worker reported status for task[%s]: %s (exit=%d)", task.Id.Hex(), taskStatus, resp.ExitCode) return taskStatus, nil } -// syncTaskStatusWithProcess ensures task status matches the actual process status -func (svc *TaskReconciliationService) syncTaskStatusWithProcess(task *models.Task, actualProcessStatus string) (string, error) { - // If the actual process status differs from the database status, we need to sync - if task.Status != actualProcessStatus { - svc.Infof("syncing task[%s] status from '%s' to '%s' based on actual process status", - task.Id.Hex(), task.Status, actualProcessStatus) +func (svc *TaskReconciliationService) mapProcessStatusToTaskStatus(processStatus grpc.ProcessStatus, exitCode int32, task *models.Task) string { + switch processStatus { + case grpc.ProcessStatus_PROCESS_RUNNING: + return constants.TaskStatusRunning - // Update the task status in the database to match reality - err := svc.updateTaskStatusReliably(task, actualProcessStatus) - if err != nil { - svc.Errorf("failed to sync task[%s] status: %v", task.Id.Hex(), err) - return task.Status, err // Return original status if sync fails + case grpc.ProcessStatus_PROCESS_FINISHED: + if exitCode == 0 { + return constants.TaskStatusFinished + } + return constants.TaskStatusError + + case grpc.ProcessStatus_PROCESS_ERROR, grpc.ProcessStatus_PROCESS_ZOMBIE: + return constants.TaskStatusError + + case grpc.ProcessStatus_PROCESS_NOT_FOUND: + // Recently active task with missing process likely completed + if time.Since(task.UpdatedAt) < 5*time.Minute { + if task.Error != "" { + return constants.TaskStatusError + } + return constants.TaskStatusFinished + } + return constants.TaskStatusError + + default: + svc.Warnf("unknown process status %v for task[%s]", processStatus, task.Id.Hex()) + return constants.TaskStatusError + } +} + +func (svc *TaskReconciliationService) createWorkerClient(node *models.Node) (grpc.TaskServiceClient, error) { + // TODO: Implement worker discovery and gRPC client creation + return nil, fmt.Errorf("worker client not yet implemented - need service discovery") +} + +// ============================================================================ +// Section 4: Periodic Background Tasks +// ============================================================================ + +func (svc *TaskReconciliationService) runPeriodicReconciliation() { + ticker := time.NewTicker(reconciliationInterval) + defer ticker.Stop() + + for range ticker.C { + if err := svc.reconcileAllRunningTasks(); err != nil { + svc.Errorf("periodic reconciliation failed: %v", err) + } + } +} + +func (svc *TaskReconciliationService) reconcileAllRunningTasks() error { + query := bson.M{ + "status": bson.M{ + "$in": []string{constants.TaskStatusRunning, constants.TaskStatusNodeDisconnected}, + }, + } + + tasks, err := service.NewModelService[models.Task]().GetMany(query, nil) + if err != nil { + return err + } + + svc.Debugf("reconciling %d tasks", len(tasks)) + for _, task := range tasks { + if err := svc.reconcileTaskStatus(&task); err != nil { + svc.Errorf("failed to reconcile task[%s]: %v", task.Id.Hex(), err) } } - return actualProcessStatus, nil + return nil } -// updateTaskStatusReliably updates task status with retry logic and validation -func (svc *TaskReconciliationService) updateTaskStatusReliably(task *models.Task, newStatus string) error { - // Update task with the new status +func (svc *TaskReconciliationService) reconcileTaskStatus(task *models.Task) error { + node, err := service.NewModelService[models.Node]().GetById(task.NodeId) + if err != nil { + return fmt.Errorf("failed to get node: %w", err) + } + + actualStatus, err := svc.getActualTaskStatus(node, task) + if err != nil { + if svc.shouldMarkTaskAbnormal(task) { + return svc.markTaskAbnormal(task, err) + } + return err + } + + if actualStatus != task.Status { + svc.Infof("reconciling task[%s]: '%s' -> '%s'", task.Id.Hex(), task.Status, actualStatus) + return svc.updateTaskStatus(task, actualStatus) + } + + return nil +} + +func (svc *TaskReconciliationService) runPeriodicAssignedTaskCleanup() { + ticker := time.NewTicker(assignedCleanupInterval) + defer ticker.Stop() + + for range ticker.C { + if err := svc.cleanupStuckAssignedTasks(); err != nil { + svc.Errorf("assigned task cleanup failed: %v", err) + } + } +} + +func (svc *TaskReconciliationService) cleanupStuckAssignedTasks() error { + cutoff := time.Now().Add(-assignedTaskTimeout) + query := bson.M{ + "status": constants.TaskStatusAssigned, + "$or": []bson.M{ + {"updated_at": bson.M{"$lt": cutoff}}, + {"updated_at": bson.M{"$exists": false}}, + }, + } + + tasks, err := service.NewModelService[models.Task]().GetMany(query, nil) + if err != nil { + return err + } + + if len(tasks) == 0 { + return nil + } + + svc.Infof("cleaning up %d stuck assigned tasks", len(tasks)) + for _, task := range tasks { + svc.handleStuckAssignedTask(&task) + } + + return nil +} + +func (svc *TaskReconciliationService) handleStuckAssignedTask(task *models.Task) { + node, err := service.NewModelService[models.Node]().GetById(task.NodeId) + if err != nil { + svc.Warnf("failed to get node for task[%s]: %v", task.Id.Hex(), err) + return + } + + // Only handle if node is offline or inactive + if node.Status == constants.NodeStatusOnline && node.Active { + svc.Warnf("task[%s] assigned to online node[%s] for %v without starting", + task.Id.Hex(), node.Key, time.Since(task.UpdatedAt)) + return + } + + originalNodeKey := node.Key + originalNodeId := task.NodeId + + availableNode, err := svc.findAvailableNodeForTask(task) + if err == nil && availableNode != nil { + // Re-assign to available node + task.Status = constants.TaskStatusPending + task.NodeId = availableNode.Id + task.Error = fmt.Sprintf("Stuck in assigned for 10+ min on node %s (%s), re-assigned to %s", + originalNodeKey, originalNodeId.Hex(), availableNode.Key) + svc.Infof("re-assigned stuck task[%s] to node[%s]", task.Id.Hex(), availableNode.Key) + } else { + // Mark as abnormal + task.Status = constants.TaskStatusAbnormal + task.Error = fmt.Sprintf("Stuck in assigned for 10+ min on node %s (%s), no available nodes", + originalNodeKey, originalNodeId.Hex()) + svc.Infof("marked stuck task[%s] as abnormal", task.Id.Hex()) + } + + if err := svc.saveTask(task); err != nil { + svc.Errorf("failed to handle stuck assigned task[%s]: %v", task.Id.Hex(), err) + } +} + +// ============================================================================ +// Section 5: Database Operations & Utilities +// ============================================================================ + +func (svc *TaskReconciliationService) findTasksByStatus(nodeId primitive.ObjectID, status string) []models.Task { + query := bson.M{"node_id": nodeId, "status": status} + tasks, err := service.NewModelService[models.Task]().GetMany(query, nil) + if err != nil { + svc.Errorf("failed to query tasks with status[%s] for node[%s]: %v", status, nodeId.Hex(), err) + return nil + } + return tasks +} + +func (svc *TaskReconciliationService) markTaskDisconnected(task *models.Task, nodeKey string) { + task.Status = constants.TaskStatusNodeDisconnected + task.Error = "Task temporarily disconnected due to worker node offline" + if err := svc.saveTask(task); err != nil { + svc.Errorf("failed to mark task[%s] disconnected for node[%s]: %v", task.Id.Hex(), nodeKey, err) + } else { + svc.Debugf("marked task[%s] as disconnected for node[%s]", task.Id.Hex(), nodeKey) + } +} + +func (svc *TaskReconciliationService) updateTaskStatus(task *models.Task, newStatus string) error { task.Status = newStatus - // Add appropriate error message for certain status transitions switch newStatus { + case constants.TaskStatusFinished: + task.Error = "" case constants.TaskStatusError: if task.Error == "" { task.Error = "Task status synchronized from actual process state" } - case constants.TaskStatusFinished: - // Clear error message for successfully completed tasks - task.Error = "" case constants.TaskStatusAbnormal: if task.Error == "" { task.Error = "Task marked as abnormal during status reconciliation" } - case constants.TaskStatusNodeDisconnected: - // Don't modify error message for disconnected status - keep existing context - // The disconnect reason should already be in the error field } - task.SetUpdated(primitive.NilObjectID) + return svc.saveTask(task) +} - // Update with retry logic +func (svc *TaskReconciliationService) saveTask(task *models.Task) error { + task.SetUpdated(primitive.NilObjectID) return backoff.Retry(func() error { return service.NewModelService[models.Task]().ReplaceById(task.Id, *task) }, backoff.WithMaxRetries(backoff.NewConstantBackOff(500*time.Millisecond), 3)) } func (svc *TaskReconciliationService) shouldMarkTaskAbnormal(task *models.Task) bool { - if task == nil { - return false - } - - if svc.IsTaskStatusFinal(task.Status) { + if task == nil || svc.IsTaskStatusFinal(task.Status) { return false } @@ -416,11 +559,7 @@ func (svc *TaskReconciliationService) shouldMarkTaskAbnormal(task *models.Task) lastUpdated = task.CreatedAt } - if lastUpdated.IsZero() { - return false - } - - return time.Since(lastUpdated) >= staleReconciliationThreshold + return !lastUpdated.IsZero() && time.Since(lastUpdated) >= staleReconciliationThreshold } func (svc *TaskReconciliationService) markTaskAbnormal(task *models.Task, cause error) error { @@ -428,11 +567,11 @@ func (svc *TaskReconciliationService) markTaskAbnormal(task *models.Task, cause return fmt.Errorf("task is nil") } - reasonParts := make([]string, 0, 2) + reasonParts := []string{} if cause != nil { - reasonParts = append(reasonParts, fmt.Sprintf("last reconciliation error: %v", cause)) + reasonParts = append(reasonParts, fmt.Sprintf("reconciliation error: %v", cause)) } - reasonParts = append(reasonParts, fmt.Sprintf("task status not reconciled for %s", staleReconciliationThreshold)) + reasonParts = append(reasonParts, fmt.Sprintf("not reconciled for %s", staleReconciliationThreshold)) reason := strings.Join(reasonParts, "; ") if task.Error == "" { @@ -441,103 +580,79 @@ func (svc *TaskReconciliationService) markTaskAbnormal(task *models.Task, cause task.Error = fmt.Sprintf("%s; %s", task.Error, reason) } - if err := svc.updateTaskStatusReliably(task, constants.TaskStatusAbnormal); err != nil { - svc.Errorf("failed to mark task[%s] abnormal after reconciliation timeout: %v", task.Id.Hex(), err) + if err := svc.updateTaskStatus(task, constants.TaskStatusAbnormal); err != nil { + svc.Errorf("failed to mark task[%s] abnormal: %v", task.Id.Hex(), err) return err } - svc.Warnf("marked task[%s] as abnormal after %s of unresolved reconciliation", task.Id.Hex(), staleReconciliationThreshold) + svc.Warnf("marked task[%s] as abnormal after %s", task.Id.Hex(), staleReconciliationThreshold) return nil } -// StartPeriodicReconciliation starts a background service to periodically reconcile task status -func (svc *TaskReconciliationService) StartPeriodicReconciliation() { - go svc.runPeriodicReconciliation() -} - -// runPeriodicReconciliation periodically checks and reconciles task status with actual process status -func (svc *TaskReconciliationService) runPeriodicReconciliation() { - ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds - defer ticker.Stop() - - for range ticker.C { - err := svc.reconcileRunningTasks() - if err != nil { - svc.Errorf("failed to reconcile running tasks: %v", err) - } - } -} - -// reconcileRunningTasks finds all running tasks and reconciles their status with actual process status -func (svc *TaskReconciliationService) reconcileRunningTasks() error { - // Find all tasks that might need reconciliation +func (svc *TaskReconciliationService) findAvailableNodeForTask(task *models.Task) (*models.Node, error) { query := bson.M{ - "status": bson.M{ - "$in": []string{ - constants.TaskStatusRunning, - constants.TaskStatusNodeDisconnected, - }, - }, + "status": constants.NodeStatusOnline, + "active": true, + "enabled": true, } - tasks, err := service.NewModelService[models.Task]().GetMany(query, nil) + nodes, err := service.NewModelService[models.Node]().GetMany(query, nil) if err != nil { - return err + return nil, fmt.Errorf("failed to query nodes: %w", err) } - svc.Debugf("found %d tasks to reconcile", len(tasks)) - - for _, task := range tasks { - err := svc.reconcileTaskStatus(&task) - if err != nil { - svc.Errorf("failed to reconcile task[%s]: %v", task.Id.Hex(), err) - } + if len(nodes) == 0 { + return nil, fmt.Errorf("no available nodes") } - return nil -} - -// reconcileTaskStatus reconciles a single task's status with its actual process status -func (svc *TaskReconciliationService) reconcileTaskStatus(task *models.Task) error { - // Get the node for this task - node, err := service.NewModelService[models.Node]().GetById(task.NodeId) - if err != nil { - svc.Warnf("failed to get node[%s] for task[%s]: %v", task.NodeId.Hex(), task.Id.Hex(), err) - return err - } - - // Get actual status from worker - actualStatus, err := svc.GetActualTaskStatusFromWorker(node, task) - if err != nil { - svc.Warnf("failed to get actual status for task[%s]: %v", task.Id.Hex(), err) - if svc.shouldMarkTaskAbnormal(task) { - if updateErr := svc.markTaskAbnormal(task, err); updateErr != nil { - return updateErr + // For selected nodes mode, try to find from selected list + if task.Mode == constants.RunTypeSelectedNodes && len(task.NodeIds) > 0 { + for _, node := range nodes { + for _, selectedId := range task.NodeIds { + if node.Id == selectedId { + svc.Debugf("found selected node[%s] for task[%s]", node.Key, task.Id.Hex()) + return &node, nil + } } - return nil } - return err + svc.Debugf("no selected nodes available for task[%s], using first available", task.Id.Hex()) } - // If status changed, update it - if actualStatus != task.Status { - svc.Infof("reconciling task[%s] status from '%s' to '%s'", task.Id.Hex(), task.Status, actualStatus) - return svc.updateTaskStatusReliably(task, actualStatus) - } - - return nil + // Use first available node + return &nodes[0], nil } -// ForceReconcileTask forces reconciliation of a specific task (useful for manual intervention) -func (svc *TaskReconciliationService) ForceReconcileTask(taskId primitive.ObjectID) error { - task, err := service.NewModelService[models.Task]().GetById(taskId) - if err != nil { - return fmt.Errorf("failed to get task[%s]: %w", taskId.Hex(), err) +// IsTaskStatusFinal returns true if the task is in a terminal state +func (svc *TaskReconciliationService) IsTaskStatusFinal(status string) bool { + switch status { + case constants.TaskStatusFinished, constants.TaskStatusError, + constants.TaskStatusCancelled, constants.TaskStatusAbnormal: + return true + default: + return false + } +} + +// ShouldReconcileTask determines if a task needs reconciliation +func (svc *TaskReconciliationService) ShouldReconcileTask(task *models.Task) bool { + if svc.IsTaskStatusFinal(task.Status) { + return false } - return svc.reconcileTaskStatus(task) + switch task.Status { + case constants.TaskStatusRunning, constants.TaskStatusNodeDisconnected: + return true + case constants.TaskStatusPending, constants.TaskStatusAssigned: + return time.Since(task.CreatedAt) > 10*time.Minute + default: + return false + } } +// ============================================================================ +// Section 6: Constructor & Singleton +// ============================================================================ + func NewTaskReconciliationService(server *server.GrpcServer, taskHandlerSvc *handler.Service) *TaskReconciliationService { return &TaskReconciliationService{ server: server, @@ -546,77 +661,16 @@ func NewTaskReconciliationService(server *server.GrpcServer, taskHandlerSvc *han } } -// ValidateTaskStatus ensures task status is consistent with actual process state -func (svc *TaskReconciliationService) ValidateTaskStatus(task *models.Task) error { - if task == nil { - return fmt.Errorf("task is nil") - } - - // Get the node for this task - node, err := service.NewModelService[models.Node]().GetById(task.NodeId) - if err != nil { - return fmt.Errorf("failed to get node for task: %w", err) - } - - // Get actual status - actualStatus, err := svc.GetActualTaskStatusFromWorker(node, task) - if err != nil { - return fmt.Errorf("failed to get actual task status: %w", err) - } - - // If status is inconsistent, log it and optionally fix it - if actualStatus != task.Status { - svc.Warnf("task[%s] status inconsistency detected: database='%s', actual='%s'", - task.Id.Hex(), task.Status, actualStatus) - - // Optionally auto-correct the status - return svc.updateTaskStatusReliably(task, actualStatus) - } - - return nil -} - -// IsTaskStatusFinal returns true if the task status represents a final state -func (svc *TaskReconciliationService) IsTaskStatusFinal(status string) bool { - switch status { - case constants.TaskStatusFinished, constants.TaskStatusError, constants.TaskStatusCancelled, constants.TaskStatusAbnormal: - return true - default: - return false - } -} - -// ShouldReconcileTask determines if a task needs status reconciliation -func (svc *TaskReconciliationService) ShouldReconcileTask(task *models.Task) bool { - // Don't reconcile tasks in final states unless they're very old and might be stuck - if svc.IsTaskStatusFinal(task.Status) { - return false - } - - // Always reconcile running or disconnected tasks - switch task.Status { - case constants.TaskStatusRunning, constants.TaskStatusNodeDisconnected: - return true - case constants.TaskStatusPending, constants.TaskStatusAssigned: - // Reconcile if task has been pending/assigned for too long - return time.Since(task.CreatedAt) > 10*time.Minute - default: - return false - } -} - -// Singleton pattern -var taskReconciliationService *TaskReconciliationService -var taskReconciliationServiceOnce sync.Once +var ( + taskReconciliationService *TaskReconciliationService + taskReconciliationServiceOnce sync.Once +) func GetTaskReconciliationService() *TaskReconciliationService { taskReconciliationServiceOnce.Do(func() { - // Get the server from gRPC server singleton grpcServer := server.GetGrpcServer() - // Try to get task handler service (will be nil on master nodes) var taskHandlerSvc *handler.Service if !utils.IsMaster() { - // Only worker nodes have task handler service taskHandlerSvc = handler.GetTaskHandlerService() } taskReconciliationService = NewTaskReconciliationService(grpcServer, taskHandlerSvc)