优化RPC逻辑

This commit is contained in:
marvzhang
2020-03-08 17:50:26 +08:00
parent cc81b80397
commit fe97b813e8
3 changed files with 70 additions and 69 deletions

View File

@@ -26,13 +26,17 @@ server:
# mac地址/ip地址/hostname, 如果是ip则需要手动指定IP
type: "mac"
ip: ""
lang: # 安装语言环境, Y 为安装N 为不安装只对 Docker 有效
lang: # 安装语言环境, Y 为安装N 为不安装
python: "Y"
node: "N"
java: "N"
dotnet: "N"
spider:
path: "/app/spiders"
task:
workers: 4
rpc:
workers: 16
other:
tmppath: "/tmp"
version: 0.4.7

View File

@@ -11,14 +11,16 @@ import (
"github.com/apex/log"
"github.com/gomodule/redigo/redis"
uuid "github.com/satori/go.uuid"
"github.com/spf13/viper"
"runtime/debug"
)
type RpcMessage struct {
Id string `json:"id"`
Method string `json:"method"`
Params map[string]string `json:"params"`
Result string `json:"result"`
Id string `json:"id"`
Method string `json:"method"`
Blocked bool `json:"blocked"`
Params map[string]string `json:"params"`
Result string `json:"result"`
}
// ========安装语言========
@@ -36,13 +38,13 @@ func RpcClientInstallLang(nodeId string, lang string) (output string, err error)
params := map[string]string{}
params["lang"] = lang
// 发起 RPC 请求并阻塞,获取服务端数据
data, err := RpcClientFunc(nodeId, constants.RpcInstallLang, params, 600)()
if err != nil {
return
}
output = data
// 发起 RPC 请求,获取服务端数据
go func() {
_, err := RpcClientFunc(nodeId, constants.RpcInstallLang, params, 600)()
if err != nil {
return
}
}()
return
}
@@ -235,62 +237,64 @@ func StopRpcService() {
// 初始化 RPC 服务
func InitRpcService() error {
go func() {
for {
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 获取获取消息队列信息
dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
if err != nil {
if err != redis.ErrNil {
for i := 0; i < viper.GetInt("rpc.workers"); i++ {
go func() {
for {
// 获取当前节点
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
continue
}
// 反序列化消
var msg RpcMessage
if err := json.Unmarshal([]byte(dataStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 获取获取消息队列信
dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
if err != nil {
if err != redis.ErrNil {
log.Errorf(err.Error())
debug.PrintStack()
}
continue
}
// 根据Method调用本地方法
var replyMsg RpcMessage
if msg.Method == constants.RpcInstallDep {
replyMsg = RpcServerInstallDep(msg)
} else if msg.Method == constants.RpcUninstallDep {
replyMsg = RpcServerUninstallDep(msg)
} else if msg.Method == constants.RpcInstallLang {
replyMsg = RpcServerInstallLang(msg)
} else if msg.Method == constants.RpcGetInstalledDepList {
replyMsg = RpcServerGetInstalledDepList(node.Id.Hex(), msg)
} else if msg.Method == constants.RpcGetLang {
replyMsg = RpcServerGetLang(msg)
} else {
continue
}
// 反序列化消息
var msg RpcMessage
if err := json.Unmarshal([]byte(dataStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 发送返回消息
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", node.Id.Hex()), ObjectToString(replyMsg)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 根据Method调用本地方法
var replyMsg RpcMessage
if msg.Method == constants.RpcInstallDep {
replyMsg = RpcServerInstallDep(msg)
} else if msg.Method == constants.RpcUninstallDep {
replyMsg = RpcServerUninstallDep(msg)
} else if msg.Method == constants.RpcInstallLang {
replyMsg = RpcServerInstallLang(msg)
} else if msg.Method == constants.RpcGetInstalledDepList {
replyMsg = RpcServerGetInstalledDepList(node.Id.Hex(), msg)
} else if msg.Method == constants.RpcGetLang {
replyMsg = RpcServerGetLang(msg)
} else {
continue
}
// 如果停止RPC服务则返回
if IsRpcStopped {
return
// 发送返回消息
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", node.Id.Hex()), ObjectToString(replyMsg)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 如果停止RPC服务则返回
if IsRpcStopped {
return
}
}
}
}()
}()
}
return nil
}

View File

@@ -113,13 +113,6 @@ export default {
},
methods: {
async getData () {
// await Promise.all(this.nodeList.map(async n => {
// const res = await this.$request.get(`/nodes/${n._id}/langs`)
// res.data.data.forEach(l => {
// const key = n._id + '|' + l.executable_name
// this.$set(this.dataDict, key, l)
// })
// }))
for (let i = 0; i < this.nodeList.length; i++) {
const n = this.nodeList[i]
const res = await this.$request.get(`/nodes/${n._id}/langs`)
@@ -169,7 +162,7 @@ export default {
}
},
async created () {
await this.getData()
this.getData()
this.handle = setInterval(() => {
this.getData()