From fa72f1b323ba9e98f9c26fe9b6136a2ea1669739 Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Thu, 2 Jul 2026 12:57:39 +0700 Subject: [PATCH] fix: recover from phantom 'running' state after crash/restart The run-cancel registry is in-memory; a container restart mid-run leaves accounts/tasks persisted as 'running' with no goroutine, wedging cancel (not-in-map -> 409) and blocking remove/re-run. - startup: ResetRunningOnStartup clears stale 'running' -> 'idle' on boot - cancel handler: when no live goroutine, ClearStuckAccount + ReconcileTaskStatus reset the stuck account (and its task) instead of returning 409 Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01MMHQTtnQtQqL8muAXHr9kd --- cmd/server/main.go | 8 +++++ internal/httpapi/run.go | 23 +++++++++++- internal/store/recovery.go | 44 +++++++++++++++++++++++ internal/store/recovery_test.go | 62 +++++++++++++++++++++++++++++++++ 4 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 internal/store/recovery.go create mode 100644 internal/store/recovery_test.go diff --git a/cmd/server/main.go b/cmd/server/main.go index c97dcc4..3ee890f 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -33,6 +33,14 @@ func main() { slog.Error("store", "err", err) os.Exit(1) } + // Clear phantom "running" left by a prior crash/restart — no goroutines + // survive a restart, so any persisted "running" is stale. + if t, a, err := st.ResetRunningOnStartup(context.Background()); err != nil { + slog.Error("reset stale running", "err", err) + os.Exit(1) + } else if t > 0 || a > 0 { + slog.Warn("reset stale running statuses on startup", "tasks", t, "accounts", a) + } hub := wshub.New() orch := orchestrator.New(st, hub, cfg.EncKey, cfg.WorkerConcurrency) srv := httpapi.NewServer(cfg, st, orch, hub) diff --git a/internal/httpapi/run.go b/internal/httpapi/run.go index 4c44a26..f57439e 100644 --- a/internal/httpapi/run.go +++ b/internal/httpapi/run.go @@ -86,15 +86,36 @@ func (s *Server) handleRun(w http.ResponseWriter, r *http.Request) { } func (s *Server) handleCancelAccount(w http.ResponseWriter, r *http.Request) { + taskID, err := pathID(r, "id") + if err != nil { + http.Error(w, "bad id", http.StatusBadRequest) + return + } accID, err := pathID(r, "accountId") if err != nil { http.Error(w, "bad account id", http.StatusBadRequest) return } - if !s.orch.CancelAccount(accID) { + // Live in-flight copy: signal it to stop. + if s.orch.CancelAccount(accID) { + w.WriteHeader(http.StatusAccepted) + return + } + // No live goroutine but the DB may still say "running" (stale state left by + // a crash/restart): clear it so the account/task become usable again. + cleared, err := s.store.ClearStuckAccount(r.Context(), accID) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if !cleared { http.Error(w, "account is not running", http.StatusConflict) return } + if err := s.store.ReconcileTaskStatus(r.Context(), taskID); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } w.WriteHeader(http.StatusAccepted) } diff --git a/internal/store/recovery.go b/internal/store/recovery.go new file mode 100644 index 0000000..06f8b7e --- /dev/null +++ b/internal/store/recovery.go @@ -0,0 +1,44 @@ +package store + +import "context" + +// ResetRunningOnStartup clears phantom "running" statuses left behind when the +// process died mid-run (crash, container restart). A fresh process has no +// in-flight goroutines, so any persisted "running" is stale and would otherwise +// wedge the task (the run-guard refuses to start, and accounts can't be edited). +// Returns how many task and account rows were reset. +func (s *Store) ResetRunningOnStartup(ctx context.Context) (tasks int64, accounts int64, err error) { + ct, err := s.Pool.Exec(ctx, `UPDATE accounts SET status='idle' WHERE status='running'`) + if err != nil { + return 0, 0, err + } + accounts = ct.RowsAffected() + ct, err = s.Pool.Exec(ctx, `UPDATE tasks SET status='idle' WHERE status='running'`) + if err != nil { + return 0, accounts, err + } + tasks = ct.RowsAffected() + return tasks, accounts, nil +} + +// ClearStuckAccount resets an account stuck in "running" (no live goroutine) to +// "idle". Returns true if a stuck row was actually cleared. +func (s *Store) ClearStuckAccount(ctx context.Context, accountID int64) (bool, error) { + ct, err := s.Pool.Exec(ctx, `UPDATE accounts SET status='idle' WHERE id=$1 AND status='running'`, accountID) + if err != nil { + return false, err + } + return ct.RowsAffected() == 1, nil +} + +// ReconcileTaskStatus moves a task out of "running" once none of its accounts +// are still running — used after clearing a stuck account so the task can be +// re-run or deleted again. +func (s *Store) ReconcileTaskStatus(ctx context.Context, taskID int64) error { + _, err := s.Pool.Exec(ctx, + `UPDATE tasks SET status='idle' + WHERE id=$1 AND status='running' + AND NOT EXISTS (SELECT 1 FROM accounts WHERE task_id=$1 AND status='running')`, + taskID) + return err +} diff --git a/internal/store/recovery_test.go b/internal/store/recovery_test.go new file mode 100644 index 0000000..33aa3d9 --- /dev/null +++ b/internal/store/recovery_test.go @@ -0,0 +1,62 @@ +package store + +import ( + "context" + "testing" +) + +func TestResetRunningOnStartup(t *testing.T) { + s := testStore(t) + ctx := context.Background() + e1, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "s", Host: "a", Port: 993, TLSMode: "ssl"}) + e2, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "d", Host: "b", Port: 993, TLSMode: "ssl"}) + taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: e1, DstEndpointID: e2}) + accID, _ := s.CreateAccount(ctx, Account{TaskID: taskID, SrcLogin: "u", SrcPassEnc: "x", DstLogin: "v", DstPassEnc: "y"}) + // simulate a crash mid-run + _ = s.SetTaskStatus(ctx, taskID, "running") + _ = s.SetAccountStatus(ctx, accID, "running") + + tn, an, err := s.ResetRunningOnStartup(ctx) + if err != nil { + t.Fatalf("reset: %v", err) + } + if tn != 1 || an != 1 { + t.Fatalf("reset counts tasks=%d accounts=%d, want 1/1", tn, an) + } + task, _ := s.GetTask(ctx, taskID) + if task.Status == "running" { + t.Fatal("task still running after reset") + } + accs, _ := s.ListAccountsByTask(ctx, taskID) + if accs[0].Status == "running" { + t.Fatal("account still running after reset") + } +} + +func TestClearStuckAccountAndReconcile(t *testing.T) { + s := testStore(t) + ctx := context.Background() + e1, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "s", Host: "a", Port: 993, TLSMode: "ssl"}) + e2, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "d", Host: "b", Port: 993, TLSMode: "ssl"}) + taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: e1, DstEndpointID: e2}) + accID, _ := s.CreateAccount(ctx, Account{TaskID: taskID, SrcLogin: "u", SrcPassEnc: "x", DstLogin: "v", DstPassEnc: "y"}) + _ = s.SetTaskStatus(ctx, taskID, "running") + _ = s.SetAccountStatus(ctx, accID, "running") + + cleared, err := s.ClearStuckAccount(ctx, accID) + if err != nil || !cleared { + t.Fatalf("clear stuck: cleared=%v err=%v", cleared, err) + } + // second call finds nothing to clear + again, _ := s.ClearStuckAccount(ctx, accID) + if again { + t.Fatal("second clear should be false") + } + if err := s.ReconcileTaskStatus(ctx, taskID); err != nil { + t.Fatalf("reconcile: %v", err) + } + task, _ := s.GetTask(ctx, taskID) + if task.Status == "running" { + t.Fatal("task should no longer be running after reconcile") + } +}