added part of the system tasks logic

This commit is contained in:
marvzhang
2020-07-30 23:06:11 +08:00
parent ec81344e20
commit d525099568
13 changed files with 258 additions and 56 deletions

View File

@@ -33,6 +33,7 @@ server:
java: "N"
dotnet: "N"
php: "N"
scripts: "/app/backend/scripts"
spider:
path: "/app/spiders"
task:

View File

@@ -25,3 +25,8 @@ const (
RunTypeRandom string = "random"
RunTypeSelectedNodes string = "selected-nodes"
)
const (
TaskTypeSpider string = "spider"
TaskTypeSystem string = "system"
)

View File

@@ -254,6 +254,11 @@ func main() {
authGroup.POST("/tasks-cancel", routes.CancelSelectedTask) // 批量取消任务
authGroup.POST("/tasks-restart", routes.RestartSelectedTask) // 批量重试任务
}
// 系统任务/脚本
{
authGroup.PUT("/system-tasks", routes.PutSystemTask) // 运行系统任务
authGroup.GET("/system-scripts", routes.GetSystemScripts) // 获取系统脚本列表
}
// 定时任务
{
authGroup.GET("/schedules", routes.GetScheduleList) // 定时任务列表

View File

@@ -29,6 +29,7 @@ type Task struct {
Pid int `json:"pid" bson:"pid"`
RunType string `json:"run_type" bson:"run_type"`
ScheduleId bson.ObjectId `json:"schedule_id" bson:"schedule_id"`
Type string `json:"type" bson:"type"`
// 前端数据
SpiderName string `json:"spider_name"`

View File

@@ -812,6 +812,7 @@ func RunSelectedSpider(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeAllNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}
id, err := services.AddTask(t)
@@ -830,6 +831,7 @@ func RunSelectedSpider(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeRandom,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}
id, err := services.AddTask(t)
if err != nil {
@@ -847,6 +849,7 @@ func RunSelectedSpider(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeSelectedNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}
id, err := services.AddTask(t)

View File

@@ -0,0 +1,116 @@
package routes
import (
"crawlab/constants"
"crawlab/model"
"crawlab/services"
"crawlab/utils"
"fmt"
"github.com/gin-gonic/gin"
"github.com/globalsign/mgo/bson"
"net/http"
)
func GetSystemScripts(c *gin.Context) {
HandleSuccessData(c, utils.GetSystemScripts())
}
func PutSystemTask(c *gin.Context) {
type TaskRequestBody struct {
RunType string `json:"run_type"`
NodeIds []bson.ObjectId `json:"node_ids"`
Script string `json:"script"`
}
// 绑定数据
var reqBody TaskRequestBody
if err := c.ShouldBindJSON(&reqBody); err != nil {
HandleError(http.StatusBadRequest, c, err)
return
}
// 校验脚本参数不为空
if reqBody.Script == "" {
HandleErrorF(http.StatusBadRequest, c, "script cannot be empty")
return
}
// 校验脚本参数是否存在
var allScripts = utils.GetSystemScripts()
if !utils.StringArrayContains(allScripts, reqBody.Script) {
HandleErrorF(http.StatusBadRequest, c, "script does not exist")
return
}
// 获取执行命令
cmd := fmt.Sprintf("sh %s", utils.GetSystemScriptPath(reqBody.Script))
// 任务ID
var taskIds []string
if reqBody.RunType == constants.RunTypeAllNodes {
// 所有节点
nodes, err := model.GetNodeList(nil)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
for _, node := range nodes {
t := model.Task{
SpiderId: bson.ObjectIdHex(constants.ObjectIdNull),
NodeId: node.Id,
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeAllNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSystem,
Cmd: cmd,
}
id, err := services.AddTask(t)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
taskIds = append(taskIds, id)
}
} else if reqBody.RunType == constants.RunTypeRandom {
// 随机
t := model.Task{
SpiderId: bson.ObjectIdHex(constants.ObjectIdNull),
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeRandom,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSystem,
Cmd: cmd,
}
id, err := services.AddTask(t)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
taskIds = append(taskIds, id)
} else if reqBody.RunType == constants.RunTypeSelectedNodes {
// 指定节点
for _, nodeId := range reqBody.NodeIds {
t := model.Task{
SpiderId: bson.ObjectIdHex(constants.ObjectIdNull),
NodeId: nodeId,
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeSelectedNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSystem,
Cmd: cmd,
}
id, err := services.AddTask(t)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
taskIds = append(taskIds, id)
}
} else {
HandleErrorF(http.StatusInternalServerError, c, "invalid run_type")
return
}
HandleSuccessData(c, taskIds)
}

View File

@@ -19,6 +19,7 @@ type TaskListRequestData struct {
SpiderId string `form:"spider_id"`
ScheduleId string `form:"schedule_id"`
Status string `form:"status"`
Type string `form:"type"`
}
type TaskResultsRequestData struct {
@@ -64,6 +65,9 @@ func GetTaskList(c *gin.Context) {
if data.ScheduleId != "" {
query["schedule_id"] = bson.ObjectIdHex(data.ScheduleId)
}
if data.Type != "" {
query["type"] = data.Type
}
// 获取校验
query = services.GetAuthQuery(query, c)
@@ -150,6 +154,7 @@ func PutTask(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeAllNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}
id, err := services.AddTask(t)
@@ -168,6 +173,7 @@ func PutTask(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeRandom,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}
id, err := services.AddTask(t)
if err != nil {
@@ -185,6 +191,7 @@ func PutTask(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeSelectedNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}
id, err := services.AddTask(t)
@@ -225,6 +232,7 @@ func PutBatchTasks(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeAllNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}
id, err := services.AddTask(t)
@@ -242,6 +250,7 @@ func PutBatchTasks(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeRandom,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}
id, err := services.AddTask(t)
if err != nil {
@@ -259,6 +268,7 @@ func PutBatchTasks(c *gin.Context) {
UserId: services.GetCurrentUserId(c),
RunType: constants.RunTypeSelectedNodes,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: constants.TaskTypeSpider,
}
id, err := services.AddTask(t)

View File

@@ -58,6 +58,7 @@ func AddScheduleTask(s model.Schedule) func() {
UserId: s.UserId,
RunType: constants.RunTypeAllNodes,
ScheduleId: s.Id,
Type: constants.TaskTypeSpider,
}
if _, err := AddTask(t); err != nil {
@@ -73,6 +74,7 @@ func AddScheduleTask(s model.Schedule) func() {
UserId: s.UserId,
RunType: constants.RunTypeRandom,
ScheduleId: s.Id,
Type: constants.TaskTypeSpider,
}
if _, err := AddTask(t); err != nil {
log.Errorf(err.Error())
@@ -90,6 +92,7 @@ func AddScheduleTask(s model.Schedule) func() {
UserId: s.UserId,
RunType: constants.RunTypeSelectedNodes,
ScheduleId: s.Id,
Type: constants.TaskTypeSpider,
}
if _, err := AddTask(t); err != nil {

View File

@@ -542,20 +542,15 @@ func ExecuteTask(id int) {
}
// 获取爬虫
spider, err := t.GetSpider()
if err != nil {
log.Errorf("execute task, get spider error: %s", err.Error())
return
var spider model.Spider
if t.Type == constants.TaskTypeSpider {
spider, err = t.GetSpider()
if err != nil {
log.Errorf("execute task, get spider error: %s", err.Error())
return
}
}
// 创建日志目录
var fileDir string
if fileDir, err = MakeLogDir(t); err != nil {
return
}
// 获取日志文件路径
t.LogPath = GetLogFilePaths(fileDir, t)
// 工作目录
cwd := filepath.Join(
viper.GetString("spider.path"),
@@ -564,12 +559,19 @@ func ExecuteTask(id int) {
// 执行命令
var cmd string
if spider.Type == constants.Configurable {
// 可配置爬虫命令
cmd = "scrapy crawl config_spider"
} else {
// 自定义爬虫命令
cmd = spider.Cmd
if t.Type == constants.TaskTypeSpider {
// 爬虫任务
if spider.Type == constants.Configurable {
// 可配置爬虫命令
cmd = "scrapy crawl config_spider"
} else {
// 自定义爬虫命令
cmd = spider.Cmd
}
t.Cmd = cmd
} else if t.Type == constants.TaskTypeSystem {
// 系统任务
cmd = t.Cmd
}
// 加入参数
@@ -590,48 +592,51 @@ func ExecuteTask(id int) {
t.Status = constants.StatusRunning // 任务状态
t.WaitDuration = t.StartTs.Sub(t.CreateTs).Seconds() // 等待时长
// 发送 Web Hook 请求 (任务开始)
go SendWebHookRequest(user, t, spider)
// 文件检查
if err := SpiderFileCheck(t, spider); err != nil {
log.Errorf("spider file check error: %s", err.Error())
return
}
// 开始执行任务
log.Infof(GetWorkerPrefix(id) + "start task (id:" + t.Id + ")")
// 储存任务
_ = t.Save()
// 创建结果集索引
go func() {
col := utils.GetSpiderCol(spider.Col, spider.Name)
CreateResultsIndexes(col)
}()
// 发送 Web Hook 请求 (任务开始)
go SendWebHookRequest(user, t, spider)
// 起一个cron执行器来统计任务结果数
cronExec := cron.New(cron.WithSeconds())
_, err = cronExec.AddFunc("*/5 * * * * *", SaveTaskResultCount(t.Id))
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
return
}
cronExec.Start()
defer cronExec.Stop()
// 爬虫任务专属逻辑
if t.Type == constants.TaskTypeSpider {
// 文件检查
if err := SpiderFileCheck(t, spider); err != nil {
log.Errorf("spider file check error: %s", err.Error())
return
}
// 起一个cron来更新错误日志
cronExecErrLog := cron.New(cron.WithSeconds())
_, err = cronExecErrLog.AddFunc("*/30 * * * * *", ScanErrorLogs(t))
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
return
// 开始执行任务
log.Infof(GetWorkerPrefix(id) + "start task (id:" + t.Id + ")")
// 创建结果集索引
go func() {
col := utils.GetSpiderCol(spider.Col, spider.Name)
CreateResultsIndexes(col)
}()
// 起一个cron执行器来统计任务结果数
cronExec := cron.New(cron.WithSeconds())
_, err = cronExec.AddFunc("*/5 * * * * *", SaveTaskResultCount(t.Id))
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
return
}
cronExec.Start()
defer cronExec.Stop()
// 起一个cron来更新错误日志
cronExecErrLog := cron.New(cron.WithSeconds())
_, err = cronExecErrLog.AddFunc("*/30 * * * * *", ScanErrorLogs(t))
if err != nil {
log.Errorf(GetWorkerPrefix(id) + err.Error())
debug.PrintStack()
return
}
cronExecErrLog.Start()
defer cronExecErrLog.Stop()
}
cronExecErrLog.Start()
defer cronExecErrLog.Stop()
// 执行Shell命令
if err := ExecuteShellCmd(cmd, cwd, t, spider, user); err != nil {
@@ -813,6 +818,7 @@ func RestartTask(id string, uid bson.ObjectId) (err error) {
UserId: uid,
RunType: oldTask.RunType,
ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull),
Type: oldTask.Type,
}
// 加入任务队列

View File

@@ -5,8 +5,11 @@ import (
"crawlab/entity"
"encoding/json"
"github.com/apex/log"
"github.com/spf13/viper"
"io/ioutil"
"path"
"runtime/debug"
"strings"
)
func GetLangList() []entity.Lang {
@@ -123,3 +126,24 @@ func GetPackageJsonDeps(filepath string) (deps []string, err error) {
return deps, nil
}
// 获取系统脚本列表
func GetSystemScripts() (res []string) {
scriptsPath := viper.GetString("server.scripts")
for _, fInfo := range ListDir(scriptsPath) {
if !fInfo.IsDir() && strings.HasSuffix(fInfo.Name(), ".sh") {
res = append(res, fInfo.Name())
}
}
return res
}
func GetSystemScriptPath(scriptName string) string {
scriptsPath := viper.GetString("server.scripts")
for _, name := range GetSystemScripts() {
if name == scriptName {
return path.Join(scriptsPath, name)
}
}
return ""
}

View File

@@ -284,6 +284,9 @@ export default {
'Start Time': '开始时间',
'Finish Time': '结束时间',
'Update Time': '更新时间',
'Type': '类别',
'Spider Tasks': '爬虫任务',
'System Tasks': '系统任务',
// 部署
'Time': '时间',

View File

@@ -13,7 +13,8 @@ const state = {
node_id: '',
spider_id: '',
status: '',
schedule_id: ''
schedule_id: '',
type: 'spider'
},
// pagination
pageNum: 1,
@@ -174,7 +175,8 @@ const actions = {
node_id: state.filter.node_id || undefined,
spider_id: state.filter.spider_id || undefined,
status: state.filter.status || undefined,
schedule_id: state.filter.schedule_id || undefined
schedule_id: state.filter.schedule_id || undefined,
type: state.filter.type || undefined
})
.then(response => {
commit('SET_TASK_LIST', response.data.data || [])

View File

@@ -14,6 +14,24 @@
<div class="filter">
<div class="left">
<el-form class="filter-form" :model="filter" label-width="100px" label-position="right" inline>
<el-form-item :label="$t('Type')">
<el-button-group>
<el-button
size="small"
:type="filter.type === 'spider' ? 'primary' : ''"
@click="onClickType('spider')"
>
{{ $t('Spider Tasks') }}
</el-button>
<el-button
size="small"
:type="filter.type === 'system' ? 'primary' : ''"
@click="onClickType('system')"
>
{{ $t('System Tasks') }}
</el-button>
</el-button-group>
</el-form-item>
<el-form-item prop="node_id" :label="$t('Node')">
<el-select v-model="filter.node_id" size="small" :placeholder="$t('Node')" @change="onFilterChange">
<el-option value="" :label="$t('All')" />
@@ -584,6 +602,11 @@
onFilterChange() {
this.$store.dispatch('task/getTaskList')
this.$st.sendEv('任务列表', '筛选任务')
},
onClickType(type) {
this.$set(this.filter, 'type', type)
this.$store.dispatch('task/getTaskList')
this.$st.sendEv('任务列表', '选择类别', type)
}
}
}