mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
test: fix test cases
This commit is contained in:
@@ -13,7 +13,6 @@ import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
@@ -75,6 +74,11 @@ type Runner struct {
|
||||
wg sync.WaitGroup // wait group for goroutine synchronization
|
||||
}
|
||||
|
||||
const (
|
||||
IPCMessageData = "data" // IPCMessageData is the message type identifier for data messages
|
||||
IPCMessageLog = "log" // IPCMessageLog is the message type identifier for log messages
|
||||
)
|
||||
|
||||
// IPCMessage defines the structure for messages exchanged between parent and child processes
|
||||
type IPCMessage struct {
|
||||
Type string `json:"type"` // message type identifier
|
||||
@@ -219,14 +223,7 @@ func (r *Runner) Cancel(force bool) (err error) {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
p, err := os.FindProcess(r.pid)
|
||||
if err != nil {
|
||||
// process not exists, exit
|
||||
return nil
|
||||
}
|
||||
err = p.Signal(syscall.Signal(0))
|
||||
if err == nil {
|
||||
// process still exists, continue
|
||||
if utils.ProcessIdExists(r.pid) {
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
@@ -812,7 +809,7 @@ func (r *Runner) startIPCReader() {
|
||||
r.ipcHandler(ipcMsg)
|
||||
} else {
|
||||
// Default handler (insert data)
|
||||
if ipcMsg.Type == "" || ipcMsg.Type == "insert_data" {
|
||||
if ipcMsg.Type == "" || ipcMsg.Type == IPCMessageData {
|
||||
r.handleIPCInsertDataMessage(ipcMsg)
|
||||
} else {
|
||||
log.Warnf("no IPC handler set for message: %v", ipcMsg)
|
||||
|
||||
@@ -3,6 +3,7 @@ package handler
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/apex/log"
|
||||
"io"
|
||||
"os"
|
||||
"syscall"
|
||||
@@ -69,6 +70,8 @@ func TestRunner_HandleIPC(t *testing.T) {
|
||||
|
||||
// Create a pipe for testing
|
||||
pr, pw := io.Pipe()
|
||||
defer pr.Close()
|
||||
defer pw.Close()
|
||||
runner.stdoutPipe = pr
|
||||
|
||||
// Start IPC reader
|
||||
@@ -108,13 +111,10 @@ func TestRunner_HandleIPC(t *testing.T) {
|
||||
select {
|
||||
case <-handled:
|
||||
// Message was handled successfully
|
||||
log.Info("IPC message was handled successfully")
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("timeout waiting for IPC message to be handled")
|
||||
}
|
||||
|
||||
// Clean up
|
||||
pw.Close()
|
||||
pr.Close()
|
||||
}
|
||||
|
||||
func TestRunner_Cancel(t *testing.T) {
|
||||
|
||||
@@ -1,26 +1,31 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"github.com/crawlab-team/crawlab/trace"
|
||||
"os"
|
||||
"os/exec"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/crawlab-team/crawlab/trace"
|
||||
)
|
||||
|
||||
var pidRegexp, _ = regexp.Compile("(?:^|\\s+)\\d+(?:$|\\s+)")
|
||||
|
||||
func ProcessIdExists(id int) (ok bool) {
|
||||
lines, err := ListProcess(string(rune(id)))
|
||||
func ProcessIdExists(pid int) (ok bool) {
|
||||
// Find process by pid
|
||||
p, err := os.FindProcess(pid)
|
||||
if err != nil {
|
||||
// Process not found
|
||||
return false
|
||||
}
|
||||
for _, line := range lines {
|
||||
matched := pidRegexp.MatchString(line)
|
||||
if matched {
|
||||
return true
|
||||
}
|
||||
|
||||
// Check if process exists
|
||||
err = p.Signal(syscall.Signal(0))
|
||||
if err == nil {
|
||||
// Process exists
|
||||
return true
|
||||
}
|
||||
|
||||
// Process not found
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user