fix 删除爬虫的问题

This commit is contained in:
陈景阳
2019-09-30 12:09:37 +08:00
parent d93bff63e7
commit 41556cab74
20 changed files with 240 additions and 142 deletions

View File

@@ -0,0 +1,9 @@
package constants
// Redis pub/sub channel names used for messaging between nodes.
const (
// ChannelAllNode is the broadcast channel; every node subscribes to it.
ChannelAllNode = "nodes:public"
// ChannelWorkerNode is the prefix of a worker node's own channel; the
// node ID in hex form is appended to it to build the full channel name.
ChannelWorkerNode = "nodes:"
// ChannelMasterNode is the channel the master node subscribes to.
ChannelMasterNode = "nodes:master"
)

View File

@@ -5,4 +5,5 @@ const (
MsgTypeGetSystemInfo = "get-sys-info"
MsgTypeCancelTask = "cancel-task"
MsgTypeRemoveLog = "remove-log"
MsgTypeRemoveSpider = "remove-spider"
)

25
backend/entity/node.go Normal file
View File

@@ -0,0 +1,25 @@
package entity
// NodeMessage is the JSON payload exchanged between nodes over the Redis
// channels. Type selects the handler; the remaining fields are filled in
// only as far as the given message type needs them.
type NodeMessage struct {
// Message type (one of the constants.MsgType* values)
Type string `json:"type"`
// Task related
TaskId string `json:"task_id"` // task ID
// Node related
NodeId string `json:"node_id"` // node ID
// Log related
LogPath string `json:"log_path"` // log file path
Log string `json:"log"` // log content
// System information
SysInfo SystemInfo `json:"sys_info"`
// Spider related
SpiderId string `json:"spider_id"` // spider ID
// Error related
Error string `json:"error"`
}

15
backend/entity/system.go Normal file
View File

@@ -0,0 +1,15 @@
package entity
// SystemInfo describes the machine a node runs on, as reported by the node.
type SystemInfo struct {
// ARCH is the CPU architecture (filled from runtime.GOARCH).
ARCH string `json:"arch"`
// OS is the operating system (filled from runtime.GOOS).
OS string `json:"os"`
// Hostname is the host name of the node.
Hostname string `json:"host_name"`
// NumCpu is filled from runtime.GOMAXPROCS(0), i.e. the number of CPUs
// the Go runtime may use simultaneously.
NumCpu int `json:"num_cpu"`
// Executables lists the recognised executables found on the node.
Executables []Executable `json:"executables"`
}
// Executable describes one executable file discovered on a node.
type Executable struct {
// Path is the full path of the file.
Path string `json:"path"`
// FileName is the file's own name as reported by the directory listing.
FileName string `json:"file_name"`
// DisplayName is the human-readable name associated with the executable.
DisplayName string `json:"display_name"`
}

View File

@@ -68,7 +68,7 @@ func RemoveFile(path string) error {
debug.PrintStack()
return nil
}
if err := os.Remove(path); err != nil {
if err := os.RemoveAll(path); err != nil {
return err
}
return nil

View File

@@ -157,6 +157,7 @@ func GetSpider(id bson.ObjectId) (Spider, error) {
var result Spider
if err := c.FindId(id).One(&result); err != nil {
if err != mgo.ErrNotFound {
// Report unexpected lookup failures together with the hex form of the id.
log.Errorf("get spider error: %s, id: %s", err.Error(), id.Hex())
debug.PrintStack()
}
return result, err
@@ -190,6 +191,8 @@ func RemoveSpider(id bson.ObjectId) error {
}
if err := c.RemoveId(id); err != nil {
log.Errorf("remove spider error: %s, id:%s", err.Error(), id.Hex())
debug.PrintStack()
return err
}
@@ -199,6 +202,7 @@ func RemoveSpider(id bson.ObjectId) error {
if err := gf.RemoveId(result.FileId); err != nil {
log.Error("remove file error, id:" + result.FileId.Hex())
debug.PrintStack()
return err
}

View File

@@ -1,6 +1,7 @@
package model
import (
"crawlab/entity"
"github.com/apex/log"
"io/ioutil"
"os"
@@ -35,21 +36,7 @@ var executableNameMap = map[string]string{
"bash": "bash",
}
type SystemInfo struct {
ARCH string `json:"arch"`
OS string `json:"os"`
Hostname string `json:"host_name"`
NumCpu int `json:"num_cpu"`
Executables []Executable `json:"executables"`
}
type Executable struct {
Path string `json:"path"`
FileName string `json:"file_name"`
DisplayName string `json:"display_name"`
}
func GetLocalSystemInfo() (sysInfo SystemInfo, err error) {
func GetLocalSystemInfo() (sysInfo entity.SystemInfo, err error) {
executables, err := GetExecutables()
if err != nil {
return sysInfo, err
@@ -60,7 +47,7 @@ func GetLocalSystemInfo() (sysInfo SystemInfo, err error) {
return sysInfo, err
}
return SystemInfo{
return entity.SystemInfo{
ARCH: runtime.GOARCH,
OS: runtime.GOOS,
NumCpu: runtime.GOMAXPROCS(0),
@@ -78,7 +65,7 @@ func GetPathValues() (paths []string) {
return strings.Split(pathEnv, ":")
}
func GetExecutables() (executables []Executable, err error) {
func GetExecutables() (executables []entity.Executable, err error) {
pathValues := GetPathValues()
cache := map[string]string{}
@@ -97,7 +84,7 @@ func GetExecutables() (executables []Executable, err error) {
if cache[filePath] == "" {
if displayName != "" {
executables = append(executables, Executable{
executables = append(executables, entity.Executable{
Path: filePath,
FileName: file.Name(),
DisplayName: displayName,

View File

@@ -199,33 +199,7 @@ func DeleteSpider(c *gin.Context) {
return
}
// 获取该爬虫
spider, err := model.GetSpider(bson.ObjectIdHex(id))
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
// 删除爬虫文件目录
if err := os.RemoveAll(spider.Src); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
// 从数据库中删除该爬虫
if err := model.RemoveSpider(bson.ObjectIdHex(id)); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
// 删除日志文件
if err := services.RemoveLogBySpiderId(spider.Id); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
// 删除爬虫对应的task任务
if err := model.RemoveTaskBySpiderId(spider.Id); err != nil {
if err := services.RemoveSpider(id); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}

View File

@@ -3,9 +3,9 @@ package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/msg_handler"
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
@@ -23,7 +23,7 @@ var TaskLogChanMap = utils.NewChanMap()
// 获取远端日志
func GetRemoteLog(task model.Task) (logStr string, err error) {
// 序列化消息
msg := msg_handler.NodeMessage{
msg := entity.NodeMessage{
Type: constants.MsgTypeGetLog,
LogPath: task.LogPath,
TaskId: task.Id,
@@ -85,21 +85,16 @@ func RemoveLocalLog(path string) error {
// 删除远程日志
func RemoveRemoteLog(task model.Task) error {
msg := msg_handler.NodeMessage{
msg := entity.NodeMessage{
Type: constants.MsgTypeRemoveLog,
LogPath: task.LogPath,
TaskId: task.Id,
}
msgBytes, err := json.Marshal(&msg)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// Publish the remove-log message on the channel of the task's node
channel := "nodes:" + task.NodeId.Hex()
if _, err := database.RedisClient.Publish(channel, utils.BytesToString(msgBytes)); err != nil {
log.Errorf(err.Error())
if _, err := database.RedisClient.Publish(channel, utils.GetJson(msg)); err != nil {
log.Errorf("publish redis error: %s", err.Error())
debug.PrintStack()
return err
}
return nil
@@ -119,10 +114,12 @@ func RemoveLogByTaskId(id string) error {
func removeLog(t model.Task) {
if err := RemoveLocalLog(t.LogPath); err != nil {
log.Error("remove local log error:" + err.Error())
log.Errorf("remove local log error: %s", err.Error())
debug.PrintStack()
}
if err := RemoveRemoteLog(t); err != nil {
log.Error("remove remote log error:" + err.Error())
log.Errorf("remove remote log error: %s", err.Error())
debug.PrintStack()
}
}
@@ -130,7 +127,8 @@ func removeLog(t model.Task) {
func RemoveLogBySpiderId(id bson.ObjectId) error {
tasks, err := model.GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts")
if err != nil {
log.Error("get tasks error:" + err.Error())
log.Errorf("get tasks error: %s", err.Error())
debug.PrintStack()
}
for _, task := range tasks {
removeLog(task)

View File

@@ -2,47 +2,34 @@ package msg_handler
import (
"crawlab/constants"
"crawlab/model"
"crawlab/entity"
)
type Handler interface {
Handle() error
}
func GetMsgHandler(msg NodeMessage) Handler {
// GetMsgHandler selects the Handler that matches msg.Type.
//
// Log messages (get/remove), task cancellation and system-info requests
// receive the whole message; the remove-spider handler only needs the
// spider ID. For any other message type the result is nil, so callers
// have to check the returned Handler before invoking Handle on it.
func GetMsgHandler(msg entity.NodeMessage) Handler {
	switch msg.Type {
	case constants.MsgTypeGetLog, constants.MsgTypeRemoveLog:
		// Log related
		return &Log{msg: msg}
	case constants.MsgTypeCancelTask:
		// Task related
		return &Task{msg: msg}
	case constants.MsgTypeGetSystemInfo:
		// System information related
		return &SystemInfo{msg: msg}
	case constants.MsgTypeRemoveSpider:
		// Spider related
		return &Spider{SpiderId: msg.SpiderId}
	default:
		return nil
	}
}
type NodeMessage struct {
// 通信类别
Type string `json:"type"`
// 任务相关
TaskId string `json:"task_id"` // 任务ID
// 节点相关
NodeId string `json:"node_id"` // 节点ID
// 日志相关
LogPath string `json:"log_path"` // 日志路径
Log string `json:"log"` // 日志
// 系统信息
SysInfo model.SystemInfo `json:"sys_info"`
// 错误相关
Error string `json:"error"`
}

View File

@@ -2,16 +2,15 @@ package msg_handler
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
"runtime/debug"
)
type Log struct {
msg NodeMessage
msg entity.NodeMessage
}
func (g *Log) Handle() error {
@@ -25,31 +24,22 @@ func (g *Log) Handle() error {
func (g *Log) get() error {
// 发出的消息
msgSd := NodeMessage{
msgSd := entity.NodeMessage{
Type: constants.MsgTypeGetLog,
TaskId: g.msg.TaskId,
}
// 获取本地日志
logStr, err := model.GetLocalLog(g.msg.LogPath)
log.Info(utils.BytesToString(logStr))
if err != nil {
log.Errorf(err.Error())
log.Errorf("get node local log error: %s", err.Error())
debug.PrintStack()
msgSd.Error = err.Error()
msgSd.Log = err.Error()
} else {
msgSd.Log = utils.BytesToString(logStr)
}
// 序列化
msgSdBytes, err := json.Marshal(&msgSd)
if err != nil {
return err
}
// 发布消息给主节点
log.Info("publish get log msg to master")
if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil {
if err := utils.Pub(constants.ChannelMasterNode, msgSd); err != nil {
return err
}
return nil

View File

@@ -0,0 +1,24 @@
package msg_handler
import (
"crawlab/model"
"crawlab/utils"
"github.com/globalsign/mgo/bson"
"github.com/spf13/viper"
"path/filepath"
)
// Spider is the message handler that removes a spider's directory from the
// local node.
type Spider struct {
// SpiderId is the spider's ID in hex form; Handle converts it with
// bson.ObjectIdHex before looking the spider up.
SpiderId string
}
// Handle removes the local directory of the spider identified by s.SpiderId.
//
// The spider is looked up by ID to obtain its name, and the directory
// <spider.path>/<name> is then deleted. A lookup failure is returned to the
// caller; RemoveFiles returns nothing, so Handle reports success once the
// lookup has worked.
//
// NOTE(review): bson.ObjectIdHex is documented to panic on an invalid hex
// string, so SpiderId is assumed to be a valid ObjectId here — confirm.
func (s *Spider) Handle() error {
	spiderId := bson.ObjectIdHex(s.SpiderId)
	spider, err := model.GetSpider(spiderId)
	if err != nil {
		return err
	}

	// Remove the spider directory on this node.
	spiderDir := filepath.Join(viper.GetString("spider.path"), spider.Name)
	utils.RemoveFiles(spiderDir)
	return nil
}

View File

@@ -2,16 +2,13 @@ package msg_handler
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
"runtime/debug"
)
type SystemInfo struct {
msg NodeMessage
msg entity.NodeMessage
}
func (s *SystemInfo) Handle() error {
@@ -20,19 +17,12 @@ func (s *SystemInfo) Handle() error {
if err != nil {
return err
}
msgSd := NodeMessage{
msgSd := entity.NodeMessage{
Type: constants.MsgTypeGetSystemInfo,
NodeId: s.msg.NodeId,
SysInfo: sysInfo,
}
msgSdBytes, err := json.Marshal(&msgSd)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
if _, err := database.RedisClient.Publish("nodes:master", utils.BytesToString(msgSdBytes)); err != nil {
log.Errorf(err.Error())
if err := utils.Pub(constants.ChannelMasterNode, msgSd); err != nil {
return err
}
return nil

View File

@@ -2,6 +2,7 @@ package msg_handler
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
"github.com/apex/log"
@@ -10,7 +11,7 @@ import (
)
type Task struct {
msg NodeMessage
msg entity.NodeMessage
}
func (t *Task) Handle() error {

View File

@@ -1,9 +1,9 @@
package services
import (
"context"
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/msg_handler"
@@ -175,7 +175,7 @@ func UpdateNodeData() {
func MasterNodeCallback(message redis.Message) (err error) {
// 反序列化
var msg msg_handler.NodeMessage
var msg entity.NodeMessage
if err := json.Unmarshal(message.Data, &msg); err != nil {
return err
@@ -183,7 +183,6 @@ func MasterNodeCallback(message redis.Message) (err error) {
if msg.Type == constants.MsgTypeGetLog {
// 获取日志
fmt.Println(msg)
time.Sleep(10 * time.Millisecond)
ch := TaskLogChanMap.ChanBlocked(msg.TaskId)
ch <- msg.Log
@@ -200,14 +199,8 @@ func MasterNodeCallback(message redis.Message) (err error) {
func WorkerNodeCallback(message redis.Message) (err error) {
// 反序列化
msg := msg_handler.NodeMessage{}
if err := json.Unmarshal(message.Data, &msg); err != nil {
return err
}
// worker message handle
if err := msg_handler.GetMsgHandler(msg).Handle(); err != nil {
msg := utils.GetMessage(message)
if err := msg_handler.GetMsgHandler(*msg).Handle(); err != nil {
return err
}
return nil
@@ -234,23 +227,25 @@ func InitNodeService() error {
log.Errorf(err.Error())
return err
}
ctx := context.Background()
if model.IsMaster() {
// 如果为主节点,订阅主节点通信频道
channel := "nodes:master"
err := database.RedisClient.Subscribe(ctx, MasterNodeCallback, channel)
if err != nil {
if err := utils.Sub(constants.ChannelMasterNode, MasterNodeCallback); err != nil {
return err
}
} else {
// 若为工作节点,订阅单独指定通信频道
channel := "nodes:" + node.Id.Hex()
err := database.RedisClient.Subscribe(ctx, WorkerNodeCallback, channel)
if err != nil {
channel := constants.ChannelWorkerNode + node.Id.Hex()
if err := utils.Sub(channel, WorkerNodeCallback); err != nil {
return err
}
}
// 订阅全通道
if err := utils.Sub(constants.ChannelAllNode, WorkerNodeCallback); err != nil {
return err
}
// On the master node, refresh all node information every 10 seconds
if model.IsMaster() {
spec := "*/10 * * * * *"
@@ -260,7 +255,7 @@ func InitNodeService() error {
}
}
// 更新在当前节点执行的任务状态为abnormal
// 更新在当前节点执行的任务状态为abnormal
if err := model.UpdateTaskToAbnormal(node.Id); err != nil {
debug.PrintStack()
return err

View File

@@ -3,6 +3,7 @@ package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/spider_handler"
@@ -65,6 +66,7 @@ func UploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, err er
return fid, nil
}
// 写入grid fs
func WriteToGridFS(content []byte, f *mgo.GridFile) {
if _, err := f.Write(content); err != nil {
debug.PrintStack()
@@ -141,7 +143,7 @@ func PublishSpider(spider model.Spider) {
return
}
// md5值不一样则下载
md5Str := utils.ReadFile(md5)
md5Str := utils.ReadFileOneLine(md5)
if gfFile.Md5 != md5Str {
spiderSync.RemoveSpiderFile()
spiderSync.Download()
@@ -150,6 +152,45 @@ func PublishSpider(spider model.Spider) {
}
}
func RemoveSpider(id string) error {
// 获取该爬虫
spider, err := model.GetSpider(bson.ObjectIdHex(id))
if err != nil {
return err
}
// 删除爬虫文件目录
path := filepath.Join(viper.GetString("spider.path"), spider.Name)
utils.RemoveFiles(path)
// 删除其他节点的爬虫目录
msg := entity.NodeMessage{
Type: constants.MsgTypeRemoveSpider,
SpiderId: id,
}
if err := utils.Pub(constants.ChannelAllNode, msg); err != nil {
return err
}
// 从数据库中删除该爬虫
if err := model.RemoveSpider(bson.ObjectIdHex(id)); err != nil {
return err
}
// 删除日志文件
if err := RemoveLogBySpiderId(spider.Id); err != nil {
return err
}
// 删除爬虫对应的task任务
if err := model.RemoveTaskBySpiderId(spider.Id); err != nil {
return err
}
// TODO 定时任务如何处理
return nil
}
// 启动爬虫服务
func InitSpiderService() error {
// 构造定时任务执行器

View File

@@ -3,17 +3,17 @@ package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/services/msg_handler"
"crawlab/utils"
"encoding/json"
)
var SystemInfoChanMap = utils.NewChanMap()
func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
func GetRemoteSystemInfo(id string) (sysInfo entity.SystemInfo, err error) {
// 发送消息
msg := msg_handler.NodeMessage{
msg := entity.NodeMessage{
Type: constants.MsgTypeGetSystemInfo,
NodeId: id,
}
@@ -21,7 +21,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
// 序列化
msgBytes, _ := json.Marshal(&msg)
if _, err := database.RedisClient.Publish("nodes:"+id, utils.BytesToString(msgBytes)); err != nil {
return model.SystemInfo{}, err
return entity.SystemInfo{}, err
}
// 通道
@@ -38,7 +38,7 @@ func GetRemoteSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
return sysInfo, nil
}
func GetSystemInfo(id string) (sysInfo model.SystemInfo, err error) {
func GetSystemInfo(id string) (sysInfo entity.SystemInfo, err error) {
if IsMasterNode(id) {
sysInfo, err = model.GetLocalSystemInfo()
} else {

View File

@@ -3,9 +3,9 @@ package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/msg_handler"
"crawlab/utils"
"encoding/json"
"errors"
@@ -493,7 +493,7 @@ func CancelTask(id string) (err error) {
// 任务节点为工作节点
// 序列化消息
msg := msg_handler.NodeMessage{
msg := entity.NodeMessage{
Type: constants.MsgTypeCancelTask,
TaskId: id,
}

View File

@@ -10,7 +10,16 @@ import (
"runtime/debug"
)
func ReadFile(fileName string) string {
// RemoveFiles deletes path and everything beneath it via os.RemoveAll.
// A failure is logged together with a stack trace; nothing is returned to
// the caller.
func RemoveFiles(path string) {
	err := os.RemoveAll(path)
	if err == nil {
		return
	}
	log.Errorf("remove files error: %s, path: %s", err.Error(), path)
	debug.PrintStack()
}
// 读取文件一行
func ReadFileOneLine(fileName string) string {
file := OpenFile(fileName)
defer file.Close()
buf := bufio.NewReader(file)

View File

@@ -1,7 +1,55 @@
package utils
import "unsafe"
import (
"context"
"crawlab/database"
"crawlab/entity"
"encoding/json"
"github.com/apex/log"
"github.com/gomodule/redigo/redis"
"runtime/debug"
"unsafe"
)
// BytesToString converts b to a string without copying: the slice header
// is reinterpreted as a string header through unsafe.Pointer, so the
// returned string refers to the same bytes as b.
func BytesToString(b []byte) string {
	header := unsafe.Pointer(&b)
	return *(*string)(header)
}
// GetJson serialises message to its JSON text. If marshalling fails the
// error is logged with a stack trace and the empty string is returned.
func GetJson(message entity.NodeMessage) string {
	encoded, err := json.Marshal(&message)
	if err == nil {
		return BytesToString(encoded)
	}
	log.Errorf("node message to json error: %s", err.Error())
	debug.PrintStack()
	return ""
}
// GetMessage decodes the JSON data of a Redis message into a NodeMessage.
// If decoding fails the error is logged with a stack trace and nil is
// returned, so callers must check the result before dereferencing it.
func GetMessage(message redis.Message) *entity.NodeMessage {
	decoded := new(entity.NodeMessage)
	err := json.Unmarshal(message.Data, decoded)
	if err != nil {
		log.Errorf("message byte to object error: %s", err.Error())
		debug.PrintStack()
		return nil
	}
	return decoded
}
// Pub publishes msg, encoded with GetJson, on the given Redis channel.
// A publish failure is logged with a stack trace and returned.
func Pub(channel string, msg entity.NodeMessage) error {
	payload := GetJson(msg)
	_, err := database.RedisClient.Publish(channel, payload)
	if err == nil {
		return nil
	}
	log.Errorf("publish redis error: %s", err.Error())
	debug.PrintStack()
	return err
}
// Sub subscribes consume to the given Redis channel using a background
// context. A subscription failure is logged with a stack trace and
// returned.
func Sub(channel string, consume database.ConsumeFunc) error {
	err := database.RedisClient.Subscribe(context.Background(), consume, channel)
	if err == nil {
		return nil
	}
	log.Errorf("subscribe redis error: %s", err.Error())
	debug.PrintStack()
	return err
}