Add configurable spider

This commit is contained in:
marvzhang
2019-11-24 17:57:12 +08:00
parent 6d9c2ce653
commit 9c282ddb4d
20 changed files with 638 additions and 61 deletions

View File

@@ -0,0 +1,8 @@
package constants
const (
AnchorStartStage = "START_STAGE"
AnchorStartUrl = "START_URL"
AnchorItems = "ITEMS"
AnchorParsers = "PARSERS"
)
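These anchors correspond to the ###...### placeholders embedded in the Scrapy template files further down (items.py and spider.py). A minimal sketch of the placeholder convention, assuming the "###" delimiter that utils.SetFileVariable uses in this commit:

// Hypothetical helper, for illustration only (not part of this commit):
// an anchor name wrapped in "###" yields the placeholder that appears in the templates.
func placeholder(key string) string {
    return fmt.Sprintf("###%s###", key) // placeholder(AnchorStartUrl) == "###START_URL###"
}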

View File

@@ -0,0 +1,6 @@
package constants
const (
EngineScrapy = "scrapy"
EngineColly = "colly"
)

View File

@@ -0,0 +1,5 @@
package constants
const ScrapyProtectedStageNames = "start_requests"
const ScrapyProtectedFieldNames = "_id,task_id,ts"

View File

@@ -1,22 +1,23 @@
package entity
type Field struct {
Name string `yaml:"name" json:"name"`
Css string `yaml:"css" json:"css"`
Xpath string `yaml:"xpath" json:"xpath"`
Attr string `yaml:"attr" json:"attr"`
Stage string `yaml:"stage" json:"stage"`
Name string `yaml:"name" json:"name"`
Css string `yaml:"css" json:"css"`
Xpath string `yaml:"xpath" json:"xpath"`
Attr string `yaml:"attr" json:"attr"`
NextStage string `yaml:"next_stage" json:"next_stage"`
}
type Stage struct {
List bool `yaml:"list" json:"list"`
Css string `yaml:"css" json:"css"`
Xpath string `yaml:"xpath" json:"xpath"`
Fields []Field `yaml:"fields" json:"fields"`
IsList bool `yaml:"is_list" json:"is_list"`
ListCss string `yaml:"list_css" json:"list_css"`
PageCss string `yaml:"page_css" json:"page_css"`
Fields []Field `yaml:"fields" json:"fields"`
}
type ConfigSpiderData struct {
Version string `yaml:"version" json:"version"`
StartUrl string `yaml:"startUrl" json:"start_url"`
Engine string `yaml:"engine" json:"engine"`
StartUrl string `yaml:"start_url" json:"start_url"`
Stages map[string]Stage `yaml:"stages" json:"stages"`
}
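For reference, a minimal sketch (hypothetical URL and selectors) of how a Spiderfile unmarshals into ConfigSpiderData with gopkg.in/yaml.v2, the library the upload handler below uses; the keys follow the yaml tags above:

package main

import (
    "fmt"

    "crawlab/entity"
    "gopkg.in/yaml.v2"
)

func main() {
    // Hypothetical Spiderfile content
    raw := []byte(`
engine: scrapy
start_url: "https://example.com/list"
stages:
  stage_1:
    is_list: true
    list_css: ".item"
    fields:
      - name: title
        css: a
`)
    var data entity.ConfigSpiderData
    if err := yaml.Unmarshal(raw, &data); err != nil {
        panic(err)
    }
    fmt.Println(data.Engine, data.StartUrl, len(data.Stages)) // scrapy https://example.com/list 1
}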

View File

@@ -0,0 +1,22 @@
package config_spider
import "crawlab/entity"
func GetAllFields(data entity.ConfigSpiderData) []entity.Field {
var fields []entity.Field
for _, stage := range data.Stages {
// collect fields from every stage: non-list stages also assign item fields in the generated parsers
fields = append(fields, stage.Fields...)
}
return fields
}
// GetStartStageName returns the entry stage, i.e. the stage that no field's next_stage points to
// (Go map iteration order is random, so taking the "first" key would not be deterministic)
func GetStartStageName(data entity.ConfigSpiderData) string {
referenced := map[string]bool{}
for _, stage := range data.Stages {
for _, field := range stage.Fields {
referenced[field.NextStage] = true
}
}
for stageName := range data.Stages {
if !referenced[stageName] {
return stageName
}
}
return ""
}

View File

@@ -0,0 +1,214 @@
package config_spider
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
"errors"
"fmt"
"path/filepath"
)
type ScrapyGenerator struct {
Spider model.Spider
ConfigData entity.ConfigSpiderData
}
// Generate the spider files
func (g ScrapyGenerator) Generate() error {
// Generate items.py
if err := g.ProcessItems(); err != nil {
return err
}
// Generate spider.py
if err := g.ProcessSpider(); err != nil {
return err
}
return nil
}
// Generate items.py
func (g ScrapyGenerator) ProcessItems() error {
// Target file path
src := g.Spider.Src
filePath := filepath.Join(src, "items.py")
// Get all fields
fields := g.GetAllFields()
// Field name list (including the default field names)
fieldNames := []string{
"_id",
"task_id",
"ts",
}
// Append the configured fields
for _, field := range fields {
fieldNames = append(fieldNames, field.Name)
}
// Convert the field names into Python code
str := ""
for _, fieldName := range fieldNames {
// one declaration per line; assumes the ###ITEMS### placeholder in items.py is indented with 4 spaces
line := fmt.Sprintf("%s = scrapy.Field()\n    ", fieldName)
str += line
}
// Replace the placeholder with the generated code
if err := utils.SetFileVariable(filePath, constants.AnchorItems, str); err != nil {
return err
}
return nil
}
// Generate spider.py
func (g ScrapyGenerator) ProcessSpider() error {
// Target file path
src := g.Spider.Src
filePath := filepath.Join(src, "spiders", "spider.py")
// Replace start_stage
if err := utils.SetFileVariable(filePath, constants.AnchorStartStage, GetStartStageName(g.ConfigData)); err != nil {
return err
}
// Replace start_url
if err := utils.SetFileVariable(filePath, constants.AnchorStartUrl, g.ConfigData.StartUrl); err != nil {
return err
}
// Replace parsers
strParser := ""
for stageName, stage := range g.ConfigData.Stages {
stageStr := g.GetParserString(stageName, stage)
strParser += stageStr
}
if err := utils.SetFileVariable(filePath, constants.AnchorParsers, strParser); err != nil {
return err
}
return nil
}
func (g ScrapyGenerator) GetParserString(stageName string, stage entity.Stage) string {
// Build the function definition line
strDef := g.PadCode(fmt.Sprintf("def %s(self, response):", stageName), 1)
strParse := ""
if stage.IsList {
// List parsing logic
strParse = g.GetListParserString(stage)
} else {
// Non-list parsing logic
strParse = g.GetNonListParserString(stage)
}
// Assemble
str := fmt.Sprintf(`%s%s`, strDef, strParse)
return str
}
func (g ScrapyGenerator) PadCode(str string, num int) string {
res := ""
for i := 0; i < num; i++ {
// indent with 4 spaces to match the space-indented Python templates (mixing tabs would raise TabError)
res += "    "
}
res += str
res += "\n"
return res
}
func (g ScrapyGenerator) GetNonListParserString(stage entity.Stage) string {
str := ""
// Get the item from meta or create a new one
str += g.PadCode("item = Item() if response.meta.get('item') is None else response.meta.get('item')", 2)
// Iterate over the fields
for _, f := range stage.Fields {
line := ""
if f.Attr == "" {
line += fmt.Sprintf(`item['%s'] = response.css('%s::text').extract_first()`, f.Name, f.Css)
} else {
line += fmt.Sprintf(`item['%s'] = response.css('%s::attr("%s")').extract_first()`, f.Name, f.Css, f.Attr)
}
str += g.PadCode(line, 2)
}
// next stage field
if f, err := g.GetNextStageField(stage); err == nil {
// If a next stage field is found, request the next stage with the current item attached
str += g.PadCode(fmt.Sprintf(`yield scrapy.Request(url=item['%s'], callback=self.%s, meta={'item': item})`, f.Name, f.NextStage), 2)
} else {
// Otherwise, yield the item
str += g.PadCode("yield item", 2)
}
return str
}
func (g ScrapyGenerator) GetListParserString(stage entity.Stage) string {
str := ""
// Get the item from the previous stage
str += g.PadCode(`prev_item = response.meta.get('item')`, 2)
// Loop over the list elements
str += g.PadCode(fmt.Sprintf(`for elem in response.css('%s'):`, stage.ListCss), 2)
// Create the item
str += g.PadCode(`item = Item()`, 3)
// Iterate over the fields
for _, f := range stage.Fields {
line := ""
if f.Attr == "" {
line += fmt.Sprintf(`item['%s'] = elem.css('%s::text').extract_first()`, f.Name, f.Css)
} else {
line += fmt.Sprintf(`item['%s'] = elem.css('%s::attr("%s")').extract_first()`, f.Name, f.Css, f.Attr)
}
str += g.PadCode(line, 3)
}
// Copy values from the previous stage's item into the current item
str += g.PadCode(`if prev_item is not None:`, 3)
str += g.PadCode(`for key, value in prev_item.items():`, 4)
str += g.PadCode(`item[key] = value`, 5)
// next stage field
if f, err := g.GetNextStageField(stage); err == nil {
// If a next stage field is found, request the next stage with the current item attached
str += g.PadCode(fmt.Sprintf(`yield scrapy.Request(url=item['%s'], callback=self.%s, meta={'item': item})`, f.Name, f.NextStage), 3)
} else {
// Otherwise, yield the item
str += g.PadCode("yield item", 3)
}
// Pagination
if stage.PageCss != "" {
str += g.PadCode(fmt.Sprintf(`next_url = response.css('%s').extract_first()`, stage.PageCss), 2)
str += g.PadCode(`yield scrapy.Request(url=next_url, meta={'item': item})`, 2)
}
return str
}
// Get all fields
func (g ScrapyGenerator) GetAllFields() []entity.Field {
return GetAllFields(g.ConfigData)
}
// Get the field that contains a next stage
func (g ScrapyGenerator) GetNextStageField(stage entity.Stage) (entity.Field, error) {
for _, field := range stage.Fields {
if field.NextStage != "" {
return field, nil
}
}
return entity.Field{}, errors.New("cannot find next stage field")
}
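To illustrate the output, a minimal sketch (hypothetical stage definition) that renders a single list-stage parser; GetParserString only reads the stage itself, so an empty ScrapyGenerator is enough here:

g := config_spider.ScrapyGenerator{}
stage := entity.Stage{
    IsList:  true,
    ListCss: ".result",
    Fields:  []entity.Field{{Name: "title", Css: "a"}},
}
fmt.Print(g.GetParserString("stage_1", stage))
// Prints roughly (indentation added by PadCode):
//   def stage_1(self, response):
//       prev_item = response.meta.get('item')
//       for elem in response.css('.result'):
//           item = Item()
//           item['title'] = elem.css('a::text').extract_first()
//           if prev_item is not None:
//               for key, value in prev_item.items():
//                   item[key] = value
//           yield item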

View File

@@ -4,12 +4,11 @@ import (
"crawlab/constants"
"crawlab/entity"
"crawlab/model"
"crawlab/services"
"crawlab/utils"
"fmt"
"github.com/apex/log"
"github.com/gin-gonic/gin"
"github.com/globalsign/mgo/bson"
uuid "github.com/satori/go.uuid"
"github.com/spf13/viper"
"gopkg.in/yaml.v2"
"io"
@@ -17,7 +16,6 @@ import (
"net/http"
"os"
"path/filepath"
"runtime/debug"
)
// Add a configurable spider
@@ -46,6 +44,20 @@ func PutConfigSpider(c *gin.Context) {
// Reset FileId to the null ObjectId
spider.FileId = bson.ObjectIdHex(constants.ObjectIdNull)
// Create the spider directory
spiderDir := filepath.Join(viper.GetString("spider.path"), spider.Name)
if utils.Exists(spiderDir) {
if err := os.RemoveAll(spiderDir); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
}
if err := os.MkdirAll(spiderDir, 0777); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
spider.Src = spiderDir
// Save the spider to the database
if err := spider.Add(); err != nil {
HandleError(http.StatusInternalServerError, c, err)
@@ -64,7 +76,17 @@ func PostConfigSpider(c *gin.Context) {
PostSpider(c)
}
// Upload a configurable spider's Spiderfile
func UploadConfigSpider(c *gin.Context) {
id := c.Param("id")
// Get the spider
var spider model.Spider
spider, err := model.GetSpider(bson.ObjectIdHex(id))
if err != nil {
HandleErrorF(http.StatusBadRequest, c, fmt.Sprintf("cannot find spider (id: %s)", id))
return
}
// Get the uploaded file
file, header, err := c.Request.FormFile("file")
if err != nil {
@@ -79,50 +101,99 @@ func UploadConfigSpider(c *gin.Context) {
return
}
// In case the tmp directory does not exist
tmpPath := viper.GetString("other.tmppath")
if !utils.Exists(tmpPath) {
if err := os.MkdirAll(tmpPath, os.ModePerm); err != nil {
log.Error("mkdir other.tmppath dir error:" + err.Error())
debug.PrintStack()
HandleError(http.StatusBadRequest, c, err)
return
// Spider directory
spiderDir := filepath.Join(viper.GetString("spider.path"), spider.Name)
// Spiderfile path
sfPath := filepath.Join(spiderDir, filename)
// Create the Spiderfile if it does not exist, otherwise open it
var f *os.File
if utils.Exists(sfPath) {
// truncate any previous content so a shorter upload does not leave stale bytes behind
f, err = os.OpenFile(sfPath, os.O_WRONLY|os.O_TRUNC, 0777)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
} else {
f, err = os.Create(sfPath)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
}
// Create the file
randomId := uuid.NewV4()
tmpFilePath := filepath.Join(tmpPath, "Spiderfile."+randomId.String())
out, err := os.Create(tmpFilePath)
if err != nil {
}
_, err = io.Copy(out, file)
// Copy the uploaded file into the spider's Spiderfile
_, err = io.Copy(f, file)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
_ = out.Close()
// Close the Spiderfile
_ = f.Close()
// Build the config data
data := entity.ConfigSpiderData{}
configData := entity.ConfigSpiderData{}
// Read the YAML file
yamlFile, err := ioutil.ReadFile(tmpFilePath)
yamlFile, err := ioutil.ReadFile(sfPath)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
// Deserialize
if err := yaml.Unmarshal(yamlFile, &data); err != nil {
if err := yaml.Unmarshal(yamlFile, &configData); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
// TODO: generate spider files
// Remove existing spider files
for _, fInfo := range utils.ListDir(spiderDir) {
// Do not delete the Spiderfile
if fInfo.Name() == filename {
continue
}
// Delete everything else
if err := os.RemoveAll(filepath.Join(spiderDir, fInfo.Name())); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
}
// Copy the spider template files
tplDir := "./template/scrapy"
for _, fInfo := range utils.ListDir(tplDir) {
// Skip the Spiderfile
if fInfo.Name() == "Spiderfile" {
continue
}
srcPath := filepath.Join(tplDir, fInfo.Name())
if fInfo.IsDir() {
if err := utils.CopyDir(srcPath, spiderDir); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
} else {
if _, err := utils.CopyFile(srcPath, filepath.Join(spiderDir, fInfo.Name())); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
}
}
// Generate the spider files from the config
if err := services.GenerateConfigSpiderFiles(spider, configData); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
// TODO: upload to GridFS
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
Data: data,
Data: configData,
})
}

View File

@@ -0,0 +1,118 @@
package services
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/model"
"crawlab/model/config_spider"
"errors"
"fmt"
"strings"
)
func GenerateConfigSpiderFiles(spider model.Spider, configData entity.ConfigSpiderData) error {
// Validate the Spiderfile
if err := ValidateSpiderfile(configData); err != nil {
return err
}
// Build the code generator
generator := config_spider.ScrapyGenerator{
Spider: spider,
ConfigData: configData,
}
// Generate the code
if err := generator.Generate(); err != nil {
return err
}
return nil
}
// Validate the Spiderfile
func ValidateSpiderfile(configData entity.ConfigSpiderData) error {
// Get all fields
fields := config_spider.GetAllFields(configData)
// Validate that start_url is set
if configData.StartUrl == "" {
return errors.New("spiderfile start_url is empty")
}
// Validate that stages exist
if len(configData.Stages) == 0 {
return errors.New("spiderfile stages is empty")
}
// Validate the stages
dict := map[string]int{}
for stageName, stage := range configData.Stages {
// The stage name must not be empty
if stageName == "" {
return errors.New("spiderfile stage name is empty")
}
// The stage name must not be a protected name
// NOTE: other engines can be added here later; the default is Scrapy
if configData.Engine == "" || configData.Engine == constants.EngineScrapy {
// compare against the comma-separated protected names exactly, not by substring
for _, protectedName := range strings.Split(constants.ScrapyProtectedStageNames, ",") {
if stageName == protectedName {
return errors.New(fmt.Sprintf("spiderfile stage name '%s' is protected", stageName))
}
}
} else if configData.Engine == constants.EngineColly {
return errors.New(fmt.Sprintf("engine '%s' is not implemented", configData.Engine))
}
// Stage names must be unique
if dict[stageName] == 1 {
return errors.New("spiderfile stage name should be unique")
}
dict[stageName] = 1
// A stage must have at least one field
if len(stage.Fields) == 0 {
return errors.New(fmt.Sprintf("spiderfile stage '%s' has no fields", stageName))
}
// A stage can have at most one next stage
hasNextStage := false
for _, field := range stage.Fields {
if field.NextStage != "" {
if hasNextStage {
return errors.New("spiderfile stage fields should have only 1 next_stage")
}
hasNextStage = true
}
}
// If is_list is true but list_css is empty, return an error
if stage.IsList && stage.ListCss == "" {
return errors.New("spiderfile stage with is_list = true should have list_css being set")
}
}
// Validate field name uniqueness
if !IsUniqueConfigSpiderFields(fields) {
return errors.New("spiderfile fields not unique")
}
// Field names must not be protected names
for _, field := range fields {
// compare exactly against the comma-separated protected names (a field like "id" must not be rejected by a substring match on "_id")
for _, protectedName := range strings.Split(constants.ScrapyProtectedFieldNames, ",") {
if field.Name == protectedName {
return errors.New(fmt.Sprintf("spiderfile field name '%s' is protected", field.Name))
}
}
}
return nil
}
func IsUniqueConfigSpiderFields(fields []entity.Field) bool {
dict := map[string]int{}
for _, field := range fields {
if dict[field.Name] == 1 {
return false
}
dict[field.Name] = 1
}
return true
}
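A minimal sketch (hypothetical config) of how ValidateSpiderfile rejects an incomplete Spiderfile, e.g. one with no start_url:

configData := entity.ConfigSpiderData{
    Engine: constants.EngineScrapy,
    Stages: map[string]entity.Stage{
        "stage_1": {Fields: []entity.Field{{Name: "title", Css: "a"}}},
    },
}
if err := services.ValidateSpiderfile(configData); err != nil {
    fmt.Println(err) // spiderfile start_url is empty
}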

View File

@@ -117,14 +117,11 @@ func PublishAllSpiders() {
// Publish spider
func PublishSpider(spider model.Spider) {
// Look up the GridFS file; if it does not exist, mark the spider file as missing
var gfFile *model.GridFs
if spider.Type == constants.Customized {
gfFile = model.GetGridFs(spider.FileId)
if gfFile == nil {
spider.FileId = constants.ObjectIdNull
_ = spider.Save()
return
}
gfFile := model.GetGridFs(spider.FileId)
if gfFile == nil {
spider.FileId = constants.ObjectIdNull
_ = spider.Save()
return
}
// If FileId is empty, the spider has not been uploaded to GridFS yet, so skip it

View File

@@ -1,12 +1,19 @@
version: 0.4.0
start_url: "https://baidu.com/s?wd=crawlab"
engine: "scrapy"
stages:
stage1:
list: false # default: false
stage_1:
is_list: true # default: false
list_css: "#content_left > .result"
page_css: "#page > a.n:last-child"
fields:
- name: "title"
css: "a"
- name: "url"
css: "a"
attr: "href"
next_stage: "stage_2"
stage_2:
is_list: false
fields:
- name: ""

View File

@@ -1,11 +0,0 @@
# -*- coding: utf-8 -*-
import scrapy
class SpiderSpider(scrapy.Spider):
name = 'spider'
allowed_domains = ['baidu.com']
start_urls = ['http://baidu.com/']
def parse(self, response):
pass

View File

@@ -8,7 +8,6 @@
import scrapy
class ConfigSpiderItem(scrapy.Item):
# define the fields for your item here like:
# name = scrapy.Field()
class Item(scrapy.Item):
###ITEMS###
pass

View File

@@ -64,9 +64,9 @@ ROBOTSTXT_OBEY = True
# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
#ITEM_PIPELINES = {
# 'config_spider.pipelines.ConfigSpiderPipeline': 300,
#}
ITEM_PIPELINES = {
'config_spider.pipelines.ConfigSpiderPipeline': 300,
}
# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html

View File

@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
import scrapy
from config_spider.items import Item
class ConfigSpider(scrapy.Spider):
name = 'config_spider'
def start_requests(self):
yield scrapy.Request(url='###START_URL###', callback=self.###START_STAGE###)
###PARSERS###

View File

@@ -3,11 +3,15 @@ package utils
import (
"archive/zip"
"bufio"
"errors"
"fmt"
"github.com/apex/log"
"io"
"io/ioutil"
"os"
"path/filepath"
"runtime/debug"
"strings"
)
// Delete a file
@@ -71,6 +75,16 @@ func IsDir(path string) bool {
return s.IsDir()
}
func ListDir(path string) []os.FileInfo {
list, err := ioutil.ReadDir(path)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return nil
}
return list
}
// Check whether the given path is a file
func IsFile(path string) bool {
return !IsDir(path)
@@ -239,3 +253,117 @@ func _Compress(file *os.File, prefix string, zw *zip.Writer) error {
}
return nil
}
/**
 * Copy a directory, including the files inside it
 * @param srcPath the directory to copy, e.g. D:/test
 * @param destPath the destination, e.g. D:/backup/
 */
func CopyDir(srcPath string, destPath string) error {
// Check that the directories are valid
if srcInfo, err := os.Stat(srcPath); err != nil {
fmt.Println(err.Error())
return err
} else {
if !srcInfo.IsDir() {
e := errors.New("srcPath不是一个正确的目录")
fmt.Println(e.Error())
return e
}
}
if destInfo, err := os.Stat(destPath); err != nil {
fmt.Println(err.Error())
return err
} else {
if !destInfo.IsDir() {
e := errors.New("destInfo不是一个正确的目录")
fmt.Println(e.Error())
return e
}
}
err := filepath.Walk(srcPath, func(path string, f os.FileInfo, err error) error {
if f == nil {
return err
}
if !f.IsDir() {
path := strings.Replace(path, "\\", "/", -1)
destNewPath := strings.Replace(path, srcPath, destPath, -1)
_, _ = CopyFile(path, destNewPath)
}
return nil
})
if err != nil {
fmt.Println(err.Error())
}
return err
}
// Create the destination directories and copy the file
func CopyFile(src, dest string) (w int64, err error) {
srcFile, err := os.Open(src)
if err != nil {
fmt.Println(err.Error())
return
}
defer srcFile.Close()
// Split the destination path into directories
destSplitPathDirs := strings.Split(dest, "/")
// Check whether each directory exists
destSplitPath := ""
for index, dir := range destSplitPathDirs {
if index < len(destSplitPathDirs)-1 {
destSplitPath = destSplitPath + dir + "/"
if !Exists(destSplitPath) {
// Create the directory
err := os.Mkdir(destSplitPath, os.ModePerm)
if err != nil {
fmt.Println(err)
}
}
}
}
dstFile, err := os.Create(dest)
if err != nil {
fmt.Println(err.Error())
return
}
defer dstFile.Close()
return io.Copy(dstFile, srcFile)
}
// Set a file variable value,
// i.e. replace a variable placeholder in the file with the desired value
func SetFileVariable(filePath string, key string, value string) error {
// Placeholder delimiter
sep := "###"
// Read the file into bytes
contentBytes, err := ioutil.ReadFile(filePath)
if err != nil {
return err
}
// Convert the bytes to a string
content := string(contentBytes)
// Replace the placeholder with the value
content = strings.ReplaceAll(content, fmt.Sprintf("%s%s%s", sep, key, sep), value)
// Open the file, truncating it so stale bytes are not left behind when the new content is shorter
f, err := os.OpenFile(filePath, os.O_WRONLY|os.O_TRUNC, 0777)
if err != nil {
return err
}
defer f.Close()
// Write the replaced content back to the file
if _, err := f.Write([]byte(content)); err != nil {
return err
}
return nil
}
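Finally, a minimal sketch of how the pieces fit together: SetFileVariable swaps a ###START_URL### placeholder in a copied template for the configured value (the spider path and URL here are hypothetical):

// spider.py in the copied template contains the placeholder line:
//   yield scrapy.Request(url='###START_URL###', callback=self.###START_STAGE###)
spiderPy := filepath.Join("/opt/crawlab/spiders/my_config_spider", "spiders", "spider.py")
if err := utils.SetFileVariable(spiderPy, constants.AnchorStartUrl, "https://example.com/list"); err != nil {
    log.Errorf("failed to set start_url: %s", err.Error())
}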