feat: real-time progress with per-folder bar, speed and ETA

- orchestrator: progress events now carry account-level cumulative copied/
  skipped plus current folder done/total, throttled to ~3/sec per account
- web: RUN CONTROL counters and account copied/skipped read live WS values
  (DB only advances per folder, so the summary lagged); new Progress column
  shows a bar, percent, avg messages/sec and folder ETA while running

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01MMHQTtnQtQqL8muAXHr9kd
This commit is contained in:
2026-07-02 13:14:55 +07:00
parent 6395099b53
commit e1911ef13b
3 changed files with 149 additions and 9 deletions
+26 -2
View File
@@ -5,6 +5,7 @@ import (
"errors" "errors"
"log/slog" "log/slog"
"sync" "sync"
"time"
"github.com/vasyansk/imap-copier/internal/crypto" "github.com/vasyansk/imap-copier/internal/crypto"
"github.com/vasyansk/imap-copier/internal/imapx" "github.com/vasyansk/imap-copier/internal/imapx"
@@ -267,15 +268,36 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
} }
var copied, skipped, errs int64 var copied, skipped, errs int64
// Account-level live progress state (all callbacks run on this goroutine,
// so plain vars are race-free). base* = totals from completed folders;
// c/s inside OnProgress are cumulative within the current folder.
var baseCopied, baseSkipped int64
var curFolder string
var curTotal int64
var lastEmit time.Time
deps := imapx.CopyDeps{ deps := imapx.CopyDeps{
IsMigrated: func(k string) (bool, error) { return o.store.IsMigrated(ctx, a.ID, k) }, IsMigrated: func(k string) (bool, error) { return o.store.IsMigrated(ctx, a.ID, k) },
MarkMigrated: func(folder, k string) error { return o.store.MarkMigrated(ctx, a.ID, folder, k) }, MarkMigrated: func(folder, k string) error { return o.store.MarkMigrated(ctx, a.ID, folder, k) },
OnProgress: func(c, s int) { OnProgress: func(c, s int) {
o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID, now := time.Now()
Data: map[string]any{"account_id": a.ID, "copied": c, "skipped": s}}) done := c + s
// throttle to ~3/sec per account, but always emit folder completion
if now.Sub(lastEmit) < 350*time.Millisecond && int64(done) < curTotal {
return
}
lastEmit = now
o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID, Data: map[string]any{
"account_id": a.ID,
"copied": baseCopied + int64(c),
"skipped": baseSkipped + int64(s),
"folder": curFolder,
"folder_done": done,
"folder_total": curTotal,
}})
}, },
// Fires after EXAMINE (before the long fetch) with the folder's message count. // Fires after EXAMINE (before the long fetch) with the folder's message count.
OnFolder: func(srcFolder, dstFolder string, total int64) { OnFolder: func(srcFolder, dstFolder string, total int64) {
curFolder, curTotal = srcFolder, total
o.hub.Publish(wshub.Event{Type: "folder", TaskID: task.ID, Data: map[string]any{ o.hub.Publish(wshub.Event{Type: "folder", TaskID: task.ID, Data: map[string]any{
"account_id": a.ID, "src_login": a.SrcLogin, "account_id": a.ID, "src_login": a.SrcLogin,
"folder": srcFolder, "dst_folder": dstFolder, "messages": total, "folder": srcFolder, "dst_folder": dstFolder, "messages": total,
@@ -301,6 +323,8 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
copied += int64(res.Copied) copied += int64(res.Copied)
skipped += int64(res.Skipped) skipped += int64(res.Skipped)
errs += int64(res.Errors) errs += int64(res.Errors)
baseCopied += int64(res.Copied)
baseSkipped += int64(res.Skipped)
_ = o.store.IncAccountCounters(ctx, a.ID, int64(res.Copied), int64(res.Skipped), int64(res.Errors)) _ = o.store.IncAccountCounters(ctx, a.ID, int64(res.Copied), int64(res.Skipped), int64(res.Errors))
} }
+28
View File
@@ -254,6 +254,34 @@
cursor: not-allowed; cursor: not-allowed;
} }
/* per-account live progress */
.acct-progress {
display: flex;
flex-direction: column;
gap: 4px;
min-width: 160px;
}
.pbar {
height: 4px;
background: var(--bg-inset);
border: 1px solid var(--border);
overflow: hidden;
}
.pbar-fill {
display: block;
height: 100%;
background: var(--accent);
transition: width 0.3s ease-out;
}
.pmeta {
font-size: 10px;
color: var(--fg-dim);
letter-spacing: 0.04em;
}
/* clear-log button: mirrors the .panel-label tab on the right edge */ /* clear-log button: mirrors the .panel-label tab on the right edge */
.log-clear { .log-clear {
position: absolute; position: absolute;
+95 -7
View File
@@ -6,6 +6,25 @@ import { useConfirm } from '../components/ConfirmProvider'
const emptyAccount = { src_login: '', src_pass: '', dst_login: '', dst_pass: '' } const emptyAccount = { src_login: '', src_pass: '', dst_login: '', dst_pass: '' }
// Live per-account progress derived from throttled `progress` WS events.
type LiveProgress = {
copied: number
skipped: number
folder?: string
folderDone: number
folderTotal: number
startTs: number
startCount: number
speed: number // messages/sec, averaged since the account's run started
}
function fmtDuration(sec: number): string {
if (!isFinite(sec) || sec < 0) return '—'
const m = Math.floor(sec / 60)
const s = Math.floor(sec % 60)
return `${m}:${String(s).padStart(2, '0')}`
}
// Human-readable one-line description of a task event for the log panel. // Human-readable one-line description of a task event for the log panel.
function describeEvent(ev: TaskEvent): string { function describeEvent(ev: TaskEvent): string {
const d = (ev.data ?? {}) as Record<string, unknown> const d = (ev.data ?? {}) as Record<string, unknown>
@@ -20,8 +39,11 @@ function describeEvent(ev: TaskEvent): string {
return `START #${d.account_id}: ${d.src_login}@${d.src_host}:${d.src_port}${d.dst_login}@${d.dst_host}:${d.dst_port}` return `START #${d.account_id}: ${d.src_login}@${d.src_host}:${d.src_port}${d.dst_login}@${d.dst_host}:${d.dst_port}`
case 'account_done': case 'account_done':
return `DONE #${d.account_id} (${d.src_login}${d.dst_login}): copied ${d.copied}, skipped ${d.skipped}, errors ${d.errors}` return `DONE #${d.account_id} (${d.src_login}${d.dst_login}): copied ${d.copied}, skipped ${d.skipped}, errors ${d.errors}`
case 'progress': case 'progress': {
return `progress #${d.account_id}: copied ${d.copied}, skipped ${d.skipped}` const pct = d.folder_total ? Math.floor((Number(d.folder_done) / Number(d.folder_total)) * 100) : 0
const loc = d.folder ? `"${d.folder}" ${d.folder_done}/${d.folder_total} (${pct}%) · ` : ''
return `progress #${d.account_id}: ${loc}copied ${d.copied}, skipped ${d.skipped}`
}
case 'folder': { case 'folder': {
const route = d.dst_folder && d.dst_folder !== d.folder ? ` → "${d.dst_folder}"` : '' const route = d.dst_folder && d.dst_folder !== d.folder ? ` → "${d.dst_folder}"` : ''
return `folder "${d.folder}"${route}: ${d.messages ?? 0} messages — fetching (#${d.account_id})` return `folder "${d.folder}"${route}: ${d.messages ?? 0} messages — fetching (#${d.account_id})`
@@ -49,6 +71,7 @@ export function TaskDetail({ id }: { id: number }) {
const [busy, setBusy] = useState<'test' | 'run' | 'add' | 'import' | 'delete' | null>(null) const [busy, setBusy] = useState<'test' | 'run' | 'add' | 'import' | 'delete' | null>(null)
const confirm = useConfirm() const confirm = useConfirm()
const [error, setError] = useState<string | null>(null) const [error, setError] = useState<string | null>(null)
const [live, setLive] = useState<Record<number, LiveProgress>>({})
const fileInputRef = useRef<HTMLInputElement>(null) const fileInputRef = useRef<HTMLInputElement>(null)
function reload() { function reload() {
@@ -66,7 +89,46 @@ export function TaskDetail({ id }: { id: number }) {
() => () =>
connectTaskWS(id, (ev: TaskEvent) => { connectTaskWS(id, (ev: TaskEvent) => {
setLog((l) => [{ type: ev.type, text: describeEvent(ev) }, ...l].slice(0, 300)) setLog((l) => [{ type: ev.type, text: describeEvent(ev) }, ...l].slice(0, 300))
if (['account_started', 'account_test', 'account_done', 'progress', 'run_started', 'run_done', 'error', 'folder', 'cancelled'].includes(ev.type)) { const d = (ev.data ?? {}) as Record<string, number | string | undefined>
const accId = typeof d.account_id === 'number' ? d.account_id : undefined
if (ev.type === 'progress' && accId != null) {
const now = Date.now()
const copied = Number(d.copied ?? 0)
const skipped = Number(d.skipped ?? 0)
const processed = copied + skipped
setLive((prev) => {
const cur = prev[accId]
const startTs = cur?.startTs ?? now
const startCount = cur?.startCount ?? processed
const dt = (now - startTs) / 1000
const speed = dt > 0.5 ? (processed - startCount) / dt : (cur?.speed ?? 0)
return {
...prev,
[accId]: {
copied,
skipped,
folder: d.folder as string | undefined,
folderDone: Number(d.folder_done ?? 0),
folderTotal: Number(d.folder_total ?? 0),
startTs,
startCount,
speed,
},
}
})
} else if (accId != null && (ev.type === 'account_started' || ev.type === 'account_done' || ev.type === 'cancelled' || (ev.type === 'error' && d.folder == null))) {
// terminal/reset for this account — drop live overlay, fall back to DB
setLive((prev) => {
if (!(accId in prev)) return prev
const next = { ...prev }
delete next[accId]
return next
})
}
// Structural events refresh the persisted view; `progress` is covered by live state.
if (['account_started', 'account_test', 'account_done', 'run_started', 'run_done', 'error', 'folder', 'cancelled'].includes(ev.type)) {
reload() reload()
} }
}), }),
@@ -188,8 +250,14 @@ export function TaskDetail({ id }: { id: number }) {
const { task, accounts } = data const { task, accounts } = data
const allTested = accounts.length > 0 && accounts.every((a) => a.test_src_status === 'ok' && a.test_dst_status === 'ok') const allTested = accounts.length > 0 && accounts.every((a) => a.test_src_status === 'ok' && a.test_dst_status === 'ok')
// Prefer live (WS) copied/skipped over the DB values, which only advance per
// folder — so the summary moves in real time during a large folder.
const totals = accounts.reduce( const totals = accounts.reduce(
(acc, a) => ({ copied: acc.copied + a.copied, skipped: acc.skipped + a.skipped, errors: acc.errors + a.errors }), (acc, a) => ({
copied: acc.copied + (live[a.id]?.copied ?? a.copied),
skipped: acc.skipped + (live[a.id]?.skipped ?? a.skipped),
errors: acc.errors + a.errors,
}),
{ copied: 0, skipped: 0, errors: 0 }, { copied: 0, skipped: 0, errors: 0 },
) )
@@ -337,6 +405,7 @@ export function TaskDetail({ id }: { id: number }) {
<th>Src test</th> <th>Src test</th>
<th>Dst test</th> <th>Dst test</th>
<th>Status</th> <th>Status</th>
<th>Progress</th>
<th>Copied</th> <th>Copied</th>
<th>Skipped</th> <th>Skipped</th>
<th>Errors</th> <th>Errors</th>
@@ -346,7 +415,7 @@ export function TaskDetail({ id }: { id: number }) {
<tbody> <tbody>
{accounts.length === 0 ? ( {accounts.length === 0 ? (
<tr className="empty-row"> <tr className="empty-row">
<td colSpan={9}>no accounts yet add one or import a CSV above</td> <td colSpan={10}>no accounts yet add one or import a CSV above</td>
</tr> </tr>
) : ( ) : (
accounts.map((a) => ( accounts.map((a) => (
@@ -362,8 +431,27 @@ export function TaskDetail({ id }: { id: number }) {
<td> <td>
<StatusBadge status={a.status} /> <StatusBadge status={a.status} />
</td> </td>
<td className="num-cell">{a.copied}</td> <td className="progress-cell">
<td className="num-cell">{a.skipped}</td> {(() => {
const lv = live[a.id]
if (!lv || !lv.folderTotal) return <span className="muted-note"></span>
const pct = Math.min(100, Math.floor((lv.folderDone / lv.folderTotal) * 100))
const eta = lv.speed > 0 ? (lv.folderTotal - lv.folderDone) / lv.speed : Infinity
return (
<div className="acct-progress">
<div className="pbar">
<span className="pbar-fill" style={{ width: `${pct}%` }} />
</div>
<span className="pmeta mono-num">
{lv.folder ? `${lv.folder} · ` : ''}
{pct}% · {lv.speed >= 1 ? Math.round(lv.speed) : lv.speed.toFixed(1)}/s · ETA {fmtDuration(eta)}
</span>
</div>
)
})()}
</td>
<td className="num-cell">{live[a.id]?.copied ?? a.copied}</td>
<td className="num-cell">{live[a.id]?.skipped ?? a.skipped}</td>
<td className="num-cell">{a.errors}</td> <td className="num-cell">{a.errors}</td>
<td className="num-cell"> <td className="num-cell">
{a.status === 'running' ? ( {a.status === 'running' ? (