diff --git a/backend/constants/task.go b/backend/constants/task.go index b6fb615c..63144e8b 100644 --- a/backend/constants/task.go +++ b/backend/constants/task.go @@ -19,3 +19,9 @@ const ( TaskFinish string = "finish" TaskCancel string = "cancel" ) + +const ( + RunTypeAllNodes string = "all-nodes" + RunTypeRandom string = "random" + RunTypeSelectedNodes string = "selected-nodes" +) diff --git a/backend/main.go b/backend/main.go index 92863a20..3f87125d 100644 --- a/backend/main.go +++ b/backend/main.go @@ -47,7 +47,7 @@ func main() { panic(err) } log.Info("初始化定期清理日志配置成功") - }else { + } else { log.Info("默认未开启定期清理日志配置") } @@ -154,6 +154,7 @@ func main() { authGroup.GET("/tasks/:id", routes.GetTask) // 任务详情 authGroup.PUT("/tasks", routes.PutTask) // 派发任务 authGroup.DELETE("/tasks/:id", routes.DeleteTask) // 删除任务 + authGroup.DELETE("/tasks_by_status", routes.DeleteTaskByStatus) //删除指定状态的任务 authGroup.POST("/tasks/:id/cancel", routes.CancelTask) // 取消任务 authGroup.GET("/tasks/:id/log", routes.GetTaskLog) // 任务日志 authGroup.GET("/tasks/:id/results", routes.GetTaskResults) // 任务结果 diff --git a/backend/model/task.go b/backend/model/task.go index 64f06cd7..588db6b3 100644 --- a/backend/model/task.go +++ b/backend/model/task.go @@ -158,6 +158,8 @@ func GetTask(id string) (Task, error) { return task, nil } + + func AddTask(item Task) error { s, c := database.GetCol("tasks") defer s.Close() @@ -187,6 +189,20 @@ func RemoveTask(id string) error { return nil } +func RemoveTaskByStatus(status string) error { + tasks, err := GetTaskList(bson.M{"status": status}, 0, constants.Infinite, "-create_ts") + if err != nil { + log.Error("get tasks error:" + err.Error()) + } + for _, task := range tasks { + if err := RemoveTask(task.Id); err != nil { + log.Error("remove task error:" + err.Error()) + continue + } + } + return nil +} + // 删除task by spider_id func RemoveTaskBySpiderId(id bson.ObjectId) error { tasks, err := GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts") diff --git a/backend/routes/task.go b/backend/routes/task.go index 9c0aa43f..3d6edece 100644 --- a/backend/routes/task.go +++ b/backend/routes/task.go @@ -9,7 +9,6 @@ import ( "encoding/csv" "github.com/gin-gonic/gin" "github.com/globalsign/mgo/bson" - uuid "github.com/satori/go.uuid" "net/http" ) @@ -18,6 +17,7 @@ type TaskListRequestData struct { PageSize int `form:"page_size"` NodeId string `form:"node_id"` SpiderId string `form:"spider_id"` + Status string `form:"status"` } type TaskResultsRequestData struct { @@ -47,6 +47,10 @@ func GetTaskList(c *gin.Context) { if data.SpiderId != "" { query["spider_id"] = bson.ObjectIdHex(data.SpiderId) } + //新增根据任务状态获取task列表 + if data.Status != "" { + query["status"] = data.Status + } // 获取任务列表 tasks, err := model.GetTaskList(query, (data.PageNum-1)*data.PageSize, data.PageSize, "-create_ts") @@ -86,31 +90,88 @@ func GetTask(c *gin.Context) { } func PutTask(c *gin.Context) { - // 生成任务ID - id := uuid.NewV4() + type TaskRequestBody struct { + SpiderId bson.ObjectId `json:"spider_id"` + RunType string `json:"run_type"` + NodeIds []bson.ObjectId `json:"node_ids"` + Param string `json:"param"` + } // 绑定数据 - var t model.Task - if err := c.ShouldBindJSON(&t); err != nil { + var reqBody TaskRequestBody + if err := c.ShouldBindJSON(&reqBody); err != nil { HandleError(http.StatusBadRequest, c, err) return } - t.Id = id.String() - t.Status = constants.StatusPending - // 如果没有传入node_id,则置为null - if t.NodeId.Hex() == "" { - t.NodeId = bson.ObjectIdHex(constants.ObjectIdNull) + if reqBody.RunType == constants.RunTypeAllNodes { + // 所有节点 + nodes, err := model.GetNodeList(nil) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + for _, node := range nodes { + t := model.Task{ + SpiderId: reqBody.SpiderId, + NodeId: node.Id, + Param: reqBody.Param, + } + + if err := services.AddTask(t); err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + } + + } else if reqBody.RunType == constants.RunTypeRandom { + // 随机 + t := model.Task{ + SpiderId: reqBody.SpiderId, + Param: reqBody.Param, + } + if err := services.AddTask(t); err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + + } else if reqBody.RunType == constants.RunTypeSelectedNodes { + // 指定节点 + for _, nodeId := range reqBody.NodeIds { + t := model.Task{ + SpiderId: reqBody.SpiderId, + NodeId: nodeId, + Param: reqBody.Param, + } + + if err := services.AddTask(t); err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + } + + } else { + HandleErrorF(http.StatusBadRequest, c, "invalid run_type") + return } - // 将任务存入数据库 - if err := model.AddTask(t); err != nil { + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + }) +} + +func DeleteTaskByStatus(c *gin.Context) { + status := c.Query("status") + + //删除相应的日志文件 + if err := services.RemoveLogByTaskStatus(status); err != nil { HandleError(http.StatusInternalServerError, c, err) return } - // 加入任务队列 - if err := services.AssignTask(t); err != nil { + //删除该状态下的task + if err := model.RemoveTaskByStatus(status); err != nil { HandleError(http.StatusInternalServerError, c, err) return } diff --git a/backend/services/log.go b/backend/services/log.go index 5b5cd7ae..60909c61 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -119,6 +119,18 @@ func RemoveLogByTaskId(id string) error { return nil } +func RemoveLogByTaskStatus(status string) error { + tasks, err := model.GetTaskList(bson.M{"status": status}, 0, constants.Infinite, "-create_ts") + if err != nil { + log.Error("get tasks error:" + err.Error()) + return err + } + for _, task := range tasks { + RemoveLogByTaskId(task.Id) + } + return nil +} + func removeLog(t model.Task) { if err := RemoveLocalLog(t.LogPath); err != nil { log.Errorf("remove local log error: %s", err.Error()) diff --git a/backend/services/schedule.go b/backend/services/schedule.go index d4c1635b..52a6492e 100644 --- a/backend/services/schedule.go +++ b/backend/services/schedule.go @@ -15,7 +15,7 @@ type Scheduler struct { cron *cron.Cron } -func AddTask(s model.Schedule) func() { +func AddScheduleTask(s model.Schedule) func() { return func() { node, err := model.GetNodeByKey(s.NodeKey) if err != nil || node.Id.Hex() == "" { @@ -97,7 +97,7 @@ func (s *Scheduler) AddJob(job model.Schedule) error { spec := job.Cron // 添加任务 - eid, err := s.cron.AddFunc(spec, AddTask(job)) + eid, err := s.cron.AddFunc(spec, AddScheduleTask(job)) if err != nil { log.Errorf("add func task error: %s", err.Error()) debug.PrintStack() diff --git a/backend/services/task.go b/backend/services/task.go index 940f2478..859a24f0 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -10,6 +10,8 @@ import ( "encoding/json" "errors" "github.com/apex/log" + "github.com/globalsign/mgo/bson" + uuid "github.com/satori/go.uuid" "github.com/spf13/viper" "os" "os/exec" @@ -590,6 +592,32 @@ func CancelTask(id string) (err error) { return nil } +func AddTask(t model.Task) error { + // 生成任务ID + id := uuid.NewV4() + t.Id = id.String() + + // 设置任务状态 + t.Status = constants.StatusPending + + // 如果没有传入node_id,则置为null + if t.NodeId.Hex() == "" { + t.NodeId = bson.ObjectIdHex(constants.ObjectIdNull) + } + + // 将任务存入数据库 + if err := model.AddTask(t); err != nil { + return err + } + + // 加入任务队列 + if err := AssignTask(t); err != nil { + return err + } + + return nil +} + func HandleTaskError(t model.Task, err error) { log.Error("handle task error:" + err.Error()) t.Status = constants.StatusError diff --git a/frontend/src/components/Config/ConfigList.vue b/frontend/src/components/Config/ConfigList.vue index f419a36c..5c7a9dc2 100644 --- a/frontend/src/components/Config/ConfigList.vue +++ b/frontend/src/components/Config/ConfigList.vue @@ -549,7 +549,8 @@ export default { } }) }, - onCrawl () { + async onCrawl () { + await this.onSave() this.crawlConfirmDialogVisible = true this.$st.sendEv('爬虫详情-配置', '点击运行') },