重构RPC逻辑

This commit is contained in:
marvzhang
2020-03-10 12:08:26 +08:00
parent ab33640a50
commit e5b4ac6310
13 changed files with 386 additions and 227 deletions

11
backend/entity/rpc.go Normal file
View File

@@ -0,0 +1,11 @@
package entity
type RpcMessage struct {
Id string `json:"id"`
Method string `json:"method"`
NodeId string `json:"node_id"`
Params map[string]string `json:"params"`
Timeout int `json:"timeout"`
Result string `json:"result"`
Error string `json:"error"`
}

View File

@@ -9,6 +9,7 @@ import (
"crawlab/model" "crawlab/model"
"crawlab/routes" "crawlab/routes"
"crawlab/services" "crawlab/services"
"crawlab/services/rpc"
"github.com/apex/log" "github.com/apex/log"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding" "github.com/gin-gonic/gin/binding"
@@ -116,7 +117,7 @@ func main() {
log.Info("initialized spider service successfully") log.Info("initialized spider service successfully")
// 初始化RPC服务 // 初始化RPC服务
if err := services.InitRpcService(); err != nil { if err := rpc.InitRpcService(); err != nil {
log.Error("init rpc service error:" + err.Error()) log.Error("init rpc service error:" + err.Error())
debug.PrintStack() debug.PrintStack()
panic(err) panic(err)

View File

@@ -4,6 +4,7 @@ import (
"crawlab/constants" "crawlab/constants"
"crawlab/entity" "crawlab/entity"
"crawlab/services" "crawlab/services"
"crawlab/services/rpc"
"fmt" "fmt"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"net/http" "net/http"
@@ -288,37 +289,18 @@ func InstallLang(c *gin.Context) {
return return
} }
if reqBody.Lang == constants.Nodejs { if services.IsMasterNode(nodeId) {
if services.IsMasterNode(nodeId) { _, err := rpc.InstallLocalLang(reqBody.Lang)
_, err := services.InstallNodejsLocalLang() if err != nil {
if err != nil { HandleError(http.StatusInternalServerError, c, err)
HandleError(http.StatusInternalServerError, c, err) return
return
}
} else {
_, err := services.InstallNodejsRemoteLang(nodeId)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
}
} else if reqBody.Lang == constants.Java {
if services.IsMasterNode(nodeId) {
_, err := services.InstallJavaLocalLang()
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
} else {
_, err := services.InstallJavaRemoteLang(nodeId)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
} }
} else { } else {
HandleErrorF(http.StatusBadRequest, c, fmt.Sprintf("%s is not implemented", reqBody.Lang)) _, err := rpc.InstallRemoteLang(nodeId, reqBody.Lang)
return if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
} }
// TODO: check if install is successful // TODO: check if install is successful

View File

@@ -11,7 +11,7 @@ type Handler interface {
} }
func GetMsgHandler(msg entity.NodeMessage) Handler { func GetMsgHandler(msg entity.NodeMessage) Handler {
log.Infof("received msg , type is : %s", msg.Type) log.Debugf("received msg , type is : %s", msg.Type)
if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog { if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog {
// 日志相关 // 日志相关
return &Log{ return &Log{

View File

@@ -23,67 +23,6 @@ type RpcMessage struct {
Result string `json:"result"` Result string `json:"result"`
} }
// ========安装语言========
func RpcServerInstallLang(msg RpcMessage) RpcMessage {
lang := GetRpcParam("lang", msg.Params)
if lang == constants.Nodejs {
output, _ := InstallNodejsLocalLang()
msg.Result = output
}
return msg
}
func RpcClientInstallLang(nodeId string, lang string) (output string, err error) {
params := map[string]string{}
params["lang"] = lang
// 发起 RPC 请求,获取服务端数据
go func() {
_, err := RpcClientFunc(nodeId, constants.RpcInstallLang, params, 600)()
if err != nil {
return
}
}()
return
}
// ========./安装语言========
// ========获取语言========
func RpcServerGetLang(msg RpcMessage) RpcMessage {
langName := GetRpcParam("lang", msg.Params)
lang := GetLangFromLangNamePlain(langName)
l := GetLangLocal(lang)
lang.InstallStatus = l.InstallStatus
// 序列化
resultStr, _ := json.Marshal(lang)
msg.Result = string(resultStr)
return msg
}
func RpcClientGetLang(nodeId string, langName string) (lang entity.Lang, err error) {
params := map[string]string{}
params["lang"] = langName
data, err := RpcClientFunc(nodeId, constants.RpcGetLang, params, 30)()
if err != nil {
return
}
// 反序列化结果
if err := json.Unmarshal([]byte(data), &lang); err != nil {
return lang, err
}
return
}
// ========./获取语言========
// ========安装依赖======== // ========安装依赖========
func RpcServerInstallDep(msg RpcMessage) RpcMessage { func RpcServerInstallDep(msg RpcMessage) RpcMessage {
@@ -272,12 +211,6 @@ func InitRpcService() error {
replyMsg = RpcServerInstallDep(msg) replyMsg = RpcServerInstallDep(msg)
} else if msg.Method == constants.RpcUninstallDep { } else if msg.Method == constants.RpcUninstallDep {
replyMsg = RpcServerUninstallDep(msg) replyMsg = RpcServerUninstallDep(msg)
} else if msg.Method == constants.RpcInstallLang {
replyMsg = RpcServerInstallLang(msg)
} else if msg.Method == constants.RpcGetInstalledDepList {
replyMsg = RpcServerGetInstalledDepList(node.Id.Hex(), msg)
} else if msg.Method == constants.RpcGetLang {
replyMsg = RpcServerGetLang(msg)
} else { } else {
continue continue
} }

View File

@@ -0,0 +1,114 @@
package rpc
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
"encoding/json"
"fmt"
"github.com/apex/log"
"github.com/gomodule/redigo/redis"
uuid "github.com/satori/go.uuid"
"github.com/spf13/viper"
"runtime/debug"
)
type Service interface {
ServerHandle() (entity.RpcMessage, error)
ClientHandle() (interface{}, error)
}
func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
return func() (replyMsg entity.RpcMessage, err error) {
// 请求ID
msg.Id = uuid.NewV4().String()
// 发送RPC消息
msgStr := utils.ObjectToString(msg)
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 获取RPC回复消息
dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout)
if err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 反序列化消息
if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
return
}
}
func GetService(msg entity.RpcMessage) Service {
if msg.Method == constants.RpcInstallLang {
return &InstallLangService{msg: msg}
} else if msg.Method == constants.RpcGetLang {
return &GetLangService{msg: msg}
}
return nil
}
func InitRpcService() error {
for i := 0; i < viper.GetInt("rpc.workers"); i++ {
go func() {
for {
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 获取获取消息队列信息
dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
if err != nil {
if err != redis.ErrNil {
log.Errorf(err.Error())
debug.PrintStack()
}
continue
}
// 反序列化消息
var msg entity.RpcMessage
if err := json.Unmarshal([]byte(dataStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 获取service
service := GetService(msg)
// 根据Method调用本地方法
replyMsg, err := service.ServerHandle()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 发送返回消息
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
}
}()
}
return nil
}

View File

@@ -0,0 +1,82 @@
package rpc
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/utils"
"encoding/json"
)
type GetLangService struct {
msg entity.RpcMessage
}
func (s *GetLangService) ServerHandle() (entity.RpcMessage, error) {
langName := utils.GetRpcParam("lang", s.msg.Params)
lang := utils.GetLangFromLangNamePlain(langName)
l := GetLangLocal(lang)
lang.InstallStatus = l.InstallStatus
// 序列化
resultStr, _ := json.Marshal(lang)
s.msg.Result = string(resultStr)
return s.msg, nil
}
func (s *GetLangService) ClientHandle() (o interface{}, err error) {
// 发起 RPC 请求,获取服务端数据
s.msg, err = ClientFunc(s.msg)()
if err != nil {
return o, err
}
var output entity.Lang
if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
return o, err
}
o = output
return
}
func GetLangLocal(lang entity.Lang) entity.Lang {
// 检查是否存在执行路径
for _, p := range lang.ExecutablePaths {
if utils.Exists(p) {
lang.InstallStatus = constants.InstallStatusInstalled
return lang
}
}
// 检查是否正在安装
if utils.Exists(lang.LockPath) {
lang.InstallStatus = constants.InstallStatusInstalling
return lang
}
// 检查其他语言是否在安装
if utils.Exists("/tmp/install.lock") {
lang.InstallStatus = constants.InstallStatusInstallingOther
return lang
}
lang.InstallStatus = constants.InstallStatusNotInstalled
return lang
}
func GetLangRemote(nodeId string, lang entity.Lang) (l entity.Lang, err error) {
params := make(map[string]string)
params["lang"] = lang.ExecutableName
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcGetLang,
Params: params,
Timeout: 60,
})
o, err := s.ClientHandle()
if err != nil {
return
}
l = o.(entity.Lang)
return
}

View File

@@ -0,0 +1 @@
package rpc

View File

@@ -0,0 +1,100 @@
package rpc
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/utils"
"errors"
"fmt"
"github.com/apex/log"
"os/exec"
"path"
"runtime/debug"
)
type InstallLangService struct {
msg entity.RpcMessage
}
func (s *InstallLangService) ServerHandle() (entity.RpcMessage, error) {
lang := utils.GetRpcParam("lang", s.msg.Params)
output, err := InstallLocalLang(lang)
s.msg.Result = output
if err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
return s.msg, nil
}
func (s *InstallLangService) ClientHandle() (o interface{}, err error) {
// 发起 RPC 请求,获取服务端数据
go func() {
_, err := ClientFunc(s.msg)()
if err != nil {
return
}
}()
return
}
// 本地安装Node.js
func InstallNodejsLocalLang() (string, error) {
cmd := exec.Command("/bin/sh", path.Join("scripts", "install-nodejs.sh"))
output, err := cmd.Output()
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return string(output), err
}
// TODO: check if Node.js is installed successfully
return string(output), nil
}
// 本地安装Java
func InstallJavaLocalLang() (string, error) {
cmd := exec.Command("/bin/sh", path.Join("scripts", "install-java.sh"))
output, err := cmd.Output()
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return string(output), err
}
// TODO: check if Java is installed successfully
return string(output), nil
}
// 本地安装语言
func InstallLocalLang(lang string) (o string, err error) {
if lang == constants.Nodejs {
o, err = InstallNodejsLocalLang()
} else if lang == constants.Java {
o, err = InstallNodejsLocalLang()
} else {
return "", errors.New(fmt.Sprintf("%s is not implemented", lang))
}
return
}
// 远端安装语言
func InstallRemoteLang(nodeId string, lang string) (o string, err error) {
params := make(map[string]string)
params["lang"] = lang
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcInstallLang,
Params: params,
Timeout: 60,
})
output, err := s.ClientHandle()
o = output.(string)
if err != nil {
return
}
return
}

View File

@@ -6,6 +6,7 @@ import (
"crawlab/entity" "crawlab/entity"
"crawlab/lib/cron" "crawlab/lib/cron"
"crawlab/model" "crawlab/model"
"crawlab/services/rpc"
"crawlab/utils" "crawlab/utils"
"encoding/json" "encoding/json"
"errors" "errors"
@@ -13,7 +14,6 @@ import (
"github.com/apex/log" "github.com/apex/log"
"github.com/imroc/req" "github.com/imroc/req"
"os/exec" "os/exec"
"path"
"regexp" "regexp"
"runtime/debug" "runtime/debug"
"sort" "sort"
@@ -62,35 +62,9 @@ func GetSystemInfo(nodeId string) (sysInfo entity.SystemInfo, err error) {
return return
} }
func getLangList() []entity.Lang {
list := []entity.Lang{
{
Name: "Python",
ExecutableName: "python",
ExecutablePaths: []string{"/usr/bin/python", "/usr/local/bin/python"},
DepExecutablePath: "/usr/local/bin/pip",
LockPath: "/tmp/install-python.lock",
},
{
Name: "Node.js",
ExecutableName: "node",
ExecutablePaths: []string{"/usr/bin/node", "/usr/local/bin/node"},
DepExecutablePath: "/usr/local/bin/npm",
LockPath: "/tmp/install-nodejs.lock",
},
{
Name: "Java",
ExecutableName: "java",
ExecutablePaths: []string{"/usr/bin/java", "/usr/local/bin/java"},
LockPath: "/tmp/install-java.lock",
},
}
return list
}
// 获取语言列表 // 获取语言列表
func GetLangList(nodeId string) []entity.Lang { func GetLangList(nodeId string) []entity.Lang {
list := getLangList() list := utils.GetLangList()
for i, lang := range list { for i, lang := range list {
status, _ := GetLangInstallStatus(nodeId, lang) status, _ := GetLangInstallStatus(nodeId, lang)
list[i].InstallStatus = status list[i].InstallStatus = status
@@ -98,12 +72,6 @@ func GetLangList(nodeId string) []entity.Lang {
return list return list
} }
// 获取语言列表
func GetLangListPlain() []entity.Lang {
list := getLangList()
return list
}
// 根据语言名获取语言实例 // 根据语言名获取语言实例
func GetLangFromLangName(nodeId string, name string) entity.Lang { func GetLangFromLangName(nodeId string, name string) entity.Lang {
langList := GetLangList(nodeId) langList := GetLangList(nodeId)
@@ -115,23 +83,12 @@ func GetLangFromLangName(nodeId string, name string) entity.Lang {
return entity.Lang{} return entity.Lang{}
} }
// 根据语言名获取语言实例,不包含状态
func GetLangFromLangNamePlain(name string) entity.Lang {
langList := GetLangListPlain()
for _, lang := range langList {
if lang.ExecutableName == name {
return lang
}
}
return entity.Lang{}
}
func GetLangInstallStatus(nodeId string, lang entity.Lang) (string, error) { func GetLangInstallStatus(nodeId string, lang entity.Lang) (string, error) {
if IsMasterNode(nodeId) { if IsMasterNode(nodeId) {
lang := GetLangLocal(lang) lang := rpc.GetLangLocal(lang)
return lang.InstallStatus, nil return lang.InstallStatus, nil
} else { } else {
lang, err := GetLangRemote(nodeId, lang) lang, err := rpc.GetLangRemote(nodeId, lang)
if err != nil { if err != nil {
return "", err return "", err
} }
@@ -139,39 +96,6 @@ func GetLangInstallStatus(nodeId string, lang entity.Lang) (string, error) {
} }
} }
func GetLangLocal(lang entity.Lang) entity.Lang {
// 检查是否存在执行路径
for _, p := range lang.ExecutablePaths {
if utils.Exists(p) {
lang.InstallStatus = constants.InstallStatusInstalled
return lang
}
}
// 检查是否正在安装
if utils.Exists(lang.LockPath) {
lang.InstallStatus = constants.InstallStatusInstalling
return lang
}
// 检查其他语言是否在安装
if utils.Exists("/tmp/install.lock") {
lang.InstallStatus = constants.InstallStatusInstallingOther
return lang
}
lang.InstallStatus = constants.InstallStatusNotInstalled
return lang
}
func GetLangRemote(nodeId string, lang entity.Lang) (entity.Lang, error) {
l, err := RpcClientGetLang(nodeId, lang.ExecutableName)
if err != nil {
return l, err
}
return l, nil
}
// 是否已安装该依赖 // 是否已安装该依赖
func IsInstalledLang(nodeId string, lang entity.Lang) bool { func IsInstalledLang(nodeId string, lang entity.Lang) bool {
sysInfo, err := GetSystemInfo(nodeId) sysInfo, err := GetSystemInfo(nodeId)
@@ -525,30 +449,6 @@ func UninstallPythonRemoteDep(nodeId string, depName string) (string, error) {
// ========Node.js======== // ========Node.js========
// 本地安装Node.js
func InstallNodejsLocalLang() (string, error) {
cmd := exec.Command("/bin/sh", path.Join("scripts", "install-nodejs.sh"))
output, err := cmd.Output()
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return string(output), err
}
// TODO: check if Node.js is installed successfully
return string(output), nil
}
// 远端安装Node.js
func InstallNodejsRemoteLang(nodeId string) (string, error) {
output, err := RpcClientInstallLang(nodeId, constants.Nodejs)
if err != nil {
return output, err
}
return output, nil
}
// 获取Nodejs本地已安装的依赖列表 // 获取Nodejs本地已安装的依赖列表
func GetNodejsLocalInstalledDepList(nodeId string) ([]entity.Dependency, error) { func GetNodejsLocalInstalledDepList(nodeId string) ([]entity.Dependency, error) {
var list []entity.Dependency var list []entity.Dependency
@@ -675,28 +575,4 @@ func GetNodejsDepList(nodeId string, searchDepName string) (depList []entity.Dep
// ========Java======== // ========Java========
// 本地安装Java
func InstallJavaLocalLang() (string, error) {
cmd := exec.Command("/bin/sh", path.Join("scripts", "install-java.sh"))
output, err := cmd.Output()
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return string(output), err
}
// TODO: check if Java is installed successfully
return string(output), nil
}
// 远端安装Java
func InstallJavaRemoteLang(nodeId string) (string, error) {
output, err := RpcClientInstallLang(nodeId, constants.Java)
if err != nil {
return output, err
}
return output, nil
}
// ========./Java======== // ========./Java========

14
backend/utils/rpc.go Normal file
View File

@@ -0,0 +1,14 @@
package utils
import "encoding/json"
// Object 转化为 String
func ObjectToString(params interface{}) string {
bytes, _ := json.Marshal(params)
return BytesToString(bytes)
}
// 获取 RPC 参数
func GetRpcParam(key string, params map[string]string) string {
return params[key]
}

46
backend/utils/system.go Normal file
View File

@@ -0,0 +1,46 @@
package utils
import "crawlab/entity"
func GetLangList() []entity.Lang {
list := []entity.Lang{
{
Name: "Python",
ExecutableName: "python",
ExecutablePaths: []string{"/usr/bin/python", "/usr/local/bin/python"},
DepExecutablePath: "/usr/local/bin/pip",
LockPath: "/tmp/install-python.lock",
},
{
Name: "Node.js",
ExecutableName: "node",
ExecutablePaths: []string{"/usr/bin/node", "/usr/local/bin/node"},
DepExecutablePath: "/usr/local/bin/npm",
LockPath: "/tmp/install-nodejs.lock",
},
{
Name: "Java",
ExecutableName: "java",
ExecutablePaths: []string{"/usr/bin/java", "/usr/local/bin/java"},
LockPath: "/tmp/install-java.lock",
},
}
return list
}
// 获取语言列表
func GetLangListPlain() []entity.Lang {
list := GetLangList()
return list
}
// 根据语言名获取语言实例,不包含状态
func GetLangFromLangNamePlain(name string) entity.Lang {
langList := GetLangListPlain()
for _, lang := range langList {
if lang.ExecutableName == name {
return lang
}
}
return entity.Lang{}
}

View File

@@ -31,4 +31,3 @@ spec:
value: "Y" value: "Y"
- name: CRAWLAB_SERVER_REGISTER_TYPE - name: CRAWLAB_SERVER_REGISTER_TYPE
value: "hostname" value: "hostname"