imap-copier/docs/superpowers/plans/2026-07-03-scheduled-task-runs.md

Scheduled (Recurring) Task Runs Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Let a task run itself on a recurring interval, with a breaker that disables the schedule and flags the task broken when a scheduled run errors, plus a run-log modal and next-run display.

Architecture: A single in-process scheduler goroutine (30 s ticker) reads schedulable tasks from the DB each tick and triggers due ones through Orchestrator.Run with a "scheduled" trigger. Schedule config lives on tasks (interval, anchor, broken); runs.trigger distinguishes manual vs scheduled. DB is the source of truth, so the scheduler is stateless and restart-safe. The next-run time is computed server-side and formatted in the browser's local time.

Tech Stack: Go (net/http, pgx v5, golang-migrate), Postgres 18, React 19 + Vite + TypeScript.

Global Constraints

  • Scheduler poll interval: 30 s (a package const).
  • Interval presets → seconds: Off=0, 1h=3600, 3h=10800, 6h=21600, 12h=43200, 24h=86400.
  • Breaker trips only on scheduled runs (trigger == "scheduled" AND total_errors > 0) → SetTaskBroken (sets broken=true, schedule_interval_seconds=0). Manual runs never trip it.
  • Enabling a schedule (interval>0) requires all accounts tested OK; otherwise HTTP 409. Disabling (0) is always allowed and clears broken.
  • First run after enabling = schedule_anchor + interval; subsequent = last_finished_at + interval. Never run while the task is running.
  • next_run_at: RFC3339 UTC string, null when schedule off / task running / broken.
  • Non-destructive migration; ON DELETE CASCADE already removes runs on task delete — no new cleanup code.
  • pgx v5 scans nullable TIMESTAMPTZ into *time.Time and non-null into time.Time.
  • Backend verification: go build ./..., go vet ./..., go test ./... (store tests need TEST_DATABASE_URL, else skip).
  • Frontend verification: cd web && npm run build, npx oxlint src/ (only the two pre-existing warnings allowed: ConfirmProvider only-export-components, TaskDetail exhaustive-deps).
  • Commit after each task.
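
The preset list above maps onto a small lookup. The following is a minimal sketch only: presetSeconds and validInterval are hypothetical names that the plan does not define, and the plan itself does not restrict interval_seconds to these presets on the server side.

```go
package main

import "fmt"

// presetSeconds lists the interval presets from the Global Constraints.
// The map and validInterval are illustrative names, not part of the plan.
var presetSeconds = map[string]int64{
	"Off": 0,
	"1h":  3600,
	"3h":  10800,
	"6h":  21600,
	"12h": 43200,
	"24h": 86400,
}

// validInterval reports whether seconds matches one of the presets.
func validInterval(seconds int64) bool {
	for _, s := range presetSeconds {
		if s == seconds {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(validInterval(21600)) // 6h is a preset
	fmt.Println(validInterval(7200))  // 2h is not a preset
	fmt.Println(validInterval(-1))    // negative values never match
}
```

If a check like this were adopted, the natural place for it would be the schedule handler in Task 4, before the account-tested gate.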

Task 1: Schema + store (scheduling columns, run trigger, store methods)

Files:

  • Create: migrations/0004_task_scheduling.up.sql, migrations/0004_task_scheduling.down.sql
  • Modify: internal/store/tasks.go (Task struct; GetTask + ListTasks SELECT/Scan; add setters)
  • Modify: internal/store/runs.go (Run struct; CreateRun signature; add queries)
  • Modify: internal/orchestrator/orchestrator.go (one line: the CreateRun call — pass "manual" to keep it compiling; the real trigger threading is Task 3)
  • Test: internal/store/scheduling_test.go (new)

Interfaces:

  • Produces:

    • store.Task fields: ScheduleIntervalSeconds int64 (json schedule_interval_seconds), ScheduleAnchor *time.Time (json -), Broken bool (json broken).
    • store.Run fields: StartedAt time.Time (json started_at), FinishedAt *time.Time (json finished_at), Trigger string (json trigger); existing fields gain json tags.
    • store.SchedulableTask struct { ID int64; IntervalSeconds int64; Anchor time.Time; LastFinished *time.Time }
    • func (s *Store) SetTaskSchedule(ctx, id, intervalSeconds int64) error
    • func (s *Store) SetTaskBroken(ctx, id int64) error
    • func (s *Store) ListRunsByTask(ctx, taskID int64) ([]Run, error)
    • func (s *Store) LastFinishedRunAt(ctx, taskID int64) (*time.Time, error)
    • func (s *Store) ListSchedulableTasks(ctx) ([]SchedulableTask, error)
    • func (s *Store) CreateRun(ctx, taskID int64, trigger string) (int64, error) (signature change)
  • Step 1: Write the migration

migrations/0004_task_scheduling.up.sql:

ALTER TABLE tasks ADD COLUMN schedule_interval_seconds INT NOT NULL DEFAULT 0;
ALTER TABLE tasks ADD COLUMN schedule_anchor TIMESTAMPTZ;
ALTER TABLE tasks ADD COLUMN broken BOOLEAN NOT NULL DEFAULT false;
ALTER TABLE runs  ADD COLUMN trigger TEXT NOT NULL DEFAULT 'manual';

migrations/0004_task_scheduling.down.sql:

ALTER TABLE runs  DROP COLUMN trigger;
ALTER TABLE tasks DROP COLUMN broken;
ALTER TABLE tasks DROP COLUMN schedule_anchor;
ALTER TABLE tasks DROP COLUMN schedule_interval_seconds;

Apply to the test DB before running store tests: docker exec -i imapcopier-testpg psql -U imap -d imapcopier_test -q -v ON_ERROR_STOP=1 < migrations/0004_task_scheduling.up.sql

  • Step 2: Write the failing store test

internal/store/scheduling_test.go:

package store

import (
	"context"
	"testing"
)

func TestTaskScheduleAndBroken(t *testing.T) {
	s := testStore(t)
	ctx := context.Background()
	epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"})
	epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"})
	taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst})

	// Defaults: no schedule, not broken.
	tk, _ := s.GetTask(ctx, taskID)
	if tk.ScheduleIntervalSeconds != 0 || tk.Broken || tk.ScheduleAnchor != nil {
		t.Fatalf("defaults: interval=%d broken=%v anchor=%v", tk.ScheduleIntervalSeconds, tk.Broken, tk.ScheduleAnchor)
	}

	// Enable → interval set, anchor stamped, appears in schedulable list.
	if err := s.SetTaskSchedule(ctx, taskID, 3600); err != nil {
		t.Fatalf("set schedule: %v", err)
	}
	tk, _ = s.GetTask(ctx, taskID)
	if tk.ScheduleIntervalSeconds != 3600 || tk.ScheduleAnchor == nil {
		t.Fatalf("after enable: interval=%d anchor=%v", tk.ScheduleIntervalSeconds, tk.ScheduleAnchor)
	}
	sch, _ := s.ListSchedulableTasks(ctx)
	if len(sch) != 1 || sch[0].ID != taskID || sch[0].IntervalSeconds != 3600 || sch[0].LastFinished != nil {
		t.Fatalf("schedulable: %+v", sch)
	}

	// Breaker → broken true, interval 0, drops out of schedulable list.
	if err := s.SetTaskBroken(ctx, taskID); err != nil {
		t.Fatalf("set broken: %v", err)
	}
	tk, _ = s.GetTask(ctx, taskID)
	if !tk.Broken || tk.ScheduleIntervalSeconds != 0 {
		t.Fatalf("after break: broken=%v interval=%d", tk.Broken, tk.ScheduleIntervalSeconds)
	}
	if sch, _ := s.ListSchedulableTasks(ctx); len(sch) != 0 {
		t.Fatalf("broken task still schedulable: %+v", sch)
	}

	// Re-enable clears broken.
	_ = s.SetTaskSchedule(ctx, taskID, 21600)
	if tk, _ = s.GetTask(ctx, taskID); tk.Broken {
		t.Fatalf("re-enable did not clear broken")
	}
}

func TestListRunsByTaskWithTrigger(t *testing.T) {
	s := testStore(t)
	ctx := context.Background()
	epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"})
	epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"})
	taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst})

	r1, _ := s.CreateRun(ctx, taskID, "manual")
	_ = s.FinishRun(ctx, r1, "done", 5, 1, 0)
	_, _ = s.CreateRun(ctx, taskID, "scheduled") // still running (no finish)

	runs, err := s.ListRunsByTask(ctx, taskID)
	if err != nil || len(runs) != 2 {
		t.Fatalf("list runs: %v len=%d", err, len(runs))
	}
	// newest first
	if runs[0].Trigger != "scheduled" || runs[0].FinishedAt != nil {
		t.Fatalf("run[0]: %+v", runs[0])
	}
	if runs[1].Trigger != "manual" || runs[1].FinishedAt == nil || runs[1].TotalCopied != 5 {
		t.Fatalf("run[1]: %+v", runs[1])
	}

	// Last finished-at reflects the finished manual run.
	lf, _ := s.LastFinishedRunAt(ctx, taskID)
	if lf == nil {
		t.Fatalf("LastFinishedRunAt nil, want the finished run's time")
	}
}
  • Step 3: Run tests to verify they fail

Run: TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./internal/store/ -run 'TestTaskScheduleAndBroken|TestListRunsByTaskWithTrigger'
Expected: FAIL to compile, because the new struct fields, SetTaskSchedule, SetTaskBroken, ListRunsByTask, LastFinishedRunAt, ListSchedulableTasks, and the 3-arg CreateRun don't exist yet.

  • Step 4: Extend store.Task and its scans

In internal/store/tasks.go, add import "time", and extend the struct:

type Task struct {
	ID                      int64             `json:"id"`
	Name                    string            `json:"name"`
	SrcEndpointID           int64             `json:"src_endpoint_id"`
	DstEndpointID           int64             `json:"dst_endpoint_id"`
	Status                  string            `json:"status"`
	FolderMapping           map[string]string `json:"folder_mapping"`
	ScheduleIntervalSeconds int64             `json:"schedule_interval_seconds"`
	ScheduleAnchor          *time.Time        `json:"-"`
	Broken                  bool              `json:"broken"`
}

Update GetTask's SELECT + Scan:

	err := s.Pool.QueryRow(ctx,
		`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping,
		        schedule_interval_seconds, schedule_anchor, broken
		 FROM tasks WHERE id=$1`, id).
		Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping,
			&t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken)

Update ListTasks's SELECT + Scan the same way (add the three columns and three scan targets):

	rows, err := s.Pool.Query(ctx,
		`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping,
		        schedule_interval_seconds, schedule_anchor, broken
		 FROM tasks ORDER BY id DESC`)
	...
		if err := rows.Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping,
			&t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken); err != nil {
  • Step 5: Add task scheduling setters + schedulable query

Append to internal/store/tasks.go:

// SetTaskSchedule sets the recurrence interval (0 = off), stamps the anchor when
// enabling (the first run is one interval after this), and clears broken — any
// explicit schedule change un-breaks the task.
func (s *Store) SetTaskSchedule(ctx context.Context, id, intervalSeconds int64) error {
	_, err := s.Pool.Exec(ctx,
		`UPDATE tasks SET schedule_interval_seconds=$2,
		 schedule_anchor = CASE WHEN $2 > 0 THEN now() ELSE schedule_anchor END,
		 broken = false
		 WHERE id=$1`, id, intervalSeconds)
	return err
}

// SetTaskBroken trips the schedule breaker: disables the schedule and flags the
// task for operator attention.
func (s *Store) SetTaskBroken(ctx context.Context, id int64) error {
	_, err := s.Pool.Exec(ctx,
		`UPDATE tasks SET broken=true, schedule_interval_seconds=0 WHERE id=$1`, id)
	return err
}

// SchedulableTask is the per-tick decision input for the scheduler.
type SchedulableTask struct {
	ID              int64
	IntervalSeconds int64
	Anchor          time.Time
	LastFinished    *time.Time
}

// ListSchedulableTasks returns tasks eligible to auto-run: schedule on, not
// broken, not currently running — each joined with its last finished run time.
func (s *Store) ListSchedulableTasks(ctx context.Context) ([]SchedulableTask, error) {
	rows, err := s.Pool.Query(ctx,
		`SELECT t.id, t.schedule_interval_seconds, t.schedule_anchor,
		        (SELECT max(finished_at) FROM runs r WHERE r.task_id=t.id AND r.finished_at IS NOT NULL)
		 FROM tasks t
		 WHERE t.schedule_interval_seconds > 0 AND t.schedule_anchor IS NOT NULL
		   AND NOT t.broken AND t.status <> 'running'`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	out := []SchedulableTask{}
	for rows.Next() {
		var st SchedulableTask
		if err := rows.Scan(&st.ID, &st.IntervalSeconds, &st.Anchor, &st.LastFinished); err != nil {
			return nil, err
		}
		out = append(out, st)
	}
	return out, rows.Err()
}
  • Step 6: Extend store.Run, CreateRun, and add run queries

In internal/store/runs.go, add import "time" (alongside context) and replace the struct + CreateRun, then add the queries:

type Run struct {
	ID           int64      `json:"id"`
	TaskID       int64      `json:"task_id"`
	Status       string     `json:"status"`
	StartedAt    time.Time  `json:"started_at"`
	FinishedAt   *time.Time `json:"finished_at"`
	TotalCopied  int64      `json:"total_copied"`
	TotalSkipped int64      `json:"total_skipped"`
	TotalErrors  int64      `json:"total_errors"`
	Trigger      string     `json:"trigger"`
}

func (s *Store) CreateRun(ctx context.Context, taskID int64, trigger string) (int64, error) {
	var id int64
	err := s.Pool.QueryRow(ctx,
		`INSERT INTO runs (task_id, trigger) VALUES ($1,$2) RETURNING id`, taskID, trigger).Scan(&id)
	return id, err
}

// ListRunsByTask returns a task's runs, newest first, for the run-log modal.
func (s *Store) ListRunsByTask(ctx context.Context, taskID int64) ([]Run, error) {
	rows, err := s.Pool.Query(ctx,
		`SELECT id, task_id, started_at, finished_at, status,
		        total_copied, total_skipped, total_errors, trigger
		 FROM runs WHERE task_id=$1 ORDER BY id DESC`, taskID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	out := []Run{}
	for rows.Next() {
		var r Run
		if err := rows.Scan(&r.ID, &r.TaskID, &r.StartedAt, &r.FinishedAt, &r.Status,
			&r.TotalCopied, &r.TotalSkipped, &r.TotalErrors, &r.Trigger); err != nil {
			return nil, err
		}
		out = append(out, r)
	}
	return out, rows.Err()
}

// LastFinishedRunAt returns the most recent finished run's timestamp, or nil if
// the task has never completed a run — the baseline for the next scheduled run.
func (s *Store) LastFinishedRunAt(ctx context.Context, taskID int64) (*time.Time, error) {
	var t *time.Time
	err := s.Pool.QueryRow(ctx,
		`SELECT max(finished_at) FROM runs WHERE task_id=$1 AND finished_at IS NOT NULL`, taskID).Scan(&t)
	return t, err
}

Keep the existing FinishRun as-is.

  • Step 7: Fix the CreateRun caller so the build passes

In internal/orchestrator/orchestrator.go, the Run method calls o.store.CreateRun(ctx, taskID). Change it to pass a trigger literal for now:

	runID, err := o.store.CreateRun(ctx, taskID, "manual")

(Task 3 threads the real trigger through; this keeps the build green now.)

  • Step 8: Run the store tests to verify they pass

Run: TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./internal/store/ -run 'TestTaskScheduleAndBroken|TestListRunsByTaskWithTrigger' -v
Expected: PASS.

  • Step 9: Verify whole build + suite

Run: go build ./... && go vet ./... && TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./...
Expected: all pass.

  • Step 10: Commit
git add migrations/0004_task_scheduling.up.sql migrations/0004_task_scheduling.down.sql \
        internal/store/tasks.go internal/store/runs.go internal/store/scheduling_test.go \
        internal/orchestrator/orchestrator.go
git commit -m "feat(store): task schedule columns, run trigger, scheduling queries"

Task 2: Scheduler package + wiring

Files:

  • Create: internal/scheduler/scheduler.go
  • Create: internal/scheduler/scheduler_test.go
  • Modify: cmd/server/main.go (start the scheduler)

Interfaces:

  • Consumes: store.SchedulableTask, store.ListSchedulableTasks, orchestrator.Orchestrator.Run(ctx, taskID, trigger) (the 3-arg form lands in Task 3; this task calls it as 3-arg and both compile together only after Task 3 — see Step 5 note).

  • Produces:

    • func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time
    • func dueTaskIDs(tasks []store.SchedulableTask, now time.Time) []int64
    • type Scheduler; func New(st *store.Store, orch *orchestrator.Orchestrator) *Scheduler; func (s *Scheduler) Start(ctx context.Context)
  • Step 1: Write the failing pure-logic test

internal/scheduler/scheduler_test.go:

package scheduler

import (
	"testing"
	"time"

	"github.com/vasyansk/imap-copier/internal/store"
)

func TestNextRun(t *testing.T) {
	anchor := time.Date(2026, 7, 3, 10, 0, 0, 0, time.UTC)
	fin := time.Date(2026, 7, 3, 11, 0, 0, 0, time.UTC)

	// No finished run yet → anchor + interval.
	if got := NextRun(time.Hour, anchor, nil); !got.Equal(anchor.Add(time.Hour)) {
		t.Fatalf("first run: got %v", got)
	}
	// Finished run exists → last finished + interval (from completion, not start).
	if got := NextRun(time.Hour, anchor, &fin); !got.Equal(fin.Add(time.Hour)) {
		t.Fatalf("recurring: got %v", got)
	}
}

func TestDueTaskIDs(t *testing.T) {
	now := time.Date(2026, 7, 3, 12, 0, 0, 0, time.UTC)
	anchorOld := now.Add(-2 * time.Hour) // enabled 2h ago
	finRecent := now.Add(-30 * time.Minute)

	tasks := []store.SchedulableTask{
		{ID: 1, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: nil},        // due: anchor+1h < now
		{ID: 2, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: &finRecent}, // not due: fin+1h > now
		{ID: 3, IntervalSeconds: 21600, Anchor: anchorOld, LastFinished: nil},       // not due: anchor+6h > now
	}
	ids := dueTaskIDs(tasks, now)
	if len(ids) != 1 || ids[0] != 1 {
		t.Fatalf("due IDs = %v, want [1]", ids)
	}
}
  • Step 2: Run test to verify it fails

Run: go test ./internal/scheduler/
Expected: FAIL, since the package and its functions don't exist yet.

  • Step 3: Implement the scheduler

internal/scheduler/scheduler.go:

// Package scheduler runs tasks on their recurring interval. A single goroutine
// polls the DB every pollInterval and triggers due tasks through the
// orchestrator. The DB is the source of truth, so the scheduler holds no state
// and is safe across restarts.
package scheduler

import (
	"context"
	"log/slog"
	"time"

	"github.com/vasyansk/imap-copier/internal/orchestrator"
	"github.com/vasyansk/imap-copier/internal/store"
)

const pollInterval = 30 * time.Second

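// NextRun is when a task should next run: one interval after its last completed
// run, or one interval after the schedule anchor if it has never completed one.
// A run that finished before the anchor (for example a manual run made before
// the schedule was enabled) is ignored, so the first run after enabling is
// always anchor + interval.
func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time {
	base := anchor
	if lastFinished != nil && lastFinished.After(anchor) {
		base = *lastFinished
	}
	return base.Add(interval)
}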

// dueTaskIDs returns the IDs of tasks whose next run is at or before now.
func dueTaskIDs(tasks []store.SchedulableTask, now time.Time) []int64 {
	var ids []int64
	for _, t := range tasks {
		next := NextRun(time.Duration(t.IntervalSeconds)*time.Second, t.Anchor, t.LastFinished)
		if !now.Before(next) {
			ids = append(ids, t.ID)
		}
	}
	return ids
}

type Scheduler struct {
	store *store.Store
	orch  *orchestrator.Orchestrator
}

func New(st *store.Store, orch *orchestrator.Orchestrator) *Scheduler {
	return &Scheduler{store: st, orch: orch}
}

// Start blocks, ticking until ctx is cancelled. Run it in a goroutine.
func (s *Scheduler) Start(ctx context.Context) {
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			s.tick(ctx)
		}
	}
}

func (s *Scheduler) tick(ctx context.Context) {
	tasks, err := s.store.ListSchedulableTasks(ctx)
	if err != nil {
		slog.Error("scheduler: list schedulable", "err", err)
		return
	}
	for _, id := range dueTaskIDs(tasks, time.Now()) {
		if _, err := s.orch.Run(ctx, id, "scheduled"); err != nil {
			// ErrAlreadyRunning / ErrNotTested are expected races/edge cases, not fatal.
			slog.Info("scheduler: run skipped", "task", id, "err", err)
		}
	}
}
  • Step 4: Wire the scheduler into main

In cmd/server/main.go, add the import "github.com/vasyansk/imap-copier/internal/scheduler" and, after orch := orchestrator.New(...) and before http.ListenAndServe, start it:

	go scheduler.New(st, orch).Start(context.Background())
  • Step 5: Run the pure test

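Run: go test ./internal/scheduler/ -v
Expected: PASS, but only once Task 3 has changed Orchestrator.Run to the 3-arg form.

Note: until Task 3 lands, neither go test ./internal/scheduler/ nor go build ./... compiles. Go type-checks scheduler.go together with its test file, and tick calls s.orch.Run(ctx, id, "scheduled") against a Run method that still takes two arguments. Run Task 3 immediately after this task, then re-run this step; the package test and the whole-module build go green together. (The two tasks are sequenced back-to-back for this reason.)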

  • Step 6: Commit
git add internal/scheduler/scheduler.go internal/scheduler/scheduler_test.go cmd/server/main.go
git commit -m "feat(scheduler): 30s polling scheduler with pure NextRun/dueTaskIDs"

Task 3: Orchestrator trigger threading + breaker

Files:

  • Modify: internal/orchestrator/orchestrator.go (Run signature; thread trigger into CreateRun + runAll; breaker in runAll)
  • Modify: internal/httpapi/run.go (handleRun passes "manual")
  • Test: internal/orchestrator/breaker_test.go (new)

Interfaces:

  • Consumes: store.SetTaskBroken (Task 1).

  • Produces:

    • func (o *Orchestrator) Run(ctx context.Context, taskID int64, trigger string) (int64, error) (was 2-arg)
    • func shouldBreak(trigger string, totErr int64) bool
  • Step 1: Write the failing breaker-logic test

internal/orchestrator/breaker_test.go:

package orchestrator

import "testing"

func TestShouldBreak(t *testing.T) {
	cases := []struct {
		trigger string
		errs    int64
		want    bool
	}{
		{"scheduled", 2, true},  // scheduled run with errors trips the breaker
		{"scheduled", 0, false}, // scheduled run, clean — no break
		{"manual", 5, false},    // manual run never trips the breaker
		{"manual", 0, false},
	}
	for _, c := range cases {
		if got := shouldBreak(c.trigger, c.errs); got != c.want {
			t.Fatalf("shouldBreak(%q,%d)=%v want %v", c.trigger, c.errs, got, c.want)
		}
	}
}
  • Step 2: Run test to verify it fails

Run: go test ./internal/orchestrator/ -run TestShouldBreak
Expected: FAIL (shouldBreak undefined).

  • Step 3: Add shouldBreak and thread the trigger through Run

In internal/orchestrator/orchestrator.go:

Add the helper (package level):

// shouldBreak reports whether a completed run should trip the schedule breaker:
// only scheduled runs that ended with errors.
func shouldBreak(trigger string, totErr int64) bool {
	return trigger == "scheduled" && totErr > 0
}

Change Run to accept and forward the trigger:

func (o *Orchestrator) Run(ctx context.Context, taskID int64, trigger string) (int64, error) {

Inside Run, replace the temporary literal from Task 1:

	runID, err := o.store.CreateRun(ctx, taskID, trigger)

And pass the trigger into the coordinator goroutine:

	go o.runAll(context.WithoutCancel(ctx), task, runID, accs, srcEP, dstEP, trigger)

Change runAll's signature to receive it and trip the breaker after the final status is set:

func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint, trigger string) {

At the end of runAll, after SetTaskStatus and the run_done publish, add:

	if shouldBreak(trigger, totErr) {
		_ = o.store.SetTaskBroken(ctx, task.ID)
		o.hub.Publish(wshub.Event{Type: "task_broken", TaskID: task.ID,
			Data: map[string]any{"task_id": task.ID, "errors": totErr}})
	}
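
The breaker and the schedule setter together form a small state machine. The sketch below replays it in memory, assuming a hypothetical taskState struct in place of the tasks row; finishRun and setSchedule mimic what runAll does through SetTaskBroken and what SetTaskSchedule does, without touching a database.

```go
package main

import "fmt"

// taskState is an in-memory stand-in for the scheduling columns on tasks.
type taskState struct {
	IntervalSeconds int64
	Broken          bool
}

// shouldBreak mirrors the plan's helper: only scheduled runs with errors trip.
func shouldBreak(trigger string, totErr int64) bool {
	return trigger == "scheduled" && totErr > 0
}

// finishRun applies the breaker the way runAll does via SetTaskBroken.
func (t *taskState) finishRun(trigger string, totErr int64) {
	if shouldBreak(trigger, totErr) {
		t.Broken = true
		t.IntervalSeconds = 0
	}
}

// setSchedule mirrors SetTaskSchedule: any explicit change clears broken.
func (t *taskState) setSchedule(seconds int64) {
	t.IntervalSeconds = seconds
	t.Broken = false
}

func main() {
	t := &taskState{}
	t.setSchedule(3600)
	t.finishRun("manual", 5) // manual errors leave the schedule alone
	fmt.Printf("%+v\n", *t)
	t.finishRun("scheduled", 2) // scheduled errors trip the breaker
	fmt.Printf("%+v\n", *t)
	t.setSchedule(21600) // re-enabling clears broken
	fmt.Printf("%+v\n", *t)
}
```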
  • Step 4: Update the manual caller

In internal/httpapi/run.go, handleRun:

	runID, err := s.orch.Run(r.Context(), taskID, "manual")
  • Step 5: Run the breaker test + full build

Run: go test ./internal/orchestrator/ -run TestShouldBreak -v && go build ./... && go vet ./...
Expected: test PASS; build + vet clean (the scheduler's s.orch.Run(ctx, id, "scheduled") from Task 2 now resolves).

  • Step 6: Full suite

Run: TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./...
Expected: all pass.

  • Step 7: Commit
git add internal/orchestrator/orchestrator.go internal/orchestrator/breaker_test.go internal/httpapi/run.go
git commit -m "feat(orchestrator): thread run trigger; breaker on scheduled runs with errors"

Task 4: HTTP — schedule endpoint, runs endpoint, next_run_at DTO

Files:

  • Modify: internal/httpapi/tasks.go (taskView with next_run_at; compute in handleGetTask; add handleSetSchedule, handleListRuns)
  • Modify: internal/httpapi/router.go (two routes)

Interfaces:

  • Consumes: store.SetTaskSchedule, store.ListRunsByTask, store.LastFinishedRunAt, store.ListAccountsByTask, scheduler.NextRun.

  • Produces: routes PUT /api/tasks/{id}/schedule, GET /api/tasks/{id}/runs; task JSON gains next_run_at.

  • Step 1: Add the task DTO with next_run_at and compute it in handleGetTask

In internal/httpapi/tasks.go, add imports "time" and "github.com/vasyansk/imap-copier/internal/scheduler". Add the view type and compute the next run:

// taskView augments the stored task with the server-computed next scheduled run
// (RFC3339 UTC; null when the schedule is off, the task is running, or broken).
type taskView struct {
	store.Task
	NextRunAt *string `json:"next_run_at"`
}

func (s *Server) taskViewFor(ctx context.Context, t store.Task) taskView {
	var nextRunAt *string
	if t.ScheduleIntervalSeconds > 0 && !t.Broken && t.Status != "running" && t.ScheduleAnchor != nil {
		last, _ := s.store.LastFinishedRunAt(ctx, t.ID)
		n := scheduler.NextRun(time.Duration(t.ScheduleIntervalSeconds)*time.Second, *t.ScheduleAnchor, last)
		str := n.UTC().Format(time.RFC3339)
		nextRunAt = &str
	}
	return taskView{Task: t, NextRunAt: nextRunAt}
}

Change handleGetTask's final write to use it:

	writeJSON(w, http.StatusOK, map[string]any{"task": s.taskViewFor(r.Context(), task), "accounts": views})

(handleListTasks keeps returning []store.Task: the list needs only broken, which already serializes through its json tag, and next_run_at is detail-only.)

  • Step 2: Add the schedule + runs handlers

Append to internal/httpapi/tasks.go:

// handleSetSchedule enables/changes/disables a task's recurring schedule.
// Enabling (interval>0) requires every account to pass its connection tests.
func (s *Server) handleSetSchedule(w http.ResponseWriter, r *http.Request) {
	taskID, err := pathID(r, "id")
	if err != nil {
		http.Error(w, "bad id", http.StatusBadRequest)
		return
	}
	var body struct {
		IntervalSeconds int64 `json:"interval_seconds"`
	}
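	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		http.Error(w, "bad json", http.StatusBadRequest)
		return
	}
	// A negative interval is meaningless; reject it rather than store it.
	if body.IntervalSeconds < 0 {
		http.Error(w, "interval_seconds must not be negative", http.StatusBadRequest)
		return
	}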
	if body.IntervalSeconds > 0 {
		accs, err := s.store.ListAccountsByTask(r.Context(), taskID)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		if !allAccountsTested(accs) {
			http.Error(w, "all accounts must pass connection tests before scheduling", http.StatusConflict)
			return
		}
	}
	if err := s.store.SetTaskSchedule(r.Context(), taskID, body.IntervalSeconds); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

// allAccountsTested mirrors the orchestrator's run gate: non-empty and every
// account OK on both sides.
func allAccountsTested(accs []store.Account) bool {
	if len(accs) == 0 {
		return false
	}
	for _, a := range accs {
		if a.TestSrcStatus != "ok" || a.TestDstStatus != "ok" {
			return false
		}
	}
	return true
}

func (s *Server) handleListRuns(w http.ResponseWriter, r *http.Request) {
	taskID, err := pathID(r, "id")
	if err != nil {
		http.Error(w, "bad id", http.StatusBadRequest)
		return
	}
	runs, err := s.store.ListRunsByTask(r.Context(), taskID)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	writeJSON(w, http.StatusOK, runs)
}

(json and store are already imported in tasks.go.)

  • Step 3: Register routes

In internal/httpapi/router.go, after the existing PUT /api/tasks/{id}/folder-mapping line (or with the other task routes):

	api.HandleFunc("PUT /api/tasks/{id}/schedule", s.handleSetSchedule)
	api.HandleFunc("GET /api/tasks/{id}/runs", s.handleListRuns)
  • Step 4: Verify build + vet + suite

Run: go build ./... && go vet ./... && TEST_DATABASE_URL="postgres://imap:test@localhost:55433/imapcopier_test?sslmode=disable" go test ./...
Expected: all pass.

  • Step 5: Commit
git add internal/httpapi/tasks.go internal/httpapi/router.go
git commit -m "feat(api): schedule + runs endpoints; next_run_at on task detail"

Task 5: Frontend — schedule control, next-run, broken badge, run-log modal, list icon

Files:

  • Modify: web/src/api.ts (Task fields; Run type; setTaskSchedule, listRuns)
  • Create: web/src/components/RunLogModal.tsx
  • Modify: web/src/pages/TaskDetail.tsx (schedule select, next-run line, broken badge, Runs button+modal, reload on task_broken)
  • Modify: web/src/pages/Tasks.tsx (broken indicator in the list)
  • Modify: web/src/app.css (broken badge / schedule-row styles)

Interfaces:

  • Consumes: backend routes from Task 4; Modal (web/src/components/Modal.tsx), StatusBadge.

  • Step 1: Extend the API client

In web/src/api.ts, extend Task:

export interface Task {
  id: number
  name: string
  src_endpoint_id: number
  dst_endpoint_id: number
  status: string
  folder_mapping?: Record<string, string>
  schedule_interval_seconds?: number
  broken?: boolean
  next_run_at?: string | null
}

Add a Run interface and the two calls (near runTask):

export interface Run {
  id: number
  task_id: number
  status: string
  started_at: string
  finished_at: string | null
  total_copied: number
  total_skipped: number
  total_errors: number
  trigger: string
}

export const setTaskSchedule = (taskId: number, intervalSeconds: number) =>
  api(`/api/tasks/${taskId}/schedule`, { ...jsonBody({ interval_seconds: intervalSeconds }), method: 'PUT' })

export const listRuns = (taskId: number) => api<Run[]>(`/api/tasks/${taskId}/runs`)
  • Step 2: Create the run-log modal

web/src/components/RunLogModal.tsx:

import { useEffect, useState } from 'react'
import { Modal } from './Modal'
import { StatusBadge } from './StatusBadge'
import { listRuns, type Run } from '../api'

const fmt = (iso: string | null) => (iso ? new Date(iso).toLocaleString() : '—')

export function RunLogModal({ taskId, open, onClose }: { taskId: number; open: boolean; onClose: () => void }) {
  const [runs, setRuns] = useState<Run[] | null>(null)

  useEffect(() => {
    if (!open) return
    setRuns(null)
    listRuns(taskId).then((r) => setRuns(r ?? [])).catch(() => setRuns([]))
  }, [open, taskId])

  return (
    <Modal open={open} title="Run log" onClose={onClose} size="lg">
      <div className="tbl-wrap">
        <table className="tbl">
          <thead>
            <tr>
              <th>Started</th>
              <th>Finished</th>
              <th>Trigger</th>
              <th>Status</th>
              <th>Copied</th>
              <th>Skipped</th>
              <th>Errors</th>
            </tr>
          </thead>
          <tbody>
            {runs === null ? (
              <tr className="empty-row"><td colSpan={7}>loading</td></tr>
            ) : runs.length === 0 ? (
              <tr className="empty-row"><td colSpan={7}>no runs yet</td></tr>
            ) : (
              runs.map((r) => (
                <tr key={r.id}>
                  <td>{fmt(r.started_at)}</td>
                  <td>{fmt(r.finished_at)}</td>
                  <td>{r.trigger}</td>
                  <td><StatusBadge status={r.status} /></td>
                  <td className="num-cell">{r.total_copied}</td>
                  <td className="num-cell">{r.total_skipped}</td>
                  <td className="num-cell">{r.total_errors}</td>
                </tr>
              ))
            )}
          </tbody>
        </table>
      </div>
    </Modal>
  )
}
  • Step 3: TaskDetail — schedule control, next-run, broken badge, Runs modal

In web/src/pages/TaskDetail.tsx:

Add setTaskSchedule to the imports from ../api (listRuns is not needed here, since RunLogModal imports it itself). Import the modal: import { RunLogModal } from '../components/RunLogModal'.

Add state near the other useStates:

  const [showRuns, setShowRuns] = useState(false)

Add task_broken to the reload event list (the existing if ([...].includes(ev.type)) reload() array) so the detail refreshes when the breaker trips:

        if (['account_started', 'account_test', 'account_done', 'run_started', 'run_done', 'error', 'folder', 'cancelled', 'plan', 'task_broken'].includes(ev.type)) {
          reload()
        }

Add a schedule handler:

  async function onSchedule(intervalSeconds: number) {
    setError(null)
    try {
      await setTaskSchedule(id, intervalSeconds)
      reload()
    } catch (err) {
      setError(err instanceof Error ? err.message : 'Failed to set schedule')
    }
  }

In the "Run control" panel (after the .btn-row with Test/Run), add the schedule row. task is in scope from const { task, accounts } = data:

        <div className="sched-row">
          <label htmlFor="sched">Schedule</label>
          <select
            id="sched"
            value={task.schedule_interval_seconds ?? 0}
            onChange={(e) => onSchedule(Number(e.target.value))}
          >
            <option value={0}>Off</option>
            <option value={3600}>Every 1h</option>
            <option value={10800}>Every 3h</option>
            <option value={21600}>Every 6h</option>
            <option value={43200}>Every 12h</option>
            <option value={86400}>Every 24h</option>
          </select>
          {task.next_run_at && (
            <span className="sched-next">Next run: {new Date(task.next_run_at).toLocaleString()}</span>
          )}
          {task.broken && <span className="badge badge-fail"><span className="dot" />broken</span>}
          <button type="button" className="link-btn" onClick={() => setShowRuns(true)}>
            runs
          </button>
        </div>
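
The preset values and the next-run label could also be kept in one place rather than inline in the JSX. The sketch below is illustrative only: SCHEDULE_PRESETS and nextRunLabel are hypothetical names that do not appear elsewhere in this plan. The seconds values are the interval presets from the Global Constraints, and the label treats a null or unparseable next_run_at as "nothing to show".

```typescript
// Hypothetical helpers (not part of the plan's code): presets + next-run label.
const SCHEDULE_PRESETS: ReadonlyArray<{ seconds: number; label: string }> = [
  { seconds: 0, label: 'Off' },
  { seconds: 3600, label: 'Every 1h' },
  { seconds: 10800, label: 'Every 3h' },
  { seconds: 21600, label: 'Every 6h' },
  { seconds: 43200, label: 'Every 12h' },
  { seconds: 86400, label: 'Every 24h' },
]

// Returns the text for the "Next run" span, or null when there is nothing to show
// (schedule off, task running, task broken, or an unparseable timestamp).
function nextRunLabel(nextRunAt: string | null | undefined): string | null {
  if (!nextRunAt) return null
  const d = new Date(nextRunAt)
  if (Number.isNaN(d.getTime())) return null
  // toLocaleString() renders in the browser's local time zone and locale.
  return `Next run: ${d.toLocaleString()}`
}
```

With helpers like these, the select options could be rendered with SCHEDULE_PRESETS.map and the span shown only when nextRunLabel returns a string.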

Add the broken badge next to the status in the page head (<StatusBadge status={task.status} />):

        <StatusBadge status={task.status} />
        {task.broken && <span className="badge badge-fail" style={{ marginLeft: 8 }}><span className="dot" />broken</span>}

Render the modal near the other modals at the end of the returned JSX:

      <RunLogModal taskId={id} open={showRuns} onClose={() => setShowRuns(false)} />
  • Step 4: Tasks list — broken indicator

In web/src/pages/Tasks.tsx, in the row's Status cell, show the broken badge beside the status:

                    <td>
                      <StatusBadge status={t.status} />
                      {t.broken && <span className="badge badge-fail" style={{ marginLeft: 8 }}><span className="dot" />broken</span>}
                    </td>
  • Step 5: CSS for the schedule row

In web/src/app.css, add:

.sched-row {
  display: flex;
  align-items: center;
  gap: 12px;
  margin-top: 14px;
  flex-wrap: wrap;
}

.sched-row label {
  font-size: 11px;
  font-weight: 700;
  letter-spacing: 0.1em;
  text-transform: uppercase;
  color: var(--fg-dim);
}

.sched-row select {
  background: var(--bg-inset);
  border: 1px solid var(--border);
  color: var(--fg);
  padding: 7px 10px;
  font-size: 13px;
  border-radius: 2px;
}

.sched-next {
  font-size: 12px;
  color: var(--fg-dim);
  font-variant-numeric: tabular-nums;
}
  • Step 6: Verify build + lint

Run: cd web && npm run build && npx oxlint src/

Expected: build clean; oxlint shows ONLY the two pre-existing warnings (ConfirmProvider only-export-components, TaskDetail exhaustive-deps). Fix any new finding.

  • Step 7: Commit
git add web/src/api.ts web/src/components/RunLogModal.tsx web/src/pages/TaskDetail.tsx web/src/pages/Tasks.tsx web/src/app.css
git commit -m "feat(web): schedule control, next-run, broken badge, run-log modal"

Task 6: End-to-end verification on prod

Per project rules, functional verification runs on prod after deploy. No code.

  • Step 1: Build + migrate + deploy

Run: make build && make up (migration 0004 auto-applies at startup). Wait until curl -fsS http://localhost/healthz succeeds.

  • Step 2: Enable-gate + next-run display

On a task whose accounts are NOT all tested OK, set Schedule to any interval: the request returns 409, the error is surfaced in the error banner, and the schedule stays Off. Test the accounts to OK, then set Schedule = Every 1h: "Next run: " appears (≈ now + 1h, in browser local time), and no run starts immediately (the first run is one interval out).
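
To sanity-check the displayed time, the next-run rule from the Global Constraints can be mirrored in a few lines. This is a rough sketch, not the server's NextRun: the ScheduleState shape and the expectedNextRun name are hypothetical. Taking the later of the anchor and the last finish time as the base is an assumption, made so that a run which finished before the schedule was enabled does not make the task due immediately.

```typescript
// Hypothetical mirror of the next-run rule, for checking what the UI displays.
type ScheduleState = {
  intervalSeconds: number        // 0 means the schedule is off
  anchor: string | null          // schedule_anchor, RFC3339
  lastFinishedAt: string | null  // finish time of the latest run, RFC3339
  running: boolean
  broken: boolean
}

function expectedNextRun(s: ScheduleState): string | null {
  // next_run_at is null when the schedule is off, the task is running, or it is broken.
  if (s.intervalSeconds <= 0 || s.running || s.broken) return null
  const candidates = [s.anchor, s.lastFinishedAt]
    .filter((t): t is string => t !== null)
    .map((t) => Date.parse(t))
    .filter((ms) => !Number.isNaN(ms))
  if (candidates.length === 0) return null
  // ASSUMPTION: base is the later of anchor and last finish.
  const base = Math.max(...candidates)
  return new Date(base + s.intervalSeconds * 1000).toISOString()
}
```

Right after enabling Every 1h, the result should be about one hour after the moment the schedule was set, which is the value to compare against the "Next run" text.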

  • Step 3: Run-log modal

Run the task manually once. Open runs → the modal lists the run with local-time started/finished, trigger = manual, status, and copied/skipped/errors.

  • Step 4: Breaker (scheduled failure → broken)

Simulate a scheduled failure: break one account (e.g. wrong stored password) on a scheduled task, then let a scheduled run occur (waiting for the next scheduled cycle if necessary). When the scheduled run finishes with errors, the schedule flips to Off, the task shows the red broken badge in both the task detail and the Tasks list, and no further auto-runs occur until the schedule is re-enabled. (Auto-run timing itself is covered by the scheduler unit tests. Presets start at 1h, so the timed trigger is not observed within a quick E2E; verify the breaker/flag transition and the DB state.)
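
The transition being verified here can be stated as a small predicate. The following is a minimal sketch of the breaker rule from the Global Constraints, written only to make the expected before/after state explicit. RunSummary, TaskSchedule, tripsBreaker, and applyBreaker are hypothetical names; the real logic lives server-side in Go.

```typescript
// Hypothetical model of the breaker rule, for reasoning about expected state.
type RunSummary = { trigger: 'manual' | 'scheduled'; total_errors: number }
type TaskSchedule = { broken: boolean; schedule_interval_seconds: number }

// The breaker trips only on scheduled runs that finished with errors.
function tripsBreaker(run: RunSummary): boolean {
  return run.trigger === 'scheduled' && run.total_errors > 0
}

// Expected task state after a run finishes: a tripped breaker flags the task
// broken and switches the schedule off; otherwise the task is unchanged.
function applyBreaker(task: TaskSchedule, run: RunSummary): TaskSchedule {
  if (!tripsBreaker(run)) return task
  return { broken: true, schedule_interval_seconds: 0 }
}
```

A manual run with errors leaves the task untouched, which is why the failure in this step has to come from a scheduled run.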

  • Step 5: Delete cleanup

Delete a task that had a schedule and runs; confirm it disappears and its runs are gone (cascade). The scheduler stops considering it (no errors in logs).

  • Step 6: Record result

Save an E2E note under ./swarm-report/scheduled-task-runs-<YYYY-MM-DD>.md with pass/fail per step.


Self-Review

  • Spec coverage: interval per task + auto-run (Tasks 1-3); don't-start-if-running + interval-from-completion (ListSchedulableTasks status<>'running' filter + NextRun from LastFinished, Tasks 1-2); next-run in browser local time (next_run_at DTO Task 4 + toLocaleString Task 5); run-log modal with per-run status/totals (Tasks 4-5); enable-gate all-OK → 409 (Task 4); breaker scheduled-error → broken + red icon in list & detail (Tasks 3, 5); first-run-after-interval via anchor (Task 1 SetTaskSchedule + Task 2 NextRun); delete cleanup via cascade (noted, no code); migration 0004 (Task 1). All covered.
  • Placeholder scan: none — every step carries real code/commands.
  • Type consistency: CreateRun(ctx, taskID, trigger) identical across Tasks 1/3; Run(ctx, taskID, trigger) defined Task 3, called by scheduler Task 2 and handleRun Task 3; NextRun(interval, anchor, lastFinished) identical in scheduler (Task 2) and taskViewFor (Task 4); SchedulableTask{ID,IntervalSeconds,Anchor,LastFinished} fields match between store (Task 1) and dueTaskIDs (Task 2); JSON contract schedule_interval_seconds/broken/next_run_at/interval_seconds and Run fields match between Go (Tasks 1,4) and TS (Task 5).
  • Sequencing note: Task 2's whole-module build is red until Task 3 lands the 3-arg Run (the scheduler calls it). Both are flagged to run back-to-back; each task's own unit test passes independently.