diff --git a/backend/constants/rpc.go b/backend/constants/rpc.go index c7380110..0fd7ad9f 100644 --- a/backend/constants/rpc.go +++ b/backend/constants/rpc.go @@ -1,10 +1,12 @@ package constants const ( - RpcInstallLang = "install_lang" - RpcInstallDep = "install_dep" - RpcUninstallDep = "uninstall_dep" - RpcGetInstalledDepList = "get_installed_dep_list" - RpcGetLang = "get_lang" - RpcCancelTask = "cancel_task" + RpcInstallLang = "install_lang" + RpcInstallDep = "install_dep" + RpcUninstallDep = "uninstall_dep" + RpcGetInstalledDepList = "get_installed_dep_list" + RpcGetLang = "get_lang" + RpcCancelTask = "cancel_task" + RpcGetSystemInfoService = "get_system_info" + RpcRemoveSpider = "remove_spider" ) diff --git a/backend/services/node.go b/backend/services/node.go index 42a664c4..641b403c 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -3,16 +3,12 @@ package services import ( "crawlab/constants" "crawlab/database" - "crawlab/entity" "crawlab/model" "crawlab/services/local_node" - "crawlab/services/msg_handler" "crawlab/utils" "encoding/json" - "fmt" "github.com/apex/log" "github.com/globalsign/mgo/bson" - "github.com/gomodule/redigo/redis" "runtime/debug" "time" ) @@ -180,41 +176,6 @@ func UpdateNodeData() { } } -func MasterNodeCallback(message redis.Message) (err error) { - // 反序列化 - var msg entity.NodeMessage - if err := json.Unmarshal(message.Data, &msg); err != nil { - - return err - } - - if msg.Type == constants.MsgTypeGetLog { - // 获取日志 - time.Sleep(10 * time.Millisecond) - ch := TaskLogChanMap.ChanBlocked(msg.TaskId) - ch <- msg.Log - } else if msg.Type == constants.MsgTypeGetSystemInfo { - // 获取系统信息 - fmt.Println(msg) - time.Sleep(10 * time.Millisecond) - ch := SystemInfoChanMap.ChanBlocked(msg.NodeId) - sysInfoBytes, _ := json.Marshal(&msg.SysInfo) - ch <- utils.BytesToString(sysInfoBytes) - } - return nil -} - -func WorkerNodeCallback(message redis.Message) (err error) { - // 反序列化 - msg := utils.GetMessage(message) - if err := msg_handler.GetMsgHandler(*msg).Handle(); err != nil { - log.Errorf("msg handler error: %s", err.Error()) - debug.PrintStack() - return err - } - return nil -} - // 发送心跳信息到Redis,每5秒发送一次 func SendHeartBeat() { for { @@ -260,24 +221,6 @@ func InitNodeService() error { go UpdateNodeStatusPeriodically() } - if model.IsMaster() { - // 如果为主节点,订阅主节点通信频道 - if err := database.Sub(constants.ChannelMasterNode, MasterNodeCallback); err != nil { - return err - } - } else { - // 若为工作节点,订阅单独指定通信频道 - channel := constants.ChannelWorkerNode + node.Current().Id.Hex() - if err := database.Sub(channel, WorkerNodeCallback); err != nil { - return err - } - } - - // 订阅全通道 - if err := database.Sub(constants.ChannelAllNode, WorkerNodeCallback); err != nil { - return err - } - // 更新在当前节点执行中的任务状态为:abnormal if err := model.UpdateTaskToAbnormal(node.Current().Id); err != nil { debug.PrintStack() diff --git a/backend/services/rpc/base.go b/backend/services/rpc/base.go index 1609be16..866fe48b 100644 --- a/backend/services/rpc/base.go +++ b/backend/services/rpc/base.go @@ -76,6 +76,8 @@ func GetService(msg entity.RpcMessage) Service { return &GetInstalledDepsService{msg: msg} case constants.RpcCancelTask: return &CancelTaskService{msg: msg} + case constants.RpcGetSystemInfoService: + return &GetSystemInfoService{msg: msg} } return nil } diff --git a/backend/services/rpc/get_system_info.go b/backend/services/rpc/get_system_info.go new file mode 100644 index 00000000..7e290656 --- /dev/null +++ b/backend/services/rpc/get_system_info.go @@ -0,0 +1,67 @@ +package rpc + +import ( + "crawlab/constants" + "crawlab/entity" + "crawlab/model" + "encoding/json" +) + +type GetSystemInfoService struct { + msg entity.RpcMessage +} + +func (s *GetSystemInfoService) ServerHandle() (entity.RpcMessage, error) { + sysInfo, err := GetSystemInfoServiceLocal() + if err != nil { + s.msg.Error = err.Error() + return s.msg, err + } + + // 序列化 + resultStr, _ := json.Marshal(sysInfo) + s.msg.Result = string(resultStr) + return s.msg, nil +} + +func (s *GetSystemInfoService) ClientHandle() (o interface{}, err error) { + // 发起 RPC 请求,获取服务端数据 + s.msg, err = ClientFunc(s.msg)() + if err != nil { + return o, err + } + + var output entity.SystemInfo + if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil { + return o, err + } + o = output + + return +} + +func GetSystemInfoServiceLocal() (sysInfo entity.SystemInfo, err error) { + // 获取环境信息 + sysInfo, err = model.GetLocalSystemInfo() + if err != nil { + return sysInfo, err + } + return sysInfo, nil +} + +func GetSystemInfoServiceRemote(nodeId string) (sysInfo entity.SystemInfo, err error) { + params := make(map[string]string) + params["node_id"] = nodeId + s := GetService(entity.RpcMessage{ + NodeId: nodeId, + Method: constants.RpcGetSystemInfoService, + Params: params, + Timeout: 60, + }) + o, err := s.ClientHandle() + if err != nil { + return + } + sysInfo = o.(entity.SystemInfo) + return +} diff --git a/backend/services/rpc/remove_spider.go b/backend/services/rpc/remove_spider.go new file mode 100644 index 00000000..1d8ff90c --- /dev/null +++ b/backend/services/rpc/remove_spider.go @@ -0,0 +1,62 @@ +package rpc + +import ( + "crawlab/constants" + "crawlab/entity" + "crawlab/model" + "crawlab/utils" + "github.com/globalsign/mgo/bson" + "github.com/spf13/viper" + "path/filepath" +) + +type RemoveSpiderService struct { + msg entity.RpcMessage +} + +func (s *RemoveSpiderService) ServerHandle() (entity.RpcMessage, error) { + spiderId := utils.GetRpcParam("spider_id", s.msg.Params) + if err := RemoveSpiderServiceLocal(spiderId); err != nil { + s.msg.Error = err.Error() + return s.msg, err + } + s.msg.Result = "success" + return s.msg, nil +} + +func (s *RemoveSpiderService) ClientHandle() (o interface{}, err error) { + // 发起 RPC 请求,获取服务端数据 + _, err = ClientFunc(s.msg)() + if err != nil { + return + } + + return +} + +func RemoveSpiderServiceLocal(spiderId string) error { + // 移除本地的爬虫目录 + spider, err := model.GetSpider(bson.ObjectIdHex(spiderId)) + if err != nil { + return err + } + path := filepath.Join(viper.GetString("spider.path"), spider.Name) + utils.RemoveFiles(path) + return nil +} + +func RemoveSpiderServiceRemote(spiderId string, nodeId string) (err error) { + params := make(map[string]string) + params["spider_id"] = spiderId + s := GetService(entity.RpcMessage{ + NodeId: nodeId, + Method: constants.RpcRemoveSpider, + Params: params, + Timeout: 60, + }) + _, err = s.ClientHandle() + if err != nil { + return + } + return +} diff --git a/backend/services/spider.go b/backend/services/spider.go index ec1d8441..c088abb5 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -280,13 +280,13 @@ func RemoveSpider(id string) error { utils.RemoveFiles(path) // 删除其他节点的爬虫目录 - msg := entity.NodeMessage{ - Type: constants.MsgTypeRemoveSpider, - SpiderId: id, - } - if err := database.Pub(constants.ChannelAllNode, msg); err != nil { - return err - } + //msg := entity.NodeMessage{ + // Type: constants.MsgTypeRemoveSpider, + // SpiderId: id, + //} + //if err := database.Pub(constants.ChannelAllNode, msg); err != nil { + // return err + //} // 从数据库中删除该爬虫 if err := model.RemoveSpider(bson.ObjectIdHex(id)); err != nil { diff --git a/backend/services/system.go b/backend/services/system.go index fd9bb83d..49a47219 100644 --- a/backend/services/system.go +++ b/backend/services/system.go @@ -5,7 +5,6 @@ import ( "crawlab/database" "crawlab/entity" "crawlab/lib/cron" - "crawlab/model" "crawlab/services/rpc" "crawlab/utils" "encoding/json" @@ -55,9 +54,9 @@ func GetRemoteSystemInfo(nodeId string) (sysInfo entity.SystemInfo, err error) { // 获取系统信息 func GetSystemInfo(nodeId string) (sysInfo entity.SystemInfo, err error) { if IsMasterNode(nodeId) { - sysInfo, err = model.GetLocalSystemInfo() + sysInfo, err = rpc.GetSystemInfoServiceLocal() } else { - sysInfo, err = GetRemoteSystemInfo(nodeId) + sysInfo, err = rpc.GetSystemInfoServiceRemote(nodeId) } return } diff --git a/frontend/src/components/Settings/GitSettings.vue b/frontend/src/components/Settings/GitSettings.vue index 4f092bf4..98b87ded 100644 --- a/frontend/src/components/Settings/GitSettings.vue +++ b/frontend/src/components/Settings/GitSettings.vue @@ -287,7 +287,11 @@ if (!this.spiderForm.git_url) return this.isGitBranchesLoading = true try { - const res = await this.$request.get('/git/branches', { url: this.spiderForm.git_url }) + const res = await this.$request.get('/git/branches', { + url: this.spiderForm.git_url, + username: this.spiderForm.git_username, + password: this.spiderForm.git_password + }) this.gitBranches = res.data.data if (!this.spiderForm.git_branch && this.gitBranches.length > 0) { this.$set(this.spiderForm, 'git_branch', this.gitBranches[0])