增加当前节点的本地定时缓存,修复部分潜在 BUG;启动时 MongoDB 或 Redis 无法正常连接时,进入启动等待

This commit is contained in:
yaziming
2020-05-23 15:15:39 +08:00
parent 962daab361
commit ff9c9d57ef
353 changed files with 23433 additions and 107516 deletions

View File

@@ -0,0 +1,72 @@
package local_node
import (
"crawlab/model"
"github.com/apex/log"
"github.com/cenkalti/backoff/v4"
"go.uber.org/atomic"
"sync"
"time"
)
// Package-level state backing CurrentNode.
var (
	// localNode is the shared cache instance; it is assigned inside the
	// once.Do call in CurrentNode.
	localNode *LocalNode
	// locker is a 0/1 flag that watch flips with CAS around each refresh.
	locker atomic.Int32
	// once guards the one-time initialisation performed by CurrentNode.
	once sync.Once
)
// LocalNode holds a cached copy of the current node record.
type LocalNode struct {
	// RWMutex guards node: load takes the write lock, Current the read lock.
	sync.RWMutex
	// node is the most recently loaded node; it stays nil until a load succeeds.
	node *model.Node
}
// load fetches the current node via model.GetCurrentNode and stores it in
// the cache.
//
// When retry is true the lookup is driven by backoff.Retry with a constant
// one-second interval, logging a warning after every failed attempt; when
// retry is false a single attempt is made.
//
// On failure the error is returned and the previously cached node is left
// untouched.
func (n *LocalNode) load(retry bool) (err error) {
	var node model.Node
	if retry {
		b := backoff.NewConstantBackOff(1 * time.Second)
		err = backoff.Retry(func() error {
			// Use a closure-local error so the named result is only written
			// once, by the assignment from backoff.Retry.
			var getErr error
			node, getErr = model.GetCurrentNode()
			if getErr != nil {
				log.WithError(getErr).Warnf("Get current node info from database failed. Will after %f seconds, try again.", b.NextBackOff().Seconds())
			}
			return getErr
		}, b)
	} else {
		node, err = model.GetCurrentNode()
	}
	if err != nil {
		return err
	}

	// Take the write lock only for the pointer swap. Holding it across the
	// database call (and the retry loop) would block every Current() reader
	// for as long as the lookup takes.
	n.Lock()
	n.node = &node
	n.Unlock()
	return nil
}
// watch reloads the cached node on every tick of a five-second ticker.
// The loop never exits, so callers run it on its own goroutine.
func (n *LocalNode) watch() {
	ticker := time.NewTicker(5 * time.Second)
	for range ticker.C {
		// Skip this tick if the flag is already set, i.e. a refresh is
		// still marked as in progress.
		if !locker.CAS(0, 1) {
			continue
		}
		if loadErr := n.load(false); loadErr != nil {
			log.WithError(loadErr).Errorf("load current node from database failed,")
		}
		locker.Store(0)
	}
}
// Current returns the cached node pointer, read under the read lock.
// The result is nil if no load has succeeded yet.
func (n *LocalNode) Current() *model.Node {
	n.RLock()
	cached := n.node
	n.RUnlock()
	return cached
}
// CurrentNode returns the cached current node.
//
// The first call creates the package-level LocalNode, performs a retrying
// load, and starts the background watch goroutine; later calls only read the
// cache. The result is nil if the initial load failed and no later refresh
// has succeeded.
func CurrentNode() *model.Node {
	once.Do(func() {
		localNode = &LocalNode{}
		// Report a failed initial load instead of dropping it silently, so
		// a nil result from CurrentNode can be traced back to its cause.
		if err := localNode.load(true); err != nil {
			log.WithError(err).Errorf("initial load of current node failed")
		}
		go localNode.watch()
	})
	return localNode.Current()
}

View File

@@ -6,6 +6,7 @@ import (
"crawlab/entity"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/local_node"
"crawlab/services/msg_handler"
"crawlab/services/register"
"crawlab/utils"
@@ -33,7 +34,8 @@ type Data struct {
// Every caller of IsMasterNode always runs on the master node, so the
// current node is always the master node.
// IsMasterNode reports whether the node with the given ID has the same Id as
// the current node.
func IsMasterNode(id string) bool {
curNode, _ := model.GetCurrentNode()
curNode := local_node.CurrentNode()
//curNode, _ := model.GetCurrentNode()
// NOTE(review): the error from model.GetNode is discarded here; confirm
// that comparing against whatever node value it returns on failure is acceptable.
node, _ := model.GetNode(bson.ObjectIdHex(id))
return curNode.Id == node.Id
}
@@ -268,11 +270,13 @@ func InitNodeService() error {
UpdateNodeData()
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
return err
}
//node, err := model.GetCurrentNode()
//
//if err != nil {
// log.Errorf(err.Error())
// return err
//}
node := local_node.CurrentNode()
if model.IsMaster() {
// 如果为主节点,订阅主节点通信频道
@@ -310,3 +314,12 @@ func InitNodeService() error {
c.Start()
return nil
}
// InitMasterNodeInfo reads this machine's base info (ip, mac, hostname, key)
// via model.GetNodeBaseInfo and passes it to model.UpdateMasterNodeInfo.
// If reading the base info fails, the stack is printed and the error returned.
func InitMasterNodeInfo() (err error) {
	// Get the local machine's information.
	ip, mac, hostname, key, err := model.GetNodeBaseInfo()
	if err == nil {
		return model.UpdateMasterNodeInfo(key, ip, mac, hostname)
	}
	debug.PrintStack()
	return err
}

View File

@@ -5,11 +5,13 @@ import (
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/services/local_node"
"crawlab/utils"
"encoding/json"
"errors"
"fmt"
"github.com/apex/log"
"github.com/cenkalti/backoff/v4"
"github.com/gomodule/redigo/redis"
uuid "github.com/satori/go.uuid"
"runtime/debug"
@@ -77,7 +79,7 @@ func GetService(msg entity.RpcMessage) Service {
}
// 处理RPC消息
func handleMsg(msgStr string, node model.Node) {
func handleMsg(msgStr string, node *model.Node) {
// 反序列化消息
var msg entity.RpcMessage
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
@@ -107,23 +109,29 @@ func InitRpcService() error {
go func() {
for {
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
node := local_node.CurrentNode()
//node, err := model.GetCurrentNode()
//if err != nil {
// log.Errorf(err.Error())
// debug.PrintStack()
// continue
//}
b := backoff.NewExponentialBackOff()
bp := backoff.WithMaxRetries(b, 10)
var msgStr string
var err error
err = backoff.Retry(func() error {
msgStr, err = database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
// 获取获取消息队列信息
msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
if err != nil {
if err != redis.ErrNil {
log.Errorf(err.Error())
debug.PrintStack()
if err != nil && err != redis.ErrNil {
log.WithError(err).Warnf("waiting for redis pool active connection. will after %f seconds try again.", b.NextBackOff().Seconds())
return err
}
return err
}, bp)
if err != nil && err != redis.ErrNil {
continue
}
// 处理消息
go handleMsg(msgStr, node)
}

View File

@@ -4,6 +4,7 @@ import (
"crawlab/constants"
"crawlab/database"
"crawlab/model"
"crawlab/services/local_node"
"crawlab/utils"
"fmt"
"github.com/apex/log"
@@ -76,7 +77,9 @@ func (s *SpiderSync) RemoveDownCreate(md5 string) {
// GetLockDownloadKey builds the download-lock key for a spider: the current
// node's Id in hex, followed by "#" and spiderId.
func (s *SpiderSync) GetLockDownloadKey(spiderId string) string {
node, _ := model.GetCurrentNode()
//node, _ := model.GetCurrentNode()
node := local_node.CurrentNode()
return node.Id.Hex() + "#" + spiderId
}

View File

@@ -7,6 +7,7 @@ import (
"crawlab/entity"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/local_node"
"crawlab/services/notification"
"crawlab/services/spider_handler"
"crawlab/utils"
@@ -503,18 +504,20 @@ func ExecuteTask(id int) {
tic := time.Now()
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf("execute task get current node error: %s", err.Error())
debug.PrintStack()
return
}
//node, err := model.GetCurrentNode()
//if err != nil {
// log.Errorf("execute task get current node error: %s", err.Error())
// debug.PrintStack()
// return
//}
node := local_node.CurrentNode()
// 节点队列
queueCur := "tasks:node:" + node.Id.Hex()
// 节点队列任务
var msg string
var err error
if msg, err = database.RedisClient.LPop(queueCur); err != nil {
// 节点队列没有任务,获取公共队列任务
queuePub := "tasks:public"
@@ -765,12 +768,13 @@ func CancelTask(id string) (err error) {
}
// 获取当前节点(默认当前节点为主节点)
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf("get current node error: %s", err.Error())
debug.PrintStack()
return err
}
//node, err := model.GetCurrentNode()
//if err != nil {
// log.Errorf("get current node error: %s", err.Error())
// debug.PrintStack()
// return err
//}
node := local_node.CurrentNode()
log.Infof("current node id is: %s", node.Id.Hex())
log.Infof("task node id is: %s", task.NodeId.Hex())