fix: unable to sync directories to work nodes

This commit is contained in:
Marvin Zhang
2024-07-01 15:59:20 +08:00
parent 7f35e1b2ee
commit 840100dbc3
40 changed files with 768 additions and 1540 deletions

View File

@@ -4,14 +4,14 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/apex/log"
"github.com/cenkalti/backoff/v4"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/container"
"github.com/crawlab-team/crawlab/core/entity"
"github.com/crawlab-team/crawlab/core/errors"
fs2 "github.com/crawlab-team/crawlab/core/fs"
client2 "github.com/crawlab-team/crawlab/core/grpc/client"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/models/client"
"github.com/crawlab-team/crawlab/core/models/models"
@@ -21,7 +21,6 @@ import (
grpc "github.com/crawlab-team/crawlab/grpc"
"github.com/crawlab-team/crawlab/trace"
"github.com/shirou/gopsutil/process"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -54,7 +53,7 @@ type RunnerV2 struct {
err error // standard process error
envs []models.Env // environment variables
cwd string // working directory
c interfaces.GrpcClient // grpc client
c *client2.GrpcClientV2 // grpc client
sub grpc.TaskService_SubscribeClient // grpc task service stream client
// log internals
@@ -181,7 +180,7 @@ func (r *RunnerV2) Cancel() (err error) {
// make sure the process does not exist
op := func() error {
if exists, _ := process.PidExists(int32(r.pid)); exists {
return errors.ErrorTaskProcessStillExists
return errors.New(fmt.Sprintf("task process %d still exists", r.pid))
}
return nil
}
@@ -189,7 +188,8 @@ func (r *RunnerV2) Cancel() (err error) {
defer cancel()
b := backoff.WithContext(backoff.NewConstantBackOff(1*time.Second), ctx)
if err := backoff.Retry(op, b); err != nil {
return trace.TraceError(errors.ErrorTaskUnableToCancel)
log.Errorf("Error canceling task %s: %v", r.tid, err)
return trace.TraceError(err)
}
return nil
@@ -362,7 +362,7 @@ func (r *RunnerV2) syncFiles() (err error) {
masterFilesMap[file.Path] = file
}
// create worker directory if not exists
// create working directory if not exists
if _, err := os.Stat(r.cwd); os.IsNotExist(err) {
if err := os.MkdirAll(r.cwd, os.ModePerm); err != nil {
log.Errorf("Error creating worker directory: %v", err)
@@ -377,10 +377,6 @@ func (r *RunnerV2) syncFiles() (err error) {
return trace.TraceError(err)
}
// set up wait group and error channel
var wg sync.WaitGroup
errCh := make(chan error, 1)
// delete files that are deleted on master node
for path, workerFile := range workerFiles {
if _, exists := masterFilesMap[path]; !exists {
@@ -392,50 +388,85 @@ func (r *RunnerV2) syncFiles() (err error) {
}
}
// set up wait group and error channel
var wg sync.WaitGroup
pool := make(chan struct{}, 10)
// download files that are new or modified on master node
for path, masterFile := range masterFilesMap {
workerFile, exists := workerFiles[path]
if !exists || masterFile.Hash != workerFile.Hash {
wg.Add(1)
go func(path string, masterFile entity.FsFileInfo) {
// acquire token
pool <- struct{}{}
// start goroutine to synchronize file or directory
go func(path string, masterFile *entity.FsFileInfo) {
defer wg.Done()
logrus.Infof("File needs to be synchronized: %s", path)
err := r.downloadFile(masterURL+"/download?path="+path, filepath.Join(r.cwd, path))
if err != nil {
logrus.Errorf("Error downloading file: %v", err)
select {
case errCh <- err:
default:
if masterFile.IsDir {
log.Infof("Directory needs to be synchronized: %s", path)
_err := os.MkdirAll(filepath.Join(r.cwd, path), masterFile.Mode)
if _err != nil {
log.Errorf("Error creating directory: %v", _err)
err = errors.Join(err, _err)
}
} else {
log.Infof("File needs to be synchronized: %s", path)
_err := r.downloadFile(masterURL+"/download?path="+path, filepath.Join(r.cwd, path), masterFile)
if _err != nil {
log.Errorf("Error downloading file: %v", _err)
err = errors.Join(err, _err)
}
}
}(path, masterFile)
// release token
<-pool
}(path, &masterFile)
}
}
// wait for all files and directories to be synchronized
wg.Wait()
close(errCh)
if err := <-errCh; err != nil {
return err
}
return nil
return err
}
func (r *RunnerV2) downloadFile(url string, filePath string) error {
func (r *RunnerV2) downloadFile(url string, filePath string, fileInfo *entity.FsFileInfo) error {
// get file response
resp, err := http.Get(url)
if err != nil {
log.Errorf("Error getting file response: %v", err)
return err
}
defer resp.Body.Close()
out, err := os.Create(filePath)
// create directory if not exists
dirPath := filepath.Dir(filePath)
utils.Exists(dirPath)
err = os.MkdirAll(dirPath, os.ModePerm)
if err != nil {
log.Errorf("Error creating directory: %v", err)
return err
}
// create local file
out, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, fileInfo.Mode)
if err != nil {
log.Errorf("Error creating file: %v", err)
return err
}
defer out.Close()
// copy file content to local file
_, err = io.Copy(out, resp.Body)
return err
if err != nil {
log.Errorf("Error copying file: %v", err)
return err
}
return nil
}
// wait for process to finish and send task signal (constants.TaskSignal)
@@ -443,7 +474,8 @@ func (r *RunnerV2) downloadFile(url string, filePath string) error {
func (r *RunnerV2) wait() {
// wait for process to finish
if err := r.cmd.Wait(); err != nil {
exitError, ok := err.(*exec.ExitError)
var exitError *exec.ExitError
ok := errors.As(err, &exitError)
if !ok {
r.ch <- constants.TaskSignalError
return
@@ -505,7 +537,7 @@ func (r *RunnerV2) updateTask(status string, e error) (err error) {
}
func (r *RunnerV2) initSub() (err error) {
r.sub, err = r.c.GetTaskClient().Subscribe(context.Background())
r.sub, err = r.c.TaskClient.Subscribe(context.Background())
if err != nil {
return trace.TraceError(err)
}
@@ -577,7 +609,7 @@ func (r *RunnerV2) sendNotification() {
NodeKey: r.svc.GetNodeConfigService().GetNodeKey(),
Data: data,
}
_, err = r.c.GetTaskClient().SendNotification(context.Background(), req)
_, err = r.c.TaskClient.SendNotification(context.Background(), req)
if err != nil {
trace.PrintError(err)
return
@@ -617,7 +649,8 @@ func (r *RunnerV2) _updateSpiderStat(status string) {
},
}
default:
trace.PrintError(errors.ErrorTaskInvalidType)
log.Errorf("Invalid task status: %s", status)
trace.PrintError(errors.New("invalid task status"))
return
}
@@ -679,14 +712,8 @@ func NewTaskRunnerV2(id primitive.ObjectID, svc *ServiceV2) (r2 *RunnerV2, err e
// task fs service
r.fsSvc = fs2.NewFsServiceV2(filepath.Join(viper.GetString("workspace"), r.s.Id.Hex()))
// dependency injection
if err := container.GetContainer().Invoke(func(
c interfaces.GrpcClient,
) {
r.c = c
}); err != nil {
return nil, trace.TraceError(err)
}
// grpc client
r.c = client2.GetGrpcClientV2()
// initialize task runner
if err := r.Init(); err != nil {

View File

@@ -74,9 +74,10 @@ func (svc *ServiceV2) Cancel(taskId primitive.ObjectID) (err error) {
}
func (svc *ServiceV2) Fetch() {
ticker := time.NewTicker(svc.fetchInterval)
for {
// wait
time.Sleep(svc.fetchInterval)
<-ticker.C
// current node
n, err := svc.GetCurrentNode()
@@ -96,6 +97,7 @@ func (svc *ServiceV2) Fetch() {
// stop
if svc.stopped {
ticker.Stop()
return
}
@@ -115,10 +117,11 @@ func (svc *ServiceV2) Fetch() {
if err := svc.run(tid); err != nil {
trace.PrintError(err)
t, err := svc.GetTaskById(tid)
if err == nil && t.Status != constants.TaskStatusCancelled {
if err != nil && t.Status != constants.TaskStatusCancelled {
t.Error = err.Error()
t.Status = constants.TaskStatusError
t.SetUpdated(t.CreatedBy)
_ = client.NewModelServiceV2[models.TaskV2]().ReplaceById(t.Id, *t)
continue
}
continue
@@ -388,7 +391,7 @@ func (svc *ServiceV2) run(taskId primitive.ObjectID) (err error) {
return nil
}
func NewTaskHandlerServiceV2() (svc2 *ServiceV2, err error) {
func newTaskHandlerServiceV2() (svc2 *ServiceV2, err error) {
// service
svc := &ServiceV2{
exitWatchDuration: 60 * time.Second,
@@ -413,12 +416,18 @@ func NewTaskHandlerServiceV2() (svc2 *ServiceV2, err error) {
}
var _serviceV2 *ServiceV2
var _serviceV2Once = new(sync.Once)
func GetTaskHandlerServiceV2() (svr *ServiceV2, err error) {
if _serviceV2 != nil {
return _serviceV2, nil
}
_serviceV2, err = NewTaskHandlerServiceV2()
_serviceV2Once.Do(func() {
_serviceV2, err = newTaskHandlerServiceV2()
if err != nil {
log.Errorf("failed to create task handler service: %v", err)
}
})
if err != nil {
return nil, err
}