From 494601ab625a53aa91e9cc3ce7c4cb5d45e84741 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Sat, 31 Aug 2019 13:49:34 +0800 Subject: [PATCH] =?UTF-8?q?fix=20=E8=8A=82=E7=82=B9=E6=B3=A8=E5=86=8C?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=E6=83=85=E5=86=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/model/node.go | 47 +++++++++++++++ backend/model/node_test.go | 50 ++++++++++++++++ backend/services/node.go | 116 ++++++++++++++----------------------- backend/services/spider.go | 4 +- 4 files changed, 143 insertions(+), 74 deletions(-) create mode 100644 backend/model/node_test.go diff --git a/backend/model/node.go b/backend/model/node.go index 61c20473..6211115c 100644 --- a/backend/model/node.go +++ b/backend/model/node.go @@ -1,7 +1,9 @@ package model import ( + "crawlab/constants" "crawlab/database" + "crawlab/services/register" "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" @@ -79,6 +81,7 @@ func GetNodeList(filter interface{}) ([]Node, error) { var results []Node if err := c.Find(filter).All(&results); err != nil { + log.Error("get node list error: " + err.Error()) debug.PrintStack() return results, err } @@ -153,3 +156,47 @@ func GetNodeCount(query interface{}) (int, error) { return count, nil } + +// 节点基本信息 +func GetNodeBaseInfo() (ip string, mac string, key string, error error) { + ip, err := register.GetRegister().GetIp() + if err != nil { + debug.PrintStack() + return "", "", "", err + } + + mac, err = register.GetRegister().GetMac() + if err != nil { + debug.PrintStack() + return "", "", "", err + } + + key, err = register.GetRegister().GetKey() + if err != nil { + debug.PrintStack() + return "", "", "", err + } + return ip, mac, key, nil +} + +// 根据redis的key值,重置node节点为offline +func ResetNodeStatusToOffline(list []string) { + nodes, _ := GetNodeList(nil) + for _, node := range nodes { + hasNode := false + for _, key := range list { + if key == node.Key { + hasNode = true + break + } + } + if !hasNode || node.Status == "" { + node.Status = constants.StatusOffline + if err := node.Save(); err != nil { + log.Errorf(err.Error()) + return + } + continue + } + } +} diff --git a/backend/model/node_test.go b/backend/model/node_test.go new file mode 100644 index 00000000..ba3f4aaa --- /dev/null +++ b/backend/model/node_test.go @@ -0,0 +1,50 @@ +package model + +import ( + "crawlab/config" + "crawlab/constants" + "crawlab/database" + "github.com/apex/log" + . "github.com/smartystreets/goconvey/convey" + "runtime/debug" + "testing" +) + +func TestAddNode(t *testing.T) { + Convey("Test AddNode", t, func() { + if err := config.InitConfig("../conf/config.yml"); err != nil { + log.Error("init config error:" + err.Error()) + panic(err) + } + log.Info("初始化配置成功") + + // 初始化Mongodb数据库 + if err := database.InitMongo(); err != nil { + log.Error("init mongodb error:" + err.Error()) + debug.PrintStack() + panic(err) + } + log.Info("初始化Mongodb数据库成功") + + // 初始化Redis数据库 + if err := database.InitRedis(); err != nil { + log.Error("init redis error:" + err.Error()) + debug.PrintStack() + panic(err) + } + + var node = Node{ + Key: "c4:b3:01:bd:b5:e7", + Name: "10.27.238.101", + Ip: "10.27.238.101", + Port: "8000", + Mac: "c4:b3:01:bd:b5:e7", + Status: constants.StatusOnline, + IsMaster: true, + } + if err := node.Add(); err != nil { + log.Error("add node error:" + err.Error()) + panic(err) + } + }) +} diff --git a/backend/services/node.go b/backend/services/node.go index 083fdc3d..3ed84149 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -73,23 +73,11 @@ func GetCurrentNode() (model.Node, error) { if err != nil { // 如果为主节点,表示为第一次注册,插入节点信息 if IsMaster() { - // 获取本机IP地址 - ip, err := register.GetRegister().GetIp() + // 获取本机信息 + ip, mac, key, err := model.GetNodeBaseInfo() if err != nil { debug.PrintStack() - return model.Node{}, err - } - - mac, err := register.GetRegister().GetMac() - if err != nil { - debug.PrintStack() - return model.Node{}, err - } - - key, err := register.GetRegister().GetKey() - if err != nil { - debug.PrintStack() - return model.Node{}, err + return node, err } // 生成节点 @@ -179,70 +167,56 @@ func UpdateNodeStatus() { log.Errorf(err.Error()) return } - // 在MongoDB中该节点设置状态为离线 - s, c := database.GetCol("nodes") - defer s.Close() - var node model.Node - if err := c.Find(bson.M{"key": key}).One(&node); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return - } - node.Status = constants.StatusOffline - if err := node.Save(); err != nil { - log.Errorf(err.Error()) - return - } + keys, _ := database.RedisClient.HKeys("nodes") + model.ResetNodeStatusToOffline(keys) continue } - // 更新节点信息到数据库 - s, c := database.GetCol("nodes") - defer s.Close() - var node model.Node - if err := c.Find(bson.M{"key": key}).One(&node); err != nil { - // 数据库不存在该节点 - node = model.Node{ - Key: key, - Name: data.Ip, - Ip: data.Ip, - Port: "8000", - Mac: data.Mac, - Status: constants.StatusOnline, - IsMaster: data.Master, - } - if err := node.Add(); err != nil { - log.Errorf(err.Error()) - return - } - } else { - // 数据库存在该节点 - node.Status = constants.StatusOnline - if err := node.Save(); err != nil { - log.Errorf(err.Error()) - return - } + // 处理node信息 + handleNodeInfo(key, data) + } + + // 重置不在redis的key为offline + model.ResetNodeStatusToOffline(list) +} + +func handleNodeInfo(key string, data Data) { + // 更新节点信息到数据库 + s, c := database.GetCol("nodes") + defer s.Close() + + // 同个key可能因为并发,被注册多次 + var nodes []model.Node + _ = c.Find(bson.M{"key": key}).All(&nodes) + if nodes != nil && len(nodes) > 1 { + for _, node := range nodes { + _ = c.RemoveId(node.Id) } } - // 遍历数据库中的节点列表 - nodes, err := model.GetNodeList(nil) - for _, node := range nodes { - hasNode := false - for _, key := range list { - if key == node.Key { - hasNode = true - break - } + var node model.Node + if err := c.Find(bson.M{"key": key}).One(&node); err != nil { + // 数据库不存在该节点 + node = model.Node{ + Key: key, + Name: data.Ip, + Ip: data.Ip, + Port: "8000", + Mac: data.Mac, + Status: constants.StatusOnline, + IsMaster: data.Master, } - if !hasNode { - node.Status = constants.StatusOffline - if err := node.Save(); err != nil { - log.Errorf(err.Error()) - return - } - continue + if err := node.Add(); err != nil { + log.Errorf(err.Error()) + return + } + } else { + // 数据库存在该节点 + node.Status = constants.StatusOnline + if err := node.Save(); err != nil { + log.Errorf(err.Error()) + return } } } diff --git a/backend/services/spider.go b/backend/services/spider.go index ad0c0ae5..a3242849 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -40,7 +40,7 @@ func GetSpidersFromDir() ([]model.Spider, error) { // 如果爬虫项目目录不存在,则创建一个 if !utils.Exists(srcPath) { - mask := syscall.Umask(0) // 改为 0000 八进制 + mask := syscall.Umask(0) // 改为 0000 八进制 defer syscall.Umask(mask) // 改为原来的 umask if err := os.MkdirAll(srcPath, 0666); err != nil { debug.PrintStack() @@ -301,7 +301,6 @@ func PublishSpider(spider model.Spider) (err error) { return } channel := "files:upload" - log.Info("publish files.upload event, file id:" + msg.FileId) if err = database.Publish(channel, string(msgStr)); err != nil { log.Errorf(err.Error()) debug.PrintStack() @@ -313,7 +312,6 @@ func PublishSpider(spider model.Spider) (err error) { // 上传爬虫回调 func OnFileUpload(channel string, msgStr string) { - log.Info("received files.upload event, msgStr:" + msgStr) s, gf := database.GetGridFs("files") defer s.Close()