mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-22 17:31:03 +01:00
优化爬虫获取逻辑
This commit is contained in:
@@ -1,11 +1,23 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"crawlab/database"
|
||||
"crawlab/utils"
|
||||
"github.com/apex/log"
|
||||
"github.com/globalsign/mgo/bson"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
type GridFs struct {
|
||||
Id bson.ObjectId `json:"_id" bson:"_id"`
|
||||
ChunkSize int32 `json:"chunk_size" bson:"chunkSize"`
|
||||
UploadDate time.Time `json:"upload_date" bson:"uploadDate"`
|
||||
Length int32 `json:"length" bson:"length"`
|
||||
Md5 string `json:"md_5" bson:"md5"`
|
||||
Filename string `json:"filename" bson:"filename"`
|
||||
}
|
||||
|
||||
type File struct {
|
||||
Name string `json:"name"`
|
||||
Path string `json:"path"`
|
||||
@@ -13,6 +25,19 @@ type File struct {
|
||||
Size int64 `json:"size"`
|
||||
}
|
||||
|
||||
func GetGridFs(id bson.ObjectId) *GridFs {
|
||||
s, gf := database.GetGridFs("files")
|
||||
defer s.Close()
|
||||
|
||||
var gfFile GridFs
|
||||
err := gf.Find(bson.M{"_id": id}).One(&gfFile)
|
||||
if err != nil {
|
||||
log.Errorf("get gf file error: %s, file_id: %s", err.Error(), id.Hex())
|
||||
return nil
|
||||
}
|
||||
return &gfFile
|
||||
}
|
||||
|
||||
func RemoveFile(path string) error {
|
||||
if !utils.Exists(path) {
|
||||
log.Info("file not found: " + path)
|
||||
|
||||
@@ -24,6 +24,8 @@ type Spider struct {
|
||||
Site string `json:"site"` // 爬虫网站
|
||||
Envs []Env `json:"envs" bson:"envs"` // 环境变量
|
||||
Remark string `json:"remark"` // 备注
|
||||
Md5 string `json:"md_5" bson:"md5"` // ZIP文件的MD5
|
||||
OldMd5 string `json:"old_md_5" bson:"old_md5"` //上一次的MD5值
|
||||
// 自定义爬虫
|
||||
Src string `json:"src" bson:"src"` // 源码位置
|
||||
Cmd string `json:"cmd" bson:"cmd"` // 执行命令
|
||||
@@ -122,6 +124,19 @@ func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) {
|
||||
return spiders, nil
|
||||
}
|
||||
|
||||
func GetSpiderByName(name string) *Spider {
|
||||
s, c := database.GetCol("spiders")
|
||||
defer s.Close()
|
||||
|
||||
var result *Spider
|
||||
if err := c.Find(bson.M{"name": name}).One(result); err != nil {
|
||||
log.Errorf("get spider error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return result
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func GetSpider(id bson.ObjectId) (Spider, error) {
|
||||
s, c := database.GetCol("spiders")
|
||||
defer s.Close()
|
||||
|
||||
@@ -79,18 +79,6 @@ func PostSpider(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
|
||||
func PublishAllSpiders(c *gin.Context) {
|
||||
if err := services.PublishAllSpiders(); err != nil {
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, Response{
|
||||
Status: "ok",
|
||||
Message: "success",
|
||||
})
|
||||
}
|
||||
|
||||
func PublishSpider(c *gin.Context) {
|
||||
id := c.Param("id")
|
||||
|
||||
@@ -104,10 +92,7 @@ func PublishSpider(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := services.PublishSpider(spider); err != nil {
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
return
|
||||
}
|
||||
services.PublishSpider(spider)
|
||||
|
||||
c.JSON(http.StatusOK, Response{
|
||||
Status: "ok",
|
||||
@@ -117,7 +102,7 @@ func PublishSpider(c *gin.Context) {
|
||||
|
||||
func PutSpider(c *gin.Context) {
|
||||
// 从body中获取文件
|
||||
file, err := c.FormFile("file")
|
||||
uploadFile, err := c.FormFile("file")
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
@@ -125,7 +110,7 @@ func PutSpider(c *gin.Context) {
|
||||
}
|
||||
|
||||
// 如果不为zip文件,返回错误
|
||||
if !strings.HasSuffix(file.Filename, ".zip") {
|
||||
if !strings.HasSuffix(uploadFile.Filename, ".zip") {
|
||||
debug.PrintStack()
|
||||
HandleError(http.StatusBadRequest, c, errors.New("Not a valid zip file"))
|
||||
return
|
||||
@@ -145,31 +130,50 @@ func PutSpider(c *gin.Context) {
|
||||
// 保存到本地临时文件
|
||||
randomId := uuid.NewV4()
|
||||
tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip")
|
||||
if err := c.SaveUploadedFile(file, tmpFilePath); err != nil {
|
||||
if err := c.SaveUploadedFile(uploadFile, tmpFilePath); err != nil {
|
||||
log.Error("save upload file error: " + err.Error())
|
||||
debug.PrintStack()
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
return
|
||||
}
|
||||
|
||||
s, gf := database.GetGridFs("files")
|
||||
defer s.Close()
|
||||
|
||||
// 判断文件是否已经存在
|
||||
var gfFile model.GridFs
|
||||
if err := gf.Find(bson.M{"filename": uploadFile.Filename}).One(&gfFile); err == nil {
|
||||
// 已经存在文件,则删除
|
||||
_ = gf.RemoveId(gfFile.Id)
|
||||
}
|
||||
|
||||
// 上传到GridFs
|
||||
fid, err := services.UploadToGridFs(file.Filename, tmpFilePath)
|
||||
fid, md5, err := services.UploadToGridFs(uploadFile.Filename, tmpFilePath)
|
||||
if err != nil {
|
||||
log.Errorf("upload to grid fs error: %s", err.Error())
|
||||
debug.PrintStack()
|
||||
return
|
||||
}
|
||||
|
||||
// 保存爬虫信息
|
||||
srcPath := viper.GetString("spider.path")
|
||||
spider := model.Spider{
|
||||
Name: file.Filename,
|
||||
DisplayName: file.Filename,
|
||||
Type: constants.Customized,
|
||||
Src: filepath.Join(srcPath, file.Filename),
|
||||
FileId: fid,
|
||||
// 判断爬虫是否存在
|
||||
spiderName := strings.Replace(uploadFile.Filename, ".zip", "", -1)
|
||||
spider := model.GetSpiderByName(spiderName)
|
||||
if spider == nil {
|
||||
// 保存爬虫信息
|
||||
srcPath := viper.GetString("spider.path")
|
||||
spider := model.Spider{
|
||||
Name: spiderName,
|
||||
DisplayName: spiderName,
|
||||
Type: constants.Customized,
|
||||
Src: filepath.Join(srcPath, spiderName),
|
||||
FileId: fid,
|
||||
Md5: md5,
|
||||
}
|
||||
_ = spider.Add()
|
||||
} else {
|
||||
spider.OldMd5 = spider.Md5
|
||||
spider.Md5 = md5
|
||||
_ = spider.Save()
|
||||
}
|
||||
_ = spider.Save()
|
||||
|
||||
c.JSON(http.StatusOK, Response{
|
||||
Status: "ok",
|
||||
@@ -259,7 +263,8 @@ func GetSpiderDir(c *gin.Context) {
|
||||
}
|
||||
|
||||
// 获取目录下文件列表
|
||||
f, err := ioutil.ReadDir(filepath.Join(spider.Src, path))
|
||||
spiderPath := viper.GetString("spider.path")
|
||||
f, err := ioutil.ReadDir(filepath.Join(spiderPath, spider.Name, path))
|
||||
if err != nil {
|
||||
HandleError(http.StatusInternalServerError, c, err)
|
||||
return
|
||||
|
||||
@@ -30,7 +30,7 @@ type SpiderUploadMessage struct {
|
||||
}
|
||||
|
||||
// 上传zip文件到GridFS
|
||||
func UploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, err error) {
|
||||
func UploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, md5 string, err error) {
|
||||
fid = ""
|
||||
|
||||
// 获取MongoDB GridFS连接
|
||||
@@ -48,7 +48,7 @@ func UploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, err er
|
||||
err = ReadFileByStep(filePath, WriteToGridFS, f)
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
return "", err
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
// 删除zip文件
|
||||
@@ -58,12 +58,12 @@ func UploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, err er
|
||||
}
|
||||
// 关闭文件,提交写入
|
||||
if err = f.Close(); err != nil {
|
||||
return "", err
|
||||
return "", "", err
|
||||
}
|
||||
// 文件ID
|
||||
fid = f.Id().(bson.ObjectId)
|
||||
|
||||
return fid, nil
|
||||
return fid, f.MD5(), nil
|
||||
}
|
||||
|
||||
func WriteToGridFS(content []byte, f *mgo.GridFile) {
|
||||
@@ -96,45 +96,59 @@ func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCre
|
||||
}
|
||||
|
||||
// 发布所有爬虫
|
||||
func PublishAllSpiders() error {
|
||||
func PublishAllSpiders() {
|
||||
// 获取爬虫列表
|
||||
spiders, err := model.GetSpiderList(nil, 0, constants.Infinite)
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
return err
|
||||
spiders, _ := model.GetSpiderList(nil, 0, constants.Infinite)
|
||||
if len(spiders) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// 遍历爬虫列表
|
||||
for _, spider := range spiders {
|
||||
// 发布爬虫
|
||||
if err := PublishSpider(spider); err != nil {
|
||||
log.Errorf("publish spider error:" + err.Error())
|
||||
// return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func PublishAllSpidersJob() {
|
||||
if err := PublishAllSpiders(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
// 异步发布爬虫
|
||||
go func() {
|
||||
PublishSpider(spider)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// 发布爬虫
|
||||
func PublishSpider(spider model.Spider) (err error) {
|
||||
func PublishSpider(spider model.Spider) {
|
||||
s, gf := database.GetGridFs("files")
|
||||
defer s.Close()
|
||||
|
||||
gfFile := model.GetGridFs(spider.FileId)
|
||||
if gfFile == nil {
|
||||
_ = model.RemoveSpider(spider.FileId)
|
||||
return
|
||||
}
|
||||
|
||||
// 爬虫文件没有变化
|
||||
if spider.Md5 == spider.OldMd5 {
|
||||
return
|
||||
}
|
||||
|
||||
//爬虫文件有变化,先删除本地文件
|
||||
_ = os.Remove(filepath.Join(
|
||||
viper.GetString("spider.path"),
|
||||
spider.Name,
|
||||
))
|
||||
|
||||
// 重新下载爬虫文件
|
||||
node, _ := GetCurrentNode()
|
||||
key := node.Id.Hex() + "#" + spider.Id.Hex()
|
||||
if _, err := database.RedisClient.HGet("spider", key); err == nil {
|
||||
log.Infof("downloading spider")
|
||||
return
|
||||
}
|
||||
_ = database.RedisClient.HSet("spider", key, key)
|
||||
defer database.RedisClient.HDel("spider", key)
|
||||
|
||||
f, err := gf.OpenId(spider.FileId)
|
||||
defer f.Close()
|
||||
if err != nil {
|
||||
log.Errorf("open file id: " + spider.FileId.Hex() + ", spider id:" + spider.Id.Hex() + ", error: " + err.Error())
|
||||
debug.PrintStack()
|
||||
// 爬虫和文件没有对应,则删除爬虫
|
||||
_ = model.RemoveSpider(spider.Id)
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
// 生成唯一ID
|
||||
@@ -143,7 +157,7 @@ func PublishSpider(spider model.Spider) (err error) {
|
||||
if !utils.Exists(tmpPath) {
|
||||
if err := os.MkdirAll(tmpPath, 0777); err != nil {
|
||||
log.Errorf("mkdir other.tmppath error: %v", err.Error())
|
||||
return err
|
||||
return
|
||||
}
|
||||
}
|
||||
// 创建临时文件
|
||||
@@ -152,7 +166,7 @@ func PublishSpider(spider model.Spider) (err error) {
|
||||
if err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
return
|
||||
}
|
||||
defer tmpFile.Close()
|
||||
|
||||
@@ -160,7 +174,7 @@ func PublishSpider(spider model.Spider) (err error) {
|
||||
if _, err := io.Copy(tmpFile, f); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
// 解压缩临时文件到目标文件夹
|
||||
@@ -170,31 +184,33 @@ func PublishSpider(spider model.Spider) (err error) {
|
||||
if err := utils.DeCompress(tmpFile, dstPath); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
// 关闭临时文件
|
||||
if err := tmpFile.Close(); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
// 删除临时文件
|
||||
if err := os.Remove(tmpFilePath); err != nil {
|
||||
log.Errorf(err.Error())
|
||||
debug.PrintStack()
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
return nil
|
||||
// 修改spider的MD5和上一次的MD一致
|
||||
spider.OldMd5 = spider.Md5
|
||||
_ = spider.Save()
|
||||
}
|
||||
|
||||
// 启动爬虫服务
|
||||
func InitSpiderService() error {
|
||||
// 构造定时任务执行器
|
||||
c := cron.New(cron.WithSeconds())
|
||||
if _, err := c.AddFunc("0 * * * * *", PublishAllSpidersJob); err != nil {
|
||||
if _, err := c.AddFunc("0/15 * * * * *", PublishAllSpiders); err != nil {
|
||||
return err
|
||||
}
|
||||
// 启动定时任务
|
||||
|
||||
Reference in New Issue
Block a user