Merge feat/scheduled-task-runs: recurring task execution

A 30s in-process scheduler auto-runs tasks on a recurring interval
(1/3/6/12/24h). Interval is measured from the last run's completion; a task
already running is never re-triggered. Enabling a schedule requires all
accounts tested OK. A breaker disables the schedule and flags the task broken
(red badge in list + detail) when a scheduled run errors or panics. Next-run
time (browser-local) and a run-log modal (per-run status/totals) are shown.
Migration 0004 adds tasks.schedule_interval_seconds/schedule_anchor/broken and
runs.trigger; task delete cascades runs as before.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-07-03 13:40:39 +07:00
22 changed files with 846 additions and 29 deletions
+3
View File
@@ -13,6 +13,7 @@ import (
"github.com/vasyansk/imap-copier/internal/config"
"github.com/vasyansk/imap-copier/internal/httpapi"
"github.com/vasyansk/imap-copier/internal/orchestrator"
"github.com/vasyansk/imap-copier/internal/scheduler"
"github.com/vasyansk/imap-copier/internal/store"
"github.com/vasyansk/imap-copier/internal/wshub"
)
@@ -45,6 +46,8 @@ func main() {
orch := orchestrator.New(st, hub, cfg.EncKey, cfg.WorkerConcurrency)
srv := httpapi.NewServer(cfg, st, orch, hub)
go scheduler.New(st, orch).Start(context.Background())
slog.Info("listening", "addr", cfg.HTTPAddr)
if err := http.ListenAndServe(cfg.HTTPAddr, srv.Router()); err != nil {
slog.Error("serve", "err", err)
@@ -0,0 +1,207 @@
# Scheduled (recurring) task runs — design
**Date:** 2026-07-03
**Status:** awaiting user review
## Context
Migrations are run manually today (the operator clicks "Run migration"). For
ongoing source→destination sync, the operator wants a task to run itself on a
recurring interval (e.g. every 1h or 6h) without babysitting it. Requirements
from the user:
- Per-task recurrence interval; the task runs automatically.
- Never start a run while the previous one is still running; measure the interval
from the **completion** of the last run (not its start).
- Show when the next run is due, in the **browser's local time**.
- A separate modal showing a log of runs with their status and totals.
- Deleting a task must clean up all this data along with the task.
Decisions confirmed with the user:
- **Run-log detail:** per-run totals only (reuse `runs`); no per-account snapshot.
- **Enable gate:** a schedule can be enabled only when **all accounts test OK**.
- **Breaker:** if a *scheduled* run finishes with errors, disable the schedule and
mark the task **broken** (red icon in the tasks list and the task detail).
Manual runs never trip the breaker.
- **First run:** one interval after enabling (`schedule_anchor + interval`).
- **Intervals:** presets 1 / 3 / 6 / 12 / 24 h, plus Off.
### Current architecture (as-is)
- `runs` table already has `started_at`, `finished_at`, `status`, and totals
(`migrations/0001_init.up.sql:35-44`). `CreateRun`/`FinishRun` in
`internal/store/runs.go`. No list-by-task query yet.
- `Orchestrator.Run(ctx, taskID)` gates on `gateOK` (all accounts test ok) and
`TryMarkTaskRunning` (atomic single-run), creates a run, and launches `runAll`
asynchronously; `runAll` sets the task status and calls `FinishRun`
(`internal/orchestrator/orchestrator.go`).
- `ON DELETE CASCADE` from `tasks` already removes `runs`, `accounts`, and
`migrated_messages` when a task is deleted.
- `main.go` wires store → orchestrator → hub → server and calls
`ResetRunningOnStartup` to clear phantom "running" after a restart. No scheduler.
- Task DTO / TS `Task` interface: `internal/httpapi/tasks.go`, `web/src/api.ts`.
## Decision: in-process polling scheduler
A single background goroutine (a `Scheduler`) started from `main.go`, ticking
every **30 s**. Each tick queries the DB for schedulable tasks and triggers the
due ones through the existing `Orchestrator.Run`. The DB is the source of truth,
so the scheduler is stateless and restart-safe (paired with the existing
`ResetRunningOnStartup`). Chosen over per-task timers (restart-fragile, in-memory
state) and external cron (breaks the single-binary model, needs auth wiring).
30 s poll precision is ample for hour-scale intervals.
## Data model
**Migration `0004_task_scheduling`:**
```sql
-- up
ALTER TABLE tasks ADD COLUMN schedule_interval_seconds INT NOT NULL DEFAULT 0;
ALTER TABLE tasks ADD COLUMN schedule_anchor TIMESTAMPTZ;
ALTER TABLE tasks ADD COLUMN broken BOOLEAN NOT NULL DEFAULT false;
ALTER TABLE runs ADD COLUMN trigger TEXT NOT NULL DEFAULT 'manual';
-- down
ALTER TABLE runs DROP COLUMN trigger;
ALTER TABLE tasks DROP COLUMN broken;
ALTER TABLE tasks DROP COLUMN schedule_anchor;
ALTER TABLE tasks DROP COLUMN schedule_interval_seconds;
```
- `schedule_interval_seconds` — 0 = Off. Presets map to seconds (3600, 10800,
21600, 43200, 86400).
- `schedule_anchor` — set to `now()` when a schedule is enabled; the baseline for
the first run when no finished run exists yet.
- `broken` — the schedule breaker tripped; task needs operator attention.
- `runs.trigger``'manual' | 'scheduled'`; drives the breaker and enriches the
run log.
`store.Task` gains `ScheduleIntervalSeconds int64`, `ScheduleAnchor *time.Time`,
`Broken bool`. `store.Run` gains `Trigger string`, plus `StartedAt time.Time` and
`FinishedAt *time.Time` (needed for the log + due computation; not currently
scanned).
### Cascade / deletion
No new cleanup code. `runs` cascade-delete on task delete already; the schedule
columns live on `tasks` and vanish with the row. The in-memory scheduler polls
the DB, so a deleted task simply stops appearing. (Confirmed against
`migrations/0001_init.up.sql` FKs.)
## Store additions
- `SetTaskSchedule(ctx, id, intervalSeconds int64) error` — sets interval, sets
`schedule_anchor = now()` when enabling (interval>0) / leaves anchor when
disabling, and clears `broken` on any call (any explicit schedule change
un-breaks the task; enabling is additionally gated on all-accounts-OK in the
HTTP layer, disabling is always allowed).
- `SetTaskBroken(ctx, id) error``broken = true, schedule_interval_seconds = 0`
in one UPDATE (the breaker).
- `CreateRun(ctx, taskID, trigger string)` — extend the existing signature to
record the trigger.
- `ListRunsByTask(ctx, taskID) ([]Run, error)` — newest first, for the modal;
scans `id, started_at, finished_at, status, totals, trigger`.
- `LastFinishedRunAt(ctx, taskID) (*time.Time, error)` — most recent run with a
non-null `finished_at`, for due computation. (Or fold into `ListSchedulable`.)
- `ListSchedulableTasks(ctx) ([]ScheduledTask, error)` — tasks with
`schedule_interval_seconds > 0 AND NOT broken AND status <> 'running'`, joined
with their last finished run's `finished_at`, so the scheduler decides in one
query per tick. Returns the fields `dueAt` needs.
`GetTask`/`ListTasks` SELECT + Scan extend for the three new task columns.
## Scheduler (`internal/scheduler/`)
New package, one focused responsibility.
- `type Scheduler struct { store *store.Store; orch *orchestrator.Orchestrator }`
- `func (s *Scheduler) Start(ctx context.Context)``time.NewTicker(30s)` loop;
on each tick calls `s.tick(ctx)`; stops on `ctx.Done()`.
- `func (s *Scheduler) tick(ctx)``ListSchedulableTasks`, then for each due task
calls `s.orch.Run(ctx, taskID)` with `trigger = "scheduled"`. `Run` errors
(`ErrAlreadyRunning`, `ErrNotTested`) are logged, not fatal.
- **Pure, unit-tested decision function:**
`func dueAt(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time`
`lastFinished + interval` if a finished run exists, else `anchor + interval`.
A task is due when `now >= dueAt(...)`. Kept pure so the tick logic is testable
without a DB or clock injection at the call site.
`main.go` starts it: `go scheduler.New(st, orch).Start(context.Background())`.
### Trigger threading + breaker
`Orchestrator.Run` gains a `trigger string` parameter (manual callers pass
`"manual"`; the scheduler passes `"scheduled"`). `Run` forwards it to
`CreateRun` and into `runAll`. In `runAll`, after the final status is computed:
```
if trigger == "scheduled" && totErr > 0 {
_ = o.store.SetTaskBroken(ctx, task.ID)
o.hub.Publish(Event{Type: "task_broken", TaskID: task.ID, ...})
}
```
The HTTP `handleRun` passes `"manual"`. This keeps the breaker precise: only
scheduled runs trip it.
## HTTP API
- **`PUT /api/tasks/{id}/schedule`** — body `{interval_seconds: int}`. Enabling
(interval>0) first checks `gateOK` (all accounts test OK) via
`ListAccountsByTask`; returns **409** with a clear message if not. Then
`SetTaskSchedule`. Disabling (0) always allowed.
- **`GET /api/tasks/{id}/runs`** — `ListRunsByTask` → run-log rows.
- **Task DTO** (`tasks.go`) gains `schedule_interval_seconds`, `broken`, and a
computed `next_run_at *string` (RFC3339, UTC; null when off / running / broken).
`next_run_at` is computed server-side from the same `dueAt` logic so the client
only formats it. `ListTasks` DTO also includes `broken` (for the red icon in the
list).
## Frontend
- **`api.ts`**: `Task` gains `schedule_interval_seconds?`, `broken?`,
`next_run_at?`. New `setTaskSchedule(taskId, intervalSeconds)` and
`listRuns(taskId)` (+ a `Run` interface).
- **`TaskDetail.tsx`**: in the run-control panel, an interval `<select>`
(Off/1h/3h/6h/12h/24h) bound to `setTaskSchedule`; a "Next run: <local time>"
line formatting `next_run_at` with `toLocaleString()` (shows "running…" when a
run is active, nothing when off); a red **broken** badge when `broken`. A
**Runs** button opens a new modal.
- **`RunLogModal.tsx`** (new): lists `listRuns(taskId)` rows — started/finished in
browser-local time, `trigger`, a `StatusBadge`, and copied/skipped/errors.
Reuses the existing `Modal` + `StatusBadge` + `.tbl` patterns.
- **`Tasks.tsx`**: red icon/indicator on rows where `broken`.
- WS: on `task_broken`/`run_done`, `TaskDetail` reloads (existing reload wiring);
`Tasks` list refreshes.
## Error handling
- Enable-while-not-tested → 409, surfaced in the existing `role="alert"` banner.
- Scheduler `Run` errors are logged and skipped; a broken task is skipped every
tick until the operator re-enables (which clears `broken`).
- Restart: `ResetRunningOnStartup` clears phantom running; the scheduler recomputes
due times from persisted `finished_at`/`anchor`.
## Testing
- **store**: `SetTaskSchedule` (sets interval+anchor, clears broken),
`SetTaskBroken` (interval→0, broken→true), `CreateRun` records trigger,
`ListRunsByTask` ordering + fields, `GetTask` returns new columns.
- **scheduler (pure)**: `dueAt` / due decision — not-yet-due, due-from-finished,
first-run-from-anchor, running skipped, broken skipped.
- **orchestrator**: scheduled run with errors trips `SetTaskBroken`; manual run
with errors does not.
- **cascade**: deleting a task removes its runs (extend existing cascade test).
- **E2E** (prod, per project rules): enable a schedule on a tested task → "Next
run" shows correct local time; (with a short test interval) the run auto-starts;
the Runs modal shows the entry with `scheduled` trigger; break an account → the
next scheduled run disables the schedule and marks the task broken (red icon in
list + detail).
## Out of scope (YAGNI)
- Arbitrary cron expressions; catch-up/backfill of missed runs (run once when due);
notifications; per-account breakdown in the run log (totals chosen); configurable
poll interval.
+2
View File
@@ -22,6 +22,8 @@ func (s *Server) Router() http.Handler {
api.HandleFunc("POST /api/tasks/{id}/accounts", s.handleCreateAccount)
api.HandleFunc("POST /api/tasks/{id}/probe", s.handleProbeFolders)
api.HandleFunc("PUT /api/tasks/{id}/folder-mapping", s.handleSetFolderMapping)
api.HandleFunc("PUT /api/tasks/{id}/schedule", s.handleSetSchedule)
api.HandleFunc("GET /api/tasks/{id}/runs", s.handleListRuns)
api.HandleFunc("DELETE /api/tasks/{id}/accounts/{accountId}", s.handleDeleteAccount)
api.HandleFunc("POST /api/tasks/{id}/import", s.handleImportCSV)
api.HandleFunc("POST /api/tasks/{id}/test", s.handleTestAccounts)
+1 -1
View File
@@ -69,7 +69,7 @@ func (s *Server) handleRun(w http.ResponseWriter, r *http.Request) {
http.Error(w, "bad id", http.StatusBadRequest)
return
}
runID, err := s.orch.Run(r.Context(), taskID)
runID, err := s.orch.Run(r.Context(), taskID, "manual")
if errors.Is(err, orchestrator.ErrNotTested) {
http.Error(w, "accounts must pass connection tests first", http.StatusConflict)
return
+94 -1
View File
@@ -1,9 +1,12 @@
package httpapi
import (
"context"
"encoding/json"
"net/http"
"time"
"github.com/vasyansk/imap-copier/internal/scheduler"
"github.com/vasyansk/imap-copier/internal/store"
)
@@ -50,5 +53,95 @@ func (s *Server) handleGetTask(w http.ResponseWriter, r *http.Request) {
for _, a := range accs {
views = append(views, accountDTO(a))
}
writeJSON(w, http.StatusOK, map[string]any{"task": task, "accounts": views})
tv, err := s.taskViewFor(r.Context(), task)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusOK, map[string]any{"task": tv, "accounts": views})
}
// taskView augments the stored task with the server-computed next scheduled run
// (RFC3339 UTC; null when the schedule is off, the task is running, or broken).
type taskView struct {
store.Task
NextRunAt *string `json:"next_run_at"`
}
func (s *Server) taskViewFor(ctx context.Context, t store.Task) (taskView, error) {
if t.ScheduleIntervalSeconds <= 0 || t.Broken || t.Status == "running" || t.ScheduleAnchor == nil {
return taskView{Task: t}, nil
}
last, err := s.store.LastFinishedRunAt(ctx, t.ID)
if err != nil {
return taskView{}, err
}
n := scheduler.NextRun(time.Duration(t.ScheduleIntervalSeconds)*time.Second, *t.ScheduleAnchor, last)
str := n.UTC().Format(time.RFC3339)
return taskView{Task: t, NextRunAt: &str}, nil
}
// handleSetSchedule enables/changes/disables a task's recurring schedule.
// Enabling (interval>0) requires every account to pass its connection tests.
func (s *Server) handleSetSchedule(w http.ResponseWriter, r *http.Request) {
taskID, err := pathID(r, "id")
if err != nil {
http.Error(w, "bad id", http.StatusBadRequest)
return
}
var body struct {
IntervalSeconds int64 `json:"interval_seconds"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "bad json", http.StatusBadRequest)
return
}
if body.IntervalSeconds < 0 {
http.Error(w, "interval_seconds must be >= 0", http.StatusBadRequest)
return
}
if body.IntervalSeconds > 0 {
accs, err := s.store.ListAccountsByTask(r.Context(), taskID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if !allAccountsTested(accs) {
http.Error(w, "all accounts must pass connection tests before scheduling", http.StatusConflict)
return
}
}
if err := s.store.SetTaskSchedule(r.Context(), taskID, body.IntervalSeconds); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
// allAccountsTested mirrors the orchestrator's run gate: non-empty and every
// account OK on both sides.
func allAccountsTested(accs []store.Account) bool {
if len(accs) == 0 {
return false
}
for _, a := range accs {
if a.TestSrcStatus != "ok" || a.TestDstStatus != "ok" {
return false
}
}
return true
}
func (s *Server) handleListRuns(w http.ResponseWriter, r *http.Request) {
taskID, err := pathID(r, "id")
if err != nil {
http.Error(w, "bad id", http.StatusBadRequest)
return
}
runs, err := s.store.ListRunsByTask(r.Context(), taskID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusOK, runs)
}
+21
View File
@@ -0,0 +1,21 @@
package orchestrator
import "testing"
func TestShouldBreak(t *testing.T) {
cases := []struct {
trigger string
errs int64
want bool
}{
{"scheduled", 2, true}, // scheduled run with errors trips the breaker
{"scheduled", 0, false}, // scheduled run, clean — no break
{"manual", 5, false}, // manual run never trips the breaker
{"manual", 0, false},
}
for _, c := range cases {
if got := shouldBreak(c.trigger, c.errs); got != c.want {
t.Fatalf("shouldBreak(%q,%d)=%v want %v", c.trigger, c.errs, got, c.want)
}
}
}
+21 -4
View File
@@ -150,7 +150,13 @@ func (o *Orchestrator) testSide(ctx context.Context, ep imapx.Endpoint, accID in
}})
}
func (o *Orchestrator) Run(ctx context.Context, taskID int64) (int64, error) {
// shouldBreak reports whether a completed run should trip the schedule breaker:
// only scheduled runs that ended with errors.
func shouldBreak(trigger string, totErr int64) bool {
return trigger == "scheduled" && totErr > 0
}
func (o *Orchestrator) Run(ctx context.Context, taskID int64, trigger string) (int64, error) {
task, err := o.store.GetTask(ctx, taskID)
if err != nil {
return 0, err
@@ -174,23 +180,28 @@ func (o *Orchestrator) Run(ctx context.Context, taskID int64) (int64, error) {
_ = o.store.SetTaskStatus(ctx, taskID, "error")
return 0, err
}
runID, err := o.store.CreateRun(ctx, taskID)
runID, err := o.store.CreateRun(ctx, taskID, trigger)
if err != nil {
_ = o.store.SetTaskStatus(ctx, taskID, "error")
return 0, err
}
o.hub.Publish(wshub.Event{Type: "run_started", TaskID: taskID, Data: map[string]any{"run_id": runID}})
go o.runAll(context.WithoutCancel(ctx), task, runID, accs, srcEP, dstEP)
go o.runAll(context.WithoutCancel(ctx), task, runID, accs, srcEP, dstEP, trigger)
return runID, nil
}
func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint) {
func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint, trigger string) {
defer func() {
if r := recover(); r != nil {
slog.Error("run coordinator panicked", "task", task.ID, "run", runID, "panic", r)
_ = o.store.FinishRun(ctx, runID, "error", 0, 0, 0)
_ = o.store.SetTaskStatus(ctx, task.ID, "error")
if trigger == "scheduled" {
_ = o.store.SetTaskBroken(ctx, task.ID)
o.hub.Publish(wshub.Event{Type: "task_broken", TaskID: task.ID,
Data: map[string]any{"task_id": task.ID, "errors": int64(0)}})
}
}
}()
@@ -236,6 +247,12 @@ func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64,
_ = o.store.SetTaskStatus(ctx, task.ID, status)
o.hub.Publish(wshub.Event{Type: "run_done", TaskID: task.ID,
Data: map[string]any{"run_id": runID, "copied": totCopied, "skipped": totSkipped, "errors": totErr}})
if shouldBreak(trigger, totErr) {
_ = o.store.SetTaskBroken(ctx, task.ID)
o.hub.Publish(wshub.Event{Type: "task_broken", TaskID: task.ID,
Data: map[string]any{"task_id": task.ID, "errors": totErr}})
}
}
func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID int64, a store.Account, srcEP, dstEP imapx.Endpoint) (int64, int64, int64) {
+75
View File
@@ -0,0 +1,75 @@
// Package scheduler runs tasks on their recurring interval. A single goroutine
// polls the DB every pollInterval and triggers due tasks through the
// orchestrator. The DB is the source of truth, so the scheduler holds no state
// and is safe across restarts.
package scheduler
import (
"context"
"log/slog"
"time"
"github.com/vasyansk/imap-copier/internal/orchestrator"
"github.com/vasyansk/imap-copier/internal/store"
)
const pollInterval = 30 * time.Second
// NextRun is when a task should next run: one interval after its last completed
// run, or one interval after the schedule anchor if it has never completed one.
func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time {
base := anchor
if lastFinished != nil && lastFinished.After(anchor) {
base = *lastFinished
}
return base.Add(interval)
}
// dueTaskIDs returns the IDs of tasks whose next run is at or before now.
func dueTaskIDs(tasks []store.SchedulableTask, now time.Time) []int64 {
var ids []int64
for _, t := range tasks {
next := NextRun(time.Duration(t.IntervalSeconds)*time.Second, t.Anchor, t.LastFinished)
if !now.Before(next) {
ids = append(ids, t.ID)
}
}
return ids
}
type Scheduler struct {
store *store.Store
orch *orchestrator.Orchestrator
}
func New(st *store.Store, orch *orchestrator.Orchestrator) *Scheduler {
return &Scheduler{store: st, orch: orch}
}
// Start blocks, ticking until ctx is cancelled. Run it in a goroutine.
func (s *Scheduler) Start(ctx context.Context) {
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.tick(ctx)
}
}
}
func (s *Scheduler) tick(ctx context.Context) {
tasks, err := s.store.ListSchedulableTasks(ctx)
if err != nil {
slog.Error("scheduler: list schedulable", "err", err)
return
}
for _, id := range dueTaskIDs(tasks, time.Now()) {
if _, err := s.orch.Run(ctx, id, "scheduled"); err != nil {
// ErrAlreadyRunning / ErrNotTested are expected races/edge cases, not fatal.
slog.Info("scheduler: run skipped", "task", id, "err", err)
}
}
}
+44
View File
@@ -0,0 +1,44 @@
package scheduler
import (
"testing"
"time"
"github.com/vasyansk/imap-copier/internal/store"
)
func TestNextRun(t *testing.T) {
anchor := time.Date(2026, 7, 3, 10, 0, 0, 0, time.UTC)
fin := time.Date(2026, 7, 3, 11, 0, 0, 0, time.UTC)
// No finished run yet → anchor + interval.
if got := NextRun(time.Hour, anchor, nil); !got.Equal(anchor.Add(time.Hour)) {
t.Fatalf("first run: got %v", got)
}
// Finished run exists → last finished + interval (from completion, not start).
if got := NextRun(time.Hour, anchor, &fin); !got.Equal(fin.Add(time.Hour)) {
t.Fatalf("recurring: got %v", got)
}
// Re-enable: anchor is newer than a stale lastFinished from before re-enable →
// use anchor, not the stale lastFinished (which would fire immediately/in the past).
staleFin := anchor.Add(-2 * time.Hour)
if got := NextRun(time.Hour, anchor, &staleFin); !got.Equal(anchor.Add(time.Hour)) {
t.Fatalf("re-enable: got %v, want %v", got, anchor.Add(time.Hour))
}
}
func TestDueTaskIDs(t *testing.T) {
now := time.Date(2026, 7, 3, 12, 0, 0, 0, time.UTC)
anchorOld := now.Add(-2 * time.Hour) // enabled 2h ago
finRecent := now.Add(-30 * time.Minute)
tasks := []store.SchedulableTask{
{ID: 1, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: nil}, // due: anchor+1h < now
{ID: 2, IntervalSeconds: 3600, Anchor: anchorOld, LastFinished: &finRecent}, // not due: fin+1h > now
{ID: 3, IntervalSeconds: 21600, Anchor: anchorOld, LastFinished: nil}, // not due: anchor+6h > now
}
ids := dueTaskIDs(tasks, now)
if len(ids) != 1 || ids[0] != 1 {
t.Fatalf("due IDs = %v, want [1]", ids)
}
}
+1 -1
View File
@@ -49,7 +49,7 @@ func TestDeleteTaskCascades(t *testing.T) {
ep2, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "d", Host: "b", Port: 993, TLSMode: "ssl"})
taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: ep1, DstEndpointID: ep2})
accID, _ := s.CreateAccount(ctx, Account{TaskID: taskID, SrcLogin: "u", SrcPassEnc: "x", DstLogin: "v", DstPassEnc: "y"})
_, _ = s.CreateRun(ctx, taskID)
_, _ = s.CreateRun(ctx, taskID, "manual")
if err := s.DeleteTask(ctx, taskID); err != nil {
t.Fatalf("delete task: %v", err)
+7
View File
@@ -6,6 +6,8 @@ import "context"
// process died mid-run (crash, container restart). A fresh process has no
// in-flight goroutines, so any persisted "running" is stale and would otherwise
// wedge the task (the run-guard refuses to start, and accounts can't be edited).
// It also closes any run rows left stuck in "running" (finished_at NULL), which
// would otherwise surface as perpetually "running" in the run-log modal.
// Returns how many task and account rows were reset.
func (s *Store) ResetRunningOnStartup(ctx context.Context) (tasks int64, accounts int64, err error) {
ct, err := s.Pool.Exec(ctx, `UPDATE accounts SET status='idle' WHERE status='running'`)
@@ -18,6 +20,11 @@ func (s *Store) ResetRunningOnStartup(ctx context.Context) (tasks int64, account
return 0, accounts, err
}
tasks = ct.RowsAffected()
_, err = s.Pool.Exec(ctx,
`UPDATE runs SET status='error', finished_at=now() WHERE finished_at IS NULL`)
if err != nil {
return tasks, accounts, err
}
return tasks, accounts, nil
}
+14
View File
@@ -15,6 +15,7 @@ func TestResetRunningOnStartup(t *testing.T) {
// simulate a crash mid-run
_ = s.SetTaskStatus(ctx, taskID, "running")
_ = s.SetAccountStatus(ctx, accID, "running")
runID, _ := s.CreateRun(ctx, taskID, "manual") // phantom run: never finished
tn, an, err := s.ResetRunningOnStartup(ctx)
if err != nil {
@@ -31,6 +32,19 @@ func TestResetRunningOnStartup(t *testing.T) {
if accs[0].Status == "running" {
t.Fatal("account still running after reset")
}
runs, _ := s.ListRunsByTask(ctx, taskID)
var found bool
for _, r := range runs {
if r.ID == runID {
found = true
if r.Status == "running" || r.FinishedAt == nil {
t.Fatalf("phantom run %d still running: status=%s finished_at=%v", runID, r.Status, r.FinishedAt)
}
}
}
if !found {
t.Fatalf("run %d not found", runID)
}
}
func TestClearStuckAccountAndReconcile(t *testing.T) {
+48 -9
View File
@@ -1,20 +1,26 @@
package store
import "context"
import (
"context"
"time"
)
type Run struct {
ID int64
TaskID int64
Status string
TotalCopied int64
TotalSkipped int64
TotalErrors int64
ID int64 `json:"id"`
TaskID int64 `json:"task_id"`
Status string `json:"status"`
StartedAt time.Time `json:"started_at"`
FinishedAt *time.Time `json:"finished_at"`
TotalCopied int64 `json:"total_copied"`
TotalSkipped int64 `json:"total_skipped"`
TotalErrors int64 `json:"total_errors"`
Trigger string `json:"trigger"`
}
func (s *Store) CreateRun(ctx context.Context, taskID int64) (int64, error) {
func (s *Store) CreateRun(ctx context.Context, taskID int64, trigger string) (int64, error) {
var id int64
err := s.Pool.QueryRow(ctx,
`INSERT INTO runs (task_id) VALUES ($1) RETURNING id`, taskID).Scan(&id)
`INSERT INTO runs (task_id, trigger) VALUES ($1,$2) RETURNING id`, taskID, trigger).Scan(&id)
return id, err
}
@@ -25,3 +31,36 @@ func (s *Store) FinishRun(ctx context.Context, id int64, status string, copied,
id, status, copied, skipped, errs)
return err
}
// ListRunsByTask returns a task's runs, newest first, for the run-log modal.
func (s *Store) ListRunsByTask(ctx context.Context, taskID int64) ([]Run, error) {
rows, err := s.Pool.Query(ctx,
`SELECT id, task_id, started_at, finished_at, status,
total_copied, total_skipped, total_errors, trigger
FROM runs WHERE task_id=$1 ORDER BY id DESC`, taskID)
if err != nil {
return nil, err
}
defer rows.Close()
out := []Run{}
for rows.Next() {
var r Run
if err := rows.Scan(&r.ID, &r.TaskID, &r.StartedAt, &r.FinishedAt, &r.Status,
&r.TotalCopied, &r.TotalSkipped, &r.TotalErrors, &r.Trigger); err != nil {
return nil, err
}
out = append(out, r)
}
return out, rows.Err()
}
// LastFinishedRunAt returns the most recent finished run's timestamp, or nil if
// the task has never completed a run — the baseline for the next scheduled run.
// The same max(finished_at) rule is also inlined in ListSchedulableTasks's
// subquery — keep the two in sync if this changes.
func (s *Store) LastFinishedRunAt(ctx context.Context, taskID int64) (*time.Time, error) {
var t *time.Time
err := s.Pool.QueryRow(ctx,
`SELECT max(finished_at) FROM runs WHERE task_id=$1 AND finished_at IS NOT NULL`, taskID).Scan(&t)
return t, err
}
+81
View File
@@ -0,0 +1,81 @@
package store
import (
"context"
"testing"
)
func TestTaskScheduleAndBroken(t *testing.T) {
s := testStore(t)
ctx := context.Background()
epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"})
epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"})
taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst})
// Defaults: no schedule, not broken.
tk, _ := s.GetTask(ctx, taskID)
if tk.ScheduleIntervalSeconds != 0 || tk.Broken || tk.ScheduleAnchor != nil {
t.Fatalf("defaults: interval=%d broken=%v anchor=%v", tk.ScheduleIntervalSeconds, tk.Broken, tk.ScheduleAnchor)
}
// Enable → interval set, anchor stamped, appears in schedulable list.
if err := s.SetTaskSchedule(ctx, taskID, 3600); err != nil {
t.Fatalf("set schedule: %v", err)
}
tk, _ = s.GetTask(ctx, taskID)
if tk.ScheduleIntervalSeconds != 3600 || tk.ScheduleAnchor == nil {
t.Fatalf("after enable: interval=%d anchor=%v", tk.ScheduleIntervalSeconds, tk.ScheduleAnchor)
}
sch, _ := s.ListSchedulableTasks(ctx)
if len(sch) != 1 || sch[0].ID != taskID || sch[0].IntervalSeconds != 3600 || sch[0].LastFinished != nil {
t.Fatalf("schedulable: %+v", sch)
}
// Breaker → broken true, interval 0, drops out of schedulable list.
if err := s.SetTaskBroken(ctx, taskID); err != nil {
t.Fatalf("set broken: %v", err)
}
tk, _ = s.GetTask(ctx, taskID)
if !tk.Broken || tk.ScheduleIntervalSeconds != 0 {
t.Fatalf("after break: broken=%v interval=%d", tk.Broken, tk.ScheduleIntervalSeconds)
}
if sch, _ := s.ListSchedulableTasks(ctx); len(sch) != 0 {
t.Fatalf("broken task still schedulable: %+v", sch)
}
// Re-enable clears broken.
_ = s.SetTaskSchedule(ctx, taskID, 21600)
if tk, _ = s.GetTask(ctx, taskID); tk.Broken {
t.Fatalf("re-enable did not clear broken")
}
}
func TestListRunsByTaskWithTrigger(t *testing.T) {
s := testStore(t)
ctx := context.Background()
epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"})
epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"})
taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst})
r1, _ := s.CreateRun(ctx, taskID, "manual")
_ = s.FinishRun(ctx, r1, "done", 5, 1, 0)
_, _ = s.CreateRun(ctx, taskID, "scheduled") // still running (no finish)
runs, err := s.ListRunsByTask(ctx, taskID)
if err != nil || len(runs) != 2 {
t.Fatalf("list runs: %v len=%d", err, len(runs))
}
// newest first
if runs[0].Trigger != "scheduled" || runs[0].FinishedAt != nil {
t.Fatalf("run[0]: %+v", runs[0])
}
if runs[1].Trigger != "manual" || runs[1].FinishedAt == nil || runs[1].TotalCopied != 5 {
t.Fatalf("run[1]: %+v", runs[1])
}
// Last finished-at reflects the finished manual run.
lf, _ := s.LastFinishedRunAt(ctx, taskID)
if lf == nil {
t.Fatalf("LastFinishedRunAt nil, want the finished run's time")
}
}
+72 -11
View File
@@ -1,14 +1,20 @@
package store
import "context"
import (
"context"
"time"
)
type Task struct {
ID int64 `json:"id"`
Name string `json:"name"`
SrcEndpointID int64 `json:"src_endpoint_id"`
DstEndpointID int64 `json:"dst_endpoint_id"`
Status string `json:"status"`
FolderMapping map[string]string `json:"folder_mapping"`
ID int64 `json:"id"`
Name string `json:"name"`
SrcEndpointID int64 `json:"src_endpoint_id"`
DstEndpointID int64 `json:"dst_endpoint_id"`
Status string `json:"status"`
FolderMapping map[string]string `json:"folder_mapping"`
ScheduleIntervalSeconds int64 `json:"schedule_interval_seconds"`
ScheduleAnchor *time.Time `json:"-"`
Broken bool `json:"broken"`
}
func (s *Store) CreateTask(ctx context.Context, t Task) (int64, error) {
@@ -26,9 +32,11 @@ func (s *Store) CreateTask(ctx context.Context, t Task) (int64, error) {
func (s *Store) GetTask(ctx context.Context, id int64) (Task, error) {
var t Task
err := s.Pool.QueryRow(ctx,
`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping
`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping,
schedule_interval_seconds, schedule_anchor, broken
FROM tasks WHERE id=$1`, id).
Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping)
Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping,
&t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken)
return t, err
}
@@ -40,7 +48,8 @@ func (s *Store) DeleteTask(ctx context.Context, id int64) error {
func (s *Store) ListTasks(ctx context.Context) ([]Task, error) {
rows, err := s.Pool.Query(ctx,
`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping
`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping,
schedule_interval_seconds, schedule_anchor, broken
FROM tasks ORDER BY id DESC`)
if err != nil {
return nil, err
@@ -49,7 +58,8 @@ func (s *Store) ListTasks(ctx context.Context) ([]Task, error) {
out := []Task{}
for rows.Next() {
var t Task
if err := rows.Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping); err != nil {
if err := rows.Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping,
&t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken); err != nil {
return nil, err
}
out = append(out, t)
@@ -80,3 +90,54 @@ func (s *Store) TryMarkTaskRunning(ctx context.Context, id int64) (bool, error)
}
return ct.RowsAffected() == 1, nil
}
// SetTaskSchedule sets the recurrence interval (0 = off), stamps the anchor when
// enabling (the first run is one interval after this), and clears broken — any
// explicit schedule change un-breaks the task.
func (s *Store) SetTaskSchedule(ctx context.Context, id, intervalSeconds int64) error {
_, err := s.Pool.Exec(ctx,
`UPDATE tasks SET schedule_interval_seconds=$2,
schedule_anchor = CASE WHEN $2 > 0 THEN now() ELSE schedule_anchor END,
broken = false
WHERE id=$1`, id, intervalSeconds)
return err
}
// SetTaskBroken trips the schedule breaker: disables the schedule and flags the
// task for operator attention.
func (s *Store) SetTaskBroken(ctx context.Context, id int64) error {
_, err := s.Pool.Exec(ctx,
`UPDATE tasks SET broken=true, schedule_interval_seconds=0 WHERE id=$1`, id)
return err
}
// SchedulableTask is the per-tick decision input for the scheduler.
type SchedulableTask struct {
ID int64
IntervalSeconds int64
Anchor time.Time
LastFinished *time.Time
}
// ListSchedulableTasks returns tasks eligible to auto-run: schedule on, not
// broken, not currently running — each joined with its last finished run time.
func (s *Store) ListSchedulableTasks(ctx context.Context) ([]SchedulableTask, error) {
rows, err := s.Pool.Query(ctx,
`SELECT t.id, t.schedule_interval_seconds, t.schedule_anchor,
(SELECT max(finished_at) FROM runs r WHERE r.task_id=t.id AND r.finished_at IS NOT NULL)
FROM tasks t
WHERE t.schedule_interval_seconds > 0 AND NOT t.broken AND t.status <> 'running'`)
if err != nil {
return nil, err
}
defer rows.Close()
out := []SchedulableTask{}
for rows.Next() {
var st SchedulableTask
if err := rows.Scan(&st.ID, &st.IntervalSeconds, &st.Anchor, &st.LastFinished); err != nil {
return nil, err
}
out = append(out, st)
}
return out, rows.Err()
}
+4
View File
@@ -0,0 +1,4 @@
ALTER TABLE runs DROP COLUMN trigger;
ALTER TABLE tasks DROP COLUMN broken;
ALTER TABLE tasks DROP COLUMN schedule_anchor;
ALTER TABLE tasks DROP COLUMN schedule_interval_seconds;
+4
View File
@@ -0,0 +1,4 @@
ALTER TABLE tasks ADD COLUMN schedule_interval_seconds INT NOT NULL DEFAULT 0;
ALTER TABLE tasks ADD COLUMN schedule_anchor TIMESTAMPTZ;
ALTER TABLE tasks ADD COLUMN broken BOOLEAN NOT NULL DEFAULT false;
ALTER TABLE runs ADD COLUMN trigger TEXT NOT NULL DEFAULT 'manual';
+20
View File
@@ -18,6 +18,9 @@ export interface Task {
dst_endpoint_id: number
status: string
folder_mapping?: Record<string, string>
schedule_interval_seconds?: number
broken?: boolean
next_run_at?: string | null
}
export type TestStatus = 'pending' | 'ok' | 'fail' | string
@@ -137,6 +140,23 @@ export const testAccounts = (id: number) => api(`/api/tasks/${id}/test`, { metho
export const runTask = (id: number) => api(`/api/tasks/${id}/run`, { method: 'POST' })
export interface Run {
id: number
task_id: number
status: string
started_at: string
finished_at: string | null
total_copied: number
total_skipped: number
total_errors: number
trigger: string
}
export const setTaskSchedule = (taskId: number, intervalSeconds: number) =>
api(`/api/tasks/${taskId}/schedule`, { ...jsonBody({ interval_seconds: intervalSeconds }), method: 'PUT' })
export const listRuns = (taskId: number) => api<Run[]>(`/api/tasks/${taskId}/runs`)
export const importCSV = (id: number, file: File) => {
const fd = new FormData()
fd.append('file', file)
+33
View File
@@ -605,6 +605,39 @@ table.tbl a.rowlink:focus-visible {
}
.badge-info .dot { background: var(--info); animation: pulse 1.4s ease-in-out infinite; }
/* ---------- schedule row ---------- */
.sched-row {
display: flex;
align-items: center;
gap: 12px;
margin-top: 14px;
flex-wrap: wrap;
}
.sched-row label {
font-size: 11px;
font-weight: 700;
letter-spacing: 0.1em;
text-transform: uppercase;
color: var(--fg-dim);
}
.sched-row select {
background: var(--bg-inset);
border: 1px solid var(--border);
color: var(--fg);
padding: 7px 10px;
font-size: 13px;
border-radius: 2px;
}
.sched-next {
font-size: 12px;
color: var(--fg-dim);
font-variant-numeric: tabular-nums;
}
/* ---------- tables ---------- */
.tbl-wrap {
+55
View File
@@ -0,0 +1,55 @@
import { useEffect, useState } from 'react'
import { Modal } from './Modal'
import { StatusBadge } from './StatusBadge'
import { listRuns, type Run } from '../api'
const fmt = (iso: string | null) => (iso ? new Date(iso).toLocaleString() : '—')
export function RunLogModal({ taskId, open, onClose }: { taskId: number; open: boolean; onClose: () => void }) {
const [runs, setRuns] = useState<Run[] | null>(null)
useEffect(() => {
if (!open) return
setRuns(null)
listRuns(taskId).then((r) => setRuns(r ?? [])).catch(() => setRuns([]))
}, [open, taskId])
return (
<Modal open={open} title="Run log" onClose={onClose} size="lg">
<div className="tbl-wrap">
<table className="tbl">
<thead>
<tr>
<th>Started</th>
<th>Finished</th>
<th>Trigger</th>
<th>Status</th>
<th>Copied</th>
<th>Skipped</th>
<th>Errors</th>
</tr>
</thead>
<tbody>
{runs === null ? (
<tr className="empty-row"><td colSpan={7}>loading</td></tr>
) : runs.length === 0 ? (
<tr className="empty-row"><td colSpan={7}>no runs yet</td></tr>
) : (
runs.map((r) => (
<tr key={r.id}>
<td>{fmt(r.started_at)}</td>
<td>{fmt(r.finished_at)}</td>
<td>{r.trigger}</td>
<td><StatusBadge status={r.status} /></td>
<td className="num-cell">{r.total_copied}</td>
<td className="num-cell">{r.total_skipped}</td>
<td className="num-cell">{r.total_errors}</td>
</tr>
))
)}
</tbody>
</table>
</div>
</Modal>
)
}
+38 -2
View File
@@ -1,9 +1,10 @@
import { useEffect, useRef, useState, type ChangeEvent, type FormEvent } from 'react'
import { cancelAccount, createAccount, deleteAccount, getTask, importCSV, probeAccountFolders, probeFolders, runTask, setAccountFolderMapping, testAccounts, type TaskDetail as TaskDetailData } from '../api'
import { cancelAccount, createAccount, deleteAccount, getTask, importCSV, probeAccountFolders, probeFolders, runTask, setAccountFolderMapping, setTaskSchedule, testAccounts, type TaskDetail as TaskDetailData } from '../api'
import { connectTaskWS, type TaskEvent } from '../ws'
import { StatusBadge } from '../components/StatusBadge'
import { useConfirm } from '../components/ConfirmProvider'
import { FolderMappingModal } from '../components/FolderMappingModal'
import { RunLogModal } from '../components/RunLogModal'
const emptyAccount = { src_login: '', src_pass: '', dst_login: '', dst_pass: '' }
@@ -85,6 +86,7 @@ export function TaskDetail({ id }: { id: number }) {
const confirm = useConfirm()
const [error, setError] = useState<string | null>(null)
const [live, setLive] = useState<Record<number, LiveProgress>>({})
const [showRuns, setShowRuns] = useState(false)
const fileInputRef = useRef<HTMLInputElement>(null)
function reload() {
@@ -164,7 +166,7 @@ export function TaskDetail({ id }: { id: number }) {
}
// Structural events refresh the persisted view; `progress` is covered by live state.
if (['account_started', 'account_test', 'account_done', 'run_started', 'run_done', 'error', 'folder', 'cancelled', 'plan'].includes(ev.type)) {
if (['account_started', 'account_test', 'account_done', 'run_started', 'run_done', 'error', 'folder', 'cancelled', 'plan', 'task_broken'].includes(ev.type)) {
reload()
}
}),
@@ -334,6 +336,16 @@ export function TaskDetail({ id }: { id: number }) {
}
}
async function onSchedule(intervalSeconds: number) {
setError(null)
try {
await setTaskSchedule(id, intervalSeconds)
reload()
} catch (err) {
setError(err instanceof Error ? err.message : 'Failed to set schedule')
}
}
if (notFound) {
return (
<div className="panel">
@@ -374,6 +386,7 @@ export function TaskDetail({ id }: { id: number }) {
</h1>
</div>
<StatusBadge status={task.status} />
{task.broken && <span className="badge badge-fail" style={{ marginLeft: 8 }}><span className="dot" />broken</span>}
</div>
<div className="panel">
@@ -406,6 +419,28 @@ export function TaskDetail({ id }: { id: number }) {
</button>
{!allTested && accounts.length > 0 && <span className="hint">run unlocks once every account tests OK on both sides</span>}
</div>
<div className="sched-row">
<label htmlFor="sched">Schedule</label>
<select
id="sched"
value={task.schedule_interval_seconds ?? 0}
onChange={(e) => onSchedule(Number(e.target.value))}
>
<option value={0}>Off</option>
<option value={3600}>Every 1h</option>
<option value={10800}>Every 3h</option>
<option value={21600}>Every 6h</option>
<option value={43200}>Every 12h</option>
<option value={86400}>Every 24h</option>
</select>
{task.next_run_at && (
<span className="sched-next">Next run: {new Date(task.next_run_at).toLocaleString()}</span>
)}
{task.broken && <span className="badge badge-fail"><span className="dot" />broken</span>}
<button type="button" className="link-btn" onClick={() => setShowRuns(true)}>
runs
</button>
</div>
</div>
<div className="panel-grid">
@@ -635,6 +670,7 @@ export function TaskDetail({ id }: { id: number }) {
onConfirm={saveEditMapping}
/>
)}
<RunLogModal taskId={id} open={showRuns} onClose={() => setShowRuns(false)} />
</>
)
}
+1
View File
@@ -163,6 +163,7 @@ export function Tasks() {
</td>
<td>
<StatusBadge status={t.status} />
{t.broken && <span className="badge badge-fail" style={{ marginLeft: 8 }}><span className="dot" />broken</span>}
</td>
<td className="num-cell">
<button