fetch sysinfo from remote

This commit is contained in:
Marvin Zhang
2019-07-22 22:25:29 +08:00
parent 8df06ba278
commit 072e60e17e
6 changed files with 170 additions and 42 deletions

View File

@@ -1,7 +1,7 @@
package constants
const (
MsgTypeGetLog = "get-log"
MsgTypeGetSystemInfo = "get-sys-info"
MsgTypeCancelTask = "cancel-task"
MsgTypeGetLog = "get-log"
MsgTypeGetSystemInfo = "get-sys-info"
MsgTypeCancelTask = "cancel-task"
)

View File

@@ -60,7 +60,6 @@ func main() {
// 初始化节点服务
if err := services.InitNodeService(); err != nil {
debug.PrintStack()
panic(err)
}
log.Info("初始化节点配置成功")

View File

@@ -39,6 +39,7 @@ func (n *Node) Add() error {
n.UpdateTs = time.Now()
n.CreateTs = time.Now()
if err := c.Insert(&n); err != nil {
debug.PrintStack()
return err
}
return nil
@@ -48,6 +49,7 @@ func (n *Node) Delete() error {
s, c := database.GetCol("nodes")
defer s.Close()
if err := c.RemoveId(n.Id); err != nil {
debug.PrintStack()
return err
}
return nil
@@ -57,6 +59,7 @@ func (n *Node) GetTasks() ([]Task, error) {
tasks, err := GetTaskList(bson.M{"node_id": n.Id}, 0, 10, "-create_ts")
//tasks, err := GetTaskList(nil, 0, 10, "-create_ts")
if err != nil {
debug.PrintStack()
return []Task{}, err
}
@@ -69,6 +72,7 @@ func GetNodeList(filter interface{}) ([]Node, error) {
var results []Node
if err := c.Find(filter).All(&results); err != nil {
debug.PrintStack()
return results, err
}
return results, nil
@@ -78,15 +82,30 @@ func GetNode(id bson.ObjectId) (Node, error) {
s, c := database.GetCol("nodes")
defer s.Close()
var result Node
if err := c.FindId(id).One(&result); err != nil {
var node Node
if err := c.FindId(id).One(&node); err != nil {
if err != mgo.ErrNotFound {
log.Errorf(err.Error())
debug.PrintStack()
}
return result, err
return node, err
}
return result, nil
return node, nil
}
func GetNodeByMac(mac string) (Node, error) {
s, c := database.GetCol("nodes")
defer s.Close()
var node Node
if err := c.Find(bson.M{"mac": mac}).One(&node); err != nil {
if err != mgo.ErrNotFound {
log.Errorf(err.Error())
debug.PrintStack()
}
return node, err
}
return node, nil
}
func UpdateNode(id bson.ObjectId, item Node) error {

View File

@@ -93,7 +93,9 @@ func GetNodeTaskList(c *gin.Context) {
}
func GetSystemInfo(c *gin.Context) {
sysInfo, _ := services.GetSystemInfo()
id := c.Param("id")
sysInfo, _ := services.GetSystemInfo(id)
c.JSON(http.StatusOK, Response{
Status: "ok",

View File

@@ -22,6 +22,27 @@ type Data struct {
UpdateTs time.Time `json:"update_ts"`
}
type NodeMessage struct {
// 通信类别
Type string `json:"type"`
// 任务相关
TaskId string `json:"task_id"` // 任务ID
// 节点相关
NodeId string `json:"node_id"` // 节点ID
// 日志相关
LogPath string `json:"log_path"` // 日志路径
Log string `json:"log"` // 日志
// 系统信息
SysInfo model.SystemInfo `json:"sys_info"`
// 错误相关
Error string `json:"error"`
}
const (
Yes = "Y"
No = "N"
@@ -65,25 +86,51 @@ func GetCurrentNode() (model.Node, error) {
// 获取本机MAC地址
mac, err := GetMac()
if err != nil {
debug.PrintStack()
return model.Node{}, err
}
s, c := database.GetCol("nodes")
defer s.Close()
// 从数据库中获取当前节点
var n model.Node
if err := c.Find(bson.M{"mac": mac}).One(&n); err != nil {
return n, err
var node model.Node
errNum := 0
for {
// 如果错误次数超过10次
if errNum >= 10 {
panic("cannot get current node")
}
// 尝试获取节点
node, err = model.GetNodeByMac(mac)
// 如果获取失败
if err != nil {
// 增加错误次数
errNum++
// 5秒后重试
time.Sleep(5 * time.Second)
continue
}
// 跳出循环
break
}
return n, nil
return node, nil
}
// 是否为主节点
// 当前节点是否为主节点
func IsMaster() bool {
return viper.GetString("server.master") == Yes
}
// 该ID的节点是否为主节点
func IsMasterNode(id string) bool {
curNode, _ := GetCurrentNode()
node, _ := model.GetNode(bson.ObjectIdHex(id))
return curNode.Id == node.Id
}
// 获取节点数据
func GetNodeData() (Data, error) {
mac, err := GetMac()
@@ -127,6 +174,7 @@ func UpdateNodeStatus() {
// 如果记录的更新时间超过60秒该节点被认为离线
if time.Now().Sub(data.UpdateTs) > 60*time.Second {
// 在Redis中删除该节点
if err := database.RedisClient.HDel("nodes", data.Mac); err != nil {
log.Errorf(err.Error())
return
@@ -214,29 +262,6 @@ func UpdateNodeData() {
}
}
// TODO: 查看节点PATH
func GetNodePath() {
//os.Environ()
}
type NodeMessage struct {
// 通信类别
Type string `json:"type"`
// 任务相关
TaskId string `json:"task_id"` // 任务ID
// 日志相关
LogPath string `json:"log_path"` // 日志路径
Log string `json:"log"` // 日志
// 环境变量
Env string `json:"env"`
// 错误相关
Error string `json:"error"`
}
func MasterNodeCallback(channel string, msgStr string) {
// 反序列化
var msg NodeMessage
@@ -247,10 +272,18 @@ func MasterNodeCallback(channel string, msgStr string) {
}
if msg.Type == constants.MsgTypeGetLog {
// 获取日志
fmt.Println(msg)
time.Sleep(10 * time.Millisecond)
ch := TaskLogChanMap.ChanBlocked(msg.TaskId)
ch <- msg.Log
} else if msg.Type == constants.MsgTypeGetSystemInfo {
// 获取系统信息
fmt.Println(msg)
time.Sleep(10 * time.Millisecond)
ch := SystemInfoChanMap.ChanBlocked(msg.NodeId)
sysInfoBytes, _ := json.Marshal(&msg.SysInfo)
ch <- string(sysInfoBytes)
}
}
@@ -267,6 +300,7 @@ func WorkerNodeCallback(channel string, msgStr string) {
if msg.Type == constants.MsgTypeGetLog {
// 消息类型为获取日志
// 发出的消息
msgSd := NodeMessage{
Type: constants.MsgTypeGetLog,
TaskId: msg.TaskId,
@@ -296,8 +330,32 @@ func WorkerNodeCallback(channel string, msgStr string) {
return
}
} else if msg.Type == constants.MsgTypeCancelTask {
// 取消任务
ch := TaskExecChanMap.ChanBlocked(msg.TaskId)
ch <- constants.TaskCancel
} else if msg.Type == constants.MsgTypeGetSystemInfo {
// 获取环境信息
sysInfo, err := GetLocalSystemInfo()
if err != nil {
log.Errorf(err.Error())
return
}
msgSd := NodeMessage{
Type: constants.MsgTypeGetSystemInfo,
NodeId: msg.NodeId,
SysInfo: sysInfo,
}
msgSdBytes, err := json.Marshal(&msgSd)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
fmt.Println(msgSd)
if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil {
log.Errorf(err.Error())
return
}
}
}
@@ -313,6 +371,9 @@ func InitNodeService() error {
return err
}
// 首次更新节点数据注册到Redis
UpdateNodeData()
// 消息订阅
var sub database.Subscriber
sub.Connect()

View File

@@ -1,14 +1,22 @@
package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/model"
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"runtime/debug"
"strings"
)
var SystemInfoChanMap = utils.NewChanMap()
var executableNameMap = map[string]string{
// python
"python": "Python",
@@ -49,7 +57,9 @@ func GetExecutables() (executables []model.Executable, err error) {
for _, path := range pathValues {
fileList, err := ioutil.ReadDir(path)
if err != nil {
return executables, err
log.Errorf(err.Error())
debug.PrintStack()
continue
}
for _, file := range fileList {
@@ -71,13 +81,14 @@ func GetExecutables() (executables []model.Executable, err error) {
return executables, nil
}
func GetSystemInfo() (sysInfo model.SystemInfo, err error) {
func GetLocalSystemInfo() (sysInfo model.SystemInfo, err error) {
executables, err := GetExecutables()
if err != nil {
return sysInfo, err
}
hostname, err := os.Hostname()
if err != nil {
debug.PrintStack()
return sysInfo, err
}
@@ -89,3 +100,39 @@ func GetSystemInfo() (sysInfo model.SystemInfo, err error) {
Executables: executables,
}, nil
}
func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
// 发送消息
msg := NodeMessage{
Type: constants.MsgTypeGetSystemInfo,
NodeId: id,
}
// 序列化
msgBytes, _ := json.Marshal(&msg)
if err := database.Publish("nodes:"+id, string(msgBytes)); err != nil {
return model.SystemInfo{}, err
}
// 通道
ch := SystemInfoChanMap.ChanBlocked(id)
// 等待响应,阻塞
sysInfoStr := <-ch
// 反序列化
if err := json.Unmarshal([]byte(sysInfoStr), &sysInfo); err != nil {
return sysInfo, err
}
return sysInfo, nil
}
func GetSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
if IsMasterNode(id) {
sysInfo, err = GetLocalSystemInfo()
} else {
sysInfo, err = GetRemoteSystemInfo(id)
}
return
}