feat(sync): add gRPC file synchronization service and integrate end-to-end

- add proto/services/sync_service.proto and generate Go pb + grpc bindings
- implement SyncServiceServer (streaming file scan + download) with:
  - request deduplication, in-memory cache (TTL), chunked streaming
  - concurrent-safe broadcast to waiters and server-side logging
- register SyncSvr in gRPC server and expose sync client in GrpcClient:
  - add syncClient field, registration and safe getters with reconnection-aware timeouts
- integrate gRPC sync into runner:
  - split syncFiles into syncFilesHTTP (legacy) and syncFilesGRPC
  - Runner now chooses implementation via config flag and performs streaming scan/download
- controller improvements:
  - add semaphore-based rate limiting for sync scan requests with in-flight counters and logs
- misc:
  - add utils.IsSyncGrpcEnabled() config helper
  - improve HTTP sync error diagnostics (Content-Type validation, response previews)
  - update/regenerate many protobuf and gRPC generated files (protoc/protoc-gen-go / protoc-gen-go-grpc version bumps)
This commit is contained in:
Marvin Zhang
2025-10-20 12:48:53 +08:00
parent 61604e1817
commit f441265cc2
22 changed files with 1860 additions and 1195 deletions

View File

@@ -15,9 +15,29 @@ import (
var (
syncDownloadSemaphore = semaphore.NewWeighted(utils.GetSyncDownloadMaxConcurrency())
syncDownloadInFlight int64
syncScanSemaphore = semaphore.NewWeighted(10) // Limit concurrent scan requests
syncScanInFlight int64
)
func GetSyncScan(c *gin.Context) (response *Response[entity.FsFileInfoMap], err error) {
ctx := c.Request.Context()
if ctx == nil {
ctx = context.Background()
}
// Rate limiting for scan requests
if err := syncScanSemaphore.Acquire(ctx, 1); err != nil {
logger.Warnf("failed to acquire sync scan slot for id=%s path=%s: %v", c.Param("id"), c.Param("path"), err)
return GetErrorResponse[entity.FsFileInfoMap](errors.Annotate(err, "server overloaded, please retry"))
}
current := atomic.AddInt64(&syncScanInFlight, 1)
logger.Debugf("sync scan in-flight=%d id=%s path=%s", current, c.Param("id"), c.Param("path"))
defer func() {
newVal := atomic.AddInt64(&syncScanInFlight, -1)
logger.Debugf("sync scan completed in-flight=%d id=%s path=%s", newVal, c.Param("id"), c.Param("path"))
syncScanSemaphore.Release(1)
}()
workspacePath := utils.GetWorkspace()
dirPath := filepath.Join(workspacePath, c.Param("id"), c.Param("path"))
files, err := utils.ScanDirectory(dirPath)

View File

@@ -90,6 +90,7 @@ type GrpcClient struct {
modelBaseServiceClient grpc2.ModelBaseServiceClient
dependencyClient grpc2.DependencyServiceClient
metricClient grpc2.MetricServiceClient
syncClient grpc2.SyncServiceClient
// Add new fields for state management
state connectivity.State
@@ -218,6 +219,7 @@ func (c *GrpcClient) register() {
c.taskClient = grpc2.NewTaskServiceClient(c.conn)
c.dependencyClient = grpc2.NewDependencyServiceClient(c.conn)
c.metricClient = grpc2.NewMetricServiceClient(c.conn)
c.syncClient = grpc2.NewSyncServiceClient(c.conn)
c.healthClient = grpc_health_v1.NewHealthClient(c.conn)
// Enable health checks by default for new connections
@@ -498,6 +500,21 @@ func (c *GrpcClient) GetMetricClient() (grpc2.MetricServiceClient, error) {
return c.GetMetricClientWithContext(ctx)
}
func (c *GrpcClient) GetSyncClient() (grpc2.SyncServiceClient, error) {
// Use longer timeout during reconnection scenarios
timeout := defaultClientTimeout
c.reconnectMux.Lock()
if c.reconnecting {
timeout = reconnectionClientTimeout
}
c.reconnectMux.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return c.GetSyncClientWithContext(ctx)
}
// Safe client getters with timeout - these methods will wait up to the specified timeout
// for registration to complete before returning an error
@@ -648,6 +665,14 @@ func (c *GrpcClient) GetMetricClientWithContext(ctx context.Context) (grpc2.Metr
return client.(grpc2.MetricServiceClient), nil
}
func (c *GrpcClient) GetSyncClientWithContext(ctx context.Context) (grpc2.SyncServiceClient, error) {
client, err := c.getClientWithContext(ctx, func() interface{} { return c.syncClient }, "sync")
if err != nil {
return nil, err
}
return client.(grpc2.SyncServiceClient), nil
}
func (c *GrpcClient) getClientWithContext(ctx context.Context, getter func() interface{}, clientType string) (interface{}, error) {
if c.stopped {
return nil, fmt.Errorf("grpc client is stopped")

View File

@@ -32,6 +32,7 @@ type GrpcServer struct {
ModelBaseServiceSvr *ModelBaseServiceServer
DependencySvr *DependencyServiceServer
MetricSvr *MetricServiceServer
SyncSvr *SyncServiceServer
}
func (svr *GrpcServer) Init() {
@@ -89,6 +90,7 @@ func (svr *GrpcServer) register() {
grpc2.RegisterTaskServiceServer(svr.svr, svr.TaskSvr)
grpc2.RegisterDependencyServiceServer(svr.svr, svr.DependencySvr)
grpc2.RegisterMetricServiceServer(svr.svr, svr.MetricSvr)
grpc2.RegisterSyncServiceServer(svr.svr, svr.SyncSvr)
}
func (svr *GrpcServer) recoveryHandlerFunc(p interface{}) (err error) {
@@ -109,6 +111,7 @@ func newGrpcServer() *GrpcServer {
svr.TaskSvr = GetTaskServiceServer()
svr.DependencySvr = GetDependencyServer()
svr.MetricSvr = GetMetricsServer()
svr.SyncSvr = GetSyncServiceServer()
// recovery options
recoveryOpts := []grpcrecovery.Option{

View File

@@ -0,0 +1,296 @@
package server
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/crawlab-team/crawlab/core/utils"
grpc2 "github.com/crawlab-team/crawlab/grpc"
)
type SyncServiceServer struct {
grpc2.UnimplementedSyncServiceServer
*utils.Logger
// Request deduplication: key = spider_id:path
activeScans map[string]*activeScanState
activeScansMu sync.RWMutex
// Cache: avoid rescanning within TTL
scanCache map[string]*cachedScanResult
scanCacheMu sync.RWMutex
scanCacheTTL time.Duration
chunkSize int
}
type activeScanState struct {
inProgress bool
waitChan chan *cachedScanResult // Broadcast to waiting requests
subscribers int
}
type cachedScanResult struct {
files []*grpc2.FileInfo
timestamp time.Time
err error
}
func NewSyncServiceServer() *SyncServiceServer {
return &SyncServiceServer{
Logger: utils.NewLogger("SyncServiceServer"),
activeScans: make(map[string]*activeScanState),
scanCache: make(map[string]*cachedScanResult),
scanCacheTTL: 60 * time.Second, // Longer TTL for streaming
chunkSize: 100, // Files per chunk
}
}
// StreamFileScan streams file information to worker
func (s *SyncServiceServer) StreamFileScan(
req *grpc2.FileSyncRequest,
stream grpc2.SyncService_StreamFileScanServer,
) error {
cacheKey := req.SpiderId + ":" + req.Path
s.Debugf("file scan request from node %s for spider %s, path %s", req.NodeKey, req.SpiderId, req.Path)
// Check cache first
if result := s.getCachedScan(cacheKey); result != nil {
s.Debugf("returning cached scan for %s", cacheKey)
return s.streamCachedResult(stream, result)
}
// Deduplicate concurrent requests
result, err := s.getOrWaitForScan(cacheKey, func() (*cachedScanResult, error) {
return s.performScan(req)
})
if err != nil {
s.Errorf("scan failed for %s: %v", cacheKey, err)
return stream.Send(&grpc2.FileScanChunk{
IsComplete: true,
Error: err.Error(),
})
}
return s.streamCachedResult(stream, result)
}
// performScan does the actual directory scan
func (s *SyncServiceServer) performScan(req *grpc2.FileSyncRequest) (*cachedScanResult, error) {
workspacePath := utils.GetWorkspace()
dirPath := filepath.Join(workspacePath, req.SpiderId, req.Path)
s.Infof("performing directory scan for %s", dirPath)
// Use existing ScanDirectory which has singleflight and short-term cache
fileMap, err := utils.ScanDirectory(dirPath)
if err != nil {
return nil, fmt.Errorf("failed to scan directory: %w", err)
}
// Convert to protobuf format
files := make([]*grpc2.FileInfo, 0, len(fileMap))
for _, f := range fileMap {
files = append(files, &grpc2.FileInfo{
Name: f.Name,
Path: f.Path,
FullPath: f.FullPath,
Extension: f.Extension,
IsDir: f.IsDir,
FileSize: f.FileSize,
ModTime: f.ModTime.Unix(),
Mode: uint32(f.Mode),
Hash: f.Hash,
})
}
result := &cachedScanResult{
files: files,
timestamp: time.Now(),
}
// Cache the result
cacheKey := req.SpiderId + ":" + req.Path
s.scanCacheMu.Lock()
s.scanCache[cacheKey] = result
s.scanCacheMu.Unlock()
s.Infof("scanned %d files from %s", len(files), dirPath)
return result, nil
}
// streamCachedResult streams the cached result in chunks
func (s *SyncServiceServer) streamCachedResult(
stream grpc2.SyncService_StreamFileScanServer,
result *cachedScanResult,
) error {
totalFiles := len(result.files)
for i := 0; i < totalFiles; i += s.chunkSize {
end := i + s.chunkSize
if end > totalFiles {
end = totalFiles
}
chunk := &grpc2.FileScanChunk{
Files: result.files[i:end],
IsComplete: end >= totalFiles,
TotalFiles: int32(totalFiles),
}
if err := stream.Send(chunk); err != nil {
return fmt.Errorf("failed to send chunk: %w", err)
}
}
return nil
}
// getOrWaitForScan implements request deduplication
func (s *SyncServiceServer) getOrWaitForScan(
key string,
scanFunc func() (*cachedScanResult, error),
) (*cachedScanResult, error) {
s.activeScansMu.Lock()
state, exists := s.activeScans[key]
if exists && state.inProgress {
// Another request is already scanning, wait for it
state.subscribers++
waitChan := state.waitChan
s.activeScansMu.Unlock()
s.Debugf("waiting for ongoing scan: %s", key)
result := <-waitChan
return result, result.err
}
// We're the first request, start scanning
state = &activeScanState{
inProgress: true,
waitChan: make(chan *cachedScanResult, 10),
subscribers: 0,
}
s.activeScans[key] = state
s.activeScansMu.Unlock()
s.Debugf("initiating new scan: %s", key)
// Perform scan
result, err := scanFunc()
if err != nil {
result = &cachedScanResult{err: err}
}
// Broadcast to waiting requests
s.activeScansMu.Lock()
for i := 0; i < state.subscribers; i++ {
state.waitChan <- result
}
delete(s.activeScans, key)
close(state.waitChan)
s.activeScansMu.Unlock()
s.Debugf("scan complete for %s, notified %d subscribers", key, state.subscribers)
return result, err
}
func (s *SyncServiceServer) getCachedScan(key string) *cachedScanResult {
s.scanCacheMu.RLock()
defer s.scanCacheMu.RUnlock()
result, exists := s.scanCache[key]
if !exists {
return nil
}
// Check if cache expired
if time.Since(result.timestamp) > s.scanCacheTTL {
return nil
}
return result
}
// StreamFileDownload streams file content to worker
func (s *SyncServiceServer) StreamFileDownload(
req *grpc2.FileDownloadRequest,
stream grpc2.SyncService_StreamFileDownloadServer,
) error {
workspacePath := utils.GetWorkspace()
filePath := filepath.Join(workspacePath, req.SpiderId, req.Path)
s.Infof("streaming file download: %s", filePath)
// Open file
file, err := os.Open(filePath)
if err != nil {
return stream.Send(&grpc2.FileDownloadChunk{
IsComplete: true,
Error: fmt.Sprintf("failed to open file: %v", err),
})
}
defer file.Close()
// Get file size
fileInfo, err := file.Stat()
if err != nil {
return stream.Send(&grpc2.FileDownloadChunk{
IsComplete: true,
Error: fmt.Sprintf("failed to stat file: %v", err),
})
}
// Stream file in chunks
const bufferSize = 64 * 1024 // 64KB chunks
buffer := make([]byte, bufferSize)
totalBytes := fileInfo.Size()
bytesSent := int64(0)
for {
n, err := file.Read(buffer)
if n > 0 {
chunk := &grpc2.FileDownloadChunk{
Data: buffer[:n],
IsComplete: false,
TotalBytes: totalBytes,
}
if err := stream.Send(chunk); err != nil {
return fmt.Errorf("failed to send chunk: %w", err)
}
bytesSent += int64(n)
}
if err != nil {
if err.Error() == "EOF" {
// Send final chunk
return stream.Send(&grpc2.FileDownloadChunk{
IsComplete: true,
TotalBytes: totalBytes,
})
}
return stream.Send(&grpc2.FileDownloadChunk{
IsComplete: true,
Error: fmt.Sprintf("read error: %v", err),
})
}
}
}
var _syncServiceServer *SyncServiceServer
var _syncServiceServerOnce sync.Once
func GetSyncServiceServer() *SyncServiceServer {
_syncServiceServerOnce.Do(func() {
_syncServiceServer = NewSyncServiceServer()
})
return _syncServiceServer
}

View File

@@ -31,13 +31,22 @@ var (
jitterMutex sync.Mutex
)
// syncFiles synchronizes files between master and worker nodes:
// syncFiles synchronizes files between master and worker nodes.
// It switches between gRPC streaming and HTTP based on the feature flag.
func (r *Runner) syncFiles() (err error) {
if utils.IsSyncGrpcEnabled() {
return r.syncFilesGRPC()
}
return r.syncFilesHTTP()
}
// syncFilesHTTP synchronizes files using HTTP/JSON (legacy implementation):
// 1. Gets file list from master
// 2. Compares with local files
// 3. Downloads new/modified files
// 4. Deletes files that no longer exist on master
func (r *Runner) syncFiles() (err error) {
r.Infof("starting file synchronization for spider: %s", r.s.Id.Hex())
func (r *Runner) syncFilesHTTP() (err error) {
r.Infof("starting HTTP file synchronization for spider: %s", r.s.Id.Hex())
workingDir := ""
if !r.s.GitId.IsZero() {
@@ -56,6 +65,21 @@ func (r *Runner) syncFiles() (err error) {
return err
}
defer resp.Body.Close()
// Validate Content-Type to detect non-JSON responses early
contentType := resp.Header.Get("Content-Type")
if !strings.Contains(contentType, "application/json") {
r.Errorf("unexpected Content-Type: %s (expected application/json)", contentType)
r.Errorf("URL: %s, Status: %d", resp.Request.URL.String(), resp.StatusCode)
body, _ := io.ReadAll(resp.Body)
if len(body) > 500 {
r.Errorf("Response preview: %s...", string(body[:500]))
} else {
r.Errorf("Response body: %s", string(body))
}
return fmt.Errorf("master returned non-JSON response (Content-Type: %s, Status: %d)", contentType, resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
r.Errorf("error reading response body: %v", err)
@@ -68,6 +92,12 @@ func (r *Runner) syncFiles() (err error) {
if err != nil {
r.Errorf("error unmarshaling JSON for URL: %s", resp.Request.URL.String())
r.Errorf("error details: %v", err)
r.Errorf("response body length: %d bytes", len(body))
if len(body) > 500 {
r.Errorf("response preview: %s...", string(body[:500]))
} else if len(body) > 0 {
r.Errorf("response body: %s", string(body))
}
return err
}

View File

@@ -0,0 +1,231 @@
package handler
import (
"context"
"fmt"
"io"
"os"
"time"
"github.com/crawlab-team/crawlab/core/entity"
client2 "github.com/crawlab-team/crawlab/core/grpc/client"
"github.com/crawlab-team/crawlab/core/utils"
grpc2 "github.com/crawlab-team/crawlab/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// syncFilesGRPC replaces HTTP-based syncFiles() with gRPC streaming
func (r *Runner) syncFilesGRPC() (err error) {
r.Infof("starting gRPC file synchronization for spider: %s", r.s.Id.Hex())
workingDir := ""
if !r.s.GitId.IsZero() {
workingDir = r.s.GitRootPath
r.Debugf("using git root path: %s", workingDir)
}
// Get sync service client
syncClient, err := client2.GetGrpcClient().GetSyncClient()
if err != nil {
r.Errorf("failed to get sync client: %v", err)
return err
}
// Prepare request
req := &grpc2.FileSyncRequest{
SpiderId: r.s.Id.Hex(),
Path: workingDir,
NodeKey: utils.GetNodeKey(),
}
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Stream file list from master
r.Infof("fetching file list from master via gRPC")
stream, err := syncClient.StreamFileScan(ctx, req)
if err != nil {
r.Errorf("failed to start file scan stream: %v", err)
return err
}
// Receive file list in chunks
masterFilesMap := make(entity.FsFileInfoMap)
totalFiles := 0
for {
chunk, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
// Check for gRPC-specific errors
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unavailable {
r.Errorf("gRPC service unavailable, will retry: %v", err)
return fmt.Errorf("gRPC service unavailable: %w", err)
}
}
r.Errorf("error receiving file scan chunk: %v", err)
return err
}
// Check for error in chunk
if chunk.Error != "" {
r.Errorf("server error during file scan: %s", chunk.Error)
return fmt.Errorf("server error: %s", chunk.Error)
}
// Process files in chunk
for _, fileInfo := range chunk.Files {
fsFileInfo := entity.FsFileInfo{
Name: fileInfo.Name,
Path: fileInfo.Path,
FullPath: fileInfo.FullPath,
Extension: fileInfo.Extension,
IsDir: fileInfo.IsDir,
FileSize: fileInfo.FileSize,
ModTime: time.Unix(fileInfo.ModTime, 0),
Mode: os.FileMode(fileInfo.Mode),
Hash: fileInfo.Hash,
}
masterFilesMap[fileInfo.Path] = fsFileInfo
}
if chunk.IsComplete {
totalFiles = int(chunk.TotalFiles)
r.Infof("received complete file list: %d files", totalFiles)
break
}
}
// Create working directory if not exists
if _, err := os.Stat(r.cwd); os.IsNotExist(err) {
if err := os.MkdirAll(r.cwd, os.ModePerm); err != nil {
r.Errorf("error creating worker directory: %v", err)
return err
}
}
// Get file list from worker
workerFiles, err := utils.ScanDirectory(r.cwd)
if err != nil {
r.Errorf("error scanning worker directory: %v", err)
return err
}
// Delete files that are deleted on master node
for path, workerFile := range workerFiles {
if _, exists := masterFilesMap[path]; !exists {
r.Infof("deleting file: %s", path)
err := os.Remove(workerFile.FullPath)
if err != nil {
r.Errorf("error deleting file: %v", err)
return err
}
}
}
// Download new or modified files
downloadCount := 0
for path, masterFile := range masterFilesMap {
// Skip directories
if masterFile.IsDir {
continue
}
workerFile, exists := workerFiles[path]
needsDownload := false
if !exists {
r.Debugf("file not found locally: %s", path)
needsDownload = true
} else if workerFile.Hash != masterFile.Hash {
r.Debugf("file hash mismatch: %s (local: %s, master: %s)", path, workerFile.Hash, masterFile.Hash)
needsDownload = true
}
if needsDownload {
if err := r.downloadFileGRPC(syncClient, r.s.Id.Hex(), path); err != nil {
r.Errorf("error downloading file %s: %v", path, err)
return err
}
downloadCount++
}
}
r.Infof("file synchronization complete: %d files downloaded", downloadCount)
return nil
}
// downloadFileGRPC downloads a single file from master via gRPC streaming
func (r *Runner) downloadFileGRPC(client grpc2.SyncServiceClient, spiderId, path string) error {
r.Debugf("downloading file via gRPC: %s", path)
// Prepare request
req := &grpc2.FileDownloadRequest{
SpiderId: spiderId,
Path: path,
NodeKey: utils.GetNodeKey(),
}
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
// Stream file download
stream, err := client.StreamFileDownload(ctx, req)
if err != nil {
return fmt.Errorf("failed to start download stream: %w", err)
}
// Create target file
targetPath := fmt.Sprintf("%s/%s", r.cwd, path)
// Create directory if not exists
targetDir := targetPath[:len(targetPath)-len(path)]
if err := os.MkdirAll(targetDir, os.ModePerm); err != nil {
return fmt.Errorf("failed to create directory: %w", err)
}
file, err := os.Create(targetPath)
if err != nil {
return fmt.Errorf("failed to create file: %w", err)
}
defer file.Close()
// Receive file content in chunks
bytesReceived := int64(0)
for {
chunk, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("error receiving download chunk: %w", err)
}
// Check for error in chunk
if chunk.Error != "" {
return fmt.Errorf("server error during download: %s", chunk.Error)
}
// Write chunk data
if len(chunk.Data) > 0 {
n, err := file.Write(chunk.Data)
if err != nil {
return fmt.Errorf("error writing file: %w", err)
}
bytesReceived += int64(n)
}
if chunk.IsComplete {
r.Debugf("download complete: %s (%d bytes)", path, bytesReceived)
break
}
}
return nil
}

View File

@@ -348,3 +348,7 @@ func GetMinFileDescriptorLimit() uint64 {
}
return DefaultMinFileDescriptorLimit
}
func IsSyncGrpcEnabled() bool {
return viper.GetBool("sync.useGrpc")
}