加入日志异常检测

This commit is contained in:
marvzhang
2020-04-19 10:32:36 +08:00
parent 3633331879
commit 59b42ddde2
7 changed files with 177 additions and 14 deletions

View File

@@ -131,6 +131,9 @@ func InitDeleteLogPeriodically() error {
func InitLogIndexes() error {
s, c := database.GetCol("logs")
defer s.Close()
se, ce := database.GetCol("error_logs")
defer s.Close()
defer se.Close()
_ = c.EnsureIndex(mgo.Index{
Key: []string{"task_id", "seq"},
@@ -138,6 +141,12 @@ func InitLogIndexes() error {
_ = c.EnsureIndex(mgo.Index{
Key: []string{"task_id", "msg"},
})
_ = ce.EnsureIndex(mgo.Index{
Key: []string{"task_id"},
})
_ = ce.EnsureIndex(mgo.Index{
Key: []string{"log_id"},
})
return nil
}

View File

@@ -205,7 +205,6 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error {
Seq: seq,
Message: line,
TaskId: t.Id,
IsError: false,
Ts: time.Now(),
})
}
@@ -225,7 +224,6 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error {
Seq: seq,
Message: line,
TaskId: t.Id,
IsError: true,
Ts: time.Now(),
})
}
@@ -234,7 +232,7 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error {
return nil
}
func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, t model.Task) {
func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, s model.Spider, t model.Task) {
// 传入信号,此处阻塞
signal := <-ch
log.Infof("process received signal: %s", signal)
@@ -265,6 +263,8 @@ func FinishOrCancelTask(ch chan string, cmd *exec.Cmd, t model.Task) {
t.FinishTs = time.Now()
_ = t.Save()
go FinishUpTask(s, t)
}
func StartTaskProcess(cmd *exec.Cmd, t model.Task) error {
@@ -281,7 +281,7 @@ func StartTaskProcess(cmd *exec.Cmd, t model.Task) error {
return nil
}
func WaitTaskProcess(cmd *exec.Cmd, t model.Task) error {
func WaitTaskProcess(cmd *exec.Cmd, t model.Task, s model.Spider) error {
if err := cmd.Wait(); err != nil {
log.Errorf("wait process finish error: %s", err.Error())
debug.PrintStack()
@@ -297,11 +297,14 @@ func WaitTaskProcess(cmd *exec.Cmd, t model.Task) error {
t.FinishTs = time.Now()
t.Status = constants.StatusError
_ = t.Save()
FinishUpTask(s, t)
}
}
return err
}
return nil
}
@@ -347,7 +350,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e
// 起一个goroutine来监控进程
ch := utils.TaskExecChanMap.ChanBlocked(t.Id)
go FinishOrCancelTask(ch, cmd, t)
go FinishOrCancelTask(ch, cmd, s, t)
// kill的时候可以kill所有的子进程
if runtime.GOOS != constants.Windows {
@@ -360,7 +363,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e
}
// 同步等待进程完成
if err := WaitTaskProcess(cmd, t); err != nil {
if err := WaitTaskProcess(cmd, t, s); err != nil {
return err
}
ch <- constants.TaskFinish
@@ -418,6 +421,19 @@ func SaveTaskResultCount(id string) func() {
}
}
// ScanErrorLogs builds a job that refreshes the error logs of task t.
//
// The returned closure looks up the user referenced by t.UserId and passes
// that user's Setting.ErrorRegexPattern, together with t.Id, to
// model.UpdateTaskErrorLogs. The job is best-effort: when either call fails
// it simply returns and the error is not reported.
func ScanErrorLogs(t model.Task) func() {
	scan := func() {
		user, err := model.GetUser(t.UserId)
		if err != nil {
			// Without the user there is no pattern to scan with.
			return
		}
		// Nothing follows this call, so its result is discarded either way.
		_ = model.UpdateTaskErrorLogs(t.Id, user.Setting.ErrorRegexPattern)
	}
	return scan
}
// 执行任务
func ExecuteTask(id int) {
if flag, ok := LockList.Load(id); ok {
@@ -538,12 +554,24 @@ func ExecuteTask(id int) {
_, err = cronExec.AddFunc("*/5 * * * * *", SaveTaskResultCount(t.Id))
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
return
}
cronExec.Start()
defer cronExec.Stop()
}
// 起一个cron来更新错误日志
cronExecErrLog := cron.New(cron.WithSeconds())
_, err = cronExecErrLog.AddFunc("*/30 * * * * *", ScanErrorLogs(t))
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
return
}
cronExecErrLog.Start()
defer cronExecErrLog.Stop()
// 获得触发任务用户
user, err := model.GetUser(t.UserId)
if err != nil {
@@ -563,13 +591,8 @@ func ExecuteTask(id int) {
return
}
// 更新任务结果数
if spider.Col != "" {
if err := model.UpdateTaskResultCount(t.Id); err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
return
}
}
// 完成任务收尾工作
go FinishUpTask(spider, t)
// 完成进程
t, err = model.GetTask(t.Id)
@@ -604,6 +627,22 @@ func ExecuteTask(id int) {
log.Infof(GetWorkerPrefix(id) + "task (id:" + t.Id + ")" + " finished. elapsed:" + durationStr + " sec")
}
// FinishUpTask starts the wrap-up work for task t of spider s.
//
// Up to two independent jobs are launched, each in its own goroutine, and the
// function returns without waiting for either of them:
//   - when s.Col is non-empty, model.UpdateTaskResultCount is called for t.Id
//     and a failure is written to the error log;
//   - the job built by ScanErrorLogs(t) is run once.
func FinishUpTask(s model.Spider, t model.Task) {
	// Update the task's result count; skipped when s.Col is empty.
	if s.Col != "" {
		go func() {
			if err := model.UpdateTaskResultCount(t.Id); err != nil {
				// Report the failure instead of dropping it silently.
				log.Errorf("update task result count error: %s", err.Error())
			}
		}()
	}

	// Update the task's error logs.
	go ScanErrorLogs(t)()
}
func SpiderFileCheck(t model.Task, spider model.Spider) error {
// 判断爬虫文件是否存在
gfFile := model.GetGridFs(spider.FileId)
@@ -642,6 +681,18 @@ func GetTaskLog(id string, keyword string, page int, pageSize int) (logItems []m
return logItems, logTotal, nil
}
// GetTaskErrorLog returns the error log items of the task with the given id.
//
// The task is loaded with model.GetTask; if that fails, the error is returned
// with nil items. Otherwise the result of the task's GetErrorLogItems call is
// returned as is.
func GetTaskErrorLog(id string) (errLogItems []model.ErrorLogItem, err error) {
	task, err := model.GetTask(id)
	if err != nil {
		return nil, err
	}
	return task.GetErrorLogItems()
}
func CancelTask(id string) (err error) {
// 获取任务
task, err := model.GetTask(id)