From 3cb74d76f9d87931d4332d656dc45bcc7f717e3e Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Fri, 20 Dec 2024 20:34:04 +0800 Subject: [PATCH] feat: enhance gRPC client functionality and improve logging - Added WaitForReady method to GrpcClient for blocking until the client is ready. - Updated WorkerService to utilize WaitForReady for ensuring gRPC client readiness before starting. - Refactored ModelService to consistently use GetGrpcClient for context management. - Changed logging level for received metrics in MetricServiceServer from Info to Debug. - Modified error handling in HandleError to conditionally print errors based on the environment. - Cleaned up unused GrpcClient references in various services, improving code clarity. --- core/grpc/client/client.go | 46 ++++++++++++++- core/grpc/server/metric_service_server.go | 2 +- core/grpc/server/utils_handle.go | 5 +- core/models/client/model_service.go | 70 ++++++++++------------- core/node/service/worker_service.go | 42 ++++---------- core/task/handler/runner.go | 17 ++---- core/task/handler/service.go | 9 +-- 7 files changed, 95 insertions(+), 96 deletions(-) diff --git a/core/grpc/client/client.go b/core/grpc/client/client.go index a1a71abc..a3616bef 100644 --- a/core/grpc/client/client.go +++ b/core/grpc/client/client.go @@ -3,6 +3,7 @@ package client import ( "context" "encoding/json" + "fmt" "sync" "time" @@ -75,6 +76,21 @@ func (c *GrpcClient) Stop() (err error) { return nil } +func (c *GrpcClient) WaitForReady() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if c.IsReady() { + return + } + case <-c.stop: + log.Errorf("grpc client stopped") + } + } +} + func (c *GrpcClient) register() { c.NodeClient = grpc2.NewNodeServiceClient(c.conn) c.ModelBaseServiceClient = grpc2.NewModelBaseServiceClient(c.conn) @@ -87,8 +103,8 @@ func (c *GrpcClient) Context() (ctx context.Context, cancel context.CancelFunc) return context.WithTimeout(context.Background(), c.timeout) } -func (c *GrpcClient) IsStarted() (res bool) { - return c.conn != nil +func (c *GrpcClient) IsReady() (res bool) { + return c.conn != nil && c.conn.GetState() == connectivity.Ready } func (c *GrpcClient) IsClosed() (res bool) { @@ -132,12 +148,30 @@ func (c *GrpcClient) connect() (err error) { } // connect + log.Infof("[GrpcClient] grpc client connecting to %s", c.address) c.conn.Connect() + + // wait for connection to be ready + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + ok := c.conn.WaitForStateChange(ctx, connectivity.Ready) + if !ok { + return fmt.Errorf("[GrpcClient] grpc client failed to connect to %s: timed out", c.address) + } + + // success log.Infof("[GrpcClient] grpc client connected to %s", c.address) return nil } - return backoff.RetryNotify(op, backoff.NewExponentialBackOff(), utils.BackoffErrorNotify("grpc client connect")) + b := backoff.NewExponentialBackOff( + backoff.WithInitialInterval(5*time.Second), + backoff.WithMaxElapsedTime(10*time.Minute), + ) + n := func(err error, duration time.Duration) { + log.Errorf("[GrpcClient] grpc client failed to connect to %s: %v, retrying in %s", c.address, err, duration) + } + return backoff.RetryNotify(op, b, n) } func newGrpcClient() (c *GrpcClient) { @@ -154,6 +188,12 @@ var _clientOnce sync.Once func GetGrpcClient() *GrpcClient { _clientOnce.Do(func() { _client = newGrpcClient() + go func() { + err := _client.Start() + if err != nil { + log.Fatalf("[GrpcClient] failed to start: %v", err) + } + }() }) return _client } diff --git a/core/grpc/server/metric_service_server.go b/core/grpc/server/metric_service_server.go index 7ffb2c34..cadde45a 100644 --- a/core/grpc/server/metric_service_server.go +++ b/core/grpc/server/metric_service_server.go @@ -16,7 +16,7 @@ type MetricServiceServer struct { } func (svr MetricServiceServer) Send(_ context.Context, req *grpc.MetricServiceSendRequest) (res *grpc.Response, err error) { - log.Info("[MetricServiceServer] received metric from node: " + req.NodeKey) + log.Debugf("[MetricServiceServer] received metric from node: " + req.NodeKey) n, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": req.NodeKey}, nil) if err != nil { log.Errorf("[MetricServiceServer] error getting node: %v", err) diff --git a/core/grpc/server/utils_handle.go b/core/grpc/server/utils_handle.go index 756a740f..26c276fb 100644 --- a/core/grpc/server/utils_handle.go +++ b/core/grpc/server/utils_handle.go @@ -2,12 +2,15 @@ package server import ( "encoding/json" + "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/grpc" "github.com/crawlab-team/crawlab/trace" ) func HandleError(err error) (res *grpc.Response, err2 error) { - trace.PrintError(err) + if utils.IsDev() { + trace.PrintError(err) + } return &grpc.Response{ Code: grpc.ResponseCode_ERROR, Error: err.Error(), diff --git a/core/models/client/model_service.go b/core/models/client/model_service.go index c9f396af..85477d9b 100644 --- a/core/models/client/model_service.go +++ b/core/models/client/model_service.go @@ -22,14 +22,13 @@ var ( type ModelService[T any] struct { cfg interfaces.NodeConfigService - c *client.GrpcClient modelType string } func (svc *ModelService[T]) GetById(id primitive.ObjectID) (model *T, err error) { - ctx, cancel := svc.c.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() - res, err := svc.c.ModelBaseServiceClient.GetById(ctx, &grpc.ModelServiceGetByIdRequest{ + res, err := client.GetGrpcClient().ModelBaseServiceClient.GetById(ctx, &grpc.ModelServiceGetByIdRequest{ NodeKey: svc.cfg.GetNodeKey(), ModelType: svc.modelType, Id: id.Hex(), @@ -41,7 +40,7 @@ func (svc *ModelService[T]) GetById(id primitive.ObjectID) (model *T, err error) } func (svc *ModelService[T]) GetOne(query bson.M, options *mongo.FindOptions) (model *T, err error) { - ctx, cancel := svc.c.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() queryData, err := json.Marshal(query) if err != nil { @@ -51,7 +50,7 @@ func (svc *ModelService[T]) GetOne(query bson.M, options *mongo.FindOptions) (mo if err != nil { return nil, err } - res, err := svc.c.ModelBaseServiceClient.GetOne(ctx, &grpc.ModelServiceGetOneRequest{ + res, err := client.GetGrpcClient().ModelBaseServiceClient.GetOne(ctx, &grpc.ModelServiceGetOneRequest{ NodeKey: svc.cfg.GetNodeKey(), ModelType: svc.modelType, Query: queryData, @@ -64,7 +63,7 @@ func (svc *ModelService[T]) GetOne(query bson.M, options *mongo.FindOptions) (mo } func (svc *ModelService[T]) GetMany(query bson.M, options *mongo.FindOptions) (models []T, err error) { - ctx, cancel := svc.c.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() queryData, err := json.Marshal(query) if err != nil { @@ -74,7 +73,7 @@ func (svc *ModelService[T]) GetMany(query bson.M, options *mongo.FindOptions) (m if err != nil { return nil, err } - res, err := svc.c.ModelBaseServiceClient.GetMany(ctx, &grpc.ModelServiceGetManyRequest{ + res, err := client.GetGrpcClient().ModelBaseServiceClient.GetMany(ctx, &grpc.ModelServiceGetManyRequest{ NodeKey: svc.cfg.GetNodeKey(), ModelType: svc.modelType, Query: queryData, @@ -87,9 +86,9 @@ func (svc *ModelService[T]) GetMany(query bson.M, options *mongo.FindOptions) (m } func (svc *ModelService[T]) DeleteById(id primitive.ObjectID) (err error) { - ctx, cancel := svc.c.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() - _, err = svc.c.ModelBaseServiceClient.DeleteById(ctx, &grpc.ModelServiceDeleteByIdRequest{ + _, err = client.GetGrpcClient().ModelBaseServiceClient.DeleteById(ctx, &grpc.ModelServiceDeleteByIdRequest{ NodeKey: svc.cfg.GetNodeKey(), ModelType: svc.modelType, Id: id.Hex(), @@ -101,13 +100,13 @@ func (svc *ModelService[T]) DeleteById(id primitive.ObjectID) (err error) { } func (svc *ModelService[T]) DeleteOne(query bson.M) (err error) { - ctx, cancel := svc.c.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() queryData, err := json.Marshal(query) if err != nil { return err } - _, err = svc.c.ModelBaseServiceClient.DeleteOne(ctx, &grpc.ModelServiceDeleteOneRequest{ + _, err = client.GetGrpcClient().ModelBaseServiceClient.DeleteOne(ctx, &grpc.ModelServiceDeleteOneRequest{ NodeKey: svc.cfg.GetNodeKey(), ModelType: svc.modelType, Query: queryData, @@ -119,13 +118,13 @@ func (svc *ModelService[T]) DeleteOne(query bson.M) (err error) { } func (svc *ModelService[T]) DeleteMany(query bson.M) (err error) { - ctx, cancel := svc.c.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() queryData, err := json.Marshal(query) if err != nil { return err } - _, err = svc.c.ModelBaseServiceClient.DeleteMany(ctx, &grpc.ModelServiceDeleteManyRequest{ + _, err = client.GetGrpcClient().ModelBaseServiceClient.DeleteMany(ctx, &grpc.ModelServiceDeleteManyRequest{ NodeKey: svc.cfg.GetNodeKey(), ModelType: svc.modelType, Query: queryData, @@ -137,13 +136,13 @@ func (svc *ModelService[T]) DeleteMany(query bson.M) (err error) { } func (svc *ModelService[T]) UpdateById(id primitive.ObjectID, update bson.M) (err error) { - ctx, cancel := svc.c.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() updateData, err := json.Marshal(update) if err != nil { return err } - _, err = svc.c.ModelBaseServiceClient.UpdateById(ctx, &grpc.ModelServiceUpdateByIdRequest{ + _, err = client.GetGrpcClient().ModelBaseServiceClient.UpdateById(ctx, &grpc.ModelServiceUpdateByIdRequest{ NodeKey: svc.cfg.GetNodeKey(), ModelType: svc.modelType, Id: id.Hex(), @@ -156,7 +155,7 @@ func (svc *ModelService[T]) UpdateById(id primitive.ObjectID, update bson.M) (er } func (svc *ModelService[T]) UpdateOne(query bson.M, update bson.M) (err error) { - ctx, cancel := svc.c.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() queryData, err := json.Marshal(query) if err != nil { @@ -166,7 +165,7 @@ func (svc *ModelService[T]) UpdateOne(query bson.M, update bson.M) (err error) { if err != nil { return err } - _, err = svc.c.ModelBaseServiceClient.UpdateOne(ctx, &grpc.ModelServiceUpdateOneRequest{ + _, err = client.GetGrpcClient().ModelBaseServiceClient.UpdateOne(ctx, &grpc.ModelServiceUpdateOneRequest{ NodeKey: svc.cfg.GetNodeKey(), ModelType: svc.modelType, Query: queryData, @@ -179,7 +178,7 @@ func (svc *ModelService[T]) UpdateOne(query bson.M, update bson.M) (err error) { } func (svc *ModelService[T]) UpdateMany(query bson.M, update bson.M) (err error) { - ctx, cancel := svc.c.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() queryData, err := json.Marshal(query) if err != nil { @@ -189,7 +188,7 @@ func (svc *ModelService[T]) UpdateMany(query bson.M, update bson.M) (err error) if err != nil { return err } - _, err = svc.c.ModelBaseServiceClient.UpdateMany(ctx, &grpc.ModelServiceUpdateManyRequest{ + _, err = client.GetGrpcClient().ModelBaseServiceClient.UpdateMany(ctx, &grpc.ModelServiceUpdateManyRequest{ NodeKey: svc.cfg.GetNodeKey(), ModelType: svc.modelType, Query: queryData, @@ -199,13 +198,13 @@ func (svc *ModelService[T]) UpdateMany(query bson.M, update bson.M) (err error) } func (svc *ModelService[T]) ReplaceById(id primitive.ObjectID, model T) (err error) { - ctx, cancel := svc.c.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() modelData, err := json.Marshal(model) if err != nil { return err } - _, err = svc.c.ModelBaseServiceClient.ReplaceById(ctx, &grpc.ModelServiceReplaceByIdRequest{ + _, err = client.GetGrpcClient().ModelBaseServiceClient.ReplaceById(ctx, &grpc.ModelServiceReplaceByIdRequest{ NodeKey: svc.cfg.GetNodeKey(), ModelType: svc.modelType, Id: id.Hex(), @@ -218,7 +217,7 @@ func (svc *ModelService[T]) ReplaceById(id primitive.ObjectID, model T) (err err } func (svc *ModelService[T]) ReplaceOne(query bson.M, model T) (err error) { - ctx, cancel := svc.c.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() queryData, err := json.Marshal(query) if err != nil { @@ -228,7 +227,7 @@ func (svc *ModelService[T]) ReplaceOne(query bson.M, model T) (err error) { if err != nil { return err } - _, err = svc.c.ModelBaseServiceClient.ReplaceOne(ctx, &grpc.ModelServiceReplaceOneRequest{ + _, err = client.GetGrpcClient().ModelBaseServiceClient.ReplaceOne(ctx, &grpc.ModelServiceReplaceOneRequest{ NodeKey: svc.cfg.GetNodeKey(), ModelType: svc.modelType, Query: queryData, @@ -241,13 +240,13 @@ func (svc *ModelService[T]) ReplaceOne(query bson.M, model T) (err error) { } func (svc *ModelService[T]) InsertOne(model T) (id primitive.ObjectID, err error) { - ctx, cancel := svc.c.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() modelData, err := json.Marshal(model) if err != nil { return primitive.NilObjectID, err } - res, err := svc.c.ModelBaseServiceClient.InsertOne(ctx, &grpc.ModelServiceInsertOneRequest{ + res, err := client.GetGrpcClient().ModelBaseServiceClient.InsertOne(ctx, &grpc.ModelServiceInsertOneRequest{ NodeKey: svc.cfg.GetNodeKey(), ModelType: svc.modelType, Model: modelData, @@ -259,13 +258,13 @@ func (svc *ModelService[T]) InsertOne(model T) (id primitive.ObjectID, err error } func (svc *ModelService[T]) InsertMany(models []T) (ids []primitive.ObjectID, err error) { - ctx, cancel := svc.c.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() modelsData, err := json.Marshal(models) if err != nil { return nil, err } - res, err := svc.c.ModelBaseServiceClient.InsertMany(ctx, &grpc.ModelServiceInsertManyRequest{ + res, err := client.GetGrpcClient().ModelBaseServiceClient.InsertMany(ctx, &grpc.ModelServiceInsertManyRequest{ NodeKey: svc.cfg.GetNodeKey(), ModelType: svc.modelType, Models: modelsData, @@ -277,7 +276,7 @@ func (svc *ModelService[T]) InsertMany(models []T) (ids []primitive.ObjectID, er } func (svc *ModelService[T]) UpsertOne(query bson.M, model T) (id primitive.ObjectID, err error) { - ctx, cancel := svc.c.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() queryData, err := json.Marshal(query) if err != nil { @@ -287,7 +286,7 @@ func (svc *ModelService[T]) UpsertOne(query bson.M, model T) (id primitive.Objec if err != nil { return primitive.NilObjectID, err } - res, err := svc.c.ModelBaseServiceClient.UpsertOne(ctx, &grpc.ModelServiceUpsertOneRequest{ + res, err := client.GetGrpcClient().ModelBaseServiceClient.UpsertOne(ctx, &grpc.ModelServiceUpsertOneRequest{ NodeKey: svc.cfg.GetNodeKey(), ModelType: svc.modelType, Query: queryData, @@ -301,13 +300,13 @@ func (svc *ModelService[T]) UpsertOne(query bson.M, model T) (id primitive.Objec } func (svc *ModelService[T]) Count(query bson.M) (total int, err error) { - ctx, cancel := svc.c.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() queryData, err := json.Marshal(query) if err != nil { return 0, err } - res, err := svc.c.ModelBaseServiceClient.Count(ctx, &grpc.ModelServiceCountRequest{ + res, err := client.GetGrpcClient().ModelBaseServiceClient.Count(ctx, &grpc.ModelServiceCountRequest{ NodeKey: svc.cfg.GetNodeKey(), ModelType: svc.modelType, Query: queryData, @@ -356,18 +355,9 @@ func NewModelService[T any]() *ModelService[T] { var instance *ModelService[T] - c := client.GetGrpcClient() - if !c.IsStarted() { - err := c.Start() - if err != nil { - panic(err) - } - } - onceMap[typeName].Do(func() { instance = &ModelService[T]{ cfg: nodeconfig.GetNodeConfigService(), - c: c, modelType: typeName, } instanceMap[typeName] = instance diff --git a/core/node/service/worker_service.go b/core/node/service/worker_service.go index edc558e2..dfa59caf 100644 --- a/core/node/service/worker_service.go +++ b/core/node/service/worker_service.go @@ -21,14 +21,12 @@ import ( "github.com/crawlab-team/crawlab/core/task/handler" "github.com/crawlab-team/crawlab/core/utils" "github.com/crawlab-team/crawlab/grpc" - "github.com/crawlab-team/crawlab/trace" "go.mongodb.org/mongo-driver/bson" ) type WorkerService struct { // dependencies cfgSvc interfaces.NodeConfigService - client *client.GrpcClient handlerSvc *handler.Service // settings @@ -43,25 +41,8 @@ type WorkerService struct { } func (svc *WorkerService) Start() { - // start grpc client (retry if failed) - err := backoff.RetryNotify( - func() error { - return svc.client.Start() - }, - backoff.NewExponentialBackOff( - backoff.WithInitialInterval(1*time.Second), - backoff.WithMaxInterval(1*time.Minute), - backoff.WithMaxElapsedTime(10*time.Minute), - ), - func(err error, duration time.Duration) { - log.Errorf("failed to start grpc client: %v", err) - log.Infof("retrying in %s", duration) - }, - ) - if err != nil { - log.Fatalf("failed to start grpc client: %v", err) - panic(err) - } + // wait for grpc client ready + client.GetGrpcClient().WaitForReady() // register to master svc.register() @@ -94,15 +75,15 @@ func (svc *WorkerService) Wait() { func (svc *WorkerService) Stop() { svc.stopped = true - _ = svc.client.Stop() + _ = client.GetGrpcClient().Stop() svc.handlerSvc.Stop() log.Infof("worker[%s] service has stopped", svc.cfgSvc.GetNodeKey()) } func (svc *WorkerService) register() { - ctx, cancel := svc.client.Context() + ctx, cancel := client.GetGrpcClient().Context() defer cancel() - _, err := svc.client.NodeClient.Register(ctx, &grpc.NodeServiceRegisterRequest{ + _, err := client.GetGrpcClient().NodeClient.Register(ctx, &grpc.NodeServiceRegisterRequest{ NodeKey: svc.cfgSvc.GetNodeKey(), NodeName: svc.cfgSvc.GetNodeName(), MaxRunners: int32(svc.cfgSvc.GetMaxRunners()), @@ -124,7 +105,7 @@ func (svc *WorkerService) reportStatus() { ticker := time.NewTicker(svc.heartbeatInterval) for { // return if client is closed - if svc.client.IsClosed() { + if client.GetGrpcClient().IsClosed() { ticker.Stop() return } @@ -156,7 +137,7 @@ func (svc *WorkerService) subscribe() { // Use backoff for connection attempts operation := func() error { - stream, err := svc.client.NodeClient.Subscribe(context.Background(), &grpc.NodeServiceSubscribeRequest{ + stream, err := client.GetGrpcClient().NodeClient.Subscribe(context.Background(), &grpc.NodeServiceSubscribeRequest{ NodeKey: svc.cfgSvc.GetNodeKey(), }) if err != nil { @@ -172,7 +153,7 @@ func (svc *WorkerService) subscribe() { msg, err := stream.Recv() if err != nil { - if svc.client.IsClosed() { + if client.GetGrpcClient().IsClosed() { log.Errorf("connection to master is closed: %v", err) return err } @@ -202,11 +183,11 @@ func (svc *WorkerService) subscribe() { func (svc *WorkerService) sendHeartbeat() { ctx, cancel := context.WithTimeout(context.Background(), svc.heartbeatInterval) defer cancel() - _, err := svc.client.NodeClient.SendHeartbeat(ctx, &grpc.NodeServiceSendHeartbeatRequest{ + _, err := client.GetGrpcClient().NodeClient.SendHeartbeat(ctx, &grpc.NodeServiceSendHeartbeatRequest{ NodeKey: svc.cfgSvc.GetNodeKey(), }) if err != nil { - trace.PrintError(err) + log.Errorf("failed to send heartbeat to master: %v", err) } } @@ -214,7 +195,7 @@ func (svc *WorkerService) startHealthServer() { // handlers app := gin.New() app.GET("/health", controllers.GetHealthFn(func() bool { - return svc.isReady && !svc.stopped && svc.client != nil && !svc.client.IsClosed() + return svc.isReady && !svc.stopped && client.GetGrpcClient() != nil && !client.GetGrpcClient().IsClosed() })) // listen @@ -237,7 +218,6 @@ func newWorkerService() *WorkerService { return &WorkerService{ heartbeatInterval: 15 * time.Second, cfgSvc: nodeconfig.GetNodeConfigService(), - client: client.GetGrpcClient(), handlerSvc: handler.GetTaskHandlerService(), isReady: false, } diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index a8603b2e..ca6a4d77 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -53,7 +53,6 @@ type Runner struct { ch chan constants.TaskSignal // channel for task status communication err error // captures any process execution errors cwd string // current working directory for task - c *client2.GrpcClient // gRPC client for communication conn grpc.TaskService_ConnectClient // gRPC stream connection for task service // log handling @@ -81,13 +80,8 @@ func (r *Runner) Init() (err error) { return err } - // start grpc client - if !r.c.IsStarted() { - err := r.c.Start() - if err != nil { - return err - } - } + // wait for grpc client ready + client2.GetGrpcClient().WaitForReady() // grpc task service stream client if err := r.initConnection(); err != nil { @@ -599,7 +593,7 @@ func (r *Runner) updateTask(status string, e error) (err error) { // initConnection establishes a gRPC connection to the task service func (r *Runner) initConnection() (err error) { - r.conn, err = r.c.TaskClient.Connect(context.Background()) + r.conn, err = client2.GetGrpcClient().TaskClient.Connect(context.Background()) if err != nil { log.Errorf("error connecting to task service: %v", err) return err @@ -670,7 +664,7 @@ func (r *Runner) sendNotification() { NodeKey: r.svc.GetNodeConfigService().GetNodeKey(), TaskId: r.tid.Hex(), } - _, err := r.c.TaskClient.SendNotification(context.Background(), req) + _, err := client2.GetGrpcClient().TaskClient.SendNotification(context.Background(), req) if err != nil { log.Errorf("error sending notification: %v", err) trace.PrintError(err) @@ -930,9 +924,6 @@ func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) { } } - // grpc client - r.c = client2.GetGrpcClient() - // Initialize context and done channel r.ctx, r.cancel = context.WithCancel(context.Background()) r.done = make(chan struct{}) diff --git a/core/task/handler/service.go b/core/task/handler/service.go index 6ccb263e..12b6d284 100644 --- a/core/task/handler/service.go +++ b/core/task/handler/service.go @@ -41,13 +41,8 @@ type Service struct { } func (svc *Service) Start() { - // Initialize gRPC if not started - if !svc.c.IsStarted() { - err := svc.c.Start() - if err != nil { - return - } - } + // wait for grpc client ready + grpcclient.GetGrpcClient().WaitForReady() go svc.reportStatus() go svc.fetchAndRunTasks()