diff --git a/core/sys_exec/sys_exec.go b/core/sys_exec/sys_exec.go index 01027084..fb22f32d 100644 --- a/core/sys_exec/sys_exec.go +++ b/core/sys_exec/sys_exec.go @@ -1,16 +1,13 @@ package sys_exec import ( - "bufio" "github.com/crawlab-team/crawlab/trace" "github.com/shirou/gopsutil/process" "os/exec" - "time" ) type KillProcessOptions struct { - Timeout time.Duration - Force bool + Force bool } func KillProcess(cmd *exec.Cmd, opts *KillProcessOptions) error { @@ -25,29 +22,8 @@ func KillProcess(cmd *exec.Cmd, opts *KillProcessOptions) error { return killProcessRecursive(p, opts.Force) } - if opts.Timeout != 0 { - // with timeout - return killProcessWithTimeout(p, opts.Timeout, killFunc) - } else { - // without timeout - return killFunc(p) - } -} - -func killProcessWithTimeout(p *process.Process, timeout time.Duration, killFunc func(*process.Process) error) error { - go func() { - if err := killFunc(p); err != nil { - trace.PrintError(err) - } - }() - for i := 0; i < int(timeout.Seconds()); i++ { - ok, err := process.PidExists(p.Pid) - if err == nil && !ok { - return nil - } - time.Sleep(1 * time.Second) - } - return killProcess(p, true) + // without timeout + return killFunc(p) } func killProcessRecursive(p *process.Process, force bool) (err error) { @@ -78,12 +54,3 @@ func killProcess(p *process.Process, force bool) (err error) { } return nil } - -func ConfigureCmdLogging(cmd *exec.Cmd, fn func(scanner *bufio.Scanner)) { - stdout, _ := (*cmd).StdoutPipe() - stderr, _ := (*cmd).StderrPipe() - scannerStdout := bufio.NewScanner(stdout) - scannerStderr := bufio.NewScanner(stderr) - go fn(scannerStdout) - go fn(scannerStderr) -} diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index 29118cc7..51f1a397 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -30,7 +30,6 @@ import ( "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/grpc" "github.com/crawlab-team/crawlab/trace" - "github.com/shirou/gopsutil/process" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -152,9 +151,6 @@ func (r *Runner) Run() (err error) { return err } - // wait for process to finish - go r.wait() - // start health check go r.startHealthCheck() @@ -172,35 +168,8 @@ func (r *Runner) Run() (err error) { close(r.ipcChan) // Close IPC channel }() - // declare task status - status := "" - - // wait for signal - signal := <-r.ch - switch signal { - case constants.TaskSignalFinish: - err = nil - status = constants.TaskStatusFinished - case constants.TaskSignalCancel: - err = constants.ErrTaskCancelled - status = constants.TaskStatusCancelled - case constants.TaskSignalError: - err = r.err - status = constants.TaskStatusError - case constants.TaskSignalLost: - err = constants.ErrTaskLost - status = constants.TaskStatusError - default: - err = constants.ErrInvalidSignal - status = constants.TaskStatusError - } - - // update task status - if err := r.updateTask(status, err); err != nil { - return err - } - - return err + // wait for process to finish + return r.wait() } // Cancel terminates the running task. If force is true, the process will be killed immediately @@ -210,11 +179,11 @@ func (r *Runner) Cancel(force bool) (err error) { r.cancel() // Kill process - opts := &sys_exec.KillProcessOptions{ - Timeout: r.svc.GetCancelTimeout(), - Force: force, - } - if err := sys_exec.KillProcess(r.cmd, opts); err != nil { + err = sys_exec.KillProcess(r.cmd, &sys_exec.KillProcessOptions{ + Force: force, + }) + if err != nil { + log.Errorf("kill process error: %v", err) return err } @@ -310,8 +279,7 @@ func (r *Runner) startHealthCheck() { case <-r.ctx.Done(): return case <-ticker.C: - exists, _ := process.PidExists(int32(r.pid)) - if !exists { + if !utils.ProcessIdExists(r.pid) { // process lost r.ch <- constants.TaskSignalLost return @@ -536,30 +504,63 @@ func (r *Runner) getHttpRequestHeaders() (headers map[string]string) { // - TaskSignalFinish for successful completion // - TaskSignalCancel for cancellation // - TaskSignalError for execution errors -func (r *Runner) wait() { - // wait for process to finish - if err := r.cmd.Wait(); err != nil { - var exitError *exec.ExitError - ok := errors.As(err, &exitError) - if !ok { +func (r *Runner) wait() (err error) { + // start a goroutine to wait for process to finish + go func() { + err = r.cmd.Wait() + if err != nil { + var exitError *exec.ExitError + if !errors.As(err, &exitError) { + r.ch <- constants.TaskSignalError + return + } + exitCode := exitError.ExitCode() + if exitCode == -1 { + // cancel error + r.ch <- constants.TaskSignalCancel + return + } + + // standard error + r.err = err r.ch <- constants.TaskSignalError return } - exitCode := exitError.ExitCode() - if exitCode == -1 { - // cancel error - r.ch <- constants.TaskSignalCancel - return - } - // standard error - r.err = err - r.ch <- constants.TaskSignalError - return + // success + r.ch <- constants.TaskSignalFinish + }() + + // declare task status + status := "" + + // wait for signal + signal := <-r.ch + switch signal { + case constants.TaskSignalFinish: + err = nil + status = constants.TaskStatusFinished + case constants.TaskSignalCancel: + err = constants.ErrTaskCancelled + status = constants.TaskStatusCancelled + case constants.TaskSignalError: + err = r.err + status = constants.TaskStatusError + case constants.TaskSignalLost: + err = constants.ErrTaskLost + status = constants.TaskStatusError + default: + err = constants.ErrInvalidSignal + status = constants.TaskStatusError } - // success - r.ch <- constants.TaskSignalFinish + // update task status + if err := r.updateTask(status, err); err != nil { + log.Errorf("error updating task status: %v", err) + return err + } + + return nil } // updateTask updates the task status and related statistics in the database diff --git a/core/task/handler/service.go b/core/task/handler/service.go index ac1d3008..ae295d5a 100644 --- a/core/task/handler/service.go +++ b/core/task/handler/service.go @@ -28,12 +28,10 @@ type Service struct { c *grpcclient.GrpcClient // grpc client // settings - //maxRunners int - exitWatchDuration time.Duration - reportInterval time.Duration - fetchInterval time.Duration - fetchTimeout time.Duration - cancelTimeout time.Duration + reportInterval time.Duration + fetchInterval time.Duration + fetchTimeout time.Duration + cancelTimeout time.Duration // internals variables stopped bool @@ -136,10 +134,6 @@ func (svc *Service) reportStatus() { } } -func (svc *Service) GetExitWatchDuration() (duration time.Duration) { - return svc.exitWatchDuration -} - func (svc *Service) GetCancelTimeout() (timeout time.Duration) { return svc.cancelTimeout } @@ -444,13 +438,12 @@ func (svc *Service) cancelTask(taskId primitive.ObjectID, force bool) (err error func newTaskHandlerService() *Service { // service svc := &Service{ - exitWatchDuration: 60 * time.Second, - fetchInterval: 1 * time.Second, - fetchTimeout: 15 * time.Second, - reportInterval: 5 * time.Second, - cancelTimeout: 5 * time.Second, - mu: sync.Mutex{}, - runners: sync.Map{}, + fetchInterval: 1 * time.Second, + fetchTimeout: 15 * time.Second, + reportInterval: 5 * time.Second, + cancelTimeout: 60 * time.Second, + mu: sync.Mutex{}, + runners: sync.Map{}, } // dependency injection diff --git a/core/utils/process.go b/core/utils/process.go index 009d41d6..70e868bd 100644 --- a/core/utils/process.go +++ b/core/utils/process.go @@ -1,32 +1,16 @@ package utils import ( - "os" + "github.com/crawlab-team/crawlab/trace" + "github.com/shirou/gopsutil/process" "os/exec" "runtime" "strings" - "syscall" - - "github.com/crawlab-team/crawlab/trace" ) func ProcessIdExists(pid int) (ok bool) { - // Find process by pid - p, err := os.FindProcess(pid) - if err != nil { - // Process not found - return false - } - - // Check if process exists - err = p.Signal(syscall.Signal(0)) - if err == nil { - // Process exists - return true - } - - // Process not found - return false + ok, _ = process.PidExists(int32(pid)) + return ok } func ListProcess(text string) (lines []string, err error) {