fix 同步锁可能异常的情况

This commit is contained in:
陈景阳
2019-12-10 09:22:09 +08:00
parent 8a513e66b5
commit 728dbd5147
2 changed files with 28 additions and 12 deletions

View File

@@ -155,42 +155,57 @@ func Sub(channel string, consume ConsumeFunc) error {
return nil
}
// 构建同步锁key
func (r *Redis) getLockKey(lockKey string) string {
lockKey = strings.ReplaceAll(lockKey, ":", "-")
return "nodes:lock:" + lockKey
}
func (r *Redis) Lock(lockKey string) error {
// 获得锁
func (r *Redis) Lock(lockKey string) (int64, error) {
c := r.pool.Get()
defer utils.Close(c)
lockKey = r.getLockKey(lockKey)
ts := time.Now()
v, err := c.Do("SET", lockKey, ts, "NX", "PX", 30000)
ts := time.Now().Unix()
ok, err := c.Do("SET", lockKey, ts, "NX", "PX", 30000)
if err != nil {
log.Errorf("get lock fail with error: %s", err.Error())
debug.PrintStack()
return err
return 0, err
}
if err == nil && v == nil {
if err == nil && ok == nil {
log.Errorf("the lockKey is locked: key=%s", lockKey)
return errors.New("the lockKey is locked")
return 0, errors.New("the lockKey is locked")
}
return nil
return ts, nil
}
func (r *Redis) UnLock(lockKey string) {
func (r *Redis) UnLock(lockKey string, value int64) {
c := r.pool.Get()
defer utils.Close(c)
lockKey = r.getLockKey(lockKey)
v, err := c.Do("DEL", lockKey)
getValue, err := redis.Int64(c.Do("GET", lockKey))
if err != nil {
log.Errorf("get lockKey error: %s", err.Error())
debug.PrintStack()
return
}
if getValue != value {
log.Errorf("the lockKey value diff: %d, %d", value, getValue)
return
}
v, err := redis.Int64(c.Do("DEL", lockKey))
if err != nil {
log.Errorf("unlock failed, error: %s", err.Error())
debug.PrintStack()
return
}
if v.(int64) == 0 {
if v == 0 {
log.Errorf("unlock failed: key=%s", lockKey)
return
}

View File

@@ -96,10 +96,11 @@ func UpdateNodeStatus() {
func handleNodeInfo(key string, data Data) {
// 添加同步锁
if err := database.RedisClient.Lock(key); err != nil {
v, err := database.RedisClient.Lock(key)
if err != nil {
return
}
defer database.RedisClient.UnLock(key)
defer database.RedisClient.UnLock(key, v)
// 更新节点信息到数据库
s, c := database.GetCol("nodes")