Files
crawlab/backend/services/task.go
2019-10-08 20:26:08 +08:00

549 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/utils"
"encoding/json"
"errors"
"github.com/apex/log"
"github.com/spf13/viper"
"os"
"os/exec"
"path/filepath"
"runtime"
"runtime/debug"
"strconv"
"sync"
"syscall"
"time"
)
var Exec *Executor
// 任务执行锁
//Added by cloud: 2019/09/04,solve data race
var LockList sync.Map
// 任务消息
type TaskMessage struct {
Id string
Cmd string
}
// 序列化任务消息
func (m *TaskMessage) ToString() (string, error) {
data, err := json.Marshal(&m)
if err != nil {
return "", err
}
return utils.BytesToString(data), err
}
// 任务执行器
type Executor struct {
Cron *cron.Cron
}
// 启动任务执行器
func (ex *Executor) Start() error {
// 启动cron服务
ex.Cron.Start()
// 加入执行器到定时任务
spec := "0/1 * * * * *" // 每秒执行一次
for i := 0; i < viper.GetInt("task.workers"); i++ {
// WorkerID
id := i
// 初始化任务锁
LockList.Store(id, false)
// 加入定时任务
_, err := ex.Cron.AddFunc(spec, GetExecuteTaskFunc(id))
if err != nil {
return err
}
}
return nil
}
// 派发任务
func AssignTask(task model.Task) error {
// 生成任务信息
msg := TaskMessage{
Id: task.Id,
}
// 序列化
msgStr, err := msg.ToString()
if err != nil {
return err
}
// 队列名称
var queue string
if utils.IsObjectIdNull(task.NodeId) {
queue = "tasks:public"
} else {
queue = "tasks:node:" + task.NodeId.Hex()
}
// 任务入队
if err := database.RedisClient.RPush(queue, msgStr); err != nil {
return err
}
return nil
}
// 执行shell命令
func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (err error) {
log.Infof("cwd: " + cwd)
log.Infof("cmd: " + cmdStr)
// 生成执行命令
var cmd *exec.Cmd
if runtime.GOOS == constants.Windows {
cmd = exec.Command("cmd", "/C", cmdStr)
} else {
cmd = exec.Command("sh", "-c", cmdStr)
}
// 工作目录
cmd.Dir = cwd
// 指定stdout, stderr日志位置
fLog, err := os.Create(t.LogPath)
if err != nil {
HandleTaskError(t, err)
return err
}
defer fLog.Close()
cmd.Stdout = fLog
cmd.Stderr = fLog
// 添加默认环境变量
cmd.Env = append(cmd.Env, "CRAWLAB_TASK_ID="+t.Id)
cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+s.Col)
// 添加任务环境变量
for _, env := range s.Envs {
cmd.Env = append(cmd.Env, env.Name+"="+env.Value)
}
// 起一个goroutine来监控进程
ch := utils.TaskExecChanMap.ChanBlocked(t.Id)
go func() {
// 传入信号,此处阻塞
signal := <-ch
log.Infof("cancel process signal: %s", signal)
if signal == constants.TaskCancel && cmd.Process != nil {
// 取消进程
if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil {
log.Errorf("process kill error: %s", err.Error())
debug.PrintStack()
}
t.Status = constants.StatusCancelled
} else {
// 保存任务
t.Status = constants.StatusFinished
}
t.FinishTs = time.Now()
t.Error = "user kill the process ..."
if err := t.Save(); err != nil {
log.Infof("save task error: %s", err.Error())
debug.PrintStack()
return
}
}()
// 在选择所有节点执行的时候,实际就是随机一个节点执行的,
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
// 异步启动进程
if err := cmd.Start(); err != nil {
log.Errorf("start spider error:{}", err.Error())
debug.PrintStack()
return err
}
// 保存pid到task
t.Pid = cmd.Process.Pid
if err := t.Save(); err != nil {
log.Errorf("save task pid error: %s", err.Error())
debug.PrintStack()
return err
}
// 同步等待进程完成
if err := cmd.Wait(); err != nil {
log.Errorf("wait process finish error: %s", err.Error())
debug.PrintStack()
if exitError, ok := err.(*exec.ExitError); ok {
exitCode := exitError.ExitCode()
log.Errorf("exit error, exit code: %d", exitCode)
// 非kill 的错误类型
if exitCode != -1 {
// 非手动kill保存为错误状态
t.Error = err.Error()
t.FinishTs = time.Now()
t.Status = constants.StatusError
_ = t.Save()
}
}
return err
}
ch <- constants.TaskFinish
return nil
}
// 生成日志目录
func MakeLogDir(t model.Task) (fileDir string, err error) {
// 日志目录
fileDir = filepath.Join(viper.GetString("log.path"), t.SpiderId.Hex())
// 如果日志目录不存在,生成该目录
if !utils.Exists(fileDir) {
if err := os.MkdirAll(fileDir, 0777); err != nil {
debug.PrintStack()
return "", err
}
}
return fileDir, nil
}
// 获取日志文件路径
func GetLogFilePaths(fileDir string) (filePath string) {
// 时间戳
ts := time.Now()
tsStr := ts.Format("20060102150405")
// stdout日志文件
filePath = filepath.Join(fileDir, tsStr+".log")
return filePath
}
// 生成执行任务方法
func GetExecuteTaskFunc(id int) func() {
return func() {
ExecuteTask(id)
}
}
func GetWorkerPrefix(id int) string {
return "[Worker " + strconv.Itoa(id) + "] "
}
// 统计任务结果数
func SaveTaskResultCount(id string) func() {
return func() {
if err := model.UpdateTaskResultCount(id); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
}
}
// 执行任务
func ExecuteTask(id int) {
if flag, _ := LockList.Load(id); flag.(bool) {
log.Debugf(GetWorkerPrefix(id) + "正在执行任务...")
return
}
// 上锁
LockList.Store(id, true)
// 解锁(延迟执行)
defer func() {
LockList.Delete(id)
LockList.Store(id, false)
}()
// 开始计时
tic := time.Now()
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
return
}
// 公共队列
queuePub := "tasks:public"
// 节点队列
queueCur := "tasks:node:" + node.Id.Hex()
// 节点队列任务
var msg string
msg, err = database.RedisClient.LPop(queueCur)
if err != nil {
if msg == "" {
// 节点队列没有任务,获取公共队列任务
msg, err = database.RedisClient.LPop(queuePub)
if err != nil {
if msg == "" {
// 公共队列没有任务
log.Debugf(GetWorkerPrefix(id) + "没有任务...")
return
} else {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
return
}
}
} else {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
return
}
}
// 反序列化
tMsg := TaskMessage{}
if err := json.Unmarshal([]byte(msg), &tMsg); err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
return
}
// 获取任务
t, err := model.GetTask(tMsg.Id)
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
return
}
// 获取爬虫
spider, err := t.GetSpider()
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
return
}
// 创建日志目录
fileDir, err := MakeLogDir(t)
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
return
}
// 获取日志文件路径
t.LogPath = GetLogFilePaths(fileDir)
// 创建日志目录文件夹
fileStdoutDir := filepath.Dir(t.LogPath)
if !utils.Exists(fileStdoutDir) {
if err := os.MkdirAll(fileStdoutDir, os.ModePerm); err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
return
}
}
// 工作目录
cwd := filepath.Join(
viper.GetString("spider.path"),
spider.Name,
)
// 执行命令
cmd := spider.Cmd
// 加入参数
if t.Param != "" {
cmd += " " + t.Param
}
// 任务赋值
t.NodeId = node.Id // 任务节点信息
t.StartTs = time.Now() // 任务开始时间
t.Status = constants.StatusRunning // 任务状态
t.WaitDuration = t.StartTs.Sub(t.CreateTs).Seconds() // 等待时长
// 开始执行任务
log.Infof(GetWorkerPrefix(id) + "开始执行任务(ID:" + t.Id + ")")
// 储存任务
if err := t.Save(); err != nil {
log.Errorf(err.Error())
HandleTaskError(t, err)
return
}
// 起一个cron执行器来统计任务结果数
if spider.Col != "" {
cronExec := cron.New(cron.WithSeconds())
_, err = cronExec.AddFunc("*/5 * * * * *", SaveTaskResultCount(t.Id))
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
return
}
cronExec.Start()
defer cronExec.Stop()
}
// 执行Shell命令
if err := ExecuteShellCmd(cmd, cwd, t, spider); err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
return
}
// 更新任务结果数
if spider.Col != "" {
if err := model.UpdateTaskResultCount(t.Id); err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
return
}
}
// 完成进程
t, err = model.GetTask(t.Id)
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
return
}
t.Status = constants.StatusFinished // 任务状态: 已完成
t.FinishTs = time.Now() // 结束时间
t.RuntimeDuration = t.FinishTs.Sub(t.StartTs).Seconds() // 运行时长
t.TotalDuration = t.FinishTs.Sub(t.CreateTs).Seconds() // 总时长
// 保存任务
if err := t.Save(); err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
return
}
// 结束计时
toc := time.Now()
// 统计时长
duration := toc.Sub(tic).Seconds()
durationStr := strconv.FormatFloat(duration, 'f', 6, 64)
log.Infof(GetWorkerPrefix(id) + "任务(ID:" + t.Id + ")" + "执行完毕. 消耗时间:" + durationStr + "秒")
}
func GetTaskLog(id string) (logStr string, err error) {
task, err := model.GetTask(id)
if err != nil {
return "", err
}
logStr = ""
if IsMasterNode(task.NodeId.Hex()) {
// 若为主节点,获取本机日志
logBytes, err := model.GetLocalLog(task.LogPath)
logStr = utils.BytesToString(logBytes)
if err != nil {
log.Errorf(err.Error())
logStr = err.Error()
// return "", err
} else {
logStr = utils.BytesToString(logBytes)
}
} else {
// 若不为主节点,获取远端日志
logStr, err = GetRemoteLog(task)
if err != nil {
log.Errorf(err.Error())
return "", err
}
}
return logStr, nil
}
func CancelTask(id string) (err error) {
// 获取任务
task, err := model.GetTask(id)
if err != nil {
log.Errorf("task not found, task id : %s, error: %s", id, err.Error())
debug.PrintStack()
return err
}
// 如果任务状态不为pending或running返回错误
if task.Status != constants.StatusPending && task.Status != constants.StatusRunning {
return errors.New("task is not cancellable")
}
// 获取当前节点(默认当前节点为主节点)
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf("get current node error: %s", err.Error())
debug.PrintStack()
return err
}
log.Infof("current node id is: %s", node.Id.Hex())
log.Infof("task node id is: %s", task.NodeId.Hex())
if node.Id == task.NodeId {
// 任务节点为主节点
// 获取任务执行频道
ch := utils.TaskExecChanMap.ChanBlocked(id)
if ch != nil {
// 发出取消进程信号
ch <- constants.TaskCancel
} else {
if err := model.UpdateTaskToAbnormal(node.Id); err != nil {
log.Errorf("update task to abnormal : {}", err.Error())
debug.PrintStack()
return err
}
}
} else {
// 任务节点为工作节点
// 序列化消息
msg := entity.NodeMessage{
Type: constants.MsgTypeCancelTask,
TaskId: id,
}
msgBytes, err := json.Marshal(&msg)
if err != nil {
return err
}
// 发布消息
if _, err := database.RedisClient.Publish("nodes:"+task.NodeId.Hex(), utils.BytesToString(msgBytes)); err != nil {
return err
}
}
return nil
}
func HandleTaskError(t model.Task, err error) {
log.Error("handle task error:" + err.Error())
t.Status = constants.StatusError
t.Error = err.Error()
t.FinishTs = time.Now()
if err := t.Save(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
debug.PrintStack()
}
func InitTaskExecutor() error {
c := cron.New(cron.WithSeconds())
Exec = &Executor{
Cron: c,
}
if err := Exec.Start(); err != nil {
return err
}
return nil
}