feat: added health check for worker service

This commit is contained in:
Marvin Zhang
2024-11-19 18:32:50 +08:00
parent 6df52e7401
commit 4ed7fcffc9

View File

@@ -2,6 +2,8 @@ package service
import (
"context"
"fmt"
"net/http"
"sync"
"time"
@@ -31,9 +33,11 @@ type WorkerService struct {
heartbeatInterval time.Duration
// internals
stopped bool
n *models.Node
s grpc.NodeService_SubscribeClient
stopped bool
n *models.Node
s grpc.NodeService_SubscribeClient
isReady bool
healthPort int
}
func (svc *WorkerService) Start() {
@@ -42,9 +46,15 @@ func (svc *WorkerService) Start() {
panic(err)
}
// start health check server
svc.startHealthServer()
// register to master
svc.register()
// mark as ready after registration
svc.isReady = true
// subscribe
go svc.subscribe()
@@ -183,12 +193,34 @@ func (svc *WorkerService) sendHeartbeat() {
}
}
func (svc *WorkerService) startHealthServer() {
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
if svc.isReady && !svc.stopped && svc.client != nil && !svc.client.IsClosed() {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
return
}
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("not ready"))
})
addr := fmt.Sprintf(":%d", svc.healthPort)
go func() {
if err := http.ListenAndServe(addr, nil); err != nil {
log.Errorf("health check server failed: %v", err)
}
}()
log.Infof("health check server started on port %d", svc.healthPort)
}
func newWorkerService() *WorkerService {
return &WorkerService{
heartbeatInterval: 15 * time.Second,
cfgSvc: nodeconfig.GetNodeConfigService(),
client: client.GetGrpcClient(),
handlerSvc: handler.GetTaskHandlerService(),
isReady: false,
healthPort: 8000, // You can make this configurable through environment variables
}
}