mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-28 17:50:56 +01:00
118
backend/services/config_spider.go
Normal file
118
backend/services/config_spider.go
Normal file
@@ -0,0 +1,118 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"crawlab/constants"
|
||||
"crawlab/entity"
|
||||
"crawlab/model"
|
||||
"crawlab/model/config_spider"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func GenerateConfigSpiderFiles(spider model.Spider, configData entity.ConfigSpiderData) error {
|
||||
// 校验Spiderfile正确性
|
||||
if err := ValidateSpiderfile(configData); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 构造代码生成器
|
||||
generator := config_spider.ScrapyGenerator{
|
||||
Spider: spider,
|
||||
ConfigData: configData,
|
||||
}
|
||||
|
||||
// 生成代码
|
||||
if err := generator.Generate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 验证Spiderfile
|
||||
func ValidateSpiderfile(configData entity.ConfigSpiderData) error {
|
||||
// 获取所有字段
|
||||
fields := config_spider.GetAllFields(configData)
|
||||
|
||||
// 校验是否存在 start_url
|
||||
if configData.StartUrl == "" {
|
||||
return errors.New("spiderfile start_url is empty")
|
||||
}
|
||||
|
||||
// 校验是否存在 stages
|
||||
if len(configData.Stages) == 0 {
|
||||
return errors.New("spiderfile stages is empty")
|
||||
}
|
||||
|
||||
// 校验stages
|
||||
dict := map[string]int{}
|
||||
for stageName, stage := range configData.Stages {
|
||||
// stage 名称不能为空
|
||||
if stageName == "" {
|
||||
return errors.New("spiderfile stage name is empty")
|
||||
}
|
||||
|
||||
// stage 名称不能为保留字符串
|
||||
// NOTE: 如果有其他Engine,可以扩展,默认为Scrapy
|
||||
if configData.Engine == "" || configData.Engine == constants.EngineScrapy {
|
||||
if strings.Contains(constants.ScrapyProtectedStageNames, stageName) {
|
||||
return errors.New(fmt.Sprintf("spiderfile stage name '%s' is protected", stageName))
|
||||
}
|
||||
} else if configData.Engine == constants.EngineColly {
|
||||
return errors.New(fmt.Sprintf("engine '%s' is not implemented", stageName))
|
||||
}
|
||||
|
||||
// stage 名称不能重复
|
||||
if dict[stageName] == 1 {
|
||||
return errors.New("spiderfile stage name should be unique")
|
||||
}
|
||||
dict[stageName] = 1
|
||||
|
||||
// stage 字段不能为空
|
||||
if len(stage.Fields) == 0 {
|
||||
return errors.New(fmt.Sprintf("spiderfile stage '%s' has no fields", stageName))
|
||||
}
|
||||
|
||||
// stage 的下一个 stage 只能有一个
|
||||
hasNextStage := false
|
||||
for _, field := range stage.Fields {
|
||||
if field.NextStage != "" {
|
||||
if hasNextStage {
|
||||
return errors.New("spiderfile stage fields should have only 1 next_stage")
|
||||
}
|
||||
hasNextStage = true
|
||||
}
|
||||
}
|
||||
|
||||
// 如果 stage 的 is_list 为 true 但 list_css 为空,报错
|
||||
if stage.IsList && stage.ListCss == "" {
|
||||
return errors.New("spiderfile stage with is_list = true should have list_css being set")
|
||||
}
|
||||
}
|
||||
|
||||
// 校验字段唯一性
|
||||
if !IsUniqueConfigSpiderFields(fields) {
|
||||
return errors.New("spiderfile fields not unique")
|
||||
}
|
||||
|
||||
// 字段名称不能为保留字符串
|
||||
for _, field := range fields {
|
||||
if strings.Contains(constants.ScrapyProtectedFieldNames, field.Name) {
|
||||
return errors.New(fmt.Sprintf("spiderfile field name '%s' is protected", field.Name))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func IsUniqueConfigSpiderFields(fields []entity.Field) bool {
|
||||
dict := map[string]int{}
|
||||
for _, field := range fields {
|
||||
if dict[field.Name] == 1 {
|
||||
return false
|
||||
}
|
||||
dict[field.Name] = 1
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -258,7 +258,7 @@ func InitNodeService() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// 如果为主节点,每30秒刷新所有节点信息
|
||||
// 如果为主节点,每10秒刷新所有节点信息
|
||||
if model.IsMaster() {
|
||||
spec := "*/10 * * * * *"
|
||||
if _, err := c.AddFunc(spec, UpdateNodeStatus); err != nil {
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"net"
|
||||
"reflect"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Register interface {
|
||||
@@ -97,25 +98,31 @@ func getMac() (string, error) {
|
||||
var register Register
|
||||
|
||||
// 获得注册器
|
||||
func GetRegister() Register {
|
||||
if register != nil {
|
||||
return register
|
||||
}
|
||||
var once sync.Once
|
||||
|
||||
registerType := viper.GetString("server.register.type")
|
||||
if registerType == "mac" {
|
||||
register = &MacRegister{}
|
||||
} else {
|
||||
ip := viper.GetString("server.register.ip")
|
||||
if ip == "" {
|
||||
log.Error("server.register.ip is empty")
|
||||
debug.PrintStack()
|
||||
return nil
|
||||
func GetRegister() Register {
|
||||
once.Do(func() {
|
||||
|
||||
if register != nil {
|
||||
register = register
|
||||
}
|
||||
register = &IpRegister{
|
||||
Ip: ip,
|
||||
|
||||
registerType := viper.GetString("server.register.type")
|
||||
if registerType == "mac" {
|
||||
register = &MacRegister{}
|
||||
} else {
|
||||
ip := viper.GetString("server.register.ip")
|
||||
if ip == "" {
|
||||
log.Error("server.register.ip is empty")
|
||||
debug.PrintStack()
|
||||
register = nil
|
||||
}
|
||||
register = &IpRegister{
|
||||
Ip: ip,
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Info("register type is :" + reflect.TypeOf(register).String())
|
||||
log.Info("register type is :" + reflect.TypeOf(register).String())
|
||||
|
||||
})
|
||||
return register
|
||||
}
|
||||
|
||||
@@ -116,12 +116,20 @@ func PublishAllSpiders() {
|
||||
|
||||
// 发布爬虫
|
||||
func PublishSpider(spider model.Spider) {
|
||||
// 查询gf file,不存在则删除
|
||||
// 查询gf file,不存在则标记为爬虫文件不存在
|
||||
gfFile := model.GetGridFs(spider.FileId)
|
||||
if gfFile == nil {
|
||||
_ = model.RemoveSpider(spider.Id)
|
||||
spider.FileId = constants.ObjectIdNull
|
||||
_ = spider.Save()
|
||||
return
|
||||
}
|
||||
|
||||
// 如果FileId为空,表示还没有上传爬虫到GridFS,则跳过
|
||||
if spider.FileId == bson.ObjectIdHex(constants.ObjectIdNull) {
|
||||
return
|
||||
}
|
||||
|
||||
// 获取爬虫同步实例
|
||||
spiderSync := spider_handler.SpiderSync{
|
||||
Spider: spider,
|
||||
}
|
||||
|
||||
@@ -224,7 +224,16 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e
|
||||
}
|
||||
|
||||
// 环境变量配置
|
||||
cmd = SetEnv(cmd, s.Envs, t.Id, s.Col)
|
||||
envs := s.Envs
|
||||
if s.Type == constants.Configurable {
|
||||
envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_HOST", Value: viper.GetString("mongo.host")})
|
||||
envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_PORT", Value: viper.GetString("mongo.port")})
|
||||
envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_DB", Value: viper.GetString("mongo.db")})
|
||||
envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_USERNAME", Value: viper.GetString("mongo.username")})
|
||||
envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_PASSWORD", Value: viper.GetString("mongo.password")})
|
||||
envs = append(envs, model.Env{Name: "CRAWLAB_MONGO_AUTHSOURCE", Value: viper.GetString("mongo.authSource")})
|
||||
}
|
||||
cmd = SetEnv(cmd, envs, t.Id, s.Col)
|
||||
|
||||
// 起一个goroutine来监控进程
|
||||
ch := utils.TaskExecChanMap.ChanBlocked(t.Id)
|
||||
@@ -378,7 +387,14 @@ func ExecuteTask(id int) {
|
||||
)
|
||||
|
||||
// 执行命令
|
||||
cmd := spider.Cmd
|
||||
var cmd string
|
||||
if spider.Type == constants.Configurable {
|
||||
// 可配置爬虫命令
|
||||
cmd = "scrapy crawl config_spider"
|
||||
} else {
|
||||
// 自定义爬虫命令
|
||||
cmd = spider.Cmd
|
||||
}
|
||||
|
||||
// 加入参数
|
||||
if t.Param != "" {
|
||||
|
||||
Reference in New Issue
Block a user