mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-26 17:49:15 +01:00
refactor: consolidated configs
This commit is contained in:
@@ -6,7 +6,6 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
@@ -16,6 +15,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
|
||||
"github.com/apex/log"
|
||||
"github.com/crawlab-team/crawlab/core/constants"
|
||||
"github.com/crawlab-team/crawlab/core/entity"
|
||||
@@ -220,7 +221,7 @@ func (r *Runner) configureCmd() (err error) {
|
||||
// get cmd instance
|
||||
r.cmd, err = sys_exec.BuildCmd(cmdStr)
|
||||
if err != nil {
|
||||
log.Errorf("Error building command: %v", err)
|
||||
log.Errorf("error building command: %v", err)
|
||||
trace.PrintError(err)
|
||||
return err
|
||||
}
|
||||
@@ -297,8 +298,8 @@ func (r *Runner) configureEnv() {
|
||||
|
||||
// Default envs
|
||||
r.cmd.Env = append(os.Environ(), "CRAWLAB_TASK_ID="+r.tid.Hex())
|
||||
if viper.GetString("grpc.address") != "" {
|
||||
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_GRPC_ADDRESS="+viper.GetString("grpc.address"))
|
||||
if utils.GetGrpcAddress() != "" {
|
||||
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_GRPC_ADDRESS="+utils.GetGrpcAddress())
|
||||
}
|
||||
if viper.GetString("grpc.authKey") != "" {
|
||||
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_GRPC_AUTH_KEY="+viper.GetString("grpc.authKey"))
|
||||
@@ -317,35 +318,52 @@ func (r *Runner) configureEnv() {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner) syncFiles() (err error) {
|
||||
func (r *Runner) createHttpRequest(method, path string) (*http.Response, error) {
|
||||
// Construct master URL
|
||||
var id string
|
||||
var workingDir string
|
||||
if r.s.GitId.IsZero() {
|
||||
id = r.s.Id.Hex()
|
||||
workingDir = ""
|
||||
} else {
|
||||
id = r.s.GitId.Hex()
|
||||
}
|
||||
url := fmt.Sprintf("%s/sync/%s%s", utils.GetApiEndpoint(), id, path)
|
||||
|
||||
// Create and execute request
|
||||
req, err := http.NewRequest(method, url, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating request: %v", err)
|
||||
}
|
||||
|
||||
for k, v := range r.getHttpRequestHeaders() {
|
||||
req.Header.Set(k, v)
|
||||
}
|
||||
|
||||
return http.DefaultClient.Do(req)
|
||||
}
|
||||
|
||||
func (r *Runner) syncFiles() (err error) {
|
||||
workingDir := ""
|
||||
if !r.s.GitId.IsZero() {
|
||||
workingDir = r.s.GitRootPath
|
||||
}
|
||||
masterURL := fmt.Sprintf("%s/sync/%s", viper.GetString("api.endpoint"), id)
|
||||
|
||||
// get file list from master
|
||||
resp, err := http.Get(masterURL + "/scan?path=" + workingDir)
|
||||
resp, err := r.createHttpRequest("GET", "/scan?path="+workingDir)
|
||||
if err != nil {
|
||||
log.Errorf("Error getting file list from master: %v", err)
|
||||
return trace.TraceError(err)
|
||||
log.Errorf("error getting file list from master: %v", err)
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Errorf("Error reading response body: %v", err)
|
||||
return trace.TraceError(err)
|
||||
log.Errorf("error reading response body: %v", err)
|
||||
return err
|
||||
}
|
||||
var masterFiles map[string]entity.FsFileInfo
|
||||
err = json.Unmarshal(body, &masterFiles)
|
||||
if err != nil {
|
||||
log.Errorf("Error unmarshaling JSON: %v", err)
|
||||
return trace.TraceError(err)
|
||||
log.Errorf("error unmarshaling JSON: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// create a map for master files
|
||||
@@ -357,25 +375,25 @@ func (r *Runner) syncFiles() (err error) {
|
||||
// create working directory if not exists
|
||||
if _, err := os.Stat(r.cwd); os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(r.cwd, os.ModePerm); err != nil {
|
||||
log.Errorf("Error creating worker directory: %v", err)
|
||||
return trace.TraceError(err)
|
||||
log.Errorf("error creating worker directory: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// get file list from worker
|
||||
workerFiles, err := utils.ScanDirectory(r.cwd)
|
||||
if err != nil {
|
||||
log.Errorf("Error scanning worker directory: %v", err)
|
||||
log.Errorf("error scanning worker directory: %v", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
|
||||
// delete files that are deleted on master node
|
||||
for path, workerFile := range workerFiles {
|
||||
if _, exists := masterFilesMap[path]; !exists {
|
||||
log.Infof("Deleting file: %s", path)
|
||||
log.Infof("deleting file: %s", path)
|
||||
err := os.Remove(workerFile.FullPath)
|
||||
if err != nil {
|
||||
log.Errorf("Error deleting file: %v", err)
|
||||
log.Errorf("error deleting file: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -398,17 +416,17 @@ func (r *Runner) syncFiles() (err error) {
|
||||
defer wg.Done()
|
||||
|
||||
if masterFile.IsDir {
|
||||
log.Infof("Directory needs to be synchronized: %s", path)
|
||||
log.Infof("directory needs to be synchronized: %s", path)
|
||||
_err := os.MkdirAll(filepath.Join(r.cwd, path), masterFile.Mode)
|
||||
if _err != nil {
|
||||
log.Errorf("Error creating directory: %v", _err)
|
||||
log.Errorf("error creating directory: %v", _err)
|
||||
err = errors.Join(err, _err)
|
||||
}
|
||||
} else {
|
||||
log.Infof("File needs to be synchronized: %s", path)
|
||||
_err := r.downloadFile(masterURL+"/download?path="+filepath.Join(workingDir, path), filepath.Join(r.cwd, path), masterFile)
|
||||
log.Infof("file needs to be synchronized: %s", path)
|
||||
_err := r.downloadFile(path, filepath.Join(r.cwd, path), masterFile)
|
||||
if _err != nil {
|
||||
log.Errorf("Error downloading file: %v", _err)
|
||||
log.Errorf("error downloading file: %v", _err)
|
||||
err = errors.Join(err, _err)
|
||||
}
|
||||
}
|
||||
@@ -426,15 +444,14 @@ func (r *Runner) syncFiles() (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *Runner) downloadFile(url string, filePath string, fileInfo *entity.FsFileInfo) error {
|
||||
// get file response
|
||||
resp, err := http.Get(url)
|
||||
func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsFileInfo) error {
|
||||
resp, err := r.createHttpRequest("GET", "/download?path="+path)
|
||||
if err != nil {
|
||||
log.Errorf("Error getting file response: %v", err)
|
||||
log.Errorf("error getting file response: %v", err)
|
||||
return err
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
log.Errorf("Error downloading file: %s", resp.Status)
|
||||
log.Errorf("error downloading file: %s", resp.Status)
|
||||
return errors.New(resp.Status)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
@@ -444,14 +461,14 @@ func (r *Runner) downloadFile(url string, filePath string, fileInfo *entity.FsFi
|
||||
utils.Exists(dirPath)
|
||||
err = os.MkdirAll(dirPath, os.ModePerm)
|
||||
if err != nil {
|
||||
log.Errorf("Error creating directory: %v", err)
|
||||
log.Errorf("error creating directory: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// create local file
|
||||
out, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, fileInfo.Mode)
|
||||
if err != nil {
|
||||
log.Errorf("Error creating file: %v", err)
|
||||
log.Errorf("error creating file: %v", err)
|
||||
return err
|
||||
}
|
||||
defer out.Close()
|
||||
@@ -459,12 +476,18 @@ func (r *Runner) downloadFile(url string, filePath string, fileInfo *entity.FsFi
|
||||
// copy file content to local file
|
||||
_, err = io.Copy(out, resp.Body)
|
||||
if err != nil {
|
||||
log.Errorf("Error copying file: %v", err)
|
||||
log.Errorf("error copying file: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner) getHttpRequestHeaders() (headers map[string]string) {
|
||||
headers = make(map[string]string)
|
||||
headers["Authorization"] = utils.GetAuthKey()
|
||||
return headers
|
||||
}
|
||||
|
||||
// wait for process to finish and send task signal (constants.TaskSignal)
|
||||
// to task runner's channel (Runner.ch) according to exit code
|
||||
func (r *Runner) wait() {
|
||||
@@ -541,7 +564,7 @@ func (r *Runner) initConnection() (err error) {
|
||||
func (r *Runner) writeLogLines(lines []string) {
|
||||
linesBytes, err := json.Marshal(lines)
|
||||
if err != nil {
|
||||
log.Errorf("Error marshaling log lines: %v", err)
|
||||
log.Errorf("error marshaling log lines: %v", err)
|
||||
return
|
||||
}
|
||||
msg := &grpc.TaskServiceConnectRequest{
|
||||
@@ -550,7 +573,7 @@ func (r *Runner) writeLogLines(lines []string) {
|
||||
Data: linesBytes,
|
||||
}
|
||||
if err := r.conn.Send(msg); err != nil {
|
||||
log.Errorf("Error sending log lines: %v", err)
|
||||
log.Errorf("error sending log lines: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -598,7 +621,7 @@ func (r *Runner) sendNotification() {
|
||||
}
|
||||
_, err := r.c.TaskClient.SendNotification(context.Background(), req)
|
||||
if err != nil {
|
||||
log.Errorf("Error sending notification: %v", err)
|
||||
log.Errorf("error sending notification: %v", err)
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
@@ -659,7 +682,7 @@ func (r *Runner) _updateSpiderStat(status string) {
|
||||
}
|
||||
|
||||
func (r *Runner) configureCwd() {
|
||||
workspacePath := viper.GetString("workspace")
|
||||
workspacePath := utils.GetWorkspace()
|
||||
if r.s.GitId.IsZero() {
|
||||
// not git
|
||||
r.cwd = filepath.Join(workspacePath, r.s.Id.Hex())
|
||||
@@ -698,7 +721,7 @@ func newTaskRunner(id primitive.ObjectID, svc *Service) (r2 *Runner, err error)
|
||||
}
|
||||
|
||||
// task fs service
|
||||
r.fsSvc = fs.NewFsService(filepath.Join(viper.GetString("workspace"), r.s.Id.Hex()))
|
||||
r.fsSvc = fs.NewFsService(filepath.Join(utils.GetWorkspace(), r.s.Id.Hex()))
|
||||
|
||||
// grpc client
|
||||
r.c = client2.GetGrpcClient()
|
||||
|
||||
Reference in New Issue
Block a user