From b9421d388c24775164d29c6cd30d48002cf48eaf Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Fri, 3 Jul 2026 13:07:05 +0700 Subject: [PATCH] feat(scheduler): 30s polling scheduler with pure NextRun/dueTaskIDs --- cmd/server/main.go | 3 ++ internal/scheduler/scheduler.go | 75 ++++++++++++++++++++++++++++ internal/scheduler/scheduler_test.go | 38 ++++++++++++++ 3 files changed, 116 insertions(+) create mode 100644 internal/scheduler/scheduler.go create mode 100644 internal/scheduler/scheduler_test.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 3ee890f..dc83fe0 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -13,6 +13,7 @@ import ( "github.com/vasyansk/imap-copier/internal/config" "github.com/vasyansk/imap-copier/internal/httpapi" "github.com/vasyansk/imap-copier/internal/orchestrator" + "github.com/vasyansk/imap-copier/internal/scheduler" "github.com/vasyansk/imap-copier/internal/store" "github.com/vasyansk/imap-copier/internal/wshub" ) @@ -45,6 +46,8 @@ func main() { orch := orchestrator.New(st, hub, cfg.EncKey, cfg.WorkerConcurrency) srv := httpapi.NewServer(cfg, st, orch, hub) + go scheduler.New(st, orch).Start(context.Background()) + slog.Info("listening", "addr", cfg.HTTPAddr) if err := http.ListenAndServe(cfg.HTTPAddr, srv.Router()); err != nil { slog.Error("serve", "err", err) diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go new file mode 100644 index 0000000..adf76f9 --- /dev/null +++ b/internal/scheduler/scheduler.go @@ -0,0 +1,75 @@ +// Package scheduler runs tasks on their recurring interval. A single goroutine +// polls the DB every pollInterval and triggers due tasks through the +// orchestrator. The DB is the source of truth, so the scheduler holds no state +// and is safe across restarts. +package scheduler + +import ( + "context" + "log/slog" + "time" + + "github.com/vasyansk/imap-copier/internal/orchestrator" + "github.com/vasyansk/imap-copier/internal/store" +) + +const pollInterval = 30 * time.Second + +// NextRun is when a task should next run: one interval after its last completed +// run, or one interval after the schedule anchor if it has never completed one. +func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time { + base := anchor + if lastFinished != nil { + base = *lastFinished + } + return base.Add(interval) +} + +// dueTaskIDs returns the IDs of tasks whose next run is at or before now. +func dueTaskIDs(tasks []store.SchedulableTask, now time.Time) []int64 { + var ids []int64 + for _, t := range tasks { + next := NextRun(time.Duration(t.IntervalSeconds)*time.Second, t.Anchor, t.LastFinished) + if !now.Before(next) { + ids = append(ids, t.ID) + } + } + return ids +} + +type Scheduler struct { + store *store.Store + orch *orchestrator.Orchestrator +} + +func New(st *store.Store, orch *orchestrator.Orchestrator) *Scheduler { + return &Scheduler{store: st, orch: orch} +} + +// Start blocks, ticking until ctx is cancelled. Run it in a goroutine. +func (s *Scheduler) Start(ctx context.Context) { + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.tick(ctx) + } + } +} + +func (s *Scheduler) tick(ctx context.Context) { + tasks, err := s.store.ListSchedulableTasks(ctx) + if err != nil { + slog.Error("scheduler: list schedulable", "err", err) + return + } + for _, id := range dueTaskIDs(tasks, time.Now()) { + if _, err := s.orch.Run(ctx, id, "scheduled"); err != nil { + // ErrAlreadyRunning / ErrNotTested are expected races/edge cases, not fatal. + slog.Info("scheduler: run skipped", "task", id, "err", err) + } + } +} diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go new file mode 100644 index 0000000..dd34e80 --- /dev/null +++ b/internal/scheduler/scheduler_test.go @@ -0,0 +1,38 @@ +package scheduler + +import ( + "testing" + "time" + + "github.com/vasyansk/imap-copier/internal/store" +) + +func TestNextRun(t *testing.T) { + anchor := time.Date(2026, 7, 3, 10, 0, 0, 0, time.UTC) + fin := time.Date(2026, 7, 3, 11, 0, 0, 0, time.UTC) + + // No finished run yet → anchor + interval. + if got := NextRun(time.Hour, anchor, nil); !got.Equal(anchor.Add(time.Hour)) { + t.Fatalf("first run: got %v", got) + } + // Finished run exists → last finished + interval (from completion, not start). + if got := NextRun(time.Hour, anchor, &fin); !got.Equal(fin.Add(time.Hour)) { + t.Fatalf("recurring: got %v", got) + } +} + +func TestDueTaskIDs(t *testing.T) { + now := time.Date(2026, 7, 3, 12, 0, 0, 0, time.UTC) + anchorOld := now.Add(-2 * time.Hour) // enabled 2h ago + finRecent := now.Add(-30 * time.Minute) + + tasks := []store.SchedulableTask{ + {ID: 1, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: nil}, // due: anchor+1h < now + {ID: 2, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: &finRecent}, // not due: fin+1h > now + {ID: 3, IntervalSeconds: 21600, Anchor: anchorOld, LastFinished: nil}, // not due: anchor+6h > now + } + ids := dueTaskIDs(tasks, now) + if len(ids) != 1 || ids[0] != 1 { + t.Fatalf("due IDs = %v, want [1]", ids) + } +}