diff --git a/core/constants/dependency.go b/core/constants/dependency.go index e6457f03..96e67a4b 100644 --- a/core/constants/dependency.go +++ b/core/constants/dependency.go @@ -20,6 +20,8 @@ const ( const ( DependencyFileTypeRequirementsTxt = "requirements.txt" DependencyFileTypePackageJson = "package.json" + DependencyFileTypeGoMod = "go.mod" + DependencyFileTypePomXml = "pom.xml" ) const ( DependencyActionSync = "sync" diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index 6ce4e756..ac5fe462 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -201,7 +201,7 @@ func (r *Runner) GetTaskId() (id primitive.ObjectID) { func (r *Runner) configureCmd() (err error) { var cmdStr string - // customized spider + // command if r.t.Cmd == "" { cmdStr = r.s.Cmd } else { @@ -225,20 +225,31 @@ func (r *Runner) configureCmd() (err error) { // set working directory r.cmd.Dir = r.cwd - // Configure pipes for IPC + // Configure pipes for IPC and logs r.stdinPipe, err = r.cmd.StdinPipe() if err != nil { r.Errorf("error creating stdin pipe: %v", err) return err } - // Add stdout pipe for IPC + // Add stdout pipe for IPC and logs r.stdoutPipe, err = r.cmd.StdoutPipe() if err != nil { r.Errorf("error creating stdout pipe: %v", err) return err } + // Add stderr pipe for error logs + stderrPipe, err := r.cmd.StderrPipe() + if err != nil { + r.Errorf("error creating stderr pipe: %v", err) + return err + } + + // Create buffered readers + r.scannerStdout = bufio.NewReader(r.stdoutPipe) + r.scannerStderr = bufio.NewReader(stderrPipe) + // Initialize IPC channel r.ipcChan = make(chan entity.IPCMessage) @@ -272,24 +283,51 @@ func (r *Runner) startHealthCheck() { } } +// configureNodePath sets up the Node.js environment paths, handling both nvm and default installations +func (r *Runner) configureNodePath() { + // Get user's home directory + home, err := os.UserHomeDir() + if err != nil { + r.Errorf("error getting user home directory: %v", err) + home = "/root" // fallback to root if can't get home dir + } + + // Configure nvm-based Node.js paths + envPath := os.Getenv("PATH") + nvmPath := filepath.Join(home, ".nvm/versions/node") + + // Check if nvm is being used + if utils.Exists(nvmPath) { + // Get the current node version from NVM + currentVersion := os.Getenv("NVM_BIN") + if currentVersion != "" { + nodePath := filepath.Dir(currentVersion) + "/lib/node_modules" + if !strings.Contains(envPath, nodePath) { + _ = os.Setenv("PATH", nodePath+":"+envPath) + } + _ = os.Setenv("NODE_PATH", nodePath) + } + } else { + // Fallback to default global node_modules path + nodePath := "/usr/lib/node_modules" + if !strings.Contains(envPath, nodePath) { + _ = os.Setenv("PATH", nodePath+":"+envPath) + } + _ = os.Setenv("NODE_PATH", nodePath) + } +} + // configureEnv sets up the environment variables for the task process, including: // - Node.js paths // - Crawlab-specific variables // - Global environment variables from the system func (r *Runner) configureEnv() { - // By default, add Node.js's global node_modules to PATH - envPath := os.Getenv("PATH") - nodePath := "/usr/lib/node_modules" - if !strings.Contains(envPath, nodePath) { - _ = os.Setenv("PATH", nodePath+":"+envPath) - } - _ = os.Setenv("NODE_PATH", nodePath) + // Configure Node.js paths + r.configureNodePath() // Default envs r.cmd.Env = os.Environ() r.cmd.Env = append(r.cmd.Env, "CRAWLAB_TASK_ID="+r.tid.Hex()) - r.cmd.Env = append(r.cmd.Env, "CRAWLAB_GRPC_ADDRESS="+utils.GetGrpcAddress()) - r.cmd.Env = append(r.cmd.Env, "CRAWLAB_GRPC_AUTH_KEY="+utils.GetAuthKey()) r.cmd.Env = append(r.cmd.Env, "PYENV_ROOT="+utils.PyenvRoot) r.cmd.Env = append(r.cmd.Env, "PATH="+os.Getenv("PATH")+":"+utils.PyenvRoot+"/shims:"+utils.PyenvRoot+"/bin") @@ -341,12 +379,16 @@ func (r *Runner) createHttpRequest(method, path string) (*http.Response, error) // 3. Downloads new/modified files // 4. Deletes files that no longer exist on master func (r *Runner) syncFiles() (err error) { + r.Infof("starting file synchronization for spider: %s", r.s.Id.Hex()) + workingDir := "" if !r.s.GitId.IsZero() { workingDir = r.s.GitRootPath + r.Debugf("using git root path: %s", workingDir) } // get file list from master + r.Infof("fetching file list from master node") resp, err := r.createHttpRequest("GET", "/scan?path="+workingDir) if err != nil { r.Errorf("error getting file list from master: %v", err) @@ -441,11 +483,14 @@ func (r *Runner) syncFiles() (err error) { // wait for all files and directories to be synchronized wg.Wait() + r.Infof("file synchronization completed successfully") return err } // downloadFile downloads a file from the master node and saves it to the local file system func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsFileInfo) error { + r.Debugf("downloading file: %s -> %s", path, filePath) + resp, err := r.createHttpRequest("GET", "/download?path="+path) if err != nil { r.Errorf("error getting file response: %v", err) @@ -480,6 +525,8 @@ func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsF r.Errorf("error copying file: %v", err) return err } + + r.Debugf("successfully downloaded file: %s (size: %d bytes)", path, fileInfo.FileSize) return nil } @@ -555,12 +602,28 @@ func (r *Runner) wait() (err error) { return err } + // log according to status + switch status { + case constants.TaskStatusFinished: + r.Infof("task[%s] finished", r.tid.Hex()) + case constants.TaskStatusCancelled: + r.Infof("task[%s] cancelled", r.tid.Hex()) + case constants.TaskStatusError: + r.Errorf("task[%s] error: %v", r.tid.Hex(), err) + default: + r.Errorf("invalid task status: %s", status) + } + return nil } // updateTask updates the task status and related statistics in the database // If running on a worker node, updates are sent to the master func (r *Runner) updateTask(status string, e error) (err error) { + if status != "" { + r.Debugf("updating task status to: %s", status) + } + if r.t != nil && status != "" { // update task status r.t.Status = status @@ -588,8 +651,10 @@ func (r *Runner) updateTask(status string, e error) (err error) { } // get task + r.Debugf("fetching updated task from database") r.t, err = r.svc.GetTaskById(r.tid) if err != nil { + r.Errorf("failed to get updated task: %v", err) return err } @@ -628,11 +693,18 @@ func (r *Runner) writeLogLines(lines []string) { // - For running tasks: sets start time and wait duration // - For completed tasks: sets end time and calculates durations func (r *Runner) _updateTaskStat(status string) { + if status != "" { + r.Debugf("updating task statistics for status: %s", status) + } + ts, err := client.NewModelService[models.TaskStat]().GetById(r.tid) if err != nil { r.Errorf("error getting task stat: %v", err) return } + + r.Debugf("current task statistics - wait_duration: %dms, runtime_duration: %dms", ts.WaitDuration, ts.RuntimeDuration) + switch status { case constants.TaskStatusPending: // do nothing @@ -772,45 +844,68 @@ func (r *Runner) SetIPCHandler(handler func(entity.IPCMessage)) { // startIPCReader continuously reads IPC messages from the child process's stdout // Messages are parsed and either handled by the IPC handler or written to logs func (r *Runner) startIPCReader() { - r.wg.Add(1) - defer r.wg.Done() + r.wg.Add(2) // Add 2 to wait group for both stdout and stderr readers - scanner := bufio.NewScanner(r.stdoutPipe) + // Start stdout reader + go func() { + defer r.wg.Done() + r.readOutput(r.scannerStdout, true) // true for stdout + }() + + // Start stderr reader + go func() { + defer r.wg.Done() + r.readOutput(r.scannerStderr, false) // false for stderr + }() +} + +func (r *Runner) readOutput(reader *bufio.Reader, isStdout bool) { for { select { case <-r.ctx.Done(): return default: - if !scanner.Scan() { + line, err := reader.ReadString('\n') + if err != nil { + if err != io.EOF { + r.Errorf("error reading from %s: %v", + map[bool]string{true: "stdout", false: "stderr"}[isStdout], + err) + } return } - line := scanner.Text() - var ipcMsg entity.IPCMessage - err := json.Unmarshal([]byte(line), &ipcMsg) - if err == nil && ipcMsg.IPC { - // Only handle as IPC if it's valid JSON AND has IPC flag set - if r.ipcHandler != nil { - r.ipcHandler(ipcMsg) - } else { - // Default handler (insert data) - if ipcMsg.Type == "" || ipcMsg.Type == constants.IPCMessageData { - r.handleIPCInsertDataMessage(ipcMsg) + // Trim the line + line = strings.TrimRight(line, "\n\r") + + // For stdout, try to parse as IPC message first + if isStdout { + var ipcMsg entity.IPCMessage + if err := json.Unmarshal([]byte(line), &ipcMsg); err == nil && ipcMsg.IPC { + if r.ipcHandler != nil { + r.ipcHandler(ipcMsg) } else { - r.Warnf("no IPC handler set for message: %v", ipcMsg) + // Default handler (insert data) + if ipcMsg.Type == "" || ipcMsg.Type == constants.IPCMessageData { + r.handleIPCInsertDataMessage(ipcMsg) + } else { + r.Warnf("no IPC handler set for message: %v", ipcMsg) + } } + continue } - } else { - // Everything else is treated as logs - r.writeLogLines([]string{line}) } + + // If not an IPC message or from stderr, treat as log + r.writeLogLines([]string{line}) } } } // handleIPCInsertDataMessage converts the IPC message payload to JSON and sends it to the master node func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) { - // Validate message + r.Debugf("processing IPC data message") + if ipcMsg.Payload == nil { r.Errorf("empty payload in IPC message") return @@ -887,6 +982,8 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) { return } } + + r.Infof("successfully sent %d records to master node", len(records)) } // newTaskRunner creates a new task runner instance with the specified task ID @@ -939,3 +1036,52 @@ func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) { return r, errs.ErrorOrNil() } + +// logInternally sends internal runner logs to the same logging system as the task +func (r *Runner) logInternally(level string, message string) { + // Format the internal log with a prefix + timestamp := time.Now().Local().Format("2006-01-02 15:04:05") + + // Pad level + level = fmt.Sprintf("%-5s", level) + + // Format the log message + internalLog := fmt.Sprintf("%s [%s] [%s] %s", level, timestamp, "Crawlab", message) + + // Send to the same log system as task logs + if r.conn != nil { + r.writeLogLines([]string{internalLog}) + } + + // Also log through the standard logger + switch level { + case "ERROR": + r.Logger.Error(message) + case "WARN": + r.Logger.Warn(message) + case "INFO": + r.Logger.Info(message) + case "DEBUG": + r.Logger.Debug(message) + } +} + +func (r *Runner) Errorf(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + r.logInternally("ERROR", msg) +} + +func (r *Runner) Warnf(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + r.logInternally("WARN", msg) +} + +func (r *Runner) Infof(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + r.logInternally("INFO", msg) +} + +func (r *Runner) Debugf(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + r.logInternally("DEBUG", msg) +} diff --git a/core/task/handler/service.go b/core/task/handler/service.go index 3ee92af9..0230ff59 100644 --- a/core/task/handler/service.go +++ b/core/task/handler/service.go @@ -223,7 +223,7 @@ func (svc *Service) getRunnerCount() (count int) { } func (svc *Service) getRunner(taskId primitive.ObjectID) (r interfaces.TaskRunner, err error) { - svc.Debugf("[TaskHandlerService] getRunner: taskId[%v]", taskId) + svc.Debugf("get runner: taskId[%v]", taskId) v, ok := svc.runners.Load(taskId) if !ok { err = fmt.Errorf("task[%s] not exists", taskId.Hex()) @@ -242,12 +242,12 @@ func (svc *Service) getRunner(taskId primitive.ObjectID) (r interfaces.TaskRunne } func (svc *Service) addRunner(taskId primitive.ObjectID, r interfaces.TaskRunner) { - svc.Debugf("[TaskHandlerService] addRunner: taskId[%s]", taskId.Hex()) + svc.Debugf("add runner: taskId[%s]", taskId.Hex()) svc.runners.Store(taskId, r) } func (svc *Service) deleteRunner(taskId primitive.ObjectID) { - svc.Debugf("[TaskHandlerService] deleteRunner: taskId[%v]", taskId) + svc.Debugf("delete runner: taskId[%v]", taskId) svc.runners.Delete(taskId) } diff --git a/core/utils/process.go b/core/utils/process.go index 85b19aef..6667aa95 100644 --- a/core/utils/process.go +++ b/core/utils/process.go @@ -2,18 +2,21 @@ package utils import ( "errors" - "github.com/shirou/gopsutil/process" "os/exec" "runtime" - "strings" + + "github.com/shirou/gopsutil/process" ) func BuildCmd(cmdStr string) (cmd *exec.Cmd, err error) { if cmdStr == "" { return nil, errors.New("command string is empty") } - args := strings.Split(cmdStr, " ") - return exec.Command(args[0], args[1:]...), nil + + if runtime.GOOS == "windows" { + return exec.Command("cmd", "/C", cmdStr), nil + } + return exec.Command("sh", "-c", cmdStr), nil } func ProcessIdExists(pid int) (exists bool) {