优化爬虫获取逻辑

This commit is contained in:
陈景阳
2019-09-26 11:38:13 +08:00
parent 9237e62a48
commit 475bf4c69e
2 changed files with 3 additions and 111 deletions

View File

@@ -171,50 +171,6 @@ func PutSpider(c *gin.Context) {
}
_ = spider.Save()
// 读取临时文件
//tmpFile, err := os.OpenFile(tmpFilePath, os.O_RDONLY, 0777)
//if err != nil {
// debug.PrintStack()
// HandleError(http.StatusInternalServerError, c, err)
// return
//}
//if err = tmpFile.Close(); err != nil {
// debug.PrintStack()
// HandleError(http.StatusInternalServerError, c, err)
// return
//}
// 目标目录
//dstPath := filepath.Join(
// viper.GetString("spider.path"),
// strings.Replace(file.Filename, ".zip", "", 1),
//)
// 如果目标目录已存在,删除目标目录
//if utils.Exists(dstPath) {
// if err := os.RemoveAll(dstPath); err != nil {
// debug.PrintStack()
// HandleError(http.StatusInternalServerError, c, err)
// }
//}
// 将临时文件解压到爬虫目录
//if err := utils.DeCompress(tmpFile, dstPath); err != nil {
// debug.PrintStack()
// HandleError(http.StatusInternalServerError, c, err)
// return
//}
// 删除临时文件
//if err = os.Remove(tmpFilePath); err != nil {
// debug.PrintStack()
// HandleError(http.StatusInternalServerError, c, err)
// return
//}
// 更新爬虫
// services.UpdateSpiders()
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",

View File

@@ -1,18 +1,15 @@
package services
import (
"context"
"crawlab/constants"
"crawlab/database"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/utils"
"encoding/json"
"fmt"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/gomodule/redigo/redis"
"github.com/satori/go.uuid"
"github.com/spf13/viper"
"io"
@@ -126,11 +123,7 @@ func PublishAllSpidersJob() {
}
// 发布爬虫
// 1. 将源文件夹打包为zip文件
// 2. 上传zip文件到GridFS
// 3. 发布消息给工作节点
func PublishSpider(spider model.Spider) (err error) {
s, gf := database.GetGridFs("files")
defer s.Close()
@@ -144,48 +137,6 @@ func PublishSpider(spider model.Spider) (err error) {
return err
}
// 发布消息给工作节点
msg := SpiderUploadMessage{
FileId: spider.FileId.Hex(),
FileName: f.Name(),
SpiderId: spider.Id.Hex(),
}
msgStr, err := json.Marshal(msg)
if err != nil {
return
}
channel := "files:upload"
if _, err = database.RedisClient.Publish(channel, utils.BytesToString(msgStr)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
return
}
// 上传爬虫回调
func OnFileUpload(message redis.Message) (err error) {
s, gf := database.GetGridFs("files")
defer s.Close()
// 反序列化消息
var msg SpiderUploadMessage
if err := json.Unmarshal(message.Data, &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// 从GridFS获取该文件
f, err := gf.OpenId(bson.ObjectIdHex(msg.FileId))
defer f.Close()
if err != nil {
log.Errorf("open file id: " + msg.FileId + ", spider id:" + msg.SpiderId + ", error: " + err.Error())
debug.PrintStack()
return err
}
// 生成唯一ID
randomId := uuid.NewV4()
tmpPath := viper.GetString("other.tmppath")
@@ -197,7 +148,6 @@ func OnFileUpload(message redis.Message) (err error) {
}
// 创建临时文件
tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip")
tmpFile, err := os.OpenFile(tmpFilePath, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
log.Errorf(err.Error())
@@ -236,6 +186,7 @@ func OnFileUpload(message redis.Message) (err error) {
debug.PrintStack()
return err
}
return nil
}
@@ -243,24 +194,9 @@ func OnFileUpload(message redis.Message) (err error) {
func InitSpiderService() error {
// 构造定时任务执行器
c := cron.New(cron.WithSeconds())
if IsMaster() {
// 主节点
// 每60秒同步爬虫给工作节点
if _, err := c.AddFunc("0 * * * * *", PublishAllSpidersJob); err != nil {
return err
}
} else {
// 非主节点
// 订阅文件上传
channel := "files:upload"
ctx := context.Background()
return database.RedisClient.Subscribe(ctx, OnFileUpload, channel)
if _, err := c.AddFunc("0 * * * * *", PublishAllSpidersJob); err != nil {
return err
}
// 启动定时任务
c.Start()