diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index 9937a874..39eeea03 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -809,7 +809,8 @@ func (r *Runner) startIPCReader() { line := scanner.Text() var ipcMsg IPCMessage - if err := json.Unmarshal([]byte(line), &ipcMsg); err == nil && ipcMsg.IPC { + err := json.Unmarshal([]byte(line), &ipcMsg) + if err == nil && ipcMsg.IPC { // Only handle as IPC if it's valid JSON AND has IPC flag set if r.ipcHandler != nil { r.ipcHandler(ipcMsg) diff --git a/core/task/handler/runner_test.go b/core/task/handler/runner_test.go index 10eca89d..1c613fe3 100644 --- a/core/task/handler/runner_test.go +++ b/core/task/handler/runner_test.go @@ -270,8 +270,10 @@ func TestRunner_HandleIPCData(t *testing.T) { select { case recordCount := <-processed: assert.Equal(t, tc.expected, recordCount) - case <-time.After(3 * time.Second): - t.Fatal("timeout waiting for IPC message to be processed") + case <-time.After(1 * time.Second): + if tc.expected > 0 { + t.Fatal("timeout waiting for IPC message to be processed") + } } }) } @@ -330,8 +332,10 @@ func TestRunner_HandleIPCInvalidData(t *testing.T) { // Mock the gRPC connection runner.conn = &mockConnectClient{ sendFunc: func(req *grpc.TaskServiceConnectRequest) error { - // This should not be called for invalid data - processed <- struct{}{} + if req.Code == grpc.TaskServiceConnectCode_INSERT_DATA { + // This should not be called for invalid data + processed <- struct{}{} + } return nil }, }