Files
crawlab/backend/services/rpc/base.go
2020-03-11 10:22:39 +08:00

133 lines
3.0 KiB
Go

package rpc
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
"encoding/json"
"errors"
"fmt"
"github.com/apex/log"
"github.com/gomodule/redigo/redis"
uuid "github.com/satori/go.uuid"
"runtime/debug"
)
// RPC服务基础类
type Service interface {
ServerHandle() (entity.RpcMessage, error)
ClientHandle() (interface{}, error)
}
// 客户端处理消息函数
func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
return func() (replyMsg entity.RpcMessage, err error) {
// 请求ID
msg.Id = uuid.NewV4().String()
// 发送RPC消息
msgStr := utils.ObjectToString(msg)
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 获取RPC回复消息
dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout)
if err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 反序列化消息
if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 如果返回消息有错误,返回错误
if replyMsg.Error != "" {
return replyMsg, errors.New(replyMsg.Error)
}
return
}
}
// 获取RPC服务
func GetService(msg entity.RpcMessage) Service {
switch msg.Method {
case constants.RpcInstallLang:
return &InstallLangService{msg: msg}
case constants.RpcInstallDep:
return &InstallDepService{msg: msg}
case constants.RpcUninstallDep:
return &UninstallDepService{msg: msg}
case constants.RpcGetLang:
return &GetLangService{msg: msg}
case constants.RpcGetInstalledDepList:
return &GetInstalledDepsService{msg: msg}
}
return nil
}
// 处理RPC消息
func handleMsg(msgStr string, node model.Node) {
// 反序列化消息
var msg entity.RpcMessage
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 获取service
service := GetService(msg)
// 根据Method调用本地方法
replyMsg, err := service.ServerHandle()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 发送返回消息
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
}
// 初始化服务端RPC服务
func InitRpcService() error {
go func() {
for {
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 获取获取消息队列信息
msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
if err != nil {
if err != redis.ErrNil {
log.Errorf(err.Error())
debug.PrintStack()
}
continue
}
// 处理消息
go handleMsg(msgStr, node)
}
}()
return nil
}