diff --git a/backend/constants/message.go b/backend/constants/message.go index 521a2019..f76e8fc3 100644 --- a/backend/constants/message.go +++ b/backend/constants/message.go @@ -4,4 +4,5 @@ const ( MsgTypeGetLog = "get-log" MsgTypeGetSystemInfo = "get-sys-info" MsgTypeCancelTask = "cancel-task" + MsgTypeRemoveLog = "remove-log" ) diff --git a/backend/model/file.go b/backend/model/file.go index f8963d06..3cea7b39 100644 --- a/backend/model/file.go +++ b/backend/model/file.go @@ -1,8 +1,25 @@ package model +import ( + "crawlab/utils" + "github.com/apex/log" + "os" +) + type File struct { Name string `json:"name"` Path string `json:"path"` IsDir bool `json:"is_dir"` Size int64 `json:"size"` } + +func RemoveFile(path string) error { + if !utils.Exists(path) { + log.Info("file not found: " + path) + return nil + } + if err := os.Remove(path); err != nil { + return err + } + return nil +} diff --git a/backend/model/log.go b/backend/model/log.go new file mode 100644 index 00000000..ae6973b1 --- /dev/null +++ b/backend/model/log.go @@ -0,0 +1,43 @@ +package model + +import ( + "github.com/apex/log" + "os" + "runtime/debug" +) + +// 获取本地日志 +func GetLocalLog(logPath string) (fileBytes []byte, err error) { + + f, err := os.Open(logPath) + if err != nil { + log.Error(err.Error()) + debug.PrintStack() + return nil, err + } + fi, err := f.Stat() + if err != nil { + log.Error(err.Error()) + debug.PrintStack() + return nil, err + } + defer f.Close() + + const bufLen = 2 * 1024 * 1024 + logBuf := make([]byte, bufLen) + + off := int64(0) + if fi.Size() > int64(len(logBuf)) { + off = fi.Size() - int64(len(logBuf)) + } + n, err := f.ReadAt(logBuf, off) + + //到文件结尾会有EOF标识 + if err != nil && err.Error() != "EOF" { + log.Error(err.Error()) + debug.PrintStack() + return nil, err + } + logBuf = logBuf[:n] + return logBuf, nil +} diff --git a/backend/model/system.go b/backend/model/system.go index c4865a24..6091c963 100644 --- a/backend/model/system.go +++ b/backend/model/system.go @@ -1,5 +1,40 @@ package model +import ( + "github.com/apex/log" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "runtime/debug" + "strings" +) + +var executableNameMap = map[string]string{ + // python + "python": "Python", + "python2": "Python 2", + "python2.7": "Python 2.7", + "python3": "Python 3", + "python3.5": "Python 3.5", + "python3.6": "Python 3.6", + "python3.7": "Python 3.7", + "python3.8": "Python 3.8", + // java + "java": "Java", + // go + "go": "Go", + // node + "node": "NodeJS", + // php + "php": "PHP", + // windows command + "cmd": "Windows Command Prompt", + // linux shell + "sh": "Shell", + "bash": "bash", +} + type SystemInfo struct { ARCH string `json:"arch"` OS string `json:"os"` @@ -13,3 +48,64 @@ type Executable struct { FileName string `json:"file_name"` DisplayName string `json:"display_name"` } + +func GetLocalSystemInfo() (sysInfo SystemInfo, err error) { + executables, err := GetExecutables() + if err != nil { + return sysInfo, err + } + hostname, err := os.Hostname() + if err != nil { + debug.PrintStack() + return sysInfo, err + } + + return SystemInfo{ + ARCH: runtime.GOARCH, + OS: runtime.GOOS, + NumCpu: runtime.GOMAXPROCS(0), + Hostname: hostname, + Executables: executables, + }, nil +} + +func GetSystemEnv(key string) string { + return os.Getenv(key) +} + +func GetPathValues() (paths []string) { + pathEnv := GetSystemEnv("PATH") + return strings.Split(pathEnv, ":") +} + +func GetExecutables() (executables []Executable, err error) { + pathValues := GetPathValues() + + cache := map[string]string{} + + for _, path := range pathValues { + fileList, err := ioutil.ReadDir(path) + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + continue + } + + for _, file := range fileList { + displayName := executableNameMap[file.Name()] + filePath := filepath.Join(path, file.Name()) + + if cache[filePath] == "" { + if displayName != "" { + executables = append(executables, Executable{ + Path: filePath, + FileName: file.Name(), + DisplayName: displayName, + }) + } + cache[filePath] = filePath + } + } + } + return executables, nil +} diff --git a/backend/model/task.go b/backend/model/task.go index 4957b577..177edccb 100644 --- a/backend/model/task.go +++ b/backend/model/task.go @@ -191,7 +191,7 @@ func RemoveTask(id string) error { return nil } -func RemoveTaskBySpiderId(id string) error { +func RemoveTaskBySpiderId(id bson.ObjectId) error { tasks, err := GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts") if err != nil { log.Error("get tasks error:" + err.Error()) diff --git a/backend/routes/spider.go b/backend/routes/spider.go index 76e5c568..e0afb1a8 100644 --- a/backend/routes/spider.go +++ b/backend/routes/spider.go @@ -229,8 +229,14 @@ func DeleteSpider(c *gin.Context) { return } + // 删除日志文件 + if err := services.RemoveLogBySpiderId(spider.Id); err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + // 删除爬虫对应的task任务 - if err := model.RemoveTaskBySpiderId(spider.Id.Hex()); err != nil { + if err := model.RemoveTaskBySpiderId(spider.Id); err != nil { HandleError(http.StatusInternalServerError, c, err) return } diff --git a/backend/routes/task.go b/backend/routes/task.go index 4dc42f6f..c84ea210 100644 --- a/backend/routes/task.go +++ b/backend/routes/task.go @@ -124,6 +124,13 @@ func PutTask(c *gin.Context) { func DeleteTask(c *gin.Context) { id := c.Param("id") + // 删除日志文件 + if err := services.RemoveLogByTaskId(id); err != nil { + HandleError(http.StatusInternalServerError, c, err) + return + } + + // 删除task if err := model.RemoveTask(id); err != nil { HandleError(http.StatusInternalServerError, c, err) return diff --git a/backend/services/log.go b/backend/services/log.go index 1344b02f..95459f8f 100644 --- a/backend/services/log.go +++ b/backend/services/log.go @@ -5,9 +5,11 @@ import ( "crawlab/database" "crawlab/lib/cron" "crawlab/model" + "crawlab/services/msg_handler" "crawlab/utils" "encoding/json" "github.com/apex/log" + "github.com/globalsign/mgo/bson" "github.com/spf13/viper" "io/ioutil" "os" @@ -18,46 +20,10 @@ import ( // 任务日志频道映射 var TaskLogChanMap = utils.NewChanMap() -// 获取本地日志 -func GetLocalLog(logPath string) (fileBytes []byte, err error) { - - f, err := os.Open(logPath) - if err != nil { - log.Error(err.Error()) - debug.PrintStack() - return nil, err - } - fi, err := f.Stat() - if err != nil { - log.Error(err.Error()) - debug.PrintStack() - return nil, err - } - defer f.Close() - - const bufLen = 2 * 1024 * 1024 - logBuf := make([]byte, bufLen) - - off := int64(0) - if fi.Size() > int64(len(logBuf)) { - off = fi.Size() - int64(len(logBuf)) - } - n, err := f.ReadAt(logBuf, off) - - //到文件结尾会有EOF标识 - if err != nil && err.Error() != "EOF" { - log.Error(err.Error()) - debug.PrintStack() - return nil, err - } - logBuf = logBuf[:n] - return logBuf, nil -} - // 获取远端日志 func GetRemoteLog(task model.Task) (logStr string, err error) { // 序列化消息 - msg := NodeMessage{ + msg := msg_handler.NodeMessage{ Type: constants.MsgTypeGetLog, LogPath: task.LogPath, TaskId: task.Id, @@ -85,6 +51,7 @@ func GetRemoteLog(task model.Task) (logStr string, err error) { return logStr, nil } +// 定时删除日志 func DeleteLogPeriodically() { logDir := viper.GetString("log.path") if !utils.Exists(logDir) { @@ -107,6 +74,71 @@ func DeleteLogPeriodically() { } +// 删除本地日志 +func RemoveLocalLog(path string) error { + if err := model.RemoveFile(path); err != nil { + log.Error("remove local file error: " + err.Error()) + return err + } + return nil +} + +// 删除远程日志 +func RemoveRemoteLog(task model.Task) error { + msg := msg_handler.NodeMessage{ + Type: constants.MsgTypeRemoveLog, + LogPath: task.LogPath, + TaskId: task.Id, + } + msgBytes, err := json.Marshal(&msg) + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return err + } + // 发布获取日志消息 + channel := "nodes:" + task.NodeId.Hex() + if _, err := database.RedisClient.Publish(channel, utils.BytesToString(msgBytes)); err != nil { + log.Errorf(err.Error()) + return err + } + return nil +} + +// 删除日志文件 +func RemoveLogByTaskId(id string) error { + t, err := model.GetTask(id) + if err != nil { + log.Error("get task error:" + err.Error()) + return err + } + removeLog(t) + + return nil +} + +func removeLog(t model.Task) { + if err := RemoveLocalLog(t.LogPath); err != nil { + log.Error("remove local log error:" + err.Error()) + } + if err := RemoveRemoteLog(t); err != nil { + log.Error("remove remote log error:" + err.Error()) + } +} + +// 删除日志文件 +func RemoveLogBySpiderId(id bson.ObjectId) error { + tasks, err := model.GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts") + if err != nil { + log.Error("get tasks error:" + err.Error()) + } + for _, task := range tasks { + removeLog(task) + } + return nil +} + +// 初始化定时删除日志 func InitDeleteLogPeriodically() error { c := cron.New(cron.WithSeconds()) if _, err := c.AddFunc(viper.GetString("log.deleteFrequency"), DeleteLogPeriodically); err != nil { diff --git a/backend/services/msg_handler/handler.go b/backend/services/msg_handler/handler.go new file mode 100644 index 00000000..61516bcf --- /dev/null +++ b/backend/services/msg_handler/handler.go @@ -0,0 +1,48 @@ +package msg_handler + +import ( + "crawlab/constants" + "crawlab/model" +) + +type Handler interface { + Handle() error +} + +func GetMsgHandler(msg NodeMessage) Handler { + if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog { + return &Log{ + msg: msg, + } + } else if msg.Type == constants.MsgTypeCancelTask { + return &Task{ + msg: msg, + } + } else if msg.Type == constants.MsgTypeGetSystemInfo { + return &SystemInfo{ + msg: msg, + } + } + return nil +} + +type NodeMessage struct { + // 通信类别 + Type string `json:"type"` + + // 任务相关 + TaskId string `json:"task_id"` // 任务ID + + // 节点相关 + NodeId string `json:"node_id"` // 节点ID + + // 日志相关 + LogPath string `json:"log_path"` // 日志路径 + Log string `json:"log"` // 日志 + + // 系统信息 + SysInfo model.SystemInfo `json:"sys_info"` + + // 错误相关 + Error string `json:"error"` +} diff --git a/backend/services/msg_handler/msg_log.go b/backend/services/msg_handler/msg_log.go new file mode 100644 index 00000000..0d09d784 --- /dev/null +++ b/backend/services/msg_handler/msg_log.go @@ -0,0 +1,60 @@ +package msg_handler + +import ( + "crawlab/constants" + "crawlab/database" + "crawlab/model" + "crawlab/utils" + "encoding/json" + "github.com/apex/log" + "runtime/debug" +) + +type Log struct { + msg NodeMessage +} + +func (g *Log) Handle() error { + if g.msg.Type == constants.MsgTypeGetLog { + return g.get() + } else if g.msg.Type == constants.MsgTypeRemoveLog { + return g.remove() + } + return nil +} + +func (g *Log) get() error { + // 发出的消息 + msgSd := NodeMessage{ + Type: constants.MsgTypeGetLog, + TaskId: g.msg.TaskId, + } + // 获取本地日志 + logStr, err := model.GetLocalLog(g.msg.LogPath) + log.Info(utils.BytesToString(logStr)) + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + msgSd.Error = err.Error() + msgSd.Log = err.Error() + } else { + msgSd.Log = utils.BytesToString(logStr) + } + + // 序列化 + msgSdBytes, err := json.Marshal(&msgSd) + if err != nil { + return err + } + + // 发布消息给主节点 + log.Info("publish get log msg to master") + if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil { + return err + } + return nil +} + +func (g *Log) remove() error { + return model.RemoveFile(g.msg.LogPath) +} diff --git a/backend/services/msg_handler/msg_system_info.go b/backend/services/msg_handler/msg_system_info.go new file mode 100644 index 00000000..c81cb0a0 --- /dev/null +++ b/backend/services/msg_handler/msg_system_info.go @@ -0,0 +1,39 @@ +package msg_handler + +import ( + "crawlab/constants" + "crawlab/database" + "crawlab/model" + "crawlab/utils" + "encoding/json" + "github.com/apex/log" + "runtime/debug" +) + +type SystemInfo struct { + msg NodeMessage +} + +func (s *SystemInfo) Handle() error { + // 获取环境信息 + sysInfo, err := model.GetLocalSystemInfo() + if err != nil { + return err + } + msgSd := NodeMessage{ + Type: constants.MsgTypeGetSystemInfo, + NodeId: s.msg.NodeId, + SysInfo: sysInfo, + } + msgSdBytes, err := json.Marshal(&msgSd) + if err != nil { + log.Errorf(err.Error()) + debug.PrintStack() + return err + } + if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil { + log.Errorf(err.Error()) + return err + } + return nil +} diff --git a/backend/services/msg_handler/msg_task.go b/backend/services/msg_handler/msg_task.go new file mode 100644 index 00000000..1d218264 --- /dev/null +++ b/backend/services/msg_handler/msg_task.go @@ -0,0 +1,17 @@ +package msg_handler + +import ( + "crawlab/constants" + "crawlab/utils" +) + +type Task struct { + msg NodeMessage +} + +func (t *Task) Handle() error { + // 取消任务 + ch := utils.TaskExecChanMap.ChanBlocked(t.msg.TaskId) + ch <- constants.TaskCancel + return nil +} diff --git a/backend/services/node.go b/backend/services/node.go index 63373be8..44fa3905 100644 --- a/backend/services/node.go +++ b/backend/services/node.go @@ -6,6 +6,7 @@ import ( "crawlab/database" "crawlab/lib/cron" "crawlab/model" + "crawlab/services/msg_handler" "crawlab/services/register" "crawlab/utils" "encoding/json" @@ -27,27 +28,6 @@ type Data struct { UpdateTsUnix int64 `json:"update_ts_unix"` } -type NodeMessage struct { - // 通信类别 - Type string `json:"type"` - - // 任务相关 - TaskId string `json:"task_id"` // 任务ID - - // 节点相关 - NodeId string `json:"node_id"` // 节点ID - - // 日志相关 - LogPath string `json:"log_path"` // 日志路径 - Log string `json:"log"` // 日志 - - // 系统信息 - SysInfo model.SystemInfo `json:"sys_info"` - - // 错误相关 - Error string `json:"error"` -} - const ( Yes = "Y" No = "N" @@ -263,7 +243,7 @@ func UpdateNodeData() { func MasterNodeCallback(message redis.Message) (err error) { // 反序列化 - var msg NodeMessage + var msg msg_handler.NodeMessage if err := json.Unmarshal(message.Data, &msg); err != nil { return err @@ -288,72 +268,17 @@ func MasterNodeCallback(message redis.Message) (err error) { func WorkerNodeCallback(message redis.Message) (err error) { // 反序列化 - msg := NodeMessage{} + msg := msg_handler.NodeMessage{} if err := json.Unmarshal(message.Data, &msg); err != nil { return err } - if msg.Type == constants.MsgTypeGetLog { - // 消息类型为获取日志 - - // 发出的消息 - msgSd := NodeMessage{ - Type: constants.MsgTypeGetLog, - TaskId: msg.TaskId, - } - - // 获取本地日志 - logStr, err := GetLocalLog(msg.LogPath) - log.Info(utils.BytesToString(logStr)) - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - msgSd.Error = err.Error() - msgSd.Log = err.Error() - } else { - msgSd.Log = utils.BytesToString(logStr) - } - - // 序列化 - msgSdBytes, err := json.Marshal(&msgSd) - if err != nil { - return err - } - - // 发布消息给主节点 - log.Info("publish get log msg to master") - if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil { - - return err - } - } else if msg.Type == constants.MsgTypeCancelTask { - // 取消任务 - ch := TaskExecChanMap.ChanBlocked(msg.TaskId) - ch <- constants.TaskCancel - } else if msg.Type == constants.MsgTypeGetSystemInfo { - // 获取环境信息 - sysInfo, err := GetLocalSystemInfo() - if err != nil { - return err - } - msgSd := NodeMessage{ - Type: constants.MsgTypeGetSystemInfo, - NodeId: msg.NodeId, - SysInfo: sysInfo, - } - msgSdBytes, err := json.Marshal(&msgSd) - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return err - } - if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil { - log.Errorf(err.Error()) - return err - } + // worker message handle + if err := msg_handler.GetMsgHandler(msg).Handle(); err != nil { + return err } - return + return nil } // 初始化节点服务 diff --git a/backend/services/spider.go b/backend/services/spider.go index 5763b3de..fdf09517 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -109,13 +109,28 @@ func SaveSpiders(spiders []model.Spider) error { if spider.Type != constants.Customized { continue } - - var spider_ *model.Spider - if err := c.Find(bson.M{"src": spider.Src}).One(&spider_); err != nil { + spider_ := []*model.Spider{} + _ = c.Find(bson.M{"src": spider.Src}).All(&spider_) + // 以防出现多个重复的爬虫 + if len(spider_) > 1 { + if _, err := c.RemoveAll(bson.M{"src": spider.Src}); err != nil { + log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src) + debug.PrintStack() + continue + } + if err := spider.Add(); err != nil { + log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src) + debug.PrintStack() + continue + } + continue + } + if len(spider_) == 0 { // 不存在 if err := spider.Add(); err != nil { + log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src) debug.PrintStack() - return err + continue } } } @@ -151,11 +166,14 @@ func ZipSpider(spider model.Spider) (filePath string, err error) { // 临时文件路径 randomId := uuid.NewV4() - filePath = filepath.Join( - viper.GetString("other.tmppath"), - randomId.String()+".zip", - ) - + tmpPath := viper.GetString("other.tmppath") + if !utils.Exists(tmpPath) { + if err := os.MkdirAll(tmpPath, 0777); err != nil { + log.Errorf("mkdir other.tmppath error: %v", err.Error()) + return "", err + } + } + filePath = filepath.Join(tmpPath, randomId.String()+".zip") // 将源文件夹打包为zip文件 d, err := os.Open(spider.Src) if err != nil { @@ -340,9 +358,16 @@ func OnFileUpload(message redis.Message) (err error) { // 生成唯一ID randomId := uuid.NewV4() - + tmpPath := viper.GetString("other.tmppath") + if !utils.Exists(tmpPath) { + if err := os.MkdirAll(tmpPath, 0777); err != nil { + log.Errorf("mkdir other.tmppath error: %v", err.Error()) + return err + } + } // 创建临时文件 - tmpFilePath := filepath.Join(viper.GetString("other.tmppath"), randomId.String()+".zip") + tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip") + tmpFile, err := os.OpenFile(tmpFilePath, os.O_CREATE|os.O_WRONLY, os.ModePerm) if err != nil { log.Errorf(err.Error()) diff --git a/backend/services/system.go b/backend/services/system.go index b30b2bc7..2c7cd05a 100644 --- a/backend/services/system.go +++ b/backend/services/system.go @@ -4,108 +4,16 @@ import ( "crawlab/constants" "crawlab/database" "crawlab/model" + "crawlab/services/msg_handler" "crawlab/utils" "encoding/json" - "github.com/apex/log" - "io/ioutil" - "os" - "path/filepath" - "runtime" - "runtime/debug" - "strings" ) var SystemInfoChanMap = utils.NewChanMap() -var executableNameMap = map[string]string{ - // python - "python": "Python", - "python2": "Python 2", - "python2.7": "Python 2.7", - "python3": "Python 3", - "python3.5": "Python 3.5", - "python3.6": "Python 3.6", - "python3.7": "Python 3.7", - "python3.8": "Python 3.8", - // java - "java": "Java", - // go - "go": "Go", - // node - "node": "NodeJS", - // php - "php": "PHP", - // windows command - "cmd": "Windows Command Prompt", - // linux shell - "sh": "Shell", - "bash": "bash", -} - -func GetSystemEnv(key string) string { - return os.Getenv(key) -} - -func GetPathValues() (paths []string) { - pathEnv := GetSystemEnv("PATH") - return strings.Split(pathEnv, ":") -} - -func GetExecutables() (executables []model.Executable, err error) { - pathValues := GetPathValues() - - cache := map[string]string{} - - for _, path := range pathValues { - fileList, err := ioutil.ReadDir(path) - if err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - continue - } - - for _, file := range fileList { - displayName := executableNameMap[file.Name()] - filePath := filepath.Join(path, file.Name()) - - if cache[filePath] == "" { - if displayName != "" { - executables = append(executables, model.Executable{ - Path: filePath, - FileName: file.Name(), - DisplayName: displayName, - }) - } - cache[filePath] = filePath - } - } - } - return executables, nil -} - -func GetLocalSystemInfo() (sysInfo model.SystemInfo, err error) { - executables, err := GetExecutables() - if err != nil { - return sysInfo, err - } - hostname, err := os.Hostname() - if err != nil { - debug.PrintStack() - return sysInfo, err - } - - return model.SystemInfo{ - ARCH: runtime.GOARCH, - OS: runtime.GOOS, - NumCpu: runtime.GOMAXPROCS(0), - Hostname: hostname, - Executables: executables, - }, nil -} - func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) { // 发送消息 - msg := NodeMessage{ + msg := msg_handler.NodeMessage{ Type: constants.MsgTypeGetSystemInfo, NodeId: id, } @@ -132,7 +40,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) { func GetSystemInfo(id string) (sysInfo model.SystemInfo, err error) { if IsMasterNode(id) { - sysInfo, err = GetLocalSystemInfo() + sysInfo, err = model.GetLocalSystemInfo() } else { sysInfo, err = GetRemoteSystemInfo(id) } diff --git a/backend/services/task.go b/backend/services/task.go index dbaa2800..5f3a4d07 100644 --- a/backend/services/task.go +++ b/backend/services/task.go @@ -5,6 +5,7 @@ import ( "crawlab/database" "crawlab/lib/cron" "crawlab/model" + "crawlab/services/msg_handler" "crawlab/utils" "encoding/json" "errors" @@ -25,6 +26,7 @@ var Exec *Executor // 任务执行锁 //Added by cloud: 2019/09/04,solve data race var LockList sync.Map + // 任务消息 type TaskMessage struct { Id string @@ -69,8 +71,6 @@ func (ex *Executor) Start() error { return nil } -var TaskExecChanMap = utils.NewChanMap() - // 派发任务 func AssignTask(task model.Task) error { // 生成任务信息 @@ -135,7 +135,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e } // 起一个goroutine来监控进程 - ch := TaskExecChanMap.ChanBlocked(t.Id) + ch := utils.TaskExecChanMap.ChanBlocked(t.Id) go func() { // 传入信号,此处阻塞 signal := <-ch @@ -408,7 +408,7 @@ func GetTaskLog(id string) (logStr string, err error) { logStr = "" if IsMasterNode(task.NodeId.Hex()) { // 若为主节点,获取本机日志 - logBytes, err := GetLocalLog(task.LogPath) + logBytes, err := model.GetLocalLog(task.LogPath) logStr = utils.BytesToString(logBytes) if err != nil { log.Errorf(err.Error()) @@ -452,7 +452,7 @@ func CancelTask(id string) (err error) { // 任务节点为主节点 // 获取任务执行频道 - ch := TaskExecChanMap.ChanBlocked(id) + ch := utils.TaskExecChanMap.ChanBlocked(id) // 发出取消进程信号 ch <- constants.TaskCancel @@ -460,7 +460,7 @@ func CancelTask(id string) (err error) { // 任务节点为工作节点 // 序列化消息 - msg := NodeMessage{ + msg := msg_handler.NodeMessage{ Type: constants.MsgTypeCancelTask, TaskId: id, } diff --git a/backend/utils/chan.go b/backend/utils/chan.go index 3e9fde61..7b63ac0f 100644 --- a/backend/utils/chan.go +++ b/backend/utils/chan.go @@ -1,5 +1,7 @@ package utils +var TaskExecChanMap = NewChanMap() + type ChanMap struct { m map[string]chan string } diff --git a/frontend/src/views/spider/SpiderList.vue b/frontend/src/views/spider/SpiderList.vue index dbd1dda8..6ff5cb35 100644 --- a/frontend/src/views/spider/SpiderList.vue +++ b/frontend/src/views/spider/SpiderList.vue @@ -207,7 +207,7 @@ :width="col.width"> - +