mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
simplified
This commit is contained in:
@@ -102,7 +102,34 @@ func (r *Redis) HDel(collection string, key string) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (r *Redis) HScan(collection string) (results []string, err error) {
|
||||
c := r.pool.Get()
|
||||
defer utils.Close(c)
|
||||
var (
|
||||
cursor int64
|
||||
items []string
|
||||
)
|
||||
|
||||
for {
|
||||
values, err := redis.Values(c.Do("HSCAN", collection, cursor))
|
||||
if err != nil {
|
||||
return results, err
|
||||
}
|
||||
|
||||
values, err = redis.Scan(values, &cursor, &items)
|
||||
if err != nil {
|
||||
return results, err
|
||||
}
|
||||
|
||||
results = append(results, items[1])
|
||||
|
||||
if cursor == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return results, err
|
||||
|
||||
}
|
||||
func (r *Redis) HKeys(collection string) ([]string, error) {
|
||||
c := r.pool.Get()
|
||||
defer utils.Close(c)
|
||||
|
||||
@@ -26,7 +26,7 @@ type Node struct {
|
||||
Key string `json:"key" bson:"key"`
|
||||
|
||||
// 前端展示
|
||||
IsMaster bool `json:"is_master"`
|
||||
IsMaster bool `json:"is_master" bson:"is_master"`
|
||||
|
||||
UpdateTs time.Time `json:"update_ts" bson:"update_ts"`
|
||||
CreateTs time.Time `json:"create_ts" bson:"create_ts"`
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/apex/log"
|
||||
"github.com/globalsign/mgo"
|
||||
"github.com/globalsign/mgo/bson"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/spf13/viper"
|
||||
@@ -53,41 +52,25 @@ func GetNodeData() (Data, error) {
|
||||
return data, err
|
||||
}
|
||||
|
||||
func GetRedisNode(key string) (*Data, error) {
|
||||
// 获取节点数据
|
||||
value, err := database.RedisClient.HGet("nodes", key)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 解析节点列表数据
|
||||
var data Data
|
||||
if err := json.Unmarshal([]byte(value), &data); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return nil, err
|
||||
}
|
||||
return &data, nil
|
||||
}
|
||||
|
||||
// 更新所有节点状态
|
||||
func UpdateNodeStatus() {
|
||||
// 从Redis获取节点keys
|
||||
list, err := database.RedisClient.HKeys("nodes")
|
||||
list, err := database.RedisClient.HScan("nodes")
|
||||
if err != nil {
|
||||
log.Errorf("get redis node keys error: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
var offlineKeys []string
|
||||
// 遍历节点keys
|
||||
for _, key := range list {
|
||||
|
||||
data, err := GetRedisNode(key)
|
||||
if err != nil {
|
||||
for _, dataStr := range list {
|
||||
var data Data
|
||||
if err := json.Unmarshal([]byte(dataStr), &data); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
continue
|
||||
}
|
||||
// 如果记录的更新时间超过60秒,该节点被认为离线
|
||||
if time.Now().Unix()-data.UpdateTsUnix > 60 {
|
||||
offlineKeys = append(offlineKeys, data.Key)
|
||||
// 在Redis中删除该节点
|
||||
if err := database.RedisClient.HDel("nodes", data.Key); err != nil {
|
||||
log.Errorf("delete redis node key error:%s, key:%s", err.Error(), data.Key)
|
||||
@@ -96,13 +79,27 @@ func UpdateNodeStatus() {
|
||||
}
|
||||
|
||||
// 处理node信息
|
||||
handleNodeInfo(key, data)
|
||||
if err = UpdateNodeInfo(&data); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
continue
|
||||
}
|
||||
}
|
||||
if len(offlineKeys) > 0 {
|
||||
s, c := database.GetCol("nodes")
|
||||
defer s.Close()
|
||||
_, err = c.UpdateAll(bson.M{
|
||||
"key": bson.M{
|
||||
"$in": offlineKeys,
|
||||
},
|
||||
}, bson.M{
|
||||
"$set": bson.M{
|
||||
"status": constants.StatusOffline,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// 重新获取list
|
||||
list, _ = database.RedisClient.HKeys("nodes")
|
||||
// 重置不在redis的key为offline
|
||||
model.ResetNodeStatusToOffline(list)
|
||||
}
|
||||
|
||||
func getNodeName(data *Data) string {
|
||||
@@ -119,46 +116,28 @@ func getNodeName(data *Data) string {
|
||||
}
|
||||
|
||||
// 处理节点信息
|
||||
func handleNodeInfo(key string, data *Data) {
|
||||
// 添加同步锁
|
||||
v, err := database.RedisClient.Lock(key)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer database.RedisClient.UnLock(key, v)
|
||||
|
||||
func UpdateNodeInfo(data *Data) (err error) {
|
||||
// 更新节点信息到数据库
|
||||
s, c := database.GetCol("nodes")
|
||||
defer s.Close()
|
||||
|
||||
var node model.Node
|
||||
if err := c.Find(bson.M{"key": key}).One(&node); err != nil && err == mgo.ErrNotFound {
|
||||
// 数据库不存在该节点
|
||||
node = model.Node{
|
||||
Key: key,
|
||||
Name: getNodeName(data),
|
||||
Ip: data.Ip,
|
||||
Port: "8000",
|
||||
Mac: data.Mac,
|
||||
Status: constants.StatusOnline,
|
||||
IsMaster: data.Master,
|
||||
UpdateTs: time.Now(),
|
||||
UpdateTsUnix: time.Now().Unix(),
|
||||
}
|
||||
if err := node.Add(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
} else if node.Key != "" {
|
||||
// 数据库存在该节点
|
||||
node.Status = constants.StatusOnline
|
||||
node.UpdateTs = time.Now()
|
||||
node.UpdateTsUnix = time.Now().Unix()
|
||||
if err := node.Save(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
_, err = c.Upsert(bson.M{"key": data.Key}, bson.M{
|
||||
"$set": bson.M{
|
||||
"status": constants.StatusOnline,
|
||||
"update_ts": time.Now(),
|
||||
"update_ts_unix": time.Now().Unix(),
|
||||
},
|
||||
"$setOnInsert": bson.M{
|
||||
"_id": bson.NewObjectId(),
|
||||
"key": data.Key,
|
||||
"name": getNodeName(data),
|
||||
"ip": data.Ip,
|
||||
"port": "8000",
|
||||
"mac": data.Mac,
|
||||
"is_master": data.Master,
|
||||
},
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// 更新节点数据
|
||||
|
||||
Reference in New Issue
Block a user