fix(orchestrator): recover from panics in run goroutines to avoid process crash and stuck 'running' task

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01MMHQTtnQtQqL8muAXHr9kd
This commit is contained in:
2026-07-01 19:36:40 +07:00
parent 2429c786e4
commit 8af7b11791
+19
View File
@@ -124,6 +124,14 @@ func (o *Orchestrator) Run(ctx context.Context, taskID int64) (int64, error) {
}
func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint) {
defer func() {
if r := recover(); r != nil {
slog.Error("run coordinator panicked", "task", task.ID, "run", runID, "panic", r)
_ = o.store.FinishRun(ctx, runID, "error", 0, 0, 0)
_ = o.store.SetTaskStatus(ctx, task.ID, "error")
}
}()
var (
mu sync.Mutex
totCopied, totSkipped, totErr int64
@@ -137,6 +145,17 @@ func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64,
go func(a store.Account) {
defer wg.Done()
defer func() { <-sem }()
defer func() {
if r := recover(); r != nil {
slog.Error("account worker panicked", "task", task.ID, "account", a.ID, "panic", r)
_ = o.store.SetAccountStatus(ctx, a.ID, "error")
o.hub.Publish(wshub.Event{Type: "error", TaskID: task.ID,
Data: map[string]any{"account_id": a.ID, "error": "internal panic"}})
mu.Lock()
totErr++
mu.Unlock()
}
}()
c, s, e := o.runAccount(ctx, task, runID, a, srcEP, dstEP)
mu.Lock()
totCopied += c