重构获取已安装依赖列表RPC逻辑

This commit is contained in:
marvzhang
2020-03-10 13:48:08 +08:00
parent e5b4ac6310
commit 5ae77a9d30
9 changed files with 244 additions and 206 deletions

View File

@@ -36,7 +36,7 @@ spider:
task:
workers: 4
rpc:
workers: 4
workers: 16
other:
tmppath: "/tmp"
version: 0.4.7

View File

@@ -56,41 +56,20 @@ func GetInstalledDepList(c *gin.Context) {
nodeId := c.Param("id")
lang := c.Query("lang")
var depList []entity.Dependency
if lang == constants.Python {
if services.IsMasterNode(nodeId) {
list, err := services.GetPythonLocalInstalledDepList(nodeId)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
depList = list
} else {
list, err := services.GetPythonRemoteInstalledDepList(nodeId)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
depList = list
}
} else if lang == constants.Nodejs {
if services.IsMasterNode(nodeId) {
list, err := services.GetNodejsLocalInstalledDepList(nodeId)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
depList = list
} else {
list, err := services.GetNodejsRemoteInstalledDepList(nodeId)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
depList = list
if services.IsMasterNode(nodeId) {
list, err := rpc.GetInstalledDepsLocal(lang)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
depList = list
} else {
HandleErrorF(http.StatusBadRequest, c, fmt.Sprintf("%s is not implemented", lang))
return
list, err := rpc.GetInstalledDepsRemote(nodeId, lang)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
depList = list
}
c.JSON(http.StatusOK, Response{
@@ -290,13 +269,13 @@ func InstallLang(c *gin.Context) {
}
if services.IsMasterNode(nodeId) {
_, err := rpc.InstallLocalLang(reqBody.Lang)
_, err := rpc.InstallLangLocal(reqBody.Lang)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
} else {
_, err := rpc.InstallRemoteLang(nodeId, reqBody.Lang)
_, err := rpc.InstallLangRemote(nodeId, reqBody.Lang)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return

View File

@@ -4,14 +4,11 @@ import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
"encoding/json"
"fmt"
"github.com/apex/log"
"github.com/gomodule/redigo/redis"
uuid "github.com/satori/go.uuid"
"github.com/spf13/viper"
"runtime/debug"
)
@@ -83,20 +80,6 @@ func RpcClientUninstallDep(nodeId string, lang string, depName string) (output s
// ========获取已安装依赖列表========
func RpcServerGetInstalledDepList(nodeId string, msg RpcMessage) RpcMessage {
lang := GetRpcParam("lang", msg.Params)
if lang == constants.Python {
depList, _ := GetPythonLocalInstalledDepList(nodeId)
resultStr, _ := json.Marshal(depList)
msg.Result = string(resultStr)
} else if lang == constants.Nodejs {
depList, _ := GetNodejsLocalInstalledDepList(nodeId)
resultStr, _ := json.Marshal(depList)
msg.Result = string(resultStr)
}
return msg
}
func RpcClientGetInstalledDepList(nodeId string, lang string) (list []entity.Dependency, err error) {
params := map[string]string{}
params["lang"] = lang
@@ -167,67 +150,3 @@ func ObjectToString(params interface{}) string {
bytes, _ := json.Marshal(params)
return utils.BytesToString(bytes)
}
var IsRpcStopped = false
func StopRpcService() {
IsRpcStopped = true
}
// 初始化 RPC 服务
func InitRpcService() error {
for i := 0; i < viper.GetInt("rpc.workers"); i++ {
go func() {
for {
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 获取获取消息队列信息
dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
if err != nil {
if err != redis.ErrNil {
log.Errorf(err.Error())
debug.PrintStack()
}
continue
}
// 反序列化消息
var msg RpcMessage
if err := json.Unmarshal([]byte(dataStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 根据Method调用本地方法
var replyMsg RpcMessage
if msg.Method == constants.RpcInstallDep {
replyMsg = RpcServerInstallDep(msg)
} else if msg.Method == constants.RpcUninstallDep {
replyMsg = RpcServerUninstallDep(msg)
} else {
continue
}
// 发送返回消息
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", node.Id.Hex()), ObjectToString(replyMsg)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 如果停止RPC服务则返回
if IsRpcStopped {
return
}
}
}()
}
return nil
}

View File

@@ -11,15 +11,16 @@ import (
"github.com/apex/log"
"github.com/gomodule/redigo/redis"
uuid "github.com/satori/go.uuid"
"github.com/spf13/viper"
"runtime/debug"
)
// RPC服务基础类
type Service interface {
ServerHandle() (entity.RpcMessage, error)
ClientHandle() (interface{}, error)
}
// 客户端处理消息函数
func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
return func() (replyMsg entity.RpcMessage, err error) {
// 请求ID
@@ -52,63 +53,72 @@ func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
}
}
// 获取RPC服务
func GetService(msg entity.RpcMessage) Service {
if msg.Method == constants.RpcInstallLang {
switch msg.Method {
case constants.RpcInstallLang:
return &InstallLangService{msg: msg}
} else if msg.Method == constants.RpcGetLang {
case constants.RpcGetLang:
return &GetLangService{msg: msg}
case constants.RpcGetDepList:
return &GetDepsService{msg: msg}
case constants.RpcGetInstalledDepList:
return &GetInstalledDepsService{msg: msg}
}
return nil
}
// 处理RPC消息
func handleMsg(msgStr string, node model.Node) {
// 反序列化消息
var msg entity.RpcMessage
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 获取service
service := GetService(msg)
// 根据Method调用本地方法
replyMsg, err := service.ServerHandle()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 发送返回消息
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
}
// 初始化服务端RPC服务
func InitRpcService() error {
for i := 0; i < viper.GetInt("rpc.workers"); i++ {
go func() {
for {
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 获取获取消息队列信息
dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
if err != nil {
if err != redis.ErrNil {
log.Errorf(err.Error())
debug.PrintStack()
}
continue
}
// 反序列化消息
var msg entity.RpcMessage
if err := json.Unmarshal([]byte(dataStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 获取service
service := GetService(msg)
// 根据Method调用本地方法
replyMsg, err := service.ServerHandle()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 发送返回消息
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
go func() {
for {
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
}()
}
// 获取获取消息队列信息
msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
if err != nil {
if err != redis.ErrNil {
log.Errorf(err.Error())
debug.PrintStack()
}
continue
}
// 处理消息
go handleMsg(msgStr, node)
}
}()
return nil
}

View File

@@ -0,0 +1,39 @@
package rpc
import (
"crawlab/entity"
"crawlab/utils"
"encoding/json"
)
type GetDepsService struct {
msg entity.RpcMessage
}
func (s *GetDepsService) ServerHandle() (entity.RpcMessage, error) {
langName := utils.GetRpcParam("lang", s.msg.Params)
lang := utils.GetLangFromLangNamePlain(langName)
l := GetLangLocal(lang)
lang.InstallStatus = l.InstallStatus
// 序列化
resultStr, _ := json.Marshal(lang)
s.msg.Result = string(resultStr)
return s.msg, nil
}
func (s *GetDepsService) ClientHandle() (o interface{}, err error) {
// 发起 RPC 请求,获取服务端数据
s.msg, err = ClientFunc(s.msg)()
if err != nil {
return o, err
}
var output entity.Lang
if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
return o, err
}
o = output
return
}

View File

@@ -0,0 +1,123 @@
package rpc
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/utils"
"encoding/json"
"os/exec"
"regexp"
"runtime/debug"
"strings"
)
type GetInstalledDepsService struct {
msg entity.RpcMessage
}
func (s *GetInstalledDepsService) ServerHandle() (entity.RpcMessage, error) {
lang := utils.GetRpcParam("lang", s.msg.Params)
deps, err := GetInstalledDepsLocal(lang)
if err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
resultStr, _ := json.Marshal(deps)
s.msg.Result = string(resultStr)
return s.msg, nil
}
func (s *GetInstalledDepsService) ClientHandle() (o interface{}, err error) {
// 发起 RPC 请求,获取服务端数据
s.msg, err = ClientFunc(s.msg)()
if err != nil {
return o, err
}
// 反序列化
var output []entity.Dependency
if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
return o, err
}
o = output
return
}
// 获取本地已安装依赖列表
func GetInstalledDepsLocal(lang string) (deps []entity.Dependency, err error) {
if lang == constants.Python {
deps, err = GetPythonInstalledDepListLocal()
} else if lang == constants.Nodejs {
deps, err = GetNodejsInstalledDepListLocal()
}
return deps, err
}
// 获取Python本地已安装依赖列表
func GetPythonInstalledDepListLocal() ([]entity.Dependency, error) {
var list []entity.Dependency
cmd := exec.Command("pip", "freeze")
outputBytes, err := cmd.Output()
if err != nil {
debug.PrintStack()
return list, err
}
for _, line := range strings.Split(string(outputBytes), "\n") {
arr := strings.Split(line, "==")
if len(arr) < 2 {
continue
}
dep := entity.Dependency{
Name: strings.ToLower(arr[0]),
Version: arr[1],
Installed: true,
}
list = append(list, dep)
}
return list, nil
}
// 获取Node.js本地已安装依赖列表
func GetNodejsInstalledDepListLocal() ([]entity.Dependency, error) {
var list []entity.Dependency
cmd := exec.Command("npm", "ls", "-g", "--depth", "0")
outputBytes, _ := cmd.Output()
regex := regexp.MustCompile("\\s(.*)@(.*)")
for _, line := range strings.Split(string(outputBytes), "\n") {
arr := regex.FindStringSubmatch(line)
if len(arr) < 3 {
continue
}
dep := entity.Dependency{
Name: strings.ToLower(arr[1]),
Version: arr[2],
Installed: true,
}
list = append(list, dep)
}
return list, nil
}
func GetInstalledDepsRemote(nodeId string, lang string) (deps []entity.Dependency, err error) {
params := make(map[string]string)
params["lang"] = lang
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcGetInstalledDepList,
Params: params,
Timeout: 60,
})
o, err := s.ClientHandle()
if err != nil {
return
}
deps = o.([]entity.Dependency)
return
}

View File

@@ -1 +0,0 @@
package rpc

View File

@@ -18,7 +18,7 @@ type InstallLangService struct {
func (s *InstallLangService) ServerHandle() (entity.RpcMessage, error) {
lang := utils.GetRpcParam("lang", s.msg.Params)
output, err := InstallLocalLang(lang)
output, err := InstallLangLocal(lang)
s.msg.Result = output
if err != nil {
s.msg.Error = err.Error()
@@ -70,7 +70,7 @@ func InstallJavaLocalLang() (string, error) {
}
// 本地安装语言
func InstallLocalLang(lang string) (o string, err error) {
func InstallLangLocal(lang string) (o string, err error) {
if lang == constants.Nodejs {
o, err = InstallNodejsLocalLang()
} else if lang == constants.Java {
@@ -82,7 +82,7 @@ func InstallLocalLang(lang string) (o string, err error) {
}
// 远端安装语言
func InstallRemoteLang(nodeId string, lang string) (o string, err error) {
func InstallLangRemote(nodeId string, lang string) (o string, err error) {
params := make(map[string]string)
params["lang"] = lang
s := GetService(entity.RpcMessage{

View File

@@ -191,10 +191,10 @@ func GetPythonDepList(nodeId string, searchDepName string) ([]entity.Dependency,
// 获取已安装依赖列表
var installedDepList []entity.Dependency
if IsMasterNode(nodeId) {
installedDepList, err = GetPythonLocalInstalledDepList(nodeId)
if err != nil {
return list, err
}
//installedDepList, err = GetPythonLocalInstalledDepList(nodeId)
//if err != nil {
// return list, err
//}
} else {
installedDepList, err = GetPythonRemoteInstalledDepList(nodeId)
if err != nil {
@@ -359,37 +359,6 @@ func UpdatePythonDepList() {
}
}
// 获取Python本地已安装的依赖列表
func GetPythonLocalInstalledDepList(nodeId string) ([]entity.Dependency, error) {
var list []entity.Dependency
lang := GetLangFromLangName(nodeId, constants.Python)
if !IsInstalledLang(nodeId, lang) {
return list, errors.New("python is not installed")
}
cmd := exec.Command("pip", "freeze")
outputBytes, err := cmd.Output()
if err != nil {
debug.PrintStack()
return list, err
}
for _, line := range strings.Split(string(outputBytes), "\n") {
arr := strings.Split(line, "==")
if len(arr) < 2 {
continue
}
dep := entity.Dependency{
Name: strings.ToLower(arr[0]),
Version: arr[1],
Installed: true,
}
list = append(list, dep)
}
return list, nil
}
// 获取Python远端依赖列表
func GetPythonRemoteInstalledDepList(nodeId string) ([]entity.Dependency, error) {
depList, err := RpcClientGetInstalledDepList(nodeId, constants.Python)