fix: getting stream error for dependency server

This commit is contained in:
Marvin Zhang
2024-12-18 17:43:41 +08:00
parent 19b9135238
commit f736b2c58e
2 changed files with 18 additions and 14 deletions

View File

@@ -3,6 +3,8 @@ package server
import (
"context"
"errors"
"fmt"
"github.com/cenkalti/backoff/v4"
"io"
"sync"
"time"
@@ -248,12 +250,25 @@ func (svr DependencyServiceServer) SyncConfigSetup(_ context.Context, request *g
return nil, nil
}
func (svr DependencyServiceServer) GetStream(key string) (stream *grpc.DependencyService_ConnectServer, err error) {
func (svr DependencyServiceServer) GetStream(nodeKey string) (stream *grpc.DependencyService_ConnectServer, err error) {
b := backoff.WithMaxRetries(backoff.NewConstantBackOff(1*time.Second), 30)
err = backoff.Retry(func() error {
stream, err = svr.getStream(nodeKey)
return err
}, b)
if err != nil {
log.Errorf("get stream error: %v", err)
return nil, err
}
return stream, nil
}
func (svr DependencyServiceServer) getStream(nodeKey string) (stream *grpc.DependencyService_ConnectServer, err error) {
svr.mu.Lock()
defer svr.mu.Unlock()
stream, ok := svr.streams[key]
stream, ok := svr.streams[nodeKey]
if !ok {
return nil, errors.New("stream not found")
return nil, fmt.Errorf("stream not found for node: %s", nodeKey)
}
return stream, nil
}