修复子节点不能先注册的问题

This commit is contained in:
yaziming
2020-06-02 15:32:25 +08:00
parent 93c06d0464
commit eed50ba38f
4 changed files with 24 additions and 32 deletions

View File

@@ -39,8 +39,6 @@ func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...s
continue
}
case redis.Subscription:
fmt.Println(msg)
if msg.Count == 0 {
// all channels are unsubscribed
return

View File

@@ -11,7 +11,6 @@ import (
"crawlab/routes"
"crawlab/services"
"crawlab/services/challenge"
"crawlab/services/local_node"
"crawlab/services/rpc"
"github.com/apex/log"
"github.com/gin-gonic/gin"
@@ -74,8 +73,10 @@ func main() {
panic(err)
}
log.Info("initialized log successfully") // 初始化日志设置
if err := local_node.InitLocalNodeInfo(); err != nil {
log.Error("init local node error:" + err.Error())
// 初始化节点服务
if err := services.InitNodeService(); err != nil {
log.Error("init node service error:" + err.Error())
panic(err)
}
log.Info("initialized local node successfully")
@@ -131,13 +132,6 @@ func main() {
}
log.Info("initialized task executor successfully")
// 初始化节点服务
if err := services.InitNodeService(); err != nil {
log.Error("init node service error:" + err.Error())
panic(err)
}
log.Info("initialized node service successfully")
// 初始化爬虫服务
if err := services.InitSpiderService(); err != nil {
log.Error("init spider service error:" + err.Error())

View File

@@ -12,21 +12,14 @@ func CurrentNode() *model.Node {
return GetLocalNode().Current()
}
func InitLocalNodeInfo() (err error) {
func InitLocalNode() (node *LocalNode, err error) {
registerType := viper.GetString("server.register.type")
ip := viper.GetString("server.register.ip")
customNodeName := viper.GetString("server.register.customNodeName")
localNode, err = NewLocalNode(ip, customNodeName, registerType)
if err != nil {
return err
return nil, err
}
if model.IsMaster() {
err = model.UpdateMasterNodeInfo(localNode.Identify, localNode.Ip, localNode.Mac, localNode.Hostname)
if err != nil {
return err
}
}
return localNode.Ready()
return localNode, err
}

View File

@@ -226,6 +226,11 @@ func WorkerNodeCallback(message redis.Message) (err error) {
// 初始化节点服务
func InitNodeService() error {
node, err := local_node.InitLocalNode()
if err != nil {
return err
}
// 构造定时任务
c := cron.New(cron.WithSeconds())
@@ -239,23 +244,25 @@ func InitNodeService() error {
// 首次更新节点数据注册到Redis
UpdateNodeData()
// 获取当前节点
//node, err := model.GetCurrentNode()
//
//if err != nil {
// log.Errorf(err.Error())
// return err
//}
node := local_node.CurrentNode()
err = node.Ready()
if err != nil {
return err
}
if model.IsMaster() {
err = model.UpdateMasterNodeInfo(node.Identify, node.Ip, node.Mac, node.Hostname)
if err != nil {
return err
}
// 如果为主节点,订阅主节点通信频道
if err := database.Sub(constants.ChannelMasterNode, MasterNodeCallback); err != nil {
return err
}
} else {
// 若为工作节点,订阅单独指定通信频道
channel := constants.ChannelWorkerNode + node.Id.Hex()
channel := constants.ChannelWorkerNode + node.Current().Id.Hex()
if err := database.Sub(channel, WorkerNodeCallback); err != nil {
return err
}
@@ -276,7 +283,7 @@ func InitNodeService() error {
}
// 更新在当前节点执行中的任务状态为abnormal
if err := model.UpdateTaskToAbnormal(node.Id); err != nil {
if err := model.UpdateTaskToAbnormal(node.Current().Id); err != nil {
debug.PrintStack()
return err
}