加入Git同步

This commit is contained in:
marvzhang
2020-02-18 12:15:40 +08:00
parent 3bb1fe2a74
commit 2ce289c8f3
9 changed files with 296 additions and 11 deletions

View File

@@ -247,7 +247,8 @@ func main() {
// 文件
authGroup.GET("/file", routes.GetFile) // 获取文件
// Git
authGroup.GET("/git/branches", routes.GetGitBranches) // 获取 Git 分支
authGroup.GET("/git/branches", routes.GetGitBranches) // 获取 Git 分支
authGroup.GET("/git/public-key", routes.GetGitSshPublicKey) // 获取 SSH 公钥
}
}

View File

@@ -53,6 +53,7 @@ type Spider struct {
GitPassword string `json:"git_password" bson:"git_password"` // Git 密码
GitAutoSync bool `json:"git_auto_sync" bson:"git_auto_sync"` // Git 是否自动同步
GitSyncFrequency string `json:"git_sync_frequency" bson:"git_sync_frequency"` // Git 同步频率
GitSyncError string `json:"git_sync_error" bson:"git_sync_error"` // Git 同步错误
// 前端展示
LastRunTs time.Time `json:"last_run_ts"` // 最后一次执行时间
@@ -166,6 +167,15 @@ func GetSpiderList(filter interface{}, skip int, limit int, sortStr string) ([]S
return spiders, count, nil
}
// 获取所有爬虫列表
func GetSpiderAllList(filter interface{}) (spiders []Spider, err error) {
spiders, _, err = GetSpiderList(filter, 0, constants.Infinite, "_id")
if err != nil {
return spiders, err
}
return spiders, nil
}
// 获取爬虫(根据FileId)
func GetSpiderByFileId(fileId bson.ObjectId) *Spider {
s, c := database.GetCol("spiders")

View File

@@ -19,3 +19,11 @@ func GetGitBranches(c *gin.Context) {
})
}
func GetGitSshPublicKey(c *gin.Context) {
content := services.GetGitSshPublicKey()
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
Data: content,
})
}

View File

@@ -119,6 +119,12 @@ func PostSpider(c *gin.Context) {
return
}
// 更新 GitCron
if err := services.GitCron.Update(); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
@@ -197,6 +203,12 @@ func PutSpider(c *gin.Context) {
return
}
// 更新 GitCron
if err := services.GitCron.Update(); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",
@@ -434,6 +446,12 @@ func DeleteSpider(c *gin.Context) {
return
}
// 更新 GitCron
if err := services.GitCron.Update(); err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
c.JSON(http.StatusOK, Response{
Status: "ok",
Message: "success",

View File

@@ -2,13 +2,18 @@ package services
import (
"bytes"
"crawlab/lib/cron"
"crawlab/model"
"crawlab/services/spider_handler"
"crawlab/utils"
"fmt"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"gopkg.in/src-d/go-git.v4"
"gopkg.in/src-d/go-git.v4/config"
"gopkg.in/src-d/go-git.v4/plumbing"
"gopkg.in/src-d/go-git.v4/plumbing/transport/ssh"
"io/ioutil"
"net/url"
"os"
"os/exec"
@@ -18,6 +23,21 @@ import (
"strings"
)
var GitCron *GitCronScheduler
type GitCronScheduler struct {
cron *cron.Cron
}
func SaveSpiderGitSyncError(s model.Spider, errMsg string) {
s.GitSyncError = errMsg
if err := s.Save(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
}
func GetGitBranches(url string) (branches []string, err error) {
var stdout bytes.Buffer
var stderr bytes.Buffer
@@ -81,6 +101,7 @@ func SyncSpiderGit(s model.Spider) (err error) {
if err != nil {
log.Error(err.Error())
debug.PrintStack()
SaveSpiderGitSyncError(s, err.Error())
return err
}
@@ -89,6 +110,7 @@ func SyncSpiderGit(s model.Spider) (err error) {
if s.GitUsername != "" && s.GitPassword != "" {
u, err := url.Parse(s.GitUrl)
if err != nil {
SaveSpiderGitSyncError(s, err.Error())
return err
}
gitUrl = fmt.Sprintf(
@@ -110,13 +132,23 @@ func SyncSpiderGit(s model.Spider) (err error) {
if err != nil {
log.Error(err.Error())
debug.PrintStack()
SaveSpiderGitSyncError(s, err.Error())
return err
}
// 生成验证信息
var auth ssh.AuthMethod
if !strings.HasPrefix(s.GitUrl, "http") {
// 为 SSH
u, _ := url.Parse(s.GitUrl)
auth, _ = ssh.NewPublicKeysFromFile(u.User.String(), path.Join(os.Getenv("HOME"), ".ssh", "id_rsa"), "")
}
// 获取 repo
_ = repo.Fetch(&git.FetchOptions{
RemoteName: "origin",
Force: true,
Auth: auth,
})
// 获得 WorkTree
@@ -124,15 +156,23 @@ func SyncSpiderGit(s model.Spider) (err error) {
if err != nil {
log.Error(err.Error())
debug.PrintStack()
SaveSpiderGitSyncError(s, err.Error())
return err
}
// 拉取 repo
if err := wt.Pull(&git.PullOptions{
RemoteName: "origin",
Auth: auth,
}); err != nil {
if err.Error() == "already up-to-date" {
// 如果没有错误,则保存空字符串
SaveSpiderGitSyncError(s, "")
return nil
}
log.Error(err.Error())
debug.PrintStack()
SaveSpiderGitSyncError(s, err.Error())
return err
}
@@ -142,13 +182,127 @@ func SyncSpiderGit(s model.Spider) (err error) {
}); err != nil {
log.Error(err.Error())
debug.PrintStack()
SaveSpiderGitSyncError(s, err.Error())
return err
}
// 同步到GridFS
if err := UploadSpiderToGridFsFromMaster(s); err != nil {
SaveSpiderGitSyncError(s, err.Error())
return err
}
// 检查是否为 Scrapy
sync := spider_handler.SpiderSync{Spider: s}
sync.CheckIsScrapy()
// 如果没有错误,则保存空字符串
SaveSpiderGitSyncError(s, "")
return nil
}
func (g *GitCronScheduler) Start() error {
c := cron.New(cron.WithSeconds())
// 启动cron服务
g.cron.Start()
// 更新任务列表
if err := g.Update(); err != nil {
log.Errorf("update scheduler error: %s", err.Error())
debug.PrintStack()
return err
}
// 每30秒更新一次任务列表
spec := "*/30 * * * * *"
if _, err := c.AddFunc(spec, UpdateGitCron); err != nil {
log.Errorf("add func update schedulers error: %s", err.Error())
debug.PrintStack()
return err
}
return nil
}
func (g *GitCronScheduler) RemoveAll() {
entries := g.cron.Entries()
for i := 0; i < len(entries); i++ {
g.cron.Remove(entries[i].ID)
}
}
func (g *GitCronScheduler) Update() error {
// 删除所有定时任务
g.RemoveAll()
// 获取开启 Git 自动同步的爬虫
spiders, err := model.GetSpiderAllList(bson.M{"git_auto_sync": true})
if err != nil {
log.Errorf("get spider list error: %s", err.Error())
debug.PrintStack()
return err
}
// 遍历任务列表
for _, s := range spiders {
// 添加到定时任务
if err := g.AddJob(s); err != nil {
log.Errorf("add job error: %s, job: %s, cron: %s", err.Error(), s.Name, s.GitSyncFrequency)
debug.PrintStack()
return err
}
}
return nil
}
func (g *GitCronScheduler) AddJob(s model.Spider) error {
spec := s.GitSyncFrequency
// 添加定时任务
_, err := g.cron.AddFunc(spec, AddGitCronJob(s))
if err != nil {
log.Errorf("add func task error: %s", err.Error())
debug.PrintStack()
return err
}
return nil
}
func AddGitCronJob(s model.Spider) func() {
return func() {
if err := SyncSpiderGit(s); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return
}
}
}
func UpdateGitCron() {
if err := GitCron.Update(); err != nil {
log.Errorf(err.Error())
return
}
}
func GetGitSshPublicKey() string {
if !utils.Exists(path.Join(os.Getenv("HOME"), ".ssh")) ||
!utils.Exists(path.Join(os.Getenv("HOME"), ".ssh", "id_rsa")) ||
!utils.Exists(path.Join(os.Getenv("HOME"), ".ssh", "id_rsa.pub")) {
cmd := exec.Command("ssh-keygen -q -t rsa -N \"\" -f $HOME/.ssh/id_rsa")
if err := cmd.Start(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
return ""
}
}
content, err := ioutil.ReadFile(path.Join(os.Getenv("HOME"), ".ssh", "id_rsa.pub"))
if err != nil {
return ""
}
return string(content)
}

View File

@@ -264,12 +264,13 @@ func RemoveSpider(id string) error {
// 启动爬虫服务
func InitSpiderService() error {
// 构造定时任务执行器
c := cron.New(cron.WithSeconds())
if _, err := c.AddFunc("0 * * * * *", PublishAllSpiders); err != nil {
cPub := cron.New(cron.WithSeconds())
if _, err := cPub.AddFunc("0 * * * * *", PublishAllSpiders); err != nil {
return err
}
// 启动定时任务
c.Start()
cPub.Start()
if model.IsMaster() {
// 添加Demo爬虫
@@ -374,6 +375,16 @@ func InitSpiderService() error {
// 发布所有爬虫
PublishAllSpiders()
// 构造 Git 定时任务
GitCron = &GitCronScheduler{
cron: cron.New(cron.WithSeconds()),
}
// 启动 Git 定时任务
if err := GitCron.Start(); err != nil {
return err
}
}
return nil