Files
crawlab/backend/services/node.go
Marvin Zhang 56c99b314f added golang
2019-07-22 12:51:52 +08:00

349 lines
7.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/lib/cron"
"crawlab/model"
"encoding/json"
"fmt"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"github.com/spf13/viper"
"net"
"runtime/debug"
"time"
)
type Data struct {
Mac string `json:"mac"`
Ip string `json:"ip"`
Master bool `json:"master"`
UpdateTs time.Time `json:"update_ts"`
}
const (
Yes = "Y"
No = "N"
)
// 获取本机的IP地址
// TODO: 考虑多个IP地址的情况
func GetIp() (string, error) {
addrList, err := net.InterfaceAddrs()
if err != nil {
return "", err
}
for _, value := range addrList {
if ipNet, ok := value.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
if ipNet.IP.To4() != nil {
return ipNet.IP.String(), nil
}
}
}
return "", nil
}
// 获取本机的MAC地址
func GetMac() (string, error) {
interfaces, err := net.Interfaces()
if err != nil {
debug.PrintStack()
return "", err
}
for _, inter := range interfaces {
if inter.HardwareAddr != nil {
mac := inter.HardwareAddr.String()
return mac, nil
}
}
return "", nil
}
// 获取本机节点
func GetCurrentNode() (model.Node, error) {
// 获取本机MAC地址
mac, err := GetMac()
if err != nil {
return model.Node{}, err
}
s, c := database.GetCol("nodes")
defer s.Close()
// 从数据库中获取当前节点
var n model.Node
if err := c.Find(bson.M{"mac": mac}).One(&n); err != nil {
return n, err
}
return n, nil
}
// 是否为主节点
func IsMaster() bool {
return viper.GetString("server.master") == Yes
}
// 获取节点数据
func GetNodeData() (Data, error) {
mac, err := GetMac()
if err != nil {
return Data{}, err
}
value, err := database.RedisClient.HGet("nodes", mac)
data := Data{}
if err := json.Unmarshal([]byte(value), &data); err != nil {
return data, err
}
return data, err
}
// 更新所有节点状态
func UpdateNodeStatus() {
// 从Redis获取节点keys
list, err := database.RedisClient.HKeys("nodes")
if err != nil {
log.Errorf(err.Error())
return
}
// 遍历节点keys
for _, mac := range list {
// 获取节点数据
value, err := database.RedisClient.HGet("nodes", mac)
if err != nil {
log.Errorf(err.Error())
return
}
// 解析节点列表数据
var data Data
if err := json.Unmarshal([]byte(value), &data); err != nil {
log.Errorf(err.Error())
return
}
// 如果记录的更新时间超过60秒该节点被认为离线
if time.Now().Sub(data.UpdateTs) > 60*time.Second {
// 在Redis中删除该节点
if err := database.RedisClient.HDel("nodes", data.Mac); err != nil {
log.Errorf(err.Error())
return
}
// 在MongoDB中该节点设置状态为离线
s, c := database.GetCol("nodes")
defer s.Close()
var node model.Node
if err := c.Find(bson.M{"mac": mac}).One(&node); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
node.Status = constants.StatusOffline
if err := node.Save(); err != nil {
log.Errorf(err.Error())
return
}
continue
}
// 更新节点信息到数据库
s, c := database.GetCol("nodes")
defer s.Close()
var node model.Node
if err := c.Find(bson.M{"mac": mac}).One(&node); err != nil {
// 数据库不存在该节点
node = model.Node{
Name: data.Mac,
Ip: data.Ip,
Port: "8000",
Mac: data.Mac,
Status: constants.StatusOnline,
}
if err := node.Add(); err != nil {
log.Errorf(err.Error())
return
}
} else {
// 数据库存在该节点
node.Status = constants.StatusOnline
if err := node.Save(); err != nil {
log.Errorf(err.Error())
return
}
}
}
}
// 更新节点数据
func UpdateNodeData() {
// 获取MAC地址
mac, err := GetMac()
if err != nil {
log.Errorf(err.Error())
return
}
// 获取IP地址
ip, err := GetIp()
if err != nil {
log.Errorf(err.Error())
return
}
// 构造节点数据
data := Data{
Mac: mac,
Ip: ip,
Master: IsMaster(),
UpdateTs: time.Now(),
}
// 注册节点到Redis
dataBytes, err := json.Marshal(&data)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
if err := database.RedisClient.HSet("nodes", mac, string(dataBytes)); err != nil {
log.Errorf(err.Error())
return
}
}
// TODO: 查看节点PATH
func GetNodePath() {
//os.Environ()
}
type NodeMessage struct {
// 通信类别
Type string `json:"type"`
// 任务相关
TaskId string `json:"task_id"` // 任务ID
// 日志相关
LogPath string `json:"log_path"` // 日志路径
Log string `json:"log"` // 日志
// 环境变量
Env string `json:"env"`
// 错误相关
Error string `json:"error"`
}
func MasterNodeCallback(channel string, msgStr string) {
// 反序列化
var msg NodeMessage
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
if msg.Type == constants.MsgTypeGetLog {
fmt.Println(msg)
time.Sleep(10 * time.Millisecond)
ch := TaskLogChanMap.ChanBlocked(msg.TaskId)
ch <- msg.Log
}
}
func WorkerNodeCallback(channel string, msgStr string) {
// 反序列化
msg := NodeMessage{}
fmt.Println(msgStr)
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
if msg.Type == constants.MsgTypeGetLog {
// 消息类型为获取日志
msgSd := NodeMessage{
Type: constants.MsgTypeGetLog,
TaskId: msg.TaskId,
}
// 获取本地日志
logStr, err := GetLocalLog(msg.LogPath)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
msgSd.Error = err.Error()
}
msgSd.Log = string(logStr)
// 序列化
msgSdBytes, err := json.Marshal(&msgSd)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
// 发布消息给主节点
fmt.Println(msgSd)
if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil {
log.Errorf(err.Error())
return
}
} else if msg.Type == constants.MsgTypeCancelTask {
ch := TaskExecChanMap.ChanBlocked(msg.TaskId)
ch <- constants.TaskCancel
}
}
// 初始化节点服务
func InitNodeService() error {
// 构造定时任务
c := cron.New(cron.WithSeconds())
// 每5秒更新一次本节点信息
spec := "0/5 * * * * *"
if _, err := c.AddFunc(spec, UpdateNodeData); err != nil {
debug.PrintStack()
return err
}
// 消息订阅
var sub database.Subscriber
sub.Connect()
// 获取当前节点
node, err := GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
return err
}
if IsMaster() {
// 如果为主节点,订阅主节点通信频道
channel := "nodes:master"
sub.Subscribe(channel, MasterNodeCallback)
} else {
// 若为工作节点,订阅单独指定通信频道
channel := "nodes:" + node.Id.Hex()
sub.Subscribe(channel, WorkerNodeCallback)
}
// 如果为主节点每30秒刷新所有节点信息
if IsMaster() {
spec := "*/10 * * * * *"
if _, err := c.AddFunc(spec, UpdateNodeStatus); err != nil {
debug.PrintStack()
return err
}
}
c.Start()
return nil
}