From 41556cab74c4decabf6c798c7bfb81749db77c4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Mon, 30 Sep 2019 12:09:37 +0800 Subject: [PATCH] =?UTF-8?q?fix=20=E5=88=A0=E9=99=A4=E7=88=AC=E8=99=AB?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/constants/channels.go | 9 ++++ backend/constants/message.go | 1 + backend/entity/node.go | 25 ++++++++++ backend/entity/system.go | 15 ++++++ backend/model/file.go | 2 +- backend/model/spider.go | 4 ++ backend/model/system.go | 23 ++------- backend/routes/spider.go | 28 +---------- backend/services/log.go | 26 +++++----- backend/services/msg_handler/handler.go | 33 ++++-------- backend/services/msg_handler/msg_log.go | 20 ++------ backend/services/msg_handler/msg_spider.go | 24 +++++++++ .../services/msg_handler/msg_system_info.go | 18 ++----- backend/services/msg_handler/msg_task.go | 3 +- backend/services/node.go | 33 ++++++------ backend/services/spider.go | 43 +++++++++++++++- backend/services/system.go | 10 ++-- backend/services/task.go | 4 +- backend/utils/file.go | 11 +++- backend/utils/helpers.go | 50 ++++++++++++++++++- 20 files changed, 240 insertions(+), 142 deletions(-) create mode 100644 backend/constants/channels.go create mode 100644 backend/entity/node.go create mode 100644 backend/entity/system.go create mode 100644 backend/services/msg_handler/msg_spider.go diff --git a/backend/constants/channels.go b/backend/constants/channels.go new file mode 100644 index 00000000..c38a5ac9 --- /dev/null +++ b/backend/constants/channels.go @@ -0,0 +1,9 @@ +package constants + +const ( + ChannelAllNode = "nodes:public" + + ChannelWorkerNode = "nodes:" + + ChannelMasterNode = "nodes:master" +) diff --git a/backend/constants/message.go b/backend/constants/message.go index f76e8fc3..72e5fab2 100644 --- a/backend/constants/message.go +++ b/backend/constants/message.go @@ -5,4 +5,5 @@ const ( MsgTypeGetSystemInfo = "get-sys-info" MsgTypeCancelTask = "cancel-task" MsgTypeRemoveLog = "remove-log" + MsgTypeRemoveSpider = "remove-spider" ) diff --git a/backend/entity/node.go b/backend/entity/node.go new file mode 100644 index 00000000..cf52fafb --- /dev/null +++ b/backend/entity/node.go @@ -0,0 +1,25 @@ +package entity + +type NodeMessage struct { + // 通信类别 + Type string `json:"type"` + + // 任务相关 + TaskId string `json:"task_id"` // 任务ID + + // 节点相关 + NodeId string `json:"node_id"` // 节点ID + + // 日志相关 + LogPath string `json:"log_path"` // 日志路径 + Log string `json:"log"` // 日志 + + // 系统信息 + SysInfo SystemInfo `json:"sys_info"` + + // 爬虫相关 + SpiderId string `json:"spider_id"` //爬虫ID + + // 错误相关 + Error string `json:"error"` +} diff --git a/backend/entity/system.go b/backend/entity/system.go new file mode 100644 index 00000000..dff637b7 --- /dev/null +++ b/backend/entity/system.go @@ -0,0 +1,15 @@ +package entity + +type SystemInfo struct { + ARCH string `json:"arch"` + OS string `json:"os"` + Hostname string `json:"host_name"` + NumCpu int `json:"num_cpu"` + Executables []Executable `json:"executables"` +} + +type Executable struct { + Path string `json:"path"` + FileName string `json:"file_name"` + DisplayName string `json:"display_name"` +} diff --git a/backend/model/file.go b/backend/model/file.go index 7aa88e3d..fe3ece0e 100644 --- a/backend/model/file.go +++ b/backend/model/file.go @@ -68,7 +68,7 @@ func RemoveFile(path string) error { debug.PrintStack() return nil } - if err := os.Remove(path); err != nil { + if err := os.RemoveAll(path); err != nil { return err } return nil diff --git a/backend/model/spider.go b/backend/model/spider.go index e63c5f57..93eafbc5 100644 --- a/backend/model/spider.go +++ b/backend/model/spider.go @@ -157,6 +157,7 @@ func GetSpider(id bson.ObjectId) (Spider, error) { var result Spider if err := c.FindId(id).One(&result); err != nil { if err != mgo.ErrNotFound { + log.Errorf("get spider error: %s, id: %id", err.Error(), id.Hex()) debug.PrintStack() } return result, err @@ -190,6 +191,8 @@ func RemoveSpider(id bson.ObjectId) error { } if err := c.RemoveId(id); err != nil { + log.Errorf("remove spider error: %s, id:%s", err.Error(), id.Hex()) + debug.PrintStack() return err } @@ -199,6 +202,7 @@ func RemoveSpider(id bson.ObjectId) error { if err := gf.RemoveId(result.FileId); err != nil { log.Error("remove file error, id:" + result.FileId.Hex()) + debug.PrintStack() return err } diff --git a/backend/model/system.go b/backend/model/system.go index 6091c963..5c2f5997 100644 --- a/backend/model/system.go +++ b/backend/model/system.go @@ -1,6 +1,7 @@ package model import ( + "crawlab/entity" "github.com/apex/log" "io/ioutil" "os" @@ -35,21 +36,7 @@ var executableNameMap = map[string]string{ "bash": "bash", } -type SystemInfo struct { - ARCH string `json:"arch"` - OS string `json:"os"` - Hostname string `json:"host_name"` - NumCpu int `json:"num_cpu"` - Executables []Executable `json:"executables"` -} - -type Executable struct { - Path string `json:"path"` - FileName string `json:"file_name"` - DisplayName string `json:"display_name"` -} - -func GetLocalSystemInfo() (sysInfo SystemInfo, err error) { +func GetLocalSystemInfo() (sysInfo entity.SystemInfo, err error) { executables, err := GetExecutables() if err != nil { return sysInfo, err @@ -60,7 +47,7 @@ func GetLocalSystemInfo() (sysInfo SystemInfo, err error) { return sysInfo, err } - return SystemInfo{ + return entity.SystemInfo{ ARCH: runtime.GOARCH, OS: runtime.GOOS, NumCpu: runtime.GOMAXPROCS(0), @@ -78,7 +65,7 @@ func GetPathValues() (paths []string) { return strings.Split(pathEnv, ":") } -func GetExecutables() (executables []Executable, err error) { +func GetExecutables() (executables []entity.Executable, err error) { pathValues := GetPathValues() cache := map[string]string{} @@ -97,7 +84,7 @@ func GetExecutables() (executables []Executable, err error) { if cache[filePath] == "" { if displayName != "" { - executables = append(executables, Executable{ + executables = append(executables, entity.Executable{ Path: filePath, FileName: file.Name(), DisplayName: displayName, diff --git a/backend/routes/spider.go b/backend/routes/spider.go index 064541bd..efd76c0b 100644 --- a/backend/routes/spider.go +++ b/backend/routes/spider.go @@ -199,33 +199,7 @@ func DeleteSpider(c *gin.Context) { return } - // 获取该爬虫 - spider, err := model.GetSpider(bson.ObjectIdHex(id)) - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - - // 删除爬虫文件目录 - if err := os.RemoveAll(spider.Src); err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - - // 从数据库中删除该爬虫 - if err := model.RemoveSpider(bson.ObjectIdHex(id)); err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - - // 删除日志文件 - if err := services.RemoveLogBySpiderId(spider.Id); err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - - // 删除爬虫对应的task任务 - if err := model.RemoveTaskBySpiderId(spider.Id); err != nil { + if err := services.RemoveSpider(id); err != nil { HandleError(http.StatusInternalServerError, c, err) return } diff --git a/backend/services/log.go b/backend/services/log.go index 95459f8f..485cb7dd 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -3,9 +3,9 @@ package services import ( "crawlab/constants" "crawlab/database" + "crawlab/entity" "crawlab/lib/cron" "crawlab/model" - "crawlab/services/msg_handler" "crawlab/utils" "encoding/json" "github.com/apex/log" @@ -23,7 +23,7 @@ var TaskLogChanMap = utils.NewChanMap() // 获取远端日志 func GetRemoteLog(task model.Task) (logStr string, err error) { // 序列化消息 - msg := msg_handler.NodeMessage{ + msg := entity.NodeMessage{ Type: constants.MsgTypeGetLog, LogPath: task.LogPath, TaskId: task.Id, @@ -85,21 +85,16 @@ func RemoveLocalLog(path string) error { // 删除远程日志 func RemoveRemoteLog(task model.Task) error { - msg := msg_handler.NodeMessage{ + msg := entity.NodeMessage{ Type: constants.MsgTypeRemoveLog, LogPath: task.LogPath, TaskId: task.Id, } - msgBytes, err := json.Marshal(&msg) - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return err - } // 发布获取日志消息 channel := "nodes:" + task.NodeId.Hex() - if _, err := database.RedisClient.Publish(channel, utils.BytesToString(msgBytes)); err != nil { - log.Errorf(err.Error()) + if _, err := database.RedisClient.Publish(channel, utils.GetJson(msg)); err != nil { + log.Errorf("publish redis error: %s", err.Error()) + debug.PrintStack() return err } return nil @@ -119,10 +114,12 @@ func RemoveLogByTaskId(id string) error { func removeLog(t model.Task) { if err := RemoveLocalLog(t.LogPath); err != nil { - log.Error("remove local log error:" + err.Error()) + log.Errorf("remove local log error: %s", err.Error()) + debug.PrintStack() } if err := RemoveRemoteLog(t); err != nil { - log.Error("remove remote log error:" + err.Error()) + log.Errorf("remove remote log error: %s", err.Error()) + debug.PrintStack() } } @@ -130,7 +127,8 @@ func removeLog(t model.Task) { func RemoveLogBySpiderId(id bson.ObjectId) error { tasks, err := model.GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts") if err != nil { - log.Error("get tasks error:" + err.Error()) + log.Errorf("get tasks error: %s", err.Error()) + debug.PrintStack() } for _, task := range tasks { removeLog(task) diff --git a/backend/services/msg_handler/handler.go b/backend/services/msg_handler/handler.go index 61516bcf..848e0c5d 100644 --- a/backend/services/msg_handler/handler.go +++ b/backend/services/msg_handler/handler.go @@ -2,47 +2,34 @@ package msg_handler import ( "crawlab/constants" - "crawlab/model" + "crawlab/entity" ) type Handler interface { Handle() error } -func GetMsgHandler(msg NodeMessage) Handler { +func GetMsgHandler(msg entity.NodeMessage) Handler { if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog { + // 日志相关 return &Log{ msg: msg, } } else if msg.Type == constants.MsgTypeCancelTask { + // 任务相关 return &Task{ msg: msg, } } else if msg.Type == constants.MsgTypeGetSystemInfo { + // 系统信息相关 return &SystemInfo{ msg: msg, } + } else if msg.Type == constants.MsgTypeRemoveSpider { + // 爬虫相关 + return &Spider{ + SpiderId: msg.SpiderId, + } } return nil } - -type NodeMessage struct { - // 通信类别 - Type string `json:"type"` - - // 任务相关 - TaskId string `json:"task_id"` // 任务ID - - // 节点相关 - NodeId string `json:"node_id"` // 节点ID - - // 日志相关 - LogPath string `json:"log_path"` // 日志路径 - Log string `json:"log"` // 日志 - - // 系统信息 - SysInfo model.SystemInfo `json:"sys_info"` - - // 错误相关 - Error string `json:"error"` -} diff --git a/backend/services/msg_handler/msg_log.go b/backend/services/msg_handler/msg_log.go index 0d09d784..37080bd6 100644 --- a/backend/services/msg_handler/msg_log.go +++ b/backend/services/msg_handler/msg_log.go @@ -2,16 +2,15 @@ package msg_handler import ( "crawlab/constants" - "crawlab/database" + "crawlab/entity" "crawlab/model" "crawlab/utils" - "encoding/json" "github.com/apex/log" "runtime/debug" ) type Log struct { - msg NodeMessage + msg entity.NodeMessage } func (g *Log) Handle() error { @@ -25,31 +24,22 @@ func (g *Log) Handle() error { func (g *Log) get() error { // 发出的消息 - msgSd := NodeMessage{ + msgSd := entity.NodeMessage{ Type: constants.MsgTypeGetLog, TaskId: g.msg.TaskId, } // 获取本地日志 logStr, err := model.GetLocalLog(g.msg.LogPath) - log.Info(utils.BytesToString(logStr)) if err != nil { - log.Errorf(err.Error()) + log.Errorf("get node local log error: %s", err.Error()) debug.PrintStack() msgSd.Error = err.Error() msgSd.Log = err.Error() } else { msgSd.Log = utils.BytesToString(logStr) } - - // 序列化 - msgSdBytes, err := json.Marshal(&msgSd) - if err != nil { - return err - } - // 发布消息给主节点 - log.Info("publish get log msg to master") - if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil { + if err := utils.Pub(constants.ChannelMasterNode, msgSd); err != nil { return err } return nil diff --git a/backend/services/msg_handler/msg_spider.go b/backend/services/msg_handler/msg_spider.go new file mode 100644 index 00000000..dcd6ce06 --- /dev/null +++ b/backend/services/msg_handler/msg_spider.go @@ -0,0 +1,24 @@ +package msg_handler + +import ( + "crawlab/model" + "crawlab/utils" + "github.com/globalsign/mgo/bson" + "github.com/spf13/viper" + "path/filepath" +) + +type Spider struct { + SpiderId string +} + +func (s *Spider) Handle() error { + // 移除本地的爬虫目录 + spider, err := model.GetSpider(bson.ObjectIdHex(s.SpiderId)) + if err != nil { + return err + } + path := filepath.Join(viper.GetString("spider.path"), spider.Name) + utils.RemoveFiles(path) + return nil +} diff --git a/backend/services/msg_handler/msg_system_info.go b/backend/services/msg_handler/msg_system_info.go index c81cb0a0..6b88e2cf 100644 --- a/backend/services/msg_handler/msg_system_info.go +++ b/backend/services/msg_handler/msg_system_info.go @@ -2,16 +2,13 @@ package msg_handler import ( "crawlab/constants" - "crawlab/database" + "crawlab/entity" "crawlab/model" "crawlab/utils" - "encoding/json" - "github.com/apex/log" - "runtime/debug" ) type SystemInfo struct { - msg NodeMessage + msg entity.NodeMessage } func (s *SystemInfo) Handle() error { @@ -20,19 +17,12 @@ func (s *SystemInfo) Handle() error { if err != nil { return err } - msgSd := NodeMessage{ + msgSd := entity.NodeMessage{ Type: constants.MsgTypeGetSystemInfo, NodeId: s.msg.NodeId, SysInfo: sysInfo, } - msgSdBytes, err := json.Marshal(&msgSd) - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return err - } - if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil { - log.Errorf(err.Error()) + if err := utils.Pub(constants.ChannelMasterNode, msgSd); err != nil { return err } return nil diff --git a/backend/services/msg_handler/msg_task.go b/backend/services/msg_handler/msg_task.go index 5f120f80..21b95430 100644 --- a/backend/services/msg_handler/msg_task.go +++ b/backend/services/msg_handler/msg_task.go @@ -2,6 +2,7 @@ package msg_handler import ( "crawlab/constants" + "crawlab/entity" "crawlab/model" "crawlab/utils" "github.com/apex/log" @@ -10,7 +11,7 @@ import ( ) type Task struct { - msg NodeMessage + msg entity.NodeMessage } func (t *Task) Handle() error { diff --git a/backend/services/node.go b/backend/services/node.go index e3397e74..53af8d32 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -1,9 +1,9 @@ package services import ( - "context" "crawlab/constants" "crawlab/database" + "crawlab/entity" "crawlab/lib/cron" "crawlab/model" "crawlab/services/msg_handler" @@ -175,7 +175,7 @@ func UpdateNodeData() { func MasterNodeCallback(message redis.Message) (err error) { // 反序列化 - var msg msg_handler.NodeMessage + var msg entity.NodeMessage if err := json.Unmarshal(message.Data, &msg); err != nil { return err @@ -183,7 +183,6 @@ func MasterNodeCallback(message redis.Message) (err error) { if msg.Type == constants.MsgTypeGetLog { // 获取日志 - fmt.Println(msg) time.Sleep(10 * time.Millisecond) ch := TaskLogChanMap.ChanBlocked(msg.TaskId) ch <- msg.Log @@ -200,14 +199,8 @@ func MasterNodeCallback(message redis.Message) (err error) { func WorkerNodeCallback(message redis.Message) (err error) { // 反序列化 - msg := msg_handler.NodeMessage{} - if err := json.Unmarshal(message.Data, &msg); err != nil { - - return err - } - - // worker message handle - if err := msg_handler.GetMsgHandler(msg).Handle(); err != nil { + msg := utils.GetMessage(message) + if err := msg_handler.GetMsgHandler(*msg).Handle(); err != nil { return err } return nil @@ -234,23 +227,25 @@ func InitNodeService() error { log.Errorf(err.Error()) return err } - ctx := context.Background() + if model.IsMaster() { // 如果为主节点,订阅主节点通信频道 - channel := "nodes:master" - err := database.RedisClient.Subscribe(ctx, MasterNodeCallback, channel) - if err != nil { + if err := utils.Sub(constants.ChannelMasterNode, MasterNodeCallback); err != nil { return err } } else { // 若为工作节点,订阅单独指定通信频道 - channel := "nodes:" + node.Id.Hex() - err := database.RedisClient.Subscribe(ctx, WorkerNodeCallback, channel) - if err != nil { + channel := constants.ChannelWorkerNode + node.Id.Hex() + if err := utils.Sub(channel, WorkerNodeCallback); err != nil { return err } } + // 订阅全通道 + if err := utils.Sub(constants.ChannelAllNode, WorkerNodeCallback); err != nil { + return err + } + // 如果为主节点,每30秒刷新所有节点信息 if model.IsMaster() { spec := "*/10 * * * * *" @@ -260,7 +255,7 @@ func InitNodeService() error { } } - // 更新在当前节点执行的任务状态为:abnormal + // 更新在当前节点执行中的任务状态为:abnormal if err := model.UpdateTaskToAbnormal(node.Id); err != nil { debug.PrintStack() return err diff --git a/backend/services/spider.go b/backend/services/spider.go index ea3d374f..a2e9a60f 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -3,6 +3,7 @@ package services import ( "crawlab/constants" "crawlab/database" + "crawlab/entity" "crawlab/lib/cron" "crawlab/model" "crawlab/services/spider_handler" @@ -65,6 +66,7 @@ func UploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, err er return fid, nil } +// 写入grid fs func WriteToGridFS(content []byte, f *mgo.GridFile) { if _, err := f.Write(content); err != nil { debug.PrintStack() @@ -141,7 +143,7 @@ func PublishSpider(spider model.Spider) { return } // md5值不一样,则下载 - md5Str := utils.ReadFile(md5) + md5Str := utils.ReadFileOneLine(md5) if gfFile.Md5 != md5Str { spiderSync.RemoveSpiderFile() spiderSync.Download() @@ -150,6 +152,45 @@ func PublishSpider(spider model.Spider) { } } +func RemoveSpider(id string) error { + // 获取该爬虫 + spider, err := model.GetSpider(bson.ObjectIdHex(id)) + if err != nil { + return err + } + + // 删除爬虫文件目录 + path := filepath.Join(viper.GetString("spider.path"), spider.Name) + utils.RemoveFiles(path) + + // 删除其他节点的爬虫目录 + msg := entity.NodeMessage{ + Type: constants.MsgTypeRemoveSpider, + SpiderId: id, + } + if err := utils.Pub(constants.ChannelAllNode, msg); err != nil { + return err + } + + // 从数据库中删除该爬虫 + if err := model.RemoveSpider(bson.ObjectIdHex(id)); err != nil { + return err + } + + // 删除日志文件 + if err := RemoveLogBySpiderId(spider.Id); err != nil { + return err + } + + // 删除爬虫对应的task任务 + if err := model.RemoveTaskBySpiderId(spider.Id); err != nil { + return err + } + + // TODO 定时任务如何处理 + return nil +} + // 启动爬虫服务 func InitSpiderService() error { // 构造定时任务执行器 diff --git a/backend/services/system.go b/backend/services/system.go index 2c7cd05a..92f9cf96 100644 --- a/backend/services/system.go +++ b/backend/services/system.go @@ -3,17 +3,17 @@ package services import ( "crawlab/constants" "crawlab/database" + "crawlab/entity" "crawlab/model" - "crawlab/services/msg_handler" "crawlab/utils" "encoding/json" ) var SystemInfoChanMap = utils.NewChanMap() -func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) { +func GetRemoteSystemInfo(id string) (sysInfo entity.SystemInfo, err error) { // 发送消息 - msg := msg_handler.NodeMessage{ + msg := entity.NodeMessage{ Type: constants.MsgTypeGetSystemInfo, NodeId: id, } @@ -21,7 +21,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) { // 序列化 msgBytes, _ := json.Marshal(&msg) if _, err := database.RedisClient.Publish("nodes:"+id, utils.BytesToString(msgBytes)); err != nil { - return model.SystemInfo{}, err + return entity.SystemInfo{}, err } // 通道 @@ -38,7 +38,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) { return sysInfo, nil } -func GetSystemInfo(id string) (sysInfo model.SystemInfo, err error) { +func GetSystemInfo(id string) (sysInfo entity.SystemInfo, err error) { if IsMasterNode(id) { sysInfo, err = model.GetLocalSystemInfo() } else { diff --git a/backend/services/task.go b/backend/services/task.go index 2a68f10e..0e8db964 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -3,9 +3,9 @@ package services import ( "crawlab/constants" "crawlab/database" + "crawlab/entity" "crawlab/lib/cron" "crawlab/model" - "crawlab/services/msg_handler" "crawlab/utils" "encoding/json" "errors" @@ -493,7 +493,7 @@ func CancelTask(id string) (err error) { // 任务节点为工作节点 // 序列化消息 - msg := msg_handler.NodeMessage{ + msg := entity.NodeMessage{ Type: constants.MsgTypeCancelTask, TaskId: id, } diff --git a/backend/utils/file.go b/backend/utils/file.go index 282bee14..d65e7ab1 100644 --- a/backend/utils/file.go +++ b/backend/utils/file.go @@ -10,7 +10,16 @@ import ( "runtime/debug" ) -func ReadFile(fileName string) string { +// 删除文件 +func RemoveFiles(path string) { + if err := os.RemoveAll(path); err != nil { + log.Errorf("remove files error: %s, path: %s", err.Error(), path) + debug.PrintStack() + } +} + +// 读取文件一行 +func ReadFileOneLine(fileName string) string { file := OpenFile(fileName) defer file.Close() buf := bufio.NewReader(file) diff --git a/backend/utils/helpers.go b/backend/utils/helpers.go index 8e6de815..edc6200e 100644 --- a/backend/utils/helpers.go +++ b/backend/utils/helpers.go @@ -1,7 +1,55 @@ package utils -import "unsafe" +import ( + "context" + "crawlab/database" + "crawlab/entity" + "encoding/json" + "github.com/apex/log" + "github.com/gomodule/redigo/redis" + "runtime/debug" + "unsafe" +) func BytesToString(b []byte) string { return *(*string)(unsafe.Pointer(&b)) } + +func GetJson(message entity.NodeMessage) string { + msgBytes, err := json.Marshal(&message) + if err != nil { + log.Errorf("node message to json error: %s", err.Error()) + debug.PrintStack() + return "" + } + return BytesToString(msgBytes) +} + +func GetMessage(message redis.Message) *entity.NodeMessage { + msg := entity.NodeMessage{} + if err := json.Unmarshal(message.Data, &msg); err != nil { + log.Errorf("message byte to object error: %s", err.Error()) + debug.PrintStack() + return nil + } + return &msg +} + +func Pub(channel string, msg entity.NodeMessage) error { + if _, err := database.RedisClient.Publish(channel, GetJson(msg)); err != nil { + log.Errorf("publish redis error: %s", err.Error()) + debug.PrintStack() + return err + } + return nil +} + +func Sub(channel string, consume database.ConsumeFunc) error { + ctx := context.Background() + if err := database.RedisClient.Subscribe(ctx, consume, channel); err != nil { + log.Errorf("subscribe redis error: %s", err.Error()) + debug.PrintStack() + return err + } + return nil +}