From a2234077df026db31e40a25aa275af4a78bc7ffd Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Thu, 2 Jul 2026 13:35:08 +0700 Subject: [PATCH] feat: stream the metadata scan instead of Collect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CopyFolder now streams envelope metadata via Next() in a first pass (dedup + queue new messages), then streams bodies for new ones in a second pass — no more blocking Collect of the whole folder with zero feedback, and memory stays flat (only new-message meta is held). - imapx: two-pass streaming CopyFolder + CopyDeps.OnScan(scanned,total) - orchestrator: throttled 'scan' events during the metadata pass - web: per-account 'scanning folder: X/N' line under the progress bar; scan events kept out of the log to avoid flooding Verified on greenmail: idempotency and internal-date preservation still hold. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01MMHQTtnQtQqL8muAXHr9kd --- internal/imapx/copy.go | 71 ++++++++++++++++++++------- internal/orchestrator/orchestrator.go | 14 +++++- web/src/app.css | 4 ++ web/src/pages/TaskDetail.tsx | 22 ++++++++- 4 files changed, 89 insertions(+), 22 deletions(-) diff --git a/internal/imapx/copy.go b/internal/imapx/copy.go index 9a6fca9..0bbcf62 100644 --- a/internal/imapx/copy.go +++ b/internal/imapx/copy.go @@ -23,6 +23,10 @@ type CopyDeps struct { // (potentially long) envelope fetch, with the message count in the source // folder — for progress visibility. OnFolder func(srcFolder, dstFolder string, total int64) + // OnScan is called during the streaming metadata pass with how many of the + // folder's messages have been examined so far — so the UI shows movement + // while dedup decisions are made, before bodies start copying. + OnScan func(scanned, total int64) } // CopyResult summarizes the outcome of one CopyFolder run. @@ -46,48 +50,77 @@ func CopyFolder(ctx context.Context, src, dst *imapclient.Client, srcFolder, dst if err != nil { return res, fmt.Errorf("examine src %q: %w", srcFolder, err) } + total := int64(sel.NumMessages) if deps.OnFolder != nil { - deps.OnFolder(srcFolder, dstFolder, int64(sel.NumMessages)) + deps.OnFolder(srcFolder, dstFolder, total) } - if sel.NumMessages == 0 { + if total == 0 { return res, nil } - // 1) Collect envelope+uid+size for every message (cheap pass, no bodies). - metaSet := imap.SeqSet{imap.SeqRange{Start: 1, Stop: sel.NumMessages}} - metas, err := src.Fetch(metaSet, &imap.FetchOptions{ - UID: true, Envelope: true, RFC822Size: true, Flags: true, InternalDate: true, - }).Collect() - if err != nil { - return res, fmt.Errorf("fetch meta: %w", err) - } - // dst folder must exist (idempotent create; ignore "already exists"). _ = dst.Create(dstFolder, nil).Wait() - for _, m := range metas { + // Pass 1: STREAM metadata (no bodies) via Next(), dedup as we go, and queue + // only the new messages. Streaming (not Collect) means progress shows during + // the scan and memory stays flat — we hold small meta for new messages only. + type queued struct { + uid imap.UID + key string + flags []imap.Flag + internalDate time.Time + } + var todo []queued + metaSet := imap.SeqSet{imap.SeqRange{Start: 1, Stop: sel.NumMessages}} + fc := src.Fetch(metaSet, &imap.FetchOptions{ + UID: true, Envelope: true, RFC822Size: true, Flags: true, InternalDate: true, + }) + var scanned int64 + for { if err := ctx.Err(); err != nil { + _ = fc.Close() return res, err } - - key := MessageKey(m.Envelope, m.RFC822Size) - already, err := deps.IsMigrated(key) + msg := fc.Next() + if msg == nil { + break + } + buf, err := msg.Collect() if err != nil { res.Errors++ continue } - if already { + scanned++ + key := MessageKey(buf.Envelope, buf.RFC822Size) + already, err := deps.IsMigrated(key) + if err != nil { + res.Errors++ + } else if already { res.Skipped++ if deps.OnProgress != nil { deps.OnProgress(res.Copied, res.Skipped) } - continue + } else { + todo = append(todo, queued{uid: buf.UID, key: key, flags: buf.Flags, internalDate: buf.InternalDate}) } - if err := streamOne(src, dst, dstFolder, m.UID, m.Flags, m.InternalDate); err != nil { + if deps.OnScan != nil { + deps.OnScan(scanned, total) + } + } + if err := fc.Close(); err != nil { + return res, fmt.Errorf("fetch meta %q: %w", srcFolder, err) + } + + // Pass 2: fetch bodies for the queued (new) messages, one at a time. + for _, q := range todo { + if err := ctx.Err(); err != nil { + return res, err + } + if err := streamOne(src, dst, dstFolder, q.uid, q.flags, q.internalDate); err != nil { res.Errors++ continue } - if err := deps.MarkMigrated(dstFolder, key); err != nil { + if err := deps.MarkMigrated(dstFolder, q.key); err != nil { res.Errors++ continue } diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 4f8b4fd..590160f 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -301,7 +301,7 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in var baseCopied, baseSkipped int64 var curFolder string var curTotal int64 - var lastEmit time.Time + var lastEmit, lastScanEmit time.Time deps := imapx.CopyDeps{ IsMigrated: func(k string) (bool, error) { return o.store.IsMigrated(ctx, a.ID, k) }, MarkMigrated: func(folder, k string) error { return o.store.MarkMigrated(ctx, a.ID, folder, k) }, @@ -331,6 +331,18 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in "folder": srcFolder, "dst_folder": dstFolder, "messages": total, }}) }, + // Fires while streaming metadata (dedup scan) so the UI shows movement + // before bodies start copying. Throttled to ~4/sec, always emit the last. + OnScan: func(scanned, total int64) { + now := time.Now() + if now.Sub(lastScanEmit) < 250*time.Millisecond && scanned < total { + return + } + lastScanEmit = now + o.hub.Publish(wshub.Event{Type: "scan", TaskID: task.ID, Data: map[string]any{ + "account_id": a.ID, "folder": curFolder, "scanned": scanned, "folder_total": total, + }}) + }, } for _, fp := range plan { if actx.Err() != nil { diff --git a/web/src/app.css b/web/src/app.css index 837accf..a0f1f11 100644 --- a/web/src/app.css +++ b/web/src/app.css @@ -282,6 +282,10 @@ letter-spacing: 0.04em; } +.pscan { + color: var(--info); +} + /* clear-log button: mirrors the .panel-label tab on the right edge */ .log-clear { position: absolute; diff --git a/web/src/pages/TaskDetail.tsx b/web/src/pages/TaskDetail.tsx index 8060a19..2f466b3 100644 --- a/web/src/pages/TaskDetail.tsx +++ b/web/src/pages/TaskDetail.tsx @@ -15,6 +15,9 @@ type LiveProgress = { startTs: number startCount: number speed: number // messages/sec, averaged since the account's run started + scanFolder?: string + scanned?: number + scanTotal?: number } function fmtDuration(sec: number): string { @@ -89,11 +92,20 @@ export function TaskDetail({ id }: { id: number }) { useEffect( () => connectTaskWS(id, (ev: TaskEvent) => { - setLog((l) => [{ type: ev.type, text: describeEvent(ev) }, ...l].slice(0, 300)) + // `scan` is high-frequency and shown in the progress cell, not the log. + if (ev.type !== 'scan') { + setLog((l) => [{ type: ev.type, text: describeEvent(ev) }, ...l].slice(0, 300)) + } const d = (ev.data ?? {}) as Record const accId = typeof d.account_id === 'number' ? d.account_id : undefined - if (ev.type === 'plan' && accId != null) { + if (ev.type === 'scan' && accId != null) { + setLive((prev) => { + const cur = prev[accId] + const base: LiveProgress = cur ?? { copied: 0, skipped: 0, total: 0, startTs: Date.now(), startCount: 0, speed: 0 } + return { ...prev, [accId]: { ...base, scanFolder: d.folder as string | undefined, scanned: Number(d.scanned ?? 0), scanTotal: Number(d.folder_total ?? 0) } } + }) + } else if (ev.type === 'plan' && accId != null) { const total = Number(d.total ?? 0) const now = Date.now() setLive((prev) => ({ @@ -453,6 +465,7 @@ export function TaskDetail({ id }: { id: number }) { const done = lv.copied + lv.skipped const pct = Math.min(100, Math.floor((done / lv.total) * 100)) const eta = lv.speed > 0 ? (lv.total - done) / lv.speed : Infinity + const scanning = lv.scanned != null && lv.scanTotal != null && lv.scanned < lv.scanTotal return (
@@ -462,6 +475,11 @@ export function TaskDetail({ id }: { id: number }) { {done}/{lv.total} ({pct}%) · {lv.speed >= 1 ? Math.round(lv.speed) : lv.speed.toFixed(1)}/s · ETA {fmtDuration(eta)} {lv.folder ? ` · ${lv.folder}` : ''} + {scanning && ( + + scanning {lv.scanFolder}: {lv.scanned}/{lv.scanTotal} + + )}
) })()}