diff --git a/.github/workflows/docker-crawlab.yml b/.github/workflows/docker-crawlab.yml index e5ffb70e..72c07018 100644 --- a/.github/workflows/docker-crawlab.yml +++ b/.github/workflows/docker-crawlab.yml @@ -314,6 +314,11 @@ jobs: env: CRAWLAB_NODE_MASTER: N CRAWLAB_MASTER_HOST: master + options: >- + --health-cmd "curl -f http://localhost:8000/health || exit 1" + --health-interval 30s + --health-timeout 10s + --health-retries 5 steps: - name: Checkout repository uses: actions/checkout@v4 diff --git a/core/controllers/health.go b/core/controllers/health.go new file mode 100644 index 00000000..8e19adb1 --- /dev/null +++ b/core/controllers/health.go @@ -0,0 +1,18 @@ +package controllers + +import ( + "github.com/gin-gonic/gin" + "net/http" +) + +func GetHealthFn(healthFn func() bool) func(c *gin.Context) { + return func(c *gin.Context) { + if healthFn() { + c.Writer.Write([]byte("ok")) + c.AbortWithStatus(http.StatusOK) + return + } + c.Writer.Write([]byte("not ready")) + c.AbortWithStatus(http.StatusServiceUnavailable) + } +} diff --git a/core/controllers/router.go b/core/controllers/router.go index 05d89d93..f02299f5 100644 --- a/core/controllers/router.go +++ b/core/controllers/router.go @@ -381,6 +381,13 @@ func InitRoutes(app *gin.Engine) (err error) { }) // Register public routes that don't require authentication + RegisterActions(groups.AnonymousGroup, "/health", []Action{ + { + Path: "", + Method: http.MethodGet, + HandlerFunc: GetHealthFn(func() bool { return true }), + }, + }) RegisterActions(groups.AnonymousGroup, "/system-info", []Action{ { Path: "", diff --git a/core/node/service/master_service.go b/core/node/service/master_service.go index 796354d6..f0ebe37f 100644 --- a/core/node/service/master_service.go +++ b/core/node/service/master_service.go @@ -54,7 +54,7 @@ func (svc *MasterService) Start() { go common.InitIndexes() // start monitoring worker nodes - go svc.Monitor() + go svc.startMonitoring() // start task handler go svc.taskHandlerSvc.Start() @@ -82,7 +82,7 @@ func (svc *MasterService) Stop() { log.Infof("master[%s] service has stopped", svc.cfgSvc.GetNodeKey()) } -func (svc *MasterService) Monitor() { +func (svc *MasterService) startMonitoring() { log.Infof("master[%s] monitoring started", svc.cfgSvc.GetNodeKey()) // ticker diff --git a/core/node/service/worker_service.go b/core/node/service/worker_service.go index 8cd3ce7c..7b5fe933 100644 --- a/core/node/service/worker_service.go +++ b/core/node/service/worker_service.go @@ -2,7 +2,10 @@ package service import ( "context" - "fmt" + "errors" + "github.com/crawlab-team/crawlab/core/controllers" + "github.com/gin-gonic/gin" + "net" "net/http" "sync" "time" @@ -33,11 +36,10 @@ type WorkerService struct { heartbeatInterval time.Duration // internals - stopped bool - n *models.Node - s grpc.NodeService_SubscribeClient - isReady bool - healthPort int + stopped bool + n *models.Node + s grpc.NodeService_SubscribeClient + isReady bool } func (svc *WorkerService) Start() { @@ -46,15 +48,15 @@ func (svc *WorkerService) Start() { panic(err) } - // start health check server - svc.startHealthServer() - // register to master svc.register() // mark as ready after registration svc.isReady = true + // start health check server + go svc.startHealthServer() + // subscribe go svc.subscribe() @@ -194,23 +196,26 @@ func (svc *WorkerService) sendHeartbeat() { } func (svc *WorkerService) startHealthServer() { - http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { - if svc.isReady && !svc.stopped && svc.client != nil && !svc.client.IsClosed() { - w.WriteHeader(http.StatusOK) - w.Write([]byte("ok")) - return - } - w.WriteHeader(http.StatusServiceUnavailable) - w.Write([]byte("not ready")) - }) + // handlers + app := gin.New() + app.GET("/health", controllers.GetHealthFn(func() bool { + return svc.isReady && !svc.stopped && svc.client != nil && !svc.client.IsClosed() + })) - addr := fmt.Sprintf(":%d", svc.healthPort) - go func() { - if err := http.ListenAndServe(addr, nil); err != nil { - log.Errorf("health check server failed: %v", err) + // listen + ln, err := net.Listen("tcp", utils.GetServerAddress()) + if err != nil { + panic(err) + } + + // serve + if err := http.Serve(ln, app); err != nil { + if !errors.Is(err, http.ErrServerClosed) { + log.Error("run server error:" + err.Error()) + } else { + log.Info("server graceful down") } - }() - log.Infof("health check server started on port %d", svc.healthPort) + } } func newWorkerService() *WorkerService { @@ -220,7 +225,6 @@ func newWorkerService() *WorkerService { client: client.GetGrpcClient(), handlerSvc: handler.GetTaskHandlerService(), isReady: false, - healthPort: 8000, // You can make this configurable through environment variables } } diff --git a/core/utils/health.go b/core/utils/health.go new file mode 100644 index 00000000..ce264e41 --- /dev/null +++ b/core/utils/health.go @@ -0,0 +1,17 @@ +package utils + +import ( + "fmt" + "github.com/apex/log" + "net/http" +) + +func HandleHealthFn(healthFn func() bool, healthPort int) { + addr := fmt.Sprintf(":%d", healthPort) + go func() { + if err := http.ListenAndServe(addr, nil); err != nil { + log.Errorf("health check server failed: %v", err) + } + }() + log.Infof("health check server started on port %d", healthPort) +}