mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
feat: added fields to models
This commit is contained in:
@@ -9,5 +9,9 @@ type GitV2 struct {
|
||||
Username string `json:"username" bson:"username"`
|
||||
Password string `json:"password" bson:"password"`
|
||||
CurrentBranch string `json:"current_branch" bson:"current_branch"`
|
||||
AutoPull bool `json:"auto_pull" bson:"auto_pull"`
|
||||
Status string `json:"status" bson:"status"`
|
||||
Error string `json:"error" bson:"error"`
|
||||
|
||||
// settings
|
||||
AutoPull bool `json:"auto_pull" bson:"auto_pull"`
|
||||
}
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package admin
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/apex/log"
|
||||
config2 "github.com/crawlab-team/crawlab/core/config"
|
||||
"github.com/crawlab-team/crawlab/core/constants"
|
||||
"github.com/crawlab-team/crawlab/core/errors"
|
||||
@@ -11,15 +9,10 @@ import (
|
||||
"github.com/crawlab-team/crawlab/core/models/service"
|
||||
"github.com/crawlab-team/crawlab/core/node/config"
|
||||
"github.com/crawlab-team/crawlab/core/task/scheduler"
|
||||
"github.com/crawlab-team/crawlab/core/utils"
|
||||
"github.com/crawlab-team/crawlab/trace"
|
||||
"github.com/crawlab-team/crawlab/vcs"
|
||||
"github.com/robfig/cron/v3"
|
||||
"github.com/spf13/viper"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -35,7 +28,7 @@ type ServiceV2 struct {
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) Start() (err error) {
|
||||
return svc.SyncGit()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) Schedule(id primitive.ObjectID, opts *interfaces.SpiderRunOptions) (taskIds []primitive.ObjectID, err error) {
|
||||
@@ -49,14 +42,6 @@ func (svc *ServiceV2) Schedule(id primitive.ObjectID, opts *interfaces.SpiderRun
|
||||
return svc.scheduleTasks(s, opts)
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) SyncGit() (err error) {
|
||||
if _, err = svc.cron.AddFunc("* * * * *", svc.syncGit); err != nil {
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
svc.cron.Start()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) scheduleTasks(s *models.SpiderV2, opts *interfaces.SpiderRunOptions) (taskIds []primitive.ObjectID, err error) {
|
||||
// main task
|
||||
mainTask := &models.TaskV2{
|
||||
@@ -175,99 +160,6 @@ func (svc *ServiceV2) isMultiTask(opts *interfaces.SpiderRunOptions) (res bool)
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) syncGit() {
|
||||
if svc.syncLock {
|
||||
log.Infof("[SpiderAdminService] sync git is locked, skip")
|
||||
return
|
||||
}
|
||||
log.Infof("[SpiderAdminService] start to sync git")
|
||||
|
||||
svc.syncLock = true
|
||||
defer func() {
|
||||
svc.syncLock = false
|
||||
}()
|
||||
|
||||
// spiders
|
||||
spiders, err := service.NewModelServiceV2[models.SpiderV2]().GetMany(nil, nil)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
|
||||
// spider ids
|
||||
var spiderIds []primitive.ObjectID
|
||||
for _, s := range spiders {
|
||||
spiderIds = append(spiderIds, s.Id)
|
||||
}
|
||||
|
||||
if len(spiderIds) > 0 {
|
||||
// gits
|
||||
gits, err := service.NewModelServiceV2[models.GitV2]().GetMany(bson.M{
|
||||
"_id": bson.M{
|
||||
"$in": spiderIds,
|
||||
},
|
||||
"auto_pull": true,
|
||||
}, nil)
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(len(gits))
|
||||
for _, g := range gits {
|
||||
go func(g models.GitV2) {
|
||||
svc.syncGitOne(&g)
|
||||
wg.Done()
|
||||
}(g)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
log.Infof("[SpiderAdminService] finished sync git")
|
||||
}
|
||||
|
||||
func (svc *ServiceV2) syncGitOne(g *models.GitV2) {
|
||||
log.Infof("[SpiderAdminService] sync git %s", g.Id)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// git client
|
||||
workspacePath := viper.GetString("workspace")
|
||||
gitClient, err := vcs.NewGitClient(vcs.WithPath(filepath.Join(workspacePath, g.Id.Hex())))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// set auth
|
||||
utils.InitGitClientAuthV2(g, gitClient)
|
||||
|
||||
// check if remote has changes
|
||||
ok, err := gitClient.IsRemoteChanged()
|
||||
if err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
if !ok {
|
||||
// no change
|
||||
return
|
||||
}
|
||||
|
||||
// pull and sync to workspace
|
||||
if err := gitClient.Reset(); err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
if err := gitClient.Pull(); err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
|
||||
// wait for context to end
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
func NewSpiderAdminServiceV2() (svc2 *ServiceV2, err error) {
|
||||
svc := &ServiceV2{
|
||||
nodeCfgSvc: config.GetNodeConfigService(),
|
||||
|
||||
@@ -3,7 +3,6 @@ package utils
|
||||
import (
|
||||
"github.com/crawlab-team/crawlab/core/constants"
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
vcs "github.com/crawlab-team/crawlab/vcs"
|
||||
)
|
||||
|
||||
@@ -20,17 +19,3 @@ func InitGitClientAuth(g interfaces.Git, gitClient *vcs.GitClient) {
|
||||
gitClient.SetPrivateKey(g.GetPassword())
|
||||
}
|
||||
}
|
||||
|
||||
func InitGitClientAuthV2(g *models.GitV2, gitClient *vcs.GitClient) {
|
||||
// set auth
|
||||
switch g.AuthType {
|
||||
case constants.GitAuthTypeHttp:
|
||||
gitClient.SetAuthType(vcs.GitAuthTypeHTTP)
|
||||
gitClient.SetUsername(g.Username)
|
||||
gitClient.SetPassword(g.Password)
|
||||
case constants.GitAuthTypeSsh:
|
||||
gitClient.SetAuthType(vcs.GitAuthTypeSSH)
|
||||
gitClient.SetUsername(g.Username)
|
||||
gitClient.SetPrivateKey(g.Password)
|
||||
}
|
||||
}
|
||||
|
||||
142
vcs/git.go
142
vcs/git.go
@@ -1,6 +1,7 @@
|
||||
package vcs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/apex/log"
|
||||
"github.com/crawlab-team/crawlab/trace"
|
||||
"github.com/go-git/go-billy/v5"
|
||||
@@ -14,7 +15,7 @@ import (
|
||||
gitssh "github.com/go-git/go-git/v5/plumbing/transport/ssh"
|
||||
"github.com/go-git/go-git/v5/storage/memory"
|
||||
"golang.org/x/crypto/ssh"
|
||||
"io/ioutil"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
@@ -35,6 +36,7 @@ type GitClient struct {
|
||||
privateKey string
|
||||
privateKeyPath string
|
||||
defaultBranch string
|
||||
defaultInit bool
|
||||
|
||||
// internals
|
||||
r *git.Repository
|
||||
@@ -51,36 +53,6 @@ func (c *GitClient) Init() (err error) {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if remote url is not empty and no remote exists
|
||||
// create default remote and pull from remote url
|
||||
remotes, err := c.r.Remotes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if c.remoteUrl != "" && len(remotes) == 0 {
|
||||
// attempt to get default remote
|
||||
if _, err := c.r.Remote(GitRemoteNameOrigin); err != nil {
|
||||
if err != git.ErrRemoteNotFound {
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
err = nil
|
||||
|
||||
// create default remote
|
||||
if err := c.createRemote(GitRemoteNameOrigin, c.remoteUrl); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//// pull
|
||||
//opts := []GitPullOption{
|
||||
// WithRemoteNamePull(GitRemoteNameOrigin),
|
||||
//}
|
||||
//if err := c.Pull(opts...); err != nil {
|
||||
// return err
|
||||
//}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -97,6 +69,33 @@ func (c *GitClient) Dispose() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *GitClient) Clone() (err error) {
|
||||
// validate
|
||||
if c.remoteUrl == "" {
|
||||
return ErrUnableToCloneWithEmptyRemoteUrl
|
||||
}
|
||||
|
||||
// auth
|
||||
auth, err := c.getGitAuth()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// options
|
||||
o := &git.CloneOptions{
|
||||
URL: c.remoteUrl,
|
||||
Auth: auth,
|
||||
}
|
||||
|
||||
// clone
|
||||
_, err = git.PlainClone(c.path, false, o)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *GitClient) Checkout(opts ...GitCheckoutOption) (err error) {
|
||||
// worktree
|
||||
wt, err := c.r.Worktree()
|
||||
@@ -163,16 +162,16 @@ func (c *GitClient) Pull(opts ...GitPullOption) (err error) {
|
||||
|
||||
// pull
|
||||
if err := wt.Pull(o); err != nil {
|
||||
if err == transport.ErrEmptyRemoteRepository {
|
||||
if errors.Is(err, transport.ErrEmptyRemoteRepository) {
|
||||
return nil
|
||||
}
|
||||
if err == transport.ErrEmptyUploadPackRequest {
|
||||
if errors.Is(err, transport.ErrEmptyUploadPackRequest) {
|
||||
return nil
|
||||
}
|
||||
if err == git.NoErrAlreadyUpToDate {
|
||||
if errors.Is(err, git.NoErrAlreadyUpToDate) {
|
||||
return nil
|
||||
}
|
||||
if err == git.ErrNonFastForwardUpdate {
|
||||
if errors.Is(err, git.ErrNonFastForwardUpdate) {
|
||||
return nil
|
||||
}
|
||||
return trace.TraceError(err)
|
||||
@@ -260,14 +259,14 @@ func (c *GitClient) CheckoutBranchWithRemote(branch, remote string, ref *plumbin
|
||||
}
|
||||
|
||||
// check if the branch exists
|
||||
b, err := c.r.Branch(branch)
|
||||
_, err = c.r.Branch(branch)
|
||||
if err != nil {
|
||||
if err == git.ErrBranchNotFound {
|
||||
if errors.Is(err, git.ErrBranchNotFound) {
|
||||
// create a new branch if it does not exist
|
||||
if err := c.createBranch(branch, remote, ref); err != nil {
|
||||
return err
|
||||
}
|
||||
b, err = c.r.Branch(branch)
|
||||
_, err = c.r.Branch(branch)
|
||||
if err != nil {
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
@@ -276,11 +275,6 @@ func (c *GitClient) CheckoutBranchWithRemote(branch, remote string, ref *plumbin
|
||||
}
|
||||
}
|
||||
|
||||
// set branch remote
|
||||
if remote != "" {
|
||||
b.Remote = remote
|
||||
}
|
||||
|
||||
// add to options
|
||||
opts = append(opts, WithBranch(branch))
|
||||
|
||||
@@ -338,14 +332,14 @@ func (c *GitClient) GetLogs() (logs []GitLog, err error) {
|
||||
return nil, trace.TraceError(err)
|
||||
}
|
||||
if err := iter.ForEach(func(commit *object.Commit) error {
|
||||
log := GitLog{
|
||||
gitLog := GitLog{
|
||||
Hash: commit.Hash.String(),
|
||||
Msg: commit.Message,
|
||||
AuthorName: commit.Author.Name,
|
||||
AuthorEmail: commit.Author.Email,
|
||||
Timestamp: commit.Author.When,
|
||||
}
|
||||
logs = append(logs, log)
|
||||
logs = append(logs, gitLog)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, trace.TraceError(err)
|
||||
@@ -524,10 +518,14 @@ func (c *GitClient) GetBranches() (branches []GitRef, err error) {
|
||||
}
|
||||
|
||||
func (c *GitClient) GetRemoteRefs(remoteName string) (gitRefs []GitRef, err error) {
|
||||
if remoteName == "" {
|
||||
remoteName = GitRemoteNameOrigin
|
||||
}
|
||||
|
||||
// remote
|
||||
r, err := c.r.Remote(remoteName)
|
||||
if err != nil {
|
||||
if err == git.ErrRemoteNotFound {
|
||||
if errors.Is(err, git.ErrRemoteNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, trace.TraceError(err)
|
||||
@@ -542,7 +540,7 @@ func (c *GitClient) GetRemoteRefs(remoteName string) (gitRefs []GitRef, err erro
|
||||
// refs
|
||||
refs, err := r.List(&git.ListOptions{Auth: auth})
|
||||
if err != nil {
|
||||
if err != transport.ErrEmptyRemoteRepository {
|
||||
if !errors.Is(err, transport.ErrEmptyRemoteRepository) {
|
||||
return nil, trace.TraceError(err)
|
||||
}
|
||||
return nil, nil
|
||||
@@ -699,7 +697,7 @@ func (c *GitClient) initMem() (err error) {
|
||||
// attempt to init
|
||||
c.r, err = git.Init(storage, wt)
|
||||
if err != nil {
|
||||
if err == git.ErrRepositoryAlreadyExists {
|
||||
if errors.Is(err, git.ErrRepositoryAlreadyExists) {
|
||||
// if already exists, attempt to open
|
||||
c.r, err = git.Open(storage, wt)
|
||||
if err != nil {
|
||||
@@ -730,43 +728,10 @@ func (c *GitClient) initFs() (err error) {
|
||||
|
||||
// try to open repo
|
||||
c.r, err = git.PlainOpen(c.path)
|
||||
if err == git.ErrRepositoryNotExists {
|
||||
// repo not exists, init
|
||||
c.r, err = git.PlainInit(c.path, false)
|
||||
if err != nil {
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
} else if err != nil {
|
||||
// error
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *GitClient) clone() (err error) {
|
||||
// validate
|
||||
if c.remoteUrl == "" {
|
||||
return trace.TraceError(ErrUnableToCloneWithEmptyRemoteUrl)
|
||||
}
|
||||
|
||||
// auth
|
||||
auth, err := c.getGitAuth()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// options
|
||||
o := &git.CloneOptions{
|
||||
URL: c.remoteUrl,
|
||||
Auth: auth,
|
||||
}
|
||||
|
||||
// clone
|
||||
if _, err := git.PlainClone(c.path, false, o); err != nil {
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -843,7 +808,7 @@ func (c *GitClient) getGitAuth() (auth transport.AuthMethod, err error) {
|
||||
privateKeyData = []byte(c.privateKey)
|
||||
} else if c.privateKeyPath != "" {
|
||||
// read from private key file
|
||||
privateKeyData, err = ioutil.ReadFile(c.privateKeyPath)
|
||||
privateKeyData, err = os.ReadFile(c.privateKeyPath)
|
||||
if err != nil {
|
||||
return nil, trace.TraceError(err)
|
||||
}
|
||||
@@ -877,7 +842,7 @@ func (c *GitClient) getHeadRef() (ref string, err error) {
|
||||
if err != nil {
|
||||
return "", trace.TraceError(err)
|
||||
}
|
||||
data, err := ioutil.ReadAll(fh)
|
||||
data, err := io.ReadAll(fh)
|
||||
if err != nil {
|
||||
return "", trace.TraceError(err)
|
||||
}
|
||||
@@ -933,7 +898,7 @@ func (c *GitClient) createBranch(branch, remote string, ref *plumbing.Reference)
|
||||
ref, err = c.getBranchHashRef(branch, remote)
|
||||
|
||||
// if no matched remote branch, set to HEAD
|
||||
if err == ErrNoMatchedRemoteBranch {
|
||||
if errors.Is(err, ErrNoMatchedRemoteBranch) {
|
||||
ref, err = c.r.Head()
|
||||
if err != nil {
|
||||
return trace.TraceError(err)
|
||||
@@ -1003,6 +968,7 @@ func NewGitClient(opts ...GitOption) (c *GitClient, err error) {
|
||||
authType: GitAuthTypeNone,
|
||||
username: "git",
|
||||
privateKeyPath: getDefaultPublicKeyPath(),
|
||||
defaultInit: true,
|
||||
}
|
||||
|
||||
// apply options
|
||||
@@ -1010,9 +976,11 @@ func NewGitClient(opts ...GitOption) (c *GitClient, err error) {
|
||||
opt(c)
|
||||
}
|
||||
|
||||
// init
|
||||
if err := c.Init(); err != nil {
|
||||
return c, err
|
||||
// initialize
|
||||
if c.defaultInit {
|
||||
if err = c.Init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
@@ -59,6 +59,12 @@ func WithDefaultBranch(branch string) GitOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithDefaultInit(init bool) GitOption {
|
||||
return func(c *GitClient) {
|
||||
c.defaultInit = init
|
||||
}
|
||||
}
|
||||
|
||||
func WithPrivateKeyPath(path string) GitOption {
|
||||
return func(c *GitClient) {
|
||||
c.privateKeyPath = path
|
||||
|
||||
Reference in New Issue
Block a user