diff --git a/core/task/handler/runner_cleanup.go b/core/task/handler/runner_cleanup.go index 94c66e59..75fe2b5e 100644 --- a/core/task/handler/runner_cleanup.go +++ b/core/task/handler/runner_cleanup.go @@ -1,22 +1,3 @@ -/* - * Copyright (c) 2025. Core Digital Limited - * 版权所有 (c) 2025. 重庆科锐数研科技有限公司 (Core Digital Limited) - * All rights reserved. 保留所有权利。 - * - * 该软件由 重庆科锐数研科技有限公司 (Core Digital Limited) 开发,未经明确书面许可,任何人不得使用、复制、修改或分发该软件的任何部分。 - * This software is developed by Core Digital Limited. No one is permitted to use, copy, modify, or distribute this software without explicit written permission. - * - * 许可证: - * 该软件仅供授权使用。授权用户有权在授权范围内使用、复制、修改和分发该软件。 - * License: - * This software is for authorized use only. Authorized users are permitted to use, copy, modify, and distribute this software within the scope of their authorization. - * - * 免责声明: - * 该软件按"原样"提供,不附带任何明示或暗示的担保,包括但不限于对适销性和适用于特定目的的担保。在任何情况下,版权持有者或其许可方对因使用该软件而产生的任何损害或其他责任概不负责。 - * Disclaimer: - * This software is provided "as is," without any express or implied warranties, including but not limited to warranties of merchantability and fitness for a particular purpose. In no event shall the copyright holder or its licensors be liable for any damages or other liability arising from the use of this software. - */ - package handler import ( diff --git a/core/task/handler/runner_ipc.go b/core/task/handler/runner_ipc.go index 56908ea6..12920fcc 100644 --- a/core/task/handler/runner_ipc.go +++ b/core/task/handler/runner_ipc.go @@ -1,22 +1,3 @@ -/* - * Copyright (c) 2025. Core Digital Limited - * 版权所有 (c) 2025. 重庆科锐数研科技有限公司 (Core Digital Limited) - * All rights reserved. 保留所有权利。 - * - * 该软件由 重庆科锐数研科技有限公司 (Core Digital Limited) 开发,未经明确书面许可,任何人不得使用、复制、修改或分发该软件的任何部分。 - * This software is developed by Core Digital Limited. No one is permitted to use, copy, modify, or distribute this software without explicit written permission. - * - * 许可证: - * 该软件仅供授权使用。授权用户有权在授权范围内使用、复制、修改和分发该软件。 - * License: - * This software is for authorized use only. Authorized users are permitted to use, copy, modify, and distribute this software within the scope of their authorization. - * - * 免责声明: - * 该软件按"原样"提供,不附带任何明示或暗示的担保,包括但不限于对适销性和适用于特定目的的担保。在任何情况下,版权持有者或其许可方对因使用该软件而产生的任何损害或其他责任概不负责。 - * Disclaimer: - * This software is provided "as is," without any express or implied warranties, including but not limited to warranties of merchantability and fitness for a particular purpose. In no event shall the copyright holder or its licensors be liable for any damages or other liability arising from the use of this software. - */ - package handler import ( diff --git a/core/task/handler/runner_log.go b/core/task/handler/runner_log.go index 3733aad4..918fa82f 100644 --- a/core/task/handler/runner_log.go +++ b/core/task/handler/runner_log.go @@ -1,22 +1,3 @@ -/* - * Copyright (c) 2025. Core Digital Limited - * 版权所有 (c) 2025. 重庆科锐数研科技有限公司 (Core Digital Limited) - * All rights reserved. 保留所有权利。 - * - * 该软件由 重庆科锐数研科技有限公司 (Core Digital Limited) 开发,未经明确书面许可,任何人不得使用、复制、修改或分发该软件的任何部分。 - * This software is developed by Core Digital Limited. No one is permitted to use, copy, modify, or distribute this software without explicit written permission. - * - * 许可证: - * 该软件仅供授权使用。授权用户有权在授权范围内使用、复制、修改和分发该软件。 - * License: - * This software is for authorized use only. Authorized users are permitted to use, copy, modify, and distribute this software within the scope of their authorization. - * - * 免责声明: - * 该软件按"原样"提供,不附带任何明示或暗示的担保,包括但不限于对适销性和适用于特定目的的担保。在任何情况下,版权持有者或其许可方对因使用该软件而产生的任何损害或其他责任概不负责。 - * Disclaimer: - * This software is provided "as is," without any express or implied warranties, including but not limited to warranties of merchantability and fitness for a particular purpose. In no event shall the copyright holder or its licensors be liable for any damages or other liability arising from the use of this software. - */ - package handler import ( diff --git a/core/task/handler/service.go b/core/task/handler/service.go index 49ce521c..0c1554e4 100644 --- a/core/task/handler/service.go +++ b/core/task/handler/service.go @@ -61,7 +61,10 @@ func (svc *Service) Start() { svc.fetchTicker = time.NewTicker(svc.fetchInterval) svc.reportTicker = time.NewTicker(svc.reportInterval) - // Initialize and start worker pool + // Get max workers from current node configuration + svc.maxWorkers = svc.getCurrentNodeMaxRunners() + + // Initialize and start worker pool with dynamic max workers svc.workerPool = NewTaskWorkerPool(svc.maxWorkers, svc) svc.workerPool.Start() @@ -77,7 +80,11 @@ func (svc *Service) Start() { go svc.fetchAndRunTasks() queueSize := cap(svc.workerPool.taskQueue) - svc.Infof("Task handler service started with %d workers and queue size %d", svc.maxWorkers, queueSize) + if svc.maxWorkers == -1 { + svc.Infof("Task handler service started with unlimited workers (from node config) and queue size %d", queueSize) + } else { + svc.Infof("Task handler service started with %d max workers (from node config) and queue size %d", svc.maxWorkers, queueSize) + } // Start the stuck task cleanup routine (adds to WaitGroup internally) svc.startStuckTaskCleanup() @@ -277,6 +284,27 @@ func (svc *Service) GetNodeConfigService() (cfgSvc interfaces.NodeConfigService) return svc.cfgSvc } +func (svc *Service) getCurrentNodeMaxRunners() int { + n, err := svc.GetCurrentNode() + if err != nil { + svc.Errorf("failed to get current node for max runners: %v", err) + // Fallback to config default + return utils.GetNodeMaxRunners() + } + + // If MaxRunners is 0, it means unlimited workers + if n.MaxRunners == 0 { + return -1 // Use -1 internally to represent unlimited + } + + // If MaxRunners is negative (not set), use config default + if n.MaxRunners < 0 { + return utils.GetNodeMaxRunners() + } + + return n.MaxRunners +} + func (svc *Service) GetCurrentNode() (n *models.Node, err error) { // node key nodeKey := svc.cfgSvc.GetNodeKey() @@ -370,6 +398,29 @@ func (svc *Service) updateNodeStatus() (err error) { return err } + // Check if max runners configuration has changed and update worker pool + currentMaxWorkers := n.MaxRunners + // Handle unlimited workers (0 means unlimited) + if currentMaxWorkers == 0 { + currentMaxWorkers = -1 // Use -1 internally to represent unlimited + } else if currentMaxWorkers < 0 { + currentMaxWorkers = utils.GetNodeMaxRunners() // Use config default if not set (negative) + } + + if currentMaxWorkers != svc.maxWorkers { + if currentMaxWorkers == -1 { + svc.Infof("Node max runners changed from %d to unlimited, updating worker pool", svc.maxWorkers) + } else if svc.maxWorkers == -1 { + svc.Infof("Node max runners changed from unlimited to %d, updating worker pool", currentMaxWorkers) + } else { + svc.Infof("Node max runners changed from %d to %d, updating worker pool", svc.maxWorkers, currentMaxWorkers) + } + svc.maxWorkers = currentMaxWorkers + if svc.workerPool != nil { + svc.workerPool.UpdateMaxWorkers(currentMaxWorkers) + } + } + // set available runners n.CurrentRunners = svc.getRunnerCount() @@ -501,7 +552,7 @@ func newTaskHandlerService() *Service { fetchTimeout: 15 * time.Second, reportInterval: 5 * time.Second, cancelTimeout: 60 * time.Second, - maxWorkers: utils.GetTaskWorkers(), // Use configurable worker count + maxWorkers: utils.GetNodeMaxRunners(), mu: sync.RWMutex{}, runners: sync.Map{}, Logger: utils.NewLogger("TaskHandlerService"), diff --git a/core/task/handler/worker_pool.go b/core/task/handler/worker_pool.go index 1bbb26dc..02561746 100644 --- a/core/task/handler/worker_pool.go +++ b/core/task/handler/worker_pool.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "sync" + "time" + "github.com/crawlab-team/crawlab/core/utils" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -13,39 +15,65 @@ type taskRequest struct { taskId primitive.ObjectID } -// TaskWorkerPool manages a bounded pool of workers for task execution +// TaskWorkerPool manages a dynamic pool of workers for task execution type TaskWorkerPool struct { - workers int - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - taskQueue chan taskRequest - service *Service + maxWorkers int + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + taskQueue chan taskRequest + service *Service + + // Track active workers + activeWorkers int + workerMutex sync.RWMutex } -func NewTaskWorkerPool(workers int, service *Service) *TaskWorkerPool { +func NewTaskWorkerPool(maxWorkers int, service *Service) *TaskWorkerPool { // Use service context for proper cancellation chain ctx, cancel := context.WithCancel(service.ctx) - // Use a more generous queue size to handle task bursts - // Queue size is workers * 5 to allow for better buffering - queueSize := workers * 5 - if queueSize < 50 { - queueSize = 50 // Minimum queue size + + var queueSize int + if maxWorkers == -1 { + // Unlimited workers - use configured queue size or larger default + configuredSize := utils.GetTaskQueueSize() + if configuredSize > 0 { + queueSize = configuredSize + } else { + queueSize = 1000 // Large default for unlimited workers + } + } else { + // Limited workers - use configured size or calculate based on max workers + configuredSize := utils.GetTaskQueueSize() + if configuredSize > 0 { + queueSize = configuredSize + } else { + // Use a more generous queue size to handle task bursts + // Queue size is maxWorkers * 5 to allow for better buffering + queueSize = maxWorkers * 5 + if queueSize < 50 { + queueSize = 50 // Minimum queue size + } + } } return &TaskWorkerPool{ - workers: workers, - ctx: ctx, - cancel: cancel, - taskQueue: make(chan taskRequest, queueSize), - service: service, + maxWorkers: maxWorkers, + ctx: ctx, + cancel: cancel, + taskQueue: make(chan taskRequest, queueSize), + service: service, + activeWorkers: 0, + workerMutex: sync.RWMutex{}, } } func (pool *TaskWorkerPool) Start() { - for i := 0; i < pool.workers; i++ { - pool.wg.Add(1) - go pool.worker(i) + // Don't pre-create workers - they will be created on-demand + if pool.maxWorkers == -1 { + pool.service.Debugf("Task worker pool started with unlimited workers") + } else { + pool.service.Debugf("Task worker pool started with max workers: %d", pool.maxWorkers) } } @@ -64,30 +92,146 @@ func (pool *TaskWorkerPool) SubmitTask(taskId primitive.ObjectID) error { case pool.taskQueue <- req: pool.service.Debugf("task[%s] queued for parallel execution, queue usage: %d/%d", taskId.Hex(), len(pool.taskQueue), cap(pool.taskQueue)) + + // Try to create a new worker if we haven't reached the limit + pool.maybeCreateWorker() + return nil // Return immediately - task will execute in parallel case <-pool.ctx.Done(): return fmt.Errorf("worker pool is shutting down") default: queueLen := len(pool.taskQueue) queueCap := cap(pool.taskQueue) - pool.service.Warnf("task queue is full (%d/%d), consider increasing task.workers configuration", - queueLen, queueCap) - return fmt.Errorf("task queue is full (%d/%d), consider increasing task.workers configuration", - queueLen, queueCap) + if pool.maxWorkers == -1 { + pool.service.Warnf("task queue is full (%d/%d), consider increasing system resources", + queueLen, queueCap) + return fmt.Errorf("task queue is full (%d/%d), consider increasing system resources", + queueLen, queueCap) + } else { + pool.service.Warnf("task queue is full (%d/%d), consider increasing node max_runners configuration", + queueLen, queueCap) + return fmt.Errorf("task queue is full (%d/%d), consider increasing node max_runners configuration", + queueLen, queueCap) + } + } +} + +func (pool *TaskWorkerPool) UpdateMaxWorkers(newMaxWorkers int) { + pool.workerMutex.Lock() + defer pool.workerMutex.Unlock() + + oldMax := pool.maxWorkers + pool.maxWorkers = newMaxWorkers + + // Update queue size if needed + var needQueueResize bool + var newQueueSize int + + configuredSize := utils.GetTaskQueueSize() + + if newMaxWorkers == -1 { + // Unlimited workers + if configuredSize > 0 { + newQueueSize = configuredSize + } else { + newQueueSize = 1000 + } + needQueueResize = newQueueSize > cap(pool.taskQueue) + } else if oldMax == -1 { + // From unlimited to limited + if configuredSize > 0 { + newQueueSize = configuredSize + } else { + newQueueSize = newMaxWorkers * 5 + if newQueueSize < 50 { + newQueueSize = 50 + } + } + needQueueResize = true // Always resize when going from unlimited to limited + } else if newMaxWorkers > oldMax { + // Increase queue capacity + if configuredSize > 0 { + newQueueSize = configuredSize + } else { + newQueueSize = newMaxWorkers * 5 + if newQueueSize < 50 { + newQueueSize = 50 + } + } + needQueueResize = newQueueSize > cap(pool.taskQueue) + } + + if needQueueResize { + oldQueue := pool.taskQueue + pool.taskQueue = make(chan taskRequest, newQueueSize) + + // Copy existing tasks to new queue + close(oldQueue) + for req := range oldQueue { + select { + case pool.taskQueue <- req: + default: + // If new queue is somehow full, log the issue but don't block + pool.service.Warnf("Lost task during queue resize: %s", req.taskId.Hex()) + } + } + } + + if oldMax == -1 && newMaxWorkers != -1 { + pool.service.Infof("Updated worker pool from unlimited to max workers: %d", newMaxWorkers) + } else if oldMax != -1 && newMaxWorkers == -1 { + pool.service.Infof("Updated worker pool from max workers %d to unlimited", oldMax) + } else { + pool.service.Infof("Updated worker pool max workers from %d to %d", oldMax, newMaxWorkers) + } +} + +func (pool *TaskWorkerPool) maybeCreateWorker() { + pool.workerMutex.Lock() + defer pool.workerMutex.Unlock() + + // Only create a worker if we have tasks queued and haven't reached the limit + // For unlimited workers (maxWorkers == -1), always create if there are queued tasks + hasQueuedTasks := len(pool.taskQueue) > 0 + underLimit := pool.maxWorkers == -1 || pool.activeWorkers < pool.maxWorkers + + if hasQueuedTasks && underLimit { + pool.activeWorkers++ + workerID := pool.activeWorkers + pool.wg.Add(1) + go pool.worker(workerID) + + if pool.maxWorkers == -1 { + pool.service.Debugf("created on-demand worker %d (unlimited workers mode, total active: %d)", + workerID, pool.activeWorkers) + } else { + pool.service.Debugf("created on-demand worker %d (total active: %d/%d)", + workerID, pool.activeWorkers, pool.maxWorkers) + } } } func (pool *TaskWorkerPool) worker(workerID int) { defer pool.wg.Done() defer func() { + // Decrement active worker count when worker exits + pool.workerMutex.Lock() + pool.activeWorkers-- + pool.workerMutex.Unlock() + if r := recover(); r != nil { pool.service.Errorf("worker %d panic recovered: %v", workerID, r) } }() pool.service.Debugf("worker %d started", workerID) + idleTimeout := 5 * time.Minute // Worker will exit after 5 minutes of idleness + timer := time.NewTimer(idleTimeout) + defer timer.Stop() for { + timer.Reset(idleTimeout) + select { case <-pool.ctx.Done(): pool.service.Debugf("worker %d shutting down", workerID) @@ -108,6 +252,10 @@ func (pool *TaskWorkerPool) worker(workerID int) { } else { pool.service.Debugf("worker %d completed task[%s]", workerID, req.taskId.Hex()) } + case <-timer.C: + // Worker has been idle for too long, exit to save resources + pool.service.Debugf("worker %d exiting due to inactivity", workerID) + return } } } diff --git a/core/utils/config.go b/core/utils/config.go index 806af447..5a549c21 100644 --- a/core/utils/config.go +++ b/core/utils/config.go @@ -28,8 +28,8 @@ const ( DefaultApiAllowHeaders = "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With" DefaultApiPort = 8080 DefaultApiPath = "/api" - DefaultNodeMaxRunners = 0 // 0 means no limit - DefaultTaskWorkers = 30 // Default number of task workers + DefaultNodeMaxRunners = 20 // Default max concurrent task runners per node + DefaultTaskQueueSize = 100 // Default task queue size per node DefaultInstallRoot = "/app/install" DefaultInstallEnvs = "" MetadataConfigDirName = ".crawlab" @@ -242,11 +242,11 @@ func GetNodeMaxRunners() int { return DefaultNodeMaxRunners } -func GetTaskWorkers() int { - if res := viper.GetInt("task.workers"); res != 0 { +func GetTaskQueueSize() int { + if res := viper.GetInt("task.queue.size"); res != 0 { return res } - return DefaultTaskWorkers + return DefaultTaskQueueSize } func GetMetadataConfigPath() string {