fix 消息无法订阅问题

fix 可能出现重复爬虫的问题
This commit is contained in:
陈景阳
2019-09-10 14:26:50 +08:00
parent 000b4ff730
commit 42e7647cd4
2 changed files with 37 additions and 12 deletions

View File

@@ -19,7 +19,7 @@ func (r *Redis) Close() {
}
func (r *Redis) subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
psc := redis.PubSubConn{Conn: r.pool.Get()}
if err := psc.Subscribe(redis.Args{}.AddFlat(channel)); err != nil {
if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
return err
}
done := make(chan error, 1)

View File

@@ -109,13 +109,28 @@ func SaveSpiders(spiders []model.Spider) error {
if spider.Type != constants.Customized {
continue
}
var spider_ *model.Spider
if err := c.Find(bson.M{"src": spider.Src}).One(&spider_); err != nil {
spider_ := []*model.Spider{}
_ = c.Find(bson.M{"src": spider.Src}).All(&spider_)
// 以防出现多个重复的爬虫
if len(spider_) > 1 {
if _, err := c.RemoveAll(bson.M{"src": spider.Src}); err != nil {
log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src)
debug.PrintStack()
continue
}
if err := spider.Add(); err != nil {
log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src)
debug.PrintStack()
continue
}
continue
}
if len(spider_) == 0 {
// 不存在
if err := spider.Add(); err != nil {
log.Errorf("remove spider error: %v, src:%v", err.Error(), spider.Src)
debug.PrintStack()
return err
continue
}
}
}
@@ -151,11 +166,14 @@ func ZipSpider(spider model.Spider) (filePath string, err error) {
// 临时文件路径
randomId := uuid.NewV4()
filePath = filepath.Join(
viper.GetString("other.tmppath"),
randomId.String()+".zip",
)
tmpPath := viper.GetString("other.tmppath")
if !utils.Exists(tmpPath) {
if err := os.MkdirAll(tmpPath, 0777); err != nil {
log.Errorf("mkdir other.tmppath error: %v", err.Error())
return "", err
}
}
filePath = filepath.Join(tmpPath, randomId.String()+".zip")
// 将源文件夹打包为zip文件
d, err := os.Open(spider.Src)
if err != nil {
@@ -340,9 +358,16 @@ func OnFileUpload(message redis.Message) (err error) {
// 生成唯一ID
randomId := uuid.NewV4()
tmpPath := viper.GetString("other.tmppath")
if !utils.Exists(tmpPath) {
if err := os.MkdirAll(tmpPath, 0777); err != nil {
log.Errorf("mkdir other.tmppath error: %v", err.Error())
return err
}
}
// 创建临时文件
tmpFilePath := filepath.Join(viper.GetString("other.tmppath"), randomId.String()+".zip")
tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip")
tmpFile, err := os.OpenFile(tmpFilePath, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
log.Errorf(err.Error())