mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-22 17:31:03 +01:00
爬虫逻辑修改为从GridFS获取
This commit is contained in:
@@ -11,7 +11,7 @@ import (
|
||||
"github.com/globalsign/mgo"
|
||||
"github.com/globalsign/mgo/bson"
|
||||
"github.com/pkg/errors"
|
||||
uuid "github.com/satori/go.uuid"
|
||||
"github.com/satori/go.uuid"
|
||||
"github.com/spf13/viper"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
@@ -152,49 +152,68 @@ func PutSpider(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// 读取临时文件
|
||||
tmpFile, err := os.OpenFile(tmpFilePath, os.O_RDONLY, 0777)
|
||||
// 上传到GridFs
|
||||
fid, err := services.UploadToGridFs(file.Filename, tmpFilePath)
|
||||
if err != nil {
|
||||
log.Errorf("upload to grid fs error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
return
|
||||
}
|
||||
if err = tmpFile.Close(); err != nil {
|
||||
debug.PrintStack()
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
return
|
||||
|
||||
// 保存爬虫信息
|
||||
srcPath := viper.GetString("spider.path")
|
||||
spider := model.Spider{
|
||||
Name: file.Filename,
|
||||
DisplayName: file.Filename,
|
||||
Type: constants.Customized,
|
||||
Src: filepath.Join(srcPath, file.Filename),
|
||||
FileId: fid,
|
||||
}
|
||||
_ = spider.Save()
|
||||
|
||||
// 读取临时文件
|
||||
//tmpFile, err := os.OpenFile(tmpFilePath, os.O_RDONLY, 0777)
|
||||
//if err != nil {
|
||||
// debug.PrintStack()
|
||||
// HandleError(http.StatusInternalServerError, c, err)
|
||||
// return
|
||||
//}
|
||||
//if err = tmpFile.Close(); err != nil {
|
||||
// debug.PrintStack()
|
||||
// HandleError(http.StatusInternalServerError, c, err)
|
||||
// return
|
||||
//}
|
||||
|
||||
// 目标目录
|
||||
dstPath := filepath.Join(
|
||||
viper.GetString("spider.path"),
|
||||
strings.Replace(file.Filename, ".zip", "", 1),
|
||||
)
|
||||
//dstPath := filepath.Join(
|
||||
// viper.GetString("spider.path"),
|
||||
// strings.Replace(file.Filename, ".zip", "", 1),
|
||||
//)
|
||||
|
||||
// 如果目标目录已存在,删除目标目录
|
||||
if utils.Exists(dstPath) {
|
||||
if err := os.RemoveAll(dstPath); err != nil {
|
||||
debug.PrintStack()
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
}
|
||||
}
|
||||
//if utils.Exists(dstPath) {
|
||||
// if err := os.RemoveAll(dstPath); err != nil {
|
||||
// debug.PrintStack()
|
||||
// HandleError(http.StatusInternalServerError, c, err)
|
||||
// }
|
||||
//}
|
||||
|
||||
// 将临时文件解压到爬虫目录
|
||||
if err := utils.DeCompress(tmpFile, dstPath); err != nil {
|
||||
debug.PrintStack()
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
return
|
||||
}
|
||||
//if err := utils.DeCompress(tmpFile, dstPath); err != nil {
|
||||
// debug.PrintStack()
|
||||
// HandleError(http.StatusInternalServerError, c, err)
|
||||
// return
|
||||
//}
|
||||
|
||||
// 删除临时文件
|
||||
if err = os.Remove(tmpFilePath); err != nil {
|
||||
debug.PrintStack()
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
return
|
||||
}
|
||||
//if err = os.Remove(tmpFilePath); err != nil {
|
||||
// debug.PrintStack()
|
||||
// HandleError(http.StatusInternalServerError, c, err)
|
||||
// return
|
||||
//}
|
||||
|
||||
// 更新爬虫
|
||||
services.UpdateSpiders()
|
||||
// services.UpdateSpiders()
|
||||
|
||||
c.JSON(http.StatusOK, Response{
|
||||
Status: "ok",
|
||||
|
||||
@@ -13,16 +13,12 @@ import (
|
||||
"github.com/globalsign/mgo"
|
||||
"github.com/globalsign/mgo/bson"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/satori/go.uuid"
|
||||
"github.com/spf13/viper"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
type SpiderFileData struct {
|
||||
@@ -36,175 +32,14 @@ type SpiderUploadMessage struct {
|
||||
SpiderId string
|
||||
}
|
||||
|
||||
// 从项目目录中获取爬虫列表
|
||||
func GetSpidersFromDir() ([]model.Spider, error) {
|
||||
// 爬虫项目目录路径
|
||||
srcPath := viper.GetString("spider.path")
|
||||
|
||||
// 如果爬虫项目目录不存在,则创建一个
|
||||
if !utils.Exists(srcPath) {
|
||||
mask := syscall.Umask(0) // 改为 0000 八进制
|
||||
defer syscall.Umask(mask) // 改为原来的 umask
|
||||
if err := os.MkdirAll(srcPath, 0766); err != nil {
|
||||
debug.PrintStack()
|
||||
return []model.Spider{}, err
|
||||
}
|
||||
}
|
||||
|
||||
// 获取爬虫项目目录下的所有子项
|
||||
items, err := ioutil.ReadDir(srcPath)
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
return []model.Spider{}, err
|
||||
}
|
||||
|
||||
// 定义爬虫列表
|
||||
spiders := make([]model.Spider, 0)
|
||||
|
||||
// 遍历所有子项
|
||||
for _, item := range items {
|
||||
// 忽略不为目录的子项
|
||||
if !item.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
// 忽略隐藏目录
|
||||
if strings.HasPrefix(item.Name(), ".") {
|
||||
continue
|
||||
}
|
||||
|
||||
// 构造爬虫
|
||||
spider := model.Spider{
|
||||
Name: item.Name(),
|
||||
DisplayName: item.Name(),
|
||||
Type: constants.Customized,
|
||||
Src: filepath.Join(srcPath, item.Name()),
|
||||
FileId: bson.ObjectIdHex(constants.ObjectIdNull),
|
||||
}
|
||||
|
||||
// 将爬虫加入列表
|
||||
spiders = append(spiders, spider)
|
||||
}
|
||||
|
||||
return spiders, nil
|
||||
}
|
||||
|
||||
// 将爬虫保存到数据库
|
||||
func SaveSpiders(spiders []model.Spider) error {
|
||||
s, c := database.GetCol("spiders")
|
||||
defer s.Close()
|
||||
|
||||
if len(spiders) == 0 {
|
||||
err := model.RemoveAllSpider()
|
||||
if err != nil {
|
||||
log.Error("remove all spider error:" + err.Error())
|
||||
return err
|
||||
}
|
||||
log.Info("get spider from dir is empty,removed all spider")
|
||||
return nil
|
||||
}
|
||||
// 如果该爬虫不存在于数据库,则保存爬虫到数据库
|
||||
for _, spider := range spiders {
|
||||
// 忽略非自定义爬虫
|
||||
if spider.Type != constants.Customized {
|
||||
continue
|
||||
}
|
||||
spider_ := []*model.Spider{}
|
||||
_ = c.Find(bson.M{"src": spider.Src}).All(&spider_)
|
||||
// 以防出现多个重复的爬虫
|
||||
if len(spider_) > 1 {
|
||||
if _, err := c.RemoveAll(bson.M{"src": spider.Src}); err != nil {
|
||||
log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src)
|
||||
debug.PrintStack()
|
||||
continue
|
||||
}
|
||||
if err := spider.Add(); err != nil {
|
||||
log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src)
|
||||
debug.PrintStack()
|
||||
continue
|
||||
}
|
||||
continue
|
||||
}
|
||||
if len(spider_) == 0 {
|
||||
// 不存在
|
||||
if err := spider.Add(); err != nil {
|
||||
log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src)
|
||||
debug.PrintStack()
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 更新爬虫
|
||||
func UpdateSpiders() {
|
||||
// 从项目目录获取爬虫列表
|
||||
spiders, err := GetSpidersFromDir()
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 储存爬虫
|
||||
if err := SaveSpiders(spiders); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// 打包爬虫目录为zip文件
|
||||
func ZipSpider(spider model.Spider) (filePath string, err error) {
|
||||
// 如果源文件夹不存在,抛错
|
||||
if !utils.Exists(spider.Src) {
|
||||
debug.PrintStack()
|
||||
// 删除该爬虫,否则会一直报错
|
||||
_ = model.RemoveSpider(spider.Id)
|
||||
return "", errors.New("source path does not exist")
|
||||
}
|
||||
|
||||
// 临时文件路径
|
||||
randomId := uuid.NewV4()
|
||||
|
||||
tmpPath := viper.GetString("other.tmppath")
|
||||
if !utils.Exists(tmpPath) {
|
||||
if err := os.MkdirAll(tmpPath, 0777); err != nil {
|
||||
log.Errorf("mkdir other.tmppath error: %v", err.Error())
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
filePath = filepath.Join(tmpPath, randomId.String()+".zip")
|
||||
// 将源文件夹打包为zip文件
|
||||
d, err := os.Open(spider.Src)
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
return filePath, err
|
||||
}
|
||||
var files []*os.File
|
||||
files = append(files, d)
|
||||
if err := utils.Compress(files, filePath); err != nil {
|
||||
return filePath, err
|
||||
}
|
||||
|
||||
return filePath, nil
|
||||
}
|
||||
|
||||
// 上传zip文件到GridFS
|
||||
func UploadToGridFs(spider model.Spider, fileName string, filePath string) (fid bson.ObjectId, err error) {
|
||||
func UploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, err error) {
|
||||
fid = ""
|
||||
|
||||
// 获取MongoDB GridFS连接
|
||||
s, gf := database.GetGridFs("files")
|
||||
defer s.Close()
|
||||
|
||||
// 如果存在FileId删除GridFS上的老文件
|
||||
if !utils.IsObjectIdNull(spider.FileId) {
|
||||
if err = gf.RemoveId(spider.FileId); err != nil {
|
||||
log.Error("remove gf file:" + err.Error())
|
||||
debug.PrintStack()
|
||||
}
|
||||
}
|
||||
|
||||
// 创建一个新GridFS文件
|
||||
f, err := gf.Create(fileName)
|
||||
if err != nil {
|
||||
@@ -295,29 +130,24 @@ func PublishAllSpidersJob() {
|
||||
// 2. 上传zip文件到GridFS
|
||||
// 3. 发布消息给工作节点
|
||||
func PublishSpider(spider model.Spider) (err error) {
|
||||
// 将源文件夹打包为zip文件
|
||||
filePath, err := ZipSpider(spider)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 上传zip文件到GridFS
|
||||
fileName := filepath.Base(spider.Src) + ".zip"
|
||||
fid, err := UploadToGridFs(spider, fileName, filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s, gf := database.GetGridFs("files")
|
||||
defer s.Close()
|
||||
|
||||
// 保存FileId
|
||||
spider.FileId = fid
|
||||
if err := spider.Save(); err != nil {
|
||||
f, err := gf.OpenId(spider.FileId)
|
||||
defer f.Close()
|
||||
if err != nil {
|
||||
log.Errorf("open file id: " + spider.FileId.Hex() + ", spider id:" + spider.Id.Hex() + ", error: " + err.Error())
|
||||
debug.PrintStack()
|
||||
// 爬虫和文件没有对应,则删除爬虫
|
||||
_ = model.RemoveSpider(spider.Id)
|
||||
return err
|
||||
}
|
||||
|
||||
// 发布消息给工作节点
|
||||
msg := SpiderUploadMessage{
|
||||
FileId: fid.Hex(),
|
||||
FileName: fileName,
|
||||
FileId: spider.FileId.Hex(),
|
||||
FileName: f.Name(),
|
||||
SpiderId: spider.Id.Hex(),
|
||||
}
|
||||
msgStr, err := json.Marshal(msg)
|
||||
@@ -349,12 +179,12 @@ func OnFileUpload(message redis.Message) (err error) {
|
||||
|
||||
// 从GridFS获取该文件
|
||||
f, err := gf.OpenId(bson.ObjectIdHex(msg.FileId))
|
||||
defer f.Close()
|
||||
if err != nil {
|
||||
log.Errorf("open file id: " + msg.FileId + ", spider id:" + msg.SpiderId + ", error: " + err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// 生成唯一ID
|
||||
randomId := uuid.NewV4()
|
||||
@@ -386,7 +216,6 @@ func OnFileUpload(message redis.Message) (err error) {
|
||||
// 解压缩临时文件到目标文件夹
|
||||
dstPath := filepath.Join(
|
||||
viper.GetString("spider.path"),
|
||||
// strings.Replace(msg.FileName, ".zip", "", -1),
|
||||
)
|
||||
if err := utils.DeCompress(tmpFile, dstPath); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
@@ -418,11 +247,6 @@ func InitSpiderService() error {
|
||||
if IsMaster() {
|
||||
// 主节点
|
||||
|
||||
// 每5秒更新一次爬虫信息
|
||||
if _, err := c.AddFunc("*/5 * * * * *", UpdateSpiders); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 每60秒同步爬虫给工作节点
|
||||
if _, err := c.AddFunc("0 * * * * *", PublishAllSpidersJob); err != nil {
|
||||
return err
|
||||
@@ -432,8 +256,6 @@ func InitSpiderService() error {
|
||||
|
||||
// 订阅文件上传
|
||||
channel := "files:upload"
|
||||
|
||||
//sub.Connect()
|
||||
ctx := context.Background()
|
||||
return database.RedisClient.Subscribe(ctx, OnFileUpload, channel)
|
||||
|
||||
|
||||
@@ -145,7 +145,6 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e
|
||||
if err := cmd.Process.Kill(); err != nil {
|
||||
log.Errorf("process kill error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
t.Status = constants.StatusCancelled
|
||||
} else {
|
||||
@@ -487,10 +486,9 @@ func CancelTask(id string) (err error) {
|
||||
if err := model.UpdateTaskToAbnormal(node.Id); err != nil {
|
||||
log.Errorf("update task to abnormal : {}", err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// 任务节点为工作节点
|
||||
|
||||
|
||||
Reference in New Issue
Block a user