加入日志自动删除

This commit is contained in:
marvzhang
2020-04-19 13:50:06 +08:00
parent 5e6d7845f9
commit 468ee7bda0
7 changed files with 54 additions and 34 deletions

View File

@@ -45,6 +45,7 @@ setting:
runOnMaster: "Y"
demoSpiders: "N"
checkScrapy: "Y"
autoInstall: "Y"
notification:
mail:
server: ''

View File

@@ -12,20 +12,22 @@ import (
)
type LogItem struct {
Id bson.ObjectId `json:"_id" bson:"_id"`
Message string `json:"msg" bson:"msg"`
TaskId string `json:"task_id" bson:"task_id"`
Seq int64 `json:"seq" bson:"seq"`
Ts time.Time `json:"ts" bson:"ts"`
Id bson.ObjectId `json:"_id" bson:"_id"`
Message string `json:"msg" bson:"msg"`
TaskId string `json:"task_id" bson:"task_id"`
Seq int64 `json:"seq" bson:"seq"`
Ts time.Time `json:"ts" bson:"ts"`
ExpireTs time.Time `json:"expire_ts" bson:"expire_ts"`
}
type ErrorLogItem struct {
Id bson.ObjectId `json:"_id" bson:"_id"`
TaskId string `json:"task_id" bson:"task_id"`
Message string `json:"msg" bson:"msg"`
LogId bson.ObjectId `json:"log_id" bson:"log_id"`
Seq int64 `json:"seq" bson:"seq"`
Ts time.Time `json:"ts" bson:"ts"`
Id bson.ObjectId `json:"_id" bson:"_id"`
TaskId string `json:"task_id" bson:"task_id"`
Message string `json:"msg" bson:"msg"`
LogId bson.ObjectId `json:"log_id" bson:"log_id"`
Seq int64 `json:"seq" bson:"seq"`
Ts time.Time `json:"ts" bson:"ts"`
ExpireTs time.Time `json:"expire_ts" bson:"expire_ts"`
}
// 获取本地日志

View File

@@ -31,6 +31,7 @@ type UserSetting struct {
EnabledNotifications []string `json:"enabled_notifications" bson:"enabled_notifications"`
ErrorRegexPattern string `json:"error_regex_pattern" bson:"error_regex_pattern"`
MaxErrorLog int `json:"max_error_log" bson:"max_error_log"`
LogExpireDuration int64 `json:"log_expire_duration" bson:"log_expire_duration"`
}
func (user *User) Save() error {

View File

@@ -241,6 +241,7 @@ func PostMe(c *gin.Context) {
if reqBody.Setting.MaxErrorLog != 0 {
user.Setting.MaxErrorLog = reqBody.Setting.MaxErrorLog
}
user.Setting.LogExpireDuration = reqBody.Setting.LogExpireDuration
if user.UserId.Hex() == "" {
user.UserId = bson.ObjectIdHex(constants.ObjectIdNull)

View File

@@ -15,6 +15,7 @@ import (
"os"
"path/filepath"
"runtime/debug"
"time"
)
// 任务日志频道映射
@@ -141,12 +142,22 @@ func InitLogIndexes() error {
_ = c.EnsureIndex(mgo.Index{
Key: []string{"task_id", "msg"},
})
_ = c.EnsureIndex(mgo.Index{
Key: []string{"expire_ts"},
Sparse: true,
ExpireAfter: 0 * time.Second,
})
_ = ce.EnsureIndex(mgo.Index{
Key: []string{"task_id"},
})
_ = ce.EnsureIndex(mgo.Index{
Key: []string{"log_id"},
})
_ = ce.EnsureIndex(mgo.Index{
Key: []string{"expire_ts"},
Sparse: true,
ExpireAfter: 0 * time.Second,
})
return nil
}

View File

@@ -183,3 +183,7 @@ func (s *SpiderSync) Download() {
_ = database.RedisClient.HDel("spider", key)
}
func (s *SpiderSync) InstallDeps() {
//s.Spider.Src
}

View File

@@ -161,16 +161,7 @@ func SetEnv(cmd *exec.Cmd, envs []model.Env, task model.Task, spider model.Spide
return cmd
}
func SetLogConfig(cmd *exec.Cmd, t model.Task) error {
//fLog, err := os.Create(path)
//if err != nil {
// log.Errorf("create task log file error: %s", path)
// debug.PrintStack()
// return err
//}
//cmd.Stdout = fLog
//cmd.Stderr = fLog
func SetLogConfig(cmd *exec.Cmd, t model.Task, u model.User) error {
// get stdout reader
stdout, err := cmd.StdoutPipe()
readerStdout := bufio.NewReader(stdout)
@@ -206,6 +197,13 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error {
}
}()
// expire duration (in seconds)
expireDuration := u.Setting.LogExpireDuration
if expireDuration == 0 {
// by default not expire
expireDuration = constants.Infinite
}
// read stdout
go func() {
for {
@@ -217,11 +215,12 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error {
line = strings.Replace(line, "\n", "", -1)
seq++
l := model.LogItem{
Id: bson.NewObjectId(),
Seq: seq,
Message: line,
TaskId: t.Id,
Ts: time.Now(),
Id: bson.NewObjectId(),
Seq: seq,
Message: line,
TaskId: t.Id,
Ts: time.Now(),
ExpireTs: time.Now().Add(time.Duration(expireDuration) * time.Second),
}
logs = append(logs, l)
}
@@ -238,11 +237,12 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error {
line = strings.Replace(line, "\n", "", -1)
seq++
l := model.LogItem{
Id: bson.NewObjectId(),
Seq: seq,
Message: line,
TaskId: t.Id,
Ts: time.Now(),
Id: bson.NewObjectId(),
Seq: seq,
Message: line,
TaskId: t.Id,
Ts: time.Now(),
ExpireTs: time.Now().Add(time.Duration(expireDuration) * time.Second),
}
logs = append(logs, l)
}
@@ -328,7 +328,7 @@ func WaitTaskProcess(cmd *exec.Cmd, t model.Task, s model.Spider) error {
}
// 执行shell命令
func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (err error) {
func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider, u model.User) (err error) {
log.Infof("cwd: %s", cwd)
log.Infof("cmd: %s", cmdStr)
@@ -344,7 +344,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e
cmd.Dir = cwd
// 日志配置
if err := SetLogConfig(cmd, t); err != nil {
if err := SetLogConfig(cmd, t, u); err != nil {
return err
}
@@ -602,7 +602,7 @@ func ExecuteTask(id int) {
}
// 执行Shell命令
if err := ExecuteShellCmd(cmd, cwd, t, spider); err != nil {
if err := ExecuteShellCmd(cmd, cwd, t, spider, user); err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
// 如果发生错误,则发送通知