fix: unable to sync files to worker nodes when running tasks

This commit is contained in:
Marvin Zhang
2025-06-18 21:50:53 +08:00
parent 22497d22f7
commit 09cfe37272
9 changed files with 42 additions and 68 deletions

View File

@@ -8,21 +8,21 @@ import (
"os"
"sync"
"github.com/crawlab-team/crawlab/core/entity"
"github.com/crawlab-team/crawlab/core/fs"
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/gin-gonic/gin"
)
func GetBaseFileListDir(rootPath, path string) (response *Response[[]interfaces.FsFileInfo], err error) {
func GetBaseFileListDir(rootPath, path string) (response *Response[[]entity.FsFileInfo], err error) {
fsSvc, err := fs.GetBaseFileFsSvc(rootPath)
if err != nil {
return GetErrorResponse[[]interfaces.FsFileInfo](err)
return GetErrorResponse[[]entity.FsFileInfo](err)
}
files, err := fsSvc.List(path)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
return GetErrorResponse[[]interfaces.FsFileInfo](err)
return GetErrorResponse[[]entity.FsFileInfo](err)
}
}
@@ -43,15 +43,15 @@ func GetBaseFileContent(rootPath, path string) (response *Response[string], err
return GetDataResponse(string(data))
}
func GetBaseFileInfo(rootPath, path string) (response *Response[interfaces.FsFileInfo], err error) {
func GetBaseFileInfo(rootPath, path string) (response *Response[*entity.FsFileInfo], err error) {
fsSvc, err := fs.GetBaseFileFsSvc(rootPath)
if err != nil {
return GetErrorResponse[interfaces.FsFileInfo](err)
return GetErrorResponse[*entity.FsFileInfo](err)
}
info, err := fsSvc.GetFileInfo(path)
if err != nil {
return GetErrorResponse[interfaces.FsFileInfo](err)
return GetErrorResponse[*entity.FsFileInfo](err)
}
return GetDataResponse(info)

View File

@@ -1,6 +1,7 @@
package controllers
import (
"github.com/crawlab-team/crawlab/core/entity"
"mime/multipart"
"os"
"path/filepath"
@@ -358,10 +359,10 @@ type GetSpiderFilesParams struct {
Path string `query:"path" description:"Directory path"`
}
func GetSpiderFiles(c *gin.Context, params *GetSpiderFilesParams) (response *Response[[]interfaces.FsFileInfo], err error) {
func GetSpiderFiles(c *gin.Context, params *GetSpiderFilesParams) (response *Response[[]entity.FsFileInfo], err error) {
rootPath, err := getSpiderRootPathByContext(c)
if err != nil {
return GetErrorResponse[[]interfaces.FsFileInfo](err)
return GetErrorResponse[[]entity.FsFileInfo](err)
}
return GetBaseFileListDir(rootPath, params.Path)
}
@@ -384,10 +385,10 @@ type GetSpiderFileInfoParams struct {
Path string `query:"path" description:"File path"`
}
func GetSpiderFileInfo(c *gin.Context, params *GetSpiderFileInfoParams) (response *Response[interfaces.FsFileInfo], err error) {
func GetSpiderFileInfo(c *gin.Context, params *GetSpiderFileInfoParams) (response *Response[*entity.FsFileInfo], err error) {
rootPath, err := getSpiderRootPathByContext(c)
if err != nil {
return GetErrorResponse[interfaces.FsFileInfo](err)
return GetErrorResponse[*entity.FsFileInfo](err)
}
return GetBaseFileInfo(rootPath, params.Path)
}
@@ -612,14 +613,14 @@ func GetSpiderResults(c *gin.Context, params *GetSpiderResultsParams) (response
return GetListResponse(results, total)
}
func getSpiderFsSvc(s *models.Spider) (svc interfaces.FsService, err error) {
func getSpiderFsSvc(s *models.Spider) (svc *fs.Service, err error) {
workspacePath := utils.GetWorkspace()
fsSvc := fs.NewFsService(filepath.Join(workspacePath, s.Id.Hex()))
return fsSvc, nil
}
func getSpiderFsSvcById(id primitive.ObjectID) (svc interfaces.FsService, err error) {
func getSpiderFsSvcById(id primitive.ObjectID) (svc *fs.Service, err error) {
s, err := service.NewModelService[models.Spider]().GetById(id)
if err != nil {
logger.Errorf("failed to get spider: %v", err)

View File

@@ -1,22 +1,21 @@
package entity
import (
"github.com/crawlab-team/crawlab/core/interfaces"
"os"
"time"
)
type FsFileInfo struct {
Name string `json:"name"` // file name
Path string `json:"path"` // file path
FullPath string `json:"full_path"` // file full path
Extension string `json:"extension"` // file extension
IsDir bool `json:"is_dir"` // whether it is directory
FileSize int64 `json:"file_size"` // file size (bytes)
Children []interfaces.FsFileInfo `json:"children"` // children for subdirectory
ModTime time.Time `json:"mod_time"` // modification time
Mode os.FileMode `json:"mode"` // file mode
Hash string `json:"hash"` // file hash
Name string `json:"name"` // file name
Path string `json:"path"` // file path
FullPath string `json:"full_path"` // file full path
Extension string `json:"extension"` // file extension
IsDir bool `json:"is_dir"` // whether it is directory
FileSize int64 `json:"file_size"` // file size (bytes)
Children []FsFileInfo `json:"children"` // children for subdirectory
ModTime time.Time `json:"mod_time"` // modification time
Mode os.FileMode `json:"mode"` // file mode
Hash string `json:"hash"` // file hash
}
func (f *FsFileInfo) GetName() string {
@@ -55,6 +54,6 @@ func (f *FsFileInfo) GetHash() string {
return f.Hash
}
func (f *FsFileInfo) GetChildren() []interfaces.FsFileInfo {
func (f *FsFileInfo) GetChildren() []FsFileInfo {
return f.Children
}

View File

@@ -19,7 +19,7 @@ type Service struct {
interfaces.Logger
}
func (svc *Service) List(path string) (files []interfaces.FsFileInfo, err error) {
func (svc *Service) List(path string) (files []entity.FsFileInfo, err error) {
// Normalize the provided path
normPath := filepath.Clean(path)
if normPath == "." {
@@ -67,7 +67,7 @@ func (svc *Service) List(path string) (files []interfaces.FsFileInfo, err error)
}
if parentDir := filepath.Dir(p); parentDir != p && dirMap[parentDir] != nil {
dirMap[parentDir].Children = append(dirMap[parentDir].Children, fi)
dirMap[parentDir].Children = append(dirMap[parentDir].Children, *fi)
}
return nil
@@ -86,7 +86,7 @@ func (svc *Service) GetFile(path string) (data []byte, err error) {
return os.ReadFile(filepath.Join(svc.rootPath, path))
}
func (svc *Service) GetFileInfo(path string) (file interfaces.FsFileInfo, err error) {
func (svc *Service) GetFileInfo(path string) (file *entity.FsFileInfo, err error) {
f, err := os.Stat(filepath.Join(svc.rootPath, path))
if err != nil {
svc.Errorf("failed to get file info: %v", err)
@@ -184,7 +184,7 @@ func (svc *Service) Export() (resultPath string, err error) {
return zipFilePath, nil
}
func NewFsService(path string) (svc interfaces.FsService) {
func NewFsService(path string) (svc *Service) {
return &Service{
rootPath: path,
skipNames: []string{".git"},

View File

@@ -1,12 +1,11 @@
package fs
import (
"github.com/crawlab-team/crawlab/core/interfaces"
"github.com/crawlab-team/crawlab/core/utils"
"path/filepath"
)
func GetBaseFileFsSvc(rootPath string) (svc interfaces.FsService, err error) {
func GetBaseFileFsSvc(rootPath string) (svc *Service, err error) {
workspacePath := utils.GetWorkspace()
fsSvc := NewFsService(filepath.Join(workspacePath, rootPath))

View File

@@ -1,19 +0,0 @@
package interfaces
import (
"os"
"time"
)
type FsFileInfo interface {
GetName() string
GetPath() string
GetFullPath() string
GetExtension() string
GetIsDir() bool
GetFileSize() int64
GetModTime() time.Time
GetMode() os.FileMode
GetHash() string
GetChildren() []FsFileInfo
}

View File

@@ -1,13 +0,0 @@
package interfaces
type FsService interface {
List(path string) (files []FsFileInfo, err error)
GetFile(path string) (data []byte, err error)
GetFileInfo(path string) (file FsFileInfo, err error)
Save(path string, data []byte) (err error)
CreateDir(path string) (err error)
Rename(path, newPath string) (err error)
Delete(path string) (err error)
Copy(path, newPath string) (err error)
Export() (resultPath string, err error)
}

View File

@@ -87,8 +87,8 @@ func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) {
// Runner represents a task execution handler that manages the lifecycle of a running task
type Runner struct {
// dependencies
svc *Service // task handler service
fsSvc interfaces.FsService // task fs service
svc *Service // task handler service
fsSvc *fs.Service // task fs service
// settings
subscribeTimeout time.Duration // maximum time to wait for task subscription
@@ -499,8 +499,10 @@ func (r *Runner) syncFiles() (err error) {
r.Errorf("error reading response body: %v", err)
return err
}
var masterFiles map[string]entity.FsFileInfo
err = json.Unmarshal(body, &masterFiles)
var response struct {
Data map[string]entity.FsFileInfo `json:"data"`
}
err = json.Unmarshal(body, &response)
if err != nil {
r.Errorf("error unmarshaling JSON for URL: %s", resp.Request.URL.String())
r.Errorf("error details: %v", err)
@@ -509,7 +511,7 @@ func (r *Runner) syncFiles() (err error) {
// create a map for master files
masterFilesMap := make(map[string]entity.FsFileInfo)
for _, file := range masterFiles {
for _, file := range response.Data {
masterFilesMap[file.Path] = file
}

View File

@@ -111,6 +111,11 @@ const spiderSelectOptions = computed<SelectOption[]>(() =>
);
onBeforeMount(getSpiders);
const getNodes = async () => {
await store.dispatch('node/getAllNodes');
};
onBeforeMount(getNodes);
const validate = async () => {
await formRef.value?.validate();
};