// Review notes for the patch below (unified git diff of Go sources; the part
// visible here touches 12 files). The patch text itself is unchanged; these
// notes sit before the first "diff --git" header.
//
// core/controllers/project_v2.go
//   GetProjectList now collects each project (with Spiders filled from cache)
//   in a separate slice `data` and returns that, instead of appending every
//   copy back onto `projects`, the slice being ranged over.
//   NOTE(review): `data` stays nil when `projects` is empty; confirm that
//   HandleSuccessWithListData renders a nil slice the way clients expect.
//
// core/controllers/router_v2.go
//   InitRoutes registers GET /sync/:id/scan -> GetSyncScan and
//   GET /sync/:id/download -> GetSyncDownload on groups.AnonymousGroup.
//   NOTE(review): the group name suggests these routes need no
//   authentication; confirm that this is intended for file access.
//
// core/controllers/sync_v2.go (new file)
//   GetSyncScan joins the viper "workspace" setting, the :id URL parameter and
//   the "path" query value with filepath.Join, scans that directory with
//   utils.ScanDirectory and answers 200 with the result as JSON; a scan error
//   goes to HandleErrorInternalServerError.
//   GetSyncDownload builds the path the same way and serves it with c.File.
//   NOTE(review): neither handler checks that the joined path is still inside
//   the workspace. filepath.Join cleans its result, so ".." elements in `path`
//   resolve upwards. Nothing visible here prevents reading outside the
//   workspace; consider rejecting paths that escape it.
//
// core/grpc/server/node_server_v2.go
//   Register now passes `node` to InsertOne and stores the first result in
//   node.Id; before, it passed *nodeDb and discarded that result.
//
// core/node/config/config_service.go
//   Init uses filepath.Dir instead of path.Dir for the config directory.
//   GetBasicNodeInfo assigns the NodeInfo to the named result `res` and then
//   returns it; the populated fields are the same.
//   NOTE(review): the unchanged MkdirAll call uses mode 0766, which has no
//   execute (search) bit for group/other on a directory; presumably
//   unintended, confirm.
//
// core/sys_exec/sys_exec_{darwin,linux,windows}.go
//   BuildCmd changes from `func(string) *exec.Cmd` to
//   `func(string) (*exec.Cmd, error)`. It returns an error for an empty string,
//   otherwise splits the string on single spaces and runs args[0] directly
//   with the remaining elements as arguments. The command no longer goes
//   through `sh -c` (or `cmd /C` on Windows).
//   NOTE(review): with no shell involved, operators such as `&&`, pipes,
//   redirections, quoting and variable expansion are passed to the program as
//   literal arguments. strings.Split(s, " ") also yields empty arguments for
//   consecutive spaces, and a whitespace-only string passes the empty check
//   and yields an empty program name.
//
// core/task/handler/runner.go
//   configureCmd assigns `r.cmd, _ = sys_exec.BuildCmd(cmdStr)`, discarding the
//   new error, and the following statement writes r.cmd.Dir.
//   NOTE(review): BuildCmd returns a nil *exec.Cmd together with its error, so
//   an empty cmdStr now leads to a nil pointer dereference here. RunnerV2
//   handles the same error; this runner should too.
//
// core/task/handler/runner_v2.go
//   Run now returns r.updateTask(constants.TaskStatusError, err) when
//   syncFiles or configureCmd fails (what updateTask returns is not visible
//   here). configureCmd gains an error result, logs and returns the BuildCmd
//   error, and the commented-out SetPgid call is removed.
//   syncFiles sends `workingDir` ("" for non-git spiders, r.s.GitRootPath
//   otherwise) as the scan `path` and uses r.cwd for every local operation
//   (create directory, scan, download target).
//   NOTE(review): `workingDir` and each file `path` are concatenated into the
//   query string unescaped; values containing spaces, '&', '#' or '+' would
//   be misparsed (url.QueryEscape would avoid that).
//   NOTE(review): the scan is rooted at `workingDir`, but the download request
//   sends only `path`. If utils.ScanDirectory returns paths relative to the
//   scanned directory, downloads for a non-empty GitRootPath would be resolved
//   against the wrong directory on the master; confirm.
//
// core/task/stats/service_v2.go
//   The modelSvc field is removed from ServiceV2; getResultService loads the
//   task with service.NewModelServiceV2[models.TaskV2]().GetById(id).
//
// core/utils/demo.go
//   ImportDemo, ReimportDemo and CleanupDemo now return the BuildCmd error.
//   NOTE(review): all three command strings contain `&&`. With the new
//   BuildCmd that token and everything after it become arguments of
//   `crawlab-cli`, so the second command is never run as a separate command.
//   NOTE(review): CleanupDemo's unchanged command string runs
//   `crawlab-demo reimport`, the same as ReimportDemo; possibly a copy-paste
//   slip, confirm.
//   In the visible lines a cmd.Run error is only passed to trace.PrintError.
diff --git a/core/controllers/project_v2.go b/core/controllers/project_v2.go index dd2d8926..34fb6b8d 100644 --- a/core/controllers/project_v2.go +++ b/core/controllers/project_v2.go @@ -80,10 +80,11 @@ func GetProjectList(c *gin.Context) { } // assign + var data []models.ProjectV2 for _, p := range projects { p.Spiders = cache[p.Id] - projects = append(projects, p) + data = append(data, p) } - HandleSuccessWithListData(c, projects, total) + HandleSuccessWithListData(c, data, total) } diff --git a/core/controllers/router_v2.go b/core/controllers/router_v2.go index 1e274bff..ec23515e 100644 --- a/core/controllers/router_v2.go +++ b/core/controllers/router_v2.go @@ -363,6 +363,18 @@ func InitRoutes(app *gin.Engine) (err error) { HandlerFunc: PostLogout, }, }) + RegisterActions(groups.AnonymousGroup, "/sync", []Action{ + { + Method: http.MethodGet, + Path: "/:id/scan", + HandlerFunc: GetSyncScan, + }, + { + Method: http.MethodGet, + Path: "/:id/download", + HandlerFunc: GetSyncDownload, + }, + }) return nil } diff --git a/core/controllers/sync_v2.go b/core/controllers/sync_v2.go new file mode 100644 index 00000000..5e44fcd4 --- /dev/null +++ b/core/controllers/sync_v2.go @@ -0,0 +1,31 @@ +package controllers + +import ( + "github.com/crawlab-team/crawlab/core/utils" + "github.com/gin-gonic/gin" + "github.com/spf13/viper" + "net/http" + "path/filepath" +) + +func GetSyncScan(c *gin.Context) { + id := c.Param("id") + path := c.Query("path") + + workspacePath := viper.GetString("workspace") + dirPath := filepath.Join(workspacePath, id, path) + files, err := utils.ScanDirectory(dirPath) + if err != nil { + HandleErrorInternalServerError(c, err) + return + } + c.AbortWithStatusJSON(http.StatusOK, files) +} + +func GetSyncDownload(c *gin.Context) { + id := c.Param("id") + path := c.Query("path") + workspacePath := viper.GetString("workspace") + filePath := filepath.Join(workspacePath, id, path) + c.File(filePath) +} diff --git a/core/grpc/server/node_server_v2.go 
b/core/grpc/server/node_server_v2.go index ac9499ca..6c69b14b 100644 --- a/core/grpc/server/node_server_v2.go +++ b/core/grpc/server/node_server_v2.go @@ -83,7 +83,7 @@ func (svr NodeServerV2) Register(ctx context.Context, req *grpc.Request) (res *g } node.SetCreated(primitive.NilObjectID) node.SetUpdated(primitive.NilObjectID) - _, err = service.NewModelServiceV2[models.NodeV2]().InsertOne(*nodeDb) + node.Id, err = service.NewModelServiceV2[models.NodeV2]().InsertOne(node) if err != nil { return HandleError(err) } diff --git a/core/node/config/config_service.go b/core/node/config/config_service.go index 7e05adde..c91c58bf 100644 --- a/core/node/config/config_service.go +++ b/core/node/config/config_service.go @@ -8,7 +8,7 @@ import ( "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/trace" "os" - "path" + "path/filepath" ) type Service struct { @@ -18,7 +18,7 @@ type Service struct { func (svc *Service) Init() (err error) { // check config directory path - configDirPath := path.Dir(svc.path) + configDirPath := filepath.Dir(svc.path) if !utils.Exists(configDirPath) { if err := os.MkdirAll(configDirPath, os.FileMode(0766)); err != nil { return trace.TraceError(err) @@ -55,13 +55,14 @@ func (svc *Service) Reload() (err error) { } func (svc *Service) GetBasicNodeInfo() (res interfaces.Entity) { - return &entity.NodeInfo{ + res = &entity.NodeInfo{ Key: svc.GetNodeKey(), Name: svc.GetNodeName(), IsMaster: svc.IsMaster(), AuthKey: svc.GetAuthKey(), MaxRunners: svc.GetMaxRunners(), } + return res } func (svc *Service) GetNodeKey() (res string) { diff --git a/core/sys_exec/sys_exec_darwin.go b/core/sys_exec/sys_exec_darwin.go index 130b7cb8..b6db18c2 100644 --- a/core/sys_exec/sys_exec_darwin.go +++ b/core/sys_exec/sys_exec_darwin.go @@ -4,12 +4,18 @@ package sys_exec import ( + "errors" "os/exec" + "strings" "syscall" ) -func BuildCmd(cmdStr string) *exec.Cmd { - return exec.Command("sh", "-c", cmdStr) +func BuildCmd(cmdStr string) (cmd 
*exec.Cmd, err error) { + if cmdStr == "" { + return nil, errors.New("command string is empty") + } + args := strings.Split(cmdStr, " ") + return exec.Command(args[0], args[1:]...), nil } func SetPgid(cmd *exec.Cmd) { diff --git a/core/sys_exec/sys_exec_linux.go b/core/sys_exec/sys_exec_linux.go index bf532a43..33af73e3 100644 --- a/core/sys_exec/sys_exec_linux.go +++ b/core/sys_exec/sys_exec_linux.go @@ -4,12 +4,18 @@ package sys_exec import ( + "errors" "os/exec" + "strings" "syscall" ) -func BuildCmd(cmdStr string) *exec.Cmd { - return exec.Command("sh", "-c", cmdStr) +func BuildCmd(cmdStr string) (cmd *exec.Cmd, err error) { + if cmdStr == "" { + return nil, errors.New("command string is empty") + } + args := strings.Split(cmdStr, " ") + return exec.Command(args[0], args[1:]...), nil } func SetPgid(cmd *exec.Cmd) { diff --git a/core/sys_exec/sys_exec_windows.go b/core/sys_exec/sys_exec_windows.go index e2678ccd..1f1afd5a 100644 --- a/core/sys_exec/sys_exec_windows.go +++ b/core/sys_exec/sys_exec_windows.go @@ -3,8 +3,16 @@ package sys_exec -import "os/exec" +import ( + "errors" + "os/exec" + "strings" +) -func BuildCmd(cmdStr string) *exec.Cmd { - return exec.Command("cmd", "/C", cmdStr) +func BuildCmd(cmdStr string) (cmd *exec.Cmd, err error) { + if cmdStr == "" { + return nil, errors.New("command string is empty") + } + args := strings.Split(cmdStr, " ") + return exec.Command(args[0], args[1:]...), nil } diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index b39d9445..32c7e5bf 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -237,7 +237,7 @@ func (r *Runner) configureCmd() { } // get cmd instance - r.cmd = sys_exec.BuildCmd(cmdStr) + r.cmd, _ = sys_exec.BuildCmd(cmdStr) // set working directory r.cmd.Dir = r.cwd diff --git a/core/task/handler/runner_v2.go b/core/task/handler/runner_v2.go index 57f95edb..341ff408 100644 --- a/core/task/handler/runner_v2.go +++ b/core/task/handler/runner_v2.go @@ -95,12 +95,15 
@@ func (r *RunnerV2) Run() (err error) { // sync files worker nodes if !utils.IsMaster() { if err := r.syncFiles(); err != nil { - return err + return r.updateTask(constants.TaskStatusError, err) } } // configure cmd - r.configureCmd() + err = r.configureCmd() + if err != nil { + return r.updateTask(constants.TaskStatusError, err) + } // configure environment variables r.configureEnv() @@ -205,7 +208,7 @@ func (r *RunnerV2) GetTaskId() (id primitive.ObjectID) { return r.tid } -func (r *RunnerV2) configureCmd() { +func (r *RunnerV2) configureCmd() (err error) { var cmdStr string // customized spider @@ -223,13 +226,17 @@ func (r *RunnerV2) configureCmd() { } // get cmd instance - r.cmd = sys_exec.BuildCmd(cmdStr) + r.cmd, err = sys_exec.BuildCmd(cmdStr) + if err != nil { + log.Errorf("Error building command: %v", err) + trace.PrintError(err) + return err + } // set working directory r.cmd.Dir = r.cwd - // configure pgid to allow killing sub processes - //sys_exec.SetPgid(r.cmd) + return nil } func (r *RunnerV2) configureLogging() { @@ -320,16 +327,18 @@ func (r *RunnerV2) configureEnv() { func (r *RunnerV2) syncFiles() (err error) { var id string + var workingDir string if r.s.GitId.IsZero() { id = r.s.Id.Hex() + workingDir = "" } else { id = r.s.GitId.Hex() + workingDir = r.s.GitRootPath } masterURL := fmt.Sprintf("%s/sync/%s", viper.GetString("api.endpoint"), id) - workerDir := r.cwd // get file list from master - resp, err := http.Get(masterURL + "/scan?path=" + workerDir) + resp, err := http.Get(masterURL + "/scan?path=" + workingDir) if err != nil { log.Errorf("Error getting file list from master: %v", err) return trace.TraceError(err) @@ -354,15 +363,15 @@ func (r *RunnerV2) syncFiles() (err error) { } // create worker directory if not exists - if _, err := os.Stat(workerDir); os.IsNotExist(err) { - if err := os.MkdirAll(workerDir, os.ModePerm); err != nil { + if _, err := os.Stat(r.cwd); os.IsNotExist(err) { + if err := os.MkdirAll(r.cwd, os.ModePerm); err 
!= nil { log.Errorf("Error creating worker directory: %v", err) return trace.TraceError(err) } } // get file list from worker - workerFiles, err := utils.ScanDirectory(workerDir) + workerFiles, err := utils.ScanDirectory(r.cwd) if err != nil { log.Errorf("Error scanning worker directory: %v", err) return trace.TraceError(err) @@ -391,7 +400,7 @@ func (r *RunnerV2) syncFiles() (err error) { go func(path string, masterFile entity.FsFileInfo) { defer wg.Done() logrus.Infof("File needs to be synchronized: %s", path) - err := r.downloadFile(masterURL+"/download?path="+path, filepath.Join(workerDir, path)) + err := r.downloadFile(masterURL+"/download?path="+path, filepath.Join(r.cwd, path)) if err != nil { logrus.Errorf("Error downloading file: %v", err) select { diff --git a/core/task/stats/service_v2.go b/core/task/stats/service_v2.go index b74b3441..fc39cbd5 100644 --- a/core/task/stats/service_v2.go +++ b/core/task/stats/service_v2.go @@ -17,7 +17,6 @@ import ( type ServiceV2 struct { // dependencies nodeCfgSvc interfaces.NodeConfigService - modelSvc service.ModelService // internals mu sync.Mutex @@ -64,7 +63,7 @@ func (svc *ServiceV2) getResultService(id primitive.ObjectID) (resultSvc interfa } // task - t, err := svc.modelSvc.GetTaskById(id) + t, err := service.NewModelServiceV2[models.TaskV2]().GetById(id) if err != nil { return nil, err } diff --git a/core/utils/demo.go b/core/utils/demo.go index bf667647..70b45044 100644 --- a/core/utils/demo.go +++ b/core/utils/demo.go @@ -31,7 +31,10 @@ func InitializedDemo() (ok bool) { func ImportDemo() (err error) { cmdStr := fmt.Sprintf("crawlab-cli login -a %s && crawlab-demo import", GetApiAddress()) - cmd := sys_exec.BuildCmd(cmdStr) + cmd, err := sys_exec.BuildCmd(cmdStr) + if err != nil { + return err + } if err := cmd.Run(); err != nil { trace.PrintError(err) } @@ -40,7 +43,10 @@ func ImportDemo() (err error) { func ReimportDemo() (err error) { cmdStr := fmt.Sprintf("crawlab-cli login -a %s && crawlab-demo 
reimport", GetApiAddress()) - cmd := sys_exec.BuildCmd(cmdStr) + cmd, err := sys_exec.BuildCmd(cmdStr) + if err != nil { + return err + } if err := cmd.Run(); err != nil { trace.PrintError(err) } @@ -49,7 +55,10 @@ func ReimportDemo() (err error) { func CleanupDemo() (err error) { cmdStr := fmt.Sprintf("crawlab-cli login -a %s && crawlab-demo reimport", GetApiAddress()) - cmd := sys_exec.BuildCmd(cmdStr) + cmd, err := sys_exec.BuildCmd(cmdStr) + if err != nil { + return err + } if err := cmd.Run(); err != nil { trace.PrintError(err) }