mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-22 17:31:03 +01:00
Merge remote-tracking branch 'origin/develop' into develop
This commit is contained in:
@@ -58,9 +58,9 @@ func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...s
|
||||
}
|
||||
done <- nil
|
||||
case <-tick.C:
|
||||
//fmt.Printf("ping message \n")
|
||||
if err := psc.Ping(""); err != nil {
|
||||
done <- err
|
||||
fmt.Printf("ping message error: %s \n", err)
|
||||
//done <- err
|
||||
}
|
||||
case err := <-done:
|
||||
close(done)
|
||||
|
||||
@@ -167,27 +167,34 @@ func UpdateNodeData() {
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
// 构造节点数据
|
||||
data := Data{
|
||||
Key: key,
|
||||
Mac: mac,
|
||||
Ip: ip,
|
||||
Master: model.IsMaster(),
|
||||
UpdateTs: time.Now(),
|
||||
UpdateTsUnix: time.Now().Unix(),
|
||||
|
||||
//先获取所有Redis的nodekey
|
||||
list, _ := database.RedisClient.HKeys("nodes")
|
||||
|
||||
if i := utils.Contains(list, key); i == false {
|
||||
// 构造节点数据
|
||||
data := Data{
|
||||
Key: key,
|
||||
Mac: mac,
|
||||
Ip: ip,
|
||||
Master: model.IsMaster(),
|
||||
UpdateTs: time.Now(),
|
||||
UpdateTsUnix: time.Now().Unix(),
|
||||
}
|
||||
|
||||
// 注册节点到Redis
|
||||
dataBytes, err := json.Marshal(&data)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
if err := database.RedisClient.HSet("nodes", key, utils.BytesToString(dataBytes)); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// 注册节点到Redis
|
||||
dataBytes, err := json.Marshal(&data)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
if err := database.RedisClient.HSet("nodes", key, utils.BytesToString(dataBytes)); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func MasterNodeCallback(message redis.Message) (err error) {
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"github.com/apex/log"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"io"
|
||||
"reflect"
|
||||
"runtime/debug"
|
||||
"unsafe"
|
||||
)
|
||||
@@ -40,3 +41,20 @@ func Close(c io.Closer) {
|
||||
//log.WithError(err).Error("关闭资源文件失败。")
|
||||
}
|
||||
}
|
||||
|
||||
func Contains(array interface{}, val interface{}) (fla bool) {
|
||||
fla = false
|
||||
switch reflect.TypeOf(array).Kind() {
|
||||
case reflect.Slice:
|
||||
{
|
||||
s := reflect.ValueOf(array)
|
||||
for i := 0; i < s.Len(); i++ {
|
||||
if reflect.DeepEqual(val, s.Index(i).Interface()) {
|
||||
fla = true
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user