diff --git a/docs/superpowers/plans/2026-07-03-scheduled-task-runs.md b/docs/superpowers/plans/2026-07-03-scheduled-task-runs.md new file mode 100644 index 0000000..1e22e56 --- /dev/null +++ b/docs/superpowers/plans/2026-07-03-scheduled-task-runs.md @@ -0,0 +1,1014 @@ +# Scheduled (Recurring) Task Runs Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Let a task run itself on a recurring interval, with a breaker that disables the schedule and flags the task broken when a scheduled run errors, plus a run-log modal and next-run display. + +**Architecture:** A single in-process scheduler goroutine (30 s ticker) reads schedulable tasks from the DB each tick and triggers due ones through `Orchestrator.Run` with a `"scheduled"` trigger. Schedule config lives on `tasks` (interval, anchor, broken); `runs.trigger` distinguishes manual vs scheduled. DB is the source of truth, so the scheduler is stateless and restart-safe. The next-run time is computed server-side and formatted in the browser's local time. + +**Tech Stack:** Go (net/http, pgx v5, golang-migrate), Postgres 18, React 19 + Vite + TypeScript. + +## Global Constraints + +- Scheduler poll interval: **30 s** (a package const). +- Interval presets → seconds: Off=0, 1h=3600, 3h=10800, 6h=21600, 12h=43200, 24h=86400. +- Breaker trips **only on scheduled runs** (`trigger == "scheduled"` AND `total_errors > 0`) → `SetTaskBroken` (sets `broken=true, schedule_interval_seconds=0`). Manual runs never trip it. +- Enabling a schedule (interval>0) requires **all accounts tested OK**; otherwise HTTP **409**. Disabling (0) is always allowed and clears `broken`. +- First run after enabling = `schedule_anchor + interval`; subsequent = `last_finished_at + interval`. Never run while the task is `running`. +- `next_run_at`: RFC3339 UTC string, **null** when schedule off / task running / broken. +- Non-destructive migration; `ON DELETE CASCADE` already removes `runs` on task delete — no new cleanup code. +- pgx v5 scans nullable `TIMESTAMPTZ` into `*time.Time` and non-null into `time.Time`. +- Backend verification: `go build ./...`, `go vet ./...`, `go test ./...` (store tests need `TEST_DATABASE_URL`, else skip). +- Frontend verification: `cd web && npm run build`, `npx oxlint src/` (only the two pre-existing warnings allowed: `ConfirmProvider` only-export-components, `TaskDetail` exhaustive-deps). +- Commit after each task. + +--- + +### Task 1: Schema + store (scheduling columns, run trigger, store methods) + +**Files:** +- Create: `migrations/0004_task_scheduling.up.sql`, `migrations/0004_task_scheduling.down.sql` +- Modify: `internal/store/tasks.go` (Task struct; GetTask + ListTasks SELECT/Scan; add setters) +- Modify: `internal/store/runs.go` (Run struct; `CreateRun` signature; add queries) +- Modify: `internal/orchestrator/orchestrator.go` (one line: the `CreateRun` call — pass `"manual"` to keep it compiling; the real trigger threading is Task 3) +- Test: `internal/store/scheduling_test.go` (new) + +**Interfaces:** +- Produces: + - `store.Task` fields: `ScheduleIntervalSeconds int64` (json `schedule_interval_seconds`), `ScheduleAnchor *time.Time` (json `-`), `Broken bool` (json `broken`). + - `store.Run` fields: `StartedAt time.Time` (json `started_at`), `FinishedAt *time.Time` (json `finished_at`), `Trigger string` (json `trigger`); existing fields gain json tags. + - `store.SchedulableTask struct { ID int64; IntervalSeconds int64; Anchor time.Time; LastFinished *time.Time }` + - `func (s *Store) SetTaskSchedule(ctx, id, intervalSeconds int64) error` + - `func (s *Store) SetTaskBroken(ctx, id int64) error` + - `func (s *Store) ListRunsByTask(ctx, taskID int64) ([]Run, error)` + - `func (s *Store) LastFinishedRunAt(ctx, taskID int64) (*time.Time, error)` + - `func (s *Store) ListSchedulableTasks(ctx) ([]SchedulableTask, error)` + - `func (s *Store) CreateRun(ctx, taskID int64, trigger string) (int64, error)` (signature change) + +- [ ] **Step 1: Write the migration** + +`migrations/0004_task_scheduling.up.sql`: +```sql +ALTER TABLE tasks ADD COLUMN schedule_interval_seconds INT NOT NULL DEFAULT 0; +ALTER TABLE tasks ADD COLUMN schedule_anchor TIMESTAMPTZ; +ALTER TABLE tasks ADD COLUMN broken BOOLEAN NOT NULL DEFAULT false; +ALTER TABLE runs ADD COLUMN trigger TEXT NOT NULL DEFAULT 'manual'; +``` + +`migrations/0004_task_scheduling.down.sql`: +```sql +ALTER TABLE runs DROP COLUMN trigger; +ALTER TABLE tasks DROP COLUMN broken; +ALTER TABLE tasks DROP COLUMN schedule_anchor; +ALTER TABLE tasks DROP COLUMN schedule_interval_seconds; +``` + +Apply to the test DB before running store tests: +`docker exec -i imapcopier-testpg psql -U imap -d imapcopier_test -q -v ON_ERROR_STOP=1 < migrations/0004_task_scheduling.up.sql` + +- [ ] **Step 2: Write the failing store test** + +`internal/store/scheduling_test.go`: +```go +package store + +import ( + "context" + "testing" +) + +func TestTaskScheduleAndBroken(t *testing.T) { + s := testStore(t) + ctx := context.Background() + epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"}) + epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"}) + taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst}) + + // Defaults: no schedule, not broken. + tk, _ := s.GetTask(ctx, taskID) + if tk.ScheduleIntervalSeconds != 0 || tk.Broken || tk.ScheduleAnchor != nil { + t.Fatalf("defaults: interval=%d broken=%v anchor=%v", tk.ScheduleIntervalSeconds, tk.Broken, tk.ScheduleAnchor) + } + + // Enable → interval set, anchor stamped, appears in schedulable list. + if err := s.SetTaskSchedule(ctx, taskID, 3600); err != nil { + t.Fatalf("set schedule: %v", err) + } + tk, _ = s.GetTask(ctx, taskID) + if tk.ScheduleIntervalSeconds != 3600 || tk.ScheduleAnchor == nil { + t.Fatalf("after enable: interval=%d anchor=%v", tk.ScheduleIntervalSeconds, tk.ScheduleAnchor) + } + sch, _ := s.ListSchedulableTasks(ctx) + if len(sch) != 1 || sch[0].ID != taskID || sch[0].IntervalSeconds != 3600 || sch[0].LastFinished != nil { + t.Fatalf("schedulable: %+v", sch) + } + + // Breaker → broken true, interval 0, drops out of schedulable list. + if err := s.SetTaskBroken(ctx, taskID); err != nil { + t.Fatalf("set broken: %v", err) + } + tk, _ = s.GetTask(ctx, taskID) + if !tk.Broken || tk.ScheduleIntervalSeconds != 0 { + t.Fatalf("after break: broken=%v interval=%d", tk.Broken, tk.ScheduleIntervalSeconds) + } + if sch, _ := s.ListSchedulableTasks(ctx); len(sch) != 0 { + t.Fatalf("broken task still schedulable: %+v", sch) + } + + // Re-enable clears broken. + _ = s.SetTaskSchedule(ctx, taskID, 21600) + if tk, _ = s.GetTask(ctx, taskID); tk.Broken { + t.Fatalf("re-enable did not clear broken") + } +} + +func TestListRunsByTaskWithTrigger(t *testing.T) { + s := testStore(t) + ctx := context.Background() + epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"}) + epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"}) + taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst}) + + r1, _ := s.CreateRun(ctx, taskID, "manual") + _ = s.FinishRun(ctx, r1, "done", 5, 1, 0) + _, _ = s.CreateRun(ctx, taskID, "scheduled") // still running (no finish) + + runs, err := s.ListRunsByTask(ctx, taskID) + if err != nil || len(runs) != 2 { + t.Fatalf("list runs: %v len=%d", err, len(runs)) + } + // newest first + if runs[0].Trigger != "scheduled" || runs[0].FinishedAt != nil { + t.Fatalf("run[0]: %+v", runs[0]) + } + if runs[1].Trigger != "manual" || runs[1].FinishedAt == nil || runs[1].TotalCopied != 5 { + t.Fatalf("run[1]: %+v", runs[1]) + } + + // Last finished-at reflects the finished manual run. + lf, _ := s.LastFinishedRunAt(ctx, taskID) + if lf == nil { + t.Fatalf("LastFinishedRunAt nil, want the finished run's time") + } +} +``` + +- [ ] **Step 3: Run tests to verify they fail** + +Run: `TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./internal/store/ -run 'TestTaskScheduleAndBroken|TestListRunsByTaskWithTrigger'` +Expected: FAIL to compile — new struct fields, `SetTaskSchedule`, `SetTaskBroken`, `ListRunsByTask`, `LastFinishedRunAt`, `ListSchedulableTasks`, and the 3-arg `CreateRun` don't exist yet. + +- [ ] **Step 4: Extend `store.Task` and its scans** + +In `internal/store/tasks.go`, add `import "time"`, and extend the struct: +```go +type Task struct { + ID int64 `json:"id"` + Name string `json:"name"` + SrcEndpointID int64 `json:"src_endpoint_id"` + DstEndpointID int64 `json:"dst_endpoint_id"` + Status string `json:"status"` + FolderMapping map[string]string `json:"folder_mapping"` + ScheduleIntervalSeconds int64 `json:"schedule_interval_seconds"` + ScheduleAnchor *time.Time `json:"-"` + Broken bool `json:"broken"` +} +``` +Update `GetTask`'s SELECT + Scan: +```go + err := s.Pool.QueryRow(ctx, + `SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping, + schedule_interval_seconds, schedule_anchor, broken + FROM tasks WHERE id=$1`, id). + Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping, + &t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken) +``` +Update `ListTasks`'s SELECT + Scan the same way (add the three columns and three scan targets): +```go + rows, err := s.Pool.Query(ctx, + `SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping, + schedule_interval_seconds, schedule_anchor, broken + FROM tasks ORDER BY id DESC`) + ... + if err := rows.Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping, + &t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken); err != nil { +``` + +- [ ] **Step 5: Add task scheduling setters + schedulable query** + +Append to `internal/store/tasks.go`: +```go +// SetTaskSchedule sets the recurrence interval (0 = off), stamps the anchor when +// enabling (the first run is one interval after this), and clears broken — any +// explicit schedule change un-breaks the task. +func (s *Store) SetTaskSchedule(ctx context.Context, id, intervalSeconds int64) error { + _, err := s.Pool.Exec(ctx, + `UPDATE tasks SET schedule_interval_seconds=$2, + schedule_anchor = CASE WHEN $2 > 0 THEN now() ELSE schedule_anchor END, + broken = false + WHERE id=$1`, id, intervalSeconds) + return err +} + +// SetTaskBroken trips the schedule breaker: disables the schedule and flags the +// task for operator attention. +func (s *Store) SetTaskBroken(ctx context.Context, id int64) error { + _, err := s.Pool.Exec(ctx, + `UPDATE tasks SET broken=true, schedule_interval_seconds=0 WHERE id=$1`, id) + return err +} + +// SchedulableTask is the per-tick decision input for the scheduler. +type SchedulableTask struct { + ID int64 + IntervalSeconds int64 + Anchor time.Time + LastFinished *time.Time +} + +// ListSchedulableTasks returns tasks eligible to auto-run: schedule on, not +// broken, not currently running — each joined with its last finished run time. +func (s *Store) ListSchedulableTasks(ctx context.Context) ([]SchedulableTask, error) { + rows, err := s.Pool.Query(ctx, + `SELECT t.id, t.schedule_interval_seconds, t.schedule_anchor, + (SELECT max(finished_at) FROM runs r WHERE r.task_id=t.id AND r.finished_at IS NOT NULL) + FROM tasks t + WHERE t.schedule_interval_seconds > 0 AND NOT t.broken AND t.status <> 'running'`) + if err != nil { + return nil, err + } + defer rows.Close() + out := []SchedulableTask{} + for rows.Next() { + var st SchedulableTask + if err := rows.Scan(&st.ID, &st.IntervalSeconds, &st.Anchor, &st.LastFinished); err != nil { + return nil, err + } + out = append(out, st) + } + return out, rows.Err() +} +``` + +- [ ] **Step 6: Extend `store.Run`, `CreateRun`, and add run queries** + +In `internal/store/runs.go`, add `import "time"` (alongside context) and replace the struct + CreateRun, then add the queries: +```go +type Run struct { + ID int64 `json:"id"` + TaskID int64 `json:"task_id"` + Status string `json:"status"` + StartedAt time.Time `json:"started_at"` + FinishedAt *time.Time `json:"finished_at"` + TotalCopied int64 `json:"total_copied"` + TotalSkipped int64 `json:"total_skipped"` + TotalErrors int64 `json:"total_errors"` + Trigger string `json:"trigger"` +} + +func (s *Store) CreateRun(ctx context.Context, taskID int64, trigger string) (int64, error) { + var id int64 + err := s.Pool.QueryRow(ctx, + `INSERT INTO runs (task_id, trigger) VALUES ($1,$2) RETURNING id`, taskID, trigger).Scan(&id) + return id, err +} + +// ListRunsByTask returns a task's runs, newest first, for the run-log modal. +func (s *Store) ListRunsByTask(ctx context.Context, taskID int64) ([]Run, error) { + rows, err := s.Pool.Query(ctx, + `SELECT id, task_id, started_at, finished_at, status, + total_copied, total_skipped, total_errors, trigger + FROM runs WHERE task_id=$1 ORDER BY id DESC`, taskID) + if err != nil { + return nil, err + } + defer rows.Close() + out := []Run{} + for rows.Next() { + var r Run + if err := rows.Scan(&r.ID, &r.TaskID, &r.StartedAt, &r.FinishedAt, &r.Status, + &r.TotalCopied, &r.TotalSkipped, &r.TotalErrors, &r.Trigger); err != nil { + return nil, err + } + out = append(out, r) + } + return out, rows.Err() +} + +// LastFinishedRunAt returns the most recent finished run's timestamp, or nil if +// the task has never completed a run — the baseline for the next scheduled run. +func (s *Store) LastFinishedRunAt(ctx context.Context, taskID int64) (*time.Time, error) { + var t *time.Time + err := s.Pool.QueryRow(ctx, + `SELECT max(finished_at) FROM runs WHERE task_id=$1 AND finished_at IS NOT NULL`, taskID).Scan(&t) + return t, err +} +``` +Keep the existing `FinishRun` as-is. + +- [ ] **Step 7: Fix the `CreateRun` caller so the build passes** + +In `internal/orchestrator/orchestrator.go`, the `Run` method calls `o.store.CreateRun(ctx, taskID)`. Change it to pass a trigger literal for now: +```go + runID, err := o.store.CreateRun(ctx, taskID, "manual") +``` +(Task 3 threads the real trigger through; this keeps the build green now.) + +- [ ] **Step 8: Run the store tests to verify they pass** + +Run: `TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./internal/store/ -run 'TestTaskScheduleAndBroken|TestListRunsByTaskWithTrigger' -v` +Expected: PASS. + +- [ ] **Step 9: Verify whole build + suite** + +Run: `go build ./... && go vet ./... && TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./...` +Expected: all pass. + +- [ ] **Step 10: Commit** + +```bash +git add migrations/0004_task_scheduling.up.sql migrations/0004_task_scheduling.down.sql \ + internal/store/tasks.go internal/store/runs.go internal/store/scheduling_test.go \ + internal/orchestrator/orchestrator.go +git commit -m "feat(store): task schedule columns, run trigger, scheduling queries" +``` + +--- + +### Task 2: Scheduler package + wiring + +**Files:** +- Create: `internal/scheduler/scheduler.go` +- Create: `internal/scheduler/scheduler_test.go` +- Modify: `cmd/server/main.go` (start the scheduler) + +**Interfaces:** +- Consumes: `store.SchedulableTask`, `store.ListSchedulableTasks`, `orchestrator.Orchestrator.Run(ctx, taskID, trigger)` (the 3-arg form lands in Task 3; this task calls it as 3-arg and both compile together only after Task 3 — see Step 5 note). +- Produces: + - `func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time` + - `func dueTaskIDs(tasks []store.SchedulableTask, now time.Time) []int64` + - `type Scheduler`; `func New(st *store.Store, orch *orchestrator.Orchestrator) *Scheduler`; `func (s *Scheduler) Start(ctx context.Context)` + +- [ ] **Step 1: Write the failing pure-logic test** + +`internal/scheduler/scheduler_test.go`: +```go +package scheduler + +import ( + "testing" + "time" + + "github.com/vasyansk/imap-copier/internal/store" +) + +func TestNextRun(t *testing.T) { + anchor := time.Date(2026, 7, 3, 10, 0, 0, 0, time.UTC) + fin := time.Date(2026, 7, 3, 11, 0, 0, 0, time.UTC) + + // No finished run yet → anchor + interval. + if got := NextRun(time.Hour, anchor, nil); !got.Equal(anchor.Add(time.Hour)) { + t.Fatalf("first run: got %v", got) + } + // Finished run exists → last finished + interval (from completion, not start). + if got := NextRun(time.Hour, anchor, &fin); !got.Equal(fin.Add(time.Hour)) { + t.Fatalf("recurring: got %v", got) + } +} + +func TestDueTaskIDs(t *testing.T) { + now := time.Date(2026, 7, 3, 12, 0, 0, 0, time.UTC) + anchorOld := now.Add(-2 * time.Hour) // enabled 2h ago + finRecent := now.Add(-30 * time.Minute) + + tasks := []store.SchedulableTask{ + {ID: 1, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: nil}, // due: anchor+1h < now + {ID: 2, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: &finRecent}, // not due: fin+1h > now + {ID: 3, IntervalSeconds: 21600, Anchor: anchorOld, LastFinished: nil}, // not due: anchor+6h > now + } + ids := dueTaskIDs(tasks, now) + if len(ids) != 1 || ids[0] != 1 { + t.Fatalf("due IDs = %v, want [1]", ids) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/scheduler/` +Expected: FAIL — package/functions don't exist. + +- [ ] **Step 3: Implement the scheduler** + +`internal/scheduler/scheduler.go`: +```go +// Package scheduler runs tasks on their recurring interval. A single goroutine +// polls the DB every pollInterval and triggers due tasks through the +// orchestrator. The DB is the source of truth, so the scheduler holds no state +// and is safe across restarts. +package scheduler + +import ( + "context" + "log/slog" + "time" + + "github.com/vasyansk/imap-copier/internal/orchestrator" + "github.com/vasyansk/imap-copier/internal/store" +) + +const pollInterval = 30 * time.Second + +// NextRun is when a task should next run: one interval after its last completed +// run, or one interval after the schedule anchor if it has never completed one. +func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time { + base := anchor + if lastFinished != nil { + base = *lastFinished + } + return base.Add(interval) +} + +// dueTaskIDs returns the IDs of tasks whose next run is at or before now. +func dueTaskIDs(tasks []store.SchedulableTask, now time.Time) []int64 { + var ids []int64 + for _, t := range tasks { + next := NextRun(time.Duration(t.IntervalSeconds)*time.Second, t.Anchor, t.LastFinished) + if !now.Before(next) { + ids = append(ids, t.ID) + } + } + return ids +} + +type Scheduler struct { + store *store.Store + orch *orchestrator.Orchestrator +} + +func New(st *store.Store, orch *orchestrator.Orchestrator) *Scheduler { + return &Scheduler{store: st, orch: orch} +} + +// Start blocks, ticking until ctx is cancelled. Run it in a goroutine. +func (s *Scheduler) Start(ctx context.Context) { + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.tick(ctx) + } + } +} + +func (s *Scheduler) tick(ctx context.Context) { + tasks, err := s.store.ListSchedulableTasks(ctx) + if err != nil { + slog.Error("scheduler: list schedulable", "err", err) + return + } + for _, id := range dueTaskIDs(tasks, time.Now()) { + if _, err := s.orch.Run(ctx, id, "scheduled"); err != nil { + // ErrAlreadyRunning / ErrNotTested are expected races/edge cases, not fatal. + slog.Info("scheduler: run skipped", "task", id, "err", err) + } + } +} +``` + +- [ ] **Step 4: Wire the scheduler into main** + +In `cmd/server/main.go`, add the import `"github.com/vasyansk/imap-copier/internal/scheduler"` and, after `orch := orchestrator.New(...)` and before `http.ListenAndServe`, start it: +```go + go scheduler.New(st, orch).Start(context.Background()) +``` + +- [ ] **Step 5: Run the pure test** + +Run: `go test ./internal/scheduler/ -v` +Expected: PASS. + +Note: `go build ./...` will FAIL until Task 3 changes `Orchestrator.Run` to the 3-arg form (`s.orch.Run(ctx, id, "scheduled")`). Run Task 3 immediately after this task; the package unit test passes now, the whole-module build goes green after Task 3. (The two tasks are sequenced back-to-back for this reason.) + +- [ ] **Step 6: Commit** + +```bash +git add internal/scheduler/scheduler.go internal/scheduler/scheduler_test.go cmd/server/main.go +git commit -m "feat(scheduler): 30s polling scheduler with pure NextRun/dueTaskIDs" +``` + +--- + +### Task 3: Orchestrator trigger threading + breaker + +**Files:** +- Modify: `internal/orchestrator/orchestrator.go` (`Run` signature; thread trigger into `CreateRun` + `runAll`; breaker in `runAll`) +- Modify: `internal/httpapi/run.go` (`handleRun` passes `"manual"`) +- Test: `internal/orchestrator/breaker_test.go` (new) + +**Interfaces:** +- Consumes: `store.SetTaskBroken` (Task 1). +- Produces: + - `func (o *Orchestrator) Run(ctx context.Context, taskID int64, trigger string) (int64, error)` (was 2-arg) + - `func shouldBreak(trigger string, totErr int64) bool` + +- [ ] **Step 1: Write the failing breaker-logic test** + +`internal/orchestrator/breaker_test.go`: +```go +package orchestrator + +import "testing" + +func TestShouldBreak(t *testing.T) { + cases := []struct { + trigger string + errs int64 + want bool + }{ + {"scheduled", 2, true}, // scheduled run with errors trips the breaker + {"scheduled", 0, false}, // scheduled run, clean — no break + {"manual", 5, false}, // manual run never trips the breaker + {"manual", 0, false}, + } + for _, c := range cases { + if got := shouldBreak(c.trigger, c.errs); got != c.want { + t.Fatalf("shouldBreak(%q,%d)=%v want %v", c.trigger, c.errs, got, c.want) + } + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/orchestrator/ -run TestShouldBreak` +Expected: FAIL — `shouldBreak` undefined. + +- [ ] **Step 3: Add `shouldBreak` and thread the trigger through `Run`** + +In `internal/orchestrator/orchestrator.go`: + +Add the helper (package level): +```go +// shouldBreak reports whether a completed run should trip the schedule breaker: +// only scheduled runs that ended with errors. +func shouldBreak(trigger string, totErr int64) bool { + return trigger == "scheduled" && totErr > 0 +} +``` + +Change `Run` to accept and forward the trigger: +```go +func (o *Orchestrator) Run(ctx context.Context, taskID int64, trigger string) (int64, error) { +``` +Inside `Run`, replace the temporary literal from Task 1: +```go + runID, err := o.store.CreateRun(ctx, taskID, trigger) +``` +And pass the trigger into the coordinator goroutine: +```go + go o.runAll(context.WithoutCancel(ctx), task, runID, accs, srcEP, dstEP, trigger) +``` + +Change `runAll`'s signature to receive it and trip the breaker after the final status is set: +```go +func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint, trigger string) { +``` +At the end of `runAll`, after `SetTaskStatus` and the `run_done` publish, add: +```go + if shouldBreak(trigger, totErr) { + _ = o.store.SetTaskBroken(ctx, task.ID) + o.hub.Publish(wshub.Event{Type: "task_broken", TaskID: task.ID, + Data: map[string]any{"task_id": task.ID, "errors": totErr}}) + } +``` + +- [ ] **Step 4: Update the manual caller** + +In `internal/httpapi/run.go`, `handleRun`: +```go + runID, err := s.orch.Run(r.Context(), taskID, "manual") +``` + +- [ ] **Step 5: Run the breaker test + full build** + +Run: `go test ./internal/orchestrator/ -run TestShouldBreak -v && go build ./... && go vet ./...` +Expected: test PASS; build + vet clean (the scheduler's `s.orch.Run(ctx, id, "scheduled")` from Task 2 now resolves). + +- [ ] **Step 6: Full suite** + +Run: `TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./...` +Expected: all pass. + +- [ ] **Step 7: Commit** + +```bash +git add internal/orchestrator/orchestrator.go internal/orchestrator/breaker_test.go internal/httpapi/run.go +git commit -m "feat(orchestrator): thread run trigger; breaker on scheduled runs with errors" +``` + +--- + +### Task 4: HTTP — schedule endpoint, runs endpoint, next_run_at DTO + +**Files:** +- Modify: `internal/httpapi/tasks.go` (`taskView` with `next_run_at`; compute in `handleGetTask`; add `handleSetSchedule`, `handleListRuns`) +- Modify: `internal/httpapi/router.go` (two routes) + +**Interfaces:** +- Consumes: `store.SetTaskSchedule`, `store.ListRunsByTask`, `store.LastFinishedRunAt`, `store.ListAccountsByTask`, `scheduler.NextRun`. +- Produces: routes `PUT /api/tasks/{id}/schedule`, `GET /api/tasks/{id}/runs`; task JSON gains `next_run_at`. + +- [ ] **Step 1: Add the task DTO with next_run_at and compute it in handleGetTask** + +In `internal/httpapi/tasks.go`, add imports `"time"` and `"github.com/vasyansk/imap-copier/internal/scheduler"`. Add the view type and compute the next run: +```go +// taskView augments the stored task with the server-computed next scheduled run +// (RFC3339 UTC; null when the schedule is off, the task is running, or broken). +type taskView struct { + store.Task + NextRunAt *string `json:"next_run_at"` +} + +func (s *Server) taskViewFor(ctx context.Context, t store.Task) taskView { + var nextRunAt *string + if t.ScheduleIntervalSeconds > 0 && !t.Broken && t.Status != "running" && t.ScheduleAnchor != nil { + last, _ := s.store.LastFinishedRunAt(ctx, t.ID) + n := scheduler.NextRun(time.Duration(t.ScheduleIntervalSeconds)*time.Second, *t.ScheduleAnchor, last) + str := n.UTC().Format(time.RFC3339) + nextRunAt = &str + } + return taskView{Task: t, NextRunAt: nextRunAt} +} +``` +Change `handleGetTask`'s final write to use it: +```go + writeJSON(w, http.StatusOK, map[string]any{"task": s.taskViewFor(r.Context(), task), "accounts": views}) +``` +(`handleListTasks` stays returning `[]store.Task` — the list needs only `broken`, which serializes via the json tag; `next_run_at` is detail-only.) + +- [ ] **Step 2: Add the schedule + runs handlers** + +Append to `internal/httpapi/tasks.go`: +```go +// handleSetSchedule enables/changes/disables a task's recurring schedule. +// Enabling (interval>0) requires every account to pass its connection tests. +func (s *Server) handleSetSchedule(w http.ResponseWriter, r *http.Request) { + taskID, err := pathID(r, "id") + if err != nil { + http.Error(w, "bad id", http.StatusBadRequest) + return + } + var body struct { + IntervalSeconds int64 `json:"interval_seconds"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, "bad json", http.StatusBadRequest) + return + } + if body.IntervalSeconds > 0 { + accs, err := s.store.ListAccountsByTask(r.Context(), taskID) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if !allAccountsTested(accs) { + http.Error(w, "all accounts must pass connection tests before scheduling", http.StatusConflict) + return + } + } + if err := s.store.SetTaskSchedule(r.Context(), taskID, body.IntervalSeconds); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) +} + +// allAccountsTested mirrors the orchestrator's run gate: non-empty and every +// account OK on both sides. +func allAccountsTested(accs []store.Account) bool { + if len(accs) == 0 { + return false + } + for _, a := range accs { + if a.TestSrcStatus != "ok" || a.TestDstStatus != "ok" { + return false + } + } + return true +} + +func (s *Server) handleListRuns(w http.ResponseWriter, r *http.Request) { + taskID, err := pathID(r, "id") + if err != nil { + http.Error(w, "bad id", http.StatusBadRequest) + return + } + runs, err := s.store.ListRunsByTask(r.Context(), taskID) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, http.StatusOK, runs) +} +``` +(`json` and `store` are already imported in `tasks.go`.) + +- [ ] **Step 3: Register routes** + +In `internal/httpapi/router.go`, after the existing `PUT /api/tasks/{id}/folder-mapping` line (or with the other task routes): +```go + api.HandleFunc("PUT /api/tasks/{id}/schedule", s.handleSetSchedule) + api.HandleFunc("GET /api/tasks/{id}/runs", s.handleListRuns) +``` + +- [ ] **Step 4: Verify build + vet + suite** + +Run: `go build ./... && go vet ./... && TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./...` +Expected: all pass. + +- [ ] **Step 5: Commit** + +```bash +git add internal/httpapi/tasks.go internal/httpapi/router.go +git commit -m "feat(api): schedule + runs endpoints; next_run_at on task detail" +``` + +--- + +### Task 5: Frontend — schedule control, next-run, broken badge, run-log modal, list icon + +**Files:** +- Modify: `web/src/api.ts` (Task fields; `Run` type; `setTaskSchedule`, `listRuns`) +- Create: `web/src/components/RunLogModal.tsx` +- Modify: `web/src/pages/TaskDetail.tsx` (schedule select, next-run line, broken badge, Runs button+modal, reload on `task_broken`) +- Modify: `web/src/pages/Tasks.tsx` (broken indicator in the list) +- Modify: `web/src/app.css` (broken badge / schedule-row styles) + +**Interfaces:** +- Consumes: backend routes from Task 4; `Modal` (`web/src/components/Modal.tsx`), `StatusBadge`. + +- [ ] **Step 1: Extend the API client** + +In `web/src/api.ts`, extend `Task`: +```ts +export interface Task { + id: number + name: string + src_endpoint_id: number + dst_endpoint_id: number + status: string + folder_mapping?: Record + schedule_interval_seconds?: number + broken?: boolean + next_run_at?: string | null +} +``` +Add a `Run` interface and the two calls (near `runTask`): +```ts +export interface Run { + id: number + task_id: number + status: string + started_at: string + finished_at: string | null + total_copied: number + total_skipped: number + total_errors: number + trigger: string +} + +export const setTaskSchedule = (taskId: number, intervalSeconds: number) => + api(`/api/tasks/${taskId}/schedule`, { ...jsonBody({ interval_seconds: intervalSeconds }), method: 'PUT' }) + +export const listRuns = (taskId: number) => api(`/api/tasks/${taskId}/runs`) +``` + +- [ ] **Step 2: Create the run-log modal** + +`web/src/components/RunLogModal.tsx`: +```tsx +import { useEffect, useState } from 'react' +import { Modal } from './Modal' +import { StatusBadge } from './StatusBadge' +import { listRuns, type Run } from '../api' + +const fmt = (iso: string | null) => (iso ? new Date(iso).toLocaleString() : '—') + +export function RunLogModal({ taskId, open, onClose }: { taskId: number; open: boolean; onClose: () => void }) { + const [runs, setRuns] = useState(null) + + useEffect(() => { + if (!open) return + setRuns(null) + listRuns(taskId).then((r) => setRuns(r ?? [])).catch(() => setRuns([])) + }, [open, taskId]) + + return ( + +
+ + + + + + + + + + + + + + {runs === null ? ( + + ) : runs.length === 0 ? ( + + ) : ( + runs.map((r) => ( + + + + + + + + + + )) + )} + +
StartedFinishedTriggerStatusCopiedSkippedErrors
loading…
no runs yet
{fmt(r.started_at)}{fmt(r.finished_at)}{r.trigger}{r.total_copied}{r.total_skipped}{r.total_errors}
+
+
+ ) +} +``` + +- [ ] **Step 3: TaskDetail — schedule control, next-run, broken badge, Runs modal** + +In `web/src/pages/TaskDetail.tsx`: + +Add to the imports from `../api`: `listRuns` is not needed here; add `setTaskSchedule`. Import the modal: `import { RunLogModal } from '../components/RunLogModal'`. + +Add state near the other `useState`s: +```ts + const [showRuns, setShowRuns] = useState(false) +``` + +Add `task_broken` to the reload event list (the existing `if ([...].includes(ev.type)) reload()` array) so the detail refreshes when the breaker trips: +```ts + if (['account_started', 'account_test', 'account_done', 'run_started', 'run_done', 'error', 'folder', 'cancelled', 'plan', 'task_broken'].includes(ev.type)) { + reload() + } +``` + +Add a schedule handler: +```ts + async function onSchedule(intervalSeconds: number) { + setError(null) + try { + await setTaskSchedule(id, intervalSeconds) + reload() + } catch (err) { + setError(err instanceof Error ? err.message : 'Failed to set schedule') + } + } +``` + +In the "Run control" panel (after the `.btn-row` with Test/Run), add the schedule row. `task` is in scope from `const { task, accounts } = data`: +```tsx +
+ + + {task.next_run_at && ( + Next run: {new Date(task.next_run_at).toLocaleString()} + )} + {task.broken && broken} + +
+``` + +Add the broken badge next to the status in the page head (``): +```tsx + + {task.broken && broken} +``` + +Render the modal near the other modals at the end of the returned JSX: +```tsx + setShowRuns(false)} /> +``` + +- [ ] **Step 4: Tasks list — broken indicator** + +In `web/src/pages/Tasks.tsx`, in the row's Status cell, show the broken badge beside the status: +```tsx + + + {t.broken && broken} + +``` + +- [ ] **Step 5: CSS for the schedule row** + +In `web/src/app.css`, add: +```css +.sched-row { + display: flex; + align-items: center; + gap: 12px; + margin-top: 14px; + flex-wrap: wrap; +} + +.sched-row label { + font-size: 11px; + font-weight: 700; + letter-spacing: 0.1em; + text-transform: uppercase; + color: var(--fg-dim); +} + +.sched-row select { + background: var(--bg-inset); + border: 1px solid var(--border); + color: var(--fg); + padding: 7px 10px; + font-size: 13px; + border-radius: 2px; +} + +.sched-next { + font-size: 12px; + color: var(--fg-dim); + font-variant-numeric: tabular-nums; +} +``` + +- [ ] **Step 6: Verify build + lint** + +Run: `cd web && npm run build && npx oxlint src/` +Expected: build clean; oxlint shows ONLY the two pre-existing warnings (`ConfirmProvider` only-export-components, `TaskDetail` exhaustive-deps). Fix any new finding. + +- [ ] **Step 7: Commit** + +```bash +git add web/src/api.ts web/src/components/RunLogModal.tsx web/src/pages/TaskDetail.tsx web/src/pages/Tasks.tsx web/src/app.css +git commit -m "feat(web): schedule control, next-run, broken badge, run-log modal" +``` + +--- + +### Task 6: End-to-end verification on prod + +Per project rules, functional verification runs on prod after deploy. No code. + +- [ ] **Step 1: Build + migrate + deploy** + +Run: `make build && make up` (migration 0004 auto-applies at startup). Wait for `curl -fsS http://localhost/healthz`. + +- [ ] **Step 2: Enable-gate + next-run display** + +On a task whose accounts are NOT all tested OK, set Schedule → the request returns 409 and the schedule stays Off (surfaced in the error banner). Test the accounts to OK, then set Schedule = Every 1h → "Next run: " appears (≈ now + 1h, in browser local time), and no run starts immediately (first run is one interval out). + +- [ ] **Step 3: Run-log modal** + +Run the task manually once. Open **runs** → the modal lists the run with local-time started/finished, `trigger = manual`, status, and copied/skipped/errors. + +- [ ] **Step 4: Breaker (scheduled failure → broken)** + +Simulate a scheduled failure: break one account (e.g. wrong stored password) on a scheduled task, then let a scheduled run occur (or temporarily observe the next scheduled cycle). When the scheduled run finishes with errors, the schedule flips to Off, the task shows the red **broken** badge in both the task detail and the Tasks list, and no further auto-runs occur until re-enabled. (Auto-run *timing* itself is covered by the `scheduler` unit tests; presets start at 1h, so the timed trigger is not observed within a quick E2E — verify the breaker/flag transition and the DB state.) + +- [ ] **Step 5: Delete cleanup** + +Delete a task that had a schedule and runs; confirm it disappears and its runs are gone (cascade). The scheduler stops considering it (no errors in logs). + +- [ ] **Step 6: Record result** + +Save an E2E note under `./swarm-report/scheduled-task-runs-.md` with pass/fail per step. + +--- + +## Self-Review + +- **Spec coverage:** interval per task + auto-run (Tasks 1–3); don't-start-if-running + interval-from-completion (`ListSchedulableTasks` `status<>'running'` filter + `NextRun` from `LastFinished`, Tasks 1–2); next-run in browser local time (`next_run_at` DTO Task 4 + `toLocaleString` Task 5); run-log modal with per-run status/totals (Tasks 4–5); enable-gate all-OK → 409 (Task 4); breaker scheduled-error → broken + red icon in list & detail (Tasks 3, 5); first-run-after-interval via anchor (Task 1 `SetTaskSchedule` + Task 2 `NextRun`); delete cleanup via cascade (noted, no code); migration 0004 (Task 1). All covered. +- **Placeholder scan:** none — every step carries real code/commands. +- **Type consistency:** `CreateRun(ctx, taskID, trigger)` identical across Tasks 1/3; `Run(ctx, taskID, trigger)` defined Task 3, called by scheduler Task 2 and `handleRun` Task 3; `NextRun(interval, anchor, lastFinished)` identical in scheduler (Task 2) and `taskViewFor` (Task 4); `SchedulableTask{ID,IntervalSeconds,Anchor,LastFinished}` fields match between store (Task 1) and `dueTaskIDs` (Task 2); JSON contract `schedule_interval_seconds`/`broken`/`next_run_at`/`interval_seconds` and `Run` fields match between Go (Tasks 1,4) and TS (Task 5). +- **Sequencing note:** Task 2's whole-module build is red until Task 3 lands the 3-arg `Run` (the scheduler calls it). Both are flagged to run back-to-back; each task's own unit test passes independently.