# Scheduled (Recurring) Task Runs Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Let a task run itself on a recurring interval, with a breaker that disables the schedule and flags the task broken when a scheduled run errors, plus a run-log modal and next-run display.

**Architecture:** A single in-process scheduler goroutine (30 s ticker) reads schedulable tasks from the DB each tick and triggers due ones through `Orchestrator.Run` with a `"scheduled"` trigger. Schedule config lives on `tasks` (interval, anchor, broken); `runs.trigger` distinguishes manual vs scheduled. The DB is the source of truth, so the scheduler is stateless and restart-safe. The next-run time is computed server-side and formatted in the browser's local time.

**Tech Stack:** Go (net/http, pgx v5, golang-migrate), Postgres 18, React 19 + Vite + TypeScript.

## Global Constraints

- Scheduler poll interval: **30 s** (a package const).
- Interval presets → seconds: Off=0, 1h=3600, 3h=10800, 6h=21600, 12h=43200, 24h=86400.
- Breaker trips **only on scheduled runs** (`trigger == "scheduled"` AND `total_errors > 0`) → `SetTaskBroken` (sets `broken=true, schedule_interval_seconds=0`). Manual runs never trip it.
- Enabling a schedule (interval>0) requires **all accounts tested OK**; otherwise HTTP **409**. Disabling (0) is always allowed and clears `broken`.
- First run after enabling = `schedule_anchor + interval`; subsequent = `last_finished_at + interval`. Never run while the task is `running`.
- `next_run_at`: RFC3339 UTC string, **null** when schedule off / task running / broken.
- Non-destructive migration; `ON DELETE CASCADE` already removes `runs` on task delete, so no new cleanup code is needed.
- pgx v5 scans nullable `TIMESTAMPTZ` into `*time.Time` and non-null into `time.Time`.
- Backend verification: `go build ./...`, `go vet ./...`, `go test ./...` (store tests need `TEST_DATABASE_URL`, else skip).
- Frontend verification: `cd web && npm run build`, `npx oxlint src/` (only the two pre-existing warnings allowed: `ConfirmProvider` only-export-components, `TaskDetail` exhaustive-deps).
- Commit after each task.

---

### Task 1: Schema + store (scheduling columns, run trigger, store methods)

**Files:**
- Create: `migrations/0004_task_scheduling.up.sql`, `migrations/0004_task_scheduling.down.sql`
- Modify: `internal/store/tasks.go` (Task struct; GetTask + ListTasks SELECT/Scan; add setters)
- Modify: `internal/store/runs.go` (Run struct; `CreateRun` signature; add queries)
- Modify: `internal/orchestrator/orchestrator.go` (one line: the `CreateRun` call; pass `"manual"` to keep it compiling; the real trigger threading is Task 3)
- Test: `internal/store/scheduling_test.go` (new)

**Interfaces:**
- Produces:
  - `store.Task` fields: `ScheduleIntervalSeconds int64` (json `schedule_interval_seconds`), `ScheduleAnchor *time.Time` (json `-`), `Broken bool` (json `broken`).
  - `store.Run` fields: `StartedAt time.Time` (json `started_at`), `FinishedAt *time.Time` (json `finished_at`), `Trigger string` (json `trigger`); existing fields gain json tags.
  - `store.SchedulableTask struct { ID int64; IntervalSeconds int64; Anchor time.Time; LastFinished *time.Time }`
  - `func (s *Store) SetTaskSchedule(ctx, id, intervalSeconds int64) error`
  - `func (s *Store) SetTaskBroken(ctx, id int64) error`
  - `func (s *Store) ListRunsByTask(ctx, taskID int64) ([]Run, error)`
  - `func (s *Store) LastFinishedRunAt(ctx, taskID int64) (*time.Time, error)`
  - `func (s *Store) ListSchedulableTasks(ctx) ([]SchedulableTask, error)`
  - `func (s *Store) CreateRun(ctx, taskID int64, trigger string) (int64, error)` (signature change)

- [ ] **Step 1: Write the migration**

`migrations/0004_task_scheduling.up.sql`:
```sql
ALTER TABLE tasks ADD COLUMN schedule_interval_seconds INT NOT NULL DEFAULT 0;
ALTER TABLE tasks ADD COLUMN schedule_anchor TIMESTAMPTZ;
ALTER TABLE tasks ADD COLUMN broken BOOLEAN NOT NULL DEFAULT false;
ALTER TABLE runs ADD COLUMN trigger TEXT NOT NULL DEFAULT 'manual';
```

`migrations/0004_task_scheduling.down.sql`:
```sql
ALTER TABLE runs DROP COLUMN trigger;
ALTER TABLE tasks DROP COLUMN broken;
ALTER TABLE tasks DROP COLUMN schedule_anchor;
ALTER TABLE tasks DROP COLUMN schedule_interval_seconds;
```

Apply to the test DB before running store tests:
`docker exec -i imapcopier-testpg psql -U imap -d imapcopier_test -q -v ON_ERROR_STOP=1 < migrations/0004_task_scheduling.up.sql`

- [ ] **Step 2: Write the failing store test**

`internal/store/scheduling_test.go`:
```go
package store

import (
	"context"
	"testing"
)

func TestTaskScheduleAndBroken(t *testing.T) {
	s := testStore(t)
	ctx := context.Background()
	epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"})
	epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"})
	taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst})

	// Defaults: no schedule, not broken.
	tk, _ := s.GetTask(ctx, taskID)
	if tk.ScheduleIntervalSeconds != 0 || tk.Broken || tk.ScheduleAnchor != nil {
		t.Fatalf("defaults: interval=%d broken=%v anchor=%v", tk.ScheduleIntervalSeconds, tk.Broken, tk.ScheduleAnchor)
	}

	// Enable → interval set, anchor stamped, appears in schedulable list.
	if err := s.SetTaskSchedule(ctx, taskID, 3600); err != nil {
		t.Fatalf("set schedule: %v", err)
	}
	tk, _ = s.GetTask(ctx, taskID)
	if tk.ScheduleIntervalSeconds != 3600 || tk.ScheduleAnchor == nil {
		t.Fatalf("after enable: interval=%d anchor=%v", tk.ScheduleIntervalSeconds, tk.ScheduleAnchor)
	}
	sch, _ := s.ListSchedulableTasks(ctx)
	if len(sch) != 1 || sch[0].ID != taskID || sch[0].IntervalSeconds != 3600 || sch[0].LastFinished != nil {
		t.Fatalf("schedulable: %+v", sch)
	}

	// Breaker → broken true, interval 0, drops out of schedulable list.
	if err := s.SetTaskBroken(ctx, taskID); err != nil {
		t.Fatalf("set broken: %v", err)
	}
	tk, _ = s.GetTask(ctx, taskID)
	if !tk.Broken || tk.ScheduleIntervalSeconds != 0 {
		t.Fatalf("after break: broken=%v interval=%d", tk.Broken, tk.ScheduleIntervalSeconds)
	}
	if sch, _ := s.ListSchedulableTasks(ctx); len(sch) != 0 {
		t.Fatalf("broken task still schedulable: %+v", sch)
	}

	// Re-enable clears broken.
	_ = s.SetTaskSchedule(ctx, taskID, 21600)
	if tk, _ = s.GetTask(ctx, taskID); tk.Broken {
		t.Fatalf("re-enable did not clear broken")
	}
}

func TestListRunsByTaskWithTrigger(t *testing.T) {
	s := testStore(t)
	ctx := context.Background()
	epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"})
	epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"})
	taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst})

	r1, _ := s.CreateRun(ctx, taskID, "manual")
	_ = s.FinishRun(ctx, r1, "done", 5, 1, 0)
	_, _ = s.CreateRun(ctx, taskID, "scheduled") // still running (no finish)

	runs, err := s.ListRunsByTask(ctx, taskID)
	if err != nil || len(runs) != 2 {
		t.Fatalf("list runs: %v len=%d", err, len(runs))
	}
	// newest first
	if runs[0].Trigger != "scheduled" || runs[0].FinishedAt != nil {
		t.Fatalf("run[0]: %+v", runs[0])
	}
	if runs[1].Trigger != "manual" || runs[1].FinishedAt == nil || runs[1].TotalCopied != 5 {
		t.Fatalf("run[1]: %+v", runs[1])
	}

	// Last finished-at reflects the finished manual run.
	lf, _ := s.LastFinishedRunAt(ctx, taskID)
	if lf == nil {
		t.Fatalf("LastFinishedRunAt nil, want the finished run's time")
	}
}
```

- [ ] **Step 3: Run tests to verify they fail**

Run: `TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./internal/store/ -run 'TestTaskScheduleAndBroken|TestListRunsByTaskWithTrigger'`
Expected: FAIL to compile: the new struct fields, `SetTaskSchedule`, `SetTaskBroken`, `ListRunsByTask`, `LastFinishedRunAt`, `ListSchedulableTasks`, and the 3-arg `CreateRun` don't exist yet.

- [ ] **Step 4: Extend `store.Task` and its scans**

In `internal/store/tasks.go`, add `import "time"`, and extend the struct:
```go
type Task struct {
	ID                      int64             `json:"id"`
	Name                    string            `json:"name"`
	SrcEndpointID           int64             `json:"src_endpoint_id"`
	DstEndpointID           int64             `json:"dst_endpoint_id"`
	Status                  string            `json:"status"`
	FolderMapping           map[string]string `json:"folder_mapping"`
	ScheduleIntervalSeconds int64             `json:"schedule_interval_seconds"`
	ScheduleAnchor          *time.Time        `json:"-"`
	Broken                  bool              `json:"broken"`
}
```

Update `GetTask`'s SELECT + Scan:
```go
err := s.Pool.QueryRow(ctx,
	`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping,
	        schedule_interval_seconds, schedule_anchor, broken
	 FROM tasks WHERE id=$1`, id).
	Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping,
		&t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken)
```

Update `ListTasks`'s SELECT + Scan the same way (add the three columns and three scan targets):
```go
rows, err := s.Pool.Query(ctx,
	`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping,
	        schedule_interval_seconds, schedule_anchor, broken
	 FROM tasks ORDER BY id DESC`)
...
if err := rows.Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping,
	&t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken); err != nil {
```

- [ ] **Step 5: Add task scheduling setters + schedulable query**

Append to `internal/store/tasks.go`:
```go
// SetTaskSchedule sets the recurrence interval (0 = off), stamps the anchor when
// enabling (the first run is one interval after this), and clears broken: any
// explicit schedule change un-breaks the task.
func (s *Store) SetTaskSchedule(ctx context.Context, id, intervalSeconds int64) error {
	_, err := s.Pool.Exec(ctx,
		`UPDATE tasks SET schedule_interval_seconds=$2,
		        schedule_anchor = CASE WHEN $2 > 0 THEN now() ELSE schedule_anchor END,
		        broken = false
		 WHERE id=$1`, id, intervalSeconds)
	return err
}

// SetTaskBroken trips the schedule breaker: disables the schedule and flags the
// task for operator attention.
func (s *Store) SetTaskBroken(ctx context.Context, id int64) error {
	_, err := s.Pool.Exec(ctx,
		`UPDATE tasks SET broken=true, schedule_interval_seconds=0 WHERE id=$1`, id)
	return err
}

// SchedulableTask is the per-tick decision input for the scheduler.
type SchedulableTask struct {
	ID              int64
	IntervalSeconds int64
	Anchor          time.Time
	LastFinished    *time.Time
}

// ListSchedulableTasks returns tasks eligible to auto-run: schedule on, not
// broken, not currently running, each joined with its last finished run time.
func (s *Store) ListSchedulableTasks(ctx context.Context) ([]SchedulableTask, error) {
	rows, err := s.Pool.Query(ctx,
		`SELECT t.id, t.schedule_interval_seconds, t.schedule_anchor,
		        (SELECT max(finished_at) FROM runs r WHERE r.task_id=t.id AND r.finished_at IS NOT NULL)
		 FROM tasks t
		 WHERE t.schedule_interval_seconds > 0 AND NOT t.broken AND t.status <> 'running'`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	out := []SchedulableTask{}
	for rows.Next() {
		var st SchedulableTask
		if err := rows.Scan(&st.ID, &st.IntervalSeconds, &st.Anchor, &st.LastFinished); err != nil {
			return nil, err
		}
		out = append(out, st)
	}
	return out, rows.Err()
}
```

- [ ] **Step 6: Extend `store.Run`, `CreateRun`, and add run queries**

In `internal/store/runs.go`, add `import "time"` (alongside context) and replace the struct + CreateRun, then add the queries:
```go
type Run struct {
	ID           int64      `json:"id"`
	TaskID       int64      `json:"task_id"`
	Status       string     `json:"status"`
	StartedAt    time.Time  `json:"started_at"`
	FinishedAt   *time.Time `json:"finished_at"`
	TotalCopied  int64      `json:"total_copied"`
	TotalSkipped int64      `json:"total_skipped"`
	TotalErrors  int64      `json:"total_errors"`
	Trigger      string     `json:"trigger"`
}

func (s *Store) CreateRun(ctx context.Context, taskID int64, trigger string) (int64, error) {
	var id int64
	err := s.Pool.QueryRow(ctx,
		`INSERT INTO runs (task_id, trigger) VALUES ($1,$2) RETURNING id`, taskID, trigger).Scan(&id)
	return id, err
}

// ListRunsByTask returns a task's runs, newest first, for the run-log modal.
func (s *Store) ListRunsByTask(ctx context.Context, taskID int64) ([]Run, error) {
	rows, err := s.Pool.Query(ctx,
		`SELECT id, task_id, started_at, finished_at, status,
		        total_copied, total_skipped, total_errors, trigger
		 FROM runs WHERE task_id=$1 ORDER BY id DESC`, taskID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	out := []Run{}
	for rows.Next() {
		var r Run
		if err := rows.Scan(&r.ID, &r.TaskID, &r.StartedAt, &r.FinishedAt, &r.Status,
			&r.TotalCopied, &r.TotalSkipped, &r.TotalErrors, &r.Trigger); err != nil {
			return nil, err
		}
		out = append(out, r)
	}
	return out, rows.Err()
}

// LastFinishedRunAt returns the most recent finished run's timestamp, or nil if
// the task has never completed a run (the baseline for the next scheduled run).
func (s *Store) LastFinishedRunAt(ctx context.Context, taskID int64) (*time.Time, error) {
	var t *time.Time
	err := s.Pool.QueryRow(ctx,
		`SELECT max(finished_at) FROM runs WHERE task_id=$1 AND finished_at IS NOT NULL`, taskID).Scan(&t)
	return t, err
}
```

Keep the existing `FinishRun` as-is.

- [ ] **Step 7: Fix the `CreateRun` caller so the build passes**

In `internal/orchestrator/orchestrator.go`, the `Run` method calls `o.store.CreateRun(ctx, taskID)`. Change it to pass a trigger literal for now:
```go
runID, err := o.store.CreateRun(ctx, taskID, "manual")
```
(Task 3 threads the real trigger through; this keeps the build green now.)

- [ ] **Step 8: Run the store tests to verify they pass**

Run: `TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./internal/store/ -run 'TestTaskScheduleAndBroken|TestListRunsByTaskWithTrigger' -v`
Expected: PASS.

- [ ] **Step 9: Verify whole build + suite**

Run: `go build ./... && go vet ./... && TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./...`
Expected: all pass.

- [ ] **Step 10: Commit**

```bash
git add migrations/0004_task_scheduling.up.sql migrations/0004_task_scheduling.down.sql \
  internal/store/tasks.go internal/store/runs.go internal/store/scheduling_test.go \
  internal/orchestrator/orchestrator.go
git commit -m "feat(store): task schedule columns, run trigger, scheduling queries"
```

---

### Task 2: Scheduler package + wiring

**Files:**
- Create: `internal/scheduler/scheduler.go`
- Create: `internal/scheduler/scheduler_test.go`
- Modify: `cmd/server/main.go` (start the scheduler)

**Interfaces:**
- Consumes: `store.SchedulableTask`, `store.ListSchedulableTasks`, `orchestrator.Orchestrator.Run(ctx, taskID, trigger)` (the 3-arg form lands in Task 3; this task calls it as 3-arg and both compile together only after Task 3; see the Step 5 note).
- Produces:
  - `func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time`
  - `func dueTaskIDs(tasks []store.SchedulableTask, now time.Time) []int64`
  - `type Scheduler`; `func New(st *store.Store, orch *orchestrator.Orchestrator) *Scheduler`; `func (s *Scheduler) Start(ctx context.Context)`

- [ ] **Step 1: Write the failing pure-logic test**

`internal/scheduler/scheduler_test.go`:
```go
package scheduler

import (
	"testing"
	"time"

	"github.com/vasyansk/imap-copier/internal/store"
)

func TestNextRun(t *testing.T) {
	anchor := time.Date(2026, 7, 3, 10, 0, 0, 0, time.UTC)
	fin := time.Date(2026, 7, 3, 11, 0, 0, 0, time.UTC)

	// No finished run yet → anchor + interval.
	if got := NextRun(time.Hour, anchor, nil); !got.Equal(anchor.Add(time.Hour)) {
		t.Fatalf("first run: got %v", got)
	}
	// Finished run exists → last finished + interval (from completion, not start).
	if got := NextRun(time.Hour, anchor, &fin); !got.Equal(fin.Add(time.Hour)) {
		t.Fatalf("recurring: got %v", got)
	}
}

func TestDueTaskIDs(t *testing.T) {
	now := time.Date(2026, 7, 3, 12, 0, 0, 0, time.UTC)
	anchorOld := now.Add(-2 * time.Hour) // enabled 2h ago
	finRecent := now.Add(-30 * time.Minute)

	tasks := []store.SchedulableTask{
		{ID: 1, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: nil},        // due: anchor+1h < now
		{ID: 2, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: &finRecent}, // not due: fin+1h > now
		{ID: 3, IntervalSeconds: 21600, Anchor: anchorOld, LastFinished: nil},       // not due: anchor+6h > now
	}
	ids := dueTaskIDs(tasks, now)
	if len(ids) != 1 || ids[0] != 1 {
		t.Fatalf("due IDs = %v, want [1]", ids)
	}
}
```

- [ ] **Step 2: Run test to verify it fails**

Run: `go test ./internal/scheduler/`
Expected: FAIL (the package and its functions don't exist yet).

- [ ] **Step 3: Implement the scheduler**

`internal/scheduler/scheduler.go`:
```go
// Package scheduler runs tasks on their recurring interval. A single goroutine
// polls the DB every pollInterval and triggers due tasks through the
// orchestrator. The DB is the source of truth, so the scheduler holds no state
// and is safe across restarts.
package scheduler

import (
	"context"
	"log/slog"
	"time"

	"github.com/vasyansk/imap-copier/internal/orchestrator"
	"github.com/vasyansk/imap-copier/internal/store"
)

const pollInterval = 30 * time.Second

// NextRun is when a task should next run: one interval after its last completed
// run, or one interval after the schedule anchor if it has never completed one.
func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time {
	base := anchor
	if lastFinished != nil {
		base = *lastFinished
	}
	return base.Add(interval)
}

// dueTaskIDs returns the IDs of tasks whose next run is at or before now.
func dueTaskIDs(tasks []store.SchedulableTask, now time.Time) []int64 {
	var ids []int64
	for _, t := range tasks {
		next := NextRun(time.Duration(t.IntervalSeconds)*time.Second, t.Anchor, t.LastFinished)
		if !now.Before(next) {
			ids = append(ids, t.ID)
		}
	}
	return ids
}

type Scheduler struct {
	store *store.Store
	orch  *orchestrator.Orchestrator
}

func New(st *store.Store, orch *orchestrator.Orchestrator) *Scheduler {
	return &Scheduler{store: st, orch: orch}
}

// Start blocks, ticking until ctx is cancelled. Run it in a goroutine.
func (s *Scheduler) Start(ctx context.Context) {
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			s.tick(ctx)
		}
	}
}

func (s *Scheduler) tick(ctx context.Context) {
	tasks, err := s.store.ListSchedulableTasks(ctx)
	if err != nil {
		slog.Error("scheduler: list schedulable", "err", err)
		return
	}
	for _, id := range dueTaskIDs(tasks, time.Now()) {
		if _, err := s.orch.Run(ctx, id, "scheduled"); err != nil {
			// ErrAlreadyRunning / ErrNotTested are expected races/edge cases, not fatal.
			slog.Info("scheduler: run skipped", "task", id, "err", err)
		}
	}
}
```

- [ ] **Step 4: Wire the scheduler into main**

In `cmd/server/main.go`, add the import `"github.com/vasyansk/imap-copier/internal/scheduler"` and, after `orch := orchestrator.New(...)` and before `http.ListenAndServe`, start it:
```go
go scheduler.New(st, orch).Start(context.Background())
```

- [ ] **Step 5: Run the pure test**

Run: `go test ./internal/scheduler/ -v`
Expected: PASS once the package compiles.

Note: `scheduler.go` calls `s.orch.Run(ctx, id, "scheduled")`, so the scheduler package does not compile while `Orchestrator.Run` is still 2-arg. Both this test and `go build ./...` will FAIL until Task 3 changes `Orchestrator.Run` to the 3-arg form. Run Task 3 immediately after this task, then re-run this step; the whole-module build goes green at the same point. (The two tasks are sequenced back-to-back for this reason.)

- [ ] **Step 6: Commit**

```bash
git add internal/scheduler/scheduler.go internal/scheduler/scheduler_test.go cmd/server/main.go
git commit -m "feat(scheduler): 30s polling scheduler with pure NextRun/dueTaskIDs"
```

---

### Task 3: Orchestrator trigger threading + breaker

**Files:**
- Modify: `internal/orchestrator/orchestrator.go` (`Run` signature; thread trigger into `CreateRun` + `runAll`; breaker in `runAll`)
- Modify: `internal/httpapi/run.go` (`handleRun` passes `"manual"`)
- Test: `internal/orchestrator/breaker_test.go` (new)

**Interfaces:**
- Consumes: `store.SetTaskBroken` (Task 1).
- Produces:
  - `func (o *Orchestrator) Run(ctx context.Context, taskID int64, trigger string) (int64, error)` (was 2-arg)
  - `func shouldBreak(trigger string, totErr int64) bool`

- [ ] **Step 1: Write the failing breaker-logic test**

`internal/orchestrator/breaker_test.go`:
```go
package orchestrator

import "testing"

func TestShouldBreak(t *testing.T) {
	cases := []struct {
		trigger string
		errs    int64
		want    bool
	}{
		{"scheduled", 2, true},  // scheduled run with errors trips the breaker
		{"scheduled", 0, false}, // scheduled run, clean: no break
		{"manual", 5, false},    // manual run never trips the breaker
		{"manual", 0, false},
	}
	for _, c := range cases {
		if got := shouldBreak(c.trigger, c.errs); got != c.want {
			t.Fatalf("shouldBreak(%q,%d)=%v want %v", c.trigger, c.errs, got, c.want)
		}
	}
}
```

- [ ] **Step 2: Run test to verify it fails**

Run: `go test ./internal/orchestrator/ -run TestShouldBreak`
Expected: FAIL (`shouldBreak` undefined).

- [ ] **Step 3: Add `shouldBreak` and thread the trigger through `Run`**

In `internal/orchestrator/orchestrator.go`:

Add the helper (package level):
```go
// shouldBreak reports whether a completed run should trip the schedule breaker:
// only scheduled runs that ended with errors.
func shouldBreak(trigger string, totErr int64) bool {
	return trigger == "scheduled" && totErr > 0
}
```

Change `Run` to accept and forward the trigger:
```go
func (o *Orchestrator) Run(ctx context.Context, taskID int64, trigger string) (int64, error) {
```

Inside `Run`, replace the temporary literal from Task 1:
```go
runID, err := o.store.CreateRun(ctx, taskID, trigger)
```

And pass the trigger into the coordinator goroutine:
```go
go o.runAll(context.WithoutCancel(ctx), task, runID, accs, srcEP, dstEP, trigger)
```

Change `runAll`'s signature to receive it and trip the breaker after the final status is set:
```go
func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint, trigger string) {
```

At the end of `runAll`, after `SetTaskStatus` and the `run_done` publish, add:
```go
if shouldBreak(trigger, totErr) {
	_ = o.store.SetTaskBroken(ctx, task.ID)
	o.hub.Publish(wshub.Event{Type: "task_broken", TaskID: task.ID,
		Data: map[string]any{"task_id": task.ID, "errors": totErr}})
}
```

- [ ] **Step 4: Update the manual caller**

In `internal/httpapi/run.go`, `handleRun`:
```go
runID, err := s.orch.Run(r.Context(), taskID, "manual")
```

- [ ] **Step 5: Run the breaker test + full build**

Run: `go test ./internal/orchestrator/ -run TestShouldBreak -v && go build ./... && go vet ./...`
Expected: test PASS; build + vet clean (the scheduler's `s.orch.Run(ctx, id, "scheduled")` from Task 2 now resolves).

- [ ] **Step 6: Full suite**

Run: `TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./...`
Expected: all pass.
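
The breaker rule in this task can also be exercised without a database. The following is a minimal in-memory sketch, not the store code: `schedState` and `applyRunResult` are hypothetical names that model the described effect of `shouldBreak` followed by `SetTaskBroken` (set `broken=true` and `schedule_interval_seconds=0`) and leave the state untouched for every other run.

```go
package main

import "fmt"

// schedState is a hypothetical in-memory stand-in for the schedule columns on
// a tasks row (schedule_interval_seconds, broken). It is not part of the plan.
type schedState struct {
	IntervalSeconds int64
	Broken          bool
}

// shouldBreak mirrors the breaker rule: only scheduled runs that ended with
// errors trip the breaker.
func shouldBreak(trigger string, totErr int64) bool {
	return trigger == "scheduled" && totErr > 0
}

// applyRunResult models the described effect of SetTaskBroken (assumption:
// schedule switched off and the task flagged broken); any other run leaves
// the schedule state as it was.
func applyRunResult(s schedState, trigger string, totErr int64) schedState {
	if shouldBreak(trigger, totErr) {
		return schedState{IntervalSeconds: 0, Broken: true}
	}
	return s
}

func main() {
	s := schedState{IntervalSeconds: 3600}

	s = applyRunResult(s, "manual", 5) // manual errors never trip the breaker
	fmt.Printf("%+v\n", s)             // {IntervalSeconds:3600 Broken:false}

	s = applyRunResult(s, "scheduled", 0) // clean scheduled run keeps the schedule
	fmt.Printf("%+v\n", s)                // {IntervalSeconds:3600 Broken:false}

	s = applyRunResult(s, "scheduled", 2) // scheduled run with errors trips it
	fmt.Printf("%+v\n", s)                // {IntervalSeconds:0 Broken:true}
}
```

Keeping the decision in a pure function is what lets `TestShouldBreak` run without Postgres; the sketch only adds the state transition on top so the "schedule off + broken" outcome is visible in one place.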

- [ ] **Step 7: Commit**

```bash
git add internal/orchestrator/orchestrator.go internal/orchestrator/breaker_test.go internal/httpapi/run.go
git commit -m "feat(orchestrator): thread run trigger; breaker on scheduled runs with errors"
```

---

### Task 4: HTTP (schedule endpoint, runs endpoint, next_run_at DTO)

**Files:**
- Modify: `internal/httpapi/tasks.go` (`taskView` with `next_run_at`; compute in `handleGetTask`; add `handleSetSchedule`, `handleListRuns`)
- Modify: `internal/httpapi/router.go` (two routes)

**Interfaces:**
- Consumes: `store.SetTaskSchedule`, `store.ListRunsByTask`, `store.LastFinishedRunAt`, `store.ListAccountsByTask`, `scheduler.NextRun`.
- Produces: routes `PUT /api/tasks/{id}/schedule`, `GET /api/tasks/{id}/runs`; task JSON gains `next_run_at`.

- [ ] **Step 1: Add the task DTO with next_run_at and compute it in handleGetTask**

In `internal/httpapi/tasks.go`, add imports `"time"` and `"github.com/vasyansk/imap-copier/internal/scheduler"` (plus `"context"` if the file does not already import it, since `taskViewFor` takes a `context.Context`). Add the view type and compute the next run:
```go
// taskView augments the stored task with the server-computed next scheduled run
// (RFC3339 UTC; null when the schedule is off, the task is running, or broken).
type taskView struct {
	store.Task
	NextRunAt *string `json:"next_run_at"`
}

func (s *Server) taskViewFor(ctx context.Context, t store.Task) taskView {
	var nextRunAt *string
	if t.ScheduleIntervalSeconds > 0 && !t.Broken && t.Status != "running" && t.ScheduleAnchor != nil {
		last, _ := s.store.LastFinishedRunAt(ctx, t.ID)
		n := scheduler.NextRun(time.Duration(t.ScheduleIntervalSeconds)*time.Second, *t.ScheduleAnchor, last)
		str := n.UTC().Format(time.RFC3339)
		nextRunAt = &str
	}
	return taskView{Task: t, NextRunAt: nextRunAt}
}
```

Change `handleGetTask`'s final write to use it:
```go
writeJSON(w, http.StatusOK, map[string]any{"task": s.taskViewFor(r.Context(), task), "accounts": views})
```
(`handleListTasks` stays returning `[]store.Task`: the list needs only `broken`, which serializes via the json tag; `next_run_at` is detail-only.)

- [ ] **Step 2: Add the schedule + runs handlers**

Append to `internal/httpapi/tasks.go`:
```go
// handleSetSchedule enables/changes/disables a task's recurring schedule.
// Enabling (interval>0) requires every account to pass its connection tests.
func (s *Server) handleSetSchedule(w http.ResponseWriter, r *http.Request) {
	taskID, err := pathID(r, "id")
	if err != nil {
		http.Error(w, "bad id", http.StatusBadRequest)
		return
	}
	var body struct {
		IntervalSeconds int64 `json:"interval_seconds"`
	}
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		http.Error(w, "bad json", http.StatusBadRequest)
		return
	}
	if body.IntervalSeconds > 0 {
		accs, err := s.store.ListAccountsByTask(r.Context(), taskID)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		if !allAccountsTested(accs) {
			http.Error(w, "all accounts must pass connection tests before scheduling", http.StatusConflict)
			return
		}
	}
	if err := s.store.SetTaskSchedule(r.Context(), taskID, body.IntervalSeconds); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

// allAccountsTested mirrors the orchestrator's run gate: non-empty and every
// account OK on both sides.
func allAccountsTested(accs []store.Account) bool {
	if len(accs) == 0 {
		return false
	}
	for _, a := range accs {
		if a.TestSrcStatus != "ok" || a.TestDstStatus != "ok" {
			return false
		}
	}
	return true
}

func (s *Server) handleListRuns(w http.ResponseWriter, r *http.Request) {
	taskID, err := pathID(r, "id")
	if err != nil {
		http.Error(w, "bad id", http.StatusBadRequest)
		return
	}
	runs, err := s.store.ListRunsByTask(r.Context(), taskID)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	writeJSON(w, http.StatusOK, runs)
}
```
(`json` and `store` are already imported in `tasks.go`.)

- [ ] **Step 3: Register routes**

In `internal/httpapi/router.go`, after the existing `PUT /api/tasks/{id}/folder-mapping` line (or with the other task routes):
```go
api.HandleFunc("PUT /api/tasks/{id}/schedule", s.handleSetSchedule)
api.HandleFunc("GET /api/tasks/{id}/runs", s.handleListRuns)
```

- [ ] **Step 4: Verify build + vet + suite**

Run: `go build ./... && go vet ./... && TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./...`
Expected: all pass.

- [ ] **Step 5: Commit**

```bash
git add internal/httpapi/tasks.go internal/httpapi/router.go
git commit -m "feat(api): schedule + runs endpoints; next_run_at on task detail"
```

---

### Task 5: Frontend (schedule control, next-run, broken badge, run-log modal, list icon)

**Files:**
- Modify: `web/src/api.ts` (Task fields; `Run` type; `setTaskSchedule`, `listRuns`)
- Create: `web/src/components/RunLogModal.tsx`
- Modify: `web/src/pages/TaskDetail.tsx` (schedule select, next-run line, broken badge, Runs button+modal, reload on `task_broken`)
- Modify: `web/src/pages/Tasks.tsx` (broken indicator in the list)
- Modify: `web/src/app.css` (broken badge / schedule-row styles)

**Interfaces:**
- Consumes: backend routes from Task 4; `Modal` (`web/src/components/Modal.tsx`), `StatusBadge`.

- [ ] **Step 1: Extend the API client**

In `web/src/api.ts`, extend `Task`:
```ts
export interface Task {
  id: number
  name: string
  src_endpoint_id: number
  dst_endpoint_id: number
  status: string
  folder_mapping?: Record<string, string>
  schedule_interval_seconds?: number
  broken?: boolean
  next_run_at?: string | null
}
```

Add a `Run` interface and the two calls (near `runTask`):
```ts
export interface Run {
  id: number
  task_id: number
  status: string
  started_at: string
  finished_at: string | null
  total_copied: number
  total_skipped: number
  total_errors: number
  trigger: string
}

export const setTaskSchedule = (taskId: number, intervalSeconds: number) =>
  api(`/api/tasks/${taskId}/schedule`, { ...jsonBody({ interval_seconds: intervalSeconds }), method: 'PUT' })

export const listRuns = (taskId: number) => api(`/api/tasks/${taskId}/runs`)
```

- [ ] **Step 2: Create the run-log modal**

`web/src/components/RunLogModal.tsx`:
```tsx
import { useEffect, useState } from 'react'
import { Modal } from './Modal'
import { StatusBadge } from './StatusBadge'
import { listRuns, type Run } from '../api'

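const fmt = (iso: string | null) => (iso ? new Date(iso).toLocaleString() : '—')

export function RunLogModal({ taskId, open, onClose }: { taskId: number; open: boolean; onClose: () => void }) {
  const [runs, setRuns] = useState<Run[] | null>(null)

  useEffect(() => {
    if (!open) return
    setRuns(null)
    listRuns(taskId).then((r) => setRuns(r ?? [])).catch(() => setRuns([]))
  }, [open, taskId])

  // Assumption: the Modal and StatusBadge props below are guesses; match them
  // to the existing components before using this markup.
  return (
    <Modal open={open} onClose={onClose}>
      <table>
        <thead>
          <tr>
            <th>Started</th>
            <th>Finished</th>
            <th>Trigger</th>
            <th>Status</th>
            <th>Copied</th>
            <th>Skipped</th>
            <th>Errors</th>
          </tr>
        </thead>
        <tbody>
          {runs === null ? (
            <tr><td colSpan={7}>loading…</td></tr>
          ) : runs.length === 0 ? (
            <tr><td colSpan={7}>no runs yet</td></tr>
          ) : (
            runs.map((r) => (
              <tr key={r.id}>
                <td>{fmt(r.started_at)}</td>
                <td>{fmt(r.finished_at)}</td>
                <td>{r.trigger}</td>
                <td><StatusBadge status={r.status} /></td>
                <td>{r.total_copied}</td>
                <td>{r.total_skipped}</td>
                <td>{r.total_errors}</td>
              </tr>
            ))
          )}
        </tbody>
      </table>
    </Modal>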
  )
}
```

- [ ] **Step 3: TaskDetail: schedule control, next-run, broken badge, Runs modal**

In `web/src/pages/TaskDetail.tsx`:

Add `setTaskSchedule` to the imports from `../api` (`listRuns` is not needed here). Import the modal: `import { RunLogModal } from '../components/RunLogModal'`.

Add state near the other `useState`s:
```ts
const [showRuns, setShowRuns] = useState(false)
```

Add `task_broken` to the reload event list (the existing `if ([...].includes(ev.type)) reload()` array) so the detail refreshes when the breaker trips:
```ts
if (['account_started', 'account_test', 'account_done', 'run_started', 'run_done', 'error', 'folder', 'cancelled', 'plan', 'task_broken'].includes(ev.type)) {
  reload()
}
```

Add a schedule handler:
```ts
async function onSchedule(intervalSeconds: number) {
  setError(null)
  try {
    await setTaskSchedule(id, intervalSeconds)
    reload()
  } catch (err) {
    setError(err instanceof Error ? err.message : 'Failed to set schedule')
  }
}
```

In the "Run control" panel (after the `.btn-row` with Test/Run), add the schedule row. `task` is in scope from `const { task, accounts } = data`:
```tsx
{task.next_run_at && ( Next run: {new Date(task.next_run_at).toLocaleString()} )} {task.broken && broken}
```

Add the broken badge next to the status in the page head (``):
```tsx
{task.broken && broken}
```

Render the modal near the other modals at the end of the returned JSX:
```tsx
<RunLogModal taskId={id} open={showRuns} onClose={() => setShowRuns(false)} />
```

- [ ] **Step 4: Tasks list: broken indicator**

In `web/src/pages/Tasks.tsx`, in the row's Status cell, show the broken badge beside the status:
```tsx
{t.broken && broken}
```

- [ ] **Step 5: CSS for the schedule row**

In `web/src/app.css`, add:
```css
.sched-row {
  display: flex;
  align-items: center;
  gap: 12px;
  margin-top: 14px;
  flex-wrap: wrap;
}
.sched-row label {
  font-size: 11px;
  font-weight: 700;
  letter-spacing: 0.1em;
  text-transform: uppercase;
  color: var(--fg-dim);
}
.sched-row select {
  background: var(--bg-inset);
  border: 1px solid var(--border);
  color: var(--fg);
  padding: 7px 10px;
  font-size: 13px;
  border-radius: 2px;
}
.sched-next {
  font-size: 12px;
  color: var(--fg-dim);
  font-variant-numeric: tabular-nums;
}
```

- [ ] **Step 6: Verify build + lint**

Run: `cd web && npm run build && npx oxlint src/`
Expected: build clean; oxlint shows ONLY the two pre-existing warnings (`ConfirmProvider` only-export-components, `TaskDetail` exhaustive-deps). Fix any new finding.

- [ ] **Step 7: Commit**

```bash
git add web/src/api.ts web/src/components/RunLogModal.tsx web/src/pages/TaskDetail.tsx web/src/pages/Tasks.tsx web/src/app.css
git commit -m "feat(web): schedule control, next-run, broken badge, run-log modal"
```

---

### Task 6: End-to-end verification on prod

Per project rules, functional verification runs on prod after deploy. No code.

- [ ] **Step 1: Build + migrate + deploy**

Run: `make build && make up` (migration 0004 auto-applies at startup). Wait for `curl -fsS http://localhost/healthz`.

- [ ] **Step 2: Enable-gate + next-run display**

On a task whose accounts are NOT all tested OK, set Schedule → the request returns 409 and the schedule stays Off (surfaced in the error banner).
Test the accounts to OK, then set Schedule = Every 1h → "Next run: " appears (≈ now + 1h, in browser local time), and no run starts immediately (first run is one interval out).

- [ ] **Step 3: Run-log modal**

Run the task manually once. Open **runs** → the modal lists the run with local-time started/finished, `trigger = manual`, status, and copied/skipped/errors.

- [ ] **Step 4: Breaker (scheduled failure → broken)**

Simulate a scheduled failure: break one account (e.g. wrong stored password) on a scheduled task, then let a scheduled run occur (or temporarily observe the next scheduled cycle). When the scheduled run finishes with errors, the schedule flips to Off, the task shows the red **broken** badge in both the task detail and the Tasks list, and no further auto-runs occur until re-enabled. (Auto-run *timing* itself is covered by the `scheduler` unit tests; presets start at 1h, so the timed trigger is not observed within a quick E2E. Verify the breaker/flag transition and the DB state.)

- [ ] **Step 5: Delete cleanup**

Delete a task that had a schedule and runs; confirm it disappears and its runs are gone (cascade). The scheduler stops considering it (no errors in logs).

- [ ] **Step 6: Record result**

Save an E2E note under `./swarm-report/scheduled-task-runs-.md` with pass/fail per step.

---

## Self-Review

- **Spec coverage:** interval per task + auto-run (Tasks 1–3); don't-start-if-running + interval-from-completion (`ListSchedulableTasks` `status<>'running'` filter + `NextRun` from `LastFinished`, Tasks 1–2); next-run in browser local time (`next_run_at` DTO Task 4 + `toLocaleString` Task 5); run-log modal with per-run status/totals (Tasks 4–5); enable-gate all-OK → 409 (Task 4); breaker scheduled-error → broken + red icon in list & detail (Tasks 3, 5); first-run-after-interval via anchor (Task 1 `SetTaskSchedule` + Task 2 `NextRun`); delete cleanup via cascade (noted, no code); migration 0004 (Task 1). All covered.
- **Placeholder scan:** none; every step carries real code/commands.
- **Type consistency:** `CreateRun(ctx, taskID, trigger)` identical across Tasks 1/3; `Run(ctx, taskID, trigger)` defined Task 3, called by scheduler Task 2 and `handleRun` Task 3; `NextRun(interval, anchor, lastFinished)` identical in scheduler (Task 2) and `taskViewFor` (Task 4); `SchedulableTask{ID,IntervalSeconds,Anchor,LastFinished}` fields match between store (Task 1) and `dueTaskIDs` (Task 2); JSON contract `schedule_interval_seconds`/`broken`/`next_run_at`/`interval_seconds` and `Run` fields match between Go (Tasks 1,4) and TS (Task 5).
- **Sequencing note:** Task 2's scheduler package and the whole-module build are red until Task 3 lands the 3-arg `Run` (the scheduler calls it), so Task 2's unit test can only be confirmed once Task 3 is in. Both tasks are flagged to run back-to-back; the other tasks' unit tests pass independently.