diff --git a/internal/httpapi/tasks.go b/internal/httpapi/tasks.go index 17dcbd1..f354e06 100644 --- a/internal/httpapi/tasks.go +++ b/internal/httpapi/tasks.go @@ -53,7 +53,12 @@ func (s *Server) handleGetTask(w http.ResponseWriter, r *http.Request) { for _, a := range accs { views = append(views, accountDTO(a)) } - writeJSON(w, http.StatusOK, map[string]any{"task": s.taskViewFor(r.Context(), task), "accounts": views}) + tv, err := s.taskViewFor(r.Context(), task) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, http.StatusOK, map[string]any{"task": tv, "accounts": views}) } // taskView augments the stored task with the server-computed next scheduled run @@ -63,15 +68,17 @@ type taskView struct { NextRunAt *string `json:"next_run_at"` } -func (s *Server) taskViewFor(ctx context.Context, t store.Task) taskView { - var nextRunAt *string - if t.ScheduleIntervalSeconds > 0 && !t.Broken && t.Status != "running" && t.ScheduleAnchor != nil { - last, _ := s.store.LastFinishedRunAt(ctx, t.ID) - n := scheduler.NextRun(time.Duration(t.ScheduleIntervalSeconds)*time.Second, *t.ScheduleAnchor, last) - str := n.UTC().Format(time.RFC3339) - nextRunAt = &str +func (s *Server) taskViewFor(ctx context.Context, t store.Task) (taskView, error) { + if t.ScheduleIntervalSeconds <= 0 || t.Broken || t.Status == "running" || t.ScheduleAnchor == nil { + return taskView{Task: t}, nil } - return taskView{Task: t, NextRunAt: nextRunAt} + last, err := s.store.LastFinishedRunAt(ctx, t.ID) + if err != nil { + return taskView{}, err + } + n := scheduler.NextRun(time.Duration(t.ScheduleIntervalSeconds)*time.Second, *t.ScheduleAnchor, last) + str := n.UTC().Format(time.RFC3339) + return taskView{Task: t, NextRunAt: &str}, nil } // handleSetSchedule enables/changes/disables a task's recurring schedule. @@ -89,6 +96,10 @@ func (s *Server) handleSetSchedule(w http.ResponseWriter, r *http.Request) { http.Error(w, "bad json", http.StatusBadRequest) return } + if body.IntervalSeconds < 0 { + http.Error(w, "interval_seconds must be >= 0", http.StatusBadRequest) + return + } if body.IntervalSeconds > 0 { accs, err := s.store.ListAccountsByTask(r.Context(), taskID) if err != nil { diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 2a3e5e8..123b1a0 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -197,6 +197,11 @@ func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, slog.Error("run coordinator panicked", "task", task.ID, "run", runID, "panic", r) _ = o.store.FinishRun(ctx, runID, "error", 0, 0, 0) _ = o.store.SetTaskStatus(ctx, task.ID, "error") + if trigger == "scheduled" { + _ = o.store.SetTaskBroken(ctx, task.ID) + o.hub.Publish(wshub.Event{Type: "task_broken", TaskID: task.ID, + Data: map[string]any{"task_id": task.ID, "errors": int64(0)}}) + } } }() diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index adf76f9..6844ced 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -19,7 +19,7 @@ const pollInterval = 30 * time.Second // run, or one interval after the schedule anchor if it has never completed one. func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time { base := anchor - if lastFinished != nil { + if lastFinished != nil && lastFinished.After(anchor) { base = *lastFinished } return base.Add(interval) diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index dd34e80..5dcef68 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -19,6 +19,12 @@ func TestNextRun(t *testing.T) { if got := NextRun(time.Hour, anchor, &fin); !got.Equal(fin.Add(time.Hour)) { t.Fatalf("recurring: got %v", got) } + // Re-enable: anchor is newer than a stale lastFinished from before re-enable → + // use anchor, not the stale lastFinished (which would fire immediately/in the past). + staleFin := anchor.Add(-2 * time.Hour) + if got := NextRun(time.Hour, anchor, &staleFin); !got.Equal(anchor.Add(time.Hour)) { + t.Fatalf("re-enable: got %v, want %v", got, anchor.Add(time.Hour)) + } } func TestDueTaskIDs(t *testing.T) { diff --git a/internal/store/recovery.go b/internal/store/recovery.go index 06f8b7e..2e6dac4 100644 --- a/internal/store/recovery.go +++ b/internal/store/recovery.go @@ -6,6 +6,8 @@ import "context" // process died mid-run (crash, container restart). A fresh process has no // in-flight goroutines, so any persisted "running" is stale and would otherwise // wedge the task (the run-guard refuses to start, and accounts can't be edited). +// It also closes any run rows left stuck in "running" (finished_at NULL), which +// would otherwise surface as perpetually "running" in the run-log modal. // Returns how many task and account rows were reset. func (s *Store) ResetRunningOnStartup(ctx context.Context) (tasks int64, accounts int64, err error) { ct, err := s.Pool.Exec(ctx, `UPDATE accounts SET status='idle' WHERE status='running'`) @@ -18,6 +20,11 @@ func (s *Store) ResetRunningOnStartup(ctx context.Context) (tasks int64, account return 0, accounts, err } tasks = ct.RowsAffected() + _, err = s.Pool.Exec(ctx, + `UPDATE runs SET status='error', finished_at=now() WHERE finished_at IS NULL`) + if err != nil { + return tasks, accounts, err + } return tasks, accounts, nil } diff --git a/internal/store/recovery_test.go b/internal/store/recovery_test.go index 33aa3d9..907b560 100644 --- a/internal/store/recovery_test.go +++ b/internal/store/recovery_test.go @@ -15,6 +15,7 @@ func TestResetRunningOnStartup(t *testing.T) { // simulate a crash mid-run _ = s.SetTaskStatus(ctx, taskID, "running") _ = s.SetAccountStatus(ctx, accID, "running") + runID, _ := s.CreateRun(ctx, taskID, "manual") // phantom run: never finished tn, an, err := s.ResetRunningOnStartup(ctx) if err != nil { @@ -31,6 +32,19 @@ func TestResetRunningOnStartup(t *testing.T) { if accs[0].Status == "running" { t.Fatal("account still running after reset") } + runs, _ := s.ListRunsByTask(ctx, taskID) + var found bool + for _, r := range runs { + if r.ID == runID { + found = true + if r.Status == "running" || r.FinishedAt == nil { + t.Fatalf("phantom run %d still running: status=%s finished_at=%v", runID, r.Status, r.FinishedAt) + } + } + } + if !found { + t.Fatalf("run %d not found", runID) + } } func TestClearStuckAccountAndReconcile(t *testing.T) { diff --git a/internal/store/runs.go b/internal/store/runs.go index 43ba984..db30bb1 100644 --- a/internal/store/runs.go +++ b/internal/store/runs.go @@ -56,6 +56,8 @@ func (s *Store) ListRunsByTask(ctx context.Context, taskID int64) ([]Run, error) // LastFinishedRunAt returns the most recent finished run's timestamp, or nil if // the task has never completed a run — the baseline for the next scheduled run. +// The same max(finished_at) rule is also inlined in ListSchedulableTasks's +// subquery — keep the two in sync if this changes. func (s *Store) LastFinishedRunAt(ctx context.Context, taskID int64) (*time.Time, error) { var t *time.Time err := s.Pool.QueryRow(ctx,