From e1251d808bea1a2b0ba3c6e6bc00ee8c870085af Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Thu, 7 Aug 2025 11:53:42 +0800 Subject: [PATCH] refactor: update method receivers to value type for cleanup and connection methods; enhance context usage for task client operations --- core/grpc/server/task_service_server.go | 6 +++--- core/node/service/worker_service.go | 5 ++++- core/task/handler/runner.go | 9 ++++++--- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/core/grpc/server/task_service_server.go b/core/grpc/server/task_service_server.go index a8fc43cf..2ad3d319 100644 --- a/core/grpc/server/task_service_server.go +++ b/core/grpc/server/task_service_server.go @@ -370,7 +370,7 @@ func (svr TaskServiceServer) GetSubscribeStream(taskId primitive.ObjectID) (stre } // cleanupStaleStreams periodically checks for and removes stale streams -func (svr *TaskServiceServer) cleanupStaleStreams() { +func (svr TaskServiceServer) cleanupStaleStreams() { ticker := time.NewTicker(10 * time.Minute) // Check every 10 minutes defer ticker.Stop() @@ -386,7 +386,7 @@ func (svr *TaskServiceServer) cleanupStaleStreams() { } // performStreamCleanup checks each stream and removes those that are no longer active -func (svr *TaskServiceServer) performStreamCleanup() { +func (svr TaskServiceServer) performStreamCleanup() { taskServiceMutex.Lock() defer taskServiceMutex.Unlock() @@ -462,7 +462,7 @@ func newTaskServiceServer() *TaskServiceServer { } // Stop gracefully shuts down the task service server -func (svr *TaskServiceServer) Stop() error { +func (svr TaskServiceServer) Stop() error { svr.Infof("stopping task service server...") // Cancel cleanup routine diff --git a/core/node/service/worker_service.go b/core/node/service/worker_service.go index 89242763..693204a7 100644 --- a/core/node/service/worker_service.go +++ b/core/node/service/worker_service.go @@ -162,7 +162,10 @@ func (svc *WorkerService) subscribe() { svc.Errorf("failed to get node client: %v", err) return err } - stream, err := nodeClient.Subscribe(context.Background(), &grpc.NodeServiceSubscribeRequest{ + // Use client context for proper cancellation + ctx, cancel := client.GetGrpcClient().Context() + defer cancel() + stream, err := nodeClient.Subscribe(ctx, &grpc.NodeServiceSubscribeRequest{ NodeKey: svc.cfgSvc.GetNodeKey(), }) if err != nil { diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index 2bcd20d3..2c57eab5 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -123,6 +123,7 @@ type Runner struct { cancel context.CancelFunc // function to cancel the context done chan struct{} // channel to signal completion wg sync.WaitGroup // wait group for goroutine synchronization + // connection management for robust task execution connMutex sync.RWMutex // mutex for connection access connHealthTicker *time.Ticker // ticker for connection health checks @@ -533,7 +534,7 @@ func (r *Runner) initConnection() (err error) { r.Errorf("failed to get task client: %v", err) return err } - r.conn, err = taskClient.Connect(context.Background()) + r.conn, err = taskClient.Connect(r.ctx) if err != nil { r.Errorf("error connecting to task service: %v", err) return err @@ -657,7 +658,7 @@ func (r *Runner) reconnectWithRetry() error { r.Warnf("reconnection attempt %d failed to get task client: %v", attempt+1, err) continue } - conn, err := taskClient.Connect(context.Background()) + conn, err := taskClient.Connect(r.ctx) if err != nil { r.Warnf("reconnection attempt %d failed: %v", attempt+1, err) continue @@ -730,7 +731,9 @@ func (r *Runner) sendNotification() { r.Errorf("failed to get task client: %v", err) return } - _, err = taskClient.SendNotification(context.Background(), req) + ctx, cancel := context.WithTimeout(r.ctx, 10*time.Second) + defer cancel() + _, err = taskClient.SendNotification(ctx, req) if err != nil { r.Errorf("error sending notification: %v", err) return