mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
@@ -4,4 +4,5 @@ const (
|
||||
MsgTypeGetLog = "get-log"
|
||||
MsgTypeGetSystemInfo = "get-sys-info"
|
||||
MsgTypeCancelTask = "cancel-task"
|
||||
MsgTypeRemoveLog = "remove-log"
|
||||
)
|
||||
|
||||
@@ -19,7 +19,7 @@ func (r *Redis) Close() {
|
||||
}
|
||||
func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
|
||||
psc := redis.PubSubConn{Conn: r.pool.Get()}
|
||||
if err := psc.Subscribe(redis.Args{}.AddFlat(channel)); err != nil {
|
||||
if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
|
||||
return err
|
||||
}
|
||||
done := make(chan error, 1)
|
||||
|
||||
@@ -1,8 +1,25 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"crawlab/utils"
|
||||
"github.com/apex/log"
|
||||
"os"
|
||||
)
|
||||
|
||||
type File struct {
|
||||
Name string `json:"name"`
|
||||
Path string `json:"path"`
|
||||
IsDir bool `json:"is_dir"`
|
||||
Size int64 `json:"size"`
|
||||
}
|
||||
|
||||
func RemoveFile(path string) error {
|
||||
if !utils.Exists(path) {
|
||||
log.Info("file not found: " + path)
|
||||
return nil
|
||||
}
|
||||
if err := os.Remove(path); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
43
backend/model/log.go
Normal file
43
backend/model/log.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"github.com/apex/log"
|
||||
"os"
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
// 获取本地日志
|
||||
func GetLocalLog(logPath string) (fileBytes []byte, err error) {
|
||||
|
||||
f, err := os.Open(logPath)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
debug.PrintStack()
|
||||
return nil, err
|
||||
}
|
||||
fi, err := f.Stat()
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
debug.PrintStack()
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
const bufLen = 2 * 1024 * 1024
|
||||
logBuf := make([]byte, bufLen)
|
||||
|
||||
off := int64(0)
|
||||
if fi.Size() > int64(len(logBuf)) {
|
||||
off = fi.Size() - int64(len(logBuf))
|
||||
}
|
||||
n, err := f.ReadAt(logBuf, off)
|
||||
|
||||
//到文件结尾会有EOF标识
|
||||
if err != nil && err.Error() != "EOF" {
|
||||
log.Error(err.Error())
|
||||
debug.PrintStack()
|
||||
return nil, err
|
||||
}
|
||||
logBuf = logBuf[:n]
|
||||
return logBuf, nil
|
||||
}
|
||||
@@ -1,5 +1,40 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"github.com/apex/log"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var executableNameMap = map[string]string{
|
||||
// python
|
||||
"python": "Python",
|
||||
"python2": "Python 2",
|
||||
"python2.7": "Python 2.7",
|
||||
"python3": "Python 3",
|
||||
"python3.5": "Python 3.5",
|
||||
"python3.6": "Python 3.6",
|
||||
"python3.7": "Python 3.7",
|
||||
"python3.8": "Python 3.8",
|
||||
// java
|
||||
"java": "Java",
|
||||
// go
|
||||
"go": "Go",
|
||||
// node
|
||||
"node": "NodeJS",
|
||||
// php
|
||||
"php": "PHP",
|
||||
// windows command
|
||||
"cmd": "Windows Command Prompt",
|
||||
// linux shell
|
||||
"sh": "Shell",
|
||||
"bash": "bash",
|
||||
}
|
||||
|
||||
type SystemInfo struct {
|
||||
ARCH string `json:"arch"`
|
||||
OS string `json:"os"`
|
||||
@@ -13,3 +48,64 @@ type Executable struct {
|
||||
FileName string `json:"file_name"`
|
||||
DisplayName string `json:"display_name"`
|
||||
}
|
||||
|
||||
func GetLocalSystemInfo() (sysInfo SystemInfo, err error) {
|
||||
executables, err := GetExecutables()
|
||||
if err != nil {
|
||||
return sysInfo, err
|
||||
}
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
return sysInfo, err
|
||||
}
|
||||
|
||||
return SystemInfo{
|
||||
ARCH: runtime.GOARCH,
|
||||
OS: runtime.GOOS,
|
||||
NumCpu: runtime.GOMAXPROCS(0),
|
||||
Hostname: hostname,
|
||||
Executables: executables,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func GetSystemEnv(key string) string {
|
||||
return os.Getenv(key)
|
||||
}
|
||||
|
||||
func GetPathValues() (paths []string) {
|
||||
pathEnv := GetSystemEnv("PATH")
|
||||
return strings.Split(pathEnv, ":")
|
||||
}
|
||||
|
||||
func GetExecutables() (executables []Executable, err error) {
|
||||
pathValues := GetPathValues()
|
||||
|
||||
cache := map[string]string{}
|
||||
|
||||
for _, path := range pathValues {
|
||||
fileList, err := ioutil.ReadDir(path)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
continue
|
||||
}
|
||||
|
||||
for _, file := range fileList {
|
||||
displayName := executableNameMap[file.Name()]
|
||||
filePath := filepath.Join(path, file.Name())
|
||||
|
||||
if cache[filePath] == "" {
|
||||
if displayName != "" {
|
||||
executables = append(executables, Executable{
|
||||
Path: filePath,
|
||||
FileName: file.Name(),
|
||||
DisplayName: displayName,
|
||||
})
|
||||
}
|
||||
cache[filePath] = filePath
|
||||
}
|
||||
}
|
||||
}
|
||||
return executables, nil
|
||||
}
|
||||
|
||||
@@ -191,7 +191,7 @@ func RemoveTask(id string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func RemoveTaskBySpiderId(id string) error {
|
||||
func RemoveTaskBySpiderId(id bson.ObjectId) error {
|
||||
tasks, err := GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts")
|
||||
if err != nil {
|
||||
log.Error("get tasks error:" + err.Error())
|
||||
|
||||
@@ -229,8 +229,14 @@ func DeleteSpider(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// 删除日志文件
|
||||
if err := services.RemoveLogBySpiderId(spider.Id); err != nil {
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
return
|
||||
}
|
||||
|
||||
// 删除爬虫对应的task任务
|
||||
if err := model.RemoveTaskBySpiderId(spider.Id.Hex()); err != nil {
|
||||
if err := model.RemoveTaskBySpiderId(spider.Id); err != nil {
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -124,6 +124,13 @@ func PutTask(c *gin.Context) {
|
||||
func DeleteTask(c *gin.Context) {
|
||||
id := c.Param("id")
|
||||
|
||||
// 删除日志文件
|
||||
if err := services.RemoveLogByTaskId(id); err != nil {
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
return
|
||||
}
|
||||
|
||||
// 删除task
|
||||
if err := model.RemoveTask(id); err != nil {
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
return
|
||||
|
||||
@@ -5,9 +5,11 @@ import (
|
||||
"crawlab/database"
|
||||
"crawlab/lib/cron"
|
||||
"crawlab/model"
|
||||
"crawlab/services/msg_handler"
|
||||
"crawlab/utils"
|
||||
"encoding/json"
|
||||
"github.com/apex/log"
|
||||
"github.com/globalsign/mgo/bson"
|
||||
"github.com/spf13/viper"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
@@ -18,46 +20,10 @@ import (
|
||||
// 任务日志频道映射
|
||||
var TaskLogChanMap = utils.NewChanMap()
|
||||
|
||||
// 获取本地日志
|
||||
func GetLocalLog(logPath string) (fileBytes []byte, err error) {
|
||||
|
||||
f, err := os.Open(logPath)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
debug.PrintStack()
|
||||
return nil, err
|
||||
}
|
||||
fi, err := f.Stat()
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
debug.PrintStack()
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
const bufLen = 2 * 1024 * 1024
|
||||
logBuf := make([]byte, bufLen)
|
||||
|
||||
off := int64(0)
|
||||
if fi.Size() > int64(len(logBuf)) {
|
||||
off = fi.Size() - int64(len(logBuf))
|
||||
}
|
||||
n, err := f.ReadAt(logBuf, off)
|
||||
|
||||
//到文件结尾会有EOF标识
|
||||
if err != nil && err.Error() != "EOF" {
|
||||
log.Error(err.Error())
|
||||
debug.PrintStack()
|
||||
return nil, err
|
||||
}
|
||||
logBuf = logBuf[:n]
|
||||
return logBuf, nil
|
||||
}
|
||||
|
||||
// 获取远端日志
|
||||
func GetRemoteLog(task model.Task) (logStr string, err error) {
|
||||
// 序列化消息
|
||||
msg := NodeMessage{
|
||||
msg := msg_handler.NodeMessage{
|
||||
Type: constants.MsgTypeGetLog,
|
||||
LogPath: task.LogPath,
|
||||
TaskId: task.Id,
|
||||
@@ -85,6 +51,7 @@ func GetRemoteLog(task model.Task) (logStr string, err error) {
|
||||
return logStr, nil
|
||||
}
|
||||
|
||||
// 定时删除日志
|
||||
func DeleteLogPeriodically() {
|
||||
logDir := viper.GetString("log.path")
|
||||
if !utils.Exists(logDir) {
|
||||
@@ -107,6 +74,71 @@ func DeleteLogPeriodically() {
|
||||
|
||||
}
|
||||
|
||||
// 删除本地日志
|
||||
func RemoveLocalLog(path string) error {
|
||||
if err := model.RemoveFile(path); err != nil {
|
||||
log.Error("remove local file error: " + err.Error())
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 删除远程日志
|
||||
func RemoveRemoteLog(task model.Task) error {
|
||||
msg := msg_handler.NodeMessage{
|
||||
Type: constants.MsgTypeRemoveLog,
|
||||
LogPath: task.LogPath,
|
||||
TaskId: task.Id,
|
||||
}
|
||||
msgBytes, err := json.Marshal(&msg)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
// 发布获取日志消息
|
||||
channel := "nodes:" + task.NodeId.Hex()
|
||||
if _, err := database.RedisClient.Publish(channel, utils.BytesToString(msgBytes)); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 删除日志文件
|
||||
func RemoveLogByTaskId(id string) error {
|
||||
t, err := model.GetTask(id)
|
||||
if err != nil {
|
||||
log.Error("get task error:" + err.Error())
|
||||
return err
|
||||
}
|
||||
removeLog(t)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func removeLog(t model.Task) {
|
||||
if err := RemoveLocalLog(t.LogPath); err != nil {
|
||||
log.Error("remove local log error:" + err.Error())
|
||||
}
|
||||
if err := RemoveRemoteLog(t); err != nil {
|
||||
log.Error("remove remote log error:" + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// 删除日志文件
|
||||
func RemoveLogBySpiderId(id bson.ObjectId) error {
|
||||
tasks, err := model.GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts")
|
||||
if err != nil {
|
||||
log.Error("get tasks error:" + err.Error())
|
||||
}
|
||||
for _, task := range tasks {
|
||||
removeLog(task)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 初始化定时删除日志
|
||||
func InitDeleteLogPeriodically() error {
|
||||
c := cron.New(cron.WithSeconds())
|
||||
if _, err := c.AddFunc(viper.GetString("log.deleteFrequency"), DeleteLogPeriodically); err != nil {
|
||||
|
||||
48
backend/services/msg_handler/handler.go
Normal file
48
backend/services/msg_handler/handler.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package msg_handler
|
||||
|
||||
import (
|
||||
"crawlab/constants"
|
||||
"crawlab/model"
|
||||
)
|
||||
|
||||
type Handler interface {
|
||||
Handle() error
|
||||
}
|
||||
|
||||
func GetMsgHandler(msg NodeMessage) Handler {
|
||||
if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog {
|
||||
return &Log{
|
||||
msg: msg,
|
||||
}
|
||||
} else if msg.Type == constants.MsgTypeCancelTask {
|
||||
return &Task{
|
||||
msg: msg,
|
||||
}
|
||||
} else if msg.Type == constants.MsgTypeGetSystemInfo {
|
||||
return &SystemInfo{
|
||||
msg: msg,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type NodeMessage struct {
|
||||
// 通信类别
|
||||
Type string `json:"type"`
|
||||
|
||||
// 任务相关
|
||||
TaskId string `json:"task_id"` // 任务ID
|
||||
|
||||
// 节点相关
|
||||
NodeId string `json:"node_id"` // 节点ID
|
||||
|
||||
// 日志相关
|
||||
LogPath string `json:"log_path"` // 日志路径
|
||||
Log string `json:"log"` // 日志
|
||||
|
||||
// 系统信息
|
||||
SysInfo model.SystemInfo `json:"sys_info"`
|
||||
|
||||
// 错误相关
|
||||
Error string `json:"error"`
|
||||
}
|
||||
60
backend/services/msg_handler/msg_log.go
Normal file
60
backend/services/msg_handler/msg_log.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package msg_handler
|
||||
|
||||
import (
|
||||
"crawlab/constants"
|
||||
"crawlab/database"
|
||||
"crawlab/model"
|
||||
"crawlab/utils"
|
||||
"encoding/json"
|
||||
"github.com/apex/log"
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
type Log struct {
|
||||
msg NodeMessage
|
||||
}
|
||||
|
||||
func (g *Log) Handle() error {
|
||||
if g.msg.Type == constants.MsgTypeGetLog {
|
||||
return g.get()
|
||||
} else if g.msg.Type == constants.MsgTypeRemoveLog {
|
||||
return g.remove()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *Log) get() error {
|
||||
// 发出的消息
|
||||
msgSd := NodeMessage{
|
||||
Type: constants.MsgTypeGetLog,
|
||||
TaskId: g.msg.TaskId,
|
||||
}
|
||||
// 获取本地日志
|
||||
logStr, err := model.GetLocalLog(g.msg.LogPath)
|
||||
log.Info(utils.BytesToString(logStr))
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
msgSd.Error = err.Error()
|
||||
msgSd.Log = err.Error()
|
||||
} else {
|
||||
msgSd.Log = utils.BytesToString(logStr)
|
||||
}
|
||||
|
||||
// 序列化
|
||||
msgSdBytes, err := json.Marshal(&msgSd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 发布消息给主节点
|
||||
log.Info("publish get log msg to master")
|
||||
if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *Log) remove() error {
|
||||
return model.RemoveFile(g.msg.LogPath)
|
||||
}
|
||||
39
backend/services/msg_handler/msg_system_info.go
Normal file
39
backend/services/msg_handler/msg_system_info.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package msg_handler
|
||||
|
||||
import (
|
||||
"crawlab/constants"
|
||||
"crawlab/database"
|
||||
"crawlab/model"
|
||||
"crawlab/utils"
|
||||
"encoding/json"
|
||||
"github.com/apex/log"
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
type SystemInfo struct {
|
||||
msg NodeMessage
|
||||
}
|
||||
|
||||
func (s *SystemInfo) Handle() error {
|
||||
// 获取环境信息
|
||||
sysInfo, err := model.GetLocalSystemInfo()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
msgSd := NodeMessage{
|
||||
Type: constants.MsgTypeGetSystemInfo,
|
||||
NodeId: s.msg.NodeId,
|
||||
SysInfo: sysInfo,
|
||||
}
|
||||
msgSdBytes, err := json.Marshal(&msgSd)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
17
backend/services/msg_handler/msg_task.go
Normal file
17
backend/services/msg_handler/msg_task.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package msg_handler
|
||||
|
||||
import (
|
||||
"crawlab/constants"
|
||||
"crawlab/utils"
|
||||
)
|
||||
|
||||
type Task struct {
|
||||
msg NodeMessage
|
||||
}
|
||||
|
||||
func (t *Task) Handle() error {
|
||||
// 取消任务
|
||||
ch := utils.TaskExecChanMap.ChanBlocked(t.msg.TaskId)
|
||||
ch <- constants.TaskCancel
|
||||
return nil
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"crawlab/database"
|
||||
"crawlab/lib/cron"
|
||||
"crawlab/model"
|
||||
"crawlab/services/msg_handler"
|
||||
"crawlab/services/register"
|
||||
"crawlab/utils"
|
||||
"encoding/json"
|
||||
@@ -27,27 +28,6 @@ type Data struct {
|
||||
UpdateTsUnix int64 `json:"update_ts_unix"`
|
||||
}
|
||||
|
||||
type NodeMessage struct {
|
||||
// 通信类别
|
||||
Type string `json:"type"`
|
||||
|
||||
// 任务相关
|
||||
TaskId string `json:"task_id"` // 任务ID
|
||||
|
||||
// 节点相关
|
||||
NodeId string `json:"node_id"` // 节点ID
|
||||
|
||||
// 日志相关
|
||||
LogPath string `json:"log_path"` // 日志路径
|
||||
Log string `json:"log"` // 日志
|
||||
|
||||
// 系统信息
|
||||
SysInfo model.SystemInfo `json:"sys_info"`
|
||||
|
||||
// 错误相关
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
const (
|
||||
Yes = "Y"
|
||||
No = "N"
|
||||
@@ -263,7 +243,7 @@ func UpdateNodeData() {
|
||||
|
||||
func MasterNodeCallback(message redis.Message) (err error) {
|
||||
// 反序列化
|
||||
var msg NodeMessage
|
||||
var msg msg_handler.NodeMessage
|
||||
if err := json.Unmarshal(message.Data, &msg); err != nil {
|
||||
|
||||
return err
|
||||
@@ -288,72 +268,17 @@ func MasterNodeCallback(message redis.Message) (err error) {
|
||||
|
||||
func WorkerNodeCallback(message redis.Message) (err error) {
|
||||
// 反序列化
|
||||
msg := NodeMessage{}
|
||||
msg := msg_handler.NodeMessage{}
|
||||
if err := json.Unmarshal(message.Data, &msg); err != nil {
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if msg.Type == constants.MsgTypeGetLog {
|
||||
// 消息类型为获取日志
|
||||
|
||||
// 发出的消息
|
||||
msgSd := NodeMessage{
|
||||
Type: constants.MsgTypeGetLog,
|
||||
TaskId: msg.TaskId,
|
||||
}
|
||||
|
||||
// 获取本地日志
|
||||
logStr, err := GetLocalLog(msg.LogPath)
|
||||
log.Info(utils.BytesToString(logStr))
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
msgSd.Error = err.Error()
|
||||
msgSd.Log = err.Error()
|
||||
} else {
|
||||
msgSd.Log = utils.BytesToString(logStr)
|
||||
}
|
||||
|
||||
// 序列化
|
||||
msgSdBytes, err := json.Marshal(&msgSd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 发布消息给主节点
|
||||
log.Info("publish get log msg to master")
|
||||
if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil {
|
||||
|
||||
return err
|
||||
}
|
||||
} else if msg.Type == constants.MsgTypeCancelTask {
|
||||
// 取消任务
|
||||
ch := TaskExecChanMap.ChanBlocked(msg.TaskId)
|
||||
ch <- constants.TaskCancel
|
||||
} else if msg.Type == constants.MsgTypeGetSystemInfo {
|
||||
// 获取环境信息
|
||||
sysInfo, err := GetLocalSystemInfo()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
msgSd := NodeMessage{
|
||||
Type: constants.MsgTypeGetSystemInfo,
|
||||
NodeId: msg.NodeId,
|
||||
SysInfo: sysInfo,
|
||||
}
|
||||
msgSdBytes, err := json.Marshal(&msgSd)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return err
|
||||
}
|
||||
// worker message handle
|
||||
if err := msg_handler.GetMsgHandler(msg).Handle(); err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// 初始化节点服务
|
||||
|
||||
@@ -109,13 +109,28 @@ func SaveSpiders(spiders []model.Spider) error {
|
||||
if spider.Type != constants.Customized {
|
||||
continue
|
||||
}
|
||||
|
||||
var spider_ *model.Spider
|
||||
if err := c.Find(bson.M{"src": spider.Src}).One(&spider_); err != nil {
|
||||
spider_ := []*model.Spider{}
|
||||
_ = c.Find(bson.M{"src": spider.Src}).All(&spider_)
|
||||
// 以防出现多个重复的爬虫
|
||||
if len(spider_) > 1 {
|
||||
if _, err := c.RemoveAll(bson.M{"src": spider.Src}); err != nil {
|
||||
log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src)
|
||||
debug.PrintStack()
|
||||
continue
|
||||
}
|
||||
if err := spider.Add(); err != nil {
|
||||
log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src)
|
||||
debug.PrintStack()
|
||||
continue
|
||||
}
|
||||
continue
|
||||
}
|
||||
if len(spider_) == 0 {
|
||||
// 不存在
|
||||
if err := spider.Add(); err != nil {
|
||||
log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src)
|
||||
debug.PrintStack()
|
||||
return err
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -151,11 +166,14 @@ func ZipSpider(spider model.Spider) (filePath string, err error) {
|
||||
// 临时文件路径
|
||||
randomId := uuid.NewV4()
|
||||
|
||||
filePath = filepath.Join(
|
||||
viper.GetString("other.tmppath"),
|
||||
randomId.String()+".zip",
|
||||
)
|
||||
|
||||
tmpPath := viper.GetString("other.tmppath")
|
||||
if !utils.Exists(tmpPath) {
|
||||
if err := os.MkdirAll(tmpPath, 0777); err != nil {
|
||||
log.Errorf("mkdir other.tmppath error: %v", err.Error())
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
filePath = filepath.Join(tmpPath, randomId.String()+".zip")
|
||||
// 将源文件夹打包为zip文件
|
||||
d, err := os.Open(spider.Src)
|
||||
if err != nil {
|
||||
@@ -340,9 +358,16 @@ func OnFileUpload(message redis.Message) (err error) {
|
||||
|
||||
// 生成唯一ID
|
||||
randomId := uuid.NewV4()
|
||||
|
||||
tmpPath := viper.GetString("other.tmppath")
|
||||
if !utils.Exists(tmpPath) {
|
||||
if err := os.MkdirAll(tmpPath, 0777); err != nil {
|
||||
log.Errorf("mkdir other.tmppath error: %v", err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
// 创建临时文件
|
||||
tmpFilePath := filepath.Join(viper.GetString("other.tmppath"), randomId.String()+".zip")
|
||||
tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip")
|
||||
|
||||
tmpFile, err := os.OpenFile(tmpFilePath, os.O_CREATE|os.O_WRONLY, os.ModePerm)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
|
||||
@@ -4,108 +4,16 @@ import (
|
||||
"crawlab/constants"
|
||||
"crawlab/database"
|
||||
"crawlab/model"
|
||||
"crawlab/services/msg_handler"
|
||||
"crawlab/utils"
|
||||
"encoding/json"
|
||||
"github.com/apex/log"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var SystemInfoChanMap = utils.NewChanMap()
|
||||
|
||||
var executableNameMap = map[string]string{
|
||||
// python
|
||||
"python": "Python",
|
||||
"python2": "Python 2",
|
||||
"python2.7": "Python 2.7",
|
||||
"python3": "Python 3",
|
||||
"python3.5": "Python 3.5",
|
||||
"python3.6": "Python 3.6",
|
||||
"python3.7": "Python 3.7",
|
||||
"python3.8": "Python 3.8",
|
||||
// java
|
||||
"java": "Java",
|
||||
// go
|
||||
"go": "Go",
|
||||
// node
|
||||
"node": "NodeJS",
|
||||
// php
|
||||
"php": "PHP",
|
||||
// windows command
|
||||
"cmd": "Windows Command Prompt",
|
||||
// linux shell
|
||||
"sh": "Shell",
|
||||
"bash": "bash",
|
||||
}
|
||||
|
||||
func GetSystemEnv(key string) string {
|
||||
return os.Getenv(key)
|
||||
}
|
||||
|
||||
func GetPathValues() (paths []string) {
|
||||
pathEnv := GetSystemEnv("PATH")
|
||||
return strings.Split(pathEnv, ":")
|
||||
}
|
||||
|
||||
func GetExecutables() (executables []model.Executable, err error) {
|
||||
pathValues := GetPathValues()
|
||||
|
||||
cache := map[string]string{}
|
||||
|
||||
for _, path := range pathValues {
|
||||
fileList, err := ioutil.ReadDir(path)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
continue
|
||||
}
|
||||
|
||||
for _, file := range fileList {
|
||||
displayName := executableNameMap[file.Name()]
|
||||
filePath := filepath.Join(path, file.Name())
|
||||
|
||||
if cache[filePath] == "" {
|
||||
if displayName != "" {
|
||||
executables = append(executables, model.Executable{
|
||||
Path: filePath,
|
||||
FileName: file.Name(),
|
||||
DisplayName: displayName,
|
||||
})
|
||||
}
|
||||
cache[filePath] = filePath
|
||||
}
|
||||
}
|
||||
}
|
||||
return executables, nil
|
||||
}
|
||||
|
||||
func GetLocalSystemInfo() (sysInfo model.SystemInfo, err error) {
|
||||
executables, err := GetExecutables()
|
||||
if err != nil {
|
||||
return sysInfo, err
|
||||
}
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
return sysInfo, err
|
||||
}
|
||||
|
||||
return model.SystemInfo{
|
||||
ARCH: runtime.GOARCH,
|
||||
OS: runtime.GOOS,
|
||||
NumCpu: runtime.GOMAXPROCS(0),
|
||||
Hostname: hostname,
|
||||
Executables: executables,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
|
||||
// 发送消息
|
||||
msg := NodeMessage{
|
||||
msg := msg_handler.NodeMessage{
|
||||
Type: constants.MsgTypeGetSystemInfo,
|
||||
NodeId: id,
|
||||
}
|
||||
@@ -132,7 +40,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
|
||||
|
||||
func GetSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
|
||||
if IsMasterNode(id) {
|
||||
sysInfo, err = GetLocalSystemInfo()
|
||||
sysInfo, err = model.GetLocalSystemInfo()
|
||||
} else {
|
||||
sysInfo, err = GetRemoteSystemInfo(id)
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"crawlab/database"
|
||||
"crawlab/lib/cron"
|
||||
"crawlab/model"
|
||||
"crawlab/services/msg_handler"
|
||||
"crawlab/utils"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
@@ -25,6 +26,7 @@ var Exec *Executor
|
||||
// 任务执行锁
|
||||
//Added by cloud: 2019/09/04,solve data race
|
||||
var LockList sync.Map
|
||||
|
||||
// 任务消息
|
||||
type TaskMessage struct {
|
||||
Id string
|
||||
@@ -69,8 +71,6 @@ func (ex *Executor) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var TaskExecChanMap = utils.NewChanMap()
|
||||
|
||||
// 派发任务
|
||||
func AssignTask(task model.Task) error {
|
||||
// 生成任务信息
|
||||
@@ -135,7 +135,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e
|
||||
}
|
||||
|
||||
// 起一个goroutine来监控进程
|
||||
ch := TaskExecChanMap.ChanBlocked(t.Id)
|
||||
ch := utils.TaskExecChanMap.ChanBlocked(t.Id)
|
||||
go func() {
|
||||
// 传入信号,此处阻塞
|
||||
signal := <-ch
|
||||
@@ -408,7 +408,7 @@ func GetTaskLog(id string) (logStr string, err error) {
|
||||
logStr = ""
|
||||
if IsMasterNode(task.NodeId.Hex()) {
|
||||
// 若为主节点,获取本机日志
|
||||
logBytes, err := GetLocalLog(task.LogPath)
|
||||
logBytes, err := model.GetLocalLog(task.LogPath)
|
||||
logStr = utils.BytesToString(logBytes)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
@@ -452,7 +452,7 @@ func CancelTask(id string) (err error) {
|
||||
// 任务节点为主节点
|
||||
|
||||
// 获取任务执行频道
|
||||
ch := TaskExecChanMap.ChanBlocked(id)
|
||||
ch := utils.TaskExecChanMap.ChanBlocked(id)
|
||||
|
||||
// 发出取消进程信号
|
||||
ch <- constants.TaskCancel
|
||||
@@ -460,7 +460,7 @@ func CancelTask(id string) (err error) {
|
||||
// 任务节点为工作节点
|
||||
|
||||
// 序列化消息
|
||||
msg := NodeMessage{
|
||||
msg := msg_handler.NodeMessage{
|
||||
Type: constants.MsgTypeCancelTask,
|
||||
TaskId: id,
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
package utils
|
||||
|
||||
var TaskExecChanMap = NewChanMap()
|
||||
|
||||
type ChanMap struct {
|
||||
m map[string]chan string
|
||||
}
|
||||
|
||||
@@ -207,7 +207,7 @@
|
||||
:width="col.width">
|
||||
</el-table-column>
|
||||
</template>
|
||||
<el-table-column :label="$t('Action')" align="left" width="150px" fixed="right">
|
||||
<el-table-column :label="$t('Action')" align="left" fixed="right">
|
||||
<template slot-scope="scope">
|
||||
<el-tooltip :content="$t('View')" placement="top">
|
||||
<el-button type="primary" icon="el-icon-search" size="mini" @click="onView(scope.row)"></el-button>
|
||||
|
||||
Reference in New Issue
Block a user