diff --git a/backend/constants/rpc.go b/backend/constants/rpc.go index 117ed5c8..6eebf0d5 100644 --- a/backend/constants/rpc.go +++ b/backend/constants/rpc.go @@ -1,8 +1,9 @@ package constants const ( - RpcInstallDep = "install_dep" RpcInstallLang = "install_lang" + RpcInstallDep = "install_dep" + RpcUninstallDep = "uninstall_dep" RpcGetDepList = "get_dep_list" RpcGetInstalledDepList = "get_installed_dep_list" ) diff --git a/backend/database/redis.go b/backend/database/redis.go index 1a488767..bd4e5c10 100644 --- a/backend/database/redis.go +++ b/backend/database/redis.go @@ -114,11 +114,11 @@ func (r *Redis) BRPop(collection string, timeout int) (string, error) { c := r.pool.Get() defer utils.Close(c) - value, err2 := redis.String(c.Do("BRPOP", collection, timeout)) - if err2 != nil { - return value, err2 + values, err := redis.Strings(c.Do("BRPOP", collection, timeout)) + if err != nil { + return "", err } - return value, nil + return values[1], nil } func NewRedisPool() *redis.Pool { diff --git a/backend/main.go b/backend/main.go index 8968027c..5c3a4e88 100644 --- a/backend/main.go +++ b/backend/main.go @@ -148,6 +148,7 @@ func main() { authGroup.GET("/nodes/:id/deps", routes.GetDepList) // 节点第三方依赖列表 authGroup.GET("/nodes/:id/deps/installed", routes.GetInstalledDepList) // 节点已安装第三方依赖列表 authGroup.POST("/nodes/:id/deps/install", routes.InstallDep) // 节点安装依赖 + authGroup.POST("/nodes/:id/deps/uninstall", routes.UninstallDep) // 节点卸载依赖 // 爬虫 authGroup.GET("/spiders", routes.GetSpiderList) // 爬虫列表 authGroup.GET("/spiders/:id", routes.GetSpider) // 爬虫详情 diff --git a/backend/routes/system.go b/backend/routes/system.go index 409168d2..d883efb3 100644 --- a/backend/routes/system.go +++ b/backend/routes/system.go @@ -26,21 +26,12 @@ func GetDepList(c *gin.Context) { var depList []entity.Dependency if lang == constants.Python { - if services.IsMasterNode(nodeId) { - list, err := services.GetPythonLocalDepList(nodeId, depName) - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - depList = list - } else { - list, err := services.GetPythonRemoteDepList(nodeId, depName) - if err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - depList = list + list, err := services.GetPythonDepList(nodeId, depName) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return } + depList = list } else { HandleErrorF(http.StatusBadRequest, c, fmt.Sprintf("%s is not implemented", lang)) return @@ -166,3 +157,43 @@ func InstallDep(c *gin.Context) { Message: "success", }) } + +func UninstallDep(c *gin.Context) { + type ReqBody struct { + Lang string `json:"lang"` + DepName string `json:"dep_name"` + } + + nodeId := c.Param("id") + + var reqBody ReqBody + if err := c.ShouldBindJSON(&reqBody); err != nil { + HandleError(http.StatusBadRequest, c, err) + } + + if reqBody.Lang == constants.Python { + if services.IsMasterNode(nodeId) { + _, err := services.UninstallPythonLocalDep(reqBody.DepName) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + } else { + _, err := services.UninstallPythonRemoteDep(nodeId, reqBody.DepName) + if err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + } + } else { + HandleErrorF(http.StatusBadRequest, c, fmt.Sprintf("%s is not implemented", reqBody.Lang)) + return + } + + // TODO: check if uninstall is successful + + c.JSON(http.StatusOK, Response{ + Status: "ok", + Message: "success", + }) +} diff --git a/backend/services/rpc.go b/backend/services/rpc.go index cc0762bb..77d0c217 100644 --- a/backend/services/rpc.go +++ b/backend/services/rpc.go @@ -5,6 +5,7 @@ import ( "crawlab/database" "crawlab/entity" "crawlab/model" + "crawlab/utils" "encoding/json" "fmt" "github.com/apex/log" @@ -13,10 +14,10 @@ import ( ) type RpcMessage struct { - Id string `json:"id"` - Method string `json:"method"` - Params string `json:"params"` - Result string `json:"result"` + Id string `json:"id"` + Method string `json:"method"` + Params map[string]string `json:"params"` + Result string `json:"result"` } func RpcServerInstallLang(msg RpcMessage) RpcMessage { @@ -39,38 +40,37 @@ func RpcClientInstallDep(nodeId string, lang string, depName string) (output str params["lang"] = lang params["dep_name"] = depName - data, err := RpcClientFunc(nodeId, params, 10)() + data, err := RpcClientFunc(nodeId, constants.RpcInstallDep, params, 10)() if err != nil { return } - output = data.(string) + output = data return } -func RpcServerGetDepList(nodeId string, msg RpcMessage) RpcMessage { +func RpcServerUninstallDep(msg RpcMessage) RpcMessage { lang := GetRpcParam("lang", msg.Params) - searchDepName := GetRpcParam("search_dep_name", msg.Params) + depName := GetRpcParam("dep_name", msg.Params) if lang == constants.Python { - depList, _ := GetPythonLocalDepList(nodeId, searchDepName) - resultStr, _ := json.Marshal(depList) - msg.Result = string(resultStr) + output, _ := UninstallPythonLocalDep(depName) + msg.Result = output } return msg } -func RpcClientGetDepList(nodeId string, lang string, searchDepName string) (list []entity.Dependency, err error) { +func RpcClientUninstallDep(nodeId string, lang string, depName string) (output string, err error) { params := map[string]string{} params["lang"] = lang - params["search_dep_name"] = searchDepName + params["dep_name"] = depName - data, err := RpcClientFunc(nodeId, params, 30)() + data, err := RpcClientFunc(nodeId, constants.RpcUninstallDep, params, 60)() if err != nil { return } - list = data.([]entity.Dependency) + output = data return } @@ -89,65 +89,60 @@ func RpcClientGetInstalledDepList(nodeId string, lang string) (list []entity.Dep params := map[string]string{} params["lang"] = lang - data, err := RpcClientFunc(nodeId, params, 10)() + data, err := RpcClientFunc(nodeId, constants.RpcGetInstalledDepList, params, 10)() if err != nil { return } - list = data.([]entity.Dependency) + // 反序列化结果 + if err := json.Unmarshal([]byte(data), &list); err != nil { + return list, err + } return } -func RpcClientFunc(nodeId string, params interface{}, timeout int) func() (interface{}, error) { - return func() (data interface{}, err error) { +func RpcClientFunc(nodeId string, method string, params map[string]string, timeout int) func() (string, error) { + return func() (result string, err error) { // 请求ID id := uuid.NewV4().String() // 构造RPC消息 msg := RpcMessage{ Id: id, - Method: constants.RpcGetDepList, - Params: ObjectToString(params), + Method: method, + Params: params, Result: "", } // 发送RPC消息 - if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", nodeId), ObjectToString(msg)); err != nil { - return data, err + msgStr := ObjectToString(msg) + if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", nodeId), msgStr); err != nil { + return result, err } // 获取RPC回复消息 dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", nodeId), timeout) if err != nil { - return data, err + return result, err } // 反序列化消息 if err := json.Unmarshal([]byte(dataStr), &msg); err != nil { - return data, err + return result, err } - // 反序列化列表 - if err := json.Unmarshal([]byte(msg.Result), &data); err != nil { - return data, err - } - - return data, err + return msg.Result, err } } -func GetRpcParam(key string, params interface{}) string { - var paramsObj map[string]string - if err := json.Unmarshal([]byte(params.(string)), ¶msObj); err != nil { - return "" - } - return paramsObj[key] +func GetRpcParam(key string, params map[string]string) string { + return params[key] } func ObjectToString(params interface{}) string { - str, _ := json.Marshal(params) - return string(str) + bytes, _ := json.Marshal(params) + return utils.BytesToString(bytes) } var IsRpcStopped = false @@ -187,10 +182,10 @@ func InitRpcService() error { var replyMsg RpcMessage if msg.Method == constants.RpcInstallDep { replyMsg = RpcServerInstallDep(msg) + } else if msg.Method == constants.RpcUninstallDep { + replyMsg = RpcServerUninstallDep(msg) } else if msg.Method == constants.RpcInstallLang { replyMsg = RpcServerInstallLang(msg) - } else if msg.Method == constants.RpcGetDepList { - replyMsg = RpcServerGetDepList(node.Id.Hex(), msg) } else if msg.Method == constants.RpcGetInstalledDepList { replyMsg = RpcServerGetInstalledDepList(node.Id.Hex(), msg) } else { diff --git a/backend/services/system.go b/backend/services/system.go index fc7e0bad..e84a1255 100644 --- a/backend/services/system.go +++ b/backend/services/system.go @@ -121,7 +121,7 @@ func IsInstalledLang(nodeId string, lang entity.Lang) bool { } // 获取Python本地依赖列表 -func GetPythonLocalDepList(nodeId string, searchDepName string) ([]entity.Dependency, error) { +func GetPythonDepList(nodeId string, searchDepName string) ([]entity.Dependency, error) { var list []entity.Dependency // 先从 Redis 获取 @@ -149,22 +149,51 @@ func GetPythonLocalDepList(nodeId string, searchDepName string) ([]entity.Depend } } - // 获取已安装依赖 - installedDepList, err := GetPythonLocalInstalledDepList(nodeId) - if err != nil { - return list, err + // 获取已安装依赖列表 + var installedDepList []entity.Dependency + if IsMasterNode(nodeId) { + installedDepList, err = GetPythonLocalInstalledDepList(nodeId) + if err != nil { + return list, err + } + } else { + installedDepList, err = GetPythonRemoteInstalledDepList(nodeId) + if err != nil { + return list, err + } } - // 从依赖源获取数据 - var goSync sync.WaitGroup + // 根据依赖名排序 sort.Stable(depNameList) + + // 遍历依赖名列表,取前10个 for i, depNameDict := range depNameList { + if i > 10 { + break + } + dep := entity.Dependency{ + Name: depNameDict.Name, + } + dep.Installed = IsInstalledDep(installedDepList, dep) + list = append(list, dep) + } + + // 从依赖源获取信息 + list, err = GetPythonDepListWithInfo(list) + + return list, nil +} + +// 获取Python依赖的源数据信息 +func GetPythonDepListWithInfo(depList []entity.Dependency) ([]entity.Dependency, error) { + var goSync sync.WaitGroup + for i, dep := range depList { if i > 10 { break } goSync.Add(1) - go func(depName string, n *sync.WaitGroup) { - url := fmt.Sprintf("https://pypi.org/pypi/%s/json", depName) + go func(i int, dep entity.Dependency, depList []entity.Dependency, n *sync.WaitGroup) { + url := fmt.Sprintf("https://pypi.org/pypi/%s/json", dep.Name) res, err := req.Get(url) if err != nil { n.Done() @@ -175,27 +204,12 @@ func GetPythonLocalDepList(nodeId string, searchDepName string) ([]entity.Depend n.Done() return } - dep := entity.Dependency{ - Name: depName, - Version: data.Info.Version, - Description: data.Info.Summary, - } - dep.Installed = IsInstalledDep(installedDepList, dep) - list = append(list, dep) + depList[i].Version = data.Info.Version + depList[i].Description = data.Info.Summary n.Done() - }(depNameDict.Name, &goSync) + }(i, dep, depList, &goSync) } goSync.Wait() - - return list, nil -} - -// 获取Python远端依赖列表 -func GetPythonRemoteDepList(nodeId string, searchDepName string) ([]entity.Dependency, error) { - depList, err := RpcClientGetDepList(nodeId, constants.Python, searchDepName) - if err != nil { - return depList, err - } return depList, nil } @@ -339,6 +353,8 @@ func InstallPythonLocalDep(depName string) (string, error) { cmd := exec.Command("pip", "install", depName, "-i", url) outputBytes, err := cmd.Output() if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() return fmt.Sprintf("error: %s", err.Error()), err } return string(outputBytes), nil @@ -353,6 +369,28 @@ func InstallPythonRemoteDep(nodeId string, depName string) (string, error) { return output, nil } +// 安装Python本地依赖 +func UninstallPythonLocalDep(depName string) (string, error) { + cmd := exec.Command("pip", "uninstall", "-y", depName) + outputBytes, err := cmd.Output() + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return fmt.Sprintf("error: %s", err.Error()), err + } + return string(outputBytes), nil +} + +// 获取Python远端依赖列表 +func UninstallPythonRemoteDep(nodeId string, depName string) (string, error) { + output, err := RpcClientUninstallDep(nodeId, constants.Python, depName) + if err != nil { + return output, err + } + return output, nil +} + +// 初始化函数 func InitDepsFetcher() error { c := cron.New(cron.WithSeconds()) c.Start() diff --git a/frontend/src/components/Node/NodeInstallation.vue b/frontend/src/components/Node/NodeInstallation.vue index d9de222d..df487434 100644 --- a/frontend/src/components/Node/NodeInstallation.vue +++ b/frontend/src/components/Node/NodeInstallation.vue @@ -51,10 +51,22 @@ > @@ -63,7 +75,10 @@