diff --git a/backend/conf/config.yml b/backend/conf/config.yml index 9a38fa70..bc06935b 100644 --- a/backend/conf/config.yml +++ b/backend/conf/config.yml @@ -33,6 +33,7 @@ server: java: "N" dotnet: "N" php: "N" + scripts: "/app/backend/scripts" spider: path: "/app/spiders" task: diff --git a/backend/constants/task.go b/backend/constants/task.go index 63144e8b..08539432 100644 --- a/backend/constants/task.go +++ b/backend/constants/task.go @@ -25,3 +25,8 @@ const ( RunTypeRandom string = "random" RunTypeSelectedNodes string = "selected-nodes" ) + +const ( + TaskTypeSpider string = "spider" + TaskTypeSystem string = "system" +) diff --git a/backend/main.go b/backend/main.go index 170aede4..86dccf0a 100644 --- a/backend/main.go +++ b/backend/main.go @@ -254,6 +254,11 @@ func main() { authGroup.POST("/tasks-cancel", routes.CancelSelectedTask) // 批量取消任务 authGroup.POST("/tasks-restart", routes.RestartSelectedTask) // 批量重试任务 } + // 系统任务/脚本 + { + authGroup.PUT("/system-tasks", routes.PutSystemTask) // 运行系统任务 + authGroup.GET("/system-scripts", routes.GetSystemScripts) // 获取系统脚本列表 + } // 定时任务 { authGroup.GET("/schedules", routes.GetScheduleList) // 定时任务列表 diff --git a/backend/model/task.go b/backend/model/task.go index 6b8d9121..bcc37359 100644 --- a/backend/model/task.go +++ b/backend/model/task.go @@ -29,6 +29,7 @@ type Task struct { Pid int `json:"pid" bson:"pid"` RunType string `json:"run_type" bson:"run_type"` ScheduleId bson.ObjectId `json:"schedule_id" bson:"schedule_id"` + Type string `json:"type" bson:"type"` // 前端数据 SpiderName string `json:"spider_name"` diff --git a/backend/routes/spider.go b/backend/routes/spider.go index 310d802b..87ec7c26 100644 --- a/backend/routes/spider.go +++ b/backend/routes/spider.go @@ -812,6 +812,7 @@ func RunSelectedSpider(c *gin.Context) { UserId: services.GetCurrentUserId(c), RunType: constants.RunTypeAllNodes, ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull), + Type: constants.TaskTypeSpider, } id, err := services.AddTask(t) @@ -830,6 +831,7 @@ func RunSelectedSpider(c *gin.Context) { UserId: services.GetCurrentUserId(c), RunType: constants.RunTypeRandom, ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull), + Type: constants.TaskTypeSpider, } id, err := services.AddTask(t) if err != nil { @@ -847,6 +849,7 @@ func RunSelectedSpider(c *gin.Context) { UserId: services.GetCurrentUserId(c), RunType: constants.RunTypeSelectedNodes, ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull), + Type: constants.TaskTypeSpider, } id, err := services.AddTask(t) diff --git a/backend/routes/system_tasks.go b/backend/routes/system_tasks.go new file mode 100644 index 00000000..3945f475 --- /dev/null +++ b/backend/routes/system_tasks.go @@ -0,0 +1,116 @@ +package routes + +import ( + "crawlab/constants" + "crawlab/model" + "crawlab/services" + "crawlab/utils" + "fmt" + "github.com/gin-gonic/gin" + "github.com/globalsign/mgo/bson" + "net/http" +) + +func GetSystemScripts(c *gin.Context) { + HandleSuccessData(c, utils.GetSystemScripts()) +} + +func PutSystemTask(c *gin.Context) { + type TaskRequestBody struct { + RunType string `json:"run_type"` + NodeIds []bson.ObjectId `json:"node_ids"` + Script string `json:"script"` + } + + // 绑定数据 + var reqBody TaskRequestBody + if err := c.ShouldBindJSON(&reqBody); err != nil { + HandleError(http.StatusBadRequest, c, err) + return + } + + // 校验脚本参数不为空 + if reqBody.Script == "" { + HandleErrorF(http.StatusBadRequest, c, "script cannot be empty") + return + } + + // 校验脚本参数是否存在 + var allScripts = utils.GetSystemScripts() + if !utils.StringArrayContains(allScripts, reqBody.Script) { + HandleErrorF(http.StatusBadRequest, c, "script does not exist") + return + } + + // 获取执行命令 + cmd := fmt.Sprintf("sh %s", utils.GetSystemScriptPath(reqBody.Script)) + + // 任务ID + var taskIds []string + + if reqBody.RunType == constants.RunTypeAllNodes { + // 所有节点 + nodes, err := model.GetNodeList(nil) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + for _, node := range nodes { + t := model.Task{ + SpiderId: bson.ObjectIdHex(constants.ObjectIdNull), + NodeId: node.Id, + UserId: services.GetCurrentUserId(c), + RunType: constants.RunTypeAllNodes, + ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull), + Type: constants.TaskTypeSystem, + Cmd: cmd, + } + id, err := services.AddTask(t) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + taskIds = append(taskIds, id) + } + } else if reqBody.RunType == constants.RunTypeRandom { + // 随机 + t := model.Task{ + SpiderId: bson.ObjectIdHex(constants.ObjectIdNull), + UserId: services.GetCurrentUserId(c), + RunType: constants.RunTypeRandom, + ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull), + Type: constants.TaskTypeSystem, + Cmd: cmd, + } + id, err := services.AddTask(t) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + taskIds = append(taskIds, id) + } else if reqBody.RunType == constants.RunTypeSelectedNodes { + // 指定节点 + for _, nodeId := range reqBody.NodeIds { + t := model.Task{ + SpiderId: bson.ObjectIdHex(constants.ObjectIdNull), + NodeId: nodeId, + UserId: services.GetCurrentUserId(c), + RunType: constants.RunTypeSelectedNodes, + ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull), + Type: constants.TaskTypeSystem, + Cmd: cmd, + } + id, err := services.AddTask(t) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + taskIds = append(taskIds, id) + } + } else { + HandleErrorF(http.StatusInternalServerError, c, "invalid run_type") + return + } + + HandleSuccessData(c, taskIds) +} diff --git a/backend/routes/task.go b/backend/routes/task.go index d8ea39a1..2a74f4e8 100644 --- a/backend/routes/task.go +++ b/backend/routes/task.go @@ -19,6 +19,7 @@ type TaskListRequestData struct { SpiderId string `form:"spider_id"` ScheduleId string `form:"schedule_id"` Status string `form:"status"` + Type string `form:"type"` } type TaskResultsRequestData struct { @@ -64,6 +65,9 @@ func GetTaskList(c *gin.Context) { if data.ScheduleId != "" { query["schedule_id"] = bson.ObjectIdHex(data.ScheduleId) } + if data.Type != "" { + query["type"] = data.Type + } // 获取校验 query = services.GetAuthQuery(query, c) @@ -150,6 +154,7 @@ func PutTask(c *gin.Context) { UserId: services.GetCurrentUserId(c), RunType: constants.RunTypeAllNodes, ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull), + Type: constants.TaskTypeSpider, } id, err := services.AddTask(t) @@ -168,6 +173,7 @@ func PutTask(c *gin.Context) { UserId: services.GetCurrentUserId(c), RunType: constants.RunTypeRandom, ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull), + Type: constants.TaskTypeSpider, } id, err := services.AddTask(t) if err != nil { @@ -185,6 +191,7 @@ func PutTask(c *gin.Context) { UserId: services.GetCurrentUserId(c), RunType: constants.RunTypeSelectedNodes, ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull), + Type: constants.TaskTypeSpider, } id, err := services.AddTask(t) @@ -225,6 +232,7 @@ func PutBatchTasks(c *gin.Context) { UserId: services.GetCurrentUserId(c), RunType: constants.RunTypeAllNodes, ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull), + Type: constants.TaskTypeSpider, } id, err := services.AddTask(t) @@ -242,6 +250,7 @@ func PutBatchTasks(c *gin.Context) { UserId: services.GetCurrentUserId(c), RunType: constants.RunTypeRandom, ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull), + Type: constants.TaskTypeSpider, } id, err := services.AddTask(t) if err != nil { @@ -259,6 +268,7 @@ func PutBatchTasks(c *gin.Context) { UserId: services.GetCurrentUserId(c), RunType: constants.RunTypeSelectedNodes, ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull), + Type: constants.TaskTypeSpider, } id, err := services.AddTask(t) diff --git a/backend/services/schedule.go b/backend/services/schedule.go index 591fe4fb..017fba7d 100644 --- a/backend/services/schedule.go +++ b/backend/services/schedule.go @@ -58,6 +58,7 @@ func AddScheduleTask(s model.Schedule) func() { UserId: s.UserId, RunType: constants.RunTypeAllNodes, ScheduleId: s.Id, + Type: constants.TaskTypeSpider, } if _, err := AddTask(t); err != nil { @@ -73,6 +74,7 @@ func AddScheduleTask(s model.Schedule) func() { UserId: s.UserId, RunType: constants.RunTypeRandom, ScheduleId: s.Id, + Type: constants.TaskTypeSpider, } if _, err := AddTask(t); err != nil { log.Errorf(err.Error()) @@ -90,6 +92,7 @@ func AddScheduleTask(s model.Schedule) func() { UserId: s.UserId, RunType: constants.RunTypeSelectedNodes, ScheduleId: s.Id, + Type: constants.TaskTypeSpider, } if _, err := AddTask(t); err != nil { diff --git a/backend/services/task.go b/backend/services/task.go index 905bb6a7..247a230f 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -542,20 +542,15 @@ func ExecuteTask(id int) { } // 获取爬虫 - spider, err := t.GetSpider() - if err != nil { - log.Errorf("execute task, get spider error: %s", err.Error()) - return + var spider model.Spider + if t.Type == constants.TaskTypeSpider { + spider, err = t.GetSpider() + if err != nil { + log.Errorf("execute task, get spider error: %s", err.Error()) + return + } } - // 创建日志目录 - var fileDir string - if fileDir, err = MakeLogDir(t); err != nil { - return - } - // 获取日志文件路径 - t.LogPath = GetLogFilePaths(fileDir, t) - // 工作目录 cwd := filepath.Join( viper.GetString("spider.path"), @@ -564,12 +559,19 @@ func ExecuteTask(id int) { // 执行命令 var cmd string - if spider.Type == constants.Configurable { - // 可配置爬虫命令 - cmd = "scrapy crawl config_spider" - } else { - // 自定义爬虫命令 - cmd = spider.Cmd + if t.Type == constants.TaskTypeSpider { + // 爬虫任务 + if spider.Type == constants.Configurable { + // 可配置爬虫命令 + cmd = "scrapy crawl config_spider" + } else { + // 自定义爬虫命令 + cmd = spider.Cmd + } + t.Cmd = cmd + } else if t.Type == constants.TaskTypeSystem { + // 系统任务 + cmd = t.Cmd } // 加入参数 @@ -590,48 +592,51 @@ func ExecuteTask(id int) { t.Status = constants.StatusRunning // 任务状态 t.WaitDuration = t.StartTs.Sub(t.CreateTs).Seconds() // 等待时长 - // 发送 Web Hook 请求 (任务开始) - go SendWebHookRequest(user, t, spider) - - // 文件检查 - if err := SpiderFileCheck(t, spider); err != nil { - log.Errorf("spider file check error: %s", err.Error()) - return - } - - // 开始执行任务 - log.Infof(GetWorkerPrefix(id) + "start task (id:" + t.Id + ")") - // 储存任务 _ = t.Save() - // 创建结果集索引 - go func() { - col := utils.GetSpiderCol(spider.Col, spider.Name) - CreateResultsIndexes(col) - }() + // 发送 Web Hook 请求 (任务开始) + go SendWebHookRequest(user, t, spider) - // 起一个cron执行器来统计任务结果数 - cronExec := cron.New(cron.WithSeconds()) - _, err = cronExec.AddFunc("*/5 * * * * *", SaveTaskResultCount(t.Id)) - if err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) - debug.PrintStack() - return - } - cronExec.Start() - defer cronExec.Stop() + // 爬虫任务专属逻辑 + if t.Type == constants.TaskTypeSpider { + // 文件检查 + if err := SpiderFileCheck(t, spider); err != nil { + log.Errorf("spider file check error: %s", err.Error()) + return + } - // 起一个cron来更新错误日志 - cronExecErrLog := cron.New(cron.WithSeconds()) - _, err = cronExecErrLog.AddFunc("*/30 * * * * *", ScanErrorLogs(t)) - if err != nil { - log.Errorf(GetWorkerPrefix(id) + err.Error()) - debug.PrintStack() - return + // 开始执行任务 + log.Infof(GetWorkerPrefix(id) + "start task (id:" + t.Id + ")") + + // 创建结果集索引 + go func() { + col := utils.GetSpiderCol(spider.Col, spider.Name) + CreateResultsIndexes(col) + }() + + // 起一个cron执行器来统计任务结果数 + cronExec := cron.New(cron.WithSeconds()) + _, err = cronExec.AddFunc("*/5 * * * * *", SaveTaskResultCount(t.Id)) + if err != nil { + log.Errorf(GetWorkerPrefix(id) + err.Error()) + debug.PrintStack() + return + } + cronExec.Start() + defer cronExec.Stop() + + // 起一个cron来更新错误日志 + cronExecErrLog := cron.New(cron.WithSeconds()) + _, err = cronExecErrLog.AddFunc("*/30 * * * * *", ScanErrorLogs(t)) + if err != nil { + log.Errorf(GetWorkerPrefix(id) + err.Error()) + debug.PrintStack() + return + } + cronExecErrLog.Start() + defer cronExecErrLog.Stop() } - cronExecErrLog.Start() - defer cronExecErrLog.Stop() // 执行Shell命令 if err := ExecuteShellCmd(cmd, cwd, t, spider, user); err != nil { @@ -813,6 +818,7 @@ func RestartTask(id string, uid bson.ObjectId) (err error) { UserId: uid, RunType: oldTask.RunType, ScheduleId: bson.ObjectIdHex(constants.ObjectIdNull), + Type: oldTask.Type, } // 加入任务队列 diff --git a/backend/utils/system.go b/backend/utils/system.go index 3732e89f..e6d2f591 100644 --- a/backend/utils/system.go +++ b/backend/utils/system.go @@ -5,8 +5,11 @@ import ( "crawlab/entity" "encoding/json" "github.com/apex/log" + "github.com/spf13/viper" "io/ioutil" + "path" "runtime/debug" + "strings" ) func GetLangList() []entity.Lang { @@ -123,3 +126,24 @@ func GetPackageJsonDeps(filepath string) (deps []string, err error) { return deps, nil } + +// 获取系统脚本列表 +func GetSystemScripts() (res []string) { + scriptsPath := viper.GetString("server.scripts") + for _, fInfo := range ListDir(scriptsPath) { + if !fInfo.IsDir() && strings.HasSuffix(fInfo.Name(), ".sh") { + res = append(res, fInfo.Name()) + } + } + return res +} + +func GetSystemScriptPath(scriptName string) string { + scriptsPath := viper.GetString("server.scripts") + for _, name := range GetSystemScripts() { + if name == scriptName { + return path.Join(scriptsPath, name) + } + } + return "" +} diff --git a/frontend/src/i18n/zh.js b/frontend/src/i18n/zh.js index 32b30cbd..6cf7900c 100644 --- a/frontend/src/i18n/zh.js +++ b/frontend/src/i18n/zh.js @@ -284,6 +284,9 @@ export default { 'Start Time': '开始时间', 'Finish Time': '结束时间', 'Update Time': '更新时间', + 'Type': '类别', + 'Spider Tasks': '爬虫任务', + 'System Tasks': '系统任务', // 部署 'Time': '时间', diff --git a/frontend/src/store/modules/task.js b/frontend/src/store/modules/task.js index e0014cb7..45a56977 100644 --- a/frontend/src/store/modules/task.js +++ b/frontend/src/store/modules/task.js @@ -13,7 +13,8 @@ const state = { node_id: '', spider_id: '', status: '', - schedule_id: '' + schedule_id: '', + type: 'spider' }, // pagination pageNum: 1, @@ -174,7 +175,8 @@ const actions = { node_id: state.filter.node_id || undefined, spider_id: state.filter.spider_id || undefined, status: state.filter.status || undefined, - schedule_id: state.filter.schedule_id || undefined + schedule_id: state.filter.schedule_id || undefined, + type: state.filter.type || undefined }) .then(response => { commit('SET_TASK_LIST', response.data.data || []) diff --git a/frontend/src/views/task/TaskList.vue b/frontend/src/views/task/TaskList.vue index 16076860..d2814ec4 100644 --- a/frontend/src/views/task/TaskList.vue +++ b/frontend/src/views/task/TaskList.vue @@ -14,6 +14,24 @@
+ + + + {{ $t('Spider Tasks') }} + + + {{ $t('System Tasks') }} + + + @@ -584,6 +602,11 @@ onFilterChange() { this.$store.dispatch('task/getTaskList') this.$st.sendEv('任务列表', '筛选任务') + }, + onClickType(type) { + this.$set(this.filter, 'type', type) + this.$store.dispatch('task/getTaskList') + this.$st.sendEv('任务列表', '选择类别', type) } } }