mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
fix: getting stream error for dependency server
This commit is contained in:
@@ -3,6 +3,8 @@ package server
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -248,12 +250,25 @@ func (svr DependencyServiceServer) SyncConfigSetup(_ context.Context, request *g
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (svr DependencyServiceServer) GetStream(key string) (stream *grpc.DependencyService_ConnectServer, err error) {
|
||||
func (svr DependencyServiceServer) GetStream(nodeKey string) (stream *grpc.DependencyService_ConnectServer, err error) {
|
||||
b := backoff.WithMaxRetries(backoff.NewConstantBackOff(1*time.Second), 30)
|
||||
err = backoff.Retry(func() error {
|
||||
stream, err = svr.getStream(nodeKey)
|
||||
return err
|
||||
}, b)
|
||||
if err != nil {
|
||||
log.Errorf("get stream error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
func (svr DependencyServiceServer) getStream(nodeKey string) (stream *grpc.DependencyService_ConnectServer, err error) {
|
||||
svr.mu.Lock()
|
||||
defer svr.mu.Unlock()
|
||||
stream, ok := svr.streams[key]
|
||||
stream, ok := svr.streams[nodeKey]
|
||||
if !ok {
|
||||
return nil, errors.New("stream not found")
|
||||
return nil, fmt.Errorf("stream not found for node: %s", nodeKey)
|
||||
}
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
@@ -767,17 +767,6 @@ func (r *Runner) handleIPC() {
|
||||
}
|
||||
}
|
||||
|
||||
// SendToChild sends a message to the child process through the IPC channel
|
||||
// msgType: type of message being sent
|
||||
// payload: data to be sent to the child process
|
||||
func (r *Runner) SendToChild(msgType string, payload interface{}) {
|
||||
r.ipcChan <- entity.IPCMessage{
|
||||
Type: msgType,
|
||||
Payload: payload,
|
||||
IPC: true, // Explicitly mark as IPC message
|
||||
}
|
||||
}
|
||||
|
||||
// SetIPCHandler sets the handler for incoming IPC messages
|
||||
func (r *Runner) SetIPCHandler(handler func(entity.IPCMessage)) {
|
||||
r.ipcHandler = handler
|
||||
|
||||
Reference in New Issue
Block a user