diff --git a/core/schedule/service.go b/core/schedule/service.go index b93d832e..dfecd3da 100644 --- a/core/schedule/service.go +++ b/core/schedule/service.go @@ -1,6 +1,10 @@ package schedule import ( + "errors" + "sync" + "time" + "github.com/crawlab-team/crawlab/core/interfaces" "github.com/crawlab-team/crawlab/core/models/models" "github.com/crawlab-team/crawlab/core/models/service" @@ -9,8 +13,6 @@ import ( "github.com/robfig/cron/v3" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" - "sync" - "time" ) type Service struct { @@ -20,8 +22,6 @@ type Service struct { // settings variables loc *time.Location - delay bool - skip bool updateInterval time.Duration // internals @@ -33,30 +33,6 @@ type Service struct { interfaces.Logger } -func (svc *Service) GetLocation() (loc *time.Location) { - return svc.loc -} - -func (svc *Service) SetLocation(loc *time.Location) { - svc.loc = loc -} - -func (svc *Service) GetDelay() (delay bool) { - return svc.delay -} - -func (svc *Service) SetDelay(delay bool) { - svc.delay = delay -} - -func (svc *Service) GetSkip() (skip bool) { - return svc.skip -} - -func (svc *Service) SetSkip(skip bool) { - svc.skip = skip -} - func (svc *Service) GetUpdateInterval() (interval time.Duration) { return svc.updateInterval } @@ -66,17 +42,53 @@ func (svc *Service) SetUpdateInterval(interval time.Duration) { } func (svc *Service) Init() (err error) { + // Validate dependencies + if svc.modelSvc == nil { + return errors.New("model service is not initialized") + } + if svc.adminSvc == nil { + return errors.New("admin service is not initialized") + } + if svc.cron == nil { + return errors.New("cron service is not initialized") + } + + // Fetch and validate existing schedules err = svc.fetch() if err != nil { svc.Fatalf("failed to initialize schedule service: %v", err) return err } + + // Validate and enable existing schedules + for _, s := range svc.schedules { + if s.Enabled { + // Validate cron expression + if _, err := cron.ParseStandard(s.Cron); err != nil { + svc.Errorf("invalid cron expression for schedule %s: %v", s.Id.Hex(), err) + // Disable invalid schedules + if disableErr := svc.Disable(s, s.GetUpdatedBy()); disableErr != nil { + svc.Errorf("failed to disable invalid schedule %s: %v", s.Id.Hex(), disableErr) + } + continue + } + + // Add to cron + if err := svc.Enable(s, s.GetUpdatedBy()); err != nil { + svc.Errorf("failed to enable schedule %s during initialization: %v", s.Id.Hex(), err) + } + } + } + + svc.Infof("initialized schedule service with %d enabled schedules", len(svc.schedules)) return nil } func (svc *Service) Start() { + svc.Infof("starting schedule service") svc.cron.Start() go svc.Update() + svc.Infof("schedule service started successfully") } func (svc *Service) Wait() { @@ -93,6 +105,11 @@ func (svc *Service) Enable(s models.Schedule, by primitive.ObjectID) (err error) svc.mu.Lock() defer svc.mu.Unlock() + // Validate cron expression + if _, err := cron.ParseStandard(s.Cron); err != nil { + return errors.New("invalid cron expression: " + err.Error()) + } + id, err := svc.cron.AddFunc(s.Cron, svc.schedule(s.Id)) if err != nil { svc.Errorf("failed to add cron job: %v", err) @@ -108,11 +125,22 @@ func (svc *Service) Disable(s models.Schedule, by primitive.ObjectID) (err error svc.mu.Lock() defer svc.mu.Unlock() - svc.cron.Remove(s.EntryId) + // Store the current entry ID before modifying + entryId := s.EntryId + + // First update the database s.Enabled = false s.EntryId = -1 s.SetUpdated(by) - return svc.modelSvc.ReplaceById(s.Id, s) + if err := svc.modelSvc.ReplaceById(s.Id, s); err != nil { + return err + } + + // Only remove from cron after successful database update using the stored entry ID + if entryId != -1 { + svc.cron.Remove(entryId) + } + return nil } func (svc *Service) Update() { @@ -131,7 +159,70 @@ func (svc *Service) GetCron() (c *cron.Cron) { return svc.cron } +// GetScheduleCount returns the number of enabled schedules +func (svc *Service) GetScheduleCount() int { + return len(svc.schedules) +} + +// GetCronEntryCount returns the number of active cron entries +func (svc *Service) GetCronEntryCount() int { + return len(svc.cron.Entries()) +} + +// IsHealthy performs a health check on the schedule service +func (svc *Service) IsHealthy() bool { + // Check if cron is running + if svc.cron == nil { + return false + } + + // Check if service is not stopped + if svc.stopped { + return false + } + + // Check if we can fetch schedules from database + if err := svc.fetch(); err != nil { + svc.Errorf("health check failed: cannot fetch schedules: %v", err) + return false + } + + return true +} + +// GetHealthStatus returns detailed health information +func (svc *Service) GetHealthStatus() map[string]interface{} { + status := map[string]interface{}{ + "healthy": svc.IsHealthy(), + "stopped": svc.stopped, + "schedule_count": svc.GetScheduleCount(), + "cron_entry_count": svc.GetCronEntryCount(), + "update_interval": svc.updateInterval.String(), + "location": svc.loc.String(), + } + + // Add cron entries info + entries := make([]map[string]interface{}, 0) + for _, entry := range svc.cron.Entries() { + entries = append(entries, map[string]interface{}{ + "id": entry.ID, + "next": entry.Next.Format(time.RFC3339), + "prev": entry.Prev.Format(time.RFC3339), + }) + } + status["cron_entries"] = entries + + return status +} + func (svc *Service) update() { + // Add recovery mechanism + defer func() { + if r := recover(); r != nil { + svc.Errorf("panic in schedule update: %v", r) + } + }() + // fetch enabled schedules if err := svc.fetch(); err != nil { svc.Errorf("failed to fetch schedules: %v", err) @@ -144,16 +235,19 @@ func (svc *Service) update() { // iterate enabled schedules for _, s := range svc.schedules { _, ok := entryIdsMap[s.EntryId] - if ok { - entryIdsMap[s.EntryId] = true - } else { - if !s.Enabled { - err := svc.Enable(s, s.GetCreatedBy()) + if !ok { + // Schedule is enabled but not in cron, add it + if s.Enabled { + // Add retry mechanism for enabling schedules + err := svc.enableWithRetry(s, s.GetCreatedBy(), 3) if err != nil { - svc.Errorf("failed to enable schedule: %v", err) + svc.Errorf("failed to enable schedule after retries: %v", err) continue } } + } else { + // Mark as found + entryIdsMap[s.EntryId] = true } } @@ -165,6 +259,21 @@ func (svc *Service) update() { } } +// enableWithRetry attempts to enable a schedule with retry logic +func (svc *Service) enableWithRetry(s models.Schedule, by primitive.ObjectID, maxRetries int) error { + var lastErr error + for i := 0; i < maxRetries; i++ { + if err := svc.Enable(s, by); err != nil { + lastErr = err + svc.Warnf("failed to enable schedule (attempt %d/%d): %v", i+1, maxRetries, err) + time.Sleep(time.Duration(i+1) * time.Second) // exponential backoff + continue + } + return nil + } + return lastErr +} + func (svc *Service) getEntryIdsMap() (res map[cron.EntryID]bool) { res = map[cron.EntryID]bool{} for _, e := range svc.cron.Entries() { @@ -186,6 +295,17 @@ func (svc *Service) fetch() (err error) { func (svc *Service) schedule(id primitive.ObjectID) (fn func()) { return func() { + // Add recovery mechanism for individual schedule executions + defer func() { + if r := recover(); r != nil { + svc.Errorf("panic in schedule execution for %s: %v", id.Hex(), r) + } + }() + + // Add execution logging + svc.Infof("executing schedule: %s", id.Hex()) + startTime := time.Now() + // schedule s, err := svc.modelSvc.GetById(id) if err != nil { @@ -193,6 +313,18 @@ func (svc *Service) schedule(id primitive.ObjectID) (fn func()) { return } + // Verify schedule still exists and is enabled + if s == nil || !s.Enabled { + svc.Warnf("schedule %s no longer exists or is disabled, removing from cron", id.Hex()) + // Use a goroutine to avoid potential deadlock when removing from within cron execution + go func() { + svc.mu.Lock() + defer svc.mu.Unlock() + svc.cron.Remove(s.EntryId) + }() + return + } + // spider spider, err := service.NewModelService[models.Spider]().GetById(s.SpiderId) if err != nil { @@ -200,6 +332,12 @@ func (svc *Service) schedule(id primitive.ObjectID) (fn func()) { return } + // Verify spider still exists + if spider == nil { + svc.Errorf("spider %s no longer exists", s.SpiderId.Hex()) + return + } + // options opts := &interfaces.SpiderRunOptions{ Mode: s.Mode, @@ -233,20 +371,22 @@ func (svc *Service) schedule(id primitive.ObjectID) (fn func()) { } // schedule or assign a task in the task queue - if _, err := svc.adminSvc.Schedule(s.SpiderId, opts); err != nil { + taskIds, err := svc.adminSvc.Schedule(s.SpiderId, opts) + if err != nil { svc.Errorf("failed to schedule spider: %v", err) return } + + // Log successful execution + duration := time.Since(startTime) + svc.Infof("successfully executed schedule %s, created %d tasks in %v", id.Hex(), len(taskIds), duration) } } func newScheduleService() *Service { // service svc := &Service{ - loc: time.Local, - // TODO: implement delay and skip - delay: false, - skip: false, + loc: time.Local, updateInterval: 1 * time.Minute, adminSvc: admin.GetSpiderAdminService(), modelSvc: service.NewModelService[models.Schedule](),