From 468ee7bda06fd21c5c3cfacdb85b1797217e14d2 Mon Sep 17 00:00:00 2001 From: marvzhang Date: Sun, 19 Apr 2020 13:50:06 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E6=97=A5=E5=BF=97=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E5=88=A0=E9=99=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/conf/config.yml | 1 + backend/model/log.go | 24 ++++++------ backend/model/user.go | 1 + backend/routes/user.go | 1 + backend/services/log.go | 11 ++++++ backend/services/spider_handler/spider.go | 4 ++ backend/services/task.go | 46 +++++++++++------------ 7 files changed, 54 insertions(+), 34 deletions(-) diff --git a/backend/conf/config.yml b/backend/conf/config.yml index 08f223c5..8b39fece 100644 --- a/backend/conf/config.yml +++ b/backend/conf/config.yml @@ -45,6 +45,7 @@ setting: runOnMaster: "Y" demoSpiders: "N" checkScrapy: "Y" + autoInstall: "Y" notification: mail: server: '' diff --git a/backend/model/log.go b/backend/model/log.go index c0f38db3..fecf7def 100644 --- a/backend/model/log.go +++ b/backend/model/log.go @@ -12,20 +12,22 @@ import ( ) type LogItem struct { - Id bson.ObjectId `json:"_id" bson:"_id"` - Message string `json:"msg" bson:"msg"` - TaskId string `json:"task_id" bson:"task_id"` - Seq int64 `json:"seq" bson:"seq"` - Ts time.Time `json:"ts" bson:"ts"` + Id bson.ObjectId `json:"_id" bson:"_id"` + Message string `json:"msg" bson:"msg"` + TaskId string `json:"task_id" bson:"task_id"` + Seq int64 `json:"seq" bson:"seq"` + Ts time.Time `json:"ts" bson:"ts"` + ExpireTs time.Time `json:"expire_ts" bson:"expire_ts"` } type ErrorLogItem struct { - Id bson.ObjectId `json:"_id" bson:"_id"` - TaskId string `json:"task_id" bson:"task_id"` - Message string `json:"msg" bson:"msg"` - LogId bson.ObjectId `json:"log_id" bson:"log_id"` - Seq int64 `json:"seq" bson:"seq"` - Ts time.Time `json:"ts" bson:"ts"` + Id bson.ObjectId `json:"_id" bson:"_id"` + TaskId string `json:"task_id" bson:"task_id"` + Message string `json:"msg" bson:"msg"` + LogId bson.ObjectId `json:"log_id" bson:"log_id"` + Seq int64 `json:"seq" bson:"seq"` + Ts time.Time `json:"ts" bson:"ts"` + ExpireTs time.Time `json:"expire_ts" bson:"expire_ts"` } // 获取本地日志 diff --git a/backend/model/user.go b/backend/model/user.go index e730db38..ba693cd9 100644 --- a/backend/model/user.go +++ b/backend/model/user.go @@ -31,6 +31,7 @@ type UserSetting struct { EnabledNotifications []string `json:"enabled_notifications" bson:"enabled_notifications"` ErrorRegexPattern string `json:"error_regex_pattern" bson:"error_regex_pattern"` MaxErrorLog int `json:"max_error_log" bson:"max_error_log"` + LogExpireDuration int64 `json:"log_expire_duration" bson:"log_expire_duration"` } func (user *User) Save() error { diff --git a/backend/routes/user.go b/backend/routes/user.go index d4ce3d08..0be8a5b9 100644 --- a/backend/routes/user.go +++ b/backend/routes/user.go @@ -241,6 +241,7 @@ func PostMe(c *gin.Context) { if reqBody.Setting.MaxErrorLog != 0 { user.Setting.MaxErrorLog = reqBody.Setting.MaxErrorLog } + user.Setting.LogExpireDuration = reqBody.Setting.LogExpireDuration if user.UserId.Hex() == "" { user.UserId = bson.ObjectIdHex(constants.ObjectIdNull) diff --git a/backend/services/log.go b/backend/services/log.go index 99822820..11aaa4e9 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -15,6 +15,7 @@ import ( "os" "path/filepath" "runtime/debug" + "time" ) // 任务日志频道映射 @@ -141,12 +142,22 @@ func InitLogIndexes() error { _ = c.EnsureIndex(mgo.Index{ Key: []string{"task_id", "msg"}, }) + _ = c.EnsureIndex(mgo.Index{ + Key: []string{"expire_ts"}, + Sparse: true, + ExpireAfter: 0 * time.Second, + }) _ = ce.EnsureIndex(mgo.Index{ Key: []string{"task_id"}, }) _ = ce.EnsureIndex(mgo.Index{ Key: []string{"log_id"}, }) + _ = ce.EnsureIndex(mgo.Index{ + Key: []string{"expire_ts"}, + Sparse: true, + ExpireAfter: 0 * time.Second, + }) return nil } diff --git a/backend/services/spider_handler/spider.go b/backend/services/spider_handler/spider.go index 189fed60..f8af323b 100644 --- a/backend/services/spider_handler/spider.go +++ b/backend/services/spider_handler/spider.go @@ -183,3 +183,7 @@ func (s *SpiderSync) Download() { _ = database.RedisClient.HDel("spider", key) } + +func (s *SpiderSync) InstallDeps() { + //s.Spider.Src +} diff --git a/backend/services/task.go b/backend/services/task.go index fb513275..3be237a0 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -161,16 +161,7 @@ func SetEnv(cmd *exec.Cmd, envs []model.Env, task model.Task, spider model.Spide return cmd } -func SetLogConfig(cmd *exec.Cmd, t model.Task) error { - //fLog, err := os.Create(path) - //if err != nil { - // log.Errorf("create task log file error: %s", path) - // debug.PrintStack() - // return err - //} - //cmd.Stdout = fLog - //cmd.Stderr = fLog - +func SetLogConfig(cmd *exec.Cmd, t model.Task, u model.User) error { // get stdout reader stdout, err := cmd.StdoutPipe() readerStdout := bufio.NewReader(stdout) @@ -206,6 +197,13 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error { } }() + // expire duration (in seconds) + expireDuration := u.Setting.LogExpireDuration + if expireDuration == 0 { + // by default not expire + expireDuration = constants.Infinite + } + // read stdout go func() { for { @@ -217,11 +215,12 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error { line = strings.Replace(line, "\n", "", -1) seq++ l := model.LogItem{ - Id: bson.NewObjectId(), - Seq: seq, - Message: line, - TaskId: t.Id, - Ts: time.Now(), + Id: bson.NewObjectId(), + Seq: seq, + Message: line, + TaskId: t.Id, + Ts: time.Now(), + ExpireTs: time.Now().Add(time.Duration(expireDuration) * time.Second), } logs = append(logs, l) } @@ -238,11 +237,12 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error { line = strings.Replace(line, "\n", "", -1) seq++ l := model.LogItem{ - Id: bson.NewObjectId(), - Seq: seq, - Message: line, - TaskId: t.Id, - Ts: time.Now(), + Id: bson.NewObjectId(), + Seq: seq, + Message: line, + TaskId: t.Id, + Ts: time.Now(), + ExpireTs: time.Now().Add(time.Duration(expireDuration) * time.Second), } logs = append(logs, l) } @@ -328,7 +328,7 @@ func WaitTaskProcess(cmd *exec.Cmd, t model.Task, s model.Spider) error { } // 执行shell命令 -func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (err error) { +func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider, u model.User) (err error) { log.Infof("cwd: %s", cwd) log.Infof("cmd: %s", cmdStr) @@ -344,7 +344,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e cmd.Dir = cwd // 日志配置 - if err := SetLogConfig(cmd, t); err != nil { + if err := SetLogConfig(cmd, t, u); err != nil { return err } @@ -602,7 +602,7 @@ func ExecuteTask(id int) { } // 执行Shell命令 - if err := ExecuteShellCmd(cmd, cwd, t, spider); err != nil { + if err := ExecuteShellCmd(cmd, cwd, t, spider, user); err != nil { log.Errorf(GetWorkerPrefix(id) + err.Error()) // 如果发生错误,则发送通知