From 14ab3b92089755734bbf07cc8ab3111b3d5ef612 Mon Sep 17 00:00:00 2001 From: marvzhang Date: Thu, 2 Jul 2020 12:07:07 +0800 Subject: [PATCH] fixed worker node unable to register issue --- backend/services/node.go | 45 +++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/backend/services/node.go b/backend/services/node.go index 827097b7..42a664c4 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -4,7 +4,6 @@ import ( "crawlab/constants" "crawlab/database" "crawlab/entity" - "crawlab/lib/cron" "crawlab/model" "crawlab/services/local_node" "crawlab/services/msg_handler" @@ -216,6 +215,22 @@ func WorkerNodeCallback(message redis.Message) (err error) { return nil } +// 发送心跳信息到Redis,每5秒发送一次 +func SendHeartBeat() { + for { + UpdateNodeData() + time.Sleep(5 * time.Second) + } +} + +// 每10秒刷新一次节点信息 +func UpdateNodeStatusPeriodically() { + for { + UpdateNodeStatus() + time.Sleep(10 * time.Second) + } +} + // 初始化节点服务 func InitNodeService() error { node, err := local_node.InitLocalNode() @@ -223,15 +238,8 @@ func InitNodeService() error { return err } - // 构造定时任务 - c := cron.New(cron.WithSeconds()) - // 每5秒更新一次本节点信息 - spec := "0/5 * * * * *" - if _, err := c.AddFunc(spec, UpdateNodeData); err != nil { - debug.PrintStack() - return err - } + go SendHeartBeat() // 首次更新节点数据(注册到Redis) UpdateNodeData() @@ -241,12 +249,17 @@ func InitNodeService() error { return err } } - err = node.Ready() - if err != nil { + // 节点准备完毕 + if err = node.Ready(); err != nil { return err } + // 如果为主节点,每10秒刷新所有节点信息 + if model.IsMaster() { + go UpdateNodeStatusPeriodically() + } + if model.IsMaster() { // 如果为主节点,订阅主节点通信频道 if err := database.Sub(constants.ChannelMasterNode, MasterNodeCallback); err != nil { @@ -265,21 +278,11 @@ func InitNodeService() error { return err } - // 如果为主节点,每10秒刷新所有节点信息 - if model.IsMaster() { - spec := "*/10 * * * * *" - if _, err := c.AddFunc(spec, UpdateNodeStatus); err != nil { - debug.PrintStack() - return err - } - } - // 更新在当前节点执行中的任务状态为:abnormal if err := model.UpdateTaskToAbnormal(node.Current().Id); err != nil { debug.PrintStack() return err } - c.Start() return nil }