diff --git a/backend/constants/message.go b/backend/constants/message.go index 89701b33..521a2019 100644 --- a/backend/constants/message.go +++ b/backend/constants/message.go @@ -1,7 +1,7 @@ package constants const ( - MsgTypeGetLog = "get-log" - MsgTypeGetSystemInfo = "get-sys-info" - MsgTypeCancelTask = "cancel-task" + MsgTypeGetLog = "get-log" + MsgTypeGetSystemInfo = "get-sys-info" + MsgTypeCancelTask = "cancel-task" ) diff --git a/backend/main.go b/backend/main.go index 5ee066e7..79ea41d1 100644 --- a/backend/main.go +++ b/backend/main.go @@ -60,7 +60,6 @@ func main() { // 初始化节点服务 if err := services.InitNodeService(); err != nil { - debug.PrintStack() panic(err) } log.Info("初始化节点配置成功") diff --git a/backend/model/node.go b/backend/model/node.go index 8a49f121..63461ced 100644 --- a/backend/model/node.go +++ b/backend/model/node.go @@ -39,6 +39,7 @@ func (n *Node) Add() error { n.UpdateTs = time.Now() n.CreateTs = time.Now() if err := c.Insert(&n); err != nil { + debug.PrintStack() return err } return nil @@ -48,6 +49,7 @@ func (n *Node) Delete() error { s, c := database.GetCol("nodes") defer s.Close() if err := c.RemoveId(n.Id); err != nil { + debug.PrintStack() return err } return nil @@ -57,6 +59,7 @@ func (n *Node) GetTasks() ([]Task, error) { tasks, err := GetTaskList(bson.M{"node_id": n.Id}, 0, 10, "-create_ts") //tasks, err := GetTaskList(nil, 0, 10, "-create_ts") if err != nil { + debug.PrintStack() return []Task{}, err } @@ -69,6 +72,7 @@ func GetNodeList(filter interface{}) ([]Node, error) { var results []Node if err := c.Find(filter).All(&results); err != nil { + debug.PrintStack() return results, err } return results, nil @@ -78,15 +82,30 @@ func GetNode(id bson.ObjectId) (Node, error) { s, c := database.GetCol("nodes") defer s.Close() - var result Node - if err := c.FindId(id).One(&result); err != nil { + var node Node + if err := c.FindId(id).One(&node); err != nil { if err != mgo.ErrNotFound { log.Errorf(err.Error()) debug.PrintStack() } - return result, err + return node, err } - return result, nil + return node, nil +} + +func GetNodeByMac(mac string) (Node, error) { + s, c := database.GetCol("nodes") + defer s.Close() + + var node Node + if err := c.Find(bson.M{"mac": mac}).One(&node); err != nil { + if err != mgo.ErrNotFound { + log.Errorf(err.Error()) + debug.PrintStack() + } + return node, err + } + return node, nil } func UpdateNode(id bson.ObjectId, item Node) error { diff --git a/backend/routes/node.go b/backend/routes/node.go index cc30cae5..e9233b76 100644 --- a/backend/routes/node.go +++ b/backend/routes/node.go @@ -93,7 +93,9 @@ func GetNodeTaskList(c *gin.Context) { } func GetSystemInfo(c *gin.Context) { - sysInfo, _ := services.GetSystemInfo() + id := c.Param("id") + + sysInfo, _ := services.GetSystemInfo(id) c.JSON(http.StatusOK, Response{ Status: "ok", diff --git a/backend/services/node.go b/backend/services/node.go index d0d50c7d..c567b36f 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -22,6 +22,27 @@ type Data struct { UpdateTs time.Time `json:"update_ts"` } +type NodeMessage struct { + // 通信类别 + Type string `json:"type"` + + // 任务相关 + TaskId string `json:"task_id"` // 任务ID + + // 节点相关 + NodeId string `json:"node_id"` // 节点ID + + // 日志相关 + LogPath string `json:"log_path"` // 日志路径 + Log string `json:"log"` // 日志 + + // 系统信息 + SysInfo model.SystemInfo `json:"sys_info"` + + // 错误相关 + Error string `json:"error"` +} + const ( Yes = "Y" No = "N" @@ -65,25 +86,51 @@ func GetCurrentNode() (model.Node, error) { // 获取本机MAC地址 mac, err := GetMac() if err != nil { + debug.PrintStack() return model.Node{}, err } - s, c := database.GetCol("nodes") - defer s.Close() - // 从数据库中获取当前节点 - var n model.Node - if err := c.Find(bson.M{"mac": mac}).One(&n); err != nil { - return n, err + var node model.Node + errNum := 0 + for { + // 如果错误次数超过10次 + if errNum >= 10 { + panic("cannot get current node") + } + + // 尝试获取节点 + node, err = model.GetNodeByMac(mac) + + // 如果获取失败 + if err != nil { + // 增加错误次数 + errNum++ + + // 5秒后重试 + time.Sleep(5 * time.Second) + continue + } + + // 跳出循环 + break } - return n, nil + + return node, nil } -// 是否为主节点 +// 当前节点是否为主节点 func IsMaster() bool { return viper.GetString("server.master") == Yes } +// 该ID的节点是否为主节点 +func IsMasterNode(id string) bool { + curNode, _ := GetCurrentNode() + node, _ := model.GetNode(bson.ObjectIdHex(id)) + return curNode.Id == node.Id +} + // 获取节点数据 func GetNodeData() (Data, error) { mac, err := GetMac() @@ -127,6 +174,7 @@ func UpdateNodeStatus() { // 如果记录的更新时间超过60秒,该节点被认为离线 if time.Now().Sub(data.UpdateTs) > 60*time.Second { // 在Redis中删除该节点 + if err := database.RedisClient.HDel("nodes", data.Mac); err != nil { log.Errorf(err.Error()) return @@ -214,29 +262,6 @@ func UpdateNodeData() { } } -// TODO: 查看节点PATH -func GetNodePath() { - //os.Environ() -} - -type NodeMessage struct { - // 通信类别 - Type string `json:"type"` - - // 任务相关 - TaskId string `json:"task_id"` // 任务ID - - // 日志相关 - LogPath string `json:"log_path"` // 日志路径 - Log string `json:"log"` // 日志 - - // 环境变量 - Env string `json:"env"` - - // 错误相关 - Error string `json:"error"` -} - func MasterNodeCallback(channel string, msgStr string) { // 反序列化 var msg NodeMessage @@ -247,10 +272,18 @@ func MasterNodeCallback(channel string, msgStr string) { } if msg.Type == constants.MsgTypeGetLog { + // 获取日志 fmt.Println(msg) time.Sleep(10 * time.Millisecond) ch := TaskLogChanMap.ChanBlocked(msg.TaskId) ch <- msg.Log + } else if msg.Type == constants.MsgTypeGetSystemInfo { + // 获取系统信息 + fmt.Println(msg) + time.Sleep(10 * time.Millisecond) + ch := SystemInfoChanMap.ChanBlocked(msg.NodeId) + sysInfoBytes, _ := json.Marshal(&msg.SysInfo) + ch <- string(sysInfoBytes) } } @@ -267,6 +300,7 @@ func WorkerNodeCallback(channel string, msgStr string) { if msg.Type == constants.MsgTypeGetLog { // 消息类型为获取日志 + // 发出的消息 msgSd := NodeMessage{ Type: constants.MsgTypeGetLog, TaskId: msg.TaskId, @@ -296,8 +330,32 @@ func WorkerNodeCallback(channel string, msgStr string) { return } } else if msg.Type == constants.MsgTypeCancelTask { + // 取消任务 ch := TaskExecChanMap.ChanBlocked(msg.TaskId) ch <- constants.TaskCancel + } else if msg.Type == constants.MsgTypeGetSystemInfo { + // 获取环境信息 + sysInfo, err := GetLocalSystemInfo() + if err != nil { + log.Errorf(err.Error()) + return + } + msgSd := NodeMessage{ + Type: constants.MsgTypeGetSystemInfo, + NodeId: msg.NodeId, + SysInfo: sysInfo, + } + msgSdBytes, err := json.Marshal(&msgSd) + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return + } + fmt.Println(msgSd) + if err := database.Publish("nodes:master", string(msgSdBytes)); err != nil { + log.Errorf(err.Error()) + return + } } } @@ -313,6 +371,9 @@ func InitNodeService() error { return err } + // 首次更新节点数据(注册到Redis) + UpdateNodeData() + // 消息订阅 var sub database.Subscriber sub.Connect() diff --git a/backend/services/system.go b/backend/services/system.go index 6fc67102..8f724082 100644 --- a/backend/services/system.go +++ b/backend/services/system.go @@ -1,14 +1,22 @@ package services import ( + "crawlab/constants" + "crawlab/database" "crawlab/model" + "crawlab/utils" + "encoding/json" + "github.com/apex/log" "io/ioutil" "os" "path/filepath" "runtime" + "runtime/debug" "strings" ) +var SystemInfoChanMap = utils.NewChanMap() + var executableNameMap = map[string]string{ // python "python": "Python", @@ -49,7 +57,9 @@ func GetExecutables() (executables []model.Executable, err error) { for _, path := range pathValues { fileList, err := ioutil.ReadDir(path) if err != nil { - return executables, err + log.Errorf(err.Error()) + debug.PrintStack() + continue } for _, file := range fileList { @@ -71,13 +81,14 @@ func GetExecutables() (executables []model.Executable, err error) { return executables, nil } -func GetSystemInfo() (sysInfo model.SystemInfo, err error) { +func GetLocalSystemInfo() (sysInfo model.SystemInfo, err error) { executables, err := GetExecutables() if err != nil { return sysInfo, err } hostname, err := os.Hostname() if err != nil { + debug.PrintStack() return sysInfo, err } @@ -89,3 +100,39 @@ func GetSystemInfo() (sysInfo model.SystemInfo, err error) { Executables: executables, }, nil } + +func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) { + // 发送消息 + msg := NodeMessage{ + Type: constants.MsgTypeGetSystemInfo, + NodeId: id, + } + + // 序列化 + msgBytes, _ := json.Marshal(&msg) + if err := database.Publish("nodes:"+id, string(msgBytes)); err != nil { + return model.SystemInfo{}, err + } + + // 通道 + ch := SystemInfoChanMap.ChanBlocked(id) + + // 等待响应,阻塞 + sysInfoStr := <-ch + + // 反序列化 + if err := json.Unmarshal([]byte(sysInfoStr), &sysInfo); err != nil { + return sysInfo, err + } + + return sysInfo, nil +} + +func GetSystemInfo(id string) (sysInfo model.SystemInfo, err error) { + if IsMasterNode(id) { + sysInfo, err = GetLocalSystemInfo() + } else { + sysInfo, err = GetRemoteSystemInfo(id) + } + return +}