From d042bc8cd7220bb955c1a11d4859a61cd501c73a Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Thu, 7 Aug 2025 11:12:46 +0800 Subject: [PATCH] refactor: improve connection readiness check and enhance goroutine management in gRPC client; ensure proper context handling in stream listeners --- core/grpc/client/client.go | 13 ++++++++++--- core/grpc/middlewares/auth_token.go | 7 ++++--- core/task/handler/stream_manager.go | 22 +++++++++++++++++++--- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/core/grpc/client/client.go b/core/grpc/client/client.go index 806e6ee1..587d864d 100644 --- a/core/grpc/client/client.go +++ b/core/grpc/client/client.go @@ -230,8 +230,11 @@ func (c *GrpcClient) Context() (ctx context.Context, cancel context.CancelFunc) } func (c *GrpcClient) IsReady() (res bool) { + if c.conn == nil { + return false + } state := c.conn.GetState() - return c.conn != nil && state == connectivity.Ready + return state == connectivity.Ready } func (c *GrpcClient) IsReadyAndRegistered() (res bool) { @@ -602,8 +605,12 @@ func (c *GrpcClient) getClientWithContext(ctx context.Context, getter func() int } func (c *GrpcClient) connect() error { - // Use a separate goroutine for reconnection handling - go c.handleReconnections() + // Start reconnection handling goroutine with proper tracking + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.handleReconnections() + }() // Initial connection attempt return c.doConnect() diff --git a/core/grpc/middlewares/auth_token.go b/core/grpc/middlewares/auth_token.go index d93db1c9..77ba7f65 100644 --- a/core/grpc/middlewares/auth_token.go +++ b/core/grpc/middlewares/auth_token.go @@ -2,9 +2,10 @@ package middlewares import ( "context" + "github.com/crawlab-team/crawlab/core/errors" "github.com/crawlab-team/crawlab/core/utils" - "github.com/grpc-ecosystem/go-grpc-middleware/auth" + grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -43,7 +44,7 @@ func GetGrpcClientAuthTokenUnaryChainInterceptor() grpc.UnaryClientInterceptor { // set auth key md := metadata.Pairs(GrpcHeaderAuthorization, utils.GetAuthKey()) return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - ctx = metadata.NewOutgoingContext(context.Background(), md) + ctx = metadata.NewOutgoingContext(ctx, md) return invoker(ctx, method, req, reply, cc, opts...) } } @@ -52,7 +53,7 @@ func GetGrpcClientAuthTokenStreamChainInterceptor() grpc.StreamClientInterceptor // set auth key md := metadata.Pairs(GrpcHeaderAuthorization, utils.GetAuthKey()) return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { - ctx = metadata.NewOutgoingContext(context.Background(), md) + ctx = metadata.NewOutgoingContext(ctx, md) s, err := streamer(ctx, desc, cc, method, opts...) if err != nil { return nil, err diff --git a/core/task/handler/stream_manager.go b/core/task/handler/stream_manager.go index abd54d86..dbc1169d 100644 --- a/core/task/handler/stream_manager.go +++ b/core/task/handler/stream_manager.go @@ -162,13 +162,29 @@ func (sm *StreamManager) streamListener(ts *TaskStream) { err error }, 1) - // Start receive operation in a separate goroutine + // Start receive operation in a separate goroutine with proper cleanup go func() { + defer func() { + if r := recover(); r != nil { + sm.service.Errorf("stream recv goroutine panic for task[%s]: %v", ts.taskId.Hex(), r) + } + }() + msg, err := ts.stream.Recv() - resultChan <- struct { + + // Use select to ensure we don't block if the main goroutine has exited + select { + case resultChan <- struct { msg *grpc.TaskServiceSubscribeResponse err error - }{msg, err} + }{msg, err}: + case <-ts.ctx.Done(): + // Parent context cancelled, just return without sending + return + case <-sm.ctx.Done(): + // Manager context cancelled, just return without sending + return + } }() // Wait for result, timeout, or cancellation