// Package scheduler runs tasks on their recurring interval. A single goroutine // polls the DB every pollInterval and triggers due tasks through the // orchestrator. The DB is the source of truth, so the scheduler holds no state // and is safe across restarts. package scheduler import ( "context" "log/slog" "time" "github.com/vasyansk/imap-copier/internal/orchestrator" "github.com/vasyansk/imap-copier/internal/store" ) const pollInterval = 30 * time.Second // NextRun is when a task should next run: one interval after its last completed // run, or one interval after the schedule anchor if it has never completed one. func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time { base := anchor if lastFinished != nil && lastFinished.After(anchor) { base = *lastFinished } return base.Add(interval) } // dueTaskIDs returns the IDs of tasks whose next run is at or before now. func dueTaskIDs(tasks []store.SchedulableTask, now time.Time) []int64 { var ids []int64 for _, t := range tasks { next := NextRun(time.Duration(t.IntervalSeconds)*time.Second, t.Anchor, t.LastFinished) if !now.Before(next) { ids = append(ids, t.ID) } } return ids } type Scheduler struct { store *store.Store orch *orchestrator.Orchestrator } func New(st *store.Store, orch *orchestrator.Orchestrator) *Scheduler { return &Scheduler{store: st, orch: orch} } // Start blocks, ticking until ctx is cancelled. Run it in a goroutine. func (s *Scheduler) Start(ctx context.Context) { ticker := time.NewTicker(pollInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: s.tick(ctx) } } } func (s *Scheduler) tick(ctx context.Context) { tasks, err := s.store.ListSchedulableTasks(ctx) if err != nil { slog.Error("scheduler: list schedulable", "err", err) return } for _, id := range dueTaskIDs(tasks, time.Now()) { if _, err := s.orch.Run(ctx, id, "scheduled"); err != nil { // ErrAlreadyRunning / ErrNotTested are expected races/edge cases, not fatal. slog.Info("scheduler: run skipped", "task", id, "err", err) } } }