mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-22 17:31:03 +01:00
181 lines
3.9 KiB
Go
181 lines
3.9 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/crawlab-team/crawlab/core/grpc/client"
|
|
"github.com/crawlab-team/crawlab/core/grpc/server"
|
|
"github.com/crawlab-team/crawlab/core/interfaces"
|
|
nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
|
|
"github.com/crawlab-team/crawlab/core/utils"
|
|
)
|
|
|
|
type HealthService struct {
|
|
// settings
|
|
healthFilePath string
|
|
updateInterval time.Duration
|
|
|
|
// context and synchronization
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
mu sync.RWMutex
|
|
|
|
// state
|
|
isReady bool
|
|
stopped bool
|
|
|
|
// dependencies
|
|
cfgSvc interfaces.NodeConfigService
|
|
interfaces.Logger
|
|
}
|
|
|
|
type HealthCheckFunc func() bool
|
|
|
|
func (svc *HealthService) Start(customHealthCheck HealthCheckFunc) {
|
|
svc.ctx, svc.cancel = context.WithCancel(context.Background())
|
|
|
|
svc.wg.Add(1)
|
|
go func() {
|
|
defer svc.wg.Done()
|
|
svc.writeHealthFile(customHealthCheck)
|
|
}()
|
|
}
|
|
|
|
func (svc *HealthService) Stop() {
|
|
svc.mu.Lock()
|
|
if svc.stopped {
|
|
svc.mu.Unlock()
|
|
return
|
|
}
|
|
svc.stopped = true
|
|
svc.mu.Unlock()
|
|
|
|
if svc.cancel != nil {
|
|
svc.cancel()
|
|
}
|
|
|
|
// Clean up health file
|
|
if svc.healthFilePath != "" {
|
|
os.Remove(svc.healthFilePath)
|
|
}
|
|
|
|
// Wait for goroutines to finish with timeout
|
|
done := make(chan struct{})
|
|
go func() {
|
|
svc.wg.Wait()
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
svc.Debugf("health service stopped gracefully")
|
|
case <-time.After(5 * time.Second):
|
|
svc.Warnf("health service shutdown timed out")
|
|
}
|
|
}
|
|
|
|
func (svc *HealthService) SetReady(ready bool) {
|
|
svc.mu.Lock()
|
|
svc.isReady = ready
|
|
svc.mu.Unlock()
|
|
}
|
|
|
|
func (svc *HealthService) IsReady() bool {
|
|
svc.mu.RLock()
|
|
defer svc.mu.RUnlock()
|
|
return svc.isReady
|
|
}
|
|
|
|
func (svc *HealthService) writeHealthFile(customHealthCheck HealthCheckFunc) {
|
|
ticker := time.NewTicker(svc.updateInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-svc.ctx.Done():
|
|
svc.Debugf("health file writer stopping due to context cancellation")
|
|
return
|
|
case <-ticker.C:
|
|
svc.updateHealthFile(customHealthCheck)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (svc *HealthService) updateHealthFile(customHealthCheck HealthCheckFunc) {
|
|
if svc.healthFilePath == "" {
|
|
return
|
|
}
|
|
|
|
svc.mu.RLock()
|
|
isReady := svc.isReady
|
|
stopped := svc.stopped
|
|
svc.mu.RUnlock()
|
|
|
|
// Determine node type and health status
|
|
nodeType := utils.GetNodeType()
|
|
isHealthy := isReady && !stopped
|
|
|
|
// Add node-type-specific health checks
|
|
if customHealthCheck != nil {
|
|
isHealthy = isHealthy && customHealthCheck()
|
|
} else {
|
|
// Default health checks based on node type
|
|
if utils.IsMaster() {
|
|
// Master node health: check if gRPC server is running
|
|
grpcServer := server.GetGrpcServer()
|
|
isHealthy = isHealthy && grpcServer != nil
|
|
} else {
|
|
// Worker node health: check if gRPC client is connected
|
|
grpcClient := client.GetGrpcClient()
|
|
isHealthy = isHealthy && grpcClient != nil && !grpcClient.IsClosed()
|
|
}
|
|
}
|
|
|
|
healthData := fmt.Sprintf(`{
|
|
"healthy": %t,
|
|
"timestamp": "%s",
|
|
"node_type": "%s",
|
|
"node_key": "%s",
|
|
"ready": %t,
|
|
"stopped": %t
|
|
}
|
|
`, isHealthy, time.Now().Format(time.RFC3339), nodeType, svc.cfgSvc.GetNodeKey(), isReady, stopped)
|
|
|
|
// Write to temporary file first, then rename for atomicity
|
|
tmpPath := svc.healthFilePath + ".tmp"
|
|
if err := os.WriteFile(tmpPath, []byte(healthData), 0644); err != nil {
|
|
svc.Errorf("failed to write health file: %v", err)
|
|
return
|
|
}
|
|
|
|
if err := os.Rename(tmpPath, svc.healthFilePath); err != nil {
|
|
svc.Errorf("failed to rename health file: %v", err)
|
|
os.Remove(tmpPath) // Clean up temp file
|
|
return
|
|
}
|
|
}
|
|
|
|
func newHealthService() *HealthService {
|
|
return &HealthService{
|
|
healthFilePath: "/tmp/crawlab_health",
|
|
updateInterval: 30 * time.Second,
|
|
cfgSvc: nodeconfig.GetNodeConfigService(),
|
|
Logger: utils.NewLogger("HealthService"),
|
|
}
|
|
}
|
|
|
|
var healthService *HealthService
|
|
var healthServiceOnce sync.Once
|
|
|
|
func GetHealthService() *HealthService {
|
|
healthServiceOnce.Do(func() {
|
|
healthService = newHealthService()
|
|
})
|
|
return healthService
|
|
}
|