mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-22 17:31:03 +01:00
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
package constants
|
||||
|
||||
const (
|
||||
ScheduleStatusStop = "stop"
|
||||
ScheduleStatusStop = "stopped"
|
||||
ScheduleStatusRunning = "running"
|
||||
ScheduleStatusError = "error"
|
||||
|
||||
|
||||
@@ -189,13 +189,13 @@ func main() {
|
||||
authGroup.GET("/tasks/:id/results", routes.GetTaskResults) // 任务结果
|
||||
authGroup.GET("/tasks/:id/results/download", routes.DownloadTaskResultsCsv) // 下载任务结果
|
||||
// 定时任务
|
||||
authGroup.GET("/schedules", routes.GetScheduleList) // 定时任务列表
|
||||
authGroup.GET("/schedules/:id", routes.GetSchedule) // 定时任务详情
|
||||
authGroup.PUT("/schedules", routes.PutSchedule) // 创建定时任务
|
||||
authGroup.POST("/schedules/:id", routes.PostSchedule) // 修改定时任务
|
||||
authGroup.DELETE("/schedules/:id", routes.DeleteSchedule) // 删除定时任务
|
||||
authGroup.POST("/schedules/:id/stop", routes.StopSchedule) // 停止定时任务
|
||||
authGroup.POST("/schedules/:id/run", routes.RunSchedule) // 运行定时任务
|
||||
authGroup.GET("/schedules", routes.GetScheduleList) // 定时任务列表
|
||||
authGroup.GET("/schedules/:id", routes.GetSchedule) // 定时任务详情
|
||||
authGroup.PUT("/schedules", routes.PutSchedule) // 创建定时任务
|
||||
authGroup.POST("/schedules/:id", routes.PostSchedule) // 修改定时任务
|
||||
authGroup.DELETE("/schedules/:id", routes.DeleteSchedule) // 删除定时任务
|
||||
authGroup.POST("/schedules/:id/disable", routes.DisableSchedule) // 禁用定时任务
|
||||
authGroup.POST("/schedules/:id/enable", routes.EnableSchedule) // 启用定时任务
|
||||
// 统计数据
|
||||
authGroup.GET("/stats/home", routes.GetHomeStats) // 首页统计数据
|
||||
// 用户
|
||||
|
||||
@@ -173,8 +173,8 @@ func GetNode(id bson.ObjectId) (Node, error) {
|
||||
defer s.Close()
|
||||
|
||||
if err := c.FindId(id).One(&node); err != nil {
|
||||
log.Errorf("get node error: %s, id: %s", err.Error(), id.Hex())
|
||||
debug.PrintStack()
|
||||
//log.Errorf("get node error: %s, id: %s", err.Error(), id.Hex())
|
||||
//debug.PrintStack()
|
||||
return node, err
|
||||
}
|
||||
return node, nil
|
||||
|
||||
@@ -16,20 +16,17 @@ type Schedule struct {
|
||||
Name string `json:"name" bson:"name"`
|
||||
Description string `json:"description" bson:"description"`
|
||||
SpiderId bson.ObjectId `json:"spider_id" bson:"spider_id"`
|
||||
//NodeId bson.ObjectId `json:"node_id" bson:"node_id"`
|
||||
//NodeKey string `json:"node_key" bson:"node_key"`
|
||||
Cron string `json:"cron" bson:"cron"`
|
||||
EntryId cron.EntryID `json:"entry_id" bson:"entry_id"`
|
||||
Param string `json:"param" bson:"param"`
|
||||
RunType string `json:"run_type" bson:"run_type"`
|
||||
NodeIds []bson.ObjectId `json:"node_ids" bson:"node_ids"`
|
||||
|
||||
// 状态
|
||||
Status string `json:"status" bson:"status"`
|
||||
Status string `json:"status" bson:"status"`
|
||||
Enabled bool `json:"enabled" bson:"enabled"`
|
||||
|
||||
// 前端展示
|
||||
SpiderName string `json:"spider_name" bson:"spider_name"`
|
||||
NodeName string `json:"node_name" bson:"node_name"`
|
||||
Nodes []Node `json:"nodes" bson:"nodes"`
|
||||
Message string `json:"message" bson:"message"`
|
||||
|
||||
CreateTs time.Time `json:"create_ts" bson:"create_ts"`
|
||||
@@ -84,20 +81,15 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) {
|
||||
|
||||
var schs []Schedule
|
||||
for _, schedule := range schedules {
|
||||
// TODO: 获取节点名称
|
||||
//if schedule.NodeId == bson.ObjectIdHex(constants.ObjectIdNull) {
|
||||
// // 选择所有节点
|
||||
// schedule.NodeName = "All Nodes"
|
||||
//} else {
|
||||
// // 选择单一节点
|
||||
// node, err := GetNode(schedule.NodeId)
|
||||
// if err != nil {
|
||||
// schedule.Status = constants.ScheduleStatusError
|
||||
// schedule.Message = constants.ScheduleStatusErrorNotFoundNode
|
||||
// } else {
|
||||
// schedule.NodeName = node.Name
|
||||
// }
|
||||
//}
|
||||
// 获取节点名称
|
||||
schedule.Nodes = []Node{}
|
||||
if schedule.RunType == constants.RunTypeSelectedNodes {
|
||||
for _, nodeId := range schedule.NodeIds {
|
||||
// 选择单一节点
|
||||
node, _ := GetNode(nodeId)
|
||||
schedule.Nodes = append(schedule.Nodes, node)
|
||||
}
|
||||
}
|
||||
|
||||
// 获取爬虫名称
|
||||
spider, err := GetSpider(schedule.SpiderId)
|
||||
|
||||
@@ -117,18 +117,12 @@ func GetTaskList(filter interface{}, skip int, limit int, sortKey string) ([]Tas
|
||||
|
||||
for i, task := range tasks {
|
||||
// 获取爬虫名称
|
||||
spider, err := task.GetSpider()
|
||||
if err != nil || spider.Id.Hex() == "" {
|
||||
_ = spider.Delete()
|
||||
} else {
|
||||
if spider, err := task.GetSpider(); err == nil {
|
||||
tasks[i].SpiderName = spider.DisplayName
|
||||
}
|
||||
|
||||
// 获取节点名称
|
||||
node, err := task.GetNode()
|
||||
if node.Id.Hex() == "" || err != nil {
|
||||
_ = task.Delete()
|
||||
} else {
|
||||
if node, err := task.GetNode(); err == nil {
|
||||
tasks[i].NodeName = node.Name
|
||||
}
|
||||
}
|
||||
@@ -142,6 +136,8 @@ func GetTaskListTotal(filter interface{}) (int, error) {
|
||||
var result int
|
||||
result, err := c.Find(filter).Count()
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return result, err
|
||||
}
|
||||
return result, nil
|
||||
@@ -168,6 +164,8 @@ func AddTask(item Task) error {
|
||||
item.UpdateTs = time.Now()
|
||||
|
||||
if err := c.Insert(&item); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -179,6 +177,8 @@ func RemoveTask(id string) error {
|
||||
|
||||
var result Task
|
||||
if err := c.FindId(id).One(&result); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -110,9 +110,9 @@ func DeleteSchedule(c *gin.Context) {
|
||||
}
|
||||
|
||||
// 停止定时任务
|
||||
func StopSchedule(c *gin.Context) {
|
||||
func DisableSchedule(c *gin.Context) {
|
||||
id := c.Param("id")
|
||||
if err := services.Sched.Stop(bson.ObjectIdHex(id)); err != nil {
|
||||
if err := services.Sched.Disable(bson.ObjectIdHex(id)); err != nil {
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
return
|
||||
}
|
||||
@@ -120,9 +120,9 @@ func StopSchedule(c *gin.Context) {
|
||||
}
|
||||
|
||||
// 运行定时任务
|
||||
func RunSchedule(c *gin.Context) {
|
||||
func EnableSchedule(c *gin.Context) {
|
||||
id := c.Param("id")
|
||||
if err := services.Sched.Run(bson.ObjectIdHex(id)); err != nil {
|
||||
if err := services.Sched.Enable(bson.ObjectIdHex(id)); err != nil {
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -53,6 +53,8 @@ func AddScheduleTask(s model.Schedule) func() {
|
||||
Param: s.Param,
|
||||
}
|
||||
if err := AddTask(t); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
if err := AssignTask(t); err != nil {
|
||||
@@ -137,7 +139,7 @@ func (s *Scheduler) Start() error {
|
||||
func (s *Scheduler) AddJob(job model.Schedule) error {
|
||||
spec := job.Cron
|
||||
|
||||
// 添加任务
|
||||
// 添加定时任务
|
||||
eid, err := s.cron.AddFunc(spec, AddScheduleTask(job))
|
||||
if err != nil {
|
||||
log.Errorf("add func task error: %s", err.Error())
|
||||
@@ -147,7 +149,12 @@ func (s *Scheduler) AddJob(job model.Schedule) error {
|
||||
|
||||
// 更新EntryID
|
||||
job.EntryId = eid
|
||||
|
||||
// 更新状态
|
||||
job.Status = constants.ScheduleStatusRunning
|
||||
job.Enabled = true
|
||||
|
||||
// 保存定时任务
|
||||
if err := job.Save(); err != nil {
|
||||
log.Errorf("job save error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
@@ -176,8 +183,8 @@ func ParserCron(spec string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 停止定时任务
|
||||
func (s *Scheduler) Stop(id bson.ObjectId) error {
|
||||
// 禁用定时任务
|
||||
func (s *Scheduler) Disable(id bson.ObjectId) error {
|
||||
schedule, err := model.GetSchedule(id)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -185,17 +192,22 @@ func (s *Scheduler) Stop(id bson.ObjectId) error {
|
||||
if schedule.EntryId == 0 {
|
||||
return errors.New("entry id not found")
|
||||
}
|
||||
|
||||
// 从cron服务中删除该任务
|
||||
s.cron.Remove(schedule.EntryId)
|
||||
|
||||
// 更新状态
|
||||
schedule.Status = constants.ScheduleStatusStop
|
||||
schedule.Enabled = false
|
||||
|
||||
if err = schedule.Save(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 运行任务
|
||||
func (s *Scheduler) Run(id bson.ObjectId) error {
|
||||
// 启用定时任务
|
||||
func (s *Scheduler) Enable(id bson.ObjectId) error {
|
||||
schedule, err := model.GetSchedule(id)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -627,11 +627,15 @@ func AddTask(t model.Task) error {
|
||||
|
||||
// 将任务存入数据库
|
||||
if err := model.AddTask(t); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
|
||||
// 加入任务队列
|
||||
if err := AssignTask(t); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user