diff --git a/backend/database/pubsub.go b/backend/database/pubsub.go index fcb80b3e..f9eae535 100644 --- a/backend/database/pubsub.go +++ b/backend/database/pubsub.go @@ -39,8 +39,6 @@ func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...s continue } case redis.Subscription: - fmt.Println(msg) - if msg.Count == 0 { // all channels are unsubscribed return diff --git a/backend/main.go b/backend/main.go index 19846a03..4b46b033 100644 --- a/backend/main.go +++ b/backend/main.go @@ -11,7 +11,6 @@ import ( "crawlab/routes" "crawlab/services" "crawlab/services/challenge" - "crawlab/services/local_node" "crawlab/services/rpc" "github.com/apex/log" "github.com/gin-gonic/gin" @@ -74,8 +73,10 @@ func main() { panic(err) } log.Info("initialized log successfully") // 初始化日志设置 - if err := local_node.InitLocalNodeInfo(); err != nil { - log.Error("init local node error:" + err.Error()) + + // 初始化节点服务 + if err := services.InitNodeService(); err != nil { + log.Error("init node service error:" + err.Error()) panic(err) } log.Info("initialized local node successfully") @@ -131,13 +132,6 @@ func main() { } log.Info("initialized task executor successfully") - // 初始化节点服务 - if err := services.InitNodeService(); err != nil { - log.Error("init node service error:" + err.Error()) - panic(err) - } - log.Info("initialized node service successfully") - // 初始化爬虫服务 if err := services.InitSpiderService(); err != nil { log.Error("init spider service error:" + err.Error()) diff --git a/backend/services/local_node/local_node.go b/backend/services/local_node/local_node.go index 9b1e1229..ad1321ca 100644 --- a/backend/services/local_node/local_node.go +++ b/backend/services/local_node/local_node.go @@ -12,21 +12,14 @@ func CurrentNode() *model.Node { return GetLocalNode().Current() } -func InitLocalNodeInfo() (err error) { +func InitLocalNode() (node *LocalNode, err error) { registerType := viper.GetString("server.register.type") ip := viper.GetString("server.register.ip") customNodeName := viper.GetString("server.register.customNodeName") localNode, err = NewLocalNode(ip, customNodeName, registerType) if err != nil { - return err + return nil, err } - if model.IsMaster() { - err = model.UpdateMasterNodeInfo(localNode.Identify, localNode.Ip, localNode.Mac, localNode.Hostname) - - if err != nil { - return err - } - } - return localNode.Ready() + return localNode, err } diff --git a/backend/services/node.go b/backend/services/node.go index 0cfcebb2..95c326f3 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -226,6 +226,11 @@ func WorkerNodeCallback(message redis.Message) (err error) { // 初始化节点服务 func InitNodeService() error { + node, err := local_node.InitLocalNode() + if err != nil { + return err + } + // 构造定时任务 c := cron.New(cron.WithSeconds()) @@ -239,23 +244,25 @@ func InitNodeService() error { // 首次更新节点数据(注册到Redis) UpdateNodeData() - // 获取当前节点 - //node, err := model.GetCurrentNode() - // - //if err != nil { - // log.Errorf(err.Error()) - // return err - //} - node := local_node.CurrentNode() + err = node.Ready() + + if err != nil { + return err + } if model.IsMaster() { + err = model.UpdateMasterNodeInfo(node.Identify, node.Ip, node.Mac, node.Hostname) + + if err != nil { + return err + } // 如果为主节点,订阅主节点通信频道 if err := database.Sub(constants.ChannelMasterNode, MasterNodeCallback); err != nil { return err } } else { // 若为工作节点,订阅单独指定通信频道 - channel := constants.ChannelWorkerNode + node.Id.Hex() + channel := constants.ChannelWorkerNode + node.Current().Id.Hex() if err := database.Sub(channel, WorkerNodeCallback); err != nil { return err } @@ -276,7 +283,7 @@ func InitNodeService() error { } // 更新在当前节点执行中的任务状态为:abnormal - if err := model.UpdateTaskToAbnormal(node.Id); err != nil { + if err := model.UpdateTaskToAbnormal(node.Current().Id); err != nil { debug.PrintStack() return err }