diff --git a/backend/constants/schedule.go b/backend/constants/schedule.go
index 3f0afc7e..c3104601 100644
--- a/backend/constants/schedule.go
+++ b/backend/constants/schedule.go
@@ -3,4 +3,8 @@ package constants
 const (
 	ScheduleStatusStop    = "stop"
 	ScheduleStatusRunning = "running"
+	ScheduleStatusError   = "error"
+
+	ScheduleStatusErrorNotFoundNode   = "Not Found Node"
+	ScheduleStatusErrorNotFoundSpider = "Not Found Spider"
 )
diff --git a/backend/database/redis.go b/backend/database/redis.go
index bffc40be..54f716d0 100644
--- a/backend/database/redis.go
+++ b/backend/database/redis.go
@@ -4,10 +4,12 @@ import (
 	"context"
 	"crawlab/entity"
 	"crawlab/utils"
+	"errors"
 	"github.com/apex/log"
 	"github.com/gomodule/redigo/redis"
 	"github.com/spf13/viper"
 	"runtime/debug"
+	"strings"
 	"time"
 )
 
@@ -17,9 +19,18 @@ type Redis struct {
 	pool *redis.Pool
 }
 
+type Mutex struct {
+	Name   string
+	expiry time.Duration
+	tries  int
+	delay  time.Duration
+	value  string
+}
+
 func NewRedisClient() *Redis {
 	return &Redis{pool: NewRedisPool()}
 }
+
 func (r *Redis) RPush(collection string, value interface{}) error {
 	c := r.pool.Get()
 	defer utils.Close(c)
@@ -143,3 +154,44 @@ func Sub(channel string, consume ConsumeFunc) error {
 	}
 	return nil
 }
+
+func (r *Redis) getLockKey(lockKey string) string {
+	lockKey = strings.ReplaceAll(lockKey, ":", "-")
+	return "nodes:lock:" + lockKey
+}
+
+func (r *Redis) Lock(lockKey string) error {
+	c := r.pool.Get()
+	defer utils.Close(c)
+	lockKey = r.getLockKey(lockKey)
+
+	ts := time.Now()
+	v, err := c.Do("SET", lockKey, ts, "NX", "PX", 30000)
+	if err != nil {
+		log.Errorf("get lock fail with error: %s", err.Error())
+		debug.PrintStack()
+		return err
+	}
+	if err == nil && v == nil {
+		log.Errorf("the lockKey is locked: key=%s", lockKey)
+		return errors.New("the lockKey is locked")
+	}
+	return nil
+}
+
+func (r *Redis) UnLock(lockKey string) {
+	c := r.pool.Get()
+	defer utils.Close(c)
+	lockKey = r.getLockKey(lockKey)
+
+	v, err := c.Do("DEL", lockKey)
+	if err != nil {
+		log.Errorf("unlock failed, error: %s", err.Error())
+		debug.PrintStack()
+		return
+	}
+	if v.(int64) == 0 {
+		log.Errorf("unlock failed: key=%s", lockKey)
+		return
+	}
+}
diff --git a/backend/model/schedule.go b/backend/model/schedule.go
index 02f8a656..39e1244f 100644
--- a/backend/model/schedule.go
+++ b/backend/model/schedule.go
@@ -27,6 +27,7 @@ type Schedule struct {
 	// 前端展示
 	SpiderName string `json:"spider_name" bson:"spider_name"`
 	NodeName   string `json:"node_name" bson:"node_name"`
+	Message    string `json:"message" bson:"message"`
 
 	CreateTs time.Time `json:"create_ts" bson:"create_ts"`
 	UpdateTs time.Time `json:"update_ts" bson:"update_ts"`
@@ -88,21 +89,23 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) {
 			// 选择单一节点
 			node, err := GetNode(schedule.NodeId)
 			if err != nil {
-				log.Errorf(err.Error())
-				continue
+				schedule.Status = constants.ScheduleStatusError
+				schedule.Message = constants.ScheduleStatusErrorNotFoundNode
+			} else {
+				schedule.NodeName = node.Name
 			}
-			schedule.NodeName = node.Name
 		}
 
 		// 获取爬虫名称
 		spider, err := GetSpider(schedule.SpiderId)
 		if err != nil && err == mgo.ErrNotFound {
 			log.Errorf("get spider by id: %s, error: %s", schedule.SpiderId.Hex(), err.Error())
-			debug.PrintStack()
-			_ = schedule.Delete()
-			continue
+			schedule.Status = constants.ScheduleStatusError
+			schedule.Message = constants.ScheduleStatusErrorNotFoundSpider
+		} else {
+			schedule.SpiderName = spider.Name
 		}
-		schedule.SpiderName = spider.Name
+
 		schs = append(schs, schedule)
 	}
 	return schs, nil
diff --git a/backend/routes/schedule.go b/backend/routes/schedule.go
index 1328f05e..d1c3affa 100644
--- a/backend/routes/schedule.go
+++ b/backend/routes/schedule.go
@@ -11,7 +11,7 @@ import (
 func GetScheduleList(c *gin.Context) {
 	results, err := model.GetScheduleList(nil)
 	if err != nil {
-		HandleError(http.StatusInternalServerError, c, err)
+		HandleError(http.StatusOK, c, err)
 		return
 	}
 	HandleSuccessData(c, results)
@@ -22,7 +22,7 @@ func GetSchedule(c *gin.Context) {
 
 	result, err := model.GetSchedule(bson.ObjectIdHex(id))
 	if err != nil {
-		HandleError(http.StatusInternalServerError, c, err)
+		HandleError(http.StatusOK, c, err)
 		return
 	}
 
diff --git a/backend/services/node.go b/backend/services/node.go
index be916f10..26cd3b89 100644
--- a/backend/services/node.go
+++ b/backend/services/node.go
@@ -95,19 +95,16 @@ func UpdateNodeStatus() {
 }
 
 func handleNodeInfo(key string, data Data) {
+	// 添加同步锁
+	if err := database.RedisClient.Lock(key); err != nil {
+		return
+	}
+	defer database.RedisClient.UnLock(key)
+
 	// 更新节点信息到数据库
 	s, c := database.GetCol("nodes")
 	defer s.Close()
 
-	// 同个key可能因为并发,被注册多次
-	var nodes []model.Node
-	_ = c.Find(bson.M{"key": key}).All(&nodes)
-	if len(nodes) > 1 {
-		for _, node := range nodes {
-			_ = c.RemoveId(node.Id)
-		}
-	}
-
 	var node model.Node
 	if err := c.Find(bson.M{"key": key}).One(&node); err != nil {
 		// 数据库不存在该节点