Feature 新增定时任务可暂停/运行

This commit is contained in:
陈景阳
2019-12-07 14:02:54 +08:00
parent 6fb831b322
commit 6e7634d70f
6 changed files with 96 additions and 32 deletions

View File

@@ -0,0 +1,6 @@
package constants
const (
ScheduleStatusStop = "stop"
ScheduleStatusRunning = "running"
)

View File

@@ -160,11 +160,13 @@ func main() {
authGroup.GET("/tasks/:id/results", routes.GetTaskResults) // 任务结果
authGroup.GET("/tasks/:id/results/download", routes.DownloadTaskResultsCsv) // 下载任务结果
// 定时任务
authGroup.GET("/schedules", routes.GetScheduleList) // 定时任务列表
authGroup.GET("/schedules/:id", routes.GetSchedule) // 定时任务详情
authGroup.PUT("/schedules", routes.PutSchedule) // 创建定时任务
authGroup.POST("/schedules/:id", routes.PostSchedule) // 修改定时任务
authGroup.DELETE("/schedules/:id", routes.DeleteSchedule) // 删除定时任务
authGroup.GET("/schedules", routes.GetScheduleList) // 定时任务列表
authGroup.GET("/schedules/:id", routes.GetSchedule) // 定时任务详情
authGroup.PUT("/schedules", routes.PutSchedule) // 创建定时任务
authGroup.POST("/schedules/:id", routes.PostSchedule) // 修改定时任务
authGroup.DELETE("/schedules/:id", routes.DeleteSchedule) // 删除定时任务
authGroup.POST("/schedules/:id/stop", routes.StopSchedule) // 停止定时任务
authGroup.POST("/schedules/:id/run", routes.RunSchedule) // 运行定时任务
// 统计数据
authGroup.GET("/stats/home", routes.GetHomeStats) // 首页统计数据
// 用户

View File

@@ -21,6 +21,8 @@ type Schedule struct {
Cron string `json:"cron" bson:"cron"`
EntryId cron.EntryID `json:"entry_id" bson:"entry_id"`
Param string `json:"param" bson:"param"`
// 状态
Status string `json:"status" bson:"status"`
// 前端展示
SpiderName string `json:"spider_name" bson:"spider_name"`

View File

@@ -14,11 +14,7 @@ func GetScheduleList(c *gin.Context) {
HandleError(http.StatusInternalServerError, c, err)
return
}
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
Data: results,
})
HandleSuccessData(c, results)
}
func GetSchedule(c *gin.Context) {
@@ -29,11 +25,8 @@ func GetSchedule(c *gin.Context) {
HandleError(http.StatusInternalServerError, c, err)
return
}
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
Data: result,
})
HandleSuccessData(c, result)
}
func PostSchedule(c *gin.Context) {
@@ -65,10 +58,7 @@ func PostSchedule(c *gin.Context) {
return
}
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
})
HandleSuccess(c)
}
func PutSchedule(c *gin.Context) {
@@ -98,10 +88,7 @@ func PutSchedule(c *gin.Context) {
return
}
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
})
HandleSuccess(c)
}
func DeleteSchedule(c *gin.Context) {
@@ -119,8 +106,25 @@ func DeleteSchedule(c *gin.Context) {
return
}
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
})
HandleSuccess(c)
}
// 停止定时任务
func StopSchedule(c *gin.Context) {
id := c.Param("id")
if err := services.Sched.Stop(bson.ObjectIdHex(id)); err != nil {
HandleError(http.StatusOK, c, err)
return
}
HandleSuccess(c)
}
// 运行定时任务
func RunSchedule(c *gin.Context) {
id := c.Param("id")
if err := services.Sched.Run(bson.ObjectIdHex(id)); err != nil {
HandleError(http.StatusOK, c, err)
return
}
HandleSuccess(c)
}

View File

@@ -1,17 +1,15 @@
package routes
import (
"github.com/apex/log"
"github.com/gin-gonic/gin"
"net/http"
"runtime/debug"
)
func HandleError(statusCode int, c *gin.Context, err error) {
log.Errorf("handle error:" + err.Error())
debug.PrintStack()
c.AbortWithStatusJSON(statusCode, Response{
Status: "ok",
Message: "error",
Status: "error",
Message: "failure",
Error: err.Error(),
})
}
@@ -24,3 +22,18 @@ func HandleErrorF(statusCode int, c *gin.Context, err string) {
Error: err,
})
}
func HandleSuccess(c *gin.Context) {
c.AbortWithStatusJSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
})
}
func HandleSuccessData(c *gin.Context, data interface{}) {
c.AbortWithStatusJSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
Data: data,
})
}

View File

@@ -4,7 +4,9 @@ import (
"crawlab/constants"
"crawlab/lib/cron"
"crawlab/model"
"errors"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"github.com/satori/go.uuid"
"runtime/debug"
)
@@ -106,6 +108,7 @@ func (s *Scheduler) AddJob(job model.Schedule) error {
// 更新EntryID
job.EntryId = eid
job.Status = constants.ScheduleStatusRunning
if err := job.Save(); err != nil {
log.Errorf("job save error: %s", err.Error())
debug.PrintStack()
@@ -134,6 +137,36 @@ func ParserCron(spec string) error {
return nil
}
// 停止定时任务
func (s *Scheduler) Stop(id bson.ObjectId) error {
schedule, err := model.GetSchedule(id)
if err != nil {
return err
}
if schedule.EntryId == 0 {
return errors.New("entry id not found")
}
s.cron.Remove(schedule.EntryId)
// 更新状态
schedule.Status = constants.ScheduleStatusStop
if err = schedule.Save(); err != nil {
return err
}
return nil
}
// 运行任务
func (s *Scheduler) Run(id bson.ObjectId) error {
schedule, err := model.GetSchedule(id)
if err != nil {
return err
}
if err := s.AddJob(schedule); err != nil {
return err
}
return nil
}
func (s *Scheduler) Update() error {
// 删除所有定时任务
s.RemoveAll()
@@ -151,6 +184,10 @@ func (s *Scheduler) Update() error {
// 单个任务
job := sList[i]
if job.Status == constants.ScheduleStatusStop {
continue
}
// 添加到定时任务
if err := s.AddJob(job); err != nil {
log.Errorf("add job error: %s, job: %s, cron: %s", err.Error(), job.Name, job.Cron)