From 460c8d958a07e123a5baf7795b3ea70e5cd24cc5 Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Sun, 23 Jun 2024 23:20:58 +0800 Subject: [PATCH] fix: missing relational models issues --- core/controllers/base_file_v2.go | 30 ++--- core/controllers/project_v2.go | 89 +++++++++++++ core/controllers/router_v2.go | 102 ++++++++------- core/controllers/spider_v2.go | 212 +++++++++++++++++++++++-------- core/controllers/sync.go | 9 +- core/controllers/task_v2.go | 57 ++++++--- core/task/handler/runner_v2.go | 59 ++++++--- core/task/handler/service_v2.go | 5 +- vcs/entity.go | 11 +- 9 files changed, 406 insertions(+), 168 deletions(-) create mode 100644 core/controllers/project_v2.go diff --git a/core/controllers/base_file_v2.go b/core/controllers/base_file_v2.go index 6c398abd..38987be7 100644 --- a/core/controllers/base_file_v2.go +++ b/core/controllers/base_file_v2.go @@ -8,7 +8,6 @@ import ( "github.com/crawlab-team/crawlab/core/interfaces" "github.com/gin-gonic/gin" "github.com/spf13/viper" - "go.mongodb.org/mongo-driver/bson/primitive" "io" "os" "path/filepath" @@ -18,7 +17,7 @@ import ( func GetBaseFileListDir(rootPath string, c *gin.Context) { path := c.Query("path") - fsSvc, err := getBaseFileFsSvc(rootPath, c) + fsSvc, err := getBaseFileFsSvc(rootPath) if err != nil { HandleErrorBadRequest(c, err) return @@ -38,7 +37,7 @@ func GetBaseFileListDir(rootPath string, c *gin.Context) { func GetBaseFileFile(rootPath string, c *gin.Context) { path := c.Query("path") - fsSvc, err := getBaseFileFsSvc(rootPath, c) + fsSvc, err := getBaseFileFsSvc(rootPath) if err != nil { HandleErrorBadRequest(c, err) return @@ -56,7 +55,7 @@ func GetBaseFileFile(rootPath string, c *gin.Context) { func GetBaseFileFileInfo(rootPath string, c *gin.Context) { path := c.Query("path") - fsSvc, err := getBaseFileFsSvc(rootPath, c) + fsSvc, err := getBaseFileFsSvc(rootPath) if err != nil { HandleErrorBadRequest(c, err) return @@ -72,7 +71,7 @@ func GetBaseFileFileInfo(rootPath string, c *gin.Context) { } func PostBaseFileSaveFile(rootPath string, c *gin.Context) { - fsSvc, err := getBaseFileFsSvc(rootPath, c) + fsSvc, err := getBaseFileFsSvc(rootPath) if err != nil { HandleErrorInternalServerError(c, err) return @@ -122,7 +121,7 @@ func PostBaseFileSaveFile(rootPath string, c *gin.Context) { } func PostBaseFileSaveFiles(rootPath string, c *gin.Context) { - fsSvc, err := getBaseFileFsSvc(rootPath, c) + fsSvc, err := getBaseFileFsSvc(rootPath) if err != nil { HandleErrorInternalServerError(c, err) return @@ -183,7 +182,7 @@ func PostBaseFileSaveDir(rootPath string, c *gin.Context) { return } - fsSvc, err := getBaseFileFsSvc(rootPath, c) + fsSvc, err := getBaseFileFsSvc(rootPath) if err != nil { HandleErrorBadRequest(c, err) return @@ -207,7 +206,7 @@ func PostBaseFileRenameFile(rootPath string, c *gin.Context) { return } - fsSvc, err := getBaseFileFsSvc(rootPath, c) + fsSvc, err := getBaseFileFsSvc(rootPath) if err != nil { HandleErrorBadRequest(c, err) return @@ -231,7 +230,7 @@ func DeleteBaseFileFile(rootPath string, c *gin.Context) { payload.Path = "." } - fsSvc, err := getBaseFileFsSvc(rootPath, c) + fsSvc, err := getBaseFileFsSvc(rootPath) if err != nil { HandleErrorBadRequest(c, err) return @@ -259,7 +258,7 @@ func PostBaseFileCopyFile(rootPath string, c *gin.Context) { return } - fsSvc, err := getBaseFileFsSvc(rootPath, c) + fsSvc, err := getBaseFileFsSvc(rootPath) if err != nil { HandleErrorBadRequest(c, err) return @@ -274,7 +273,7 @@ func PostBaseFileCopyFile(rootPath string, c *gin.Context) { } func PostBaseFileExport(rootPath string, c *gin.Context) { - fsSvc, err := getBaseFileFsSvc(rootPath, c) + fsSvc, err := getBaseFileFsSvc(rootPath) if err != nil { HandleErrorBadRequest(c, err) return @@ -292,14 +291,9 @@ func PostBaseFileExport(rootPath string, c *gin.Context) { c.File(zipFilePath) } -func getBaseFileFsSvc(rootPath string, c *gin.Context) (svc interfaces.FsServiceV2, err error) { - id, err := primitive.ObjectIDFromHex(c.Param("id")) - if err != nil { - return nil, err - } - +func getBaseFileFsSvc(rootPath string) (svc interfaces.FsServiceV2, err error) { workspacePath := viper.GetString("workspace") - fsSvc := fs.NewFsServiceV2(filepath.Join(workspacePath, id.Hex(), rootPath)) + fsSvc := fs.NewFsServiceV2(filepath.Join(workspacePath, rootPath)) return fsSvc, nil } diff --git a/core/controllers/project_v2.go b/core/controllers/project_v2.go new file mode 100644 index 00000000..dd2d8926 --- /dev/null +++ b/core/controllers/project_v2.go @@ -0,0 +1,89 @@ +package controllers + +import ( + "github.com/crawlab-team/crawlab/core/errors" + "github.com/crawlab-team/crawlab/core/models/models" + "github.com/crawlab-team/crawlab/core/models/service" + "github.com/crawlab-team/crawlab/db/mongo" + "github.com/gin-gonic/gin" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + mongo2 "go.mongodb.org/mongo-driver/mongo" +) + +func GetProjectList(c *gin.Context) { + // get all list + all := MustGetFilterAll(c) + if all { + NewControllerV2[models.ProjectV2]().getAll(c) + return + } + + // params + pagination := MustGetPagination(c) + query := MustGetFilterQuery(c) + sort := MustGetSortOption(c) + + // get list + projects, err := service.NewModelServiceV2[models.ProjectV2]().GetMany(query, &mongo.FindOptions{ + Sort: sort, + Skip: pagination.Size * (pagination.Page - 1), + Limit: pagination.Size, + }) + if err != nil { + if err.Error() != mongo2.ErrNoDocuments.Error() { + HandleErrorInternalServerError(c, err) + } + return + } + if len(projects) == 0 { + HandleSuccessWithListData(c, []models.ProjectV2{}, 0) + return + } + + // total count + total, err := service.NewModelServiceV2[models.ProjectV2]().Count(query) + if err != nil { + HandleErrorInternalServerError(c, err) + return + } + + // project ids + var ids []primitive.ObjectID + + // count cache + cache := map[primitive.ObjectID]int{} + + // iterate + for _, p := range projects { + ids = append(ids, p.Id) + cache[p.Id] = 0 + } + + // spiders + spiders, err := service.NewModelServiceV2[models.SpiderV2]().GetMany(bson.M{ + "project_id": bson.M{ + "$in": ids, + }, + }, nil) + if err != nil { + HandleErrorInternalServerError(c, err) + return + } + for _, s := range spiders { + _, ok := cache[s.ProjectId] + if !ok { + HandleErrorInternalServerError(c, errors.ErrorControllerMissingInCache) + return + } + cache[s.ProjectId]++ + } + + // assign + for _, p := range projects { + p.Spiders = cache[p.Id] + projects = append(projects, p) + } + + HandleSuccessWithListData(c, projects, total) +} diff --git a/core/controllers/router_v2.go b/core/controllers/router_v2.go index 908368bd..1e274bff 100644 --- a/core/controllers/router_v2.go +++ b/core/controllers/router_v2.go @@ -61,204 +61,210 @@ func InitRoutes(app *gin.Engine) (err error) { RegisterController(groups.AuthGroup, "/nodes", NewControllerV2[models.NodeV2]()) RegisterController(groups.AuthGroup, "/notifications/settings", NewControllerV2[models.SettingV2]()) RegisterController(groups.AuthGroup, "/permissions", NewControllerV2[models.PermissionV2]()) - RegisterController(groups.AuthGroup, "/projects", NewControllerV2[models.ProjectV2]()) + RegisterController(groups.AuthGroup, "/projects", NewControllerV2[models.ProjectV2]([]Action{ + { + Method: http.MethodGet, + Path: "", + HandlerFunc: GetProjectList, + }, + }...)) RegisterController(groups.AuthGroup, "/roles", NewControllerV2[models.RoleV2]()) - RegisterController(groups.AuthGroup, "/schedules", NewControllerV2[models.ScheduleV2]( - Action{ + RegisterController(groups.AuthGroup, "/schedules", NewControllerV2[models.ScheduleV2]([]Action{ + { Method: http.MethodPost, Path: "", HandlerFunc: PostSchedule, }, - Action{ + { Method: http.MethodPut, Path: "/:id", HandlerFunc: PutScheduleById, }, - Action{ + { Method: http.MethodPost, Path: "/:id/enable", HandlerFunc: PostScheduleEnable, }, - Action{ + { Method: http.MethodPost, Path: "/:id/disable", HandlerFunc: PostScheduleDisable, }, - )) - RegisterController(groups.AuthGroup, "/spiders", NewControllerV2[models.SpiderV2]( - Action{ + }...)) + RegisterController(groups.AuthGroup, "/spiders", NewControllerV2[models.SpiderV2]([]Action{ + { Method: http.MethodGet, Path: "/:id", HandlerFunc: GetSpiderById, }, - Action{ + { Method: http.MethodGet, Path: "", HandlerFunc: GetSpiderList, }, - Action{ + { Method: http.MethodPost, Path: "", HandlerFunc: PostSpider, }, - Action{ + { Method: http.MethodPut, Path: "/:id", HandlerFunc: PutSpiderById, }, - Action{ + { Method: http.MethodDelete, Path: "/:id", HandlerFunc: DeleteSpiderById, }, - Action{ + { Method: http.MethodDelete, Path: "", HandlerFunc: DeleteSpiderList, }, - Action{ + { Method: http.MethodGet, Path: "/:id/files/list", HandlerFunc: GetSpiderListDir, }, - Action{ + { Method: http.MethodGet, Path: "/:id/files/get", HandlerFunc: GetSpiderFile, }, - Action{ + { Method: http.MethodGet, Path: "/:id/files/info", HandlerFunc: GetSpiderFileInfo, }, - Action{ + { Method: http.MethodPost, Path: "/:id/files/save", HandlerFunc: PostSpiderSaveFile, }, - Action{ + { Method: http.MethodPost, Path: "/:id/files/save/batch", HandlerFunc: PostSpiderSaveFiles, }, - Action{ + { Method: http.MethodPost, Path: "/:id/files/save/dir", HandlerFunc: PostSpiderSaveDir, }, - Action{ + { Method: http.MethodPost, Path: "/:id/files/rename", HandlerFunc: PostSpiderRenameFile, }, - Action{ + { Method: http.MethodDelete, Path: "/:id/files", HandlerFunc: DeleteSpiderFile, }, - Action{ + { Method: http.MethodPost, Path: "/:id/files/copy", HandlerFunc: PostSpiderCopyFile, }, - Action{ + { Method: http.MethodPost, Path: "/:id/files/export", HandlerFunc: PostSpiderExport, }, - Action{ + { Method: http.MethodPost, Path: "/:id/run", HandlerFunc: PostSpiderRun, }, - Action{ + { Method: http.MethodGet, Path: "/:id/data-source", HandlerFunc: GetSpiderDataSource, }, - Action{ + { Method: http.MethodPost, Path: "/:id/data-source/:ds_id", HandlerFunc: PostSpiderDataSource, }, - )) - RegisterController(groups.AuthGroup, "/tasks", NewControllerV2[models.TaskV2]( - Action{ + }...)) + RegisterController(groups.AuthGroup, "/tasks", NewControllerV2[models.TaskV2]([]Action{ + { Method: http.MethodGet, Path: "/:id", HandlerFunc: GetTaskById, }, - Action{ + { Method: http.MethodGet, Path: "", HandlerFunc: GetTaskList, }, - Action{ + { Method: http.MethodDelete, Path: "/:id", HandlerFunc: DeleteTaskById, }, - Action{ + { Method: http.MethodDelete, Path: "", HandlerFunc: DeleteList, }, - Action{ + { Method: http.MethodPost, Path: "/run", HandlerFunc: PostTaskRun, }, - Action{ + { Method: http.MethodPost, Path: "/:id/restart", HandlerFunc: PostTaskRestart, }, - Action{ + { Method: http.MethodPost, Path: "/:id/cancel", HandlerFunc: PostTaskCancel, }, - Action{ + { Method: http.MethodGet, Path: "/:id/logs", HandlerFunc: GetTaskLogs, }, - Action{ + { Method: http.MethodGet, Path: "/:id/data", HandlerFunc: GetTaskData, }, - )) - RegisterController(groups.AuthGroup, "/tokens", NewControllerV2[models.TokenV2]( - Action{ + }...)) + RegisterController(groups.AuthGroup, "/tokens", NewControllerV2[models.TokenV2]([]Action{ + { Method: http.MethodPost, Path: "", HandlerFunc: PostToken, }, - )) - RegisterController(groups.AuthGroup, "/users", NewControllerV2[models.UserV2]( - Action{ + }...)) + RegisterController(groups.AuthGroup, "/users", NewControllerV2[models.UserV2]([]Action{ + { Method: http.MethodPost, Path: "", HandlerFunc: PostUser, }, - Action{ + { Method: http.MethodPost, Path: "/:id/change-password", HandlerFunc: PostUserChangePassword, }, - Action{ + { Method: http.MethodGet, Path: "/me", HandlerFunc: GetUserMe, }, - Action{ + { Method: http.MethodPut, Path: "/me", HandlerFunc: PutUserById, }, - )) + }...)) RegisterActions(groups.AuthGroup, "/results", []Action{ { diff --git a/core/controllers/spider_v2.go b/core/controllers/spider_v2.go index 9b28b94d..78b4d249 100644 --- a/core/controllers/spider_v2.go +++ b/core/controllers/spider_v2.go @@ -2,7 +2,7 @@ package controllers import ( "errors" - log2 "github.com/apex/log" + "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/fs" "github.com/crawlab-team/crawlab/core/interfaces" @@ -11,6 +11,7 @@ import ( "github.com/crawlab-team/crawlab/core/spider/admin" "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/db/mongo" + "github.com/crawlab-team/crawlab/trace" "github.com/gin-gonic/gin" "github.com/spf13/viper" "go.mongodb.org/mongo-driver/bson" @@ -64,12 +65,25 @@ func GetSpiderById(c *gin.Context) { } func GetSpiderList(c *gin.Context) { + // get all list + all := MustGetFilterAll(c) + if all { + NewControllerV2[models.ProjectV2]().getAll(c) + return + } + + // get list withStats := c.Query("stats") if withStats == "" { NewControllerV2[models.SpiderV2]().GetList(c) return } + // get list with stats + getSpiderListWithStats(c) +} + +func getSpiderListWithStats(c *gin.Context) { // params pagination := MustGetPagination(c) query := MustGetFilterQuery(c) @@ -205,6 +219,7 @@ func PostSpider(c *gin.Context) { return } + // user u := GetUserFromContextV2(c) // add @@ -229,7 +244,12 @@ func PostSpider(c *gin.Context) { } // create folder - err = getSpiderFsSvcById(id).CreateDir(".") + fsSvc, err := getSpiderFsSvcById(id) + if err != nil { + HandleErrorInternalServerError(c, err) + return + } + err = fsSvc.CreateDir(".") if err != nil { HandleErrorInternalServerError(c, err) return @@ -336,7 +356,7 @@ func DeleteSpiderById(c *gin.Context) { // delete task logs logPath := filepath.Join(viper.GetString("log.path"), id) if err := os.RemoveAll(logPath); err != nil { - log2.Warnf("failed to remove task log directory: %s", logPath) + log.Warnf("failed to remove task log directory: %s", logPath) } wg.Done() }(id.Hex()) @@ -349,6 +369,35 @@ func DeleteSpiderById(c *gin.Context) { return } + go func() { + // spider + s, err := service.NewModelServiceV2[models.SpiderV2]().GetById(id) + if err != nil { + log.Errorf("failed to get spider: %s", err.Error()) + trace.PrintError(err) + return + } + + // skip spider with git + if !s.GitId.IsZero() { + return + } + + // delete spider directory + fsSvc, err := getSpiderFsSvcById(id) + if err != nil { + log.Errorf("failed to get spider fs service: %s", err.Error()) + trace.PrintError(err) + return + } + err = fsSvc.Delete(".") + if err != nil { + log.Errorf("failed to delete spider directory: %s", err.Error()) + trace.PrintError(err) + return + } + }() + HandleSuccess(c) } @@ -414,7 +463,7 @@ func DeleteSpiderList(c *gin.Context) { // delete task logs logPath := filepath.Join(viper.GetString("log.path"), id) if err := os.RemoveAll(logPath); err != nil { - log2.Warnf("failed to remove task log directory: %s", logPath) + log.Warnf("failed to remove task log directory: %s", logPath) } wg.Done() }(id.Hex()) @@ -427,97 +476,136 @@ func DeleteSpiderList(c *gin.Context) { return } + // delete spider directories + go func() { + wg := sync.WaitGroup{} + wg.Add(len(payload.Ids)) + for _, id := range payload.Ids { + go func(id primitive.ObjectID) { + defer wg.Done() + + // spider + s, err := service.NewModelServiceV2[models.SpiderV2]().GetById(id) + if err != nil { + log.Errorf("failed to get spider: %s", err.Error()) + trace.PrintError(err) + return + } + + // skip spider with git + if !s.GitId.IsZero() { + return + } + + // delete spider directory + fsSvc, err := getSpiderFsSvcById(id) + if err != nil { + log.Errorf("failed to get spider fs service: %s", err.Error()) + trace.PrintError(err) + return + } + err = fsSvc.Delete(".") + if err != nil { + log.Errorf("failed to delete spider directory: %s", err.Error()) + trace.PrintError(err) + return + } + }(id) + } + wg.Wait() + }() + HandleSuccess(c) } func GetSpiderListDir(c *gin.Context) { - s, err := allowSpiderGit(c) + rootPath, err := getSpiderRootPath(c) if err != nil { - HandleErrorInternalServerError(c, err) + HandleErrorForbidden(c, err) return } - GetBaseFileListDir(s.GitRootPath, c) + GetBaseFileListDir(rootPath, c) } func GetSpiderFile(c *gin.Context) { - s, err := allowSpiderGit(c) + rootPath, err := getSpiderRootPath(c) if err != nil { - HandleErrorInternalServerError(c, err) + HandleErrorForbidden(c, err) return } - GetBaseFileFile(s.GitRootPath, c) + GetBaseFileFile(rootPath, c) } func GetSpiderFileInfo(c *gin.Context) { - s, err := allowSpiderGit(c) + rootPath, err := getSpiderRootPath(c) if err != nil { - HandleErrorInternalServerError(c, err) + HandleErrorForbidden(c, err) return } - GetBaseFileFileInfo(s.GitRootPath, c) + GetBaseFileFileInfo(rootPath, c) } func PostSpiderSaveFile(c *gin.Context) { - s, err := allowSpiderGit(c) + rootPath, err := getSpiderRootPath(c) if err != nil { - HandleErrorInternalServerError(c, err) + HandleErrorForbidden(c, err) return } - PostBaseFileSaveFile(s.GitRootPath, c) + PostBaseFileSaveFile(rootPath, c) } func PostSpiderSaveFiles(c *gin.Context) { - s, err := allowSpiderGit(c) + rootPath, err := getSpiderRootPath(c) if err != nil { - HandleErrorInternalServerError(c, err) + HandleErrorForbidden(c, err) return } - PostBaseFileSaveFiles(s.GitRootPath, c) + PostBaseFileSaveFiles(rootPath, c) } func PostSpiderSaveDir(c *gin.Context) { - s, err := allowSpiderGit(c) + rootPath, err := getSpiderRootPath(c) if err != nil { - HandleErrorInternalServerError(c, err) + HandleErrorForbidden(c, err) return } - PostBaseFileSaveDir(s.GitRootPath, c) + PostBaseFileSaveDir(rootPath, c) } func PostSpiderRenameFile(c *gin.Context) { - s, err := allowSpiderGit(c) + rootPath, err := getSpiderRootPath(c) if err != nil { - HandleErrorInternalServerError(c, err) + HandleErrorForbidden(c, err) return } - PostBaseFileRenameFile(s.GitRootPath, c) + PostBaseFileRenameFile(rootPath, c) } func DeleteSpiderFile(c *gin.Context) { - s, err := allowSpiderGit(c) + rootPath, err := getSpiderRootPath(c) if err != nil { - HandleErrorInternalServerError(c, err) + HandleErrorForbidden(c, err) return } - DeleteBaseFileFile(s.GitRootPath, c) + DeleteBaseFileFile(rootPath, c) } func PostSpiderCopyFile(c *gin.Context) { - s, err := allowSpiderGit(c) + rootPath, err := getSpiderRootPath(c) if err != nil { - HandleErrorInternalServerError(c, err) + HandleErrorForbidden(c, err) return } - PostBaseFileCopyFile(s.GitRootPath, c) + PostBaseFileCopyFile(rootPath, c) } func PostSpiderExport(c *gin.Context) { - s, err := allowSpiderGit(c) + rootPath, err := getSpiderRootPath(c) if err != nil { - HandleErrorInternalServerError(c, err) + HandleErrorForbidden(c, err) return } - PostBaseFileExport(s.GitRootPath, c) + PostBaseFileExport(rootPath, c) } func PostSpiderRun(c *gin.Context) { @@ -628,22 +716,25 @@ func PostSpiderDataSource(c *gin.Context) { HandleSuccess(c) } -func getSpiderFsSvc(c *gin.Context) (svc interfaces.FsServiceV2, err error) { - id, err := primitive.ObjectIDFromHex(c.Param("id")) - if err != nil { - return nil, err - } - +func getSpiderFsSvc(s *models.SpiderV2) (svc interfaces.FsServiceV2, err error) { workspacePath := viper.GetString("workspace") - fsSvc := fs.NewFsServiceV2(filepath.Join(workspacePath, id.Hex())) + fsSvc := fs.NewFsServiceV2(filepath.Join(workspacePath, s.Id.Hex())) return fsSvc, nil } -func getSpiderFsSvcById(id primitive.ObjectID) interfaces.FsServiceV2 { - workspacePath := viper.GetString("workspace") - fsSvc := fs.NewFsServiceV2(filepath.Join(workspacePath, id.Hex())) - return fsSvc +func GetSpiderFsSvcById(id primitive.ObjectID) (svc interfaces.FsServiceV2, err error) { + return getSpiderFsSvcById(id) +} + +func getSpiderFsSvcById(id primitive.ObjectID) (svc interfaces.FsServiceV2, err error) { + s, err := service.NewModelServiceV2[models.SpiderV2]().GetById(id) + if err != nil { + log.Errorf("failed to get spider: %s", err.Error()) + trace.PrintError(err) + return nil, err + } + return getSpiderFsSvc(s) } func upsertSpiderDataCollection(s *models.SpiderV2) (err error) { @@ -685,21 +776,32 @@ func upsertSpiderDataCollection(s *models.SpiderV2) (err error) { return nil } -func allowSpiderGit(c *gin.Context) (s models.SpiderV2, err error) { - if utils.IsPro() { - return s, nil - } +func UpsertSpiderDataCollection(s *models.SpiderV2) (err error) { + return upsertSpiderDataCollection(s) +} + +func getSpiderRootPath(c *gin.Context) (rootPath string, err error) { + // spider id id, err := primitive.ObjectIDFromHex(c.Param("id")) if err != nil { - return s, err + return "", err } - _s, err := service.NewModelServiceV2[models.SpiderV2]().GetById(id) + + // spider + s, err := service.NewModelServiceV2[models.SpiderV2]().GetById(id) if err != nil { - return s, err + return "", err } + + // check git permission + if !utils.IsPro() && !s.GitId.IsZero() { + return "", errors.New("git is not allowed in the community version") + } + + // if git id is zero, return spider id as root path if s.GitId.IsZero() { - return s, errors.New("git is not allowed in this edition") + return id.Hex(), nil } - s = *_s - return s, nil + + return filepath.Join(s.GitId.Hex(), rootPath), nil } diff --git a/core/controllers/sync.go b/core/controllers/sync.go index 674bff76..5f9874c8 100644 --- a/core/controllers/sync.go +++ b/core/controllers/sync.go @@ -31,7 +31,8 @@ type syncContext struct { func (ctx *syncContext) scan(c *gin.Context) { id := c.Param("id") - dir := ctx._getDir(id) + path := c.Query("path") + dir := ctx._getDir(id, path) files, err := utils.ScanDirectory(dir) if err != nil { HandleErrorInternalServerError(c, err) @@ -43,13 +44,13 @@ func (ctx *syncContext) scan(c *gin.Context) { func (ctx *syncContext) download(c *gin.Context) { id := c.Param("id") filePath := c.Query("path") - dir := ctx._getDir(id) + dir := ctx._getDir(id, "") c.File(filepath.Join(dir, filePath)) } -func (ctx *syncContext) _getDir(id string) string { +func (ctx *syncContext) _getDir(id string, path string) string { workspacePath := viper.GetString("workspace") - return filepath.Join(workspacePath, id) + return filepath.Join(workspacePath, id, path) } func newSyncContext() syncContext { diff --git a/core/controllers/task_v2.go b/core/controllers/task_v2.go index 95c58de6..aad4fcf3 100644 --- a/core/controllers/task_v2.go +++ b/core/controllers/task_v2.go @@ -71,8 +71,8 @@ func GetTaskList(c *gin.Context) { query := MustGetFilterQuery(c) sort := MustGetSortOption(c) - // get list - list, err := service.NewModelServiceV2[models.TaskV2]().GetMany(query, &mongo.FindOptions{ + // get tasks + tasks, err := service.NewModelServiceV2[models.TaskV2]().GetMany(query, &mongo.FindOptions{ Sort: sort, Skip: pagination.Size * (pagination.Page - 1), Limit: pagination.Size, @@ -87,15 +87,17 @@ func GetTaskList(c *gin.Context) { } // check empty list - if len(list) == 0 { + if len(tasks) == 0 { HandleSuccessWithListData(c, nil, 0) return } // ids - var ids []primitive.ObjectID - for _, t := range list { - ids = append(ids, t.Id) + var taskIds []primitive.ObjectID + var spiderIds []primitive.ObjectID + for _, t := range tasks { + taskIds = append(taskIds, t.Id) + spiderIds = append(spiderIds, t.SpiderId) } // total count @@ -106,33 +108,56 @@ func GetTaskList(c *gin.Context) { } // stat list - query = bson.M{ + stats, err := service.NewModelServiceV2[models.TaskStatV2]().GetMany(bson.M{ "_id": bson.M{ - "$in": ids, + "$in": taskIds, }, - } - stats, err := service.NewModelServiceV2[models.TaskStatV2]().GetMany(query, nil) + }, nil) if err != nil { HandleErrorInternalServerError(c, err) return } // cache stat list to dict - dict := map[primitive.ObjectID]models.TaskStatV2{} + statsDict := map[primitive.ObjectID]models.TaskStatV2{} for _, s := range stats { - dict[s.Id] = s + statsDict[s.Id] = s + } + + // spider list + spiders, err := service.NewModelServiceV2[models.SpiderV2]().GetMany(bson.M{ + "_id": bson.M{ + "$in": spiderIds, + }, + }, nil) + if err != nil { + HandleErrorInternalServerError(c, err) + return + } + + // cache spider list to dict + spiderDict := map[primitive.ObjectID]models.SpiderV2{} + for _, s := range spiders { + spiderDict[s.Id] = s } // iterate list again - for i, t := range list { - ts, ok := dict[t.Id] + for i, t := range tasks { + // task stat + ts, ok := statsDict[t.Id] if ok { - list[i].Stat = &ts + tasks[i].Stat = &ts + } + + // spider + s, ok := spiderDict[t.SpiderId] + if ok { + tasks[i].Spider = &s } } // response - HandleSuccessWithListData(c, list, total) + HandleSuccessWithListData(c, tasks, total) } func DeleteTaskById(c *gin.Context) { diff --git a/core/task/handler/runner_v2.go b/core/task/handler/runner_v2.go index 7424a2c1..57f95edb 100644 --- a/core/task/handler/runner_v2.go +++ b/core/task/handler/runner_v2.go @@ -71,16 +71,8 @@ func (r *RunnerV2) Init() (err error) { // start grpc client if !r.c.IsStarted() { - r.c.Start() - } - - // working directory - workspacePath := viper.GetString("workspace") - r.cwd = filepath.Join(workspacePath, r.s.Id.Hex()) - - // sync files from master - if !utils.IsMaster() { - if err := r.syncFiles(); err != nil { + err := r.c.Start() + if err != nil { return err } } @@ -97,6 +89,16 @@ func (r *RunnerV2) Run() (err error) { // log task started log.Infof("task[%s] started", r.tid.Hex()) + // configure working directory + r.configureCwd() + + // sync files worker nodes + if !utils.IsMaster() { + if err := r.syncFiles(); err != nil { + return err + } + } + // configure cmd r.configureCmd() @@ -317,26 +319,31 @@ func (r *RunnerV2) configureEnv() { } func (r *RunnerV2) syncFiles() (err error) { - masterURL := fmt.Sprintf("%s/sync/%s", viper.GetString("api.endpoint"), r.s.Id.Hex()) - workspacePath := viper.GetString("workspace") - workerDir := filepath.Join(workspacePath, r.s.Id.Hex()) + var id string + if r.s.GitId.IsZero() { + id = r.s.Id.Hex() + } else { + id = r.s.GitId.Hex() + } + masterURL := fmt.Sprintf("%s/sync/%s", viper.GetString("api.endpoint"), id) + workerDir := r.cwd // get file list from master - resp, err := http.Get(masterURL + "/scan") + resp, err := http.Get(masterURL + "/scan?path=" + workerDir) if err != nil { - fmt.Println("Error getting file list from master:", err) + log.Errorf("Error getting file list from master: %v", err) return trace.TraceError(err) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - fmt.Println("Error reading response body:", err) + log.Errorf("Error reading response body: %v", err) return trace.TraceError(err) } var masterFiles map[string]entity.FsFileInfo err = json.Unmarshal(body, &masterFiles) if err != nil { - fmt.Println("Error unmarshaling JSON:", err) + log.Errorf("Error unmarshaling JSON: %v", err) return trace.TraceError(err) } @@ -349,7 +356,7 @@ func (r *RunnerV2) syncFiles() (err error) { // create worker directory if not exists if _, err := os.Stat(workerDir); os.IsNotExist(err) { if err := os.MkdirAll(workerDir, os.ModePerm); err != nil { - fmt.Println("Error creating worker directory:", err) + log.Errorf("Error creating worker directory: %v", err) return trace.TraceError(err) } } @@ -357,7 +364,7 @@ func (r *RunnerV2) syncFiles() (err error) { // get file list from worker workerFiles, err := utils.ScanDirectory(workerDir) if err != nil { - fmt.Println("Error scanning worker directory:", err) + log.Errorf("Error scanning worker directory: %v", err) return trace.TraceError(err) } @@ -368,10 +375,10 @@ func (r *RunnerV2) syncFiles() (err error) { // delete files that are deleted on master node for path, workerFile := range workerFiles { if _, exists := masterFilesMap[path]; !exists { - fmt.Println("Deleting file:", path) + log.Infof("Deleting file: %s", path) err := os.Remove(workerFile.FullPath) if err != nil { - fmt.Println("Error deleting file:", err) + log.Errorf("Error deleting file: %v", err) } } } @@ -619,7 +626,17 @@ func (r *RunnerV2) _updateSpiderStat(status string) { return } } +} +func (r *RunnerV2) configureCwd() { + workspacePath := viper.GetString("workspace") + if r.s.GitId.IsZero() { + // not git + r.cwd = filepath.Join(workspacePath, r.s.Id.Hex()) + } else { + // git + r.cwd = filepath.Join(workspacePath, r.s.GitId.Hex(), r.s.GitRootPath) + } } func NewTaskRunnerV2(id primitive.ObjectID, svc *ServiceV2) (r2 *RunnerV2, err error) { diff --git a/core/task/handler/service_v2.go b/core/task/handler/service_v2.go index ad0dcdd9..4635ef48 100644 --- a/core/task/handler/service_v2.go +++ b/core/task/handler/service_v2.go @@ -43,7 +43,10 @@ type ServiceV2 struct { func (svc *ServiceV2) Start() { // Initialize gRPC if not started if !svc.c.IsStarted() { - svc.c.Start() + err := svc.c.Start() + if err != nil { + return + } } go svc.ReportStatus() diff --git a/vcs/entity.go b/vcs/entity.go index 509e3784..e96c2feb 100644 --- a/vcs/entity.go +++ b/vcs/entity.go @@ -9,11 +9,12 @@ type GitOptions struct { } type GitRef struct { - Type string `json:"type"` - Name string `json:"name"` - FullName string `json:"full_name"` - Hash string `json:"hash"` - Timestamp time.Time `json:"timestamp"` + Type string `json:"type"` + Name string `json:"name"` + FullName string `json:"full_name"` + Hash string `json:"hash"` + Timestamp time.Time `json:"timestamp"` + RemoteTrack string `json:"remote_track"` } type GitLog struct {