diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index 297e654b..ec069047 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -13,7 +13,6 @@ import ( "path/filepath" "strings" "sync" - "syscall" "time" "github.com/crawlab-team/crawlab/core/models/models" @@ -75,6 +74,11 @@ type Runner struct { wg sync.WaitGroup // wait group for goroutine synchronization } +const ( + IPCMessageData = "data" // IPCMessageData is the message type identifier for data messages + IPCMessageLog = "log" // IPCMessageLog is the message type identifier for log messages +) + // IPCMessage defines the structure for messages exchanged between parent and child processes type IPCMessage struct { Type string `json:"type"` // message type identifier @@ -219,14 +223,7 @@ func (r *Runner) Cancel(force bool) (err error) { for { select { case <-ticker.C: - p, err := os.FindProcess(r.pid) - if err != nil { - // process not exists, exit - return nil - } - err = p.Signal(syscall.Signal(0)) - if err == nil { - // process still exists, continue + if utils.ProcessIdExists(r.pid) { continue } return nil @@ -812,7 +809,7 @@ func (r *Runner) startIPCReader() { r.ipcHandler(ipcMsg) } else { // Default handler (insert data) - if ipcMsg.Type == "" || ipcMsg.Type == "insert_data" { + if ipcMsg.Type == "" || ipcMsg.Type == IPCMessageData { r.handleIPCInsertDataMessage(ipcMsg) } else { log.Warnf("no IPC handler set for message: %v", ipcMsg) diff --git a/core/task/handler/runner_test.go b/core/task/handler/runner_test.go index 8acd8b83..baad0059 100644 --- a/core/task/handler/runner_test.go +++ b/core/task/handler/runner_test.go @@ -3,6 +3,7 @@ package handler import ( "encoding/json" "fmt" + "github.com/apex/log" "io" "os" "syscall" @@ -69,6 +70,8 @@ func TestRunner_HandleIPC(t *testing.T) { // Create a pipe for testing pr, pw := io.Pipe() + defer pr.Close() + defer pw.Close() runner.stdoutPipe = pr // Start IPC reader @@ -108,13 +111,10 @@ func TestRunner_HandleIPC(t *testing.T) { select { case <-handled: // Message was handled successfully + log.Info("IPC message was handled successfully") case <-time.After(3 * time.Second): t.Fatal("timeout waiting for IPC message to be handled") } - - // Clean up - pw.Close() - pr.Close() } func TestRunner_Cancel(t *testing.T) { diff --git a/core/utils/process.go b/core/utils/process.go index 72575f7f..009d41d6 100644 --- a/core/utils/process.go +++ b/core/utils/process.go @@ -1,26 +1,31 @@ package utils import ( - "github.com/crawlab-team/crawlab/trace" + "os" "os/exec" - "regexp" "runtime" "strings" + "syscall" + + "github.com/crawlab-team/crawlab/trace" ) -var pidRegexp, _ = regexp.Compile("(?:^|\\s+)\\d+(?:$|\\s+)") - -func ProcessIdExists(id int) (ok bool) { - lines, err := ListProcess(string(rune(id))) +func ProcessIdExists(pid int) (ok bool) { + // Find process by pid + p, err := os.FindProcess(pid) if err != nil { + // Process not found return false } - for _, line := range lines { - matched := pidRegexp.MatchString(line) - if matched { - return true - } + + // Check if process exists + err = p.Signal(syscall.Signal(0)) + if err == nil { + // Process exists + return true } + + // Process not found return false }