From 6df6155dafc85cfdbf1c292c985cd266b959bc1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Thu, 26 Sep 2019 16:26:32 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=88=AC=E8=99=AB=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/model/file.go | 25 +++++++++++ backend/model/spider.go | 15 +++++++ backend/routes/spider.go | 67 +++++++++++++++-------------- backend/services/spider.go | 86 ++++++++++++++++++++++---------------- 4 files changed, 127 insertions(+), 66 deletions(-) diff --git a/backend/model/file.go b/backend/model/file.go index 3cea7b39..f0968086 100644 --- a/backend/model/file.go +++ b/backend/model/file.go @@ -1,11 +1,23 @@ package model import ( + "crawlab/database" "crawlab/utils" "github.com/apex/log" + "github.com/globalsign/mgo/bson" "os" + "time" ) +type GridFs struct { + Id bson.ObjectId `json:"_id" bson:"_id"` + ChunkSize int32 `json:"chunk_size" bson:"chunkSize"` + UploadDate time.Time `json:"upload_date" bson:"uploadDate"` + Length int32 `json:"length" bson:"length"` + Md5 string `json:"md_5" bson:"md5"` + Filename string `json:"filename" bson:"filename"` +} + type File struct { Name string `json:"name"` Path string `json:"path"` @@ -13,6 +25,19 @@ type File struct { Size int64 `json:"size"` } +func GetGridFs(id bson.ObjectId) *GridFs { + s, gf := database.GetGridFs("files") + defer s.Close() + + var gfFile GridFs + err := gf.Find(bson.M{"_id": id}).One(&gfFile) + if err != nil { + log.Errorf("get gf file error: %s, file_id: %s", err.Error(), id.Hex()) + return nil + } + return &gfFile +} + func RemoveFile(path string) error { if !utils.Exists(path) { log.Info("file not found: " + path) diff --git a/backend/model/spider.go b/backend/model/spider.go index e0e5f836..c498287d 100644 --- a/backend/model/spider.go +++ b/backend/model/spider.go @@ -24,6 +24,8 @@ type Spider struct { Site string `json:"site"` // 爬虫网站 Envs []Env `json:"envs" bson:"envs"` // 环境变量 Remark string `json:"remark"` // 备注 + Md5 string `json:"md_5" bson:"md5"` // ZIP文件的MD5 + OldMd5 string `json:"old_md_5" bson:"old_md5"` //上一次的MD5值 // 自定义爬虫 Src string `json:"src" bson:"src"` // 源码位置 Cmd string `json:"cmd" bson:"cmd"` // 执行命令 @@ -122,6 +124,19 @@ func GetSpiderList(filter interface{}, skip int, limit int) ([]Spider, error) { return spiders, nil } +func GetSpiderByName(name string) *Spider { + s, c := database.GetCol("spiders") + defer s.Close() + + var result *Spider + if err := c.Find(bson.M{"name": name}).One(result); err != nil { + log.Errorf("get spider error: %s", err.Error()) + debug.PrintStack() + return result + } + return result +} + func GetSpider(id bson.ObjectId) (Spider, error) { s, c := database.GetCol("spiders") defer s.Close() diff --git a/backend/routes/spider.go b/backend/routes/spider.go index 07e84e21..9b8bd50d 100644 --- a/backend/routes/spider.go +++ b/backend/routes/spider.go @@ -79,18 +79,6 @@ func PostSpider(c *gin.Context) { }) } -func PublishAllSpiders(c *gin.Context) { - if err := services.PublishAllSpiders(); err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } - - c.JSON(http.StatusOK, Response{ - Status: "ok", - Message: "success", - }) -} - func PublishSpider(c *gin.Context) { id := c.Param("id") @@ -104,10 +92,7 @@ func PublishSpider(c *gin.Context) { return } - if err := services.PublishSpider(spider); err != nil { - HandleError(http.StatusInternalServerError, c, err) - return - } + services.PublishSpider(spider) c.JSON(http.StatusOK, Response{ Status: "ok", @@ -117,7 +102,7 @@ func PublishSpider(c *gin.Context) { func PutSpider(c *gin.Context) { // 从body中获取文件 - file, err := c.FormFile("file") + uploadFile, err := c.FormFile("file") if err != nil { debug.PrintStack() HandleError(http.StatusInternalServerError, c, err) @@ -125,7 +110,7 @@ func PutSpider(c *gin.Context) { } // 如果不为zip文件,返回错误 - if !strings.HasSuffix(file.Filename, ".zip") { + if !strings.HasSuffix(uploadFile.Filename, ".zip") { debug.PrintStack() HandleError(http.StatusBadRequest, c, errors.New("Not a valid zip file")) return @@ -145,31 +130,50 @@ func PutSpider(c *gin.Context) { // 保存到本地临时文件 randomId := uuid.NewV4() tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip") - if err := c.SaveUploadedFile(file, tmpFilePath); err != nil { + if err := c.SaveUploadedFile(uploadFile, tmpFilePath); err != nil { log.Error("save upload file error: " + err.Error()) debug.PrintStack() HandleError(http.StatusInternalServerError, c, err) return } + s, gf := database.GetGridFs("files") + defer s.Close() + + // 判断文件是否已经存在 + var gfFile model.GridFs + if err := gf.Find(bson.M{"filename": uploadFile.Filename}).One(&gfFile); err == nil { + // 已经存在文件,则删除 + _ = gf.RemoveId(gfFile.Id) + } + // 上传到GridFs - fid, err := services.UploadToGridFs(file.Filename, tmpFilePath) + fid, md5, err := services.UploadToGridFs(uploadFile.Filename, tmpFilePath) if err != nil { log.Errorf("upload to grid fs error: %s", err.Error()) debug.PrintStack() return } - - // 保存爬虫信息 - srcPath := viper.GetString("spider.path") - spider := model.Spider{ - Name: file.Filename, - DisplayName: file.Filename, - Type: constants.Customized, - Src: filepath.Join(srcPath, file.Filename), - FileId: fid, + // 判断爬虫是否存在 + spiderName := strings.Replace(uploadFile.Filename, ".zip", "", -1) + spider := model.GetSpiderByName(spiderName) + if spider == nil { + // 保存爬虫信息 + srcPath := viper.GetString("spider.path") + spider := model.Spider{ + Name: spiderName, + DisplayName: spiderName, + Type: constants.Customized, + Src: filepath.Join(srcPath, spiderName), + FileId: fid, + Md5: md5, + } + _ = spider.Add() + } else { + spider.OldMd5 = spider.Md5 + spider.Md5 = md5 + _ = spider.Save() } - _ = spider.Save() c.JSON(http.StatusOK, Response{ Status: "ok", @@ -259,7 +263,8 @@ func GetSpiderDir(c *gin.Context) { } // 获取目录下文件列表 - f, err := ioutil.ReadDir(filepath.Join(spider.Src, path)) + spiderPath := viper.GetString("spider.path") + f, err := ioutil.ReadDir(filepath.Join(spiderPath, spider.Name, path)) if err != nil { HandleError(http.StatusInternalServerError, c, err) return diff --git a/backend/services/spider.go b/backend/services/spider.go index 620cede4..4a32c023 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -30,7 +30,7 @@ type SpiderUploadMessage struct { } // 上传zip文件到GridFS -func UploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, err error) { +func UploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, md5 string, err error) { fid = "" // 获取MongoDB GridFS连接 @@ -48,7 +48,7 @@ func UploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, err er err = ReadFileByStep(filePath, WriteToGridFS, f) if err != nil { debug.PrintStack() - return "", err + return "", "", err } // 删除zip文件 @@ -58,12 +58,12 @@ func UploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, err er } // 关闭文件,提交写入 if err = f.Close(); err != nil { - return "", err + return "", "", err } // 文件ID fid = f.Id().(bson.ObjectId) - return fid, nil + return fid, f.MD5(), nil } func WriteToGridFS(content []byte, f *mgo.GridFile) { @@ -96,45 +96,59 @@ func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCre } // 发布所有爬虫 -func PublishAllSpiders() error { +func PublishAllSpiders() { // 获取爬虫列表 - spiders, err := model.GetSpiderList(nil, 0, constants.Infinite) - if err != nil { - log.Errorf(err.Error()) - return err + spiders, _ := model.GetSpiderList(nil, 0, constants.Infinite) + if len(spiders) == 0 { + return } - // 遍历爬虫列表 for _, spider := range spiders { - // 发布爬虫 - if err := PublishSpider(spider); err != nil { - log.Errorf("publish spider error:" + err.Error()) - // return err - } - } - - return nil -} - -func PublishAllSpidersJob() { - if err := PublishAllSpiders(); err != nil { - log.Errorf(err.Error()) + // 异步发布爬虫 + go func() { + PublishSpider(spider) + }() } } // 发布爬虫 -func PublishSpider(spider model.Spider) (err error) { +func PublishSpider(spider model.Spider) { s, gf := database.GetGridFs("files") defer s.Close() + gfFile := model.GetGridFs(spider.FileId) + if gfFile == nil { + _ = model.RemoveSpider(spider.FileId) + return + } + + // 爬虫文件没有变化 + if spider.Md5 == spider.OldMd5 { + return + } + + //爬虫文件有变化,先删除本地文件 + _ = os.Remove(filepath.Join( + viper.GetString("spider.path"), + spider.Name, + )) + + // 重新下载爬虫文件 + node, _ := GetCurrentNode() + key := node.Id.Hex() + "#" + spider.Id.Hex() + if _, err := database.RedisClient.HGet("spider", key); err == nil { + log.Infof("downloading spider") + return + } + _ = database.RedisClient.HSet("spider", key, key) + defer database.RedisClient.HDel("spider", key) + f, err := gf.OpenId(spider.FileId) defer f.Close() if err != nil { log.Errorf("open file id: " + spider.FileId.Hex() + ", spider id:" + spider.Id.Hex() + ", error: " + err.Error()) debug.PrintStack() - // 爬虫和文件没有对应,则删除爬虫 - _ = model.RemoveSpider(spider.Id) - return err + return } // 生成唯一ID @@ -143,7 +157,7 @@ func PublishSpider(spider model.Spider) (err error) { if !utils.Exists(tmpPath) { if err := os.MkdirAll(tmpPath, 0777); err != nil { log.Errorf("mkdir other.tmppath error: %v", err.Error()) - return err + return } } // 创建临时文件 @@ -152,7 +166,7 @@ func PublishSpider(spider model.Spider) (err error) { if err != nil { log.Errorf(err.Error()) debug.PrintStack() - return err + return } defer tmpFile.Close() @@ -160,7 +174,7 @@ func PublishSpider(spider model.Spider) (err error) { if _, err := io.Copy(tmpFile, f); err != nil { log.Errorf(err.Error()) debug.PrintStack() - return err + return } // 解压缩临时文件到目标文件夹 @@ -170,31 +184,33 @@ func PublishSpider(spider model.Spider) (err error) { if err := utils.DeCompress(tmpFile, dstPath); err != nil { log.Errorf(err.Error()) debug.PrintStack() - return err + return } // 关闭临时文件 if err := tmpFile.Close(); err != nil { log.Errorf(err.Error()) debug.PrintStack() - return err + return } // 删除临时文件 if err := os.Remove(tmpFilePath); err != nil { log.Errorf(err.Error()) debug.PrintStack() - return err + return } - return nil + // 修改spider的MD5和上一次的MD一致 + spider.OldMd5 = spider.Md5 + _ = spider.Save() } // 启动爬虫服务 func InitSpiderService() error { // 构造定时任务执行器 c := cron.New(cron.WithSeconds()) - if _, err := c.AddFunc("0 * * * * *", PublishAllSpidersJob); err != nil { + if _, err := c.AddFunc("0/15 * * * * *", PublishAllSpiders); err != nil { return err } // 启动定时任务