refactor: improve IPC handling and logging in task runner tests

- Enhanced the IPC message handling in runner_test.go by adding detailed logging for better traceability.
- Refactored the test setup to use channels for synchronization and improved error handling during message processing.
- Updated the runner.go file to rename variables for clarity and streamline the IPC reader implementation.
- Improved the cleanup process in tests to ensure proper resource management and context cancellation.
This commit is contained in:
Marvin Zhang
2025-01-01 15:18:40 +08:00
parent db2549e3cd
commit 136daffa26
2 changed files with 107 additions and 51 deletions

View File

@@ -55,9 +55,9 @@ type Runner struct {
interfaces.Logger
// log handling
scannerStdout *bufio.Reader // reader for process stdout
scannerStderr *bufio.Reader // reader for process stderr
logBatchSize int // number of log lines to batch before sending
readerStdout *bufio.Reader // reader for process stdout
readerStderr *bufio.Reader // reader for process stderr
logBatchSize int // number of log lines to batch before sending
// IPC (Inter-Process Communication)
stdinPipe io.WriteCloser // pipe for writing to child process
@@ -247,8 +247,8 @@ func (r *Runner) configureCmd() (err error) {
}
// Create buffered readers
r.scannerStdout = bufio.NewReader(r.stdoutPipe)
r.scannerStderr = bufio.NewReader(stderrPipe)
r.readerStdout = bufio.NewReader(r.stdoutPipe)
r.readerStderr = bufio.NewReader(stderrPipe)
// Initialize IPC channel
r.ipcChan = make(chan entity.IPCMessage)
@@ -849,32 +849,32 @@ func (r *Runner) startIPCReader() {
// Start stdout reader
go func() {
defer r.wg.Done()
r.readOutput(r.scannerStdout, true) // true for stdout
r.readOutput(r.readerStdout, true) // true for stdout
}()
// Start stderr reader
go func() {
defer r.wg.Done()
r.readOutput(r.scannerStderr, false) // false for stderr
r.readOutput(r.readerStderr, false) // false for stderr
}()
}
func (r *Runner) readOutput(reader *bufio.Reader, isStdout bool) {
scanner := bufio.NewScanner(reader)
for {
select {
case <-r.ctx.Done():
// Context cancelled, stop reading
return
default:
line, err := reader.ReadString('\n')
if err != nil {
if err != io.EOF {
r.Errorf("error reading from %s: %v",
map[bool]string{true: "stdout", false: "stderr"}[isStdout],
err)
}
// Scan the next line
if !scanner.Scan() {
return
}
// Get the line
line := scanner.Text()
// Trim the line
line = strings.TrimRight(line, "\n\r")
@@ -904,8 +904,6 @@ func (r *Runner) readOutput(reader *bufio.Reader, isStdout bool) {
// handleIPCInsertDataMessage converts the IPC message payload to JSON and sends it to the master node
func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) {
r.Debugf("processing IPC data message")
if ipcMsg.Payload == nil {
r.Errorf("empty payload in IPC message")
return
@@ -982,8 +980,6 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) {
return
}
}
r.Infof("successfully sent %d records to master node", len(records))
}
// newTaskRunner creates a new task runner instance with the specified task ID