package services import ( "crawlab/constants" "crawlab/database" "crawlab/entity" "crawlab/lib/cron" "crawlab/model" "crawlab/services/notification" "crawlab/services/spider_handler" "crawlab/utils" "encoding/json" "errors" "fmt" "github.com/apex/log" "github.com/globalsign/mgo/bson" uuid "github.com/satori/go.uuid" "github.com/spf13/viper" "os" "os/exec" "path" "path/filepath" "runtime" "runtime/debug" "strconv" "sync" "syscall" "time" ) var Exec *Executor // 任务执行锁 //Added by cloud: 2019/09/04,solve data race var LockList sync.Map // 任务消息 type TaskMessage struct { Id string Cmd string } // 序列化任务消息 func (m *TaskMessage) ToString() (string, error) { data, err := json.Marshal(&m) if err != nil { return "", err } return utils.BytesToString(data), err } // 任务执行器 type Executor struct { Cron *cron.Cron } // 启动任务执行器 func (ex *Executor) Start() error { // 启动cron服务 ex.Cron.Start() // 加入执行器到定时任务 spec := "0/1 * * * * *" // 每秒执行一次 for i := 0; i < viper.GetInt("task.workers"); i++ { // WorkerID id := i // 初始化任务锁 LockList.Store(id, false) // 加入定时任务 _, err := ex.Cron.AddFunc(spec, GetExecuteTaskFunc(id)) if err != nil { return err } } return nil } // 派发任务 func AssignTask(task model.Task) error { // 生成任务信息 msg := TaskMessage{ Id: task.Id, } // 序列化 msgStr, err := msg.ToString() if err != nil { return err } // 队列名称 var queue string if utils.IsObjectIdNull(task.NodeId) { queue = "tasks:public" } else { queue = "tasks:node:" + task.NodeId.Hex() } // 任务入队 if err := database.RedisClient.RPush(queue, msgStr); err != nil { return err } return nil } // 设置环境变量 func SetEnv(cmd *exec.Cmd, envs []model.Env, taskId string, dataCol string) *exec.Cmd { // 默认把Node.js的全局node_modules加入环境变量 envPath := os.Getenv("PATH") homePath := os.Getenv("HOME") nodeVersion := "v8.12.0" nodePath := path.Join(homePath, ".nvm/versions/node", nodeVersion, "lib/node_modules") _ = os.Setenv("PATH", nodePath+":"+envPath) _ = os.Setenv("NODE_PATH", nodePath) // 默认环境变量 cmd.Env = append(os.Environ(), "CRAWLAB_TASK_ID="+taskId) cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+dataCol) cmd.Env = append(cmd.Env, "CRAWLAB_MONGO_HOST="+viper.GetString("mongo.host")) cmd.Env = append(cmd.Env, "CRAWLAB_MONGO_PORT="+viper.GetString("mongo.port")) if viper.GetString("mongo.db") != "" { cmd.Env = append(cmd.Env, "CRAWLAB_MONGO_DB="+viper.GetString("mongo.db")) } if viper.GetString("mongo.username") != "" { cmd.Env = append(cmd.Env, "CRAWLAB_MONGO_USERNAME="+viper.GetString("mongo.username")) } if viper.GetString("mongo.password") != "" { cmd.Env = append(cmd.Env, "CRAWLAB_MONGO_PASSWORD="+viper.GetString("mongo.password")) } if viper.GetString("mongo.authSource") != "" { cmd.Env = append(cmd.Env, "CRAWLAB_MONGO_AUTHSOURCE="+viper.GetString("mongo.authSource")) } cmd.Env = append(cmd.Env, "PYTHONUNBUFFERED=0") cmd.Env = append(cmd.Env, "PYTHONIOENCODING=utf-8") cmd.Env = append(cmd.Env, "TZ=Asia/Shanghai") //任务环境变量 for _, env := range envs { cmd.Env = append(cmd.Env, env.Name+"="+env.Value) } // 全局环境变量 variables := model.GetVariableList() for _, variable := range variables { cmd.Env = append(cmd.Env, variable.Key+"="+variable.Value) } return cmd } func SetLogConfig(cmd *exec.Cmd, path string) error { fLog, err := os.Create(path) if err != nil { log.Errorf("create task log file error: %s", path) debug.PrintStack() return err } cmd.Stdout = fLog cmd.Stderr = fLog return nil } func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, t model.Task) { // 传入信号,此处阻塞 signal := <-ch log.Infof("process received signal: %s", signal) if signal == constants.TaskCancel && cmd.Process != nil { var err error // 兼容windows if runtime.GOOS == constants.Windows { err = cmd.Process.Kill() } else { err = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) } // 取消进程 if err != nil { log.Errorf("process kill error: %s", err.Error()) debug.PrintStack() t.Error = "kill process error: " + err.Error() t.Status = constants.StatusError } else { t.Error = "user kill the process ..." t.Status = constants.StatusCancelled } } else { // 保存任务 t.Status = constants.StatusFinished } t.FinishTs = time.Now() _ = t.Save() } func StartTaskProcess(cmd *exec.Cmd, t model.Task) error { if err := cmd.Start(); err != nil { log.Errorf("start spider error:{}", err.Error()) debug.PrintStack() t.Error = "start task error: " + err.Error() t.Status = constants.StatusError t.FinishTs = time.Now() _ = t.Save() return err } return nil } func WaitTaskProcess(cmd *exec.Cmd, t model.Task) error { if err := cmd.Wait(); err != nil { log.Errorf("wait process finish error: %s", err.Error()) debug.PrintStack() if exitError, ok := err.(*exec.ExitError); ok { exitCode := exitError.ExitCode() log.Errorf("exit error, exit code: %d", exitCode) // 非kill 的错误类型 if exitCode != -1 { // 非手动kill保存为错误状态 t.Error = err.Error() t.FinishTs = time.Now() t.Status = constants.StatusError _ = t.Save() } } return err } return nil } // 执行shell命令 func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (err error) { log.Infof("cwd: %s", cwd) log.Infof("cmd: %s", cmdStr) // 生成执行命令 var cmd *exec.Cmd if runtime.GOOS == constants.Windows { cmd = exec.Command("cmd", "/C", cmdStr) } else { cmd = exec.Command("sh", "-c", cmdStr) } // 工作目录 cmd.Dir = cwd // 日志配置 if err := SetLogConfig(cmd, t.LogPath); err != nil { return err } // 环境变量配置 envs := s.Envs if s.Type == constants.Configurable { // 数据库配置 envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_HOST", Value: viper.GetString("mongo.host")}) envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_PORT", Value: viper.GetString("mongo.port")}) envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_DB", Value: viper.GetString("mongo.db")}) envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_USERNAME", Value: viper.GetString("mongo.username")}) envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_PASSWORD", Value: viper.GetString("mongo.password")}) envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_AUTHSOURCE", Value: viper.GetString("mongo.authSource")}) // 设置配置 for envName, envValue := range s.Config.Settings { envs = append(envs, model.Env{Name: "CRAWLAB_SETTING_" + envName, Value: envValue}) } } cmd = SetEnv(cmd, envs, t.Id, s.Col) // 起一个goroutine来监控进程 ch := utils.TaskExecChanMap.ChanBlocked(t.Id) go FinishOrCancelTask(ch, cmd, t) // kill的时候,可以kill所有的子进程 if runtime.GOOS != constants.Windows { cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} } // 启动进程 if err := StartTaskProcess(cmd, t); err != nil { return err } // 同步等待进程完成 if err := WaitTaskProcess(cmd, t); err != nil { return err } ch <- constants.TaskFinish return nil } // 生成日志目录 func MakeLogDir(t model.Task) (fileDir string, err error) { // 日志目录 fileDir = filepath.Join(viper.GetString("log.path"), t.SpiderId.Hex()) // 如果日志目录不存在,生成该目录 if !utils.Exists(fileDir) { if err := os.MkdirAll(fileDir, 0777); err != nil { log.Errorf("execute task, make log dir error: %s", err.Error()) debug.PrintStack() return "", err } } return fileDir, nil } // 获取日志文件路径 func GetLogFilePaths(fileDir string, t model.Task) (filePath string) { // 时间戳 ts := time.Now() tsStr := ts.Format("20060102150405") // stdout日志文件 filePath = filepath.Join(fileDir, t.Id+"_"+tsStr+".log") return filePath } // 生成执行任务方法 func GetExecuteTaskFunc(id int) func() { return func() { ExecuteTask(id) } } func GetWorkerPrefix(id int) string { return "[Worker " + strconv.Itoa(id) + "] " } // 统计任务结果数 func SaveTaskResultCount(id string) func() { return func() { if err := model.UpdateTaskResultCount(id); err != nil { log.Errorf(err.Error()) debug.PrintStack() return } } } // 执行任务 func ExecuteTask(id int) { if flag, ok := LockList.Load(id); ok { if flag.(bool) { log.Debugf(GetWorkerPrefix(id) + "running tasks...") return } } // 上锁 LockList.Store(id, true) // 解锁(延迟执行) defer func() { LockList.Delete(id) LockList.Store(id, false) }() // 开始计时 tic := time.Now() // 获取当前节点 node, err := model.GetCurrentNode() if err != nil { log.Errorf("execute task get current node error: %s", err.Error()) debug.PrintStack() return } // 节点队列 queueCur := "tasks:node:" + node.Id.Hex() // 节点队列任务 var msg string if msg, err = database.RedisClient.LPop(queueCur); err != nil { // 节点队列没有任务,获取公共队列任务 queuePub := "tasks:public" if msg, err = database.RedisClient.LPop(queuePub); err != nil { } } // 如果没有获取到任务,返回 if msg == "" { return } // 反序列化 tMsg := TaskMessage{} if err := json.Unmarshal([]byte(msg), &tMsg); err != nil { log.Errorf("json string to struct error: %s", err.Error()) return } // 获取任务 t, err := model.GetTask(tMsg.Id) if err != nil { log.Errorf("execute task, get task error: %s", err.Error()) return } // 获取爬虫 spider, err := t.GetSpider() if err != nil { log.Errorf("execute task, get spider error: %s", err.Error()) return } // 创建日志目录 var fileDir string if fileDir, err = MakeLogDir(t); err != nil { return } // 获取日志文件路径 t.LogPath = GetLogFilePaths(fileDir, t) // 工作目录 cwd := filepath.Join( viper.GetString("spider.path"), spider.Name, ) // 执行命令 var cmd string if spider.Type == constants.Configurable { // 可配置爬虫命令 cmd = "scrapy crawl config_spider" } else { // 自定义爬虫命令 cmd = spider.Cmd } // 加入参数 if t.Param != "" { cmd += " " + t.Param } // 任务赋值 t.NodeId = node.Id // 任务节点信息 t.StartTs = time.Now() // 任务开始时间 t.Status = constants.StatusRunning // 任务状态 t.WaitDuration = t.StartTs.Sub(t.CreateTs).Seconds() // 等待时长 // 文件检查 if err := SpiderFileCheck(t, spider); err != nil { log.Errorf("spider file check error: %s", err.Error()) return } // 开始执行任务 log.Infof(GetWorkerPrefix(id) + "开始执行任务(ID:" + t.Id + ")") // 储存任务 _ = t.Save() // 起一个cron执行器来统计任务结果数 if spider.Col != "" { cronExec := cron.New(cron.WithSeconds()) _, err = cronExec.AddFunc("*/5 * * * * *", SaveTaskResultCount(t.Id)) if err != nil { log.Errorf(GetWorkerPrefix(id) + err.Error()) return } cronExec.Start() defer cronExec.Stop() } // 获得触发任务用户 user, err := model.GetUser(t.UserId) if err != nil { log.Errorf(GetWorkerPrefix(id) + err.Error()) return } // 执行Shell命令 if err := ExecuteShellCmd(cmd, cwd, t, spider); err != nil { log.Errorf(GetWorkerPrefix(id) + err.Error()) // 如果发生错误,则发送通知 t, _ = model.GetTask(t.Id) if user.Setting.NotificationTrigger == constants.NotificationTriggerOnTaskEnd || user.Setting.NotificationTrigger == constants.NotificationTriggerOnTaskError { SendNotifications(user, t, spider) } return } // 更新任务结果数 if spider.Col != "" { if err := model.UpdateTaskResultCount(t.Id); err != nil { log.Errorf(GetWorkerPrefix(id) + err.Error()) return } } // 完成进程 t, err = model.GetTask(t.Id) if err != nil { log.Errorf(GetWorkerPrefix(id) + err.Error()) return } // 统计数据 t.Status = constants.StatusFinished // 任务状态: 已完成 t.FinishTs = time.Now() // 结束时间 t.RuntimeDuration = t.FinishTs.Sub(t.StartTs).Seconds() // 运行时长 t.TotalDuration = t.FinishTs.Sub(t.CreateTs).Seconds() // 总时长 // 如果是任务结束时发送通知,则发送通知 if user.Setting.NotificationTrigger == constants.NotificationTriggerOnTaskEnd { SendNotifications(user, t, spider) } // 保存任务 if err := t.Save(); err != nil { log.Errorf(GetWorkerPrefix(id) + err.Error()) return } // 结束计时 toc := time.Now() // 统计时长 duration := toc.Sub(tic).Seconds() durationStr := strconv.FormatFloat(duration, 'f', 6, 64) log.Infof(GetWorkerPrefix(id) + "任务(ID:" + t.Id + ")" + "执行完毕. 消耗时间:" + durationStr + "秒") } func SpiderFileCheck(t model.Task, spider model.Spider) error { // 判断爬虫文件是否存在 gfFile := model.GetGridFs(spider.FileId) if gfFile == nil { t.Error = "cannot find spider files, please re-upload" t.Status = constants.StatusError t.FinishTs = time.Now() // 结束时间 t.RuntimeDuration = t.FinishTs.Sub(t.StartTs).Seconds() // 运行时长 t.TotalDuration = t.FinishTs.Sub(t.CreateTs).Seconds() // 总时长 _ = t.Save() return errors.New(t.Error) } // 判断md5值是否一致 path := filepath.Join(viper.GetString("spider.path"), spider.Name) md5File := filepath.Join(path, spider_handler.Md5File) md5 := utils.GetSpiderMd5Str(md5File) if gfFile.Md5 != md5 { spiderSync := spider_handler.SpiderSync{Spider: spider} spiderSync.RemoveDownCreate(gfFile.Md5) } return nil } func GetTaskLog(id string) (logStr string, err error) { task, err := model.GetTask(id) if err != nil { return } if IsMasterNode(task.NodeId.Hex()) { if !utils.Exists(task.LogPath) { fileDir, err := MakeLogDir(task) if err != nil { log.Errorf(err.Error()) } fileP := GetLogFilePaths(fileDir, task) // 获取日志文件路径 fLog, err := os.Create(fileP) defer fLog.Close() if err != nil { log.Errorf("create task log file error: %s", fileP) debug.PrintStack() } task.LogPath = fileP if err := task.Save(); err != nil { log.Errorf(err.Error()) debug.PrintStack() } } // 若为主节点,获取本机日志 logBytes, err := model.GetLocalLog(task.LogPath) if err != nil { log.Errorf(err.Error()) logStr = err.Error() } else { logStr = utils.BytesToString(logBytes) } return logStr, err } // 若不为主节点,获取远端日志 logStr, err = GetRemoteLog(task) if err != nil { log.Errorf(err.Error()) } return logStr, err } func CancelTask(id string) (err error) { // 获取任务 task, err := model.GetTask(id) if err != nil { log.Errorf("task not found, task id : %s, error: %s", id, err.Error()) debug.PrintStack() return err } // 如果任务状态不为pending或running,返回错误 if task.Status != constants.StatusPending && task.Status != constants.StatusRunning { return errors.New("task is not cancellable") } // 获取当前节点(默认当前节点为主节点) node, err := model.GetCurrentNode() if err != nil { log.Errorf("get current node error: %s", err.Error()) debug.PrintStack() return err } log.Infof("current node id is: %s", node.Id.Hex()) log.Infof("task node id is: %s", task.NodeId.Hex()) if node.Id == task.NodeId { // 任务节点为主节点 // 获取任务执行频道 ch := utils.TaskExecChanMap.ChanBlocked(id) if ch != nil { // 发出取消进程信号 ch <- constants.TaskCancel } else { if err := model.UpdateTaskToAbnormal(node.Id); err != nil { log.Errorf("update task to abnormal : {}", err.Error()) debug.PrintStack() return err } } } else { // 任务节点为工作节点 // 序列化消息 msg := entity.NodeMessage{ Type: constants.MsgTypeCancelTask, TaskId: id, } msgBytes, err := json.Marshal(&msg) if err != nil { return err } // 发布消息 if _, err := database.RedisClient.Publish("nodes:"+task.NodeId.Hex(), utils.BytesToString(msgBytes)); err != nil { return err } } return nil } func AddTask(t model.Task) (string, error) { // 生成任务ID id := uuid.NewV4() t.Id = id.String() // 设置任务状态 t.Status = constants.StatusPending // 如果没有传入node_id,则置为null if t.NodeId.Hex() == "" { t.NodeId = bson.ObjectIdHex(constants.ObjectIdNull) } // 将任务存入数据库 if err := model.AddTask(t); err != nil { log.Errorf(err.Error()) debug.PrintStack() return t.Id, err } // 加入任务队列 if err := AssignTask(t); err != nil { log.Errorf(err.Error()) debug.PrintStack() return t.Id, err } return t.Id, nil } func GetTaskEmailMarkdownContent(t model.Task, s model.Spider) string { n, _ := model.GetNode(t.NodeId) errMsg := "" statusMsg := fmt.Sprintf(`%s`, t.Status) if t.Status == constants.StatusError { errMsg = " with errors" statusMsg = fmt.Sprintf(`%s`, t.Status) } return fmt.Sprintf(` Your task has finished%s. Please find the task info below. | --: | :-- **Task ID:** | %s **Task Status:** | %s **Task Param:** | %s **Spider ID:** | %s **Spider Name:** | %s **Node:** | %s **Create Time:** | %s **Start Time:** | %s **Finish Time:** | %s **Wait Duration:** | %.0f sec **Runtime Duration:** | %.0f sec **Total Duration:** | %.0f sec **Number of Results:** | %d **Error:** | %s Please login to Crawlab to view the details. `, errMsg, t.Id, statusMsg, t.Param, s.Id.Hex(), s.Name, n.Name, utils.GetLocalTimeString(t.CreateTs), utils.GetLocalTimeString(t.StartTs), utils.GetLocalTimeString(t.FinishTs), t.WaitDuration, t.RuntimeDuration, t.TotalDuration, t.ResultCount, t.Error, ) } func GetTaskMarkdownContent(t model.Task, s model.Spider) string { n, _ := model.GetNode(t.NodeId) errMsg := "" errLog := "-" statusMsg := fmt.Sprintf(`%s`, t.Status) if t.Status == constants.StatusError { errMsg = `(有错误)` errLog = fmt.Sprintf(`%s`, t.Error) statusMsg = fmt.Sprintf(`%s`, t.Status) } return fmt.Sprintf(` 您的任务已完成%s,请查看任务信息如下。 > **任务ID:** %s > **任务状态:** %s > **任务参数:** %s > **爬虫ID:** %s > **爬虫名称:** %s > **节点:** %s > **创建时间:** %s > **开始时间:** %s > **完成时间:** %s > **等待时间:** %.0f秒 > **运行时间:** %.0f秒 > **总时间:** %.0f秒 > **结果数:** %d > **错误:** %s 请登录Crawlab查看详情。 `, errMsg, t.Id, statusMsg, t.Param, s.Id.Hex(), s.Name, n.Name, utils.GetLocalTimeString(t.CreateTs), utils.GetLocalTimeString(t.StartTs), utils.GetLocalTimeString(t.FinishTs), t.WaitDuration, t.RuntimeDuration, t.TotalDuration, t.ResultCount, errLog, ) } func SendTaskEmail(u model.User, t model.Task, s model.Spider) { statusMsg := "has finished" if t.Status == constants.StatusError { statusMsg = "has an error" } title := fmt.Sprintf("[Crawlab] Task for \"%s\" %s", s.Name, statusMsg) if err := notification.SendMail( u.Email, u.Username, title, GetTaskEmailMarkdownContent(t, s), ); err != nil { log.Errorf("mail error: " + err.Error()) debug.PrintStack() } } func SendTaskDingTalk(u model.User, t model.Task, s model.Spider) { statusMsg := "已完成" if t.Status == constants.StatusError { statusMsg = "发生错误" } title := fmt.Sprintf("[Crawlab] \"%s\" 任务%s", s.Name, statusMsg) content := GetTaskMarkdownContent(t, s) if err := notification.SendMobileNotification(u.Setting.DingTalkRobotWebhook, title, content); err != nil { log.Errorf(err.Error()) debug.PrintStack() } } func SendTaskWechat(u model.User, t model.Task, s model.Spider) { content := GetTaskMarkdownContent(t, s) if err := notification.SendMobileNotification(u.Setting.WechatRobotWebhook, "", content); err != nil { log.Errorf(err.Error()) debug.PrintStack() } } func SendNotifications(u model.User, t model.Task, s model.Spider) { if u.Email != "" && utils.StringArrayContains(u.Setting.EnabledNotifications, constants.NotificationTypeMail) { go func() { SendTaskEmail(u, t, s) }() } if u.Setting.DingTalkRobotWebhook != "" && utils.StringArrayContains(u.Setting.EnabledNotifications, constants.NotificationTypeDingTalk) { go func() { SendTaskDingTalk(u, t, s) }() } if u.Setting.WechatRobotWebhook != "" && utils.StringArrayContains(u.Setting.EnabledNotifications, constants.NotificationTypeWechat) { go func() { SendTaskWechat(u, t, s) }() } } func InitTaskExecutor() error { // 构造任务执行器 c := cron.New(cron.WithSeconds()) Exec = &Executor{ Cron: c, } // 如果不允许主节点运行任务,则跳过 if model.IsMaster() && viper.GetString("setting.runOnMaster") == "N" { return nil } // 运行定时任务 if err := Exec.Start(); err != nil { return err } return nil }