mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-22 17:31:03 +01:00
@@ -143,6 +143,7 @@ func (n *Node) GetTasks() ([]Task, error) {
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// 节点列表
|
||||
func GetNodeList(filter interface{}) ([]Node, error) {
|
||||
s, c := database.GetCol("nodes")
|
||||
defer s.Close()
|
||||
@@ -156,6 +157,7 @@ func GetNodeList(filter interface{}) ([]Node, error) {
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// 节点信息
|
||||
func GetNode(id bson.ObjectId) (Node, error) {
|
||||
var node Node
|
||||
|
||||
@@ -176,6 +178,7 @@ func GetNode(id bson.ObjectId) (Node, error) {
|
||||
return node, nil
|
||||
}
|
||||
|
||||
// 节点信息
|
||||
func GetNodeByKey(key string) (Node, error) {
|
||||
s, c := database.GetCol("nodes")
|
||||
defer s.Close()
|
||||
@@ -191,6 +194,7 @@ func GetNodeByKey(key string) (Node, error) {
|
||||
return node, nil
|
||||
}
|
||||
|
||||
// 更新节点
|
||||
func UpdateNode(id bson.ObjectId, item Node) error {
|
||||
s, c := database.GetCol("nodes")
|
||||
defer s.Close()
|
||||
@@ -206,6 +210,7 @@ func UpdateNode(id bson.ObjectId, item Node) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 任务列表
|
||||
func GetNodeTaskList(id bson.ObjectId) ([]Task, error) {
|
||||
node, err := GetNode(id)
|
||||
if err != nil {
|
||||
@@ -218,6 +223,7 @@ func GetNodeTaskList(id bson.ObjectId) ([]Task, error) {
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// 节点数
|
||||
func GetNodeCount(query interface{}) (int, error) {
|
||||
s, c := database.GetCol("nodes")
|
||||
defer s.Close()
|
||||
|
||||
@@ -49,10 +49,8 @@ func GetRemoteLog(task model.Task) (logStr string, err error) {
|
||||
select {
|
||||
case logStr = <-ch:
|
||||
log.Infof("get remote log")
|
||||
break
|
||||
case <-time.After(30 * time.Second):
|
||||
logStr = "get remote log timeout"
|
||||
break
|
||||
}
|
||||
|
||||
return logStr, nil
|
||||
|
||||
@@ -50,36 +50,44 @@ func GetNodeData() (Data, error) {
|
||||
return data, err
|
||||
}
|
||||
|
||||
func GetRedisNode(key string) (*Data, error) {
|
||||
// 获取节点数据
|
||||
value, err := database.RedisClient.HGet("nodes", key)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 解析节点列表数据
|
||||
var data Data
|
||||
if err := json.Unmarshal([]byte(value), &data); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return nil, err
|
||||
}
|
||||
return &data, nil
|
||||
}
|
||||
|
||||
// 更新所有节点状态
|
||||
func UpdateNodeStatus() {
|
||||
// 从Redis获取节点keys
|
||||
list, err := database.RedisClient.HKeys("nodes")
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
log.Errorf("get redis node keys error: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 遍历节点keys
|
||||
for _, key := range list {
|
||||
// 获取节点数据
|
||||
value, err := database.RedisClient.HGet("nodes", key)
|
||||
|
||||
data, err := GetRedisNode(key)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
continue
|
||||
}
|
||||
|
||||
// 解析节点列表数据
|
||||
var data Data
|
||||
if err := json.Unmarshal([]byte(value), &data); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 如果记录的更新时间超过60秒,该节点被认为离线
|
||||
if time.Now().Unix()-data.UpdateTsUnix > 60 {
|
||||
// 在Redis中删除该节点
|
||||
if err := database.RedisClient.HDel("nodes", data.Key); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
log.Errorf("delete redis node key error:%s, key:%s", err.Error(), data.Key)
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -94,7 +102,8 @@ func UpdateNodeStatus() {
|
||||
model.ResetNodeStatusToOffline(list)
|
||||
}
|
||||
|
||||
func handleNodeInfo(key string, data Data) {
|
||||
// 处理接到信息
|
||||
func handleNodeInfo(key string, data *Data) {
|
||||
// 添加同步锁
|
||||
v, err := database.RedisClient.Lock(key)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user