fix: anchor floor on re-enable; breaker on scheduled panic; propagate next-run err; reject negative interval; close phantom runs on startup
This commit is contained in:
@@ -53,7 +53,12 @@ func (s *Server) handleGetTask(w http.ResponseWriter, r *http.Request) {
|
|||||||
for _, a := range accs {
|
for _, a := range accs {
|
||||||
views = append(views, accountDTO(a))
|
views = append(views, accountDTO(a))
|
||||||
}
|
}
|
||||||
writeJSON(w, http.StatusOK, map[string]any{"task": s.taskViewFor(r.Context(), task), "accounts": views})
|
tv, err := s.taskViewFor(r.Context(), task)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{"task": tv, "accounts": views})
|
||||||
}
|
}
|
||||||
|
|
||||||
// taskView augments the stored task with the server-computed next scheduled run
|
// taskView augments the stored task with the server-computed next scheduled run
|
||||||
@@ -63,15 +68,17 @@ type taskView struct {
|
|||||||
NextRunAt *string `json:"next_run_at"`
|
NextRunAt *string `json:"next_run_at"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) taskViewFor(ctx context.Context, t store.Task) taskView {
|
func (s *Server) taskViewFor(ctx context.Context, t store.Task) (taskView, error) {
|
||||||
var nextRunAt *string
|
if t.ScheduleIntervalSeconds <= 0 || t.Broken || t.Status == "running" || t.ScheduleAnchor == nil {
|
||||||
if t.ScheduleIntervalSeconds > 0 && !t.Broken && t.Status != "running" && t.ScheduleAnchor != nil {
|
return taskView{Task: t}, nil
|
||||||
last, _ := s.store.LastFinishedRunAt(ctx, t.ID)
|
|
||||||
n := scheduler.NextRun(time.Duration(t.ScheduleIntervalSeconds)*time.Second, *t.ScheduleAnchor, last)
|
|
||||||
str := n.UTC().Format(time.RFC3339)
|
|
||||||
nextRunAt = &str
|
|
||||||
}
|
}
|
||||||
return taskView{Task: t, NextRunAt: nextRunAt}
|
last, err := s.store.LastFinishedRunAt(ctx, t.ID)
|
||||||
|
if err != nil {
|
||||||
|
return taskView{}, err
|
||||||
|
}
|
||||||
|
n := scheduler.NextRun(time.Duration(t.ScheduleIntervalSeconds)*time.Second, *t.ScheduleAnchor, last)
|
||||||
|
str := n.UTC().Format(time.RFC3339)
|
||||||
|
return taskView{Task: t, NextRunAt: &str}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleSetSchedule enables/changes/disables a task's recurring schedule.
|
// handleSetSchedule enables/changes/disables a task's recurring schedule.
|
||||||
@@ -89,6 +96,10 @@ func (s *Server) handleSetSchedule(w http.ResponseWriter, r *http.Request) {
|
|||||||
http.Error(w, "bad json", http.StatusBadRequest)
|
http.Error(w, "bad json", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if body.IntervalSeconds < 0 {
|
||||||
|
http.Error(w, "interval_seconds must be >= 0", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
if body.IntervalSeconds > 0 {
|
if body.IntervalSeconds > 0 {
|
||||||
accs, err := s.store.ListAccountsByTask(r.Context(), taskID)
|
accs, err := s.store.ListAccountsByTask(r.Context(), taskID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -197,6 +197,11 @@ func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64,
|
|||||||
slog.Error("run coordinator panicked", "task", task.ID, "run", runID, "panic", r)
|
slog.Error("run coordinator panicked", "task", task.ID, "run", runID, "panic", r)
|
||||||
_ = o.store.FinishRun(ctx, runID, "error", 0, 0, 0)
|
_ = o.store.FinishRun(ctx, runID, "error", 0, 0, 0)
|
||||||
_ = o.store.SetTaskStatus(ctx, task.ID, "error")
|
_ = o.store.SetTaskStatus(ctx, task.ID, "error")
|
||||||
|
if trigger == "scheduled" {
|
||||||
|
_ = o.store.SetTaskBroken(ctx, task.ID)
|
||||||
|
o.hub.Publish(wshub.Event{Type: "task_broken", TaskID: task.ID,
|
||||||
|
Data: map[string]any{"task_id": task.ID, "errors": int64(0)}})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ const pollInterval = 30 * time.Second
|
|||||||
// run, or one interval after the schedule anchor if it has never completed one.
|
// run, or one interval after the schedule anchor if it has never completed one.
|
||||||
func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time {
|
func NextRun(interval time.Duration, anchor time.Time, lastFinished *time.Time) time.Time {
|
||||||
base := anchor
|
base := anchor
|
||||||
if lastFinished != nil {
|
if lastFinished != nil && lastFinished.After(anchor) {
|
||||||
base = *lastFinished
|
base = *lastFinished
|
||||||
}
|
}
|
||||||
return base.Add(interval)
|
return base.Add(interval)
|
||||||
|
|||||||
@@ -19,6 +19,12 @@ func TestNextRun(t *testing.T) {
|
|||||||
if got := NextRun(time.Hour, anchor, &fin); !got.Equal(fin.Add(time.Hour)) {
|
if got := NextRun(time.Hour, anchor, &fin); !got.Equal(fin.Add(time.Hour)) {
|
||||||
t.Fatalf("recurring: got %v", got)
|
t.Fatalf("recurring: got %v", got)
|
||||||
}
|
}
|
||||||
|
// Re-enable: anchor is newer than a stale lastFinished from before re-enable →
|
||||||
|
// use anchor, not the stale lastFinished (which would fire immediately/in the past).
|
||||||
|
staleFin := anchor.Add(-2 * time.Hour)
|
||||||
|
if got := NextRun(time.Hour, anchor, &staleFin); !got.Equal(anchor.Add(time.Hour)) {
|
||||||
|
t.Fatalf("re-enable: got %v, want %v", got, anchor.Add(time.Hour))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDueTaskIDs(t *testing.T) {
|
func TestDueTaskIDs(t *testing.T) {
|
||||||
|
|||||||
@@ -6,6 +6,8 @@ import "context"
|
|||||||
// process died mid-run (crash, container restart). A fresh process has no
|
// process died mid-run (crash, container restart). A fresh process has no
|
||||||
// in-flight goroutines, so any persisted "running" is stale and would otherwise
|
// in-flight goroutines, so any persisted "running" is stale and would otherwise
|
||||||
// wedge the task (the run-guard refuses to start, and accounts can't be edited).
|
// wedge the task (the run-guard refuses to start, and accounts can't be edited).
|
||||||
|
// It also closes any run rows left stuck in "running" (finished_at NULL), which
|
||||||
|
// would otherwise surface as perpetually "running" in the run-log modal.
|
||||||
// Returns how many task and account rows were reset.
|
// Returns how many task and account rows were reset.
|
||||||
func (s *Store) ResetRunningOnStartup(ctx context.Context) (tasks int64, accounts int64, err error) {
|
func (s *Store) ResetRunningOnStartup(ctx context.Context) (tasks int64, accounts int64, err error) {
|
||||||
ct, err := s.Pool.Exec(ctx, `UPDATE accounts SET status='idle' WHERE status='running'`)
|
ct, err := s.Pool.Exec(ctx, `UPDATE accounts SET status='idle' WHERE status='running'`)
|
||||||
@@ -18,6 +20,11 @@ func (s *Store) ResetRunningOnStartup(ctx context.Context) (tasks int64, account
|
|||||||
return 0, accounts, err
|
return 0, accounts, err
|
||||||
}
|
}
|
||||||
tasks = ct.RowsAffected()
|
tasks = ct.RowsAffected()
|
||||||
|
_, err = s.Pool.Exec(ctx,
|
||||||
|
`UPDATE runs SET status='error', finished_at=now() WHERE finished_at IS NULL`)
|
||||||
|
if err != nil {
|
||||||
|
return tasks, accounts, err
|
||||||
|
}
|
||||||
return tasks, accounts, nil
|
return tasks, accounts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ func TestResetRunningOnStartup(t *testing.T) {
|
|||||||
// simulate a crash mid-run
|
// simulate a crash mid-run
|
||||||
_ = s.SetTaskStatus(ctx, taskID, "running")
|
_ = s.SetTaskStatus(ctx, taskID, "running")
|
||||||
_ = s.SetAccountStatus(ctx, accID, "running")
|
_ = s.SetAccountStatus(ctx, accID, "running")
|
||||||
|
runID, _ := s.CreateRun(ctx, taskID, "manual") // phantom run: never finished
|
||||||
|
|
||||||
tn, an, err := s.ResetRunningOnStartup(ctx)
|
tn, an, err := s.ResetRunningOnStartup(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -31,6 +32,19 @@ func TestResetRunningOnStartup(t *testing.T) {
|
|||||||
if accs[0].Status == "running" {
|
if accs[0].Status == "running" {
|
||||||
t.Fatal("account still running after reset")
|
t.Fatal("account still running after reset")
|
||||||
}
|
}
|
||||||
|
runs, _ := s.ListRunsByTask(ctx, taskID)
|
||||||
|
var found bool
|
||||||
|
for _, r := range runs {
|
||||||
|
if r.ID == runID {
|
||||||
|
found = true
|
||||||
|
if r.Status == "running" || r.FinishedAt == nil {
|
||||||
|
t.Fatalf("phantom run %d still running: status=%s finished_at=%v", runID, r.Status, r.FinishedAt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
t.Fatalf("run %d not found", runID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestClearStuckAccountAndReconcile(t *testing.T) {
|
func TestClearStuckAccountAndReconcile(t *testing.T) {
|
||||||
|
|||||||
@@ -56,6 +56,8 @@ func (s *Store) ListRunsByTask(ctx context.Context, taskID int64) ([]Run, error)
|
|||||||
|
|
||||||
// LastFinishedRunAt returns the most recent finished run's timestamp, or nil if
|
// LastFinishedRunAt returns the most recent finished run's timestamp, or nil if
|
||||||
// the task has never completed a run — the baseline for the next scheduled run.
|
// the task has never completed a run — the baseline for the next scheduled run.
|
||||||
|
// The same max(finished_at) rule is also inlined in ListSchedulableTasks's
|
||||||
|
// subquery — keep the two in sync if this changes.
|
||||||
func (s *Store) LastFinishedRunAt(ctx context.Context, taskID int64) (*time.Time, error) {
|
func (s *Store) LastFinishedRunAt(ctx context.Context, taskID int64) (*time.Time, error) {
|
||||||
var t *time.Time
|
var t *time.Time
|
||||||
err := s.Pool.QueryRow(ctx,
|
err := s.Pool.QueryRow(ctx,
|
||||||
|
|||||||
Reference in New Issue
Block a user