test: fix test cases

This commit is contained in:
Marvin Zhang
2024-11-22 12:45:04 +08:00
parent 110078333e
commit aa1b01b0a5
4 changed files with 76 additions and 131 deletions

View File

@@ -30,7 +30,6 @@ import (
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/grpc"
"github.com/crawlab-team/crawlab/trace"
"github.com/shirou/gopsutil/process"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
@@ -152,9 +151,6 @@ func (r *Runner) Run() (err error) {
return err
}
// wait for process to finish
go r.wait()
// start health check
go r.startHealthCheck()
@@ -172,35 +168,8 @@ func (r *Runner) Run() (err error) {
close(r.ipcChan) // Close IPC channel
}()
// declare task status
status := ""
// wait for signal
signal := <-r.ch
switch signal {
case constants.TaskSignalFinish:
err = nil
status = constants.TaskStatusFinished
case constants.TaskSignalCancel:
err = constants.ErrTaskCancelled
status = constants.TaskStatusCancelled
case constants.TaskSignalError:
err = r.err
status = constants.TaskStatusError
case constants.TaskSignalLost:
err = constants.ErrTaskLost
status = constants.TaskStatusError
default:
err = constants.ErrInvalidSignal
status = constants.TaskStatusError
}
// update task status
if err := r.updateTask(status, err); err != nil {
return err
}
return err
// wait for process to finish
return r.wait()
}
// Cancel terminates the running task. If force is true, the process will be killed immediately
@@ -210,11 +179,11 @@ func (r *Runner) Cancel(force bool) (err error) {
r.cancel()
// Kill process
opts := &sys_exec.KillProcessOptions{
Timeout: r.svc.GetCancelTimeout(),
Force: force,
}
if err := sys_exec.KillProcess(r.cmd, opts); err != nil {
err = sys_exec.KillProcess(r.cmd, &sys_exec.KillProcessOptions{
Force: force,
})
if err != nil {
log.Errorf("kill process error: %v", err)
return err
}
@@ -310,8 +279,7 @@ func (r *Runner) startHealthCheck() {
case <-r.ctx.Done():
return
case <-ticker.C:
exists, _ := process.PidExists(int32(r.pid))
if !exists {
if !utils.ProcessIdExists(r.pid) {
// process lost
r.ch <- constants.TaskSignalLost
return
@@ -536,30 +504,63 @@ func (r *Runner) getHttpRequestHeaders() (headers map[string]string) {
// - TaskSignalFinish for successful completion
// - TaskSignalCancel for cancellation
// - TaskSignalError for execution errors
func (r *Runner) wait() {
// wait for process to finish
if err := r.cmd.Wait(); err != nil {
var exitError *exec.ExitError
ok := errors.As(err, &exitError)
if !ok {
func (r *Runner) wait() (err error) {
// start a goroutine to wait for process to finish
go func() {
err = r.cmd.Wait()
if err != nil {
var exitError *exec.ExitError
if !errors.As(err, &exitError) {
r.ch <- constants.TaskSignalError
return
}
exitCode := exitError.ExitCode()
if exitCode == -1 {
// cancel error
r.ch <- constants.TaskSignalCancel
return
}
// standard error
r.err = err
r.ch <- constants.TaskSignalError
return
}
exitCode := exitError.ExitCode()
if exitCode == -1 {
// cancel error
r.ch <- constants.TaskSignalCancel
return
}
// standard error
r.err = err
r.ch <- constants.TaskSignalError
return
// success
r.ch <- constants.TaskSignalFinish
}()
// declare task status
status := ""
// wait for signal
signal := <-r.ch
switch signal {
case constants.TaskSignalFinish:
err = nil
status = constants.TaskStatusFinished
case constants.TaskSignalCancel:
err = constants.ErrTaskCancelled
status = constants.TaskStatusCancelled
case constants.TaskSignalError:
err = r.err
status = constants.TaskStatusError
case constants.TaskSignalLost:
err = constants.ErrTaskLost
status = constants.TaskStatusError
default:
err = constants.ErrInvalidSignal
status = constants.TaskStatusError
}
// success
r.ch <- constants.TaskSignalFinish
// update task status
if err := r.updateTask(status, err); err != nil {
log.Errorf("error updating task status: %v", err)
return err
}
return nil
}
// updateTask updates the task status and related statistics in the database

View File

@@ -28,12 +28,10 @@ type Service struct {
c *grpcclient.GrpcClient // grpc client
// settings
//maxRunners int
exitWatchDuration time.Duration
reportInterval time.Duration
fetchInterval time.Duration
fetchTimeout time.Duration
cancelTimeout time.Duration
reportInterval time.Duration
fetchInterval time.Duration
fetchTimeout time.Duration
cancelTimeout time.Duration
// internals variables
stopped bool
@@ -136,10 +134,6 @@ func (svc *Service) reportStatus() {
}
}
func (svc *Service) GetExitWatchDuration() (duration time.Duration) {
return svc.exitWatchDuration
}
func (svc *Service) GetCancelTimeout() (timeout time.Duration) {
return svc.cancelTimeout
}
@@ -444,13 +438,12 @@ func (svc *Service) cancelTask(taskId primitive.ObjectID, force bool) (err error
func newTaskHandlerService() *Service {
// service
svc := &Service{
exitWatchDuration: 60 * time.Second,
fetchInterval: 1 * time.Second,
fetchTimeout: 15 * time.Second,
reportInterval: 5 * time.Second,
cancelTimeout: 5 * time.Second,
mu: sync.Mutex{},
runners: sync.Map{},
fetchInterval: 1 * time.Second,
fetchTimeout: 15 * time.Second,
reportInterval: 5 * time.Second,
cancelTimeout: 60 * time.Second,
mu: sync.Mutex{},
runners: sync.Map{},
}
// dependency injection