Merge pull request #15 from crawlab-team/develop

Develop
This commit is contained in:
暗音
2019-12-07 10:56:04 +08:00
committed by GitHub
8 changed files with 143 additions and 18 deletions

View File

@@ -19,3 +19,9 @@ const (
TaskFinish string = "finish"
TaskCancel string = "cancel"
)
const (
RunTypeAllNodes string = "all-nodes"
RunTypeRandom string = "random"
RunTypeSelectedNodes string = "selected-nodes"
)

View File

@@ -47,7 +47,7 @@ func main() {
panic(err)
}
log.Info("初始化定期清理日志配置成功")
}else {
} else {
log.Info("默认未开启定期清理日志配置")
}
@@ -154,6 +154,7 @@ func main() {
authGroup.GET("/tasks/:id", routes.GetTask) // 任务详情
authGroup.PUT("/tasks", routes.PutTask) // 派发任务
authGroup.DELETE("/tasks/:id", routes.DeleteTask) // 删除任务
authGroup.DELETE("/tasks_by_status", routes.DeleteTaskByStatus) //删除指定状态的任务
authGroup.POST("/tasks/:id/cancel", routes.CancelTask) // 取消任务
authGroup.GET("/tasks/:id/log", routes.GetTaskLog) // 任务日志
authGroup.GET("/tasks/:id/results", routes.GetTaskResults) // 任务结果

View File

@@ -158,6 +158,8 @@ func GetTask(id string) (Task, error) {
return task, nil
}
func AddTask(item Task) error {
s, c := database.GetCol("tasks")
defer s.Close()
@@ -187,6 +189,20 @@ func RemoveTask(id string) error {
return nil
}
func RemoveTaskByStatus(status string) error {
tasks, err := GetTaskList(bson.M{"status": status}, 0, constants.Infinite, "-create_ts")
if err != nil {
log.Error("get tasks error:" + err.Error())
}
for _, task := range tasks {
if err := RemoveTask(task.Id); err != nil {
log.Error("remove task error:" + err.Error())
continue
}
}
return nil
}
// 删除task by spider_id
func RemoveTaskBySpiderId(id bson.ObjectId) error {
tasks, err := GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts")

View File

@@ -9,7 +9,6 @@ import (
"encoding/csv"
"github.com/gin-gonic/gin"
"github.com/globalsign/mgo/bson"
uuid "github.com/satori/go.uuid"
"net/http"
)
@@ -18,6 +17,7 @@ type TaskListRequestData struct {
PageSize int `form:"page_size"`
NodeId string `form:"node_id"`
SpiderId string `form:"spider_id"`
Status string `form:"status"`
}
type TaskResultsRequestData struct {
@@ -47,6 +47,10 @@ func GetTaskList(c *gin.Context) {
if data.SpiderId != "" {
query["spider_id"] = bson.ObjectIdHex(data.SpiderId)
}
//新增根据任务状态获取task列表
if data.Status != "" {
query["status"] = data.Status
}
// 获取任务列表
tasks, err := model.GetTaskList(query, (data.PageNum-1)*data.PageSize, data.PageSize, "-create_ts")
@@ -86,31 +90,88 @@ func GetTask(c *gin.Context) {
}
func PutTask(c *gin.Context) {
// 生成任务ID
id := uuid.NewV4()
type TaskRequestBody struct {
SpiderId bson.ObjectId `json:"spider_id"`
RunType string `json:"run_type"`
NodeIds []bson.ObjectId `json:"node_ids"`
Param string `json:"param"`
}
// 绑定数据
var t model.Task
if err := c.ShouldBindJSON(&t); err != nil {
var reqBody TaskRequestBody
if err := c.ShouldBindJSON(&reqBody); err != nil {
HandleError(http.StatusBadRequest, c, err)
return
}
t.Id = id.String()
t.Status = constants.StatusPending
// 如果没有传入node_id则置为null
if t.NodeId.Hex() == "" {
t.NodeId = bson.ObjectIdHex(constants.ObjectIdNull)
if reqBody.RunType == constants.RunTypeAllNodes {
// 所有节点
nodes, err := model.GetNodeList(nil)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
for _, node := range nodes {
t := model.Task{
SpiderId: reqBody.SpiderId,
NodeId: node.Id,
Param: reqBody.Param,
}
if err := services.AddTask(t); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
}
} else if reqBody.RunType == constants.RunTypeRandom {
// 随机
t := model.Task{
SpiderId: reqBody.SpiderId,
Param: reqBody.Param,
}
if err := services.AddTask(t); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
} else if reqBody.RunType == constants.RunTypeSelectedNodes {
// 指定节点
for _, nodeId := range reqBody.NodeIds {
t := model.Task{
SpiderId: reqBody.SpiderId,
NodeId: nodeId,
Param: reqBody.Param,
}
if err := services.AddTask(t); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
}
} else {
HandleErrorF(http.StatusBadRequest, c, "invalid run_type")
return
}
// 将任务存入数据库
if err := model.AddTask(t); err != nil {
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
})
}
func DeleteTaskByStatus(c *gin.Context) {
status := c.Query("status")
//删除相应的日志文件
if err := services.RemoveLogByTaskStatus(status); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
// 加入任务队列
if err := services.AssignTask(t); err != nil {
//删除该状态下的task
if err := model.RemoveTaskByStatus(status); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}

View File

@@ -119,6 +119,18 @@ func RemoveLogByTaskId(id string) error {
return nil
}
func RemoveLogByTaskStatus(status string) error {
tasks, err := model.GetTaskList(bson.M{"status": status}, 0, constants.Infinite, "-create_ts")
if err != nil {
log.Error("get tasks error:" + err.Error())
return err
}
for _, task := range tasks {
RemoveLogByTaskId(task.Id)
}
return nil
}
func removeLog(t model.Task) {
if err := RemoveLocalLog(t.LogPath); err != nil {
log.Errorf("remove local log error: %s", err.Error())

View File

@@ -15,7 +15,7 @@ type Scheduler struct {
cron *cron.Cron
}
func AddTask(s model.Schedule) func() {
func AddScheduleTask(s model.Schedule) func() {
return func() {
node, err := model.GetNodeByKey(s.NodeKey)
if err != nil || node.Id.Hex() == "" {
@@ -97,7 +97,7 @@ func (s *Scheduler) AddJob(job model.Schedule) error {
spec := job.Cron
// 添加任务
eid, err := s.cron.AddFunc(spec, AddTask(job))
eid, err := s.cron.AddFunc(spec, AddScheduleTask(job))
if err != nil {
log.Errorf("add func task error: %s", err.Error())
debug.PrintStack()

View File

@@ -10,6 +10,8 @@ import (
"encoding/json"
"errors"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
uuid "github.com/satori/go.uuid"
"github.com/spf13/viper"
"os"
"os/exec"
@@ -590,6 +592,32 @@ func CancelTask(id string) (err error) {
return nil
}
func AddTask(t model.Task) error {
// 生成任务ID
id := uuid.NewV4()
t.Id = id.String()
// 设置任务状态
t.Status = constants.StatusPending
// 如果没有传入node_id则置为null
if t.NodeId.Hex() == "" {
t.NodeId = bson.ObjectIdHex(constants.ObjectIdNull)
}
// 将任务存入数据库
if err := model.AddTask(t); err != nil {
return err
}
// 加入任务队列
if err := AssignTask(t); err != nil {
return err
}
return nil
}
func HandleTaskError(t model.Task, err error) {
log.Error("handle task error:" + err.Error())
t.Status = constants.StatusError

View File

@@ -549,7 +549,8 @@ export default {
}
})
},
onCrawl () {
async onCrawl () {
await this.onSave()
this.crawlConfirmDialogVisible = true
this.$st.sendEv('爬虫详情-配置', '点击运行')
},