test: fix test cases

This commit is contained in:
Marvin Zhang
2024-11-22 12:45:04 +08:00
parent 739d796d1b
commit 6a65067cea
4 changed files with 76 additions and 131 deletions

View File

@@ -1,16 +1,13 @@
package sys_exec
import (
"bufio"
"github.com/crawlab-team/crawlab/trace"
"github.com/shirou/gopsutil/process"
"os/exec"
"time"
)
type KillProcessOptions struct {
Timeout time.Duration
Force bool
Force bool
}
func KillProcess(cmd *exec.Cmd, opts *KillProcessOptions) error {
@@ -25,29 +22,8 @@ func KillProcess(cmd *exec.Cmd, opts *KillProcessOptions) error {
return killProcessRecursive(p, opts.Force)
}
if opts.Timeout != 0 {
// with timeout
return killProcessWithTimeout(p, opts.Timeout, killFunc)
} else {
// without timeout
return killFunc(p)
}
}
func killProcessWithTimeout(p *process.Process, timeout time.Duration, killFunc func(*process.Process) error) error {
go func() {
if err := killFunc(p); err != nil {
trace.PrintError(err)
}
}()
for i := 0; i < int(timeout.Seconds()); i++ {
ok, err := process.PidExists(p.Pid)
if err == nil && !ok {
return nil
}
time.Sleep(1 * time.Second)
}
return killProcess(p, true)
// without timeout
return killFunc(p)
}
func killProcessRecursive(p *process.Process, force bool) (err error) {
@@ -78,12 +54,3 @@ func killProcess(p *process.Process, force bool) (err error) {
}
return nil
}
func ConfigureCmdLogging(cmd *exec.Cmd, fn func(scanner *bufio.Scanner)) {
stdout, _ := (*cmd).StdoutPipe()
stderr, _ := (*cmd).StderrPipe()
scannerStdout := bufio.NewScanner(stdout)
scannerStderr := bufio.NewScanner(stderr)
go fn(scannerStdout)
go fn(scannerStderr)
}

View File

@@ -30,7 +30,6 @@ import (
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/grpc"
"github.com/crawlab-team/crawlab/trace"
"github.com/shirou/gopsutil/process"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
@@ -152,9 +151,6 @@ func (r *Runner) Run() (err error) {
return err
}
// wait for process to finish
go r.wait()
// start health check
go r.startHealthCheck()
@@ -172,35 +168,8 @@ func (r *Runner) Run() (err error) {
close(r.ipcChan) // Close IPC channel
}()
// declare task status
status := ""
// wait for signal
signal := <-r.ch
switch signal {
case constants.TaskSignalFinish:
err = nil
status = constants.TaskStatusFinished
case constants.TaskSignalCancel:
err = constants.ErrTaskCancelled
status = constants.TaskStatusCancelled
case constants.TaskSignalError:
err = r.err
status = constants.TaskStatusError
case constants.TaskSignalLost:
err = constants.ErrTaskLost
status = constants.TaskStatusError
default:
err = constants.ErrInvalidSignal
status = constants.TaskStatusError
}
// update task status
if err := r.updateTask(status, err); err != nil {
return err
}
return err
// wait for process to finish
return r.wait()
}
// Cancel terminates the running task. If force is true, the process will be killed immediately
@@ -210,11 +179,11 @@ func (r *Runner) Cancel(force bool) (err error) {
r.cancel()
// Kill process
opts := &sys_exec.KillProcessOptions{
Timeout: r.svc.GetCancelTimeout(),
Force: force,
}
if err := sys_exec.KillProcess(r.cmd, opts); err != nil {
err = sys_exec.KillProcess(r.cmd, &sys_exec.KillProcessOptions{
Force: force,
})
if err != nil {
log.Errorf("kill process error: %v", err)
return err
}
@@ -310,8 +279,7 @@ func (r *Runner) startHealthCheck() {
case <-r.ctx.Done():
return
case <-ticker.C:
exists, _ := process.PidExists(int32(r.pid))
if !exists {
if !utils.ProcessIdExists(r.pid) {
// process lost
r.ch <- constants.TaskSignalLost
return
@@ -536,30 +504,63 @@ func (r *Runner) getHttpRequestHeaders() (headers map[string]string) {
// - TaskSignalFinish for successful completion
// - TaskSignalCancel for cancellation
// - TaskSignalError for execution errors
func (r *Runner) wait() {
// wait for process to finish
if err := r.cmd.Wait(); err != nil {
var exitError *exec.ExitError
ok := errors.As(err, &exitError)
if !ok {
func (r *Runner) wait() (err error) {
// start a goroutine to wait for process to finish
go func() {
err = r.cmd.Wait()
if err != nil {
var exitError *exec.ExitError
if !errors.As(err, &exitError) {
r.ch <- constants.TaskSignalError
return
}
exitCode := exitError.ExitCode()
if exitCode == -1 {
// cancel error
r.ch <- constants.TaskSignalCancel
return
}
// standard error
r.err = err
r.ch <- constants.TaskSignalError
return
}
exitCode := exitError.ExitCode()
if exitCode == -1 {
// cancel error
r.ch <- constants.TaskSignalCancel
return
}
// standard error
r.err = err
r.ch <- constants.TaskSignalError
return
// success
r.ch <- constants.TaskSignalFinish
}()
// declare task status
status := ""
// wait for signal
signal := <-r.ch
switch signal {
case constants.TaskSignalFinish:
err = nil
status = constants.TaskStatusFinished
case constants.TaskSignalCancel:
err = constants.ErrTaskCancelled
status = constants.TaskStatusCancelled
case constants.TaskSignalError:
err = r.err
status = constants.TaskStatusError
case constants.TaskSignalLost:
err = constants.ErrTaskLost
status = constants.TaskStatusError
default:
err = constants.ErrInvalidSignal
status = constants.TaskStatusError
}
// success
r.ch <- constants.TaskSignalFinish
// update task status
if err := r.updateTask(status, err); err != nil {
log.Errorf("error updating task status: %v", err)
return err
}
return nil
}
// updateTask updates the task status and related statistics in the database

View File

@@ -28,12 +28,10 @@ type Service struct {
c *grpcclient.GrpcClient // grpc client
// settings
//maxRunners int
exitWatchDuration time.Duration
reportInterval time.Duration
fetchInterval time.Duration
fetchTimeout time.Duration
cancelTimeout time.Duration
reportInterval time.Duration
fetchInterval time.Duration
fetchTimeout time.Duration
cancelTimeout time.Duration
// internals variables
stopped bool
@@ -136,10 +134,6 @@ func (svc *Service) reportStatus() {
}
}
func (svc *Service) GetExitWatchDuration() (duration time.Duration) {
return svc.exitWatchDuration
}
func (svc *Service) GetCancelTimeout() (timeout time.Duration) {
return svc.cancelTimeout
}
@@ -444,13 +438,12 @@ func (svc *Service) cancelTask(taskId primitive.ObjectID, force bool) (err error
func newTaskHandlerService() *Service {
// service
svc := &Service{
exitWatchDuration: 60 * time.Second,
fetchInterval: 1 * time.Second,
fetchTimeout: 15 * time.Second,
reportInterval: 5 * time.Second,
cancelTimeout: 5 * time.Second,
mu: sync.Mutex{},
runners: sync.Map{},
fetchInterval: 1 * time.Second,
fetchTimeout: 15 * time.Second,
reportInterval: 5 * time.Second,
cancelTimeout: 60 * time.Second,
mu: sync.Mutex{},
runners: sync.Map{},
}
// dependency injection

View File

@@ -1,32 +1,16 @@
package utils
import (
"os"
"github.com/crawlab-team/crawlab/trace"
"github.com/shirou/gopsutil/process"
"os/exec"
"runtime"
"strings"
"syscall"
"github.com/crawlab-team/crawlab/trace"
)
func ProcessIdExists(pid int) (ok bool) {
// Find process by pid
p, err := os.FindProcess(pid)
if err != nil {
// Process not found
return false
}
// Check if process exists
err = p.Signal(syscall.Signal(0))
if err == nil {
// Process exists
return true
}
// Process not found
return false
ok, _ = process.PidExists(int32(pid))
return ok
}
func ListProcess(text string) (lines []string, err error) {