diff --git a/backend/database/redis.go b/backend/database/redis.go index d93491f8..70c9db99 100644 --- a/backend/database/redis.go +++ b/backend/database/redis.go @@ -102,7 +102,34 @@ func (r *Redis) HDel(collection string, key string) error { } return nil } +func (r *Redis) HScan(collection string) (results []string, err error) { + c := r.pool.Get() + defer utils.Close(c) + var ( + cursor int64 + items []string + ) + for { + values, err := redis.Values(c.Do("HSCAN", collection, cursor)) + if err != nil { + return results, err + } + + values, err = redis.Scan(values, &cursor, &items) + if err != nil { + return results, err + } + + results = append(results, items[1]) + + if cursor == 0 { + break + } + } + return results, err + +} func (r *Redis) HKeys(collection string) ([]string, error) { c := r.pool.Get() defer utils.Close(c) diff --git a/backend/model/node.go b/backend/model/node.go index 715be30f..c02443d2 100644 --- a/backend/model/node.go +++ b/backend/model/node.go @@ -26,7 +26,7 @@ type Node struct { Key string `json:"key" bson:"key"` // 前端展示 - IsMaster bool `json:"is_master"` + IsMaster bool `json:"is_master" bson:"is_master"` UpdateTs time.Time `json:"update_ts" bson:"update_ts"` CreateTs time.Time `json:"create_ts" bson:"create_ts"` diff --git a/backend/services/node.go b/backend/services/node.go index 89187a81..60310759 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -12,7 +12,6 @@ import ( "encoding/json" "fmt" "github.com/apex/log" - "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" "github.com/gomodule/redigo/redis" "github.com/spf13/viper" @@ -53,41 +52,25 @@ func GetNodeData() (Data, error) { return data, err } -func GetRedisNode(key string) (*Data, error) { - // 获取节点数据 - value, err := database.RedisClient.HGet("nodes", key) - if err != nil { - log.Errorf(err.Error()) - return nil, err - } - - // 解析节点列表数据 - var data Data - if err := json.Unmarshal([]byte(value), &data); err != nil { - log.Errorf(err.Error()) - return nil, err - } - return &data, nil -} - // 更新所有节点状态 func UpdateNodeStatus() { // 从Redis获取节点keys - list, err := database.RedisClient.HKeys("nodes") + list, err := database.RedisClient.HScan("nodes") if err != nil { log.Errorf("get redis node keys error: %s", err.Error()) return } - + var offlineKeys []string // 遍历节点keys - for _, key := range list { - - data, err := GetRedisNode(key) - if err != nil { + for _, dataStr := range list { + var data Data + if err := json.Unmarshal([]byte(dataStr), &data); err != nil { + log.Errorf(err.Error()) continue } // 如果记录的更新时间超过60秒,该节点被认为离线 if time.Now().Unix()-data.UpdateTsUnix > 60 { + offlineKeys = append(offlineKeys, data.Key) // 在Redis中删除该节点 if err := database.RedisClient.HDel("nodes", data.Key); err != nil { log.Errorf("delete redis node key error:%s, key:%s", err.Error(), data.Key) @@ -96,13 +79,27 @@ func UpdateNodeStatus() { } // 处理node信息 - handleNodeInfo(key, data) + if err = UpdateNodeInfo(&data); err != nil { + log.Errorf(err.Error()) + continue + } + } + if len(offlineKeys) > 0 { + s, c := database.GetCol("nodes") + defer s.Close() + _, err = c.UpdateAll(bson.M{ + "key": bson.M{ + "$in": offlineKeys, + }, + }, bson.M{ + "$set": bson.M{ + "status": constants.StatusOffline, + }, + }) + if err != nil { + log.Errorf(err.Error()) + } } - - // 重新获取list - list, _ = database.RedisClient.HKeys("nodes") - // 重置不在redis的key为offline - model.ResetNodeStatusToOffline(list) } func getNodeName(data *Data) string { @@ -119,46 +116,28 @@ func getNodeName(data *Data) string { } // 处理节点信息 -func handleNodeInfo(key string, data *Data) { - // 添加同步锁 - v, err := database.RedisClient.Lock(key) - if err != nil { - return - } - defer database.RedisClient.UnLock(key, v) - +func UpdateNodeInfo(data *Data) (err error) { // 更新节点信息到数据库 s, c := database.GetCol("nodes") defer s.Close() - var node model.Node - if err := c.Find(bson.M{"key": key}).One(&node); err != nil && err == mgo.ErrNotFound { - // 数据库不存在该节点 - node = model.Node{ - Key: key, - Name: getNodeName(data), - Ip: data.Ip, - Port: "8000", - Mac: data.Mac, - Status: constants.StatusOnline, - IsMaster: data.Master, - UpdateTs: time.Now(), - UpdateTsUnix: time.Now().Unix(), - } - if err := node.Add(); err != nil { - log.Errorf(err.Error()) - return - } - } else if node.Key != "" { - // 数据库存在该节点 - node.Status = constants.StatusOnline - node.UpdateTs = time.Now() - node.UpdateTsUnix = time.Now().Unix() - if err := node.Save(); err != nil { - log.Errorf(err.Error()) - return - } - } + _, err = c.Upsert(bson.M{"key": data.Key}, bson.M{ + "$set": bson.M{ + "status": constants.StatusOnline, + "update_ts": time.Now(), + "update_ts_unix": time.Now().Unix(), + }, + "$setOnInsert": bson.M{ + "_id": bson.NewObjectId(), + "key": data.Key, + "name": getNodeName(data), + "ip": data.Ip, + "port": "8000", + "mac": data.Mac, + "is_master": data.Master, + }, + }) + return err } // 更新节点数据