feat(scheduler): 30s polling scheduler with pure NextRun/dueTaskIDs
This commit is contained in:
@@ -13,6 +13,7 @@ import (
|
|||||||
"github.com/vasyansk/imap-copier/internal/config"
|
"github.com/vasyansk/imap-copier/internal/config"
|
||||||
"github.com/vasyansk/imap-copier/internal/httpapi"
|
"github.com/vasyansk/imap-copier/internal/httpapi"
|
||||||
"github.com/vasyansk/imap-copier/internal/orchestrator"
|
"github.com/vasyansk/imap-copier/internal/orchestrator"
|
||||||
|
"github.com/vasyansk/imap-copier/internal/scheduler"
|
||||||
"github.com/vasyansk/imap-copier/internal/store"
|
"github.com/vasyansk/imap-copier/internal/store"
|
||||||
"github.com/vasyansk/imap-copier/internal/wshub"
|
"github.com/vasyansk/imap-copier/internal/wshub"
|
||||||
)
|
)
|
||||||
@@ -45,6 +46,8 @@ func main() {
|
|||||||
orch := orchestrator.New(st, hub, cfg.EncKey, cfg.WorkerConcurrency)
|
orch := orchestrator.New(st, hub, cfg.EncKey, cfg.WorkerConcurrency)
|
||||||
srv := httpapi.NewServer(cfg, st, orch, hub)
|
srv := httpapi.NewServer(cfg, st, orch, hub)
|
||||||
|
|
||||||
|
go scheduler.New(st, orch).Start(context.Background())
|
||||||
|
|
||||||
slog.Info("listening", "addr", cfg.HTTPAddr)
|
slog.Info("listening", "addr", cfg.HTTPAddr)
|
||||||
if err := http.ListenAndServe(cfg.HTTPAddr, srv.Router()); err != nil {
|
if err := http.ListenAndServe(cfg.HTTPAddr, srv.Router()); err != nil {
|
||||||
slog.Error("serve", "err", err)
|
slog.Error("serve", "err", err)
|
||||||
|
|||||||
@@ -0,0 +1,75 @@
|
|||||||
|
// Package scheduler runs tasks on their recurring interval. A single goroutine
|
||||||
|
// polls the DB every pollInterval and triggers due tasks through the
|
||||||
|
// orchestrator. The DB is the source of truth, so the scheduler holds no state
|
||||||
|
// and is safe across restarts.
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/vasyansk/imap-copier/internal/orchestrator"
|
||||||
|
"github.com/vasyansk/imap-copier/internal/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
const pollInterval = 30 * time.Second
|
||||||
|
|
||||||
|
// NextRun is when a task should next run: one interval after its last completed
|
||||||
|
// run, or one interval after the schedule anchor if it has never completed one.
|
||||||
|
func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time {
|
||||||
|
base := anchor
|
||||||
|
if lastFinished != nil {
|
||||||
|
base = *lastFinished
|
||||||
|
}
|
||||||
|
return base.Add(interval)
|
||||||
|
}
|
||||||
|
|
||||||
|
// dueTaskIDs returns the IDs of tasks whose next run is at or before now.
|
||||||
|
func dueTaskIDs(tasks []store.SchedulableTask, now time.Time) []int64 {
|
||||||
|
var ids []int64
|
||||||
|
for _, t := range tasks {
|
||||||
|
next := NextRun(time.Duration(t.IntervalSeconds)*time.Second, t.Anchor, t.LastFinished)
|
||||||
|
if !now.Before(next) {
|
||||||
|
ids = append(ids, t.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ids
|
||||||
|
}
|
||||||
|
|
||||||
|
type Scheduler struct {
|
||||||
|
store *store.Store
|
||||||
|
orch *orchestrator.Orchestrator
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(st *store.Store, orch *orchestrator.Orchestrator) *Scheduler {
|
||||||
|
return &Scheduler{store: st, orch: orch}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start blocks, ticking until ctx is cancelled. Run it in a goroutine.
|
||||||
|
func (s *Scheduler) Start(ctx context.Context) {
|
||||||
|
ticker := time.NewTicker(pollInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
s.tick(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) tick(ctx context.Context) {
|
||||||
|
tasks, err := s.store.ListSchedulableTasks(ctx)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("scheduler: list schedulable", "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, id := range dueTaskIDs(tasks, time.Now()) {
|
||||||
|
if _, err := s.orch.Run(ctx, id, "scheduled"); err != nil {
|
||||||
|
// ErrAlreadyRunning / ErrNotTested are expected races/edge cases, not fatal.
|
||||||
|
slog.Info("scheduler: run skipped", "task", id, "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,38 @@
|
|||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/vasyansk/imap-copier/internal/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNextRun(t *testing.T) {
|
||||||
|
anchor := time.Date(2026, 7, 3, 10, 0, 0, 0, time.UTC)
|
||||||
|
fin := time.Date(2026, 7, 3, 11, 0, 0, 0, time.UTC)
|
||||||
|
|
||||||
|
// No finished run yet → anchor + interval.
|
||||||
|
if got := NextRun(time.Hour, anchor, nil); !got.Equal(anchor.Add(time.Hour)) {
|
||||||
|
t.Fatalf("first run: got %v", got)
|
||||||
|
}
|
||||||
|
// Finished run exists → last finished + interval (from completion, not start).
|
||||||
|
if got := NextRun(time.Hour, anchor, &fin); !got.Equal(fin.Add(time.Hour)) {
|
||||||
|
t.Fatalf("recurring: got %v", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDueTaskIDs(t *testing.T) {
|
||||||
|
now := time.Date(2026, 7, 3, 12, 0, 0, 0, time.UTC)
|
||||||
|
anchorOld := now.Add(-2 * time.Hour) // enabled 2h ago
|
||||||
|
finRecent := now.Add(-30 * time.Minute)
|
||||||
|
|
||||||
|
tasks := []store.SchedulableTask{
|
||||||
|
{ID: 1, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: nil}, // due: anchor+1h < now
|
||||||
|
{ID: 2, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: &finRecent}, // not due: fin+1h > now
|
||||||
|
{ID: 3, IntervalSeconds: 21600, Anchor: anchorOld, LastFinished: nil}, // not due: anchor+6h > now
|
||||||
|
}
|
||||||
|
ids := dueTaskIDs(tasks, now)
|
||||||
|
if len(ids) != 1 || ids[0] != 1 {
|
||||||
|
t.Fatalf("due IDs = %v, want [1]", ids)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user