refactor: support dynamic max workers in the task worker pool, improve queue management, and update configuration defaults for node max runners and task queue size

Marvin Zhang
2025-08-07 18:16:23 +08:00
parent 6340a9b880
commit 78f9e0ca8d
6 changed files with 232 additions and 90 deletions

View File

@@ -1,22 +1,3 @@
/*
* Copyright (c) 2025. Core Digital Limited
* 版权所有 (c) 2025. 重庆科锐数研科技有限公司 (Core Digital Limited)
* All rights reserved. 保留所有权利。
*
* 该软件由 重庆科锐数研科技有限公司 (Core Digital Limited) 开发,未经明确书面许可,任何人不得使用、复制、修改或分发该软件的任何部分。
* This software is developed by Core Digital Limited. No one is permitted to use, copy, modify, or distribute this software without explicit written permission.
*
* 许可证:
* 该软件仅供授权使用。授权用户有权在授权范围内使用、复制、修改和分发该软件。
* License:
* This software is for authorized use only. Authorized users are permitted to use, copy, modify, and distribute this software within the scope of their authorization.
*
* 免责声明:
* 该软件按"原样"提供,不附带任何明示或暗示的担保,包括但不限于对适销性和适用于特定目的的担保。在任何情况下,版权持有者或其许可方对因使用该软件而产生的任何损害或其他责任概不负责。
* Disclaimer:
* This software is provided "as is," without any express or implied warranties, including but not limited to warranties of merchantability and fitness for a particular purpose. In no event shall the copyright holder or its licensors be liable for any damages or other liability arising from the use of this software.
*/
package handler
import (

View File

@@ -1,22 +1,3 @@
/*
* Copyright (c) 2025. Core Digital Limited
* 版权所有 (c) 2025. 重庆科锐数研科技有限公司 (Core Digital Limited)
* All rights reserved. 保留所有权利。
*
* 该软件由 重庆科锐数研科技有限公司 (Core Digital Limited) 开发,未经明确书面许可,任何人不得使用、复制、修改或分发该软件的任何部分。
* This software is developed by Core Digital Limited. No one is permitted to use, copy, modify, or distribute this software without explicit written permission.
*
* 许可证:
* 该软件仅供授权使用。授权用户有权在授权范围内使用、复制、修改和分发该软件。
* License:
* This software is for authorized use only. Authorized users are permitted to use, copy, modify, and distribute this software within the scope of their authorization.
*
* 免责声明:
* 该软件按"原样"提供,不附带任何明示或暗示的担保,包括但不限于对适销性和适用于特定目的的担保。在任何情况下,版权持有者或其许可方对因使用该软件而产生的任何损害或其他责任概不负责。
* Disclaimer:
* This software is provided "as is," without any express or implied warranties, including but not limited to warranties of merchantability and fitness for a particular purpose. In no event shall the copyright holder or its licensors be liable for any damages or other liability arising from the use of this software.
*/
package handler
import (

View File

@@ -1,22 +1,3 @@
/*
* Copyright (c) 2025. Core Digital Limited
* 版权所有 (c) 2025. 重庆科锐数研科技有限公司 (Core Digital Limited)
* All rights reserved. 保留所有权利。
*
* 该软件由 重庆科锐数研科技有限公司 (Core Digital Limited) 开发,未经明确书面许可,任何人不得使用、复制、修改或分发该软件的任何部分。
* This software is developed by Core Digital Limited. No one is permitted to use, copy, modify, or distribute this software without explicit written permission.
*
* 许可证:
* 该软件仅供授权使用。授权用户有权在授权范围内使用、复制、修改和分发该软件。
* License:
* This software is for authorized use only. Authorized users are permitted to use, copy, modify, and distribute this software within the scope of their authorization.
*
* 免责声明:
* 该软件按"原样"提供,不附带任何明示或暗示的担保,包括但不限于对适销性和适用于特定目的的担保。在任何情况下,版权持有者或其许可方对因使用该软件而产生的任何损害或其他责任概不负责。
* Disclaimer:
* This software is provided "as is," without any express or implied warranties, including but not limited to warranties of merchantability and fitness for a particular purpose. In no event shall the copyright holder or its licensors be liable for any damages or other liability arising from the use of this software.
*/
package handler
import (

View File

@@ -61,7 +61,10 @@ func (svc *Service) Start() {
svc.fetchTicker = time.NewTicker(svc.fetchInterval)
svc.reportTicker = time.NewTicker(svc.reportInterval)
// Initialize and start worker pool
// Get max workers from current node configuration
svc.maxWorkers = svc.getCurrentNodeMaxRunners()
// Initialize and start worker pool with dynamic max workers
svc.workerPool = NewTaskWorkerPool(svc.maxWorkers, svc)
svc.workerPool.Start()
@@ -77,7 +80,11 @@ func (svc *Service) Start() {
go svc.fetchAndRunTasks()
queueSize := cap(svc.workerPool.taskQueue)
svc.Infof("Task handler service started with %d workers and queue size %d", svc.maxWorkers, queueSize)
if svc.maxWorkers == -1 {
svc.Infof("Task handler service started with unlimited workers (from node config) and queue size %d", queueSize)
} else {
svc.Infof("Task handler service started with %d max workers (from node config) and queue size %d", svc.maxWorkers, queueSize)
}
// Start the stuck task cleanup routine (adds to WaitGroup internally)
svc.startStuckTaskCleanup()
@@ -277,6 +284,27 @@ func (svc *Service) GetNodeConfigService() (cfgSvc interfaces.NodeConfigService)
return svc.cfgSvc
}
func (svc *Service) getCurrentNodeMaxRunners() int {
n, err := svc.GetCurrentNode()
if err != nil {
svc.Errorf("failed to get current node for max runners: %v", err)
// Fallback to config default
return utils.GetNodeMaxRunners()
}
// If MaxRunners is 0, it means unlimited workers
if n.MaxRunners == 0 {
return -1 // Use -1 internally to represent unlimited
}
// If MaxRunners is negative (not set), use config default
if n.MaxRunners < 0 {
return utils.GetNodeMaxRunners()
}
return n.MaxRunners
}
func (svc *Service) GetCurrentNode() (n *models.Node, err error) {
// node key
nodeKey := svc.cfgSvc.GetNodeKey()
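For illustration, a minimal standalone sketch of the resolution rule in getCurrentNodeMaxRunners above (the function name resolveMaxWorkers and the hard-coded default of 20 are hypothetical; the real code falls back to utils.GetNodeMaxRunners):

```go
package main

import "fmt"

// resolveMaxWorkers is a hypothetical stand-in for getCurrentNodeMaxRunners:
// 0 means unlimited (represented internally as -1), a negative value means
// "not set" and falls back to the configured default, and any positive
// value is used as-is.
func resolveMaxWorkers(maxRunners, configDefault int) int {
	switch {
	case maxRunners == 0:
		return -1 // unlimited
	case maxRunners < 0:
		return configDefault // not set; fall back to config default
	default:
		return maxRunners
	}
}

func main() {
	for _, n := range []int{0, -3, 8} {
		fmt.Printf("MaxRunners=%d -> maxWorkers=%d\n", n, resolveMaxWorkers(n, 20))
	}
	// Output:
	// MaxRunners=0 -> maxWorkers=-1
	// MaxRunners=-3 -> maxWorkers=20
	// MaxRunners=8 -> maxWorkers=8
}
```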
@@ -370,6 +398,29 @@ func (svc *Service) updateNodeStatus() (err error) {
return err
}
// Check if max runners configuration has changed and update worker pool
currentMaxWorkers := n.MaxRunners
// Handle unlimited workers (0 means unlimited)
if currentMaxWorkers == 0 {
currentMaxWorkers = -1 // Use -1 internally to represent unlimited
} else if currentMaxWorkers < 0 {
currentMaxWorkers = utils.GetNodeMaxRunners() // Use config default if not set (negative)
}
if currentMaxWorkers != svc.maxWorkers {
if currentMaxWorkers == -1 {
svc.Infof("Node max runners changed from %d to unlimited, updating worker pool", svc.maxWorkers)
} else if svc.maxWorkers == -1 {
svc.Infof("Node max runners changed from unlimited to %d, updating worker pool", currentMaxWorkers)
} else {
svc.Infof("Node max runners changed from %d to %d, updating worker pool", svc.maxWorkers, currentMaxWorkers)
}
svc.maxWorkers = currentMaxWorkers
if svc.workerPool != nil {
svc.workerPool.UpdateMaxWorkers(currentMaxWorkers)
}
}
// set available runners
n.CurrentRunners = svc.getRunnerCount()
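The reconfiguration above is poll-driven: each status update re-reads the node's MaxRunners, normalizes it with the same sentinel mapping, and only touches the pool when the value actually changed. A minimal standalone sketch of that reconcile step, with all names hypothetical:

```go
package main

import "fmt"

// reconcileMaxWorkers normalizes the desired value (0 -> -1 for unlimited,
// negative -> config default) and reports whether the pool needs updating.
func reconcileMaxWorkers(desired, current, configDefault int) (int, bool) {
	switch {
	case desired == 0:
		desired = -1
	case desired < 0:
		desired = configDefault
	}
	return desired, desired != current
}

func main() {
	current := 20
	for _, want := range []int{20, 0, 8} {
		next, changed := reconcileMaxWorkers(want, current, 20)
		if changed {
			fmt.Printf("max runners changed from %d to %d\n", current, next)
			current = next
		}
	}
	// Output:
	// max runners changed from 20 to -1
	// max runners changed from -1 to 8
}
```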
@@ -501,7 +552,7 @@ func newTaskHandlerService() *Service {
fetchTimeout: 15 * time.Second,
reportInterval: 5 * time.Second,
cancelTimeout: 60 * time.Second,
maxWorkers: utils.GetTaskWorkers(), // Use configurable worker count
maxWorkers: utils.GetNodeMaxRunners(),
mu: sync.RWMutex{},
runners: sync.Map{},
Logger: utils.NewLogger("TaskHandlerService"),

View File

@@ -4,7 +4,9 @@ import (
"context"
"fmt"
"sync"
"time"
"github.com/crawlab-team/crawlab/core/utils"
"go.mongodb.org/mongo-driver/bson/primitive"
)
@@ -13,39 +15,65 @@ type taskRequest struct {
taskId primitive.ObjectID
}
// TaskWorkerPool manages a bounded pool of workers for task execution
// TaskWorkerPool manages a dynamic pool of workers for task execution
type TaskWorkerPool struct {
workers int
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
taskQueue chan taskRequest
service *Service
maxWorkers int
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
taskQueue chan taskRequest
service *Service
// Track active workers
activeWorkers int
workerMutex sync.RWMutex
}
func NewTaskWorkerPool(workers int, service *Service) *TaskWorkerPool {
func NewTaskWorkerPool(maxWorkers int, service *Service) *TaskWorkerPool {
// Use service context for proper cancellation chain
ctx, cancel := context.WithCancel(service.ctx)
// Use a more generous queue size to handle task bursts
// Queue size is workers * 5 to allow for better buffering
queueSize := workers * 5
if queueSize < 50 {
queueSize = 50 // Minimum queue size
var queueSize int
if maxWorkers == -1 {
// Unlimited workers - use configured queue size or larger default
configuredSize := utils.GetTaskQueueSize()
if configuredSize > 0 {
queueSize = configuredSize
} else {
queueSize = 1000 // Large default for unlimited workers
}
} else {
// Limited workers - use configured size or calculate based on max workers
configuredSize := utils.GetTaskQueueSize()
if configuredSize > 0 {
queueSize = configuredSize
} else {
// Use a more generous queue size to handle task bursts
// Queue size is maxWorkers * 5 to allow for better buffering
queueSize = maxWorkers * 5
if queueSize < 50 {
queueSize = 50 // Minimum queue size
}
}
}
return &TaskWorkerPool{
workers: workers,
ctx: ctx,
cancel: cancel,
taskQueue: make(chan taskRequest, queueSize),
service: service,
maxWorkers: maxWorkers,
ctx: ctx,
cancel: cancel,
taskQueue: make(chan taskRequest, queueSize),
service: service,
activeWorkers: 0,
workerMutex: sync.RWMutex{},
}
}
func (pool *TaskWorkerPool) Start() {
for i := 0; i < pool.workers; i++ {
pool.wg.Add(1)
go pool.worker(i)
// Don't pre-create workers - they will be created on-demand
if pool.maxWorkers == -1 {
pool.service.Debugf("Task worker pool started with unlimited workers")
} else {
pool.service.Debugf("Task worker pool started with max workers: %d", pool.maxWorkers)
}
}
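A minimal sketch of the queue-sizing rule from NewTaskWorkerPool above, factored into a standalone function for illustration (computeQueueSize is a hypothetical name; in the real code the configured size comes from utils.GetTaskQueueSize):

```go
package main

import "fmt"

// computeQueueSize mirrors the sizing rule above: an explicitly configured
// size always wins; otherwise unlimited pools (-1) get a large default of
// 1000, and bounded pools get maxWorkers*5 with a floor of 50.
func computeQueueSize(maxWorkers, configuredSize int) int {
	if configuredSize > 0 {
		return configuredSize
	}
	if maxWorkers == -1 {
		return 1000 // large default for unlimited workers
	}
	size := maxWorkers * 5
	if size < 50 {
		size = 50 // minimum queue size
	}
	return size
}

func main() {
	fmt.Println(computeQueueSize(-1, 0))  // 1000
	fmt.Println(computeQueueSize(4, 0))   // 50 (4*5 is below the floor)
	fmt.Println(computeQueueSize(30, 0))  // 150
	fmt.Println(computeQueueSize(30, 64)) // 64 (configured size wins)
}
```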
@@ -64,30 +92,146 @@ func (pool *TaskWorkerPool) SubmitTask(taskId primitive.ObjectID) error {
case pool.taskQueue <- req:
pool.service.Debugf("task[%s] queued for parallel execution, queue usage: %d/%d",
taskId.Hex(), len(pool.taskQueue), cap(pool.taskQueue))
// Try to create a new worker if we haven't reached the limit
pool.maybeCreateWorker()
return nil // Return immediately - task will execute in parallel
case <-pool.ctx.Done():
return fmt.Errorf("worker pool is shutting down")
default:
queueLen := len(pool.taskQueue)
queueCap := cap(pool.taskQueue)
pool.service.Warnf("task queue is full (%d/%d), consider increasing task.workers configuration",
queueLen, queueCap)
return fmt.Errorf("task queue is full (%d/%d), consider increasing task.workers configuration",
queueLen, queueCap)
if pool.maxWorkers == -1 {
pool.service.Warnf("task queue is full (%d/%d), consider increasing system resources",
queueLen, queueCap)
return fmt.Errorf("task queue is full (%d/%d), consider increasing system resources",
queueLen, queueCap)
} else {
pool.service.Warnf("task queue is full (%d/%d), consider increasing node max_runners configuration",
queueLen, queueCap)
return fmt.Errorf("task queue is full (%d/%d), consider increasing node max_runners configuration",
queueLen, queueCap)
}
}
}
func (pool *TaskWorkerPool) UpdateMaxWorkers(newMaxWorkers int) {
pool.workerMutex.Lock()
defer pool.workerMutex.Unlock()
oldMax := pool.maxWorkers
pool.maxWorkers = newMaxWorkers
// Update queue size if needed
var needQueueResize bool
var newQueueSize int
configuredSize := utils.GetTaskQueueSize()
if newMaxWorkers == -1 {
// Unlimited workers
if configuredSize > 0 {
newQueueSize = configuredSize
} else {
newQueueSize = 1000
}
needQueueResize = newQueueSize > cap(pool.taskQueue)
} else if oldMax == -1 {
// From unlimited to limited
if configuredSize > 0 {
newQueueSize = configuredSize
} else {
newQueueSize = newMaxWorkers * 5
if newQueueSize < 50 {
newQueueSize = 50
}
}
needQueueResize = true // Always resize when going from unlimited to limited
} else if newMaxWorkers > oldMax {
// Increase queue capacity
if configuredSize > 0 {
newQueueSize = configuredSize
} else {
newQueueSize = newMaxWorkers * 5
if newQueueSize < 50 {
newQueueSize = 50
}
}
needQueueResize = newQueueSize > cap(pool.taskQueue)
}
if needQueueResize {
oldQueue := pool.taskQueue
pool.taskQueue = make(chan taskRequest, newQueueSize)
// Copy existing tasks to new queue
close(oldQueue)
for req := range oldQueue {
select {
case pool.taskQueue <- req:
default:
// If new queue is somehow full, log the issue but don't block
pool.service.Warnf("Lost task during queue resize: %s", req.taskId.Hex())
}
}
}
if oldMax == -1 && newMaxWorkers != -1 {
pool.service.Infof("Updated worker pool from unlimited to max workers: %d", newMaxWorkers)
} else if oldMax != -1 && newMaxWorkers == -1 {
pool.service.Infof("Updated worker pool from max workers %d to unlimited", oldMax)
} else {
pool.service.Infof("Updated worker pool max workers from %d to %d", oldMax, newMaxWorkers)
}
}
func (pool *TaskWorkerPool) maybeCreateWorker() {
pool.workerMutex.Lock()
defer pool.workerMutex.Unlock()
// Only create a worker if we have tasks queued and haven't reached the limit
// For unlimited workers (maxWorkers == -1), always create if there are queued tasks
hasQueuedTasks := len(pool.taskQueue) > 0
underLimit := pool.maxWorkers == -1 || pool.activeWorkers < pool.maxWorkers
if hasQueuedTasks && underLimit {
pool.activeWorkers++
workerID := pool.activeWorkers
pool.wg.Add(1)
go pool.worker(workerID)
if pool.maxWorkers == -1 {
pool.service.Debugf("created on-demand worker %d (unlimited workers mode, total active: %d)",
workerID, pool.activeWorkers)
} else {
pool.service.Debugf("created on-demand worker %d (total active: %d/%d)",
workerID, pool.activeWorkers, pool.maxWorkers)
}
}
}
func (pool *TaskWorkerPool) worker(workerID int) {
defer pool.wg.Done()
defer func() {
// Decrement active worker count when worker exits
pool.workerMutex.Lock()
pool.activeWorkers--
pool.workerMutex.Unlock()
if r := recover(); r != nil {
pool.service.Errorf("worker %d panic recovered: %v", workerID, r)
}
}()
pool.service.Debugf("worker %d started", workerID)
idleTimeout := 5 * time.Minute // Worker will exit after 5 minutes of idleness
timer := time.NewTimer(idleTimeout)
defer timer.Stop()
for {
timer.Reset(idleTimeout)
select {
case <-pool.ctx.Done():
pool.service.Debugf("worker %d shutting down", workerID)
@@ -108,6 +252,10 @@ func (pool *TaskWorkerPool) worker(workerID int) {
} else {
pool.service.Debugf("worker %d completed task[%s]", workerID, req.taskId.Hex())
}
case <-timer.C:
// Worker has been idle for too long, exit to save resources
pool.service.Debugf("worker %d exiting due to inactivity", workerID)
return
}
}
}
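Two details of the worker lifecycle above are worth noting: workers are created on demand in maybeCreateWorker and retire after five idle minutes, and UpdateMaxWorkers drains the old queue by closing it, which assumes no producer can still be sending on that channel (a send on a closed channel panics in Go). A minimal standalone sketch of a drain that copies buffered items without closing the channel; all names here are illustrative, not from the codebase:

```go
package main

import "fmt"

// drainInto moves whatever is currently buffered in old into dst without
// closing old, so a concurrent producer that still holds a reference to
// old cannot panic with "send on closed channel". Items that do not fit
// in dst are returned so the caller can decide how to handle them.
func drainInto(old, dst chan int) (dropped []int) {
	for {
		select {
		case v := <-old:
			select {
			case dst <- v:
			default:
				dropped = append(dropped, v)
			}
		default:
			return dropped // old buffer is empty
		}
	}
}

func main() {
	old := make(chan int, 4)
	for i := 1; i <= 3; i++ {
		old <- i
	}
	dst := make(chan int, 8)
	fmt.Println("dropped:", drainInto(old, dst)) // dropped: []
	fmt.Println("moved:", len(dst))              // moved: 3
}
```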

View File

@@ -28,8 +28,8 @@ const (
DefaultApiAllowHeaders = "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With"
DefaultApiPort = 8080
DefaultApiPath = "/api"
DefaultNodeMaxRunners = 0 // 0 means no limit
DefaultTaskWorkers = 30 // Default number of task workers
DefaultNodeMaxRunners = 20 // Default max concurrent task runners per node
DefaultTaskQueueSize = 100 // Default task queue size per node
DefaultInstallRoot = "/app/install"
DefaultInstallEnvs = ""
MetadataConfigDirName = ".crawlab"
@@ -242,11 +242,11 @@ func GetNodeMaxRunners() int {
return DefaultNodeMaxRunners
}
func GetTaskWorkers() int {
if res := viper.GetInt("task.workers"); res != 0 {
func GetTaskQueueSize() int {
if res := viper.GetInt("task.queue.size"); res != 0 {
return res
}
return DefaultTaskWorkers
return DefaultTaskQueueSize
}
func GetMetadataConfigPath() string {
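For reference, a minimal sketch of how the renamed task.queue.size key behaves with viper, mirroring GetTaskQueueSize above (the standalone program and the copied constants are illustrative; only the key name and default values come from the diff):

```go
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

const (
	DefaultNodeMaxRunners = 20  // default max concurrent task runners per node
	DefaultTaskQueueSize  = 100 // default task queue size per node
)

// getTaskQueueSize mirrors GetTaskQueueSize above: a non-zero configured
// value wins, otherwise the compiled-in default applies.
func getTaskQueueSize() int {
	if res := viper.GetInt("task.queue.size"); res != 0 {
		return res
	}
	return DefaultTaskQueueSize
}

func main() {
	fmt.Println(getTaskQueueSize()) // 100 (default)
	viper.Set("task.queue.size", 256)
	fmt.Println(getTaskQueueSize()) // 256
}
```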