fix: unable to start api

This commit is contained in:
Marvin Zhang
2024-11-22 21:19:17 +08:00
parent 7a322ae6c8
commit 858e5c2b89
6 changed files with 78 additions and 27 deletions

View File

@@ -54,7 +54,7 @@ func (svc *MasterService) Start() {
go common.InitIndexes()
// start monitoring worker nodes
go svc.Monitor()
go svc.startMonitoring()
// start task handler
go svc.taskHandlerSvc.Start()
@@ -82,7 +82,7 @@ func (svc *MasterService) Stop() {
log.Infof("master[%s] service has stopped", svc.cfgSvc.GetNodeKey())
}
func (svc *MasterService) Monitor() {
func (svc *MasterService) startMonitoring() {
log.Infof("master[%s] monitoring started", svc.cfgSvc.GetNodeKey())
// ticker

View File

@@ -2,7 +2,10 @@ package service
import (
"context"
"fmt"
"errors"
"github.com/crawlab-team/crawlab/core/controllers"
"github.com/gin-gonic/gin"
"net"
"net/http"
"sync"
"time"
@@ -33,11 +36,10 @@ type WorkerService struct {
heartbeatInterval time.Duration
// internals
stopped bool
n *models.Node
s grpc.NodeService_SubscribeClient
isReady bool
healthPort int
stopped bool
n *models.Node
s grpc.NodeService_SubscribeClient
isReady bool
}
func (svc *WorkerService) Start() {
@@ -46,15 +48,15 @@ func (svc *WorkerService) Start() {
panic(err)
}
// start health check server
svc.startHealthServer()
// register to master
svc.register()
// mark as ready after registration
svc.isReady = true
// start health check server
go svc.startHealthServer()
// subscribe
go svc.subscribe()
@@ -194,23 +196,26 @@ func (svc *WorkerService) sendHeartbeat() {
}
func (svc *WorkerService) startHealthServer() {
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
if svc.isReady && !svc.stopped && svc.client != nil && !svc.client.IsClosed() {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
return
}
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("not ready"))
})
// handlers
app := gin.New()
app.GET("/health", controllers.GetHealthFn(func() bool {
return svc.isReady && !svc.stopped && svc.client != nil && !svc.client.IsClosed()
}))
addr := fmt.Sprintf(":%d", svc.healthPort)
go func() {
if err := http.ListenAndServe(addr, nil); err != nil {
log.Errorf("health check server failed: %v", err)
// listen
ln, err := net.Listen("tcp", utils.GetServerAddress())
if err != nil {
panic(err)
}
// serve
if err := http.Serve(ln, app); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
log.Error("run server error:" + err.Error())
} else {
log.Info("server graceful down")
}
}()
log.Infof("health check server started on port %d", svc.healthPort)
}
}
func newWorkerService() *WorkerService {
@@ -220,7 +225,6 @@ func newWorkerService() *WorkerService {
client: client.GetGrpcClient(),
handlerSvc: handler.GetTaskHandlerService(),
isReady: false,
healthPort: 8000, // You can make this configurable through environment variables
}
}