crawlab/core/task/handler/worker_pool.go


package handler

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/crawlab-team/crawlab/core/utils"
	"go.mongodb.org/mongo-driver/bson/primitive"
)
// taskRequest represents a task execution request.
type taskRequest struct {
	taskId primitive.ObjectID
}
// TaskWorkerPool manages a dynamic pool of workers for task execution.
// Workers are created on demand and exit after an idle timeout.
type TaskWorkerPool struct {
	maxWorkers int
	ctx        context.Context
	cancel     context.CancelFunc
	wg         sync.WaitGroup
	taskQueue  chan taskRequest
	service    *Service

	// workerMutex guards activeWorkers, maxWorkers, and taskQueue (the
	// channel itself can be replaced by UpdateMaxWorkers).
	activeWorkers int
	workerMutex   sync.RWMutex
}
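
// getQueue returns the current task queue under a read lock. This helper is
// not in the original file; it is added here because UpdateMaxWorkers may
// replace the channel, so unsynchronized reads of pool.taskQueue would race
// with that swap.
func (pool *TaskWorkerPool) getQueue() chan taskRequest {
	pool.workerMutex.RLock()
	defer pool.workerMutex.RUnlock()
	return pool.taskQueue
}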

// NewTaskWorkerPool creates a pool that runs at most maxWorkers tasks
// concurrently; pass maxWorkers == -1 for an unlimited number of workers.
func NewTaskWorkerPool(maxWorkers int, service *Service) *TaskWorkerPool {
	// Use the service context for a proper cancellation chain.
	ctx, cancel := context.WithCancel(service.ctx)

	var queueSize int
	configuredSize := utils.GetTaskQueueSize()
	if maxWorkers == -1 {
		// Unlimited workers - use the configured queue size or a large default.
		if configuredSize > 0 {
			queueSize = configuredSize
		} else {
			queueSize = 1000 // Large default for unlimited workers
		}
	} else {
		// Limited workers - use the configured size or derive one from maxWorkers.
		if configuredSize > 0 {
			queueSize = configuredSize
		} else {
			// Queue size is maxWorkers * 5 to buffer task bursts.
			queueSize = maxWorkers * 5
			if queueSize < 50 {
				queueSize = 50 // Minimum queue size
			}
		}
	}

	return &TaskWorkerPool{
		maxWorkers: maxWorkers,
		ctx:        ctx,
		cancel:     cancel,
		taskQueue:  make(chan taskRequest, queueSize),
		service:    service,
	}
}
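
// Worked example (added for illustration) of the sizing rule above, assuming
// no configured queue size: maxWorkers = 4 gives queueSize = max(4*5, 50) = 50,
// while maxWorkers = 16 gives queueSize = 16*5 = 80.
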
// Start marks the pool as running. Workers are not pre-created; they are
// spawned on demand as tasks arrive.
func (pool *TaskWorkerPool) Start() {
	if pool.maxWorkers == -1 {
		pool.service.Debugf("Task worker pool started with unlimited workers")
	} else {
		pool.service.Debugf("Task worker pool started with max workers: %d", pool.maxWorkers)
	}
}

// Stop cancels the pool context, closes the queue, and waits for all workers
// to exit. Callers must not call SubmitTask concurrently with or after Stop:
// sending on the closed queue would panic.
func (pool *TaskWorkerPool) Stop() {
	pool.cancel()
	close(pool.getQueue())
	pool.wg.Wait()
}

// SubmitTask queues a task for asynchronous execution and returns
// immediately; it fails fast when the pool is shutting down or the queue is full.
func (pool *TaskWorkerPool) SubmitTask(taskId primitive.ObjectID) error {
	req := taskRequest{taskId: taskId}
	queue := pool.getQueue()
	select {
	case queue <- req:
		pool.service.Debugf("task[%s] queued for parallel execution, queue usage: %d/%d",
			taskId.Hex(), len(queue), cap(queue))
		// Try to create a new worker if we haven't reached the limit.
		pool.maybeCreateWorker()
		return nil // Return immediately - the task executes in parallel
	case <-pool.ctx.Done():
		return fmt.Errorf("worker pool is shutting down")
	default:
		// Queue is full: fail fast with a hint that matches the pool mode.
		pool.workerMutex.RLock()
		unlimited := pool.maxWorkers == -1
		pool.workerMutex.RUnlock()
		hint := "consider increasing node max_runners configuration"
		if unlimited {
			hint = "consider increasing system resources"
		}
		err := fmt.Errorf("task queue is full (%d/%d), %s", len(queue), cap(queue), hint)
		pool.service.Warnf("%v", err)
		return err
	}
}

// UpdateMaxWorkers changes the worker limit at runtime and grows the task
// queue when the new limit calls for more capacity.
func (pool *TaskWorkerPool) UpdateMaxWorkers(newMaxWorkers int) {
	pool.workerMutex.Lock()
	defer pool.workerMutex.Unlock()

	oldMax := pool.maxWorkers
	pool.maxWorkers = newMaxWorkers

	// Decide whether the queue needs to be resized.
	var needQueueResize bool
	var newQueueSize int
	configuredSize := utils.GetTaskQueueSize()
	if newMaxWorkers == -1 {
		// Unlimited workers.
		if configuredSize > 0 {
			newQueueSize = configuredSize
		} else {
			newQueueSize = 1000
		}
		needQueueResize = newQueueSize > cap(pool.taskQueue)
	} else if oldMax == -1 {
		// From unlimited to limited.
		if configuredSize > 0 {
			newQueueSize = configuredSize
		} else {
			newQueueSize = newMaxWorkers * 5
			if newQueueSize < 50 {
				newQueueSize = 50
			}
		}
		needQueueResize = true // Always resize when going from unlimited to limited
	} else if newMaxWorkers > oldMax {
		// Limit increased - grow the queue if the derived size is larger.
		if configuredSize > 0 {
			newQueueSize = configuredSize
		} else {
			newQueueSize = newMaxWorkers * 5
			if newQueueSize < 50 {
				newQueueSize = 50
			}
		}
		needQueueResize = newQueueSize > cap(pool.taskQueue)
	}
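
	// Note (added): replacing the channel below is inherently racy for any
	// goroutine that captured the old queue before this lock was taken.
	// getQueue synchronizes the field reads, but a SubmitTask holding the old
	// channel could still panic by sending after the close below. This is
	// presumably tolerable because UpdateMaxWorkers runs only on
	// configuration changes.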
	if needQueueResize {
		oldQueue := pool.taskQueue
		pool.taskQueue = make(chan taskRequest, newQueueSize)
		// Close the old queue and move any pending tasks to the new one.
		close(oldQueue)
		for req := range oldQueue {
			select {
			case pool.taskQueue <- req:
			default:
				// The new queue can be smaller when shrinking; log dropped
				// tasks rather than blocking while holding the lock.
				pool.service.Warnf("Lost task during queue resize: %s", req.taskId.Hex())
			}
		}
	}
	if oldMax == -1 && newMaxWorkers != -1 {
		pool.service.Infof("Updated worker pool from unlimited to max workers: %d", newMaxWorkers)
	} else if oldMax != -1 && newMaxWorkers == -1 {
		pool.service.Infof("Updated worker pool from max workers %d to unlimited", oldMax)
	} else {
		pool.service.Infof("Updated worker pool max workers from %d to %d", oldMax, newMaxWorkers)
	}
}

// maybeCreateWorker spawns a new worker if tasks are queued and the pool is
// under its worker limit; with unlimited workers (maxWorkers == -1), it
// always spawns one when tasks are queued.
func (pool *TaskWorkerPool) maybeCreateWorker() {
	pool.workerMutex.Lock()
	defer pool.workerMutex.Unlock()

	hasQueuedTasks := len(pool.taskQueue) > 0
	underLimit := pool.maxWorkers == -1 || pool.activeWorkers < pool.maxWorkers
	if hasQueuedTasks && underLimit {
		pool.activeWorkers++
		workerID := pool.activeWorkers // IDs are reused after workers exit; used for logging only
		pool.wg.Add(1)
		go pool.worker(workerID)
		if pool.maxWorkers == -1 {
			pool.service.Debugf("created on-demand worker %d (unlimited workers mode, total active: %d)",
				workerID, pool.activeWorkers)
		} else {
			pool.service.Debugf("created on-demand worker %d (total active: %d/%d)",
				workerID, pool.activeWorkers, pool.maxWorkers)
		}
	}
}

// worker processes tasks from the queue until the pool shuts down, the queue
// is closed, or the worker sits idle past the timeout.
func (pool *TaskWorkerPool) worker(workerID int) {
	defer pool.wg.Done()
	defer func() {
		// Decrement the active worker count when the worker exits.
		pool.workerMutex.Lock()
		pool.activeWorkers--
		pool.workerMutex.Unlock()
		if r := recover(); r != nil {
			pool.service.Errorf("worker %d panic recovered: %v", workerID, r)
		}
	}()
	pool.service.Debugf("worker %d started", workerID)

	const idleTimeout = 5 * time.Minute // Worker exits after 5 minutes of idleness
	timer := time.NewTimer(idleTimeout)
	defer timer.Stop()
	for {
		// Drain the timer before resetting so a stale fire from the previous
		// iteration cannot trigger a premature idle exit.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(idleTimeout)
		select {
		case <-pool.ctx.Done():
			pool.service.Debugf("worker %d shutting down", workerID)
			return
		case req, ok := <-pool.getQueue():
			if !ok {
				pool.service.Debugf("worker %d: task queue closed", workerID)
				return
			}
			// Each worker handles one task at a time, but multiple workers
			// process different tasks simultaneously.
			pool.service.Debugf("worker %d processing task[%s]", workerID, req.taskId.Hex())
			if err := pool.service.executeTask(req.taskId); err != nil {
				pool.service.Errorf("worker %d failed to execute task[%s]: %v",
					workerID, req.taskId.Hex(), err)
			} else {
				pool.service.Debugf("worker %d completed task[%s]", workerID, req.taskId.Hex())
			}
		case <-timer.C:
			// Idle for too long; exit to free resources. maybeCreateWorker
			// spawns a replacement on the next submission.
			pool.service.Debugf("worker %d exiting due to inactivity", workerID)
			return
		}
	}
}
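
// Usage sketch (added; not part of the original file). This shows how a
// Service is expected to drive the pool; svc stands for an existing *Service
// and is an assumption for illustration:
//
//	pool := NewTaskWorkerPool(8, svc) // at most 8 concurrent task runners
//	pool.Start()
//	defer pool.Stop()
//	if err := pool.SubmitTask(primitive.NewObjectID()); err != nil {
//		svc.Warnf("could not queue task: %v", err)
//	}
//
// SubmitTask returns immediately; execution happens on pool workers, which
// are created on demand and exit after five idle minutes.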