From dc9f62dfd09a77cf215c2af01a858dbb389fb215 Mon Sep 17 00:00:00 2001
From: Marvin Zhang
Date: Tue, 19 Nov 2024 18:32:50 +0800
Subject: [PATCH] feat: added health check for worker service

---
Review notes (not part of the commit message):
  * startHealthServer now serves /health from a private http.ServeMux through
    an http.Server with ReadHeaderTimeout, and binds the listener before
    logging "started"; this adds the "net" import.
  * Line breaks and indentation were reconstructed in gofmt layout from a copy
    of this patch whose whitespace had been collapsed; check the context
    lines against the base file before applying.
  * The commit id above and the post-image blob id on the "index" line are
    carried over from the original patch and are stale after these edits.

 core/node/service/worker_service.go | 66 ++++++++++++++++++++++++++++++++--
 1 file changed, 63 insertions(+), 3 deletions(-)

diff --git a/core/node/service/worker_service.go b/core/node/service/worker_service.go
index 73a48bc5..8cd3ce7c 100644
--- a/core/node/service/worker_service.go
+++ b/core/node/service/worker_service.go
@@ -2,6 +2,9 @@ package service
 
 import (
 	"context"
+	"fmt"
+	"net"
+	"net/http"
 	"sync"
 	"time"
 
@@ -31,9 +34,11 @@ type WorkerService struct {
 	heartbeatInterval time.Duration
 
 	// internals
-	stopped bool
-	n       *models.Node
-	s       grpc.NodeService_SubscribeClient
+	stopped    bool
+	n          *models.Node
+	s          grpc.NodeService_SubscribeClient
+	isReady    bool
+	healthPort int
 }
 
 func (svc *WorkerService) Start() {
@@ -42,9 +47,15 @@ func (svc *WorkerService) Start() {
 		panic(err)
 	}
 
+	// start health check server
+	svc.startHealthServer()
+
 	// register to master
 	svc.register()
 
+	// mark as ready after registration
+	svc.isReady = true
+
 	// subscribe
 	go svc.subscribe()
 
@@ -183,12 +194,61 @@ func (svc *WorkerService) sendHeartbeat() {
 	}
 }
 
+// startHealthServer serves /health on svc.healthPort so external probes can
+// tell whether this worker is usable.
+//
+// The endpoint replies 200 "ok" when the worker is marked ready, is not
+// stopped, and its gRPC client is non-nil and not closed; otherwise it
+// replies 503 "not ready".
+//
+// The handler lives on a private mux rather than http.DefaultServeMux, so a
+// second call cannot panic on a duplicate pattern and handlers registered on
+// the default mux by other packages are not published on this port. The
+// listener is bound before the goroutine starts, so a bind failure is logged
+// instead of a misleading "started" message.
+//
+// NOTE(review): isReady is written by Start and read here from the HTTP
+// server's goroutines without synchronization (stopped is read the same
+// way); confirm whether these flags need atomic access.
+func (svc *WorkerService) startHealthServer() {
+	mux := http.NewServeMux()
+	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
+		if svc.isReady && !svc.stopped && svc.client != nil && !svc.client.IsClosed() {
+			w.WriteHeader(http.StatusOK)
+			_, _ = w.Write([]byte("ok"))
+			return
+		}
+		w.WriteHeader(http.StatusServiceUnavailable)
+		_, _ = w.Write([]byte("not ready"))
+	})
+
+	addr := fmt.Sprintf(":%d", svc.healthPort)
+	ln, err := net.Listen("tcp", addr)
+	if err != nil {
+		log.Errorf("health check server failed: %v", err)
+		return
+	}
+
+	srv := &http.Server{
+		Handler:           mux,
+		ReadHeaderTimeout: 5 * time.Second,
+	}
+	go func() {
+		if err := srv.Serve(ln); err != nil {
+			log.Errorf("health check server failed: %v", err)
+		}
+	}()
+	log.Infof("health check server started on port %d", svc.healthPort)
+}
+
 func newWorkerService() *WorkerService {
 	return &WorkerService{
 		heartbeatInterval: 15 * time.Second,
 		cfgSvc:            nodeconfig.GetNodeConfigService(),
 		client:            client.GetGrpcClient(),
 		handlerSvc:        handler.GetTaskHandlerService(),
+		isReady:           false,
+		healthPort:        8000, // TODO: make configurable, e.g. through an environment variable
 	}
 }
 