mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-23 17:31:11 +01:00
1. Mongo dial add 5 seconds connection timeout. 2. Redis uses connection pool mode. 3. Redis pool new connection have 10 seconds write timeout and read timeout and connection timeout.
416 lines
8.6 KiB
Go
416 lines
8.6 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"crawlab/constants"
|
|
"crawlab/database"
|
|
"crawlab/lib/cron"
|
|
"crawlab/model"
|
|
"crawlab/utils"
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/apex/log"
|
|
"github.com/globalsign/mgo"
|
|
"github.com/globalsign/mgo/bson"
|
|
"github.com/gomodule/redigo/redis"
|
|
"github.com/pkg/errors"
|
|
"github.com/satori/go.uuid"
|
|
"github.com/spf13/viper"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime/debug"
|
|
"strings"
|
|
"syscall"
|
|
)
|
|
|
|
type SpiderFileData struct {
|
|
FileName string
|
|
File []byte
|
|
}
|
|
|
|
type SpiderUploadMessage struct {
|
|
FileId string
|
|
FileName string
|
|
SpiderId string
|
|
}
|
|
|
|
// 从项目目录中获取爬虫列表
|
|
func GetSpidersFromDir() ([]model.Spider, error) {
|
|
// 爬虫项目目录路径
|
|
srcPath := viper.GetString("spider.path")
|
|
|
|
// 如果爬虫项目目录不存在,则创建一个
|
|
if !utils.Exists(srcPath) {
|
|
mask := syscall.Umask(0) // 改为 0000 八进制
|
|
defer syscall.Umask(mask) // 改为原来的 umask
|
|
if err := os.MkdirAll(srcPath, 0766); err != nil {
|
|
debug.PrintStack()
|
|
return []model.Spider{}, err
|
|
}
|
|
}
|
|
|
|
// 获取爬虫项目目录下的所有子项
|
|
items, err := ioutil.ReadDir(srcPath)
|
|
if err != nil {
|
|
debug.PrintStack()
|
|
return []model.Spider{}, err
|
|
}
|
|
|
|
// 定义爬虫列表
|
|
spiders := make([]model.Spider, 0)
|
|
|
|
// 遍历所有子项
|
|
for _, item := range items {
|
|
// 忽略不为目录的子项
|
|
if !item.IsDir() {
|
|
continue
|
|
}
|
|
|
|
// 忽略隐藏目录
|
|
if strings.HasPrefix(item.Name(), ".") {
|
|
continue
|
|
}
|
|
|
|
// 构造爬虫
|
|
spider := model.Spider{
|
|
Name: item.Name(),
|
|
DisplayName: item.Name(),
|
|
Type: constants.Customized,
|
|
Src: filepath.Join(srcPath, item.Name()),
|
|
FileId: bson.ObjectIdHex(constants.ObjectIdNull),
|
|
}
|
|
|
|
// 将爬虫加入列表
|
|
spiders = append(spiders, spider)
|
|
}
|
|
|
|
return spiders, nil
|
|
}
|
|
|
|
// 将爬虫保存到数据库
|
|
func SaveSpiders(spiders []model.Spider) error {
|
|
// 遍历爬虫列表
|
|
for _, spider := range spiders {
|
|
// 忽略非自定义爬虫
|
|
if spider.Type != constants.Customized {
|
|
continue
|
|
}
|
|
|
|
// 如果该爬虫不存在于数据库,则保存爬虫到数据库
|
|
s, c := database.GetCol("spiders")
|
|
defer s.Close()
|
|
var spider_ *model.Spider
|
|
if err := c.Find(bson.M{"src": spider.Src}).One(&spider_); err != nil {
|
|
// 不存在
|
|
if err := spider.Add(); err != nil {
|
|
debug.PrintStack()
|
|
return err
|
|
}
|
|
} else {
|
|
// 存在
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// 更新爬虫
|
|
func UpdateSpiders() {
|
|
// 从项目目录获取爬虫列表
|
|
spiders, err := GetSpidersFromDir()
|
|
if err != nil {
|
|
log.Errorf(err.Error())
|
|
return
|
|
}
|
|
|
|
// 储存爬虫
|
|
if err := SaveSpiders(spiders); err != nil {
|
|
log.Errorf(err.Error())
|
|
return
|
|
}
|
|
}
|
|
|
|
// 打包爬虫目录为zip文件
|
|
func ZipSpider(spider model.Spider) (filePath string, err error) {
|
|
// 如果源文件夹不存在,抛错
|
|
if !utils.Exists(spider.Src) {
|
|
debug.PrintStack()
|
|
// 删除该爬虫,否则会一直报错
|
|
_ = model.RemoveSpider(spider.Id)
|
|
return "", errors.New("source path does not exist")
|
|
}
|
|
|
|
// 临时文件路径
|
|
randomId := uuid.NewV4()
|
|
|
|
filePath = filepath.Join(
|
|
viper.GetString("other.tmppath"),
|
|
randomId.String()+".zip",
|
|
)
|
|
|
|
// 将源文件夹打包为zip文件
|
|
d, err := os.Open(spider.Src)
|
|
if err != nil {
|
|
debug.PrintStack()
|
|
return filePath, err
|
|
}
|
|
var files []*os.File
|
|
files = append(files, d)
|
|
if err := utils.Compress(files, filePath); err != nil {
|
|
return filePath, err
|
|
}
|
|
|
|
return filePath, nil
|
|
}
|
|
|
|
// 上传zip文件到GridFS
|
|
func UploadToGridFs(spider model.Spider, fileName string, filePath string) (fid bson.ObjectId, err error) {
|
|
fid = ""
|
|
|
|
// 获取MongoDB GridFS连接
|
|
s, gf := database.GetGridFs("files")
|
|
defer s.Close()
|
|
|
|
// 如果存在FileId删除GridFS上的老文件
|
|
if !utils.IsObjectIdNull(spider.FileId) {
|
|
if err = gf.RemoveId(spider.FileId); err != nil {
|
|
log.Error("remove gf file:" + err.Error())
|
|
debug.PrintStack()
|
|
}
|
|
}
|
|
|
|
// 创建一个新GridFS文件
|
|
f, err := gf.Create(fileName)
|
|
if err != nil {
|
|
debug.PrintStack()
|
|
return
|
|
}
|
|
|
|
//分片读取爬虫zip文件
|
|
err = ReadFileByStep(filePath, WriteToGridFS, f)
|
|
if err != nil {
|
|
debug.PrintStack()
|
|
return "", err
|
|
}
|
|
|
|
// 删除zip文件
|
|
if err = os.Remove(filePath); err != nil {
|
|
debug.PrintStack()
|
|
return
|
|
}
|
|
// 关闭文件,提交写入
|
|
if err = f.Close(); err != nil {
|
|
return "", err
|
|
}
|
|
// 文件ID
|
|
fid = f.Id().(bson.ObjectId)
|
|
|
|
return fid, nil
|
|
}
|
|
|
|
func WriteToGridFS(content []byte, f *mgo.GridFile) {
|
|
if _, err := f.Write(content); err != nil {
|
|
debug.PrintStack()
|
|
return
|
|
}
|
|
}
|
|
|
|
//分片读取大文件
|
|
func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCreate *mgo.GridFile) error {
|
|
f, err := os.OpenFile(filePath, os.O_RDONLY, 0777)
|
|
if err != nil {
|
|
log.Infof("can't opened this file")
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
s := make([]byte, 4096)
|
|
for {
|
|
switch nr, err := f.Read(s[:]); true {
|
|
case nr < 0:
|
|
_, _ = fmt.Fprintf(os.Stderr, "cat: error reading: %s\n", err.Error())
|
|
debug.PrintStack()
|
|
case nr == 0: // EOF
|
|
return nil
|
|
case nr > 0:
|
|
handle(s[0:nr], fileCreate)
|
|
}
|
|
}
|
|
}
|
|
|
|
// 发布所有爬虫
|
|
func PublishAllSpiders() error {
|
|
// 获取爬虫列表
|
|
spiders, err := model.GetSpiderList(nil, 0, constants.Infinite)
|
|
if err != nil {
|
|
log.Errorf(err.Error())
|
|
return err
|
|
}
|
|
|
|
// 遍历爬虫列表
|
|
for _, spider := range spiders {
|
|
// 发布爬虫
|
|
if err := PublishSpider(spider); err != nil {
|
|
log.Errorf(err.Error())
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func PublishAllSpidersJob() {
|
|
if err := PublishAllSpiders(); err != nil {
|
|
log.Errorf(err.Error())
|
|
}
|
|
}
|
|
|
|
// 发布爬虫
|
|
// 1. 将源文件夹打包为zip文件
|
|
// 2. 上传zip文件到GridFS
|
|
// 3. 发布消息给工作节点
|
|
func PublishSpider(spider model.Spider) (err error) {
|
|
// 将源文件夹打包为zip文件
|
|
filePath, err := ZipSpider(spider)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 上传zip文件到GridFS
|
|
fileName := filepath.Base(spider.Src) + ".zip"
|
|
fid, err := UploadToGridFs(spider, fileName, filePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 保存FileId
|
|
spider.FileId = fid
|
|
if err := spider.Save(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// 发布消息给工作节点
|
|
msg := SpiderUploadMessage{
|
|
FileId: fid.Hex(),
|
|
FileName: fileName,
|
|
SpiderId: spider.Id.Hex(),
|
|
}
|
|
msgStr, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return
|
|
}
|
|
channel := "files:upload"
|
|
if _, err = database.RedisClient.Publish(channel, string(msgStr)); err != nil {
|
|
log.Errorf(err.Error())
|
|
debug.PrintStack()
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// 上传爬虫回调
|
|
func OnFileUpload(message redis.Message) (err error) {
|
|
s, gf := database.GetGridFs("files")
|
|
defer s.Close()
|
|
|
|
// 反序列化消息
|
|
var msg SpiderUploadMessage
|
|
if err := json.Unmarshal(message.Data, &msg); err != nil {
|
|
log.Errorf(err.Error())
|
|
debug.PrintStack()
|
|
return err
|
|
}
|
|
|
|
// 从GridFS获取该文件
|
|
f, err := gf.OpenId(bson.ObjectIdHex(msg.FileId))
|
|
if err != nil {
|
|
log.Errorf("open file id: " + msg.FileId + ", spider id:" + msg.SpiderId + ", error: " + err.Error())
|
|
debug.PrintStack()
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
|
|
// 生成唯一ID
|
|
randomId := uuid.NewV4()
|
|
|
|
// 创建临时文件
|
|
tmpFilePath := filepath.Join(viper.GetString("other.tmppath"), randomId.String()+".zip")
|
|
tmpFile, err := os.OpenFile(tmpFilePath, os.O_CREATE|os.O_WRONLY, os.ModePerm)
|
|
if err != nil {
|
|
log.Errorf(err.Error())
|
|
debug.PrintStack()
|
|
return err
|
|
}
|
|
defer tmpFile.Close()
|
|
|
|
// 将该文件写入临时文件
|
|
if _, err := io.Copy(tmpFile, f); err != nil {
|
|
log.Errorf(err.Error())
|
|
debug.PrintStack()
|
|
return err
|
|
}
|
|
|
|
// 解压缩临时文件到目标文件夹
|
|
dstPath := filepath.Join(
|
|
viper.GetString("spider.path"),
|
|
//strings.Replace(msg.FileName, ".zip", "", -1),
|
|
)
|
|
if err := utils.DeCompress(tmpFile, dstPath); err != nil {
|
|
log.Errorf(err.Error())
|
|
debug.PrintStack()
|
|
return err
|
|
}
|
|
|
|
// 关闭临时文件
|
|
if err := tmpFile.Close(); err != nil {
|
|
log.Errorf(err.Error())
|
|
debug.PrintStack()
|
|
return err
|
|
}
|
|
|
|
// 删除临时文件
|
|
if err := os.Remove(tmpFilePath); err != nil {
|
|
log.Errorf(err.Error())
|
|
debug.PrintStack()
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// 启动爬虫服务
|
|
func InitSpiderService() error {
|
|
// 构造定时任务执行器
|
|
c := cron.New(cron.WithSeconds())
|
|
|
|
if IsMaster() {
|
|
// 主节点
|
|
|
|
// 每5秒更新一次爬虫信息
|
|
if _, err := c.AddFunc("*/5 * * * * *", UpdateSpiders); err != nil {
|
|
return err
|
|
}
|
|
|
|
// 每60秒同步爬虫给工作节点
|
|
if _, err := c.AddFunc("0 * * * * *", PublishAllSpidersJob); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
// 非主节点
|
|
|
|
// 订阅文件上传
|
|
channel := "files:upload"
|
|
|
|
//sub.Connect()
|
|
ctx := context.Background()
|
|
return database.RedisClient.Subscribe(ctx, OnFileUpload, channel)
|
|
|
|
}
|
|
|
|
// 启动定时任务
|
|
c.Start()
|
|
|
|
return nil
|
|
}
|