diff --git a/backend/model/log.go b/backend/model/log.go index 07816d34..4307c617 100644 --- a/backend/model/log.go +++ b/backend/model/log.go @@ -15,6 +15,7 @@ type LogItem struct { Message string `json:"msg" bson:"msg"` TaskId string `json:"task_id" bson:"task_id"` IsError bool `json:"is_error" bson:"is_error"` + Seq int64 `json:"seq" bson:"seq"` Ts time.Time `json:"ts" bson:"ts"` } @@ -65,23 +66,53 @@ func AddLogItem(l LogItem) error { return nil } -func GetLogItemList(filter interface{}, skip int, limit int, sortStr string) ([]LogItem, error) { +func GetLogItemList(query bson.M, keyword string, skip int, limit int, sortStr string) ([]LogItem, error) { s, c := database.GetCol("logs") defer s.Close() + filter := query + var logItems []LogItem - if err := c.Find(filter).Skip(skip).Limit(limit).Sort(sortStr).All(&logItems); err != nil { - debug.PrintStack() - return logItems, err + if keyword == "" { + filter["seq"] = bson.M{ + "$gte": skip, + "$lt": skip + limit, + } + if err := c.Find(filter).Sort(sortStr).All(&logItems); err != nil { + debug.PrintStack() + return logItems, err + } + } else { + filter["msg"] = bson.M{ + "$regex": bson.RegEx{ + Pattern: keyword, + Options: "i", + }, + } + if err := c.Find(filter).Sort(sortStr).Skip(skip).Limit(limit).All(&logItems); err != nil { + debug.PrintStack() + return logItems, err + } } return logItems, nil } -func GetLogItemTotal(filter interface{}) (int, error) { +func GetLogItemTotal(query bson.M, keyword string) (int, error) { s, c := database.GetCol("logs") defer s.Close() + filter := query + + if keyword != "" { + filter["msg"] = bson.M{ + "$regex": bson.RegEx{ + Pattern: keyword, + Options: "i", + }, + } + } + total, err := c.Find(filter).Count() if err != nil { debug.PrintStack() diff --git a/backend/model/task.go b/backend/model/task.go index 06bab66b..0b53b226 100644 --- a/backend/model/task.go +++ b/backend/model/task.go @@ -114,17 +114,12 @@ func (t *Task) GetLogItems(keyword string, page int, pageSize int) (logItems []L "task_id": t.Id, } - if keyword != "" { - query["$text"] = bson.M{ - "$search": keyword, - } - } - - logItems, err = GetLogItemList(query, (page-1)*pageSize, pageSize, "+_id") + logTotal, err = GetLogItemTotal(query, keyword) if err != nil { return logItems, logTotal, err } - logTotal, err = GetLogItemTotal(query) + + logItems, err = GetLogItemList(query, keyword, (page-1)*pageSize, pageSize, "+_id") if err != nil { return logItems, logTotal, err } diff --git a/backend/services/log.go b/backend/services/log.go index d2055886..cc621c4c 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -7,7 +7,6 @@ import ( "crawlab/lib/cron" "crawlab/model" "crawlab/utils" - "encoding/json" "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" @@ -16,47 +15,11 @@ import ( "os" "path/filepath" "runtime/debug" - "time" ) // 任务日志频道映射 var TaskLogChanMap = utils.NewChanMap() -// 获取远端日志 -func GetRemoteLog(task model.Task) (logStr string, err error) { - // 序列化消息 - msg := entity.NodeMessage{ - Type: constants.MsgTypeGetLog, - LogPath: task.LogPath, - TaskId: task.Id, - } - msgBytes, err := json.Marshal(&msg) - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return "", err - } - - // 发布获取日志消息 - channel := "nodes:" + task.NodeId.Hex() - if _, err := database.RedisClient.Publish(channel, utils.BytesToString(msgBytes)); err != nil { - log.Errorf(err.Error()) - return "", err - } - - // 生成频道,等待获取log - ch := TaskLogChanMap.ChanBlocked(task.Id) - - select { - case logStr = <-ch: - log.Infof("get remote log") - case <-time.After(30 * time.Second): - logStr = "get remote log timeout" - } - - return logStr, nil -} - // 定时删除日志 func DeleteLogPeriodically() { logDir := viper.GetString("log.path") @@ -169,9 +132,11 @@ func InitLogIndexes() error { s, c := database.GetCol("logs") defer s.Close() - _ = c.EnsureIndexKey("task_id") _ = c.EnsureIndex(mgo.Index{ - Key: []string{"$text:msg"}, + Key: []string{"task_id", "seq"}, + }) + _ = c.EnsureIndex(mgo.Index{ + Key: []string{"task_id", "msg"}, }) return nil diff --git a/backend/services/task.go b/backend/services/task.go index 3a237ea0..e12f9551 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -189,6 +189,8 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error { return err } + var seq int64 + // read stdout go func() { for { @@ -197,8 +199,10 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error { break } line = strings.Replace(line, "\n", "", -1) + seq++ _ = model.AddLogItem(model.LogItem{ Id: bson.NewObjectId(), + Seq: seq, Message: line, TaskId: t.Id, IsError: false, @@ -211,12 +215,14 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error { go func() { for { line, err := readerStderr.ReadString('\n') - line = strings.Replace(line, "\n", "", -1) if err != nil { break } + line = strings.Replace(line, "\n", "", -1) + seq++ _ = model.AddLogItem(model.LogItem{ Id: bson.NewObjectId(), + Seq: seq, Message: line, TaskId: t.Id, IsError: true, diff --git a/frontend/src/components/ScrollView/LogView.vue b/frontend/src/components/ScrollView/LogView.vue index 97882b96..b9fdfddc 100644 --- a/frontend/src/components/ScrollView/LogView.vue +++ b/frontend/src/components/ScrollView/LogView.vue @@ -8,6 +8,12 @@ style="margin-right: 10px" > + +
@@ -130,7 +137,8 @@ export default { ...mapState('task', [ 'taskForm', 'taskLogTotal', - 'logKeyword' + 'logKeyword', + 'isLogFetchLoading' ]), ...mapGetters('task', [ 'logData', @@ -176,6 +184,22 @@ export default { this.$store.commit('task/SET_IS_LOG_AUTO_SCROLL', value) } }, + isLogAutoFetch: { + get () { + return this.$store.state.task.isLogAutoFetch + }, + set (value) { + this.$store.commit('task/SET_IS_LOG_AUTO_FETCH', value) + } + }, + isLogFetchLoading: { + get () { + return this.$store.state.task.isLogFetchLoading + }, + set (value) { + this.$store.commit('task/SET_IS_LOG_FETCH_LOADING', value) + } + }, filteredLogData () { return this.logData.filter(d => { if (!this.searchString) return true diff --git a/frontend/src/i18n/zh.js b/frontend/src/i18n/zh.js index 25cf246d..1471c155 100644 --- a/frontend/src/i18n/zh.js +++ b/frontend/src/i18n/zh.js @@ -260,6 +260,7 @@ export default { 'Selected Nodes': '指定节点', 'Search Log': '搜索日志', 'Auto-Scroll': '自动滚动', + 'Auto-Refresh': '自动刷新', 'Updating log...': '正在更新日志...', 'Error Count': '错误数', 'Log with errors': '日志错误', diff --git a/frontend/src/store/modules/task.js b/frontend/src/store/modules/task.js index 3fe46dd7..55a668a0 100644 --- a/frontend/src/store/modules/task.js +++ b/frontend/src/store/modules/task.js @@ -6,12 +6,6 @@ const state = { taskList: [], taskListTotalCount: 0, taskForm: {}, - taskLog: [], - taskLogTotal: 0, - taskLogPage: 1, - taskLogPageSize: 5000, - currentLogIndex: 0, - isLogAutoScroll: false, taskResultsData: [], taskResultsColumns: [], taskResultsTotalCount: 0, @@ -26,8 +20,16 @@ const state = { pageNum: 1, pageSize: 10, // log + currentLogIndex: 0, logKeyword: '', errorLogData: [], + isLogAutoScroll: false, + isLogAutoFetch: false, + isLogFetchLoading: false, + taskLog: [], + taskLogTotal: 0, + taskLogPage: 1, + taskLogPageSize: 5000, // results resultsPageNum: 1, resultsPageSize: 10 @@ -133,6 +135,12 @@ const mutations = { }, SET_IS_LOG_AUTO_SCROLL (state, value) { state.isLogAutoScroll = value + }, + SET_IS_LOG_AUTO_FETCH (state, value) { + state.isLogAutoFetch = value + }, + SET_IS_LOG_FETCH_LOADING (state, value) { + state.isLogFetchLoading = value } } diff --git a/frontend/src/views/task/TaskDetail.vue b/frontend/src/views/task/TaskDetail.vue index 3e298c9f..5a34e2bd 100644 --- a/frontend/src/views/task/TaskDetail.vue +++ b/frontend/src/views/task/TaskDetail.vue @@ -15,7 +15,7 @@ - +
@@ -137,7 +137,8 @@ export default { 'taskResultsData', 'taskResultsTotalCount', 'taskLog', - 'logKeyword' + 'logKeyword', + 'isLogAutoFetch' ]), ...mapGetters('task', [ 'taskResultsColumns' @@ -164,6 +165,30 @@ export default { this.$store.commit('task/SET_RESULTS_PAGE_SIZE', value) } }, + isLogAutoScroll: { + get () { + return this.$store.state.task.isLogAutoScroll + }, + set (value) { + this.$store.commit('task/SET_IS_LOG_AUTO_SCROLL', value) + } + }, + isLogAutoFetch: { + get () { + return this.$store.state.task.isLogAutoFetch + }, + set (value) { + this.$store.commit('task/SET_IS_LOG_AUTO_FETCH', value) + } + }, + isLogFetchLoading: { + get () { + return this.$store.state.task.isLogFetchLoading + }, + set (value) { + this.$store.commit('task/SET_IS_LOG_FETCH_LOADING', value) + } + }, isRunning () { return ['pending', 'running'].includes(this.taskForm.status) } @@ -185,21 +210,30 @@ export default { this.$store.dispatch('task/getTaskResultExcel', this.$route.params.id) this.$st.sendEv('任务详情', '结果', '下载CSV') }, - getTaskLog () { - this.$store.dispatch('task/getTaskLog', { id: this.$route.params.id, keyword: this.logKeyword }) - this.$store.dispatch('task/getTaskErrorLog', this.$route.params.id) + async getTaskLog (showLoading) { + if (showLoading) { + this.isLogFetchLoading = true + } + await this.$store.dispatch('task/getTaskLog', { id: this.$route.params.id, keyword: this.logKeyword }) + this.isLogFetchLoading = false + await this.$store.dispatch('task/getTaskErrorLog', this.$route.params.id) } }, - created () { - this.$store.dispatch('task/getTaskData', this.$route.params.id) - this.$store.dispatch('task/getTaskResults', this.$route.params.id) + async created () { + await this.$store.dispatch('task/getTaskData', this.$route.params.id) + + this.isLogAutoFetch = !!this.isRunning + this.isLogAutoScroll = !!this.isRunning + + await this.$store.dispatch('task/getTaskResults', this.$route.params.id) this.getTaskLog() this.handle = setInterval(() => { - if (!this.isRunning) return - this.$store.dispatch('task/getTaskData', this.$route.params.id) - this.$store.dispatch('task/getTaskResults', this.$route.params.id) - this.getTaskLog() + if (this.isLogAutoFetch) { + this.$store.dispatch('task/getTaskData', this.$route.params.id) + this.$store.dispatch('task/getTaskResults', this.$route.params.id) + this.getTaskLog() + } }, 5000) }, mounted () {