diff --git a/backend/conf/config.yml b/backend/conf/config.yml index 1e98a2eb..1b724c42 100644 --- a/backend/conf/config.yml +++ b/backend/conf/config.yml @@ -26,13 +26,17 @@ server: # mac地址/ip地址/hostname, 如果是ip,则需要手动指定IP type: "mac" ip: "" - lang: # 安装语言环境, Y 为安装,N 为不安装,只对 Docker 有效 + lang: # 安装语言环境, Y 为安装,N 为不安装 python: "Y" node: "N" + java: "N" + dotnet: "N" spider: path: "/app/spiders" task: workers: 4 +rpc: + workers: 16 other: tmppath: "/tmp" version: 0.4.7 diff --git a/backend/services/rpc.go b/backend/services/rpc.go index 57a208f8..1a422d3d 100644 --- a/backend/services/rpc.go +++ b/backend/services/rpc.go @@ -11,14 +11,16 @@ import ( "github.com/apex/log" "github.com/gomodule/redigo/redis" uuid "github.com/satori/go.uuid" + "github.com/spf13/viper" "runtime/debug" ) type RpcMessage struct { - Id string `json:"id"` - Method string `json:"method"` - Params map[string]string `json:"params"` - Result string `json:"result"` + Id string `json:"id"` + Method string `json:"method"` + Blocked bool `json:"blocked"` + Params map[string]string `json:"params"` + Result string `json:"result"` } // ========安装语言======== @@ -36,13 +38,13 @@ func RpcClientInstallLang(nodeId string, lang string) (output string, err error) params := map[string]string{} params["lang"] = lang - // 发起 RPC 请求并阻塞,获取服务端数据 - data, err := RpcClientFunc(nodeId, constants.RpcInstallLang, params, 600)() - if err != nil { - return - } - - output = data + // 发起 RPC 请求,获取服务端数据 + go func() { + _, err := RpcClientFunc(nodeId, constants.RpcInstallLang, params, 600)() + if err != nil { + return + } + }() return } @@ -235,62 +237,64 @@ func StopRpcService() { // 初始化 RPC 服务 func InitRpcService() error { - go func() { - for { - // 获取当前节点 - node, err := model.GetCurrentNode() - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - continue - } - - // 获取获取消息队列信息 - dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0) - if err != nil { - if err != redis.ErrNil { + for i := 0; i < viper.GetInt("rpc.workers"); i++ { + go func() { + for { + // 获取当前节点 + node, err := model.GetCurrentNode() + if err != nil { log.Errorf(err.Error()) debug.PrintStack() + continue } - continue - } - // 反序列化消息 - var msg RpcMessage - if err := json.Unmarshal([]byte(dataStr), &msg); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - continue - } + // 获取获取消息队列信息 + dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0) + if err != nil { + if err != redis.ErrNil { + log.Errorf(err.Error()) + debug.PrintStack() + } + continue + } - // 根据Method调用本地方法 - var replyMsg RpcMessage - if msg.Method == constants.RpcInstallDep { - replyMsg = RpcServerInstallDep(msg) - } else if msg.Method == constants.RpcUninstallDep { - replyMsg = RpcServerUninstallDep(msg) - } else if msg.Method == constants.RpcInstallLang { - replyMsg = RpcServerInstallLang(msg) - } else if msg.Method == constants.RpcGetInstalledDepList { - replyMsg = RpcServerGetInstalledDepList(node.Id.Hex(), msg) - } else if msg.Method == constants.RpcGetLang { - replyMsg = RpcServerGetLang(msg) - } else { - continue - } + // 反序列化消息 + var msg RpcMessage + if err := json.Unmarshal([]byte(dataStr), &msg); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + continue + } - // 发送返回消息 - if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", node.Id.Hex()), ObjectToString(replyMsg)); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - continue - } + // 根据Method调用本地方法 + var replyMsg RpcMessage + if msg.Method == constants.RpcInstallDep { + replyMsg = RpcServerInstallDep(msg) + } else if msg.Method == constants.RpcUninstallDep { + replyMsg = RpcServerUninstallDep(msg) + } else if msg.Method == constants.RpcInstallLang { + replyMsg = RpcServerInstallLang(msg) + } else if msg.Method == constants.RpcGetInstalledDepList { + replyMsg = RpcServerGetInstalledDepList(node.Id.Hex(), msg) + } else if msg.Method == constants.RpcGetLang { + replyMsg = RpcServerGetLang(msg) + } else { + continue + } - // 如果停止RPC服务,则返回 - if IsRpcStopped { - return + // 发送返回消息 + if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", node.Id.Hex()), ObjectToString(replyMsg)); err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + continue + } + + // 如果停止RPC服务,则返回 + if IsRpcStopped { + return + } } - } - }() + }() + } return nil } diff --git a/frontend/src/components/Node/NodeInstallationMatrix.vue b/frontend/src/components/Node/NodeInstallationMatrix.vue index a144c53d..127e4262 100644 --- a/frontend/src/components/Node/NodeInstallationMatrix.vue +++ b/frontend/src/components/Node/NodeInstallationMatrix.vue @@ -113,13 +113,6 @@ export default { }, methods: { async getData () { - // await Promise.all(this.nodeList.map(async n => { - // const res = await this.$request.get(`/nodes/${n._id}/langs`) - // res.data.data.forEach(l => { - // const key = n._id + '|' + l.executable_name - // this.$set(this.dataDict, key, l) - // }) - // })) for (let i = 0; i < this.nodeList.length; i++) { const n = this.nodeList[i] const res = await this.$request.get(`/nodes/${n._id}/langs`) @@ -169,7 +162,7 @@ export default { } }, async created () { - await this.getData() + this.getData() this.handle = setInterval(() => { this.getData()