mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-02-01 18:20:17 +01:00
added golang
This commit is contained in:
348
backend/services/node.go
Normal file
348
backend/services/node.go
Normal file
@@ -0,0 +1,348 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"crawlab/constants"
|
||||
"crawlab/database"
|
||||
"crawlab/lib/cron"
|
||||
"crawlab/model"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/apex/log"
|
||||
"github.com/globalsign/mgo/bson"
|
||||
"github.com/spf13/viper"
|
||||
"net"
|
||||
"runtime/debug"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Data struct {
|
||||
Mac string `json:"mac"`
|
||||
Ip string `json:"ip"`
|
||||
Master bool `json:"master"`
|
||||
UpdateTs time.Time `json:"update_ts"`
|
||||
}
|
||||
|
||||
const (
|
||||
Yes = "Y"
|
||||
No = "N"
|
||||
)
|
||||
|
||||
// 获取本机的IP地址
|
||||
// TODO: 考虑多个IP地址的情况
|
||||
func GetIp() (string, error) {
|
||||
addrList, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
for _, value := range addrList {
|
||||
if ipNet, ok := value.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
|
||||
if ipNet.IP.To4() != nil {
|
||||
return ipNet.IP.String(), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// 获取本机的MAC地址
|
||||
func GetMac() (string, error) {
|
||||
interfaces, err := net.Interfaces()
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
return "", err
|
||||
}
|
||||
for _, inter := range interfaces {
|
||||
if inter.HardwareAddr != nil {
|
||||
mac := inter.HardwareAddr.String()
|
||||
return mac, nil
|
||||
}
|
||||
}
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// 获取本机节点
|
||||
func GetCurrentNode() (model.Node, error) {
|
||||
// 获取本机MAC地址
|
||||
mac, err := GetMac()
|
||||
if err != nil {
|
||||
return model.Node{}, err
|
||||
}
|
||||
|
||||
s, c := database.GetCol("nodes")
|
||||
defer s.Close()
|
||||
|
||||
// 从数据库中获取当前节点
|
||||
var n model.Node
|
||||
if err := c.Find(bson.M{"mac": mac}).One(&n); err != nil {
|
||||
return n, err
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// 是否为主节点
|
||||
func IsMaster() bool {
|
||||
return viper.GetString("server.master") == Yes
|
||||
}
|
||||
|
||||
// 获取节点数据
|
||||
func GetNodeData() (Data, error) {
|
||||
mac, err := GetMac()
|
||||
if err != nil {
|
||||
return Data{}, err
|
||||
}
|
||||
|
||||
value, err := database.RedisClient.HGet("nodes", mac)
|
||||
data := Data{}
|
||||
if err := json.Unmarshal([]byte(value), &data); err != nil {
|
||||
return data, err
|
||||
}
|
||||
return data, err
|
||||
}
|
||||
|
||||
// 更新所有节点状态
|
||||
func UpdateNodeStatus() {
|
||||
// 从Redis获取节点keys
|
||||
list, err := database.RedisClient.HKeys("nodes")
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 遍历节点keys
|
||||
for _, mac := range list {
|
||||
// 获取节点数据
|
||||
value, err := database.RedisClient.HGet("nodes", mac)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 解析节点列表数据
|
||||
var data Data
|
||||
if err := json.Unmarshal([]byte(value), &data); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 如果记录的更新时间超过60秒,该节点被认为离线
|
||||
if time.Now().Sub(data.UpdateTs) > 60*time.Second {
|
||||
// 在Redis中删除该节点
|
||||
if err := database.RedisClient.HDel("nodes", data.Mac); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 在MongoDB中该节点设置状态为离线
|
||||
s, c := database.GetCol("nodes")
|
||||
defer s.Close()
|
||||
var node model.Node
|
||||
if err := c.Find(bson.M{"mac": mac}).One(&node); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
node.Status = constants.StatusOffline
|
||||
if err := node.Save(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// 更新节点信息到数据库
|
||||
s, c := database.GetCol("nodes")
|
||||
defer s.Close()
|
||||
var node model.Node
|
||||
if err := c.Find(bson.M{"mac": mac}).One(&node); err != nil {
|
||||
// 数据库不存在该节点
|
||||
node = model.Node{
|
||||
Name: data.Mac,
|
||||
Ip: data.Ip,
|
||||
Port: "8000",
|
||||
Mac: data.Mac,
|
||||
Status: constants.StatusOnline,
|
||||
}
|
||||
if err := node.Add(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// 数据库存在该节点
|
||||
node.Status = constants.StatusOnline
|
||||
if err := node.Save(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 更新节点数据
|
||||
func UpdateNodeData() {
|
||||
// 获取MAC地址
|
||||
mac, err := GetMac()
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 获取IP地址
|
||||
ip, err := GetIp()
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 构造节点数据
|
||||
data := Data{
|
||||
Mac: mac,
|
||||
Ip: ip,
|
||||
Master: IsMaster(),
|
||||
UpdateTs: time.Now(),
|
||||
}
|
||||
|
||||
// 注册节点到Redis
|
||||
dataBytes, err := json.Marshal(&data)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
if err := database.RedisClient.HSet("nodes", mac, string(dataBytes)); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: 查看节点PATH
|
||||
func GetNodePath() {
|
||||
//os.Environ()
|
||||
}
|
||||
|
||||
type NodeMessage struct {
|
||||
// 通信类别
|
||||
Type string `json:"type"`
|
||||
|
||||
// 任务相关
|
||||
TaskId string `json:"task_id"` // 任务ID
|
||||
|
||||
// 日志相关
|
||||
LogPath string `json:"log_path"` // 日志路径
|
||||
Log string `json:"log"` // 日志
|
||||
|
||||
// 环境变量
|
||||
Env string `json:"env"`
|
||||
|
||||
// 错误相关
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
func MasterNodeCallback(channel string, msgStr string) {
|
||||
// 反序列化
|
||||
var msg NodeMessage
|
||||
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
|
||||
if msg.Type == constants.MsgTypeGetLog {
|
||||
fmt.Println(msg)
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
ch := TaskLogChanMap.ChanBlocked(msg.TaskId)
|
||||
ch <- msg.Log
|
||||
}
|
||||
}
|
||||
|
||||
func WorkerNodeCallback(channel string, msgStr string) {
|
||||
// 反序列化
|
||||
msg := NodeMessage{}
|
||||
fmt.Println(msgStr)
|
||||
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
|
||||
if msg.Type == constants.MsgTypeGetLog {
|
||||
// 消息类型为获取日志
|
||||
|
||||
msgSd := NodeMessage{
|
||||
Type: constants.MsgTypeGetLog,
|
||||
TaskId: msg.TaskId,
|
||||
}
|
||||
|
||||
// 获取本地日志
|
||||
logStr, err := GetLocalLog(msg.LogPath)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
msgSd.Error = err.Error()
|
||||
}
|
||||
msgSd.Log = string(logStr)
|
||||
|
||||
// 序列化
|
||||
msgSdBytes, err := json.Marshal(&msgSd)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
|
||||
// 发布消息给主节点
|
||||
fmt.Println(msgSd)
|
||||
if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
} else if msg.Type == constants.MsgTypeCancelTask {
|
||||
ch := TaskExecChanMap.ChanBlocked(msg.TaskId)
|
||||
ch <- constants.TaskCancel
|
||||
}
|
||||
}
|
||||
|
||||
// 初始化节点服务
|
||||
func InitNodeService() error {
|
||||
// 构造定时任务
|
||||
c := cron.New(cron.WithSeconds())
|
||||
|
||||
// 每5秒更新一次本节点信息
|
||||
spec := "0/5 * * * * *"
|
||||
if _, err := c.AddFunc(spec, UpdateNodeData); err != nil {
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
|
||||
// 消息订阅
|
||||
var sub database.Subscriber
|
||||
sub.Connect()
|
||||
|
||||
// 获取当前节点
|
||||
node, err := GetCurrentNode()
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
if IsMaster() {
|
||||
// 如果为主节点,订阅主节点通信频道
|
||||
channel := "nodes:master"
|
||||
sub.Subscribe(channel, MasterNodeCallback)
|
||||
} else {
|
||||
// 若为工作节点,订阅单独指定通信频道
|
||||
channel := "nodes:" + node.Id.Hex()
|
||||
sub.Subscribe(channel, WorkerNodeCallback)
|
||||
}
|
||||
|
||||
// 如果为主节点,每30秒刷新所有节点信息
|
||||
if IsMaster() {
|
||||
spec := "*/10 * * * * *"
|
||||
if _, err := c.AddFunc(spec, UpdateNodeStatus); err != nil {
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
c.Start()
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user