mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
fix: missing relational models issues
This commit is contained in:
@@ -71,16 +71,8 @@ func (r *RunnerV2) Init() (err error) {
|
||||
|
||||
// start grpc client
|
||||
if !r.c.IsStarted() {
|
||||
r.c.Start()
|
||||
}
|
||||
|
||||
// working directory
|
||||
workspacePath := viper.GetString("workspace")
|
||||
r.cwd = filepath.Join(workspacePath, r.s.Id.Hex())
|
||||
|
||||
// sync files from master
|
||||
if !utils.IsMaster() {
|
||||
if err := r.syncFiles(); err != nil {
|
||||
err := r.c.Start()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -97,6 +89,16 @@ func (r *RunnerV2) Run() (err error) {
|
||||
// log task started
|
||||
log.Infof("task[%s] started", r.tid.Hex())
|
||||
|
||||
// configure working directory
|
||||
r.configureCwd()
|
||||
|
||||
// sync files worker nodes
|
||||
if !utils.IsMaster() {
|
||||
if err := r.syncFiles(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// configure cmd
|
||||
r.configureCmd()
|
||||
|
||||
@@ -317,26 +319,31 @@ func (r *RunnerV2) configureEnv() {
|
||||
}
|
||||
|
||||
func (r *RunnerV2) syncFiles() (err error) {
|
||||
masterURL := fmt.Sprintf("%s/sync/%s", viper.GetString("api.endpoint"), r.s.Id.Hex())
|
||||
workspacePath := viper.GetString("workspace")
|
||||
workerDir := filepath.Join(workspacePath, r.s.Id.Hex())
|
||||
var id string
|
||||
if r.s.GitId.IsZero() {
|
||||
id = r.s.Id.Hex()
|
||||
} else {
|
||||
id = r.s.GitId.Hex()
|
||||
}
|
||||
masterURL := fmt.Sprintf("%s/sync/%s", viper.GetString("api.endpoint"), id)
|
||||
workerDir := r.cwd
|
||||
|
||||
// get file list from master
|
||||
resp, err := http.Get(masterURL + "/scan")
|
||||
resp, err := http.Get(masterURL + "/scan?path=" + workerDir)
|
||||
if err != nil {
|
||||
fmt.Println("Error getting file list from master:", err)
|
||||
log.Errorf("Error getting file list from master: %v", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
fmt.Println("Error reading response body:", err)
|
||||
log.Errorf("Error reading response body: %v", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
var masterFiles map[string]entity.FsFileInfo
|
||||
err = json.Unmarshal(body, &masterFiles)
|
||||
if err != nil {
|
||||
fmt.Println("Error unmarshaling JSON:", err)
|
||||
log.Errorf("Error unmarshaling JSON: %v", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
|
||||
@@ -349,7 +356,7 @@ func (r *RunnerV2) syncFiles() (err error) {
|
||||
// create worker directory if not exists
|
||||
if _, err := os.Stat(workerDir); os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(workerDir, os.ModePerm); err != nil {
|
||||
fmt.Println("Error creating worker directory:", err)
|
||||
log.Errorf("Error creating worker directory: %v", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
}
|
||||
@@ -357,7 +364,7 @@ func (r *RunnerV2) syncFiles() (err error) {
|
||||
// get file list from worker
|
||||
workerFiles, err := utils.ScanDirectory(workerDir)
|
||||
if err != nil {
|
||||
fmt.Println("Error scanning worker directory:", err)
|
||||
log.Errorf("Error scanning worker directory: %v", err)
|
||||
return trace.TraceError(err)
|
||||
}
|
||||
|
||||
@@ -368,10 +375,10 @@ func (r *RunnerV2) syncFiles() (err error) {
|
||||
// delete files that are deleted on master node
|
||||
for path, workerFile := range workerFiles {
|
||||
if _, exists := masterFilesMap[path]; !exists {
|
||||
fmt.Println("Deleting file:", path)
|
||||
log.Infof("Deleting file: %s", path)
|
||||
err := os.Remove(workerFile.FullPath)
|
||||
if err != nil {
|
||||
fmt.Println("Error deleting file:", err)
|
||||
log.Errorf("Error deleting file: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -619,7 +626,17 @@ func (r *RunnerV2) _updateSpiderStat(status string) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RunnerV2) configureCwd() {
|
||||
workspacePath := viper.GetString("workspace")
|
||||
if r.s.GitId.IsZero() {
|
||||
// not git
|
||||
r.cwd = filepath.Join(workspacePath, r.s.Id.Hex())
|
||||
} else {
|
||||
// git
|
||||
r.cwd = filepath.Join(workspacePath, r.s.GitId.Hex(), r.s.GitRootPath)
|
||||
}
|
||||
}
|
||||
|
||||
func NewTaskRunnerV2(id primitive.ObjectID, svc *ServiceV2) (r2 *RunnerV2, err error) {
|
||||
|
||||
@@ -43,7 +43,10 @@ type ServiceV2 struct {
|
||||
func (svc *ServiceV2) Start() {
|
||||
// Initialize gRPC if not started
|
||||
if !svc.c.IsStarted() {
|
||||
svc.c.Start()
|
||||
err := svc.c.Start()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
go svc.ReportStatus()
|
||||
|
||||
Reference in New Issue
Block a user