Merge branch 'develop' into develop

This commit is contained in:
暗音
2019-10-28 11:06:13 +08:00
committed by GitHub
18 changed files with 364 additions and 228 deletions

View File

@@ -100,91 +100,85 @@ func AssignTask(task model.Task) error {
return nil
}
// 执行shell命令
func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (err error) {
log.Infof("cwd: " + cwd)
log.Infof("cmd: " + cmdStr)
// 设置环境变量
func SetEnv(cmd *exec.Cmd, envs []model.Env, taskId string, dataCol string) *exec.Cmd {
// 默认环境变量
cmd.Env = append(os.Environ(), "CRAWLAB_TASK_ID="+taskId)
cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+dataCol)
cmd.Env = append(cmd.Env, "PYTHONUNBUFFERED=0")
cmd.Env = append(cmd.Env, "PYTHONIOENCODING=utf-8")
cmd.Env = append(cmd.Env, "TZ=Asia/Shanghai")
// 生成执行命令
var cmd *exec.Cmd
if runtime.GOOS == constants.Windows {
cmd = exec.Command("cmd", "/C", cmdStr)
} else {
cmd = exec.Command("sh", "-c", cmdStr)
}
// 工作目录
cmd.Dir = cwd
// 指定stdout, stderr日志位置
fLog, err := os.Create(t.LogPath)
if err != nil {
HandleTaskError(t, err)
return err
}
defer utils.Close(fLog)
cmd.Stdout = fLog
cmd.Stderr = fLog
// 添加默认环境变量
cmd.Env = append(cmd.Env, "CRAWLAB_TASK_ID="+t.Id)
cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+s.Col)
// 添加任务环境变量
for _, env := range s.Envs {
//任务环境变量
for _, env := range envs {
cmd.Env = append(cmd.Env, env.Name+"="+env.Value)
}
// 起一个goroutine来监控进程
ch := utils.TaskExecChanMap.ChanBlocked(t.Id)
go func() {
// 传入信号,此处阻塞
signal := <-ch
log.Infof("cancel process signal: %s", signal)
if signal == constants.TaskCancel && cmd.Process != nil {
// 取消进程
if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil {
log.Errorf("process kill error: %s", err.Error())
debug.PrintStack()
}
t.Status = constants.StatusCancelled
} else {
// 保存任务
t.Status = constants.StatusFinished
}
t.FinishTs = time.Now()
t.Error = "user kill the process ..."
if err := t.Save(); err != nil {
log.Infof("save task error: %s", err.Error())
// TODO 全局环境变量
return cmd
}
func SetLogConfig(cmd *exec.Cmd, path string) error {
fLog, err := os.Create(path)
if err != nil {
log.Errorf("create task log file error: %s", path)
debug.PrintStack()
return err
}
cmd.Stdout = fLog
cmd.Stderr = fLog
return nil
}
func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, t model.Task) {
// 传入信号,此处阻塞
signal := <-ch
log.Infof("process received signal: %s", signal)
if signal == constants.TaskCancel && cmd.Process != nil {
// 取消进程
if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil {
log.Errorf("process kill error: %s", err.Error())
debug.PrintStack()
return
t.Error = "kill process error: " + err.Error()
t.Status = constants.StatusError
} else {
t.Error = "user kill the process ..."
t.Status = constants.StatusCancelled
}
}()
} else {
// 保存任务
t.Status = constants.StatusFinished
}
// 在选择所有节点执行的时候,实际就是随机一个节点执行的,
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
t.FinishTs = time.Now()
_ = t.Save()
}
// 异步启动进程
func StartTaskProcess(cmd *exec.Cmd, t model.Task) error {
if err := cmd.Start(); err != nil {
log.Errorf("start spider error:{}", err.Error())
debug.PrintStack()
return err
}
// 保存pid到task
t.Pid = cmd.Process.Pid
if err := t.Save(); err != nil {
log.Errorf("save task pid error: %s", err.Error())
debug.PrintStack()
t.Error = "start task error: " + err.Error()
t.Status = constants.StatusError
t.FinishTs = time.Now()
_ = t.Save()
return err
}
// 同步等待进程完成
return nil
}
func WaitTaskProcess(cmd *exec.Cmd, t model.Task) error {
if err := cmd.Wait(); err != nil {
log.Errorf("wait process finish error: %s", err.Error())
debug.PrintStack()
if exitError, ok := err.(*exec.ExitError); ok {
exitCode := exitError.ExitCode()
log.Errorf("exit error, exit code: %d", exitCode)
// 非kill 的错误类型
if exitCode != -1 {
// 非手动kill保存为错误状态
@@ -194,6 +188,52 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e
_ = t.Save()
}
}
return err
}
return nil
}
// 执行shell命令
func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (err error) {
log.Infof("cwd: %s", cwd)
log.Infof("cmd: %s", cmdStr)
// 生成执行命令
var cmd *exec.Cmd
if runtime.GOOS == constants.Windows {
cmd = exec.Command("cmd", "/C", cmdStr)
} else {
cmd = exec.Command("")
cmd = exec.Command("sh", "-c", cmdStr)
}
// 工作目录
cmd.Dir = cwd
// 日志配置
if err := SetLogConfig(cmd, t.LogPath); err != nil {
return err
}
// 环境变量配置
cmd = SetEnv(cmd, s.Envs, t.Id, s.Col)
// 起一个goroutine来监控进程
ch := utils.TaskExecChanMap.ChanBlocked(t.Id)
go FinishOrCancelTask(ch, cmd, t)
// kill的时候可以kill所有的子进程
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
// 启动进程
if err := StartTaskProcess(cmd, t); err != nil {
return err
}
// 同步等待进程完成
if err := WaitTaskProcess(cmd, t); err != nil {
return err
}
ch <- constants.TaskFinish
@@ -208,6 +248,7 @@ func MakeLogDir(t model.Task) (fileDir string, err error) {
// 如果日志目录不存在,生成该目录
if !utils.Exists(fileDir) {
if err := os.MkdirAll(fileDir, 0777); err != nil {
log.Errorf("execute task, make log dir error: %s", err.Error())
debug.PrintStack()
return "", err
}
@@ -272,82 +313,55 @@ func ExecuteTask(id int) {
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
log.Errorf("execute task get current node error: %s", err.Error())
debug.PrintStack()
return
}
// 公共队列
queuePub := "tasks:public"
// 节点队列
queueCur := "tasks:node:" + node.Id.Hex()
// 节点队列任务
var msg string
msg, err = database.RedisClient.LPop(queueCur)
if err != nil {
if msg == "" {
// 节点队列没有任务,获取公共队列任务
msg, err = database.RedisClient.LPop(queuePub)
if err != nil {
if msg == "" {
// 公共队列没有任务
log.Debugf(GetWorkerPrefix(id) + "没有任务...")
return
} else {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
return
}
}
} else {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
return
if msg, err = database.RedisClient.LPop(queueCur); err != nil {
// 节点队列没有任务,获取公共队列任务
queuePub := "tasks:public"
if msg, err = database.RedisClient.LPop(queuePub); err != nil {
}
}
if msg == "" {
return
}
// 反序列化
tMsg := TaskMessage{}
if err := json.Unmarshal([]byte(msg), &tMsg); err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
log.Errorf("json string to struct error: %s", err.Error())
return
}
// 获取任务
t, err := model.GetTask(tMsg.Id)
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
log.Errorf("execute task, get task error: %s", err.Error())
return
}
// 获取爬虫
spider, err := t.GetSpider()
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
log.Errorf("execute task, get spider error: %s", err.Error())
return
}
// 创建日志目录
fileDir, err := MakeLogDir(t)
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
var fileDir string
if fileDir, err = MakeLogDir(t); err != nil {
return
}
// 获取日志文件路径
t.LogPath = GetLogFilePaths(fileDir)
// 创建日志目录文件夹
fileStdoutDir := filepath.Dir(t.LogPath)
if !utils.Exists(fileStdoutDir) {
if err := os.MkdirAll(fileStdoutDir, os.ModePerm); err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
return
}
}
// 工作目录
cwd := filepath.Join(
viper.GetString("spider.path"),