From e5b4ac63107cfaba0595b0ec134fd74e837d1fbd Mon Sep 17 00:00:00 2001 From: marvzhang Date: Tue, 10 Mar 2020 12:08:26 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84RPC=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/entity/rpc.go | 11 ++ backend/main.go | 3 +- backend/routes/system.go | 40 ++----- backend/services/msg_handler/handler.go | 2 +- backend/services/rpc.go | 67 ------------ backend/services/rpc/base.go | 114 ++++++++++++++++++++ backend/services/rpc/get_lang.go | 82 +++++++++++++++ backend/services/rpc/get_lang_deps.go | 1 + backend/services/rpc/install_lang.go | 100 ++++++++++++++++++ backend/services/system.go | 132 +----------------------- backend/utils/rpc.go | 14 +++ backend/utils/system.go | 46 +++++++++ devops/develop/crawlab-worker.yaml | 1 - 13 files changed, 386 insertions(+), 227 deletions(-) create mode 100644 backend/entity/rpc.go create mode 100644 backend/services/rpc/base.go create mode 100644 backend/services/rpc/get_lang.go create mode 100644 backend/services/rpc/get_lang_deps.go create mode 100644 backend/services/rpc/install_lang.go create mode 100644 backend/utils/rpc.go create mode 100644 backend/utils/system.go diff --git a/backend/entity/rpc.go b/backend/entity/rpc.go new file mode 100644 index 00000000..3f5ddcea --- /dev/null +++ b/backend/entity/rpc.go @@ -0,0 +1,11 @@ +package entity + +type RpcMessage struct { + Id string `json:"id"` + Method string `json:"method"` + NodeId string `json:"node_id"` + Params map[string]string `json:"params"` + Timeout int `json:"timeout"` + Result string `json:"result"` + Error string `json:"error"` +} diff --git a/backend/main.go b/backend/main.go index ea9c30df..8b7af36f 100644 --- a/backend/main.go +++ b/backend/main.go @@ -9,6 +9,7 @@ import ( "crawlab/model" "crawlab/routes" "crawlab/services" + "crawlab/services/rpc" "github.com/apex/log" "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" @@ -116,7 +117,7 @@ func main() { log.Info("initialized spider service successfully") // 初始化RPC服务 - if err := services.InitRpcService(); err != nil { + if err := rpc.InitRpcService(); err != nil { log.Error("init rpc service error:" + err.Error()) debug.PrintStack() panic(err) diff --git a/backend/routes/system.go b/backend/routes/system.go index a63febb8..98cdb4cf 100644 --- a/backend/routes/system.go +++ b/backend/routes/system.go @@ -4,6 +4,7 @@ import ( "crawlab/constants" "crawlab/entity" "crawlab/services" + "crawlab/services/rpc" "fmt" "github.com/gin-gonic/gin" "net/http" @@ -288,37 +289,18 @@ func InstallLang(c *gin.Context) { return } - if reqBody.Lang == constants.Nodejs { - if services.IsMasterNode(nodeId) { - _, err := services.InstallNodejsLocalLang() - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - } else { - _, err := services.InstallNodejsRemoteLang(nodeId) - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - } - } else if reqBody.Lang == constants.Java { - if services.IsMasterNode(nodeId) { - _, err := services.InstallJavaLocalLang() - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - } else { - _, err := services.InstallJavaRemoteLang(nodeId) - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } + if services.IsMasterNode(nodeId) { + _, err := rpc.InstallLocalLang(reqBody.Lang) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return } } else { - HandleErrorF(http.StatusBadRequest, c, fmt.Sprintf("%s is not implemented", reqBody.Lang)) - return + _, err := rpc.InstallRemoteLang(nodeId, reqBody.Lang) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } } // TODO: check if install is successful diff --git a/backend/services/msg_handler/handler.go b/backend/services/msg_handler/handler.go index b8b8e231..bd98fce6 100644 --- a/backend/services/msg_handler/handler.go +++ b/backend/services/msg_handler/handler.go @@ -11,7 +11,7 @@ type Handler interface { } func GetMsgHandler(msg entity.NodeMessage) Handler { - log.Infof("received msg , type is : %s", msg.Type) + log.Debugf("received msg , type is : %s", msg.Type) if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog { // 日志相关 return &Log{ diff --git a/backend/services/rpc.go b/backend/services/rpc.go index 8b33a542..263cabbd 100644 --- a/backend/services/rpc.go +++ b/backend/services/rpc.go @@ -23,67 +23,6 @@ type RpcMessage struct { Result string `json:"result"` } -// ========安装语言======== - -func RpcServerInstallLang(msg RpcMessage) RpcMessage { - lang := GetRpcParam("lang", msg.Params) - if lang == constants.Nodejs { - output, _ := InstallNodejsLocalLang() - msg.Result = output - } - return msg -} - -func RpcClientInstallLang(nodeId string, lang string) (output string, err error) { - params := map[string]string{} - params["lang"] = lang - - // 发起 RPC 请求,获取服务端数据 - go func() { - _, err := RpcClientFunc(nodeId, constants.RpcInstallLang, params, 600)() - if err != nil { - return - } - }() - - return -} - -// ========./安装语言======== - -// ========获取语言======== - -func RpcServerGetLang(msg RpcMessage) RpcMessage { - langName := GetRpcParam("lang", msg.Params) - lang := GetLangFromLangNamePlain(langName) - l := GetLangLocal(lang) - lang.InstallStatus = l.InstallStatus - - // 序列化 - resultStr, _ := json.Marshal(lang) - msg.Result = string(resultStr) - return msg -} - -func RpcClientGetLang(nodeId string, langName string) (lang entity.Lang, err error) { - params := map[string]string{} - params["lang"] = langName - - data, err := RpcClientFunc(nodeId, constants.RpcGetLang, params, 30)() - if err != nil { - return - } - - // 反序列化结果 - if err := json.Unmarshal([]byte(data), &lang); err != nil { - return lang, err - } - - return -} - -// ========./获取语言======== - // ========安装依赖======== func RpcServerInstallDep(msg RpcMessage) RpcMessage { @@ -272,12 +211,6 @@ func InitRpcService() error { replyMsg = RpcServerInstallDep(msg) } else if msg.Method == constants.RpcUninstallDep { replyMsg = RpcServerUninstallDep(msg) - } else if msg.Method == constants.RpcInstallLang { - replyMsg = RpcServerInstallLang(msg) - } else if msg.Method == constants.RpcGetInstalledDepList { - replyMsg = RpcServerGetInstalledDepList(node.Id.Hex(), msg) - } else if msg.Method == constants.RpcGetLang { - replyMsg = RpcServerGetLang(msg) } else { continue } diff --git a/backend/services/rpc/base.go b/backend/services/rpc/base.go new file mode 100644 index 00000000..143f8048 --- /dev/null +++ b/backend/services/rpc/base.go @@ -0,0 +1,114 @@ +package rpc + +import ( + "crawlab/constants" + "crawlab/database" + "crawlab/entity" + "crawlab/model" + "crawlab/utils" + "encoding/json" + "fmt" + "github.com/apex/log" + "github.com/gomodule/redigo/redis" + uuid "github.com/satori/go.uuid" + "github.com/spf13/viper" + "runtime/debug" +) + +type Service interface { + ServerHandle() (entity.RpcMessage, error) + ClientHandle() (interface{}, error) +} + +func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) { + return func() (replyMsg entity.RpcMessage, err error) { + // 请求ID + msg.Id = uuid.NewV4().String() + + // 发送RPC消息 + msgStr := utils.ObjectToString(msg) + if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil { + log.Errorf("RpcClientFunc error: " + err.Error()) + debug.PrintStack() + return replyMsg, err + } + + // 获取RPC回复消息 + dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout) + if err != nil { + log.Errorf("RpcClientFunc error: " + err.Error()) + debug.PrintStack() + return replyMsg, err + } + + // 反序列化消息 + if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil { + log.Errorf("RpcClientFunc error: " + err.Error()) + debug.PrintStack() + return replyMsg, err + } + + return + } +} + +func GetService(msg entity.RpcMessage) Service { + if msg.Method == constants.RpcInstallLang { + return &InstallLangService{msg: msg} + } else if msg.Method == constants.RpcGetLang { + return &GetLangService{msg: msg} + } + return nil +} + +func InitRpcService() error { + for i := 0; i < viper.GetInt("rpc.workers"); i++ { + go func() { + for { + // 获取当前节点 + node, err := model.GetCurrentNode() + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + continue + } + + // 获取获取消息队列信息 + dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0) + if err != nil { + if err != redis.ErrNil { + log.Errorf(err.Error()) + debug.PrintStack() + } + continue + } + + // 反序列化消息 + var msg entity.RpcMessage + if err := json.Unmarshal([]byte(dataStr), &msg); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + continue + } + + // 获取service + service := GetService(msg) + + // 根据Method调用本地方法 + replyMsg, err := service.ServerHandle() + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + } + + // 发送返回消息 + if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + continue + } + } + }() + } + return nil +} diff --git a/backend/services/rpc/get_lang.go b/backend/services/rpc/get_lang.go new file mode 100644 index 00000000..d0662984 --- /dev/null +++ b/backend/services/rpc/get_lang.go @@ -0,0 +1,82 @@ +package rpc + +import ( + "crawlab/constants" + "crawlab/entity" + "crawlab/utils" + "encoding/json" +) + +type GetLangService struct { + msg entity.RpcMessage +} + +func (s *GetLangService) ServerHandle() (entity.RpcMessage, error) { + langName := utils.GetRpcParam("lang", s.msg.Params) + lang := utils.GetLangFromLangNamePlain(langName) + l := GetLangLocal(lang) + lang.InstallStatus = l.InstallStatus + + // 序列化 + resultStr, _ := json.Marshal(lang) + s.msg.Result = string(resultStr) + return s.msg, nil +} + +func (s *GetLangService) ClientHandle() (o interface{}, err error) { + // 发起 RPC 请求,获取服务端数据 + s.msg, err = ClientFunc(s.msg)() + if err != nil { + return o, err + } + + var output entity.Lang + if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil { + return o, err + } + o = output + + return +} + +func GetLangLocal(lang entity.Lang) entity.Lang { + // 检查是否存在执行路径 + for _, p := range lang.ExecutablePaths { + if utils.Exists(p) { + lang.InstallStatus = constants.InstallStatusInstalled + return lang + } + } + + // 检查是否正在安装 + if utils.Exists(lang.LockPath) { + lang.InstallStatus = constants.InstallStatusInstalling + return lang + } + + // 检查其他语言是否在安装 + if utils.Exists("/tmp/install.lock") { + lang.InstallStatus = constants.InstallStatusInstallingOther + return lang + } + + lang.InstallStatus = constants.InstallStatusNotInstalled + return lang +} + +func GetLangRemote(nodeId string, lang entity.Lang) (l entity.Lang, err error) { + params := make(map[string]string) + params["lang"] = lang.ExecutableName + s := GetService(entity.RpcMessage{ + NodeId: nodeId, + Method: constants.RpcGetLang, + Params: params, + Timeout: 60, + }) + o, err := s.ClientHandle() + if err != nil { + return + } + l = o.(entity.Lang) + return +} diff --git a/backend/services/rpc/get_lang_deps.go b/backend/services/rpc/get_lang_deps.go new file mode 100644 index 00000000..9ab1e3e8 --- /dev/null +++ b/backend/services/rpc/get_lang_deps.go @@ -0,0 +1 @@ +package rpc diff --git a/backend/services/rpc/install_lang.go b/backend/services/rpc/install_lang.go new file mode 100644 index 00000000..725113b8 --- /dev/null +++ b/backend/services/rpc/install_lang.go @@ -0,0 +1,100 @@ +package rpc + +import ( + "crawlab/constants" + "crawlab/entity" + "crawlab/utils" + "errors" + "fmt" + "github.com/apex/log" + "os/exec" + "path" + "runtime/debug" +) + +type InstallLangService struct { + msg entity.RpcMessage +} + +func (s *InstallLangService) ServerHandle() (entity.RpcMessage, error) { + lang := utils.GetRpcParam("lang", s.msg.Params) + output, err := InstallLocalLang(lang) + s.msg.Result = output + if err != nil { + s.msg.Error = err.Error() + return s.msg, err + } + return s.msg, nil +} + +func (s *InstallLangService) ClientHandle() (o interface{}, err error) { + // 发起 RPC 请求,获取服务端数据 + go func() { + _, err := ClientFunc(s.msg)() + if err != nil { + return + } + }() + + return +} + +// 本地安装Node.js +func InstallNodejsLocalLang() (string, error) { + cmd := exec.Command("/bin/sh", path.Join("scripts", "install-nodejs.sh")) + output, err := cmd.Output() + if err != nil { + log.Error(err.Error()) + debug.PrintStack() + return string(output), err + } + + // TODO: check if Node.js is installed successfully + + return string(output), nil +} + +// 本地安装Java +func InstallJavaLocalLang() (string, error) { + cmd := exec.Command("/bin/sh", path.Join("scripts", "install-java.sh")) + output, err := cmd.Output() + if err != nil { + log.Error(err.Error()) + debug.PrintStack() + return string(output), err + } + + // TODO: check if Java is installed successfully + + return string(output), nil +} + +// 本地安装语言 +func InstallLocalLang(lang string) (o string, err error) { + if lang == constants.Nodejs { + o, err = InstallNodejsLocalLang() + } else if lang == constants.Java { + o, err = InstallNodejsLocalLang() + } else { + return "", errors.New(fmt.Sprintf("%s is not implemented", lang)) + } + return +} + +// 远端安装语言 +func InstallRemoteLang(nodeId string, lang string) (o string, err error) { + params := make(map[string]string) + params["lang"] = lang + s := GetService(entity.RpcMessage{ + NodeId: nodeId, + Method: constants.RpcInstallLang, + Params: params, + Timeout: 60, + }) + output, err := s.ClientHandle() + o = output.(string) + if err != nil { + return + } + return +} diff --git a/backend/services/system.go b/backend/services/system.go index 829c2657..4c305861 100644 --- a/backend/services/system.go +++ b/backend/services/system.go @@ -6,6 +6,7 @@ import ( "crawlab/entity" "crawlab/lib/cron" "crawlab/model" + "crawlab/services/rpc" "crawlab/utils" "encoding/json" "errors" @@ -13,7 +14,6 @@ import ( "github.com/apex/log" "github.com/imroc/req" "os/exec" - "path" "regexp" "runtime/debug" "sort" @@ -62,35 +62,9 @@ func GetSystemInfo(nodeId string) (sysInfo entity.SystemInfo, err error) { return } -func getLangList() []entity.Lang { - list := []entity.Lang{ - { - Name: "Python", - ExecutableName: "python", - ExecutablePaths: []string{"/usr/bin/python", "/usr/local/bin/python"}, - DepExecutablePath: "/usr/local/bin/pip", - LockPath: "/tmp/install-python.lock", - }, - { - Name: "Node.js", - ExecutableName: "node", - ExecutablePaths: []string{"/usr/bin/node", "/usr/local/bin/node"}, - DepExecutablePath: "/usr/local/bin/npm", - LockPath: "/tmp/install-nodejs.lock", - }, - { - Name: "Java", - ExecutableName: "java", - ExecutablePaths: []string{"/usr/bin/java", "/usr/local/bin/java"}, - LockPath: "/tmp/install-java.lock", - }, - } - return list -} - // 获取语言列表 func GetLangList(nodeId string) []entity.Lang { - list := getLangList() + list := utils.GetLangList() for i, lang := range list { status, _ := GetLangInstallStatus(nodeId, lang) list[i].InstallStatus = status @@ -98,12 +72,6 @@ func GetLangList(nodeId string) []entity.Lang { return list } -// 获取语言列表 -func GetLangListPlain() []entity.Lang { - list := getLangList() - return list -} - // 根据语言名获取语言实例 func GetLangFromLangName(nodeId string, name string) entity.Lang { langList := GetLangList(nodeId) @@ -115,23 +83,12 @@ func GetLangFromLangName(nodeId string, name string) entity.Lang { return entity.Lang{} } -// 根据语言名获取语言实例,不包含状态 -func GetLangFromLangNamePlain(name string) entity.Lang { - langList := GetLangListPlain() - for _, lang := range langList { - if lang.ExecutableName == name { - return lang - } - } - return entity.Lang{} -} - func GetLangInstallStatus(nodeId string, lang entity.Lang) (string, error) { if IsMasterNode(nodeId) { - lang := GetLangLocal(lang) + lang := rpc.GetLangLocal(lang) return lang.InstallStatus, nil } else { - lang, err := GetLangRemote(nodeId, lang) + lang, err := rpc.GetLangRemote(nodeId, lang) if err != nil { return "", err } @@ -139,39 +96,6 @@ func GetLangInstallStatus(nodeId string, lang entity.Lang) (string, error) { } } -func GetLangLocal(lang entity.Lang) entity.Lang { - // 检查是否存在执行路径 - for _, p := range lang.ExecutablePaths { - if utils.Exists(p) { - lang.InstallStatus = constants.InstallStatusInstalled - return lang - } - } - - // 检查是否正在安装 - if utils.Exists(lang.LockPath) { - lang.InstallStatus = constants.InstallStatusInstalling - return lang - } - - // 检查其他语言是否在安装 - if utils.Exists("/tmp/install.lock") { - lang.InstallStatus = constants.InstallStatusInstallingOther - return lang - } - - lang.InstallStatus = constants.InstallStatusNotInstalled - return lang -} - -func GetLangRemote(nodeId string, lang entity.Lang) (entity.Lang, error) { - l, err := RpcClientGetLang(nodeId, lang.ExecutableName) - if err != nil { - return l, err - } - return l, nil -} - // 是否已安装该依赖 func IsInstalledLang(nodeId string, lang entity.Lang) bool { sysInfo, err := GetSystemInfo(nodeId) @@ -525,30 +449,6 @@ func UninstallPythonRemoteDep(nodeId string, depName string) (string, error) { // ========Node.js======== -// 本地安装Node.js -func InstallNodejsLocalLang() (string, error) { - cmd := exec.Command("/bin/sh", path.Join("scripts", "install-nodejs.sh")) - output, err := cmd.Output() - if err != nil { - log.Error(err.Error()) - debug.PrintStack() - return string(output), err - } - - // TODO: check if Node.js is installed successfully - - return string(output), nil -} - -// 远端安装Node.js -func InstallNodejsRemoteLang(nodeId string) (string, error) { - output, err := RpcClientInstallLang(nodeId, constants.Nodejs) - if err != nil { - return output, err - } - return output, nil -} - // 获取Nodejs本地已安装的依赖列表 func GetNodejsLocalInstalledDepList(nodeId string) ([]entity.Dependency, error) { var list []entity.Dependency @@ -675,28 +575,4 @@ func GetNodejsDepList(nodeId string, searchDepName string) (depList []entity.Dep // ========Java======== -// 本地安装Java -func InstallJavaLocalLang() (string, error) { - cmd := exec.Command("/bin/sh", path.Join("scripts", "install-java.sh")) - output, err := cmd.Output() - if err != nil { - log.Error(err.Error()) - debug.PrintStack() - return string(output), err - } - - // TODO: check if Java is installed successfully - - return string(output), nil -} - -// 远端安装Java -func InstallJavaRemoteLang(nodeId string) (string, error) { - output, err := RpcClientInstallLang(nodeId, constants.Java) - if err != nil { - return output, err - } - return output, nil -} - // ========./Java======== diff --git a/backend/utils/rpc.go b/backend/utils/rpc.go new file mode 100644 index 00000000..03414199 --- /dev/null +++ b/backend/utils/rpc.go @@ -0,0 +1,14 @@ +package utils + +import "encoding/json" + +// Object 转化为 String +func ObjectToString(params interface{}) string { + bytes, _ := json.Marshal(params) + return BytesToString(bytes) +} + +// 获取 RPC 参数 +func GetRpcParam(key string, params map[string]string) string { + return params[key] +} diff --git a/backend/utils/system.go b/backend/utils/system.go new file mode 100644 index 00000000..8ff5eb64 --- /dev/null +++ b/backend/utils/system.go @@ -0,0 +1,46 @@ +package utils + +import "crawlab/entity" + +func GetLangList() []entity.Lang { + list := []entity.Lang{ + { + Name: "Python", + ExecutableName: "python", + ExecutablePaths: []string{"/usr/bin/python", "/usr/local/bin/python"}, + DepExecutablePath: "/usr/local/bin/pip", + LockPath: "/tmp/install-python.lock", + }, + { + Name: "Node.js", + ExecutableName: "node", + ExecutablePaths: []string{"/usr/bin/node", "/usr/local/bin/node"}, + DepExecutablePath: "/usr/local/bin/npm", + LockPath: "/tmp/install-nodejs.lock", + }, + { + Name: "Java", + ExecutableName: "java", + ExecutablePaths: []string{"/usr/bin/java", "/usr/local/bin/java"}, + LockPath: "/tmp/install-java.lock", + }, + } + return list +} + +// 获取语言列表 +func GetLangListPlain() []entity.Lang { + list := GetLangList() + return list +} + +// 根据语言名获取语言实例,不包含状态 +func GetLangFromLangNamePlain(name string) entity.Lang { + langList := GetLangListPlain() + for _, lang := range langList { + if lang.ExecutableName == name { + return lang + } + } + return entity.Lang{} +} diff --git a/devops/develop/crawlab-worker.yaml b/devops/develop/crawlab-worker.yaml index 0d461f14..524e0efe 100644 --- a/devops/develop/crawlab-worker.yaml +++ b/devops/develop/crawlab-worker.yaml @@ -31,4 +31,3 @@ spec: value: "Y" - name: CRAWLAB_SERVER_REGISTER_TYPE value: "hostname" - \ No newline at end of file