# Scheduled (Recurring) Task Runs Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Let a task run itself on a recurring interval, with a breaker that disables the schedule and flags the task broken when a scheduled run errors, plus a run-log modal and next-run display.

**Architecture:** A single in-process scheduler goroutine (30 s ticker) reads schedulable tasks from the DB each tick and triggers due ones through `Orchestrator.Run` with a `"scheduled"` trigger. Schedule config lives on `tasks` (interval, anchor, broken); `runs.trigger` distinguishes manual vs scheduled. DB is the source of truth, so the scheduler is stateless and restart-safe. The next-run time is computed server-side and formatted in the browser's local time.

**Tech Stack:** Go (net/http, pgx v5, golang-migrate), Postgres 18, React 19 + Vite + TypeScript.

## Global Constraints
- Scheduler poll interval: **30 s** (a package const).
- Interval presets → seconds: Off=0, 1h=3600, 3h=10800, 6h=21600, 12h=43200, 24h=86400.
- Breaker trips **only on scheduled runs** (`trigger == "scheduled"` AND `total_errors > 0`) → `SetTaskBroken` (sets `broken=true, schedule_interval_seconds=0`). Manual runs never trip it.
- Enabling a schedule (interval>0) requires **all accounts tested OK**; otherwise HTTP **409**. Disabling (0) is always allowed and clears `broken`.
- First run after enabling = `schedule_anchor + interval`; subsequent = `last_finished_at + interval`. Never run while the task is `running`.
- `next_run_at`: RFC3339 UTC string, **null** when schedule off / task running / broken.
- Non-destructive migration; `ON DELETE CASCADE` already removes `runs` on task delete — no new cleanup code.
- pgx v5 scans nullable `TIMESTAMPTZ` into `*time.Time` and non-null into `time.Time`.
- Backend verification: `go build ./...`, `go vet ./...`, `go test ./...` (store tests need `TEST_DATABASE_URL`, else skip).
- Frontend verification: `cd web && npm run build`, `npx oxlint src/` (only the two pre-existing warnings allowed: `ConfirmProvider` only-export-components, `TaskDetail` exhaustive-deps).
- Commit after each task.
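
The interval presets above are the only values the UI offers. If the API should also reject anything else, a server-side check could look like the minimal sketch below. `schedulePresets` and `isPresetInterval` are hypothetical names, not part of this plan's interfaces.

```go
package main

import "fmt"

// schedulePresets mirrors the interval presets listed above
// (hypothetical helper, not part of the plan's interfaces).
var schedulePresets = map[string]int64{
	"Off": 0,
	"1h":  3600,
	"3h":  10800,
	"6h":  21600,
	"12h": 43200,
	"24h": 86400,
}

// isPresetInterval reports whether secs is one of the allowed preset values.
func isPresetInterval(secs int64) bool {
	for _, v := range schedulePresets {
		if v == secs {
			return true
		}
	}
	return false
}

func main() {
	for _, secs := range []int64{0, 3600, 7200, -1} {
		fmt.Printf("%d -> %v\n", secs, isPresetInterval(secs))
	}
}
```

Whether the API should accept arbitrary positive intervals or only these presets is a design choice the plan leaves open.
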
---
### Task 1: Schema + store (scheduling columns, run trigger, store methods)
**Files:**
- Create: `migrations/0004_task_scheduling.up.sql`, `migrations/0004_task_scheduling.down.sql`
- Modify: `internal/store/tasks.go` (Task struct; GetTask + ListTasks SELECT/Scan; add setters)
- Modify: `internal/store/runs.go` (Run struct; `CreateRun` signature; add queries)
- Modify: `internal/orchestrator/orchestrator.go` (one line: the `CreateRun` call passes `"manual"` to keep the build compiling; Task 3 threads the real trigger through)
- Test: `internal/store/scheduling_test.go` (new)

**Interfaces:**
- Produces:
  - `store.Task` fields: `ScheduleIntervalSeconds int64` (json `schedule_interval_seconds`), `ScheduleAnchor *time.Time` (json `-`), `Broken bool` (json `broken`).
  - `store.Run` fields: `StartedAt time.Time` (json `started_at`), `FinishedAt *time.Time` (json `finished_at`), `Trigger string` (json `trigger`); existing fields gain json tags.
  - `store.SchedulableTask struct { ID int64; IntervalSeconds int64; Anchor time.Time; LastFinished *time.Time }`
  - `func (s *Store) SetTaskSchedule(ctx context.Context, id, intervalSeconds int64) error`
  - `func (s *Store) SetTaskBroken(ctx context.Context, id int64) error`
  - `func (s *Store) ListRunsByTask(ctx context.Context, taskID int64) ([]Run, error)`
  - `func (s *Store) LastFinishedRunAt(ctx context.Context, taskID int64) (*time.Time, error)`
  - `func (s *Store) ListSchedulableTasks(ctx context.Context) ([]SchedulableTask, error)`
  - `func (s *Store) CreateRun(ctx context.Context, taskID int64, trigger string) (int64, error)` (signature change)

- [ ] **Step 1: Write the migration**
`migrations/0004_task_scheduling.up.sql`:
```sql
ALTER TABLE tasks ADD COLUMN schedule_interval_seconds INT NOT NULL DEFAULT 0;
ALTER TABLE tasks ADD COLUMN schedule_anchor TIMESTAMPTZ;
ALTER TABLE tasks ADD COLUMN broken BOOLEAN NOT NULL DEFAULT false;
ALTER TABLE runs ADD COLUMN trigger TEXT NOT NULL DEFAULT 'manual';
```
`migrations/0004_task_scheduling.down.sql`:
```sql
ALTER TABLE runs DROP COLUMN trigger;
ALTER TABLE tasks DROP COLUMN broken;
ALTER TABLE tasks DROP COLUMN schedule_anchor;
ALTER TABLE tasks DROP COLUMN schedule_interval_seconds;
```
Apply to the test DB before running store tests:
`docker exec -i imapcopier-testpg psql -U imap -d imapcopier_test -q -v ON_ERROR_STOP=1 < migrations/0004_task_scheduling.up.sql`
- [ ] **Step 2: Write the failing store test**
`internal/store/scheduling_test.go`:
```go
package store

import (
	"context"
	"testing"
)

func TestTaskScheduleAndBroken(t *testing.T) {
	s := testStore(t)
	ctx := context.Background()
	epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"})
	epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"})
	taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst})

	// Defaults: no schedule, not broken.
	tk, _ := s.GetTask(ctx, taskID)
	if tk.ScheduleIntervalSeconds != 0 || tk.Broken || tk.ScheduleAnchor != nil {
		t.Fatalf("defaults: interval=%d broken=%v anchor=%v", tk.ScheduleIntervalSeconds, tk.Broken, tk.ScheduleAnchor)
	}

	// Enable → interval set, anchor stamped, appears in schedulable list.
	if err := s.SetTaskSchedule(ctx, taskID, 3600); err != nil {
		t.Fatalf("set schedule: %v", err)
	}
	tk, _ = s.GetTask(ctx, taskID)
	if tk.ScheduleIntervalSeconds != 3600 || tk.ScheduleAnchor == nil {
		t.Fatalf("after enable: interval=%d anchor=%v", tk.ScheduleIntervalSeconds, tk.ScheduleAnchor)
	}
	sch, _ := s.ListSchedulableTasks(ctx)
	if len(sch) != 1 || sch[0].ID != taskID || sch[0].IntervalSeconds != 3600 || sch[0].LastFinished != nil {
		t.Fatalf("schedulable: %+v", sch)
	}

	// Breaker → broken true, interval 0, drops out of schedulable list.
	if err := s.SetTaskBroken(ctx, taskID); err != nil {
		t.Fatalf("set broken: %v", err)
	}
	tk, _ = s.GetTask(ctx, taskID)
	if !tk.Broken || tk.ScheduleIntervalSeconds != 0 {
		t.Fatalf("after break: broken=%v interval=%d", tk.Broken, tk.ScheduleIntervalSeconds)
	}
	if sch, _ := s.ListSchedulableTasks(ctx); len(sch) != 0 {
		t.Fatalf("broken task still schedulable: %+v", sch)
	}

	// Re-enable clears broken.
	_ = s.SetTaskSchedule(ctx, taskID, 21600)
	if tk, _ = s.GetTask(ctx, taskID); tk.Broken {
		t.Fatalf("re-enable did not clear broken")
	}
}

func TestListRunsByTaskWithTrigger(t *testing.T) {
	s := testStore(t)
	ctx := context.Background()
	epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"})
	epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"})
	taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst})

	r1, _ := s.CreateRun(ctx, taskID, "manual")
	_ = s.FinishRun(ctx, r1, "done", 5, 1, 0)
	_, _ = s.CreateRun(ctx, taskID, "scheduled") // still running (no finish)

	runs, err := s.ListRunsByTask(ctx, taskID)
	if err != nil || len(runs) != 2 {
		t.Fatalf("list runs: %v len=%d", err, len(runs))
	}
	// Newest first.
	if runs[0].Trigger != "scheduled" || runs[0].FinishedAt != nil {
		t.Fatalf("run[0]: %+v", runs[0])
	}
	if runs[1].Trigger != "manual" || runs[1].FinishedAt == nil || runs[1].TotalCopied != 5 {
		t.Fatalf("run[1]: %+v", runs[1])
	}

	// Last finished-at reflects the finished manual run.
	lf, _ := s.LastFinishedRunAt(ctx, taskID)
	if lf == nil {
		t.Fatalf("LastFinishedRunAt nil, want the finished run's time")
	}
}
```
- [ ] **Step 3: Run tests to verify they fail**
Run: `TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./internal/store/ -run 'TestTaskScheduleAndBroken|TestListRunsByTaskWithTrigger'`
Expected: FAIL to compile — new struct fields, `SetTaskSchedule`, `SetTaskBroken`, `ListRunsByTask`, `LastFinishedRunAt`, `ListSchedulableTasks`, and the 3-arg `CreateRun` don't exist yet.
- [ ] **Step 4: Extend `store.Task` and its scans**
In `internal/store/tasks.go`, add `import "time"`, and extend the struct:
```go
type Task struct {
	ID                      int64             `json:"id"`
	Name                    string            `json:"name"`
	SrcEndpointID           int64             `json:"src_endpoint_id"`
	DstEndpointID           int64             `json:"dst_endpoint_id"`
	Status                  string            `json:"status"`
	FolderMapping           map[string]string `json:"folder_mapping"`
	ScheduleIntervalSeconds int64             `json:"schedule_interval_seconds"`
	ScheduleAnchor          *time.Time        `json:"-"`
	Broken                  bool              `json:"broken"`
}
```
Update `GetTask`'s SELECT + Scan:
```go
err := s.Pool.QueryRow(ctx,
	`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping,
	        schedule_interval_seconds, schedule_anchor, broken
	   FROM tasks WHERE id=$1`, id).
	Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping,
		&t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken)
```
Update `ListTasks`'s SELECT + Scan the same way (add the three columns and three scan targets):
```go
rows, err := s.Pool.Query(ctx,
	`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping,
	        schedule_interval_seconds, schedule_anchor, broken
	   FROM tasks ORDER BY id DESC`)
...
	if err := rows.Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping,
		&t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken); err != nil {
```
- [ ] **Step 5: Add task scheduling setters + schedulable query**
Append to `internal/store/tasks.go`:
```go
// SetTaskSchedule sets the recurrence interval (0 = off) and clears broken, so
// any explicit schedule change un-breaks the task. Whenever the interval is > 0
// (enabling or changing it), the anchor is re-stamped to now(); a task with no
// finished run first runs one interval after that.
func (s *Store) SetTaskSchedule(ctx context.Context, id, intervalSeconds int64) error {
	_, err := s.Pool.Exec(ctx,
		`UPDATE tasks SET schedule_interval_seconds=$2,
		        schedule_anchor = CASE WHEN $2 > 0 THEN now() ELSE schedule_anchor END,
		        broken = false
		  WHERE id=$1`, id, intervalSeconds)
	return err
}

// SetTaskBroken trips the schedule breaker: it disables the schedule and flags
// the task for operator attention.
func (s *Store) SetTaskBroken(ctx context.Context, id int64) error {
	_, err := s.Pool.Exec(ctx,
		`UPDATE tasks SET broken=true, schedule_interval_seconds=0 WHERE id=$1`, id)
	return err
}

// SchedulableTask is the per-tick decision input for the scheduler.
type SchedulableTask struct {
	ID              int64
	IntervalSeconds int64
	Anchor          time.Time // non-NULL here: SetTaskSchedule stamps it whenever interval > 0
	LastFinished    *time.Time
}

// ListSchedulableTasks returns tasks eligible to auto-run (schedule on, not
// broken, not currently running), each joined with its last finished run time.
func (s *Store) ListSchedulableTasks(ctx context.Context) ([]SchedulableTask, error) {
	rows, err := s.Pool.Query(ctx,
		`SELECT t.id, t.schedule_interval_seconds, t.schedule_anchor,
		        (SELECT max(finished_at) FROM runs r WHERE r.task_id=t.id AND r.finished_at IS NOT NULL)
		   FROM tasks t
		  WHERE t.schedule_interval_seconds > 0 AND NOT t.broken AND t.status <> 'running'`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	out := []SchedulableTask{}
	for rows.Next() {
		var st SchedulableTask
		if err := rows.Scan(&st.ID, &st.IntervalSeconds, &st.Anchor, &st.LastFinished); err != nil {
			return nil, err
		}
		out = append(out, st)
	}
	return out, rows.Err()
}
```
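
To make the intended state transitions concrete, here is a hypothetical in-memory model of the three store operations; the real ones are the SQL above. `memTask`, `setSchedule`, `setBroken`, and `schedulable` are illustrative names only, and `"idle"` is an assumed placeholder for any non-running status.

```go
package main

import (
	"fmt"
	"time"
)

// memTask mirrors the scheduling columns on tasks (illustrative only).
type memTask struct {
	ID       int64
	Status   string     // "running" while a run is in progress
	Interval int64      // schedule_interval_seconds
	Anchor   *time.Time // schedule_anchor
	Broken   bool
}

// setSchedule mirrors SetTaskSchedule: any interval > 0 re-stamps the anchor,
// and every call clears broken.
func setSchedule(t *memTask, interval int64, now time.Time) {
	t.Interval = interval
	if interval > 0 {
		t.Anchor = &now
	}
	t.Broken = false
}

// setBroken mirrors SetTaskBroken: disable the schedule and flag the task.
func setBroken(t *memTask) {
	t.Broken = true
	t.Interval = 0
}

// schedulable mirrors the WHERE clause of ListSchedulableTasks.
func schedulable(tasks []*memTask) []int64 {
	var ids []int64
	for _, t := range tasks {
		if t.Interval > 0 && !t.Broken && t.Status != "running" {
			ids = append(ids, t.ID)
		}
	}
	return ids
}

var demoNow = time.Date(2026, 7, 3, 10, 0, 0, 0, time.UTC)

func main() {
	t := &memTask{ID: 1, Status: "idle"} // "idle" is an assumed non-running status
	setSchedule(t, 3600, demoNow)
	fmt.Println(schedulable([]*memTask{t})) // [1]
	setBroken(t)
	fmt.Println(schedulable([]*memTask{t})) // []
	setSchedule(t, 21600, demoNow)
	fmt.Println(t.Broken, schedulable([]*memTask{t})) // false [1]
}
```
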
- [ ] **Step 6: Extend `store.Run`, `CreateRun`, and add run queries**
In `internal/store/runs.go`, add `import "time"` (alongside context) and replace the struct + CreateRun, then add the queries:
```go
type Run struct {
	ID           int64      `json:"id"`
	TaskID       int64      `json:"task_id"`
	Status       string     `json:"status"`
	StartedAt    time.Time  `json:"started_at"`
	FinishedAt   *time.Time `json:"finished_at"`
	TotalCopied  int64      `json:"total_copied"`
	TotalSkipped int64      `json:"total_skipped"`
	TotalErrors  int64      `json:"total_errors"`
	Trigger      string     `json:"trigger"`
}

func (s *Store) CreateRun(ctx context.Context, taskID int64, trigger string) (int64, error) {
	var id int64
	err := s.Pool.QueryRow(ctx,
		`INSERT INTO runs (task_id, trigger) VALUES ($1,$2) RETURNING id`, taskID, trigger).Scan(&id)
	return id, err
}

// ListRunsByTask returns a task's runs, newest first, for the run-log modal.
func (s *Store) ListRunsByTask(ctx context.Context, taskID int64) ([]Run, error) {
	rows, err := s.Pool.Query(ctx,
		`SELECT id, task_id, started_at, finished_at, status,
		        total_copied, total_skipped, total_errors, trigger
		   FROM runs WHERE task_id=$1 ORDER BY id DESC`, taskID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	out := []Run{}
	for rows.Next() {
		var r Run
		if err := rows.Scan(&r.ID, &r.TaskID, &r.StartedAt, &r.FinishedAt, &r.Status,
			&r.TotalCopied, &r.TotalSkipped, &r.TotalErrors, &r.Trigger); err != nil {
			return nil, err
		}
		out = append(out, r)
	}
	return out, rows.Err()
}

// LastFinishedRunAt returns the most recent finished run's timestamp, or nil if
// the task has never completed a run; it is the baseline for the next scheduled run.
func (s *Store) LastFinishedRunAt(ctx context.Context, taskID int64) (*time.Time, error) {
	var t *time.Time
	err := s.Pool.QueryRow(ctx,
		`SELECT max(finished_at) FROM runs WHERE task_id=$1 AND finished_at IS NOT NULL`, taskID).Scan(&t)
	return t, err
}
```
Keep the existing `FinishRun` as-is.
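
With these json tags, a run that has not finished serializes `finished_at` as `null`, which is what a `string | null` field on the frontend expects. The standalone sketch below uses a local copy of the struct rather than importing `internal/store`, and assumes `writeJSON` encodes with `encoding/json`; `encodeRun` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Run is a local copy of store.Run, for illustration only.
type Run struct {
	ID           int64      `json:"id"`
	TaskID       int64      `json:"task_id"`
	Status       string     `json:"status"`
	StartedAt    time.Time  `json:"started_at"`
	FinishedAt   *time.Time `json:"finished_at"`
	TotalCopied  int64      `json:"total_copied"`
	TotalSkipped int64      `json:"total_skipped"`
	TotalErrors  int64      `json:"total_errors"`
	Trigger      string     `json:"trigger"`
}

// encodeRun marshals a run the way an encoding/json-based writeJSON would.
func encodeRun(r Run) string {
	b, err := json.Marshal(r)
	if err != nil {
		panic(err)
	}
	return string(b)
}

var demoStart = time.Date(2026, 7, 3, 10, 0, 0, 0, time.UTC)

func main() {
	// A nil FinishedAt (run still in progress) encodes as "finished_at":null.
	running := Run{ID: 2, TaskID: 1, Status: "running", StartedAt: demoStart, Trigger: "scheduled"}
	fmt.Println(encodeRun(running))

	fin := demoStart.Add(10 * time.Minute)
	done := Run{ID: 1, TaskID: 1, Status: "done", StartedAt: demoStart, FinishedAt: &fin,
		TotalCopied: 5, TotalSkipped: 1, Trigger: "manual"}
	fmt.Println(encodeRun(done))
}
```
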
- [ ] **Step 7: Fix the `CreateRun` caller so the build passes**
In `internal/orchestrator/orchestrator.go`, the `Run` method calls `o.store.CreateRun(ctx, taskID)`. Change it to pass a trigger literal for now:
```go
runID, err := o.store.CreateRun(ctx, taskID, "manual")
```
(Task 3 threads the real trigger through; this keeps the build green now.)
- [ ] **Step 8: Run the store tests to verify they pass**
Run: `TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./internal/store/ -run 'TestTaskScheduleAndBroken|TestListRunsByTaskWithTrigger' -v`
Expected: PASS.
- [ ] **Step 9: Verify whole build + suite**
Run: `go build ./... && go vet ./... && TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./...`
Expected: all pass.
- [ ] **Step 10: Commit**
```bash
git add migrations/0004_task_scheduling.up.sql migrations/0004_task_scheduling.down.sql \
internal/store/tasks.go internal/store/runs.go internal/store/scheduling_test.go \
internal/orchestrator/orchestrator.go
git commit -m "feat(store): task schedule columns, run trigger, scheduling queries"
```
---
### Task 2: Scheduler package + wiring
**Files:**
- Create: `internal/scheduler/scheduler.go`
- Create: `internal/scheduler/scheduler_test.go`
- Modify: `cmd/server/main.go` (start the scheduler)

**Interfaces:**
- Consumes: `store.SchedulableTask`, `store.ListSchedulableTasks`, `orchestrator.Orchestrator.Run(ctx, taskID, trigger)`. The 3-arg form lands in Task 3, but this task already calls it with three arguments, so the `scheduler` package itself compiles only once Task 3 is done (see the Step 5 note).
- Produces:
  - `func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time`
  - `func dueTaskIDs(tasks []store.SchedulableTask, now time.Time) []int64`
  - `type Scheduler`; `func New(st *store.Store, orch *orchestrator.Orchestrator) *Scheduler`; `func (s *Scheduler) Start(ctx context.Context)`

- [ ] **Step 1: Write the failing pure-logic test**
`internal/scheduler/scheduler_test.go`:
```go
package scheduler

import (
	"testing"
	"time"

	"github.com/vasyansk/imap-copier/internal/store"
)

func TestNextRun(t *testing.T) {
	anchor := time.Date(2026, 7, 3, 10, 0, 0, 0, time.UTC)
	fin := time.Date(2026, 7, 3, 11, 0, 0, 0, time.UTC)
	// No finished run yet → anchor + interval.
	if got := NextRun(time.Hour, anchor, nil); !got.Equal(anchor.Add(time.Hour)) {
		t.Fatalf("first run: got %v", got)
	}
	// Finished run exists → last finished + interval (from completion, not start).
	if got := NextRun(time.Hour, anchor, &fin); !got.Equal(fin.Add(time.Hour)) {
		t.Fatalf("recurring: got %v", got)
	}
}

func TestDueTaskIDs(t *testing.T) {
	now := time.Date(2026, 7, 3, 12, 0, 0, 0, time.UTC)
	anchorOld := now.Add(-2 * time.Hour) // enabled 2h ago
	finRecent := now.Add(-30 * time.Minute)
	tasks := []store.SchedulableTask{
		{ID: 1, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: nil},        // due: anchor+1h < now
		{ID: 2, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: &finRecent}, // not due: fin+1h > now
		{ID: 3, IntervalSeconds: 21600, Anchor: anchorOld, LastFinished: nil},       // not due: anchor+6h > now
	}
	ids := dueTaskIDs(tasks, now)
	if len(ids) != 1 || ids[0] != 1 {
		t.Fatalf("due IDs = %v, want [1]", ids)
	}
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./internal/scheduler/`
Expected: FAIL — package/functions don't exist.
- [ ] **Step 3: Implement the scheduler**
`internal/scheduler/scheduler.go`:
```go
// Package scheduler runs tasks on their recurring interval. A single goroutine
// polls the DB every pollInterval and triggers due tasks through the
// orchestrator. The DB is the source of truth, so the scheduler holds no state
// and is safe across restarts.
package scheduler

import (
	"context"
	"log/slog"
	"time"

	"github.com/vasyansk/imap-copier/internal/orchestrator"
	"github.com/vasyansk/imap-copier/internal/store"
)

const pollInterval = 30 * time.Second

// NextRun is when a task should next run: one interval after its last completed
// run, or one interval after the schedule anchor if it has never completed one.
func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time {
	base := anchor
	if lastFinished != nil {
		base = *lastFinished
	}
	return base.Add(interval)
}

// dueTaskIDs returns the IDs of tasks whose next run is at or before now.
func dueTaskIDs(tasks []store.SchedulableTask, now time.Time) []int64 {
	var ids []int64
	for _, t := range tasks {
		next := NextRun(time.Duration(t.IntervalSeconds)*time.Second, t.Anchor, t.LastFinished)
		if !now.Before(next) {
			ids = append(ids, t.ID)
		}
	}
	return ids
}

// Scheduler triggers due tasks through the orchestrator on every poll.
type Scheduler struct {
	store *store.Store
	orch  *orchestrator.Orchestrator
}

// New returns a Scheduler; call Start to begin polling.
func New(st *store.Store, orch *orchestrator.Orchestrator) *Scheduler {
	return &Scheduler{store: st, orch: orch}
}

// Start blocks, ticking until ctx is cancelled. Run it in a goroutine.
func (s *Scheduler) Start(ctx context.Context) {
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			s.tick(ctx)
		}
	}
}

// tick triggers every task that is due at the current time.
func (s *Scheduler) tick(ctx context.Context) {
	tasks, err := s.store.ListSchedulableTasks(ctx)
	if err != nil {
		slog.Error("scheduler: list schedulable", "err", err)
		return
	}
	for _, id := range dueTaskIDs(tasks, time.Now()) {
		if _, err := s.orch.Run(ctx, id, "scheduled"); err != nil {
			// ErrAlreadyRunning / ErrNotTested are expected races/edge cases, not fatal.
			slog.Info("scheduler: run skipped", "task", id, "err", err)
		}
	}
}
```
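
Because `NextRun` measures from completion and the loop only wakes every `pollInterval`, successive start times drift by the run's duration plus up to one poll period. The simulation below makes that concrete under two simplifying assumptions: every run takes a fixed duration, and ticks fire exactly every 30 s starting at the anchor. `NextRun` and `pollInterval` are local copies of the code above; `simulateStarts` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

const pollInterval = 30 * time.Second

// NextRun is a local copy of the scheduler's formula.
func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time {
	base := anchor
	if lastFinished != nil {
		base = *lastFinished
	}
	return base.Add(interval)
}

// simulateStarts returns the first n start times of a task whose runs each take
// runDur, with scheduler ticks at anchor + k*pollInterval (k >= 1).
func simulateStarts(anchor time.Time, interval, runDur time.Duration, n int) []time.Time {
	var starts []time.Time
	var lastFinished *time.Time
	for tick := anchor.Add(pollInterval); len(starts) < n; tick = tick.Add(pollInterval) {
		if tick.Before(NextRun(interval, anchor, lastFinished)) {
			continue // not due yet
		}
		starts = append(starts, tick)
		fin := tick.Add(runDur) // the run completes runDur after it starts
		lastFinished = &fin
	}
	return starts
}

var demoAnchor = time.Date(2026, 7, 3, 10, 0, 0, 0, time.UTC)

func main() {
	for _, s := range simulateStarts(demoAnchor, time.Hour, 10*time.Minute+15*time.Second, 3) {
		fmt.Println(s.Format("15:04:05"))
	}
}
```

With a 1h interval and 10m15s runs, starts land at 11:00:00, 12:10:30, and 13:21:00, an effective period of 1h10m30s. A fixed wall-clock cadence would instead need the previous start as the base, which is a different design from the one this plan specifies.
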
- [ ] **Step 4: Wire the scheduler into main**
In `cmd/server/main.go`, add the import `"github.com/vasyansk/imap-copier/internal/scheduler"` and, after `orch := orchestrator.New(...)` and before `http.ListenAndServe`, start it:
```go
go scheduler.New(st, orch).Start(context.Background())
```
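- [ ] **Step 5: Run the pure test**
Run: `go test ./internal/scheduler/ -v`
Expected: FAIL to compile for now; PASS once Task 3 has changed `Orchestrator.Run` to the 3-arg form.
Note: `tick` already calls `s.orch.Run(ctx, id, "scheduled")` while `Orchestrator.Run` is still 2-arg, so the `scheduler` package itself (not only `cmd/server`) fails to build until Task 3, and so does `go build ./...`. Run Task 3 immediately after this task; the two tasks are sequenced back-to-back for this reason. Re-run this step after Task 3 to confirm `TestNextRun` and `TestDueTaskIDs` pass. If every commit must build, fold the Task 2 commit into Task 3's.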
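
One way to remove this build coupling, offered as an alternative rather than what this plan prescribes, is to have the scheduler depend on a one-method interface instead of the concrete `*orchestrator.Orchestrator`. The sketch also shows how that makes the tick loop testable with a fake. `runner`, `fakeRunner`, `tickOnce`, and `errAlreadyRunning` are hypothetical names; the last stands in for the orchestrator's real "already running" error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// runner is the only orchestrator method the scheduler calls. After Task 3,
// *orchestrator.Orchestrator would satisfy it without an adapter.
type runner interface {
	Run(ctx context.Context, taskID int64, trigger string) (int64, error)
}

// errAlreadyRunning stands in for the orchestrator's "already running" error.
var errAlreadyRunning = errors.New("task already running")

// fakeRunner records every call and refuses the task IDs marked busy.
type fakeRunner struct {
	calls []int64
	busy  map[int64]bool
}

var _ runner = (*fakeRunner)(nil) // compile-time interface check

func (f *fakeRunner) Run(_ context.Context, taskID int64, _ string) (int64, error) {
	f.calls = append(f.calls, taskID)
	if f.busy[taskID] {
		return 0, errAlreadyRunning
	}
	return taskID * 100, nil // fake run ID
}

// tickOnce is Scheduler.tick minus the DB read: trigger each due task, skip
// (rather than abort on) per-task errors, and report how many runs started.
func tickOnce(ctx context.Context, r runner, due []int64) int {
	started := 0
	for _, id := range due {
		if _, err := r.Run(ctx, id, "scheduled"); err != nil {
			continue // the real tick logs this at Info level
		}
		started++
	}
	return started
}

// demo runs one tick in which task 2 is already running.
func demo() (int, []int64) {
	f := &fakeRunner{busy: map[int64]bool{2: true}}
	n := tickOnce(context.Background(), f, []int64{1, 2, 3})
	return n, f.calls
}

func main() {
	n, calls := demo()
	fmt.Println(n, calls) // 2 [1 2 3]
}
```

With this shape, `scheduler.New` would accept a `runner`, the `scheduler` package would no longer import `orchestrator`, and only `cmd/server` would have to wait for Task 3.
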
- [ ] **Step 6: Commit**
```bash
git add internal/scheduler/scheduler.go internal/scheduler/scheduler_test.go cmd/server/main.go
git commit -m "feat(scheduler): 30s polling scheduler with pure NextRun/dueTaskIDs"
```
---
### Task 3: Orchestrator trigger threading + breaker
**Files:**
- Modify: `internal/orchestrator/orchestrator.go` (`Run` signature; thread trigger into `CreateRun` + `runAll`; breaker in `runAll`)
- Modify: `internal/httpapi/run.go` (`handleRun` passes `"manual"`)
- Test: `internal/orchestrator/breaker_test.go` (new)

**Interfaces:**
- Consumes: `store.SetTaskBroken` (Task 1).
- Produces:
  - `func (o *Orchestrator) Run(ctx context.Context, taskID int64, trigger string) (int64, error)` (was 2-arg)
  - `func shouldBreak(trigger string, totErr int64) bool`

- [ ] **Step 1: Write the failing breaker-logic test**
`internal/orchestrator/breaker_test.go`:
```go
package orchestrator

import "testing"

func TestShouldBreak(t *testing.T) {
	cases := []struct {
		trigger string
		errs    int64
		want    bool
	}{
		{"scheduled", 2, true},  // scheduled run with errors trips the breaker
		{"scheduled", 0, false}, // scheduled run, clean: no break
		{"manual", 5, false},    // manual run never trips the breaker
		{"manual", 0, false},
	}
	for _, c := range cases {
		if got := shouldBreak(c.trigger, c.errs); got != c.want {
			t.Fatalf("shouldBreak(%q,%d)=%v want %v", c.trigger, c.errs, got, c.want)
		}
	}
}
```
- [ ] **Step 2: Run test to verify it fails**
Run: `go test ./internal/orchestrator/ -run TestShouldBreak`
Expected: FAIL — `shouldBreak` undefined.
- [ ] **Step 3: Add `shouldBreak` and thread the trigger through `Run`**
In `internal/orchestrator/orchestrator.go`:
Add the helper (package level):
```go
// shouldBreak reports whether a completed run should trip the schedule breaker:
// only scheduled runs that ended with errors.
func shouldBreak(trigger string, totErr int64) bool {
	return trigger == "scheduled" && totErr > 0
}
```
Change `Run` to accept and forward the trigger:
```go
func (o *Orchestrator) Run(ctx context.Context, taskID int64, trigger string) (int64, error) {
```
Inside `Run`, replace the temporary literal from Task 1:
```go
runID, err := o.store.CreateRun(ctx, taskID, trigger)
```
And pass the trigger into the coordinator goroutine:
```go
go o.runAll(context.WithoutCancel(ctx), task, runID, accs, srcEP, dstEP, trigger)
```
Change `runAll`'s signature to receive it and trip the breaker after the final status is set:
```go
func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint, trigger string) {
```
At the end of `runAll`, after `SetTaskStatus` and the `run_done` publish, add:
```go
if shouldBreak(trigger, totErr) {
	_ = o.store.SetTaskBroken(ctx, task.ID)
	o.hub.Publish(wshub.Event{Type: "task_broken", TaskID: task.ID,
		Data: map[string]any{"task_id": task.ID, "errors": totErr}})
}
```
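
The ordering matters: `run_done` goes out after the final status is written, and the breaker then only ever disables the schedule. Below is a hypothetical, self-contained sketch of that tail of `runAll`, with a fake store and a string event sink standing in for `store.Store` and `wshub`; `finishRun`, `brokenSetter`, and `fakeStore` are illustrative names.

```go
package main

import (
	"context"
	"fmt"
)

// shouldBreak is copied from Task 3, Step 3.
func shouldBreak(trigger string, totErr int64) bool {
	return trigger == "scheduled" && totErr > 0
}

// brokenSetter is the one store method the breaker needs (illustrative).
type brokenSetter interface {
	SetTaskBroken(ctx context.Context, id int64) error
}

// fakeStore remembers which tasks were flagged broken.
type fakeStore struct{ broken map[int64]bool }

func (f *fakeStore) SetTaskBroken(_ context.Context, id int64) error {
	f.broken[id] = true
	return nil
}

// finishRun models the end of runAll: publish run_done, then trip the breaker
// for scheduled runs that ended with errors.
func finishRun(ctx context.Context, st brokenSetter, publish func(string), taskID int64, trigger string, totErr int64) {
	publish(fmt.Sprintf("run_done:%d", taskID))
	if shouldBreak(trigger, totErr) {
		_ = st.SetTaskBroken(ctx, taskID) // the plan also ignores this error
		publish(fmt.Sprintf("task_broken:%d", taskID))
	}
}

// demo replays three finished runs and returns the broken set and events.
func demo() (map[int64]bool, []string) {
	st := &fakeStore{broken: map[int64]bool{}}
	var events []string
	publish := func(e string) { events = append(events, e) }
	ctx := context.Background()
	finishRun(ctx, st, publish, 1, "manual", 5)    // manual runs never trip it
	finishRun(ctx, st, publish, 2, "scheduled", 0) // clean scheduled run
	finishRun(ctx, st, publish, 3, "scheduled", 2) // scheduled run with errors
	return st.broken, events
}

func main() {
	broken, events := demo()
	fmt.Println(broken, events) // map[3:true] [run_done:1 run_done:2 run_done:3 task_broken:3]
}
```
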
- [ ] **Step 4: Update the manual caller**
In `internal/httpapi/run.go`, `handleRun`:
```go
runID, err := s.orch.Run(r.Context(), taskID, "manual")
```
- [ ] **Step 5: Run the breaker test + full build**
Run: `go test ./internal/orchestrator/ -run TestShouldBreak -v && go build ./... && go vet ./...`
Expected: test PASS; build + vet clean (the scheduler's `s.orch.Run(ctx, id, "scheduled")` from Task 2 now resolves).
- [ ] **Step 6: Full suite**
Run: `TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./...`
Expected: all pass.
- [ ] **Step 7: Commit**
```bash
git add internal/orchestrator/orchestrator.go internal/orchestrator/breaker_test.go internal/httpapi/run.go
git commit -m "feat(orchestrator): thread run trigger; breaker on scheduled runs with errors"
```
---
### Task 4: HTTP — schedule endpoint, runs endpoint, next_run_at DTO
**Files:**
- Modify: `internal/httpapi/tasks.go` (`taskView` with `next_run_at`; compute in `handleGetTask`; add `handleSetSchedule`, `handleListRuns`)
- Modify: `internal/httpapi/router.go` (two routes)

**Interfaces:**
- Consumes: `store.SetTaskSchedule`, `store.ListRunsByTask`, `store.LastFinishedRunAt`, `store.ListAccountsByTask`, `scheduler.NextRun`.
- Produces: routes `PUT /api/tasks/{id}/schedule`, `GET /api/tasks/{id}/runs`; task JSON gains `next_run_at`.

- [ ] **Step 1: Add the task DTO with next_run_at and compute it in handleGetTask**
In `internal/httpapi/tasks.go`, add the imports `"context"` (if not already present), `"time"`, and `"github.com/vasyansk/imap-copier/internal/scheduler"`. Add the view type and compute the next run:
```go
// taskView augments the stored task with the server-computed next scheduled run
// (RFC3339 UTC; null when the schedule is off, the task is running, or it is broken).
type taskView struct {
	store.Task
	NextRunAt *string `json:"next_run_at"`
}

func (s *Server) taskViewFor(ctx context.Context, t store.Task) taskView {
	var nextRunAt *string
	if t.ScheduleIntervalSeconds > 0 && !t.Broken && t.Status != "running" && t.ScheduleAnchor != nil {
		last, _ := s.store.LastFinishedRunAt(ctx, t.ID)
		n := scheduler.NextRun(time.Duration(t.ScheduleIntervalSeconds)*time.Second, *t.ScheduleAnchor, last)
		str := n.UTC().Format(time.RFC3339)
		nextRunAt = &str
	}
	return taskView{Task: t, NextRunAt: nextRunAt}
}
```
Change `handleGetTask`'s final write to use it:
```go
writeJSON(w, http.StatusOK, map[string]any{"task": s.taskViewFor(r.Context(), task), "accounts": views})
```
(`handleListTasks` keeps returning `[]store.Task`: the list view needs only `broken`, which serializes via its json tag, while `next_run_at` is detail-only.)
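
Embedding `store.Task` in `taskView` means `encoding/json` promotes the task's fields to the top level and emits `next_run_at` beside them, while `ScheduleAnchor` stays hidden by its `json:"-"` tag. A standalone sketch with a trimmed local `Task` (only a few fields, for illustration; `"idle"` is an assumed status value and `encodeView` a hypothetical helper):

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Task is a trimmed local stand-in for store.Task.
type Task struct {
	ID                      int64      `json:"id"`
	Status                  string     `json:"status"`
	ScheduleIntervalSeconds int64      `json:"schedule_interval_seconds"`
	ScheduleAnchor          *time.Time `json:"-"`
	Broken                  bool       `json:"broken"`
}

// taskView has the same shape as the handler's view type.
type taskView struct {
	Task
	NextRunAt *string `json:"next_run_at"`
}

func encodeView(v taskView) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}

var demoAnchor = time.Date(2026, 7, 3, 10, 0, 0, 0, time.UTC)

func main() {
	next := demoAnchor.Add(time.Hour).UTC().Format(time.RFC3339)
	on := taskView{
		Task:      Task{ID: 1, Status: "idle", ScheduleIntervalSeconds: 3600, ScheduleAnchor: &demoAnchor},
		NextRunAt: &next,
	}
	off := taskView{Task: Task{ID: 2, Status: "idle"}} // nil NextRunAt: schedule off
	fmt.Println(encodeView(on))
	fmt.Println(encodeView(off))
}
```
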
- [ ] **Step 2: Add the schedule + runs handlers**
Append to `internal/httpapi/tasks.go`:
```go
// handleSetSchedule enables/changes/disables a task's recurring schedule.
// Enabling (interval > 0) requires every account to pass its connection tests;
// negative intervals are rejected.
func (s *Server) handleSetSchedule(w http.ResponseWriter, r *http.Request) {
	taskID, err := pathID(r, "id")
	if err != nil {
		http.Error(w, "bad id", http.StatusBadRequest)
		return
	}
	var body struct {
		IntervalSeconds int64 `json:"interval_seconds"`
	}
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		http.Error(w, "bad json", http.StatusBadRequest)
		return
	}
	if body.IntervalSeconds < 0 {
		http.Error(w, "interval_seconds must be >= 0", http.StatusBadRequest)
		return
	}
	if body.IntervalSeconds > 0 {
		accs, err := s.store.ListAccountsByTask(r.Context(), taskID)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		if !allAccountsTested(accs) {
			http.Error(w, "all accounts must pass connection tests before scheduling", http.StatusConflict)
			return
		}
	}
	if err := s.store.SetTaskSchedule(r.Context(), taskID, body.IntervalSeconds); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

// allAccountsTested mirrors the orchestrator's run gate: non-empty and every
// account OK on both sides.
func allAccountsTested(accs []store.Account) bool {
	if len(accs) == 0 {
		return false
	}
	for _, a := range accs {
		if a.TestSrcStatus != "ok" || a.TestDstStatus != "ok" {
			return false
		}
	}
	return true
}

func (s *Server) handleListRuns(w http.ResponseWriter, r *http.Request) {
	taskID, err := pathID(r, "id")
	if err != nil {
		http.Error(w, "bad id", http.StatusBadRequest)
		return
	}
	runs, err := s.store.ListRunsByTask(r.Context(), taskID)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	writeJSON(w, http.StatusOK, runs)
}
```
(`json` and `store` are already imported in `tasks.go`.)
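
The handler's enable/disable decision reduces to a pure function of the requested interval and the accounts' test results, which makes the 409 rule easy to table-test. A hypothetical sketch: `scheduleStatus` and the trimmed `Account` type are illustrative, it ignores the bad-id, bad-json, and store-error paths, and `"fail"` is an assumed non-OK test status.

```go
package main

import (
	"fmt"
	"net/http"
)

// Account is a trimmed stand-in for store.Account.
type Account struct {
	TestSrcStatus string
	TestDstStatus string
}

// allAccountsTested is copied from the handler code above.
func allAccountsTested(accs []Account) bool {
	if len(accs) == 0 {
		return false
	}
	for _, a := range accs {
		if a.TestSrcStatus != "ok" || a.TestDstStatus != "ok" {
			return false
		}
	}
	return true
}

// scheduleStatus is the status handleSetSchedule writes for a non-negative
// interval when the store calls succeed (hypothetical helper).
func scheduleStatus(intervalSeconds int64, accs []Account) int {
	if intervalSeconds > 0 && !allAccountsTested(accs) {
		return http.StatusConflict
	}
	return http.StatusNoContent
}

func main() {
	ok := Account{TestSrcStatus: "ok", TestDstStatus: "ok"}
	bad := Account{TestSrcStatus: "ok", TestDstStatus: "fail"}
	fmt.Println(scheduleStatus(3600, []Account{ok}))      // 204
	fmt.Println(scheduleStatus(3600, []Account{ok, bad})) // 409
	fmt.Println(scheduleStatus(3600, nil))                // 409: no accounts at all
	fmt.Println(scheduleStatus(0, []Account{bad}))        // 204: disabling is always allowed
}
```
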
- [ ] **Step 3: Register routes**
In `internal/httpapi/router.go`, after the existing `PUT /api/tasks/{id}/folder-mapping` line (or with the other task routes):
```go
api.HandleFunc("PUT /api/tasks/{id}/schedule", s.handleSetSchedule)
api.HandleFunc("GET /api/tasks/{id}/runs", s.handleListRuns)
```
- [ ] **Step 4: Verify build + vet + suite**
Run: `go build ./... && go vet ./... && TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./...`
Expected: all pass.
- [ ] **Step 5: Commit**
```bash
git add internal/httpapi/tasks.go internal/httpapi/router.go
git commit -m "feat(api): schedule + runs endpoints; next_run_at on task detail"
```
---
### Task 5: Frontend — schedule control, next-run, broken badge, run-log modal, list icon
**Files:**
- Modify: `web/src/api.ts` (Task fields; `Run` type; `setTaskSchedule`, `listRuns`)
- Create: `web/src/components/RunLogModal.tsx`
- Modify: `web/src/pages/TaskDetail.tsx` (schedule select, next-run line, broken badge, Runs button+modal, reload on `task_broken`)
- Modify: `web/src/pages/Tasks.tsx` (broken indicator in the list)
- Modify: `web/src/app.css` (broken badge / schedule-row styles)

**Interfaces:**
- Consumes: backend routes from Task 4; `Modal` (`web/src/components/Modal.tsx`), `StatusBadge`.

- [ ] **Step 1: Extend the API client**
In `web/src/api.ts`, extend `Task`:
```ts
export interface Task {
  id: number
  name: string
  src_endpoint_id: number
  dst_endpoint_id: number
  status: string
  folder_mapping?: Record<string, string>
  schedule_interval_seconds?: number
  broken?: boolean
  next_run_at?: string | null
}
```
Add a `Run` interface and the two calls (near `runTask`):
```ts
export interface Run {
  id: number
  task_id: number
  status: string
  started_at: string
  finished_at: string | null
  total_copied: number
  total_skipped: number
  total_errors: number
  trigger: string
}

export const setTaskSchedule = (taskId: number, intervalSeconds: number) =>
  api(`/api/tasks/${taskId}/schedule`, { ...jsonBody({ interval_seconds: intervalSeconds }), method: 'PUT' })

export const listRuns = (taskId: number) => api<Run[]>(`/api/tasks/${taskId}/runs`)
```
- [ ] **Step 2: Create the run-log modal**
`web/src/components/RunLogModal.tsx`:
```tsx
import { useEffect, useState } from 'react'
import { Modal } from './Modal'
import { StatusBadge } from './StatusBadge'
import { listRuns, type Run } from '../api'

const fmt = (iso: string | null) => (iso ? new Date(iso).toLocaleString() : '—')

export function RunLogModal({ taskId, open, onClose }: { taskId: number; open: boolean; onClose: () => void }) {
  const [runs, setRuns] = useState<Run[] | null>(null)

  useEffect(() => {
    if (!open) return
    // Ignore a response that arrives after the modal closed or taskId changed.
    let cancelled = false
    setRuns(null)
    listRuns(taskId)
      .then((r) => {
        if (!cancelled) setRuns(r ?? [])
      })
      .catch(() => {
        if (!cancelled) setRuns([])
      })
    return () => {
      cancelled = true
    }
  }, [open, taskId])

  return (
    <Modal open={open} title="Run log" onClose={onClose} size="lg">
      <div className="tbl-wrap">
        <table className="tbl">
          <thead>
            <tr>
              <th>Started</th>
              <th>Finished</th>
              <th>Trigger</th>
              <th>Status</th>
              <th>Copied</th>
              <th>Skipped</th>
              <th>Errors</th>
            </tr>
          </thead>
          <tbody>
            {runs === null ? (
              <tr className="empty-row"><td colSpan={7}>loading</td></tr>
            ) : runs.length === 0 ? (
              <tr className="empty-row"><td colSpan={7}>no runs yet</td></tr>
            ) : (
              runs.map((r) => (
                <tr key={r.id}>
                  <td>{fmt(r.started_at)}</td>
                  <td>{fmt(r.finished_at)}</td>
                  <td>{r.trigger}</td>
                  <td><StatusBadge status={r.status} /></td>
                  <td className="num-cell">{r.total_copied}</td>
                  <td className="num-cell">{r.total_skipped}</td>
                  <td className="num-cell">{r.total_errors}</td>
                </tr>
              ))
            )}
          </tbody>
        </table>
      </div>
    </Modal>
  )
}
```
- [ ] **Step 3: TaskDetail — schedule control, next-run, broken badge, Runs modal**
In `web/src/pages/TaskDetail.tsx`:
Add `setTaskSchedule` to the imports from `../api` (`listRuns` is not needed here; the modal calls it), and import the modal: `import { RunLogModal } from '../components/RunLogModal'`.
Add state near the other `useState`s:
```ts
const [showRuns, setShowRuns] = useState(false)
```
Add `task_broken` to the reload event list (the existing `if ([...].includes(ev.type)) reload()` array) so the detail refreshes when the breaker trips:
```ts
if (['account_started', 'account_test', 'account_done', 'run_started', 'run_done', 'error', 'folder', 'cancelled', 'plan', 'task_broken'].includes(ev.type)) {
  reload()
}
```
Add a schedule handler:
```ts
async function onSchedule(intervalSeconds: number) {
  setError(null)
  try {
    await setTaskSchedule(id, intervalSeconds)
    reload()
  } catch (err) {
    setError(err instanceof Error ? err.message : 'Failed to set schedule')
  }
}
```
In the "Run control" panel (after the `.btn-row` with Test/Run), add the schedule row. `task` is in scope from `const { task, accounts } = data`:
```tsx
<div className="sched-row">
<label htmlFor="sched">Schedule</label>
<select
id="sched"
value={task.schedule_interval_seconds ?? 0}
onChange={(e) => onSchedule(Number(e.target.value))}
>
<option value={0}>Off</option>
<option value={3600}>Every 1h</option>
<option value={10800}>Every 3h</option>
<option value={21600}>Every 6h</option>
<option value={43200}>Every 12h</option>
<option value={86400}>Every 24h</option>
</select>
{task.next_run_at && (
<span className="sched-next">Next run: {new Date(task.next_run_at).toLocaleString()}</span>
)}
{task.broken && <span className="badge badge-fail"><span className="dot" />broken</span>}
<button type="button" className="link-btn" onClick={() => setShowRuns(true)}>
runs
</button>
</div>
```
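The `<option>` values above are the interval presets from the Global Constraints, written out by hand. If the same list is needed elsewhere (for example, to label an interval in the Tasks list), one option is to keep it in a single typed table. `SCHEDULE_PRESETS` and `presetLabel` are hypothetical names and are not part of the plan:

```typescript
// Interval presets in seconds, matching the Schedule <select>
// (Off = 0 through Every 24h = 86400).
const SCHEDULE_PRESETS = [
  { seconds: 0, label: 'Off' },
  { seconds: 3600, label: 'Every 1h' },
  { seconds: 10800, label: 'Every 3h' },
  { seconds: 21600, label: 'Every 6h' },
  { seconds: 43200, label: 'Every 12h' },
  { seconds: 86400, label: 'Every 24h' },
] as const

// Returns the label for a stored interval, or undefined when the value
// is not one of the presets (e.g. one written directly through the API).
function presetLabel(seconds: number): string | undefined {
  return SCHEDULE_PRESETS.find((p) => p.seconds === seconds)?.label
}
```

The select body could then be generated with `SCHEDULE_PRESETS.map((p) => <option key={p.seconds} value={p.seconds}>{p.label}</option>)`, which keeps labels and seconds in one place.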
Add the broken badge next to the status in the page head (`<StatusBadge status={task.status} />`):
```tsx
<StatusBadge status={task.status} />
{task.broken && <span className="badge badge-fail" style={{ marginLeft: 8 }}><span className="dot" />broken</span>}
```
Render the modal near the other modals at the end of the returned JSX:
```tsx
<RunLogModal taskId={id} open={showRuns} onClose={() => setShowRuns(false)} />
```
- [ ] **Step 4: Tasks list — broken indicator**
In `web/src/pages/Tasks.tsx`, in the row's Status cell, show the broken badge beside the status:
```tsx
<td>
<StatusBadge status={t.status} />
{t.broken && <span className="badge badge-fail" style={{ marginLeft: 8 }}><span className="dot" />broken</span>}
</td>
```
- [ ] **Step 5: CSS for the schedule row**
In `web/src/app.css`, add:
```css
.sched-row {
display: flex;
align-items: center;
gap: 12px;
margin-top: 14px;
flex-wrap: wrap;
}
.sched-row label {
font-size: 11px;
font-weight: 700;
letter-spacing: 0.1em;
text-transform: uppercase;
color: var(--fg-dim);
}
.sched-row select {
background: var(--bg-inset);
border: 1px solid var(--border);
color: var(--fg);
padding: 7px 10px;
font-size: 13px;
border-radius: 2px;
}
.sched-next {
font-size: 12px;
color: var(--fg-dim);
font-variant-numeric: tabular-nums;
}
```
- [ ] **Step 6: Verify build + lint**
Run: `cd web && npm run build && npx oxlint src/`
Expected: build clean; oxlint shows ONLY the two pre-existing warnings (`ConfirmProvider` only-export-components, `TaskDetail` exhaustive-deps). Fix any new finding.
- [ ] **Step 7: Commit**
```bash
git add web/src/api.ts web/src/components/RunLogModal.tsx web/src/pages/TaskDetail.tsx web/src/pages/Tasks.tsx web/src/app.css
git commit -m "feat(web): schedule control, next-run, broken badge, run-log modal"
```
---
### Task 6: End-to-end verification on prod
Per project rules, functional verification runs on prod after deploy. No code.
- [ ] **Step 1: Build + migrate + deploy**
Run: `make build && make up` (migration 0004 auto-applies at startup). Wait for `curl -fsS http://localhost/healthz`.
- [ ] **Step 2: Enable-gate + next-run display**
On a task whose accounts are NOT all tested OK, set a schedule → the request returns 409, the error banner shows the message, and the select stays on Off. Test the accounts until all pass, then set Schedule = Every 1h → "Next run: <local time>" appears (≈ now + 1h, in browser local time), and no run starts immediately (the first run is one interval out).
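The expected value in this step follows from the Global Constraints: the first run is `schedule_anchor + interval`, and later runs are `last_finished_at + interval`. The authoritative computation is the Go `NextRun` on the server. A minimal TypeScript mirror, useful for asserting the displayed value in a test, might look like the sketch below. The function name and the "later of anchor and last finish" tie-break are assumptions:

```typescript
// Mirror of the server's next-run rule (sketch). Assumption: the base is
// the later of the anchor and the last finish, so a run that finished
// before the schedule was enabled does not pull the first run earlier.
function nextRunAt(
  intervalSeconds: number,
  anchor: Date | null,
  lastFinished: Date | null,
): Date | null {
  if (intervalSeconds <= 0 || anchor === null) return null // schedule is off
  const base =
    lastFinished !== null && lastFinished.getTime() > anchor.getTime()
      ? lastFinished
      : anchor
  return new Date(base.getTime() + intervalSeconds * 1000)
}
```

Enabling Every 1h at 10:00 with no later finished run should therefore display about 11:00 in the browser's local time zone.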
- [ ] **Step 3: Run-log modal**
Run the task manually once. Open **runs** → the modal lists the run with local-time started/finished, `trigger = manual`, status, and copied/skipped/errors.
- [ ] **Step 4: Breaker (scheduled failure → broken)**
Simulate a scheduled failure: on a scheduled task, break one account (e.g. store a wrong password), then let the next scheduled run occur (at least 1h away with the shortest preset). When that run finishes with errors, the schedule flips to Off, the task shows the red **broken** badge in both the task detail and the Tasks list, and no further auto-runs occur until the schedule is re-enabled. Auto-run *timing* itself is covered by the `scheduler` unit tests. Because presets start at 1h, a quick E2E will not observe the timed trigger, so focus on the breaker/flag transition and the resulting DB state.
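The DB state to check follows directly from the breaker rule in the Global Constraints. Here is a compact statement of the expected transition, written as a pure TypeScript function. `applyBreaker`, `RunOutcome`, and `ScheduleState` are hypothetical names, and the real transition happens in Go through `SetTaskBroken`:

```typescript
// Hypothetical minimal shapes; the real Task/Run types live in web/src/api.ts.
type RunOutcome = { trigger: 'manual' | 'scheduled'; total_errors: number }
type ScheduleState = { schedule_interval_seconds: number; broken: boolean }

// Only a scheduled run that finished with errors trips the breaker:
// the schedule is disabled and the task flagged broken (what
// SetTaskBroken does server-side). Any other run leaves the state as-is.
function applyBreaker(state: ScheduleState, run: RunOutcome): ScheduleState {
  if (run.trigger === 'scheduled' && run.total_errors > 0) {
    return { schedule_interval_seconds: 0, broken: true }
  }
  return state
}
```

After the scheduled failure, the task row should read `broken = true` and `schedule_interval_seconds = 0`. A manual run with errors should leave both values untouched.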
- [ ] **Step 5: Delete cleanup**
Delete a task that had a schedule and runs; confirm it disappears and its runs are gone (cascade). The scheduler stops considering it (no errors in logs).
- [ ] **Step 6: Record result**
Save an E2E note under `./swarm-report/scheduled-task-runs-<YYYY-MM-DD>.md` with pass/fail per step.
---
## Self-Review
- **Spec coverage:** interval per task + auto-run (Tasks 1-3); don't-start-if-running + interval-from-completion (`ListSchedulableTasks` `status<>'running'` filter + `NextRun` from `LastFinished`, Tasks 1-2); next-run in browser local time (`next_run_at` DTO Task 4 + `toLocaleString` Task 5); run-log modal with per-run status/totals (Tasks 4-5); enable-gate all-OK → 409 (Task 4); breaker scheduled-error → broken + red icon in list & detail (Tasks 3, 5); first-run-after-interval via anchor (Task 1 `SetTaskSchedule` + Task 2 `NextRun`); delete cleanup via cascade (noted, no code); migration 0004 (Task 1). All covered.
- **Placeholder scan:** none — every step carries real code/commands.
- **Type consistency:** `CreateRun(ctx, taskID, trigger)` identical across Tasks 1/3; `Run(ctx, taskID, trigger)` defined Task 3, called by scheduler Task 2 and `handleRun` Task 3; `NextRun(interval, anchor, lastFinished)` identical in scheduler (Task 2) and `taskViewFor` (Task 4); `SchedulableTask{ID,IntervalSeconds,Anchor,LastFinished}` fields match between store (Task 1) and `dueTaskIDs` (Task 2); JSON contract `schedule_interval_seconds`/`broken`/`next_run_at`/`interval_seconds` and `Run` fields match between Go (Tasks 1,4) and TS (Task 5).
- **Sequencing note:** Task 2's whole-module build is red until Task 3 lands the 3-arg `Run` (the scheduler calls it). Both are flagged to run back-to-back; each task's own unit test passes independently.