diff --git a/backend/main.go b/backend/main.go index 0a04c70f..ee53165a 100644 --- a/backend/main.go +++ b/backend/main.go @@ -218,6 +218,7 @@ func main() { authGroup.DELETE("/tasks_by_status", routes.DeleteTaskByStatus) // 删除指定状态的任务 authGroup.POST("/tasks/:id/cancel", routes.CancelTask) // 取消任务 authGroup.GET("/tasks/:id/log", routes.GetTaskLog) // 任务日志 + authGroup.GET("/tasks/:id/error-log", routes.GetTaskErrorLog) // 任务错误日志 authGroup.GET("/tasks/:id/results", routes.GetTaskResults) // 任务结果 authGroup.GET("/tasks/:id/results/download", routes.DownloadTaskResultsCsv) // 下载任务结果 authGroup.POST("/tasks/:id/restart", routes.RestartTask) // 重新开始任务 diff --git a/backend/model/log.go b/backend/model/log.go index 4307c617..59ea431e 100644 --- a/backend/model/log.go +++ b/backend/model/log.go @@ -4,6 +4,7 @@ import ( "crawlab/database" "crawlab/utils" "github.com/apex/log" + "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" "os" "runtime/debug" @@ -14,7 +15,15 @@ type LogItem struct { Id bson.ObjectId `json:"_id" bson:"_id"` Message string `json:"msg" bson:"msg"` TaskId string `json:"task_id" bson:"task_id"` - IsError bool `json:"is_error" bson:"is_error"` + Seq int64 `json:"seq" bson:"seq"` + Ts time.Time `json:"ts" bson:"ts"` +} + +type ErrorLogItem struct { + Id bson.ObjectId `json:"_id" bson:"_id"` + TaskId string `json:"task_id" bson:"task_id"` + Message string `json:"msg" bson:"msg"` + LogId bson.ObjectId `json:"log_id" bson:"log_id"` Seq int64 `json:"seq" bson:"seq"` Ts time.Time `json:"ts" bson:"ts"` } @@ -66,6 +75,21 @@ func AddLogItem(l LogItem) error { return nil } +func AddErrorLogItem(e ErrorLogItem) error { + s, c := database.GetCol("error_logs") + defer s.Close() + var l LogItem + err := c.FindId(bson.M{"log_id": e.LogId}).One(&l) + if err != nil && err == mgo.ErrNotFound { + if err := c.Insert(e); err != nil { + log.Errorf("insert log error: " + err.Error()) + debug.PrintStack() + return err + } + } + return nil +} + func GetLogItemList(query bson.M, keyword string, skip int, limit int, sortStr string) ([]LogItem, error) { s, c := database.GetCol("logs") defer s.Close() diff --git a/backend/model/task.go b/backend/model/task.go index 0b53b226..6fefb42b 100644 --- a/backend/model/task.go +++ b/backend/model/task.go @@ -127,6 +127,23 @@ func (t *Task) GetLogItems(keyword string, page int, pageSize int) (logItems []L return logItems, logTotal, nil } +func (t *Task) GetErrorLogItems() (errLogItems []ErrorLogItem, err error) { + s, c := database.GetCol("error_logs") + defer s.Close() + + query := bson.M{ + "task_id": t.Id, + } + + if err := c.Find(query).All(&errLogItems); err != nil { + log.Errorf("find error logs error: " + err.Error()) + debug.PrintStack() + return errLogItems, err + } + + return errLogItems, nil +} + func GetTaskList(filter interface{}, skip int, limit int, sortKey string) ([]Task, error) { s, c := database.GetCol("tasks") defer s.Close() @@ -390,6 +407,7 @@ func UpdateTaskResultCount(id string) (err error) { return nil } +// convert all running tasks to abnormal tasks func UpdateTaskToAbnormal(nodeId bson.ObjectId) error { s, c := database.GetCol("tasks") defer s.Close() @@ -411,3 +429,45 @@ func UpdateTaskToAbnormal(nodeId bson.ObjectId) error { } return nil } + +// update task error logs +func UpdateTaskErrorLogs(taskId string, errorRegexPattern string) error { + s, c := database.GetCol("logs") + defer s.Close() + + if errorRegexPattern == "" { + errorRegexPattern = constants.ErrorRegexPattern + } + + query := bson.M{ + "task_id": taskId, + "msg": bson.M{ + "$regex": bson.RegEx{ + Pattern: errorRegexPattern, + Options: "i", + }, + }, + } + var logs []LogItem + if err := c.Find(query).All(&logs); err != nil { + log.Errorf("find error logs error: " + err.Error()) + debug.PrintStack() + return err + } + + for _, l := range logs { + e := ErrorLogItem{ + Id: bson.NewObjectId(), + TaskId: l.TaskId, + Message: l.Message, + LogId: l.Id, + Seq: l.Seq, + Ts: time.Now(), + } + if err := AddErrorLogItem(e); err != nil { + return err + } + } + + return nil +} diff --git a/backend/routes/task.go b/backend/routes/task.go index 907a5ec9..2cbd6426 100644 --- a/backend/routes/task.go +++ b/backend/routes/task.go @@ -258,6 +258,20 @@ func GetTaskLog(c *gin.Context) { }) } +func GetTaskErrorLog(c *gin.Context) { + id := c.Param("id") + errLogItems, err := services.GetTaskErrorLog(id) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + Data: errLogItems, + }) +} + func GetTaskResults(c *gin.Context) { id := c.Param("id") diff --git a/backend/routes/user.go b/backend/routes/user.go index 86d46a61..47951428 100644 --- a/backend/routes/user.go +++ b/backend/routes/user.go @@ -237,6 +237,10 @@ func PostMe(c *gin.Context) { user.Setting.WechatRobotWebhook = reqBody.Setting.WechatRobotWebhook } user.Setting.EnabledNotifications = reqBody.Setting.EnabledNotifications + if reqBody.Setting.ErrorRegexPattern != "" { + user.Setting.ErrorRegexPattern = reqBody.Setting.ErrorRegexPattern + } + user.Setting.ErrorRegexPattern = reqBody.Setting.ErrorRegexPattern if user.UserId.Hex() == "" { user.UserId = bson.ObjectIdHex(constants.ObjectIdNull) diff --git a/backend/services/log.go b/backend/services/log.go index cc621c4c..99822820 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -131,6 +131,9 @@ func InitDeleteLogPeriodically() error { func InitLogIndexes() error { s, c := database.GetCol("logs") defer s.Close() + se, ce := database.GetCol("error_logs") + defer s.Close() + defer se.Close() _ = c.EnsureIndex(mgo.Index{ Key: []string{"task_id", "seq"}, @@ -138,6 +141,12 @@ func InitLogIndexes() error { _ = c.EnsureIndex(mgo.Index{ Key: []string{"task_id", "msg"}, }) + _ = ce.EnsureIndex(mgo.Index{ + Key: []string{"task_id"}, + }) + _ = ce.EnsureIndex(mgo.Index{ + Key: []string{"log_id"}, + }) return nil } diff --git a/backend/services/task.go b/backend/services/task.go index e12f9551..396d74fd 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -205,7 +205,6 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error { Seq: seq, Message: line, TaskId: t.Id, - IsError: false, Ts: time.Now(), }) } @@ -225,7 +224,6 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error { Seq: seq, Message: line, TaskId: t.Id, - IsError: true, Ts: time.Now(), }) } @@ -234,7 +232,7 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error { return nil } -func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, t model.Task) { +func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, s model.Spider, t model.Task) { // 传入信号,此处阻塞 signal := <-ch log.Infof("process received signal: %s", signal) @@ -265,6 +263,8 @@ func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, t model.Task) { t.FinishTs = time.Now() _ = t.Save() + + go FinishUpTask(s, t) } func StartTaskProcess(cmd *exec.Cmd, t model.Task) error { @@ -281,7 +281,7 @@ func StartTaskProcess(cmd *exec.Cmd, t model.Task) error { return nil } -func WaitTaskProcess(cmd *exec.Cmd, t model.Task) error { +func WaitTaskProcess(cmd *exec.Cmd, t model.Task, s model.Spider) error { if err := cmd.Wait(); err != nil { log.Errorf("wait process finish error: %s", err.Error()) debug.PrintStack() @@ -297,11 +297,14 @@ func WaitTaskProcess(cmd *exec.Cmd, t model.Task) error { t.FinishTs = time.Now() t.Status = constants.StatusError _ = t.Save() + + FinishUpTask(s, t) } } return err } + return nil } @@ -347,7 +350,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e // 起一个goroutine来监控进程 ch := utils.TaskExecChanMap.ChanBlocked(t.Id) - go FinishOrCancelTask(ch, cmd, t) + go FinishOrCancelTask(ch, cmd, s, t) // kill的时候,可以kill所有的子进程 if runtime.GOOS != constants.Windows { @@ -360,7 +363,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e } // 同步等待进程完成 - if err := WaitTaskProcess(cmd, t); err != nil { + if err := WaitTaskProcess(cmd, t, s); err != nil { return err } ch <- constants.TaskFinish @@ -418,6 +421,19 @@ func SaveTaskResultCount(id string) func() { } } +// Scan Error Logs +func ScanErrorLogs(t model.Task) func() { + return func() { + u, err := model.GetUser(t.UserId) + if err != nil { + return + } + if err := model.UpdateTaskErrorLogs(t.Id, u.Setting.ErrorRegexPattern); err != nil { + return + } + } +} + // 执行任务 func ExecuteTask(id int) { if flag, ok := LockList.Load(id); ok { @@ -538,12 +554,24 @@ func ExecuteTask(id int) { _, err = cronExec.AddFunc("*/5 * * * * *", SaveTaskResultCount(t.Id)) if err != nil { log.Errorf(GetWorkerPrefix(id) + err.Error()) + debug.PrintStack() return } cronExec.Start() defer cronExec.Stop() } + // 起一个cron来更新错误日志 + cronExecErrLog := cron.New(cron.WithSeconds()) + _, err = cronExecErrLog.AddFunc("*/30 * * * * *", ScanErrorLogs(t)) + if err != nil { + log.Errorf(GetWorkerPrefix(id) + err.Error()) + debug.PrintStack() + return + } + cronExecErrLog.Start() + defer cronExecErrLog.Stop() + // 获得触发任务用户 user, err := model.GetUser(t.UserId) if err != nil { @@ -563,13 +591,8 @@ func ExecuteTask(id int) { return } - // 更新任务结果数 - if spider.Col != "" { - if err := model.UpdateTaskResultCount(t.Id); err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) - return - } - } + // 完成任务收尾工作 + go FinishUpTask(spider, t) // 完成进程 t, err = model.GetTask(t.Id) @@ -604,6 +627,22 @@ func ExecuteTask(id int) { log.Infof(GetWorkerPrefix(id) + "task (id:" + t.Id + ")" + " finished. elapsed:" + durationStr + " sec") } +func FinishUpTask(s model.Spider, t model.Task) { + // 更新任务结果数 + go func() { + if s.Col != "" { + if err := model.UpdateTaskResultCount(t.Id); err != nil { + return + } + } + }() + + // 更新任务错误日志 + go func() { + ScanErrorLogs(t)() + }() +} + func SpiderFileCheck(t model.Task, spider model.Spider) error { // 判断爬虫文件是否存在 gfFile := model.GetGridFs(spider.FileId) @@ -642,6 +681,18 @@ func GetTaskLog(id string, keyword string, page int, pageSize int) (logItems []m return logItems, logTotal, nil } +func GetTaskErrorLog(id string) (errLogItems []model.ErrorLogItem, err error) { + task, err := model.GetTask(id) + if err != nil { + return + } + errLogItems, err = task.GetErrorLogItems() + if err != nil { + return + } + return errLogItems, nil +} + func CancelTask(id string) (err error) { // 获取任务 task, err := model.GetTask(id)