diff --git a/backend/services/node.go b/backend/services/node.go index d3409ed2..8b1f998a 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -100,13 +100,13 @@ func handleNodeInfo(key string, data Data) { defer s.Close() // 同个key可能因为并发,被注册多次 - //var nodes []model.Node - //_ = c.Find(bson.M{"key": key}).All(&nodes) - //if nodes != nil && len(nodes) > 1 { - // for _, node := range nodes { - // _ = c.RemoveId(node.Id) - // } - //} + var nodes []model.Node + _ = c.Find(bson.M{"key": key}).All(&nodes) + if nodes != nil && len(nodes) > 1 { + for _, node := range nodes { + _ = c.RemoveId(node.Id) + } + } var node model.Node if err := c.Find(bson.M{"key": key}).One(&node); err != nil { diff --git a/backend/services/task.go b/backend/services/task.go index 12f0330e..b79dbe7a 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -100,10 +100,104 @@ func AssignTask(task model.Task) error { return nil } +// 设置环境变量 +func SetEnv(cmd *exec.Cmd, envs []model.Env, taskId string, dataCol string) *exec.Cmd { + // 默认环境变量 + cmd.Env = append(cmd.Env, "CRAWLAB_TASK_ID="+taskId) + cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+dataCol) + cmd.Env = append(cmd.Env, "PYTHONUNBUFFERED=0") + cmd.Env = append(cmd.Env, "PYTHONIOENCODING=utf-8") + + //任务环境变量 + for _, env := range envs { + cmd.Env = append(cmd.Env, env.Name+"="+env.Value) + } + + // TODO 全局环境变量 + return cmd +} + +func SetLogConfig(cmd *exec.Cmd, path string) error { + fLog, err := os.Create(path) + if err != nil { + log.Errorf("create task log file error: %s", path) + debug.PrintStack() + return err + } + defer fLog.Close() + cmd.Stdout = fLog + cmd.Stderr = fLog + return nil +} + +func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, t model.Task) { + // 传入信号,此处阻塞 + signal := <-ch + log.Infof("process received signal: %s", signal) + + if signal == constants.TaskCancel && cmd.Process != nil { + // 取消进程 + if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { + log.Errorf("process kill error: %s", err.Error()) + debug.PrintStack() + + t.Error = "kill process error: " + err.Error() + t.Status = constants.StatusError + } else { + t.Error = "user kill the process ..." + t.Status = constants.StatusCancelled + } + } else { + // 保存任务 + t.Status = constants.StatusFinished + } + + t.FinishTs = time.Now() + _ = t.Save() +} + +func StartTaskProcess(cmd *exec.Cmd, t model.Task) error { + if err := cmd.Start(); err != nil { + log.Errorf("start spider error:{}", err.Error()) + debug.PrintStack() + + t.Error = "start task error: " + err.Error() + t.Status = constants.StatusError + t.FinishTs = time.Now() + _ = t.Save() + return err + } + return nil +} + +func WaitTaskProcess(cmd *exec.Cmd, t model.Task) error { + if err := cmd.Wait(); err != nil { + log.Errorf("wait process finish error: %s", err.Error()) + debug.PrintStack() + + if exitError, ok := err.(*exec.ExitError); ok { + exitCode := exitError.ExitCode() + log.Errorf("exit error, exit code: %d", exitCode) + + // 非kill 的错误类型 + if exitCode != -1 { + // 非手动kill保存为错误状态 + t.Error = err.Error() + t.FinishTs = time.Now() + t.Status = constants.StatusError + _ = t.Save() + } + } + + return err + } + return nil +} + // 执行shell命令 func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (err error) { - log.Infof("cwd: " + cwd) - log.Infof("cmd: " + cmdStr) + log.Infof("cwd: %s", cwd) + log.Infof("cmd: %s", cmdStr) // 生成执行命令 var cmd *exec.Cmd @@ -116,84 +210,29 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e // 工作目录 cmd.Dir = cwd - // 指定stdout, stderr日志位置 - fLog, err := os.Create(t.LogPath) - if err != nil { - HandleTaskError(t, err) + // 日志配置 + if err := SetLogConfig(cmd, t.LogPath); err != nil { return err } - defer fLog.Close() - cmd.Stdout = fLog - cmd.Stderr = fLog - // 添加默认环境变量 - cmd.Env = append(cmd.Env, "CRAWLAB_TASK_ID="+t.Id) - cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+s.Col) - - // 添加任务环境变量 - for _, env := range s.Envs { - cmd.Env = append(cmd.Env, env.Name+"="+env.Value) - } + // 环境变量配置 + cmd = SetEnv(cmd, s.Envs, t.Id, s.Col) // 起一个goroutine来监控进程 ch := utils.TaskExecChanMap.ChanBlocked(t.Id) - go func() { - // 传入信号,此处阻塞 - signal := <-ch - log.Infof("cancel process signal: %s", signal) - if signal == constants.TaskCancel && cmd.Process != nil { - // 取消进程 - if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { - log.Errorf("process kill error: %s", err.Error()) - debug.PrintStack() - } - t.Status = constants.StatusCancelled - t.Error = "user kill the process ..." - } else { - // 保存任务 - t.Status = constants.StatusFinished - } - t.FinishTs = time.Now() - if err := t.Save(); err != nil { - log.Infof("save task error: %s", err.Error()) - debug.PrintStack() - return - } - }() - // 在选择所有节点执行的时候,实际就是随机一个节点执行的, + go FinishOrCancelTask(ch, cmd, t) + + // kill的时候,可以kill所有的子进程 cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} - // 异步启动进程 - if err := cmd.Start(); err != nil { - log.Errorf("start spider error:{}", err.Error()) - debug.PrintStack() + // 启动进程 + if err := StartTaskProcess(cmd, t); err != nil { return err } - // 保存pid到task - t.Pid = cmd.Process.Pid - if err := t.Save(); err != nil { - log.Errorf("save task pid error: %s", err.Error()) - debug.PrintStack() - return err - } // 同步等待进程完成 - if err := cmd.Wait(); err != nil { - log.Errorf("wait process finish error: %s", err.Error()) - debug.PrintStack() - if exitError, ok := err.(*exec.ExitError); ok { - exitCode := exitError.ExitCode() - log.Errorf("exit error, exit code: %d", exitCode) - // 非kill 的错误类型 - if exitCode != -1 { - // 非手动kill保存为错误状态 - t.Error = err.Error() - t.FinishTs = time.Now() - t.Status = constants.StatusError - _ = t.Save() - } - } + if err := WaitTaskProcess(cmd, t); err != nil { return err } ch <- constants.TaskFinish