From f736b2c58e4edb5c9b9496511f9b89bbd5c83e4d Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Wed, 18 Dec 2024 17:43:41 +0800 Subject: [PATCH] fix: getting stream error for dependency server --- core/grpc/server/dependency_service_server.go | 21 ++++++++++++++++--- core/task/handler/runner.go | 11 ---------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/core/grpc/server/dependency_service_server.go b/core/grpc/server/dependency_service_server.go index f952105d..4a0fdec8 100644 --- a/core/grpc/server/dependency_service_server.go +++ b/core/grpc/server/dependency_service_server.go @@ -3,6 +3,8 @@ package server import ( "context" "errors" + "fmt" + "github.com/cenkalti/backoff/v4" "io" "sync" "time" @@ -248,12 +250,25 @@ func (svr DependencyServiceServer) SyncConfigSetup(_ context.Context, request *g return nil, nil } -func (svr DependencyServiceServer) GetStream(key string) (stream *grpc.DependencyService_ConnectServer, err error) { +func (svr DependencyServiceServer) GetStream(nodeKey string) (stream *grpc.DependencyService_ConnectServer, err error) { + b := backoff.WithMaxRetries(backoff.NewConstantBackOff(1*time.Second), 30) + err = backoff.Retry(func() error { + stream, err = svr.getStream(nodeKey) + return err + }, b) + if err != nil { + log.Errorf("get stream error: %v", err) + return nil, err + } + return stream, nil +} + +func (svr DependencyServiceServer) getStream(nodeKey string) (stream *grpc.DependencyService_ConnectServer, err error) { svr.mu.Lock() defer svr.mu.Unlock() - stream, ok := svr.streams[key] + stream, ok := svr.streams[nodeKey] if !ok { - return nil, errors.New("stream not found") + return nil, fmt.Errorf("stream not found for node: %s", nodeKey) } return stream, nil } diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index 5e22f468..a8603b2e 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -767,17 +767,6 @@ func (r *Runner) handleIPC() { } } -// SendToChild sends a message to the child process through the IPC channel -// msgType: type of message being sent -// payload: data to be sent to the child process -func (r *Runner) SendToChild(msgType string, payload interface{}) { - r.ipcChan <- entity.IPCMessage{ - Type: msgType, - Payload: payload, - IPC: true, // Explicitly mark as IPC message - } -} - // SetIPCHandler sets the handler for incoming IPC messages func (r *Runner) SetIPCHandler(handler func(entity.IPCMessage)) { r.ipcHandler = handler