mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-22 17:31:03 +01:00
fix 节点注册异常情况
This commit is contained in:
@@ -1,7 +1,9 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"crawlab/constants"
|
||||
"crawlab/database"
|
||||
"crawlab/services/register"
|
||||
"github.com/apex/log"
|
||||
"github.com/globalsign/mgo"
|
||||
"github.com/globalsign/mgo/bson"
|
||||
@@ -79,6 +81,7 @@ func GetNodeList(filter interface{}) ([]Node, error) {
|
||||
|
||||
var results []Node
|
||||
if err := c.Find(filter).All(&results); err != nil {
|
||||
log.Error("get node list error: " + err.Error())
|
||||
debug.PrintStack()
|
||||
return results, err
|
||||
}
|
||||
@@ -153,3 +156,47 @@ func GetNodeCount(query interface{}) (int, error) {
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// 节点基本信息
|
||||
func GetNodeBaseInfo() (ip string, mac string, key string, error error) {
|
||||
ip, err := register.GetRegister().GetIp()
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
return "", "", "", err
|
||||
}
|
||||
|
||||
mac, err = register.GetRegister().GetMac()
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
return "", "", "", err
|
||||
}
|
||||
|
||||
key, err = register.GetRegister().GetKey()
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
return "", "", "", err
|
||||
}
|
||||
return ip, mac, key, nil
|
||||
}
|
||||
|
||||
// 根据redis的key值,重置node节点为offline
|
||||
func ResetNodeStatusToOffline(list []string) {
|
||||
nodes, _ := GetNodeList(nil)
|
||||
for _, node := range nodes {
|
||||
hasNode := false
|
||||
for _, key := range list {
|
||||
if key == node.Key {
|
||||
hasNode = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasNode || node.Status == "" {
|
||||
node.Status = constants.StatusOffline
|
||||
if err := node.Save(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
50
backend/model/node_test.go
Normal file
50
backend/model/node_test.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"crawlab/config"
|
||||
"crawlab/constants"
|
||||
"crawlab/database"
|
||||
"github.com/apex/log"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
"runtime/debug"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAddNode(t *testing.T) {
|
||||
Convey("Test AddNode", t, func() {
|
||||
if err := config.InitConfig("../conf/config.yml"); err != nil {
|
||||
log.Error("init config error:" + err.Error())
|
||||
panic(err)
|
||||
}
|
||||
log.Info("初始化配置成功")
|
||||
|
||||
// 初始化Mongodb数据库
|
||||
if err := database.InitMongo(); err != nil {
|
||||
log.Error("init mongodb error:" + err.Error())
|
||||
debug.PrintStack()
|
||||
panic(err)
|
||||
}
|
||||
log.Info("初始化Mongodb数据库成功")
|
||||
|
||||
// 初始化Redis数据库
|
||||
if err := database.InitRedis(); err != nil {
|
||||
log.Error("init redis error:" + err.Error())
|
||||
debug.PrintStack()
|
||||
panic(err)
|
||||
}
|
||||
|
||||
var node = Node{
|
||||
Key: "c4:b3:01:bd:b5:e7",
|
||||
Name: "10.27.238.101",
|
||||
Ip: "10.27.238.101",
|
||||
Port: "8000",
|
||||
Mac: "c4:b3:01:bd:b5:e7",
|
||||
Status: constants.StatusOnline,
|
||||
IsMaster: true,
|
||||
}
|
||||
if err := node.Add(); err != nil {
|
||||
log.Error("add node error:" + err.Error())
|
||||
panic(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -73,23 +73,11 @@ func GetCurrentNode() (model.Node, error) {
|
||||
if err != nil {
|
||||
// 如果为主节点,表示为第一次注册,插入节点信息
|
||||
if IsMaster() {
|
||||
// 获取本机IP地址
|
||||
ip, err := register.GetRegister().GetIp()
|
||||
// 获取本机信息
|
||||
ip, mac, key, err := model.GetNodeBaseInfo()
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
return model.Node{}, err
|
||||
}
|
||||
|
||||
mac, err := register.GetRegister().GetMac()
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
return model.Node{}, err
|
||||
}
|
||||
|
||||
key, err := register.GetRegister().GetKey()
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
return model.Node{}, err
|
||||
return node, err
|
||||
}
|
||||
|
||||
// 生成节点
|
||||
@@ -179,70 +167,56 @@ func UpdateNodeStatus() {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 在MongoDB中该节点设置状态为离线
|
||||
s, c := database.GetCol("nodes")
|
||||
defer s.Close()
|
||||
var node model.Node
|
||||
if err := c.Find(bson.M{"key": key}).One(&node); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
node.Status = constants.StatusOffline
|
||||
if err := node.Save(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
keys, _ := database.RedisClient.HKeys("nodes")
|
||||
model.ResetNodeStatusToOffline(keys)
|
||||
continue
|
||||
}
|
||||
|
||||
// 更新节点信息到数据库
|
||||
s, c := database.GetCol("nodes")
|
||||
defer s.Close()
|
||||
var node model.Node
|
||||
if err := c.Find(bson.M{"key": key}).One(&node); err != nil {
|
||||
// 数据库不存在该节点
|
||||
node = model.Node{
|
||||
Key: key,
|
||||
Name: data.Ip,
|
||||
Ip: data.Ip,
|
||||
Port: "8000",
|
||||
Mac: data.Mac,
|
||||
Status: constants.StatusOnline,
|
||||
IsMaster: data.Master,
|
||||
}
|
||||
if err := node.Add(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// 数据库存在该节点
|
||||
node.Status = constants.StatusOnline
|
||||
if err := node.Save(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
// 处理node信息
|
||||
handleNodeInfo(key, data)
|
||||
}
|
||||
|
||||
// 重置不在redis的key为offline
|
||||
model.ResetNodeStatusToOffline(list)
|
||||
}
|
||||
|
||||
func handleNodeInfo(key string, data Data) {
|
||||
// 更新节点信息到数据库
|
||||
s, c := database.GetCol("nodes")
|
||||
defer s.Close()
|
||||
|
||||
// 同个key可能因为并发,被注册多次
|
||||
var nodes []model.Node
|
||||
_ = c.Find(bson.M{"key": key}).All(&nodes)
|
||||
if nodes != nil && len(nodes) > 1 {
|
||||
for _, node := range nodes {
|
||||
_ = c.RemoveId(node.Id)
|
||||
}
|
||||
}
|
||||
|
||||
// 遍历数据库中的节点列表
|
||||
nodes, err := model.GetNodeList(nil)
|
||||
for _, node := range nodes {
|
||||
hasNode := false
|
||||
for _, key := range list {
|
||||
if key == node.Key {
|
||||
hasNode = true
|
||||
break
|
||||
}
|
||||
var node model.Node
|
||||
if err := c.Find(bson.M{"key": key}).One(&node); err != nil {
|
||||
// 数据库不存在该节点
|
||||
node = model.Node{
|
||||
Key: key,
|
||||
Name: data.Ip,
|
||||
Ip: data.Ip,
|
||||
Port: "8000",
|
||||
Mac: data.Mac,
|
||||
Status: constants.StatusOnline,
|
||||
IsMaster: data.Master,
|
||||
}
|
||||
if !hasNode {
|
||||
node.Status = constants.StatusOffline
|
||||
if err := node.Save(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
continue
|
||||
if err := node.Add(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// 数据库存在该节点
|
||||
node.Status = constants.StatusOnline
|
||||
if err := node.Save(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ func GetSpidersFromDir() ([]model.Spider, error) {
|
||||
|
||||
// 如果爬虫项目目录不存在,则创建一个
|
||||
if !utils.Exists(srcPath) {
|
||||
mask := syscall.Umask(0) // 改为 0000 八进制
|
||||
mask := syscall.Umask(0) // 改为 0000 八进制
|
||||
defer syscall.Umask(mask) // 改为原来的 umask
|
||||
if err := os.MkdirAll(srcPath, 0666); err != nil {
|
||||
debug.PrintStack()
|
||||
@@ -301,7 +301,6 @@ func PublishSpider(spider model.Spider) (err error) {
|
||||
return
|
||||
}
|
||||
channel := "files:upload"
|
||||
log.Info("publish files.upload event, file id:" + msg.FileId)
|
||||
if err = database.Publish(channel, string(msgStr)); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
@@ -313,7 +312,6 @@ func PublishSpider(spider model.Spider) (err error) {
|
||||
|
||||
// 上传爬虫回调
|
||||
func OnFileUpload(channel string, msgStr string) {
|
||||
log.Info("received files.upload event, msgStr:" + msgStr)
|
||||
s, gf := database.GetGridFs("files")
|
||||
defer s.Close()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user