mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-22 17:31:03 +01:00
256 lines
5.8 KiB
Go
256 lines
5.8 KiB
Go
package services
|
||
|
||
import (
|
||
"crawlab/constants"
|
||
"crawlab/database"
|
||
"crawlab/model"
|
||
"crawlab/services/local_node"
|
||
"crawlab/utils"
|
||
"encoding/json"
|
||
"github.com/apex/log"
|
||
"github.com/globalsign/mgo/bson"
|
||
"runtime/debug"
|
||
"time"
|
||
)
|
||
|
||
type Data struct {
|
||
Key string `json:"key"`
|
||
Mac string `json:"mac"`
|
||
Ip string `json:"ip"`
|
||
Hostname string `json:"hostname"`
|
||
Name string `json:"name"`
|
||
NameType string `json:"name_type"`
|
||
|
||
Master bool `json:"master"`
|
||
UpdateTs time.Time `json:"update_ts"`
|
||
UpdateTsUnix int64 `json:"update_ts_unix"`
|
||
}
|
||
|
||
// 所有调用IsMasterNode的方法,都永远会在master节点执行,所以GetCurrentNode方法返回永远是master节点
|
||
// 该ID的节点是否为主节点
|
||
func IsMasterNode(id string) bool {
|
||
curNode := local_node.CurrentNode()
|
||
//curNode, _ := model.GetCurrentNode()
|
||
node, _ := model.GetNode(bson.ObjectIdHex(id))
|
||
return curNode.Id == node.Id
|
||
}
|
||
|
||
// 获取节点数据
|
||
func GetNodeData() (Data, error) {
|
||
localNode := local_node.GetLocalNode()
|
||
key := localNode.Identify
|
||
if key == "" {
|
||
return Data{}, nil
|
||
}
|
||
|
||
value, err := database.RedisClient.HGet("nodes", key)
|
||
data := Data{}
|
||
if err := json.Unmarshal([]byte(value), &data); err != nil {
|
||
return data, err
|
||
}
|
||
return data, err
|
||
}
|
||
func GetRedisNode(key string) (*Data, error) {
|
||
// 获取节点数据
|
||
value, err := database.RedisClient.HGet("nodes", key)
|
||
if err != nil {
|
||
log.Errorf(err.Error())
|
||
return nil, err
|
||
}
|
||
|
||
// 解析节点列表数据
|
||
var data Data
|
||
if err := json.Unmarshal([]byte(value), &data); err != nil {
|
||
log.Errorf(err.Error())
|
||
return nil, err
|
||
}
|
||
return &data, nil
|
||
}
|
||
|
||
// 更新所有节点状态
|
||
func UpdateNodeStatus() {
|
||
// 从Redis获取节点keys
|
||
list, err := database.RedisClient.HScan("nodes")
|
||
if err != nil {
|
||
log.Errorf("get redis node keys error: %s", err.Error())
|
||
return
|
||
}
|
||
var offlineKeys []string
|
||
// 遍历节点keys
|
||
for _, dataStr := range list {
|
||
var data Data
|
||
if err := json.Unmarshal([]byte(dataStr), &data); err != nil {
|
||
log.Errorf(err.Error())
|
||
continue
|
||
}
|
||
// 如果记录的更新时间超过60秒,该节点被认为离线
|
||
if time.Now().Unix()-data.UpdateTsUnix > 60 {
|
||
offlineKeys = append(offlineKeys, data.Key)
|
||
// 在Redis中删除该节点
|
||
if err := database.RedisClient.HDel("nodes", data.Key); err != nil {
|
||
log.Errorf("delete redis node key error:%s, key:%s", err.Error(), data.Key)
|
||
}
|
||
continue
|
||
}
|
||
|
||
// 处理node信息
|
||
if err = UpdateNodeInfo(&data); err != nil {
|
||
log.Errorf(err.Error())
|
||
continue
|
||
}
|
||
}
|
||
if len(offlineKeys) > 0 {
|
||
s, c := database.GetCol("nodes")
|
||
defer s.Close()
|
||
_, err = c.UpdateAll(bson.M{
|
||
"key": bson.M{
|
||
"$in": offlineKeys,
|
||
},
|
||
}, bson.M{
|
||
"$set": bson.M{
|
||
"status": constants.StatusOffline,
|
||
"update_ts": time.Now(),
|
||
"update_ts_unix": time.Now().Unix(),
|
||
},
|
||
})
|
||
if err != nil {
|
||
log.Errorf(err.Error())
|
||
}
|
||
}
|
||
}
|
||
|
||
// 处理节点信息
|
||
func UpdateNodeInfo(data *Data) (err error) {
|
||
// 更新节点信息到数据库
|
||
s, c := database.GetCol("nodes")
|
||
defer s.Close()
|
||
|
||
_, err = c.Upsert(bson.M{"key": data.Key}, bson.M{
|
||
"$set": bson.M{
|
||
"status": constants.StatusOnline,
|
||
"key": data.Key,
|
||
"name_type": data.NameType,
|
||
"ip": data.Ip,
|
||
"port": "8000",
|
||
"mac": data.Mac,
|
||
"is_master": data.Master,
|
||
"update_ts": time.Now(),
|
||
"update_ts_unix": time.Now().Unix(),
|
||
},
|
||
"$setOnInsert": bson.M{
|
||
"name": data.Name,
|
||
"_id": bson.NewObjectId(),
|
||
},
|
||
})
|
||
return err
|
||
}
|
||
|
||
// 更新节点数据
|
||
func UpdateNodeData() {
|
||
localNode := local_node.GetLocalNode()
|
||
key := localNode.Identify
|
||
// 构造节点数据
|
||
data := Data{
|
||
Key: key,
|
||
Mac: localNode.Mac,
|
||
Ip: localNode.Ip,
|
||
Hostname: localNode.Hostname,
|
||
Name: localNode.Identify,
|
||
NameType: string(localNode.IdentifyType),
|
||
Master: model.IsMaster(),
|
||
UpdateTs: time.Now(),
|
||
UpdateTsUnix: time.Now().Unix(),
|
||
}
|
||
|
||
// 注册节点到Redis
|
||
dataBytes, err := json.Marshal(&data)
|
||
if err != nil {
|
||
log.Errorf(err.Error())
|
||
debug.PrintStack()
|
||
return
|
||
}
|
||
|
||
if err := database.RedisClient.HSet("nodes", key, utils.BytesToString(dataBytes)); err != nil {
|
||
log.Errorf(err.Error())
|
||
return
|
||
}
|
||
}
|
||
|
||
// 发送心跳信息到Redis,每5秒发送一次
|
||
func SendHeartBeat() {
|
||
for {
|
||
UpdateNodeData()
|
||
time.Sleep(5 * time.Second)
|
||
}
|
||
}
|
||
|
||
// 每10秒刷新一次节点信息
|
||
func UpdateNodeStatusPeriodically() {
|
||
for {
|
||
UpdateNodeStatus()
|
||
time.Sleep(10 * time.Second)
|
||
}
|
||
}
|
||
|
||
// 每60秒更新异常节点信息
|
||
func UpdateOfflineNodeTaskToAbnormalPeriodically() {
|
||
for {
|
||
nodes, err := model.GetNodeList(bson.M{"status": constants.StatusOffline})
|
||
if err != nil {
|
||
log.Errorf("get nodes error: " + err.Error())
|
||
debug.PrintStack()
|
||
continue
|
||
}
|
||
for _, n := range nodes {
|
||
if err := model.UpdateTaskToAbnormal(n.Id); err != nil {
|
||
log.Errorf("update task to abnormal error: " + err.Error())
|
||
debug.PrintStack()
|
||
continue
|
||
}
|
||
}
|
||
time.Sleep(60 * time.Second)
|
||
}
|
||
}
|
||
|
||
// 初始化节点服务
|
||
func InitNodeService() error {
|
||
node, err := local_node.InitLocalNode()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 每5秒更新一次本节点信息
|
||
go SendHeartBeat()
|
||
|
||
// 首次更新节点数据(注册到Redis)
|
||
UpdateNodeData()
|
||
if model.IsMaster() {
|
||
err = model.UpdateMasterNodeInfo(node.Identify, node.Ip, node.Mac, node.Hostname)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
// 节点准备完毕
|
||
if err = node.Ready(); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 如果为主节点
|
||
if model.IsMaster() {
|
||
// 每10秒刷新所有节点信息
|
||
go UpdateNodeStatusPeriodically()
|
||
|
||
// 每60秒更新离线节点任务为异常
|
||
go UpdateOfflineNodeTaskToAbnormalPeriodically()
|
||
}
|
||
|
||
// 更新在当前节点执行中的任务状态为:abnormal
|
||
if err := model.UpdateTaskToAbnormal(node.Current().Id); err != nil {
|
||
debug.PrintStack()
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|