mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
refactor: update method receivers to value type for cleanup and connection methods; enhance context usage for task client operations
This commit is contained in:
@@ -370,7 +370,7 @@ func (svr TaskServiceServer) GetSubscribeStream(taskId primitive.ObjectID) (stre
|
||||
}
|
||||
|
||||
// cleanupStaleStreams periodically checks for and removes stale streams
|
||||
func (svr *TaskServiceServer) cleanupStaleStreams() {
|
||||
func (svr TaskServiceServer) cleanupStaleStreams() {
|
||||
ticker := time.NewTicker(10 * time.Minute) // Check every 10 minutes
|
||||
defer ticker.Stop()
|
||||
|
||||
@@ -386,7 +386,7 @@ func (svr *TaskServiceServer) cleanupStaleStreams() {
|
||||
}
|
||||
|
||||
// performStreamCleanup checks each stream and removes those that are no longer active
|
||||
func (svr *TaskServiceServer) performStreamCleanup() {
|
||||
func (svr TaskServiceServer) performStreamCleanup() {
|
||||
taskServiceMutex.Lock()
|
||||
defer taskServiceMutex.Unlock()
|
||||
|
||||
@@ -462,7 +462,7 @@ func newTaskServiceServer() *TaskServiceServer {
|
||||
}
|
||||
|
||||
// Stop gracefully shuts down the task service server
|
||||
func (svr *TaskServiceServer) Stop() error {
|
||||
func (svr TaskServiceServer) Stop() error {
|
||||
svr.Infof("stopping task service server...")
|
||||
|
||||
// Cancel cleanup routine
|
||||
|
||||
@@ -162,7 +162,10 @@ func (svc *WorkerService) subscribe() {
|
||||
svc.Errorf("failed to get node client: %v", err)
|
||||
return err
|
||||
}
|
||||
stream, err := nodeClient.Subscribe(context.Background(), &grpc.NodeServiceSubscribeRequest{
|
||||
// Use client context for proper cancellation
|
||||
ctx, cancel := client.GetGrpcClient().Context()
|
||||
defer cancel()
|
||||
stream, err := nodeClient.Subscribe(ctx, &grpc.NodeServiceSubscribeRequest{
|
||||
NodeKey: svc.cfgSvc.GetNodeKey(),
|
||||
})
|
||||
if err != nil {
|
||||
|
||||
@@ -123,6 +123,7 @@ type Runner struct {
|
||||
cancel context.CancelFunc // function to cancel the context
|
||||
done chan struct{} // channel to signal completion
|
||||
wg sync.WaitGroup // wait group for goroutine synchronization
|
||||
|
||||
// connection management for robust task execution
|
||||
connMutex sync.RWMutex // mutex for connection access
|
||||
connHealthTicker *time.Ticker // ticker for connection health checks
|
||||
@@ -533,7 +534,7 @@ func (r *Runner) initConnection() (err error) {
|
||||
r.Errorf("failed to get task client: %v", err)
|
||||
return err
|
||||
}
|
||||
r.conn, err = taskClient.Connect(context.Background())
|
||||
r.conn, err = taskClient.Connect(r.ctx)
|
||||
if err != nil {
|
||||
r.Errorf("error connecting to task service: %v", err)
|
||||
return err
|
||||
@@ -657,7 +658,7 @@ func (r *Runner) reconnectWithRetry() error {
|
||||
r.Warnf("reconnection attempt %d failed to get task client: %v", attempt+1, err)
|
||||
continue
|
||||
}
|
||||
conn, err := taskClient.Connect(context.Background())
|
||||
conn, err := taskClient.Connect(r.ctx)
|
||||
if err != nil {
|
||||
r.Warnf("reconnection attempt %d failed: %v", attempt+1, err)
|
||||
continue
|
||||
@@ -730,7 +731,9 @@ func (r *Runner) sendNotification() {
|
||||
r.Errorf("failed to get task client: %v", err)
|
||||
return
|
||||
}
|
||||
_, err = taskClient.SendNotification(context.Background(), req)
|
||||
ctx, cancel := context.WithTimeout(r.ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
_, err = taskClient.SendNotification(ctx, req)
|
||||
if err != nil {
|
||||
r.Errorf("error sending notification: %v", err)
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user