From 45913ad7e41e3730b8f2e47e6974ae115eb328ba Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Fri, 8 Aug 2025 00:05:00 +0800 Subject: [PATCH] refactor: implement health service for master and worker nodes; add health check script and integrate health checks into service lifecycle --- Dockerfile | 3 +- core/node/service/health_service.go | 180 +++++++++++++++++++++++++++ core/node/service/master_service.go | 17 +++ core/node/service/worker_service.go | 184 +++++++++++++++++++--------- docker/bin/health-check.sh | 33 +++++ 5 files changed, 358 insertions(+), 59 deletions(-) create mode 100644 core/node/service/health_service.go create mode 100644 docker/bin/health-check.sh diff --git a/Dockerfile b/Dockerfile index f9aec9f7..ab156c2f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,6 +12,7 @@ COPY --from=frontend-build /app/dist /app/dist COPY ./backend/conf /app/backend/conf COPY ./docker/nginx/crawlab.conf /etc/nginx/conf.d COPY ./docker/bin/docker-init.sh /app/bin/docker-init.sh +COPY ./docker/bin/health-check.sh /app/bin/health-check.sh # Start backend CMD ["/bin/bash", "/app/bin/docker-init.sh"] @@ -21,4 +22,4 @@ EXPOSE 8080 # Healthcheck for backend HEALTHCHECK --interval=1m --timeout=3s \ - CMD curl -f http://localhost:8000/health || exit 1 \ No newline at end of file + CMD bash /app/bin/health-check.sh || exit 1 \ No newline at end of file diff --git a/core/node/service/health_service.go b/core/node/service/health_service.go new file mode 100644 index 00000000..6c84a401 --- /dev/null +++ b/core/node/service/health_service.go @@ -0,0 +1,180 @@ +package service + +import ( + "context" + "fmt" + "os" + "sync" + "time" + + "github.com/crawlab-team/crawlab/core/grpc/client" + "github.com/crawlab-team/crawlab/core/grpc/server" + "github.com/crawlab-team/crawlab/core/interfaces" + nodeconfig "github.com/crawlab-team/crawlab/core/node/config" + "github.com/crawlab-team/crawlab/core/utils" +) + +type HealthService struct { + // settings + healthFilePath string + updateInterval time.Duration + + // context and synchronization + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + mu sync.RWMutex + + // state + isReady bool + stopped bool + + // dependencies + cfgSvc interfaces.NodeConfigService + interfaces.Logger +} + +type HealthCheckFunc func() bool + +func (svc *HealthService) Start(customHealthCheck HealthCheckFunc) { + svc.ctx, svc.cancel = context.WithCancel(context.Background()) + + svc.wg.Add(1) + go func() { + defer svc.wg.Done() + svc.writeHealthFile(customHealthCheck) + }() +} + +func (svc *HealthService) Stop() { + svc.mu.Lock() + if svc.stopped { + svc.mu.Unlock() + return + } + svc.stopped = true + svc.mu.Unlock() + + if svc.cancel != nil { + svc.cancel() + } + + // Clean up health file + if svc.healthFilePath != "" { + os.Remove(svc.healthFilePath) + } + + // Wait for goroutines to finish with timeout + done := make(chan struct{}) + go func() { + svc.wg.Wait() + close(done) + }() + + select { + case <-done: + svc.Debugf("health service stopped gracefully") + case <-time.After(5 * time.Second): + svc.Warnf("health service shutdown timed out") + } +} + +func (svc *HealthService) SetReady(ready bool) { + svc.mu.Lock() + svc.isReady = ready + svc.mu.Unlock() +} + +func (svc *HealthService) IsReady() bool { + svc.mu.RLock() + defer svc.mu.RUnlock() + return svc.isReady +} + +func (svc *HealthService) writeHealthFile(customHealthCheck HealthCheckFunc) { + ticker := time.NewTicker(svc.updateInterval) + defer ticker.Stop() + + for { + select { + case <-svc.ctx.Done(): 
+ svc.Debugf("health file writer stopping due to context cancellation") + return + case <-ticker.C: + svc.updateHealthFile(customHealthCheck) + } + } +} + +func (svc *HealthService) updateHealthFile(customHealthCheck HealthCheckFunc) { + if svc.healthFilePath == "" { + return + } + + svc.mu.RLock() + isReady := svc.isReady + stopped := svc.stopped + svc.mu.RUnlock() + + // Determine node type and health status + nodeType := utils.GetNodeType() + isHealthy := isReady && !stopped + + // Add node-type-specific health checks + if customHealthCheck != nil { + isHealthy = isHealthy && customHealthCheck() + } else { + // Default health checks based on node type + if utils.IsMaster() { + // Master node health: check if gRPC server is running + grpcServer := server.GetGrpcServer() + isHealthy = isHealthy && grpcServer != nil + } else { + // Worker node health: check if gRPC client is connected + grpcClient := client.GetGrpcClient() + isHealthy = isHealthy && grpcClient != nil && !grpcClient.IsClosed() + } + } + + healthData := fmt.Sprintf(`{ + "healthy": %t, + "timestamp": "%s", + "node_type": "%s", + "node_key": "%s", + "ready": %t, + "stopped": %t +} +`, isHealthy, time.Now().Format(time.RFC3339), nodeType, svc.cfgSvc.GetNodeKey(), isReady, stopped) + + // Write to temporary file first, then rename for atomicity + tmpPath := svc.healthFilePath + ".tmp" + if err := os.WriteFile(tmpPath, []byte(healthData), 0644); err != nil { + svc.Errorf("failed to write health file: %v", err) + return + } + + if err := os.Rename(tmpPath, svc.healthFilePath); err != nil { + svc.Errorf("failed to rename health file: %v", err) + os.Remove(tmpPath) // Clean up temp file + return + } +} + +func newHealthService() *HealthService { + return &HealthService{ + healthFilePath: "/tmp/crawlab_health", + updateInterval: 30 * time.Second, + cfgSvc: nodeconfig.GetNodeConfigService(), + Logger: utils.NewLogger("HealthService"), + } +} + +var healthService *HealthService +var healthServiceOnce sync.Once + +func GetHealthService() *HealthService { + healthServiceOnce.Do(func() { + healthService = newHealthService() + }) + return healthService +} diff --git a/core/node/service/master_service.go b/core/node/service/master_service.go index 84222181..fbfb3e0a 100644 --- a/core/node/service/master_service.go +++ b/core/node/service/master_service.go @@ -34,6 +34,7 @@ type MasterService struct { taskHandlerSvc *handler.Service scheduleSvc *schedule.Service systemSvc *system.Service + healthSvc *HealthService // settings monitorInterval time.Duration @@ -51,6 +52,18 @@ func (svc *MasterService) Start() { panic(err) } + // start health service + go svc.healthSvc.Start(func() bool { + // Master-specific health check: verify gRPC server and core services are running + return svc.server != nil && + svc.taskSchedulerSvc != nil && + svc.taskHandlerSvc != nil && + svc.scheduleSvc != nil + }) + + // mark as ready after registration + svc.healthSvc.SetReady(true) + // create indexes go common.InitIndexes() @@ -80,6 +93,9 @@ func (svc *MasterService) Wait() { func (svc *MasterService) Stop() { _ = svc.server.Stop() svc.taskHandlerSvc.Stop() + if svc.healthSvc != nil { + svc.healthSvc.Stop() + } svc.Infof("master[%s] service has stopped", svc.cfgSvc.GetNodeKey()) } @@ -304,6 +320,7 @@ func newMasterService() *MasterService { taskHandlerSvc: handler.GetTaskHandlerService(), scheduleSvc: schedule.GetScheduleService(), systemSvc: system.GetSystemService(), + healthSvc: GetHealthService(), Logger: utils.NewLogger("MasterService"), } } diff --git 
a/core/node/service/worker_service.go b/core/node/service/worker_service.go index 693204a7..1a96a8ec 100644 --- a/core/node/service/worker_service.go +++ b/core/node/service/worker_service.go @@ -2,16 +2,10 @@ package service import ( "context" - "errors" "fmt" - "net" - "net/http" "sync" "time" - "github.com/crawlab-team/crawlab/core/controllers" - "github.com/gin-gonic/gin" - "github.com/crawlab-team/crawlab/core/models/models" "github.com/cenkalti/backoff/v4" @@ -29,11 +23,18 @@ type WorkerService struct { // dependencies cfgSvc interfaces.NodeConfigService handlerSvc *handler.Service + healthSvc *HealthService // settings address interfaces.Address heartbeatInterval time.Duration + // context and synchronization + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + mu sync.RWMutex + // internals stopped bool n *models.Node @@ -43,6 +44,9 @@ type WorkerService struct { } func (svc *WorkerService) Start() { + // initialize context for the service + svc.ctx, svc.cancel = context.WithCancel(context.Background()) + // wait for grpc client ready client.GetGrpcClient().WaitForReady() @@ -50,19 +54,45 @@ func (svc *WorkerService) Start() { svc.register() // mark as ready after registration + svc.mu.Lock() svc.isReady = true + svc.mu.Unlock() + svc.healthSvc.SetReady(true) // start health check server - go svc.startHealthServer() + svc.wg.Add(1) + go func() { + defer svc.wg.Done() + // Start health service with worker-specific health check + svc.healthSvc.Start(func() bool { + svc.mu.RLock() + defer svc.mu.RUnlock() + return svc.isReady && !svc.stopped && client.GetGrpcClient() != nil && !client.GetGrpcClient().IsClosed() + }) + }() - // subscribe - go svc.subscribe() + // subscribe to master + svc.wg.Add(1) + go func() { + defer svc.wg.Done() + svc.subscribe() + }() // start sending heartbeat to master - go svc.reportStatus() + svc.wg.Add(1) + go func() { + defer svc.wg.Done() + svc.reportStatus() + }() // start task handler - go svc.handlerSvc.Start() + svc.wg.Add(1) + go func() { + defer svc.wg.Done() + svc.handlerSvc.Start() + }() + + svc.Infof("worker[%s] service started", svc.cfgSvc.GetNodeKey()) // wait for quit signal svc.Wait() @@ -76,10 +106,49 @@ func (svc *WorkerService) Wait() { } func (svc *WorkerService) Stop() { + svc.mu.Lock() + if svc.stopped { + svc.mu.Unlock() + return + } svc.stopped = true - _ = client.GetGrpcClient().Stop() - svc.handlerSvc.Stop() - svc.Infof("worker[%s] service has stopped", svc.cfgSvc.GetNodeKey()) + svc.mu.Unlock() + + svc.Infof("stopping worker[%s] service...", svc.cfgSvc.GetNodeKey()) + + // cancel context to signal all goroutines to stop + if svc.cancel != nil { + svc.cancel() + } + + // stop task handler + if svc.handlerSvc != nil { + svc.handlerSvc.Stop() + } + + // stop health service + if svc.healthSvc != nil { + svc.healthSvc.Stop() + } + + // stop grpc client + if err := client.GetGrpcClient().Stop(); err != nil { + svc.Errorf("error stopping grpc client: %v", err) + } + + // wait for all goroutines to finish with timeout + done := make(chan struct{}) + go func() { + svc.wg.Wait() + close(done) + }() + + select { + case <-done: + svc.Infof("worker[%s] service has stopped gracefully", svc.cfgSvc.GetNodeKey()) + case <-time.After(10 * time.Second): + svc.Warnf("worker[%s] service shutdown timed out", svc.cfgSvc.GetNodeKey()) + } } func (svc *WorkerService) register() { @@ -121,18 +190,22 @@ func (svc *WorkerService) register() { func (svc *WorkerService) reportStatus() { ticker := time.NewTicker(svc.heartbeatInterval) + defer 
ticker.Stop() + for { - // return if client is closed - if client.GetGrpcClient().IsClosed() { - ticker.Stop() + select { + case <-svc.ctx.Done(): + svc.Debugf("heartbeat goroutine stopping due to context cancellation") return + case <-ticker.C: + // return if client is closed + if client.GetGrpcClient().IsClosed() { + svc.Debugf("heartbeat goroutine stopping due to closed grpc client") + return + } + // send heartbeat + svc.sendHeartbeat() } - - // send heartbeat - svc.sendHeartbeat() - - // sleep - <-ticker.C } } @@ -149,9 +222,11 @@ func (svc *WorkerService) subscribe() { b.Multiplier = 2.0 for { - if svc.stopped { - svc.Infof("subscription stopped. exiting...") + select { + case <-svc.ctx.Done(): + svc.Infof("subscription stopped due to context cancellation") return + default: } // Use backoff for connection attempts @@ -162,10 +237,9 @@ func (svc *WorkerService) subscribe() { svc.Errorf("failed to get node client: %v", err) return err } - // Use client context for proper cancellation - ctx, cancel := client.GetGrpcClient().Context() - defer cancel() - stream, err := nodeClient.Subscribe(ctx, &grpc.NodeServiceSubscribeRequest{ + + // Use service context for proper cancellation + stream, err := nodeClient.Subscribe(svc.ctx, &grpc.NodeServiceSubscribeRequest{ NodeKey: svc.cfgSvc.GetNodeKey(), }) if err != nil { @@ -176,12 +250,20 @@ func (svc *WorkerService) subscribe() { // Handle messages for { - if svc.stopped { + select { + case <-svc.ctx.Done(): + svc.Debugf("subscription message loop stopped due to context cancellation") return nil + default: } msg, err := stream.Recv() if err != nil { + if svc.ctx.Err() != nil { + // Context was cancelled, this is expected + svc.Debugf("stream receive cancelled due to context") + return nil + } if client.GetGrpcClient().IsClosed() { svc.Errorf("connection to master is closed: %v", err) return err @@ -198,19 +280,27 @@ func (svc *WorkerService) subscribe() { } // Execute with backoff - err := backoff.Retry(operation, b) + err := backoff.Retry(operation, backoff.WithContext(b, svc.ctx)) if err != nil { + if svc.ctx.Err() != nil { + // Context was cancelled, exit gracefully + svc.Debugf("subscription retry cancelled due to context") + return + } svc.Errorf("subscription failed after max retries: %v", err) - return } - // Wait before attempting to reconnect - time.Sleep(time.Second) + // Wait before attempting to reconnect, but respect context cancellation + select { + case <-svc.ctx.Done(): + return + case <-time.After(time.Second): + } } } func (svc *WorkerService) sendHeartbeat() { - ctx, cancel := context.WithTimeout(context.Background(), svc.heartbeatInterval) + ctx, cancel := context.WithTimeout(svc.ctx, svc.heartbeatInterval) defer cancel() nodeClient, err := client.GetGrpcClient().GetNodeClient() if err != nil { @@ -225,34 +315,12 @@ func (svc *WorkerService) sendHeartbeat() { } } -func (svc *WorkerService) startHealthServer() { - // handlers - app := gin.New() - app.GET("/health", controllers.GetHealthFn(func() bool { - return svc.isReady && !svc.stopped && client.GetGrpcClient() != nil && !client.GetGrpcClient().IsClosed() - })) - - // listen - ln, err := net.Listen("tcp", utils.GetServerAddress()) - if err != nil { - panic(err) - } - - // serve - if err := http.Serve(ln, app); err != nil { - if !errors.Is(err, http.ErrServerClosed) { - svc.Errorf("run server error: %v", err) - } else { - svc.Info("server graceful down") - } - } -} - func newWorkerService() *WorkerService { return &WorkerService{ heartbeatInterval: 15 * time.Second, cfgSvc: 
nodeconfig.GetNodeConfigService(), handlerSvc: handler.GetTaskHandlerService(), + healthSvc: GetHealthService(), isReady: false, Logger: utils.NewLogger("WorkerService"), } diff --git a/docker/bin/health-check.sh b/docker/bin/health-check.sh new file mode 100644 index 00000000..bb5d61c5 --- /dev/null +++ b/docker/bin/health-check.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +# Universal health check script for both Crawlab master and worker nodes +# This script checks if the node health file exists and indicates healthy status + +HEALTH_FILE="/tmp/crawlab_health" +MAX_AGE_SECONDS=60 + +# Check if health file exists +if [ ! -f "$HEALTH_FILE" ]; then + echo "Health file not found at $HEALTH_FILE" + exit 1 +fi + +# Check if file is recent (modified within last 60 seconds) +if [ $(find "$HEALTH_FILE" -mmin -1 | wc -l) -eq 0 ]; then + echo "Health file is too old (last modified more than 1 minute ago)" + cat "$HEALTH_FILE" 2>/dev/null || echo "Could not read health file" + exit 1 +fi + +# Check health status from file content +HEALTHY=$(grep '"healthy": true' "$HEALTH_FILE" 2>/dev/null) +if [ -z "$HEALTHY" ]; then + echo "Node is not healthy according to health file:" + cat "$HEALTH_FILE" 2>/dev/null || echo "Could not read health file" + exit 1 +fi + +# Get node type for logging +NODE_TYPE=$(grep '"node_type"' "$HEALTH_FILE" 2>/dev/null | cut -d'"' -f4) +echo "Crawlab $NODE_TYPE node is healthy" +exit 0
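
Reviewer note: the snippet below is an illustrative local smoke test, not part of the patch. It fakes a health file in the JSON shape written by HealthService.updateHealthFile (the node_key value is a placeholder) and then runs the new docker/bin/health-check.sh against it from the repository root.

  # Write a sample health file in the format the health service produces.
  # "node_key" is a placeholder value for illustration only.
  cat > /tmp/crawlab_health <<EOF
  {
    "healthy": true,
    "timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
    "node_type": "master",
    "node_key": "example-node-key",
    "ready": true,
    "stopped": false
  }
  EOF

  # Run the health check script; it should print
  # "Crawlab master node is healthy" and exit 0.
  bash docker/bin/health-check.sh

Because the script only trusts a health file modified within the last minute (the find -mmin -1 check), re-run the cat step if more than 60 seconds pass before invoking the script.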