diff --git a/backend/model/node.go b/backend/model/node.go index 1a1ebce5..2beb9e1c 100644 --- a/backend/model/node.go +++ b/backend/model/node.go @@ -4,6 +4,7 @@ import ( "crawlab/constants" "crawlab/database" "crawlab/services/register" + "errors" "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" @@ -158,16 +159,19 @@ func GetNodeList(filter interface{}) ([]Node, error) { func GetNode(id bson.ObjectId) (Node, error) { var node Node + if id.Hex() == "" { - return node, nil + log.Infof("id is empty") + debug.PrintStack() + return node, errors.New("id is empty") } + s, c := database.GetCol("nodes") defer s.Close() + if err := c.FindId(id).One(&node); err != nil { - if err != mgo.ErrNotFound { - log.Errorf(err.Error()) - debug.PrintStack() - } + log.Errorf(err.Error()) + debug.PrintStack() return node, err } return node, nil diff --git a/backend/model/schedule.go b/backend/model/schedule.go index bcd051e3..36799ac3 100644 --- a/backend/model/schedule.go +++ b/backend/model/schedule.go @@ -45,6 +45,27 @@ func (sch *Schedule) Delete() error { return c.RemoveId(sch.Id) } +func (sch *Schedule) SyncNodeIdAndSpiderId(node Node, spider Spider) { + sch.syncNodeId(node) + sch.syncSpiderId(spider) +} + +func (sch *Schedule) syncNodeId(node Node) { + if node.Id.Hex() == sch.NodeId.Hex() { + return + } + sch.NodeId = node.Id + _ = sch.Save() +} + +func (sch *Schedule) syncSpiderId(spider Spider) { + if spider.Id.Hex() == sch.SpiderId.Hex() { + return + } + sch.SpiderId = spider.Id + _ = sch.Save() +} + func GetScheduleList(filter interface{}) ([]Schedule, error) { s, c := database.GetCol("schedules") defer s.Close() @@ -103,13 +124,11 @@ func UpdateSchedule(id bson.ObjectId, item Schedule) error { if err := c.FindId(id).One(&result); err != nil { return err } - node, err := GetNode(item.NodeId) if err != nil { - log.Errorf("get node error: %s", err.Error()) - debug.PrintStack() - return nil + return err } + item.NodeKey = node.Key if err := item.Save(); err != nil { return err @@ -123,9 +142,7 @@ func AddSchedule(item Schedule) error { node, err := GetNode(item.NodeId) if err != nil { - log.Errorf("get node error: %s", err.Error()) - debug.PrintStack() - return nil + return err } item.Id = bson.NewObjectId() diff --git a/backend/routes/schedule.go b/backend/routes/schedule.go index 24df0c0f..73b75323 100644 --- a/backend/routes/schedule.go +++ b/backend/routes/schedule.go @@ -45,13 +45,14 @@ func PostSchedule(c *gin.Context) { HandleError(http.StatusBadRequest, c, err) return } + + // 验证cron表达式 + if err := services.ParserCron(newItem.Cron); err != nil { + HandleError(http.StatusOK, c, err) + return + } + newItem.Id = bson.ObjectIdHex(id) - - // 如果node_id为空,则置为空ObjectId - //if newItem.NodeId == "" { - // newItem.NodeId = bson.ObjectIdHex(constants.ObjectIdNull) - //} - // 更新数据库 if err := model.UpdateSchedule(bson.ObjectIdHex(id), newItem); err != nil { HandleError(http.StatusInternalServerError, c, err) @@ -79,10 +80,11 @@ func PutSchedule(c *gin.Context) { return } - // 如果node_id为空,则置为空ObjectId - //if item.NodeId == "" { - // item.NodeId = bson.ObjectIdHex(constants.ObjectIdNull) - //} + // 验证cron表达式 + if err := services.ParserCron(item.Cron); err != nil { + HandleError(http.StatusOK, c, err) + return + } // 更新数据库 if err := model.AddSchedule(item); err != nil { diff --git a/backend/services/node.go b/backend/services/node.go index 04cbc0ef..8b1f998a 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -100,13 +100,13 @@ func handleNodeInfo(key string, data Data) { defer s.Close() // 同个key可能因为并发,被注册多次 - //var nodes []model.Node - //_ = c.Find(bson.M{"key": key}).All(&nodes) - //if nodes != nil && len(nodes) > 1 { - // for _, node := range nodes { - // _ = c.RemoveId(node.Id) - // } - //} + var nodes []model.Node + _ = c.Find(bson.M{"key": key}).All(&nodes) + if nodes != nil && len(nodes) > 1 { + for _, node := range nodes { + _ = c.RemoveId(node.Id) + } + } var node model.Node if err := c.Find(bson.M{"key": key}).One(&node); err != nil { @@ -227,7 +227,7 @@ func InitNodeService() error { } // 首次更新节点数据(注册到Redis) - // UpdateNodeData() + UpdateNodeData() // 获取当前节点 node, err := model.GetCurrentNode() diff --git a/backend/services/schedule.go b/backend/services/schedule.go index f011f02a..d4c1635b 100644 --- a/backend/services/schedule.go +++ b/backend/services/schedule.go @@ -5,7 +5,7 @@ import ( "crawlab/lib/cron" "crawlab/model" "github.com/apex/log" - uuid "github.com/satori/go.uuid" + "github.com/satori/go.uuid" "runtime/debug" ) @@ -31,6 +31,9 @@ func AddTask(s model.Schedule) func() { return } + // 同步ID到定时任务 + s.SyncNodeIdAndSpiderId(node, *spider) + // 生成任务ID id := uuid.NewV4() @@ -119,6 +122,18 @@ func (s *Scheduler) RemoveAll() { } } +// 验证cron表达式是否正确 +func ParserCron(spec string) error { + parser := cron.NewParser( + cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor, + ) + + if _, err := parser.Parse(spec); err != nil { + return err + } + return nil +} + func (s *Scheduler) Update() error { // 删除所有定时任务 s.RemoveAll() diff --git a/backend/services/task.go b/backend/services/task.go index 12f0330e..03038613 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -100,91 +100,85 @@ func AssignTask(task model.Task) error { return nil } -// 执行shell命令 -func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (err error) { - log.Infof("cwd: " + cwd) - log.Infof("cmd: " + cmdStr) +// 设置环境变量 +func SetEnv(cmd *exec.Cmd, envs []model.Env, taskId string, dataCol string) *exec.Cmd { + // 默认环境变量 + cmd.Env = append(os.Environ(), "CRAWLAB_TASK_ID="+taskId) + cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+dataCol) + cmd.Env = append(cmd.Env, "PYTHONUNBUFFERED=0") + cmd.Env = append(cmd.Env, "PYTHONIOENCODING=utf-8") + cmd.Env = append(cmd.Env, "TZ=Asia/Shanghai") - // 生成执行命令 - var cmd *exec.Cmd - if runtime.GOOS == constants.Windows { - cmd = exec.Command("cmd", "/C", cmdStr) - } else { - cmd = exec.Command("sh", "-c", cmdStr) - } - - // 工作目录 - cmd.Dir = cwd - - // 指定stdout, stderr日志位置 - fLog, err := os.Create(t.LogPath) - if err != nil { - HandleTaskError(t, err) - return err - } - defer fLog.Close() - cmd.Stdout = fLog - cmd.Stderr = fLog - - // 添加默认环境变量 - cmd.Env = append(cmd.Env, "CRAWLAB_TASK_ID="+t.Id) - cmd.Env = append(cmd.Env, "CRAWLAB_COLLECTION="+s.Col) - - // 添加任务环境变量 - for _, env := range s.Envs { + //任务环境变量 + for _, env := range envs { cmd.Env = append(cmd.Env, env.Name+"="+env.Value) } - // 起一个goroutine来监控进程 - ch := utils.TaskExecChanMap.ChanBlocked(t.Id) - go func() { - // 传入信号,此处阻塞 - signal := <-ch - log.Infof("cancel process signal: %s", signal) - if signal == constants.TaskCancel && cmd.Process != nil { - // 取消进程 - if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { - log.Errorf("process kill error: %s", err.Error()) - debug.PrintStack() - } - t.Status = constants.StatusCancelled - t.Error = "user kill the process ..." - } else { - // 保存任务 - t.Status = constants.StatusFinished - } - t.FinishTs = time.Now() - if err := t.Save(); err != nil { - log.Infof("save task error: %s", err.Error()) + // TODO 全局环境变量 + return cmd +} + +func SetLogConfig(cmd *exec.Cmd, path string) error { + fLog, err := os.Create(path) + if err != nil { + log.Errorf("create task log file error: %s", path) + debug.PrintStack() + return err + } + cmd.Stdout = fLog + cmd.Stderr = fLog + return nil +} + +func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, t model.Task) { + // 传入信号,此处阻塞 + signal := <-ch + log.Infof("process received signal: %s", signal) + + if signal == constants.TaskCancel && cmd.Process != nil { + // 取消进程 + if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil { + log.Errorf("process kill error: %s", err.Error()) debug.PrintStack() - return + + t.Error = "kill process error: " + err.Error() + t.Status = constants.StatusError + } else { + t.Error = "user kill the process ..." + t.Status = constants.StatusCancelled } - }() + } else { + // 保存任务 + t.Status = constants.StatusFinished + } - // 在选择所有节点执行的时候,实际就是随机一个节点执行的, - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + t.FinishTs = time.Now() + _ = t.Save() +} - // 异步启动进程 +func StartTaskProcess(cmd *exec.Cmd, t model.Task) error { if err := cmd.Start(); err != nil { log.Errorf("start spider error:{}", err.Error()) debug.PrintStack() - return err - } - // 保存pid到task - t.Pid = cmd.Process.Pid - if err := t.Save(); err != nil { - log.Errorf("save task pid error: %s", err.Error()) - debug.PrintStack() + t.Error = "start task error: " + err.Error() + t.Status = constants.StatusError + t.FinishTs = time.Now() + _ = t.Save() return err } - // 同步等待进程完成 + return nil +} + +func WaitTaskProcess(cmd *exec.Cmd, t model.Task) error { if err := cmd.Wait(); err != nil { log.Errorf("wait process finish error: %s", err.Error()) debug.PrintStack() + if exitError, ok := err.(*exec.ExitError); ok { exitCode := exitError.ExitCode() log.Errorf("exit error, exit code: %d", exitCode) + // 非kill 的错误类型 if exitCode != -1 { // 非手动kill保存为错误状态 @@ -194,6 +188,52 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e _ = t.Save() } } + + return err + } + return nil +} + +// 执行shell命令 +func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (err error) { + log.Infof("cwd: %s", cwd) + log.Infof("cmd: %s", cmdStr) + + // 生成执行命令 + var cmd *exec.Cmd + if runtime.GOOS == constants.Windows { + cmd = exec.Command("cmd", "/C", cmdStr) + } else { + cmd = exec.Command("") + cmd = exec.Command("sh", "-c", cmdStr) + } + + // 工作目录 + cmd.Dir = cwd + + // 日志配置 + if err := SetLogConfig(cmd, t.LogPath); err != nil { + return err + } + + // 环境变量配置 + cmd = SetEnv(cmd, s.Envs, t.Id, s.Col) + + // 起一个goroutine来监控进程 + ch := utils.TaskExecChanMap.ChanBlocked(t.Id) + + go FinishOrCancelTask(ch, cmd, t) + + // kill的时候,可以kill所有的子进程 + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + // 启动进程 + if err := StartTaskProcess(cmd, t); err != nil { + return err + } + + // 同步等待进程完成 + if err := WaitTaskProcess(cmd, t); err != nil { return err } ch <- constants.TaskFinish @@ -208,6 +248,7 @@ func MakeLogDir(t model.Task) (fileDir string, err error) { // 如果日志目录不存在,生成该目录 if !utils.Exists(fileDir) { if err := os.MkdirAll(fileDir, 0777); err != nil { + log.Errorf("execute task, make log dir error: %s", err.Error()) debug.PrintStack() return "", err } @@ -272,85 +313,55 @@ func ExecuteTask(id int) { // 获取当前节点 node, err := model.GetCurrentNode() if err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) + log.Errorf("execute task get current node error: %s", err.Error()) + debug.PrintStack() return } - // 公共队列 - queuePub := "tasks:public" - // 节点队列 queueCur := "tasks:node:" + node.Id.Hex() - // 节点队列任务 var msg string - msg, err = database.RedisClient.LPop(queueCur) - if msg != "" { - log.Infof("queue cur: %s", msg) - } - if err != nil { - if msg == "" { - // 节点队列没有任务,获取公共队列任务 - msg, err = database.RedisClient.LPop(queuePub) - if err != nil { - if msg == "" { - // 公共队列没有任务 - log.Debugf(GetWorkerPrefix(id) + "没有任务...") - return - } else { - log.Errorf(GetWorkerPrefix(id) + err.Error()) - debug.PrintStack() - return - } - } - } else { - log.Errorf(GetWorkerPrefix(id) + err.Error()) - debug.PrintStack() - return + if msg, err = database.RedisClient.LPop(queueCur); err != nil { + // 节点队列没有任务,获取公共队列任务 + queuePub := "tasks:public" + if msg, err = database.RedisClient.LPop(queuePub); err != nil { } } + if msg == "" { + return + } + // 反序列化 tMsg := TaskMessage{} if err := json.Unmarshal([]byte(msg), &tMsg); err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) - debug.PrintStack() + log.Errorf("json string to struct error: %s", err.Error()) return } // 获取任务 t, err := model.GetTask(tMsg.Id) if err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) + log.Errorf("execute task, get task error: %s", err.Error()) return } // 获取爬虫 spider, err := t.GetSpider() if err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) + log.Errorf("execute task, get spider error: %s", err.Error()) return } // 创建日志目录 - fileDir, err := MakeLogDir(t) - if err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) + var fileDir string + if fileDir, err = MakeLogDir(t); err != nil { return } - // 获取日志文件路径 t.LogPath = GetLogFilePaths(fileDir) - // 创建日志目录文件夹 - fileStdoutDir := filepath.Dir(t.LogPath) - if !utils.Exists(fileStdoutDir) { - if err := os.MkdirAll(fileStdoutDir, os.ModePerm); err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) - return - } - } - // 工作目录 cwd := filepath.Join( viper.GetString("spider.path"), diff --git a/frontend/src/components/Common/CrawlConfirmDialog.vue b/frontend/src/components/Common/CrawlConfirmDialog.vue index 266ef2eb..2286beb2 100644 --- a/frontend/src/components/Common/CrawlConfirmDialog.vue +++ b/frontend/src/components/Common/CrawlConfirmDialog.vue @@ -9,9 +9,8 @@ - diff --git a/frontend/src/views/schedule/ScheduleList.vue b/frontend/src/views/schedule/ScheduleList.vue index 4d283966..b170c9ed 100644 --- a/frontend/src/views/schedule/ScheduleList.vue +++ b/frontend/src/views/schedule/ScheduleList.vue @@ -14,7 +14,7 @@ - + - - - + + + + + + + + + + + - {{$t('schedules.add_cron')}} + + {{$t('Cancel')}} {{$t('Submit')}} @@ -76,9 +78,9 @@ - - - + + + @@ -131,7 +133,7 @@