Scheduled (Recurring) Task Runs Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Let a task run itself on a recurring interval, with a breaker that disables the schedule and flags the task broken when a scheduled run errors, plus a run-log modal and next-run display.
Architecture: A single in-process scheduler goroutine (30 s ticker) reads schedulable tasks from the DB each tick and triggers due ones through Orchestrator.Run with a "scheduled" trigger. Schedule config lives on tasks (interval, anchor, broken); runs.trigger distinguishes manual vs scheduled. DB is the source of truth, so the scheduler is stateless and restart-safe. The next-run time is computed server-side and formatted in the browser's local time.
Tech Stack: Go (net/http, pgx v5, golang-migrate), Postgres 18, React 19 + Vite + TypeScript.
Global Constraints
- Scheduler poll interval: 30 s (a package const).
- Interval presets → seconds: Off=0, 1h=3600, 3h=10800, 6h=21600, 12h=43200, 24h=86400.
- Breaker trips only on scheduled runs (trigger == "scheduled" AND total_errors > 0) → SetTaskBroken (sets broken=true, schedule_interval_seconds=0). Manual runs never trip it.
- Enabling a schedule (interval>0) requires all accounts tested OK; otherwise HTTP 409. Disabling (0) is always allowed and clears broken.
- First run after enabling = schedule_anchor + interval; subsequent = last_finished_at + interval. Never run while the task is running.
- next_run_at: RFC3339 UTC string, null when schedule off / task running / broken.
- Non-destructive migration; ON DELETE CASCADE already removes runs on task delete, so no new cleanup code is needed.
- pgx v5 scans nullable TIMESTAMPTZ into *time.Time and non-null into time.Time.
- Backend verification: go build ./..., go vet ./..., go test ./... (store tests need TEST_DATABASE_URL, else skip).
- Frontend verification: cd web && npm run build, npx oxlint src/ (only the two pre-existing warnings allowed: ConfirmProvider only-export-components, TaskDetail exhaustive-deps).
- Commit after each task.
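The preset table above maps directly onto a lookup. The handler in Task 4 accepts any integer as written, so the following is only a minimal sketch of how a preset check could look if one were wanted; presetSeconds and isPresetInterval are hypothetical names, not part of the plan's code.

```go
package main

import "fmt"

// presetSeconds lists the interval presets from the Global Constraints.
// Hypothetical name: the plan itself does not define this map.
var presetSeconds = map[string]int64{
	"Off": 0,
	"1h":  3600,
	"3h":  10800,
	"6h":  21600,
	"12h": 43200,
	"24h": 86400,
}

// isPresetInterval reports whether seconds matches one of the presets.
// Hypothetical helper for illustration only.
func isPresetInterval(seconds int64) bool {
	for _, s := range presetSeconds {
		if s == seconds {
			return true
		}
	}
	return false
}

func main() {
	for _, s := range []int64{0, 3600, 7200, -1} {
		fmt.Printf("%d -> %v\n", s, isPresetInterval(s))
	}
}
```

A check like this would also reject negative values, which the plan's handler would otherwise store unchanged.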
Task 1: Schema + store (scheduling columns, run trigger, store methods)
Files:
- Create: migrations/0004_task_scheduling.up.sql, migrations/0004_task_scheduling.down.sql
- Modify: internal/store/tasks.go (Task struct; GetTask + ListTasks SELECT/Scan; add setters)
- Modify: internal/store/runs.go (Run struct; CreateRun signature; add queries)
- Modify: internal/orchestrator/orchestrator.go (one line: the CreateRun call; pass "manual" to keep it compiling; the real trigger threading is Task 3)
- Test: internal/store/scheduling_test.go (new)
Interfaces:
- Produces:
  - store.Task fields: ScheduleIntervalSeconds int64 (json schedule_interval_seconds), ScheduleAnchor *time.Time (json -), Broken bool (json broken).
  - store.Run fields: StartedAt time.Time (json started_at), FinishedAt *time.Time (json finished_at), Trigger string (json trigger); existing fields gain json tags.
  - store.SchedulableTask struct { ID int64; IntervalSeconds int64; Anchor time.Time; LastFinished *time.Time }
  - func (s *Store) SetTaskSchedule(ctx, id, intervalSeconds int64) error
  - func (s *Store) SetTaskBroken(ctx, id int64) error
  - func (s *Store) ListRunsByTask(ctx, taskID int64) ([]Run, error)
  - func (s *Store) LastFinishedRunAt(ctx, taskID int64) (*time.Time, error)
  - func (s *Store) ListSchedulableTasks(ctx) ([]SchedulableTask, error)
  - func (s *Store) CreateRun(ctx, taskID int64, trigger string) (int64, error) (signature change)
- Step 1: Write the migration
migrations/0004_task_scheduling.up.sql:
ALTER TABLE tasks ADD COLUMN schedule_interval_seconds INT NOT NULL DEFAULT 0;
ALTER TABLE tasks ADD COLUMN schedule_anchor TIMESTAMPTZ;
ALTER TABLE tasks ADD COLUMN broken BOOLEAN NOT NULL DEFAULT false;
ALTER TABLE runs ADD COLUMN trigger TEXT NOT NULL DEFAULT 'manual';
migrations/0004_task_scheduling.down.sql:
ALTER TABLE runs DROP COLUMN trigger;
ALTER TABLE tasks DROP COLUMN broken;
ALTER TABLE tasks DROP COLUMN schedule_anchor;
ALTER TABLE tasks DROP COLUMN schedule_interval_seconds;
Apply to the test DB before running store tests:
docker exec -i imapcopier-testpg psql -U imap -d imapcopier_test -q -v ON_ERROR_STOP=1 < migrations/0004_task_scheduling.up.sql
- Step 2: Write the failing store test
internal/store/scheduling_test.go:
package store
import (
"context"
"testing"
)
func TestTaskScheduleAndBroken(t *testing.T) {
s := testStore(t)
ctx := context.Background()
epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"})
epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"})
taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst})
// Defaults: no schedule, not broken.
tk, _ := s.GetTask(ctx, taskID)
if tk.ScheduleIntervalSeconds != 0 || tk.Broken || tk.ScheduleAnchor != nil {
t.Fatalf("defaults: interval=%d broken=%v anchor=%v", tk.ScheduleIntervalSeconds, tk.Broken, tk.ScheduleAnchor)
}
// Enable → interval set, anchor stamped, appears in schedulable list.
if err := s.SetTaskSchedule(ctx, taskID, 3600); err != nil {
t.Fatalf("set schedule: %v", err)
}
tk, _ = s.GetTask(ctx, taskID)
if tk.ScheduleIntervalSeconds != 3600 || tk.ScheduleAnchor == nil {
t.Fatalf("after enable: interval=%d anchor=%v", tk.ScheduleIntervalSeconds, tk.ScheduleAnchor)
}
sch, _ := s.ListSchedulableTasks(ctx)
if len(sch) != 1 || sch[0].ID != taskID || sch[0].IntervalSeconds != 3600 || sch[0].LastFinished != nil {
t.Fatalf("schedulable: %+v", sch)
}
// Breaker → broken true, interval 0, drops out of schedulable list.
if err := s.SetTaskBroken(ctx, taskID); err != nil {
t.Fatalf("set broken: %v", err)
}
tk, _ = s.GetTask(ctx, taskID)
if !tk.Broken || tk.ScheduleIntervalSeconds != 0 {
t.Fatalf("after break: broken=%v interval=%d", tk.Broken, tk.ScheduleIntervalSeconds)
}
if sch, _ := s.ListSchedulableTasks(ctx); len(sch) != 0 {
t.Fatalf("broken task still schedulable: %+v", sch)
}
// Re-enable clears broken.
_ = s.SetTaskSchedule(ctx, taskID, 21600)
if tk, _ = s.GetTask(ctx, taskID); tk.Broken {
t.Fatalf("re-enable did not clear broken")
}
}
func TestListRunsByTaskWithTrigger(t *testing.T) {
s := testStore(t)
ctx := context.Background()
epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"})
epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"})
taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst})
r1, _ := s.CreateRun(ctx, taskID, "manual")
_ = s.FinishRun(ctx, r1, "done", 5, 1, 0)
_, _ = s.CreateRun(ctx, taskID, "scheduled") // still running (no finish)
runs, err := s.ListRunsByTask(ctx, taskID)
if err != nil || len(runs) != 2 {
t.Fatalf("list runs: %v len=%d", err, len(runs))
}
// newest first
if runs[0].Trigger != "scheduled" || runs[0].FinishedAt != nil {
t.Fatalf("run[0]: %+v", runs[0])
}
if runs[1].Trigger != "manual" || runs[1].FinishedAt == nil || runs[1].TotalCopied != 5 {
t.Fatalf("run[1]: %+v", runs[1])
}
// Last finished-at reflects the finished manual run.
lf, _ := s.LastFinishedRunAt(ctx, taskID)
if lf == nil {
t.Fatalf("LastFinishedRunAt nil, want the finished run's time")
}
}
- Step 3: Run tests to verify they fail
Run: TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./internal/store/ -run 'TestTaskScheduleAndBroken|TestListRunsByTaskWithTrigger'
Expected: FAIL to compile — new struct fields, SetTaskSchedule, SetTaskBroken, ListRunsByTask, LastFinishedRunAt, ListSchedulableTasks, and the 3-arg CreateRun don't exist yet.
- Step 4: Extend store.Task and its scans
In internal/store/tasks.go, add import "time", and extend the struct:
type Task struct {
ID int64 `json:"id"`
Name string `json:"name"`
SrcEndpointID int64 `json:"src_endpoint_id"`
DstEndpointID int64 `json:"dst_endpoint_id"`
Status string `json:"status"`
FolderMapping map[string]string `json:"folder_mapping"`
ScheduleIntervalSeconds int64 `json:"schedule_interval_seconds"`
ScheduleAnchor *time.Time `json:"-"`
Broken bool `json:"broken"`
}
Update GetTask's SELECT + Scan:
err := s.Pool.QueryRow(ctx,
`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping,
schedule_interval_seconds, schedule_anchor, broken
FROM tasks WHERE id=$1`, id).
Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping,
&t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken)
Update ListTasks's SELECT + Scan the same way (add the three columns and three scan targets):
rows, err := s.Pool.Query(ctx,
`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping,
schedule_interval_seconds, schedule_anchor, broken
FROM tasks ORDER BY id DESC`)
...
if err := rows.Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping,
&t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken); err != nil {
- Step 5: Add task scheduling setters + schedulable query
Append to internal/store/tasks.go:
// SetTaskSchedule sets the recurrence interval (0 = off), stamps the anchor when
// enabling (the first run is one interval after this), and clears broken — any
// explicit schedule change un-breaks the task.
func (s *Store) SetTaskSchedule(ctx context.Context, id, intervalSeconds int64) error {
_, err := s.Pool.Exec(ctx,
`UPDATE tasks SET schedule_interval_seconds=$2,
schedule_anchor = CASE WHEN $2 > 0 THEN now() ELSE schedule_anchor END,
broken = false
WHERE id=$1`, id, intervalSeconds)
return err
}
// SetTaskBroken trips the schedule breaker: disables the schedule and flags the
// task for operator attention.
func (s *Store) SetTaskBroken(ctx context.Context, id int64) error {
_, err := s.Pool.Exec(ctx,
`UPDATE tasks SET broken=true, schedule_interval_seconds=0 WHERE id=$1`, id)
return err
}
// SchedulableTask is the per-tick decision input for the scheduler.
type SchedulableTask struct {
ID int64
IntervalSeconds int64
Anchor time.Time
LastFinished *time.Time
}
// ListSchedulableTasks returns tasks eligible to auto-run: schedule on, not
// broken, not currently running — each joined with its last finished run time.
func (s *Store) ListSchedulableTasks(ctx context.Context) ([]SchedulableTask, error) {
rows, err := s.Pool.Query(ctx,
`SELECT t.id, t.schedule_interval_seconds, t.schedule_anchor,
(SELECT max(finished_at) FROM runs r WHERE r.task_id=t.id AND r.finished_at IS NOT NULL)
FROM tasks t
WHERE t.schedule_interval_seconds > 0 AND t.schedule_anchor IS NOT NULL AND NOT t.broken AND t.status <> 'running'`)
if err != nil {
return nil, err
}
defer rows.Close()
out := []SchedulableTask{}
for rows.Next() {
var st SchedulableTask
if err := rows.Scan(&st.ID, &st.IntervalSeconds, &st.Anchor, &st.LastFinished); err != nil {
return nil, err
}
out = append(out, st)
}
return out, rows.Err()
}
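The two UPDATE statements above define a small state machine over three columns. As a rough in-memory sketch of those transitions (taskState, setSchedule and setBroken are hypothetical stand-ins, and AnchorUnix == 0 models a NULL schedule_anchor):

```go
package main

import "fmt"

// taskState is a hypothetical stand-in for the three scheduling columns on a
// tasks row. AnchorUnix == 0 models a NULL schedule_anchor.
type taskState struct {
	IntervalSeconds int64
	AnchorUnix      int64
	Broken          bool
}

// setSchedule mirrors the UPDATE in SetTaskSchedule: the interval is always
// written, the anchor is re-stamped only when enabling, and broken is cleared.
func setSchedule(t taskState, intervalSeconds, nowUnix int64) taskState {
	t.IntervalSeconds = intervalSeconds
	if intervalSeconds > 0 {
		t.AnchorUnix = nowUnix
	}
	t.Broken = false
	return t
}

// setBroken mirrors the UPDATE in SetTaskBroken: flag the task and switch the
// schedule off, leaving the anchor untouched.
func setBroken(t taskState) taskState {
	t.Broken = true
	t.IntervalSeconds = 0
	return t
}

func main() {
	t := setSchedule(taskState{}, 3600, 1000) // enable at t=1000
	fmt.Printf("enabled:  %+v\n", t)
	t = setBroken(t) // breaker trips
	fmt.Printf("broken:   %+v\n", t)
	t = setSchedule(t, 0, 2000) // explicit disable clears broken
	fmt.Printf("disabled: %+v\n", t)
}
```

Note that disabling keeps the old anchor; it only matters again once the schedule is re-enabled, at which point it is re-stamped.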
- Step 6: Extend store.Run, CreateRun, and add run queries
In internal/store/runs.go, add import "time" (alongside context) and replace the struct + CreateRun, then add the queries:
type Run struct {
ID int64 `json:"id"`
TaskID int64 `json:"task_id"`
Status string `json:"status"`
StartedAt time.Time `json:"started_at"`
FinishedAt *time.Time `json:"finished_at"`
TotalCopied int64 `json:"total_copied"`
TotalSkipped int64 `json:"total_skipped"`
TotalErrors int64 `json:"total_errors"`
Trigger string `json:"trigger"`
}
func (s *Store) CreateRun(ctx context.Context, taskID int64, trigger string) (int64, error) {
var id int64
err := s.Pool.QueryRow(ctx,
`INSERT INTO runs (task_id, trigger) VALUES ($1,$2) RETURNING id`, taskID, trigger).Scan(&id)
return id, err
}
// ListRunsByTask returns a task's runs, newest first, for the run-log modal.
func (s *Store) ListRunsByTask(ctx context.Context, taskID int64) ([]Run, error) {
rows, err := s.Pool.Query(ctx,
`SELECT id, task_id, started_at, finished_at, status,
total_copied, total_skipped, total_errors, trigger
FROM runs WHERE task_id=$1 ORDER BY id DESC`, taskID)
if err != nil {
return nil, err
}
defer rows.Close()
out := []Run{}
for rows.Next() {
var r Run
if err := rows.Scan(&r.ID, &r.TaskID, &r.StartedAt, &r.FinishedAt, &r.Status,
&r.TotalCopied, &r.TotalSkipped, &r.TotalErrors, &r.Trigger); err != nil {
return nil, err
}
out = append(out, r)
}
return out, rows.Err()
}
// LastFinishedRunAt returns the most recent finished run's timestamp, or nil if
// the task has never completed a run — the baseline for the next scheduled run.
func (s *Store) LastFinishedRunAt(ctx context.Context, taskID int64) (*time.Time, error) {
var t *time.Time
err := s.Pool.QueryRow(ctx,
`SELECT max(finished_at) FROM runs WHERE task_id=$1 AND finished_at IS NOT NULL`, taskID).Scan(&t)
return t, err
}
Keep the existing FinishRun as-is.
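LastFinishedRunAt relies on max(finished_at) yielding NULL when no run has finished, which pgx scans into a nil *time.Time. A minimal in-memory sketch of that behaviour, using a hypothetical run type with Unix seconds in place of timestamps:

```go
package main

import "fmt"

// run is a hypothetical in-memory row; FinishedUnix == nil models
// finished_at IS NULL (the run is still in progress).
type run struct {
	ID           int64
	FinishedUnix *int64
}

// lastFinished mirrors SELECT max(finished_at): it returns the largest
// finish time, or nil when no run has finished yet.
func lastFinished(runs []run) *int64 {
	var best *int64
	for i := range runs {
		f := runs[i].FinishedUnix
		if f != nil && (best == nil || *f > *best) {
			best = f
		}
	}
	return best
}

func main() {
	a, b := int64(100), int64(250)
	runs := []run{{ID: 1, FinishedUnix: &a}, {ID: 2, FinishedUnix: &b}, {ID: 3}}
	if lf := lastFinished(runs); lf != nil {
		fmt.Println("last finished:", *lf)
	}
	fmt.Println("no finished runs -> nil:", lastFinished([]run{{ID: 9}}) == nil)
}
```

The nil case is what makes NextRun fall back to the schedule anchor for a task that has never completed a run.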
- Step 7: Fix the CreateRun caller so the build passes
In internal/orchestrator/orchestrator.go, the Run method calls o.store.CreateRun(ctx, taskID). Change it to pass a trigger literal for now:
runID, err := o.store.CreateRun(ctx, taskID, "manual")
(Task 3 threads the real trigger through; this keeps the build green now.)
- Step 8: Run the store tests to verify they pass
Run: TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./internal/store/ -run 'TestTaskScheduleAndBroken|TestListRunsByTaskWithTrigger' -v
Expected: PASS.
- Step 9: Verify whole build + suite
Run: go build ./... && go vet ./... && TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./...
Expected: all pass.
- Step 10: Commit
git add migrations/0004_task_scheduling.up.sql migrations/0004_task_scheduling.down.sql \
internal/store/tasks.go internal/store/runs.go internal/store/scheduling_test.go \
internal/orchestrator/orchestrator.go
git commit -m "feat(store): task schedule columns, run trigger, scheduling queries"
Task 2: Scheduler package + wiring
Files:
- Create: internal/scheduler/scheduler.go
- Create: internal/scheduler/scheduler_test.go
- Modify: cmd/server/main.go (start the scheduler)
Interfaces:
- Consumes: store.SchedulableTask, store.ListSchedulableTasks, orchestrator.Orchestrator.Run(ctx, taskID, trigger) (the 3-arg form lands in Task 3; this task calls it as 3-arg and both compile together only after Task 3; see the Step 5 note).
- Produces:
  - func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time
  - func dueTaskIDs(tasks []store.SchedulableTask, now time.Time) []int64
  - type Scheduler; func New(st *store.Store, orch *orchestrator.Orchestrator) *Scheduler; func (s *Scheduler) Start(ctx context.Context)
- Step 1: Write the failing pure-logic test
internal/scheduler/scheduler_test.go:
package scheduler
import (
"testing"
"time"
"github.com/vasyansk/imap-copier/internal/store"
)
func TestNextRun(t *testing.T) {
anchor := time.Date(2026, 7, 3, 10, 0, 0, 0, time.UTC)
fin := time.Date(2026, 7, 3, 11, 0, 0, 0, time.UTC)
// No finished run yet → anchor + interval.
if got := NextRun(time.Hour, anchor, nil); !got.Equal(anchor.Add(time.Hour)) {
t.Fatalf("first run: got %v", got)
}
// Finished run exists → last finished + interval (from completion, not start).
if got := NextRun(time.Hour, anchor, &fin); !got.Equal(fin.Add(time.Hour)) {
t.Fatalf("recurring: got %v", got)
}
}
func TestDueTaskIDs(t *testing.T) {
now := time.Date(2026, 7, 3, 12, 0, 0, 0, time.UTC)
anchorOld := now.Add(-2 * time.Hour) // enabled 2h ago
finRecent := now.Add(-30 * time.Minute)
tasks := []store.SchedulableTask{
{ID: 1, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: nil}, // due: anchor+1h < now
{ID: 2, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: &finRecent}, // not due: fin+1h > now
{ID: 3, IntervalSeconds: 21600, Anchor: anchorOld, LastFinished: nil}, // not due: anchor+6h > now
}
ids := dueTaskIDs(tasks, now)
if len(ids) != 1 || ids[0] != 1 {
t.Fatalf("due IDs = %v, want [1]", ids)
}
}
- Step 2: Run test to verify it fails
Run: go test ./internal/scheduler/
Expected: FAIL — package/functions don't exist.
- Step 3: Implement the scheduler
internal/scheduler/scheduler.go:
// Package scheduler runs tasks on their recurring interval. A single goroutine
// polls the DB every pollInterval and triggers due tasks through the
// orchestrator. The DB is the source of truth, so the scheduler holds no state
// and is safe across restarts.
package scheduler
import (
"context"
"log/slog"
"time"
"github.com/vasyansk/imap-copier/internal/orchestrator"
"github.com/vasyansk/imap-copier/internal/store"
)
const pollInterval = 30 * time.Second
// NextRun is when a task should next run: one interval after its last completed
// run, or one interval after the schedule anchor if it has never completed one.
func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time {
base := anchor
if lastFinished != nil {
base = *lastFinished
}
return base.Add(interval)
}
// dueTaskIDs returns the IDs of tasks whose next run is at or before now.
func dueTaskIDs(tasks []store.SchedulableTask, now time.Time) []int64 {
var ids []int64
for _, t := range tasks {
next := NextRun(time.Duration(t.IntervalSeconds)*time.Second, t.Anchor, t.LastFinished)
if !now.Before(next) {
ids = append(ids, t.ID)
}
}
return ids
}
type Scheduler struct {
store *store.Store
orch *orchestrator.Orchestrator
}
func New(st *store.Store, orch *orchestrator.Orchestrator) *Scheduler {
return &Scheduler{store: st, orch: orch}
}
// Start blocks, ticking until ctx is cancelled. Run it in a goroutine.
func (s *Scheduler) Start(ctx context.Context) {
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.tick(ctx)
}
}
}
func (s *Scheduler) tick(ctx context.Context) {
tasks, err := s.store.ListSchedulableTasks(ctx)
if err != nil {
slog.Error("scheduler: list schedulable", "err", err)
return
}
for _, id := range dueTaskIDs(tasks, time.Now()) {
if _, err := s.orch.Run(ctx, id, "scheduled"); err != nil {
// ErrAlreadyRunning / ErrNotTested are expected races/edge cases, not fatal.
slog.Info("scheduler: run skipped", "task", id, "err", err)
}
}
}
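The due check uses !now.Before(next), so a task becomes due at exactly its next-run instant, not one tick later. The sketch below repeats the NextRun rule in a self-contained form to show that boundary; nextRun, isDue and dueAfterSeconds are local, hypothetical copies for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// nextRun repeats the plan's NextRun rule so the example is self-contained.
func nextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time {
	base := anchor
	if lastFinished != nil {
		base = *lastFinished
	}
	return base.Add(interval)
}

// isDue uses the same comparison as dueTaskIDs: due when now >= next.
func isDue(now, next time.Time) bool {
	return !now.Before(next)
}

// dueAfterSeconds is a hypothetical helper: for a task enabled at a fixed
// anchor with a 1h interval and no finished run, is it due `elapsed` seconds
// after the anchor?
func dueAfterSeconds(elapsed int64) bool {
	anchor := time.Date(2026, 7, 3, 10, 0, 0, 0, time.UTC)
	next := nextRun(time.Hour, anchor, nil)
	now := anchor.Add(time.Duration(elapsed) * time.Second)
	return isDue(now, next)
}

func main() {
	for _, s := range []int64{3599, 3600, 3630} {
		fmt.Printf("anchor+%ds -> due=%v\n", s, dueAfterSeconds(s))
	}
}
```

Because the scheduler only looks every 30 s, a due task would presumably start up to one poll interval after its nominal next-run time.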
- Step 4: Wire the scheduler into main
In cmd/server/main.go, add the import "github.com/vasyansk/imap-copier/internal/scheduler" and, after orch := orchestrator.New(...) and before http.ListenAndServe, start it:
go scheduler.New(st, orch).Start(context.Background())
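The plan starts the scheduler with context.Background(), so the loop only ends when the process exits. If a cancellable context were preferred, one possible variation (an assumption, not something the plan requires) is to derive it from OS signals with signal.NotifyContext. In the sketch below, pollLoop is a stand-in for Scheduler.Start and ticksWithin is a hypothetical helper for experimenting with cancellation.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// pollLoop is a stand-in for Scheduler.Start: it calls tick on every interval
// until ctx is cancelled, then returns.
func pollLoop(ctx context.Context, interval time.Duration, tick func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			tick()
		}
	}
}

// ticksWithin is a hypothetical helper: it runs pollLoop under a context that
// expires after totalMs and returns how many ticks fired before it stopped.
func ticksWithin(totalMs, intervalMs int) int {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(totalMs)*time.Millisecond)
	defer cancel()
	n := 0
	pollLoop(ctx, time.Duration(intervalMs)*time.Millisecond, func() { n++ })
	return n
}

func main() {
	// Cancel on SIGINT/SIGTERM instead of never cancelling.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// The extra timeout exists only so this demo exits on its own.
	ctx, cancel := context.WithTimeout(ctx, 120*time.Millisecond)
	defer cancel()

	n := 0
	pollLoop(ctx, 25*time.Millisecond, func() { n++ })
	fmt.Println("loop stopped; ticks:", n)
}
```

Runs already in flight are unaffected either way, since the orchestrator detaches them with context.WithoutCancel in Task 3.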
- Step 5: Run the pure test
Run: go test ./internal/scheduler/ -v
Expected: PASS.
Note: go build ./... will FAIL until Task 3 changes Orchestrator.Run to the 3-arg form (s.orch.Run(ctx, id, "scheduled")). Run Task 3 immediately after this task; the package unit test passes now, the whole-module build goes green after Task 3. (The two tasks are sequenced back-to-back for this reason.)
- Step 6: Commit
git add internal/scheduler/scheduler.go internal/scheduler/scheduler_test.go cmd/server/main.go
git commit -m "feat(scheduler): 30s polling scheduler with pure NextRun/dueTaskIDs"
Task 3: Orchestrator trigger threading + breaker
Files:
- Modify: internal/orchestrator/orchestrator.go (Run signature; thread trigger into CreateRun + runAll; breaker in runAll)
- Modify: internal/httpapi/run.go (handleRun passes "manual")
- Test: internal/orchestrator/breaker_test.go (new)
Interfaces:
- Consumes: store.SetTaskBroken (Task 1).
- Produces:
  - func (o *Orchestrator) Run(ctx context.Context, taskID int64, trigger string) (int64, error) (was 2-arg)
  - func shouldBreak(trigger string, totErr int64) bool
- Step 1: Write the failing breaker-logic test
internal/orchestrator/breaker_test.go:
package orchestrator
import "testing"
func TestShouldBreak(t *testing.T) {
cases := []struct {
trigger string
errs int64
want bool
}{
{"scheduled", 2, true}, // scheduled run with errors trips the breaker
{"scheduled", 0, false}, // scheduled run, clean — no break
{"manual", 5, false}, // manual run never trips the breaker
{"manual", 0, false},
}
for _, c := range cases {
if got := shouldBreak(c.trigger, c.errs); got != c.want {
t.Fatalf("shouldBreak(%q,%d)=%v want %v", c.trigger, c.errs, got, c.want)
}
}
}
- Step 2: Run test to verify it fails
Run: go test ./internal/orchestrator/ -run TestShouldBreak
Expected: FAIL — shouldBreak undefined.
- Step 3: Add shouldBreak and thread the trigger through Run
In internal/orchestrator/orchestrator.go:
Add the helper (package level):
// shouldBreak reports whether a completed run should trip the schedule breaker:
// only scheduled runs that ended with errors.
func shouldBreak(trigger string, totErr int64) bool {
return trigger == "scheduled" && totErr > 0
}
Change Run to accept and forward the trigger:
func (o *Orchestrator) Run(ctx context.Context, taskID int64, trigger string) (int64, error) {
Inside Run, replace the temporary literal from Task 1:
runID, err := o.store.CreateRun(ctx, taskID, trigger)
And pass the trigger into the coordinator goroutine:
go o.runAll(context.WithoutCancel(ctx), task, runID, accs, srcEP, dstEP, trigger)
Change runAll's signature to receive it and trip the breaker after the final status is set:
func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint, trigger string) {
At the end of runAll, after SetTaskStatus and the run_done publish, add:
if shouldBreak(trigger, totErr) {
_ = o.store.SetTaskBroken(ctx, task.ID)
o.hub.Publish(wshub.Event{Type: "task_broken", TaskID: task.ID,
Data: map[string]any{"task_id": task.ID, "errors": totErr}})
}
- Step 4: Update the manual caller
In internal/httpapi/run.go, handleRun:
runID, err := s.orch.Run(r.Context(), taskID, "manual")
- Step 5: Run the breaker test + full build
Run: go test ./internal/orchestrator/ -run TestShouldBreak -v && go build ./... && go vet ./...
Expected: test PASS; build + vet clean (the scheduler's s.orch.Run(ctx, id, "scheduled") from Task 2 now resolves).
- Step 6: Full suite
Run: TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./...
Expected: all pass.
- Step 7: Commit
git add internal/orchestrator/orchestrator.go internal/orchestrator/breaker_test.go internal/httpapi/run.go
git commit -m "feat(orchestrator): thread run trigger; breaker on scheduled runs with errors"
Task 4: HTTP — schedule endpoint, runs endpoint, next_run_at DTO
Files:
- Modify: internal/httpapi/tasks.go (taskView with next_run_at; compute in handleGetTask; add handleSetSchedule, handleListRuns)
- Modify: internal/httpapi/router.go (two routes)
Interfaces:
- Consumes: store.SetTaskSchedule, store.ListRunsByTask, store.LastFinishedRunAt, store.ListAccountsByTask, scheduler.NextRun.
- Produces: routes PUT /api/tasks/{id}/schedule, GET /api/tasks/{id}/runs; task JSON gains next_run_at.
- Step 1: Add the task DTO with next_run_at and compute it in handleGetTask
In internal/httpapi/tasks.go, add imports "time" and "github.com/vasyansk/imap-copier/internal/scheduler". Add the view type and compute the next run:
// taskView augments the stored task with the server-computed next scheduled run
// (RFC3339 UTC; null when the schedule is off, the task is running, or broken).
type taskView struct {
store.Task
NextRunAt *string `json:"next_run_at"`
}
func (s *Server) taskViewFor(ctx context.Context, t store.Task) taskView {
var nextRunAt *string
if t.ScheduleIntervalSeconds > 0 && !t.Broken && t.Status != "running" && t.ScheduleAnchor != nil {
last, _ := s.store.LastFinishedRunAt(ctx, t.ID)
n := scheduler.NextRun(time.Duration(t.ScheduleIntervalSeconds)*time.Second, *t.ScheduleAnchor, last)
str := n.UTC().Format(time.RFC3339)
nextRunAt = &str
}
return taskView{Task: t, NextRunAt: nextRunAt}
}
Change handleGetTask's final write to use it:
writeJSON(w, http.StatusOK, map[string]any{"task": s.taskViewFor(r.Context(), task), "accounts": views})
(handleListTasks stays returning []store.Task — the list needs only broken, which serializes via the json tag; next_run_at is detail-only.)
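Embedding store.Task in taskView promotes its JSON fields to the top level, the json:"-" tag keeps the anchor out of the payload, and a nil *string serializes as null. A minimal sketch of the resulting shape, using a trimmed, hypothetical copy of the struct (Task here is not the real store.Task) and a hypothetical encodeView helper:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Task is a trimmed, hypothetical copy of store.Task with only the fields
// that matter for this example.
type Task struct {
	ID                      int64      `json:"id"`
	ScheduleIntervalSeconds int64      `json:"schedule_interval_seconds"`
	ScheduleAnchor          *time.Time `json:"-"`
	Broken                  bool       `json:"broken"`
}

// taskView has the same shape as the plan's DTO: embedded task plus next_run_at.
type taskView struct {
	Task
	NextRunAt *string `json:"next_run_at"`
}

// encodeView is a hypothetical helper: it marshals a view for task 1 with the
// given interval; an empty nextRunAt leaves the pointer nil (JSON null).
func encodeView(interval int64, nextRunAt string) string {
	anchor := time.Date(2026, 7, 3, 10, 0, 0, 0, time.UTC)
	v := taskView{Task: Task{ID: 1, ScheduleIntervalSeconds: interval, ScheduleAnchor: &anchor}}
	if nextRunAt != "" {
		v.NextRunAt = &nextRunAt
	}
	b, err := json.Marshal(v)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	fmt.Println(encodeView(3600, "2026-07-03T11:00:00Z"))
	// {"id":1,"schedule_interval_seconds":3600,"broken":false,"next_run_at":"2026-07-03T11:00:00Z"}
	fmt.Println(encodeView(0, ""))
	// {"id":1,"schedule_interval_seconds":0,"broken":false,"next_run_at":null}
}
```

The explicit null (rather than an omitted key) matches the string | null type the frontend declares for next_run_at in Task 5.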
- Step 2: Add the schedule + runs handlers
Append to internal/httpapi/tasks.go:
// handleSetSchedule enables/changes/disables a task's recurring schedule.
// Enabling (interval>0) requires every account to pass its connection tests.
func (s *Server) handleSetSchedule(w http.ResponseWriter, r *http.Request) {
taskID, err := pathID(r, "id")
if err != nil {
http.Error(w, "bad id", http.StatusBadRequest)
return
}
var body struct {
IntervalSeconds int64 `json:"interval_seconds"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "bad json", http.StatusBadRequest)
return
}
if body.IntervalSeconds > 0 {
accs, err := s.store.ListAccountsByTask(r.Context(), taskID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if !allAccountsTested(accs) {
http.Error(w, "all accounts must pass connection tests before scheduling", http.StatusConflict)
return
}
}
if err := s.store.SetTaskSchedule(r.Context(), taskID, body.IntervalSeconds); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
// allAccountsTested mirrors the orchestrator's run gate: non-empty and every
// account OK on both sides.
func allAccountsTested(accs []store.Account) bool {
if len(accs) == 0 {
return false
}
for _, a := range accs {
if a.TestSrcStatus != "ok" || a.TestDstStatus != "ok" {
return false
}
}
return true
}
func (s *Server) handleListRuns(w http.ResponseWriter, r *http.Request) {
taskID, err := pathID(r, "id")
if err != nil {
http.Error(w, "bad id", http.StatusBadRequest)
return
}
runs, err := s.store.ListRunsByTask(r.Context(), taskID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusOK, runs)
}
(json and store are already imported in tasks.go.)
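The enable gate in handleSetSchedule reduces to a small decision table: a positive interval with any untested account answers 409, everything else answers 204. The sketch below isolates that decision, ignoring the bad-id, bad-JSON and database error paths; account is a hypothetical trimmed copy of store.Account and scheduleStatus is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net/http"
)

// account is a hypothetical trimmed copy of store.Account.
type account struct {
	TestSrcStatus string
	TestDstStatus string
}

// allAccountsTested repeats the plan's gate: non-empty and every account OK
// on both sides.
func allAccountsTested(accs []account) bool {
	if len(accs) == 0 {
		return false
	}
	for _, a := range accs {
		if a.TestSrcStatus != "ok" || a.TestDstStatus != "ok" {
			return false
		}
	}
	return true
}

// scheduleStatus is a hypothetical helper returning the status code the
// handler would answer with when no decoding or database error occurs.
func scheduleStatus(intervalSeconds int64, accs []account) int {
	if intervalSeconds > 0 && !allAccountsTested(accs) {
		return http.StatusConflict
	}
	return http.StatusNoContent
}

func main() {
	tested := []account{{TestSrcStatus: "ok", TestDstStatus: "ok"}}
	failing := []account{{TestSrcStatus: "ok", TestDstStatus: "fail"}}
	fmt.Println("enable, failing accounts:", scheduleStatus(3600, failing))
	fmt.Println("enable, tested accounts: ", scheduleStatus(3600, tested))
	fmt.Println("disable, failing accounts:", scheduleStatus(0, failing))
	fmt.Println("enable, no accounts:     ", scheduleStatus(3600, nil))
}
```

Disabling never consults the accounts, which is what lets an operator switch off a schedule on a task whose accounts have since started failing.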
- Step 3: Register routes
In internal/httpapi/router.go, after the existing PUT /api/tasks/{id}/folder-mapping line (or with the other task routes):
api.HandleFunc("PUT /api/tasks/{id}/schedule", s.handleSetSchedule)
api.HandleFunc("GET /api/tasks/{id}/runs", s.handleListRuns)
- Step 4: Verify build + vet + suite
Run: go build ./... && go vet ./... && TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./...
Expected: all pass.
- Step 5: Commit
git add internal/httpapi/tasks.go internal/httpapi/router.go
git commit -m "feat(api): schedule + runs endpoints; next_run_at on task detail"
Task 5: Frontend — schedule control, next-run, broken badge, run-log modal, list icon
Files:
- Modify: web/src/api.ts (Task fields; Run type; setTaskSchedule, listRuns)
- Create: web/src/components/RunLogModal.tsx
- Modify: web/src/pages/TaskDetail.tsx (schedule select, next-run line, broken badge, Runs button + modal, reload on task_broken)
- Modify: web/src/pages/Tasks.tsx (broken indicator in the list)
- Modify: web/src/app.css (broken badge / schedule-row styles)
Interfaces:
- Consumes: backend routes from Task 4; Modal (web/src/components/Modal.tsx), StatusBadge.
- Step 1: Extend the API client
In web/src/api.ts, extend Task:
export interface Task {
id: number
name: string
src_endpoint_id: number
dst_endpoint_id: number
status: string
folder_mapping?: Record<string, string>
schedule_interval_seconds?: number
broken?: boolean
next_run_at?: string | null
}
Add a Run interface and the two calls (near runTask):
export interface Run {
id: number
task_id: number
status: string
started_at: string
finished_at: string | null
total_copied: number
total_skipped: number
total_errors: number
trigger: string
}
export const setTaskSchedule = (taskId: number, intervalSeconds: number) =>
api(`/api/tasks/${taskId}/schedule`, { ...jsonBody({ interval_seconds: intervalSeconds }), method: 'PUT' })
export const listRuns = (taskId: number) => api<Run[]>(`/api/tasks/${taskId}/runs`)
- Step 2: Create the run-log modal
web/src/components/RunLogModal.tsx:
import { useEffect, useState } from 'react'
import { Modal } from './Modal'
import { StatusBadge } from './StatusBadge'
import { listRuns, type Run } from '../api'
const fmt = (iso: string | null) => (iso ? new Date(iso).toLocaleString() : '—')
export function RunLogModal({ taskId, open, onClose }: { taskId: number; open: boolean; onClose: () => void }) {
const [runs, setRuns] = useState<Run[] | null>(null)
useEffect(() => {
if (!open) return
setRuns(null)
listRuns(taskId).then((r) => setRuns(r ?? [])).catch(() => setRuns([]))
}, [open, taskId])
return (
<Modal open={open} title="Run log" onClose={onClose} size="lg">
<div className="tbl-wrap">
<table className="tbl">
<thead>
<tr>
<th>Started</th>
<th>Finished</th>
<th>Trigger</th>
<th>Status</th>
<th>Copied</th>
<th>Skipped</th>
<th>Errors</th>
</tr>
</thead>
<tbody>
{runs === null ? (
<tr className="empty-row"><td colSpan={7}>loading…</td></tr>
) : runs.length === 0 ? (
<tr className="empty-row"><td colSpan={7}>no runs yet</td></tr>
) : (
runs.map((r) => (
<tr key={r.id}>
<td>{fmt(r.started_at)}</td>
<td>{fmt(r.finished_at)}</td>
<td>{r.trigger}</td>
<td><StatusBadge status={r.status} /></td>
<td className="num-cell">{r.total_copied}</td>
<td className="num-cell">{r.total_skipped}</td>
<td className="num-cell">{r.total_errors}</td>
</tr>
))
)}
</tbody>
</table>
</div>
</Modal>
)
}
- Step 3: TaskDetail — schedule control, next-run, broken badge, Runs modal
In web/src/pages/TaskDetail.tsx:
Add setTaskSchedule to the imports from ../api (listRuns is not needed here; RunLogModal calls it itself). Import the modal: import { RunLogModal } from '../components/RunLogModal'.
Add state near the other useStates:
const [showRuns, setShowRuns] = useState(false)
Add task_broken to the reload event list (the existing if ([...].includes(ev.type)) reload() array) so the detail refreshes when the breaker trips:
if (['account_started', 'account_test', 'account_done', 'run_started', 'run_done', 'error', 'folder', 'cancelled', 'plan', 'task_broken'].includes(ev.type)) {
reload()
}
Add a schedule handler:
async function onSchedule(intervalSeconds: number) {
setError(null)
try {
await setTaskSchedule(id, intervalSeconds)
reload()
} catch (err) {
setError(err instanceof Error ? err.message : 'Failed to set schedule')
}
}
In the "Run control" panel (after the .btn-row with Test/Run), add the schedule row. task is in scope from const { task, accounts } = data:
<div className="sched-row">
<label htmlFor="sched">Schedule</label>
<select
id="sched"
value={task.schedule_interval_seconds ?? 0}
onChange={(e) => onSchedule(Number(e.target.value))}
>
<option value={0}>Off</option>
<option value={3600}>Every 1h</option>
<option value={10800}>Every 3h</option>
<option value={21600}>Every 6h</option>
<option value={43200}>Every 12h</option>
<option value={86400}>Every 24h</option>
</select>
{task.next_run_at && (
<span className="sched-next">Next run: {new Date(task.next_run_at).toLocaleString()}</span>
)}
{task.broken && <span className="badge badge-fail"><span className="dot" />broken</span>}
<button type="button" className="link-btn" onClick={() => setShowRuns(true)}>
runs
</button>
</div>
Add the broken badge next to the status in the page head (<StatusBadge status={task.status} />):
<StatusBadge status={task.status} />
{task.broken && <span className="badge badge-fail" style={{ marginLeft: 8 }}><span className="dot" />broken</span>}
Render the modal near the other modals at the end of the returned JSX:
<RunLogModal taskId={id} open={showRuns} onClose={() => setShowRuns(false)} />
- Step 4: Tasks list — broken indicator
In web/src/pages/Tasks.tsx, in the row's Status cell, show the broken badge beside the status:
<td>
<StatusBadge status={t.status} />
{t.broken && <span className="badge badge-fail" style={{ marginLeft: 8 }}><span className="dot" />broken</span>}
</td>
- Step 5: CSS for the schedule row
In web/src/app.css, add:
.sched-row {
display: flex;
align-items: center;
gap: 12px;
margin-top: 14px;
flex-wrap: wrap;
}
.sched-row label {
font-size: 11px;
font-weight: 700;
letter-spacing: 0.1em;
text-transform: uppercase;
color: var(--fg-dim);
}
.sched-row select {
background: var(--bg-inset);
border: 1px solid var(--border);
color: var(--fg);
padding: 7px 10px;
font-size: 13px;
border-radius: 2px;
}
.sched-next {
font-size: 12px;
color: var(--fg-dim);
font-variant-numeric: tabular-nums;
}
- Step 6: Verify build + lint
Run: cd web && npm run build && npx oxlint src/
Expected: build clean; oxlint shows ONLY the two pre-existing warnings (ConfirmProvider only-export-components, TaskDetail exhaustive-deps). Fix any new finding.
- Step 7: Commit
git add web/src/api.ts web/src/components/RunLogModal.tsx web/src/pages/TaskDetail.tsx web/src/pages/Tasks.tsx web/src/app.css
git commit -m "feat(web): schedule control, next-run, broken badge, run-log modal"
Task 6: End-to-end verification on prod
Per project rules, functional verification runs on prod after deploy. No code.
- Step 1: Build + migrate + deploy
Run: make build && make up (migration 0004 auto-applies at startup). Wait for curl -fsS http://localhost/healthz.
- Step 2: Enable-gate + next-run display
On a task whose accounts are NOT all tested OK, set Schedule → the request returns 409 and the schedule stays Off (surfaced in the error banner). Test the accounts to OK, then set Schedule = Every 1h → "Next run: " appears (≈ now + 1h, in browser local time), and no run starts immediately (first run is one interval out).
- Step 3: Run-log modal
Run the task manually once. Open runs → the modal lists the run with local-time started/finished, trigger = manual, status, and copied/skipped/errors.
- Step 4: Breaker (scheduled failure → broken)
Simulate a scheduled failure: break one account (e.g. a wrong stored password) on a scheduled task, then wait for the next scheduled run. When that run finishes with errors, the schedule flips to Off, the task shows the red broken badge in both the task detail and the Tasks list, and no further auto-runs occur until the schedule is re-enabled. (Auto-run timing itself is covered by the scheduler unit tests. Presets start at 1h, so the timed trigger cannot be observed within a quick E2E; verify the breaker/flag transition and the DB state instead.)
- Step 5: Delete cleanup
Delete a task that had a schedule and runs; confirm it disappears and its runs are gone (cascade). The scheduler stops considering it (no errors in logs).
- Step 6: Record result
Save an E2E note under ./swarm-report/scheduled-task-runs-<YYYY-MM-DD>.md with pass/fail per step.
Self-Review
- Spec coverage: interval per task + auto-run (Tasks 1–3); don't-start-if-running + interval-from-completion (ListSchedulableTasks status<>'running' filter + NextRun from LastFinished, Tasks 1–2); next-run in browser local time (next_run_at DTO Task 4 + toLocaleString Task 5); run-log modal with per-run status/totals (Tasks 4–5); enable-gate all-OK → 409 (Task 4); breaker scheduled-error → broken + red icon in list & detail (Tasks 3, 5); first-run-after-interval via anchor (Task 1 SetTaskSchedule + Task 2 NextRun); delete cleanup via cascade (noted, no code); migration 0004 (Task 1). All covered.
- Placeholder scan: none; every step carries real code/commands.
- Type consistency: CreateRun(ctx, taskID, trigger) identical across Tasks 1/3; Run(ctx, taskID, trigger) defined Task 3, called by scheduler Task 2 and handleRun Task 3; NextRun(interval, anchor, lastFinished) identical in scheduler (Task 2) and taskViewFor (Task 4); SchedulableTask{ID,IntervalSeconds,Anchor,LastFinished} fields match between store (Task 1) and dueTaskIDs (Task 2); JSON contract schedule_interval_seconds/broken/next_run_at/interval_seconds and Run fields match between Go (Tasks 1, 4) and TS (Task 5).
- Sequencing note: Task 2's whole-module build is red until Task 3 lands the 3-arg Run (the scheduler calls it). Both are flagged to run back-to-back; each task's own unit test passes independently.