From f024f329fc42277af58808ec8dc91813e3456daf Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Thu, 2 Jul 2026 12:47:07 +0700 Subject: [PATCH] feat: per-account cancel + folder message-count progress in log - orchestrator: per-account cancellable context registry + CancelAccount; on cancel, close IMAP connections to unblock in-flight FETCH; account ends in 'cancelled' status with a cancelled event - imapx: CopyDeps.OnFolder callback fires after EXAMINE with the folder's message count (before the long fetch) for visibility - httpapi: POST /tasks/{id}/accounts/{accountId}/cancel - web: per-row cancel button while running, folder event shows N messages, cancelled/done_with_errors status badges Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01MMHQTtnQtQqL8muAXHr9kd --- internal/httpapi/router.go | 1 + internal/httpapi/run.go | 13 +++++ internal/imapx/copy.go | 7 +++ internal/orchestrator/orchestrator.go | 82 +++++++++++++++++++++++++-- web/src/api.ts | 3 + web/src/components/StatusBadge.tsx | 2 +- web/src/pages/TaskDetail.tsx | 41 ++++++++++---- 7 files changed, 133 insertions(+), 16 deletions(-) diff --git a/internal/httpapi/router.go b/internal/httpapi/router.go index 437e51c..f05ac16 100644 --- a/internal/httpapi/router.go +++ b/internal/httpapi/router.go @@ -24,6 +24,7 @@ func (s *Server) Router() http.Handler { api.HandleFunc("POST /api/tasks/{id}/import", s.handleImportCSV) api.HandleFunc("POST /api/tasks/{id}/test", s.handleTestAccounts) api.HandleFunc("POST /api/tasks/{id}/run", s.handleRun) + api.HandleFunc("POST /api/tasks/{id}/accounts/{accountId}/cancel", s.handleCancelAccount) mux.Handle("/api/", s.requireAuth(api)) mux.Handle("/ws", s.requireAuth(http.HandlerFunc(s.handleWS))) diff --git a/internal/httpapi/run.go b/internal/httpapi/run.go index 88ed0f9..4c44a26 100644 --- a/internal/httpapi/run.go +++ b/internal/httpapi/run.go @@ -85,6 +85,19 @@ func (s *Server) handleRun(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusAccepted, map[string]int64{"run_id": runID}) } +func (s *Server) handleCancelAccount(w http.ResponseWriter, r *http.Request) { + accID, err := pathID(r, "accountId") + if err != nil { + http.Error(w, "bad account id", http.StatusBadRequest) + return + } + if !s.orch.CancelAccount(accID) { + http.Error(w, "account is not running", http.StatusConflict) + return + } + w.WriteHeader(http.StatusAccepted) +} + func (s *Server) handleDeleteTask(w http.ResponseWriter, r *http.Request) { id, err := pathID(r, "id") if err != nil { diff --git a/internal/imapx/copy.go b/internal/imapx/copy.go index 46ac07f..9a6fca9 100644 --- a/internal/imapx/copy.go +++ b/internal/imapx/copy.go @@ -19,6 +19,10 @@ type CopyDeps struct { IsMigrated func(key string) (bool, error) MarkMigrated func(folder, key string) error OnProgress func(copied, skipped int) + // OnFolder is called once per folder right after EXAMINE, before the + // (potentially long) envelope fetch, with the message count in the source + // folder — for progress visibility. + OnFolder func(srcFolder, dstFolder string, total int64) } // CopyResult summarizes the outcome of one CopyFolder run. @@ -42,6 +46,9 @@ func CopyFolder(ctx context.Context, src, dst *imapclient.Client, srcFolder, dst if err != nil { return res, fmt.Errorf("examine src %q: %w", srcFolder, err) } + if deps.OnFolder != nil { + deps.OnFolder(srcFolder, dstFolder, int64(sel.NumMessages)) + } if sel.NumMessages == 0 { return res, nil } diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 4fdfa20..e92277c 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -20,10 +20,37 @@ type Orchestrator struct { hub *wshub.Hub encKey []byte concurrency int + + mu sync.Mutex + cancels map[int64]context.CancelFunc // account_id -> cancel of its in-flight copy } func New(s *store.Store, hub *wshub.Hub, encKey []byte, concurrency int) *Orchestrator { - return &Orchestrator{store: s, hub: hub, encKey: encKey, concurrency: concurrency} + return &Orchestrator{store: s, hub: hub, encKey: encKey, concurrency: concurrency, cancels: map[int64]context.CancelFunc{}} +} + +// CancelAccount aborts the in-flight copy for one account, if it is running. +// Returns true if a running copy was found and signalled to stop. +func (o *Orchestrator) CancelAccount(accountID int64) bool { + o.mu.Lock() + cancel, ok := o.cancels[accountID] + o.mu.Unlock() + if ok { + cancel() + } + return ok +} + +func (o *Orchestrator) registerCancel(accountID int64, cancel context.CancelFunc) { + o.mu.Lock() + o.cancels[accountID] = cancel + o.mu.Unlock() +} + +func (o *Orchestrator) unregisterCancel(accountID int64) { + o.mu.Lock() + delete(o.cancels, accountID) + o.mu.Unlock() } func gateOK(accs []store.Account) bool { @@ -190,6 +217,16 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in }}) _ = o.store.SetAccountStatus(ctx, a.ID, "running") + // Per-account cancellable context: IMAP work uses actx (so CancelAccount + // stops it); DB writes keep the parent ctx so status/counters persist even + // after cancellation. ctx is context.WithoutCancel from runAll. + actx, cancel := context.WithCancel(ctx) + o.registerCancel(a.ID, cancel) + defer func() { + o.unregisterCancel(a.ID) + cancel() + }() + srcPass, err := crypto.Decrypt(o.encKey, a.SrcPassEnc) if err != nil { return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err) @@ -199,7 +236,7 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err) } - src, err := imapx.Connect(ctx, srcEP) + src, err := imapx.Connect(actx, srcEP) if err != nil { return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err) } @@ -207,7 +244,7 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in if err := src.Login(a.SrcLogin, string(srcPass)).Wait(); err != nil { return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err) } - dst, err := imapx.Connect(ctx, dstEP) + dst, err := imapx.Connect(actx, dstEP) if err != nil { return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err) } @@ -216,6 +253,14 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err) } + // On cancel, close the connections so any in-flight network read (a slow + // FETCH/Collect that ctx.Err() checks can't interrupt) unblocks immediately. + go func() { + <-actx.Done() + _ = src.Close() + _ = dst.Close() + }() + folders, err := imapx.ListFolders(src) if err != nil { return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err) @@ -229,14 +274,24 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID, Data: map[string]any{"account_id": a.ID, "copied": c, "skipped": s}}) }, + // Fires after EXAMINE (before the long fetch) with the folder's message count. + OnFolder: func(srcFolder, dstFolder string, total int64) { + o.hub.Publish(wshub.Event{Type: "folder", TaskID: task.ID, Data: map[string]any{ + "account_id": a.ID, "src_login": a.SrcLogin, + "folder": srcFolder, "dst_folder": dstFolder, "messages": total, + }}) + }, } for _, folder := range folders { + if actx.Err() != nil { + break // cancelled — stop scheduling more folders + } dstFolder := folder if m, ok := task.FolderMapping[folder]; ok { dstFolder = m } - res, err := imapx.CopyFolder(ctx, src, dst, folder, dstFolder, deps) - if err != nil { + res, err := imapx.CopyFolder(actx, src, dst, folder, dstFolder, deps) + if err != nil && actx.Err() == nil { slog.Warn("folder copy error", "account", a.ID, "src_login", a.SrcLogin, "folder", folder, "err", err) errs++ o.hub.Publish(wshub.Event{Type: "error", TaskID: task.ID, Data: map[string]any{ @@ -248,6 +303,16 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in errs += int64(res.Errors) _ = o.store.IncAccountCounters(ctx, a.ID, int64(res.Copied), int64(res.Skipped), int64(res.Errors)) } + + if actx.Err() != nil { + _ = o.store.SetAccountStatus(ctx, a.ID, "cancelled") + o.hub.Publish(wshub.Event{Type: "cancelled", TaskID: task.ID, + Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin, + "copied": copied, "skipped": skipped, "errors": errs}}) + slog.Info("account cancelled", "account", a.ID, "src_login", a.SrcLogin, "copied", copied, "skipped", skipped) + return copied, skipped, errs + } + _ = o.store.SetAccountStatus(ctx, a.ID, "done") o.hub.Publish(wshub.Event{Type: "account_done", TaskID: task.ID, Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin, "dst_login": a.DstLogin, @@ -257,6 +322,13 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in } func (o *Orchestrator) accountFailed(ctx context.Context, taskID int64, a store.Account, srcEP, dstEP imapx.Endpoint, side string, err error) (int64, int64, int64) { + // A cancellation surfacing as an error is a cancel, not a failure. + if errors.Is(err, context.Canceled) { + _ = o.store.SetAccountStatus(ctx, a.ID, "cancelled") + o.hub.Publish(wshub.Event{Type: "cancelled", TaskID: taskID, + Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin}}) + return 0, 0, 0 + } login, host, port := a.SrcLogin, srcEP.Host, srcEP.Port if side == "dst" { login, host, port = a.DstLogin, dstEP.Host, dstEP.Port diff --git a/web/src/api.ts b/web/src/api.ts index 8abf0fb..7a2ca86 100644 --- a/web/src/api.ts +++ b/web/src/api.ts @@ -81,6 +81,9 @@ export const deleteTask = (id: number) => api(`/api/tasks/${id}`, { method: 'DEL export const deleteAccount = (taskId: number, accountId: number) => api(`/api/tasks/${taskId}/accounts/${accountId}`, { method: 'DELETE' }) +export const cancelAccount = (taskId: number, accountId: number) => + api(`/api/tasks/${taskId}/accounts/${accountId}/cancel`, { method: 'POST' }) + export const listTasks = () => api('/api/tasks') export const getTask = (id: number) => api(`/api/tasks/${id}`) diff --git a/web/src/components/StatusBadge.tsx b/web/src/components/StatusBadge.tsx index ae478d8..1d00558 100644 --- a/web/src/components/StatusBadge.tsx +++ b/web/src/components/StatusBadge.tsx @@ -2,7 +2,7 @@ export function StatusBadge({ status }: { status: string }) { const s = (status || 'pending').toLowerCase() let cls = 'badge-pending' if (s === 'ok' || s === 'done' || s === 'success') cls = 'badge-ok' - else if (s === 'fail' || s === 'failed' || s === 'error') cls = 'badge-fail' + else if (s === 'fail' || s === 'failed' || s.includes('error')) cls = 'badge-fail' else if (s === 'running' || s === 'testing' || s === 'in_progress') cls = 'badge-info' return ( diff --git a/web/src/pages/TaskDetail.tsx b/web/src/pages/TaskDetail.tsx index feb5df4..40f9ea3 100644 --- a/web/src/pages/TaskDetail.tsx +++ b/web/src/pages/TaskDetail.tsx @@ -1,5 +1,5 @@ import { useEffect, useRef, useState, type ChangeEvent, type FormEvent } from 'react' -import { createAccount, deleteAccount, getTask, importCSV, runTask, testAccounts, type TaskDetail as TaskDetailData } from '../api' +import { cancelAccount, createAccount, deleteAccount, getTask, importCSV, runTask, testAccounts, type TaskDetail as TaskDetailData } from '../api' import { connectTaskWS, type TaskEvent } from '../ws' import { StatusBadge } from '../components/StatusBadge' import { useConfirm } from '../components/ConfirmProvider' @@ -22,6 +22,12 @@ function describeEvent(ev: TaskEvent): string { return `DONE #${d.account_id} (${d.src_login} → ${d.dst_login}): copied ${d.copied}, skipped ${d.skipped}, errors ${d.errors}` case 'progress': return `progress #${d.account_id}: copied ${d.copied}, skipped ${d.skipped}` + case 'folder': { + const route = d.dst_folder && d.dst_folder !== d.folder ? ` → "${d.dst_folder}"` : '' + return `folder "${d.folder}"${route}: ${d.messages ?? 0} messages — fetching (#${d.account_id})` + } + case 'cancelled': + return `CANCELLED #${d.account_id} (${d.src_login}): copied ${d.copied ?? 0}, skipped ${d.skipped ?? 0}` case 'error': { const where = d.folder ? ` folder "${d.folder}"` : d.side ? ` (${d.side} ${at})` : '' return `ERROR #${d.account_id}${where}: ${d.error}` @@ -60,7 +66,7 @@ export function TaskDetail({ id }: { id: number }) { () => connectTaskWS(id, (ev: TaskEvent) => { setLog((l) => [{ type: ev.type, text: describeEvent(ev) }, ...l].slice(0, 300)) - if (['account_started', 'account_test', 'account_done', 'progress', 'run_started', 'run_done', 'error'].includes(ev.type)) { + if (['account_started', 'account_test', 'account_done', 'progress', 'run_started', 'run_done', 'error', 'folder', 'cancelled'].includes(ev.type)) { reload() } }), @@ -132,6 +138,15 @@ export function TaskDetail({ id }: { id: number }) { } } + async function onCancelAccount(accId: number) { + setError(null) + try { + await cancelAccount(id, accId) + } catch (err) { + setError(err instanceof Error ? err.message : 'Failed to cancel account') + } + } + async function onTest() { setBusy('test') setError(null) @@ -351,14 +366,20 @@ export function TaskDetail({ id }: { id: number }) { {a.skipped} {a.errors} - + {a.status === 'running' ? ( + + ) : ( + + )} ))