mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
refactor: replace apex/log with structured logger in master and worker services
- Removed direct usage of apex/log in favor of a structured logger interface for improved logging consistency and context.
- Updated logging calls in MasterService and WorkerService to utilize the new logger, enhancing error tracking and service monitoring.
- Added logger initialization in both services to ensure proper logging setup.
- Improved error handling and logging messages for better clarity during service operations.
This commit is contained in:
@@ -2,7 +2,6 @@ package service
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/apex/log"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/crawlab-team/crawlab/core/constants"
|
||||
"github.com/crawlab-team/crawlab/core/grpc/server"
|
||||
@@ -37,6 +36,9 @@ type MasterService struct {
|
||||
|
||||
// settings
|
||||
monitorInterval time.Duration
|
||||
|
||||
// internals
|
||||
logger interfaces.Logger
|
||||
}
|
||||
|
||||
func (svc *MasterService) Start() {
|
||||
@@ -79,11 +81,11 @@ func (svc *MasterService) Wait() {
|
||||
func (svc *MasterService) Stop() {
|
||||
_ = svc.server.Stop()
|
||||
svc.taskHandlerSvc.Stop()
|
||||
log.Infof("master[%s] service has stopped", svc.cfgSvc.GetNodeKey())
|
||||
svc.logger.Infof("master[%s] service has stopped", svc.cfgSvc.GetNodeKey())
|
||||
}
|
||||
|
||||
func (svc *MasterService) startMonitoring() {
|
||||
log.Infof("master[%s] monitoring started", svc.cfgSvc.GetNodeKey())
|
||||
svc.logger.Infof("master[%s] monitoring started", svc.cfgSvc.GetNodeKey())
|
||||
|
||||
// ticker
|
||||
ticker := time.NewTicker(svc.monitorInterval)
|
||||
@@ -92,7 +94,7 @@ func (svc *MasterService) startMonitoring() {
|
||||
// monitor
|
||||
err := svc.monitor()
|
||||
if err != nil {
|
||||
log.Errorf("master[%s] monitor error: %v", svc.cfgSvc.GetNodeKey(), err)
|
||||
svc.logger.Errorf("master[%s] monitor error: %v", svc.cfgSvc.GetNodeKey(), err)
|
||||
}
|
||||
|
||||
// wait
|
||||
@@ -107,7 +109,7 @@ func (svc *MasterService) Register() (err error) {
|
||||
node, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": nodeKey}, nil)
|
||||
if err != nil && err.Error() == mongo2.ErrNoDocuments.Error() {
|
||||
// not exists
|
||||
log.Infof("master[%s] does not exist in db", nodeKey)
|
||||
svc.logger.Infof("master[%s] does not exist in db", nodeKey)
|
||||
node := models.Node{
|
||||
Key: nodeKey,
|
||||
Name: nodeName,
|
||||
@@ -120,23 +122,25 @@ func (svc *MasterService) Register() (err error) {
|
||||
}
|
||||
node.SetCreated(primitive.NilObjectID)
|
||||
node.SetUpdated(primitive.NilObjectID)
|
||||
id, err := service.NewModelService[models.Node]().InsertOne(node)
|
||||
_, err := service.NewModelService[models.Node]().InsertOne(node)
|
||||
if err != nil {
|
||||
svc.logger.Errorf("save master[%s] to db error: %v", nodeKey, err)
|
||||
return err
|
||||
}
|
||||
log.Infof("added master[%s] in db. id: %s", nodeKey, id.Hex())
|
||||
svc.logger.Infof("added master[%s] to db", nodeKey)
|
||||
return nil
|
||||
} else if err == nil {
|
||||
// exists
|
||||
log.Infof("master[%s] exists in db", nodeKey)
|
||||
svc.logger.Infof("master[%s] exists in db", nodeKey)
|
||||
node.Status = constants.NodeStatusOnline
|
||||
node.Active = true
|
||||
node.ActiveAt = time.Now()
|
||||
err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node)
|
||||
if err != nil {
|
||||
svc.logger.Errorf("update master[%s] in db error: %v", nodeKey, err)
|
||||
return err
|
||||
}
|
||||
log.Infof("updated master[%s] in db. id: %s", nodeKey, node.Id.Hex())
|
||||
svc.logger.Infof("updated master[%s] in db", nodeKey)
|
||||
return nil
|
||||
} else {
|
||||
// error
|
||||
@@ -147,7 +151,7 @@ func (svc *MasterService) Register() (err error) {
|
||||
func (svc *MasterService) monitor() (err error) {
|
||||
// update master node status in db
|
||||
if err := svc.updateMasterNodeStatus(); err != nil {
|
||||
if err.Error() == mongo2.ErrNoDocuments.Error() {
|
||||
if errors.Is(err, mongo2.ErrNoDocuments) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
@@ -181,10 +185,7 @@ func (svc *MasterService) monitor() (err error) {
|
||||
}
|
||||
|
||||
// update node available runners
|
||||
if err := svc.updateNodeRunners(n); err != nil {
|
||||
trace.PrintError(err)
|
||||
return
|
||||
}
|
||||
_ = svc.updateNodeRunners(n)
|
||||
}(&n)
|
||||
}
|
||||
|
||||
@@ -203,7 +204,8 @@ func (svc *MasterService) getAllWorkerNodes() (nodes []models.Node, err error) {
|
||||
if errors.Is(err, mongo2.ErrNoDocuments) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, trace.TraceError(err)
|
||||
svc.logger.Errorf("get all worker nodes error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
return nodes, nil
|
||||
}
|
||||
@@ -255,14 +257,14 @@ func (svc *MasterService) subscribeNode(n *models.Node) (ok bool) {
|
||||
func (svc *MasterService) pingNodeClient(n *models.Node) (ok bool) {
|
||||
stream, ok := svc.server.NodeSvr.GetSubscribeStream(n.Id)
|
||||
if !ok {
|
||||
log.Errorf("cannot get worker node client[%s]", n.Key)
|
||||
svc.logger.Errorf("cannot get worker node client[%s]", n.Key)
|
||||
return false
|
||||
}
|
||||
err := stream.Send(&grpc.NodeServiceSubscribeResponse{
|
||||
Code: grpc.NodeServiceSubscribeCode_PING,
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorf("failed to ping worker node client[%s]: %v", n.Key, err)
|
||||
svc.logger.Errorf("failed to ping worker node client[%s]: %v", n.Key, err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
@@ -275,13 +277,13 @@ func (svc *MasterService) updateNodeRunners(node *models.Node) (err error) {
|
||||
}
|
||||
runningTasksCount, err := service.NewModelService[models.Task]().Count(query)
|
||||
if err != nil {
|
||||
log.Errorf("failed to count running tasks for node[%s]: %v", node.Key, err)
|
||||
svc.logger.Errorf("failed to count running tasks for node[%s]: %v", node.Key, err)
|
||||
return err
|
||||
}
|
||||
node.CurrentRunners = runningTasksCount
|
||||
err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node)
|
||||
if err != nil {
|
||||
log.Errorf("failed to update node runners for node[%s]: %v", node.Key, err)
|
||||
svc.logger.Errorf("failed to update node runners for node[%s]: %v", node.Key, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -303,6 +305,7 @@ func newMasterService() *MasterService {
|
||||
taskHandlerSvc: handler.GetTaskHandlerService(),
|
||||
scheduleSvc: schedule.GetScheduleService(),
|
||||
systemSvc: system.GetSystemService(),
|
||||
logger: utils.NewServiceLogger("MasterService"),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package service
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/crawlab-team/crawlab/core/controllers"
|
||||
"github.com/gin-gonic/gin"
|
||||
"net"
|
||||
@@ -12,7 +13,6 @@ import (
|
||||
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
|
||||
"github.com/apex/log"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/crawlab-team/crawlab/core/grpc/client"
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
@@ -38,6 +38,7 @@ type WorkerService struct {
|
||||
n *models.Node
|
||||
s grpc.NodeService_SubscribeClient
|
||||
isReady bool
|
||||
logger interfaces.Logger
|
||||
}
|
||||
|
||||
func (svc *WorkerService) Start() {
|
||||
@@ -77,28 +78,40 @@ func (svc *WorkerService) Stop() {
|
||||
svc.stopped = true
|
||||
_ = client.GetGrpcClient().Stop()
|
||||
svc.handlerSvc.Stop()
|
||||
log.Infof("worker[%s] service has stopped", svc.cfgSvc.GetNodeKey())
|
||||
svc.logger.Infof("worker[%s] service has stopped", svc.cfgSvc.GetNodeKey())
|
||||
}
|
||||
|
||||
func (svc *WorkerService) register() {
|
||||
ctx, cancel := client.GetGrpcClient().Context()
|
||||
defer cancel()
|
||||
_, err := client.GetGrpcClient().NodeClient.Register(ctx, &grpc.NodeServiceRegisterRequest{
|
||||
NodeKey: svc.cfgSvc.GetNodeKey(),
|
||||
NodeName: svc.cfgSvc.GetNodeName(),
|
||||
MaxRunners: int32(svc.cfgSvc.GetMaxRunners()),
|
||||
})
|
||||
op := func() (err error) {
|
||||
ctx, cancel := client.GetGrpcClient().Context()
|
||||
defer cancel()
|
||||
_, err = client.GetGrpcClient().NodeClient.Register(ctx, &grpc.NodeServiceRegisterRequest{
|
||||
NodeKey: svc.cfgSvc.GetNodeKey(),
|
||||
NodeName: svc.cfgSvc.GetNodeName(),
|
||||
MaxRunners: int32(svc.cfgSvc.GetMaxRunners()),
|
||||
})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to register worker[%s]: %v", svc.cfgSvc.GetNodeKey(), err)
|
||||
return err
|
||||
}
|
||||
svc.n, err = client2.NewModelService[models.Node]().GetOne(bson.M{"key": svc.GetConfigService().GetNodeKey()}, nil)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to get node: %v", err)
|
||||
return err
|
||||
}
|
||||
svc.logger.Infof("worker[%s] registered to master. id: %s", svc.GetConfigService().GetNodeKey(), svc.n.Id.Hex())
|
||||
return nil
|
||||
}
|
||||
b := backoff.NewExponentialBackOff()
|
||||
n := func(err error, duration time.Duration) {
|
||||
svc.logger.Errorf("register worker[%s] error: %v", svc.cfgSvc.GetNodeKey(), err)
|
||||
svc.logger.Infof("retry in %.1f seconds", duration.Seconds())
|
||||
}
|
||||
err := backoff.RetryNotify(op, b, n)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to register worker[%s] to master: %v", svc.cfgSvc.GetNodeKey(), err)
|
||||
svc.logger.Fatalf("failed to register worker[%s]: %v", svc.cfgSvc.GetNodeKey(), err)
|
||||
panic(err)
|
||||
}
|
||||
svc.n, err = client2.NewModelService[models.Node]().GetOne(bson.M{"key": svc.GetConfigService().GetNodeKey()}, nil)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to get node: %v", err)
|
||||
panic(err)
|
||||
}
|
||||
log.Infof("worker[%s] registered to master. id: %s", svc.GetConfigService().GetNodeKey(), svc.n.Id.Hex())
|
||||
return
|
||||
}
|
||||
|
||||
func (svc *WorkerService) reportStatus() {
|
||||
@@ -141,7 +154,7 @@ func (svc *WorkerService) subscribe() {
|
||||
NodeKey: svc.cfgSvc.GetNodeKey(),
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorf("failed to subscribe to master: %v", err)
|
||||
svc.logger.Errorf("failed to subscribe to master: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -154,10 +167,10 @@ func (svc *WorkerService) subscribe() {
|
||||
msg, err := stream.Recv()
|
||||
if err != nil {
|
||||
if client.GetGrpcClient().IsClosed() {
|
||||
log.Errorf("connection to master is closed: %v", err)
|
||||
svc.logger.Errorf("connection to master is closed: %v", err)
|
||||
return err
|
||||
}
|
||||
log.Errorf("failed to receive message from master: %v", err)
|
||||
svc.logger.Errorf("failed to receive message from master: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -171,7 +184,7 @@ func (svc *WorkerService) subscribe() {
|
||||
// Execute with backoff
|
||||
err := backoff.Retry(operation, b)
|
||||
if err != nil {
|
||||
log.Errorf("subscription failed after max retries: %v", err)
|
||||
svc.logger.Errorf("subscription failed after max retries: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -187,7 +200,7 @@ func (svc *WorkerService) sendHeartbeat() {
|
||||
NodeKey: svc.cfgSvc.GetNodeKey(),
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorf("failed to send heartbeat to master: %v", err)
|
||||
svc.logger.Errorf("failed to send heartbeat to master: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -207,9 +220,9 @@ func (svc *WorkerService) startHealthServer() {
|
||||
// serve
|
||||
if err := http.Serve(ln, app); err != nil {
|
||||
if !errors.Is(err, http.ErrServerClosed) {
|
||||
log.Error("run server error:" + err.Error())
|
||||
svc.logger.Errorf("run server error: %v", err)
|
||||
} else {
|
||||
log.Info("server graceful down")
|
||||
svc.logger.Info("server graceful down")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -220,6 +233,7 @@ func newWorkerService() *WorkerService {
|
||||
cfgSvc: nodeconfig.GetNodeConfigService(),
|
||||
handlerSvc: handler.GetTaskHandlerService(),
|
||||
isReady: false,
|
||||
logger: utils.NewServiceLogger("WorkerService"),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user