From 63c95f861891d3a78ec55267dbb1efeb4259eecd Mon Sep 17 00:00:00 2001 From: marvzhang Date: Wed, 1 Apr 2020 16:52:23 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=81=E7=A7=BB=E6=97=A5=E5=BF=97=E5=88=B0Mo?= =?UTF-8?q?ngoDB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/main.go | 29 +--- backend/model/log.go | 35 ++++ backend/model/task.go | 13 ++ backend/routes/task.go | 4 +- backend/services/log.go | 40 ++++- backend/services/task.go | 156 ++++++++++++------ devops/master/mongo-pv.yaml | 4 +- .../src/components/ScrollView/LogItem.vue | 6 + .../src/components/ScrollView/LogView.vue | 1 + frontend/src/store/modules/task.js | 11 +- 10 files changed, 222 insertions(+), 77 deletions(-) diff --git a/backend/main.go b/backend/main.go index b494a8b3..0a04c70f 100644 --- a/backend/main.go +++ b/backend/main.go @@ -34,31 +34,13 @@ func main() { panic(err) } log.Info("initialized config successfully") - - // 初始化日志设置 - logLevel := viper.GetString("log.level") - if logLevel != "" { - log.SetLevelFromString(logLevel) - } - log.Info("initialized log config successfully") - if viper.GetString("log.isDeletePeriodically") == "Y" { - err := services.InitDeleteLogPeriodically() - if err != nil { - log.Error("init DeletePeriodically failed") - panic(err) - } - log.Info("initialized periodically cleaning log successfully") - } else { - log.Info("periodically cleaning log is switched off") - } - // 初始化Mongodb数据库 if err := database.InitMongo(); err != nil { log.Error("init mongodb error:" + err.Error()) debug.PrintStack() panic(err) } - log.Info("initialized MongoDB successfully") + log.Info("initialized mongodb successfully") // 初始化Redis数据库 if err := database.InitRedis(); err != nil { @@ -66,7 +48,14 @@ func main() { debug.PrintStack() panic(err) } - log.Info("initialized Redis successfully") + log.Info("initialized redis successfully") + + // 初始化日志设置 + if err := services.InitLogService(); err != nil { + log.Error("init log error:" + err.Error()) + panic(err) + } + log.Info("initialized log successfully") if model.IsMaster() { // 初始化定时任务 diff --git a/backend/model/log.go b/backend/model/log.go index 77e5094f..32d77694 100644 --- a/backend/model/log.go +++ b/backend/model/log.go @@ -1,12 +1,23 @@ package model import ( + "crawlab/database" "crawlab/utils" "github.com/apex/log" + "github.com/globalsign/mgo/bson" "os" "runtime/debug" + "time" ) +type LogItem struct { + Id bson.ObjectId `json:"_id" bson:"_id"` + Message string `json:"msg" bson:"msg"` + TaskId string `json:"task_id" bson:"task_id"` + IsError bool `json:"is_error" bson:"is_error"` + Ts time.Time `json:"ts" bson:"ts"` +} + // 获取本地日志 func GetLocalLog(logPath string) (fileBytes []byte, err error) { @@ -42,3 +53,27 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) { logBuf = logBuf[:n] return logBuf, nil } + +func AddLogItem(l LogItem) error { + s, c := database.GetCol("logs") + defer s.Close() + if err := c.Insert(l); err != nil { + log.Errorf("insert log error: " + err.Error()) + debug.PrintStack() + return err + } + return nil +} + +func GetLogItemList(filter interface{}, skip int, limit int, sortStr string) ([]LogItem, error) { + s, c := database.GetCol("logs") + defer s.Close() + + var logItems []LogItem + if err := c.Find(filter).Skip(skip).Limit(limit).Sort(sortStr).All(&logItems); err != nil { + debug.PrintStack() + return logItems, err + } + + return logItems, nil +} diff --git a/backend/model/task.go b/backend/model/task.go index 75edd631..24076409 100644 --- a/backend/model/task.go +++ b/backend/model/task.go @@ -109,6 +109,19 @@ func (t *Task) GetResults(pageNum int, pageSize int) (results []interface{}, tot return } +func (t *Task) GetLogItems() (logItems []LogItem, err error) { + query := bson.M{ + "task_id": t.Id, + } + + logItems, err = GetLogItemList(query, 0, constants.Infinite, "+_id") + if err != nil { + return logItems, err + } + + return logItems, nil +} + func GetTaskList(filter interface{}, skip int, limit int, sortKey string) ([]Task, error) { s, c := database.GetCol("tasks") defer s.Close() diff --git a/backend/routes/task.go b/backend/routes/task.go index ff674766..9239d057 100644 --- a/backend/routes/task.go +++ b/backend/routes/task.go @@ -235,12 +235,12 @@ func DeleteTask(c *gin.Context) { func GetTaskLog(c *gin.Context) { id := c.Param("id") - logStr, err := services.GetTaskLog(id) + logItems, err := services.GetTaskLog(id) if err != nil { HandleError(http.StatusInternalServerError, c, err) return } - HandleSuccessData(c, logStr) + HandleSuccessData(c, logItems) } func GetTaskResults(c *gin.Context) { diff --git a/backend/services/log.go b/backend/services/log.go index 2034794d..d2055886 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -9,6 +9,7 @@ import ( "crawlab/utils" "encoding/json" "github.com/apex/log" + "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" "github.com/spf13/viper" "io/ioutil" @@ -162,5 +163,42 @@ func InitDeleteLogPeriodically() error { c.Start() return nil - +} + +func InitLogIndexes() error { + s, c := database.GetCol("logs") + defer s.Close() + + _ = c.EnsureIndexKey("task_id") + _ = c.EnsureIndex(mgo.Index{ + Key: []string{"$text:msg"}, + }) + + return nil +} + +func InitLogService() error { + logLevel := viper.GetString("log.level") + if logLevel != "" { + log.SetLevelFromString(logLevel) + } + log.Info("initialized log config successfully") + if viper.GetString("log.isDeletePeriodically") == "Y" { + if err := InitDeleteLogPeriodically(); err != nil { + log.Error("init DeletePeriodically failed") + return err + } + log.Info("initialized periodically cleaning log successfully") + } else { + log.Info("periodically cleaning log is switched off") + } + + if model.IsMaster() { + if err := InitLogIndexes(); err != nil { + log.Errorf(err.Error()) + return err + } + } + + return nil } diff --git a/backend/services/task.go b/backend/services/task.go index 0a9392f9..cf47c632 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -1,6 +1,7 @@ package services import ( + "bufio" "crawlab/constants" "crawlab/database" "crawlab/entity" @@ -160,15 +161,70 @@ func SetEnv(cmd *exec.Cmd, envs []model.Env, task model.Task, spider model.Spide return cmd } -func SetLogConfig(cmd *exec.Cmd, path string) error { - fLog, err := os.Create(path) +func SetLogConfig(cmd *exec.Cmd, t model.Task) error { + //fLog, err := os.Create(path) + //if err != nil { + // log.Errorf("create task log file error: %s", path) + // debug.PrintStack() + // return err + //} + //cmd.Stdout = fLog + //cmd.Stderr = fLog + + // get stdout reader + stdout, err := cmd.StdoutPipe() + readerStdout := bufio.NewReader(stdout) if err != nil { - log.Errorf("create task log file error: %s", path) + log.Errorf("get stdout error: %s", err.Error()) debug.PrintStack() return err } - cmd.Stdout = fLog - cmd.Stderr = fLog + + // get stderr reader + stderr, err := cmd.StderrPipe() + readerStderr := bufio.NewReader(stderr) + if err != nil { + log.Errorf("get stdout error: %s", err.Error()) + debug.PrintStack() + return err + } + + // read stdout + go func() { + for { + line, err := readerStdout.ReadString('\n') + if err != nil { + break + } + line = strings.Replace(line, "\n", "", -1) + _ = model.AddLogItem(model.LogItem{ + Id: bson.NewObjectId(), + Message: line, + TaskId: t.Id, + IsError: false, + Ts: time.Now(), + }) + } + }() + + // read stderr + go func() { + for { + line, err := readerStderr.ReadString('\n') + line = strings.Replace(line, "\n", "", -1) + if err != nil { + break + } + _ = model.AddLogItem(model.LogItem{ + Id: bson.NewObjectId(), + Message: line, + TaskId: t.Id, + IsError: true, + Ts: time.Now(), + }) + } + }() + return nil } @@ -260,7 +316,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e cmd.Dir = cwd // 日志配置 - if err := SetLogConfig(cmd, t.LogPath); err != nil { + if err := SetLogConfig(cmd, t); err != nil { return err } @@ -566,54 +622,60 @@ func SpiderFileCheck(t model.Task, spider model.Spider) error { return nil } -func GetTaskLog(id string) (logStr string, err error) { +func GetTaskLog(id string) (logItems []model.LogItem, err error) { task, err := model.GetTask(id) - if err != nil { return } - if IsMasterNode(task.NodeId.Hex()) { - if !utils.Exists(task.LogPath) { - fileDir, err := MakeLogDir(task) - - if err != nil { - log.Errorf(err.Error()) - } - - fileP := GetLogFilePaths(fileDir, task) - - // 获取日志文件路径 - fLog, err := os.Create(fileP) - defer fLog.Close() - if err != nil { - log.Errorf("create task log file error: %s", fileP) - debug.PrintStack() - } - task.LogPath = fileP - if err := task.Save(); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - } - - } - // 若为主节点,获取本机日志 - logBytes, err := model.GetLocalLog(task.LogPath) - if err != nil { - log.Errorf(err.Error()) - logStr = err.Error() - } else { - logStr = utils.BytesToString(logBytes) - } - return logStr, err - } - // 若不为主节点,获取远端日志 - logStr, err = GetRemoteLog(task) + logItems, err = task.GetLogItems() if err != nil { - log.Errorf(err.Error()) - + return logItems, err } - return logStr, err + + return logItems, nil + + //if IsMasterNode(task.NodeId.Hex()) { + // if !utils.Exists(task.LogPath) { + // fileDir, err := MakeLogDir(task) + // + // if err != nil { + // log.Errorf(err.Error()) + // } + // + // fileP := GetLogFilePaths(fileDir, task) + // + // // 获取日志文件路径 + // fLog, err := os.Create(fileP) + // defer fLog.Close() + // if err != nil { + // log.Errorf("create task log file error: %s", fileP) + // debug.PrintStack() + // } + // task.LogPath = fileP + // if err := task.Save(); err != nil { + // log.Errorf(err.Error()) + // debug.PrintStack() + // } + // + // } + // // 若为主节点,获取本机日志 + // logBytes, err := model.GetLocalLog(task.LogPath) + // if err != nil { + // log.Errorf(err.Error()) + // logStr = err.Error() + // } else { + // logStr = utils.BytesToString(logBytes) + // } + // return logStr, err + //} + //// 若不为主节点,获取远端日志 + //logStr, err = GetRemoteLog(task) + //if err != nil { + // log.Errorf(err.Error()) + // + //} + //return logStr, err } func CancelTask(id string) (err error) { diff --git a/devops/master/mongo-pv.yaml b/devops/master/mongo-pv.yaml index ad914131..648bb8ae 100644 --- a/devops/master/mongo-pv.yaml +++ b/devops/master/mongo-pv.yaml @@ -8,7 +8,7 @@ metadata: spec: storageClassName: manual capacity: - storage: 10Gi + storage: 3Gi accessModes: - ReadWriteOnce hostPath: @@ -25,4 +25,4 @@ spec: - ReadWriteOnce resources: requests: - storage: 10Gi \ No newline at end of file + storage: 3Gi \ No newline at end of file diff --git a/frontend/src/components/ScrollView/LogItem.vue b/frontend/src/components/ScrollView/LogItem.vue index 87a986dc..7257a5fd 100644 --- a/frontend/src/components/ScrollView/LogItem.vue +++ b/frontend/src/components/ScrollView/LogItem.vue @@ -20,6 +20,12 @@ export default { type: Number, default: 1 }, + logItem: { + type: Object, + default () { + return {} + } + }, data: { type: String, default: '' diff --git a/frontend/src/components/ScrollView/LogView.vue b/frontend/src/components/ScrollView/LogView.vue index a062b5ae..6e806e41 100644 --- a/frontend/src/components/ScrollView/LogView.vue +++ b/frontend/src/components/ScrollView/LogView.vue @@ -158,6 +158,7 @@ export default { // https://vuejs.org/v2/guide/render-function.html#createElement-Arguments props: { index: logItem.index, + logItem, data: isAnsi ? convert.toHtml(logItem.data) : logItem.data, searchString: this.searchString, active: logItem.active, diff --git a/frontend/src/store/modules/task.js b/frontend/src/store/modules/task.js index 23a08bda..0ec36957 100644 --- a/frontend/src/store/modules/task.js +++ b/frontend/src/store/modules/task.js @@ -6,7 +6,7 @@ const state = { taskList: [], taskListTotalCount: 0, taskForm: {}, - taskLog: '', + taskLog: [], currentLogIndex: 0, taskResultsData: [], taskResultsColumns: [], @@ -41,12 +41,13 @@ const getters = { return keys }, logData (state) { - const data = state.taskLog.split('\n') + const data = state.taskLog .map((d, i) => { return { index: i + 1, - data: d, - active: state.currentLogIndex === i + 1 + active: state.currentLogIndex === i + 1, + data: d.msg, + ...d } }) if (state.taskForm && state.taskForm.status === 'running') { @@ -151,7 +152,7 @@ const actions = { getTaskLog ({ state, commit }, id) { return request.get(`/tasks/${id}/log`) .then(response => { - commit('SET_TASK_LOG', response.data.data) + commit('SET_TASK_LOG', response.data.data || []) }) }, getTaskResults ({ state, commit }, id) {