fix: unable to sync files from master in worker nodes

This commit is contained in:
Marvin Zhang
2025-06-20 14:42:52 +08:00
parent 52b180aaa0
commit 13038d4a1a
5 changed files with 68 additions and 26 deletions

View File

@@ -19,7 +19,7 @@ import (
// RouterGroups collects the router groups that routes are registered on,
// together with the OpenAPI wrapper used for documentation.
// NOTE(review): the two SyncAuthGroup lines below look like the before/after
// sides of this change (*gin.RouterGroup -> *fizz.RouterGroup); a real struct
// can declare the field only once.
type RouterGroups struct {
AuthGroup *fizz.RouterGroup // Routes requiring full authentication
AnonymousGroup *fizz.RouterGroup // Public routes that don't require auth
SyncAuthGroup *gin.RouterGroup // Routes for sync operations with special auth
SyncAuthGroup *fizz.RouterGroup // Routes for sync operations with special auth
Wrapper *openapi.FizzWrapper // OpenAPI wrapper for documentation
}
@@ -41,7 +41,7 @@ func NewRouterGroups(app *gin.Engine) (groups *RouterGroups) {
return &RouterGroups{
AuthGroup: f.Group("/", "AuthGroup", "Router group that requires authentication", middlewares.AuthorizationMiddleware()),
AnonymousGroup: f.Group("/", "AnonymousGroup", "Router group that doesn't require authentication"),
SyncAuthGroup: app.Group("/", middlewares.SyncAuthorizationMiddleware()),
SyncAuthGroup: f.Group("/", "SyncAuthGroup", "Router group for sync operations with special auth", middlewares.SyncAuthorizationMiddleware()),
Wrapper: globalWrapper,
}
}
@@ -704,8 +704,22 @@ func InitRoutes(app *gin.Engine) (err error) {
})
// Register sync routes that require special authentication
groups.SyncAuthGroup.GET("/sync/:id/scan", GetSyncScan)
groups.SyncAuthGroup.GET("/sync/:id/download", GetSyncDownload)
RegisterActions(groups.SyncAuthGroup.Group("", "Sync", "APIs for sync operations"), "/sync/:id", []Action{
{
Method: http.MethodGet,
Path: "/scan",
Name: "Scan files (sync)",
Description: "Scan files for a specific ID (Spider/Git)",
HandlerFunc: GetSyncScan,
},
{
Method: http.MethodGet,
Path: "/download",
Name: "Download file (sync)",
Description: "Download a file for a specific ID (Spider/Git)",
HandlerFunc: GetSyncDownload,
},
})
// Register health check route
groups.AnonymousGroup.GinRouterGroup().GET("/health", GetHealthFn(func() bool { return true }))

View File

@@ -1,25 +1,30 @@
package controllers
import (
"github.com/crawlab-team/crawlab/core/entity"
"github.com/juju/errors"
"path/filepath"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/gin-gonic/gin"
)
// GetSyncScan scans a directory below the workspace and returns the result of
// utils.ScanDirectory. The directory is built from the workspace path, the
// "id" route parameter and a "path" value read with c.Param.
// The two signature lines and the paired error/success lines appear to be the
// before/after sides of this change: the first form writes the response through
// the gin context, the second returns a typed response and an error.
func GetSyncScan(c *gin.Context) {
func GetSyncScan(c *gin.Context) (response *Response[entity.FsFileInfoMap], err error) {
workspacePath := utils.GetWorkspace()
// NOTE(review): the route registered for this handler in this change is
// "/sync/:id" + "/scan", which declares no ":path" segment, and the worker
// sends path as a query parameter. c.Param("path") therefore looks like it is
// always empty, so the whole ":id" directory would be scanned. Confirm this is
// intended, or read the value with c.Query("path") as GetSyncDownload does.
dirPath := filepath.Join(workspacePath, c.Param("id"), c.Param("path"))
files, err := utils.ScanDirectory(dirPath)
if err != nil {
HandleErrorInternalServerError(c, err)
return
return GetErrorResponse[entity.FsFileInfoMap](err)
}
HandleSuccessWithData(c, files)
return GetDataResponse(files)
}
// GetSyncDownload serves one file from below the workspace. The file path is
// built from the workspace path, the "id" route parameter and a "path" value.
// In the second form it returns a not-found error when the file does not exist,
// otherwise it writes the file with c.File and returns nil.
// The two signature lines and the two filePath lines appear to be the
// before/after sides of this change (route parameter -> query parameter).
func GetSyncDownload(c *gin.Context) {
func GetSyncDownload(c *gin.Context) (err error) {
workspacePath := utils.GetWorkspace()
filePath := filepath.Join(workspacePath, c.Param("id"), c.Param("path"))
// NOTE(review): "path" is supplied by the caller and filepath.Join only cleans
// the joined result; a value containing ".." elements can resolve outside
// workspace/:id. No containment check is visible in this handler. Confirm one
// exists elsewhere, or verify the cleaned path still lies under the workspace.
filePath := filepath.Join(workspacePath, c.Param("id"), c.Query("path"))
if !utils.Exists(filePath) {
return errors.NotFoundf("file not exists: %s", filePath)
}
c.File(filePath)
return nil
}

View File

@@ -57,3 +57,5 @@ func (f *FsFileInfo) GetHash() string {
func (f *FsFileInfo) GetChildren() []FsFileInfo {
return f.Children
}
// FsFileInfoMap is a map of FsFileInfo values keyed by string.
type FsFileInfoMap map[string]FsFileInfo

View File

@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
@@ -444,7 +445,7 @@ func (r *Runner) configureEnv() {
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_PARENT_PID="+fmt.Sprint(os.Getpid()))
}
func (r *Runner) createHttpRequest(method, path string) (*http.Response, error) {
func (r *Runner) performHttpRequest(method, path string, params url.Values) (*http.Response, error) {
// Normalize path
if strings.HasPrefix(path, "/") {
path = path[1:]
@@ -457,10 +458,10 @@ func (r *Runner) createHttpRequest(method, path string) (*http.Response, error)
} else {
id = r.s.GitId.Hex()
}
url := fmt.Sprintf("%s/sync/%s/%s", utils.GetApiEndpoint(), id, path)
requestUrl := fmt.Sprintf("%s/sync/%s/%s?%s", utils.GetApiEndpoint(), id, path, params.Encode())
// Create and execute request
req, err := http.NewRequest(method, url, nil)
req, err := http.NewRequest(method, requestUrl, nil)
if err != nil {
return nil, fmt.Errorf("error creating request: %v", err)
}
@@ -488,7 +489,10 @@ func (r *Runner) syncFiles() (err error) {
// get file list from master
r.Infof("fetching file list from master node")
resp, err := r.createHttpRequest("GET", "/scan?path="+workingDir)
params := url.Values{
"path": []string{workingDir},
}
resp, err := r.performHttpRequest("GET", "/scan", params)
if err != nil {
r.Errorf("error getting file list from master: %v", err)
return err
@@ -500,7 +504,7 @@ func (r *Runner) syncFiles() (err error) {
return err
}
var response struct {
Data map[string]entity.FsFileInfo `json:"data"`
Data entity.FsFileInfoMap `json:"data"`
}
err = json.Unmarshal(body, &response)
if err != nil {
@@ -510,7 +514,7 @@ func (r *Runner) syncFiles() (err error) {
}
// create a map for master files
masterFilesMap := make(map[string]entity.FsFileInfo)
masterFilesMap := make(entity.FsFileInfoMap)
for _, file := range response.Data {
masterFilesMap[file.Path] = file
}
@@ -591,8 +595,10 @@ func (r *Runner) syncFiles() (err error) {
// downloadFile downloads a file from the master node and saves it to the local file system
func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsFileInfo) error {
r.Debugf("downloading file: %s -> %s", path, filePath)
resp, err := r.createHttpRequest("GET", "/download?path="+path)
params := url.Values{
"path": []string{path},
}
resp, err := r.performHttpRequest("GET", "/download", params)
if err != nil {
r.Errorf("error getting file response: %v", err)
return err

View File

@@ -10,6 +10,7 @@ import (
"os"
"path"
"path/filepath"
"regexp"
"github.com/crawlab-team/crawlab/core/entity"
)
@@ -182,16 +183,35 @@ func GetFileHash(filePath string) (res string, err error) {
return hex.EncodeToString(hash.Sum(nil)), nil
}
// IgnoreFileRegexPattern is the regular expression used to leave entries out of
// a directory scan. The two declarations below appear to be the before/after
// sides of this change; a real file can declare the constant only once.
// In the longer pattern the "^" anchor applies only to the node_modules
// alternative, so "__pycache__/" matches at any depth, and the trailing group
// matches on the listed file extensions at the end of the path.
// NOTE(review): both directory alternatives require a trailing "/", so a
// directory's own relative path (e.g. "node_modules", without a slash) does not
// match; only entries beneath it do. Confirm that is acceptable, and that the
// matched paths use "/" as separator on every supported OS.
const IgnoreFileRegexPattern = "^(node_modules)/"
const IgnoreFileRegexPattern = `(^node_modules|__pycache__)/|\.(tmp|temp|log|swp|swo|bak|orig|lock|pid|pyc|pyo)$`
func ScanDirectory(dir string) (res map[string]entity.FsFileInfo, err error) {
files := make(map[string]entity.FsFileInfo)
func ScanDirectory(dir string) (res entity.FsFileInfoMap, err error) {
files := make(entity.FsFileInfoMap)
// Compile the ignore pattern regex
ignoreRegex, err := regexp.Compile(IgnoreFileRegexPattern)
if err != nil {
return nil, fmt.Errorf("failed to compile ignore pattern: %v", err)
}
err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
relPath, err := filepath.Rel(dir, path)
if err != nil {
return err
}
// Skip files that match the ignore pattern
if ignoreRegex.MatchString(relPath) {
if info.IsDir() {
return filepath.SkipDir
}
return nil
}
var hash string
if !info.IsDir() {
hash, err = GetFileHash(path)
@@ -200,11 +220,6 @@ func ScanDirectory(dir string) (res map[string]entity.FsFileInfo, err error) {
}
}
relPath, err := filepath.Rel(dir, path)
if err != nil {
return err
}
files[relPath] = entity.FsFileInfo{
Name: info.Name(),
Path: relPath,