重构安装卸载依赖RPC逻辑

This commit is contained in:
marvzhang
2020-03-10 14:54:28 +08:00
parent 5ae77a9d30
commit 987d2224d9
8 changed files with 221 additions and 436 deletions

View File

@@ -4,7 +4,6 @@ const (
RpcInstallLang = "install_lang"
RpcInstallDep = "install_dep"
RpcUninstallDep = "uninstall_dep"
RpcGetDepList = "get_dep_list"
RpcGetInstalledDepList = "get_installed_dep_list"
RpcGetLang = "get_lang"
)

View File

@@ -135,41 +135,18 @@ func InstallDep(c *gin.Context) {
return
}
if reqBody.Lang == constants.Python {
if services.IsMasterNode(nodeId) {
_, err := services.InstallPythonLocalDep(reqBody.DepName)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
} else {
_, err := services.InstallPythonRemoteDep(nodeId, reqBody.DepName)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
}
} else if reqBody.Lang == constants.Nodejs {
if services.IsMasterNode(nodeId) {
_, err := services.InstallNodejsLocalDep(reqBody.DepName)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
} else {
_, err := services.InstallNodejsRemoteDep(nodeId, reqBody.DepName)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
if services.IsMasterNode(nodeId) {
if err := rpc.InstallDepLocal(reqBody.Lang, reqBody.DepName); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
} else {
HandleErrorF(http.StatusBadRequest, c, fmt.Sprintf("%s is not implemented", reqBody.Lang))
return
if err := rpc.InstallDepRemote(nodeId, reqBody.Lang, reqBody.DepName); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
}
// TODO: check if install is successful
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
@@ -189,41 +166,18 @@ func UninstallDep(c *gin.Context) {
HandleError(http.StatusBadRequest, c, err)
}
if reqBody.Lang == constants.Python {
if services.IsMasterNode(nodeId) {
_, err := services.UninstallPythonLocalDep(reqBody.DepName)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
} else {
_, err := services.UninstallPythonRemoteDep(nodeId, reqBody.DepName)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
}
} else if reqBody.Lang == constants.Nodejs {
if services.IsMasterNode(nodeId) {
_, err := services.UninstallNodejsLocalDep(reqBody.DepName)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
} else {
_, err := services.UninstallNodejsRemoteDep(nodeId, reqBody.DepName)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
if services.IsMasterNode(nodeId) {
if err := rpc.UninstallDepLocal(reqBody.Lang, reqBody.DepName); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
} else {
HandleErrorF(http.StatusBadRequest, c, fmt.Sprintf("%s is not implemented", reqBody.Lang))
return
if err := rpc.UninstallDepRemote(nodeId, reqBody.Lang, reqBody.DepName); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
}
// TODO: check if uninstall is successful
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",

View File

@@ -1,152 +0,0 @@
package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/utils"
"encoding/json"
"fmt"
"github.com/apex/log"
uuid "github.com/satori/go.uuid"
"runtime/debug"
)
type RpcMessage struct {
Id string `json:"id"`
Method string `json:"method"`
Blocked bool `json:"blocked"`
Params map[string]string `json:"params"`
Result string `json:"result"`
}
// ========安装依赖========
func RpcServerInstallDep(msg RpcMessage) RpcMessage {
lang := GetRpcParam("lang", msg.Params)
depName := GetRpcParam("dep_name", msg.Params)
if lang == constants.Python {
output, _ := InstallPythonLocalDep(depName)
msg.Result = output
}
return msg
}
func RpcClientInstallDep(nodeId string, lang string, depName string) (output string, err error) {
params := map[string]string{}
params["lang"] = lang
params["dep_name"] = depName
data, err := RpcClientFunc(nodeId, constants.RpcInstallDep, params, 60)()
if err != nil {
return
}
output = data
return
}
// ========./安装依赖========
// ========卸载依赖========
func RpcServerUninstallDep(msg RpcMessage) RpcMessage {
lang := GetRpcParam("lang", msg.Params)
depName := GetRpcParam("dep_name", msg.Params)
if lang == constants.Python {
output, _ := UninstallPythonLocalDep(depName)
msg.Result = output
}
return msg
}
func RpcClientUninstallDep(nodeId string, lang string, depName string) (output string, err error) {
params := map[string]string{}
params["lang"] = lang
params["dep_name"] = depName
data, err := RpcClientFunc(nodeId, constants.RpcUninstallDep, params, 60)()
if err != nil {
return
}
output = data
return
}
// ========./卸载依赖========
// ========获取已安装依赖列表========
func RpcClientGetInstalledDepList(nodeId string, lang string) (list []entity.Dependency, err error) {
params := map[string]string{}
params["lang"] = lang
data, err := RpcClientFunc(nodeId, constants.RpcGetInstalledDepList, params, 30)()
if err != nil {
return
}
// 反序列化结果
if err := json.Unmarshal([]byte(data), &list); err != nil {
return list, err
}
return
}
// ========./获取已安装依赖列表========
// RPC 客户端函数
func RpcClientFunc(nodeId string, method string, params map[string]string, timeout int) func() (string, error) {
return func() (result string, err error) {
// 请求ID
id := uuid.NewV4().String()
// 构造RPC消息
msg := RpcMessage{
Id: id,
Method: method,
Params: params,
Result: "",
}
// 发送RPC消息
msgStr := ObjectToString(msg)
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", nodeId), msgStr); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return result, err
}
// 获取RPC回复消息
dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", nodeId), timeout)
if err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return result, err
}
// 反序列化消息
if err := json.Unmarshal([]byte(dataStr), &msg); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return result, err
}
return msg.Result, nil
}
}
// 获取 RPC 参数
func GetRpcParam(key string, params map[string]string) string {
return params[key]
}
// Object 转化为 String
func ObjectToString(params interface{}) string {
bytes, _ := json.Marshal(params)
return utils.BytesToString(bytes)
}

View File

@@ -58,10 +58,12 @@ func GetService(msg entity.RpcMessage) Service {
switch msg.Method {
case constants.RpcInstallLang:
return &InstallLangService{msg: msg}
case constants.RpcInstallDep:
return &InstallDepService{msg: msg}
case constants.RpcUninstallDep:
return &UninstallDepService{msg: msg}
case constants.RpcGetLang:
return &GetLangService{msg: msg}
case constants.RpcGetDepList:
return &GetDepsService{msg: msg}
case constants.RpcGetInstalledDepList:
return &GetInstalledDepsService{msg: msg}
}

View File

@@ -1,39 +0,0 @@
package rpc
import (
"crawlab/entity"
"crawlab/utils"
"encoding/json"
)
type GetDepsService struct {
msg entity.RpcMessage
}
func (s *GetDepsService) ServerHandle() (entity.RpcMessage, error) {
langName := utils.GetRpcParam("lang", s.msg.Params)
lang := utils.GetLangFromLangNamePlain(langName)
l := GetLangLocal(lang)
lang.InstallStatus = l.InstallStatus
// 序列化
resultStr, _ := json.Marshal(lang)
s.msg.Result = string(resultStr)
return s.msg, nil
}
func (s *GetDepsService) ClientHandle() (o interface{}, err error) {
// 发起 RPC 请求,获取服务端数据
s.msg, err = ClientFunc(s.msg)()
if err != nil {
return o, err
}
var output entity.Lang
if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
return o, err
}
o = output
return
}

View File

@@ -0,0 +1,99 @@
package rpc
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/utils"
"errors"
"fmt"
"github.com/apex/log"
"os/exec"
"runtime/debug"
)
type InstallDepService struct {
msg entity.RpcMessage
}
func (s *InstallDepService) ServerHandle() (entity.RpcMessage, error) {
lang := utils.GetRpcParam("lang", s.msg.Params)
depName := utils.GetRpcParam("dep_name", s.msg.Params)
if err := InstallDepLocal(lang, depName); err != nil {
return entity.RpcMessage{}, err
}
s.msg.Result = "success"
return s.msg, nil
}
func (s *InstallDepService) ClientHandle() (o interface{}, err error) {
// 发起 RPC 请求,获取服务端数据
_, err = ClientFunc(s.msg)()
if err != nil {
return
}
return
}
func InstallDepLocal(lang string, depName string) error {
if lang == constants.Python {
_, err := InstallPythonDepLocal(depName)
if err != nil {
return err
}
} else if lang == constants.Nodejs {
_, err := InstallNodejsDepLocal(depName)
if err != nil {
return err
}
} else {
return errors.New(fmt.Sprintf("%s is not implemented", lang))
}
return nil
}
// 安装Python本地依赖
func InstallPythonDepLocal(depName string) (string, error) {
// 依赖镜像URL
url := "https://pypi.tuna.tsinghua.edu.cn/simple"
cmd := exec.Command("pip", "install", depName, "-i", url)
outputBytes, err := cmd.Output()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return fmt.Sprintf("error: %s", err.Error()), err
}
return string(outputBytes), nil
}
func InstallNodejsDepLocal(depName string) (string, error) {
// 依赖镜像URL
url := "https://registry.npm.taobao.org"
cmd := exec.Command("npm", "install", depName, "-g", "--registry", url)
outputBytes, err := cmd.Output()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return fmt.Sprintf("error: %s", err.Error()), err
}
return string(outputBytes), nil
}
func InstallDepRemote(nodeId string, lang string, depName string) (err error) {
params := make(map[string]string)
params["lang"] = lang
params["dep_name"] = depName
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcInstallDep,
Params: params,
Timeout: 300,
})
_, err = s.ClientHandle()
if err != nil {
return
}
return
}

View File

@@ -0,0 +1,95 @@
package rpc
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/utils"
"errors"
"fmt"
"github.com/apex/log"
"os/exec"
"runtime/debug"
)
type UninstallDepService struct {
msg entity.RpcMessage
}
func (s *UninstallDepService) ServerHandle() (entity.RpcMessage, error) {
lang := utils.GetRpcParam("lang", s.msg.Params)
depName := utils.GetRpcParam("dep_name", s.msg.Params)
if err := UninstallDepLocal(lang, depName); err != nil {
return entity.RpcMessage{}, err
}
s.msg.Result = "success"
return s.msg, nil
}
func (s *UninstallDepService) ClientHandle() (o interface{}, err error) {
// 发起 RPC 请求,获取服务端数据
_, err = ClientFunc(s.msg)()
if err != nil {
return
}
return
}
func UninstallDepLocal(lang string, depName string) error {
if lang == constants.Python {
output, err := UninstallPythonDepLocal(depName)
if err != nil {
log.Debugf(output)
return err
}
} else if lang == constants.Nodejs {
output, err := UninstallNodejsDepLocal(depName)
if err != nil {
log.Debugf(output)
return err
}
} else {
return errors.New(fmt.Sprintf("%s is not implemented", lang))
}
return nil
}
func UninstallPythonDepLocal(depName string) (string, error) {
cmd := exec.Command("pip", "uninstall", "-y", depName)
outputBytes, err := cmd.Output()
if err != nil {
log.Errorf(string(outputBytes))
log.Errorf(err.Error())
debug.PrintStack()
return fmt.Sprintf("error: %s", err.Error()), err
}
return string(outputBytes), nil
}
func UninstallNodejsDepLocal(depName string) (string, error) {
cmd := exec.Command("npm", "uninstall", depName, "-g")
outputBytes, err := cmd.Output()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return fmt.Sprintf("error: %s", err.Error()), err
}
return string(outputBytes), nil
}
func UninstallDepRemote(nodeId string, lang string, depName string) (err error) {
params := make(map[string]string)
params["lang"] = lang
params["dep_name"] = depName
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcUninstallDep,
Params: params,
Timeout: 300,
})
_, err = s.ClientHandle()
if err != nil {
return
}
return
}

View File

@@ -72,17 +72,6 @@ func GetLangList(nodeId string) []entity.Lang {
return list
}
// 根据语言名获取语言实例
func GetLangFromLangName(nodeId string, name string) entity.Lang {
langList := GetLangList(nodeId)
for _, lang := range langList {
if lang.ExecutableName == name {
return lang
}
}
return entity.Lang{}
}
func GetLangInstallStatus(nodeId string, lang entity.Lang) (string, error) {
if IsMasterNode(nodeId) {
lang := rpc.GetLangLocal(lang)
@@ -96,22 +85,6 @@ func GetLangInstallStatus(nodeId string, lang entity.Lang) (string, error) {
}
}
// 是否已安装该依赖
func IsInstalledLang(nodeId string, lang entity.Lang) bool {
sysInfo, err := GetSystemInfo(nodeId)
if err != nil {
return false
}
for _, exec := range sysInfo.Executables {
for _, path := range lang.ExecutablePaths {
if exec.Path == path {
return true
}
}
}
return false
}
// 是否已安装该依赖
func IsInstalledDep(installedDepList []entity.Dependency, dep entity.Dependency) bool {
for _, _dep := range installedDepList {
@@ -191,12 +164,12 @@ func GetPythonDepList(nodeId string, searchDepName string) ([]entity.Dependency,
// 获取已安装依赖列表
var installedDepList []entity.Dependency
if IsMasterNode(nodeId) {
//installedDepList, err = GetPythonLocalInstalledDepList(nodeId)
//if err != nil {
// return list, err
//}
installedDepList, err = rpc.GetInstalledDepsLocal(constants.Python)
if err != nil {
return list, err
}
} else {
installedDepList, err = GetPythonRemoteInstalledDepList(nodeId)
installedDepList, err = rpc.GetInstalledDepsRemote(nodeId, constants.Python)
if err != nil {
return list, err
}
@@ -359,152 +332,10 @@ func UpdatePythonDepList() {
}
}
// 获取Python远端依赖列表
func GetPythonRemoteInstalledDepList(nodeId string) ([]entity.Dependency, error) {
depList, err := RpcClientGetInstalledDepList(nodeId, constants.Python)
if err != nil {
return depList, err
}
return depList, nil
}
// 安装Python本地依赖
func InstallPythonLocalDep(depName string) (string, error) {
// 依赖镜像URL
url := "https://pypi.tuna.tsinghua.edu.cn/simple"
cmd := exec.Command("pip", "install", depName, "-i", url)
outputBytes, err := cmd.Output()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return fmt.Sprintf("error: %s", err.Error()), err
}
return string(outputBytes), nil
}
// 获取Python远端依赖列表
func InstallPythonRemoteDep(nodeId string, depName string) (string, error) {
output, err := RpcClientInstallDep(nodeId, constants.Python, depName)
if err != nil {
return output, err
}
return output, nil
}
// 安装Python本地依赖
func UninstallPythonLocalDep(depName string) (string, error) {
cmd := exec.Command("pip", "uninstall", "-y", depName)
outputBytes, err := cmd.Output()
if err != nil {
log.Errorf(string(outputBytes))
log.Errorf(err.Error())
debug.PrintStack()
return fmt.Sprintf("error: %s", err.Error()), err
}
return string(outputBytes), nil
}
// 获取Python远端依赖列表
func UninstallPythonRemoteDep(nodeId string, depName string) (string, error) {
output, err := RpcClientUninstallDep(nodeId, constants.Python, depName)
if err != nil {
return output, err
}
return output, nil
}
// ========./Python========
// ========Node.js========
// 获取Nodejs本地已安装的依赖列表
func GetNodejsLocalInstalledDepList(nodeId string) ([]entity.Dependency, error) {
var list []entity.Dependency
lang := GetLangFromLangName(nodeId, constants.Nodejs)
if !IsInstalledLang(nodeId, lang) {
return list, errors.New("nodejs is not installed")
}
cmd := exec.Command("npm", "ls", "-g", "--depth", "0")
outputBytes, _ := cmd.Output()
//if err != nil {
// log.Error("error: " + string(outputBytes))
// debug.PrintStack()
// return list, err
//}
regex := regexp.MustCompile("\\s(.*)@(.*)")
for _, line := range strings.Split(string(outputBytes), "\n") {
arr := regex.FindStringSubmatch(line)
if len(arr) < 3 {
continue
}
dep := entity.Dependency{
Name: strings.ToLower(arr[1]),
Version: arr[2],
Installed: true,
}
list = append(list, dep)
}
return list, nil
}
// 获取Nodejs远端依赖列表
func GetNodejsRemoteInstalledDepList(nodeId string) ([]entity.Dependency, error) {
depList, err := RpcClientGetInstalledDepList(nodeId, constants.Nodejs)
if err != nil {
return depList, err
}
return depList, nil
}
// 安装Nodejs本地依赖
func InstallNodejsLocalDep(depName string) (string, error) {
// 依赖镜像URL
url := "https://registry.npm.taobao.org"
cmd := exec.Command("npm", "install", depName, "-g", "--registry", url)
outputBytes, err := cmd.Output()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return fmt.Sprintf("error: %s", err.Error()), err
}
return string(outputBytes), nil
}
// 获取Nodejs远端依赖列表
func InstallNodejsRemoteDep(nodeId string, depName string) (string, error) {
output, err := RpcClientInstallDep(nodeId, constants.Nodejs, depName)
if err != nil {
return output, err
}
return output, nil
}
// 安装Nodejs本地依赖
func UninstallNodejsLocalDep(depName string) (string, error) {
cmd := exec.Command("npm", "uninstall", depName, "-g")
outputBytes, err := cmd.Output()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return fmt.Sprintf("error: %s", err.Error()), err
}
return string(outputBytes), nil
}
// 获取Nodejs远端依赖列表
func UninstallNodejsRemoteDep(nodeId string, depName string) (string, error) {
output, err := RpcClientUninstallDep(nodeId, constants.Nodejs, depName)
if err != nil {
return output, err
}
return output, nil
}
// 获取Nodejs本地依赖列表
func GetNodejsDepList(nodeId string, searchDepName string) (depList []entity.Dependency, err error) {
// 执行shell命令
@@ -514,12 +345,12 @@ func GetNodejsDepList(nodeId string, searchDepName string) (depList []entity.Dep
// 获取已安装依赖列表
var installedDepList []entity.Dependency
if IsMasterNode(nodeId) {
installedDepList, err = GetNodejsLocalInstalledDepList(nodeId)
installedDepList, err = rpc.GetInstalledDepsLocal(constants.Nodejs)
if err != nil {
return depList, err
}
} else {
installedDepList, err = GetNodejsRemoteInstalledDepList(nodeId)
installedDepList, err = rpc.GetInstalledDepsRemote(nodeId, constants.Nodejs)
if err != nil {
return depList, err
}
@@ -541,7 +372,3 @@ func GetNodejsDepList(nodeId string, searchDepName string) (depList []entity.Dep
}
// ========./Node.js========
// ========Java========
// ========./Java========