From e064889795f1d7775a154c1d0766bce7e907b30f Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Mon, 23 Dec 2024 21:45:38 +0800 Subject: [PATCH] refactor: replace apex/log with structured logger in master and worker services - Removed direct usage of apex/log in favor of a structured logger interface for improved logging consistency and context. - Updated logging calls in MasterService and WorkerService to utilize the new logger, enhancing error tracking and service monitoring. - Added logger initialization in both services to ensure proper logging setup. - Improved error handling and logging messages for better clarity during service operations. --- core/node/service/master_service.go | 41 ++++++++++--------- core/node/service/worker_service.go | 62 ++++++++++++++++++----------- 2 files changed, 60 insertions(+), 43 deletions(-) diff --git a/core/node/service/master_service.go b/core/node/service/master_service.go index 0aab95f2..45449002 100644 --- a/core/node/service/master_service.go +++ b/core/node/service/master_service.go @@ -2,7 +2,6 @@ package service import ( "errors" - "github.com/apex/log" "github.com/cenkalti/backoff/v4" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/grpc/server" @@ -37,6 +36,9 @@ type MasterService struct { // settings monitorInterval time.Duration + + // internals + logger interfaces.Logger } func (svc *MasterService) Start() { @@ -79,11 +81,11 @@ func (svc *MasterService) Wait() { func (svc *MasterService) Stop() { _ = svc.server.Stop() svc.taskHandlerSvc.Stop() - log.Infof("master[%s] service has stopped", svc.cfgSvc.GetNodeKey()) + svc.logger.Infof("master[%s] service has stopped", svc.cfgSvc.GetNodeKey()) } func (svc *MasterService) startMonitoring() { - log.Infof("master[%s] monitoring started", svc.cfgSvc.GetNodeKey()) + svc.logger.Infof("master[%s] monitoring started", svc.cfgSvc.GetNodeKey()) // ticker ticker := time.NewTicker(svc.monitorInterval) @@ -92,7 +94,7 @@ func (svc *MasterService) startMonitoring() { // monitor err := svc.monitor() if err != nil { - log.Errorf("master[%s] monitor error: %v", svc.cfgSvc.GetNodeKey(), err) + svc.logger.Errorf("master[%s] monitor error: %v", svc.cfgSvc.GetNodeKey(), err) } // wait @@ -107,7 +109,7 @@ func (svc *MasterService) Register() (err error) { node, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": nodeKey}, nil) if err != nil && err.Error() == mongo2.ErrNoDocuments.Error() { // not exists - log.Infof("master[%s] does not exist in db", nodeKey) + svc.logger.Infof("master[%s] does not exist in db", nodeKey) node := models.Node{ Key: nodeKey, Name: nodeName, @@ -120,23 +122,25 @@ func (svc *MasterService) Register() (err error) { } node.SetCreated(primitive.NilObjectID) node.SetUpdated(primitive.NilObjectID) - id, err := service.NewModelService[models.Node]().InsertOne(node) + _, err := service.NewModelService[models.Node]().InsertOne(node) if err != nil { + svc.logger.Errorf("save master[%s] to db error: %v", nodeKey, err) return err } - log.Infof("added master[%s] in db. id: %s", nodeKey, id.Hex()) + svc.logger.Infof("added master[%s] to db", nodeKey) return nil } else if err == nil { // exists - log.Infof("master[%s] exists in db", nodeKey) + svc.logger.Infof("master[%s] exists in db", nodeKey) node.Status = constants.NodeStatusOnline node.Active = true node.ActiveAt = time.Now() err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node) if err != nil { + svc.logger.Errorf("update master[%s] in db error: %v", nodeKey, err) return err } - log.Infof("updated master[%s] in db. id: %s", nodeKey, node.Id.Hex()) + svc.logger.Infof("updated master[%s] in db", nodeKey) return nil } else { // error @@ -147,7 +151,7 @@ func (svc *MasterService) Register() (err error) { func (svc *MasterService) monitor() (err error) { // update master node status in db if err := svc.updateMasterNodeStatus(); err != nil { - if err.Error() == mongo2.ErrNoDocuments.Error() { + if errors.Is(err, mongo2.ErrNoDocuments) { return nil } return err @@ -181,10 +185,7 @@ func (svc *MasterService) monitor() (err error) { } // update node available runners - if err := svc.updateNodeRunners(n); err != nil { - trace.PrintError(err) - return - } + _ = svc.updateNodeRunners(n) }(&n) } @@ -203,7 +204,8 @@ func (svc *MasterService) getAllWorkerNodes() (nodes []models.Node, err error) { if errors.Is(err, mongo2.ErrNoDocuments) { return nil, nil } - return nil, trace.TraceError(err) + svc.logger.Errorf("get all worker nodes error: %v", err) + return nil, err } return nodes, nil } @@ -255,14 +257,14 @@ func (svc *MasterService) subscribeNode(n *models.Node) (ok bool) { func (svc *MasterService) pingNodeClient(n *models.Node) (ok bool) { stream, ok := svc.server.NodeSvr.GetSubscribeStream(n.Id) if !ok { - log.Errorf("cannot get worker node client[%s]", n.Key) + svc.logger.Errorf("cannot get worker node client[%s]", n.Key) return false } err := stream.Send(&grpc.NodeServiceSubscribeResponse{ Code: grpc.NodeServiceSubscribeCode_PING, }) if err != nil { - log.Errorf("failed to ping worker node client[%s]: %v", n.Key, err) + svc.logger.Errorf("failed to ping worker node client[%s]: %v", n.Key, err) return false } return true @@ -275,13 +277,13 @@ func (svc *MasterService) updateNodeRunners(node *models.Node) (err error) { } runningTasksCount, err := service.NewModelService[models.Task]().Count(query) if err != nil { - log.Errorf("failed to count running tasks for node[%s]: %v", node.Key, err) + svc.logger.Errorf("failed to count running tasks for node[%s]: %v", node.Key, err) return err } node.CurrentRunners = runningTasksCount err = service.NewModelService[models.Node]().ReplaceById(node.Id, *node) if err != nil { - log.Errorf("failed to update node runners for node[%s]: %v", node.Key, err) + svc.logger.Errorf("failed to update node runners for node[%s]: %v", node.Key, err) return err } return nil @@ -303,6 +305,7 @@ func newMasterService() *MasterService { taskHandlerSvc: handler.GetTaskHandlerService(), scheduleSvc: schedule.GetScheduleService(), systemSvc: system.GetSystemService(), + logger: utils.NewServiceLogger("MasterService"), } } diff --git a/core/node/service/worker_service.go b/core/node/service/worker_service.go index dfa59caf..31332226 100644 --- a/core/node/service/worker_service.go +++ b/core/node/service/worker_service.go @@ -3,6 +3,7 @@ package service import ( "context" "errors" + "fmt" "github.com/crawlab-team/crawlab/core/controllers" "github.com/gin-gonic/gin" "net" @@ -12,7 +13,6 @@ import ( "github.com/crawlab-team/crawlab/core/models/models" - "github.com/apex/log" "github.com/cenkalti/backoff/v4" "github.com/crawlab-team/crawlab/core/grpc/client" "github.com/crawlab-team/crawlab/core/interfaces" @@ -38,6 +38,7 @@ type WorkerService struct { n *models.Node s grpc.NodeService_SubscribeClient isReady bool + logger interfaces.Logger } func (svc *WorkerService) Start() { @@ -77,28 +78,40 @@ func (svc *WorkerService) Stop() { svc.stopped = true _ = client.GetGrpcClient().Stop() svc.handlerSvc.Stop() - log.Infof("worker[%s] service has stopped", svc.cfgSvc.GetNodeKey()) + svc.logger.Infof("worker[%s] service has stopped", svc.cfgSvc.GetNodeKey()) } func (svc *WorkerService) register() { - ctx, cancel := client.GetGrpcClient().Context() - defer cancel() - _, err := client.GetGrpcClient().NodeClient.Register(ctx, &grpc.NodeServiceRegisterRequest{ - NodeKey: svc.cfgSvc.GetNodeKey(), - NodeName: svc.cfgSvc.GetNodeName(), - MaxRunners: int32(svc.cfgSvc.GetMaxRunners()), - }) + op := func() (err error) { + ctx, cancel := client.GetGrpcClient().Context() + defer cancel() + _, err = client.GetGrpcClient().NodeClient.Register(ctx, &grpc.NodeServiceRegisterRequest{ + NodeKey: svc.cfgSvc.GetNodeKey(), + NodeName: svc.cfgSvc.GetNodeName(), + MaxRunners: int32(svc.cfgSvc.GetMaxRunners()), + }) + if err != nil { + err = fmt.Errorf("failed to register worker[%s]: %v", svc.cfgSvc.GetNodeKey(), err) + return err + } + svc.n, err = client2.NewModelService[models.Node]().GetOne(bson.M{"key": svc.GetConfigService().GetNodeKey()}, nil) + if err != nil { + err = fmt.Errorf("failed to get node: %v", err) + return err + } + svc.logger.Infof("worker[%s] registered to master. id: %s", svc.GetConfigService().GetNodeKey(), svc.n.Id.Hex()) + return nil + } + b := backoff.NewExponentialBackOff() + n := func(err error, duration time.Duration) { + svc.logger.Errorf("register worker[%s] error: %v", svc.cfgSvc.GetNodeKey(), err) + svc.logger.Infof("retry in %.1f seconds", duration.Seconds()) + } + err := backoff.RetryNotify(op, b, n) if err != nil { - log.Fatalf("failed to register worker[%s] to master: %v", svc.cfgSvc.GetNodeKey(), err) + svc.logger.Fatalf("failed to register worker[%s]: %v", svc.cfgSvc.GetNodeKey(), err) panic(err) } - svc.n, err = client2.NewModelService[models.Node]().GetOne(bson.M{"key": svc.GetConfigService().GetNodeKey()}, nil) - if err != nil { - log.Fatalf("failed to get node: %v", err) - panic(err) - } - log.Infof("worker[%s] registered to master. id: %s", svc.GetConfigService().GetNodeKey(), svc.n.Id.Hex()) - return } func (svc *WorkerService) reportStatus() { @@ -141,7 +154,7 @@ func (svc *WorkerService) subscribe() { NodeKey: svc.cfgSvc.GetNodeKey(), }) if err != nil { - log.Errorf("failed to subscribe to master: %v", err) + svc.logger.Errorf("failed to subscribe to master: %v", err) return err } @@ -154,10 +167,10 @@ func (svc *WorkerService) subscribe() { msg, err := stream.Recv() if err != nil { if client.GetGrpcClient().IsClosed() { - log.Errorf("connection to master is closed: %v", err) + svc.logger.Errorf("connection to master is closed: %v", err) return err } - log.Errorf("failed to receive message from master: %v", err) + svc.logger.Errorf("failed to receive message from master: %v", err) return err } @@ -171,7 +184,7 @@ func (svc *WorkerService) subscribe() { // Execute with backoff err := backoff.Retry(operation, b) if err != nil { - log.Errorf("subscription failed after max retries: %v", err) + svc.logger.Errorf("subscription failed after max retries: %v", err) return } @@ -187,7 +200,7 @@ func (svc *WorkerService) sendHeartbeat() { NodeKey: svc.cfgSvc.GetNodeKey(), }) if err != nil { - log.Errorf("failed to send heartbeat to master: %v", err) + svc.logger.Errorf("failed to send heartbeat to master: %v", err) } } @@ -207,9 +220,9 @@ func (svc *WorkerService) startHealthServer() { // serve if err := http.Serve(ln, app); err != nil { if !errors.Is(err, http.ErrServerClosed) { - log.Error("run server error:" + err.Error()) + svc.logger.Errorf("run server error: %v", err) } else { - log.Info("server graceful down") + svc.logger.Info("server graceful down") } } } @@ -220,6 +233,7 @@ func newWorkerService() *WorkerService { cfgSvc: nodeconfig.GetNodeConfigService(), handlerSvc: handler.GetTaskHandlerService(), isReady: false, + logger: utils.NewServiceLogger("WorkerService"), } }