From 13038d4a1ae786c37f9911084cb2b2744fbab62c Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Fri, 20 Jun 2025 14:42:52 +0800 Subject: [PATCH] fix: unable to sync files from master in worker nodes --- core/controllers/router.go | 22 ++++++++++++++++++---- core/controllers/sync.go | 17 +++++++++++------ core/entity/fs_file_info.go | 2 ++ core/task/handler/runner.go | 22 ++++++++++++++-------- core/utils/file.go | 31 +++++++++++++++++++++++-------- 5 files changed, 68 insertions(+), 26 deletions(-) diff --git a/core/controllers/router.go b/core/controllers/router.go index 09c25e4a..373f0dd7 100644 --- a/core/controllers/router.go +++ b/core/controllers/router.go @@ -19,7 +19,7 @@ import ( type RouterGroups struct { AuthGroup *fizz.RouterGroup // Routes requiring full authentication AnonymousGroup *fizz.RouterGroup // Public routes that don't require auth - SyncAuthGroup *gin.RouterGroup // Routes for sync operations with special auth + SyncAuthGroup *fizz.RouterGroup // Routes for sync operations with special auth Wrapper *openapi.FizzWrapper // OpenAPI wrapper for documentation } @@ -41,7 +41,7 @@ func NewRouterGroups(app *gin.Engine) (groups *RouterGroups) { return &RouterGroups{ AuthGroup: f.Group("/", "AuthGroup", "Router group that requires authentication", middlewares.AuthorizationMiddleware()), AnonymousGroup: f.Group("/", "AnonymousGroup", "Router group that doesn't require authentication"), - SyncAuthGroup: app.Group("/", middlewares.SyncAuthorizationMiddleware()), + SyncAuthGroup: f.Group("/", "SyncAuthGroup", "Router group for sync operations with special auth", middlewares.SyncAuthorizationMiddleware()), Wrapper: globalWrapper, } } @@ -704,8 +704,22 @@ func InitRoutes(app *gin.Engine) (err error) { }) // Register sync routes that require special authentication - groups.SyncAuthGroup.GET("/sync/:id/scan", GetSyncScan) - groups.SyncAuthGroup.GET("/sync/:id/download", GetSyncDownload) + RegisterActions(groups.SyncAuthGroup.Group("", "Sync", "APIs for sync operations"), "/sync/:id", []Action{ + { + Method: http.MethodGet, + Path: "/scan", + Name: "Scan files (sync)", + Description: "Scan files for a specific ID (Spider/Git)", + HandlerFunc: GetSyncScan, + }, + { + Method: http.MethodGet, + Path: "/download", + Name: "Download file (sync)", + Description: "Download a file for a specific ID (Spider/Git)", + HandlerFunc: GetSyncDownload, + }, + }) // Register health check route groups.AnonymousGroup.GinRouterGroup().GET("/health", GetHealthFn(func() bool { return true })) diff --git a/core/controllers/sync.go b/core/controllers/sync.go index 3f3216d1..c03230a5 100644 --- a/core/controllers/sync.go +++ b/core/controllers/sync.go @@ -1,25 +1,30 @@ package controllers import ( + "github.com/crawlab-team/crawlab/core/entity" + "github.com/juju/errors" "path/filepath" "github.com/crawlab-team/crawlab/core/utils" "github.com/gin-gonic/gin" ) -func GetSyncScan(c *gin.Context) { +func GetSyncScan(c *gin.Context) (response *Response[entity.FsFileInfoMap], err error) { workspacePath := utils.GetWorkspace() dirPath := filepath.Join(workspacePath, c.Param("id"), c.Param("path")) files, err := utils.ScanDirectory(dirPath) if err != nil { - HandleErrorInternalServerError(c, err) - return + return GetErrorResponse[entity.FsFileInfoMap](err) } - HandleSuccessWithData(c, files) + return GetDataResponse(files) } -func GetSyncDownload(c *gin.Context) { +func GetSyncDownload(c *gin.Context) (err error) { workspacePath := utils.GetWorkspace() - filePath := filepath.Join(workspacePath, c.Param("id"), c.Param("path")) + filePath := filepath.Join(workspacePath, c.Param("id"), c.Query("path")) + if !utils.Exists(filePath) { + return errors.NotFoundf("file not exists: %s", filePath) + } c.File(filePath) + return nil } diff --git a/core/entity/fs_file_info.go b/core/entity/fs_file_info.go index c024f850..68d0f0a6 100644 --- a/core/entity/fs_file_info.go +++ b/core/entity/fs_file_info.go @@ -57,3 +57,5 @@ func (f *FsFileInfo) GetHash() string { func (f *FsFileInfo) GetChildren() []FsFileInfo { return f.Children } + +type FsFileInfoMap map[string]FsFileInfo diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index c546a2e4..8a6b8a7f 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "net/http" + "net/url" "os" "os/exec" "path/filepath" @@ -444,7 +445,7 @@ func (r *Runner) configureEnv() { r.cmd.Env = append(r.cmd.Env, "CRAWLAB_PARENT_PID="+fmt.Sprint(os.Getpid())) } -func (r *Runner) createHttpRequest(method, path string) (*http.Response, error) { +func (r *Runner) performHttpRequest(method, path string, params url.Values) (*http.Response, error) { // Normalize path if strings.HasPrefix(path, "/") { path = path[1:] @@ -457,10 +458,10 @@ func (r *Runner) createHttpRequest(method, path string) (*http.Response, error) } else { id = r.s.GitId.Hex() } - url := fmt.Sprintf("%s/sync/%s/%s", utils.GetApiEndpoint(), id, path) + requestUrl := fmt.Sprintf("%s/sync/%s/%s?%s", utils.GetApiEndpoint(), id, path, params.Encode()) // Create and execute request - req, err := http.NewRequest(method, url, nil) + req, err := http.NewRequest(method, requestUrl, nil) if err != nil { return nil, fmt.Errorf("error creating request: %v", err) } @@ -488,7 +489,10 @@ func (r *Runner) syncFiles() (err error) { // get file list from master r.Infof("fetching file list from master node") - resp, err := r.createHttpRequest("GET", "/scan?path="+workingDir) + params := url.Values{ + "path": []string{workingDir}, + } + resp, err := r.performHttpRequest("GET", "/scan", params) if err != nil { r.Errorf("error getting file list from master: %v", err) return err @@ -500,7 +504,7 @@ func (r *Runner) syncFiles() (err error) { return err } var response struct { - Data map[string]entity.FsFileInfo `json:"data"` + Data entity.FsFileInfoMap `json:"data"` } err = json.Unmarshal(body, &response) if err != nil { @@ -510,7 +514,7 @@ func (r *Runner) syncFiles() (err error) { } // create a map for master files - masterFilesMap := make(map[string]entity.FsFileInfo) + masterFilesMap := make(entity.FsFileInfoMap) for _, file := range response.Data { masterFilesMap[file.Path] = file } @@ -591,8 +595,10 @@ func (r *Runner) syncFiles() (err error) { // downloadFile downloads a file from the master node and saves it to the local file system func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsFileInfo) error { r.Debugf("downloading file: %s -> %s", path, filePath) - - resp, err := r.createHttpRequest("GET", "/download?path="+path) + params := url.Values{ + "path": []string{path}, + } + resp, err := r.performHttpRequest("GET", "/download", params) if err != nil { r.Errorf("error getting file response: %v", err) return err diff --git a/core/utils/file.go b/core/utils/file.go index 4d2629fd..21328607 100644 --- a/core/utils/file.go +++ b/core/utils/file.go @@ -10,6 +10,7 @@ import ( "os" "path" "path/filepath" + "regexp" "github.com/crawlab-team/crawlab/core/entity" ) @@ -182,16 +183,35 @@ func GetFileHash(filePath string) (res string, err error) { return hex.EncodeToString(hash.Sum(nil)), nil } -const IgnoreFileRegexPattern = "^(node_modules)/" +const IgnoreFileRegexPattern = `(^node_modules|__pycache__)/|\.(tmp|temp|log|swp|swo|bak|orig|lock|pid|pyc|pyo)$` -func ScanDirectory(dir string) (res map[string]entity.FsFileInfo, err error) { - files := make(map[string]entity.FsFileInfo) +func ScanDirectory(dir string) (res entity.FsFileInfoMap, err error) { + files := make(entity.FsFileInfoMap) + + // Compile the ignore pattern regex + ignoreRegex, err := regexp.Compile(IgnoreFileRegexPattern) + if err != nil { + return nil, fmt.Errorf("failed to compile ignore pattern: %v", err) + } err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } + relPath, err := filepath.Rel(dir, path) + if err != nil { + return err + } + + // Skip files that match the ignore pattern + if ignoreRegex.MatchString(relPath) { + if info.IsDir() { + return filepath.SkipDir + } + return nil + } + var hash string if !info.IsDir() { hash, err = GetFileHash(path) @@ -200,11 +220,6 @@ func ScanDirectory(dir string) (res map[string]entity.FsFileInfo, err error) { } } - relPath, err := filepath.Rel(dir, path) - if err != nil { - return err - } - files[relPath] = entity.FsFileInfo{ Name: info.Name(), Path: relPath,