// Mirror of https://github.com/crawlab-team/crawlab.git
// Synced 2026-01-21 17:21:09 +01:00
// 259 lines, 9.4 KiB, Go
package service
|
|
|
|
import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/crawlab-team/crawlab/core/constants"
	"github.com/crawlab-team/crawlab/core/grpc/server"
	"github.com/crawlab-team/crawlab/core/interfaces"
	"github.com/crawlab-team/crawlab/core/models/models"
	"github.com/crawlab-team/crawlab/core/models/service"
	"github.com/crawlab-team/crawlab/core/utils"
	"github.com/crawlab-team/crawlab/grpc"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
)
|
|
|
|
// TaskReconciliationService handles task status reconciliation for node disconnection scenarios.
// It marks running tasks as disconnected when their node goes offline and
// resolves their status again when the node reconnects.
type TaskReconciliationService struct {
	// server is the gRPC server whose TaskSvr and NodeSvr subscribe streams
	// are consulted when resolving a task's status.
	server *server.GrpcServer
	// Logger is embedded so Debugf/Infof/Warnf/Errorf can be called directly
	// on the service.
	interfaces.Logger
}
|
|
|
|
// HandleTasksForOfflineNode updates all running tasks on an offline node to abnormal status
|
|
func (svc *TaskReconciliationService) HandleTasksForOfflineNode(node *models.Node) {
|
|
// Find all running tasks on the offline node
|
|
query := bson.M{
|
|
"node_id": node.Id,
|
|
"status": constants.TaskStatusRunning,
|
|
}
|
|
|
|
runningTasks, err := service.NewModelService[models.Task]().GetMany(query, nil)
|
|
if err != nil {
|
|
svc.Errorf("failed to get running tasks for offline node[%s]: %v", node.Key, err)
|
|
return
|
|
}
|
|
|
|
if len(runningTasks) == 0 {
|
|
svc.Debugf("no running tasks found for offline node[%s]", node.Key)
|
|
return
|
|
}
|
|
|
|
svc.Infof("updating %d running tasks to abnormal status for offline node[%s]", len(runningTasks), node.Key)
|
|
|
|
// Update each task status to node_disconnected (recoverable)
|
|
for _, task := range runningTasks {
|
|
task.Status = constants.TaskStatusNodeDisconnected
|
|
task.Error = "Task temporarily disconnected due to worker node offline"
|
|
|
|
// Update the task in database
|
|
err := backoff.Retry(func() error {
|
|
return service.NewModelService[models.Task]().ReplaceById(task.Id, task)
|
|
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(500*time.Millisecond), 3))
|
|
|
|
if err != nil {
|
|
svc.Errorf("failed to update task[%s] status for offline node[%s]: %v", task.Id.Hex(), node.Key, err)
|
|
} else {
|
|
svc.Debugf("updated task[%s] status to abnormal for offline node[%s]", task.Id.Hex(), node.Key)
|
|
}
|
|
}
|
|
}
|
|
|
|
// HandleNodeReconnection reconciles tasks that were marked as disconnected when the node comes back online
|
|
func (svc *TaskReconciliationService) HandleNodeReconnection(node *models.Node) {
|
|
// Find all disconnected tasks on this node
|
|
query := bson.M{
|
|
"node_id": node.Id,
|
|
"status": constants.TaskStatusNodeDisconnected,
|
|
}
|
|
|
|
disconnectedTasks, err := service.NewModelService[models.Task]().GetMany(query, nil)
|
|
if err != nil {
|
|
svc.Errorf("failed to get disconnected tasks for reconnected node[%s]: %v", node.Key, err)
|
|
return
|
|
}
|
|
|
|
if len(disconnectedTasks) == 0 {
|
|
svc.Debugf("no disconnected tasks found for reconnected node[%s]", node.Key)
|
|
return
|
|
}
|
|
|
|
svc.Infof("reconciling %d disconnected tasks for reconnected node[%s]", len(disconnectedTasks), node.Key)
|
|
|
|
// For each disconnected task, try to get its actual status from the worker node
|
|
for _, task := range disconnectedTasks {
|
|
actualStatus, err := svc.GetActualTaskStatusFromWorker(node, &task)
|
|
if err != nil {
|
|
svc.Warnf("failed to get actual status for task[%s] from reconnected node[%s]: %v", task.Id.Hex(), node.Key, err)
|
|
// If we can't determine the actual status, mark as abnormal after reconnection failure
|
|
task.Status = constants.TaskStatusAbnormal
|
|
task.Error = "Could not reconcile task status after node reconnection"
|
|
} else {
|
|
// Update with actual status from worker
|
|
task.Status = actualStatus
|
|
switch actualStatus {
|
|
case constants.TaskStatusFinished:
|
|
task.Error = "" // Clear error message for successfully completed tasks
|
|
case constants.TaskStatusError:
|
|
task.Error = "Task encountered an error during node disconnection"
|
|
}
|
|
}
|
|
|
|
// Update the task in database
|
|
err = backoff.Retry(func() error {
|
|
return service.NewModelService[models.Task]().ReplaceById(task.Id, task)
|
|
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(500*time.Millisecond), 3))
|
|
|
|
if err != nil {
|
|
svc.Errorf("failed to update reconciled task[%s] status for node[%s]: %v", task.Id.Hex(), node.Key, err)
|
|
} else {
|
|
svc.Infof("reconciled task[%s] status from 'node_disconnected' to '%s' for node[%s]", task.Id.Hex(), task.Status, node.Key)
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetActualTaskStatusFromWorker queries the worker node to get the actual status of a task
|
|
func (svc *TaskReconciliationService) GetActualTaskStatusFromWorker(node *models.Node, task *models.Task) (status string, err error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
// Check if there's an active stream for this task
|
|
_, hasActiveStream := svc.server.TaskSvr.GetSubscribeStream(task.Id)
|
|
|
|
// Check if the node is still connected via subscription
|
|
nodeStream, nodeConnected := svc.server.NodeSvr.GetSubscribeStream(node.Id)
|
|
if !nodeConnected {
|
|
svc.Warnf("node[%s] is not connected, using fallback detection for task[%s]", node.Key, task.Id.Hex())
|
|
return svc.inferTaskStatusFromStream(task.Id, hasActiveStream), nil
|
|
}
|
|
|
|
// Try to get more accurate status by checking recent task activity
|
|
actualStatus, err := svc.detectTaskStatusFromActivity(task, hasActiveStream)
|
|
if err != nil {
|
|
svc.Warnf("failed to detect task status from activity for task[%s]: %v", task.Id.Hex(), err)
|
|
return svc.inferTaskStatusFromStream(task.Id, hasActiveStream), nil
|
|
}
|
|
|
|
// Ping the node to verify it's responsive
|
|
if nodeStream != nil {
|
|
select {
|
|
case <-ctx.Done():
|
|
svc.Warnf("timeout while pinging node[%s] for task[%s]", node.Key, task.Id.Hex())
|
|
return svc.inferTaskStatusFromStream(task.Id, hasActiveStream), nil
|
|
default:
|
|
// Send a heartbeat to verify node responsiveness
|
|
err := nodeStream.Send(&grpc.NodeServiceSubscribeResponse{
|
|
Code: grpc.NodeServiceSubscribeCode_HEARTBEAT,
|
|
})
|
|
if err != nil {
|
|
svc.Warnf("failed to ping node[%s] for task status check: %v", node.Key, err)
|
|
return svc.inferTaskStatusFromStream(task.Id, hasActiveStream), nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return actualStatus, nil
|
|
}
|
|
|
|
// detectTaskStatusFromActivity analyzes task activity to determine its actual status
|
|
func (svc *TaskReconciliationService) detectTaskStatusFromActivity(task *models.Task, hasActiveStream bool) (string, error) {
|
|
// Check if task has been updated recently (within last 30 seconds)
|
|
if time.Since(task.UpdatedAt) < 30*time.Second {
|
|
// Task was recently updated, likely still active
|
|
if hasActiveStream {
|
|
return constants.TaskStatusRunning, nil
|
|
}
|
|
// Recently updated but no stream - check if it finished
|
|
return svc.checkTaskCompletion(task), nil
|
|
}
|
|
|
|
// Task hasn't been updated recently
|
|
if !hasActiveStream {
|
|
// No stream and no recent activity - likely finished or failed
|
|
return svc.checkTaskCompletion(task), nil
|
|
}
|
|
|
|
// Has stream but no recent updates - might be stuck
|
|
return constants.TaskStatusRunning, nil
|
|
}
|
|
|
|
// checkTaskCompletion determines if a task completed successfully or failed
|
|
func (svc *TaskReconciliationService) checkTaskCompletion(task *models.Task) string {
|
|
// Refresh task from database to get latest status
|
|
latestTask, err := service.NewModelService[models.Task]().GetById(task.Id)
|
|
if err != nil {
|
|
svc.Warnf("failed to refresh task[%s] from database: %v", task.Id.Hex(), err)
|
|
return constants.TaskStatusError
|
|
}
|
|
|
|
// If task status was already updated to a final state, return that
|
|
switch latestTask.Status {
|
|
case constants.TaskStatusFinished, constants.TaskStatusError, constants.TaskStatusCancelled:
|
|
return latestTask.Status
|
|
case constants.TaskStatusRunning:
|
|
// Task still shows as running but has no active stream - likely finished
|
|
if latestTask.Error != "" {
|
|
return constants.TaskStatusError
|
|
}
|
|
return constants.TaskStatusFinished
|
|
default:
|
|
// Unknown or intermediate status
|
|
return constants.TaskStatusError
|
|
}
|
|
}
|
|
|
|
// inferTaskStatusFromStream provides a fallback status inference based on stream presence
|
|
func (svc *TaskReconciliationService) inferTaskStatusFromStream(taskId primitive.ObjectID, hasActiveStream bool) string {
|
|
if !hasActiveStream {
|
|
// No active stream could mean:
|
|
// 1. Task finished successfully
|
|
// 2. Task failed and stream was closed
|
|
// 3. Worker disconnected ungracefully
|
|
//
|
|
// To determine which, we should check the task in the database
|
|
task, err := service.NewModelService[models.Task]().GetById(taskId)
|
|
if err != nil {
|
|
// If we can't find the task, assume it's in an error state
|
|
return constants.TaskStatusError
|
|
}
|
|
|
|
// If the task was last seen running and now has no stream,
|
|
// it likely finished or errored
|
|
switch task.Status {
|
|
case constants.TaskStatusRunning:
|
|
// Task was running but stream is gone - likely finished
|
|
return constants.TaskStatusFinished
|
|
case constants.TaskStatusPending, constants.TaskStatusAssigned:
|
|
// Task never started running - likely error
|
|
return constants.TaskStatusError
|
|
default:
|
|
// Return the last known status
|
|
return task.Status
|
|
}
|
|
}
|
|
|
|
// Stream exists, so task is likely still running
|
|
return constants.TaskStatusRunning
|
|
}
|
|
|
|
func NewTaskReconciliationService(server *server.GrpcServer) *TaskReconciliationService {
|
|
return &TaskReconciliationService{
|
|
server: server,
|
|
Logger: utils.NewLogger("TaskReconciliationService"),
|
|
}
|
|
}
|
|
|
|
// Singleton pattern
|
|
var taskReconciliationService *TaskReconciliationService
|
|
var taskReconciliationServiceOnce sync.Once
|
|
|
|
func GetTaskReconciliationService() *TaskReconciliationService {
|
|
taskReconciliationServiceOnce.Do(func() {
|
|
taskReconciliationService = NewTaskReconciliationService(nil) // Will be set by the master service
|
|
})
|
|
return taskReconciliationService
|
|
}
|