修复定时任务不一致问题

This commit is contained in:
marvzhang
2019-12-12 13:13:42 +08:00
parent aebf66a525
commit 26227dd0dc
5 changed files with 159 additions and 101 deletions

View File

@@ -12,15 +12,18 @@ import (
)
type Schedule struct {
Id bson.ObjectId `json:"_id" bson:"_id"`
Name string `json:"name" bson:"name"`
Description string `json:"description" bson:"description"`
SpiderId bson.ObjectId `json:"spider_id" bson:"spider_id"`
NodeId bson.ObjectId `json:"node_id" bson:"node_id"`
NodeKey string `json:"node_key" bson:"node_key"`
Cron string `json:"cron" bson:"cron"`
EntryId cron.EntryID `json:"entry_id" bson:"entry_id"`
Param string `json:"param" bson:"param"`
Id bson.ObjectId `json:"_id" bson:"_id"`
Name string `json:"name" bson:"name"`
Description string `json:"description" bson:"description"`
SpiderId bson.ObjectId `json:"spider_id" bson:"spider_id"`
//NodeId bson.ObjectId `json:"node_id" bson:"node_id"`
//NodeKey string `json:"node_key" bson:"node_key"`
Cron string `json:"cron" bson:"cron"`
EntryId cron.EntryID `json:"entry_id" bson:"entry_id"`
Param string `json:"param" bson:"param"`
RunType string `json:"run_type" bson:"run_type"`
NodeIds []bson.ObjectId `json:"node_ids" bson:"node_ids"`
// 状态
Status string `json:"status" bson:"status"`
@@ -49,26 +52,26 @@ func (sch *Schedule) Delete() error {
return c.RemoveId(sch.Id)
}
func (sch *Schedule) SyncNodeIdAndSpiderId(node Node, spider Spider) {
sch.syncNodeId(node)
sch.syncSpiderId(spider)
}
//func (sch *Schedule) SyncNodeIdAndSpiderId(node Node, spider Spider) {
// sch.syncNodeId(node)
// sch.syncSpiderId(spider)
//}
func (sch *Schedule) syncNodeId(node Node) {
if node.Id.Hex() == sch.NodeId.Hex() {
return
}
sch.NodeId = node.Id
_ = sch.Save()
}
//func (sch *Schedule) syncNodeId(node Node) {
// if node.Id.Hex() == sch.NodeId.Hex() {
// return
// }
// sch.NodeId = node.Id
// _ = sch.Save()
//}
func (sch *Schedule) syncSpiderId(spider Spider) {
if spider.Id.Hex() == sch.SpiderId.Hex() {
return
}
sch.SpiderId = spider.Id
_ = sch.Save()
}
//func (sch *Schedule) syncSpiderId(spider Spider) {
// if spider.Id.Hex() == sch.SpiderId.Hex() {
// return
// }
// sch.SpiderId = spider.Id
// _ = sch.Save()
//}
func GetScheduleList(filter interface{}) ([]Schedule, error) {
s, c := database.GetCol("schedules")
@@ -81,20 +84,20 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) {
var schs []Schedule
for _, schedule := range schedules {
// 获取节点名称
if schedule.NodeId == bson.ObjectIdHex(constants.ObjectIdNull) {
// 选择所有节点
schedule.NodeName = "All Nodes"
} else {
// 选择单一节点
node, err := GetNode(schedule.NodeId)
if err != nil {
schedule.Status = constants.ScheduleStatusError
schedule.Message = constants.ScheduleStatusErrorNotFoundNode
} else {
schedule.NodeName = node.Name
}
}
// TODO: 获取节点名称
//if schedule.NodeId == bson.ObjectIdHex(constants.ObjectIdNull) {
// // 选择所有节点
// schedule.NodeName = "All Nodes"
//} else {
// // 选择单一节点
// node, err := GetNode(schedule.NodeId)
// if err != nil {
// schedule.Status = constants.ScheduleStatusError
// schedule.Message = constants.ScheduleStatusErrorNotFoundNode
// } else {
// schedule.NodeName = node.Name
// }
//}
// 获取爬虫名称
spider, err := GetSpider(schedule.SpiderId)
@@ -130,12 +133,13 @@ func UpdateSchedule(id bson.ObjectId, item Schedule) error {
if err := c.FindId(id).One(&result); err != nil {
return err
}
node, err := GetNode(item.NodeId)
if err != nil {
return err
}
//node, err := GetNode(item.NodeId)
//if err != nil {
// return err
//}
item.NodeKey = node.Key
item.UpdateTs = time.Now()
//item.NodeKey = node.Key
if err := item.Save(); err != nil {
return err
}
@@ -146,15 +150,15 @@ func AddSchedule(item Schedule) error {
s, c := database.GetCol("schedules")
defer s.Close()
node, err := GetNode(item.NodeId)
if err != nil {
return err
}
//node, err := GetNode(item.NodeId)
//if err != nil {
// return err
//}
item.Id = bson.NewObjectId()
item.CreateTs = time.Now()
item.UpdateTs = time.Now()
item.NodeKey = node.Key
//item.NodeKey = node.Key
if err := c.Insert(&item); err != nil {
debug.PrintStack()

View File

@@ -119,7 +119,6 @@ func PutTask(c *gin.Context) {
return
}
}
} else if reqBody.RunType == constants.RunTypeRandom {
// 随机
t := model.Task{
@@ -130,7 +129,6 @@ func PutTask(c *gin.Context) {
HandleError(http.StatusInternalServerError, c, err)
return
}
} else if reqBody.RunType == constants.RunTypeSelectedNodes {
// 指定节点
for _, nodeId := range reqBody.NodeIds {
@@ -145,7 +143,6 @@ func PutTask(c *gin.Context) {
return
}
}
} else {
HandleErrorF(http.StatusInternalServerError, c, "invalid run_type")
return

View File

@@ -7,7 +7,7 @@ import (
"errors"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"github.com/satori/go.uuid"
uuid "github.com/satori/go.uuid"
"runtime/debug"
)
@@ -19,48 +19,87 @@ type Scheduler struct {
func AddScheduleTask(s model.Schedule) func() {
return func() {
node, err := model.GetNodeByKey(s.NodeKey)
if err != nil || node.Id.Hex() == "" {
log.Errorf("get node by key error: %s", err.Error())
debug.PrintStack()
return
}
spider := model.GetSpiderByName(s.SpiderName)
if spider == nil || spider.Id.Hex() == "" {
log.Errorf("get spider by name error: %s", err.Error())
debug.PrintStack()
return
}
// 同步ID到定时任务
s.SyncNodeIdAndSpiderId(node, *spider)
// 生成任务ID
id := uuid.NewV4()
// 生成任务模型
t := model.Task{
Id: id.String(),
SpiderId: spider.Id,
NodeId: node.Id,
Status: constants.StatusPending,
Param: s.Param,
}
if s.RunType == constants.RunTypeAllNodes {
// 所有节点
nodes, err := model.GetNodeList(nil)
if err != nil {
return
}
for _, node := range nodes {
t := model.Task{
Id: id.String(),
SpiderId: s.SpiderId,
NodeId: node.Id,
Param: s.Param,
}
// 将任务存入数据库
if err := model.AddTask(t); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
if err := AddTask(t); err != nil {
return
}
if err := AssignTask(t); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
}
} else if s.RunType == constants.RunTypeRandom {
// 随机
t := model.Task{
Id: id.String(),
SpiderId: s.SpiderId,
Param: s.Param,
}
if err := AddTask(t); err != nil {
return
}
if err := AssignTask(t); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
} else if s.RunType == constants.RunTypeSelectedNodes {
// 指定节点
for _, nodeId := range s.NodeIds {
t := model.Task{
Id: id.String(),
SpiderId: s.SpiderId,
NodeId: nodeId,
Param: s.Param,
}
if err := AddTask(t); err != nil {
return
}
if err := AssignTask(t); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
}
} else {
return
}
// 加入任务队列
if err := AssignTask(t); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
//node, err := model.GetNodeByKey(s.NodeKey)
//if err != nil || node.Id.Hex() == "" {
// log.Errorf("get node by key error: %s", err.Error())
// debug.PrintStack()
// return
//}
//
//spider := model.GetSpiderByName(s.SpiderName)
//if spider == nil || spider.Id.Hex() == "" {
// log.Errorf("get spider by name error: %s", err.Error())
// debug.PrintStack()
// return
//}
//
//// 同步ID到定时任务
//s.SyncNodeIdAndSpiderId(node, *spider)
}
}

View File

@@ -1,7 +1,9 @@
import request from '../../api/request'
const state = {
scheduleList: [],
scheduleForm: {}
scheduleForm: {
node_ids: []
}
}
const getters = {}

View File

@@ -14,15 +14,15 @@
<el-form-item :label="$t('Schedule Name')" prop="name" required>
<el-input v-model="scheduleForm.name" :placeholder="$t('Schedule Name')"></el-input>
</el-form-item>
<el-form-item :label="$t('Run Type')" prop="runType" required>
<el-select v-model="scheduleForm.runType" :placeholder="$t('Run Type')">
<el-form-item :label="$t('Run Type')" prop="run_type" required>
<el-select v-model="scheduleForm.run_type" :placeholder="$t('Run Type')">
<el-option value="all-nodes" :label="$t('All Nodes')"/>
<el-option value="selected-nodes" :label="$t('Selected Nodes')"/>
<el-option value="random" :label="$t('Random')"/>
</el-select>
</el-form-item>
<el-form-item v-if="scheduleForm.runType === 'selected-nodes'" :label="$t('Node')" prop="node_id" required>
<el-select v-model="scheduleForm.node_id">
<el-form-item v-if="scheduleForm.run_type === 'selected-nodes'" :label="$t('Nodes')" prop="node_ids" required>
<el-select v-model="scheduleForm.node_ids" :placeholder="$t('Nodes')" multiple filterable>
<el-option
v-for="op in nodeList"
:key="op._id"
@@ -33,20 +33,19 @@
</el-select>
</el-form-item>
<el-form-item :label="$t('Spider')" prop="spider_id" required>
<el-select v-model="scheduleForm.spider_id" filterable>
<el-select v-model="scheduleForm.spider_id" :placeholder="$t('Spider')" filterable>
<el-option
v-for="op in spiderList"
:key="op._id"
:value="op._id"
:label="op.name"
:label="`${op.display_name} (${op.name})`"
:disabled="isDisabledSpider(op)"
>
</el-option>
</el-select>
</el-form-item>
<el-form-item :label="$t('Cron')" prop="cron" required>
<el-input style="padding-right:10px"
v-model="scheduleForm.cron"
<el-input v-model="scheduleForm.cron"
:placeholder="$t('schedules.cron')">
</el-input>
<!--<el-button size="small" style="width:100px" type="primary" @click="onShowCronDialog">{{$t('schedules.add_cron')}}</el-button>-->
@@ -116,6 +115,13 @@
</el-tag>
</template>
</el-table-column>
<el-table-column v-else-if="col.name === 'run_type'" :key="col.name" :label="$t(col.label)">
<template slot-scope="scope">
<template v-if="scope.row.run_type === 'all-nodes'">{{$t('All Nodes')}}</template>
<template v-else-if="scope.row.run_type === 'selected-nodes'">{{$t('Selected Nodes')}}</template>
<template v-else-if="scope.row.run_type === 'random'">{{$t('Random')}}</template>
</template>
</el-table-column>
<el-table-column v-else :key="col.name"
:property="col.name"
:label="$t(col.label)"
@@ -137,7 +143,7 @@
<el-tooltip :content="$t('Remove')" placement="top">
<el-button type="danger" icon="el-icon-delete" size="mini" @click="onRemove(scope.row)"></el-button>
</el-tooltip>
<el-tooltip content="暂停/运行" placement="top">
<el-tooltip v-if="false" :content="$t(getStatusTooltip(scope.row))" placement="top">
<el-button type="success" icon="fa fa-bug" size="mini" @click="onCrawl(scope.row)"></el-button>
</el-tooltip>
</template>
@@ -162,6 +168,7 @@ export default {
columns: [
{ name: 'name', label: 'Name', width: '180' },
{ name: 'cron', label: 'Cron', width: '120' },
{ name: 'run_type', label: 'Run Type', width: '150' },
{ name: 'node_name', label: 'Node', width: '150' },
{ name: 'spider_name', label: 'Spider', width: '150' },
{ name: 'param', label: 'Parameters', width: '150' },
@@ -204,7 +211,7 @@ export default {
onAdd () {
this.isEdit = false
this.dialogVisible = true
this.$store.commit('schedule/SET_SCHEDULE_FORM', {})
this.$store.commit('schedule/SET_SCHEDULE_FORM', { node_ids: [] })
this.$st.sendEv('定时任务', '添加')
},
onAddSubmit () {
@@ -308,6 +315,15 @@ export default {
} else {
return false
}
},
getStatusTooltip (row) {
if (row.status === 'stop') {
return 'Start'
} else if (row.status === 'running') {
return 'Stop'
} else if (row.status === 'error') {
return 'Start'
}
}
},
created () {