Merge remote-tracking branch 'upstream/develop' into develop

This commit is contained in:
陈景阳
2019-12-05 07:15:07 +08:00
41 changed files with 2260 additions and 519 deletions

View File

@@ -2,11 +2,20 @@ package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/model/config_spider"
"crawlab/utils"
"errors"
"fmt"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
uuid "github.com/satori/go.uuid"
"github.com/spf13/viper"
"gopkg.in/yaml.v2"
"os"
"path/filepath"
"strings"
)
@@ -37,12 +46,17 @@ func ValidateSpiderfile(configData entity.ConfigSpiderData) error {
// 校验是否存在 start_url
if configData.StartUrl == "" {
return errors.New("spiderfile start_url is empty")
return errors.New("spiderfile invalid: start_url is empty")
}
// 校验是否存在 start_stage
if configData.StartStage == "" {
return errors.New("spiderfile invalid: start_stage is empty")
}
// 校验是否存在 stages
if len(configData.Stages) == 0 {
return errors.New("spiderfile stages is empty")
return errors.New("spiderfile invalid: stages is empty")
}
// 校验stages
@@ -50,56 +64,74 @@ func ValidateSpiderfile(configData entity.ConfigSpiderData) error {
for stageName, stage := range configData.Stages {
// stage 名称不能为空
if stageName == "" {
return errors.New("spiderfile stage name is empty")
return errors.New("spiderfile invalid: stage name is empty")
}
// stage 名称不能为保留字符串
// NOTE: 如果有其他Engine可以扩展默认为Scrapy
if configData.Engine == "" || configData.Engine == constants.EngineScrapy {
if strings.Contains(constants.ScrapyProtectedStageNames, stageName) {
return errors.New(fmt.Sprintf("spiderfile stage name '%s' is protected", stageName))
return errors.New(fmt.Sprintf("spiderfile invalid: stage name '%s' is protected", stageName))
}
} else if configData.Engine == constants.EngineColly {
return errors.New(fmt.Sprintf("engine '%s' is not implemented", stageName))
} else {
return errors.New(fmt.Sprintf("spiderfile invalid: engine '%s' is not implemented", configData.Engine))
}
// stage 名称不能重复
if dict[stageName] == 1 {
return errors.New("spiderfile stage name should be unique")
return errors.New(fmt.Sprintf("spiderfile invalid: stage name '%s' is duplicated", stageName))
}
dict[stageName] = 1
// stage 字段不能为空
if len(stage.Fields) == 0 {
return errors.New(fmt.Sprintf("spiderfile stage '%s' has no fields", stageName))
return errors.New(fmt.Sprintf("spiderfile invalid: stage '%s' has no fields", stageName))
}
// stage 的下一个 stage 只能有一个
// 是否包含 next_stage
hasNextStage := false
// 遍历字段列表
for _, field := range stage.Fields {
// stage 的 next stage 只能有一个
if field.NextStage != "" {
if hasNextStage {
return errors.New("spiderfile stage fields should have only 1 next_stage")
return errors.New(fmt.Sprintf("spiderfile invalid: stage '%s' has more than 1 next_stage", stageName))
}
hasNextStage = true
}
// 字段里 css 和 xpath 只能包含一个
if field.Css != "" && field.Xpath != "" {
return errors.New(fmt.Sprintf("spiderfile invalid: field '%s' in stage '%s' has both css and xpath set which is prohibited", field.Name, stageName))
}
}
// stage 里 page_css 和 page_xpath 只能包含一个
if stage.PageCss != "" && stage.PageXpath != "" {
return errors.New(fmt.Sprintf("spiderfile invalid: stage '%s' has both page_css and page_xpath set which is prohibited", stageName))
}
// stage 里 list_css 和 list_xpath 只能包含一个
if stage.ListCss != "" && stage.ListXpath != "" {
return errors.New(fmt.Sprintf("spiderfile invalid: stage '%s' has both list_css and list_xpath set which is prohibited", stageName))
}
// 如果 stage 的 is_list 为 true 但 list_css 为空,报错
if stage.IsList && stage.ListCss == "" {
return errors.New("spiderfile stage with is_list = true should have list_css being set")
if stage.IsList && (stage.ListCss == "" && stage.ListXpath == "") {
return errors.New("spiderfile invalid: stage with is_list = true should have either list_css or list_xpath being set")
}
}
// 校验字段唯一性
if !IsUniqueConfigSpiderFields(fields) {
return errors.New("spiderfile fields not unique")
return errors.New("spiderfile invalid: fields not unique")
}
// 字段名称不能为保留字符串
for _, field := range fields {
if strings.Contains(constants.ScrapyProtectedFieldNames, field.Name) {
return errors.New(fmt.Sprintf("spiderfile field name '%s' is protected", field.Name))
return errors.New(fmt.Sprintf("spiderfile invalid: field name '%s' is protected", field.Name))
}
}
@@ -116,3 +148,118 @@ func IsUniqueConfigSpiderFields(fields []entity.Field) bool {
}
return true
}
// ProcessSpiderFilesFromConfigData regenerates a configurable spider's source
// files from its Spiderfile config data, zips the result and uploads the
// archive to GridFS, replacing any previous archive with the same name.
// On success the spider's FileId is updated and persisted.
func ProcessSpiderFilesFromConfigData(spider model.Spider, configData entity.ConfigSpiderData) error {
	spiderDir := spider.Src

	// Back-fill each stage's Name from its map key: in the YAML the stage name
	// is the key, not a field, so it must be copied onto the struct.
	for stageName, stage := range configData.Stages {
		stage.Name = stageName
		configData.Stages[stageName] = stage
	}

	// Remove existing spider files, keeping the Spiderfile itself.
	for _, fInfo := range utils.ListDir(spiderDir) {
		if fInfo.Name() == "Spiderfile" {
			continue
		}
		if err := os.RemoveAll(filepath.Join(spiderDir, fInfo.Name())); err != nil {
			return err
		}
	}

	// Copy scrapy template files into the spider directory, skipping the
	// template's own Spiderfile so the spider's one is preserved.
	tplDir := "./template/scrapy"
	for _, fInfo := range utils.ListDir(tplDir) {
		if fInfo.Name() == "Spiderfile" {
			continue
		}
		srcPath := filepath.Join(tplDir, fInfo.Name())
		dstPath := filepath.Join(spiderDir, fInfo.Name())
		if fInfo.IsDir() {
			if err := utils.CopyDir(srcPath, dstPath); err != nil {
				return err
			}
		} else if err := utils.CopyFile(srcPath, dstPath); err != nil {
			return err
		}
	}

	// Render the spider code from the config data.
	if err := GenerateConfigSpiderFiles(spider, configData); err != nil {
		return err
	}

	// Zip the spider directory into a uniquely-named temp file.
	files, err := utils.GetFilesFromDir(spiderDir)
	if err != nil {
		return err
	}
	randomId := uuid.NewV4()
	tmpFilePath := filepath.Join(viper.GetString("other.tmppath"), spider.Name+"."+randomId.String()+".zip")
	spiderZipFileName := spider.Name + ".zip"
	if err := utils.Compress(files, tmpFilePath); err != nil {
		return err
	}
	// FIX: the temp zip was previously never deleted, leaking one file per
	// call into the tmp directory. Best-effort cleanup once we are done.
	defer func() { _ = os.Remove(tmpFilePath) }()

	// Get a GridFS handle; the session must be closed when done.
	s, gf := database.GetGridFs("files")
	defer s.Close()

	// If an archive with the same name already exists, drop it first so the
	// upload below becomes the single authoritative copy.
	var gfFile model.GridFs
	if err := gf.Find(bson.M{"filename": spiderZipFileName}).One(&gfFile); err == nil {
		_ = gf.RemoveId(gfFile.Id)
	}

	// Upload to GridFS.
	fid, err := UploadToGridFs(spiderZipFileName, tmpFilePath)
	if err != nil {
		log.Errorf("upload to grid fs error: %s", err.Error())
		return err
	}

	// Persist the new FileId on the spider.
	// FIX: the Save error was previously discarded with `_ =`, so a failed
	// write left the spider pointing at a stale archive with no signal.
	spider.FileId = fid
	if err := spider.Save(); err != nil {
		log.Errorf("save spider error: %s", err.Error())
		return err
	}
	return nil
}
// GenerateSpiderfileFromConfigData serializes the config data to YAML and
// writes it to the spider's Spiderfile, creating the file if it does not
// exist and truncating it if it does.
func GenerateSpiderfileFromConfigData(spider model.Spider, configData entity.ConfigSpiderData) error {
	// Spiderfile path inside the spider's source directory.
	sfPath := filepath.Join(spider.Src, "Spiderfile")

	// Render the YAML content.
	sfContentByte, err := yaml.Marshal(configData)
	if err != nil {
		return err
	}

	// FIX: the old create branch used os.OpenFile(sfPath, os.O_CREATE, 0777);
	// with no access-mode flag the mode defaults to O_RDONLY, so the Write
	// below failed (EBADF) whenever the Spiderfile did not already exist.
	// O_CREATE|O_WRONLY|O_TRUNC handles both the existing-file and the
	// new-file case in a single call, making the Exists() probe unnecessary.
	f, err := os.OpenFile(sfPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0777)
	if err != nil {
		return err
	}
	defer f.Close()

	// Write the serialized Spiderfile content.
	if _, err := f.Write(sfContentByte); err != nil {
		return err
	}
	return nil
}

View File

@@ -116,12 +116,15 @@ func PublishAllSpiders() {
// 发布爬虫
func PublishSpider(spider model.Spider) {
// 查询gf file不存在则标记为爬虫文件不存在
gfFile := model.GetGridFs(spider.FileId)
if gfFile == nil {
spider.FileId = constants.ObjectIdNull
_ = spider.Save()
return
var gfFile *model.GridFs
if spider.FileId.Hex() != constants.ObjectIdNull {
// 查询gf file不存在则标记为爬虫文件不存在
gfFile = model.GetGridFs(spider.FileId)
if gfFile == nil {
spider.FileId = constants.ObjectIdNull
_ = spider.Save()
return
}
}
// 如果FileId为空表示还没有上传爬虫到GridFS则跳过

View File

@@ -10,6 +10,7 @@ import (
"github.com/spf13/viper"
"io"
"os"
"os/exec"
"path/filepath"
"runtime/debug"
)
@@ -99,7 +100,6 @@ func (s *SpiderSync) Download() {
// 创建临时文件
tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip")
tmpFile := utils.OpenFile(tmpFilePath)
defer utils.Close(tmpFile)
// 将该文件写入临时文件
if _, err := io.Copy(tmpFile, f); err != nil {
@@ -119,6 +119,15 @@ func (s *SpiderSync) Download() {
return
}
//递归修改目标文件夹权限
// 解决scrapy.setting中开启LOG_ENABLED 和 LOG_FILE时不能创建log文件的问题
cmd := exec.Command("chmod", "-R", "777", dstPath)
if err := cmd.Run(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
// 关闭临时文件
if err := tmpFile.Close(); err != nil {
log.Errorf(err.Error())

View File

@@ -226,12 +226,18 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e
// 环境变量配置
envs := s.Envs
if s.Type == constants.Configurable {
// 数据库配置
envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_HOST", Value: viper.GetString("mongo.host")})
envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_PORT", Value: viper.GetString("mongo.port")})
envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_DB", Value: viper.GetString("mongo.db")})
envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_USERNAME", Value: viper.GetString("mongo.username")})
envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_PASSWORD", Value: viper.GetString("mongo.password")})
envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_AUTHSOURCE", Value: viper.GetString("mongo.authSource")})
// 设置配置
for envName, envValue := range s.Config.Settings {
envs = append(envs, model.Env{Name: "CRAWLAB_SETTING_" + envName, Value: envValue})
}
}
cmd = SetEnv(cmd, envs, t.Id, s.Col)
@@ -311,9 +317,12 @@ func SaveTaskResultCount(id string) func() {
// 执行任务
func ExecuteTask(id int) {
if flag, _ := LockList.Load(id); flag.(bool) {
log.Debugf(GetWorkerPrefix(id) + "正在执行任务...")
return
if flag, ok := LockList.Load(id); ok {
if flag.(bool) {
log.Debugf(GetWorkerPrefix(id) + "正在执行任务...")
return
}
}
// 上锁
@@ -477,6 +486,29 @@ func GetTaskLog(id string) (logStr string, err error) {
}
if IsMasterNode(task.NodeId.Hex()) {
if !utils.Exists(task.LogPath) {
fileDir, err := MakeLogDir(task)
if err != nil {
log.Errorf(err.Error())
}
fileP := GetLogFilePaths(fileDir)
// 获取日志文件路径
fLog, err := os.Create(fileP)
defer fLog.Close()
if err != nil {
log.Errorf("create task log file error: %s", fileP)
debug.PrintStack()
}
task.LogPath = fileP
if err := task.Save(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
}
// 若为主节点,获取本机日志
logBytes, err := model.GetLocalLog(task.LogPath)
if err != nil {