mirror of
https://github.com/crawlab-team/crawlab.git
synced 2026-01-21 17:21:09 +01:00
refactor: improve schedule service initialization, validation, and error handling
This commit is contained in:
@@ -1,6 +1,10 @@
|
||||
package schedule
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/crawlab-team/crawlab/core/interfaces"
|
||||
"github.com/crawlab-team/crawlab/core/models/models"
|
||||
"github.com/crawlab-team/crawlab/core/models/service"
|
||||
@@ -9,8 +13,6 @@ import (
|
||||
"github.com/robfig/cron/v3"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
@@ -20,8 +22,6 @@ type Service struct {
|
||||
|
||||
// settings variables
|
||||
loc *time.Location
|
||||
delay bool
|
||||
skip bool
|
||||
updateInterval time.Duration
|
||||
|
||||
// internals
|
||||
@@ -33,30 +33,6 @@ type Service struct {
|
||||
interfaces.Logger
|
||||
}
|
||||
|
||||
func (svc *Service) GetLocation() (loc *time.Location) {
|
||||
return svc.loc
|
||||
}
|
||||
|
||||
func (svc *Service) SetLocation(loc *time.Location) {
|
||||
svc.loc = loc
|
||||
}
|
||||
|
||||
func (svc *Service) GetDelay() (delay bool) {
|
||||
return svc.delay
|
||||
}
|
||||
|
||||
func (svc *Service) SetDelay(delay bool) {
|
||||
svc.delay = delay
|
||||
}
|
||||
|
||||
func (svc *Service) GetSkip() (skip bool) {
|
||||
return svc.skip
|
||||
}
|
||||
|
||||
func (svc *Service) SetSkip(skip bool) {
|
||||
svc.skip = skip
|
||||
}
|
||||
|
||||
func (svc *Service) GetUpdateInterval() (interval time.Duration) {
|
||||
return svc.updateInterval
|
||||
}
|
||||
@@ -66,17 +42,53 @@ func (svc *Service) SetUpdateInterval(interval time.Duration) {
|
||||
}
|
||||
|
||||
func (svc *Service) Init() (err error) {
|
||||
// Validate dependencies
|
||||
if svc.modelSvc == nil {
|
||||
return errors.New("model service is not initialized")
|
||||
}
|
||||
if svc.adminSvc == nil {
|
||||
return errors.New("admin service is not initialized")
|
||||
}
|
||||
if svc.cron == nil {
|
||||
return errors.New("cron service is not initialized")
|
||||
}
|
||||
|
||||
// Fetch and validate existing schedules
|
||||
err = svc.fetch()
|
||||
if err != nil {
|
||||
svc.Fatalf("failed to initialize schedule service: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Validate and enable existing schedules
|
||||
for _, s := range svc.schedules {
|
||||
if s.Enabled {
|
||||
// Validate cron expression
|
||||
if _, err := cron.ParseStandard(s.Cron); err != nil {
|
||||
svc.Errorf("invalid cron expression for schedule %s: %v", s.Id.Hex(), err)
|
||||
// Disable invalid schedules
|
||||
if disableErr := svc.Disable(s, s.GetUpdatedBy()); disableErr != nil {
|
||||
svc.Errorf("failed to disable invalid schedule %s: %v", s.Id.Hex(), disableErr)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Add to cron
|
||||
if err := svc.Enable(s, s.GetUpdatedBy()); err != nil {
|
||||
svc.Errorf("failed to enable schedule %s during initialization: %v", s.Id.Hex(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
svc.Infof("initialized schedule service with %d enabled schedules", len(svc.schedules))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *Service) Start() {
|
||||
svc.Infof("starting schedule service")
|
||||
svc.cron.Start()
|
||||
go svc.Update()
|
||||
svc.Infof("schedule service started successfully")
|
||||
}
|
||||
|
||||
func (svc *Service) Wait() {
|
||||
@@ -93,6 +105,11 @@ func (svc *Service) Enable(s models.Schedule, by primitive.ObjectID) (err error)
|
||||
svc.mu.Lock()
|
||||
defer svc.mu.Unlock()
|
||||
|
||||
// Validate cron expression
|
||||
if _, err := cron.ParseStandard(s.Cron); err != nil {
|
||||
return errors.New("invalid cron expression: " + err.Error())
|
||||
}
|
||||
|
||||
id, err := svc.cron.AddFunc(s.Cron, svc.schedule(s.Id))
|
||||
if err != nil {
|
||||
svc.Errorf("failed to add cron job: %v", err)
|
||||
@@ -108,11 +125,22 @@ func (svc *Service) Disable(s models.Schedule, by primitive.ObjectID) (err error
|
||||
svc.mu.Lock()
|
||||
defer svc.mu.Unlock()
|
||||
|
||||
svc.cron.Remove(s.EntryId)
|
||||
// Store the current entry ID before modifying
|
||||
entryId := s.EntryId
|
||||
|
||||
// First update the database
|
||||
s.Enabled = false
|
||||
s.EntryId = -1
|
||||
s.SetUpdated(by)
|
||||
return svc.modelSvc.ReplaceById(s.Id, s)
|
||||
if err := svc.modelSvc.ReplaceById(s.Id, s); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Only remove from cron after successful database update using the stored entry ID
|
||||
if entryId != -1 {
|
||||
svc.cron.Remove(entryId)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *Service) Update() {
|
||||
@@ -131,7 +159,70 @@ func (svc *Service) GetCron() (c *cron.Cron) {
|
||||
return svc.cron
|
||||
}
|
||||
|
||||
// GetScheduleCount returns the number of enabled schedules
|
||||
func (svc *Service) GetScheduleCount() int {
|
||||
return len(svc.schedules)
|
||||
}
|
||||
|
||||
// GetCronEntryCount returns the number of active cron entries
|
||||
func (svc *Service) GetCronEntryCount() int {
|
||||
return len(svc.cron.Entries())
|
||||
}
|
||||
|
||||
// IsHealthy performs a health check on the schedule service
|
||||
func (svc *Service) IsHealthy() bool {
|
||||
// Check if cron is running
|
||||
if svc.cron == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if service is not stopped
|
||||
if svc.stopped {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if we can fetch schedules from database
|
||||
if err := svc.fetch(); err != nil {
|
||||
svc.Errorf("health check failed: cannot fetch schedules: %v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// GetHealthStatus returns detailed health information
|
||||
func (svc *Service) GetHealthStatus() map[string]interface{} {
|
||||
status := map[string]interface{}{
|
||||
"healthy": svc.IsHealthy(),
|
||||
"stopped": svc.stopped,
|
||||
"schedule_count": svc.GetScheduleCount(),
|
||||
"cron_entry_count": svc.GetCronEntryCount(),
|
||||
"update_interval": svc.updateInterval.String(),
|
||||
"location": svc.loc.String(),
|
||||
}
|
||||
|
||||
// Add cron entries info
|
||||
entries := make([]map[string]interface{}, 0)
|
||||
for _, entry := range svc.cron.Entries() {
|
||||
entries = append(entries, map[string]interface{}{
|
||||
"id": entry.ID,
|
||||
"next": entry.Next.Format(time.RFC3339),
|
||||
"prev": entry.Prev.Format(time.RFC3339),
|
||||
})
|
||||
}
|
||||
status["cron_entries"] = entries
|
||||
|
||||
return status
|
||||
}
|
||||
|
||||
func (svc *Service) update() {
|
||||
// Add recovery mechanism
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
svc.Errorf("panic in schedule update: %v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
// fetch enabled schedules
|
||||
if err := svc.fetch(); err != nil {
|
||||
svc.Errorf("failed to fetch schedules: %v", err)
|
||||
@@ -144,16 +235,19 @@ func (svc *Service) update() {
|
||||
// iterate enabled schedules
|
||||
for _, s := range svc.schedules {
|
||||
_, ok := entryIdsMap[s.EntryId]
|
||||
if ok {
|
||||
entryIdsMap[s.EntryId] = true
|
||||
} else {
|
||||
if !s.Enabled {
|
||||
err := svc.Enable(s, s.GetCreatedBy())
|
||||
if !ok {
|
||||
// Schedule is enabled but not in cron, add it
|
||||
if s.Enabled {
|
||||
// Add retry mechanism for enabling schedules
|
||||
err := svc.enableWithRetry(s, s.GetCreatedBy(), 3)
|
||||
if err != nil {
|
||||
svc.Errorf("failed to enable schedule: %v", err)
|
||||
svc.Errorf("failed to enable schedule after retries: %v", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Mark as found
|
||||
entryIdsMap[s.EntryId] = true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -165,6 +259,21 @@ func (svc *Service) update() {
|
||||
}
|
||||
}
|
||||
|
||||
// enableWithRetry attempts to enable a schedule with retry logic
|
||||
func (svc *Service) enableWithRetry(s models.Schedule, by primitive.ObjectID, maxRetries int) error {
|
||||
var lastErr error
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
if err := svc.Enable(s, by); err != nil {
|
||||
lastErr = err
|
||||
svc.Warnf("failed to enable schedule (attempt %d/%d): %v", i+1, maxRetries, err)
|
||||
time.Sleep(time.Duration(i+1) * time.Second) // exponential backoff
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
func (svc *Service) getEntryIdsMap() (res map[cron.EntryID]bool) {
|
||||
res = map[cron.EntryID]bool{}
|
||||
for _, e := range svc.cron.Entries() {
|
||||
@@ -186,6 +295,17 @@ func (svc *Service) fetch() (err error) {
|
||||
|
||||
func (svc *Service) schedule(id primitive.ObjectID) (fn func()) {
|
||||
return func() {
|
||||
// Add recovery mechanism for individual schedule executions
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
svc.Errorf("panic in schedule execution for %s: %v", id.Hex(), r)
|
||||
}
|
||||
}()
|
||||
|
||||
// Add execution logging
|
||||
svc.Infof("executing schedule: %s", id.Hex())
|
||||
startTime := time.Now()
|
||||
|
||||
// schedule
|
||||
s, err := svc.modelSvc.GetById(id)
|
||||
if err != nil {
|
||||
@@ -193,6 +313,18 @@ func (svc *Service) schedule(id primitive.ObjectID) (fn func()) {
|
||||
return
|
||||
}
|
||||
|
||||
// Verify schedule still exists and is enabled
|
||||
if s == nil || !s.Enabled {
|
||||
svc.Warnf("schedule %s no longer exists or is disabled, removing from cron", id.Hex())
|
||||
// Use a goroutine to avoid potential deadlock when removing from within cron execution
|
||||
go func() {
|
||||
svc.mu.Lock()
|
||||
defer svc.mu.Unlock()
|
||||
svc.cron.Remove(s.EntryId)
|
||||
}()
|
||||
return
|
||||
}
|
||||
|
||||
// spider
|
||||
spider, err := service.NewModelService[models.Spider]().GetById(s.SpiderId)
|
||||
if err != nil {
|
||||
@@ -200,6 +332,12 @@ func (svc *Service) schedule(id primitive.ObjectID) (fn func()) {
|
||||
return
|
||||
}
|
||||
|
||||
// Verify spider still exists
|
||||
if spider == nil {
|
||||
svc.Errorf("spider %s no longer exists", s.SpiderId.Hex())
|
||||
return
|
||||
}
|
||||
|
||||
// options
|
||||
opts := &interfaces.SpiderRunOptions{
|
||||
Mode: s.Mode,
|
||||
@@ -233,20 +371,22 @@ func (svc *Service) schedule(id primitive.ObjectID) (fn func()) {
|
||||
}
|
||||
|
||||
// schedule or assign a task in the task queue
|
||||
if _, err := svc.adminSvc.Schedule(s.SpiderId, opts); err != nil {
|
||||
taskIds, err := svc.adminSvc.Schedule(s.SpiderId, opts)
|
||||
if err != nil {
|
||||
svc.Errorf("failed to schedule spider: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Log successful execution
|
||||
duration := time.Since(startTime)
|
||||
svc.Infof("successfully executed schedule %s, created %d tasks in %v", id.Hex(), len(taskIds), duration)
|
||||
}
|
||||
}
|
||||
|
||||
func newScheduleService() *Service {
|
||||
// service
|
||||
svc := &Service{
|
||||
loc: time.Local,
|
||||
// TODO: implement delay and skip
|
||||
delay: false,
|
||||
skip: false,
|
||||
loc: time.Local,
|
||||
updateInterval: 1 * time.Minute,
|
||||
adminSvc: admin.GetSpiderAdminService(),
|
||||
modelSvc: service.NewModelService[models.Schedule](),
|
||||
|
||||
Reference in New Issue
Block a user