code cleanup

This commit is contained in:
marvzhang
2020-12-04 15:33:15 +08:00
parent c83b331101
commit 00cf719ecc
287 changed files with 63 additions and 39029 deletions

View File

@@ -1,20 +0,0 @@
package services
import (
"crawlab/constants"
"github.com/gin-gonic/gin"
"github.com/globalsign/mgo/bson"
)
func GetAuthQuery(query bson.M, c *gin.Context) bson.M {
user := GetCurrentUser(c)
if user.Role == constants.RoleAdmin {
// Get all data
return query
} else {
// Only get the user's own data
query["user_id"] = user.Id
return query
}
}
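
A minimal usage sketch of the removed GetAuthQuery helper, showing how a list endpoint could scope its query by the requesting user's role. The GetSpiders handler name, the routes package, and the page-size values are illustrative assumptions, not part of this commit.

package routes

import (
	"net/http"

	"crawlab/model"
	"crawlab/services"

	"github.com/gin-gonic/gin"
	"github.com/globalsign/mgo/bson"
)

// GetSpiders is a hypothetical list endpoint: admins see every spider,
// everyone else only their own, because GetAuthQuery adds user_id to the
// query for non-admin roles.
func GetSpiders(c *gin.Context) {
	query := services.GetAuthQuery(bson.M{}, c)
	spiders, count, err := model.GetSpiderList(query, 0, 10, "-_id")
	if err != nil {
		c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"data": spiders, "total": count})
}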

View File

@@ -1,138 +0,0 @@
package challenge
import (
"crawlab/constants"
"crawlab/model"
"encoding/json"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"io/ioutil"
"path"
"runtime/debug"
)
type Service interface {
Check() (bool, error)
}
func GetService(name string, uid bson.ObjectId) Service {
switch name {
case constants.ChallengeLogin7d:
return &Login7dService{UserId: uid}
case constants.ChallengeLogin30d:
return &Login30dService{UserId: uid}
case constants.ChallengeLogin90d:
return &Login90dService{UserId: uid}
case constants.ChallengeLogin180d:
return &Login180dService{UserId: uid}
case constants.ChallengeCreateCustomizedSpider:
return &CreateCustomizedSpiderService{UserId: uid}
case constants.ChallengeCreateConfigurableSpider:
return &CreateConfigurableSpiderService{UserId: uid}
case constants.ChallengeCreateSchedule:
return &CreateScheduleService{UserId: uid}
case constants.ChallengeCreateNodes:
return &CreateNodesService{UserId: uid}
case constants.ChallengeRunRandom:
return &RunRandomService{UserId: uid}
case constants.ChallengeScrape1k:
return &Scrape1kService{UserId: uid}
case constants.ChallengeScrape10k:
return &Scrape10kService{UserId: uid}
case constants.ChallengeScrape100k:
return &Scrape100kService{UserId: uid}
case constants.ChallengeInstallDep:
return &InstallDepService{UserId: uid}
case constants.ChallengeInstallLang:
return &InstallLangService{UserId: uid}
case constants.ChallengeViewDisclaimer:
return &ViewDisclaimerService{UserId: uid}
case constants.ChallengeCreateUser:
return &CreateUserService{UserId: uid}
}
return nil
}
func AddChallengeAchievement(name string, uid bson.ObjectId) error {
ch, err := model.GetChallengeByName(name)
if err != nil {
return err
}
ca := model.ChallengeAchievement{
ChallengeId: ch.Id,
UserId: uid,
}
if err := ca.Add(); err != nil {
return err
}
return nil
}
func CheckChallengeAndUpdate(ch model.Challenge, uid bson.ObjectId) error {
svc := GetService(ch.Name, uid)
achieved, err := svc.Check()
if err != nil {
return err
}
if achieved && !ch.Achieved {
if err := AddChallengeAchievement(ch.Name, uid); err != nil {
return err
}
}
return nil
}
func CheckChallengeAndUpdateAll(uid bson.ObjectId) error {
challenges, err := model.GetChallengeListWithAchieved(nil, 0, constants.Infinite, "-_id", uid)
if err != nil {
return err
}
for _, ch := range challenges {
if err := CheckChallengeAndUpdate(ch, uid); err != nil {
continue
}
}
return nil
}
func InitChallengeService() error {
// Read the file
contentBytes, err := ioutil.ReadFile(path.Join("data", "challenge_data.json"))
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// Deserialize
var challenges []model.Challenge
if err := json.Unmarshal(contentBytes, &challenges); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
for _, ch := range challenges {
chDb, err := model.GetChallengeByName(ch.Name)
if err != nil {
continue
}
if chDb.Name == "" {
if err := ch.Add(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
} else {
ch.Id = chDb.Id
ch.CreateTs = chDb.CreateTs
if err := ch.Save(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
}
}
return nil
}
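
For reference, the removed challenge services all follow the same shape: a struct holding the user ID plus a Check() that inspects that user's data. A hypothetical additional challenge would look like the sketch below; the CreateProjectService name is an assumption and is not part of this commit, and it would also need a constant and a case in GetService to be dispatched.

package challenge

import (
	"crawlab/model"

	"github.com/globalsign/mgo/bson"
)

// CreateProjectService is a hypothetical challenge: achieved once the user
// owns at least one project.
type CreateProjectService struct {
	UserId bson.ObjectId
}

func (s *CreateProjectService) Check() (bool, error) {
	// Count projects owned by the user; any project satisfies the challenge.
	projects, err := model.GetProjectList(bson.M{"user_id": s.UserId}, "+_id")
	if err != nil {
		return false, err
	}
	return len(projects) > 0, nil
}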

View File

@@ -1,23 +0,0 @@
package challenge
import (
"crawlab/constants"
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type CreateConfigurableSpiderService struct {
UserId bson.ObjectId
}
func (s *CreateConfigurableSpiderService) Check() (bool, error) {
query := bson.M{
"user_id": s.UserId,
"type": constants.Configurable,
}
_, count, err := model.GetSpiderList(query, 0, 1, "-_id")
if err != nil {
return false, err
}
return count > 0, nil
}

View File

@@ -1,23 +0,0 @@
package challenge
import (
"crawlab/constants"
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type CreateCustomizedSpiderService struct {
UserId bson.ObjectId
}
func (s *CreateCustomizedSpiderService) Check() (bool, error) {
query := bson.M{
"user_id": s.UserId,
"type": constants.Customized,
}
_, count, err := model.GetSpiderList(query, 0, 1, "-_id")
if err != nil {
return false, err
}
return count > 0, nil
}

View File

@@ -1,22 +0,0 @@
package challenge
import (
"crawlab/constants"
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type CreateNodesService struct {
UserId bson.ObjectId
}
func (s *CreateNodesService) Check() (bool, error) {
query := bson.M{
"status": constants.StatusOnline,
}
list, err := model.GetScheduleList(query)
if err != nil {
return false, err
}
return len(list) >= 3, nil
}

View File

@@ -1,21 +0,0 @@
package challenge
import (
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type CreateScheduleService struct {
UserId bson.ObjectId
}
func (s *CreateScheduleService) Check() (bool, error) {
query := bson.M{
"user_id": s.UserId,
}
list, err := model.GetScheduleList(query)
if err != nil {
return false, err
}
return len(list) > 0, nil
}

View File

@@ -1,21 +0,0 @@
package challenge
import (
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type CreateUserService struct {
UserId bson.ObjectId
}
func (s *CreateUserService) Check() (bool, error) {
query := bson.M{
"user_id": s.UserId,
}
list, err := model.GetUserList(query, 0, 1, "-_id")
if err != nil {
return false, err
}
return len(list) > 0, nil
}

View File

@@ -1,23 +0,0 @@
package challenge
import (
"crawlab/constants"
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type InstallDepService struct {
UserId bson.ObjectId
}
func (s *InstallDepService) Check() (bool, error) {
query := bson.M{
"user_id": s.UserId,
"type": constants.ActionTypeInstallDep,
}
list, err := model.GetActionList(query, 0, 1, "-_id")
if err != nil {
return false, err
}
return len(list) > 0, nil
}

View File

@@ -1,23 +0,0 @@
package challenge
import (
"crawlab/constants"
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type InstallLangService struct {
UserId bson.ObjectId
}
func (s *InstallLangService) Check() (bool, error) {
query := bson.M{
"user_id": s.UserId,
"type": constants.ActionTypeInstallLang,
}
list, err := model.GetActionList(query, 0, 1, "-_id")
if err != nil {
return false, err
}
return len(list) > 0, nil
}

View File

@@ -1,18 +0,0 @@
package challenge
import (
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type Login180dService struct {
UserId bson.ObjectId
}
func (s *Login180dService) Check() (bool, error) {
days, err := model.GetVisitDays(s.UserId)
if err != nil {
return false, err
}
return days >= 180, nil
}

View File

@@ -1,18 +0,0 @@
package challenge
import (
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type Login30dService struct {
UserId bson.ObjectId
}
func (s *Login30dService) Check() (bool, error) {
days, err := model.GetVisitDays(s.UserId)
if err != nil {
return false, err
}
return days >= 30, nil
}

View File

@@ -1,18 +0,0 @@
package challenge
import (
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type Login7dService struct {
UserId bson.ObjectId
}
func (s *Login7dService) Check() (bool, error) {
days, err := model.GetVisitDays(s.UserId)
if err != nil {
return false, err
}
return days >= 7, nil
}

View File

@@ -1,18 +0,0 @@
package challenge
import (
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type Login90dService struct {
UserId bson.ObjectId
}
func (s *Login90dService) Check() (bool, error) {
days, err := model.GetVisitDays(s.UserId)
if err != nil {
return false, err
}
return days >= 90, nil
}

View File

@@ -1,25 +0,0 @@
package challenge
import (
"crawlab/constants"
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type RunRandomService struct {
UserId bson.ObjectId
}
func (s *RunRandomService) Check() (bool, error) {
query := bson.M{
"user_id": s.UserId,
"run_type": constants.RunTypeRandom,
"status": constants.StatusFinished,
"schedule_id": bson.ObjectIdHex(constants.ObjectIdNull),
}
list, err := model.GetTaskList(query, 0, 1, "-_id")
if err != nil {
return false, err
}
return len(list) > 0, nil
}

View File

@@ -1,24 +0,0 @@
package challenge
import (
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type Scrape100kService struct {
UserId bson.ObjectId
}
func (s *Scrape100kService) Check() (bool, error) {
query := bson.M{
"user_id": s.UserId,
"result_count": bson.M{
"$gte": 100000,
},
}
list, err := model.GetTaskList(query, 0, 1, "-_id")
if err != nil {
return false, err
}
return len(list) > 0, nil
}

View File

@@ -1,24 +0,0 @@
package challenge
import (
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type Scrape10kService struct {
UserId bson.ObjectId
}
func (s *Scrape10kService) Check() (bool, error) {
query := bson.M{
"user_id": s.UserId,
"result_count": bson.M{
"$gte": 10000,
},
}
list, err := model.GetTaskList(query, 0, 1, "-_id")
if err != nil {
return false, err
}
return len(list) > 0, nil
}

View File

@@ -1,24 +0,0 @@
package challenge
import (
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type Scrape1kService struct {
UserId bson.ObjectId
}
func (s *Scrape1kService) Check() (bool, error) {
query := bson.M{
"user_id": s.UserId,
"result_count": bson.M{
"$gte": 1000,
},
}
list, err := model.GetTaskList(query, 0, 1, "-_id")
if err != nil {
return false, err
}
return len(list) > 0, nil
}

View File

@@ -1,23 +0,0 @@
package challenge
import (
"crawlab/constants"
"crawlab/model"
"github.com/globalsign/mgo/bson"
)
type ViewDisclaimerService struct {
UserId bson.ObjectId
}
func (s *ViewDisclaimerService) Check() (bool, error) {
query := bson.M{
"user_id": s.UserId,
"type": constants.ActionTypeViewDisclaimer,
}
list, err := model.GetActionList(query, 0, 1, "-_id")
if err != nil {
return false, err
}
return len(list) > 0, nil
}

View File

@@ -1,122 +0,0 @@
package services
import (
"crawlab/constants"
"crawlab/model"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"runtime/debug"
)
func InitTaskCleanUserIds() {
adminUser, err := GetAdminUser()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
tasks, err := model.GetTaskList(nil, 0, constants.Infinite, "+_id")
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
for _, t := range tasks {
if !t.ScheduleId.Valid() {
t.ScheduleId = bson.ObjectIdHex(constants.ObjectIdNull)
if err := t.Save(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
}
if !t.UserId.Valid() {
t.UserId = adminUser.Id
if err := t.Save(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
}
}
}
func InitProjectCleanUserIds() {
adminUser, err := GetAdminUser()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
projects, err := model.GetProjectList(nil, "+_id")
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
for _, p := range projects {
if !p.UserId.Valid() {
p.UserId = adminUser.Id
if err := p.Save(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
}
}
}
func InitSpiderCleanUserIds() {
adminUser, err := GetAdminUser()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
spiders, _ := model.GetSpiderAllList(nil)
for _, s := range spiders {
if !s.UserId.Valid() {
s.UserId = adminUser.Id
if err := s.Save(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
}
}
}
func InitScheduleCleanUserIds() {
adminUser, err := GetAdminUser()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
schedules, _ := model.GetScheduleList(nil)
for _, s := range schedules {
if !s.UserId.Valid() {
s.UserId = adminUser.Id
if err := s.Save(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
}
}
}
func InitCleanService() error {
if model.IsMaster() {
// Clean up task UserIds
InitTaskCleanUserIds()
// Clean up project UserIds
InitProjectCleanUserIds()
// Clean up spider UserIds
InitSpiderCleanUserIds()
// Clean up schedule UserIds
InitScheduleCleanUserIds()
}
return nil
}

View File

@@ -1,278 +0,0 @@
package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/model/config_spider"
"crawlab/services/spider_handler"
"crawlab/utils"
"errors"
"fmt"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
uuid "github.com/satori/go.uuid"
"github.com/spf13/viper"
"gopkg.in/yaml.v2"
"os"
"path/filepath"
"runtime/debug"
"strings"
)
func GenerateConfigSpiderFiles(spider model.Spider, configData entity.ConfigSpiderData) error {
// Validate the Spiderfile
if err := ValidateSpiderfile(configData); err != nil {
return err
}
// Build the code generator
generator := config_spider.ScrapyGenerator{
Spider: spider,
ConfigData: configData,
}
// Generate the code
if err := generator.Generate(); err != nil {
return err
}
return nil
}
// Validate the Spiderfile
func ValidateSpiderfile(configData entity.ConfigSpiderData) error {
// Get all fields
fields := config_spider.GetAllFields(configData)
// Check that start_url exists
if configData.StartUrl == "" {
return errors.New("spiderfile invalid: start_url is empty")
}
// Check that start_stage exists
if configData.StartStage == "" {
return errors.New("spiderfile invalid: start_stage is empty")
}
// Check that stages exist
if len(configData.Stages) == 0 {
return errors.New("spiderfile invalid: stages is empty")
}
// Validate stages
dict := map[string]int{}
for _, stage := range configData.Stages {
stageName := stage.Name
// The stage name must not be empty
if stageName == "" {
return errors.New("spiderfile invalid: stage name is empty")
}
// The stage name must not be a reserved string
// NOTE: this can be extended if other engines are added; defaults to Scrapy
if configData.Engine == "" || configData.Engine == constants.EngineScrapy {
if strings.Contains(constants.ScrapyProtectedStageNames, stageName) {
return errors.New(fmt.Sprintf("spiderfile invalid: stage name '%s' is protected", stageName))
}
} else {
return errors.New(fmt.Sprintf("spiderfile invalid: engine '%s' is not implemented", configData.Engine))
}
// Stage names must not be duplicated
if dict[stageName] == 1 {
return errors.New(fmt.Sprintf("spiderfile invalid: stage name '%s' is duplicated", stageName))
}
dict[stageName] = 1
// Stage fields must not be empty
if len(stage.Fields) == 0 {
return errors.New(fmt.Sprintf("spiderfile invalid: stage '%s' has no fields", stageName))
}
// Whether the stage contains a next_stage
hasNextStage := false
// Iterate over the field list
for _, field := range stage.Fields {
// A stage can have only one next_stage
if field.NextStage != "" {
if hasNextStage {
return errors.New(fmt.Sprintf("spiderfile invalid: stage '%s' has more than 1 next_stage", stageName))
}
hasNextStage = true
}
// A field can set only one of css and xpath
if field.Css != "" && field.Xpath != "" {
return errors.New(fmt.Sprintf("spiderfile invalid: field '%s' in stage '%s' has both css and xpath set which is prohibited", field.Name, stageName))
}
}
// A stage can set only one of page_css and page_xpath
if stage.PageCss != "" && stage.PageXpath != "" {
return errors.New(fmt.Sprintf("spiderfile invalid: stage '%s' has both page_css and page_xpath set which is prohibited", stageName))
}
// A stage can set only one of list_css and list_xpath
if stage.ListCss != "" && stage.ListXpath != "" {
return errors.New(fmt.Sprintf("spiderfile invalid: stage '%s' has both list_css and list_xpath set which is prohibited", stageName))
}
// If the stage's is_list is true but neither list_css nor list_xpath is set, report an error
if stage.IsList && (stage.ListCss == "" && stage.ListXpath == "") {
return errors.New("spiderfile invalid: stage with is_list = true should have either list_css or list_xpath being set")
}
}
// Check field uniqueness
if !IsUniqueConfigSpiderFields(fields) {
return errors.New("spiderfile invalid: fields not unique")
}
// Field names must not be reserved strings
for _, field := range fields {
if strings.Contains(constants.ScrapyProtectedFieldNames, field.Name) {
return errors.New(fmt.Sprintf("spiderfile invalid: field name '%s' is protected", field.Name))
}
}
return nil
}
func IsUniqueConfigSpiderFields(fields []entity.Field) bool {
dict := map[string]int{}
for _, field := range fields {
if dict[field.Name] == 1 {
return false
}
dict[field.Name] = 1
}
return true
}
func ProcessSpiderFilesFromConfigData(spider model.Spider, configData entity.ConfigSpiderData) error {
spiderDir := spider.Src
// Delete existing spider files
for _, fInfo := range utils.ListDir(spiderDir) {
// Do not delete the Spiderfile
if fInfo.Name() == "Spiderfile" {
continue
}
// Delete other files
if err := os.RemoveAll(filepath.Join(spiderDir, fInfo.Name())); err != nil {
return err
}
}
// Copy spider files
tplDir := "./template/scrapy"
for _, fInfo := range utils.ListDir(tplDir) {
// Skip the Spiderfile
if fInfo.Name() == "Spiderfile" {
continue
}
srcPath := filepath.Join(tplDir, fInfo.Name())
if fInfo.IsDir() {
dirPath := filepath.Join(spiderDir, fInfo.Name())
if err := utils.CopyDir(srcPath, dirPath); err != nil {
return err
}
} else {
if err := utils.CopyFile(srcPath, filepath.Join(spiderDir, fInfo.Name())); err != nil {
return err
}
}
}
// Update the spider files
if err := GenerateConfigSpiderFiles(spider, configData); err != nil {
return err
}
// Compress into a zip file
files, err := utils.GetFilesFromDir(spiderDir)
if err != nil {
return err
}
randomId := uuid.NewV4()
tmpFilePath := filepath.Join(viper.GetString("other.tmppath"), spider.Name+"."+randomId.String()+".zip")
spiderZipFileName := spider.Name + ".zip"
if err := utils.Compress(files, tmpFilePath); err != nil {
return err
}
// Get the GridFS instance
s, gf := database.GetGridFs("files")
defer s.Close()
// Check whether the file already exists
var gfFile model.GridFs
if err := gf.Find(bson.M{"filename": spiderZipFileName}).One(&gfFile); err == nil {
// The file already exists, so delete it
if err := gf.RemoveId(gfFile.Id); err != nil {
log.Errorf("remove grid fs error: %s", err.Error())
debug.PrintStack()
return err
}
}
// Upload to GridFS
fid, err := RetryUploadToGridFs(spiderZipFileName, tmpFilePath)
if err != nil {
log.Errorf("upload to grid fs error: %s", err.Error())
return err
}
// Save the spider FileId
spider.FileId = fid
_ = spider.Save()
// Get the spider sync instance
spiderSync := spider_handler.SpiderSync{
Spider: spider,
}
// Get gfFile
gfFile2 := model.GetGridFs(spider.FileId)
// Generate the MD5 file
spiderSync.CreateMd5File(gfFile2.Md5)
return nil
}
func GenerateSpiderfileFromConfigData(spider model.Spider, configData entity.ConfigSpiderData) error {
// Spiderfile path
sfPath := filepath.Join(spider.Src, "Spiderfile")
// Generate the YAML content
sfContentByte, err := yaml.Marshal(configData)
if err != nil {
return err
}
// Open the file
var f *os.File
if utils.Exists(sfPath) {
f, err = os.OpenFile(sfPath, os.O_WRONLY|os.O_TRUNC, 0777)
} else {
f, err = os.OpenFile(sfPath, os.O_CREATE, 0777)
}
if err != nil {
return err
}
defer f.Close()
// Write the content
if _, err := f.Write(sfContentByte); err != nil {
return err
}
return nil
}
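
A sketch of a ConfigSpiderData literal that illustrates the ValidateSpiderfile rules above: start_url and start_stage present, unique stage and field names, at most one of css/xpath per field, and list_css set when is_list is true. The entity.Stage type name and the example stage/field names are assumptions inferred from the field accesses in this file; names would also have to avoid the Scrapy protected-name constants, whose values are not shown here.

package main

import (
	"fmt"

	"crawlab/entity"
	"crawlab/services"
)

func main() {
	// Illustrative config only; all values are placeholders.
	configData := entity.ConfigSpiderData{
		StartUrl:   "https://example.com/list",
		StartStage: "list_page",
		Stages: []entity.Stage{
			{
				Name:    "list_page",
				IsList:  true,
				ListCss: ".item",
				Fields: []entity.Field{
					{Name: "item_title", Css: ".title"},
					{Name: "detail_url", Css: "a", NextStage: "detail_page"},
				},
			},
			{
				Name: "detail_page",
				Fields: []entity.Field{
					{Name: "item_content", Xpath: "//div[@class='content']"},
				},
			},
		},
	}
	// Prints <nil> when the config satisfies every validation rule.
	fmt.Println(services.ValidateSpiderfile(configData))
}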

View File

@@ -1,99 +0,0 @@
package context
import (
"crawlab/constants"
"crawlab/errors"
"crawlab/model"
"fmt"
"github.com/apex/log"
"github.com/gin-gonic/gin"
"github.com/go-playground/validator/v10"
errors2 "github.com/pkg/errors"
"net/http"
"runtime/debug"
)
type Context struct {
*gin.Context
}
func (c *Context) User() *model.User {
userIfe, exists := c.Get(constants.ContextUser)
if !exists {
return nil
}
user, ok := userIfe.(*model.User)
if !ok {
return nil
}
return user
}
func (c *Context) Success(data interface{}, metas ...interface{}) {
var meta interface{}
if len(metas) == 0 {
meta = gin.H{}
} else {
meta = metas[0]
}
if data == nil {
data = gin.H{}
}
c.JSON(http.StatusOK, gin.H{
"status": "ok",
"message": "success",
"data": data,
"meta": meta,
"error": "",
})
}
func (c *Context) Failed(err error, variables ...interface{}) {
c.failed(err, http.StatusOK, variables...)
}
func (c *Context) failed(err error, httpCode int, variables ...interface{}) {
errStr := err.Error()
if len(variables) > 0 {
errStr = fmt.Sprintf(errStr, variables...)
}
log.Errorf("handle error:" + errStr)
debug.PrintStack()
causeError := errors2.Cause(err)
switch causeError.(type) {
case errors.OPError:
opError := causeError.(errors.OPError)
c.AbortWithStatusJSON(opError.HttpCode, gin.H{
"status": "ok",
"message": "error",
"error": errStr,
})
case validator.ValidationErrors:
validatorErrors := causeError.(validator.ValidationErrors)
//firstError := validatorErrors[0].(validator.FieldError)
c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
"status": "ok",
"message": "error",
"error": validatorErrors.Error(),
})
default:
fmt.Println("deprecated....")
c.AbortWithStatusJSON(httpCode, gin.H{
"status": "ok",
"message": "error",
"error": errStr,
})
}
}
func (c *Context) FailedWithError(err error, httpCode ...int) {
var code = 200
if len(httpCode) > 0 {
code = httpCode[0]
}
c.failed(err, code)
}
func WithGinContext(context *gin.Context) *Context {
return &Context{Context: context}
}
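
A hypothetical handler using the removed Context wrapper: wrap the gin context, read the authenticated user, and reply through the unified Success/Failed envelope. The ListMyProjects name, the context package import path, and the project query are assumptions for illustration.

package routes

import (
	"errors"

	"crawlab/model"
	"crawlab/routes/context"

	"github.com/gin-gonic/gin"
	"github.com/globalsign/mgo/bson"
)

func ListMyProjects(c *gin.Context) {
	ctx := context.WithGinContext(c)
	user := ctx.User()
	if user == nil {
		// No authenticated user stored in the gin context.
		ctx.Failed(errors.New("not authenticated"))
		return
	}
	projects, err := model.GetProjectList(bson.M{"user_id": user.Id}, "+_id")
	if err != nil {
		ctx.Failed(err)
		return
	}
	ctx.Success(projects)
}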

View File

@@ -1,27 +0,0 @@
package services
import (
"github.com/apex/log"
"github.com/imroc/req"
"runtime/debug"
)
func GetDocs() (data string, err error) {
// Fetch the remote data
res, err := req.Get("https://docs.crawlab.cn/search_plus_index.json")
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return data, err
}
// Deserialize
data, err = res.ToString()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return data, err
}
return data, nil
}

View File

@@ -1,65 +0,0 @@
package services
import (
"crawlab/model"
"github.com/apex/log"
"os"
"path"
"runtime/debug"
"strings"
)
func GetFileNodeTree(dstPath string, level int) (f model.File, err error) {
return getFileNodeTree(dstPath, level, dstPath)
}
func getFileNodeTree(dstPath string, level int, rootPath string) (f model.File, err error) {
dstF, err := os.Open(dstPath)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return f, err
}
defer dstF.Close()
fileInfo, err := dstF.Stat()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return f, nil
}
if !fileInfo.IsDir() { // dstF is a file
return model.File{
Label: fileInfo.Name(),
Name: fileInfo.Name(),
Path: strings.Replace(dstPath, rootPath, "", -1),
IsDir: false,
Size: fileInfo.Size(),
Children: nil,
}, nil
} else { // dstF is a directory
dir, err := dstF.Readdir(0) // get the fileInfo of each file or directory in the folder
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return f, nil
}
f = model.File{
Label: path.Base(dstPath),
Name: path.Base(dstPath),
Path: strings.Replace(dstPath, rootPath, "", -1),
IsDir: true,
Size: 0,
Children: nil,
}
for _, subFileInfo := range dir {
subFileNode, err := getFileNodeTree(path.Join(dstPath, subFileInfo.Name()), level+1, rootPath)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return f, err
}
f.Children = append(f.Children, subFileNode)
}
return f, nil
}
}
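
A small sketch of how the removed GetFileNodeTree could be used to print a spider's source tree. The printFileTree and PrintSpiderTree helpers are hypothetical and not part of this commit.

package services

import (
	"fmt"

	"crawlab/model"
)

// printFileTree walks the model.File tree produced by GetFileNodeTree and
// prints each entry with indentation.
func printFileTree(f model.File, indent string) {
	fmt.Println(indent + f.Name)
	for _, child := range f.Children {
		printFileTree(child, indent+"  ")
	}
}

// PrintSpiderTree builds and prints the file tree rooted at srcPath.
func PrintSpiderTree(srcPath string) error {
	tree, err := GetFileNodeTree(srcPath, 0)
	if err != nil {
		return err
	}
	printFileTree(tree, "")
	return nil
}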

View File

@@ -1,603 +0,0 @@
package services
import (
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/spider_handler"
"crawlab/utils"
"fmt"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"gopkg.in/src-d/go-git.v4"
"gopkg.in/src-d/go-git.v4/config"
"gopkg.in/src-d/go-git.v4/plumbing"
"gopkg.in/src-d/go-git.v4/plumbing/object"
"gopkg.in/src-d/go-git.v4/plumbing/transport/ssh"
"gopkg.in/src-d/go-git.v4/storage/memory"
"io/ioutil"
"net/url"
"os"
"path"
"regexp"
"runtime/debug"
"strings"
"time"
)
var GitCron *GitCronScheduler
type GitCronScheduler struct {
cron *cron.Cron
}
type GitBranch struct {
Hash string `json:"hash"`
Name string `json:"name"`
Label string `json:"label"`
}
type GitTag struct {
Hash string `json:"hash"`
Name string `json:"name"`
Label string `json:"label"`
}
type GitCommit struct {
Hash string `json:"hash"`
TreeHash string `json:"tree_hash"`
Author string `json:"author"`
Email string `json:"email"`
Message string `json:"message"`
IsHead bool `json:"is_head"`
Ts time.Time `json:"ts"`
Branches []GitBranch `json:"branches"`
RemoteBranches []GitBranch `json:"remote_branches"`
Tags []GitTag `json:"tags"`
}
func (g *GitCronScheduler) Start() error {
c := cron.New(cron.WithSeconds())
g.cron = c
// Start the cron service
g.cron.Start()
// Update the job list
if err := g.Update(); err != nil {
log.Errorf("update scheduler error: %s", err.Error())
debug.PrintStack()
return err
}
// Update the job list every 30 seconds
spec := "*/30 * * * * *"
if _, err := c.AddFunc(spec, UpdateGitCron); err != nil {
log.Errorf("add func update schedulers error: %s", err.Error())
debug.PrintStack()
return err
}
return nil
}
func (g *GitCronScheduler) RemoveAll() {
entries := g.cron.Entries()
for i := 0; i < len(entries); i++ {
g.cron.Remove(entries[i].ID)
}
}
func (g *GitCronScheduler) Update() error {
// Remove all scheduled jobs
g.RemoveAll()
// Get spiders with Git auto-sync enabled
spiders, err := model.GetSpiderAllList(bson.M{"git_auto_sync": true})
if err != nil {
log.Errorf("get spider list error: %s", err.Error())
debug.PrintStack()
return err
}
// Iterate over the spider list
for _, s := range spiders {
// Add to the scheduled jobs
if err := g.AddJob(s); err != nil {
log.Errorf("add job error: %s, job: %s, cron: %s", err.Error(), s.Name, s.GitSyncFrequency)
debug.PrintStack()
return err
}
}
return nil
}
func (g *GitCronScheduler) AddJob(s model.Spider) error {
spec := s.GitSyncFrequency
// Add the scheduled job
_, err := g.cron.AddFunc(spec, AddGitCronJob(s))
if err != nil {
log.Errorf("add func task error: %s", err.Error())
debug.PrintStack()
return err
}
return nil
}
// Save the spider's Git sync error
func SaveSpiderGitSyncError(s model.Spider, errMsg string) {
s, _ = model.GetSpider(s.Id)
s.GitSyncError = errMsg
if err := s.Save(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
}
// Get remote Git branches
func GetGitRemoteBranchesPlain(gitUrl string, username string, password string) (branches []string, err error) {
storage := memory.NewStorage()
u, _ := url.Parse(gitUrl)
var listOptions git.ListOptions
if strings.HasPrefix(gitUrl, "http") {
gitUrl = fmt.Sprintf(
"%s://%s:%s@%s%s",
u.Scheme,
username,
password,
u.Hostname(),
u.Path,
)
} else {
auth, err := ssh.NewPublicKeysFromFile(username, path.Join(os.Getenv("HOME"), ".ssh", "id_rsa"), "")
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return branches, err
}
listOptions = git.ListOptions{
Auth: auth,
}
}
remote := git.NewRemote(storage, &config.RemoteConfig{
URLs: []string{
gitUrl,
}})
rfs, err := remote.List(&listOptions)
if err != nil {
return
}
for _, rf := range rfs {
if rf.Type() == plumbing.SymbolicReference {
continue
}
regex := regexp.MustCompile("refs/heads/(.*)$")
res := regex.FindStringSubmatch(rf.String())
if len(res) > 1 {
branches = append(branches, res[1])
}
}
return branches, nil
}
// Reset the spider's Git repo
func ResetSpiderGit(s model.Spider) (err error) {
// Delete the directory
if err := os.RemoveAll(s.Src); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// Create an empty directory
if err := os.MkdirAll(s.Src, os.ModePerm); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// Sync to GridFS
if err := UploadSpiderToGridFsFromMaster(s); err != nil {
return err
}
return nil
}
// Sync the spider's Git repo
func SyncSpiderGit(s model.Spider) (err error) {
// If .git does not exist, initialize a repository
if !utils.Exists(path.Join(s.Src, ".git")) {
_, err := git.PlainInit(s.Src, false)
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return err
}
}
// Open the repo
repo, err := git.PlainOpen(s.Src)
if err != nil {
log.Error(err.Error())
debug.PrintStack()
SaveSpiderGitSyncError(s, err.Error())
return err
}
// Build the URL
gitUrl := s.GitUrl
if s.GitUsername != "" && s.GitPassword != "" {
u, err := url.Parse(s.GitUrl)
if err != nil {
SaveSpiderGitSyncError(s, err.Error())
return err
}
gitUrl = fmt.Sprintf(
"%s://%s:%s@%s%s",
u.Scheme,
s.GitUsername,
s.GitPassword,
u.Hostname(),
u.Path,
)
}
// Create the remote
_ = repo.DeleteRemote("origin")
_, err = repo.CreateRemote(&config.RemoteConfig{
Name: "origin",
URLs: []string{gitUrl},
})
if err != nil {
log.Error(err.Error())
debug.PrintStack()
SaveSpiderGitSyncError(s, err.Error())
return err
}
// Build the authentication info
var auth ssh.AuthMethod
if !strings.HasPrefix(s.GitUrl, "http") {
// It is an SSH URL
regex := regexp.MustCompile("^(?:ssh://?)?([0-9a-zA-Z_]+)@")
res := regex.FindStringSubmatch(s.GitUrl)
username := s.GitUsername
if username == "" {
if len(res) > 1 {
username = res[1]
} else {
username = "git"
}
}
auth, err = ssh.NewPublicKeysFromFile(username, path.Join(os.Getenv("HOME"), ".ssh", "id_rsa"), "")
if err != nil {
log.Error(err.Error())
debug.PrintStack()
SaveSpiderGitSyncError(s, err.Error())
return err
}
}
// Fetch the repo
_ = repo.Fetch(&git.FetchOptions{
RemoteName: "origin",
Force: true,
Auth: auth,
Tags: git.AllTags,
})
// Get the worktree
wt, err := repo.Worktree()
if err != nil {
log.Error(err.Error())
debug.PrintStack()
SaveSpiderGitSyncError(s, err.Error())
return err
}
// Pull the repo
if err := wt.Pull(&git.PullOptions{
RemoteName: "origin",
Auth: auth,
ReferenceName: plumbing.HEAD,
SingleBranch: false,
}); err != nil {
if err.Error() == "already up-to-date" {
// Check whether it is a Scrapy spider
sync := spider_handler.SpiderSync{Spider: s}
sync.CheckIsScrapy()
// Sync to GridFS
if err := UploadSpiderToGridFsFromMaster(s); err != nil {
SaveSpiderGitSyncError(s, err.Error())
return err
}
// If there is no error, save an empty string
SaveSpiderGitSyncError(s, "")
return nil
}
log.Error(err.Error())
debug.PrintStack()
SaveSpiderGitSyncError(s, err.Error())
return err
}
// Check out the branch
if err := wt.Checkout(&git.CheckoutOptions{
Branch: plumbing.NewBranchReferenceName(s.GitBranch),
}); err != nil {
log.Error(err.Error())
debug.PrintStack()
SaveSpiderGitSyncError(s, err.Error())
return err
}
// Sync to GridFS
if err := UploadSpiderToGridFsFromMaster(s); err != nil {
SaveSpiderGitSyncError(s, err.Error())
return err
}
// Get the updated spider
s, err = model.GetSpider(s.Id)
if err != nil {
SaveSpiderGitSyncError(s, err.Error())
return err
}
// Check whether it is a Scrapy spider
sync := spider_handler.SpiderSync{Spider: s}
sync.CheckIsScrapy()
// If there is no error, save an empty string
SaveSpiderGitSyncError(s, "")
return nil
}
// Add a Git cron job
func AddGitCronJob(s model.Spider) func() {
return func() {
if err := SyncSpiderGit(s); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
}
}
// Update Git cron jobs
func UpdateGitCron() {
if err := GitCron.Update(); err != nil {
log.Errorf(err.Error())
return
}
}
// Get the SSH public key
func GetGitSshPublicKey() string {
if !utils.Exists(path.Join(os.Getenv("HOME"), ".ssh")) ||
!utils.Exists(path.Join(os.Getenv("HOME"), ".ssh", "id_rsa")) ||
!utils.Exists(path.Join(os.Getenv("HOME"), ".ssh", "id_rsa.pub")) {
log.Errorf("no ssh public key")
debug.PrintStack()
return ""
}
content, err := ioutil.ReadFile(path.Join(os.Getenv("HOME"), ".ssh", "id_rsa.pub"))
if err != nil {
return ""
}
return string(content)
}
// Get Git branches
func GetGitBranches(s model.Spider) (branches []GitBranch, err error) {
// Open the repo
repo, err := git.PlainOpen(s.Src)
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return branches, err
}
iter, err := repo.Branches()
if iter == nil {
return branches, nil
}
if err := iter.ForEach(func(reference *plumbing.Reference) error {
branches = append(branches, GitBranch{
Hash: reference.Hash().String(),
Name: reference.Name().String(),
Label: reference.Name().Short(),
})
return nil
}); err != nil {
return branches, err
}
return branches, nil
}
// Get Git tags
func GetGitTags(s model.Spider) (tags []GitTag, err error) {
// Open the repo
repo, err := git.PlainOpen(s.Src)
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return tags, err
}
iter, err := repo.Tags()
if iter == nil {
return tags, nil
}
if err := iter.ForEach(func(reference *plumbing.Reference) error {
tags = append(tags, GitTag{
Hash: reference.Hash().String(),
Name: reference.Name().String(),
Label: reference.Name().Short(),
})
return nil
}); err != nil {
return tags, err
}
return tags, nil
}
// Get the Git HEAD hash
func GetGitHeadHash(repo *git.Repository) string {
head, _ := repo.Head()
return head.Hash().String()
}
// Get remote Git branches
func GetGitRemoteBranches(s model.Spider) (branches []GitBranch, err error) {
// Open the repo
repo, err := git.PlainOpen(s.Src)
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return branches, err
}
iter, err := repo.References()
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return branches, err
}
if err := iter.ForEach(func(reference *plumbing.Reference) error {
if reference.Name().IsRemote() {
log.Infof(reference.Hash().String())
log.Infof(reference.Name().String())
branches = append(branches, GitBranch{
Hash: reference.Hash().String(),
Name: reference.Name().String(),
Label: reference.Name().Short(),
})
}
return nil
}); err != nil {
log.Error(err.Error())
debug.PrintStack()
return branches, err
}
return branches, err
}
// Get Git commits
func GetGitCommits(s model.Spider) (commits []GitCommit, err error) {
// Open the repo
repo, err := git.PlainOpen(s.Src)
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return commits, err
}
// Get the branch list
branches, err := GetGitBranches(s)
branchesDict := map[string][]GitBranch{}
for _, b := range branches {
branchesDict[b.Hash] = append(branchesDict[b.Hash], b)
}
// Get the remote branch list
remoteBranches, err := GetGitRemoteBranches(s)
remoteBranchesDict := map[string][]GitBranch{}
for _, b := range remoteBranches {
remoteBranchesDict[b.Hash] = append(remoteBranchesDict[b.Hash], b)
}
// Get the tag list
tags, err := GetGitTags(s)
tagsDict := map[string][]GitTag{}
for _, t := range tags {
tagsDict[t.Hash] = append(tagsDict[t.Hash], t)
}
// Get the log iterator
iter, err := repo.Log(&git.LogOptions{
All: true,
})
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return commits, err
}
// Iterate over the log
if err := iter.ForEach(func(commit *object.Commit) error {
gc := GitCommit{
Hash: commit.Hash.String(),
TreeHash: commit.TreeHash.String(),
Message: commit.Message,
Author: commit.Author.Name,
Email: commit.Author.Email,
Ts: commit.Author.When,
IsHead: commit.Hash.String() == GetGitHeadHash(repo),
Branches: branchesDict[commit.Hash.String()],
RemoteBranches: remoteBranchesDict[commit.Hash.String()],
Tags: tagsDict[commit.Hash.String()],
}
commits = append(commits, gc)
return nil
}); err != nil {
log.Error(err.Error())
debug.PrintStack()
return commits, err
}
return commits, nil
}
func GitCheckout(s model.Spider, hash string) (err error) {
// Open the repo
repo, err := git.PlainOpen(s.Src)
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return err
}
// Get the worktree
wt, err := repo.Worktree()
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return err
}
// Check whether the remote origin URL matches the current GitUrl; if not, delete the old directory and re-pull the remote code
remote, err := repo.Remote("origin")
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return err
}
if remote.String() != s.GitUrl {
utils.RemoveFiles(s.Src)
return SyncSpiderGit(s)
}
// Checkout
if err := wt.Checkout(&git.CheckoutOptions{
Hash: plumbing.NewHash(hash),
Create: false,
Force: true,
Keep: false,
}); err != nil {
log.Error(err.Error())
debug.PrintStack()
return err
}
return nil
}

View File

@@ -1,25 +0,0 @@
package local_node
import (
"crawlab/model"
"github.com/spf13/viper"
)
func GetLocalNode() *LocalNode {
return localNode
}
func CurrentNode() *model.Node {
return GetLocalNode().Current()
}
func InitLocalNode() (node *LocalNode, err error) {
registerType := viper.GetString("server.register.type")
ip := viper.GetString("server.register.ip")
customNodeName := viper.GetString("server.register.customNodeName")
localNode, err = NewLocalNode(ip, customNodeName, registerType)
if err != nil {
return nil, err
}
return localNode, err
}

View File

@@ -1,62 +0,0 @@
package local_node
import (
"crawlab/model"
"github.com/apex/log"
"github.com/cenkalti/backoff/v4"
"go.uber.org/atomic"
"sync"
"time"
)
var locker atomic.Int32
type mongo struct {
node *model.Node
sync.RWMutex
}
func (n *mongo) load(retry bool) (err error) {
n.Lock()
defer n.Unlock()
var node model.Node
if retry {
b := backoff.NewConstantBackOff(1 * time.Second)
err = backoff.Retry(func() error {
node, err = model.GetNodeByKey(GetLocalNode().Identify)
if err != nil {
log.WithError(err).Warnf("failed to get current node info from database; will try again after %f seconds", b.NextBackOff().Seconds())
}
return err
}, b)
} else {
node, err = model.GetNodeByKey(localNode.Identify)
}
if err != nil {
return
}
n.node = &node
return nil
}
func (n *mongo) watch() {
timer := time.NewTicker(time.Second * 5)
for range timer.C {
if locker.CAS(0, 1) {
err := n.load(false)
if err != nil {
log.WithError(err).Errorf("load current node from database failed")
}
locker.Store(0)
}
continue
}
}
func (n *mongo) Current() *model.Node {
n.RLock()
defer n.RUnlock()
return n.node
}

View File

@@ -1,74 +0,0 @@
package local_node
import (
"errors"
"github.com/hashicorp/go-sockaddr"
"os"
)
var localNode *LocalNode
type IdentifyType string
const (
Ip = IdentifyType("ip")
Mac = IdentifyType("mac")
Hostname = IdentifyType("hostname")
)
type local struct {
Ip string
Mac string
Hostname string
Identify string
IdentifyType IdentifyType
}
type LocalNode struct {
local
mongo
}
func (l *LocalNode) Ready() error {
err := localNode.load(true)
if err != nil {
return err
}
go localNode.watch()
return nil
}
func NewLocalNode(ip string, identify string, identifyTypeString string) (node *LocalNode, err error) {
addrs, err := sockaddr.GetPrivateInterfaces()
if ip == "" {
if err != nil {
return node, err
}
if len(addrs) == 0 {
return node, errors.New("address not found")
}
ipaddr := *sockaddr.ToIPAddr(addrs[0].SockAddr)
ip = ipaddr.NetIP().String()
}
mac := addrs[0].HardwareAddr.String()
hostname, err := os.Hostname()
if err != nil {
return node, err
}
local := local{Ip: ip, Mac: mac, Hostname: hostname}
switch IdentifyType(identifyTypeString) {
case Ip:
local.Identify = local.Ip
local.IdentifyType = Ip
case Mac:
local.Identify = local.Mac
local.IdentifyType = Mac
case Hostname:
local.Identify = local.Hostname
local.IdentifyType = Hostname
default:
local.Identify = identify
local.IdentifyType = IdentifyType(identifyTypeString)
}
return &LocalNode{local: local, mongo: mongo{}}, nil
}

View File

@@ -1,188 +0,0 @@
package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/utils"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/spf13/viper"
"io/ioutil"
"os"
"path/filepath"
"runtime/debug"
"time"
)
// Task log channel map
var TaskLogChanMap = utils.NewChanMap()
// Periodically delete logs
func DeleteLogPeriodically() {
logDir := viper.GetString("log.path")
if !utils.Exists(logDir) {
log.Error("cannot delete logs periodically: log directory does not exist")
return
}
rd, err := ioutil.ReadDir(logDir)
if err != nil {
log.Error("failed to read log directory")
return
}
for _, fi := range rd {
if fi.IsDir() {
log.Info(filepath.Join(logDir, fi.Name()))
_ = os.RemoveAll(filepath.Join(logDir, fi.Name()))
log.Info("deleted log files successfully")
}
}
}
// Remove the local log
func RemoveLocalLog(path string) error {
if err := model.RemoveFile(path); err != nil {
log.Error("remove local file error: " + err.Error())
return err
}
return nil
}
// Remove the remote log
func RemoveRemoteLog(task model.Task) error {
msg := entity.NodeMessage{
Type: constants.MsgTypeRemoveLog,
LogPath: task.LogPath,
TaskId: task.Id,
}
// Publish the get-log message
channel := "nodes:" + task.NodeId.Hex()
if _, err := database.RedisClient.Publish(channel, utils.GetJson(msg)); err != nil {
log.Errorf("publish redis error: %s", err.Error())
debug.PrintStack()
return err
}
return nil
}
// Remove log files by task ID
func RemoveLogByTaskId(id string) error {
t, err := model.GetTask(id)
if err != nil {
log.Error("get task error:" + err.Error())
return err
}
removeLog(t)
return nil
}
func RemoveLogByTaskStatus(status string) error {
tasks, err := model.GetTaskList(bson.M{"status": status}, 0, constants.Infinite, "-create_ts")
if err != nil {
log.Error("get tasks error:" + err.Error())
return err
}
for _, task := range tasks {
RemoveLogByTaskId(task.Id)
}
return nil
}
func removeLog(t model.Task) {
if err := RemoveLocalLog(t.LogPath); err != nil {
log.Errorf("remove local log error: %s", err.Error())
debug.PrintStack()
}
if err := RemoveRemoteLog(t); err != nil {
log.Errorf("remove remote log error: %s", err.Error())
debug.PrintStack()
}
}
// Remove log files by spider ID
func RemoveLogBySpiderId(id bson.ObjectId) error {
tasks, err := model.GetTaskList(bson.M{"spider_id": id}, 0, constants.Infinite, "-create_ts")
if err != nil {
log.Errorf("get tasks error: %s", err.Error())
debug.PrintStack()
}
for _, task := range tasks {
removeLog(task)
}
return nil
}
// Initialize periodic log deletion
func InitDeleteLogPeriodically() error {
c := cron.New(cron.WithSeconds())
if _, err := c.AddFunc(viper.GetString("log.deleteFrequency"), DeleteLogPeriodically); err != nil {
return err
}
c.Start()
return nil
}
func InitLogIndexes() error {
s, c := database.GetCol("logs")
defer s.Close()
se, ce := database.GetCol("error_logs")
defer se.Close()
_ = c.EnsureIndex(mgo.Index{
Key: []string{"task_id", "seq"},
})
_ = c.EnsureIndex(mgo.Index{
Key: []string{"task_id", "msg"},
})
_ = c.EnsureIndex(mgo.Index{
Key: []string{"expire_ts"},
Sparse: true,
ExpireAfter: 1 * time.Second,
})
_ = ce.EnsureIndex(mgo.Index{
Key: []string{"task_id"},
})
_ = ce.EnsureIndex(mgo.Index{
Key: []string{"log_id"},
})
_ = ce.EnsureIndex(mgo.Index{
Key: []string{"expire_ts"},
Sparse: true,
ExpireAfter: 1 * time.Second,
})
return nil
}
func InitLogService() error {
logLevel := viper.GetString("log.level")
if logLevel != "" {
log.SetLevelFromString(logLevel)
}
log.Info("initialized log config successfully")
if viper.GetString("log.isDeletePeriodically") == "Y" {
if err := InitDeleteLogPeriodically(); err != nil {
log.Error("init DeletePeriodically failed")
return err
}
log.Info("initialized periodically cleaning log successfully")
} else {
log.Info("periodically cleaning log is switched off")
}
if model.IsMaster() {
if err := InitLogIndexes(); err != nil {
log.Errorf(err.Error())
return err
}
}
return nil
}

View File

@@ -1,41 +0,0 @@
package services
import (
"crawlab/config"
"crawlab/utils"
"fmt"
"github.com/apex/log"
. "github.com/smartystreets/goconvey/convey"
"github.com/spf13/viper"
"os"
"testing"
)
func TestDeleteLogPeriodically(t *testing.T) {
Convey("Test DeleteLogPeriodically", t, func() {
err := config.InitConfig("../conf/config.yml")
So(err, ShouldBeNil)
log.Info("initialized config successfully")
logDir := viper.GetString("log.path")
log.Info(logDir)
DeleteLogPeriodically()
})
}
func TestGetLocalLog(t *testing.T) {
//create a log file for test
logPath := "../logs/crawlab/test.log"
f, err := os.Create(logPath)
defer utils.Close(f)
if err != nil {
fmt.Println(err)
} else {
_, err = f.WriteString("This is for test")
fmt.Println(err)
}
//delete the test log file
_ = os.Remove(logPath)
}

View File

@@ -1,37 +0,0 @@
package msg_handler
import (
"crawlab/constants"
"crawlab/entity"
"github.com/apex/log"
)
type Handler interface {
Handle() error
}
func GetMsgHandler(msg entity.NodeMessage) Handler {
log.Debugf("received msg, type is: %s", msg.Type)
//if msg.Type == constants.MsgTypeGetLog || msg.Type == constants.MsgTypeRemoveLog {
// // log related
// return &Log{
// msg: msg,
// }
//} else if msg.Type == constants.MsgTypeCancelTask {
// // task related
// return &Task{
// msg: msg,
// }
if msg.Type == constants.MsgTypeGetSystemInfo {
// System info related
return &SystemInfo{
msg: msg,
}
} else if msg.Type == constants.MsgTypeRemoveSpider {
// Spider related
return &Spider{
SpiderId: msg.SpiderId,
}
}
return nil
}
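
An illustrative dispatch loop for the removed handler registry: deserialize an incoming node message, look up its handler, and run it. The DispatchRaw helper is hypothetical, and the message source (e.g. a Redis subscription) is outside this sketch.

package msg_handler

import (
	"encoding/json"

	"crawlab/entity"

	"github.com/apex/log"
)

// DispatchRaw deserializes a raw node message and runs the matching handler.
// Unknown message types are ignored.
func DispatchRaw(msgStr string) {
	var msg entity.NodeMessage
	if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
		log.Errorf("unmarshal node message error: %s", err.Error())
		return
	}
	handler := GetMsgHandler(msg)
	if handler == nil {
		return
	}
	if err := handler.Handle(); err != nil {
		log.Errorf("handle node message error: %s", err.Error())
	}
}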

View File

@@ -1,54 +0,0 @@
package msg_handler
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
"github.com/apex/log"
"runtime/debug"
)
type Log struct {
msg entity.NodeMessage
}
func (g *Log) Handle() error {
if g.msg.Type == constants.MsgTypeGetLog {
return g.get()
} else if g.msg.Type == constants.MsgTypeRemoveLog {
return g.remove()
}
return nil
}
func (g *Log) get() error {
// The outgoing message
msgSd := entity.NodeMessage{
Type: constants.MsgTypeGetLog,
TaskId: g.msg.TaskId,
}
// Get the local log
logStr, err := model.GetLocalLog(g.msg.LogPath)
if err != nil {
log.Errorf("get node local log error: %s", err.Error())
debug.PrintStack()
msgSd.Error = err.Error()
msgSd.Log = err.Error()
} else {
msgSd.Log = utils.BytesToString(logStr)
}
// Publish the message to the master node
if err := database.Pub(constants.ChannelMasterNode, msgSd); err != nil {
log.Errorf("pub log to master node error: %s", err.Error())
debug.PrintStack()
return err
}
log.Infof(msgSd.Log)
return nil
}
func (g *Log) remove() error {
return model.RemoveFile(g.msg.LogPath)
}

View File

@@ -1,24 +0,0 @@
package msg_handler
import (
"crawlab/model"
"crawlab/utils"
"github.com/globalsign/mgo/bson"
"github.com/spf13/viper"
"path/filepath"
)
type Spider struct {
SpiderId string
}
func (s *Spider) Handle() error {
// Remove the local spider directory
spider, err := model.GetSpider(bson.ObjectIdHex(s.SpiderId))
if err != nil {
return err
}
path := filepath.Join(viper.GetString("spider.path"), spider.Name)
utils.RemoveFiles(path)
return nil
}

View File

@@ -1,29 +0,0 @@
package msg_handler
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
)
type SystemInfo struct {
msg entity.NodeMessage
}
func (s *SystemInfo) Handle() error {
// Get the system info
sysInfo, err := model.GetLocalSystemInfo()
if err != nil {
return err
}
msgSd := entity.NodeMessage{
Type: constants.MsgTypeGetSystemInfo,
NodeId: s.msg.NodeId,
SysInfo: sysInfo,
}
if err := database.Pub(constants.ChannelMasterNode, msgSd); err != nil {
return err
}
return nil
}

View File

@@ -1,40 +0,0 @@
package msg_handler
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
"github.com/apex/log"
"runtime/debug"
"time"
)
type Task struct {
msg entity.NodeMessage
}
func (t *Task) Handle() error {
log.Infof("received cancel task msg, task_id: %s", t.msg.TaskId)
// Cancel the task
ch := utils.TaskExecChanMap.ChanBlocked(t.msg.TaskId)
if ch != nil {
ch <- constants.TaskCancel
} else {
log.Infof("chan is empty, update status to abnormal")
// The node may have been restarted, so the chan cannot be found
task, err := model.GetTask(t.msg.TaskId)
if err != nil {
log.Errorf("task not found, task_id: %s", t.msg.TaskId)
debug.PrintStack()
return err
}
task.Status = constants.StatusAbnormal
task.FinishTs = time.Now()
if err := task.Save(); err != nil {
debug.PrintStack()
log.Infof("cancel task error: %s", err.Error())
}
}
return nil
}

View File

@@ -1,255 +0,0 @@
package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/model"
"crawlab/services/local_node"
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"runtime/debug"
"time"
)
type Data struct {
Key string `json:"key"`
Mac string `json:"mac"`
Ip string `json:"ip"`
Hostname string `json:"hostname"`
Name string `json:"name"`
NameType string `json:"name_type"`
Master bool `json:"master"`
UpdateTs time.Time `json:"update_ts"`
UpdateTsUnix int64 `json:"update_ts_unix"`
}
// All methods that call IsMasterNode always run on the master node, so GetCurrentNode always returns the master node
// Whether the node with the given ID is the master node
func IsMasterNode(id string) bool {
curNode := local_node.CurrentNode()
//curNode, _ := model.GetCurrentNode()
node, _ := model.GetNode(bson.ObjectIdHex(id))
return curNode.Id == node.Id
}
// Get the node data
func GetNodeData() (Data, error) {
localNode := local_node.GetLocalNode()
key := localNode.Identify
if key == "" {
return Data{}, nil
}
value, err := database.RedisClient.HGet("nodes", key)
data := Data{}
if err := json.Unmarshal([]byte(value), &data); err != nil {
return data, err
}
return data, err
}
func GetRedisNode(key string) (*Data, error) {
// Get the node data
value, err := database.RedisClient.HGet("nodes", key)
if err != nil {
log.Errorf(err.Error())
return nil, err
}
// Parse the node data
var data Data
if err := json.Unmarshal([]byte(value), &data); err != nil {
log.Errorf(err.Error())
return nil, err
}
return &data, nil
}
// Update the status of all nodes
func UpdateNodeStatus() {
// Get node keys from Redis
list, err := database.RedisClient.HScan("nodes")
if err != nil {
log.Errorf("get redis node keys error: %s", err.Error())
return
}
var offlineKeys []string
// Iterate over the node keys
for _, dataStr := range list {
var data Data
if err := json.Unmarshal([]byte(dataStr), &data); err != nil {
log.Errorf(err.Error())
continue
}
// If the recorded update time is more than 60 seconds old, the node is considered offline
if time.Now().Unix()-data.UpdateTsUnix > 60 {
offlineKeys = append(offlineKeys, data.Key)
// Delete the node from Redis
if err := database.RedisClient.HDel("nodes", data.Key); err != nil {
log.Errorf("delete redis node key error:%s, key:%s", err.Error(), data.Key)
}
continue
}
// Process the node info
if err = UpdateNodeInfo(&data); err != nil {
log.Errorf(err.Error())
continue
}
}
if len(offlineKeys) > 0 {
s, c := database.GetCol("nodes")
defer s.Close()
_, err = c.UpdateAll(bson.M{
"key": bson.M{
"$in": offlineKeys,
},
}, bson.M{
"$set": bson.M{
"status": constants.StatusOffline,
"update_ts": time.Now(),
"update_ts_unix": time.Now().Unix(),
},
})
if err != nil {
log.Errorf(err.Error())
}
}
}
// Process node info
func UpdateNodeInfo(data *Data) (err error) {
// Update the node info in the database
s, c := database.GetCol("nodes")
defer s.Close()
_, err = c.Upsert(bson.M{"key": data.Key}, bson.M{
"$set": bson.M{
"status": constants.StatusOnline,
"key": data.Key,
"name_type": data.NameType,
"ip": data.Ip,
"port": "8000",
"mac": data.Mac,
"is_master": data.Master,
"update_ts": time.Now(),
"update_ts_unix": time.Now().Unix(),
},
"$setOnInsert": bson.M{
"name": data.Name,
"_id": bson.NewObjectId(),
},
})
return err
}
// Update the node data
func UpdateNodeData() {
localNode := local_node.GetLocalNode()
key := localNode.Identify
// Build the node data
data := Data{
Key: key,
Mac: localNode.Mac,
Ip: localNode.Ip,
Hostname: localNode.Hostname,
Name: localNode.Identify,
NameType: string(localNode.IdentifyType),
Master: model.IsMaster(),
UpdateTs: time.Now(),
UpdateTsUnix: time.Now().Unix(),
}
// Register the node in Redis
dataBytes, err := json.Marshal(&data)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
if err := database.RedisClient.HSet("nodes", key, utils.BytesToString(dataBytes)); err != nil {
log.Errorf(err.Error())
return
}
}
// Send heartbeat info to Redis every 5 seconds
func SendHeartBeat() {
for {
UpdateNodeData()
time.Sleep(5 * time.Second)
}
}
// Refresh node info every 10 seconds
func UpdateNodeStatusPeriodically() {
for {
UpdateNodeStatus()
time.Sleep(10 * time.Second)
}
}
// Update offline nodes' tasks to abnormal every 60 seconds
func UpdateOfflineNodeTaskToAbnormalPeriodically() {
for {
nodes, err := model.GetNodeList(bson.M{"status": constants.StatusOffline})
if err != nil {
log.Errorf("get nodes error: " + err.Error())
debug.PrintStack()
continue
}
for _, n := range nodes {
if err := model.UpdateTaskToAbnormal(n.Id); err != nil {
log.Errorf("update task to abnormal error: " + err.Error())
debug.PrintStack()
continue
}
}
time.Sleep(60 * time.Second)
}
}
// Initialize the node service
func InitNodeService() error {
node, err := local_node.InitLocalNode()
if err != nil {
return err
}
// Update this node's info every 5 seconds
go SendHeartBeat()
// Update the node data for the first time and register it in Redis
UpdateNodeData()
if model.IsMaster() {
err = model.UpdateMasterNodeInfo(node.Identify, node.Ip, node.Mac, node.Hostname)
if err != nil {
return err
}
}
// The node is ready
if err = node.Ready(); err != nil {
return err
}
// If this is the master node
if model.IsMaster() {
// Refresh all node info every 10 seconds
go UpdateNodeStatusPeriodically()
// Update offline nodes' tasks to abnormal every 60 seconds
go UpdateOfflineNodeTaskToAbnormalPeriodically()
}
// Update tasks running on the current node to abnormal
if err := model.UpdateTaskToAbnormal(node.Current().Id); err != nil {
debug.PrintStack()
return err
}
return nil
}

View File

@@ -1,138 +0,0 @@
package notification
import (
"errors"
"github.com/apex/log"
"github.com/matcornic/hermes"
"gopkg.in/gomail.v2"
"net/mail"
"os"
"runtime/debug"
"strconv"
)
func SendMail(toEmail string, toName string, subject string, content string) error {
// hermes instance
h := hermes.Hermes{
Theme: new(hermes.Default),
Product: hermes.Product{
Name: "Crawlab Team",
Copyright: "© 2019 Crawlab, Made by Crawlab-Team",
},
}
// config
port, _ := strconv.Atoi(os.Getenv("CRAWLAB_NOTIFICATION_MAIL_PORT"))
password := os.Getenv("CRAWLAB_NOTIFICATION_MAIL_SMTP_PASSWORD")
SMTPUser := os.Getenv("CRAWLAB_NOTIFICATION_MAIL_SMTP_USER")
smtpConfig := smtpAuthentication{
Server: os.Getenv("CRAWLAB_NOTIFICATION_MAIL_SERVER"),
Port: port,
SenderEmail: os.Getenv("CRAWLAB_NOTIFICATION_MAIL_SENDEREMAIL"),
SenderIdentity: os.Getenv("CRAWLAB_NOTIFICATION_MAIL_SENDERIDENTITY"),
SMTPPassword: password,
SMTPUser: SMTPUser,
}
options := sendOptions{
To: toEmail,
Subject: subject,
}
// email instance
email := hermes.Email{
Body: hermes.Body{
Name: toName,
FreeMarkdown: hermes.Markdown(content + GetFooter()),
},
}
// generate html
html, err := h.GenerateHTML(email)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// generate text
text, err := h.GeneratePlainText(email)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// send the email
if err := send(smtpConfig, options, html, text); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
return nil
}
type smtpAuthentication struct {
Server string
Port int
SenderEmail string
SenderIdentity string
SMTPUser string
SMTPPassword string
}
// sendOptions are options for sending an email
type sendOptions struct {
To string
Subject string
}
// send sends the email
func send(smtpConfig smtpAuthentication, options sendOptions, htmlBody string, txtBody string) error {
if smtpConfig.Server == "" {
return errors.New("SMTP server config is empty")
}
if smtpConfig.Port == 0 {
return errors.New("SMTP port config is empty")
}
if smtpConfig.SMTPUser == "" {
return errors.New("SMTP user is empty")
}
if smtpConfig.SenderIdentity == "" {
return errors.New("SMTP sender identity is empty")
}
if smtpConfig.SenderEmail == "" {
return errors.New("SMTP sender email is empty")
}
if options.To == "" {
return errors.New("no receiver emails configured")
}
from := mail.Address{
Name: smtpConfig.SenderIdentity,
Address: smtpConfig.SenderEmail,
}
m := gomail.NewMessage()
m.SetHeader("From", from.String())
m.SetHeader("To", options.To)
m.SetHeader("Subject", options.Subject)
m.SetBody("text/plain", txtBody)
m.AddAlternative("text/html", htmlBody)
d := gomail.NewPlainDialer(smtpConfig.Server, smtpConfig.Port, smtpConfig.SMTPUser, smtpConfig.SMTPPassword)
return d.DialAndSend(m)
}
func GetFooter() string {
return `
[Github](https://github.com/crawlab-team/crawlab) | [Documentation](http://docs.crawlab.cn) | [Docker](https://hub.docker.com/r/tikazyq/crawlab)
`
}
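
A hypothetical call of the removed SendMail helper, e.g. from a task-finished notification. The NotifyTaskFinished wrapper, recipient values, and message content are placeholders, and the CRAWLAB_NOTIFICATION_MAIL_* environment variables read above must be set for send() to succeed.

package notification

import "github.com/apex/log"

// NotifyTaskFinished is an illustrative wrapper only; it is not part of this
// commit.
func NotifyTaskFinished(email string, name string, spiderName string) {
	content := "Your spider **" + spiderName + "** has finished running."
	if err := SendMail(email, name, "Task finished", content); err != nil {
		log.Errorf("send mail error: %s", err.Error())
	}
}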

View File

@@ -1,59 +0,0 @@
package notification
import (
"errors"
"github.com/apex/log"
"github.com/imroc/req"
"runtime/debug"
)
func SendMobileNotification(webhook string, title string, content string) error {
type ResBody struct {
ErrCode int `json:"errcode"`
ErrMsg string `json:"errmsg"`
}
// Request header
header := req.Header{
"Content-Type": "application/json; charset=utf-8",
}
// Request data
data := req.Param{
"msgtype": "markdown",
"markdown": req.Param{
"title": title,
"text": content,
"content": content,
},
"at": req.Param{
"atMobiles": []string{},
"isAtAll": false,
},
}
// Send the request
res, err := req.Post(webhook, header, req.BodyJSON(&data))
if err != nil {
log.Errorf("dingtalk notification error: " + err.Error())
debug.PrintStack()
return err
}
// Parse the response
var resBody ResBody
if err := res.ToJSON(&resBody); err != nil {
log.Errorf("dingtalk notification error: " + err.Error())
debug.PrintStack()
return err
}
// Check whether the response reports an error
if resBody.ErrCode != 0 {
log.Errorf("dingtalk notification error: " + resBody.ErrMsg)
debug.PrintStack()
return errors.New(resBody.ErrMsg)
}
return nil
}

View File

@@ -1,93 +0,0 @@
package services
import (
"crawlab/constants"
"crawlab/model"
"crawlab/utils"
"fmt"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"github.com/imroc/req"
uuid "github.com/satori/go.uuid"
"github.com/spf13/viper"
"path"
"path/filepath"
"runtime/debug"
"strings"
)
func DownloadRepo(fullName string, userId bson.ObjectId) (err error) {
// Download the zip file
url := fmt.Sprintf("%s/%s.zip", viper.GetString("repo.ossUrl"), fullName)
progress := func(current, total int64) {
fmt.Println(float32(current)/float32(total)*100, "%")
}
res, err := req.Get(url, req.DownloadProgress(progress))
if err != nil {
log.Errorf("download repo error: " + err.Error())
debug.PrintStack()
return err
}
spiderName := strings.Replace(fullName, "/", "_", -1)
randomId := uuid.NewV4()
tmpFilePath := filepath.Join(viper.GetString("other.tmppath"), spiderName+"."+randomId.String()+".zip")
if err := res.ToFile(tmpFilePath); err != nil {
log.Errorf("to file error: " + err.Error())
debug.PrintStack()
return err
}
// Unzip the zip file
tmpFile := utils.OpenFile(tmpFilePath)
if err := utils.DeCompress(tmpFile, viper.GetString("other.tmppath")); err != nil {
log.Errorf("de-compress error: " + err.Error())
debug.PrintStack()
return err
}
// Copy the files
spiderPath := path.Join(viper.GetString("spider.path"), spiderName)
srcDirPath := fmt.Sprintf("%s/data/github.com/%s", viper.GetString("other.tmppath"), fullName)
if err := utils.CopyDir(srcDirPath, spiderPath); err != nil {
log.Errorf("copy error: " + err.Error())
debug.PrintStack()
return err
}
// Create the spider
spider := model.GetSpiderByName(spiderName)
if spider.Name == "" {
// Add a new spider
spider = model.Spider{
Id: bson.NewObjectId(),
Name: spiderName,
DisplayName: spiderName,
Type: constants.Customized,
Src: spiderPath,
ProjectId: bson.ObjectIdHex(constants.ObjectIdNull),
FileId: bson.ObjectIdHex(constants.ObjectIdNull),
UserId: userId,
}
if err := spider.Add(); err != nil {
log.Error("add spider error: " + err.Error())
debug.PrintStack()
return err
}
} else {
// Update the existing spider
if err := spider.Save(); err != nil {
log.Error("save spider error: " + err.Error())
debug.PrintStack()
return err
}
}
// Upload the spider
if err := UploadSpiderToGridFsFromMaster(spider); err != nil {
log.Error("upload spider error: " + err.Error())
debug.PrintStack()
return err
}
return nil
}

View File

@@ -1,146 +0,0 @@
package rpc
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/model"
"crawlab/services/local_node"
"crawlab/utils"
"encoding/json"
"errors"
"fmt"
"github.com/apex/log"
"github.com/cenkalti/backoff/v4"
"github.com/gomodule/redigo/redis"
uuid "github.com/satori/go.uuid"
"runtime/debug"
)
// base interface for RPC services
type Service interface {
ServerHandle() (entity.RpcMessage, error)
ClientHandle() (interface{}, error)
}
// client-side message handler
func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
return func() (replyMsg entity.RpcMessage, err error) {
// request id
msg.Id = uuid.NewV4().String()
// send the RPC message
msgStr := utils.ObjectToString(msg)
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// wait for the RPC reply message
dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout)
if err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// deserialize the message
if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// if the reply carries an error, return it
if replyMsg.Error != "" {
return replyMsg, errors.New(replyMsg.Error)
}
return
}
}
// get the RPC service for a message
func GetService(msg entity.RpcMessage) Service {
switch msg.Method {
case constants.RpcInstallLang:
return &InstallLangService{msg: msg}
case constants.RpcInstallDep:
return &InstallDepService{msg: msg}
case constants.RpcUninstallDep:
return &UninstallDepService{msg: msg}
case constants.RpcGetLang:
return &GetLangService{msg: msg}
case constants.RpcGetInstalledDepList:
return &GetInstalledDepsService{msg: msg}
case constants.RpcCancelTask:
return &CancelTaskService{msg: msg}
case constants.RpcGetSystemInfoService:
return &GetSystemInfoService{msg: msg}
}
return nil
}
// handle an RPC message
func handleMsg(msgStr string, node *model.Node) {
// deserialize the message
var msg entity.RpcMessage
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
// get the service for this method
service := GetService(msg)
if service == nil {
log.Errorf("unknown rpc method: " + msg.Method)
debug.PrintStack()
return
}
// dispatch to the local handler based on Method
replyMsg, err := service.ServerHandle()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
// send the reply message
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
}
// initialize the server-side RPC service
func InitRpcService() error {
go func() {
node := local_node.CurrentNode()
for {
// get the current node
//node, err := model.GetCurrentNode()
//if err != nil {
// log.Errorf(err.Error())
// debug.PrintStack()
// continue
//}
b := backoff.NewExponentialBackOff()
bp := backoff.WithMaxRetries(b, 10)
var msgStr string
var err error
err = backoff.Retry(func() error {
msgStr, err = database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
if err != nil && err != redis.ErrNil {
log.WithError(err).Warnf("waiting for redis pool active connection. will after %f seconds try again.", b.NextBackOff().Seconds())
return err
}
return err
}, bp)
if err != nil {
continue
}
// handle the message
go handleMsg(msgStr, node)
}
}()
return nil
}
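
The RPC layer above rides on two Redis list keys: clients LPUSH requests onto rpc:<nodeId>, the server BRPOPs them, and replies are LPUSHed onto rpc:<nodeId>:<messageId> for the client to BRPOP. A small sketch of that key scheme and message envelope; the struct is an assumed shape of entity.RpcMessage and the method name is a placeholder.

package main

import (
	"encoding/json"
	"fmt"
)

// rpcMessage is an assumed, trimmed-down shape of entity.RpcMessage.
type rpcMessage struct {
	Id      string            `json:"id"`
	Method  string            `json:"method"`
	NodeId  string            `json:"node_id"`
	Params  map[string]string `json:"params"`
	Timeout int               `json:"timeout"`
	Result  string            `json:"result"`
	Error   string            `json:"error"`
}

func main() {
	msg := rpcMessage{
		Id:      "3f6c2c1e-example",            // normally a UUID generated per request
		Method:  "install_dep",                 // placeholder method name
		NodeId:  "node-1",
		Params:  map[string]string{"lang": "python", "dep_name": "requests"},
		Timeout: 300,
	}
	requestQueue := fmt.Sprintf("rpc:%s", msg.NodeId)          // client LPUSHes here, server BRPOPs
	replyQueue := fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id) // server LPUSHes the reply here
	payload, _ := json.Marshal(msg)
	fmt.Println(requestQueue)
	fmt.Println(replyQueue)
	fmt.Println(string(payload))
}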

View File

@@ -1,63 +0,0 @@
package rpc
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
"errors"
"fmt"
"github.com/globalsign/mgo/bson"
)
type CancelTaskService struct {
msg entity.RpcMessage
}
func (s *CancelTaskService) ServerHandle() (entity.RpcMessage, error) {
taskId := utils.GetRpcParam("task_id", s.msg.Params)
nodeId := utils.GetRpcParam("node_id", s.msg.Params)
if err := CancelTaskLocal(taskId, nodeId); err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
s.msg.Result = "success"
return s.msg, nil
}
func (s *CancelTaskService) ClientHandle() (o interface{}, err error) {
// send the RPC request and fetch data from the server
_, err = ClientFunc(s.msg)()
if err != nil {
return
}
return
}
func CancelTaskLocal(taskId string, nodeId string) error {
if !utils.TaskExecChanMap.HasChanKey(taskId) {
_ = model.UpdateTaskToAbnormal(bson.ObjectIdHex(nodeId))
return errors.New(fmt.Sprintf("task id (%s) does not exist", taskId))
}
ch := utils.TaskExecChanMap.ChanBlocked(taskId)
ch <- constants.TaskCancel
return nil
}
func CancelTaskRemote(taskId string, nodeId string) (err error) {
params := make(map[string]string)
params["task_id"] = taskId
params["node_id"] = nodeId
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcCancelTask,
Params: params,
Timeout: 60,
})
_, err = s.ClientHandle()
if err != nil {
return
}
return
}

View File

@@ -1,123 +0,0 @@
package rpc
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/utils"
"encoding/json"
"os/exec"
"regexp"
"runtime/debug"
"strings"
)
type GetInstalledDepsService struct {
msg entity.RpcMessage
}
func (s *GetInstalledDepsService) ServerHandle() (entity.RpcMessage, error) {
lang := utils.GetRpcParam("lang", s.msg.Params)
deps, err := GetInstalledDepsLocal(lang)
if err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
resultStr, _ := json.Marshal(deps)
s.msg.Result = string(resultStr)
return s.msg, nil
}
func (s *GetInstalledDepsService) ClientHandle() (o interface{}, err error) {
// send the RPC request and fetch data from the server
s.msg, err = ClientFunc(s.msg)()
if err != nil {
return o, err
}
// deserialize
var output []entity.Dependency
if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
return o, err
}
o = output
return
}
// get the list of locally installed dependencies
func GetInstalledDepsLocal(lang string) (deps []entity.Dependency, err error) {
if lang == constants.Python {
deps, err = GetPythonInstalledDepListLocal()
} else if lang == constants.Nodejs {
deps, err = GetNodejsInstalledDepListLocal()
}
return deps, err
}
// get the list of locally installed Python dependencies
func GetPythonInstalledDepListLocal() ([]entity.Dependency, error) {
var list []entity.Dependency
cmd := exec.Command("pip", "freeze")
outputBytes, err := cmd.Output()
if err != nil {
debug.PrintStack()
return list, err
}
for _, line := range strings.Split(string(outputBytes), "\n") {
arr := strings.Split(line, "==")
if len(arr) < 2 {
continue
}
dep := entity.Dependency{
Name: strings.ToLower(arr[0]),
Version: arr[1],
Installed: true,
}
list = append(list, dep)
}
return list, nil
}
// get the list of locally installed Node.js dependencies
func GetNodejsInstalledDepListLocal() ([]entity.Dependency, error) {
var list []entity.Dependency
cmd := exec.Command("npm", "ls", "-g", "--depth", "0")
outputBytes, _ := cmd.Output()
regex := regexp.MustCompile("\\s(.*)@(.*)")
for _, line := range strings.Split(string(outputBytes), "\n") {
arr := regex.FindStringSubmatch(line)
if len(arr) < 3 {
continue
}
dep := entity.Dependency{
Name: strings.ToLower(arr[1]),
Version: arr[2],
Installed: true,
}
list = append(list, dep)
}
return list, nil
}
func GetInstalledDepsRemote(nodeId string, lang string) (deps []entity.Dependency, err error) {
params := make(map[string]string)
params["lang"] = lang
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcGetInstalledDepList,
Params: params,
Timeout: 60,
})
o, err := s.ClientHandle()
if err != nil {
return
}
deps = o.([]entity.Dependency)
return
}
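
A sketch of the `pip freeze` parsing used by GetPythonInstalledDepListLocal, run against canned output instead of the real command; the dependency struct is a trimmed-down stand-in for entity.Dependency.

package main

import (
	"fmt"
	"strings"
)

// dependency is a trimmed-down stand-in for entity.Dependency.
type dependency struct {
	Name, Version string
}

func main() {
	// sample `pip freeze` output; the real output comes from exec.Command("pip", "freeze")
	out := "Requests==2.25.1\nscrapy==2.4.1\n\n"
	var deps []dependency
	for _, line := range strings.Split(out, "\n") {
		arr := strings.Split(line, "==")
		if len(arr) < 2 {
			continue // skip blank lines and entries without a pinned version
		}
		deps = append(deps, dependency{Name: strings.ToLower(arr[0]), Version: arr[1]})
	}
	fmt.Printf("%+v\n", deps) // [{requests 2.25.1} {scrapy 2.4.1}]
}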

View File

@@ -1,82 +0,0 @@
package rpc
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/utils"
"encoding/json"
)
type GetLangService struct {
msg entity.RpcMessage
}
func (s *GetLangService) ServerHandle() (entity.RpcMessage, error) {
langName := utils.GetRpcParam("lang", s.msg.Params)
lang := utils.GetLangFromLangNamePlain(langName)
l := GetLangLocal(lang)
lang.InstallStatus = l.InstallStatus
// serialize
resultStr, _ := json.Marshal(lang)
s.msg.Result = string(resultStr)
return s.msg, nil
}
func (s *GetLangService) ClientHandle() (o interface{}, err error) {
// send the RPC request and fetch data from the server
s.msg, err = ClientFunc(s.msg)()
if err != nil {
return o, err
}
var output entity.Lang
if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
return o, err
}
o = output
return
}
func GetLangLocal(lang entity.Lang) entity.Lang {
// check whether any executable path exists
for _, p := range lang.ExecutablePaths {
if utils.Exists(p) {
lang.InstallStatus = constants.InstallStatusInstalled
return lang
}
}
//// check whether an installation is in progress
//if utils.Exists(lang.LockPath) {
// lang.InstallStatus = constants.InstallStatusInstalling
// return lang
//}
//
//// check whether another language is being installed
//if utils.Exists("/tmp/install.lock") {
// lang.InstallStatus = constants.InstallStatusInstallingOther
// return lang
//}
lang.InstallStatus = constants.InstallStatusNotInstalled
return lang
}
func GetLangRemote(nodeId string, lang entity.Lang) (l entity.Lang, err error) {
params := make(map[string]string)
params["lang"] = lang.ExecutableName
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcGetLang,
Params: params,
Timeout: 60,
})
o, err := s.ClientHandle()
if err != nil {
return
}
l = o.(entity.Lang)
return
}

View File

@@ -1,67 +0,0 @@
package rpc
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/model"
"encoding/json"
)
type GetSystemInfoService struct {
msg entity.RpcMessage
}
func (s *GetSystemInfoService) ServerHandle() (entity.RpcMessage, error) {
sysInfo, err := GetSystemInfoServiceLocal()
if err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
// serialize
resultStr, _ := json.Marshal(sysInfo)
s.msg.Result = string(resultStr)
return s.msg, nil
}
func (s *GetSystemInfoService) ClientHandle() (o interface{}, err error) {
// send the RPC request and fetch data from the server
s.msg, err = ClientFunc(s.msg)()
if err != nil {
return o, err
}
var output entity.SystemInfo
if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
return o, err
}
o = output
return
}
func GetSystemInfoServiceLocal() (sysInfo entity.SystemInfo, err error) {
// get local system info
sysInfo, err = model.GetLocalSystemInfo()
if err != nil {
return sysInfo, err
}
return sysInfo, nil
}
func GetSystemInfoServiceRemote(nodeId string) (sysInfo entity.SystemInfo, err error) {
params := make(map[string]string)
params["node_id"] = nodeId
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcGetSystemInfoService,
Params: params,
Timeout: 60,
})
o, err := s.ClientHandle()
if err != nil {
return
}
sysInfo = o.(entity.SystemInfo)
return
}

View File

@@ -1,100 +0,0 @@
package rpc
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/utils"
"errors"
"fmt"
"github.com/apex/log"
"os/exec"
"runtime/debug"
)
type InstallDepService struct {
msg entity.RpcMessage
}
func (s *InstallDepService) ServerHandle() (entity.RpcMessage, error) {
lang := utils.GetRpcParam("lang", s.msg.Params)
depName := utils.GetRpcParam("dep_name", s.msg.Params)
if err := InstallDepLocal(lang, depName); err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
s.msg.Result = "success"
return s.msg, nil
}
func (s *InstallDepService) ClientHandle() (o interface{}, err error) {
// send the RPC request and fetch data from the server
_, err = ClientFunc(s.msg)()
if err != nil {
return
}
return
}
func InstallDepLocal(lang string, depName string) error {
if lang == constants.Python {
_, err := InstallPythonDepLocal(depName)
if err != nil {
return err
}
} else if lang == constants.Nodejs {
_, err := InstallNodejsDepLocal(depName)
if err != nil {
return err
}
} else {
return errors.New(fmt.Sprintf("%s is not implemented", lang))
}
return nil
}
// install a Python dependency locally
func InstallPythonDepLocal(depName string) (string, error) {
// dependency mirror URL
url := "https://pypi.tuna.tsinghua.edu.cn/simple"
cmd := exec.Command("pip", "install", depName, "-i", url)
outputBytes, err := cmd.Output()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return fmt.Sprintf("error: %s", err.Error()), err
}
return string(outputBytes), nil
}
func InstallNodejsDepLocal(depName string) (string, error) {
// dependency mirror URL
url := "https://registry.npm.taobao.org"
cmd := exec.Command("npm", "install", depName, "-g", "--registry", url)
outputBytes, err := cmd.Output()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return fmt.Sprintf("error: %s", err.Error()), err
}
return string(outputBytes), nil
}
func InstallDepRemote(nodeId string, lang string, depName string) (err error) {
params := make(map[string]string)
params["lang"] = lang
params["dep_name"] = depName
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcInstallDep,
Params: params,
Timeout: 300,
})
_, err = s.ClientHandle()
if err != nil {
return
}
return
}
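
For reference, the exact commands these helpers shell out to, built with os/exec but only printed rather than run; the package names are placeholders and the mirror URLs are the ones hard-coded above.

package main

import (
	"fmt"
	"os/exec"
)

func main() {
	// same argument layout as InstallPythonDepLocal and InstallNodejsDepLocal
	pipCmd := exec.Command("pip", "install", "requests", "-i", "https://pypi.tuna.tsinghua.edu.cn/simple")
	npmCmd := exec.Command("npm", "install", "lodash", "-g", "--registry", "https://registry.npm.taobao.org")
	fmt.Println(pipCmd.String())
	fmt.Println(npmCmd.String())
}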

View File

@@ -1,73 +0,0 @@
package rpc
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/utils"
"errors"
"fmt"
"github.com/apex/log"
"os/exec"
"path"
"runtime/debug"
)
type InstallLangService struct {
msg entity.RpcMessage
}
func (s *InstallLangService) ServerHandle() (entity.RpcMessage, error) {
lang := utils.GetRpcParam("lang", s.msg.Params)
output, err := InstallLangLocal(lang)
s.msg.Result = output
if err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
return s.msg, nil
}
func (s *InstallLangService) ClientHandle() (o interface{}, err error) {
// send the RPC request and fetch data from the server
go func() {
_, err := ClientFunc(s.msg)()
if err != nil {
return
}
}()
return
}
// install a language locally
func InstallLangLocal(lang string) (o string, err error) {
l := utils.GetLangFromLangNamePlain(lang)
if l.Name == "" || l.InstallScript == "" {
return "", errors.New(fmt.Sprintf("%s is not implemented", lang))
}
cmd := exec.Command("/bin/sh", path.Join("scripts", l.InstallScript))
output, err := cmd.Output()
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return string(output), err
}
return string(output), nil
}
// install a language on a remote node
func InstallLangRemote(nodeId string, lang string) (o string, err error) {
params := make(map[string]string)
params["lang"] = lang
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcInstallLang,
Params: params,
Timeout: 60,
})
_, err = s.ClientHandle()
if err != nil {
return
}
return
}

View File

@@ -1,62 +0,0 @@
package rpc
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/model"
"crawlab/utils"
"github.com/globalsign/mgo/bson"
"github.com/spf13/viper"
"path/filepath"
)
type RemoveSpiderService struct {
msg entity.RpcMessage
}
func (s *RemoveSpiderService) ServerHandle() (entity.RpcMessage, error) {
spiderId := utils.GetRpcParam("spider_id", s.msg.Params)
if err := RemoveSpiderServiceLocal(spiderId); err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
s.msg.Result = "success"
return s.msg, nil
}
func (s *RemoveSpiderService) ClientHandle() (o interface{}, err error) {
// send the RPC request and fetch data from the server
_, err = ClientFunc(s.msg)()
if err != nil {
return
}
return
}
func RemoveSpiderServiceLocal(spiderId string) error {
// remove the local spider directory
spider, err := model.GetSpider(bson.ObjectIdHex(spiderId))
if err != nil {
return err
}
path := filepath.Join(viper.GetString("spider.path"), spider.Name)
utils.RemoveFiles(path)
return nil
}
func RemoveSpiderServiceRemote(spiderId string, nodeId string) (err error) {
params := make(map[string]string)
params["spider_id"] = spiderId
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcRemoveSpider,
Params: params,
Timeout: 60,
})
_, err = s.ClientHandle()
if err != nil {
return
}
return
}

View File

@@ -1,96 +0,0 @@
package rpc
import (
"crawlab/constants"
"crawlab/entity"
"crawlab/utils"
"errors"
"fmt"
"github.com/apex/log"
"os/exec"
"runtime/debug"
)
type UninstallDepService struct {
msg entity.RpcMessage
}
func (s *UninstallDepService) ServerHandle() (entity.RpcMessage, error) {
lang := utils.GetRpcParam("lang", s.msg.Params)
depName := utils.GetRpcParam("dep_name", s.msg.Params)
if err := UninstallDepLocal(lang, depName); err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
s.msg.Result = "success"
return s.msg, nil
}
func (s *UninstallDepService) ClientHandle() (o interface{}, err error) {
// send the RPC request and fetch data from the server
_, err = ClientFunc(s.msg)()
if err != nil {
return
}
return
}
func UninstallDepLocal(lang string, depName string) error {
if lang == constants.Python {
output, err := UninstallPythonDepLocal(depName)
if err != nil {
log.Debugf(output)
return err
}
} else if lang == constants.Nodejs {
output, err := UninstallNodejsDepLocal(depName)
if err != nil {
log.Debugf(output)
return err
}
} else {
return errors.New(fmt.Sprintf("%s is not implemented", lang))
}
return nil
}
func UninstallPythonDepLocal(depName string) (string, error) {
cmd := exec.Command("pip", "uninstall", "-y", depName)
outputBytes, err := cmd.Output()
if err != nil {
log.Errorf(string(outputBytes))
log.Errorf(err.Error())
debug.PrintStack()
return fmt.Sprintf("error: %s", err.Error()), err
}
return string(outputBytes), nil
}
func UninstallNodejsDepLocal(depName string) (string, error) {
cmd := exec.Command("npm", "uninstall", depName, "-g")
outputBytes, err := cmd.Output()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return fmt.Sprintf("error: %s", err.Error()), err
}
return string(outputBytes), nil
}
func UninstallDepRemote(nodeId string, lang string, depName string) (err error) {
params := make(map[string]string)
params["lang"] = lang
params["dep_name"] = depName
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcUninstallDep,
Params: params,
Timeout: 300,
})
_, err = s.ClientHandle()
if err != nil {
return
}
return
}

View File

@@ -1,274 +0,0 @@
package services
import (
"crawlab/constants"
"crawlab/lib/cron"
"crawlab/model"
"errors"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
uuid "github.com/satori/go.uuid"
"runtime/debug"
)
var Sched *Scheduler
type Scheduler struct {
cron *cron.Cron
}
func AddScheduleTask(s model.Schedule) func() {
return func() {
// generate the task id
id := uuid.NewV4()
// parameters
var param string
// spider
spider, err := model.GetSpider(s.SpiderId)
if err != nil {
return
}
// scrapy spider
if spider.IsScrapy {
if s.ScrapySpider == "" {
log.Errorf("scrapy spider is not set")
debug.PrintStack()
return
}
param = s.ScrapySpider + " -L " + s.ScrapyLogLevel + " " + s.Param
} else {
param = s.Param
}
if s.RunType == constants.RunTypeAllNodes {
// all nodes
nodes, err := model.GetNodeList(nil)
if err != nil {
return
}
for _, node := range nodes {
t := model.Task{
Id: id.String(),
SpiderId: s.SpiderId,
NodeId: node.Id,
Param: param,
UserId: s.UserId,
RunType: constants.RunTypeAllNodes,
ScheduleId: s.Id,
Type: constants.TaskTypeSpider,
}
if _, err := AddTask(t); err != nil {
return
}
}
} else if s.RunType == constants.RunTypeRandom {
// random node
t := model.Task{
Id: id.String(),
SpiderId: s.SpiderId,
Param: param,
UserId: s.UserId,
RunType: constants.RunTypeRandom,
ScheduleId: s.Id,
Type: constants.TaskTypeSpider,
}
if _, err := AddTask(t); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
} else if s.RunType == constants.RunTypeSelectedNodes {
// selected nodes
for _, nodeId := range s.NodeIds {
t := model.Task{
Id: id.String(),
SpiderId: s.SpiderId,
NodeId: nodeId,
Param: param,
UserId: s.UserId,
RunType: constants.RunTypeSelectedNodes,
ScheduleId: s.Id,
Type: constants.TaskTypeSpider,
}
if _, err := AddTask(t); err != nil {
return
}
}
} else {
return
}
}
}
func UpdateSchedules() {
if err := Sched.Update(); err != nil {
log.Errorf(err.Error())
return
}
}
func (s *Scheduler) Start() error {
exec := cron.New(cron.WithSeconds())
// start the cron service
s.cron.Start()
// update the schedule list
if err := s.Update(); err != nil {
log.Errorf("update scheduler error: %s", err.Error())
debug.PrintStack()
return err
}
// refresh the schedule list every 30 seconds
spec := "*/30 * * * * *"
if _, err := exec.AddFunc(spec, UpdateSchedules); err != nil {
log.Errorf("add func update schedulers error: %s", err.Error())
debug.PrintStack()
return err
}
// start the refresh cron; without it the 30-second update registered above never fires
exec.Start()
return nil
}
func (s *Scheduler) AddJob(job model.Schedule) error {
spec := job.Cron
// register the cron job
eid, err := s.cron.AddFunc(spec, AddScheduleTask(job))
if err != nil {
log.Errorf("add func task error: %s", err.Error())
debug.PrintStack()
return err
}
// update the entry id
job.EntryId = eid
// update the status
job.Status = constants.ScheduleStatusRunning
job.Enabled = true
// save the schedule
if err := job.Save(); err != nil {
log.Errorf("job save error: %s", err.Error())
debug.PrintStack()
return err
}
return nil
}
func (s *Scheduler) RemoveAll() {
entries := s.cron.Entries()
for i := 0; i < len(entries); i++ {
s.cron.Remove(entries[i].ID)
}
}
// validate that the cron expression is well-formed
func ParserCron(spec string) error {
parser := cron.NewParser(
cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)
if _, err := parser.Parse(spec); err != nil {
return err
}
return nil
}
// disable a schedule
func (s *Scheduler) Disable(id bson.ObjectId) error {
schedule, err := model.GetSchedule(id)
if err != nil {
return err
}
if schedule.EntryId == 0 {
return errors.New("entry id not found")
}
// remove the job from the cron service
s.cron.Remove(schedule.EntryId)
// update the status
schedule.Status = constants.ScheduleStatusStop
schedule.Enabled = false
if err = schedule.Save(); err != nil {
return err
}
return nil
}
// enable a schedule
func (s *Scheduler) Enable(id bson.ObjectId) error {
schedule, err := model.GetSchedule(id)
if err != nil {
return err
}
if err := s.AddJob(schedule); err != nil {
return err
}
return nil
}
func (s *Scheduler) Update() error {
// remove all cron jobs
s.RemoveAll()
// fetch all enabled schedules
sList, err := model.GetScheduleList(bson.M{"enabled": true})
if err != nil {
log.Errorf("get scheduler list error: %s", err.Error())
debug.PrintStack()
return err
}
user, err := model.GetUserByUsername("admin")
if err != nil {
log.Errorf("get admin user error: %s", err.Error())
return err
}
// iterate over the schedule list
for i := 0; i < len(sList); i++ {
// single schedule
job := sList[i]
if job.Status == constants.ScheduleStatusStop {
continue
}
// backward compatibility with older versions
if job.UserId.Hex() == "" {
job.UserId = user.Id
}
// add to the cron jobs
if err := s.AddJob(job); err != nil {
log.Errorf("add job error: %s, job: %s, cron: %s", err.Error(), job.Name, job.Cron)
debug.PrintStack()
return err
}
}
return nil
}
func InitScheduler() error {
Sched = &Scheduler{
cron: cron.New(cron.WithSeconds()),
}
if err := Sched.Start(); err != nil {
log.Errorf("start scheduler error: %s", err.Error())
debug.PrintStack()
return err
}
return nil
}
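
A sketch of the six-field cron spec (leading seconds field) this scheduler depends on, written against robfig/cron/v3, which crawlab/lib/cron appears to wrap; the spec and the job body are illustrative only.

package main

import (
	"fmt"

	"github.com/robfig/cron/v3"
)

func main() {
	// six-field spec with a leading seconds field, i.e. "every 30 seconds"
	spec := "*/30 * * * * *"
	parser := cron.NewParser(
		cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
	)
	if _, err := parser.Parse(spec); err != nil {
		fmt.Println("invalid cron spec:", err)
		return
	}
	c := cron.New(cron.WithSeconds())
	id, _ := c.AddFunc(spec, func() { fmt.Println("tick") })
	c.Start()
	fmt.Println("registered entry", id)
}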

View File

@@ -1,285 +0,0 @@
package services
import (
"bytes"
"crawlab/constants"
"crawlab/entity"
"crawlab/model"
"encoding/json"
"errors"
"fmt"
"github.com/Unknwon/goconfig"
"github.com/apex/log"
"io/ioutil"
"os"
"os/exec"
"path"
"runtime/debug"
"strconv"
"strings"
)
func GetScrapySpiderNames(s model.Spider) ([]string, error) {
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd := exec.Command("scrapy", "list")
cmd.Dir = s.Src
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return []string{}, errors.New(stderr.String())
}
spiderNames := strings.Split(stdout.String(), "\n")
var res []string
for _, sn := range spiderNames {
if sn != "" {
res = append(res, sn)
}
}
return res, nil
}
func GetScrapySettings(s model.Spider) (res []map[string]interface{}, err error) {
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd := exec.Command("crawlab", "settings")
cmd.Dir = s.Src
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
log.Errorf(err.Error())
log.Errorf(stderr.String())
debug.PrintStack()
return res, errors.New(stderr.String())
}
if err := json.Unmarshal([]byte(stdout.String()), &res); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return res, err
}
return res, nil
}
func SaveScrapySettings(s model.Spider, settingsData []entity.ScrapySettingParam) (err error) {
// read scrapy.cfg
cfg, err := goconfig.LoadConfigFile(path.Join(s.Src, "scrapy.cfg"))
if err != nil {
return
}
modName, err := cfg.GetValue("settings", "default")
if err != nil {
return
}
// locate the settings.py file
arr := strings.Split(modName, ".")
dirName := arr[0]
fileName := arr[1]
filePath := fmt.Sprintf("%s/%s/%s.py", s.Src, dirName, fileName)
// generate the file content
content := ""
for _, param := range settingsData {
var line string
switch param.Type {
case constants.String:
line = fmt.Sprintf("%s = '%s'", param.Key, param.Value)
case constants.Number:
n := int64(param.Value.(float64))
s := strconv.FormatInt(n, 10)
line = fmt.Sprintf("%s = %s", param.Key, s)
case constants.Boolean:
if param.Value.(bool) {
line = fmt.Sprintf("%s = %s", param.Key, "True")
} else {
line = fmt.Sprintf("%s = %s", param.Key, "False")
}
case constants.Array:
arr := param.Value.([]interface{})
var arrStr []string
for _, s := range arr {
arrStr = append(arrStr, s.(string))
}
line = fmt.Sprintf("%s = ['%s']", param.Key, strings.Join(arrStr, "','"))
case constants.Object:
value := param.Value.(map[string]interface{})
var arr []string
for k, v := range value {
n := int64(v.(float64))
s := strconv.FormatInt(n, 10)
arr = append(arr, fmt.Sprintf("'%s': %s", k, s))
}
line = fmt.Sprintf("%s = {%s}", param.Key, strings.Join(arr, ","))
}
content += line + "\n"
}
// write to settings.py
if err := ioutil.WriteFile(filePath, []byte(content), os.ModePerm); err != nil {
return err
}
// sync to GridFS
if err := UploadSpiderToGridFsFromMaster(s); err != nil {
return err
}
return
}
func GetScrapyItems(s model.Spider) (res []map[string]interface{}, err error) {
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd := exec.Command("crawlab", "items")
cmd.Dir = s.Src
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
log.Errorf(err.Error())
log.Errorf(stderr.String())
debug.PrintStack()
return res, errors.New(stderr.String())
}
if err := json.Unmarshal([]byte(stdout.String()), &res); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return res, err
}
return res, nil
}
func SaveScrapyItems(s model.Spider, itemsData []entity.ScrapyItem) (err error) {
// read scrapy.cfg
cfg, err := goconfig.LoadConfigFile(path.Join(s.Src, "scrapy.cfg"))
if err != nil {
return
}
modName, err := cfg.GetValue("settings", "default")
if err != nil {
return
}
// locate the items.py file
arr := strings.Split(modName, ".")
dirName := arr[0]
fileName := "items"
filePath := fmt.Sprintf("%s/%s/%s.py", s.Src, dirName, fileName)
// generate the file content
content := ""
content += "import scrapy\n"
content += "\n\n"
for _, item := range itemsData {
content += fmt.Sprintf("class %s(scrapy.Item):\n", item.Name)
for _, field := range item.Fields {
content += fmt.Sprintf(" %s = scrapy.Field()\n", field)
}
content += "\n\n"
}
// write to items.py
if err := ioutil.WriteFile(filePath, []byte(content), os.ModePerm); err != nil {
return err
}
// sync to GridFS
if err := UploadSpiderToGridFsFromMaster(s); err != nil {
return err
}
return
}
func GetScrapyPipelines(s model.Spider) (res []string, err error) {
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd := exec.Command("crawlab", "pipelines")
cmd.Dir = s.Src
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
log.Errorf(err.Error())
log.Errorf(stderr.String())
debug.PrintStack()
return res, errors.New(stderr.String())
}
if err := json.Unmarshal([]byte(stdout.String()), &res); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return res, err
}
return res, nil
}
func GetScrapySpiderFilepath(s model.Spider, spiderName string) (res string, err error) {
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd := exec.Command("crawlab", "find_spider_filepath", "-n", spiderName)
cmd.Dir = s.Src
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
log.Errorf(err.Error())
log.Errorf(stderr.String())
debug.PrintStack()
return res, err
}
res = strings.Replace(stdout.String(), "\n", "", 1)
return res, nil
}
func CreateScrapySpider(s model.Spider, name string, domain string, template string) (err error) {
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd := exec.Command("scrapy", "genspider", name, domain, "-t", template)
cmd.Dir = s.Src
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
log.Errorf(err.Error())
log.Errorf("stdout: " + stdout.String())
log.Errorf("stderr: " + stderr.String())
debug.PrintStack()
return err
}
return
}
func CreateScrapyProject(s model.Spider) (err error) {
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd := exec.Command("scrapy", "startproject", s.Name, s.Src)
cmd.Dir = s.Src
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
log.Errorf(err.Error())
log.Errorf("stdout: " + stdout.String())
log.Errorf("stderr: " + stderr.String())
debug.PrintStack()
return err
}
return
}
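
A sketch of the settings.py line formatting performed in SaveScrapySettings, reduced to a single helper over JSON-decoded values; the setting keys here are examples, not values taken from the code above.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// formatSetting mirrors the per-type formatting switch in SaveScrapySettings.
func formatSetting(key string, value interface{}) string {
	switch v := value.(type) {
	case string:
		return fmt.Sprintf("%s = '%s'", key, v)
	case float64: // numbers decoded from JSON arrive as float64
		return fmt.Sprintf("%s = %s", key, strconv.FormatInt(int64(v), 10))
	case bool:
		if v {
			return key + " = True"
		}
		return key + " = False"
	case []interface{}:
		var items []string
		for _, s := range v {
			items = append(items, s.(string))
		}
		return fmt.Sprintf("%s = ['%s']", key, strings.Join(items, "','"))
	}
	return ""
}

func main() {
	fmt.Println(formatSetting("BOT_NAME", "demo"))
	fmt.Println(formatSetting("DOWNLOAD_DELAY", float64(3)))
	fmt.Println(formatSetting("ROBOTSTXT_OBEY", true))
	fmt.Println(formatSetting("SPIDER_MODULES", []interface{}{"demo.spiders"}))
}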

View File

@@ -1,624 +0,0 @@
package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/spider_handler"
"crawlab/utils"
"errors"
"fmt"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/satori/go.uuid"
"github.com/spf13/viper"
"gopkg.in/yaml.v2"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"runtime/debug"
"time"
)
type SpiderFileData struct {
FileName string
File []byte
}
type SpiderUploadMessage struct {
FileId string
FileName string
SpiderId string
}
// upload a spider to GridFS from the master node
func UploadSpiderToGridFsFromMaster(spider model.Spider) error {
// spider source directory
spiderDir := spider.Src
// package into a zip file
files, err := utils.GetFilesFromDir(spiderDir)
if err != nil {
return err
}
randomId := uuid.NewV4()
tmpFilePath := filepath.Join(viper.GetString("other.tmppath"), spider.Name+"."+randomId.String()+".zip")
spiderZipFileName := spider.Name + ".zip"
if err := utils.Compress(files, tmpFilePath); err != nil {
return err
}
// get a GridFS instance
s, gf := database.GetGridFs("files")
defer s.Close()
// check whether the file already exists
var gfFile model.GridFs
if err := gf.Find(bson.M{"filename": spiderZipFileName}).One(&gfFile); err == nil {
// the file already exists, remove it
log.Errorf(gfFile.Id.Hex() + " already exists. removing...")
if err := gf.RemoveId(gfFile.Id); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
}
// upload to GridFS
fid, err := RetryUploadToGridFs(spiderZipFileName, tmpFilePath)
if err != nil {
log.Errorf("upload to grid fs error: %s", err.Error())
return err
}
// save the spider FileId
spider.FileId = fid
if err := spider.Save(); err != nil {
return err
}
// get a spider sync instance
spiderSync := spider_handler.SpiderSync{
Spider: spider,
}
// get the GridFS file
gfFile2 := model.GetGridFs(spider.FileId)
// generate the MD5 file
spiderSync.CreateMd5File(gfFile2.Md5)
// check whether it is a Scrapy spider
spiderSync.CheckIsScrapy()
return nil
}
// upload a zip file to GridFS
func UploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, err error) {
fid = ""
// get a MongoDB GridFS connection
s, gf := database.GetGridFs("files")
defer s.Close()
// create a new GridFS file
f, err := gf.Create(fileName)
if err != nil {
log.Errorf("create file error: " + err.Error())
debug.PrintStack()
return
}
// read the spider zip file in chunks
err = ReadFileByStep(filePath, WriteToGridFS, f)
if err != nil {
log.Errorf("read file by step error: " + err.Error())
debug.PrintStack()
return "", err
}
// delete the zip file
if err = os.Remove(filePath); err != nil {
log.Errorf("remove file error: " + err.Error())
debug.PrintStack()
return
}
// close the file and commit the write
if err = f.Close(); err != nil {
log.Errorf("close file error: " + err.Error())
debug.PrintStack()
return "", err
}
// file id
fid = f.Id().(bson.ObjectId)
return fid, nil
}
// upload to GridFS with retries
func RetryUploadToGridFs(fileName string, filePath string) (fid bson.ObjectId, err error) {
maxErrCount := 10
errCount := 0
for {
if errCount > maxErrCount {
break
}
fid, err = UploadToGridFs(fileName, filePath)
if err != nil {
errCount++
log.Errorf("upload to grid fs error: %s", err.Error())
time.Sleep(3 * time.Second)
continue
}
return fid, nil
}
return fid, errors.New("unable to upload to gridfs, please re-upload the spider")
}
// write to GridFS
func WriteToGridFS(content []byte, f *mgo.GridFile) {
if _, err := f.Write(content); err != nil {
debug.PrintStack()
return
}
}
// read a large file in chunks
func ReadFileByStep(filePath string, handle func([]byte, *mgo.GridFile), fileCreate *mgo.GridFile) error {
f, err := os.OpenFile(filePath, os.O_RDONLY, 0777)
if err != nil {
log.Infof("can't opened this file")
return err
}
defer utils.Close(f)
s := make([]byte, 4096)
for {
switch nr, err := f.Read(s[:]); true {
case nr < 0:
_, _ = fmt.Fprintf(os.Stderr, "cat: error reading: %s\n", err.Error())
debug.PrintStack()
case nr == 0: // EOF
return nil
case nr > 0:
handle(s[0:nr], fileCreate)
}
}
}
// publish all spiders
func PublishAllSpiders() {
// get the spider list
spiders, _, _ := model.GetSpiderList(nil, 0, constants.Infinite, "-_id")
if len(spiders) == 0 {
return
}
log.Infof("start sync spider to local, total: %d", len(spiders))
// iterate over the spider list
for _, spider := range spiders {
// publish the spider asynchronously
go func(s model.Spider) {
PublishSpider(s)
}(spider)
}
}
// publish a spider
func PublishSpider(spider model.Spider) {
var gfFile *model.GridFs
if spider.FileId.Hex() != constants.ObjectIdNull {
// if the GridFS file cannot be found, treat the spider file as missing
gfFile = model.GetGridFs(spider.FileId)
if gfFile == nil {
log.Errorf("get grid fs file error: cannot find grid fs file")
log.Errorf("grid fs file_id: " + spider.FileId.Hex())
log.Errorf("spider_name: " + spider.Name)
debug.PrintStack()
//spider.FileId = constants.ObjectIdNull
//if err := spider.Save(); err != nil {
// return
//}
return
}
}
// if FileId is null, the spider has not been uploaded to GridFS yet, so skip it
if spider.FileId == bson.ObjectIdHex(constants.ObjectIdNull) {
return
}
// get a spider sync instance
spiderSync := spider_handler.SpiderSync{
Spider: spider,
}
// install dependencies
if viper.GetString("setting.autoInstall") == "Y" {
go spiderSync.InstallDeps()
}
// if the directory does not exist, download directly
path := filepath.Join(viper.GetString("spider.path"), spider.Name)
if !utils.Exists(path) {
log.Infof("path not found: %s", path)
spiderSync.Download()
spiderSync.CreateMd5File(gfFile.Md5)
spiderSync.CheckIsScrapy()
return
}
// if the md5 file does not exist, download
md5 := filepath.Join(path, spider_handler.Md5File)
if !utils.Exists(md5) {
log.Infof("md5 file not found: %s", md5)
spiderSync.RemoveDownCreate(gfFile.Md5)
return
}
// if the md5 value differs, download
md5Str := utils.GetSpiderMd5Str(md5)
if gfFile.Md5 != md5Str {
log.Infof("md5 is different, gf-md5:%s, file-md5:%s", gfFile.Md5, md5Str)
spiderSync.RemoveDownCreate(gfFile.Md5)
return
}
}
func RemoveSpider(id string) error {
// get the spider
spider, err := model.GetSpider(bson.ObjectIdHex(id))
if err != nil {
return err
}
// delete the spider file directory
path := filepath.Join(viper.GetString("spider.path"), spider.Name)
utils.RemoveFiles(path)
// delete the spider directories on other nodes
//msg := entity.NodeMessage{
// Type: constants.MsgTypeRemoveSpider,
// SpiderId: id,
//}
//if err := database.Pub(constants.ChannelAllNode, msg); err != nil {
// return err
//}
// remove the spider from the database
if err := model.RemoveSpider(bson.ObjectIdHex(id)); err != nil {
return err
}
// delete log files
if err := RemoveLogBySpiderId(spider.Id); err != nil {
return err
}
// delete the tasks belonging to the spider
if err := model.RemoveTaskBySpiderId(spider.Id); err != nil {
return err
}
// TODO: decide how to handle associated schedules
return nil
}
func CancelSpider(id string) error {
// get the spider
spider, err := model.GetSpider(bson.ObjectIdHex(id))
if err != nil {
return err
}
// get the spider's pending or running tasks
query := bson.M{
"spider_id": spider.Id,
"status": bson.M{
"$in": []string{
constants.StatusPending,
constants.StatusRunning,
},
},
}
tasks, err := model.GetTaskList(query, 0, constants.Infinite, "-create_ts")
if err != nil {
return err
}
// iterate over the task list and cancel each one
for _, task := range tasks {
if err := CancelTask(task.Id); err != nil {
return err
}
}
return nil
}
func cloneGridFsFile(spider model.Spider, newName string) (err error) {
// build the new spider
newSpider := spider
newSpider.Id = bson.NewObjectId()
newSpider.Name = newName
newSpider.DisplayName = newName
newSpider.Src = path.Join(path.Dir(spider.Src), newName)
newSpider.CreateTs = time.Now()
newSpider.UpdateTs = time.Now()
// GridFS connection instance
s, gf := database.GetGridFs("files")
defer s.Close()
// GridFS file of the spider being cloned
f, err := gf.OpenId(spider.FileId)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// GridFS file of the new spider
fNew, err := gf.Create(newSpider.Name + ".zip")
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// generate a unique id
randomId := uuid.NewV4()
tmpPath := viper.GetString("other.tmppath")
if !utils.Exists(tmpPath) {
if err := os.MkdirAll(tmpPath, 0777); err != nil {
log.Errorf("mkdir other.tmppath error: %v", err.Error())
return err
}
}
// create a temporary file
tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip")
tmpFile := utils.OpenFile(tmpFilePath)
// copy into the temporary file
if _, err := io.Copy(tmpFile, f); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// close the temporary file
if err := tmpFile.Close(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// read the content
fContent, err := ioutil.ReadFile(tmpFilePath)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// write into the GridFS file
if _, err := fNew.Write(fContent); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// close the cloned spider's GridFS file
if err = f.Close(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// assign the new GridFS file id to the new spider
newSpider.FileId = fNew.Id().(bson.ObjectId)
// save the new spider
if err := newSpider.Add(); err != nil {
return err
}
// close the new spider's GridFS file
if err := fNew.Close(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// delete the temporary file
if err := os.RemoveAll(tmpFilePath); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return err
}
// sync the spider
PublishSpider(newSpider)
return nil
}
func CopySpider(spider model.Spider, newName string) error {
// clone the GridFS file
if err := cloneGridFsFile(spider, newName); err != nil {
return err
}
return nil
}
func UpdateSpiderDedup(spider model.Spider) error {
col := utils.GetSpiderCol(spider.Col, spider.Name)
s, c := database.GetCol(col)
defer s.Close()
if !spider.IsDedup {
_ = c.DropIndex(spider.DedupField)
//if err := c.DropIndex(spider.DedupField); err != nil {
// return err
//}
return nil
}
if err := c.EnsureIndex(mgo.Index{
Key: []string{spider.DedupField},
Unique: true,
}); err != nil {
return err
}
return nil
}
func InitDemoSpiders() {
// add demo spiders
templateSpidersDir := "./template/spiders"
for _, info := range utils.ListDir(templateSpidersDir) {
if !info.IsDir() {
continue
}
spiderName := info.Name()
// add the spider if it does not exist in the database
spider := model.GetSpiderByName(spiderName)
if spider.Name != "" {
// a spider with the same name exists, skip
continue
}
// copy the spider
templateSpiderPath := path.Join(templateSpidersDir, spiderName)
spiderPath := path.Join(viper.GetString("spider.path"), spiderName)
if utils.Exists(spiderPath) {
utils.RemoveFiles(spiderPath)
}
if err := utils.CopyDir(templateSpiderPath, spiderPath); err != nil {
log.Errorf("copy error: " + err.Error())
debug.PrintStack()
continue
}
// build the config data
configData := entity.ConfigSpiderData{}
// read the YAML file
yamlFile, err := ioutil.ReadFile(path.Join(spiderPath, "Spiderfile"))
if err != nil {
log.Errorf("read yaml error: " + err.Error())
//debug.PrintStack()
continue
}
// deserialize
if err := yaml.Unmarshal(yamlFile, &configData); err != nil {
log.Errorf("unmarshal error: " + err.Error())
debug.PrintStack()
continue
}
if configData.Type == constants.Customized {
// add the spider to the database
spider = model.Spider{
Id: bson.NewObjectId(),
Name: spiderName,
DisplayName: configData.DisplayName,
Type: constants.Customized,
Col: configData.Col,
Src: spiderPath,
Remark: configData.Remark,
ProjectId: bson.ObjectIdHex(constants.ObjectIdNull),
FileId: bson.ObjectIdHex(constants.ObjectIdNull),
Cmd: configData.Cmd,
UserId: bson.ObjectIdHex(constants.ObjectIdNull),
}
if err := spider.Add(); err != nil {
log.Errorf("add spider error: " + err.Error())
debug.PrintStack()
continue
}
// upload the spider to GridFS
if err := UploadSpiderToGridFsFromMaster(spider); err != nil {
log.Errorf("upload spider error: " + err.Error())
debug.PrintStack()
continue
}
} else if configData.Type == constants.Configurable || configData.Type == "config" {
// add the spider to the database
spider = model.Spider{
Id: bson.NewObjectId(),
Name: configData.Name,
DisplayName: configData.DisplayName,
Type: constants.Configurable,
Col: configData.Col,
Src: spiderPath,
Remark: configData.Remark,
ProjectId: bson.ObjectIdHex(constants.ObjectIdNull),
FileId: bson.ObjectIdHex(constants.ObjectIdNull),
Config: configData,
UserId: bson.ObjectIdHex(constants.ObjectIdNull),
}
if err := spider.Add(); err != nil {
log.Errorf("add spider error: " + err.Error())
debug.PrintStack()
continue
}
// process the spider files from the deserialized config data
if err := ProcessSpiderFilesFromConfigData(spider, configData); err != nil {
log.Errorf("add spider error: " + err.Error())
debug.PrintStack()
continue
}
}
}
// publish all spiders
PublishAllSpiders()
}
// start the spider service
func InitSpiderService() error {
// build the cron executor
cPub := cron.New(cron.WithSeconds())
if _, err := cPub.AddFunc("0 * * * * *", PublishAllSpiders); err != nil {
return err
}
// start the cron jobs
cPub.Start()
if model.IsMaster() && viper.GetString("setting.demoSpiders") == "Y" {
// initialize demo spiders
InitDemoSpiders()
}
if model.IsMaster() {
// build the Git cron scheduler
GitCron = &GitCronScheduler{
cron: cron.New(cron.WithSeconds()),
}
// start the Git cron scheduler
if err := GitCron.Start(); err != nil {
return err
}
// clean up UserIds
InitSpiderCleanUserIds()
}
return nil
}
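
A compact sketch of the re-download decision PublishSpider derives from the md5.txt marker; needsDownload is a hypothetical helper (not a function in the codebase), and the path and MD5 value are placeholders.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// needsDownload reproduces the decision made in PublishSpider: re-download when
// the local directory or md5.txt is missing, or when the stored MD5 differs.
func needsDownload(spiderDir, gridFsMd5 string) bool {
	if _, err := os.Stat(spiderDir); os.IsNotExist(err) {
		return true
	}
	md5Path := filepath.Join(spiderDir, "md5.txt")
	data, err := os.ReadFile(md5Path)
	if err != nil {
		return true
	}
	return strings.TrimSpace(string(data)) != gridFsMd5
}

func main() {
	fmt.Println(needsDownload("/opt/crawlab/spiders/demo", "d41d8cd98f00b204e9800998ecf8427e"))
}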

View File

@@ -1,250 +0,0 @@
package spider_handler
import (
"crawlab/constants"
"crawlab/database"
"crawlab/model"
"crawlab/services/local_node"
"crawlab/utils"
"fmt"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"github.com/satori/go.uuid"
"github.com/spf13/viper"
"io"
"os"
"os/exec"
"path"
"path/filepath"
"runtime/debug"
"strings"
"sync"
)
const (
Md5File = "md5.txt"
)
type SpiderSync struct {
Spider model.Spider
}
func (s *SpiderSync) CreateMd5File(md5 string) {
path := filepath.Join(viper.GetString("spider.path"), s.Spider.Name)
utils.CreateDirPath(path)
fileName := filepath.Join(path, Md5File)
file := utils.OpenFile(fileName)
defer utils.Close(file)
if file != nil {
if _, err := file.WriteString(md5 + "\n"); err != nil {
log.Errorf("file write string error: %s", err.Error())
debug.PrintStack()
}
}
}
func (s *SpiderSync) CheckIsScrapy() {
if s.Spider.Type == constants.Configurable {
return
}
if viper.GetString("setting.checkScrapy") != "Y" {
return
}
s.Spider.IsScrapy = utils.Exists(path.Join(s.Spider.Src, "scrapy.cfg"))
if s.Spider.IsScrapy {
s.Spider.Cmd = "scrapy crawl"
}
if err := s.Spider.Save(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
}
func (s *SpiderSync) AfterRemoveDownCreate() {
if model.IsMaster() {
s.CheckIsScrapy()
}
}
func (s *SpiderSync) RemoveDownCreate(md5 string) {
s.RemoveSpiderFile()
s.Download()
s.CreateMd5File(md5)
s.AfterRemoveDownCreate()
}
// get the download lock key
func (s *SpiderSync) GetLockDownloadKey(spiderId string) string {
//node, _ := model.GetCurrentNode()
node := local_node.CurrentNode()
return node.Id.Hex() + "#" + spiderId
}
// remove local files
func (s *SpiderSync) RemoveSpiderFile() {
path := filepath.Join(
viper.GetString("spider.path"),
s.Spider.Name,
)
// spider files have changed, remove the local files first
if err := os.RemoveAll(path); err != nil {
log.Errorf("remove spider files error: %s, path: %s", err.Error(), path)
debug.PrintStack()
}
}
// check whether a download is already in progress
func (s *SpiderSync) CheckDownLoading(spiderId string, fileId string) (bool, string) {
key := s.GetLockDownloadKey(spiderId)
key2, err := database.RedisClient.HGet("spider", key)
if err != nil {
return false, key2
}
if key2 == "" {
return false, key2
}
return true, key2
}
// download the spider
func (s *SpiderSync) Download() {
spiderId := s.Spider.Id.Hex()
fileId := s.Spider.FileId.Hex()
isDownloading, key := s.CheckDownLoading(spiderId, fileId)
if isDownloading {
log.Infof(fmt.Sprintf("spider is already being downloaded, spider id: %s", s.Spider.Id.Hex()))
return
} else {
_ = database.RedisClient.HSet("spider", key, key)
}
session, gf := database.GetGridFs("files")
defer session.Close()
f, err := gf.OpenId(bson.ObjectIdHex(fileId))
if err != nil {
log.Errorf("open file id: " + fileId + ", spider id:" + spiderId + ", error: " + err.Error())
debug.PrintStack()
return
}
// defer the close only after the error check so a nil file is never closed
defer utils.Close(f)
// generate a unique id
randomId := uuid.NewV4()
tmpPath := viper.GetString("other.tmppath")
if !utils.Exists(tmpPath) {
if err := os.MkdirAll(tmpPath, 0777); err != nil {
log.Errorf("mkdir other.tmppath error: %v", err.Error())
return
}
}
// create a temporary file
tmpFilePath := filepath.Join(tmpPath, randomId.String()+".zip")
tmpFile := utils.OpenFile(tmpFilePath)
// write the GridFS file into the temporary file
if _, err := io.Copy(tmpFile, f); err != nil {
log.Errorf("copy file error: %s, file_id: %s", err.Error(), f.Id())
debug.PrintStack()
return
}
// decompress the temporary file into the destination folder
dstPath := filepath.Join(
viper.GetString("spider.path"),
s.Spider.Name,
)
if err := utils.DeCompress(tmpFile, dstPath); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
// recursively change permissions on the destination folder
// works around log files failing to be created when LOG_ENABLED and LOG_FILE are set in the scrapy settings
cmd := exec.Command("chmod", "-R", "777", dstPath)
if err := cmd.Run(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
// close the temporary file
if err := tmpFile.Close(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
// delete the temporary file
if err := os.Remove(tmpFilePath); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
_ = database.RedisClient.HDel("spider", key)
}
// locks for dependency installation
var installLockMap sync.Map
// install dependencies
func (s *SpiderSync) InstallDeps() {
langs := utils.GetLangList()
for _, l := range langs {
// no dep file name is found, skip
if l.DepFileName == "" {
continue
}
// being locked, i.e. installation is running, skip
key := s.Spider.Name + "|" + l.Name
_, locked := installLockMap.Load(key)
if locked {
continue
}
// no dep file found, skip
if !utils.Exists(path.Join(s.Spider.Src, l.DepFileName)) {
continue
}
// no dep install executable found, skip
if !utils.Exists(l.DepExecutablePath) {
continue
}
// lock
installLockMap.Store(key, true)
// command to install dependencies
cmd := exec.Command(l.DepExecutablePath, strings.Split(l.InstallDepArgs, " ")...)
// working directory
cmd.Dir = s.Spider.Src
// compatibility with node.js
if l.ExecutableName == constants.Nodejs {
deps, err := utils.GetPackageJsonDeps(path.Join(s.Spider.Src, l.DepFileName))
if err != nil {
continue
}
cmd = exec.Command(l.DepExecutablePath, strings.Split(l.InstallDepArgs+" "+strings.Join(deps, " "), " ")...)
}
// start executing command
output, err := cmd.Output()
if err != nil {
log.Errorf("install dep error: " + err.Error())
log.Errorf(string(output))
debug.PrintStack()
}
// unlock
installLockMap.Delete(key)
}
}

View File

@@ -1,53 +0,0 @@
package spider_handler
import (
"crawlab/config"
"crawlab/database"
"crawlab/model"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"runtime/debug"
"testing"
)
var s SpiderSync
func init() {
if err := config.InitConfig("../../conf/config.yml"); err != nil {
log.Fatal("Init config failed")
}
log.Infof("初始化配置成功")
// initialize MongoDB
if err := database.InitMongo(); err != nil {
log.Error("init mongodb error:" + err.Error())
debug.PrintStack()
panic(err)
}
log.Info("初始化Mongodb数据库成功")
// initialize Redis
if err := database.InitRedis(); err != nil {
log.Error("init redis error:" + err.Error())
debug.PrintStack()
panic(err)
}
log.Info("初始化Redis数据库成功")
s = SpiderSync{
Spider: model.Spider{
Id: bson.ObjectIdHex("5d8d8326bc3c4f000186e5df"),
Name: "scrapy-pre_sale",
FileId: bson.ObjectIdHex("5d8d8326bc3c4f000186e5db"),
Src: "/opt/crawlab/spiders/scrapy-pre_sale",
},
}
}
func TestSpiderSync_CreateMd5File(t *testing.T) {
s.CreateMd5File("this is md5")
}
func TestSpiderSync_Download(t *testing.T) {
s.Download()
}

View File

@@ -1,30 +0,0 @@
// +build !windows
package sys_exec
import (
"os/exec"
"syscall"
)
func BuildCmd(cmdStr string) *exec.Cmd {
return exec.Command("sh", "-c", cmdStr)
}
func Setpgid(cmd *exec.Cmd) {
if cmd == nil {
return
}
if cmd.SysProcAttr == nil {
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
} else {
cmd.SysProcAttr.Setpgid = true
}
}
func KillProcess(cmd *exec.Cmd) error {
if cmd == nil {
return nil
}
return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
}
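
A usage sketch of the non-Windows pattern above: build a shell command, place it in its own process group, then kill the whole group via a negative PID; the sleep commands are just a stand-in workload.

package main

import (
	"fmt"
	"os/exec"
	"syscall"
	"time"
)

func main() {
	// same pattern as BuildCmd/Setpgid/KillProcess (non-Windows only)
	cmd := exec.Command("sh", "-c", "sleep 60 & sleep 60")
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	if err := cmd.Start(); err != nil {
		fmt.Println("start error:", err)
		return
	}
	time.Sleep(time.Second)
	// the negative PID targets the whole process group, so child processes die too
	if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil {
		fmt.Println("kill error:", err)
	}
	_ = cmd.Wait()
}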

View File

@@ -1,24 +0,0 @@
// +build windows
package sys_exec
import (
"os/exec"
)
func BuildCmd(cmdStr string) *exec.Cmd {
return exec.Command("cmd", "/C", cmdStr)
}
func Setpgid(cmd *exec.Cmd) {
return
}
func KillProcess(cmd *exec.Cmd) error {
if cmd != nil && cmd.Process != nil {
if err := cmd.Process.Kill(); err != nil {
return err
}
}
return nil
}

View File

@@ -1,393 +0,0 @@
package services
import (
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/rpc"
"crawlab/utils"
"encoding/json"
"errors"
"fmt"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/imroc/req"
"os/exec"
"regexp"
"runtime/debug"
"sort"
"strings"
"sync"
)
// system info channel map
var SystemInfoChanMap = utils.NewChanMap()
// fetch system info from a remote node
func GetRemoteSystemInfo(nodeId string) (sysInfo entity.SystemInfo, err error) {
// send the message
msg := entity.NodeMessage{
Type: constants.MsgTypeGetSystemInfo,
NodeId: nodeId,
}
// serialize
msgBytes, _ := json.Marshal(&msg)
if _, err := database.RedisClient.Publish("nodes:"+nodeId, utils.BytesToString(msgBytes)); err != nil {
return entity.SystemInfo{}, err
}
// channel
ch := SystemInfoChanMap.ChanBlocked(nodeId)
// wait (block) for the response
sysInfoStr := <-ch
// deserialize
if err := json.Unmarshal([]byte(sysInfoStr), &sysInfo); err != nil {
return sysInfo, err
}
return sysInfo, nil
}
// get system info
func GetSystemInfo(nodeId string) (sysInfo entity.SystemInfo, err error) {
if IsMasterNode(nodeId) {
sysInfo, err = rpc.GetSystemInfoServiceLocal()
} else {
sysInfo, err = rpc.GetSystemInfoServiceRemote(nodeId)
}
return
}
// get the language list
func GetLangList(nodeId string) []entity.Lang {
list := utils.GetLangList()
for i, lang := range list {
status, _ := GetLangInstallStatus(nodeId, lang)
list[i].InstallStatus = status
}
return list
}
// get the installation status of a language
func GetLangInstallStatus(nodeId string, lang entity.Lang) (string, error) {
_, err := model.GetTaskByFilter(bson.M{
"node_id": nodeId,
"cmd": fmt.Sprintf("sh %s", utils.GetSystemScriptPath(lang.InstallScript)),
"status": bson.M{
"$in": []string{constants.StatusPending, constants.StatusRunning},
},
})
if err == nil {
// a task is running, installation is in progress
return constants.InstallStatusInstalling, nil
}
if err != mgo.ErrNotFound {
// an error occurred
return "", err
}
// get the status
if IsMasterNode(nodeId) {
lang := rpc.GetLangLocal(lang)
return lang.InstallStatus, nil
} else {
lang, err := rpc.GetLangRemote(nodeId, lang)
if err != nil {
return "", err
}
return lang.InstallStatus, nil
}
}
// whether the dependency is already installed
func IsInstalledDep(installedDepList []entity.Dependency, dep entity.Dependency) bool {
for _, _dep := range installedDepList {
if strings.ToLower(_dep.Name) == strings.ToLower(dep.Name) {
return true
}
}
return false
}
// ========Python========
// initialization function
func InitDepsFetcher() error {
c := cron.New(cron.WithSeconds())
c.Start()
if _, err := c.AddFunc("0 */5 * * * *", UpdatePythonDepList); err != nil {
return err
}
go func() {
UpdatePythonDepList()
}()
return nil
}
type PythonDepJsonData struct {
Info PythonDepJsonDataInfo `json:"info"`
}
type PythonDepJsonDataInfo struct {
Name string `json:"name"`
Summary string `json:"summary"`
Version string `json:"version"`
}
type PythonDepNameDict struct {
Name string `json:"name"`
Weight int `json:"weight"`
}
type PythonDepNameDictSlice []PythonDepNameDict
func (s PythonDepNameDictSlice) Len() int { return len(s) }
func (s PythonDepNameDictSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s PythonDepNameDictSlice) Less(i, j int) bool { return s[i].Weight > s[j].Weight }
// get the Python dependency list
func GetPythonDepList(nodeId string, searchDepName string) ([]entity.Dependency, error) {
var list []entity.Dependency
// fetch from Redis first
depList, err := GetPythonDepListFromRedis()
if err != nil {
return list, err
}
// filter dependencies matching the search term
var depNameList PythonDepNameDictSlice
for _, depName := range depList {
if strings.HasPrefix(strings.ToLower(depName), strings.ToLower(searchDepName)) {
var weight int
if strings.ToLower(depName) == strings.ToLower(searchDepName) {
weight = 3
} else if strings.HasPrefix(strings.ToLower(depName), strings.ToLower(searchDepName)) {
weight = 2
} else {
weight = 1
}
depNameList = append(depNameList, PythonDepNameDict{
Name: depName,
Weight: weight,
})
}
}
// get the list of installed dependencies
var installedDepList []entity.Dependency
if IsMasterNode(nodeId) {
installedDepList, err = rpc.GetInstalledDepsLocal(constants.Python)
if err != nil {
return list, err
}
} else {
installedDepList, err = rpc.GetInstalledDepsRemote(nodeId, constants.Python)
if err != nil {
return list, err
}
}
// sort by dependency name weight
sort.Stable(depNameList)
// take the first 20 entries of the dependency name list
for i, depNameDict := range depNameList {
if i > 20 {
break
}
dep := entity.Dependency{
Name: depNameDict.Name,
}
dep.Installed = IsInstalledDep(installedDepList, dep)
list = append(list, dep)
}
// fetch info from the dependency source
//list, err = GetPythonDepListWithInfo(list)
return list, nil
}
// fetch metadata for Python dependencies from the package index
func GetPythonDepListWithInfo(depList []entity.Dependency) ([]entity.Dependency, error) {
var goSync sync.WaitGroup
for i, dep := range depList {
if i > 10 {
break
}
goSync.Add(1)
go func(i int, dep entity.Dependency, depList []entity.Dependency, n *sync.WaitGroup) {
url := fmt.Sprintf("https://pypi.org/pypi/%s/json", dep.Name)
res, err := req.Get(url)
if err != nil {
n.Done()
return
}
var data PythonDepJsonData
if err := res.ToJSON(&data); err != nil {
n.Done()
return
}
depList[i].Version = data.Info.Version
depList[i].Description = data.Info.Summary
n.Done()
}(i, dep, depList, &goSync)
}
goSync.Wait()
return depList, nil
}
func FetchPythonDepInfo(depName string) (entity.Dependency, error) {
url := fmt.Sprintf("https://pypi.org/pypi/%s/json", depName)
res, err := req.Get(url)
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return entity.Dependency{}, err
}
var data PythonDepJsonData
if res.Response().StatusCode == 404 {
return entity.Dependency{}, errors.New("get depName from [https://pypi.org] error: 404")
}
if err := res.ToJSON(&data); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return entity.Dependency{}, err
}
dep := entity.Dependency{
Name: depName,
Version: data.Info.Version,
Description: data.Info.Summary,
}
return dep, nil
}
// get the Python dependency list from Redis
func GetPythonDepListFromRedis() ([]string, error) {
var list []string
// fetch the string from Redis
rawData, err := database.RedisClient.HGet("system", "deps:python")
if err != nil {
return list, err
}
// deserialize
if err := json.Unmarshal([]byte(rawData), &list); err != nil {
return list, err
}
// if empty, fetch the list from the dependency source
if len(list) == 0 {
UpdatePythonDepList()
}
return list, nil
}
// fetch and return the dependency list from the Python package index
func FetchPythonDepList() ([]string, error) {
// dependency index URL
url := "https://pypi.tuna.tsinghua.edu.cn/simple"
// output list
var list []string
// request the URL
res, err := req.Get(url)
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return list, err
}
// get the response body
text, err := res.ToString()
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return list, err
}
// extract dependency names from the response body
regex := regexp.MustCompile("<a href=\".*/\">(.*)</a>")
for _, line := range strings.Split(text, "\n") {
arr := regex.FindStringSubmatch(line)
if len(arr) < 2 {
continue
}
list = append(list, arr[1])
}
// return the list
return list, nil
}
// update the Python dependency list in Redis
func UpdatePythonDepList() {
// fetch the list from the dependency source
list, _ := FetchPythonDepList()
// serialize
listBytes, err := json.Marshal(list)
if err != nil {
log.Error(err.Error())
debug.PrintStack()
return
}
// store in Redis
if err := database.RedisClient.HSet("system", "deps:python", string(listBytes)); err != nil {
log.Error(err.Error())
debug.PrintStack()
return
}
}
// ========./Python========
// ========Node.js========
// get the Node.js dependency list
func GetNodejsDepList(nodeId string, searchDepName string) (depList []entity.Dependency, err error) {
// run the shell command
cmd := exec.Command("npm", "search", "--json", searchDepName)
outputBytes, _ := cmd.Output()
// get the list of installed dependencies
var installedDepList []entity.Dependency
if IsMasterNode(nodeId) {
installedDepList, err = rpc.GetInstalledDepsLocal(constants.Nodejs)
if err != nil {
return depList, err
}
} else {
installedDepList, err = rpc.GetInstalledDepsRemote(nodeId, constants.Nodejs)
if err != nil {
return depList, err
}
}
// deserialize
if err := json.Unmarshal(outputBytes, &depList); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return depList, err
}
// mark which dependencies are installed
for i, dep := range depList {
depList[i].Installed = IsInstalledDep(installedDepList, dep)
}
return depList, nil
}
// ========./Node.js========
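
A sketch of the prefix-match weighting and sort applied in GetPythonDepList, using sort.SliceStable instead of the custom slice type; the package names are illustrative.

package main

import (
	"fmt"
	"sort"
	"strings"
)

type depNameDict struct {
	Name   string
	Weight int
}

// rankDeps mirrors the prefix-match weighting used in GetPythonDepList:
// exact matches get the highest weight, other prefix matches come after.
func rankDeps(all []string, query string) []depNameDict {
	var out []depNameDict
	q := strings.ToLower(query)
	for _, name := range all {
		n := strings.ToLower(name)
		if !strings.HasPrefix(n, q) {
			continue
		}
		weight := 2
		if n == q {
			weight = 3
		}
		out = append(out, depNameDict{Name: name, Weight: weight})
	}
	sort.SliceStable(out, func(i, j int) bool { return out[i].Weight > out[j].Weight })
	return out
}

func main() {
	fmt.Println(rankDeps([]string{"requests", "requests-oauthlib", "scrapy"}, "requests"))
}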

File diff suppressed because it is too large

View File

@@ -1,133 +0,0 @@
package services
import (
"crawlab/constants"
"crawlab/model"
"crawlab/utils"
"errors"
"github.com/dgrijalva/jwt-go"
"github.com/gin-gonic/gin"
"github.com/globalsign/mgo/bson"
"github.com/spf13/viper"
"strings"
"time"
)
func InitUserService() error {
_ = CreateNewUser("admin", "admin", constants.RoleAdmin, "", bson.ObjectIdHex(constants.ObjectIdNull))
return nil
}
func MakeToken(user *model.User) (tokenStr string, err error) {
token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{
"id": user.Id,
"username": user.Username,
"nbf": time.Now().Unix(),
})
return token.SignedString([]byte(viper.GetString("server.secret")))
}
//func GetToken(username string) (tokenStr string, err error) {
// user, err := model.GetUserByUsername(username)
// if err != nil {
// log.Errorf(err.Error())
// debug.PrintStack()
// return
// }
//
// token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{
// "id": user.Id,
// "username": user.Username,
// "nbf": time.Now().Unix(),
// })
//
// tokenStr, err = token.SignedString([]byte(viper.GetString("server.secret")))
// if err != nil {
// return
// }
// return
//}
func SecretFunc() jwt.Keyfunc {
return func(token *jwt.Token) (interface{}, error) {
return []byte(viper.GetString("server.secret")), nil
}
}
func CheckToken(tokenStr string) (user model.User, err error) {
token, err := jwt.Parse(tokenStr, SecretFunc())
if err != nil {
return
}
claim, ok := token.Claims.(jwt.MapClaims)
if !ok {
err = errors.New("cannot convert claim to mapclaim")
return
}
// validate the token; it is invalid if it has been tampered with
if !token.Valid {
err = errors.New("token is invalid")
return
}
id := bson.ObjectIdHex(claim["id"].(string))
username := claim["username"].(string)
user, err = model.GetUser(id)
if err != nil {
err = errors.New("cannot get user")
return
}
if username != user.Username {
err = errors.New("username does not match")
return
}
return
}
func CreateNewUser(username string, password string, role string, email string, uid bson.ObjectId) error {
user := model.User{
Username: strings.ToLower(username),
Password: utils.EncryptPassword(password),
Role: role,
Email: email,
UserId: uid,
Setting: model.UserSetting{
NotificationTrigger: constants.NotificationTriggerNever,
EnabledNotifications: []string{
constants.NotificationTypeMail,
constants.NotificationTypeDingTalk,
constants.NotificationTypeWechat,
},
},
}
if err := user.Add(); err != nil {
return err
}
return nil
}
func GetCurrentUser(c *gin.Context) *model.User {
data, _ := c.Get(constants.ContextUser)
if data == nil {
return &model.User{}
}
return data.(*model.User)
}
func GetCurrentUserId(c *gin.Context) bson.ObjectId {
return GetCurrentUser(c).Id
}
func GetAdminUser() (user *model.User, err error) {
u, err := model.GetUserByUsername("admin")
if err != nil {
return user, err
}
return &u, nil
}
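
A round-trip sketch of MakeToken and CheckToken using the same jwt-go calls: sign HS256 claims, then parse and validate them; the secret and id here are placeholders rather than values from the config.

package main

import (
	"fmt"
	"time"

	"github.com/dgrijalva/jwt-go"
)

func main() {
	secret := []byte("example-secret") // normally read from server.secret in the config
	// sign a token with the same claim set used by MakeToken
	token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{
		"id":       "5d8d8326bc3c4f000186e5df",
		"username": "admin",
		"nbf":      time.Now().Unix(),
	})
	tokenStr, err := token.SignedString(secret)
	if err != nil {
		fmt.Println("sign error:", err)
		return
	}
	// verify it the way CheckToken does
	parsed, err := jwt.Parse(tokenStr, func(t *jwt.Token) (interface{}, error) { return secret, nil })
	if err != nil || !parsed.Valid {
		fmt.Println("token invalid:", err)
		return
	}
	claims := parsed.Claims.(jwt.MapClaims)
	fmt.Println("username:", claims["username"])
}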

View File

@@ -1,29 +0,0 @@
package services
import (
"crawlab/entity"
"github.com/apex/log"
"github.com/imroc/req"
"runtime/debug"
"sort"
)
func GetLatestRelease() (release entity.Release, err error) {
res, err := req.Get("https://api.github.com/repos/crawlab-team/crawlab/releases")
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return release, err
}
var releaseDataList entity.ReleaseSlices
if err := res.ToJSON(&releaseDataList); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return release, err
}
sort.Sort(releaseDataList)
return releaseDataList[len(releaseDataList)-1], nil
}