diff --git a/cmd/server/main.go b/cmd/server/main.go index 3ee890f..dc83fe0 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -13,6 +13,7 @@ import ( "github.com/vasyansk/imap-copier/internal/config" "github.com/vasyansk/imap-copier/internal/httpapi" "github.com/vasyansk/imap-copier/internal/orchestrator" + "github.com/vasyansk/imap-copier/internal/scheduler" "github.com/vasyansk/imap-copier/internal/store" "github.com/vasyansk/imap-copier/internal/wshub" ) @@ -45,6 +46,8 @@ func main() { orch := orchestrator.New(st, hub, cfg.EncKey, cfg.WorkerConcurrency) srv := httpapi.NewServer(cfg, st, orch, hub) + go scheduler.New(st, orch).Start(context.Background()) + slog.Info("listening", "addr", cfg.HTTPAddr) if err := http.ListenAndServe(cfg.HTTPAddr, srv.Router()); err != nil { slog.Error("serve", "err", err) diff --git a/docs/superpowers/specs/2026-07-03-scheduled-task-runs-design.md b/docs/superpowers/specs/2026-07-03-scheduled-task-runs-design.md new file mode 100644 index 0000000..d4679ab --- /dev/null +++ b/docs/superpowers/specs/2026-07-03-scheduled-task-runs-design.md @@ -0,0 +1,207 @@ +# Scheduled (recurring) task runs — design + +**Date:** 2026-07-03 +**Status:** awaiting user review + +## Context + +Migrations are run manually today (the operator clicks "Run migration"). For +ongoing source→destination sync, the operator wants a task to run itself on a +recurring interval (e.g. every 1h or 6h) without babysitting it. Requirements +from the user: + +- Per-task recurrence interval; the task runs automatically. +- Never start a run while the previous one is still running; measure the interval + from the **completion** of the last run (not its start). +- Show when the next run is due, in the **browser's local time**. +- A separate modal showing a log of runs with their status and totals. +- Deleting a task must clean up all this data along with the task. 
+ +Decisions confirmed with the user: + +- **Run-log detail:** per-run totals only (reuse `runs`); no per-account snapshot. +- **Enable gate:** a schedule can be enabled only when **all accounts test OK**. +- **Breaker:** if a *scheduled* run finishes with errors, disable the schedule and + mark the task **broken** (red icon in the tasks list and the task detail). + Manual runs never trip the breaker. +- **First run:** one interval after enabling (`schedule_anchor + interval`). +- **Intervals:** presets 1 / 3 / 6 / 12 / 24 h, plus Off. + +### Current architecture (as-is) + +- `runs` table already has `started_at`, `finished_at`, `status`, and totals + (`migrations/0001_init.up.sql:35-44`). `CreateRun`/`FinishRun` in + `internal/store/runs.go`. No list-by-task query yet. +- `Orchestrator.Run(ctx, taskID)` gates on `gateOK` (all accounts test ok) and + `TryMarkTaskRunning` (atomic single-run), creates a run, and launches `runAll` + asynchronously; `runAll` sets the task status and calls `FinishRun` + (`internal/orchestrator/orchestrator.go`). +- `ON DELETE CASCADE` from `tasks` already removes `runs`, `accounts`, and + `migrated_messages` when a task is deleted. +- `main.go` wires store → orchestrator → hub → server and calls + `ResetRunningOnStartup` to clear phantom "running" after a restart. No scheduler. +- Task DTO / TS `Task` interface: `internal/httpapi/tasks.go`, `web/src/api.ts`. + +## Decision: in-process polling scheduler + +A single background goroutine (a `Scheduler`) started from `main.go`, ticking +every **30 s**. Each tick queries the DB for schedulable tasks and triggers the +due ones through the existing `Orchestrator.Run`. The DB is the source of truth, +so the scheduler is stateless and restart-safe (paired with the existing +`ResetRunningOnStartup`). Chosen over per-task timers (restart-fragile, in-memory +state) and external cron (breaks the single-binary model, needs auth wiring). + +30 s poll precision is ample for hour-scale intervals. 
+ +## Data model + +**Migration `0004_task_scheduling`:** + +```sql +-- up +ALTER TABLE tasks ADD COLUMN schedule_interval_seconds INT NOT NULL DEFAULT 0; +ALTER TABLE tasks ADD COLUMN schedule_anchor TIMESTAMPTZ; +ALTER TABLE tasks ADD COLUMN broken BOOLEAN NOT NULL DEFAULT false; +ALTER TABLE runs ADD COLUMN trigger TEXT NOT NULL DEFAULT 'manual'; +-- down +ALTER TABLE runs DROP COLUMN trigger; +ALTER TABLE tasks DROP COLUMN broken; +ALTER TABLE tasks DROP COLUMN schedule_anchor; +ALTER TABLE tasks DROP COLUMN schedule_interval_seconds; +``` + +- `schedule_interval_seconds` — 0 = Off. Presets map to seconds (3600, 10800, + 21600, 43200, 86400). +- `schedule_anchor` — set to `now()` when a schedule is enabled; the baseline for + the first run when no finished run exists yet. +- `broken` — the schedule breaker tripped; task needs operator attention. +- `runs.trigger` — `'manual' | 'scheduled'`; drives the breaker and enriches the + run log. + +`store.Task` gains `ScheduleIntervalSeconds int64`, `ScheduleAnchor *time.Time`, +`Broken bool`. `store.Run` gains `Trigger string`, plus `StartedAt time.Time` and +`FinishedAt *time.Time` (needed for the log and the due computation; not currently +scanned). + +### Cascade / deletion + +No new cleanup code. `runs` rows are already cascade-deleted when their task is deleted; the schedule +columns live on `tasks` and vanish with the row. The in-process scheduler polls +the DB, so a deleted task simply stops appearing. (Confirmed against +`migrations/0001_init.up.sql` FKs.) + +## Store additions + +- `SetTaskSchedule(ctx, id, intervalSeconds int64) error` — sets the interval; sets + `schedule_anchor = now()` when enabling (interval>0) and leaves the anchor untouched when + disabling; and clears `broken` on any call (any explicit schedule change + un-breaks the task; enabling is additionally gated on all-accounts-OK in the + HTTP layer, disabling is always allowed). +- `SetTaskBroken(ctx, id) error` — `broken = true, schedule_interval_seconds = 0` + in one UPDATE (the breaker).
+- `CreateRun(ctx, taskID, trigger string)` — extend the existing signature to + record the trigger. +- `ListRunsByTask(ctx, taskID) ([]Run, error)` — newest first, for the modal; + scans `id, started_at, finished_at, status, totals, trigger`. +- `LastFinishedRunAt(ctx, taskID) (*time.Time, error)` — most recent run with a + non-null `finished_at`, for due computation. (Or fold into `ListSchedulableTasks`.) +- `ListSchedulableTasks(ctx) ([]ScheduledTask, error)` — tasks with + `schedule_interval_seconds > 0 AND NOT broken AND status <> 'running'`, joined + with their last finished run's `finished_at`, so the scheduler decides in one + query per tick. Returns the fields `dueAt` needs. + +The `GetTask`/`ListTasks` SELECT and Scan are extended to cover the three new task columns. + +## Scheduler (`internal/scheduler/`) + +New package, one focused responsibility. + +- `type Scheduler struct { store *store.Store; orch *orchestrator.Orchestrator }` +- `func (s *Scheduler) Start(ctx context.Context)` — `time.NewTicker(30 * time.Second)` loop; + on each tick calls `s.tick(ctx)`; stops on `ctx.Done()`. +- `func (s *Scheduler) tick(ctx)` — `ListSchedulableTasks`, then for each due task + calls `s.orch.Run(ctx, taskID)` with `trigger = "scheduled"`. `Run` errors + (`ErrAlreadyRunning`, `ErrNotTested`) are logged, not fatal. +- **Pure, unit-tested decision function:** + `func dueAt(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time` + → `lastFinished + interval` if a finished run exists, else `anchor + interval`. + A task is due when `now >= dueAt(...)`. Kept pure so the tick logic is testable + without a DB or clock injection at the call site. + +`main.go` starts it: `go scheduler.New(st, orch).Start(context.Background())`. + +### Trigger threading + breaker + +`Orchestrator.Run` gains a `trigger string` parameter (manual callers pass +`"manual"`; the scheduler passes `"scheduled"`). `Run` forwards it to +`CreateRun` and into `runAll`.
In `runAll`, after the final status is computed: + +``` +if trigger == "scheduled" && totErr > 0 { + _ = o.store.SetTaskBroken(ctx, task.ID) + o.hub.Publish(Event{Type: "task_broken", TaskID: task.ID, ...}) +} +``` + +The HTTP `handleRun` passes `"manual"`. This keeps the breaker precise: only +scheduled runs trip it. + +## HTTP API + +- **`PUT /api/tasks/{id}/schedule`** — body `{interval_seconds: int}`. Enabling + (interval>0) first checks `gateOK` (all accounts test OK) via + `ListAccountsByTask`; returns **409** with a clear message if not. Then + `SetTaskSchedule`. Disabling (0) always allowed. +- **`GET /api/tasks/{id}/runs`** — `ListRunsByTask` → run-log rows. +- **Task DTO** (`tasks.go`) gains `schedule_interval_seconds`, `broken`, and a + computed `next_run_at *string` (RFC3339, UTC; null when off / running / broken). + `next_run_at` is computed server-side from the same `dueAt` logic so the client + only formats it. `ListTasks` DTO also includes `broken` (for the red icon in the + list). + +## Frontend + +- **`api.ts`**: `Task` gains `schedule_interval_seconds?`, `broken?`, + `next_run_at?`. New `setTaskSchedule(taskId, intervalSeconds)` and + `listRuns(taskId)` (+ a `Run` interface). +- **`TaskDetail.tsx`**: in the run-control panel, an interval ` onSchedule(Number(e.target.value))} + > + + + + + + + + {task.next_run_at && ( + Next run: {new Date(task.next_run_at).toLocaleString()} + )} + {task.broken && broken} + +
@@ -635,6 +670,7 @@ export function TaskDetail({ id }: { id: number }) { onConfirm={saveEditMapping} /> )} + setShowRuns(false)} /> ) } diff --git a/web/src/pages/Tasks.tsx b/web/src/pages/Tasks.tsx index 8fc127c..382e3c7 100644 --- a/web/src/pages/Tasks.tsx +++ b/web/src/pages/Tasks.tsx @@ -163,6 +163,7 @@ export function Tasks() { + {t.broken && broken}