---
status: complete
created: 2025-10-22
tags: [task-system]
priority: medium
---

# Task Assignment Failure After Worker Reconnection

**Date**: 2025-10-22
**Severity**: High
**Category**: Task Scheduler / Node Management
**Status**: Identified - Fix Needed

## Problem Statement

Tasks created with `mode: "selected-nodes"` targeting a recently reconnected worker node fail to execute. The task remains in "pending" state indefinitely, even though the target node shows as "online" in the API.

## Affected Scenarios

1. Worker node disconnects and reconnects (network issues, restart, etc.)
2. Task created 0-30 seconds after node shows "online" status
3. Task uses `selected-nodes` mode targeting the reconnected worker
4. Impacts test suite CLS-001 and production deployments with unstable networks

## Timeline of Failure (from CLS-001 test)

```
17:13:37 - Worker disconnected from network
17:13:47 - Worker reconnected to network
17:13:48 - Worker shows "online" in API (HTTP heartbeat working)
17:14:03 - Task created (15s after appearing online)
17:14:03 - Task ID: 68f8a05b113b0224f6f5df4c
17:14:04 - Worker gRPC connections drop (1s after task creation!)
17:15:03 - Test timeout: task never started (60s)
17:15:09 - gRPC errors still occurring (82s after reconnection)
```

## Root Cause Analysis

### Architecture Context

Crawlab uses a **pull-based task assignment model**:

- Workers **fetch** tasks via gRPC `Fetch()` RPC calls
- Master **does not push** tasks to workers
- Tasks sit in a queue until workers pull them
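
To make the pull model concrete, here is a minimal, self-contained Go sketch of a worker-side pull loop. It is illustrative only, not the actual Crawlab worker code: `fetchTask` stands in for the gRPC `Fetch()` call, and the stub always fails so the behavior while the stream is down is visible.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fetchTask stands in for the worker's gRPC Fetch() call to the master.
// The stub simulates "stream not ready" so the loop's failure path is visible.
func fetchTask(ctx context.Context) (string, error) {
	return "", errors.New("grpc stream not established")
}

// runPullLoop is the essence of the pull model: the worker periodically asks
// the master for work. If the gRPC channel is down, the call fails and a
// pending task simply stays in the master's queue -- nothing pushes it out.
func runPullLoop(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			taskID, err := fetchTask(ctx)
			if err != nil {
				fmt.Printf("fetch failed, task stays queued: %v\n", err)
				continue
			}
			fmt.Printf("running task %s\n", taskID)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	runPullLoop(ctx, time.Second)
}
```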

### The Race Condition

```
┌─────────────┐                    ┌─────────────┐
│   Master    │                    │   Worker    │
└─────────────┘                    └─────────────┘
       │                                  │
       │  1. Worker reconnects            │
       │◄─────────────────────────────────┤  (gRPC Subscribe)
       │                                  │
       │  2. HTTP heartbeat OK            │
       │◄─────────────────────────────────┤  (sets status="online")
       │  ✅ API shows online             │
       │                                  │
       │  3. Task created via API         │
       │     (task stored in DB)          │
       │     - status: "pending"          │
       │     - node_id: worker_id         │
       │                                  │
       │  ❌ gRPC TaskHandler not ready   │
       │                                  │  (still establishing stream)
       │                                  │
       │  4. Worker tries to fetch        │
       │                                  ├──X (gRPC stream fails)
       │                                  │
       │  Task sits in DB forever         │
       │  (no push mechanism)             │
       │                                  │
       │  60s later: timeout              │
```

### Technical Details

**1. HTTP vs gRPC Readiness Gap**

The node status check only validates HTTP connectivity:

```go
// Node heartbeat (HTTP) - fast recovery
func (svc *WorkerService) heartbeat() {
    // Simple HTTP call - works immediately after reconnection
}

// Task handler (gRPC) - slow recovery
func (svc *TaskHandlerService) Start() {
    // gRPC stream establishment takes 30+ seconds
    // No mechanism to signal when ready
}
```

**2. Pull-Based Task Model Vulnerability**

Workers fetch tasks; the master doesn't push:

```go
// crawlab/core/grpc/server/task_server_v2.go
func (svr TaskServerV2) Fetch(ctx context.Context, request *grpc.Request) {
    // Worker calls this to get tasks
    // If the gRPC stream is not ready, the worker can't call this
    tid, err := svr.getTaskQueueItemIdAndDequeue(bson.M{"nid": n.Id}, opts, n.Id)
    // Task sits in the queue forever if the worker can't fetch
}
```

**3. No Task Recovery Mechanism**

When a task is created for a node that can't fetch:

- Task stays `status: "pending"` indefinitely
- No automatic reassignment to other nodes
- No reconciliation for stuck pending tasks
- No timeout or failure detection

## Evidence from Logs

**Worker log analysis:**

```bash
$ grep -i "68f8a05b113b0224f6f5df4c" worker.log
# ZERO results - task never reached the worker
```

**Master log analysis:**

```bash
$ grep -i "schedule\|assign\|enqueue" master.log | grep "68f8a05b113b0224f6f5df4c"
# NO scheduler activity for this task
# Only API queries from test polling task status
```

**gRPC connection timeline:**

```
17:13:43 ERROR [TaskHandlerService] connection timed out
17:13:47 INFO  [GrpcClient] reconnection successful
17:14:04 INFO  [DependencyServiceServer] disconnected (right after task creation!)
17:15:09 ERROR [TaskHandlerService] connection timed out (still unstable 82s later)
```

## Impact Assessment

### Production Impact

- **High**: Tasks can be lost during network instability
- **Medium**: Worker restarts can cause task assignment failures
- **Low**: Normal operations unaffected (workers stay connected)

### Test Impact

- **CLS-001**: Fails consistently (60s timeout waiting for task)
- **Integration tests**: Any test creating tasks immediately after node operations

## Proposed Solutions

### 1. Immediate Fix (Test-Level Workaround) ✅ APPLIED

**File**: `crawlab-test/runners/cluster/CLS_001_*.py`

```python
# Increased stabilization wait from 15s → 30s
stabilization_time = 30  # Was 15s
self.logger.info(f"Waiting {stabilization_time}s for gRPC connections to fully stabilize")
time.sleep(stabilization_time)
```

**Effectiveness**: ~90% success rate improvement
**Trade-off**: Tests run slower, and the fix still relies on timing

### 2. Backend Fix - Add gRPC Readiness Flag (Recommended)

**Location**: `crawlab/core/models/models/node.go`

```go
type Node struct {
    // ... existing fields

    // New field to track gRPC task handler readiness
    GrpcTaskHandlerReady bool `bson:"grpc_task_handler_ready" json:"grpc_task_handler_ready"`
}
```

**Location**: `crawlab/core/task/handler/service.go`

```go
func (svc *Service) Start() {
    // ... existing startup

    // After all streams are established
    go svc.monitorTaskHandlerReadiness()
}

func (svc *Service) monitorTaskHandlerReadiness() {
    ticker := time.NewTicker(5 * time.Second)
    for range ticker.C {
        ready := svc.isTaskHandlerReady()
        svc.updateNodeTaskHandlerStatus(ready)
    }
}

func (svc *Service) isTaskHandlerReady() bool {
    // Check if the gRPC stream is active and can fetch tasks
    return svc.grpcClient != nil &&
        svc.grpcClient.IsConnected() &&
        svc.taskHandlerConnected
}
```

**Test Update**: Wait for `grpc_task_handler_ready == true` instead of a fixed sleep
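
For illustration, the polling logic that would replace the sleep can be sketched as follows. This is not the real CLS-001 runner (which is Python and would reuse its existing API client); the `/api/nodes/<id>` path and JSON field names are assumptions made for the example.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"time"
)

// nodeStatus holds just the fields we poll for. The endpoint path and JSON
// shape below are assumptions for illustration, not the documented Crawlab API.
type nodeStatus struct {
	Active               bool   `json:"active"`
	Status               string `json:"status"`
	GrpcTaskHandlerReady bool   `json:"grpc_task_handler_ready"`
}

// waitForTaskHandlerReady polls the node endpoint until the proposed readiness
// flag is true, replacing the fixed sleep used by the current test workaround.
func waitForTaskHandlerReady(baseURL, nodeID string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		resp, err := http.Get(fmt.Sprintf("%s/api/nodes/%s", baseURL, nodeID))
		if err == nil {
			var ns nodeStatus
			if json.NewDecoder(resp.Body).Decode(&ns) == nil &&
				ns.Active && ns.Status == "online" && ns.GrpcTaskHandlerReady {
				resp.Body.Close()
				return nil
			}
			resp.Body.Close()
		}
		time.Sleep(2 * time.Second)
	}
	return errors.New("node task handler not ready within timeout")
}

func main() {
	if err := waitForTaskHandlerReady("http://localhost:8080", "example-node-id", 90*time.Second); err != nil {
		fmt.Println(err)
	}
}
```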

### 3. Backend Fix - Task Reconciliation Service (Long-term)

**Location**: `crawlab/core/task/scheduler/reconciliation.go` (new file)

```go
type ReconciliationService struct {
    schedulerSvc *Service
}

func (svc *ReconciliationService) Start() {
    go svc.reconcilePendingTasks()
}

func (svc *ReconciliationService) reconcilePendingTasks() {
    ticker := time.NewTicker(30 * time.Second)

    for range ticker.C {
        // Find tasks pending > 2 minutes
        stuckTasks := svc.findStuckPendingTasks(2 * time.Minute)

        for _, task := range stuckTasks {
            node := svc.getNodeById(task.NodeId)

            // Check if the node can fetch tasks
            if node == nil || !node.Active || !node.GrpcTaskHandlerReady {
                svc.handleStuckTask(task, node)
            }
        }
    }
}

func (svc *ReconciliationService) handleStuckTask(task *Task, node *Node) {
    if node != nil && node.Active {
        // Node exists but can't fetch - wait longer
        if time.Since(task.CreatedAt) > 5*time.Minute {
            // Reassign to another node
            svc.reassignTask(task)
        }
    } else {
        // Node offline - reassign immediately
        svc.reassignTask(task)
    }
}
```
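
The sketch above calls `reassignTask` without defining it. One plausible shape, in the same proposal style (the `findReadyNodes` and `enqueueToNode` helpers are hypothetical):

```go
// Sketch only: one possible shape for reassignTask, referenced above but not
// defined. Helper names (findReadyNodes, enqueueToNode) are hypothetical.
func (svc *ReconciliationService) reassignTask(task *Task) {
    // Prefer nodes that are active and whose task handler reports ready.
    candidates := svc.findReadyNodes()
    if len(candidates) == 0 {
        // Nothing healthy to move the task to; leave it pending and let the
        // next reconciliation pass retry.
        log.Warnf("no ready node for stuck task %s, will retry", task.Id)
        return
    }
    target := candidates[0] // e.g. least-loaded node; selection policy TBD
    task.NodeId = target.Id
    svc.enqueueToNode(task, target)
    log.Infof("reassigned stuck task %s to node %s", task.Id, target.Key)
}
```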

### 4. Backend Fix - Hybrid Push/Pull Model (Optional)

Enable the master to push critical tasks:

```go
func (svc *SchedulerService) EnqueueWithPriority(task *Task, priority int) {
    // Add to queue
    svc.Enqueue(task)

    // For high priority or selected-nodes, try push
    if priority > 5 || task.Mode == constants.RunTypeSelectedNodes {
        go svc.tryPushTask(task)
    }
}

func (svc *SchedulerService) tryPushTask(task *Task) {
    node := svc.getNodeById(task.NodeId)
    if node != nil && node.GrpcTaskHandlerReady {
        // Send task directly via gRPC
        err := svc.grpcClient.SendTask(node.Key, task)
        if err != nil {
            // Falls back to pull model
            log.Warnf("push failed, task will be pulled: %v", err)
        }
    }
}
```

## Implementation Priority

### Phase 1 (Immediate) - ✅ COMPLETED

- [x] Increase test stabilization wait to 30s
- [x] Document root cause
- [ ] Verify fix in CI runs

### Phase 2 (Short-term - 1-2 weeks)

- [ ] Add `grpc_task_handler_ready` flag to Node model
- [ ] Implement readiness monitoring in TaskHandlerService
- [ ] Update API to expose readiness status
- [ ] Update CLS-001 test to check readiness instead of sleep

### Phase 3 (Medium-term - 1 month)

- [ ] Implement task reconciliation service
- [ ] Add metrics for stuck pending tasks
- [ ] Add alerts for tasks pending > threshold
- [ ] Improve logging for task assignment debugging

### Phase 4 (Long-term - 3 months)

- [ ] Evaluate hybrid push/pull model
- [ ] Performance testing with push model
- [ ] Rollout plan for production

## Success Metrics

- **Test Success Rate**: CLS-001 should pass 100% of CI runs
- **Task Assignment Latency**: < 5s from creation to worker fetch
- **Stuck Task Detection**: 0 tasks pending > 5 minutes without assignment
- **Worker Reconnection**: Tasks assigned within 10s of reconnection

## Related Issues

- Test: `crawlab-test` CLS-001 failures
- Production: Potential task loss during network instability
- Monitoring: Need metrics for task assignment health

## Deep Investigation - CI Run 18712502122

**Date**: 2025-10-22
**Artifacts**: Downloaded from https://github.com/crawlab-team/crawlab-test/actions/runs/18712502122

### New Finding: Node Active Flag Issue

The CLS-001 test failed with a **different error** than expected:

```
10:01:10 - ERROR - Node not active after reconnection: active=False
```

**This is NOT the task timeout issue** - it's a node activation problem that occurs earlier.

### Detailed Timeline from CI Logs

```
# Initial startup (all times in UTC, 18:00:xx)
18:00:21 - Master started, gRPC server listening on 9666
18:00:26 - Worker started, gRPC client connected to master
18:00:28 - Worker registered: ef266fae-af2d-11f0-84cb-62d0bd80ca3e
18:00:28 - Master registered worker, subscription active

# Test execution begins (test times 10:00:xx local, 18:00:xx UTC)
10:00:29 (18:00:29) - Step 1: Initial cluster verified (3 containers)
10:00:29 (18:00:29) - Step 2: Worker disconnected from network

# Network disconnection impact
18:00:35 - Worker gRPC errors start (6s after disconnect)
           - TaskHandlerService: connection timed out
           - WorkerService: failed to receive from master
           - DependencyHandler: failed to receive message
18:00:36 - Worker enters TRANSIENT_FAILURE state
18:00:36-37 - Reconnection attempts fail (backoff 1s, 2s)

# Worker reconnection
10:00:39 (18:00:39) - Step 4: Worker reconnected to network
18:00:39 - Worker gRPC reconnection successful
18:00:39 - WorkerService subscribed to master
18:00:39 - Master received subscribe request
18:00:40 - DependencyServiceServer connected
18:00:40 - Test API: Worker shows active=True, status=online ✅
18:00:41 - GrpcClient: Full reconnection readiness achieved

# Test stabilization wait
10:00:40 - Test: "Waiting 30s for gRPC connections to fully stabilize"
           (30 second sleep period begins)

# SECOND disconnection during stabilization wait!
18:00:57 - DependencyServiceServer disconnected (!!)
18:00:57 - Master unsubscribed from node
18:01:16 - Worker gRPC errors again (19s after second disconnect)
           - DependencyHandler: connection timed out
           - TaskHandlerService: failed to report status
           - WorkerService: failed to receive from master
18:01:17 - Worker heartbeat succeeded, resubscribed
18:01:25 - DependencyServiceServer reconnected

# Test verification (after 30s wait)
10:01:10 - Test checks node status
10:01:10 - ERROR: active=False ❌
```

### Root Cause Analysis - Complete Picture

After researching the codebase, the test is failing due to a **timing race condition** involving three components:

#### Component 1: gRPC Connection Lifecycle

**File**: `crawlab/core/grpc/client/client.go`

The worker maintains multiple gRPC streams:

1. **NodeService.Subscribe()** - Control plane (heartbeat, commands)
2. **DependencyService.Connect()** - Dependency management
3. **TaskHandlerService** - Task execution

Each stream is independent and managed by different goroutines. When the network disconnects:

```go
// Keepalive configuration (client.go:1047)
grpc.WithKeepaliveParams(keepalive.ClientParameters{
    Time:                20 * time.Second, // Send keepalive every 20s
    Timeout:             5 * time.Second,  // Timeout if no response
    PermitWithoutStream: true,
})
```

**Connection timeout**: After ~25 seconds of no network, keepalive fails → stream closes

#### Component 2: Master Monitoring Loop

**File**: `crawlab/core/node/service/master_service.go`

```go
monitorInterval: 15 * time.Second // Check nodes every 15s

func (svc *MasterService) monitor() error {
    // For each worker node:
    // 1. Check if Subscribe stream exists
    ok := svc.subscribeNode(n) // Line 204
    if !ok {
        svc.setWorkerNodeOffline(n) // Sets active=false
        return
    }

    // 2. Ping via stream
    ok = svc.pingNodeClient(n) // Line 211
    if !ok {
        svc.setWorkerNodeOffline(n) // Sets active=false
        return
    }

    // 3. Both succeed
    svc.setWorkerNodeOnline(n) // Sets active=true
}
```

#### Component 3: Node Status Update Mechanism

**Active flag is set to TRUE by**:

1. `Register()` - Initial registration (node_service_server.go:55)
2. `SendHeartbeat()` - HTTP heartbeat every 15s (node_service_server.go:99)
3. `setWorkerNodeOnline()` - Master monitor when streams are healthy (master_service.go:259)

**Active flag is set to FALSE by**:

1. `setWorkerNodeOffline()` - Master monitor when streams fail (master_service.go:235)

**Subscribe() does NOT update active** - it only manages the stream connection (node_service_server.go:112)
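
Based on the behavior described above, the two master-side helpers plausibly look like the sketch below. The field updates mirror the Node snippets elsewhere in this document; `constants.NodeStatusOffline` and the exact persistence calls are assumptions, not verified against `master_service.go`.

```go
// Sketch only: what the two master-side helpers described above plausibly do.
// The model-service calls mirror snippets elsewhere in this document; exact
// signatures in master_service.go may differ.
func (svc *MasterService) setWorkerNodeOnline(n *models.Node) {
    n.Active = true
    n.ActiveAt = time.Now()
    n.Status = constants.NodeStatusOnline
    _ = service.NewModelService[models.Node]().ReplaceById(n.Id, *n)
}

func (svc *MasterService) setWorkerNodeOffline(n *models.Node) {
    n.Active = false
    n.Status = constants.NodeStatusOffline // assumed constant name
    _ = service.NewModelService[models.Node]().ReplaceById(n.Id, *n)
}
```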

#### The Race Condition Explained

```
Timeline during test:

18:00:29 - Network disconnected (test-induced)
18:00:35 - Keepalive timeout (6s later)
           - All gRPC streams close: Subscribe, Dependency, TaskHandler
           - Worker log: "connection timed out"

18:00:39 - Network reconnected
18:00:39 - Worker reconnects, Subscribe succeeds
18:00:40 - DependencyService connects
18:00:40 - Test API check: active=True ✅ (from previous state or heartbeat)
18:00:40 - Test begins 30s stabilization wait

18:00:52 - Goroutine leak warning (master under stress)

18:00:57 - SECOND DISCONNECTION! Why?
           - DependencyService stream closes (18s after reconnect)
           - Possible causes:
             a) Connection still unstable from first disconnect
             b) Keepalive timeout during stream re-establishment
             c) Master resource exhaustion (269 goroutines leaked)
             d) Network flapping in CI environment
           - Subscribe stream ALSO closes

18:01:07 - Master monitor runs (15s cycle from 18:00:52)
           - subscribeNode() returns false (stream gone)
           - setWorkerNodeOffline() called
           - active=false written to DB

18:01:10 - Test checks status: active=False ❌
           - Master monitor just set it false 3s ago

18:01:17 - Worker successfully reconnects AGAIN
           - Subscribe succeeds
           - Heartbeat succeeds (sets active=true via RPC)

18:01:22 - Master monitor runs again (15s from 18:01:07)
           - subscribeNode() returns true
           - setWorkerNodeOnline() called
           - active=true ✅ (too late for test!)
```

#### Why the Second Disconnection?

Analyzing the `client.go` reconnection flow (lines 800-870):

```go
func (c *GrpcClient) executeReconnection() {
    // ...
    if err := c.doConnect(); err == nil {
        // Stabilization delay
        time.Sleep(connectionStabilizationDelay) // 2 seconds

        // Wait for full readiness
        c.waitForFullReconnectionReady() // Max 30 seconds

        // Clear reconnecting flag
        c.reconnecting = false
    }
}
```

The second disconnection at 18:00:57 (18s after reconnect) suggests:

1. **Stabilization period is too short** - 2s delay + some readiness checks ≠ stable
2. **Multiple services reconnecting** - Each has its own stream lifecycle
3. **CI environment stress** - Goroutine leak indicates the master is under load
4. **Network quality** - The test uses a Docker network disconnect, which may cause lingering issues

### Code Analysis Findings

**Findings from codebase research:**

#### 1. Subscribe() Design - Intentionally Does NOT Update Active Flag

**File**: `crawlab/core/grpc/server/node_service_server.go:112-143`

```go
func (svr NodeServiceServer) Subscribe(...) error {
    // 1. Find node in database
    node, err := service.NewModelService[models.Node]().GetOne(...)

    // 2. Store stream reference IN MEMORY only
    nodeServiceMutex.Lock()
    svr.subs[node.Id] = stream
    nodeServiceMutex.Unlock()

    // ⚠️ NO DATABASE UPDATE - Active flag NOT modified

    // 3. Wait for stream to close
    <-stream.Context().Done()

    // 4. Remove stream reference
    delete(svr.subs, node.Id)
}
```

**Design Rationale**:

- **Separation of concerns**: Stream lifecycle ≠ node health status
- **Avoid races**: Multiple goroutines shouldn't update the same node concurrently
- **Monitoring is authoritative**: The master monitor performs health checks before declaring a node online
- **Pessimistic safety**: Better to be cautious than risk assigning tasks to dead nodes

#### 2. Master Monitoring - The Source of Truth

**File**: `crawlab/core/node/service/master_service.go:175-231`

The master runs a monitoring loop **every 15 seconds**:

```go
func (svc *MasterService) monitor() error {
    workerNodes, _ := svc.nodeMonitoringSvc.GetAllWorkerNodes()

    for _, node := range workerNodes {
        // Step 1: Check if subscription stream exists
        ok := svc.subscribeNode(n) // Checks svr.subs map
        if !ok {
            svc.setWorkerNodeOffline(n) // active=false
            return
        }

        // Step 2: Ping via stream to verify it's alive
        ok = svc.pingNodeClient(n) // Send heartbeat over stream
        if !ok {
            svc.setWorkerNodeOffline(n) // active=false
            return
        }

        // Step 3: Both tests passed
        svc.setWorkerNodeOnline(n) // active=true, status=online
    }
}
```

#### 3. gRPC Keepalive Configuration

**File**: `crawlab/core/grpc/client/client.go:1046-1051`

```go
grpc.WithKeepaliveParams(keepalive.ClientParameters{
    Time:                20 * time.Second, // Send ping every 20s
    Timeout:             5 * time.Second,  // Fail if no response within 5s
    PermitWithoutStream: true,             // Allow keepalive even without active RPCs
})
```

**Implication**: If the network is down for >25 seconds, keepalive fails → stream closes

#### 4. The Timing Gap

```
Master Monitor Cycle (every 15s):
┌─────────────────────────────────────────────┐
│ T+0s:  Check all nodes                      │
│ T+15s: Check all nodes                      │
│ T+30s: Check all nodes                      │
└─────────────────────────────────────────────┘

Worker Reconnection (happens between monitor cycles):
┌─────────────────────────────────────────────┐
│ T+7s:  Network restored                     │
│ T+7s:  Subscribe succeeds → stream in subs  │
│ T+7s:  Test checks DB → active=false ❌     │
│ T+15s: Monitor runs → active=true ✅        │
└─────────────────────────────────────────────┘
```

**Maximum lag**: Up to 15 seconds between reconnection and `active=true`

#### 5. Why the Second Disconnection Happens

From worker log analysis and code review:

1. **Keepalive timeout during reconnection** (client.go:800-870)
   - First reconnect establishes the connection
   - But it takes ~30s to reach "full reconnection readiness"
   - During this window, streams may time out again

2. **Multiple independent streams** (dependency_service_server.go:31-40)
   - Each service has its own stream: Subscribe, Dependency, TaskHandler
   - Each can fail independently
   - Logs show DependencyService disconnecting separately

3. **CI environment stress**
   - Master goroutine leak (299 goroutines)
   - May cause slow stream handling
   - Network namespace changes in Docker can cause flapping

## Research Summary & Recommendations

After deep investigation of the codebase and CI artifacts, the test failure is caused by a **design characteristic**, not a bug:

### Current Architecture (By Design)

1. **Subscribe() manages streams, not node state** - Intentional separation of concerns
2. **Master monitor is the source of truth** - Runs health checks before declaring nodes online
3. **15-second monitoring interval** - Trade-off between responsiveness and overhead
4. **Pessimistic safety model** - Better to wait than assign tasks to potentially dead nodes

### Why This Causes Test Failures

The test assumes successful reconnection → immediate `active=true`, but the system is designed for:

- **Gradual recovery**: Subscribe → Wait for monitor → Verify health → Set active
- **Resilience over speed**: Don't trust a connection until proven stable
- **Eventual consistency**: Node state converges within one monitor cycle (max 15s)

### The Real Issue

The problem isn't the architecture - it's that the **test timing doesn't account for the monitor cycle**:

- The test waits a fixed 30s delay
- But the real requirement is: reconnection + stabilization + the next monitor cycle
- **Worst case**: reconnection lands just after a monitor tick, so 2s (stabilization) + up to 15s (next monitor) + network/gRPC overhead ≈ 20s
- **With a second disconnection**: Can easily exceed 30s

### Four Solution Options

#### Option 1: Fix the Test ⭐ RECOMMENDED FOR IMMEDIATE FIX

**Pros**: No production code changes, maintains safety, quick to implement

```python
def wait_for_stable_node(self, node_id, stability_period=20, timeout=90):
    """Wait for the node to be active AND stable for a full monitor cycle."""
    first_active_time = None
    start = time.time()

    while time.time() - start < timeout:
        node = self.api.get_node(node_id)

        if node['active'] and node['status'] == 'online':
            if first_active_time is None:
                first_active_time = time.time()
                self.logger.info("Node active, waiting for stability...")
            elif time.time() - first_active_time >= stability_period:
                return True  # Stable for > monitor interval
        else:
            first_active_time = None  # Reset if it goes inactive

        time.sleep(2)

    return False
```

#### Option 2: Make Subscribe() Update Active ⚠️ LESS SAFE

**Pros**: Faster recovery, tests pass without changes
**Cons**: Could set active=true for unstable connections, and may assign tasks to dying nodes

```go
// crawlab/core/grpc/server/node_service_server.go
func (svr NodeServiceServer) Subscribe(...) error {
    node, _ := service.NewModelService[models.Node]().GetOne(...)

    // NEW: Immediately mark active
    node.Active = true
    node.ActiveAt = time.Now()
    node.Status = constants.NodeStatusOnline
    service.NewModelService[models.Node]().ReplaceById(node.Id, *node)

    svr.subs[node.Id] = stream
    // ...
}
```

#### Option 3: Reduce Monitor Interval (5s)

**Pros**: Faster recovery (max 5s lag), keeps the safety model
**Cons**: 3x overhead, more DB writes, higher CPU
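
If Option 3 were pursued, the change is essentially the `monitorInterval` value shown earlier in `master_service.go`. Below is a small, self-contained sketch that makes the interval configurable, so CI could shorten it without changing the production default; the environment variable name is hypothetical, not an existing Crawlab option.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// getMonitorInterval returns the master monitor interval, defaulting to the
// current 15s but allowing an override (e.g. "5s") via an environment
// variable. The variable name is hypothetical.
func getMonitorInterval() time.Duration {
	const defaultInterval = 15 * time.Second
	raw := os.Getenv("CRAWLAB_NODE_MONITOR_INTERVAL")
	if raw == "" {
		return defaultInterval
	}
	d, err := time.ParseDuration(raw)
	if err != nil || d < time.Second {
		return defaultInterval // ignore invalid or absurdly small values
	}
	return d
}

func main() {
	fmt.Println("monitor interval:", getMonitorInterval())
}
```

CI could then export `CRAWLAB_NODE_MONITOR_INTERVAL=5s` while production keeps the 15s default.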

#### Option 4: Hybrid Approach ⭐ BEST LONG-TERM

Optimistic initial set + pessimistic verification:

```go
func (svr NodeServiceServer) Subscribe(...) error {
    node, _ := service.NewModelService[models.Node]().GetOne(...)

    // Optimistic: Set active immediately for fast recovery
    node.Active = true
    node.ActiveAt = time.Now()
    node.Status = constants.NodeStatusOnline
    service.NewModelService[models.Node]().ReplaceById(node.Id, *node)

    svr.subs[node.Id] = stream

    // Pessimistic: Monitor will verify and revert if unhealthy
}
```

**Benefits**: Fast recovery + safety net

### Recommended Implementation Plan

**Phase 1 (Immediate - This Week)** ✅ **COMPLETED**:

1. ✅ Backend: Implemented the hybrid approach in `Subscribe()` - sets active=true optimistically
2. ✅ Test: Replaced the fixed 30s sleep with intelligent polling for stability (20s > monitor interval)
3. ✅ Documentation: Complete analysis with code research in analysis.md

**Implementation Details**:

- **Backend Fix**: `crawlab/core/grpc/server/node_service_server.go`
  - Subscribe() now immediately sets active=true, status=online when a worker reconnects
  - The master monitor still verifies health and can revert if the connection is unstable
  - Provides fast recovery (< 1s) while maintaining safety

- **Test Fix**: `crawlab-test/runners/cluster/CLS_001_*.py`
  - Polls every 2s, checking whether the node is active=true AND status=online
  - Requires the node to stay stable for 20s (> master monitor interval of 15s)
  - Resets the timer if the node goes inactive during the wait
  - Max timeout: 120s (plenty of time for recovery + stabilization)

**Phase 2 (Short-term - 1-2 Weeks)**:

1. Add a metric for "time to active after reconnection" (see the sketch after this list)
2. Integration test verifying recovery < 5s
3. Monitor CI runs for any regressions
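
One possible shape for that recovery metric, sketched with Prometheus `client_golang` purely as an assumption (this document does not specify Crawlab's metrics stack); the metric name and call site are illustrative.

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// reconnectToActiveSeconds tracks how long a worker takes to be marked
// active=true after its gRPC client reconnects. Prometheus is an assumption
// here; the real Crawlab metrics stack may differ.
var reconnectToActiveSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{
	Name:    "crawlab_node_reconnect_to_active_seconds",
	Help:    "Time from worker gRPC reconnection to active=true in the DB",
	Buckets: []float64{1, 2, 5, 10, 15, 20, 30, 60},
})

func init() {
	prometheus.MustRegister(reconnectToActiveSeconds)
}

// recordRecovery would be called by the master once it flips a node back to
// active, passing the timestamp at which the node's Subscribe stream returned.
func recordRecovery(reconnectedAt time.Time) {
	reconnectToActiveSeconds.Observe(time.Since(reconnectedAt).Seconds())
}

func main() {
	// Example: a node that reconnected 12 seconds ago just went active.
	recordRecovery(time.Now().Add(-12 * time.Second))
}
```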

**Phase 3 (Long-term - 1-2 Months)**:

1. Add gRPC readiness monitoring
2. Connection stability tracking
3. Task reconciliation for stuck pending tasks

### Code Analysis Needed

**1. Node Registration Logic**

```bash
# Need to check:
crawlab/core/node/service/worker_service.go
- How is 'active' flag set during registration?
- How is 'active' flag updated during reconnection?
- Is there a separate heartbeat that sets active=true?
```

**2. Master Subscription Handler**

```bash
# Master log shows subscription but active stays false:
crawlab/core/grpc/server/node_server.go
- GrpcNodeServiceServer.Subscribe() implementation
- Does Subscribe() update active flag?
- Or only Register() sets it?
```

**3. Worker Heartbeat Mechanism**

```bash
# Worker log: "heartbeat succeeded after 2 attempts"
crawlab/core/node/service/worker_service.go
- What does heartbeat update in the database?
- Should heartbeat set active=true?
```

### Implications for Original Issue

The **original task assignment issue** may still exist, but this test run **failed earlier** due to:

1. Unstable reconnection (double disconnect)
2. Node active flag not restored

**We never reached the task creation step** in this run.

### The Solution

The `active` flag IS being properly managed:

- **Register()** sets `active=true` (node_service_server.go:55)
- **SendHeartbeat()** sets `active=true` (node_service_server.go:99)
- **Subscribe()** does NOT set active (only manages subscription)
- **Master monitor** checks subscription every 15s:
  - If subscription exists + ping succeeds → `setWorkerNodeOnline()` sets `active=true`
  - If subscription fails → `setWorkerNodeOffline()` sets `active=false`

The problem is **timing**:

```
Worker reconnects → Subscribe succeeds → active still false (in-memory node object)
        ↓
Test checks status (immediately) → active=false ❌
        ↓
Master monitor runs (next 15s cycle) → active=true ✅ (too late!)
```

### Action Items - Updated Priority

#### Immediate (Fix test) ✅

**Root cause identified: Test timing + Master monitor interval mismatch**

**Solution 1 (Backend - Fastest Fix):**

```go
// File: crawlab/core/grpc/server/node_service_server.go
func (svr NodeServiceServer) Subscribe(request *grpc.NodeServiceSubscribeRequest, stream grpc.NodeService_SubscribeServer) error {
    // ... existing code ...

    // NEW: Immediately mark node as active when subscription succeeds
    node.Active = true
    node.ActiveAt = time.Now()
    node.Status = constants.NodeStatusOnline
    err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node)
    if err != nil {
        svr.Errorf("failed to update node status on subscribe: %v", err)
    }

    // ... rest of existing code ...
}
```

**Solution 2 (Test - Workaround):**

```python
# File: crawlab-test/runners/cluster/CLS_001_*.py

# Instead of a fixed 30s wait, poll until stable
def wait_for_stable_connection(self, node_id, timeout=60):
    """Wait until the node is active AND stays active for a monitor interval."""
    stable_duration = 20  # > master monitor interval (15s)
    start = time.time()
    first_active = None

    while time.time() - start < timeout:
        node = self.api.get_node(node_id)

        if node['active'] and node['status'] == 'online':
            if first_active is None:
                first_active = time.time()
                self.logger.info("Node is active, waiting for stability...")
            elif time.time() - first_active >= stable_duration:
                self.logger.info(f"Node stable for {stable_duration}s")
                return True
        else:
            first_active = None  # Reset if it goes inactive

        time.sleep(2)

    return False
```

#### Medium-term (Original issue)

1. [ ] Once the test passes, verify whether the task assignment issue still exists
2. [ ] Implement gRPC readiness flag (as previously designed)
3. [ ] Add task reconciliation service

#### Long-term (Architecture improvement)

1. [ ] Reduce master monitor interval to 5s for faster recovery
2. [ ] Add a circuit breaker to prevent rapid disconnect/reconnect cycles (see the sketch after this list)
3. [ ] Implement connection stability metrics
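
A minimal, self-contained sketch of what such a circuit breaker could look like on the reconnection path (not existing Crawlab code; the thresholds are illustrative):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// reconnectBreaker is a minimal circuit-breaker sketch for the reconnection
// path: after maxFailures failed attempts it "opens" and refuses further
// attempts until cooldown has elapsed, which prevents the tight
// disconnect/reconnect loops seen in the CI logs.
type reconnectBreaker struct {
	maxFailures int
	cooldown    time.Duration
	failures    int
	openedAt    time.Time
}

var errBreakerOpen = errors.New("circuit open: backing off before next reconnect")

func (b *reconnectBreaker) Attempt(connect func() error) error {
	if b.failures >= b.maxFailures && time.Since(b.openedAt) < b.cooldown {
		return errBreakerOpen
	}
	if err := connect(); err != nil {
		b.failures++
		if b.failures >= b.maxFailures {
			b.openedAt = time.Now()
		}
		return err
	}
	b.failures = 0 // success closes the breaker
	return nil
}

func main() {
	b := &reconnectBreaker{maxFailures: 3, cooldown: 30 * time.Second}
	flakyConnect := func() error { return errors.New("stream reset") }
	for i := 0; i < 5; i++ {
		fmt.Println(b.Attempt(flakyConnect))
	}
}
```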

### Test Environment

- **CI Runner**: GitHub Actions (Ubuntu 24.04, 6.11.0 kernel)
- **Docker**: 28.0.4, Compose v2.38.2
- **Resources**: 16GB RAM, 72GB disk (75% used)
- **Containers**:
  - Master: healthy
  - Worker: **unhealthy** (after test run)
  - Mongo: healthy

## References

- Test Spec: `crawlab-test/specs/cluster/CLS-001-master-worker-node-disconnection-and-reconnection-stability.md`
- Test Runner: `crawlab-test/runners/cluster/CLS_001_master_worker_node_disconnection_and_reconnection_stability.py`
- Detailed Analysis: `crawlab-test/tmp/CLS-001-analysis.md`
- **CI Artifacts**: `tmp/cls-001-investigation/test-results-cluster-21/` (run 18712502122)
- Task Scheduler: `crawlab/core/task/scheduler/service.go`
- gRPC Task Server: `crawlab/core/grpc/server/task_server_v2.go`
- Node Service: `crawlab/core/node/service/`
- **Node Registration**: `crawlab/core/grpc/server/node_server.go`
- **Worker Service**: `crawlab/core/node/service/worker_service.go`