From 475bf4c69ee8684b67d007a8696ae4fd58589812 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=99=AF=E9=98=B3?= <1656488874@qq.com> Date: Thu, 26 Sep 2019 11:38:13 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=88=AC=E8=99=AB=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/routes/spider.go | 44 ------------------------ backend/services/spider.go | 70 ++------------------------------------ 2 files changed, 3 insertions(+), 111 deletions(-) diff --git a/backend/routes/spider.go b/backend/routes/spider.go index 8a9e643f..07e84e21 100644 --- a/backend/routes/spider.go +++ b/backend/routes/spider.go @@ -171,50 +171,6 @@ func PutSpider(c *gin.Context) { } _ = spider.Save() - // 读取临时文件 - //tmpFile, err := os.OpenFile(tmpFilePath, os.O_RDONLY, 0777) - //if err != nil { - // debug.PrintStack() - // HandleError(http.StatusInternalServerError, c, err) - // return - //} - //if err = tmpFile.Close(); err != nil { - // debug.PrintStack() - // HandleError(http.StatusInternalServerError, c, err) - // return - //} - - // 目标目录 - //dstPath := filepath.Join( - // viper.GetString("spider.path"), - // strings.Replace(file.Filename, ".zip", "", 1), - //) - - // 如果目标目录已存在,删除目标目录 - //if utils.Exists(dstPath) { - // if err := os.RemoveAll(dstPath); err != nil { - // debug.PrintStack() - // HandleError(http.StatusInternalServerError, c, err) - // } - //} - - // 将临时文件解压到爬虫目录 - //if err := utils.DeCompress(tmpFile, dstPath); err != nil { - // debug.PrintStack() - // HandleError(http.StatusInternalServerError, c, err) - // return - //} - - // 删除临时文件 - //if err = os.Remove(tmpFilePath); err != nil { - // debug.PrintStack() - // HandleError(http.StatusInternalServerError, c, err) - // return - //} - - // 更新爬虫 - // services.UpdateSpiders() - c.JSON(http.StatusOK, Response{ Status: "ok", Message: "success", diff --git a/backend/services/spider.go b/backend/services/spider.go index bcf98700..620cede4 100644 --- a/backend/services/spider.go +++ b/backend/services/spider.go @@ -1,18 +1,15 @@ package services import ( - "context" "crawlab/constants" "crawlab/database" "crawlab/lib/cron" "crawlab/model" "crawlab/utils" - "encoding/json" "fmt" "github.com/apex/log" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" - "github.com/gomodule/redigo/redis" "github.com/satori/go.uuid" "github.com/spf13/viper" "io" @@ -126,11 +123,7 @@ func PublishAllSpidersJob() { } // 发布爬虫 -// 1. 将源文件夹打包为zip文件 -// 2. 上传zip文件到GridFS -// 3. 发布消息给工作节点 func PublishSpider(spider model.Spider) (err error) { - s, gf := database.GetGridFs("files") defer s.Close() @@ -144,48 +137,6 @@ func PublishSpider(spider model.Spider) (err error) { return err } - // 发布消息给工作节点 - msg := SpiderUploadMessage{ - FileId: spider.FileId.Hex(), - FileName: f.Name(), - SpiderId: spider.Id.Hex(), - } - msgStr, err := json.Marshal(msg) - if err != nil { - return - } - channel := "files:upload" - if _, err = database.RedisClient.Publish(channel, utils.BytesToString(msgStr)); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return - } - - return -} - -// 上传爬虫回调 -func OnFileUpload(message redis.Message) (err error) { - s, gf := database.GetGridFs("files") - defer s.Close() - - // 反序列化消息 - var msg SpiderUploadMessage - if err := json.Unmarshal(message.Data, &msg); err != nil { - log.Errorf(err.Error()) - debug.PrintStack() - return err - } - - // 从GridFS获取该文件 - f, err := gf.OpenId(bson.ObjectIdHex(msg.FileId)) - defer f.Close() - if err != nil { - log.Errorf("open file id: " + msg.FileId + ", spider id:" + msg.SpiderId + ", error: " + err.Error()) - debug.PrintStack() - return err - } - // 生成唯一ID randomId := uuid.NewV4() tmpPath := viper.GetString("other.tmppath") @@ -197,7 +148,6 @@ func OnFileUpload(message redis.Message) (err error) { } // 创建临时文件 tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip") - tmpFile, err := os.OpenFile(tmpFilePath, os.O_CREATE|os.O_WRONLY, os.ModePerm) if err != nil { log.Errorf(err.Error()) @@ -236,6 +186,7 @@ func OnFileUpload(message redis.Message) (err error) { debug.PrintStack() return err } + return nil } @@ -243,24 +194,9 @@ func OnFileUpload(message redis.Message) (err error) { func InitSpiderService() error { // 构造定时任务执行器 c := cron.New(cron.WithSeconds()) - - if IsMaster() { - // 主节点 - - // 每60秒同步爬虫给工作节点 - if _, err := c.AddFunc("0 * * * * *", PublishAllSpidersJob); err != nil { - return err - } - } else { - // 非主节点 - - // 订阅文件上传 - channel := "files:upload" - ctx := context.Background() - return database.RedisClient.Subscribe(ctx, OnFileUpload, channel) - + if _, err := c.AddFunc("0 * * * * *", PublishAllSpidersJob); err != nil { + return err } - // 启动定时任务 c.Start()