diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index 2c1c16fc..2bcd20d3 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -3,27 +3,18 @@ package handler import ( "bufio" "context" - "encoding/json" "errors" "fmt" "io" - "net/http" - "net/url" - "os" "os/exec" "path/filepath" - "runtime" - "strings" "sync" - "syscall" "time" "github.com/crawlab-team/crawlab/core/dependency" "github.com/crawlab-team/crawlab/core/fs" - "github.com/hashicorp/go-multierror" - "github.com/shirou/gopsutil/process" - "github.com/crawlab-team/crawlab/core/models/models" + "github.com/hashicorp/go-multierror" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/entity" @@ -379,75 +370,6 @@ func (r *Runner) GetTaskId() (id primitive.ObjectID) { return r.tid } -// configureCmd builds and configures the command to be executed, including setting up IPC pipes -// and processing command parameters -func (r *Runner) configureCmd() (err error) { - var cmdStr string - - // command - if r.t.Cmd == "" { - cmdStr = r.s.Cmd - } else { - cmdStr = r.t.Cmd - } - - // parameters - if r.t.Param != "" { - cmdStr += " " + r.t.Param - } else if r.s.Param != "" { - cmdStr += " " + r.s.Param - } - - // get cmd instance - r.cmd, err = utils.BuildCmd(cmdStr) - if err != nil { - r.Errorf("error building command: %v", err) - return err - } - - // set working directory - r.cmd.Dir = r.cwd - - // ZOMBIE PREVENTION: Set process group to enable proper cleanup of child processes - if runtime.GOOS != "windows" { - // Create new process group on Unix systems to ensure child processes can be killed together - r.cmd.SysProcAttr = &syscall.SysProcAttr{ - Setpgid: true, // Create new process group - Pgid: 0, // Use process ID as process group ID - } - } - - // Configure pipes for IPC and logs - r.stdinPipe, err = r.cmd.StdinPipe() - if err != nil { - r.Errorf("error creating stdin pipe: %v", err) - return err - } - - // Add stdout pipe for IPC and logs - r.stdoutPipe, err = r.cmd.StdoutPipe() - if err != nil { - r.Errorf("error creating stdout pipe: %v", err) - return err - } - - // Add stderr pipe for error logs - stderrPipe, err := r.cmd.StderrPipe() - if err != nil { - r.Errorf("error creating stderr pipe: %v", err) - return err - } - - // Create buffered readers - r.readerStdout = bufio.NewReader(r.stdoutPipe) - r.readerStderr = bufio.NewReader(stderrPipe) - - // Initialize IPC channel - r.ipcChan = make(chan entity.IPCMessage) - - return nil -} - // startHealthCheck periodically verifies that the process is still running // If the process disappears unexpectedly, it signals a task lost condition func (r *Runner) startHealthCheck() { @@ -475,285 +397,6 @@ func (r *Runner) startHealthCheck() { } } -// configurePythonPath sets up the Python environment paths, handling both pyenv and default installations -func (r *Runner) configurePythonPath() { - // Configure global node_modules path - pyenvRoot := utils.GetPyenvPath() - pyenvShimsPath := pyenvRoot + "/shims" - pyenvBinPath := pyenvRoot + "/bin" - - // Configure global pyenv path - _ = os.Setenv("PYENV_ROOT", pyenvRoot) - _ = os.Setenv("PATH", pyenvShimsPath+":"+os.Getenv("PATH")) - _ = os.Setenv("PATH", pyenvBinPath+":"+os.Getenv("PATH")) -} - -// configureNodePath sets up the Node.js environment paths, handling both nvm and default installations -func (r *Runner) configureNodePath() { - // Configure nvm-based Node.js paths - envPath := os.Getenv("PATH") - - // Configure global node_modules path - nodePath := utils.GetNodeModulesPath() - if !strings.Contains(envPath, nodePath) { - _ = os.Setenv("PATH", nodePath+":"+envPath) - } - _ = os.Setenv("NODE_PATH", nodePath) - - // Configure global node_bin path - nodeBinPath := utils.GetNodeBinPath() - if !strings.Contains(envPath, nodeBinPath) { - _ = os.Setenv("PATH", nodeBinPath+":"+envPath) - } -} - -func (r *Runner) configureGoPath() { - // Configure global go path - goPath := utils.GetGoPath() - if goPath != "" { - _ = os.Setenv("GOPATH", goPath) - } -} - -// configureEnv sets up the environment variables for the task process, including: -// - Node.js paths -// - Crawlab-specific variables -// - Global environment variables from the system -func (r *Runner) configureEnv() { - // Configure Python path - r.configurePythonPath() - - // Configure Node.js path - r.configureNodePath() - - // Configure Go path - r.configureGoPath() - - // Default envs - r.cmd.Env = os.Environ() - - // Remove CRAWLAB_ prefixed environment variables - for i := 0; i < len(r.cmd.Env); i++ { - if strings.HasPrefix(r.cmd.Env[i], "CRAWLAB_") { - r.cmd.Env = append(r.cmd.Env[:i], r.cmd.Env[i+1:]...) - i-- - } - } - - // Task-specific environment variables - r.cmd.Env = append(r.cmd.Env, "CRAWLAB_TASK_ID="+r.tid.Hex()) - - // Global environment variables - envs, err := client.NewModelService[models.Environment]().GetMany(nil, nil) - if err != nil { - r.Errorf("error getting environment variables: %v", err) - return - } - for _, env := range envs { - r.cmd.Env = append(r.cmd.Env, env.Key+"="+env.Value) - } - - // Add environment variable for child processes to identify they're running under Crawlab - r.cmd.Env = append(r.cmd.Env, "CRAWLAB_PARENT_PID="+fmt.Sprint(os.Getpid())) -} - -func (r *Runner) performHttpRequest(method, path string, params url.Values) (*http.Response, error) { - // Normalize path - path = strings.TrimPrefix(path, "/") - - // Construct master URL - var id string - if r.s.GitId.IsZero() { - id = r.s.Id.Hex() - } else { - id = r.s.GitId.Hex() - } - requestUrl := fmt.Sprintf("%s/sync/%s/%s?%s", utils.GetApiEndpoint(), id, path, params.Encode()) - - // Create and execute request - req, err := http.NewRequest(method, requestUrl, nil) - if err != nil { - return nil, fmt.Errorf("error creating request: %v", err) - } - - for k, v := range r.getHttpRequestHeaders() { - req.Header.Set(k, v) - } - - return http.DefaultClient.Do(req) -} - -// syncFiles synchronizes files between master and worker nodes: -// 1. Gets file list from master -// 2. Compares with local files -// 3. Downloads new/modified files -// 4. Deletes files that no longer exist on master -func (r *Runner) syncFiles() (err error) { - r.Infof("starting file synchronization for spider: %s", r.s.Id.Hex()) - - workingDir := "" - if !r.s.GitId.IsZero() { - workingDir = r.s.GitRootPath - r.Debugf("using git root path: %s", workingDir) - } - - // get file list from master - r.Infof("fetching file list from master node") - params := url.Values{ - "path": []string{workingDir}, - } - resp, err := r.performHttpRequest("GET", "/scan", params) - if err != nil { - r.Errorf("error getting file list from master: %v", err) - return err - } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - r.Errorf("error reading response body: %v", err) - return err - } - var response struct { - Data entity.FsFileInfoMap `json:"data"` - } - err = json.Unmarshal(body, &response) - if err != nil { - r.Errorf("error unmarshaling JSON for URL: %s", resp.Request.URL.String()) - r.Errorf("error details: %v", err) - return err - } - - // create a map for master files - masterFilesMap := make(entity.FsFileInfoMap) - for _, file := range response.Data { - masterFilesMap[file.Path] = file - } - - // create working directory if not exists - if _, err := os.Stat(r.cwd); os.IsNotExist(err) { - if err := os.MkdirAll(r.cwd, os.ModePerm); err != nil { - r.Errorf("error creating worker directory: %v", err) - return err - } - } - - // get file list from worker - workerFiles, err := utils.ScanDirectory(r.cwd) - if err != nil { - r.Errorf("error scanning worker directory: %v", err) - return err - } - - // delete files that are deleted on master node - for path, workerFile := range workerFiles { - if _, exists := masterFilesMap[path]; !exists { - r.Infof("deleting file: %s", path) - err := os.Remove(workerFile.FullPath) - if err != nil { - r.Errorf("error deleting file: %v", err) - } - } - } - - // set up wait group and error channel - var wg sync.WaitGroup - pool := make(chan struct{}, 10) - - // download files that are new or modified on master node - for path, masterFile := range masterFilesMap { - workerFile, exists := workerFiles[path] - if !exists || masterFile.Hash != workerFile.Hash { - wg.Add(1) - - // acquire token - pool <- struct{}{} - - // start goroutine to synchronize file or directory - go func(path string, masterFile *entity.FsFileInfo) { - defer wg.Done() - - if masterFile.IsDir { - r.Infof("directory needs to be synchronized: %s", path) - _err := os.MkdirAll(filepath.Join(r.cwd, path), masterFile.Mode) - if _err != nil { - r.Errorf("error creating directory: %v", _err) - err = errors.Join(err, _err) - } - } else { - r.Infof("file needs to be synchronized: %s", path) - _err := r.downloadFile(path, filepath.Join(r.cwd, path), masterFile) - if _err != nil { - r.Errorf("error downloading file: %v", _err) - err = errors.Join(err, _err) - } - } - - // release token - <-pool - - }(path, &masterFile) - } - } - - // wait for all files and directories to be synchronized - wg.Wait() - - r.Infof("file synchronization completed successfully") - return err -} - -// downloadFile downloads a file from the master node and saves it to the local file system -func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsFileInfo) error { - r.Debugf("downloading file: %s -> %s", path, filePath) - params := url.Values{ - "path": []string{path}, - } - resp, err := r.performHttpRequest("GET", "/download", params) - if err != nil { - r.Errorf("error getting file response: %v", err) - return err - } - if resp.StatusCode != http.StatusOK { - r.Errorf("error downloading file: %s", resp.Status) - return errors.New(resp.Status) - } - defer resp.Body.Close() - - // create directory if not exists - dirPath := filepath.Dir(filePath) - utils.Exists(dirPath) - err = os.MkdirAll(dirPath, os.ModePerm) - if err != nil { - r.Errorf("error creating directory: %v", err) - return err - } - - // create local file - out, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, fileInfo.Mode) - if err != nil { - r.Errorf("error creating file: %v", err) - return err - } - defer out.Close() - - // copy file content to local file - _, err = io.Copy(out, resp.Body) - if err != nil { - r.Errorf("error copying file: %v", err) - return err - } - - r.Debugf("successfully downloaded file: %s (size: %d bytes)", path, fileInfo.FileSize) - return nil -} - -// getHttpRequestHeaders returns the headers for HTTP requests to the master node -func (r *Runner) getHttpRequestHeaders() (headers map[string]string) { - headers = make(map[string]string) - headers["Authorization"] = utils.GetAuthKey() - return headers -} - // wait monitors the process execution and sends appropriate signals based on the exit status: // - TaskSignalFinish for successful completion // - TaskSignalCancel for cancellation @@ -862,8 +505,8 @@ func (r *Runner) updateTask(status string, e error) (err error) { } // update stats - r._updateTaskStat(status) - r._updateSpiderStat(status) + r.updateTaskStat(status) + r.updateSpiderStat(status) // send notification go r.sendNotification() @@ -1030,100 +673,10 @@ func (r *Runner) reconnectWithRetry() error { return fmt.Errorf("failed to reconnect after %d attempts", r.maxConnRetries) } -// performPeriodicCleanup runs periodic cleanup for all tasks -func (r *Runner) performPeriodicCleanup() { - r.wg.Add(1) - defer r.wg.Done() - - // Cleanup every 10 minutes for all tasks - r.resourceCleanup = time.NewTicker(10 * time.Minute) - defer r.resourceCleanup.Stop() - - for { - select { - case <-r.ctx.Done(): - return - case <-r.resourceCleanup.C: - r.runPeriodicCleanup() - } - } -} - -// runPeriodicCleanup performs memory and resource cleanup -func (r *Runner) runPeriodicCleanup() { - r.Debugf("performing periodic cleanup for task") - - // Force garbage collection for memory management - runtime.GC() - - // Log current resource usage - var m runtime.MemStats - runtime.ReadMemStats(&m) - r.Debugf("memory usage - alloc: %d KB, sys: %d KB, num_gc: %d", - m.Alloc/1024, m.Sys/1024, m.NumGC) - - // Check if IPC channel is getting full - if r.ipcChan != nil { - select { - case <-r.ipcChan: - r.Debugf("drained stale IPC message during cleanup") - default: - // Channel is not full, good - } - } -} - -// writeLogLines marshals log lines to JSON and sends them to the task service -// Uses connection-safe approach for robust task execution -func (r *Runner) writeLogLines(lines []string) { - // Check if context is cancelled or connection is closed - select { - case <-r.ctx.Done(): - return - default: - } - - // Use connection with mutex for thread safety - r.connMutex.RLock() - conn := r.conn - r.connMutex.RUnlock() - - // Check if connection is available - if conn == nil { - r.Debugf("no connection available for sending log lines") - return - } - - linesBytes, err := json.Marshal(lines) - if err != nil { - r.Errorf("error marshaling log lines: %v", err) - return - } - - msg := &grpc.TaskServiceConnectRequest{ - Code: grpc.TaskServiceConnectCode_INSERT_LOGS, - TaskId: r.tid.Hex(), - Data: linesBytes, - } - - if err := conn.Send(msg); err != nil { - // Don't log errors if context is cancelled (expected during shutdown) - select { - case <-r.ctx.Done(): - return - default: - r.Errorf("error sending log lines: %v", err) - // Mark connection as unhealthy for reconnection - r.lastConnCheck = time.Time{} - } - return - } -} - -// _updateTaskStat updates task statistics based on the current status: +// updateTaskStat updates task statistics based on the current status: // - For running tasks: sets start time and wait duration // - For completed tasks: sets end time and calculates durations -func (r *Runner) _updateTaskStat(status string) { +func (r *Runner) updateTaskStat(status string) { if status != "" { r.Debugf("updating task statistics for status: %s", status) } @@ -1184,11 +737,11 @@ func (r *Runner) sendNotification() { } } -// _updateSpiderStat updates spider statistics based on task completion: +// updateSpiderStat updates spider statistics based on task completion: // - Updates last task ID // - Increments task counts // - Updates duration metrics -func (r *Runner) _updateSpiderStat(status string) { +func (r *Runner) updateSpiderStat(status string) { // task stat ts, err := client.NewModelService[models.TaskStat]().GetById(r.tid) if err != nil { @@ -1241,208 +794,6 @@ func (r *Runner) _updateSpiderStat(status string) { } } -// configureCwd sets the working directory for the task based on the spider's configuration -func (r *Runner) configureCwd() { - workspacePath := utils.GetWorkspace() - if r.s.GitId.IsZero() { - // not git - r.cwd = filepath.Join(workspacePath, r.s.Id.Hex()) - } else { - // git - r.cwd = filepath.Join(workspacePath, r.s.GitId.Hex(), r.s.GitRootPath) - } -} - -// handleIPC processes incoming IPC messages from the child process -// Messages are converted to JSON and written to the child process's stdin -func (r *Runner) handleIPC() { - r.wg.Add(1) - defer r.wg.Done() - - for msg := range r.ipcChan { - // Convert message to JSON - jsonData, err := json.Marshal(msg) - if err != nil { - r.Errorf("error marshaling IPC message: %v", err) - continue - } - - // Write to child process's stdin - _, err = fmt.Fprintln(r.stdinPipe, string(jsonData)) - if err != nil { - r.Errorf("error writing to child process: %v", err) - } - } -} - -// SetIPCHandler sets the handler for incoming IPC messages -func (r *Runner) SetIPCHandler(handler func(entity.IPCMessage)) { - r.ipcHandler = handler -} - -// startIPCReader continuously reads IPC messages from the child process's stdout -// Messages are parsed and either handled by the IPC handler or written to logs -func (r *Runner) startIPCReader() { - r.wg.Add(2) // Add 2 to wait group for both stdout and stderr readers - - // Start stdout reader - go func() { - defer r.wg.Done() - r.readOutput(r.readerStdout, true) // true for stdout - }() - - // Start stderr reader - go func() { - defer r.wg.Done() - r.readOutput(r.readerStderr, false) // false for stderr - }() -} - -func (r *Runner) readOutput(reader *bufio.Reader, isStdout bool) { - scanner := bufio.NewScanner(reader) - for { - select { - case <-r.ctx.Done(): - // Context cancelled, stop reading - return - default: - // Scan the next line - if !scanner.Scan() { - return - } - - // Get the line - line := scanner.Text() - - // Trim the line - line = strings.TrimRight(line, "\n\r") - - // For stdout, try to parse as IPC message first - if isStdout { - var ipcMsg entity.IPCMessage - if err := json.Unmarshal([]byte(line), &ipcMsg); err == nil && ipcMsg.IPC { - if r.ipcHandler != nil { - r.ipcHandler(ipcMsg) - } else { - // Default handler (insert data) - if ipcMsg.Type == "" || ipcMsg.Type == constants.IPCMessageData { - r.handleIPCInsertDataMessage(ipcMsg) - } else { - r.Warnf("no IPC handler set for message: %v", ipcMsg) - } - } - continue - } - } - - // If not an IPC message or from stderr, treat as log - r.writeLogLines([]string{line}) - } - } -} - -// handleIPCInsertDataMessage converts the IPC message payload to JSON and sends it to the master node -func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) { - if ipcMsg.Payload == nil { - r.Errorf("empty payload in IPC message") - return - } - - // Convert payload to data to be inserted - var records []map[string]interface{} - - switch payload := ipcMsg.Payload.(type) { - case []interface{}: // Handle array of objects - records = make([]map[string]interface{}, 0, len(payload)) - for i, item := range payload { - if itemMap, ok := item.(map[string]interface{}); ok { - records = append(records, itemMap) - } else { - r.Errorf("invalid record at index %d: %v", i, item) - continue - } - } - case []map[string]interface{}: // Handle direct array of maps - records = payload - case map[string]interface{}: // Handle single object - records = []map[string]interface{}{payload} - case interface{}: // Handle generic interface - if itemMap, ok := payload.(map[string]interface{}); ok { - records = []map[string]interface{}{itemMap} - } else { - r.Errorf("invalid payload type: %T", payload) - return - } - default: - r.Errorf("unsupported payload type: %T, value: %v", payload, ipcMsg.Payload) - return - } - - // Validate records - if len(records) == 0 { - r.Warnf("no valid records to insert") - return - } - - // Marshal data with error handling - data, err := json.Marshal(records) - if err != nil { - r.Errorf("error marshaling records: %v", err) - return - } - - // Check if context is cancelled or connection is closed - select { - case <-r.ctx.Done(): - return - default: - } - - // Use connection with mutex for thread safety - r.connMutex.RLock() - conn := r.conn - r.connMutex.RUnlock() - - // Validate connection - if conn == nil { - r.Errorf("gRPC connection not initialized") - return - } - - // Send IPC message to master with context and timeout - ctx, cancel := context.WithTimeout(context.Background(), r.ipcTimeout) - defer cancel() - - // Create gRPC message - grpcMsg := &grpc.TaskServiceConnectRequest{ - Code: grpc.TaskServiceConnectCode_INSERT_DATA, - TaskId: r.tid.Hex(), - Data: data, - } - - // Use context for sending - select { - case <-ctx.Done(): - r.Errorf("timeout sending IPC message") - return - case <-r.ctx.Done(): - return - default: - if err := conn.Send(grpcMsg); err != nil { - // Don't log errors if context is cancelled (expected during shutdown) - select { - case <-r.ctx.Done(): - return - default: - r.Errorf("error sending IPC message: %v", err) - // Mark connection as unhealthy for reconnection - r.lastConnCheck = time.Time{} - } - return - } - } -} - func (r *Runner) installDependenciesIfAvailable() (err error) { if !utils.IsPro() { return nil @@ -1526,81 +877,6 @@ func (r *Runner) installDependenciesIfAvailable() (err error) { return nil } -// logInternally sends internal runner logs to the same logging system as the task -func (r *Runner) logInternally(level string, message string) { - // Format the internal log with a prefix - timestamp := time.Now().Local().Format("2006-01-02 15:04:05") - - // Pad level - level = fmt.Sprintf("%-5s", level) - - // Format the log message - internalLog := fmt.Sprintf("%s [%s] [%s] %s", level, timestamp, "Crawlab", message) - - // Send to the same log system as task logs - // Only send if context is not cancelled and connection is available - if r.conn != nil { - select { - case <-r.ctx.Done(): - // Context cancelled, don't send logs - default: - go r.writeLogLines([]string{internalLog}) - } - } - - // Also log through the standard logger - switch level { - case "ERROR": - r.Logger.Error(message) - case "WARN": - r.Logger.Warn(message) - case "INFO": - r.Logger.Info(message) - case "DEBUG": - r.Logger.Debug(message) - } -} - -func (r *Runner) Error(message string) { - msg := fmt.Sprintf(message) - r.logInternally("ERROR", msg) -} - -func (r *Runner) Warn(message string) { - msg := fmt.Sprintf(message) - r.logInternally("WARN", msg) -} - -func (r *Runner) Info(message string) { - msg := fmt.Sprintf(message) - r.logInternally("INFO", msg) -} - -func (r *Runner) Debug(message string) { - msg := fmt.Sprintf(message) - r.logInternally("DEBUG", msg) -} - -func (r *Runner) Errorf(format string, args ...interface{}) { - msg := fmt.Sprintf(format, args...) - r.logInternally("ERROR", msg) -} - -func (r *Runner) Warnf(format string, args ...interface{}) { - msg := fmt.Sprintf(format, args...) - r.logInternally("WARN", msg) -} - -func (r *Runner) Infof(format string, args ...interface{}) { - msg := fmt.Sprintf(format, args...) - r.logInternally("INFO", msg) -} - -func (r *Runner) Debugf(format string, args ...interface{}) { - msg := fmt.Sprintf(format, args...) - r.logInternally("DEBUG", msg) -} - // GetConnectionStats returns connection health statistics for monitoring func (r *Runner) GetConnectionStats() map[string]interface{} { r.connMutex.RLock() @@ -1614,149 +890,3 @@ func (r *Runner) GetConnectionStats() map[string]interface{} { "connection_exists": r.conn != nil, } } - -// ZOMBIE PROCESS PREVENTION METHODS - -// cleanupOrphanedProcesses attempts to clean up any orphaned processes related to this task -func (r *Runner) cleanupOrphanedProcesses() { - r.Warnf("cleaning up orphaned processes for task %s (PID: %d)", r.tid.Hex(), r.pid) - - if r.pid <= 0 { - r.Debugf("no PID to clean up") - return - } - - // Try to kill the process group if it exists - if runtime.GOOS != "windows" { - r.killProcessGroup() - } - - // Force kill the main process if it still exists - if utils.ProcessIdExists(r.pid) { - r.Warnf("forcefully killing remaining process %d", r.pid) - if r.cmd != nil && r.cmd.Process != nil { - if err := utils.KillProcess(r.cmd, true); err != nil { - r.Errorf("failed to force kill process: %v", err) - } - } - } - - // Scan for any remaining child processes and kill them - r.scanAndKillChildProcesses() -} - -// killProcessGroup kills the entire process group on Unix systems -func (r *Runner) killProcessGroup() { - if r.pid <= 0 { - return - } - - r.Debugf("attempting to kill process group for PID %d", r.pid) - - // Kill the process group (negative PID kills the group) - err := syscall.Kill(-r.pid, syscall.SIGTERM) - if err != nil { - r.Debugf("failed to send SIGTERM to process group: %v", err) - // Try SIGKILL as last resort - err = syscall.Kill(-r.pid, syscall.SIGKILL) - if err != nil { - r.Debugf("failed to send SIGKILL to process group: %v", err) - } - } else { - r.Debugf("successfully sent SIGTERM to process group %d", r.pid) - } -} - -// scanAndKillChildProcesses scans for and kills any remaining child processes -func (r *Runner) scanAndKillChildProcesses() { - r.Debugf("scanning for orphaned child processes of task %s", r.tid.Hex()) - - processes, err := utils.GetProcesses() - if err != nil { - r.Errorf("failed to get process list: %v", err) - return - } - - taskIdEnv := "CRAWLAB_TASK_ID=" + r.tid.Hex() - killedCount := 0 - - for _, proc := range processes { - // Check if this process has our task ID in its environment - if r.isTaskRelatedProcess(proc, taskIdEnv) { - pid := int(proc.Pid) - r.Warnf("found orphaned task process PID %d, killing it", pid) - - // Kill the orphaned process - if err := proc.Kill(); err != nil { - r.Errorf("failed to kill orphaned process %d: %v", pid, err) - } else { - killedCount++ - r.Infof("successfully killed orphaned process %d", pid) - } - } - } - - if killedCount > 0 { - r.Infof("cleaned up %d orphaned processes for task %s", killedCount, r.tid.Hex()) - } else { - r.Debugf("no orphaned processes found for task %s", r.tid.Hex()) - } -} - -// isTaskRelatedProcess checks if a process is related to this task -func (r *Runner) isTaskRelatedProcess(proc *process.Process, taskIdEnv string) bool { - // Get process environment variables - environ, err := proc.Environ() - if err != nil { - // If we can't read environment, skip this process - return false - } - - // Check if this process has our task ID - for _, env := range environ { - if env == taskIdEnv { - return true - } - } - - return false -} - -// startZombieMonitor starts a background goroutine to monitor for zombie processes -func (r *Runner) startZombieMonitor() { - r.wg.Add(1) - go func() { - defer r.wg.Done() - - // Check for zombies every 5 minutes - ticker := time.NewTicker(5 * time.Minute) - defer ticker.Stop() - - for { - select { - case <-r.ctx.Done(): - return - case <-ticker.C: - r.checkForZombieProcesses() - } - } - }() -} - -// checkForZombieProcesses periodically checks for and cleans up zombie processes -func (r *Runner) checkForZombieProcesses() { - r.Debugf("checking for zombie processes related to task %s", r.tid.Hex()) - - // Check if our main process still exists and is in the expected state - if r.pid > 0 && utils.ProcessIdExists(r.pid) { - // Process exists, check if it's a zombie - if proc, err := process.NewProcess(int32(r.pid)); err == nil { - if status, err := proc.Status(); err == nil { - if status == "Z" || status == "zombie" { - r.Warnf("detected zombie process %d for task %s", r.pid, r.tid.Hex()) - go r.cleanupOrphanedProcesses() - } - } - } - } -} diff --git a/core/task/handler/runner_cleanup.go b/core/task/handler/runner_cleanup.go new file mode 100644 index 00000000..94c66e59 --- /dev/null +++ b/core/task/handler/runner_cleanup.go @@ -0,0 +1,216 @@ +/* + * Copyright (c) 2025. Core Digital Limited + * 版权所有 (c) 2025. 重庆科锐数研科技有限公司 (Core Digital Limited) + * All rights reserved. 保留所有权利。 + * + * 该软件由 重庆科锐数研科技有限公司 (Core Digital Limited) 开发,未经明确书面许可,任何人不得使用、复制、修改或分发该软件的任何部分。 + * This software is developed by Core Digital Limited. No one is permitted to use, copy, modify, or distribute this software without explicit written permission. + * + * 许可证: + * 该软件仅供授权使用。授权用户有权在授权范围内使用、复制、修改和分发该软件。 + * License: + * This software is for authorized use only. Authorized users are permitted to use, copy, modify, and distribute this software within the scope of their authorization. + * + * 免责声明: + * 该软件按"原样"提供,不附带任何明示或暗示的担保,包括但不限于对适销性和适用于特定目的的担保。在任何情况下,版权持有者或其许可方对因使用该软件而产生的任何损害或其他责任概不负责。 + * Disclaimer: + * This software is provided "as is," without any express or implied warranties, including but not limited to warranties of merchantability and fitness for a particular purpose. In no event shall the copyright holder or its licensors be liable for any damages or other liability arising from the use of this software. + */ + +package handler + +import ( + "runtime" + "syscall" + "time" + + "github.com/crawlab-team/crawlab/core/utils" + "github.com/shirou/gopsutil/process" +) + +// cleanupOrphanedProcesses attempts to clean up any orphaned processes related to this task +func (r *Runner) cleanupOrphanedProcesses() { + r.Warnf("cleaning up orphaned processes for task %s (PID: %d)", r.tid.Hex(), r.pid) + + if r.pid <= 0 { + r.Debugf("no PID to clean up") + return + } + + // Try to kill the process group if it exists + if runtime.GOOS != "windows" { + r.killProcessGroup() + } + + // Force kill the main process if it still exists + if utils.ProcessIdExists(r.pid) { + r.Warnf("forcefully killing remaining process %d", r.pid) + if r.cmd != nil && r.cmd.Process != nil { + if err := utils.KillProcess(r.cmd, true); err != nil { + r.Errorf("failed to force kill process: %v", err) + } + } + } + + // Scan for any remaining child processes and kill them + r.scanAndKillChildProcesses() +} + +// killProcessGroup kills the entire process group on Unix systems +func (r *Runner) killProcessGroup() { + if r.pid <= 0 { + return + } + + r.Debugf("attempting to kill process group for PID %d", r.pid) + + // Kill the process group (negative PID kills the group) + err := syscall.Kill(-r.pid, syscall.SIGTERM) + if err != nil { + r.Debugf("failed to send SIGTERM to process group: %v", err) + // Try SIGKILL as last resort + err = syscall.Kill(-r.pid, syscall.SIGKILL) + if err != nil { + r.Debugf("failed to send SIGKILL to process group: %v", err) + } + } else { + r.Debugf("successfully sent SIGTERM to process group %d", r.pid) + } +} + +// scanAndKillChildProcesses scans for and kills any remaining child processes +func (r *Runner) scanAndKillChildProcesses() { + r.Debugf("scanning for orphaned child processes of task %s", r.tid.Hex()) + + processes, err := utils.GetProcesses() + if err != nil { + r.Errorf("failed to get process list: %v", err) + return + } + + taskIdEnv := "CRAWLAB_TASK_ID=" + r.tid.Hex() + killedCount := 0 + + for _, proc := range processes { + // Check if this process has our task ID in its environment + if r.isTaskRelatedProcess(proc, taskIdEnv) { + pid := int(proc.Pid) + r.Warnf("found orphaned task process PID %d, killing it", pid) + + // Kill the orphaned process + if err := proc.Kill(); err != nil { + r.Errorf("failed to kill orphaned process %d: %v", pid, err) + } else { + killedCount++ + r.Infof("successfully killed orphaned process %d", pid) + } + } + } + + if killedCount > 0 { + r.Infof("cleaned up %d orphaned processes for task %s", killedCount, r.tid.Hex()) + } else { + r.Debugf("no orphaned processes found for task %s", r.tid.Hex()) + } +} + +// isTaskRelatedProcess checks if a process is related to this task +func (r *Runner) isTaskRelatedProcess(proc *process.Process, taskIdEnv string) bool { + // Get process environment variables + environ, err := proc.Environ() + if err != nil { + // If we can't read environment, skip this process + return false + } + + // Check if this process has our task ID + for _, env := range environ { + if env == taskIdEnv { + return true + } + } + + return false +} + +// startZombieMonitor starts a background goroutine to monitor for zombie processes +func (r *Runner) startZombieMonitor() { + r.wg.Add(1) + go func() { + defer r.wg.Done() + + // Check for zombies every 5 minutes + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-r.ctx.Done(): + return + case <-ticker.C: + r.checkForZombieProcesses() + } + } + }() +} + +// checkForZombieProcesses periodically checks for and cleans up zombie processes +func (r *Runner) checkForZombieProcesses() { + r.Debugf("checking for zombie processes related to task %s", r.tid.Hex()) + + // Check if our main process still exists and is in the expected state + if r.pid > 0 && utils.ProcessIdExists(r.pid) { + // Process exists, check if it's a zombie + if proc, err := process.NewProcess(int32(r.pid)); err == nil { + if status, err := proc.Status(); err == nil { + if status == "Z" || status == "zombie" { + r.Warnf("detected zombie process %d for task %s", r.pid, r.tid.Hex()) + go r.cleanupOrphanedProcesses() + } + } + } + } +} + +// performPeriodicCleanup runs periodic cleanup for all tasks +func (r *Runner) performPeriodicCleanup() { + r.wg.Add(1) + defer r.wg.Done() + + // Cleanup every 10 minutes for all tasks + r.resourceCleanup = time.NewTicker(10 * time.Minute) + defer r.resourceCleanup.Stop() + + for { + select { + case <-r.ctx.Done(): + return + case <-r.resourceCleanup.C: + r.runPeriodicCleanup() + } + } +} + +// runPeriodicCleanup performs memory and resource cleanup +func (r *Runner) runPeriodicCleanup() { + r.Debugf("performing periodic cleanup for task") + + // Force garbage collection for memory management + runtime.GC() + + // Log current resource usage + var m runtime.MemStats + runtime.ReadMemStats(&m) + r.Debugf("memory usage - alloc: %d KB, sys: %d KB, num_gc: %d", + m.Alloc/1024, m.Sys/1024, m.NumGC) + + // Check if IPC channel is getting full + if r.ipcChan != nil { + select { + case <-r.ipcChan: + r.Debugf("drained stale IPC message during cleanup") + default: + // Channel is not full, good + } + } +} diff --git a/core/task/handler/runner_config.go b/core/task/handler/runner_config.go new file mode 100644 index 00000000..627c65af --- /dev/null +++ b/core/task/handler/runner_config.go @@ -0,0 +1,179 @@ +package handler + +import ( + "bufio" + "fmt" + "os" + "path/filepath" + "runtime" + "strings" + "syscall" + + "github.com/crawlab-team/crawlab/core/entity" + "github.com/crawlab-team/crawlab/core/models/client" + "github.com/crawlab-team/crawlab/core/models/models" + "github.com/crawlab-team/crawlab/core/utils" +) + +// configurePythonPath sets up the Python environment paths, handling both pyenv and default installations +func (r *Runner) configurePythonPath() { + // Configure global node_modules path + pyenvRoot := utils.GetPyenvPath() + pyenvShimsPath := pyenvRoot + "/shims" + pyenvBinPath := pyenvRoot + "/bin" + + // Configure global pyenv path + _ = os.Setenv("PYENV_ROOT", pyenvRoot) + _ = os.Setenv("PATH", pyenvShimsPath+":"+os.Getenv("PATH")) + _ = os.Setenv("PATH", pyenvBinPath+":"+os.Getenv("PATH")) +} + +// configureNodePath sets up the Node.js environment paths, handling both nvm and default installations +func (r *Runner) configureNodePath() { + // Configure nvm-based Node.js paths + envPath := os.Getenv("PATH") + + // Configure global node_modules path + nodePath := utils.GetNodeModulesPath() + if !strings.Contains(envPath, nodePath) { + _ = os.Setenv("PATH", nodePath+":"+envPath) + } + _ = os.Setenv("NODE_PATH", nodePath) + + // Configure global node_bin path + nodeBinPath := utils.GetNodeBinPath() + if !strings.Contains(envPath, nodeBinPath) { + _ = os.Setenv("PATH", nodeBinPath+":"+os.Getenv("PATH")) + } +} + +func (r *Runner) configureGoPath() { + // Configure global go path + goPath := utils.GetGoPath() + if goPath != "" { + _ = os.Setenv("GOPATH", goPath) + } +} + +// configureEnv sets up the environment variables for the task process, including: +// - Node.js paths +// - Crawlab-specific variables +// - Global environment variables from the system +func (r *Runner) configureEnv() { + // Configure Python path + r.configurePythonPath() + + // Configure Node.js path + r.configureNodePath() + + // Configure Go path + r.configureGoPath() + + // Default envs + r.cmd.Env = os.Environ() + + // Remove CRAWLAB_ prefixed environment variables + for i := 0; i < len(r.cmd.Env); i++ { + env := r.cmd.Env[i] + if strings.HasPrefix(env, "CRAWLAB_") { + r.cmd.Env = append(r.cmd.Env[:i], r.cmd.Env[i+1:]...) + i-- + } + } + + // Task-specific environment variables + r.cmd.Env = append(r.cmd.Env, "CRAWLAB_TASK_ID="+r.tid.Hex()) + + // Global environment variables + envs, err := client.NewModelService[models.Environment]().GetMany(nil, nil) + if err != nil { + r.Errorf("failed to get environments: %v", err) + } + for _, env := range envs { + r.cmd.Env = append(r.cmd.Env, env.Key+"="+env.Value) + } + + // Add environment variable for child processes to identify they're running under Crawlab + r.cmd.Env = append(r.cmd.Env, "CRAWLAB_PARENT_PID="+fmt.Sprint(os.Getpid())) +} + +// configureCwd sets the working directory for the task based on the spider's configuration +func (r *Runner) configureCwd() { + workspacePath := utils.GetWorkspace() + if r.s.GitId.IsZero() { + // not git + r.cwd = filepath.Join(workspacePath, r.s.Id.Hex()) + } else { + // git + r.cwd = filepath.Join(workspacePath, r.s.GitId.Hex(), r.s.GitRootPath) + } +} + +// configureCmd builds and configures the command to be executed, including setting up IPC pipes +// and processing command parameters +func (r *Runner) configureCmd() (err error) { + var cmdStr string + + // command + if r.t.Cmd == "" { + cmdStr = r.s.Cmd + } else { + cmdStr = r.t.Cmd + } + + // parameters + if r.t.Param != "" { + cmdStr += " " + r.t.Param + } else if r.s.Param != "" { + cmdStr += " " + r.s.Param + } + + // get cmd instance + r.cmd, err = utils.BuildCmd(cmdStr) + if err != nil { + r.Errorf("error building command: %v", err) + return err + } + + // set working directory + r.cmd.Dir = r.cwd + + // ZOMBIE PREVENTION: Set process group to enable proper cleanup of child processes + if runtime.GOOS != "windows" { + // Create new process group on Unix systems to ensure child processes can be killed together + r.cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, // Create new process group + Pgid: 0, // Use process ID as process group ID + } + } + + // Configure pipes for IPC and logs + r.stdinPipe, err = r.cmd.StdinPipe() + if err != nil { + r.Errorf("error creating stdin pipe: %v", err) + return err + } + + // Add stdout pipe for IPC and logs + r.stdoutPipe, err = r.cmd.StdoutPipe() + if err != nil { + r.Errorf("error creating stdout pipe: %v", err) + return err + } + + // Add stderr pipe for error logs + stderrPipe, err := r.cmd.StderrPipe() + if err != nil { + r.Errorf("error creating stderr pipe: %v", err) + return err + } + + // Create buffered readers + r.readerStdout = bufio.NewReader(r.stdoutPipe) + r.readerStderr = bufio.NewReader(stderrPipe) + + // Initialize IPC channel + r.ipcChan = make(chan entity.IPCMessage) + + return nil +} diff --git a/core/task/handler/runner_ipc.go b/core/task/handler/runner_ipc.go new file mode 100644 index 00000000..667c2d95 --- /dev/null +++ b/core/task/handler/runner_ipc.go @@ -0,0 +1,223 @@ +/* + * Copyright (c) 2025. Core Digital Limited + * 版权所有 (c) 2025. 重庆科锐数研科技有限公司 (Core Digital Limited) + * All rights reserved. 保留所有权利。 + * + * 该软件由 重庆科锐数研科技有限公司 (Core Digital Limited) 开发,未经明确书面许可,任何人不得使用、复制、修改或分发该软件的任何部分。 + * This software is developed by Core Digital Limited. No one is permitted to use, copy, modify, or distribute this software without explicit written permission. + * + * 许可证: + * 该软件仅供授权使用。授权用户有权在授权范围内使用、复制、修改和分发该软件。 + * License: + * This software is for authorized use only. Authorized users are permitted to use, copy, modify, and distribute this software within the scope of their authorization. + * + * 免责声明: + * 该软件按"原样"提供,不附带任何明示或暗示的担保,包括但不限于对适销性和适用于特定目的的担保。在任何情况下,版权持有者或其许可方对因使用该软件而产生的任何损害或其他责任概不负责。 + * Disclaimer: + * This software is provided "as is," without any express or implied warranties, including but not limited to warranties of merchantability and fitness for a particular purpose. In no event shall the copyright holder or its licensors be liable for any damages or other liability arising from the use of this software. + */ + +package handler + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/crawlab-team/crawlab/core/constants" + "github.com/crawlab-team/crawlab/core/entity" + "github.com/crawlab-team/crawlab/grpc" +) + +// handleIPC processes incoming IPC messages from the child process +// Messages are converted to JSON and written to the child process's stdin +func (r *Runner) handleIPC() { + r.wg.Add(1) + defer r.wg.Done() + + for msg := range r.ipcChan { + // Convert message to JSON + jsonData, err := json.Marshal(msg) + if err != nil { + r.Errorf("error marshaling IPC message: %v", err) + continue + } + + // Write to child process's stdin + _, err = fmt.Fprintln(r.stdinPipe, string(jsonData)) + if err != nil { + r.Errorf("error writing to child process: %v", err) + } + } +} + +// SetIPCHandler sets the handler for incoming IPC messages +func (r *Runner) SetIPCHandler(handler func(entity.IPCMessage)) { + r.ipcHandler = handler +} + +// startIPCReader continuously reads IPC messages from the child process's stdout +// Messages are parsed and either handled by the IPC handler or written to logs +func (r *Runner) startIPCReader() { + r.wg.Add(2) // Add 2 to wait group for both stdout and stderr readers + + // Start stdout reader + go func() { + defer r.wg.Done() + r.readOutput(r.readerStdout, true) // true for stdout + }() + + // Start stderr reader + go func() { + defer r.wg.Done() + r.readOutput(r.readerStderr, false) // false for stderr + }() +} + +// handleIPCInsertDataMessage converts the IPC message payload to JSON and sends it to the master node +func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) { + if ipcMsg.Payload == nil { + r.Errorf("empty payload in IPC message") + return + } + + // Convert payload to data to be inserted + var records []map[string]interface{} + + switch payload := ipcMsg.Payload.(type) { + case []interface{}: // Handle array of objects + records = make([]map[string]interface{}, 0, len(payload)) + for i, item := range payload { + if itemMap, ok := item.(map[string]interface{}); ok { + records = append(records, itemMap) + } else { + r.Errorf("invalid record at index %d: %v", i, item) + continue + } + } + case []map[string]interface{}: // Handle direct array of maps + records = payload + case map[string]interface{}: // Handle single object + records = []map[string]interface{}{payload} + case interface{}: // Handle generic interface + if itemMap, ok := payload.(map[string]interface{}); ok { + records = []map[string]interface{}{itemMap} + } else { + r.Errorf("invalid payload type: %T", payload) + return + } + default: + r.Errorf("unsupported payload type: %T, value: %v", payload, ipcMsg.Payload) + return + } + + // Validate records + if len(records) == 0 { + r.Warnf("no valid records to insert") + return + } + + // Marshal data with error handling + data, err := json.Marshal(records) + if err != nil { + r.Errorf("error marshaling records: %v", err) + return + } + + // Check if context is cancelled or connection is closed + select { + case <-r.ctx.Done(): + return + default: + } + + // Use connection with mutex for thread safety + r.connMutex.RLock() + conn := r.conn + r.connMutex.RUnlock() + + // Validate connection + if conn == nil { + r.Errorf("gRPC connection not initialized") + return + } + + // Send IPC message to master with context and timeout + ctx, cancel := context.WithTimeout(context.Background(), r.ipcTimeout) + defer cancel() + + // Create gRPC message + grpcMsg := &grpc.TaskServiceConnectRequest{ + Code: grpc.TaskServiceConnectCode_INSERT_DATA, + TaskId: r.tid.Hex(), + Data: data, + } + + // Use context for sending + select { + case <-ctx.Done(): + r.Errorf("timeout sending IPC message") + return + case <-r.ctx.Done(): + return + default: + if err := conn.Send(grpcMsg); err != nil { + // Don't log errors if context is cancelled (expected during shutdown) + select { + case <-r.ctx.Done(): + return + default: + r.Errorf("error sending IPC message: %v", err) + // Mark connection as unhealthy for reconnection + r.lastConnCheck = time.Time{} + } + return + } + } +} + +func (r *Runner) readOutput(reader *bufio.Reader, isStdout bool) { + scanner := bufio.NewScanner(reader) + for { + select { + case <-r.ctx.Done(): + // Context cancelled, stop reading + return + default: + // Scan the next line + if !scanner.Scan() { + return + } + + // Get the line + line := scanner.Text() + + // Trim the line + line = strings.TrimRight(line, "\n\r") + + // For stdout, try to parse as IPC message first + if isStdout { + var ipcMsg entity.IPCMessage + if err := json.Unmarshal([]byte(line), &ipcMsg); err == nil && ipcMsg.IPC { + if r.ipcHandler != nil { + r.ipcHandler(ipcMsg) + } else { + // Default handler (insert data) + if ipcMsg.Type == "" || ipcMsg.Type == constants.IPCMessageData { + r.handleIPCInsertDataMessage(ipcMsg) + } else { + r.Warnf("no IPC handler set for message: %v", ipcMsg) + } + } + continue + } + } + + // If not an IPC message or from stderr, treat as log + r.writeLogLines([]string{line}) + } + } +} diff --git a/core/task/handler/runner_log.go b/core/task/handler/runner_log.go new file mode 100644 index 00000000..3733aad4 --- /dev/null +++ b/core/task/handler/runner_log.go @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2025. Core Digital Limited + * 版权所有 (c) 2025. 重庆科锐数研科技有限公司 (Core Digital Limited) + * All rights reserved. 保留所有权利。 + * + * 该软件由 重庆科锐数研科技有限公司 (Core Digital Limited) 开发,未经明确书面许可,任何人不得使用、复制、修改或分发该软件的任何部分。 + * This software is developed by Core Digital Limited. No one is permitted to use, copy, modify, or distribute this software without explicit written permission. + * + * 许可证: + * 该软件仅供授权使用。授权用户有权在授权范围内使用、复制、修改和分发该软件。 + * License: + * This software is for authorized use only. Authorized users are permitted to use, copy, modify, and distribute this software within the scope of their authorization. + * + * 免责声明: + * 该软件按"原样"提供,不附带任何明示或暗示的担保,包括但不限于对适销性和适用于特定目的的担保。在任何情况下,版权持有者或其许可方对因使用该软件而产生的任何损害或其他责任概不负责。 + * Disclaimer: + * This software is provided "as is," without any express or implied warranties, including but not limited to warranties of merchantability and fitness for a particular purpose. In no event shall the copyright holder or its licensors be liable for any damages or other liability arising from the use of this software. + */ + +package handler + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/crawlab-team/crawlab/grpc" +) + +// writeLogLines marshals log lines to JSON and sends them to the task service +// Uses connection-safe approach for robust task execution +func (r *Runner) writeLogLines(lines []string) { + // Check if context is cancelled or connection is closed + select { + case <-r.ctx.Done(): + return + default: + } + + // Use connection with mutex for thread safety + r.connMutex.RLock() + conn := r.conn + r.connMutex.RUnlock() + + // Check if connection is available + if conn == nil { + r.Debugf("no connection available for sending log lines") + return + } + + linesBytes, err := json.Marshal(lines) + if err != nil { + r.Errorf("error marshaling log lines: %v", err) + return + } + + msg := &grpc.TaskServiceConnectRequest{ + Code: grpc.TaskServiceConnectCode_INSERT_LOGS, + TaskId: r.tid.Hex(), + Data: linesBytes, + } + + if err := conn.Send(msg); err != nil { + // Don't log errors if context is cancelled (expected during shutdown) + select { + case <-r.ctx.Done(): + return + default: + r.Errorf("error sending log lines: %v", err) + // Mark connection as unhealthy for reconnection + r.lastConnCheck = time.Time{} + } + return + } +} + +// logInternally sends internal runner logs to the same logging system as the task +func (r *Runner) logInternally(level string, message string) { + // Format the internal log with a prefix + timestamp := time.Now().Local().Format("2006-01-02 15:04:05") + + // Pad level + level = fmt.Sprintf("%-5s", level) + + // Format the log message + internalLog := fmt.Sprintf("%s [%s] [%s] %s", level, timestamp, "Crawlab", message) + + // Send to the same log system as task logs + // Only send if context is not cancelled and connection is available + if r.conn != nil { + select { + case <-r.ctx.Done(): + // Context cancelled, don't send logs + default: + go r.writeLogLines([]string{internalLog}) + } + } + + // Also log through the standard logger + switch level { + case "ERROR": + r.Logger.Error(message) + case "WARN": + r.Logger.Warn(message) + case "INFO": + r.Logger.Info(message) + case "DEBUG": + r.Logger.Debug(message) + } +} + +func (r *Runner) Error(message string) { + msg := fmt.Sprintf(message) + r.logInternally("ERROR", msg) +} + +func (r *Runner) Warn(message string) { + msg := fmt.Sprintf(message) + r.logInternally("WARN", msg) +} + +func (r *Runner) Info(message string) { + msg := fmt.Sprintf(message) + r.logInternally("INFO", msg) +} + +func (r *Runner) Debug(message string) { + msg := fmt.Sprintf(message) + r.logInternally("DEBUG", msg) +} + +func (r *Runner) Errorf(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + r.logInternally("ERROR", msg) +} + +func (r *Runner) Warnf(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + r.logInternally("WARN", msg) +} + +func (r *Runner) Infof(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + r.logInternally("INFO", msg) +} + +func (r *Runner) Debugf(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + r.logInternally("DEBUG", msg) +} diff --git a/core/task/handler/runner_sync.go b/core/task/handler/runner_sync.go new file mode 100644 index 00000000..2c211220 --- /dev/null +++ b/core/task/handler/runner_sync.go @@ -0,0 +1,214 @@ +package handler + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/crawlab-team/crawlab/core/entity" + "github.com/crawlab-team/crawlab/core/utils" +) + +// syncFiles synchronizes files between master and worker nodes: +// 1. Gets file list from master +// 2. Compares with local files +// 3. Downloads new/modified files +// 4. Deletes files that no longer exist on master +func (r *Runner) syncFiles() (err error) { + r.Infof("starting file synchronization for spider: %s", r.s.Id.Hex()) + + workingDir := "" + if !r.s.GitId.IsZero() { + workingDir = r.s.GitRootPath + r.Debugf("using git root path: %s", workingDir) + } + + // get file list from master + r.Infof("fetching file list from master node") + params := url.Values{ + "path": []string{workingDir}, + } + resp, err := r.performHttpRequest("GET", "/scan", params) + if err != nil { + r.Errorf("error getting file list from master: %v", err) + return err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + r.Errorf("error reading response body: %v", err) + return err + } + var response struct { + Data entity.FsFileInfoMap `json:"data"` + } + err = json.Unmarshal(body, &response) + if err != nil { + r.Errorf("error unmarshaling JSON for URL: %s", resp.Request.URL.String()) + r.Errorf("error details: %v", err) + return err + } + + // create a map for master files + masterFilesMap := make(entity.FsFileInfoMap) + for _, file := range response.Data { + masterFilesMap[file.Path] = file + } + + // create working directory if not exists + if _, err := os.Stat(r.cwd); os.IsNotExist(err) { + if err := os.MkdirAll(r.cwd, os.ModePerm); err != nil { + r.Errorf("error creating worker directory: %v", err) + return err + } + } + + // get file list from worker + workerFiles, err := utils.ScanDirectory(r.cwd) + if err != nil { + r.Errorf("error scanning worker directory: %v", err) + return err + } + + // delete files that are deleted on master node + for path, workerFile := range workerFiles { + if _, exists := masterFilesMap[path]; !exists { + r.Infof("deleting file: %s", path) + err := os.Remove(workerFile.FullPath) + if err != nil { + r.Errorf("error deleting file: %v", err) + } + } + } + + // set up wait group and error channel + var wg sync.WaitGroup + pool := make(chan struct{}, 10) + + // download files that are new or modified on master node + for path, masterFile := range masterFilesMap { + workerFile, exists := workerFiles[path] + if !exists || masterFile.Hash != workerFile.Hash { + wg.Add(1) + + // acquire token + pool <- struct{}{} + + // start goroutine to synchronize file or directory + go func(path string, masterFile *entity.FsFileInfo) { + defer wg.Done() + + if masterFile.IsDir { + r.Infof("directory needs to be synchronized: %s", path) + _err := os.MkdirAll(filepath.Join(r.cwd, path), masterFile.Mode) + if _err != nil { + r.Errorf("error creating directory: %v", _err) + err = errors.Join(err, _err) + } + } else { + r.Infof("file needs to be synchronized: %s", path) + _err := r.downloadFile(path, filepath.Join(r.cwd, path), masterFile) + if _err != nil { + r.Errorf("error downloading file: %v", _err) + err = errors.Join(err, _err) + } + } + + // release token + <-pool + + }(path, &masterFile) + } + } + + // wait for all files and directories to be synchronized + wg.Wait() + + r.Infof("file synchronization completed successfully") + return err +} + +func (r *Runner) performHttpRequest(method, path string, params url.Values) (*http.Response, error) { + // Normalize path + path = strings.TrimPrefix(path, "/") + + // Construct master URL + var id string + if r.s.GitId.IsZero() { + id = r.s.Id.Hex() + } else { + id = r.s.GitId.Hex() + } + requestUrl := fmt.Sprintf("%s/sync/%s/%s?%s", utils.GetApiEndpoint(), id, path, params.Encode()) + + // Create and execute request + req, err := http.NewRequest(method, requestUrl, nil) + if err != nil { + return nil, fmt.Errorf("error creating request: %v", err) + } + + for k, v := range r.getHttpRequestHeaders() { + req.Header.Set(k, v) + } + + return http.DefaultClient.Do(req) +} + +// downloadFile downloads a file from the master node and saves it to the local file system +func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsFileInfo) error { + r.Debugf("downloading file: %s -> %s", path, filePath) + params := url.Values{ + "path": []string{path}, + } + resp, err := r.performHttpRequest("GET", "/download", params) + if err != nil { + r.Errorf("error getting file response: %v", err) + return err + } + if resp.StatusCode != http.StatusOK { + r.Errorf("error downloading file: %s", resp.Status) + return errors.New(resp.Status) + } + defer resp.Body.Close() + + // create directory if not exists + dirPath := filepath.Dir(filePath) + utils.Exists(dirPath) + err = os.MkdirAll(dirPath, os.ModePerm) + if err != nil { + r.Errorf("error creating directory: %v", err) + return err + } + + // create local file + out, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, fileInfo.Mode) + if err != nil { + r.Errorf("error creating file: %v", err) + return err + } + defer out.Close() + + // copy file content to local file + _, err = io.Copy(out, resp.Body) + if err != nil { + r.Errorf("error copying file: %v", err) + return err + } + + r.Debugf("successfully downloaded file: %s (size: %d bytes)", path, fileInfo.FileSize) + return nil +} + +// getHttpRequestHeaders returns the headers for HTTP requests to the master node +func (r *Runner) getHttpRequestHeaders() (headers map[string]string) { + headers = make(map[string]string) + headers["Authorization"] = utils.GetAuthKey() + return headers +} diff --git a/core/task/handler/service.go b/core/task/handler/service.go index 89abc230..6f1abf33 100644 --- a/core/task/handler/service.go +++ b/core/task/handler/service.go @@ -2,9 +2,7 @@ package handler import ( "context" - "errors" "fmt" - "io" "runtime" "sync" "time" @@ -55,137 +53,6 @@ type Service struct { interfaces.Logger } -// StreamManager manages task streams without goroutine leaks -type StreamManager struct { - streams sync.Map // map[primitive.ObjectID]*TaskStream - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - service *Service - messageQueue chan *StreamMessage - maxStreams int -} - -// TaskStream represents a single task's stream -type TaskStream struct { - taskId primitive.ObjectID - stream grpc.TaskService_SubscribeClient - ctx context.Context - cancel context.CancelFunc - lastActive time.Time - mu sync.RWMutex -} - -// StreamMessage represents a message from a stream -type StreamMessage struct { - taskId primitive.ObjectID - msg *grpc.TaskServiceSubscribeResponse - err error -} - -// taskRequest represents a task execution request -type taskRequest struct { - taskId primitive.ObjectID -} - -// TaskWorkerPool manages a bounded pool of workers for task execution -type TaskWorkerPool struct { - workers int - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - taskQueue chan taskRequest - service *Service -} - -func NewTaskWorkerPool(workers int, service *Service) *TaskWorkerPool { - ctx, cancel := context.WithCancel(context.Background()) - // Use a more generous queue size to handle task bursts - // Queue size is workers * 5 to allow for better buffering - queueSize := workers * 5 - if queueSize < 50 { - queueSize = 50 // Minimum queue size - } - - return &TaskWorkerPool{ - workers: workers, - ctx: ctx, - cancel: cancel, - taskQueue: make(chan taskRequest, queueSize), - service: service, - } -} - -func (pool *TaskWorkerPool) Start() { - for i := 0; i < pool.workers; i++ { - pool.wg.Add(1) - go pool.worker(i) - } -} - -func (pool *TaskWorkerPool) Stop() { - pool.cancel() - close(pool.taskQueue) - pool.wg.Wait() -} - -func (pool *TaskWorkerPool) SubmitTask(taskId primitive.ObjectID) error { - req := taskRequest{ - taskId: taskId, - } - - select { - case pool.taskQueue <- req: - pool.service.Debugf("task[%s] queued for parallel execution, queue usage: %d/%d", - taskId.Hex(), len(pool.taskQueue), cap(pool.taskQueue)) - return nil // Return immediately - task will execute in parallel - case <-pool.ctx.Done(): - return fmt.Errorf("worker pool is shutting down") - default: - queueLen := len(pool.taskQueue) - queueCap := cap(pool.taskQueue) - pool.service.Warnf("task queue is full (%d/%d), consider increasing task.workers configuration", - queueLen, queueCap) - return fmt.Errorf("task queue is full (%d/%d), consider increasing task.workers configuration", - queueLen, queueCap) - } -} - -func (pool *TaskWorkerPool) worker(workerID int) { - defer pool.wg.Done() - defer func() { - if r := recover(); r != nil { - pool.service.Errorf("worker %d panic recovered: %v", workerID, r) - } - }() - - pool.service.Debugf("worker %d started", workerID) - - for { - select { - case <-pool.ctx.Done(): - pool.service.Debugf("worker %d shutting down", workerID) - return - case req, ok := <-pool.taskQueue: - if !ok { - pool.service.Debugf("worker %d: task queue closed", workerID) - return - } - - // Execute task asynchronously - each worker handles one task at a time - // but multiple workers can process different tasks simultaneously - pool.service.Debugf("worker %d processing task[%s]", workerID, req.taskId.Hex()) - err := pool.service.executeTask(req.taskId) - if err != nil { - pool.service.Errorf("worker %d failed to execute task[%s]: %v", - workerID, req.taskId.Hex(), err) - } else { - pool.service.Debugf("worker %d completed task[%s]", workerID, req.taskId.Hex()) - } - } - } -} - func (svc *Service) Start() { // Initialize context for graceful shutdown svc.ctx, svc.cancel = context.WithCancel(context.Background()) @@ -308,14 +175,6 @@ func (svc *Service) startGoroutineMonitoring() { }() } -func (svc *Service) Run(taskId primitive.ObjectID) (err error) { - return svc.runTask(taskId) -} - -func (svc *Service) Cancel(taskId primitive.ObjectID, force bool) (err error) { - return svc.cancelTask(taskId, force) -} - func (svc *Service) fetchAndRunTasks() { defer svc.wg.Done() defer func() { @@ -507,35 +366,6 @@ func (svc *Service) getRunnerCount() (count int) { return count } -func (svc *Service) getRunner(taskId primitive.ObjectID) (r interfaces.TaskRunner, err error) { - svc.Debugf("get runner: taskId[%v]", taskId) - v, ok := svc.runners.Load(taskId) - if !ok { - err = fmt.Errorf("task[%s] not exists", taskId.Hex()) - svc.Errorf("get runner error: %v", err) - return nil, err - } - switch v := v.(type) { - case interfaces.TaskRunner: - r = v - default: - err = fmt.Errorf("invalid type: %T", v) - svc.Errorf("get runner error: %v", err) - return nil, err - } - return r, nil -} - -func (svc *Service) addRunner(taskId primitive.ObjectID, r interfaces.TaskRunner) { - svc.Debugf("add runner: taskId[%s]", taskId.Hex()) - svc.runners.Store(taskId, r) -} - -func (svc *Service) deleteRunner(taskId primitive.ObjectID) { - svc.Debugf("delete runner: taskId[%v]", taskId) - svc.runners.Delete(taskId) -} - func (svc *Service) updateNodeStatus() (err error) { // current node n, err := svc.GetCurrentNode() @@ -581,411 +411,6 @@ func (svc *Service) fetchTask() (tid primitive.ObjectID, err error) { return tid, nil } -func (svc *Service) runTask(taskId primitive.ObjectID) (err error) { - // attempt to get runner from pool - _, ok := svc.runners.Load(taskId) - if ok { - err = fmt.Errorf("task[%s] already exists", taskId.Hex()) - svc.Errorf("run task error: %v", err) - return err - } - - // Use worker pool for bounded task execution - return svc.workerPool.SubmitTask(taskId) -} - -// executeTask is the actual task execution logic called by worker pool -func (svc *Service) executeTask(taskId primitive.ObjectID) (err error) { - // attempt to get runner from pool - _, ok := svc.runners.Load(taskId) - if ok { - err = fmt.Errorf("task[%s] already exists", taskId.Hex()) - svc.Errorf("execute task error: %v", err) - return err - } - - // create a new task runner - r, err := newTaskRunner(taskId, svc) - if err != nil { - err = fmt.Errorf("failed to create task runner: %v", err) - svc.Errorf("execute task error: %v", err) - return err - } - - // add runner to pool - svc.addRunner(taskId, r) - - // Ensure cleanup always happens - defer func() { - if rec := recover(); rec != nil { - svc.Errorf("task[%s] panic recovered: %v", taskId.Hex(), rec) - } - // Always cleanup runner from pool and stream - svc.deleteRunner(taskId) - svc.streamManager.RemoveTaskStream(taskId) - }() - - // Add task to stream manager for cancellation support - if err := svc.streamManager.AddTaskStream(r.GetTaskId()); err != nil { - svc.Warnf("failed to add task[%s] to stream manager: %v", r.GetTaskId().Hex(), err) - svc.Warnf("task[%s] will not be able to receive cancellation messages", r.GetTaskId().Hex()) - } else { - svc.Debugf("task[%s] added to stream manager for cancellation support", r.GetTaskId().Hex()) - } - - // run task process (blocking) error or finish after task runner ends - if err := r.Run(); err != nil { - switch { - case errors.Is(err, constants.ErrTaskError): - svc.Errorf("task[%s] finished with error: %v", r.GetTaskId().Hex(), err) - case errors.Is(err, constants.ErrTaskCancelled): - svc.Infof("task[%s] cancelled", r.GetTaskId().Hex()) - default: - svc.Errorf("task[%s] finished with unknown error: %v", r.GetTaskId().Hex(), err) - } - } else { - svc.Infof("task[%s] finished successfully", r.GetTaskId().Hex()) - } - - return err -} - -// subscribeTask attempts to subscribe to task stream -func (svc *Service) subscribeTask(taskId primitive.ObjectID) (stream grpc.TaskService_SubscribeClient, err error) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - req := &grpc.TaskServiceSubscribeRequest{ - TaskId: taskId.Hex(), - } - taskClient, err := svc.c.GetTaskClient() - if err != nil { - return nil, fmt.Errorf("failed to get task client: %v", err) - } - stream, err = taskClient.Subscribe(ctx, req) - if err != nil { - svc.Errorf("failed to subscribe task[%s]: %v", taskId.Hex(), err) - return nil, err - } - return stream, nil -} - -// subscribeTaskWithRetry attempts to subscribe to task stream with retry logic -func (svc *Service) subscribeTaskWithRetry(ctx context.Context, taskId primitive.ObjectID, maxRetries int) (stream grpc.TaskService_SubscribeClient, err error) { - for i := 0; i < maxRetries; i++ { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } - - stream, err = svc.subscribeTask(taskId) - if err == nil { - return stream, nil - } - - svc.Warnf("failed to subscribe task[%s] (attempt %d/%d): %v", taskId.Hex(), i+1, maxRetries, err) - - if i < maxRetries-1 { - // Wait before retry with exponential backoff - backoff := time.Duration(i+1) * time.Second - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-time.After(backoff): - } - } - } - - return nil, fmt.Errorf("failed to subscribe after %d retries: %w", maxRetries, err) -} - -func (svc *Service) handleStreamMessagesSync(ctx context.Context, taskId primitive.ObjectID, stream grpc.TaskService_SubscribeClient) { - defer func() { - if r := recover(); r != nil { - svc.Errorf("handleStreamMessagesSync[%s] panic recovered: %v", taskId.Hex(), r) - } - // Ensure stream is properly closed - if stream != nil { - if err := stream.CloseSend(); err != nil { - svc.Debugf("task[%s] failed to close stream: %v", taskId.Hex(), err) - } - } - }() - - svc.Debugf("task[%s] starting synchronous stream message handling", taskId.Hex()) - - for { - select { - case <-ctx.Done(): - svc.Debugf("task[%s] stream handler stopped by context", taskId.Hex()) - return - case <-svc.ctx.Done(): - svc.Debugf("task[%s] stream handler stopped by service context", taskId.Hex()) - return - default: - // Set a reasonable timeout for stream receive - recvCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - - // Create a done channel to handle the recv operation - done := make(chan struct { - msg *grpc.TaskServiceSubscribeResponse - err error - }, 1) - - // Use a goroutine only for the blocking recv call, but ensure it's cleaned up - go func() { - defer cancel() - msg, err := stream.Recv() - select { - case done <- struct { - msg *grpc.TaskServiceSubscribeResponse - err error - }{msg, err}: - case <-recvCtx.Done(): - // Timeout occurred, abandon this receive - } - }() - - select { - case result := <-done: - cancel() // Clean up the context - if result.err != nil { - if errors.Is(result.err, io.EOF) { - svc.Infof("task[%s] received EOF, stream closed", taskId.Hex()) - return - } - svc.Errorf("task[%s] stream error: %v", taskId.Hex(), result.err) - return - } - svc.processStreamMessage(taskId, result.msg) - case <-recvCtx.Done(): - cancel() - // Timeout on receive - continue to next iteration - svc.Debugf("task[%s] stream receive timeout", taskId.Hex()) - case <-ctx.Done(): - cancel() - return - } - } - } -} - -func (svc *Service) handleStreamMessages(taskId primitive.ObjectID, stream grpc.TaskService_SubscribeClient, stopCh chan struct{}) { - defer func() { - if r := recover(); r != nil { - svc.Errorf("handleStreamMessages[%s] panic recovered: %v", taskId.Hex(), r) - } - // Ensure stream is properly closed - if stream != nil { - if err := stream.CloseSend(); err != nil { - svc.Debugf("task[%s] failed to close stream: %v", taskId.Hex(), err) - } - } - }() - - // Create timeout for stream operations - streamTimeout := 30 * time.Second - - for { - select { - case <-stopCh: - svc.Debugf("task[%s] stream handler received stop signal", taskId.Hex()) - return - case <-svc.ctx.Done(): - svc.Debugf("task[%s] stream handler stopped by service context", taskId.Hex()) - return - default: - // Set deadline for receive operation - ctx, cancel := context.WithTimeout(context.Background(), streamTimeout) - - // Create a buffered channel to receive the result - result := make(chan struct { - msg *grpc.TaskServiceSubscribeResponse - err error - }, 1) - - // Use a single goroutine to handle the blocking Recv call - go func() { - defer func() { - if r := recover(); r != nil { - svc.Errorf("task[%s] stream recv goroutine panic: %v", taskId.Hex(), r) - } - }() - - msg, err := stream.Recv() - select { - case result <- struct { - msg *grpc.TaskServiceSubscribeResponse - err error - }{msg, err}: - case <-ctx.Done(): - // Context cancelled, don't send result - } - }() - - select { - case res := <-result: - cancel() - if res.err != nil { - if errors.Is(res.err, io.EOF) { - svc.Infof("task[%s] received EOF, stream closed", taskId.Hex()) - return - } - svc.Errorf("task[%s] stream error: %v", taskId.Hex(), res.err) - return - } - svc.processStreamMessage(taskId, res.msg) - case <-ctx.Done(): - cancel() - svc.Warnf("task[%s] stream receive timeout", taskId.Hex()) - // Continue loop to try again - case <-stopCh: - cancel() - return - case <-svc.ctx.Done(): - cancel() - return - } - } - } -} - -func (svc *Service) processStreamMessage(taskId primitive.ObjectID, msg *grpc.TaskServiceSubscribeResponse) { - switch msg.Code { - case grpc.TaskServiceSubscribeCode_CANCEL: - svc.Infof("task[%s] received cancel signal", taskId.Hex()) - // Handle cancel synchronously to avoid goroutine accumulation - svc.handleCancel(msg, taskId) - default: - svc.Debugf("task[%s] received unknown stream message code: %v", taskId.Hex(), msg.Code) - } -} - -func (svc *Service) handleCancel(msg *grpc.TaskServiceSubscribeResponse, taskId primitive.ObjectID) { - // validate task id - if msg.TaskId != taskId.Hex() { - svc.Errorf("task[%s] received cancel signal for another task[%s]", taskId.Hex(), msg.TaskId) - return - } - - // cancel task - err := svc.cancelTask(taskId, msg.Force) - if err != nil { - svc.Errorf("task[%s] failed to cancel: %v", taskId.Hex(), err) - return - } - svc.Infof("task[%s] cancelled", taskId.Hex()) - - // set task status as "cancelled" - t, err := svc.GetTaskById(taskId) - if err != nil { - svc.Errorf("task[%s] failed to get task: %v", taskId.Hex(), err) - return - } - t.Status = constants.TaskStatusCancelled - err = svc.UpdateTask(t) - if err != nil { - svc.Errorf("task[%s] failed to update task: %v", taskId.Hex(), err) - } -} - -func (svc *Service) cancelTask(taskId primitive.ObjectID, force bool) (err error) { - r, err := svc.getRunner(taskId) - if err != nil { - // Runner not found, task might already be finished - svc.Warnf("runner not found for task[%s]: %v", taskId.Hex(), err) - return nil - } - - // Attempt cancellation with timeout - cancelCtx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second) - defer cancelFunc() - - cancelDone := make(chan error, 1) - go func() { - cancelDone <- r.Cancel(force) - }() - - select { - case err = <-cancelDone: - if err != nil { - svc.Errorf("failed to cancel task[%s]: %v", taskId.Hex(), err) - // If cancellation failed and force is not set, try force cancellation - if !force { - svc.Warnf("escalating to force cancellation for task[%s]", taskId.Hex()) - return svc.cancelTask(taskId, true) - } - return err - } - svc.Infof("task[%s] cancelled successfully", taskId.Hex()) - case <-cancelCtx.Done(): - svc.Errorf("timeout cancelling task[%s], removing runner from pool", taskId.Hex()) - // Remove runner from pool to prevent further issues - svc.runners.Delete(taskId) - return fmt.Errorf("task cancellation timeout") - } - - return nil -} - -// stopAllRunners gracefully stops all running tasks -func (svc *Service) stopAllRunners() { - svc.Infof("Stopping all running tasks...") - - var runnerIds []primitive.ObjectID - - // Collect all runner IDs - svc.runners.Range(func(key, value interface{}) bool { - if taskId, ok := key.(primitive.ObjectID); ok { - runnerIds = append(runnerIds, taskId) - } - return true - }) - - // Cancel all runners with bounded concurrency to prevent goroutine explosion - const maxConcurrentCancellations = 10 - var wg sync.WaitGroup - semaphore := make(chan struct{}, maxConcurrentCancellations) - - for _, taskId := range runnerIds { - wg.Add(1) - - // Acquire semaphore to limit concurrent cancellations - semaphore <- struct{}{} - - go func(tid primitive.ObjectID) { - defer func() { - <-semaphore // Release semaphore - wg.Done() - if r := recover(); r != nil { - svc.Errorf("stopAllRunners panic for task[%s]: %v", tid.Hex(), r) - } - }() - - if err := svc.cancelTask(tid, false); err != nil { - svc.Errorf("failed to cancel task[%s]: %v", tid.Hex(), err) - // Force cancel after timeout - time.Sleep(5 * time.Second) - _ = svc.cancelTask(tid, true) - } - }(taskId) - } - - // Wait for all cancellations with timeout - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - select { - case <-done: - svc.Infof("All tasks stopped gracefully") - case <-time.After(30 * time.Second): - svc.Warnf("Some tasks did not stop within timeout") - } -} - func (svc *Service) startStuckTaskCleanup() { svc.wg.Add(1) // Track this goroutine in the WaitGroup go func() { @@ -1067,222 +492,6 @@ func (svc *Service) checkAndCleanupStuckTasks() { } } -func NewStreamManager(service *Service) *StreamManager { - ctx, cancel := context.WithCancel(context.Background()) - return &StreamManager{ - ctx: ctx, - cancel: cancel, - service: service, - messageQueue: make(chan *StreamMessage, 100), // Buffered channel for messages - maxStreams: 50, // Limit concurrent streams - } -} - -func (sm *StreamManager) Start() { - sm.wg.Add(2) - go sm.messageProcessor() - go sm.streamCleaner() -} - -func (sm *StreamManager) Stop() { - sm.cancel() - close(sm.messageQueue) - - // Close all active streams - sm.streams.Range(func(key, value interface{}) bool { - if ts, ok := value.(*TaskStream); ok { - ts.Close() - } - return true - }) - - sm.wg.Wait() -} - -func (sm *StreamManager) AddTaskStream(taskId primitive.ObjectID) error { - // Check if we're at the stream limit - streamCount := 0 - sm.streams.Range(func(key, value interface{}) bool { - streamCount++ - return true - }) - - if streamCount >= sm.maxStreams { - return fmt.Errorf("stream limit reached (%d)", sm.maxStreams) - } - - // Create new stream - stream, err := sm.service.subscribeTask(taskId) - if err != nil { - return fmt.Errorf("failed to subscribe to task stream: %v", err) - } - - ctx, cancel := context.WithCancel(sm.ctx) - taskStream := &TaskStream{ - taskId: taskId, - stream: stream, - ctx: ctx, - cancel: cancel, - lastActive: time.Now(), - } - - sm.streams.Store(taskId, taskStream) - - // Start listening for messages in a single goroutine per stream - sm.wg.Add(1) - go sm.streamListener(taskStream) - - return nil -} - -func (sm *StreamManager) RemoveTaskStream(taskId primitive.ObjectID) { - if value, ok := sm.streams.LoadAndDelete(taskId); ok { - if ts, ok := value.(*TaskStream); ok { - ts.Close() - } - } -} - -func (sm *StreamManager) streamListener(ts *TaskStream) { - defer sm.wg.Done() - defer func() { - if r := recover(); r != nil { - sm.service.Errorf("stream listener panic for task[%s]: %v", ts.taskId.Hex(), r) - } - ts.Close() - sm.streams.Delete(ts.taskId) - }() - - sm.service.Debugf("stream listener started for task[%s]", ts.taskId.Hex()) - - for { - select { - case <-ts.ctx.Done(): - sm.service.Debugf("stream listener stopped for task[%s]", ts.taskId.Hex()) - return - case <-sm.ctx.Done(): - return - default: - msg, err := ts.stream.Recv() - - if err != nil { - if errors.Is(err, io.EOF) { - sm.service.Debugf("stream EOF for task[%s]", ts.taskId.Hex()) - return - } - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return - } - sm.service.Errorf("stream error for task[%s]: %v", ts.taskId.Hex(), err) - return - } - - // Update last active time - ts.mu.Lock() - ts.lastActive = time.Now() - ts.mu.Unlock() - - // Send message to processor - select { - case sm.messageQueue <- &StreamMessage{ - taskId: ts.taskId, - msg: msg, - err: nil, - }: - case <-ts.ctx.Done(): - return - case <-sm.ctx.Done(): - return - default: - sm.service.Warnf("message queue full, dropping message for task[%s]", ts.taskId.Hex()) - } - } - } -} - -func (sm *StreamManager) messageProcessor() { - defer sm.wg.Done() - defer func() { - if r := recover(); r != nil { - sm.service.Errorf("message processor panic: %v", r) - } - }() - - sm.service.Debugf("stream message processor started") - - for { - select { - case <-sm.ctx.Done(): - sm.service.Debugf("stream message processor shutting down") - return - case msg, ok := <-sm.messageQueue: - if !ok { - return - } - sm.processMessage(msg) - } - } -} - -func (sm *StreamManager) processMessage(streamMsg *StreamMessage) { - if streamMsg.err != nil { - sm.service.Errorf("stream message error for task[%s]: %v", streamMsg.taskId.Hex(), streamMsg.err) - return - } - - // Process the actual message - sm.service.processStreamMessage(streamMsg.taskId, streamMsg.msg) -} - -func (sm *StreamManager) streamCleaner() { - defer sm.wg.Done() - defer func() { - if r := recover(); r != nil { - sm.service.Errorf("stream cleaner panic: %v", r) - } - }() - - ticker := time.NewTicker(2 * time.Minute) - defer ticker.Stop() - - for { - select { - case <-sm.ctx.Done(): - return - case <-ticker.C: - sm.cleanupInactiveStreams() - } - } -} - -func (sm *StreamManager) cleanupInactiveStreams() { - now := time.Now() - inactiveThreshold := 10 * time.Minute - - sm.streams.Range(func(key, value interface{}) bool { - taskId := key.(primitive.ObjectID) - ts := value.(*TaskStream) - - ts.mu.RLock() - lastActive := ts.lastActive - ts.mu.RUnlock() - - if now.Sub(lastActive) > inactiveThreshold { - sm.service.Debugf("cleaning up inactive stream for task[%s]", taskId.Hex()) - sm.RemoveTaskStream(taskId) - } - - return true - }) -} - -func (ts *TaskStream) Close() { - ts.cancel() - if ts.stream != nil { - _ = ts.stream.CloseSend() - } -} - func newTaskHandlerService() *Service { // service svc := &Service{ diff --git a/core/task/handler/service_operations.go b/core/task/handler/service_operations.go new file mode 100644 index 00000000..a28dedce --- /dev/null +++ b/core/task/handler/service_operations.go @@ -0,0 +1,277 @@ +package handler + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/crawlab-team/crawlab/core/constants" + "github.com/crawlab-team/crawlab/core/interfaces" + "github.com/crawlab-team/crawlab/grpc" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +// Service operations for task management + +func (svc *Service) Run(taskId primitive.ObjectID) (err error) { + return svc.runTask(taskId) +} + +func (svc *Service) Cancel(taskId primitive.ObjectID, force bool) (err error) { + return svc.cancelTask(taskId, force) +} + +func (svc *Service) runTask(taskId primitive.ObjectID) (err error) { + // attempt to get runner from pool + _, ok := svc.runners.Load(taskId) + if ok { + err = fmt.Errorf("task[%s] already exists", taskId.Hex()) + svc.Errorf("run task error: %v", err) + return err + } + + // Use worker pool for bounded task execution + return svc.workerPool.SubmitTask(taskId) +} + +// executeTask is the actual task execution logic called by worker pool +func (svc *Service) executeTask(taskId primitive.ObjectID) (err error) { + // attempt to get runner from pool + _, ok := svc.runners.Load(taskId) + if ok { + err = fmt.Errorf("task[%s] already exists", taskId.Hex()) + svc.Errorf("execute task error: %v", err) + return err + } + + // create a new task runner + r, err := newTaskRunner(taskId, svc) + if err != nil { + err = fmt.Errorf("failed to create task runner: %v", err) + svc.Errorf("execute task error: %v", err) + return err + } + + // add runner to pool + svc.addRunner(taskId, r) + + // Ensure cleanup always happens + defer func() { + if rec := recover(); rec != nil { + svc.Errorf("task[%s] panic recovered: %v", taskId.Hex(), rec) + } + // Always cleanup runner from pool and stream + svc.deleteRunner(taskId) + svc.streamManager.RemoveTaskStream(taskId) + }() + + // Add task to stream manager for cancellation support + if err := svc.streamManager.AddTaskStream(r.GetTaskId()); err != nil { + svc.Warnf("failed to add task[%s] to stream manager: %v", r.GetTaskId().Hex(), err) + svc.Warnf("task[%s] will not be able to receive cancellation messages", r.GetTaskId().Hex()) + } else { + svc.Debugf("task[%s] added to stream manager for cancellation support", r.GetTaskId().Hex()) + } + + // run task process (blocking) error or finish after task runner ends + if err := r.Run(); err != nil { + switch { + case errors.Is(err, constants.ErrTaskError): + svc.Errorf("task[%s] finished with error: %v", r.GetTaskId().Hex(), err) + case errors.Is(err, constants.ErrTaskCancelled): + svc.Infof("task[%s] cancelled", r.GetTaskId().Hex()) + default: + svc.Errorf("task[%s] finished with unknown error: %v", r.GetTaskId().Hex(), err) + } + } else { + svc.Infof("task[%s] finished successfully", r.GetTaskId().Hex()) + } + + return err +} + +// subscribeTask attempts to subscribe to task stream +func (svc *Service) subscribeTask(taskId primitive.ObjectID) (stream grpc.TaskService_SubscribeClient, err error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + req := &grpc.TaskServiceSubscribeRequest{ + TaskId: taskId.Hex(), + } + taskClient, err := svc.c.GetTaskClient() + if err != nil { + return nil, fmt.Errorf("failed to get task client: %v", err) + } + stream, err = taskClient.Subscribe(ctx, req) + if err != nil { + svc.Errorf("failed to subscribe task[%s]: %v", taskId.Hex(), err) + return nil, err + } + return stream, nil +} + +func (svc *Service) processStreamMessage(taskId primitive.ObjectID, msg *grpc.TaskServiceSubscribeResponse) { + switch msg.Code { + case grpc.TaskServiceSubscribeCode_CANCEL: + svc.Infof("task[%s] received cancel signal", taskId.Hex()) + // Handle cancel synchronously to avoid goroutine accumulation + svc.handleCancel(msg, taskId) + default: + svc.Debugf("task[%s] received unknown stream message code: %v", taskId.Hex(), msg.Code) + } +} + +func (svc *Service) handleCancel(msg *grpc.TaskServiceSubscribeResponse, taskId primitive.ObjectID) { + // validate task id + if msg.TaskId != taskId.Hex() { + svc.Errorf("task[%s] received cancel signal for another task[%s]", taskId.Hex(), msg.TaskId) + return + } + + // cancel task + err := svc.cancelTask(taskId, msg.Force) + if err != nil { + svc.Errorf("task[%s] failed to cancel: %v", taskId.Hex(), err) + return + } + svc.Infof("task[%s] cancelled", taskId.Hex()) + + // set task status as "cancelled" + t, err := svc.GetTaskById(taskId) + if err != nil { + svc.Errorf("task[%s] failed to get task: %v", taskId.Hex(), err) + return + } + t.Status = constants.TaskStatusCancelled + err = svc.UpdateTask(t) + if err != nil { + svc.Errorf("task[%s] failed to update task: %v", taskId.Hex(), err) + } +} + +func (svc *Service) cancelTask(taskId primitive.ObjectID, force bool) (err error) { + r, err := svc.getRunner(taskId) + if err != nil { + // Runner not found, task might already be finished + svc.Warnf("runner not found for task[%s]: %v", taskId.Hex(), err) + return nil + } + + // Attempt cancellation with timeout + cancelCtx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second) + defer cancelFunc() + + cancelDone := make(chan error, 1) + go func() { + cancelDone <- r.Cancel(force) + }() + + select { + case err = <-cancelDone: + if err != nil { + svc.Errorf("failed to cancel task[%s]: %v", taskId.Hex(), err) + // If cancellation failed and force is not set, try force cancellation + if !force { + svc.Warnf("escalating to force cancellation for task[%s]", taskId.Hex()) + return svc.cancelTask(taskId, true) + } + return err + } + svc.Infof("task[%s] cancelled successfully", taskId.Hex()) + case <-cancelCtx.Done(): + svc.Errorf("timeout cancelling task[%s], removing runner from pool", taskId.Hex()) + // Remove runner from pool to prevent further issues + svc.runners.Delete(taskId) + return fmt.Errorf("task cancellation timeout") + } + + return nil +} + +// stopAllRunners gracefully stops all running tasks +func (svc *Service) stopAllRunners() { + svc.Infof("Stopping all running tasks...") + + var runnerIds []primitive.ObjectID + + // Collect all runner IDs + svc.runners.Range(func(key, value interface{}) bool { + if taskId, ok := key.(primitive.ObjectID); ok { + runnerIds = append(runnerIds, taskId) + } + return true + }) + + // Cancel all runners with bounded concurrency to prevent goroutine explosion + const maxConcurrentCancellations = 10 + var wg sync.WaitGroup + semaphore := make(chan struct{}, maxConcurrentCancellations) + + for _, taskId := range runnerIds { + wg.Add(1) + + // Acquire semaphore to limit concurrent cancellations + semaphore <- struct{}{} + + go func(tid primitive.ObjectID) { + defer func() { + <-semaphore // Release semaphore + wg.Done() + if r := recover(); r != nil { + svc.Errorf("stopAllRunners panic for task[%s]: %v", tid.Hex(), r) + } + }() + + if err := svc.cancelTask(tid, false); err != nil { + svc.Errorf("failed to cancel task[%s]: %v", tid.Hex(), err) + // Force cancel after timeout + time.Sleep(5 * time.Second) + _ = svc.cancelTask(tid, true) + } + }(taskId) + } + + // Wait for all cancellations with timeout + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + svc.Infof("All tasks stopped gracefully") + case <-time.After(30 * time.Second): + svc.Warnf("Some tasks did not stop within timeout") + } +} + +func (svc *Service) getRunner(taskId primitive.ObjectID) (r interfaces.TaskRunner, err error) { + svc.Debugf("get runner: taskId[%v]", taskId) + v, ok := svc.runners.Load(taskId) + if !ok { + err = fmt.Errorf("task[%s] not exists", taskId.Hex()) + svc.Errorf("get runner error: %v", err) + return nil, err + } + switch v := v.(type) { + case interfaces.TaskRunner: + r = v + default: + err = fmt.Errorf("invalid type: %T", v) + svc.Errorf("get runner error: %v", err) + return nil, err + } + return r, nil +} + +func (svc *Service) addRunner(taskId primitive.ObjectID, r interfaces.TaskRunner) { + svc.Debugf("add runner: taskId[%s]", taskId.Hex()) + svc.runners.Store(taskId, r) +} + +func (svc *Service) deleteRunner(taskId primitive.ObjectID) { + svc.Debugf("delete runner: taskId[%v]", taskId) + svc.runners.Delete(taskId) +} diff --git a/core/task/handler/stream_manager.go b/core/task/handler/stream_manager.go new file mode 100644 index 00000000..3f7e128f --- /dev/null +++ b/core/task/handler/stream_manager.go @@ -0,0 +1,257 @@ +package handler + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + "time" + + "github.com/crawlab-team/crawlab/grpc" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +// StreamManager manages task streams without goroutine leaks +type StreamManager struct { + streams sync.Map // map[primitive.ObjectID]*TaskStream + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + service *Service + messageQueue chan *StreamMessage + maxStreams int +} + +// TaskStream represents a single task's stream +type TaskStream struct { + taskId primitive.ObjectID + stream grpc.TaskService_SubscribeClient + ctx context.Context + cancel context.CancelFunc + lastActive time.Time + mu sync.RWMutex +} + +// StreamMessage represents a message from a stream +type StreamMessage struct { + taskId primitive.ObjectID + msg *grpc.TaskServiceSubscribeResponse + err error +} + +func NewStreamManager(service *Service) *StreamManager { + ctx, cancel := context.WithCancel(context.Background()) + return &StreamManager{ + ctx: ctx, + cancel: cancel, + service: service, + messageQueue: make(chan *StreamMessage, 100), // Buffered channel for messages + maxStreams: 50, // Limit concurrent streams + } +} + +func (sm *StreamManager) Start() { + sm.wg.Add(2) + go sm.messageProcessor() + go sm.streamCleaner() +} + +func (sm *StreamManager) Stop() { + sm.cancel() + close(sm.messageQueue) + + // Close all active streams + sm.streams.Range(func(key, value interface{}) bool { + if ts, ok := value.(*TaskStream); ok { + ts.Close() + } + return true + }) + + sm.wg.Wait() +} + +func (sm *StreamManager) AddTaskStream(taskId primitive.ObjectID) error { + // Check if we're at the stream limit + streamCount := 0 + sm.streams.Range(func(key, value interface{}) bool { + streamCount++ + return true + }) + + if streamCount >= sm.maxStreams { + return fmt.Errorf("stream limit reached (%d)", sm.maxStreams) + } + + // Create new stream + stream, err := sm.service.subscribeTask(taskId) + if err != nil { + return fmt.Errorf("failed to subscribe to task stream: %v", err) + } + + ctx, cancel := context.WithCancel(sm.ctx) + taskStream := &TaskStream{ + taskId: taskId, + stream: stream, + ctx: ctx, + cancel: cancel, + lastActive: time.Now(), + } + + sm.streams.Store(taskId, taskStream) + + // Start listening for messages in a single goroutine per stream + sm.wg.Add(1) + go sm.streamListener(taskStream) + + return nil +} + +func (sm *StreamManager) RemoveTaskStream(taskId primitive.ObjectID) { + if value, ok := sm.streams.LoadAndDelete(taskId); ok { + if ts, ok := value.(*TaskStream); ok { + ts.Close() + } + } +} + +func (sm *StreamManager) streamListener(ts *TaskStream) { + defer sm.wg.Done() + defer func() { + if r := recover(); r != nil { + sm.service.Errorf("stream listener panic for task[%s]: %v", ts.taskId.Hex(), r) + } + ts.Close() + sm.streams.Delete(ts.taskId) + }() + + sm.service.Debugf("stream listener started for task[%s]", ts.taskId.Hex()) + + for { + select { + case <-ts.ctx.Done(): + sm.service.Debugf("stream listener stopped for task[%s]", ts.taskId.Hex()) + return + case <-sm.ctx.Done(): + return + default: + msg, err := ts.stream.Recv() + + if err != nil { + if errors.Is(err, io.EOF) { + sm.service.Debugf("stream EOF for task[%s]", ts.taskId.Hex()) + return + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } + sm.service.Errorf("stream error for task[%s]: %v", ts.taskId.Hex(), err) + return + } + + // Update last active time + ts.mu.Lock() + ts.lastActive = time.Now() + ts.mu.Unlock() + + // Send message to processor + select { + case sm.messageQueue <- &StreamMessage{ + taskId: ts.taskId, + msg: msg, + err: nil, + }: + case <-ts.ctx.Done(): + return + case <-sm.ctx.Done(): + return + default: + sm.service.Warnf("message queue full, dropping message for task[%s]", ts.taskId.Hex()) + } + } + } +} + +func (sm *StreamManager) messageProcessor() { + defer sm.wg.Done() + defer func() { + if r := recover(); r != nil { + sm.service.Errorf("message processor panic: %v", r) + } + }() + + sm.service.Debugf("stream message processor started") + + for { + select { + case <-sm.ctx.Done(): + sm.service.Debugf("stream message processor shutting down") + return + case msg, ok := <-sm.messageQueue: + if !ok { + return + } + sm.processMessage(msg) + } + } +} + +func (sm *StreamManager) processMessage(streamMsg *StreamMessage) { + if streamMsg.err != nil { + sm.service.Errorf("stream message error for task[%s]: %v", streamMsg.taskId.Hex(), streamMsg.err) + return + } + + // Process the actual message + sm.service.processStreamMessage(streamMsg.taskId, streamMsg.msg) +} + +func (sm *StreamManager) streamCleaner() { + defer sm.wg.Done() + defer func() { + if r := recover(); r != nil { + sm.service.Errorf("stream cleaner panic: %v", r) + } + }() + + ticker := time.NewTicker(2 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-sm.ctx.Done(): + return + case <-ticker.C: + sm.cleanupInactiveStreams() + } + } +} + +func (sm *StreamManager) cleanupInactiveStreams() { + now := time.Now() + inactiveThreshold := 10 * time.Minute + + sm.streams.Range(func(key, value interface{}) bool { + taskId := key.(primitive.ObjectID) + ts := value.(*TaskStream) + + ts.mu.RLock() + lastActive := ts.lastActive + ts.mu.RUnlock() + + if now.Sub(lastActive) > inactiveThreshold { + sm.service.Debugf("cleaning up inactive stream for task[%s]", taskId.Hex()) + sm.RemoveTaskStream(taskId) + } + + return true + }) +} + +func (ts *TaskStream) Close() { + ts.cancel() + if ts.stream != nil { + _ = ts.stream.CloseSend() + } +} diff --git a/core/task/handler/worker_pool.go b/core/task/handler/worker_pool.go new file mode 100644 index 00000000..84aa7ebd --- /dev/null +++ b/core/task/handler/worker_pool.go @@ -0,0 +1,112 @@ +package handler + +import ( + "context" + "fmt" + "sync" + + "go.mongodb.org/mongo-driver/bson/primitive" +) + +// taskRequest represents a task execution request +type taskRequest struct { + taskId primitive.ObjectID +} + +// TaskWorkerPool manages a bounded pool of workers for task execution +type TaskWorkerPool struct { + workers int + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + taskQueue chan taskRequest + service *Service +} + +func NewTaskWorkerPool(workers int, service *Service) *TaskWorkerPool { + ctx, cancel := context.WithCancel(context.Background()) + // Use a more generous queue size to handle task bursts + // Queue size is workers * 5 to allow for better buffering + queueSize := workers * 5 + if queueSize < 50 { + queueSize = 50 // Minimum queue size + } + + return &TaskWorkerPool{ + workers: workers, + ctx: ctx, + cancel: cancel, + taskQueue: make(chan taskRequest, queueSize), + service: service, + } +} + +func (pool *TaskWorkerPool) Start() { + for i := 0; i < pool.workers; i++ { + pool.wg.Add(1) + go pool.worker(i) + } +} + +func (pool *TaskWorkerPool) Stop() { + pool.cancel() + close(pool.taskQueue) + pool.wg.Wait() +} + +func (pool *TaskWorkerPool) SubmitTask(taskId primitive.ObjectID) error { + req := taskRequest{ + taskId: taskId, + } + + select { + case pool.taskQueue <- req: + pool.service.Debugf("task[%s] queued for parallel execution, queue usage: %d/%d", + taskId.Hex(), len(pool.taskQueue), cap(pool.taskQueue)) + return nil // Return immediately - task will execute in parallel + case <-pool.ctx.Done(): + return fmt.Errorf("worker pool is shutting down") + default: + queueLen := len(pool.taskQueue) + queueCap := cap(pool.taskQueue) + pool.service.Warnf("task queue is full (%d/%d), consider increasing task.workers configuration", + queueLen, queueCap) + return fmt.Errorf("task queue is full (%d/%d), consider increasing task.workers configuration", + queueLen, queueCap) + } +} + +func (pool *TaskWorkerPool) worker(workerID int) { + defer pool.wg.Done() + defer func() { + if r := recover(); r != nil { + pool.service.Errorf("worker %d panic recovered: %v", workerID, r) + } + }() + + pool.service.Debugf("worker %d started", workerID) + + for { + select { + case <-pool.ctx.Done(): + pool.service.Debugf("worker %d shutting down", workerID) + return + case req, ok := <-pool.taskQueue: + if !ok { + pool.service.Debugf("worker %d: task queue closed", workerID) + return + } + + // Execute task asynchronously - each worker handles one task at a time + // but multiple workers can process different tasks simultaneously + pool.service.Debugf("worker %d processing task[%s]", workerID, req.taskId.Hex()) + err := pool.service.executeTask(req.taskId) + if err != nil { + pool.service.Errorf("worker %d failed to execute task[%s]: %v", + workerID, req.taskId.Hex(), err) + } else { + pool.service.Debugf("worker %d completed task[%s]", workerID, req.taskId.Hex()) + } + } + } +}