This commit is contained in:
陈景阳
2019-10-11 22:48:06 +08:00
parent 311f72da19
commit 7dae91ab50
4 changed files with 34 additions and 13 deletions

View File

@@ -16,6 +16,7 @@ type Schedule struct {
Description string `json:"description" bson:"description"`
SpiderId bson.ObjectId `json:"spider_id" bson:"spider_id"`
NodeId bson.ObjectId `json:"node_id" bson:"node_id"`
NodeKey string `json:"node_key" bson:"node_key"`
Cron string `json:"cron" bson:"cron"`
EntryId cron.EntryID `json:"entry_id" bson:"entry_id"`
Param string `json:"param" bson:"param"`
@@ -113,9 +114,17 @@ func AddSchedule(item Schedule) error {
s, c := database.GetCol("schedules")
defer s.Close()
node, err := GetNode(item.NodeId)
if err != nil {
log.Errorf("get node error: %s", err.Error())
debug.PrintStack()
return nil
}
item.Id = bson.NewObjectId()
item.CreateTs = time.Now()
item.UpdateTs = time.Now()
item.NodeKey = node.Key
if err := c.Insert(&item); err != nil {
debug.PrintStack()

View File

@@ -81,9 +81,9 @@ func PutSchedule(c *gin.Context) {
}
// 如果node_id为空则置为空ObjectId
if item.NodeId == "" {
item.NodeId = bson.ObjectIdHex(constants.ObjectIdNull)
}
//if item.NodeId == "" {
// item.NodeId = bson.ObjectIdHex(constants.ObjectIdNull)
//}
// 更新数据库
if err := model.AddSchedule(item); err != nil {

View File

@@ -100,13 +100,13 @@ func handleNodeInfo(key string, data Data) {
defer s.Close()
// 同个key可能因为并发被注册多次
var nodes []model.Node
_ = c.Find(bson.M{"key": key}).All(&nodes)
if nodes != nil && len(nodes) > 1 {
for _, node := range nodes {
_ = c.RemoveId(node.Id)
}
}
//var nodes []model.Node
//_ = c.Find(bson.M{"key": key}).All(&nodes)
//if nodes != nil && len(nodes) > 1 {
// for _, node := range nodes {
// _ = c.RemoveId(node.Id)
// }
//}
var node model.Node
if err := c.Find(bson.M{"key": key}).One(&node); err != nil {

View File

@@ -17,7 +17,19 @@ type Scheduler struct {
func AddTask(s model.Schedule) func() {
return func() {
nodeId := s.NodeId
node, err := model.GetNodeByKey(s.NodeKey)
if err != nil || node.Id.Hex() == "" {
log.Errorf("get node by key error: %s", err.Error())
debug.PrintStack()
return
}
spider := model.GetSpiderByName(s.SpiderName)
if spider == nil || spider.Id.Hex() == "" {
log.Errorf("get spider by name error: %s", err.Error())
debug.PrintStack()
return
}
// 生成任务ID
id := uuid.NewV4()
@@ -25,8 +37,8 @@ func AddTask(s model.Schedule) func() {
// 生成任务模型
t := model.Task{
Id: id.String(),
SpiderId: s.SpiderId,
NodeId: nodeId,
SpiderId: spider.Id,
NodeId: node.Id,
Status: constants.StatusPending,
Param: s.Param,
}