mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-24 17:41:03 +01:00
完成可能重复注册节点的问题
This commit is contained in:
@@ -3,4 +3,8 @@ package constants
|
||||
const (
|
||||
ScheduleStatusStop = "stop"
|
||||
ScheduleStatusRunning = "running"
|
||||
ScheduleStatusError = "error"
|
||||
|
||||
ScheduleStatusErrorNotFoundNode = "Not Found Node"
|
||||
ScheduleStatusErrorNotFoundSpider = "Not Found Spider"
|
||||
)
|
||||
|
||||
@@ -4,10 +4,12 @@ import (
|
||||
"context"
|
||||
"crawlab/entity"
|
||||
"crawlab/utils"
|
||||
"errors"
|
||||
"github.com/apex/log"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/spf13/viper"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -17,9 +19,18 @@ type Redis struct {
|
||||
pool *redis.Pool
|
||||
}
|
||||
|
||||
type Mutex struct {
|
||||
Name string
|
||||
expiry time.Duration
|
||||
tries int
|
||||
delay time.Duration
|
||||
value string
|
||||
}
|
||||
|
||||
func NewRedisClient() *Redis {
|
||||
return &Redis{pool: NewRedisPool()}
|
||||
}
|
||||
|
||||
func (r *Redis) RPush(collection string, value interface{}) error {
|
||||
c := r.pool.Get()
|
||||
defer utils.Close(c)
|
||||
@@ -143,3 +154,44 @@ func Sub(channel string, consume ConsumeFunc) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Redis) getLockKey(lockKey string) string {
|
||||
lockKey = strings.ReplaceAll(lockKey, ":", "-")
|
||||
return "nodes:lock:" + lockKey
|
||||
}
|
||||
|
||||
func (r *Redis) Lock(lockKey string) error {
|
||||
c := r.pool.Get()
|
||||
defer utils.Close(c)
|
||||
lockKey = r.getLockKey(lockKey)
|
||||
|
||||
ts := time.Now()
|
||||
v, err := c.Do("SET", lockKey, ts, "NX", "PX", 30000)
|
||||
if err != nil {
|
||||
log.Errorf("get lock fail with error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
if err == nil && v == nil {
|
||||
log.Errorf("the lockKey is locked: key=%s", lockKey)
|
||||
return errors.New("the lockKey is locked")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Redis) UnLock(lockKey string) {
|
||||
c := r.pool.Get()
|
||||
defer utils.Close(c)
|
||||
lockKey = r.getLockKey(lockKey)
|
||||
|
||||
v, err := c.Do("DEL", lockKey)
|
||||
if err != nil {
|
||||
log.Errorf("unlock failed, error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
if v.(int64) == 0 {
|
||||
log.Errorf("unlock failed: key=%s", lockKey)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ type Schedule struct {
|
||||
// 前端展示
|
||||
SpiderName string `json:"spider_name" bson:"spider_name"`
|
||||
NodeName string `json:"node_name" bson:"node_name"`
|
||||
Message string `json:"message" bson:"message"`
|
||||
|
||||
CreateTs time.Time `json:"create_ts" bson:"create_ts"`
|
||||
UpdateTs time.Time `json:"update_ts" bson:"update_ts"`
|
||||
@@ -88,21 +89,23 @@ func GetScheduleList(filter interface{}) ([]Schedule, error) {
|
||||
// 选择单一节点
|
||||
node, err := GetNode(schedule.NodeId)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
continue
|
||||
schedule.Status = constants.ScheduleStatusError
|
||||
schedule.Message = constants.ScheduleStatusErrorNotFoundNode
|
||||
} else {
|
||||
schedule.NodeName = node.Name
|
||||
}
|
||||
schedule.NodeName = node.Name
|
||||
}
|
||||
|
||||
// 获取爬虫名称
|
||||
spider, err := GetSpider(schedule.SpiderId)
|
||||
if err != nil && err == mgo.ErrNotFound {
|
||||
log.Errorf("get spider by id: %s, error: %s", schedule.SpiderId.Hex(), err.Error())
|
||||
debug.PrintStack()
|
||||
_ = schedule.Delete()
|
||||
continue
|
||||
schedule.Status = constants.ScheduleStatusError
|
||||
schedule.Message = constants.ScheduleStatusErrorNotFoundSpider
|
||||
} else {
|
||||
schedule.SpiderName = spider.Name
|
||||
}
|
||||
schedule.SpiderName = spider.Name
|
||||
|
||||
schs = append(schs, schedule)
|
||||
}
|
||||
return schs, nil
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
func GetScheduleList(c *gin.Context) {
|
||||
results, err := model.GetScheduleList(nil)
|
||||
if err != nil {
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
HandleError(http.StatusOK, c, err)
|
||||
return
|
||||
}
|
||||
HandleSuccessData(c, results)
|
||||
@@ -22,7 +22,7 @@ func GetSchedule(c *gin.Context) {
|
||||
|
||||
result, err := model.GetSchedule(bson.ObjectIdHex(id))
|
||||
if err != nil {
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
HandleError(http.StatusOK, c, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -95,19 +95,16 @@ func UpdateNodeStatus() {
|
||||
}
|
||||
|
||||
func handleNodeInfo(key string, data Data) {
|
||||
// 添加同步锁
|
||||
if err := database.RedisClient.Lock(key); err != nil {
|
||||
return
|
||||
}
|
||||
defer database.RedisClient.UnLock(key)
|
||||
|
||||
// 更新节点信息到数据库
|
||||
s, c := database.GetCol("nodes")
|
||||
defer s.Close()
|
||||
|
||||
// 同个key可能因为并发,被注册多次
|
||||
var nodes []model.Node
|
||||
_ = c.Find(bson.M{"key": key}).All(&nodes)
|
||||
if len(nodes) > 1 {
|
||||
for _, node := range nodes {
|
||||
_ = c.RemoveId(node.Id)
|
||||
}
|
||||
}
|
||||
|
||||
var node model.Node
|
||||
if err := c.Find(bson.M{"key": key}).One(&node); err != nil {
|
||||
// 数据库不存在该节点
|
||||
|
||||
Reference in New Issue
Block a user