fix: unable to start api

This commit is contained in:
Marvin Zhang
2024-11-22 21:19:17 +08:00
parent 5353c50978
commit ed8ab6c514
6 changed files with 78 additions and 27 deletions

View File

@@ -314,6 +314,11 @@ jobs:
env:
CRAWLAB_NODE_MASTER: N
CRAWLAB_MASTER_HOST: master
options: >-
--health-cmd "curl -f http://localhost:8000/health || exit 1"
--health-interval 30s
--health-timeout 10s
--health-retries 5
steps:
- name: Checkout repository
uses: actions/checkout@v4

View File

@@ -0,0 +1,18 @@
package controllers
import (
"github.com/gin-gonic/gin"
"net/http"
)
func GetHealthFn(healthFn func() bool) func(c *gin.Context) {
return func(c *gin.Context) {
if healthFn() {
c.Writer.Write([]byte("ok"))
c.AbortWithStatus(http.StatusOK)
return
}
c.Writer.Write([]byte("not ready"))
c.AbortWithStatus(http.StatusServiceUnavailable)
}
}

View File

@@ -381,6 +381,13 @@ func InitRoutes(app *gin.Engine) (err error) {
})
// Register public routes that don't require authentication
RegisterActions(groups.AnonymousGroup, "/health", []Action{
{
Path: "",
Method: http.MethodGet,
HandlerFunc: GetHealthFn(func() bool { return true }),
},
})
RegisterActions(groups.AnonymousGroup, "/system-info", []Action{
{
Path: "",

View File

@@ -54,7 +54,7 @@ func (svc *MasterService) Start() {
go common.InitIndexes()
// start monitoring worker nodes
go svc.Monitor()
go svc.startMonitoring()
// start task handler
go svc.taskHandlerSvc.Start()
@@ -82,7 +82,7 @@ func (svc *MasterService) Stop() {
log.Infof("master[%s] service has stopped", svc.cfgSvc.GetNodeKey())
}
func (svc *MasterService) Monitor() {
func (svc *MasterService) startMonitoring() {
log.Infof("master[%s] monitoring started", svc.cfgSvc.GetNodeKey())
// ticker

View File

@@ -2,7 +2,10 @@ package service
import (
"context"
"fmt"
"errors"
"github.com/crawlab-team/crawlab/core/controllers"
"github.com/gin-gonic/gin"
"net"
"net/http"
"sync"
"time"
@@ -33,11 +36,10 @@ type WorkerService struct {
heartbeatInterval time.Duration
// internals
stopped bool
n *models.Node
s grpc.NodeService_SubscribeClient
isReady bool
healthPort int
stopped bool
n *models.Node
s grpc.NodeService_SubscribeClient
isReady bool
}
func (svc *WorkerService) Start() {
@@ -46,15 +48,15 @@ func (svc *WorkerService) Start() {
panic(err)
}
// start health check server
svc.startHealthServer()
// register to master
svc.register()
// mark as ready after registration
svc.isReady = true
// start health check server
go svc.startHealthServer()
// subscribe
go svc.subscribe()
@@ -194,23 +196,26 @@ func (svc *WorkerService) sendHeartbeat() {
}
func (svc *WorkerService) startHealthServer() {
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
if svc.isReady && !svc.stopped && svc.client != nil && !svc.client.IsClosed() {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
return
}
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("not ready"))
})
// handlers
app := gin.New()
app.GET("/health", controllers.GetHealthFn(func() bool {
return svc.isReady && !svc.stopped && svc.client != nil && !svc.client.IsClosed()
}))
addr := fmt.Sprintf(":%d", svc.healthPort)
go func() {
if err := http.ListenAndServe(addr, nil); err != nil {
log.Errorf("health check server failed: %v", err)
// listen
ln, err := net.Listen("tcp", utils.GetServerAddress())
if err != nil {
panic(err)
}
// serve
if err := http.Serve(ln, app); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
log.Error("run server error:" + err.Error())
} else {
log.Info("server graceful down")
}
}()
log.Infof("health check server started on port %d", svc.healthPort)
}
}
func newWorkerService() *WorkerService {
@@ -220,7 +225,6 @@ func newWorkerService() *WorkerService {
client: client.GetGrpcClient(),
handlerSvc: handler.GetTaskHandlerService(),
isReady: false,
healthPort: 8000, // You can make this configurable through environment variables
}
}

17
core/utils/health.go Normal file
View File

@@ -0,0 +1,17 @@
package utils
import (
"fmt"
"github.com/apex/log"
"net/http"
)
func HandleHealthFn(healthFn func() bool, healthPort int) {
addr := fmt.Sprintf(":%d", healthPort)
go func() {
if err := http.ListenAndServe(addr, nil); err != nil {
log.Errorf("health check server failed: %v", err)
}
}()
log.Infof("health check server started on port %d", healthPort)
}