From 9514a8a6af361d2a2a5598f72cc6ca52a2b0a0da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Tue, 10 Sep 2019 09:32:48 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84msg=E7=9A=84=E5=8C=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/constants/message.go | 1 + backend/model/file.go | 17 ++++ backend/model/log.go | 43 ++++++++ backend/model/system.go | 96 ++++++++++++++++++ backend/routes/spider.go | 6 ++ backend/services/log.go | 90 ++++++++++------- backend/services/msg_handler/handler.go | 48 +++++++++ backend/services/msg_handler/msg_log.go | 60 ++++++++++++ .../services/msg_handler/msg_system_info.go | 39 ++++++++ backend/services/msg_handler/msg_task.go | 17 ++++ backend/services/node.go | 89 ++--------------- backend/services/system.go | 98 +------------------ backend/services/task.go | 12 +-- backend/utils/chan.go | 2 + 14 files changed, 398 insertions(+), 220 deletions(-) create mode 100644 backend/model/log.go create mode 100644 backend/services/msg_handler/handler.go create mode 100644 backend/services/msg_handler/msg_log.go create mode 100644 backend/services/msg_handler/msg_system_info.go create mode 100644 backend/services/msg_handler/msg_task.go diff --git a/backend/constants/message.go b/backend/constants/message.go index 521a2019..f76e8fc3 100644 --- a/backend/constants/message.go +++ b/backend/constants/message.go @@ -4,4 +4,5 @@ const ( MsgTypeGetLog = "get-log" MsgTypeGetSystemInfo = "get-sys-info" MsgTypeCancelTask = "cancel-task" + MsgTypeRemoveLog = "remove-log" ) diff --git a/backend/model/file.go b/backend/model/file.go index f8963d06..3cea7b39 100644 --- a/backend/model/file.go +++ b/backend/model/file.go @@ -1,8 +1,25 @@ package model +import ( + "crawlab/utils" + "github.com/apex/log" + "os" +) + type File struct { Name string `json:"name"` Path string `json:"path"` IsDir bool `json:"is_dir"` Size int64 `json:"size"` } + +func RemoveFile(path string) error { + if !utils.Exists(path) { + log.Info("file not found: " + path) + return nil + } + if err := os.Remove(path); err != nil { + return err + } + return nil +} diff --git a/backend/model/log.go b/backend/model/log.go new file mode 100644 index 00000000..ae6973b1 --- /dev/null +++ b/backend/model/log.go @@ -0,0 +1,43 @@ +package model + +import ( + "github.com/apex/log" + "os" + "runtime/debug" +) + +// 获取本地日志 +func GetLocalLog(logPath string) (fileBytes []byte, err error) { + + f, err := os.Open(logPath) + if err != nil { + log.Error(err.Error()) + debug.PrintStack() + return nil, err + } + fi, err := f.Stat() + if err != nil { + log.Error(err.Error()) + debug.PrintStack() + return nil, err + } + defer f.Close() + + const bufLen = 2 * 1024 * 1024 + logBuf := make([]byte, bufLen) + + off := int64(0) + if fi.Size() > int64(len(logBuf)) { + off = fi.Size() - int64(len(logBuf)) + } + n, err := f.ReadAt(logBuf, off) + + //到文件结尾会有EOF标识 + if err != nil && err.Error() != "EOF" { + log.Error(err.Error()) + debug.PrintStack() + return nil, err + } + logBuf = logBuf[:n] + return logBuf, nil +} diff --git a/backend/model/system.go b/backend/model/system.go index c4865a24..6091c963 100644 --- a/backend/model/system.go +++ b/backend/model/system.go @@ -1,5 +1,40 @@ package model +import ( + "github.com/apex/log" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "runtime/debug" + "strings" +) + +var executableNameMap = map[string]string{ + // python + "python": "Python", + "python2": "Python 2", + "python2.7": "Python 2.7", + "python3": "Python 3", + "python3.5": "Python 3.5", + "python3.6": "Python 3.6", + "python3.7": "Python 3.7", + "python3.8": "Python 3.8", + // java + "java": "Java", + // go + "go": "Go", + // node + "node": "NodeJS", + // php + "php": "PHP", + // windows command + "cmd": "Windows Command Prompt", + // linux shell + "sh": "Shell", + "bash": "bash", +} + type SystemInfo struct { ARCH string `json:"arch"` OS string `json:"os"` @@ -13,3 +48,64 @@ type Executable struct { FileName string `json:"file_name"` DisplayName string `json:"display_name"` } + +func GetLocalSystemInfo() (sysInfo SystemInfo, err error) { + executables, err := GetExecutables() + if err != nil { + return sysInfo, err + } + hostname, err := os.Hostname() + if err != nil { + debug.PrintStack() + return sysInfo, err + } + + return SystemInfo{ + ARCH: runtime.GOARCH, + OS: runtime.GOOS, + NumCpu: runtime.GOMAXPROCS(0), + Hostname: hostname, + Executables: executables, + }, nil +} + +func GetSystemEnv(key string) string { + return os.Getenv(key) +} + +func GetPathValues() (paths []string) { + pathEnv := GetSystemEnv("PATH") + return strings.Split(pathEnv, ":") +} + +func GetExecutables() (executables []Executable, err error) { + pathValues := GetPathValues() + + cache := map[string]string{} + + for _, path := range pathValues { + fileList, err := ioutil.ReadDir(path) + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + continue + } + + for _, file := range fileList { + displayName := executableNameMap[file.Name()] + filePath := filepath.Join(path, file.Name()) + + if cache[filePath] == "" { + if displayName != "" { + executables = append(executables, Executable{ + Path: filePath, + FileName: file.Name(), + DisplayName: displayName, + }) + } + cache[filePath] = filePath + } + } + } + return executables, nil +} diff --git a/backend/routes/spider.go b/backend/routes/spider.go index 0777aa69..e0afb1a8 100644 --- a/backend/routes/spider.go +++ b/backend/routes/spider.go @@ -229,6 +229,12 @@ func DeleteSpider(c *gin.Context) { return } + // 删除日志文件 + if err := services.RemoveLogBySpiderId(spider.Id); err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + // 删除爬虫对应的task任务 if err := model.RemoveTaskBySpiderId(spider.Id); err != nil { HandleError(http.StatusInternalServerError, c, err) diff --git a/backend/services/log.go b/backend/services/log.go index 1344b02f..a18672c9 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -5,9 +5,11 @@ import ( "crawlab/database" "crawlab/lib/cron" "crawlab/model" + "crawlab/services/msg_handler" "crawlab/utils" "encoding/json" "github.com/apex/log" + "github.com/globalsign/mgo/bson" "github.com/spf13/viper" "io/ioutil" "os" @@ -18,46 +20,10 @@ import ( // 任务日志频道映射 var TaskLogChanMap = utils.NewChanMap() -// 获取本地日志 -func GetLocalLog(logPath string) (fileBytes []byte, err error) { - - f, err := os.Open(logPath) - if err != nil { - log.Error(err.Error()) - debug.PrintStack() - return nil, err - } - fi, err := f.Stat() - if err != nil { - log.Error(err.Error()) - debug.PrintStack() - return nil, err - } - defer f.Close() - - const bufLen = 2 * 1024 * 1024 - logBuf := make([]byte, bufLen) - - off := int64(0) - if fi.Size() > int64(len(logBuf)) { - off = fi.Size() - int64(len(logBuf)) - } - n, err := f.ReadAt(logBuf, off) - - //到文件结尾会有EOF标识 - if err != nil && err.Error() != "EOF" { - log.Error(err.Error()) - debug.PrintStack() - return nil, err - } - logBuf = logBuf[:n] - return logBuf, nil -} - // 获取远端日志 func GetRemoteLog(task model.Task) (logStr string, err error) { // 序列化消息 - msg := NodeMessage{ + msg := msg_handler.NodeMessage{ Type: constants.MsgTypeGetLog, LogPath: task.LogPath, TaskId: task.Id, @@ -85,6 +51,7 @@ func GetRemoteLog(task model.Task) (logStr string, err error) { return logStr, nil } +// 定时删除日志 func DeleteLogPeriodically() { logDir := viper.GetString("log.path") if !utils.Exists(logDir) { @@ -107,6 +74,55 @@ func DeleteLogPeriodically() { } +// 删除本地日志 +func RemoveLocalLog(path string) error { + if err := model.RemoveFile(path); err != nil { + log.Error("remove local file error: " + err.Error()) + return err + } + return nil +} + +// 删除远程日志 +func RemoveRemoteLog(task model.Task) error { + msg := msg_handler.NodeMessage{ + Type: constants.MsgTypeRemoveLog, + LogPath: task.LogPath, + TaskId: task.Id, + } + msgBytes, err := json.Marshal(&msg) + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return err + } + // 发布获取日志消息 + channel := "nodes:" + task.NodeId.Hex() + if _, err := database.RedisClient.Publish(channel, utils.BytesToString(msgBytes)); err != nil { + log.Errorf(err.Error()) + return err + } + return nil +} + +// 删除日志文件 +func RemoveLogBySpiderId(id bson.ObjectId) error { + tasks, err := model.GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts") + if err != nil { + log.Error("get tasks error:" + err.Error()) + } + for _, task := range tasks { + if err := RemoveLocalLog(task.LogPath); err != nil { + log.Error("remove local log error:" + err.Error()) + } + if err := RemoveRemoteLog(task); err != nil { + log.Error("remove remote log error:" + err.Error()) + } + } + return nil +} + +// 初始化定时删除日志 func InitDeleteLogPeriodically() error { c := cron.New(cron.WithSeconds()) if _, err := c.AddFunc(viper.GetString("log.deleteFrequency"), DeleteLogPeriodically); err != nil { diff --git a/backend/services/msg_handler/handler.go b/backend/services/msg_handler/handler.go new file mode 100644 index 00000000..68a1f9d6 --- /dev/null +++ b/backend/services/msg_handler/handler.go @@ -0,0 +1,48 @@ +package msg_handler + +import ( + "crawlab/constants" + "crawlab/model" +) + +type Handler interface { + Handle() error +} + +func GetMsgHandler(msg NodeMessage) Handler { + if msg.Type == constants.MsgTypeGetLog { + return &Log{ + msg: msg, + } + } else if msg.Type == constants.MsgTypeCancelTask { + return &Task{ + msg: msg, + } + } else if msg.Type == constants.MsgTypeGetSystemInfo { + return &SystemInfo{ + msg: msg, + } + } + return nil +} + +type NodeMessage struct { + // 通信类别 + Type string `json:"type"` + + // 任务相关 + TaskId string `json:"task_id"` // 任务ID + + // 节点相关 + NodeId string `json:"node_id"` // 节点ID + + // 日志相关 + LogPath string `json:"log_path"` // 日志路径 + Log string `json:"log"` // 日志 + + // 系统信息 + SysInfo model.SystemInfo `json:"sys_info"` + + // 错误相关 + Error string `json:"error"` +} diff --git a/backend/services/msg_handler/msg_log.go b/backend/services/msg_handler/msg_log.go new file mode 100644 index 00000000..0d09d784 --- /dev/null +++ b/backend/services/msg_handler/msg_log.go @@ -0,0 +1,60 @@ +package msg_handler + +import ( + "crawlab/constants" + "crawlab/database" + "crawlab/model" + "crawlab/utils" + "encoding/json" + "github.com/apex/log" + "runtime/debug" +) + +type Log struct { + msg NodeMessage +} + +func (g *Log) Handle() error { + if g.msg.Type == constants.MsgTypeGetLog { + return g.get() + } else if g.msg.Type == constants.MsgTypeRemoveLog { + return g.remove() + } + return nil +} + +func (g *Log) get() error { + // 发出的消息 + msgSd := NodeMessage{ + Type: constants.MsgTypeGetLog, + TaskId: g.msg.TaskId, + } + // 获取本地日志 + logStr, err := model.GetLocalLog(g.msg.LogPath) + log.Info(utils.BytesToString(logStr)) + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + msgSd.Error = err.Error() + msgSd.Log = err.Error() + } else { + msgSd.Log = utils.BytesToString(logStr) + } + + // 序列化 + msgSdBytes, err := json.Marshal(&msgSd) + if err != nil { + return err + } + + // 发布消息给主节点 + log.Info("publish get log msg to master") + if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil { + return err + } + return nil +} + +func (g *Log) remove() error { + return model.RemoveFile(g.msg.LogPath) +} diff --git a/backend/services/msg_handler/msg_system_info.go b/backend/services/msg_handler/msg_system_info.go new file mode 100644 index 00000000..c81cb0a0 --- /dev/null +++ b/backend/services/msg_handler/msg_system_info.go @@ -0,0 +1,39 @@ +package msg_handler + +import ( + "crawlab/constants" + "crawlab/database" + "crawlab/model" + "crawlab/utils" + "encoding/json" + "github.com/apex/log" + "runtime/debug" +) + +type SystemInfo struct { + msg NodeMessage +} + +func (s *SystemInfo) Handle() error { + // 获取环境信息 + sysInfo, err := model.GetLocalSystemInfo() + if err != nil { + return err + } + msgSd := NodeMessage{ + Type: constants.MsgTypeGetSystemInfo, + NodeId: s.msg.NodeId, + SysInfo: sysInfo, + } + msgSdBytes, err := json.Marshal(&msgSd) + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return err + } + if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil { + log.Errorf(err.Error()) + return err + } + return nil +} diff --git a/backend/services/msg_handler/msg_task.go b/backend/services/msg_handler/msg_task.go new file mode 100644 index 00000000..1d218264 --- /dev/null +++ b/backend/services/msg_handler/msg_task.go @@ -0,0 +1,17 @@ +package msg_handler + +import ( + "crawlab/constants" + "crawlab/utils" +) + +type Task struct { + msg NodeMessage +} + +func (t *Task) Handle() error { + // 取消任务 + ch := utils.TaskExecChanMap.ChanBlocked(t.msg.TaskId) + ch <- constants.TaskCancel + return nil +} diff --git a/backend/services/node.go b/backend/services/node.go index 63373be8..44fa3905 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -6,6 +6,7 @@ import ( "crawlab/database" "crawlab/lib/cron" "crawlab/model" + "crawlab/services/msg_handler" "crawlab/services/register" "crawlab/utils" "encoding/json" @@ -27,27 +28,6 @@ type Data struct { UpdateTsUnix int64 `json:"update_ts_unix"` } -type NodeMessage struct { - // 通信类别 - Type string `json:"type"` - - // 任务相关 - TaskId string `json:"task_id"` // 任务ID - - // 节点相关 - NodeId string `json:"node_id"` // 节点ID - - // 日志相关 - LogPath string `json:"log_path"` // 日志路径 - Log string `json:"log"` // 日志 - - // 系统信息 - SysInfo model.SystemInfo `json:"sys_info"` - - // 错误相关 - Error string `json:"error"` -} - const ( Yes = "Y" No = "N" @@ -263,7 +243,7 @@ func UpdateNodeData() { func MasterNodeCallback(message redis.Message) (err error) { // 反序列化 - var msg NodeMessage + var msg msg_handler.NodeMessage if err := json.Unmarshal(message.Data, &msg); err != nil { return err @@ -288,72 +268,17 @@ func MasterNodeCallback(message redis.Message) (err error) { func WorkerNodeCallback(message redis.Message) (err error) { // 反序列化 - msg := NodeMessage{} + msg := msg_handler.NodeMessage{} if err := json.Unmarshal(message.Data, &msg); err != nil { return err } - if msg.Type == constants.MsgTypeGetLog { - // 消息类型为获取日志 - - // 发出的消息 - msgSd := NodeMessage{ - Type: constants.MsgTypeGetLog, - TaskId: msg.TaskId, - } - - // 获取本地日志 - logStr, err := GetLocalLog(msg.LogPath) - log.Info(utils.BytesToString(logStr)) - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - msgSd.Error = err.Error() - msgSd.Log = err.Error() - } else { - msgSd.Log = utils.BytesToString(logStr) - } - - // 序列化 - msgSdBytes, err := json.Marshal(&msgSd) - if err != nil { - return err - } - - // 发布消息给主节点 - log.Info("publish get log msg to master") - if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil { - - return err - } - } else if msg.Type == constants.MsgTypeCancelTask { - // 取消任务 - ch := TaskExecChanMap.ChanBlocked(msg.TaskId) - ch <- constants.TaskCancel - } else if msg.Type == constants.MsgTypeGetSystemInfo { - // 获取环境信息 - sysInfo, err := GetLocalSystemInfo() - if err != nil { - return err - } - msgSd := NodeMessage{ - Type: constants.MsgTypeGetSystemInfo, - NodeId: msg.NodeId, - SysInfo: sysInfo, - } - msgSdBytes, err := json.Marshal(&msgSd) - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return err - } - if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil { - log.Errorf(err.Error()) - return err - } + // worker message handle + if err := msg_handler.GetMsgHandler(msg).Handle(); err != nil { + return err } - return + return nil } // 初始化节点服务 diff --git a/backend/services/system.go b/backend/services/system.go index b30b2bc7..2c7cd05a 100644 --- a/backend/services/system.go +++ b/backend/services/system.go @@ -4,108 +4,16 @@ import ( "crawlab/constants" "crawlab/database" "crawlab/model" + "crawlab/services/msg_handler" "crawlab/utils" "encoding/json" - "github.com/apex/log" - "io/ioutil" - "os" - "path/filepath" - "runtime" - "runtime/debug" - "strings" ) var SystemInfoChanMap = utils.NewChanMap() -var executableNameMap = map[string]string{ - // python - "python": "Python", - "python2": "Python 2", - "python2.7": "Python 2.7", - "python3": "Python 3", - "python3.5": "Python 3.5", - "python3.6": "Python 3.6", - "python3.7": "Python 3.7", - "python3.8": "Python 3.8", - // java - "java": "Java", - // go - "go": "Go", - // node - "node": "NodeJS", - // php - "php": "PHP", - // windows command - "cmd": "Windows Command Prompt", - // linux shell - "sh": "Shell", - "bash": "bash", -} - -func GetSystemEnv(key string) string { - return os.Getenv(key) -} - -func GetPathValues() (paths []string) { - pathEnv := GetSystemEnv("PATH") - return strings.Split(pathEnv, ":") -} - -func GetExecutables() (executables []model.Executable, err error) { - pathValues := GetPathValues() - - cache := map[string]string{} - - for _, path := range pathValues { - fileList, err := ioutil.ReadDir(path) - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - continue - } - - for _, file := range fileList { - displayName := executableNameMap[file.Name()] - filePath := filepath.Join(path, file.Name()) - - if cache[filePath] == "" { - if displayName != "" { - executables = append(executables, model.Executable{ - Path: filePath, - FileName: file.Name(), - DisplayName: displayName, - }) - } - cache[filePath] = filePath - } - } - } - return executables, nil -} - -func GetLocalSystemInfo() (sysInfo model.SystemInfo, err error) { - executables, err := GetExecutables() - if err != nil { - return sysInfo, err - } - hostname, err := os.Hostname() - if err != nil { - debug.PrintStack() - return sysInfo, err - } - - return model.SystemInfo{ - ARCH: runtime.GOARCH, - OS: runtime.GOOS, - NumCpu: runtime.GOMAXPROCS(0), - Hostname: hostname, - Executables: executables, - }, nil -} - func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) { // 发送消息 - msg := NodeMessage{ + msg := msg_handler.NodeMessage{ Type: constants.MsgTypeGetSystemInfo, NodeId: id, } @@ -132,7 +40,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) { func GetSystemInfo(id string) (sysInfo model.SystemInfo, err error) { if IsMasterNode(id) { - sysInfo, err = GetLocalSystemInfo() + sysInfo, err = model.GetLocalSystemInfo() } else { sysInfo, err = GetRemoteSystemInfo(id) } diff --git a/backend/services/task.go b/backend/services/task.go index dbaa2800..5f3a4d07 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -5,6 +5,7 @@ import ( "crawlab/database" "crawlab/lib/cron" "crawlab/model" + "crawlab/services/msg_handler" "crawlab/utils" "encoding/json" "errors" @@ -25,6 +26,7 @@ var Exec *Executor // 任务执行锁 //Added by cloud: 2019/09/04,solve data race var LockList sync.Map + // 任务消息 type TaskMessage struct { Id string @@ -69,8 +71,6 @@ func (ex *Executor) Start() error { return nil } -var TaskExecChanMap = utils.NewChanMap() - // 派发任务 func AssignTask(task model.Task) error { // 生成任务信息 @@ -135,7 +135,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e } // 起一个goroutine来监控进程 - ch := TaskExecChanMap.ChanBlocked(t.Id) + ch := utils.TaskExecChanMap.ChanBlocked(t.Id) go func() { // 传入信号,此处阻塞 signal := <-ch @@ -408,7 +408,7 @@ func GetTaskLog(id string) (logStr string, err error) { logStr = "" if IsMasterNode(task.NodeId.Hex()) { // 若为主节点,获取本机日志 - logBytes, err := GetLocalLog(task.LogPath) + logBytes, err := model.GetLocalLog(task.LogPath) logStr = utils.BytesToString(logBytes) if err != nil { log.Errorf(err.Error()) @@ -452,7 +452,7 @@ func CancelTask(id string) (err error) { // 任务节点为主节点 // 获取任务执行频道 - ch := TaskExecChanMap.ChanBlocked(id) + ch := utils.TaskExecChanMap.ChanBlocked(id) // 发出取消进程信号 ch <- constants.TaskCancel @@ -460,7 +460,7 @@ func CancelTask(id string) (err error) { // 任务节点为工作节点 // 序列化消息 - msg := NodeMessage{ + msg := msg_handler.NodeMessage{ Type: constants.MsgTypeCancelTask, TaskId: id, } diff --git a/backend/utils/chan.go b/backend/utils/chan.go index 3e9fde61..7b63ac0f 100644 --- a/backend/utils/chan.go +++ b/backend/utils/chan.go @@ -1,5 +1,7 @@ package utils +var TaskExecChanMap = NewChanMap() + type ChanMap struct { m map[string]chan string }