加入日志异常检测

This commit is contained in:
marvzhang
2020-04-19 10:32:36 +08:00
parent c7675ca40a
commit 7764b9fb82
7 changed files with 177 additions and 14 deletions

View File

@@ -218,6 +218,7 @@ func main() {
authGroup.DELETE("/tasks_by_status", routes.DeleteTaskByStatus) // 删除指定状态的任务
authGroup.POST("/tasks/:id/cancel", routes.CancelTask) // 取消任务
authGroup.GET("/tasks/:id/log", routes.GetTaskLog) // 任务日志
authGroup.GET("/tasks/:id/error-log", routes.GetTaskErrorLog) // 任务错误日志
authGroup.GET("/tasks/:id/results", routes.GetTaskResults) // 任务结果
authGroup.GET("/tasks/:id/results/download", routes.DownloadTaskResultsCsv) // 下载任务结果
authGroup.POST("/tasks/:id/restart", routes.RestartTask) // 重新开始任务

View File

@@ -4,6 +4,7 @@ import (
"crawlab/database"
"crawlab/utils"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"os"
"runtime/debug"
@@ -14,7 +15,15 @@ type LogItem struct {
Id bson.ObjectId `json:"_id" bson:"_id"`
Message string `json:"msg" bson:"msg"`
TaskId string `json:"task_id" bson:"task_id"`
IsError bool `json:"is_error" bson:"is_error"`
Seq int64 `json:"seq" bson:"seq"`
Ts time.Time `json:"ts" bson:"ts"`
}
type ErrorLogItem struct {
Id bson.ObjectId `json:"_id" bson:"_id"`
TaskId string `json:"task_id" bson:"task_id"`
Message string `json:"msg" bson:"msg"`
LogId bson.ObjectId `json:"log_id" bson:"log_id"`
Seq int64 `json:"seq" bson:"seq"`
Ts time.Time `json:"ts" bson:"ts"`
}
@@ -66,6 +75,21 @@ func AddLogItem(l LogItem) error {
return nil
}
func AddErrorLogItem(e ErrorLogItem) error {
s, c := database.GetCol("error_logs")
defer s.Close()
var l LogItem
err := c.FindId(bson.M{"log_id": e.LogId}).One(&l)
if err != nil && err == mgo.ErrNotFound {
if err := c.Insert(e); err != nil {
log.Errorf("insert log error: " + err.Error())
debug.PrintStack()
return err
}
}
return nil
}
func GetLogItemList(query bson.M, keyword string, skip int, limit int, sortStr string) ([]LogItem, error) {
s, c := database.GetCol("logs")
defer s.Close()

View File

@@ -127,6 +127,23 @@ func (t *Task) GetLogItems(keyword string, page int, pageSize int) (logItems []L
return logItems, logTotal, nil
}
func (t *Task) GetErrorLogItems() (errLogItems []ErrorLogItem, err error) {
s, c := database.GetCol("error_logs")
defer s.Close()
query := bson.M{
"task_id": t.Id,
}
if err := c.Find(query).All(&errLogItems); err != nil {
log.Errorf("find error logs error: " + err.Error())
debug.PrintStack()
return errLogItems, err
}
return errLogItems, nil
}
func GetTaskList(filter interface{}, skip int, limit int, sortKey string) ([]Task, error) {
s, c := database.GetCol("tasks")
defer s.Close()
@@ -390,6 +407,7 @@ func UpdateTaskResultCount(id string) (err error) {
return nil
}
// convert all running tasks to abnormal tasks
func UpdateTaskToAbnormal(nodeId bson.ObjectId) error {
s, c := database.GetCol("tasks")
defer s.Close()
@@ -411,3 +429,45 @@ func UpdateTaskToAbnormal(nodeId bson.ObjectId) error {
}
return nil
}
// update task error logs
func UpdateTaskErrorLogs(taskId string, errorRegexPattern string) error {
s, c := database.GetCol("logs")
defer s.Close()
if errorRegexPattern == "" {
errorRegexPattern = constants.ErrorRegexPattern
}
query := bson.M{
"task_id": taskId,
"msg": bson.M{
"$regex": bson.RegEx{
Pattern: errorRegexPattern,
Options: "i",
},
},
}
var logs []LogItem
if err := c.Find(query).All(&logs); err != nil {
log.Errorf("find error logs error: " + err.Error())
debug.PrintStack()
return err
}
for _, l := range logs {
e := ErrorLogItem{
Id: bson.NewObjectId(),
TaskId: l.TaskId,
Message: l.Message,
LogId: l.Id,
Seq: l.Seq,
Ts: time.Now(),
}
if err := AddErrorLogItem(e); err != nil {
return err
}
}
return nil
}

View File

@@ -258,6 +258,20 @@ func GetTaskLog(c *gin.Context) {
})
}
func GetTaskErrorLog(c *gin.Context) {
id := c.Param("id")
errLogItems, err := services.GetTaskErrorLog(id)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
Data: errLogItems,
})
}
func GetTaskResults(c *gin.Context) {
id := c.Param("id")

View File

@@ -237,6 +237,10 @@ func PostMe(c *gin.Context) {
user.Setting.WechatRobotWebhook = reqBody.Setting.WechatRobotWebhook
}
user.Setting.EnabledNotifications = reqBody.Setting.EnabledNotifications
if reqBody.Setting.ErrorRegexPattern != "" {
user.Setting.ErrorRegexPattern = reqBody.Setting.ErrorRegexPattern
}
user.Setting.ErrorRegexPattern = reqBody.Setting.ErrorRegexPattern
if user.UserId.Hex() == "" {
user.UserId = bson.ObjectIdHex(constants.ObjectIdNull)

View File

@@ -131,6 +131,9 @@ func InitDeleteLogPeriodically() error {
func InitLogIndexes() error {
s, c := database.GetCol("logs")
defer s.Close()
se, ce := database.GetCol("error_logs")
defer s.Close()
defer se.Close()
_ = c.EnsureIndex(mgo.Index{
Key: []string{"task_id", "seq"},
@@ -138,6 +141,12 @@ func InitLogIndexes() error {
_ = c.EnsureIndex(mgo.Index{
Key: []string{"task_id", "msg"},
})
_ = ce.EnsureIndex(mgo.Index{
Key: []string{"task_id"},
})
_ = ce.EnsureIndex(mgo.Index{
Key: []string{"log_id"},
})
return nil
}

View File

@@ -205,7 +205,6 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error {
Seq: seq,
Message: line,
TaskId: t.Id,
IsError: false,
Ts: time.Now(),
})
}
@@ -225,7 +224,6 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error {
Seq: seq,
Message: line,
TaskId: t.Id,
IsError: true,
Ts: time.Now(),
})
}
@@ -234,7 +232,7 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error {
return nil
}
func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, t model.Task) {
func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, s model.Spider, t model.Task) {
// 传入信号,此处阻塞
signal := <-ch
log.Infof("process received signal: %s", signal)
@@ -265,6 +263,8 @@ func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, t model.Task) {
t.FinishTs = time.Now()
_ = t.Save()
go FinishUpTask(s, t)
}
func StartTaskProcess(cmd *exec.Cmd, t model.Task) error {
@@ -281,7 +281,7 @@ func StartTaskProcess(cmd *exec.Cmd, t model.Task) error {
return nil
}
func WaitTaskProcess(cmd *exec.Cmd, t model.Task) error {
func WaitTaskProcess(cmd *exec.Cmd, t model.Task, s model.Spider) error {
if err := cmd.Wait(); err != nil {
log.Errorf("wait process finish error: %s", err.Error())
debug.PrintStack()
@@ -297,11 +297,14 @@ func WaitTaskProcess(cmd *exec.Cmd, t model.Task) error {
t.FinishTs = time.Now()
t.Status = constants.StatusError
_ = t.Save()
FinishUpTask(s, t)
}
}
return err
}
return nil
}
@@ -347,7 +350,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e
// 起一个goroutine来监控进程
ch := utils.TaskExecChanMap.ChanBlocked(t.Id)
go FinishOrCancelTask(ch, cmd, t)
go FinishOrCancelTask(ch, cmd, s, t)
// kill的时候可以kill所有的子进程
if runtime.GOOS != constants.Windows {
@@ -360,7 +363,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e
}
// 同步等待进程完成
if err := WaitTaskProcess(cmd, t); err != nil {
if err := WaitTaskProcess(cmd, t, s); err != nil {
return err
}
ch <- constants.TaskFinish
@@ -418,6 +421,19 @@ func SaveTaskResultCount(id string) func() {
}
}
// Scan Error Logs
func ScanErrorLogs(t model.Task) func() {
return func() {
u, err := model.GetUser(t.UserId)
if err != nil {
return
}
if err := model.UpdateTaskErrorLogs(t.Id, u.Setting.ErrorRegexPattern); err != nil {
return
}
}
}
// 执行任务
func ExecuteTask(id int) {
if flag, ok := LockList.Load(id); ok {
@@ -538,12 +554,24 @@ func ExecuteTask(id int) {
_, err = cronExec.AddFunc("*/5 * * * * *", SaveTaskResultCount(t.Id))
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
return
}
cronExec.Start()
defer cronExec.Stop()
}
// 起一个cron来更新错误日志
cronExecErrLog := cron.New(cron.WithSeconds())
_, err = cronExecErrLog.AddFunc("*/30 * * * * *", ScanErrorLogs(t))
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
return
}
cronExecErrLog.Start()
defer cronExecErrLog.Stop()
// 获得触发任务用户
user, err := model.GetUser(t.UserId)
if err != nil {
@@ -563,13 +591,8 @@ func ExecuteTask(id int) {
return
}
// 更新任务结果数
if spider.Col != "" {
if err := model.UpdateTaskResultCount(t.Id); err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
return
}
}
// 完成任务收尾工作
go FinishUpTask(spider, t)
// 完成进程
t, err = model.GetTask(t.Id)
@@ -604,6 +627,22 @@ func ExecuteTask(id int) {
log.Infof(GetWorkerPrefix(id) + "task (id:" + t.Id + ")" + " finished. elapsed:" + durationStr + " sec")
}
func FinishUpTask(s model.Spider, t model.Task) {
// 更新任务结果数
go func() {
if s.Col != "" {
if err := model.UpdateTaskResultCount(t.Id); err != nil {
return
}
}
}()
// 更新任务错误日志
go func() {
ScanErrorLogs(t)()
}()
}
func SpiderFileCheck(t model.Task, spider model.Spider) error {
// 判断爬虫文件是否存在
gfFile := model.GetGridFs(spider.FileId)
@@ -642,6 +681,18 @@ func GetTaskLog(id string, keyword string, page int, pageSize int) (logItems []m
return logItems, logTotal, nil
}
func GetTaskErrorLog(id string) (errLogItems []model.ErrorLogItem, err error) {
task, err := model.GetTask(id)
if err != nil {
return
}
errLogItems, err = task.GetErrorLogItems()
if err != nil {
return
}
return errLogItems, nil
}
func CancelTask(id string) (err error) {
// 获取任务
task, err := model.GetTask(id)