mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-22 17:31:03 +01:00
fix: improve task cancellation and connection health check logic with timeout handling
This commit is contained in:
@@ -279,6 +279,24 @@ func (r *Runner) Cancel(force bool) (err error) {
|
||||
// Signal goroutines to stop
|
||||
r.cancel()
|
||||
|
||||
// Stop health check ticker immediately to prevent interference
|
||||
if r.connHealthTicker != nil {
|
||||
r.connHealthTicker.Stop()
|
||||
r.Debugf("stopped connection health ticker")
|
||||
}
|
||||
|
||||
// Close gRPC connection to stop health check messages
|
||||
r.connMutex.Lock()
|
||||
if r.conn != nil {
|
||||
_ = r.conn.CloseSend()
|
||||
r.conn = nil
|
||||
r.Debugf("closed gRPC connection to stop health checks")
|
||||
}
|
||||
r.connMutex.Unlock()
|
||||
|
||||
// Wait a moment for background goroutines to respond to cancellation signal
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// If force is not requested, try graceful termination first
|
||||
if !force {
|
||||
r.Debugf("attempting graceful termination of process[%d]", r.pid)
|
||||
@@ -334,6 +352,19 @@ forceKill:
|
||||
case <-ticker.C:
|
||||
if !utils.ProcessIdExists(r.pid) {
|
||||
r.Debugf("process[%d] terminated successfully", r.pid)
|
||||
// Wait for background goroutines to finish with timeout
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
r.wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
r.Debugf("all background goroutines stopped")
|
||||
case <-time.After(5 * time.Second):
|
||||
r.Warnf("some background goroutines did not stop within timeout")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -906,7 +937,16 @@ func (r *Runner) isConnectionHealthy() bool {
|
||||
if r.conn == nil {
|
||||
return false
|
||||
}
|
||||
// Try to send a ping-like message to test connection
|
||||
|
||||
// Check if context is already cancelled - don't do health checks during cancellation
|
||||
select {
|
||||
case <-r.ctx.Done():
|
||||
r.Debugf("skipping health check - task is being cancelled")
|
||||
return false
|
||||
default:
|
||||
}
|
||||
|
||||
// Try to send a ping-like message to test connection with timeout
|
||||
// Use a simple log message as ping since PING code doesn't exist
|
||||
testMsg := &grpc.TaskServiceConnectRequest{
|
||||
Code: grpc.TaskServiceConnectCode_INSERT_LOGS,
|
||||
@@ -914,12 +954,27 @@ func (r *Runner) isConnectionHealthy() bool {
|
||||
Data: []byte(`["[HEALTH CHECK] connection test"]`),
|
||||
}
|
||||
|
||||
if err := r.conn.Send(testMsg); err != nil {
|
||||
r.Debugf("connection health check failed: %v", err)
|
||||
// Use a channel to make the Send operation timeout-aware
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- r.conn.Send(testMsg)
|
||||
}()
|
||||
|
||||
// Wait for either completion or timeout
|
||||
select {
|
||||
case err := <-done:
|
||||
if err != nil {
|
||||
r.Debugf("connection health check failed: %v", err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
case <-time.After(5 * time.Second):
|
||||
r.Debugf("connection health check timed out")
|
||||
return false
|
||||
case <-r.ctx.Done():
|
||||
r.Debugf("connection health check cancelled")
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// reconnectWithRetry attempts to reconnect to the gRPC service with exponential backoff
|
||||
|
||||
Reference in New Issue
Block a user