diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index b693a5a..5372f91 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -124,6 +124,14 @@ func (o *Orchestrator) Run(ctx context.Context, taskID int64) (int64, error) { } func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint) { + defer func() { + if r := recover(); r != nil { + slog.Error("run coordinator panicked", "task", task.ID, "run", runID, "panic", r) + _ = o.store.FinishRun(ctx, runID, "error", 0, 0, 0) + _ = o.store.SetTaskStatus(ctx, task.ID, "error") + } + }() + var ( mu sync.Mutex totCopied, totSkipped, totErr int64 @@ -137,6 +145,17 @@ func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, go func(a store.Account) { defer wg.Done() defer func() { <-sem }() + defer func() { + if r := recover(); r != nil { + slog.Error("account worker panicked", "task", task.ID, "account", a.ID, "panic", r) + _ = o.store.SetAccountStatus(ctx, a.ID, "error") + o.hub.Publish(wshub.Event{Type: "error", TaskID: task.ID, + Data: map[string]any{"account_id": a.ID, "error": "internal panic"}}) + mu.Lock() + totErr++ + mu.Unlock() + } + }() c, s, e := o.runAccount(ctx, task, runID, a, srcEP, dstEP) mu.Lock() totCopied += c