mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
fixed worker node unable to register issue
This commit is contained in:
@@ -4,7 +4,6 @@ import (
|
||||
"crawlab/constants"
|
||||
"crawlab/database"
|
||||
"crawlab/entity"
|
||||
"crawlab/lib/cron"
|
||||
"crawlab/model"
|
||||
"crawlab/services/local_node"
|
||||
"crawlab/services/msg_handler"
|
||||
@@ -216,6 +215,22 @@ func WorkerNodeCallback(message redis.Message) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 发送心跳信息到Redis,每5秒发送一次
|
||||
func SendHeartBeat() {
|
||||
for {
|
||||
UpdateNodeData()
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// 每10秒刷新一次节点信息
|
||||
func UpdateNodeStatusPeriodically() {
|
||||
for {
|
||||
UpdateNodeStatus()
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// 初始化节点服务
|
||||
func InitNodeService() error {
|
||||
node, err := local_node.InitLocalNode()
|
||||
@@ -223,15 +238,8 @@ func InitNodeService() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// 构造定时任务
|
||||
c := cron.New(cron.WithSeconds())
|
||||
|
||||
// 每5秒更新一次本节点信息
|
||||
spec := "0/5 * * * * *"
|
||||
if _, err := c.AddFunc(spec, UpdateNodeData); err != nil {
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
go SendHeartBeat()
|
||||
|
||||
// 首次更新节点数据(注册到Redis)
|
||||
UpdateNodeData()
|
||||
@@ -241,12 +249,17 @@ func InitNodeService() error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = node.Ready()
|
||||
|
||||
if err != nil {
|
||||
// 节点准备完毕
|
||||
if err = node.Ready(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 如果为主节点,每10秒刷新所有节点信息
|
||||
if model.IsMaster() {
|
||||
go UpdateNodeStatusPeriodically()
|
||||
}
|
||||
|
||||
if model.IsMaster() {
|
||||
// 如果为主节点,订阅主节点通信频道
|
||||
if err := database.Sub(constants.ChannelMasterNode, MasterNodeCallback); err != nil {
|
||||
@@ -265,21 +278,11 @@ func InitNodeService() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// 如果为主节点,每10秒刷新所有节点信息
|
||||
if model.IsMaster() {
|
||||
spec := "*/10 * * * * *"
|
||||
if _, err := c.AddFunc(spec, UpdateNodeStatus); err != nil {
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// 更新在当前节点执行中的任务状态为:abnormal
|
||||
if err := model.UpdateTaskToAbnormal(node.Current().Id); err != nil {
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
|
||||
c.Start()
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user