diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index e92277c..878f47b 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -5,6 +5,7 @@ import ( "errors" "log/slog" "sync" + "time" "github.com/vasyansk/imap-copier/internal/crypto" "github.com/vasyansk/imap-copier/internal/imapx" @@ -267,15 +268,36 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in } var copied, skipped, errs int64 + // Account-level live progress state (all callbacks run on this goroutine, + // so plain vars are race-free). base* = totals from completed folders; + // c/s inside OnProgress are cumulative within the current folder. + var baseCopied, baseSkipped int64 + var curFolder string + var curTotal int64 + var lastEmit time.Time deps := imapx.CopyDeps{ IsMigrated: func(k string) (bool, error) { return o.store.IsMigrated(ctx, a.ID, k) }, MarkMigrated: func(folder, k string) error { return o.store.MarkMigrated(ctx, a.ID, folder, k) }, OnProgress: func(c, s int) { - o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID, - Data: map[string]any{"account_id": a.ID, "copied": c, "skipped": s}}) + now := time.Now() + done := c + s + // throttle to ~3/sec per account, but always emit folder completion + if now.Sub(lastEmit) < 350*time.Millisecond && int64(done) < curTotal { + return + } + lastEmit = now + o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID, Data: map[string]any{ + "account_id": a.ID, + "copied": baseCopied + int64(c), + "skipped": baseSkipped + int64(s), + "folder": curFolder, + "folder_done": done, + "folder_total": curTotal, + }}) }, // Fires after EXAMINE (before the long fetch) with the folder's message count. OnFolder: func(srcFolder, dstFolder string, total int64) { + curFolder, curTotal = srcFolder, total o.hub.Publish(wshub.Event{Type: "folder", TaskID: task.ID, Data: map[string]any{ "account_id": a.ID, "src_login": a.SrcLogin, "folder": srcFolder, "dst_folder": dstFolder, "messages": total, @@ -301,6 +323,8 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in copied += int64(res.Copied) skipped += int64(res.Skipped) errs += int64(res.Errors) + baseCopied += int64(res.Copied) + baseSkipped += int64(res.Skipped) _ = o.store.IncAccountCounters(ctx, a.ID, int64(res.Copied), int64(res.Skipped), int64(res.Errors)) } diff --git a/web/src/app.css b/web/src/app.css index 7f7d31f..837accf 100644 --- a/web/src/app.css +++ b/web/src/app.css @@ -254,6 +254,34 @@ cursor: not-allowed; } +/* per-account live progress */ +.acct-progress { + display: flex; + flex-direction: column; + gap: 4px; + min-width: 160px; +} + +.pbar { + height: 4px; + background: var(--bg-inset); + border: 1px solid var(--border); + overflow: hidden; +} + +.pbar-fill { + display: block; + height: 100%; + background: var(--accent); + transition: width 0.3s ease-out; +} + +.pmeta { + font-size: 10px; + color: var(--fg-dim); + letter-spacing: 0.04em; +} + /* clear-log button: mirrors the .panel-label tab on the right edge */ .log-clear { position: absolute; diff --git a/web/src/pages/TaskDetail.tsx b/web/src/pages/TaskDetail.tsx index 40f9ea3..0957596 100644 --- a/web/src/pages/TaskDetail.tsx +++ b/web/src/pages/TaskDetail.tsx @@ -6,6 +6,25 @@ import { useConfirm } from '../components/ConfirmProvider' const emptyAccount = { src_login: '', src_pass: '', dst_login: '', dst_pass: '' } +// Live per-account progress derived from throttled `progress` WS events. +type LiveProgress = { + copied: number + skipped: number + folder?: string + folderDone: number + folderTotal: number + startTs: number + startCount: number + speed: number // messages/sec, averaged since the account's run started +} + +function fmtDuration(sec: number): string { + if (!isFinite(sec) || sec < 0) return '—' + const m = Math.floor(sec / 60) + const s = Math.floor(sec % 60) + return `${m}:${String(s).padStart(2, '0')}` +} + // Human-readable one-line description of a task event for the log panel. function describeEvent(ev: TaskEvent): string { const d = (ev.data ?? {}) as Record @@ -20,8 +39,11 @@ function describeEvent(ev: TaskEvent): string { return `START #${d.account_id}: ${d.src_login}@${d.src_host}:${d.src_port} → ${d.dst_login}@${d.dst_host}:${d.dst_port}` case 'account_done': return `DONE #${d.account_id} (${d.src_login} → ${d.dst_login}): copied ${d.copied}, skipped ${d.skipped}, errors ${d.errors}` - case 'progress': - return `progress #${d.account_id}: copied ${d.copied}, skipped ${d.skipped}` + case 'progress': { + const pct = d.folder_total ? Math.floor((Number(d.folder_done) / Number(d.folder_total)) * 100) : 0 + const loc = d.folder ? `"${d.folder}" ${d.folder_done}/${d.folder_total} (${pct}%) · ` : '' + return `progress #${d.account_id}: ${loc}copied ${d.copied}, skipped ${d.skipped}` + } case 'folder': { const route = d.dst_folder && d.dst_folder !== d.folder ? ` → "${d.dst_folder}"` : '' return `folder "${d.folder}"${route}: ${d.messages ?? 0} messages — fetching (#${d.account_id})` @@ -49,6 +71,7 @@ export function TaskDetail({ id }: { id: number }) { const [busy, setBusy] = useState<'test' | 'run' | 'add' | 'import' | 'delete' | null>(null) const confirm = useConfirm() const [error, setError] = useState(null) + const [live, setLive] = useState>({}) const fileInputRef = useRef(null) function reload() { @@ -66,7 +89,46 @@ export function TaskDetail({ id }: { id: number }) { () => connectTaskWS(id, (ev: TaskEvent) => { setLog((l) => [{ type: ev.type, text: describeEvent(ev) }, ...l].slice(0, 300)) - if (['account_started', 'account_test', 'account_done', 'progress', 'run_started', 'run_done', 'error', 'folder', 'cancelled'].includes(ev.type)) { + const d = (ev.data ?? {}) as Record + const accId = typeof d.account_id === 'number' ? d.account_id : undefined + + if (ev.type === 'progress' && accId != null) { + const now = Date.now() + const copied = Number(d.copied ?? 0) + const skipped = Number(d.skipped ?? 0) + const processed = copied + skipped + setLive((prev) => { + const cur = prev[accId] + const startTs = cur?.startTs ?? now + const startCount = cur?.startCount ?? processed + const dt = (now - startTs) / 1000 + const speed = dt > 0.5 ? (processed - startCount) / dt : (cur?.speed ?? 0) + return { + ...prev, + [accId]: { + copied, + skipped, + folder: d.folder as string | undefined, + folderDone: Number(d.folder_done ?? 0), + folderTotal: Number(d.folder_total ?? 0), + startTs, + startCount, + speed, + }, + } + }) + } else if (accId != null && (ev.type === 'account_started' || ev.type === 'account_done' || ev.type === 'cancelled' || (ev.type === 'error' && d.folder == null))) { + // terminal/reset for this account — drop live overlay, fall back to DB + setLive((prev) => { + if (!(accId in prev)) return prev + const next = { ...prev } + delete next[accId] + return next + }) + } + + // Structural events refresh the persisted view; `progress` is covered by live state. + if (['account_started', 'account_test', 'account_done', 'run_started', 'run_done', 'error', 'folder', 'cancelled'].includes(ev.type)) { reload() } }), @@ -188,8 +250,14 @@ export function TaskDetail({ id }: { id: number }) { const { task, accounts } = data const allTested = accounts.length > 0 && accounts.every((a) => a.test_src_status === 'ok' && a.test_dst_status === 'ok') + // Prefer live (WS) copied/skipped over the DB values, which only advance per + // folder — so the summary moves in real time during a large folder. const totals = accounts.reduce( - (acc, a) => ({ copied: acc.copied + a.copied, skipped: acc.skipped + a.skipped, errors: acc.errors + a.errors }), + (acc, a) => ({ + copied: acc.copied + (live[a.id]?.copied ?? a.copied), + skipped: acc.skipped + (live[a.id]?.skipped ?? a.skipped), + errors: acc.errors + a.errors, + }), { copied: 0, skipped: 0, errors: 0 }, ) @@ -337,6 +405,7 @@ export function TaskDetail({ id }: { id: number }) { Src test Dst test Status + Progress Copied Skipped Errors @@ -346,7 +415,7 @@ export function TaskDetail({ id }: { id: number }) { {accounts.length === 0 ? ( - no accounts yet — add one or import a CSV above + no accounts yet — add one or import a CSV above ) : ( accounts.map((a) => ( @@ -362,8 +431,27 @@ export function TaskDetail({ id }: { id: number }) { - {a.copied} - {a.skipped} + + {(() => { + const lv = live[a.id] + if (!lv || !lv.folderTotal) return + const pct = Math.min(100, Math.floor((lv.folderDone / lv.folderTotal) * 100)) + const eta = lv.speed > 0 ? (lv.folderTotal - lv.folderDone) / lv.speed : Infinity + return ( +
+
+ +
+ + {lv.folder ? `${lv.folder} · ` : ''} + {pct}% · {lv.speed >= 1 ? Math.round(lv.speed) : lv.speed.toFixed(1)}/s · ETA {fmtDuration(eta)} + +
+ ) + })()} + + {live[a.id]?.copied ?? a.copied} + {live[a.id]?.skipped ?? a.skipped} {a.errors} {a.status === 'running' ? (