From 20ba390cf62b74eca2a48bbcdd1d8bfdacc75794 Mon Sep 17 00:00:00 2001
From: Marvin Zhang
Date: Wed, 9 Jul 2025 14:06:10 +0800
Subject: [PATCH] refactor: improve mongo client connection error logging format and remove redundant gRPC server start in MasterService

---
 core/grpc/client/client.go          | 688 ++++++++++++++++++----------
 core/mongo/client.go                |   2 +-
 core/node/service/master_service.go |  11 +-
 3 files changed, 441 insertions(+), 260 deletions(-)

diff --git a/core/grpc/client/client.go b/core/grpc/client/client.go
index 494473e3..956fb8ea 100644
--- a/core/grpc/client/client.go
+++ b/core/grpc/client/client.go
@@ -6,7 +6,6 @@ import (
     "sync"
     "time"
 
-    "github.com/cenkalti/backoff/v4"
     "github.com/crawlab-team/crawlab/core/grpc/middlewares"
     "github.com/crawlab-team/crawlab/core/interfaces"
     "github.com/crawlab-team/crawlab/core/utils"
@@ -14,9 +13,40 @@ import (
     "google.golang.org/grpc"
     "google.golang.org/grpc/connectivity"
     "google.golang.org/grpc/credentials/insecure"
+    "google.golang.org/grpc/health/grpc_health_v1"
     "google.golang.org/grpc/keepalive"
 )
 
+// Circuit breaker constants
+const (
+    maxFailures               = 5
+    cbResetTime               = 2 * time.Minute
+    cbHalfOpenRetryInterval   = 30 * time.Second
+    healthCheckInterval       = 30 * time.Second
+    stateMonitorInterval      = 5 * time.Second
+    registrationCheckInterval = 100 * time.Millisecond
+    idleGracePeriod           = 30 * time.Second
+    connectionTimeout         = 30 * time.Second
+    defaultClientTimeout      = 5 * time.Second
+)
+
+// Circuit breaker states
+type circuitBreakerState int
+
+const (
+    cbClosed circuitBreakerState = iota
+    cbOpen
+    cbHalfOpen
+)
+
+// min function for calculating backoff
+func min(a, b int) int {
+    if a < b {
+        return a
+    }
+    return b
+}
+
 // GrpcClient provides a robust gRPC client with connection management and client registration.
 //
 // The client handles connection lifecycle and ensures that gRPC service clients are properly
@@ -76,44 +106,63 @@ type GrpcClient struct {
     // Registration status
     registered    bool
     registeredMux sync.RWMutex
+
+    // Health monitoring
+    healthClient grpc_health_v1.HealthClient
 }
 
 func (c *GrpcClient) Start() {
     c.once.Do(func() {
+        // initialize stop channel before any goroutines
+        if c.stop == nil {
+            c.stop = make(chan struct{})
+        }
+
         // initialize reconnect channel
-        c.reconnect = make(chan struct{})
+        c.reconnect = make(chan struct{}, 1) // Make it buffered to prevent blocking
 
-        // start state monitor
-        go c.monitorState()
-
-        // connect (this will also register services)
+        // connect first, then start monitoring
         err := c.connect()
         if err != nil {
-            c.Fatalf("failed to connect: %v", err)
-            return
+            c.Errorf("failed initial connection, will retry: %v", err)
+            // Don't fatal here, let reconnection handle it
         }
+
+        // start monitoring after connection attempt
+        go c.monitorState()
+
+        // start health monitoring
+        go c.startHealthMonitor()
     })
 }
 
-func (c *GrpcClient) Stop() (err error) {
-    // set stopped flag
-    c.stopped = true
-    c.setRegistered(false)
-    c.stop <- struct{}{}
-    c.Infof("stopped")
-
-    // skip if connection is nil
-    if c.conn == nil {
+func (c *GrpcClient) Stop() error {
+    // Prevent multiple stops
+    c.reconnectMux.Lock()
+    if c.stopped {
+        c.reconnectMux.Unlock()
         return nil
     }
+    c.stopped = true
+    c.reconnectMux.Unlock()
 
-    // close connection
-    if err := c.conn.Close(); err != nil {
-        c.Errorf("failed to close connection: %v", err)
-        return err
+    c.setRegistered(false)
+
+    // Close channels safely
+    select {
+    case c.stop <- struct{}{}:
+    default:
    }
-    c.Infof("disconnected from %s", c.address)
 
+    // Close connection
+    if c.conn != nil {
+        if err := c.conn.Close(); err != nil {
+            c.Errorf("failed to close connection: %v", err)
+            return err
+        }
+    }
+
+    c.Infof("stopped and disconnected from %s", c.address)
     return nil
 }
 
@@ -139,6 +188,7 @@ func (c *GrpcClient) register() {
     c.taskClient = grpc2.NewTaskServiceClient(c.conn)
     c.dependencyClient = grpc2.NewDependencyServiceClient(c.conn)
     c.metricClient = grpc2.NewMetricServiceClient(c.conn)
+    c.healthClient = grpc_health_v1.NewHealthClient(c.conn)
 
     // Mark as registered
     c.setRegistered(true)
@@ -165,70 +215,88 @@ func (c *GrpcClient) IsClosed() (res bool) {
 }
 
 func (c *GrpcClient) monitorState() {
-    idleStartTime := time.Time{}
-    idleGracePeriod := 30 * time.Second // Allow IDLE state for 30 seconds before considering it a problem
+    defer func() {
+        if r := recover(); r != nil {
+            c.Errorf("state monitor panic recovered: %v", r)
+        }
+    }()
+
+    var (
+        idleStartTime = time.Time{}
+        ticker        = time.NewTicker(stateMonitorInterval) // Reduce frequency
+    )
+    defer ticker.Stop()
 
     for {
         select {
         case <-c.stop:
+            c.Debugf("state monitor stopping")
             return
-        default:
-            if c.conn == nil {
-                time.Sleep(time.Second)
-                continue
+        case <-ticker.C:
+            if c.stopped {
+                return
             }
-
-            previous := c.getState()
-            current := c.conn.GetState()
-
-            if previous != current {
-                c.setState(current)
-                c.Infof("state changed from %s to %s", previous, current)
-
-                // Handle state transitions more intelligently
-                switch current {
-                case connectivity.TransientFailure, connectivity.Shutdown:
-                    // Always reconnect on actual failures, but respect circuit breaker
-                    if !c.isCircuitBreakerOpen() {
-                        select {
-                        case c.reconnect <- struct{}{}:
-                            c.Infof("triggering reconnection due to state change to %s", current)
-                        default:
-                        }
-                    } else {
-                        c.Debugf("circuit breaker open, not triggering reconnection for state %s", current)
reconnection for state %s", current) - } - case connectivity.Idle: - if previous == connectivity.Ready { - // Start grace period timer for IDLE state - idleStartTime = time.Now() - c.Debugf("connection went IDLE, starting grace period") - } - case connectivity.Ready: - // Reset idle timer when connection becomes ready - idleStartTime = time.Time{} - // Record successful connection - c.recordSuccess() - } - } - - // Check if IDLE state has exceeded grace period - if current == connectivity.Idle && !idleStartTime.IsZero() && - time.Since(idleStartTime) > idleGracePeriod && !c.isCircuitBreakerOpen() { - c.Warnf("connection has been IDLE for %v, triggering reconnection", time.Since(idleStartTime)) - select { - case c.reconnect <- struct{}{}: - c.Infof("triggering reconnection due to prolonged IDLE state") - default: - } - idleStartTime = time.Time{} // Reset timer to avoid repeated reconnections - } - - time.Sleep(time.Second) + c.checkAndHandleStateChange(&idleStartTime) } } } +func (c *GrpcClient) checkAndHandleStateChange(idleStartTime *time.Time) { + if c.conn == nil { + return + } + + previous := c.getState() + current := c.conn.GetState() + + if previous == current { + // Handle prolonged IDLE state + if current == connectivity.Idle && !idleStartTime.IsZero() && + time.Since(*idleStartTime) > idleGracePeriod { + c.triggerReconnection("prolonged IDLE state") + *idleStartTime = time.Time{} + } + return + } + + // State changed + c.setState(current) + c.Infof("connection state: %s -> %s", previous, current) + + switch current { + case connectivity.TransientFailure, connectivity.Shutdown: + c.setRegistered(false) + c.triggerReconnection(fmt.Sprintf("state change to %s", current)) + + case connectivity.Idle: + if previous == connectivity.Ready { + *idleStartTime = time.Now() + c.Debugf("connection went IDLE, grace period started") + } + + case connectivity.Ready: + *idleStartTime = time.Time{} + c.recordSuccess() + if !c.IsRegistered() { + c.register() // Re-register if needed + } + } +} + +func (c *GrpcClient) triggerReconnection(reason string) { + if c.stopped || c.isCircuitBreakerOpen() { + return + } + + select { + case c.reconnect <- struct{}{}: + c.Infof("reconnection triggered: %s", reason) + default: + c.Debugf("reconnection already queued") + } +} + func (c *GrpcClient) setState(state connectivity.State) { c.stateMux.Lock() defer c.stateMux.Unlock() @@ -254,7 +322,7 @@ func (c *GrpcClient) IsRegistered() bool { } func (c *GrpcClient) WaitForRegistered() { - ticker := time.NewTicker(100 * time.Millisecond) + ticker := time.NewTicker(registrationCheckInterval) defer ticker.Stop() for { select { @@ -274,73 +342,38 @@ func (c *GrpcClient) WaitForRegistered() { // These methods will wait for registration to complete or return an error if the client is stopped func (c *GrpcClient) GetNodeClient() (grpc2.NodeServiceClient, error) { - if c.stopped { - return nil, fmt.Errorf("grpc client is stopped") - } - if !c.IsRegistered() { - c.Debugf("waiting for node client registration") - c.WaitForRegistered() - if c.stopped { - return nil, fmt.Errorf("grpc client stopped while waiting for registration") - } - } - return c.nodeClient, nil + ctx, cancel := context.WithTimeout(context.Background(), defaultClientTimeout) + defer cancel() + + return c.GetNodeClientWithContext(ctx) } func (c *GrpcClient) GetTaskClient() (grpc2.TaskServiceClient, error) { - if c.stopped { - return nil, fmt.Errorf("grpc client is stopped") - } - if !c.IsRegistered() { - c.Debugf("waiting for task client registration") - 
-        c.WaitForRegistered()
-        if c.stopped {
-            return nil, fmt.Errorf("grpc client stopped while waiting for registration")
-        }
-    }
-    return c.taskClient, nil
+    ctx, cancel := context.WithTimeout(context.Background(), defaultClientTimeout)
+    defer cancel()
+
+    return c.GetTaskClientWithContext(ctx)
 }
 
 func (c *GrpcClient) GetModelBaseServiceClient() (grpc2.ModelBaseServiceClient, error) {
-    if c.stopped {
-        return nil, fmt.Errorf("grpc client is stopped")
-    }
-    if !c.IsRegistered() {
-        c.Debugf("waiting for model base service client registration")
-        c.WaitForRegistered()
-        if c.stopped {
-            return nil, fmt.Errorf("grpc client stopped while waiting for registration")
-        }
-    }
-    return c.modelBaseServiceClient, nil
+    ctx, cancel := context.WithTimeout(context.Background(), defaultClientTimeout)
+    defer cancel()
+
+    return c.GetModelBaseServiceClientWithContext(ctx)
 }
 
 func (c *GrpcClient) GetDependencyClient() (grpc2.DependencyServiceClient, error) {
-    if c.stopped {
-        return nil, fmt.Errorf("grpc client is stopped")
-    }
-    if !c.IsRegistered() {
-        c.Debugf("waiting for dependency client registration")
-        c.WaitForRegistered()
-        if c.stopped {
-            return nil, fmt.Errorf("grpc client stopped while waiting for registration")
-        }
-    }
-    return c.dependencyClient, nil
+    ctx, cancel := context.WithTimeout(context.Background(), defaultClientTimeout)
+    defer cancel()
+
+    return c.GetDependencyClientWithContext(ctx)
 }
 
 func (c *GrpcClient) GetMetricClient() (grpc2.MetricServiceClient, error) {
-    if c.stopped {
-        return nil, fmt.Errorf("grpc client is stopped")
-    }
-    if !c.IsRegistered() {
-        c.Debugf("waiting for metric client registration")
-        c.WaitForRegistered()
-        if c.stopped {
-            return nil, fmt.Errorf("grpc client stopped while waiting for registration")
-        }
-    }
-    return c.metricClient, nil
+    ctx, cancel := context.WithTimeout(context.Background(), defaultClientTimeout)
+    defer cancel()
+
+    return c.GetMetricClientWithContext(ctx)
 }
 
 // Safe client getters with timeout - these methods will wait up to the specified timeout
@@ -407,7 +440,7 @@ func (c *GrpcClient) GetMetricClientWithTimeout(timeout time.Duration) (grpc2.Me
 }
 
 func (c *GrpcClient) waitForRegisteredWithTimeout(timeout time.Duration) error {
-    ticker := time.NewTicker(100 * time.Millisecond)
+    ticker := time.NewTicker(registrationCheckInterval)
     defer ticker.Stop()
     timer := time.NewTimer(timeout)
     defer timer.Stop()
@@ -427,69 +460,171 @@ func (c *GrpcClient) waitForRegisteredWithTimeout(timeout time.Duration) error {
        }
    }
 }
 
-func (c *GrpcClient) connect() (err error) {
-    // Start reconnection loop with proper cleanup
-    go func() {
-        defer func() {
-            if r := recover(); r != nil {
-                c.Errorf("reconnection loop panic: %v", r)
-            }
-        }()
+// Context-aware client getters
+func (c *GrpcClient) GetNodeClientWithContext(ctx context.Context) (grpc2.NodeServiceClient, error) {
+    client, err := c.getClientWithContext(ctx, func() interface{} { return c.nodeClient }, "node")
+    if err != nil {
+        return nil, err
+    }
+    return client.(grpc2.NodeServiceClient), nil
+}
 
-        for {
-            select {
-            case <-c.stop:
-                c.Debugf("reconnection loop stopping")
-                return
-            case <-c.reconnect:
-                // Check if we're already reconnecting to avoid multiple attempts
-                c.reconnectMux.Lock()
-                if c.reconnecting {
-                    c.Debugf("reconnection already in progress, skipping")
-                    c.reconnectMux.Unlock()
-                    continue
-                }
-                c.reconnecting = true
-                c.reconnectMux.Unlock()
+func (c *GrpcClient) GetTaskClientWithContext(ctx context.Context) (grpc2.TaskServiceClient, error) {
+    client, err := c.getClientWithContext(ctx, func() interface{} { return c.taskClient }, "task")
+    if err != nil {
+        return nil, err
+    }
+    return client.(grpc2.TaskServiceClient), nil
+}
 
-                if !c.stopped && !c.isCircuitBreakerOpen() {
-                    c.Infof("attempting to reconnect to %s", c.address)
-                    if err := c.doConnect(); err != nil {
-                        c.Errorf("reconnection failed: %v", err)
-                        c.recordFailure()
-                        // Add a brief delay before allowing next reconnection attempt
-                        time.Sleep(2 * time.Second)
-                    } else {
-                        c.recordSuccess()
-                    }
-                } else if c.isCircuitBreakerOpen() {
-                    c.Debugf("circuit breaker is open, skipping reconnection attempt")
-                }
+func (c *GrpcClient) GetModelBaseServiceClientWithContext(ctx context.Context) (grpc2.ModelBaseServiceClient, error) {
+    client, err := c.getClientWithContext(ctx, func() interface{} { return c.modelBaseServiceClient }, "model base service")
+    if err != nil {
+        return nil, err
+    }
+    return client.(grpc2.ModelBaseServiceClient), nil
+}
 
-                // Reset reconnecting flag
-                c.reconnectMux.Lock()
-                c.reconnecting = false
-                c.reconnectMux.Unlock()
+func (c *GrpcClient) GetDependencyClientWithContext(ctx context.Context) (grpc2.DependencyServiceClient, error) {
+    client, err := c.getClientWithContext(ctx, func() interface{} { return c.dependencyClient }, "dependency")
+    if err != nil {
+        return nil, err
+    }
+    return client.(grpc2.DependencyServiceClient), nil
+}
+
+func (c *GrpcClient) GetMetricClientWithContext(ctx context.Context) (grpc2.MetricServiceClient, error) {
+    client, err := c.getClientWithContext(ctx, func() interface{} { return c.metricClient }, "metric")
+    if err != nil {
+        return nil, err
+    }
+    return client.(grpc2.MetricServiceClient), nil
+}
+
+func (c *GrpcClient) getClientWithContext(ctx context.Context, getter func() interface{}, clientType string) (interface{}, error) {
+    if c.stopped {
+        return nil, fmt.Errorf("grpc client is stopped")
+    }
+
+    if c.IsRegistered() {
+        return getter(), nil
+    }
+
+    // Wait for registration with context
+    ticker := time.NewTicker(registrationCheckInterval)
+    defer ticker.Stop()
+
+    for {
+        select {
+        case <-ctx.Done():
+            return nil, fmt.Errorf("context cancelled while waiting for %s client registration", clientType)
+        case <-c.stop:
+            return nil, fmt.Errorf("client stopped while waiting for %s client registration", clientType)
+        case <-ticker.C:
+            if c.IsRegistered() {
+                return getter(), nil
            }
        }
-    }()
+    }
+}
 
+func (c *GrpcClient) connect() error {
+    // Use a separate goroutine for reconnection handling
+    go c.handleReconnections()
+
+    // Initial connection attempt
     return c.doConnect()
 }
 
-// Circuit breaker methods
-func (c *GrpcClient) isCircuitBreakerOpen() bool {
+func (c *GrpcClient) handleReconnections() {
+    defer func() {
+        if r := recover(); r != nil {
+            c.Errorf("reconnection handler panic: %v", r)
+        }
+    }()
+
+    for {
+        select {
+        case <-c.stop:
+            c.Debugf("reconnection handler stopping")
+            return
+
+        case <-c.reconnect:
+            if c.stopped || !c.canAttemptConnection() {
+                continue
+            }
+
+            c.executeReconnection()
+        }
+    }
+}
+
+func (c *GrpcClient) executeReconnection() {
+    c.reconnectMux.Lock()
+    if c.reconnecting {
+        c.reconnectMux.Unlock()
+        return
+    }
+    c.reconnecting = true
+    c.reconnectMux.Unlock()
+
+    defer func() {
+        c.reconnectMux.Lock()
+        c.reconnecting = false
+        c.reconnectMux.Unlock()
+    }()
+
+    c.Infof("executing reconnection to %s", c.address)
+
+    if err := c.doConnect(); err != nil {
+        c.Errorf("reconnection failed: %v", err)
+        c.recordFailure()
+
+        // Exponential backoff before allowing next attempt
+        backoffDuration := c.calculateBackoff()
+        time.Sleep(backoffDuration)
+    } else {
+        c.recordSuccess()
+        c.Infof("reconnection successful")
+    }
+}
+
+// Enhanced circuit breaker methods
+func (c *GrpcClient) getCircuitBreakerState() circuitBreakerState {
     c.cbMux.RLock()
     defer c.cbMux.RUnlock()
-    // Circuit breaker opens after 5 consecutive failures
-    if c.failureCount >= 5 {
-        // Auto-recover after 1 minute
-        if time.Since(c.lastFailure) > 1*time.Minute {
-            return false
-        }
-        return true
+    if c.failureCount < maxFailures {
+        return cbClosed
    }
+
+    timeSinceLastFailure := time.Since(c.lastFailure)
+    if timeSinceLastFailure > cbResetTime {
+        return cbHalfOpen
+    }
+
+    return cbOpen
+}
+
+func (c *GrpcClient) isCircuitBreakerOpen() bool {
+    return c.getCircuitBreakerState() == cbOpen
+}
+
+func (c *GrpcClient) canAttemptConnection() bool {
+    state := c.getCircuitBreakerState()
+
+    switch state {
+    case cbClosed:
+        return true
+    case cbHalfOpen:
+        c.cbMux.RLock()
+        canRetry := time.Since(c.lastFailure) > cbHalfOpenRetryInterval
+        c.cbMux.RUnlock()
+        return canRetry
+    case cbOpen:
+        return false
+    }
+
     return false
 }
 
@@ -498,7 +633,7 @@ func (c *GrpcClient) recordFailure() {
     defer c.cbMux.Unlock()
     c.failureCount++
     c.lastFailure = time.Now()
-    if c.failureCount >= 5 {
+    if c.failureCount >= maxFailures {
         c.Warnf("circuit breaker opened after %d consecutive failures", c.failureCount)
    }
 }
@@ -513,89 +648,136 @@ func (c *GrpcClient) recordSuccess() {
     c.lastFailure = time.Time{}
 }
 
-func (c *GrpcClient) doConnect() (err error) {
-    op := func() error {
-        // Mark as not registered during connection attempt
-        c.setRegistered(false)
+func (c *GrpcClient) calculateBackoff() time.Duration {
+    c.cbMux.RLock()
+    failures := c.failureCount
+    c.cbMux.RUnlock()
 
-        // Close existing connection if any
-        if c.conn != nil {
-            if err := c.conn.Close(); err != nil {
-                c.Debugf("failed to close existing connection: %v", err)
-            }
-        }
-
-        // connection options with better settings for stability
-        opts := []grpc.DialOption{
-            grpc.WithTransportCredentials(insecure.NewCredentials()),
-            grpc.WithChainUnaryInterceptor(middlewares.GetGrpcClientAuthTokenUnaryChainInterceptor()),
-            grpc.WithChainStreamInterceptor(middlewares.GetGrpcClientAuthTokenStreamChainInterceptor()),
-            // Add keep-alive settings to maintain connection health
-            grpc.WithKeepaliveParams(keepalive.ClientParameters{
-                Time:                30 * time.Second, // Send ping every 30 seconds
-                Timeout:             5 * time.Second,  // Wait 5 seconds for ping response
-                PermitWithoutStream: true,             // Send pings even without active streams
-            }),
-        }
-
-        // create new client connection
-        c.conn, err = grpc.NewClient(c.address, opts...)
-        if err != nil {
-            c.Errorf("failed to create connection to %s: %v", c.address, err)
-            return err
-        }
-
-        // connect
-        c.Infof("connecting to %s", c.address)
-        c.conn.Connect()
-
-        // wait for connection to be ready with shorter timeout for faster failure detection
-        ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
-        defer cancel()
-
-        // Wait for state to change from connecting
-        for c.conn.GetState() == connectivity.Connecting {
-            if !c.conn.WaitForStateChange(ctx, connectivity.Connecting) {
-                return fmt.Errorf("failed to connect to %s: connection timeout", c.address)
-            }
-        }
-
-        // Check final state
-        state := c.conn.GetState()
-        if state != connectivity.Ready {
-            return fmt.Errorf("failed to connect to %s: final state is %s", c.address, state)
-        }
-
-        // success
-        c.Infof("connected to %s", c.address)
-
-        // Register services after successful connection
-        c.register()
-
-        return nil
+    // Exponential backoff: 1s, 2s, 4s, 8s, max 30s
+    backoff := time.Duration(1<<uint(min(failures-1, 5))) * time.Second
+    if backoff > 30*time.Second {
+        backoff = 30 * time.Second
    }
-    // Configure backoff with more reasonable settings
-    b := backoff.NewExponentialBackOff()
-    b.InitialInterval = 1 * time.Second // Start with shorter interval
-    b.MaxInterval = 30 * time.Second    // Cap the max interval
-    b.MaxElapsedTime = 5 * time.Minute  // Reduce max retry time
-    b.Multiplier = 1.5                  // Gentler exponential growth
+    return backoff
+}
 
-    n := func(err error, duration time.Duration) {
-        c.Errorf("failed to connect to %s: %v, retrying in %s", c.address, err, duration)
+
+func (c *GrpcClient) doConnect() error {
+    c.setRegistered(false)
+
+    // Close existing connection
+    if c.conn != nil {
+        c.conn.Close()
+        c.conn = nil
    }
-    err = backoff.RetryNotify(op, b, n)
+    opts := c.getDialOptions()
+
+    // Create connection with context timeout - using NewClient instead of DialContext
+    conn, err := grpc.NewClient(c.address, opts...)
+    if err != nil {
-        c.recordFailure()
+        return fmt.Errorf("failed to create client for %s: %w", c.address, err)
+    }
+
+    c.conn = conn
+
+    // Connect and wait for ready state with timeout
+    ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
+    defer cancel()
+
+    c.conn.Connect()
+    if err := c.waitForConnectionReady(ctx); err != nil {
+        c.conn.Close()
+        c.conn = nil
         return err
    }
 
-    c.recordSuccess()
+    c.Infof("connected to %s", c.address)
+    c.register()
+
     return nil
 }
 
+func (c *GrpcClient) getDialOptions() []grpc.DialOption {
+    return []grpc.DialOption{
+        grpc.WithTransportCredentials(insecure.NewCredentials()),
+        grpc.WithChainUnaryInterceptor(middlewares.GetGrpcClientAuthTokenUnaryChainInterceptor()),
+        grpc.WithChainStreamInterceptor(middlewares.GetGrpcClientAuthTokenStreamChainInterceptor()),
+        grpc.WithKeepaliveParams(keepalive.ClientParameters{
+            Time:                20 * time.Second,
+            Timeout:             5 * time.Second,
+            PermitWithoutStream: true,
+        }),
+        grpc.WithDefaultCallOptions(
+            grpc.WaitForReady(false), // Fail fast for initial connection
+            grpc.MaxCallRecvMsgSize(4*1024*1024),
+            grpc.MaxCallSendMsgSize(4*1024*1024),
+        ),
+        grpc.WithInitialWindowSize(65535),
+        grpc.WithInitialConnWindowSize(65535),
+    }
+}
+
+func (c *GrpcClient) waitForConnectionReady(ctx context.Context) error {
+    for {
+        state := c.conn.GetState()
+        switch state {
+        case connectivity.Ready:
+            return nil
+        case connectivity.TransientFailure, connectivity.Shutdown:
+            return fmt.Errorf("connection failed with state: %s", state)
+        }
+
+        if !c.conn.WaitForStateChange(ctx, state) {
+            return fmt.Errorf("connection timeout")
+        }
+    }
+}
+
+// Health monitoring methods
+func (c *GrpcClient) startHealthMonitor() {
+    go func() {
+        defer func() {
+            if r := recover(); r != nil {
+                c.Errorf("health monitor panic: %v", r)
+            }
+        }()
+
+        ticker := time.NewTicker(healthCheckInterval)
+        defer ticker.Stop()
+
+        for {
+            select {
+            case <-c.stop:
+                c.Debugf("health monitor stopping")
+                return
+            case <-ticker.C:
+                if !c.stopped {
+                    c.performHealthCheck()
+                }
+            }
+        }
+    }()
+}
+
+func (c *GrpcClient) performHealthCheck() {
+    if !c.IsReady() || !c.IsRegistered() {
+        return
+    }
+
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
+
+    _, err := c.healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
+
+    if err != nil {
+        c.Warnf("health check failed: %v", err)
+        c.triggerReconnection("health check failure")
+    } else {
+        c.Debugf("health check passed")
+    }
+}
+
 func newGrpcClient() (c *GrpcClient) {
     client := &GrpcClient{
         address: utils.GetGrpcAddress(),
diff --git a/core/mongo/client.go b/core/mongo/client.go
index 00bc7081..235e7559 100644
--- a/core/mongo/client.go
+++ b/core/mongo/client.go
@@ -146,7 +146,7 @@ func newMongoClient(_opts *ClientOptions) (c *mongo.Client, err error) {
    }
     b := backoff.NewExponentialBackOff()
     n := func(err error, duration time.Duration) {
-        logger.Errorf("connect to mongo error: %v. retrying in %s", err, duration)
+        logger.Errorf("connect to mongo error: %v. retrying in %.1fs", err, duration.Seconds())
    }
     err = backoff.RetryNotify(op, b, n)
     if err != nil {
diff --git a/core/node/service/master_service.go b/core/node/service/master_service.go
index 6dc7fc79..84222181 100644
--- a/core/node/service/master_service.go
+++ b/core/node/service/master_service.go
@@ -2,6 +2,9 @@ package service
 
 import (
     "errors"
+    "sync"
+    "time"
+
     "github.com/apex/log"
     "github.com/cenkalti/backoff/v4"
     "github.com/crawlab-team/crawlab/core/constants"
@@ -21,8 +24,6 @@ import (
     "go.mongodb.org/mongo-driver/bson"
     "go.mongodb.org/mongo-driver/bson/primitive"
     mongo2 "go.mongodb.org/mongo-driver/mongo"
-    "sync"
-    "time"
 )
 
 type MasterService struct {
@@ -42,10 +43,8 @@ type MasterService struct {
 }
 
 func (svc *MasterService) Start() {
-    // start grpc server
-    if err := svc.server.Start(); err != nil {
-        panic(err)
-    }
+    // gRPC server is now started earlier in main.go to avoid race conditions
+    // No need to start it here anymore
 
     // register to db
     if err := svc.Register(); err != nil {