From 5ae77a9d3025bb32049da4afb3201b9c4b5f0808 Mon Sep 17 00:00:00 2001 From: marvzhang Date: Tue, 10 Mar 2020 13:48:08 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E8=8E=B7=E5=8F=96=E5=B7=B2?= =?UTF-8?q?=E5=AE=89=E8=A3=85=E4=BE=9D=E8=B5=96=E5=88=97=E8=A1=A8RPC?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/conf/config.yml | 2 +- backend/routes/system.go | 49 +++----- backend/services/rpc.go | 81 -------------- backend/services/rpc/base.go | 110 +++++++++--------- backend/services/rpc/get_deps.go | 39 +++++++ backend/services/rpc/get_installed_deps.go | 123 +++++++++++++++++++++ backend/services/rpc/get_lang_deps.go | 1 - backend/services/rpc/install_lang.go | 6 +- backend/services/system.go | 39 +------ 9 files changed, 244 insertions(+), 206 deletions(-) create mode 100644 backend/services/rpc/get_deps.go create mode 100644 backend/services/rpc/get_installed_deps.go delete mode 100644 backend/services/rpc/get_lang_deps.go diff --git a/backend/conf/config.yml b/backend/conf/config.yml index f1443520..1b724c42 100644 --- a/backend/conf/config.yml +++ b/backend/conf/config.yml @@ -36,7 +36,7 @@ spider: task: workers: 4 rpc: - workers: 4 + workers: 16 other: tmppath: "/tmp" version: 0.4.7 diff --git a/backend/routes/system.go b/backend/routes/system.go index 98cdb4cf..544d9d8a 100644 --- a/backend/routes/system.go +++ b/backend/routes/system.go @@ -56,41 +56,20 @@ func GetInstalledDepList(c *gin.Context) { nodeId := c.Param("id") lang := c.Query("lang") var depList []entity.Dependency - if lang == constants.Python { - if services.IsMasterNode(nodeId) { - list, err := services.GetPythonLocalInstalledDepList(nodeId) - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - depList = list - } else { - list, err := services.GetPythonRemoteInstalledDepList(nodeId) - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - depList = list - } - } else if lang == constants.Nodejs { - if services.IsMasterNode(nodeId) { - list, err := services.GetNodejsLocalInstalledDepList(nodeId) - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - depList = list - } else { - list, err := services.GetNodejsRemoteInstalledDepList(nodeId) - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - depList = list + if services.IsMasterNode(nodeId) { + list, err := rpc.GetInstalledDepsLocal(lang) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return } + depList = list } else { - HandleErrorF(http.StatusBadRequest, c, fmt.Sprintf("%s is not implemented", lang)) - return + list, err := rpc.GetInstalledDepsRemote(nodeId, lang) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + depList = list } c.JSON(http.StatusOK, Response{ @@ -290,13 +269,13 @@ func InstallLang(c *gin.Context) { } if services.IsMasterNode(nodeId) { - _, err := rpc.InstallLocalLang(reqBody.Lang) + _, err := rpc.InstallLangLocal(reqBody.Lang) if err != nil { HandleError(http.StatusInternalServerError, c, err) return } } else { - _, err := rpc.InstallRemoteLang(nodeId, reqBody.Lang) + _, err := rpc.InstallLangRemote(nodeId, reqBody.Lang) if err != nil { HandleError(http.StatusInternalServerError, c, err) return diff --git a/backend/services/rpc.go b/backend/services/rpc.go index 263cabbd..b97a5c4f 100644 --- a/backend/services/rpc.go +++ b/backend/services/rpc.go @@ -4,14 +4,11 @@ import ( "crawlab/constants" "crawlab/database" "crawlab/entity" - "crawlab/model" "crawlab/utils" "encoding/json" "fmt" "github.com/apex/log" - "github.com/gomodule/redigo/redis" uuid "github.com/satori/go.uuid" - "github.com/spf13/viper" "runtime/debug" ) @@ -83,20 +80,6 @@ func RpcClientUninstallDep(nodeId string, lang string, depName string) (output s // ========获取已安装依赖列表======== -func RpcServerGetInstalledDepList(nodeId string, msg RpcMessage) RpcMessage { - lang := GetRpcParam("lang", msg.Params) - if lang == constants.Python { - depList, _ := GetPythonLocalInstalledDepList(nodeId) - resultStr, _ := json.Marshal(depList) - msg.Result = string(resultStr) - } else if lang == constants.Nodejs { - depList, _ := GetNodejsLocalInstalledDepList(nodeId) - resultStr, _ := json.Marshal(depList) - msg.Result = string(resultStr) - } - return msg -} - func RpcClientGetInstalledDepList(nodeId string, lang string) (list []entity.Dependency, err error) { params := map[string]string{} params["lang"] = lang @@ -167,67 +150,3 @@ func ObjectToString(params interface{}) string { bytes, _ := json.Marshal(params) return utils.BytesToString(bytes) } - -var IsRpcStopped = false - -func StopRpcService() { - IsRpcStopped = true -} - -// 初始化 RPC 服务 -func InitRpcService() error { - for i := 0; i < viper.GetInt("rpc.workers"); i++ { - go func() { - for { - // 获取当前节点 - node, err := model.GetCurrentNode() - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - continue - } - - // 获取获取消息队列信息 - dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0) - if err != nil { - if err != redis.ErrNil { - log.Errorf(err.Error()) - debug.PrintStack() - } - continue - } - - // 反序列化消息 - var msg RpcMessage - if err := json.Unmarshal([]byte(dataStr), &msg); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - continue - } - - // 根据Method调用本地方法 - var replyMsg RpcMessage - if msg.Method == constants.RpcInstallDep { - replyMsg = RpcServerInstallDep(msg) - } else if msg.Method == constants.RpcUninstallDep { - replyMsg = RpcServerUninstallDep(msg) - } else { - continue - } - - // 发送返回消息 - if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", node.Id.Hex()), ObjectToString(replyMsg)); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - continue - } - - // 如果停止RPC服务,则返回 - if IsRpcStopped { - return - } - } - }() - } - return nil -} diff --git a/backend/services/rpc/base.go b/backend/services/rpc/base.go index 143f8048..b93fb19e 100644 --- a/backend/services/rpc/base.go +++ b/backend/services/rpc/base.go @@ -11,15 +11,16 @@ import ( "github.com/apex/log" "github.com/gomodule/redigo/redis" uuid "github.com/satori/go.uuid" - "github.com/spf13/viper" "runtime/debug" ) +// RPC服务基础类 type Service interface { ServerHandle() (entity.RpcMessage, error) ClientHandle() (interface{}, error) } +// 客户端处理消息函数 func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) { return func() (replyMsg entity.RpcMessage, err error) { // 请求ID @@ -52,63 +53,72 @@ func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) { } } +// 获取RPC服务 func GetService(msg entity.RpcMessage) Service { - if msg.Method == constants.RpcInstallLang { + switch msg.Method { + case constants.RpcInstallLang: return &InstallLangService{msg: msg} - } else if msg.Method == constants.RpcGetLang { + case constants.RpcGetLang: return &GetLangService{msg: msg} + case constants.RpcGetDepList: + return &GetDepsService{msg: msg} + case constants.RpcGetInstalledDepList: + return &GetInstalledDepsService{msg: msg} } return nil } +// 处理RPC消息 +func handleMsg(msgStr string, node model.Node) { + // 反序列化消息 + var msg entity.RpcMessage + if err := json.Unmarshal([]byte(msgStr), &msg); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + } + + // 获取service + service := GetService(msg) + + // 根据Method调用本地方法 + replyMsg, err := service.ServerHandle() + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + } + + // 发送返回消息 + if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + } +} + +// 初始化服务端RPC服务 func InitRpcService() error { - for i := 0; i < viper.GetInt("rpc.workers"); i++ { - go func() { - for { - // 获取当前节点 - node, err := model.GetCurrentNode() - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - continue - } - - // 获取获取消息队列信息 - dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0) - if err != nil { - if err != redis.ErrNil { - log.Errorf(err.Error()) - debug.PrintStack() - } - continue - } - - // 反序列化消息 - var msg entity.RpcMessage - if err := json.Unmarshal([]byte(dataStr), &msg); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - continue - } - - // 获取service - service := GetService(msg) - - // 根据Method调用本地方法 - replyMsg, err := service.ServerHandle() - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - } - - // 发送返回消息 - if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - continue - } + go func() { + for { + // 获取当前节点 + node, err := model.GetCurrentNode() + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + continue } - }() - } + + // 获取获取消息队列信息 + msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0) + if err != nil { + if err != redis.ErrNil { + log.Errorf(err.Error()) + debug.PrintStack() + } + continue + } + + // 处理消息 + go handleMsg(msgStr, node) + } + }() return nil } diff --git a/backend/services/rpc/get_deps.go b/backend/services/rpc/get_deps.go new file mode 100644 index 00000000..737af578 --- /dev/null +++ b/backend/services/rpc/get_deps.go @@ -0,0 +1,39 @@ +package rpc + +import ( + "crawlab/entity" + "crawlab/utils" + "encoding/json" +) + +type GetDepsService struct { + msg entity.RpcMessage +} + +func (s *GetDepsService) ServerHandle() (entity.RpcMessage, error) { + langName := utils.GetRpcParam("lang", s.msg.Params) + lang := utils.GetLangFromLangNamePlain(langName) + l := GetLangLocal(lang) + lang.InstallStatus = l.InstallStatus + + // 序列化 + resultStr, _ := json.Marshal(lang) + s.msg.Result = string(resultStr) + return s.msg, nil +} + +func (s *GetDepsService) ClientHandle() (o interface{}, err error) { + // 发起 RPC 请求,获取服务端数据 + s.msg, err = ClientFunc(s.msg)() + if err != nil { + return o, err + } + + var output entity.Lang + if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil { + return o, err + } + o = output + + return +} diff --git a/backend/services/rpc/get_installed_deps.go b/backend/services/rpc/get_installed_deps.go new file mode 100644 index 00000000..9017fa64 --- /dev/null +++ b/backend/services/rpc/get_installed_deps.go @@ -0,0 +1,123 @@ +package rpc + +import ( + "crawlab/constants" + "crawlab/entity" + "crawlab/utils" + "encoding/json" + "os/exec" + "regexp" + "runtime/debug" + "strings" +) + +type GetInstalledDepsService struct { + msg entity.RpcMessage +} + +func (s *GetInstalledDepsService) ServerHandle() (entity.RpcMessage, error) { + lang := utils.GetRpcParam("lang", s.msg.Params) + deps, err := GetInstalledDepsLocal(lang) + if err != nil { + s.msg.Error = err.Error() + return s.msg, err + } + resultStr, _ := json.Marshal(deps) + s.msg.Result = string(resultStr) + return s.msg, nil +} + +func (s *GetInstalledDepsService) ClientHandle() (o interface{}, err error) { + // 发起 RPC 请求,获取服务端数据 + s.msg, err = ClientFunc(s.msg)() + if err != nil { + return o, err + } + + // 反序列化 + var output []entity.Dependency + if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil { + return o, err + } + o = output + + return +} + +// 获取本地已安装依赖列表 +func GetInstalledDepsLocal(lang string) (deps []entity.Dependency, err error) { + if lang == constants.Python { + deps, err = GetPythonInstalledDepListLocal() + } else if lang == constants.Nodejs { + deps, err = GetNodejsInstalledDepListLocal() + } + return deps, err +} + +// 获取Python本地已安装依赖列表 +func GetPythonInstalledDepListLocal() ([]entity.Dependency, error) { + var list []entity.Dependency + + cmd := exec.Command("pip", "freeze") + outputBytes, err := cmd.Output() + if err != nil { + debug.PrintStack() + return list, err + } + + for _, line := range strings.Split(string(outputBytes), "\n") { + arr := strings.Split(line, "==") + if len(arr) < 2 { + continue + } + dep := entity.Dependency{ + Name: strings.ToLower(arr[0]), + Version: arr[1], + Installed: true, + } + list = append(list, dep) + } + + return list, nil +} + +// 获取Node.js本地已安装依赖列表 +func GetNodejsInstalledDepListLocal() ([]entity.Dependency, error) { + var list []entity.Dependency + + cmd := exec.Command("npm", "ls", "-g", "--depth", "0") + outputBytes, _ := cmd.Output() + + regex := regexp.MustCompile("\\s(.*)@(.*)") + for _, line := range strings.Split(string(outputBytes), "\n") { + arr := regex.FindStringSubmatch(line) + if len(arr) < 3 { + continue + } + dep := entity.Dependency{ + Name: strings.ToLower(arr[1]), + Version: arr[2], + Installed: true, + } + list = append(list, dep) + } + + return list, nil +} + +func GetInstalledDepsRemote(nodeId string, lang string) (deps []entity.Dependency, err error) { + params := make(map[string]string) + params["lang"] = lang + s := GetService(entity.RpcMessage{ + NodeId: nodeId, + Method: constants.RpcGetInstalledDepList, + Params: params, + Timeout: 60, + }) + o, err := s.ClientHandle() + if err != nil { + return + } + deps = o.([]entity.Dependency) + return +} diff --git a/backend/services/rpc/get_lang_deps.go b/backend/services/rpc/get_lang_deps.go deleted file mode 100644 index 9ab1e3e8..00000000 --- a/backend/services/rpc/get_lang_deps.go +++ /dev/null @@ -1 +0,0 @@ -package rpc diff --git a/backend/services/rpc/install_lang.go b/backend/services/rpc/install_lang.go index 725113b8..7667b39c 100644 --- a/backend/services/rpc/install_lang.go +++ b/backend/services/rpc/install_lang.go @@ -18,7 +18,7 @@ type InstallLangService struct { func (s *InstallLangService) ServerHandle() (entity.RpcMessage, error) { lang := utils.GetRpcParam("lang", s.msg.Params) - output, err := InstallLocalLang(lang) + output, err := InstallLangLocal(lang) s.msg.Result = output if err != nil { s.msg.Error = err.Error() @@ -70,7 +70,7 @@ func InstallJavaLocalLang() (string, error) { } // 本地安装语言 -func InstallLocalLang(lang string) (o string, err error) { +func InstallLangLocal(lang string) (o string, err error) { if lang == constants.Nodejs { o, err = InstallNodejsLocalLang() } else if lang == constants.Java { @@ -82,7 +82,7 @@ func InstallLocalLang(lang string) (o string, err error) { } // 远端安装语言 -func InstallRemoteLang(nodeId string, lang string) (o string, err error) { +func InstallLangRemote(nodeId string, lang string) (o string, err error) { params := make(map[string]string) params["lang"] = lang s := GetService(entity.RpcMessage{ diff --git a/backend/services/system.go b/backend/services/system.go index 4c305861..a7182eb9 100644 --- a/backend/services/system.go +++ b/backend/services/system.go @@ -191,10 +191,10 @@ func GetPythonDepList(nodeId string, searchDepName string) ([]entity.Dependency, // 获取已安装依赖列表 var installedDepList []entity.Dependency if IsMasterNode(nodeId) { - installedDepList, err = GetPythonLocalInstalledDepList(nodeId) - if err != nil { - return list, err - } + //installedDepList, err = GetPythonLocalInstalledDepList(nodeId) + //if err != nil { + // return list, err + //} } else { installedDepList, err = GetPythonRemoteInstalledDepList(nodeId) if err != nil { @@ -359,37 +359,6 @@ func UpdatePythonDepList() { } } -// 获取Python本地已安装的依赖列表 -func GetPythonLocalInstalledDepList(nodeId string) ([]entity.Dependency, error) { - var list []entity.Dependency - - lang := GetLangFromLangName(nodeId, constants.Python) - if !IsInstalledLang(nodeId, lang) { - return list, errors.New("python is not installed") - } - cmd := exec.Command("pip", "freeze") - outputBytes, err := cmd.Output() - if err != nil { - debug.PrintStack() - return list, err - } - - for _, line := range strings.Split(string(outputBytes), "\n") { - arr := strings.Split(line, "==") - if len(arr) < 2 { - continue - } - dep := entity.Dependency{ - Name: strings.ToLower(arr[0]), - Version: arr[1], - Installed: true, - } - list = append(list, dep) - } - - return list, nil -} - // 获取Python远端依赖列表 func GetPythonRemoteInstalledDepList(nodeId string) ([]entity.Dependency, error) { depList, err := RpcClientGetInstalledDepList(nodeId, constants.Python)