package handler

import (
	"context"
	"errors"
	"fmt"
	"io"
	"runtime"
	"sync"
	"time"

	"github.com/crawlab-team/crawlab/core/constants"
	grpcclient "github.com/crawlab-team/crawlab/core/grpc/client"
	"github.com/crawlab-team/crawlab/core/interfaces"
	"github.com/crawlab-team/crawlab/core/models/client"
	"github.com/crawlab-team/crawlab/core/models/models"
	"github.com/crawlab-team/crawlab/core/models/service"
	nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
	"github.com/crawlab-team/crawlab/core/utils"
	"github.com/crawlab-team/crawlab/grpc"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

// Service is the task handler: it periodically fetches tasks for the current
// node, runs them through a bounded worker pool, reports node status and
// handles task cancellation.
type Service struct {
	// dependencies
	cfgSvc interfaces.NodeConfigService
	c      *grpcclient.GrpcClient // grpc client

	// settings
	reportInterval time.Duration
	fetchInterval  time.Duration
	fetchTimeout   time.Duration
	cancelTimeout  time.Duration

	// internal variables
	ctx     context.Context
	cancel  context.CancelFunc
	stopped bool
	mu      sync.RWMutex
	runners sync.Map       // pool of task runners started
	wg      sync.WaitGroup // track background goroutines

	// tickers driving the fetch and report loops
	fetchTicker  *time.Ticker
	reportTicker *time.Ticker

	// worker pool for bounded task execution
	workerPool *TaskWorkerPool
	maxWorkers int

	// stream manager for leak-free stream handling
	streamManager *StreamManager

	interfaces.Logger
}

// StreamManager manages task streams without goroutine leaks.
type StreamManager struct {
	streams      sync.Map // map[primitive.ObjectID]*TaskStream
	ctx          context.Context
	cancel       context.CancelFunc
	wg           sync.WaitGroup
	service      *Service
	messageQueue chan *StreamMessage
	maxStreams   int
}

// TaskStream represents a single task's stream.
type TaskStream struct {
	taskId     primitive.ObjectID
	stream     grpc.TaskService_SubscribeClient
	ctx        context.Context
	cancel     context.CancelFunc
	lastActive time.Time
	mu         sync.RWMutex
}

// StreamMessage represents a message from a stream.
type StreamMessage struct {
	taskId primitive.ObjectID
	msg    *grpc.TaskServiceSubscribeResponse
	err    error
}

// taskRequest represents a task execution request.
type taskRequest struct {
	taskId primitive.ObjectID
}

// TaskWorkerPool manages a bounded pool of workers for task execution.
type TaskWorkerPool struct {
	workers   int
	ctx       context.Context
	cancel    context.CancelFunc
	wg        sync.WaitGroup
	taskQueue chan taskRequest
	service   *Service
}

// NewTaskWorkerPool creates a pool with the given number of workers. The task
// queue holds workers*5 requests, with a minimum of 50, to absorb bursts.
// A non-positive worker count is raised to 1: a pool without workers would
// accept tasks and never run them.
func NewTaskWorkerPool(workers int, service *Service) *TaskWorkerPool {
	if workers < 1 {
		workers = 1
	}

	queueSize := workers * 5
	if queueSize < 50 {
		queueSize = 50 // Minimum queue size
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &TaskWorkerPool{
		workers:   workers,
		ctx:       ctx,
		cancel:    cancel,
		taskQueue: make(chan taskRequest, queueSize),
		service:   service,
	}
}

// Start launches the worker goroutines.
func (pool *TaskWorkerPool) Start() {
	pool.wg.Add(pool.workers)
	for i := 0; i < pool.workers; i++ {
		go pool.worker(i)
	}
}

// Stop signals all workers to exit and waits for them to return.
//
// The task queue is intentionally NOT closed: SubmitTask may run concurrently
// and a send on a closed channel panics. Workers exit through the cancelled
// context instead.
func (pool *TaskWorkerPool) Stop() {
	pool.cancel()
	pool.wg.Wait()
}

// SubmitTask enqueues a task for execution without blocking. It returns an
// error when the pool is shutting down or when the queue is full.
func (pool *TaskWorkerPool) SubmitTask(taskId primitive.ObjectID) error {
	// Check shutdown first: select chooses randomly among ready cases, so a
	// free queue slot could otherwise win over the cancelled context.
	if pool.ctx.Err() != nil {
		return errors.New("worker pool is shutting down")
	}

	select {
	case pool.taskQueue <- taskRequest{taskId: taskId}:
		pool.service.Debugf("task[%s] queued for parallel execution, queue usage: %d/%d",
			taskId.Hex(), len(pool.taskQueue), cap(pool.taskQueue))
		return nil // Return immediately - task will execute in parallel
	case <-pool.ctx.Done():
		return errors.New("worker pool is shutting down")
	default:
		queueLen := len(pool.taskQueue)
		queueCap := cap(pool.taskQueue)
		pool.service.Warnf("task queue is full (%d/%d), consider increasing task.workers configuration", queueLen, queueCap)
		return fmt.Errorf("task queue is full (%d/%d), consider increasing task.workers configuration", queueLen, queueCap)
	}
}

// worker pulls requests off the queue until the pool context is cancelled.
// Each worker handles one task at a time; several workers run in parallel.
func (pool *TaskWorkerPool) worker(workerID int) {
	defer pool.wg.Done()

	pool.service.Debugf("worker %d started", workerID)

	for {
		select {
		case <-pool.ctx.Done():
			pool.service.Debugf("worker %d shutting down", workerID)
			return
		case req := <-pool.taskQueue:
			// Both cases may have been ready; never start new work once
			// shutdown has been requested.
			if pool.ctx.Err() != nil {
				pool.service.Warnf("worker %d dropping queued task[%s]: worker pool is shutting down", workerID, req.taskId.Hex())
				return
			}
			pool.process(workerID, req)
		}
	}
}

// process executes a single request. Panics are recovered here, per task, so
// that one failing task does not permanently remove a worker from the pool.
func (pool *TaskWorkerPool) process(workerID int, req taskRequest) {
	defer func() {
		if r := recover(); r != nil {
			pool.service.Errorf("worker %d panic recovered: %v", workerID, r)
		}
	}()

	pool.service.Debugf("worker %d processing task[%s]", workerID, req.taskId.Hex())
	if err := pool.service.executeTask(req.taskId); err != nil {
		pool.service.Errorf("worker %d failed to execute task[%s]: %v", workerID, req.taskId.Hex(), err)
		return
	}
	pool.service.Debugf("worker %d completed task[%s]", workerID, req.taskId.Hex())
}

// Start initialises the service context, tickers, worker pool and stream
// manager, then launches the background loops (status reporting, task
// fetching, goroutine monitoring and stuck-task cleanup). It blocks until the
// gRPC client reports ready.
func (svc *Service) Start() {
	// Initialize context for graceful shutdown
	svc.ctx, svc.cancel = context.WithCancel(context.Background())

	// wait for grpc client ready
	grpcclient.GetGrpcClient().WaitForReady()

	// Initialize tickers
	svc.fetchTicker = time.NewTicker(svc.fetchInterval)
	svc.reportTicker = time.NewTicker(svc.reportInterval)

	// Initialize and start worker pool
	svc.workerPool = NewTaskWorkerPool(svc.maxWorkers, svc)
	svc.workerPool.Start()

	// Initialize and start stream manager
	svc.streamManager.Start()

	// Start goroutine monitoring (adds to WaitGroup internally)
	svc.startGoroutineMonitoring()

	// Start background goroutines with WaitGroup tracking
	svc.wg.Add(2)
	go svc.reportStatus()
	go svc.fetchAndRunTasks()

	// Report the pool's effective worker count, which may differ from
	// maxWorkers when that setting is non-positive.
	svc.Infof("Task handler service started with %d workers and queue size %d",
		svc.workerPool.workers, cap(svc.workerPool.taskQueue))

	// Start the stuck task cleanup routine (adds to WaitGroup internally)
	svc.startStuckTaskCleanup()
}

// Stop shuts the service down. It is idempotent: only the first call does any
// work. Background goroutines get up to 30 seconds to finish.
func (svc *Service) Stop() {
	svc.mu.Lock()
	if svc.stopped {
		svc.mu.Unlock()
		return
	}
	svc.stopped = true
	svc.mu.Unlock()

	svc.Infof("Stopping task handler service...")

	// Cancel context to signal all goroutines to stop
	if svc.cancel != nil {
		svc.cancel()
	}

	// Stop tickers to prevent new fetch/report cycles
	if svc.fetchTicker != nil {
		svc.fetchTicker.Stop()
	}
	if svc.reportTicker != nil {
		svc.reportTicker.Stop()
	}

	// Tell the workers to stop picking up queued tasks, then cancel the tasks
	// that are already running. Workers are blocked inside the runners, so the
	// runners must be cancelled BEFORE waiting on the pool; otherwise the wait
	// depends on every task finishing on its own.
	if svc.workerPool != nil {
		svc.workerPool.cancel()
	}
	svc.stopAllRunners()
	if svc.workerPool != nil {
		svc.workerPool.Stop()
	}

	// Stop stream manager
	if svc.streamManager != nil {
		svc.streamManager.Stop()
	}

	// Wait for all background goroutines to finish
	done := make(chan struct{})
	go func() {
		svc.wg.Wait()
		close(done)
	}()

	// Give goroutines time to finish gracefully, then give up waiting
	select {
	case <-done:
		svc.Infof("All goroutines stopped gracefully")
	case <-time.After(30 * time.Second):
		svc.Warnf("Some goroutines did not stop gracefully within timeout")
	}

	svc.Infof("Task handler service stopped")
}

// startGoroutineMonitoring launches a goroutine that samples
// runtime.NumGoroutine every 5 minutes and warns when the count exceeds the
// count observed at startup by more than 50.
func (svc *Service) startGoroutineMonitoring() {
	svc.wg.Add(1) // Track goroutine monitoring in WaitGroup

	go func() {
		defer svc.wg.Done()
		defer func() {
			if r := recover(); r != nil {
				svc.Errorf("[TaskHandler] goroutine monitoring panic: %v", r)
			}
		}()

		ticker := time.NewTicker(5 * time.Minute)
		defer ticker.Stop()

		initialCount := runtime.NumGoroutine()
		svc.Infof("[TaskHandler] initial goroutine count: %d", initialCount)

		for {
			select {
			case <-svc.ctx.Done():
				svc.Infof("[TaskHandler] goroutine monitoring shutting down")
				return
			case <-ticker.C:
				currentCount := runtime.NumGoroutine()
				if currentCount > initialCount+50 { // Alert if 50+ more goroutines than initial
					svc.Warnf("[TaskHandler] potential goroutine leak detected - current: %d, initial: %d, diff: %d",
						currentCount, initialCount, currentCount-initialCount)
					continue
				}
				svc.Debugf("[TaskHandler] goroutine count: %d (initial: %d)", currentCount, initialCount)
			}
		}
	}()
}

// Run submits the task with the given id for execution.
func (svc *Service) Run(taskId primitive.ObjectID) (err error) {
	return svc.runTask(taskId)
}

// Cancel cancels the task with the given id; force requests a forced cancel.
func (svc *Service) Cancel(taskId primitive.ObjectID, force bool) (err error) {
	return svc.cancelTask(taskId, force)
}

// fetchAndRunTasks runs one fetch cycle per fetch-ticker tick until the
// service context is cancelled.
func (svc *Service) fetchAndRunTasks() {
	defer svc.wg.Done()
	defer func() {
		if r := recover(); r != nil {
			svc.Errorf("fetchAndRunTasks panic recovered: %v", r)
		}
	}()

	for {
		select {
		case <-svc.ctx.Done():
			svc.Infof("fetchAndRunTasks stopped by context")
			return
		case <-svc.fetchTicker.C:
			// The result is deliberately ignored: processFetchCycle also
			// returns errors for routine outcomes such as "no task available"
			// or "max runners reached", which would flood the log.
			_ = svc.processFetchCycle()
		}
	}
}

// processFetchCycle performs a single fetch-and-run attempt. It returns a
// non-nil error whenever no task was started, including the routine cases
// (service stopped, node inactive or disabled, max runners reached, no task).
// If a fetched task cannot be submitted, the task is marked as errored unless
// it has already been cancelled.
func (svc *Service) processFetchCycle() error {
	// Check if stopped
	svc.mu.RLock()
	stopped := svc.stopped
	svc.mu.RUnlock()
	if stopped {
		return fmt.Errorf("service stopped")
	}

	// current node
	n, err := svc.GetCurrentNode()
	if err != nil {
		return fmt.Errorf("failed to get current node: %w", err)
	}

	// skip if node is not active or enabled
	if !n.Active || !n.Enabled {
		return fmt.Errorf("node not active or enabled")
	}

	// validate if max runners is reached (max runners = 0 means no limit)
	if n.MaxRunners > 0 && svc.getRunnerCount() >= n.MaxRunners {
		return fmt.Errorf("max runners reached")
	}

	// fetch task id
	tid, err := svc.fetchTask()
	if err != nil {
		return fmt.Errorf("failed to fetch task: %w", err)
	}

	// skip if no task id
	if tid.IsZero() {
		return fmt.Errorf("no task available")
	}

	// run task through the worker pool
	if err := svc.runTask(tid); err != nil {
		t, getErr := svc.GetTaskById(tid)
		if getErr == nil && t.Status != constants.TaskStatusCancelled {
			t.Error = err.Error()
			t.Status = constants.TaskStatusError
			// UpdateTask selects the master or worker model service; writing
			// through the worker-side client unconditionally is wrong on the
			// master node.
			if updateErr := svc.UpdateTask(t); updateErr != nil {
				svc.Errorf("failed to mark task[%s] as errored: %v", tid.Hex(), updateErr)
			}
		}
		return fmt.Errorf("failed to run task: %w", err)
	}

	return nil
}

// reportStatus updates the node status on every report-ticker tick until the
// service context is cancelled.
func (svc *Service) reportStatus() {
	defer svc.wg.Done()
	defer func() {
		if r := recover(); r != nil {
			svc.Errorf("reportStatus panic recovered: %v", r)
		}
	}()

	for {
		select {
		case <-svc.ctx.Done():
			svc.Infof("reportStatus stopped by context")
			return
		case <-svc.reportTicker.C:
			if err := svc.updateNodeStatus(); err != nil {
				svc.Errorf("failed to report status: %v", err)
			}
		}
	}
}

// GetCancelTimeout returns the configured task cancellation timeout.
func (svc *Service) GetCancelTimeout() (timeout time.Duration) {
	return svc.cancelTimeout
}
func (svc *Service) GetNodeConfigService() (cfgSvc interfaces.NodeConfigService) { return svc.cfgSvc } func (svc *Service) GetCurrentNode() (n *models.Node, err error) { // node key nodeKey := svc.cfgSvc.GetNodeKey() // current node if svc.cfgSvc.IsMaster() { n, err = service.NewModelService[models.Node]().GetOne(bson.M{"key": nodeKey}, nil) } else { n, err = client.NewModelService[models.Node]().GetOne(bson.M{"key": nodeKey}, nil) } if err != nil { return nil, err } return n, nil } func (svc *Service) GetTaskById(id primitive.ObjectID) (t *models.Task, err error) { if svc.cfgSvc.IsMaster() { t, err = service.NewModelService[models.Task]().GetById(id) } else { t, err = client.NewModelService[models.Task]().GetById(id) } if err != nil { svc.Errorf("failed to get task by id: %v", err) return nil, err } return t, nil } func (svc *Service) UpdateTask(t *models.Task) (err error) { t.SetUpdated(t.CreatedBy) if svc.cfgSvc.IsMaster() { err = service.NewModelService[models.Task]().ReplaceById(t.Id, *t) } else { err = client.NewModelService[models.Task]().ReplaceById(t.Id, *t) } if err != nil { return err } return nil } func (svc *Service) GetSpiderById(id primitive.ObjectID) (s *models.Spider, err error) { if svc.cfgSvc.IsMaster() { s, err = service.NewModelService[models.Spider]().GetById(id) } else { s, err = client.NewModelService[models.Spider]().GetById(id) } if err != nil { svc.Errorf("failed to get spider by id: %v", err) return nil, err } return s, nil } func (svc *Service) getRunnerCount() (count int) { n, err := svc.GetCurrentNode() if err != nil { svc.Errorf("failed to get current node: %v", err) return } query := bson.M{ "node_id": n.Id, "status": bson.M{ "$in": []string{constants.TaskStatusAssigned, constants.TaskStatusRunning}, }, } if svc.cfgSvc.IsMaster() { count, err = service.NewModelService[models.Task]().Count(query) if err != nil { svc.Errorf("failed to count tasks: %v", err) return } } else { count, err = 
client.NewModelService[models.Task]().Count(query) if err != nil { svc.Errorf("failed to count tasks: %v", err) return } } return count } func (svc *Service) getRunner(taskId primitive.ObjectID) (r interfaces.TaskRunner, err error) { svc.Debugf("get runner: taskId[%v]", taskId) v, ok := svc.runners.Load(taskId) if !ok { err = fmt.Errorf("task[%s] not exists", taskId.Hex()) svc.Errorf("get runner error: %v", err) return nil, err } switch v := v.(type) { case interfaces.TaskRunner: r = v default: err = fmt.Errorf("invalid type: %T", v) svc.Errorf("get runner error: %v", err) return nil, err } return r, nil } func (svc *Service) addRunner(taskId primitive.ObjectID, r interfaces.TaskRunner) { svc.Debugf("add runner: taskId[%s]", taskId.Hex()) svc.runners.Store(taskId, r) } func (svc *Service) deleteRunner(taskId primitive.ObjectID) { svc.Debugf("delete runner: taskId[%v]", taskId) svc.runners.Delete(taskId) } func (svc *Service) updateNodeStatus() (err error) { // current node n, err := svc.GetCurrentNode() if err != nil { return err } // set available runners n.CurrentRunners = svc.getRunnerCount() // save node n.SetUpdated(n.CreatedBy) if svc.cfgSvc.IsMaster() { err = service.NewModelService[models.Node]().ReplaceById(n.Id, *n) } else { err = client.NewModelService[models.Node]().ReplaceById(n.Id, *n) } if err != nil { return err } return nil } func (svc *Service) fetchTask() (tid primitive.ObjectID, err error) { ctx, cancel := context.WithTimeout(context.Background(), svc.fetchTimeout) defer cancel() taskClient, err := svc.c.GetTaskClient() if err != nil { return primitive.NilObjectID, fmt.Errorf("failed to get task client: %v", err) } res, err := taskClient.FetchTask(ctx, &grpc.TaskServiceFetchTaskRequest{ NodeKey: svc.cfgSvc.GetNodeKey(), }) if err != nil { return primitive.NilObjectID, fmt.Errorf("fetchTask task error: %v", err) } // validate task id tid, err = primitive.ObjectIDFromHex(res.GetTaskId()) if err != nil { return primitive.NilObjectID, 
fmt.Errorf("invalid task id: %s", res.GetTaskId()) } return tid, nil } func (svc *Service) runTask(taskId primitive.ObjectID) (err error) { // attempt to get runner from pool _, ok := svc.runners.Load(taskId) if ok { err = fmt.Errorf("task[%s] already exists", taskId.Hex()) svc.Errorf("run task error: %v", err) return err } // Use worker pool for bounded task execution return svc.workerPool.SubmitTask(taskId) } // executeTask is the actual task execution logic called by worker pool func (svc *Service) executeTask(taskId primitive.ObjectID) (err error) { // attempt to get runner from pool _, ok := svc.runners.Load(taskId) if ok { err = fmt.Errorf("task[%s] already exists", taskId.Hex()) svc.Errorf("execute task error: %v", err) return err } // create a new task runner r, err := newTaskRunner(taskId, svc) if err != nil { err = fmt.Errorf("failed to create task runner: %v", err) svc.Errorf("execute task error: %v", err) return err } // add runner to pool svc.addRunner(taskId, r) // Ensure cleanup always happens defer func() { if rec := recover(); rec != nil { svc.Errorf("task[%s] panic recovered: %v", taskId.Hex(), rec) } // Always cleanup runner from pool and stream svc.deleteRunner(taskId) svc.streamManager.RemoveTaskStream(taskId) }() // Add task to stream manager for cancellation support if err := svc.streamManager.AddTaskStream(r.GetTaskId()); err != nil { svc.Warnf("failed to add task[%s] to stream manager: %v", r.GetTaskId().Hex(), err) svc.Warnf("task[%s] will not be able to receive cancellation messages", r.GetTaskId().Hex()) } else { svc.Debugf("task[%s] added to stream manager for cancellation support", r.GetTaskId().Hex()) } // run task process (blocking) error or finish after task runner ends if err := r.Run(); err != nil { switch { case errors.Is(err, constants.ErrTaskError): svc.Errorf("task[%s] finished with error: %v", r.GetTaskId().Hex(), err) case errors.Is(err, constants.ErrTaskCancelled): svc.Infof("task[%s] cancelled", r.GetTaskId().Hex()) 
default: svc.Errorf("task[%s] finished with unknown error: %v", r.GetTaskId().Hex(), err) } } else { svc.Infof("task[%s] finished successfully", r.GetTaskId().Hex()) } return err } // subscribeTask attempts to subscribe to task stream func (svc *Service) subscribeTask(taskId primitive.ObjectID) (stream grpc.TaskService_SubscribeClient, err error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() req := &grpc.TaskServiceSubscribeRequest{ TaskId: taskId.Hex(), } taskClient, err := svc.c.GetTaskClient() if err != nil { return nil, fmt.Errorf("failed to get task client: %v", err) } stream, err = taskClient.Subscribe(ctx, req) if err != nil { svc.Errorf("failed to subscribe task[%s]: %v", taskId.Hex(), err) return nil, err } return stream, nil } // subscribeTaskWithRetry attempts to subscribe to task stream with retry logic func (svc *Service) subscribeTaskWithRetry(ctx context.Context, taskId primitive.ObjectID, maxRetries int) (stream grpc.TaskService_SubscribeClient, err error) { for i := 0; i < maxRetries; i++ { select { case <-ctx.Done(): return nil, ctx.Err() default: } stream, err = svc.subscribeTask(taskId) if err == nil { return stream, nil } svc.Warnf("failed to subscribe task[%s] (attempt %d/%d): %v", taskId.Hex(), i+1, maxRetries, err) if i < maxRetries-1 { // Wait before retry with exponential backoff backoff := time.Duration(i+1) * time.Second select { case <-ctx.Done(): return nil, ctx.Err() case <-time.After(backoff): } } } return nil, fmt.Errorf("failed to subscribe after %d retries: %w", maxRetries, err) } func (svc *Service) handleStreamMessagesSync(ctx context.Context, taskId primitive.ObjectID, stream grpc.TaskService_SubscribeClient) { defer func() { if r := recover(); r != nil { svc.Errorf("handleStreamMessagesSync[%s] panic recovered: %v", taskId.Hex(), r) } // Ensure stream is properly closed if stream != nil { if err := stream.CloseSend(); err != nil { svc.Debugf("task[%s] failed to close stream: %v", 
taskId.Hex(), err) } } }() svc.Debugf("task[%s] starting synchronous stream message handling", taskId.Hex()) for { select { case <-ctx.Done(): svc.Debugf("task[%s] stream handler stopped by context", taskId.Hex()) return case <-svc.ctx.Done(): svc.Debugf("task[%s] stream handler stopped by service context", taskId.Hex()) return default: // Set a reasonable timeout for stream receive recvCtx, cancel := context.WithTimeout(ctx, 5*time.Second) // Create a done channel to handle the recv operation done := make(chan struct { msg *grpc.TaskServiceSubscribeResponse err error }, 1) // Use a goroutine only for the blocking recv call, but ensure it's cleaned up go func() { defer cancel() msg, err := stream.Recv() select { case done <- struct { msg *grpc.TaskServiceSubscribeResponse err error }{msg, err}: case <-recvCtx.Done(): // Timeout occurred, abandon this receive } }() select { case result := <-done: cancel() // Clean up the context if result.err != nil { if errors.Is(result.err, io.EOF) { svc.Infof("task[%s] received EOF, stream closed", taskId.Hex()) return } svc.Errorf("task[%s] stream error: %v", taskId.Hex(), result.err) return } svc.processStreamMessage(taskId, result.msg) case <-recvCtx.Done(): cancel() // Timeout on receive - continue to next iteration svc.Debugf("task[%s] stream receive timeout", taskId.Hex()) case <-ctx.Done(): cancel() return } } } } func (svc *Service) handleStreamMessages(taskId primitive.ObjectID, stream grpc.TaskService_SubscribeClient, stopCh chan struct{}) { defer func() { if r := recover(); r != nil { svc.Errorf("handleStreamMessages[%s] panic recovered: %v", taskId.Hex(), r) } // Ensure stream is properly closed if stream != nil { if err := stream.CloseSend(); err != nil { svc.Debugf("task[%s] failed to close stream: %v", taskId.Hex(), err) } } }() // Create timeout for stream operations streamTimeout := 30 * time.Second for { select { case <-stopCh: svc.Debugf("task[%s] stream handler received stop signal", taskId.Hex()) return case 
<-svc.ctx.Done(): svc.Debugf("task[%s] stream handler stopped by service context", taskId.Hex()) return default: // Set deadline for receive operation ctx, cancel := context.WithTimeout(context.Background(), streamTimeout) // Create a buffered channel to receive the result result := make(chan struct { msg *grpc.TaskServiceSubscribeResponse err error }, 1) // Use a single goroutine to handle the blocking Recv call go func() { defer func() { if r := recover(); r != nil { svc.Errorf("task[%s] stream recv goroutine panic: %v", taskId.Hex(), r) } }() msg, err := stream.Recv() select { case result <- struct { msg *grpc.TaskServiceSubscribeResponse err error }{msg, err}: case <-ctx.Done(): // Context cancelled, don't send result } }() select { case res := <-result: cancel() if res.err != nil { if errors.Is(res.err, io.EOF) { svc.Infof("task[%s] received EOF, stream closed", taskId.Hex()) return } svc.Errorf("task[%s] stream error: %v", taskId.Hex(), res.err) return } svc.processStreamMessage(taskId, res.msg) case <-ctx.Done(): cancel() svc.Warnf("task[%s] stream receive timeout", taskId.Hex()) // Continue loop to try again case <-stopCh: cancel() return case <-svc.ctx.Done(): cancel() return } } } } func (svc *Service) processStreamMessage(taskId primitive.ObjectID, msg *grpc.TaskServiceSubscribeResponse) { switch msg.Code { case grpc.TaskServiceSubscribeCode_CANCEL: svc.Infof("task[%s] received cancel signal", taskId.Hex()) // Handle cancel synchronously to avoid goroutine accumulation svc.handleCancel(msg, taskId) default: svc.Debugf("task[%s] received unknown stream message code: %v", taskId.Hex(), msg.Code) } } func (svc *Service) handleCancel(msg *grpc.TaskServiceSubscribeResponse, taskId primitive.ObjectID) { // validate task id if msg.TaskId != taskId.Hex() { svc.Errorf("task[%s] received cancel signal for another task[%s]", taskId.Hex(), msg.TaskId) return } // cancel task err := svc.cancelTask(taskId, msg.Force) if err != nil { svc.Errorf("task[%s] failed to 
cancel: %v", taskId.Hex(), err) return } svc.Infof("task[%s] cancelled", taskId.Hex()) // set task status as "cancelled" t, err := svc.GetTaskById(taskId) if err != nil { svc.Errorf("task[%s] failed to get task: %v", taskId.Hex(), err) return } t.Status = constants.TaskStatusCancelled err = svc.UpdateTask(t) if err != nil { svc.Errorf("task[%s] failed to update task: %v", taskId.Hex(), err) } } func (svc *Service) cancelTask(taskId primitive.ObjectID, force bool) (err error) { r, err := svc.getRunner(taskId) if err != nil { // Runner not found, task might already be finished svc.Warnf("runner not found for task[%s]: %v", taskId.Hex(), err) return nil } // Attempt cancellation with timeout cancelCtx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second) defer cancelFunc() cancelDone := make(chan error, 1) go func() { cancelDone <- r.Cancel(force) }() select { case err = <-cancelDone: if err != nil { svc.Errorf("failed to cancel task[%s]: %v", taskId.Hex(), err) // If cancellation failed and force is not set, try force cancellation if !force { svc.Warnf("escalating to force cancellation for task[%s]", taskId.Hex()) return svc.cancelTask(taskId, true) } return err } svc.Infof("task[%s] cancelled successfully", taskId.Hex()) case <-cancelCtx.Done(): svc.Errorf("timeout cancelling task[%s], removing runner from pool", taskId.Hex()) // Remove runner from pool to prevent further issues svc.runners.Delete(taskId) return fmt.Errorf("task cancellation timeout") } return nil } // stopAllRunners gracefully stops all running tasks func (svc *Service) stopAllRunners() { svc.Infof("Stopping all running tasks...") var runnerIds []primitive.ObjectID // Collect all runner IDs svc.runners.Range(func(key, value interface{}) bool { if taskId, ok := key.(primitive.ObjectID); ok { runnerIds = append(runnerIds, taskId) } return true }) // Cancel all runners with bounded concurrency to prevent goroutine explosion const maxConcurrentCancellations = 10 var wg sync.WaitGroup 
semaphore := make(chan struct{}, maxConcurrentCancellations) for _, taskId := range runnerIds { wg.Add(1) // Acquire semaphore to limit concurrent cancellations semaphore <- struct{}{} go func(tid primitive.ObjectID) { defer func() { <-semaphore // Release semaphore wg.Done() if r := recover(); r != nil { svc.Errorf("stopAllRunners panic for task[%s]: %v", tid.Hex(), r) } }() if err := svc.cancelTask(tid, false); err != nil { svc.Errorf("failed to cancel task[%s]: %v", tid.Hex(), err) // Force cancel after timeout time.Sleep(5 * time.Second) _ = svc.cancelTask(tid, true) } }(taskId) } // Wait for all cancellations with timeout done := make(chan struct{}) go func() { wg.Wait() close(done) }() select { case <-done: svc.Infof("All tasks stopped gracefully") case <-time.After(30 * time.Second): svc.Warnf("Some tasks did not stop within timeout") } } func (svc *Service) startStuckTaskCleanup() { svc.wg.Add(1) // Track this goroutine in the WaitGroup go func() { defer svc.wg.Done() // Ensure WaitGroup is decremented defer func() { if r := recover(); r != nil { svc.Errorf("startStuckTaskCleanup panic recovered: %v", r) } }() ticker := time.NewTicker(5 * time.Minute) // Check every 5 minutes defer ticker.Stop() for { select { case <-svc.ctx.Done(): svc.Debugf("stuck task cleanup routine shutting down") return case <-ticker.C: svc.checkAndCleanupStuckTasks() } } }() } // checkAndCleanupStuckTasks checks for tasks that have been trying to cancel for too long func (svc *Service) checkAndCleanupStuckTasks() { defer func() { if r := recover(); r != nil { svc.Errorf("panic in stuck task cleanup: %v", r) } }() var stuckTasks []primitive.ObjectID // Check all running tasks svc.runners.Range(func(key, value interface{}) bool { taskId, ok := key.(primitive.ObjectID) if !ok { return true } // Get task from database to check its state t, err := svc.GetTaskById(taskId) if err != nil { svc.Errorf("failed to get task[%s] during stuck cleanup: %v", taskId.Hex(), err) return true } // 
Check if task has been in cancelling state too long (15+ minutes) if t.Status == constants.TaskStatusCancelled && time.Since(t.UpdatedAt) > 15*time.Minute { svc.Warnf("detected stuck cancelled task[%s], will force cleanup", taskId.Hex()) stuckTasks = append(stuckTasks, taskId) } return true }) // Force cleanup stuck tasks for _, taskId := range stuckTasks { svc.Infof("force cleaning up stuck task[%s]", taskId.Hex()) // Remove from runners map svc.runners.Delete(taskId) // Update task status to indicate it was force cleaned t, err := svc.GetTaskById(taskId) if err == nil { t.Status = constants.TaskStatusCancelled t.Error = "Task was stuck in cancelling state and was force cleaned up" if updateErr := svc.UpdateTask(t); updateErr != nil { svc.Errorf("failed to update stuck task[%s] status: %v", taskId.Hex(), updateErr) } } } if len(stuckTasks) > 0 { svc.Infof("cleaned up %d stuck tasks", len(stuckTasks)) } } func NewStreamManager(service *Service) *StreamManager { ctx, cancel := context.WithCancel(context.Background()) return &StreamManager{ ctx: ctx, cancel: cancel, service: service, messageQueue: make(chan *StreamMessage, 100), // Buffered channel for messages maxStreams: 50, // Limit concurrent streams } } func (sm *StreamManager) Start() { sm.wg.Add(2) go sm.messageProcessor() go sm.streamCleaner() } func (sm *StreamManager) Stop() { sm.cancel() close(sm.messageQueue) // Close all active streams sm.streams.Range(func(key, value interface{}) bool { if ts, ok := value.(*TaskStream); ok { ts.Close() } return true }) sm.wg.Wait() } func (sm *StreamManager) AddTaskStream(taskId primitive.ObjectID) error { // Check if we're at the stream limit streamCount := 0 sm.streams.Range(func(key, value interface{}) bool { streamCount++ return true }) if streamCount >= sm.maxStreams { return fmt.Errorf("stream limit reached (%d)", sm.maxStreams) } // Create new stream stream, err := sm.service.subscribeTask(taskId) if err != nil { return fmt.Errorf("failed to subscribe to task 
stream: %v", err) } ctx, cancel := context.WithCancel(sm.ctx) taskStream := &TaskStream{ taskId: taskId, stream: stream, ctx: ctx, cancel: cancel, lastActive: time.Now(), } sm.streams.Store(taskId, taskStream) // Start listening for messages in a single goroutine per stream sm.wg.Add(1) go sm.streamListener(taskStream) return nil } func (sm *StreamManager) RemoveTaskStream(taskId primitive.ObjectID) { if value, ok := sm.streams.LoadAndDelete(taskId); ok { if ts, ok := value.(*TaskStream); ok { ts.Close() } } } func (sm *StreamManager) streamListener(ts *TaskStream) { defer sm.wg.Done() defer func() { if r := recover(); r != nil { sm.service.Errorf("stream listener panic for task[%s]: %v", ts.taskId.Hex(), r) } ts.Close() sm.streams.Delete(ts.taskId) }() sm.service.Debugf("stream listener started for task[%s]", ts.taskId.Hex()) for { select { case <-ts.ctx.Done(): sm.service.Debugf("stream listener stopped for task[%s]", ts.taskId.Hex()) return case <-sm.ctx.Done(): return default: msg, err := ts.stream.Recv() if err != nil { if errors.Is(err, io.EOF) { sm.service.Debugf("stream EOF for task[%s]", ts.taskId.Hex()) return } if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return } sm.service.Errorf("stream error for task[%s]: %v", ts.taskId.Hex(), err) return } // Update last active time ts.mu.Lock() ts.lastActive = time.Now() ts.mu.Unlock() // Send message to processor select { case sm.messageQueue <- &StreamMessage{ taskId: ts.taskId, msg: msg, err: nil, }: case <-ts.ctx.Done(): return case <-sm.ctx.Done(): return default: sm.service.Warnf("message queue full, dropping message for task[%s]", ts.taskId.Hex()) } } } } func (sm *StreamManager) messageProcessor() { defer sm.wg.Done() defer func() { if r := recover(); r != nil { sm.service.Errorf("message processor panic: %v", r) } }() sm.service.Debugf("stream message processor started") for { select { case <-sm.ctx.Done(): sm.service.Debugf("stream message processor shutting down") 
return case msg, ok := <-sm.messageQueue: if !ok { return } sm.processMessage(msg) } } } func (sm *StreamManager) processMessage(streamMsg *StreamMessage) { if streamMsg.err != nil { sm.service.Errorf("stream message error for task[%s]: %v", streamMsg.taskId.Hex(), streamMsg.err) return } // Process the actual message sm.service.processStreamMessage(streamMsg.taskId, streamMsg.msg) } func (sm *StreamManager) streamCleaner() { defer sm.wg.Done() defer func() { if r := recover(); r != nil { sm.service.Errorf("stream cleaner panic: %v", r) } }() ticker := time.NewTicker(2 * time.Minute) defer ticker.Stop() for { select { case <-sm.ctx.Done(): return case <-ticker.C: sm.cleanupInactiveStreams() } } } func (sm *StreamManager) cleanupInactiveStreams() { now := time.Now() inactiveThreshold := 10 * time.Minute sm.streams.Range(func(key, value interface{}) bool { taskId := key.(primitive.ObjectID) ts := value.(*TaskStream) ts.mu.RLock() lastActive := ts.lastActive ts.mu.RUnlock() if now.Sub(lastActive) > inactiveThreshold { sm.service.Debugf("cleaning up inactive stream for task[%s]", taskId.Hex()) sm.RemoveTaskStream(taskId) } return true }) } func (ts *TaskStream) Close() { ts.cancel() if ts.stream != nil { _ = ts.stream.CloseSend() } } func newTaskHandlerService() *Service { // service svc := &Service{ fetchInterval: 1 * time.Second, fetchTimeout: 15 * time.Second, reportInterval: 5 * time.Second, cancelTimeout: 60 * time.Second, maxWorkers: utils.GetTaskWorkers(), // Use configurable worker count mu: sync.RWMutex{}, runners: sync.Map{}, Logger: utils.NewLogger("TaskHandlerService"), } // dependency injection svc.cfgSvc = nodeconfig.GetNodeConfigService() // grpc client svc.c = grpcclient.GetGrpcClient() // initialize stream manager svc.streamManager = NewStreamManager(svc) return svc } var _service *Service var _serviceOnce sync.Once func GetTaskHandlerService() *Service { _serviceOnce.Do(func() { _service = newTaskHandlerService() }) return _service }