优化日志分页

This commit is contained in:
marvzhang
2020-04-18 16:06:38 +08:00
parent c20f5041f8
commit 9d7b42e295
8 changed files with 136 additions and 72 deletions

View File

@@ -15,6 +15,7 @@ type LogItem struct {
Message string `json:"msg" bson:"msg"`
TaskId string `json:"task_id" bson:"task_id"`
IsError bool `json:"is_error" bson:"is_error"`
Seq int64 `json:"seq" bson:"seq"`
Ts time.Time `json:"ts" bson:"ts"`
}
@@ -65,23 +66,53 @@ func AddLogItem(l LogItem) error {
return nil
}
func GetLogItemList(filter interface{}, skip int, limit int, sortStr string) ([]LogItem, error) {
func GetLogItemList(query bson.M, keyword string, skip int, limit int, sortStr string) ([]LogItem, error) {
s, c := database.GetCol("logs")
defer s.Close()
filter := query
var logItems []LogItem
if err := c.Find(filter).Skip(skip).Limit(limit).Sort(sortStr).All(&logItems); err != nil {
debug.PrintStack()
return logItems, err
if keyword == "" {
filter["seq"] = bson.M{
"$gte": skip,
"$lt": skip + limit,
}
if err := c.Find(filter).Sort(sortStr).All(&logItems); err != nil {
debug.PrintStack()
return logItems, err
}
} else {
filter["msg"] = bson.M{
"$regex": bson.RegEx{
Pattern: keyword,
Options: "i",
},
}
if err := c.Find(filter).Sort(sortStr).Skip(skip).Limit(limit).All(&logItems); err != nil {
debug.PrintStack()
return logItems, err
}
}
return logItems, nil
}
func GetLogItemTotal(filter interface{}) (int, error) {
func GetLogItemTotal(query bson.M, keyword string) (int, error) {
s, c := database.GetCol("logs")
defer s.Close()
filter := query
if keyword != "" {
filter["msg"] = bson.M{
"$regex": bson.RegEx{
Pattern: keyword,
Options: "i",
},
}
}
total, err := c.Find(filter).Count()
if err != nil {
debug.PrintStack()

View File

@@ -114,17 +114,12 @@ func (t *Task) GetLogItems(keyword string, page int, pageSize int) (logItems []L
"task_id": t.Id,
}
if keyword != "" {
query["$text"] = bson.M{
"$search": keyword,
}
}
logItems, err = GetLogItemList(query, (page-1)*pageSize, pageSize, "+_id")
logTotal, err = GetLogItemTotal(query, keyword)
if err != nil {
return logItems, logTotal, err
}
logTotal, err = GetLogItemTotal(query)
logItems, err = GetLogItemList(query, keyword, (page-1)*pageSize, pageSize, "+_id")
if err != nil {
return logItems, logTotal, err
}

View File

@@ -7,7 +7,6 @@ import (
"crawlab/lib/cron"
"crawlab/model"
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
@@ -16,47 +15,11 @@ import (
"os"
"path/filepath"
"runtime/debug"
"time"
)
// 任务日志频道映射
var TaskLogChanMap = utils.NewChanMap()
// 获取远端日志
func GetRemoteLog(task model.Task) (logStr string, err error) {
// 序列化消息
msg := entity.NodeMessage{
Type: constants.MsgTypeGetLog,
LogPath: task.LogPath,
TaskId: task.Id,
}
msgBytes, err := json.Marshal(&msg)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return "", err
}
// 发布获取日志消息
channel := "nodes:" + task.NodeId.Hex()
if _, err := database.RedisClient.Publish(channel, utils.BytesToString(msgBytes)); err != nil {
log.Errorf(err.Error())
return "", err
}
// 生成频道等待获取log
ch := TaskLogChanMap.ChanBlocked(task.Id)
select {
case logStr = <-ch:
log.Infof("get remote log")
case <-time.After(30 * time.Second):
logStr = "get remote log timeout"
}
return logStr, nil
}
// 定时删除日志
func DeleteLogPeriodically() {
logDir := viper.GetString("log.path")
@@ -169,9 +132,11 @@ func InitLogIndexes() error {
s, c := database.GetCol("logs")
defer s.Close()
_ = c.EnsureIndexKey("task_id")
_ = c.EnsureIndex(mgo.Index{
Key: []string{"$text:msg"},
Key: []string{"task_id", "seq"},
})
_ = c.EnsureIndex(mgo.Index{
Key: []string{"task_id", "msg"},
})
return nil

View File

@@ -189,6 +189,8 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error {
return err
}
var seq int64
// read stdout
go func() {
for {
@@ -197,8 +199,10 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error {
break
}
line = strings.Replace(line, "\n", "", -1)
seq++
_ = model.AddLogItem(model.LogItem{
Id: bson.NewObjectId(),
Seq: seq,
Message: line,
TaskId: t.Id,
IsError: false,
@@ -211,12 +215,14 @@ func SetLogConfig(cmd *exec.Cmd, t model.Task) error {
go func() {
for {
line, err := readerStderr.ReadString('\n')
line = strings.Replace(line, "\n", "", -1)
if err != nil {
break
}
line = strings.Replace(line, "\n", "", -1)
seq++
_ = model.AddLogItem(model.LogItem{
Id: bson.NewObjectId(),
Seq: seq,
Message: line,
TaskId: t.Id,
IsError: true,

View File

@@ -8,6 +8,12 @@
style="margin-right: 10px"
>
</el-switch>
<el-switch
v-model="isLogAutoFetch"
:inactive-text="$t('Auto-Refresh')"
style="margin-right: 10px"
>
</el-switch>
<el-input
v-model="logKeyword"
size="small"
@@ -51,6 +57,7 @@
</div>
<div class="content">
<div
v-loading="isLogFetchLoading"
class="log-view-wrapper"
:class="isErrorsCollapsed ? 'errors-collapsed' : ''"
>
@@ -130,7 +137,8 @@ export default {
...mapState('task', [
'taskForm',
'taskLogTotal',
'logKeyword'
'logKeyword',
'isLogFetchLoading'
]),
...mapGetters('task', [
'logData',
@@ -176,6 +184,22 @@ export default {
this.$store.commit('task/SET_IS_LOG_AUTO_SCROLL', value)
}
},
isLogAutoFetch: {
get () {
return this.$store.state.task.isLogAutoFetch
},
set (value) {
this.$store.commit('task/SET_IS_LOG_AUTO_FETCH', value)
}
},
isLogFetchLoading: {
get () {
return this.$store.state.task.isLogFetchLoading
},
set (value) {
this.$store.commit('task/SET_IS_LOG_FETCH_LOADING', value)
}
},
filteredLogData () {
return this.logData.filter(d => {
if (!this.searchString) return true

View File

@@ -260,6 +260,7 @@ export default {
'Selected Nodes': '指定节点',
'Search Log': '搜索日志',
'Auto-Scroll': '自动滚动',
'Auto-Refresh': '自动刷新',
'Updating log...': '正在更新日志...',
'Error Count': '错误数',
'Log with errors': '日志错误',

View File

@@ -6,12 +6,6 @@ const state = {
taskList: [],
taskListTotalCount: 0,
taskForm: {},
taskLog: [],
taskLogTotal: 0,
taskLogPage: 1,
taskLogPageSize: 5000,
currentLogIndex: 0,
isLogAutoScroll: false,
taskResultsData: [],
taskResultsColumns: [],
taskResultsTotalCount: 0,
@@ -26,8 +20,16 @@ const state = {
pageNum: 1,
pageSize: 10,
// log
currentLogIndex: 0,
logKeyword: '',
errorLogData: [],
isLogAutoScroll: false,
isLogAutoFetch: false,
isLogFetchLoading: false,
taskLog: [],
taskLogTotal: 0,
taskLogPage: 1,
taskLogPageSize: 5000,
// results
resultsPageNum: 1,
resultsPageSize: 10
@@ -133,6 +135,12 @@ const mutations = {
},
SET_IS_LOG_AUTO_SCROLL (state, value) {
state.isLogAutoScroll = value
},
SET_IS_LOG_AUTO_FETCH (state, value) {
state.isLogAutoFetch = value
},
SET_IS_LOG_FETCH_LOADING (state, value) {
state.isLogFetchLoading = value
}
}

View File

@@ -15,7 +15,7 @@
<task-overview @click-log="activeTabName = 'log'"/>
</el-tab-pane>
<el-tab-pane :label="$t('Log')" name="log">
<log-view @search="getTaskLog"/>
<log-view @search="getTaskLog(true)"/>
</el-tab-pane>
<el-tab-pane :label="$t('Results')" name="results">
<div class="button-group">
@@ -137,7 +137,8 @@ export default {
'taskResultsData',
'taskResultsTotalCount',
'taskLog',
'logKeyword'
'logKeyword',
'isLogAutoFetch'
]),
...mapGetters('task', [
'taskResultsColumns'
@@ -164,6 +165,30 @@ export default {
this.$store.commit('task/SET_RESULTS_PAGE_SIZE', value)
}
},
isLogAutoScroll: {
get () {
return this.$store.state.task.isLogAutoScroll
},
set (value) {
this.$store.commit('task/SET_IS_LOG_AUTO_SCROLL', value)
}
},
isLogAutoFetch: {
get () {
return this.$store.state.task.isLogAutoFetch
},
set (value) {
this.$store.commit('task/SET_IS_LOG_AUTO_FETCH', value)
}
},
isLogFetchLoading: {
get () {
return this.$store.state.task.isLogFetchLoading
},
set (value) {
this.$store.commit('task/SET_IS_LOG_FETCH_LOADING', value)
}
},
isRunning () {
return ['pending', 'running'].includes(this.taskForm.status)
}
@@ -185,21 +210,30 @@ export default {
this.$store.dispatch('task/getTaskResultExcel', this.$route.params.id)
this.$st.sendEv('任务详情', '结果', '下载CSV')
},
getTaskLog () {
this.$store.dispatch('task/getTaskLog', { id: this.$route.params.id, keyword: this.logKeyword })
this.$store.dispatch('task/getTaskErrorLog', this.$route.params.id)
async getTaskLog (showLoading) {
if (showLoading) {
this.isLogFetchLoading = true
}
await this.$store.dispatch('task/getTaskLog', { id: this.$route.params.id, keyword: this.logKeyword })
this.isLogFetchLoading = false
await this.$store.dispatch('task/getTaskErrorLog', this.$route.params.id)
}
},
created () {
this.$store.dispatch('task/getTaskData', this.$route.params.id)
this.$store.dispatch('task/getTaskResults', this.$route.params.id)
async created () {
await this.$store.dispatch('task/getTaskData', this.$route.params.id)
this.isLogAutoFetch = !!this.isRunning
this.isLogAutoScroll = !!this.isRunning
await this.$store.dispatch('task/getTaskResults', this.$route.params.id)
this.getTaskLog()
this.handle = setInterval(() => {
if (!this.isRunning) return
this.$store.dispatch('task/getTaskData', this.$route.params.id)
this.$store.dispatch('task/getTaskResults', this.$route.params.id)
this.getTaskLog()
if (this.isLogAutoFetch) {
this.$store.dispatch('task/getTaskData', this.$route.params.id)
this.$store.dispatch('task/getTaskResults', this.$route.params.id)
this.getTaskLog()
}
}, 5000)
},
mounted () {