Merge pull request #741 from yaziming/reactors/simplified-cluster-watcher

Reactors/simplified
This commit is contained in:
Marvin Zhang
2020-05-24 12:37:17 +08:00
committed by GitHub
2 changed files with 74 additions and 50 deletions

View File

@@ -107,7 +107,34 @@ func (r *Redis) HDel(collection string, key string) error {
}
return nil
}
func (r *Redis) HScan(collection string) (results []string, err error) {
c := r.pool.Get()
defer utils.Close(c)
var (
cursor int64
items []string
)
for {
values, err := redis.Values(c.Do("HSCAN", collection, cursor))
if err != nil {
return results, err
}
values, err = redis.Scan(values, &cursor, &items)
if err != nil {
return results, err
}
results = append(results, items[1])
if cursor == 0 {
break
}
}
return results, err
}
func (r *Redis) HKeys(collection string) ([]string, error) {
c := r.pool.Get()
defer utils.Close(c)

View File

@@ -13,7 +13,6 @@ import (
"encoding/json"
"fmt"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/gomodule/redigo/redis"
"github.com/spf13/viper"
@@ -54,7 +53,6 @@ func GetNodeData() (Data, error) {
}
return data, err
}
func GetRedisNode(key string) (*Data, error) {
// 获取节点数据
value, err := database.RedisClient.HGet("nodes", key)
@@ -75,21 +73,22 @@ func GetRedisNode(key string) (*Data, error) {
// 更新所有节点状态
func UpdateNodeStatus() {
// 从Redis获取节点keys
list, err := database.RedisClient.HKeys("nodes")
list, err := database.RedisClient.HScan("nodes")
if err != nil {
log.Errorf("get redis node keys error: %s", err.Error())
return
}
var offlineKeys []string
// 遍历节点keys
for _, key := range list {
data, err := GetRedisNode(key)
if err != nil {
for _, dataStr := range list {
var data Data
if err := json.Unmarshal([]byte(dataStr), &data); err != nil {
log.Errorf(err.Error())
continue
}
// 如果记录的更新时间超过60秒该节点被认为离线
if time.Now().Unix()-data.UpdateTsUnix > 60 {
offlineKeys = append(offlineKeys, data.Key)
// 在Redis中删除该节点
if err := database.RedisClient.HDel("nodes", data.Key); err != nil {
log.Errorf("delete redis node key error:%s, key:%s", err.Error(), data.Key)
@@ -98,13 +97,29 @@ func UpdateNodeStatus() {
}
// 处理node信息
handleNodeInfo(key, data)
if err = UpdateNodeInfo(&data); err != nil {
log.Errorf(err.Error())
continue
}
}
if len(offlineKeys) > 0 {
s, c := database.GetCol("nodes")
defer s.Close()
_, err = c.UpdateAll(bson.M{
"key": bson.M{
"$in": offlineKeys,
},
}, bson.M{
"$set": bson.M{
"status": constants.StatusOffline,
"update_ts": time.Now(),
"update_ts_unix": time.Now().Unix(),
},
})
if err != nil {
log.Errorf(err.Error())
}
}
// 重新获取list
list, _ = database.RedisClient.HKeys("nodes")
// 重置不在redis的key为offline
model.ResetNodeStatusToOffline(list)
}
func getNodeName(data *Data) string {
@@ -121,46 +136,28 @@ func getNodeName(data *Data) string {
}
// 处理节点信息
func handleNodeInfo(key string, data *Data) {
// 添加同步锁
v, err := database.RedisClient.Lock(key)
if err != nil {
return
}
defer database.RedisClient.UnLock(key, v)
func UpdateNodeInfo(data *Data) (err error) {
// 更新节点信息到数据库
s, c := database.GetCol("nodes")
defer s.Close()
var node model.Node
if err := c.Find(bson.M{"key": key}).One(&node); err != nil && err == mgo.ErrNotFound {
// 数据库不存在该节点
node = model.Node{
Key: key,
Name: getNodeName(data),
Ip: data.Ip,
Port: "8000",
Mac: data.Mac,
Status: constants.StatusOnline,
IsMaster: data.Master,
UpdateTs: time.Now(),
UpdateTsUnix: time.Now().Unix(),
}
if err := node.Add(); err != nil {
log.Errorf(err.Error())
return
}
} else if node.Key != "" {
// 数据库存在该节点
node.Status = constants.StatusOnline
node.UpdateTs = time.Now()
node.UpdateTsUnix = time.Now().Unix()
if err := node.Save(); err != nil {
log.Errorf(err.Error())
return
}
}
_, err = c.Upsert(bson.M{"key": data.Key}, bson.M{
"$set": bson.M{
"status": constants.StatusOnline,
"update_ts": time.Now(),
"update_ts_unix": time.Now().Unix(),
},
"$setOnInsert": bson.M{
"_id": bson.NewObjectId(),
"key": data.Key,
"name": getNodeName(data),
"ip": data.Ip,
"port": "8000",
"mac": data.Mac,
"is_master": data.Master,
},
})
return err
}
// 更新节点数据