迁移日志到MongoDB

This commit is contained in:
marvzhang
2020-04-01 16:52:23 +08:00
parent a9f6967135
commit 63c95f8618
10 changed files with 222 additions and 77 deletions

View File

@@ -9,6 +9,7 @@ import (
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/spf13/viper"
"io/ioutil"
@@ -162,5 +163,42 @@ func InitDeleteLogPeriodically() error {
c.Start()
return nil
}
func InitLogIndexes() error {
s, c := database.GetCol("logs")
defer s.Close()
_ = c.EnsureIndexKey("task_id")
_ = c.EnsureIndex(mgo.Index{
Key: []string{"$text:msg"},
})
return nil
}
func InitLogService() error {
logLevel := viper.GetString("log.level")
if logLevel != "" {
log.SetLevelFromString(logLevel)
}
log.Info("initialized log config successfully")
if viper.GetString("log.isDeletePeriodically") == "Y" {
if err := InitDeleteLogPeriodically(); err != nil {
log.Error("init DeletePeriodically failed")
return err
}
log.Info("initialized periodically cleaning log successfully")
} else {
log.Info("periodically cleaning log is switched off")
}
if model.IsMaster() {
if err := InitLogIndexes(); err != nil {
log.Errorf(err.Error())
return err
}
}
return nil
}

View File

@@ -1,6 +1,7 @@
package services
import (
"bufio"
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
@@ -160,15 +161,70 @@ func SetEnv(cmd *exec.Cmd, envs []model.Env, task model.Task, spider model.Spide
return cmd
}
func SetLogConfig(cmd *exec.Cmd, path string) error {
fLog, err := os.Create(path)
func SetLogConfig(cmd *exec.Cmd, t model.Task) error {
//fLog, err := os.Create(path)
//if err != nil {
// log.Errorf("create task log file error: %s", path)
// debug.PrintStack()
// return err
//}
//cmd.Stdout = fLog
//cmd.Stderr = fLog
// get stdout reader
stdout, err := cmd.StdoutPipe()
readerStdout := bufio.NewReader(stdout)
if err != nil {
log.Errorf("create task log file error: %s", path)
log.Errorf("get stdout error: %s", err.Error())
debug.PrintStack()
return err
}
cmd.Stdout = fLog
cmd.Stderr = fLog
// get stderr reader
stderr, err := cmd.StderrPipe()
readerStderr := bufio.NewReader(stderr)
if err != nil {
log.Errorf("get stdout error: %s", err.Error())
debug.PrintStack()
return err
}
// read stdout
go func() {
for {
line, err := readerStdout.ReadString('\n')
if err != nil {
break
}
line = strings.Replace(line, "\n", "", -1)
_ = model.AddLogItem(model.LogItem{
Id: bson.NewObjectId(),
Message: line,
TaskId: t.Id,
IsError: false,
Ts: time.Now(),
})
}
}()
// read stderr
go func() {
for {
line, err := readerStderr.ReadString('\n')
line = strings.Replace(line, "\n", "", -1)
if err != nil {
break
}
_ = model.AddLogItem(model.LogItem{
Id: bson.NewObjectId(),
Message: line,
TaskId: t.Id,
IsError: true,
Ts: time.Now(),
})
}
}()
return nil
}
@@ -260,7 +316,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e
cmd.Dir = cwd
// 日志配置
if err := SetLogConfig(cmd, t.LogPath); err != nil {
if err := SetLogConfig(cmd, t); err != nil {
return err
}
@@ -566,54 +622,60 @@ func SpiderFileCheck(t model.Task, spider model.Spider) error {
return nil
}
func GetTaskLog(id string) (logStr string, err error) {
func GetTaskLog(id string) (logItems []model.LogItem, err error) {
task, err := model.GetTask(id)
if err != nil {
return
}
if IsMasterNode(task.NodeId.Hex()) {
if !utils.Exists(task.LogPath) {
fileDir, err := MakeLogDir(task)
if err != nil {
log.Errorf(err.Error())
}
fileP := GetLogFilePaths(fileDir, task)
// 获取日志文件路径
fLog, err := os.Create(fileP)
defer fLog.Close()
if err != nil {
log.Errorf("create task log file error: %s", fileP)
debug.PrintStack()
}
task.LogPath = fileP
if err := task.Save(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
}
// 若为主节点,获取本机日志
logBytes, err := model.GetLocalLog(task.LogPath)
if err != nil {
log.Errorf(err.Error())
logStr = err.Error()
} else {
logStr = utils.BytesToString(logBytes)
}
return logStr, err
}
// 若不为主节点,获取远端日志
logStr, err = GetRemoteLog(task)
logItems, err = task.GetLogItems()
if err != nil {
log.Errorf(err.Error())
return logItems, err
}
return logStr, err
return logItems, nil
//if IsMasterNode(task.NodeId.Hex()) {
// if !utils.Exists(task.LogPath) {
// fileDir, err := MakeLogDir(task)
//
// if err != nil {
// log.Errorf(err.Error())
// }
//
// fileP := GetLogFilePaths(fileDir, task)
//
// // 获取日志文件路径
// fLog, err := os.Create(fileP)
// defer fLog.Close()
// if err != nil {
// log.Errorf("create task log file error: %s", fileP)
// debug.PrintStack()
// }
// task.LogPath = fileP
// if err := task.Save(); err != nil {
// log.Errorf(err.Error())
// debug.PrintStack()
// }
//
// }
// // 若为主节点,获取本机日志
// logBytes, err := model.GetLocalLog(task.LogPath)
// if err != nil {
// log.Errorf(err.Error())
// logStr = err.Error()
// } else {
// logStr = utils.BytesToString(logBytes)
// }
// return logStr, err
//}
//// 若不为主节点,获取远端日志
//logStr, err = GetRemoteLog(task)
//if err != nil {
// log.Errorf(err.Error())
//
//}
//return logStr, err
}
func CancelTask(id string) (err error) {