From 152e94da01081e7c081993358d49cf14af044dc2 Mon Sep 17 00:00:00 2001 From: marvzhang Date: Thu, 2 Jan 2020 15:53:53 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5RPC=E6=9C=BA=E5=88=B6?= =?UTF-8?q?=EF=BC=8C=E5=AE=8C=E6=88=90=E9=83=A8=E5=88=86=E4=BE=9D=E8=B5=96?= =?UTF-8?q?=E5=AE=89=E8=A3=85=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/constants/rpc.go | 8 ++ backend/database/redis.go | 29 ++++- backend/main.go | 11 +- backend/routes/system.go | 78 ++++++++++++-- backend/services/rpc.go | 214 +++++++++++++++++++++++++++++++++++++ backend/services/system.go | 106 +++++++++++++----- 6 files changed, 408 insertions(+), 38 deletions(-) create mode 100644 backend/constants/rpc.go create mode 100644 backend/services/rpc.go diff --git a/backend/constants/rpc.go b/backend/constants/rpc.go new file mode 100644 index 00000000..117ed5c8 --- /dev/null +++ b/backend/constants/rpc.go @@ -0,0 +1,8 @@ +package constants + +const ( + RpcInstallDep = "install_dep" + RpcInstallLang = "install_lang" + RpcGetDepList = "get_dep_list" + RpcGetInstalledDepList = "get_installed_dep_list" +) diff --git a/backend/database/redis.go b/backend/database/redis.go index b165aaa3..1a488767 100644 --- a/backend/database/redis.go +++ b/backend/database/redis.go @@ -42,6 +42,17 @@ func (r *Redis) RPush(collection string, value interface{}) error { return nil } +func (r *Redis) LPush(collection string, value interface{}) error { + c := r.pool.Get() + defer utils.Close(c) + + if _, err := c.Do("RPUSH", collection, value); err != nil { + debug.PrintStack() + return err + } + return nil +} + func (r *Redis) LPop(collection string) (string, error) { c := r.pool.Get() defer utils.Close(c) @@ -96,6 +107,20 @@ func (r *Redis) HKeys(collection string) ([]string, error) { return value, nil } +func (r *Redis) BRPop(collection string, timeout int) (string, error) { + if timeout <= 0 { + timeout = 60 + } + c := r.pool.Get() + defer utils.Close(c) + + value, err2 := redis.String(c.Do("BRPOP", collection, timeout)) + if err2 != nil { + return value, err2 + } + return value, nil +} + func NewRedisPool() *redis.Pool { var address = viper.GetString("redis.address") var port = viper.GetString("redis.port") @@ -112,8 +137,8 @@ func NewRedisPool() *redis.Pool { Dial: func() (conn redis.Conn, e error) { return redis.DialURL(url, redis.DialConnectTimeout(time.Second*10), - redis.DialReadTimeout(time.Second*10), - redis.DialWriteTimeout(time.Second*15), + redis.DialReadTimeout(time.Second*600), + redis.DialWriteTimeout(time.Second*10), ) }, TestOnBorrow: func(c redis.Conn, t time.Time) error { diff --git a/backend/main.go b/backend/main.go index 6a807331..8968027c 100644 --- a/backend/main.go +++ b/backend/main.go @@ -110,12 +110,20 @@ func main() { // 初始化依赖服务 if err := services.InitDepsFetcher(); err != nil { - log.Error("init user service error:" + err.Error()) + log.Error("init dependency fetcher error:" + err.Error()) debug.PrintStack() panic(err) } log.Info("initialized dependency fetcher successfully") + // 初始化RPC服务 + if err := services.InitRpcService(); err != nil { + log.Error("init rpc service error:" + err.Error()) + debug.PrintStack() + panic(err) + } + log.Info("initialized rpc service successfully") + // 以下为主节点服务 if model.IsMaster() { // 中间件 @@ -139,6 +147,7 @@ func main() { authGroup.GET("/nodes/:id/langs", routes.GetLangList) // 节点语言环境列表 authGroup.GET("/nodes/:id/deps", routes.GetDepList) // 节点第三方依赖列表 authGroup.GET("/nodes/:id/deps/installed", routes.GetInstalledDepList) // 节点已安装第三方依赖列表 + authGroup.POST("/nodes/:id/deps/install", routes.InstallDep) // 节点安装依赖 // 爬虫 authGroup.GET("/spiders", routes.GetSpiderList) // 爬虫列表 authGroup.GET("/spiders/:id", routes.GetSpider) // 爬虫详情 diff --git a/backend/routes/system.go b/backend/routes/system.go index bcd186f8..409168d2 100644 --- a/backend/routes/system.go +++ b/backend/routes/system.go @@ -26,12 +26,21 @@ func GetDepList(c *gin.Context) { var depList []entity.Dependency if lang == constants.Python { - list, err := services.GetPythonDepList(nodeId, depName) - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return + if services.IsMasterNode(nodeId) { + list, err := services.GetPythonLocalDepList(nodeId, depName) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + depList = list + } else { + list, err := services.GetPythonRemoteDepList(nodeId, depName) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + depList = list } - depList = list } else { HandleErrorF(http.StatusBadRequest, c, fmt.Sprintf("%s is not implemented", lang)) return @@ -49,12 +58,21 @@ func GetInstalledDepList(c *gin.Context) { lang := c.Query("lang") var depList []entity.Dependency if lang == constants.Python { - list, err := services.GetPythonInstalledDepList(nodeId) - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return + if services.IsMasterNode(nodeId) { + list, err := services.GetPythonLocalInstalledDepList(nodeId) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + depList = list + } else { + list, err := services.GetPythonRemoteInstalledDepList(nodeId) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + depList = list } - depList = list } else { HandleErrorF(http.StatusBadRequest, c, fmt.Sprintf("%s is not implemented", lang)) return @@ -108,3 +126,43 @@ func GetAllDepList(c *gin.Context) { Data: returnList, }) } + +func InstallDep(c *gin.Context) { + type ReqBody struct { + Lang string `json:"lang"` + DepName string `json:"dep_name"` + } + + nodeId := c.Param("id") + + var reqBody ReqBody + if err := c.ShouldBindJSON(&reqBody); err != nil { + HandleError(http.StatusBadRequest, c, err) + } + + if reqBody.Lang == constants.Python { + if services.IsMasterNode(nodeId) { + _, err := services.InstallPythonLocalDep(reqBody.DepName) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + } else { + _, err := services.InstallPythonRemoteDep(nodeId, reqBody.DepName) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + } + } else { + HandleErrorF(http.StatusBadRequest, c, fmt.Sprintf("%s is not implemented", reqBody.Lang)) + return + } + + // TODO: check if install is successful + + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + }) +} diff --git a/backend/services/rpc.go b/backend/services/rpc.go new file mode 100644 index 00000000..cc0762bb --- /dev/null +++ b/backend/services/rpc.go @@ -0,0 +1,214 @@ +package services + +import ( + "crawlab/constants" + "crawlab/database" + "crawlab/entity" + "crawlab/model" + "encoding/json" + "fmt" + "github.com/apex/log" + uuid "github.com/satori/go.uuid" + "runtime/debug" +) + +type RpcMessage struct { + Id string `json:"id"` + Method string `json:"method"` + Params string `json:"params"` + Result string `json:"result"` +} + +func RpcServerInstallLang(msg RpcMessage) RpcMessage { + // install dep rpc + return msg +} + +func RpcServerInstallDep(msg RpcMessage) RpcMessage { + lang := GetRpcParam("lang", msg.Params) + depName := GetRpcParam("dep_name", msg.Params) + if lang == constants.Python { + output, _ := InstallPythonLocalDep(depName) + msg.Result = output + } + return msg +} + +func RpcClientInstallDep(nodeId string, lang string, depName string) (output string, err error) { + params := map[string]string{} + params["lang"] = lang + params["dep_name"] = depName + + data, err := RpcClientFunc(nodeId, params, 10)() + if err != nil { + return + } + + output = data.(string) + + return +} + +func RpcServerGetDepList(nodeId string, msg RpcMessage) RpcMessage { + lang := GetRpcParam("lang", msg.Params) + searchDepName := GetRpcParam("search_dep_name", msg.Params) + if lang == constants.Python { + depList, _ := GetPythonLocalDepList(nodeId, searchDepName) + resultStr, _ := json.Marshal(depList) + msg.Result = string(resultStr) + } + return msg +} + +func RpcClientGetDepList(nodeId string, lang string, searchDepName string) (list []entity.Dependency, err error) { + params := map[string]string{} + params["lang"] = lang + params["search_dep_name"] = searchDepName + + data, err := RpcClientFunc(nodeId, params, 30)() + if err != nil { + return + } + + list = data.([]entity.Dependency) + + return +} + +func RpcServerGetInstalledDepList(nodeId string, msg RpcMessage) RpcMessage { + lang := GetRpcParam("lang", msg.Params) + if lang == constants.Python { + depList, _ := GetPythonLocalInstalledDepList(nodeId) + resultStr, _ := json.Marshal(depList) + msg.Result = string(resultStr) + } + return msg +} + +func RpcClientGetInstalledDepList(nodeId string, lang string) (list []entity.Dependency, err error) { + params := map[string]string{} + params["lang"] = lang + + data, err := RpcClientFunc(nodeId, params, 10)() + if err != nil { + return + } + + list = data.([]entity.Dependency) + + return +} + +func RpcClientFunc(nodeId string, params interface{}, timeout int) func() (interface{}, error) { + return func() (data interface{}, err error) { + // 请求ID + id := uuid.NewV4().String() + + // 构造RPC消息 + msg := RpcMessage{ + Id: id, + Method: constants.RpcGetDepList, + Params: ObjectToString(params), + Result: "", + } + + // 发送RPC消息 + if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", nodeId), ObjectToString(msg)); err != nil { + return data, err + } + + // 获取RPC回复消息 + dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", nodeId), timeout) + if err != nil { + return data, err + } + + // 反序列化消息 + if err := json.Unmarshal([]byte(dataStr), &msg); err != nil { + return data, err + } + + // 反序列化列表 + if err := json.Unmarshal([]byte(msg.Result), &data); err != nil { + return data, err + } + + return data, err + } +} + +func GetRpcParam(key string, params interface{}) string { + var paramsObj map[string]string + if err := json.Unmarshal([]byte(params.(string)), ¶msObj); err != nil { + return "" + } + return paramsObj[key] +} + +func ObjectToString(params interface{}) string { + str, _ := json.Marshal(params) + return string(str) +} + +var IsRpcStopped = false + +func StopRpcService() { + IsRpcStopped = true +} + +func InitRpcService() error { + go func() { + for { + // 获取当前节点 + node, err := model.GetCurrentNode() + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + continue + } + + // 获取获取消息队列信息 + dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 300) + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + continue + } + + // 反序列化消息 + var msg RpcMessage + if err := json.Unmarshal([]byte(dataStr), &msg); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + continue + } + + // 根据Method调用本地方法 + var replyMsg RpcMessage + if msg.Method == constants.RpcInstallDep { + replyMsg = RpcServerInstallDep(msg) + } else if msg.Method == constants.RpcInstallLang { + replyMsg = RpcServerInstallLang(msg) + } else if msg.Method == constants.RpcGetDepList { + replyMsg = RpcServerGetDepList(node.Id.Hex(), msg) + } else if msg.Method == constants.RpcGetInstalledDepList { + replyMsg = RpcServerGetInstalledDepList(node.Id.Hex(), msg) + } else { + continue + } + + // 发送返回消息 + if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", node.Id.Hex()), ObjectToString(replyMsg)); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + continue + } + + // 如果停止RPC服务,则返回 + if IsRpcStopped { + return + } + } + }() + return nil +} diff --git a/backend/services/system.go b/backend/services/system.go index 045ecbff..fc7e0bad 100644 --- a/backend/services/system.go +++ b/backend/services/system.go @@ -41,8 +41,10 @@ func (s PythonDepNameDictSlice) Len() int { return len(s) } func (s PythonDepNameDictSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s PythonDepNameDictSlice) Less(i, j int) bool { return s[i].Weight > s[j].Weight } +// 系统信息 chan 映射 var SystemInfoChanMap = utils.NewChanMap() +// 从远端获取系统信息 func GetRemoteSystemInfo(nodeId string) (sysInfo entity.SystemInfo, err error) { // 发送消息 msg := entity.NodeMessage{ @@ -70,6 +72,7 @@ func GetRemoteSystemInfo(nodeId string) (sysInfo entity.SystemInfo, err error) { return sysInfo, nil } +// 获取系统信息 func GetSystemInfo(nodeId string) (sysInfo entity.SystemInfo, err error) { if IsMasterNode(nodeId) { sysInfo, err = model.GetLocalSystemInfo() @@ -79,6 +82,7 @@ func GetSystemInfo(nodeId string) (sysInfo entity.SystemInfo, err error) { return } +// 获取语言列表 func GetLangList(nodeId string) []entity.Lang { list := []entity.Lang{ {Name: "Python", ExecutableName: "python", ExecutablePath: "/usr/local/bin/python", DepExecutablePath: "/usr/local/bin/pip"}, @@ -91,6 +95,7 @@ func GetLangList(nodeId string) []entity.Lang { return list } +// 根据语言名获取语言实例 func GetLangFromLangName(nodeId string, name string) entity.Lang { langList := GetLangList(nodeId) for _, lang := range langList { @@ -101,7 +106,22 @@ func GetLangFromLangName(nodeId string, name string) entity.Lang { return entity.Lang{} } -func GetPythonDepList(nodeId string, searchDepName string) ([]entity.Dependency, error) { +// 是否已安装该依赖 +func IsInstalledLang(nodeId string, lang entity.Lang) bool { + sysInfo, err := GetSystemInfo(nodeId) + if err != nil { + return false + } + for _, exec := range sysInfo.Executables { + if exec.Path == lang.ExecutablePath { + return true + } + } + return false +} + +// 获取Python本地依赖列表 +func GetPythonLocalDepList(nodeId string, searchDepName string) ([]entity.Dependency, error) { var list []entity.Dependency // 先从 Redis 获取 @@ -130,7 +150,7 @@ func GetPythonDepList(nodeId string, searchDepName string) ([]entity.Dependency, } // 获取已安装依赖 - installedDepList, err := GetPythonInstalledDepList(nodeId) + installedDepList, err := GetPythonLocalInstalledDepList(nodeId) if err != nil { return list, err } @@ -170,6 +190,16 @@ func GetPythonDepList(nodeId string, searchDepName string) ([]entity.Dependency, return list, nil } +// 获取Python远端依赖列表 +func GetPythonRemoteDepList(nodeId string, searchDepName string) ([]entity.Dependency, error) { + depList, err := RpcClientGetDepList(nodeId, constants.Python, searchDepName) + if err != nil { + return depList, err + } + return depList, nil +} + +// 从Redis获取Python依赖列表 func GetPythonDepListFromRedis() ([]string, error) { var list []string @@ -192,28 +222,7 @@ func GetPythonDepListFromRedis() ([]string, error) { return list, nil } -func IsInstalledLang(nodeId string, lang entity.Lang) bool { - sysInfo, err := GetSystemInfo(nodeId) - if err != nil { - return false - } - for _, exec := range sysInfo.Executables { - if exec.Path == lang.ExecutablePath { - return true - } - } - return false -} - -func IsInstalledDep(installedDepList []entity.Dependency, dep entity.Dependency) bool { - for _, _dep := range installedDepList { - if strings.ToLower(_dep.Name) == strings.ToLower(dep.Name) { - return true - } - } - return false -} - +// 从Python依赖源获取依赖列表并返回 func FetchPythonDepList() ([]string, error) { // 依赖URL url := "https://pypi.tuna.tsinghua.edu.cn/simple" @@ -251,6 +260,7 @@ func FetchPythonDepList() ([]string, error) { return list, nil } +// 更新Python依赖列表到Redis func UpdatePythonDepList() { // 从依赖源获取列表 list, _ := FetchPythonDepList() @@ -271,7 +281,8 @@ func UpdatePythonDepList() { } } -func GetPythonInstalledDepList(nodeId string) ([]entity.Dependency, error){ +// 获取Python本地已安装的依赖列表 +func GetPythonLocalInstalledDepList(nodeId string) ([]entity.Dependency, error) { var list []entity.Dependency lang := GetLangFromLangName(nodeId, constants.Python) @@ -301,11 +312,56 @@ func GetPythonInstalledDepList(nodeId string) ([]entity.Dependency, error){ return list, nil } +// 获取Python远端依赖列表 +func GetPythonRemoteInstalledDepList(nodeId string) ([]entity.Dependency, error) { + depList, err := RpcClientGetInstalledDepList(nodeId, constants.Python) + if err != nil { + return depList, err + } + return depList, nil +} + +// 是否已安装该依赖 +func IsInstalledDep(installedDepList []entity.Dependency, dep entity.Dependency) bool { + for _, _dep := range installedDepList { + if strings.ToLower(_dep.Name) == strings.ToLower(dep.Name) { + return true + } + } + return false +} + +// 安装Python本地依赖 +func InstallPythonLocalDep(depName string) (string, error) { + // 依赖镜像URL + url := "https://pypi.tuna.tsinghua.edu.cn/simple" + + cmd := exec.Command("pip", "install", depName, "-i", url) + outputBytes, err := cmd.Output() + if err != nil { + return fmt.Sprintf("error: %s", err.Error()), err + } + return string(outputBytes), nil +} + +// 获取Python远端依赖列表 +func InstallPythonRemoteDep(nodeId string, depName string) (string, error) { + output, err := RpcClientInstallDep(nodeId, constants.Python, depName) + if err != nil { + return output, err + } + return output, nil +} + func InitDepsFetcher() error { c := cron.New(cron.WithSeconds()) c.Start() if _, err := c.AddFunc("0 */5 * * * *", UpdatePythonDepList); err != nil { return err } + + go func() { + UpdatePythonDepList() + }() return nil }