Merge pull request #224 from wo10378931/develop

Develop
This commit is contained in:
Marvin Zhang
2019-09-10 16:24:10 +08:00
committed by GitHub
18 changed files with 460 additions and 234 deletions

View File

@@ -4,4 +4,5 @@ const (
MsgTypeGetLog = "get-log"
MsgTypeGetSystemInfo = "get-sys-info"
MsgTypeCancelTask = "cancel-task"
MsgTypeRemoveLog = "remove-log"
)

View File

@@ -1,8 +1,25 @@
package model
import (
"crawlab/utils"
"github.com/apex/log"
"os"
)
type File struct {
Name string `json:"name"`
Path string `json:"path"`
IsDir bool `json:"is_dir"`
Size int64 `json:"size"`
}
func RemoveFile(path string) error {
if !utils.Exists(path) {
log.Info("file not found: " + path)
return nil
}
if err := os.Remove(path); err != nil {
return err
}
return nil
}

43
backend/model/log.go Normal file
View File

@@ -0,0 +1,43 @@
package model
import (
"github.com/apex/log"
"os"
"runtime/debug"
)
// 获取本地日志
func GetLocalLog(logPath string) (fileBytes []byte, err error) {
f, err := os.Open(logPath)
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return nil, err
}
fi, err := f.Stat()
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return nil, err
}
defer f.Close()
const bufLen = 2 * 1024 * 1024
logBuf := make([]byte, bufLen)
off := int64(0)
if fi.Size() > int64(len(logBuf)) {
off = fi.Size() - int64(len(logBuf))
}
n, err := f.ReadAt(logBuf, off)
//到文件结尾会有EOF标识
if err != nil && err.Error() != "EOF" {
log.Error(err.Error())
debug.PrintStack()
return nil, err
}
logBuf = logBuf[:n]
return logBuf, nil
}

View File

@@ -1,5 +1,40 @@
package model
import (
"github.com/apex/log"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"runtime/debug"
"strings"
)
var executableNameMap = map[string]string{
// python
"python": "Python",
"python2": "Python 2",
"python2.7": "Python 2.7",
"python3": "Python 3",
"python3.5": "Python 3.5",
"python3.6": "Python 3.6",
"python3.7": "Python 3.7",
"python3.8": "Python 3.8",
// java
"java": "Java",
// go
"go": "Go",
// node
"node": "NodeJS",
// php
"php": "PHP",
// windows command
"cmd": "Windows Command Prompt",
// linux shell
"sh": "Shell",
"bash": "bash",
}
type SystemInfo struct {
ARCH string `json:"arch"`
OS string `json:"os"`
@@ -13,3 +48,64 @@ type Executable struct {
FileName string `json:"file_name"`
DisplayName string `json:"display_name"`
}
func GetLocalSystemInfo() (sysInfo SystemInfo, err error) {
executables, err := GetExecutables()
if err != nil {
return sysInfo, err
}
hostname, err := os.Hostname()
if err != nil {
debug.PrintStack()
return sysInfo, err
}
return SystemInfo{
ARCH: runtime.GOARCH,
OS: runtime.GOOS,
NumCpu: runtime.GOMAXPROCS(0),
Hostname: hostname,
Executables: executables,
}, nil
}
func GetSystemEnv(key string) string {
return os.Getenv(key)
}
func GetPathValues() (paths []string) {
pathEnv := GetSystemEnv("PATH")
return strings.Split(pathEnv, ":")
}
func GetExecutables() (executables []Executable, err error) {
pathValues := GetPathValues()
cache := map[string]string{}
for _, path := range pathValues {
fileList, err := ioutil.ReadDir(path)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
for _, file := range fileList {
displayName := executableNameMap[file.Name()]
filePath := filepath.Join(path, file.Name())
if cache[filePath] == "" {
if displayName != "" {
executables = append(executables, Executable{
Path: filePath,
FileName: file.Name(),
DisplayName: displayName,
})
}
cache[filePath] = filePath
}
}
}
return executables, nil
}

View File

@@ -191,7 +191,7 @@ func RemoveTask(id string) error {
return nil
}
func RemoveTaskBySpiderId(id string) error {
func RemoveTaskBySpiderId(id bson.ObjectId) error {
tasks, err := GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts")
if err != nil {
log.Error("get tasks error:" + err.Error())

View File

@@ -229,8 +229,14 @@ func DeleteSpider(c *gin.Context) {
return
}
// 删除日志文件
if err := services.RemoveLogBySpiderId(spider.Id); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
// 删除爬虫对应的task任务
if err := model.RemoveTaskBySpiderId(spider.Id.Hex()); err != nil {
if err := model.RemoveTaskBySpiderId(spider.Id); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}

View File

@@ -124,6 +124,13 @@ func PutTask(c *gin.Context) {
func DeleteTask(c *gin.Context) {
id := c.Param("id")
// 删除日志文件
if err := services.RemoveLogByTaskId(id); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
// 删除task
if err := model.RemoveTask(id); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return

View File

@@ -5,9 +5,11 @@ import (
"crawlab/database"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/msg_handler"
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"github.com/spf13/viper"
"io/ioutil"
"os"
@@ -18,46 +20,10 @@ import (
// 任务日志频道映射
var TaskLogChanMap = utils.NewChanMap()
// 获取本地日志
func GetLocalLog(logPath string) (fileBytes []byte, err error) {
f, err := os.Open(logPath)
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return nil, err
}
fi, err := f.Stat()
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return nil, err
}
defer f.Close()
const bufLen = 2 * 1024 * 1024
logBuf := make([]byte, bufLen)
off := int64(0)
if fi.Size() > int64(len(logBuf)) {
off = fi.Size() - int64(len(logBuf))
}
n, err := f.ReadAt(logBuf, off)
//到文件结尾会有EOF标识
if err != nil && err.Error() != "EOF" {
log.Error(err.Error())
debug.PrintStack()
return nil, err
}
logBuf = logBuf[:n]
return logBuf, nil
}
// 获取远端日志
func GetRemoteLog(task model.Task) (logStr string, err error) {
// 序列化消息
msg := NodeMessage{
msg := msg_handler.NodeMessage{
Type: constants.MsgTypeGetLog,
LogPath: task.LogPath,
TaskId: task.Id,
@@ -85,6 +51,7 @@ func GetRemoteLog(task model.Task) (logStr string, err error) {
return logStr, nil
}
// 定时删除日志
func DeleteLogPeriodically() {
logDir := viper.GetString("log.path")
if !utils.Exists(logDir) {
@@ -107,6 +74,71 @@ func DeleteLogPeriodically() {
}
// 删除本地日志
func RemoveLocalLog(path string) error {
if err := model.RemoveFile(path); err != nil {
log.Error("remove local file error: " + err.Error())
return err
}
return nil
}
// 删除远程日志
func RemoveRemoteLog(task model.Task) error {
msg := msg_handler.NodeMessage{
Type: constants.MsgTypeRemoveLog,
LogPath: task.LogPath,
TaskId: task.Id,
}
msgBytes, err := json.Marshal(&msg)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// 发布获取日志消息
channel := "nodes:" + task.NodeId.Hex()
if _, err := database.RedisClient.Publish(channel, utils.BytesToString(msgBytes)); err != nil {
log.Errorf(err.Error())
return err
}
return nil
}
// 删除日志文件
func RemoveLogByTaskId(id string) error {
t, err := model.GetTask(id)
if err != nil {
log.Error("get task error:" + err.Error())
return err
}
removeLog(t)
return nil
}
func removeLog(t model.Task) {
if err := RemoveLocalLog(t.LogPath); err != nil {
log.Error("remove local log error:" + err.Error())
}
if err := RemoveRemoteLog(t); err != nil {
log.Error("remove remote log error:" + err.Error())
}
}
// 删除日志文件
func RemoveLogBySpiderId(id bson.ObjectId) error {
tasks, err := model.GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts")
if err != nil {
log.Error("get tasks error:" + err.Error())
}
for _, task := range tasks {
removeLog(task)
}
return nil
}
// 初始化定时删除日志
func InitDeleteLogPeriodically() error {
c := cron.New(cron.WithSeconds())
if _, err := c.AddFunc(viper.GetString("log.deleteFrequency"), DeleteLogPeriodically); err != nil {

View File

@@ -0,0 +1,48 @@
package msg_handler
import (
"crawlab/constants"
"crawlab/model"
)
type Handler interface {
Handle() error
}
func GetMsgHandler(msg NodeMessage) Handler {
if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog {
return &Log{
msg: msg,
}
} else if msg.Type == constants.MsgTypeCancelTask {
return &Task{
msg: msg,
}
} else if msg.Type == constants.MsgTypeGetSystemInfo {
return &SystemInfo{
msg: msg,
}
}
return nil
}
type NodeMessage struct {
// 通信类别
Type string `json:"type"`
// 任务相关
TaskId string `json:"task_id"` // 任务ID
// 节点相关
NodeId string `json:"node_id"` // 节点ID
// 日志相关
LogPath string `json:"log_path"` // 日志路径
Log string `json:"log"` // 日志
// 系统信息
SysInfo model.SystemInfo `json:"sys_info"`
// 错误相关
Error string `json:"error"`
}

View File

@@ -0,0 +1,60 @@
package msg_handler
import (
"crawlab/constants"
"crawlab/database"
"crawlab/model"
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
"runtime/debug"
)
type Log struct {
msg NodeMessage
}
func (g *Log) Handle() error {
if g.msg.Type == constants.MsgTypeGetLog {
return g.get()
} else if g.msg.Type == constants.MsgTypeRemoveLog {
return g.remove()
}
return nil
}
func (g *Log) get() error {
// 发出的消息
msgSd := NodeMessage{
Type: constants.MsgTypeGetLog,
TaskId: g.msg.TaskId,
}
// 获取本地日志
logStr, err := model.GetLocalLog(g.msg.LogPath)
log.Info(utils.BytesToString(logStr))
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
msgSd.Error = err.Error()
msgSd.Log = err.Error()
} else {
msgSd.Log = utils.BytesToString(logStr)
}
// 序列化
msgSdBytes, err := json.Marshal(&msgSd)
if err != nil {
return err
}
// 发布消息给主节点
log.Info("publish get log msg to master")
if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil {
return err
}
return nil
}
func (g *Log) remove() error {
return model.RemoveFile(g.msg.LogPath)
}

View File

@@ -0,0 +1,39 @@
package msg_handler
import (
"crawlab/constants"
"crawlab/database"
"crawlab/model"
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
"runtime/debug"
)
type SystemInfo struct {
msg NodeMessage
}
func (s *SystemInfo) Handle() error {
// 获取环境信息
sysInfo, err := model.GetLocalSystemInfo()
if err != nil {
return err
}
msgSd := NodeMessage{
Type: constants.MsgTypeGetSystemInfo,
NodeId: s.msg.NodeId,
SysInfo: sysInfo,
}
msgSdBytes, err := json.Marshal(&msgSd)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil {
log.Errorf(err.Error())
return err
}
return nil
}

View File

@@ -0,0 +1,17 @@
package msg_handler
import (
"crawlab/constants"
"crawlab/utils"
)
type Task struct {
msg NodeMessage
}
func (t *Task) Handle() error {
// 取消任务
ch := utils.TaskExecChanMap.ChanBlocked(t.msg.TaskId)
ch <- constants.TaskCancel
return nil
}

View File

@@ -6,6 +6,7 @@ import (
"crawlab/database"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/msg_handler"
"crawlab/services/register"
"crawlab/utils"
"encoding/json"
@@ -27,27 +28,6 @@ type Data struct {
UpdateTsUnix int64 `json:"update_ts_unix"`
}
type NodeMessage struct {
// 通信类别
Type string `json:"type"`
// 任务相关
TaskId string `json:"task_id"` // 任务ID
// 节点相关
NodeId string `json:"node_id"` // 节点ID
// 日志相关
LogPath string `json:"log_path"` // 日志路径
Log string `json:"log"` // 日志
// 系统信息
SysInfo model.SystemInfo `json:"sys_info"`
// 错误相关
Error string `json:"error"`
}
const (
Yes = "Y"
No = "N"
@@ -263,7 +243,7 @@ func UpdateNodeData() {
func MasterNodeCallback(message redis.Message) (err error) {
// 反序列化
var msg NodeMessage
var msg msg_handler.NodeMessage
if err := json.Unmarshal(message.Data, &msg); err != nil {
return err
@@ -288,72 +268,17 @@ func MasterNodeCallback(message redis.Message) (err error) {
func WorkerNodeCallback(message redis.Message) (err error) {
// 反序列化
msg := NodeMessage{}
msg := msg_handler.NodeMessage{}
if err := json.Unmarshal(message.Data, &msg); err != nil {
return err
}
if msg.Type == constants.MsgTypeGetLog {
// 消息类型为获取日志
// 发出的消息
msgSd := NodeMessage{
Type: constants.MsgTypeGetLog,
TaskId: msg.TaskId,
}
// 获取本地日志
logStr, err := GetLocalLog(msg.LogPath)
log.Info(utils.BytesToString(logStr))
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
msgSd.Error = err.Error()
msgSd.Log = err.Error()
} else {
msgSd.Log = utils.BytesToString(logStr)
}
// 序列化
msgSdBytes, err := json.Marshal(&msgSd)
if err != nil {
return err
}
// 发布消息给主节点
log.Info("publish get log msg to master")
if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil {
return err
}
} else if msg.Type == constants.MsgTypeCancelTask {
// 取消任务
ch := TaskExecChanMap.ChanBlocked(msg.TaskId)
ch <- constants.TaskCancel
} else if msg.Type == constants.MsgTypeGetSystemInfo {
// 获取环境信息
sysInfo, err := GetLocalSystemInfo()
if err != nil {
return err
}
msgSd := NodeMessage{
Type: constants.MsgTypeGetSystemInfo,
NodeId: msg.NodeId,
SysInfo: sysInfo,
}
msgSdBytes, err := json.Marshal(&msgSd)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil {
log.Errorf(err.Error())
return err
}
// worker message handle
if err := msg_handler.GetMsgHandler(msg).Handle(); err != nil {
return err
}
return
return nil
}
// 初始化节点服务

View File

@@ -109,13 +109,28 @@ func SaveSpiders(spiders []model.Spider) error {
if spider.Type != constants.Customized {
continue
}
var spider_ *model.Spider
if err := c.Find(bson.M{"src": spider.Src}).One(&spider_); err != nil {
spider_ := []*model.Spider{}
_ = c.Find(bson.M{"src": spider.Src}).All(&spider_)
// 以防出现多个重复的爬虫
if len(spider_) > 1 {
if _, err := c.RemoveAll(bson.M{"src": spider.Src}); err != nil {
log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src)
debug.PrintStack()
continue
}
if err := spider.Add(); err != nil {
log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src)
debug.PrintStack()
continue
}
continue
}
if len(spider_) == 0 {
// 不存在
if err := spider.Add(); err != nil {
log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src)
debug.PrintStack()
return err
continue
}
}
}
@@ -151,11 +166,14 @@ func ZipSpider(spider model.Spider) (filePath string, err error) {
// 临时文件路径
randomId := uuid.NewV4()
filePath = filepath.Join(
viper.GetString("other.tmppath"),
randomId.String()+".zip",
)
tmpPath := viper.GetString("other.tmppath")
if !utils.Exists(tmpPath) {
if err := os.MkdirAll(tmpPath, 0777); err != nil {
log.Errorf("mkdir other.tmppath error: %v", err.Error())
return "", err
}
}
filePath = filepath.Join(tmpPath, randomId.String()+".zip")
// 将源文件夹打包为zip文件
d, err := os.Open(spider.Src)
if err != nil {
@@ -340,9 +358,16 @@ func OnFileUpload(message redis.Message) (err error) {
// 生成唯一ID
randomId := uuid.NewV4()
tmpPath := viper.GetString("other.tmppath")
if !utils.Exists(tmpPath) {
if err := os.MkdirAll(tmpPath, 0777); err != nil {
log.Errorf("mkdir other.tmppath error: %v", err.Error())
return err
}
}
// 创建临时文件
tmpFilePath := filepath.Join(viper.GetString("other.tmppath"), randomId.String()+".zip")
tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip")
tmpFile, err := os.OpenFile(tmpFilePath, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
log.Errorf(err.Error())

View File

@@ -4,108 +4,16 @@ import (
"crawlab/constants"
"crawlab/database"
"crawlab/model"
"crawlab/services/msg_handler"
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"runtime/debug"
"strings"
)
var SystemInfoChanMap = utils.NewChanMap()
var executableNameMap = map[string]string{
// python
"python": "Python",
"python2": "Python 2",
"python2.7": "Python 2.7",
"python3": "Python 3",
"python3.5": "Python 3.5",
"python3.6": "Python 3.6",
"python3.7": "Python 3.7",
"python3.8": "Python 3.8",
// java
"java": "Java",
// go
"go": "Go",
// node
"node": "NodeJS",
// php
"php": "PHP",
// windows command
"cmd": "Windows Command Prompt",
// linux shell
"sh": "Shell",
"bash": "bash",
}
func GetSystemEnv(key string) string {
return os.Getenv(key)
}
func GetPathValues() (paths []string) {
pathEnv := GetSystemEnv("PATH")
return strings.Split(pathEnv, ":")
}
func GetExecutables() (executables []model.Executable, err error) {
pathValues := GetPathValues()
cache := map[string]string{}
for _, path := range pathValues {
fileList, err := ioutil.ReadDir(path)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
for _, file := range fileList {
displayName := executableNameMap[file.Name()]
filePath := filepath.Join(path, file.Name())
if cache[filePath] == "" {
if displayName != "" {
executables = append(executables, model.Executable{
Path: filePath,
FileName: file.Name(),
DisplayName: displayName,
})
}
cache[filePath] = filePath
}
}
}
return executables, nil
}
func GetLocalSystemInfo() (sysInfo model.SystemInfo, err error) {
executables, err := GetExecutables()
if err != nil {
return sysInfo, err
}
hostname, err := os.Hostname()
if err != nil {
debug.PrintStack()
return sysInfo, err
}
return model.SystemInfo{
ARCH: runtime.GOARCH,
OS: runtime.GOOS,
NumCpu: runtime.GOMAXPROCS(0),
Hostname: hostname,
Executables: executables,
}, nil
}
func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
// 发送消息
msg := NodeMessage{
msg := msg_handler.NodeMessage{
Type: constants.MsgTypeGetSystemInfo,
NodeId: id,
}
@@ -132,7 +40,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
func GetSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
if IsMasterNode(id) {
sysInfo, err = GetLocalSystemInfo()
sysInfo, err = model.GetLocalSystemInfo()
} else {
sysInfo, err = GetRemoteSystemInfo(id)
}

View File

@@ -5,6 +5,7 @@ import (
"crawlab/database"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/msg_handler"
"crawlab/utils"
"encoding/json"
"errors"
@@ -25,6 +26,7 @@ var Exec *Executor
// 任务执行锁
//Added by cloud: 2019/09/04,solve data race
var LockList sync.Map
// 任务消息
type TaskMessage struct {
Id string
@@ -69,8 +71,6 @@ func (ex *Executor) Start() error {
return nil
}
var TaskExecChanMap = utils.NewChanMap()
// 派发任务
func AssignTask(task model.Task) error {
// 生成任务信息
@@ -135,7 +135,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e
}
// 起一个goroutine来监控进程
ch := TaskExecChanMap.ChanBlocked(t.Id)
ch := utils.TaskExecChanMap.ChanBlocked(t.Id)
go func() {
// 传入信号,此处阻塞
signal := <-ch
@@ -408,7 +408,7 @@ func GetTaskLog(id string) (logStr string, err error) {
logStr = ""
if IsMasterNode(task.NodeId.Hex()) {
// 若为主节点,获取本机日志
logBytes, err := GetLocalLog(task.LogPath)
logBytes, err := model.GetLocalLog(task.LogPath)
logStr = utils.BytesToString(logBytes)
if err != nil {
log.Errorf(err.Error())
@@ -452,7 +452,7 @@ func CancelTask(id string) (err error) {
// 任务节点为主节点
// 获取任务执行频道
ch := TaskExecChanMap.ChanBlocked(id)
ch := utils.TaskExecChanMap.ChanBlocked(id)
// 发出取消进程信号
ch <- constants.TaskCancel
@@ -460,7 +460,7 @@ func CancelTask(id string) (err error) {
// 任务节点为工作节点
// 序列化消息
msg := NodeMessage{
msg := msg_handler.NodeMessage{
Type: constants.MsgTypeCancelTask,
TaskId: id,
}

View File

@@ -1,5 +1,7 @@
package utils
var TaskExecChanMap = NewChanMap()
type ChanMap struct {
m map[string]chan string
}

View File

@@ -207,7 +207,7 @@
:width="col.width">
</el-table-column>
</template>
<el-table-column :label="$t('Action')" align="left" width="150px" fixed="right">
<el-table-column :label="$t('Action')" align="left" fixed="right">
<template slot-scope="scope">
<el-tooltip :content="$t('View')" placement="top">
<el-button type="primary" icon="el-icon-search" size="mini" @click="onView(scope.row)"></el-button>