Merge feat/streaming-scan: streaming metadata scan with live feedback

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01MMHQTtnQtQqL8muAXHr9kd
This commit is contained in:
2026-07-02 13:35:08 +07:00
4 changed files with 89 additions and 22 deletions
+52 -19
View File
@@ -23,6 +23,10 @@ type CopyDeps struct {
// (potentially long) envelope fetch, with the message count in the source // (potentially long) envelope fetch, with the message count in the source
// folder — for progress visibility. // folder — for progress visibility.
OnFolder func(srcFolder, dstFolder string, total int64) OnFolder func(srcFolder, dstFolder string, total int64)
// OnScan is called during the streaming metadata pass with how many of the
// folder's messages have been examined so far — so the UI shows movement
// while dedup decisions are made, before bodies start copying.
OnScan func(scanned, total int64)
} }
// CopyResult summarizes the outcome of one CopyFolder run. // CopyResult summarizes the outcome of one CopyFolder run.
@@ -46,48 +50,77 @@ func CopyFolder(ctx context.Context, src, dst *imapclient.Client, srcFolder, dst
if err != nil { if err != nil {
return res, fmt.Errorf("examine src %q: %w", srcFolder, err) return res, fmt.Errorf("examine src %q: %w", srcFolder, err)
} }
total := int64(sel.NumMessages)
if deps.OnFolder != nil { if deps.OnFolder != nil {
deps.OnFolder(srcFolder, dstFolder, int64(sel.NumMessages)) deps.OnFolder(srcFolder, dstFolder, total)
} }
if sel.NumMessages == 0 { if total == 0 {
return res, nil return res, nil
} }
// 1) Collect envelope+uid+size for every message (cheap pass, no bodies).
metaSet := imap.SeqSet{imap.SeqRange{Start: 1, Stop: sel.NumMessages}}
metas, err := src.Fetch(metaSet, &imap.FetchOptions{
UID: true, Envelope: true, RFC822Size: true, Flags: true, InternalDate: true,
}).Collect()
if err != nil {
return res, fmt.Errorf("fetch meta: %w", err)
}
// dst folder must exist (idempotent create; ignore "already exists"). // dst folder must exist (idempotent create; ignore "already exists").
_ = dst.Create(dstFolder, nil).Wait() _ = dst.Create(dstFolder, nil).Wait()
for _, m := range metas { // Pass 1: STREAM metadata (no bodies) via Next(), dedup as we go, and queue
// only the new messages. Streaming (not Collect) means progress shows during
// the scan and memory stays flat — we hold small meta for new messages only.
type queued struct {
uid imap.UID
key string
flags []imap.Flag
internalDate time.Time
}
var todo []queued
metaSet := imap.SeqSet{imap.SeqRange{Start: 1, Stop: sel.NumMessages}}
fc := src.Fetch(metaSet, &imap.FetchOptions{
UID: true, Envelope: true, RFC822Size: true, Flags: true, InternalDate: true,
})
var scanned int64
for {
if err := ctx.Err(); err != nil { if err := ctx.Err(); err != nil {
_ = fc.Close()
return res, err return res, err
} }
msg := fc.Next()
key := MessageKey(m.Envelope, m.RFC822Size) if msg == nil {
already, err := deps.IsMigrated(key) break
}
buf, err := msg.Collect()
if err != nil { if err != nil {
res.Errors++ res.Errors++
continue continue
} }
if already { scanned++
key := MessageKey(buf.Envelope, buf.RFC822Size)
already, err := deps.IsMigrated(key)
if err != nil {
res.Errors++
} else if already {
res.Skipped++ res.Skipped++
if deps.OnProgress != nil { if deps.OnProgress != nil {
deps.OnProgress(res.Copied, res.Skipped) deps.OnProgress(res.Copied, res.Skipped)
} }
continue } else {
todo = append(todo, queued{uid: buf.UID, key: key, flags: buf.Flags, internalDate: buf.InternalDate})
} }
if err := streamOne(src, dst, dstFolder, m.UID, m.Flags, m.InternalDate); err != nil { if deps.OnScan != nil {
deps.OnScan(scanned, total)
}
}
if err := fc.Close(); err != nil {
return res, fmt.Errorf("fetch meta %q: %w", srcFolder, err)
}
// Pass 2: fetch bodies for the queued (new) messages, one at a time.
for _, q := range todo {
if err := ctx.Err(); err != nil {
return res, err
}
if err := streamOne(src, dst, dstFolder, q.uid, q.flags, q.internalDate); err != nil {
res.Errors++ res.Errors++
continue continue
} }
if err := deps.MarkMigrated(dstFolder, key); err != nil { if err := deps.MarkMigrated(dstFolder, q.key); err != nil {
res.Errors++ res.Errors++
continue continue
} }
+13 -1
View File
@@ -301,7 +301,7 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
var baseCopied, baseSkipped int64 var baseCopied, baseSkipped int64
var curFolder string var curFolder string
var curTotal int64 var curTotal int64
var lastEmit time.Time var lastEmit, lastScanEmit time.Time
deps := imapx.CopyDeps{ deps := imapx.CopyDeps{
IsMigrated: func(k string) (bool, error) { return o.store.IsMigrated(ctx, a.ID, k) }, IsMigrated: func(k string) (bool, error) { return o.store.IsMigrated(ctx, a.ID, k) },
MarkMigrated: func(folder, k string) error { return o.store.MarkMigrated(ctx, a.ID, folder, k) }, MarkMigrated: func(folder, k string) error { return o.store.MarkMigrated(ctx, a.ID, folder, k) },
@@ -331,6 +331,18 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
"folder": srcFolder, "dst_folder": dstFolder, "messages": total, "folder": srcFolder, "dst_folder": dstFolder, "messages": total,
}}) }})
}, },
// Fires while streaming metadata (dedup scan) so the UI shows movement
// before bodies start copying. Throttled to at most one event per 250ms;
// the update where scanned reaches total is never throttled.
OnScan: func(scanned, total int64) {
now := time.Now()
if now.Sub(lastScanEmit) < 250*time.Millisecond && scanned < total {
return
}
lastScanEmit = now
o.hub.Publish(wshub.Event{Type: "scan", TaskID: task.ID, Data: map[string]any{
"account_id": a.ID, "folder": curFolder, "scanned": scanned, "folder_total": total,
}})
},
} }
for _, fp := range plan { for _, fp := range plan {
if actx.Err() != nil { if actx.Err() != nil {
+4
View File
@@ -282,6 +282,10 @@
letter-spacing: 0.04em; letter-spacing: 0.04em;
} }
.pscan {
color: var(--info);
}
/* clear-log button: mirrors the .panel-label tab on the right edge */ /* clear-log button: mirrors the .panel-label tab on the right edge */
.log-clear { .log-clear {
position: absolute; position: absolute;
+19 -1
View File
@@ -15,6 +15,9 @@ type LiveProgress = {
startTs: number startTs: number
startCount: number startCount: number
speed: number // messages/sec, averaged since the account's run started speed: number // messages/sec, averaged since the account's run started
scanFolder?: string
scanned?: number
scanTotal?: number
} }
function fmtDuration(sec: number): string { function fmtDuration(sec: number): string {
@@ -89,11 +92,20 @@ export function TaskDetail({ id }: { id: number }) {
useEffect( useEffect(
() => () =>
connectTaskWS(id, (ev: TaskEvent) => { connectTaskWS(id, (ev: TaskEvent) => {
// `scan` is high-frequency and shown in the progress cell, not the log.
if (ev.type !== 'scan') {
setLog((l) => [{ type: ev.type, text: describeEvent(ev) }, ...l].slice(0, 300)) setLog((l) => [{ type: ev.type, text: describeEvent(ev) }, ...l].slice(0, 300))
}
const d = (ev.data ?? {}) as Record<string, number | string | undefined> const d = (ev.data ?? {}) as Record<string, number | string | undefined>
const accId = typeof d.account_id === 'number' ? d.account_id : undefined const accId = typeof d.account_id === 'number' ? d.account_id : undefined
if (ev.type === 'plan' && accId != null) { if (ev.type === 'scan' && accId != null) {
setLive((prev) => {
const cur = prev[accId]
const base: LiveProgress = cur ?? { copied: 0, skipped: 0, total: 0, startTs: Date.now(), startCount: 0, speed: 0 }
return { ...prev, [accId]: { ...base, scanFolder: d.folder as string | undefined, scanned: Number(d.scanned ?? 0), scanTotal: Number(d.folder_total ?? 0) } }
})
} else if (ev.type === 'plan' && accId != null) {
const total = Number(d.total ?? 0) const total = Number(d.total ?? 0)
const now = Date.now() const now = Date.now()
setLive((prev) => ({ setLive((prev) => ({
@@ -453,6 +465,7 @@ export function TaskDetail({ id }: { id: number }) {
const done = lv.copied + lv.skipped const done = lv.copied + lv.skipped
const pct = Math.min(100, Math.floor((done / lv.total) * 100)) const pct = Math.min(100, Math.floor((done / lv.total) * 100))
const eta = lv.speed > 0 ? (lv.total - done) / lv.speed : Infinity const eta = lv.speed > 0 ? (lv.total - done) / lv.speed : Infinity
const scanning = lv.scanned != null && lv.scanTotal != null && lv.scanned < lv.scanTotal
return ( return (
<div className="acct-progress"> <div className="acct-progress">
<div className="pbar"> <div className="pbar">
@@ -462,6 +475,11 @@ export function TaskDetail({ id }: { id: number }) {
{done}/{lv.total} ({pct}%) · {lv.speed >= 1 ? Math.round(lv.speed) : lv.speed.toFixed(1)}/s · ETA {fmtDuration(eta)} {done}/{lv.total} ({pct}%) · {lv.speed >= 1 ? Math.round(lv.speed) : lv.speed.toFixed(1)}/s · ETA {fmtDuration(eta)}
{lv.folder ? ` · ${lv.folder}` : ''} {lv.folder ? ` · ${lv.folder}` : ''}
</span> </span>
{scanning && (
<span className="pmeta pscan mono-num">
scanning {lv.scanFolder}: {lv.scanned}/{lv.scanTotal}
</span>
)}
</div> </div>
) )
})()} })()}