From be777dc9080b8e5938a51a470b6840a0d015ddc6 Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Fri, 3 Jul 2026 13:08:21 +0700 Subject: [PATCH] feat(orchestrator): thread run trigger; breaker on scheduled runs with errors --- internal/httpapi/run.go | 2 +- internal/orchestrator/breaker_test.go | 21 +++++++++++++++++++++ internal/orchestrator/orchestrator.go | 20 ++++++++++++++++---- 3 files changed, 38 insertions(+), 5 deletions(-) create mode 100644 internal/orchestrator/breaker_test.go diff --git a/internal/httpapi/run.go b/internal/httpapi/run.go index f57439e..2195673 100644 --- a/internal/httpapi/run.go +++ b/internal/httpapi/run.go @@ -69,7 +69,7 @@ func (s *Server) handleRun(w http.ResponseWriter, r *http.Request) { http.Error(w, "bad id", http.StatusBadRequest) return } - runID, err := s.orch.Run(r.Context(), taskID) + runID, err := s.orch.Run(r.Context(), taskID, "manual") if errors.Is(err, orchestrator.ErrNotTested) { http.Error(w, "accounts must pass connection tests first", http.StatusConflict) return diff --git a/internal/orchestrator/breaker_test.go b/internal/orchestrator/breaker_test.go new file mode 100644 index 0000000..8cff613 --- /dev/null +++ b/internal/orchestrator/breaker_test.go @@ -0,0 +1,21 @@ +package orchestrator + +import "testing" + +func TestShouldBreak(t *testing.T) { + cases := []struct { + trigger string + errs int64 + want bool + }{ + {"scheduled", 2, true}, // scheduled run with errors trips the breaker + {"scheduled", 0, false}, // scheduled run, clean — no break + {"manual", 5, false}, // manual run never trips the breaker + {"manual", 0, false}, + } + for _, c := range cases { + if got := shouldBreak(c.trigger, c.errs); got != c.want { + t.Fatalf("shouldBreak(%q,%d)=%v want %v", c.trigger, c.errs, got, c.want) + } + } +} diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 2836b85..2a3e5e8 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -150,7 +150,13 @@ func (o *Orchestrator) testSide(ctx context.Context, ep imapx.Endpoint, accID in }}) } -func (o *Orchestrator) Run(ctx context.Context, taskID int64) (int64, error) { +// shouldBreak reports whether a completed run should trip the schedule breaker: +// only scheduled runs that ended with errors. +func shouldBreak(trigger string, totErr int64) bool { + return trigger == "scheduled" && totErr > 0 +} + +func (o *Orchestrator) Run(ctx context.Context, taskID int64, trigger string) (int64, error) { task, err := o.store.GetTask(ctx, taskID) if err != nil { return 0, err @@ -174,18 +180,18 @@ func (o *Orchestrator) Run(ctx context.Context, taskID int64) (int64, error) { _ = o.store.SetTaskStatus(ctx, taskID, "error") return 0, err } - runID, err := o.store.CreateRun(ctx, taskID, "manual") + runID, err := o.store.CreateRun(ctx, taskID, trigger) if err != nil { _ = o.store.SetTaskStatus(ctx, taskID, "error") return 0, err } o.hub.Publish(wshub.Event{Type: "run_started", TaskID: taskID, Data: map[string]any{"run_id": runID}}) - go o.runAll(context.WithoutCancel(ctx), task, runID, accs, srcEP, dstEP) + go o.runAll(context.WithoutCancel(ctx), task, runID, accs, srcEP, dstEP, trigger) return runID, nil } -func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint) { +func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint, trigger string) { defer func() { if r := recover(); r != nil { slog.Error("run coordinator panicked", "task", task.ID, "run", runID, "panic", r) @@ -236,6 +242,12 @@ func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, _ = o.store.SetTaskStatus(ctx, task.ID, status) o.hub.Publish(wshub.Event{Type: "run_done", TaskID: task.ID, Data: map[string]any{"run_id": runID, "copied": totCopied, "skipped": totSkipped, "errors": totErr}}) + + if shouldBreak(trigger, totErr) { + _ = o.store.SetTaskBroken(ctx, task.ID) + o.hub.Publish(wshub.Event{Type: "task_broken", TaskID: task.ID, + Data: map[string]any{"task_id": task.ID, "errors": totErr}}) + } } func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID int64, a store.Account, srcEP, dstEP imapx.Endpoint) (int64, int64, int64) {