feat: stream the metadata scan instead of Collect
CopyFolder now streams envelope metadata via Next() in a first pass (dedup + queue new messages), then streams bodies for new ones in a second pass — no more blocking Collect of the whole folder with zero feedback, and memory stays flat (only new-message meta is held). - imapx: two-pass streaming CopyFolder + CopyDeps.OnScan(scanned,total) - orchestrator: throttled 'scan' events during the metadata pass - web: per-account 'scanning folder: X/N' line under the progress bar; scan events kept out of the log to avoid flooding Verified on greenmail: idempotency and internal-date preservation still hold. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01MMHQTtnQtQqL8muAXHr9kd
This commit is contained in:
+52
-19
@@ -23,6 +23,10 @@ type CopyDeps struct {
|
|||||||
// (potentially long) envelope fetch, with the message count in the source
|
// (potentially long) envelope fetch, with the message count in the source
|
||||||
// folder — for progress visibility.
|
// folder — for progress visibility.
|
||||||
OnFolder func(srcFolder, dstFolder string, total int64)
|
OnFolder func(srcFolder, dstFolder string, total int64)
|
||||||
|
// OnScan is called during the streaming metadata pass with how many of the
|
||||||
|
// folder's messages have been examined so far — so the UI shows movement
|
||||||
|
// while dedup decisions are made, before bodies start copying.
|
||||||
|
OnScan func(scanned, total int64)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CopyResult summarizes the outcome of one CopyFolder run.
|
// CopyResult summarizes the outcome of one CopyFolder run.
|
||||||
@@ -46,48 +50,77 @@ func CopyFolder(ctx context.Context, src, dst *imapclient.Client, srcFolder, dst
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return res, fmt.Errorf("examine src %q: %w", srcFolder, err)
|
return res, fmt.Errorf("examine src %q: %w", srcFolder, err)
|
||||||
}
|
}
|
||||||
|
total := int64(sel.NumMessages)
|
||||||
if deps.OnFolder != nil {
|
if deps.OnFolder != nil {
|
||||||
deps.OnFolder(srcFolder, dstFolder, int64(sel.NumMessages))
|
deps.OnFolder(srcFolder, dstFolder, total)
|
||||||
}
|
}
|
||||||
if sel.NumMessages == 0 {
|
if total == 0 {
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1) Collect envelope+uid+size for every message (cheap pass, no bodies).
|
|
||||||
metaSet := imap.SeqSet{imap.SeqRange{Start: 1, Stop: sel.NumMessages}}
|
|
||||||
metas, err := src.Fetch(metaSet, &imap.FetchOptions{
|
|
||||||
UID: true, Envelope: true, RFC822Size: true, Flags: true, InternalDate: true,
|
|
||||||
}).Collect()
|
|
||||||
if err != nil {
|
|
||||||
return res, fmt.Errorf("fetch meta: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// dst folder must exist (idempotent create; ignore "already exists").
|
// dst folder must exist (idempotent create; ignore "already exists").
|
||||||
_ = dst.Create(dstFolder, nil).Wait()
|
_ = dst.Create(dstFolder, nil).Wait()
|
||||||
|
|
||||||
for _, m := range metas {
|
// Pass 1: STREAM metadata (no bodies) via Next(), dedup as we go, and queue
|
||||||
|
// only the new messages. Streaming (not Collect) means progress shows during
|
||||||
|
// the scan and memory stays flat — we hold small meta for new messages only.
|
||||||
|
type queued struct {
|
||||||
|
uid imap.UID
|
||||||
|
key string
|
||||||
|
flags []imap.Flag
|
||||||
|
internalDate time.Time
|
||||||
|
}
|
||||||
|
var todo []queued
|
||||||
|
metaSet := imap.SeqSet{imap.SeqRange{Start: 1, Stop: sel.NumMessages}}
|
||||||
|
fc := src.Fetch(metaSet, &imap.FetchOptions{
|
||||||
|
UID: true, Envelope: true, RFC822Size: true, Flags: true, InternalDate: true,
|
||||||
|
})
|
||||||
|
var scanned int64
|
||||||
|
for {
|
||||||
if err := ctx.Err(); err != nil {
|
if err := ctx.Err(); err != nil {
|
||||||
|
_ = fc.Close()
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
msg := fc.Next()
|
||||||
key := MessageKey(m.Envelope, m.RFC822Size)
|
if msg == nil {
|
||||||
already, err := deps.IsMigrated(key)
|
break
|
||||||
|
}
|
||||||
|
buf, err := msg.Collect()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
res.Errors++
|
res.Errors++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if already {
|
scanned++
|
||||||
|
key := MessageKey(buf.Envelope, buf.RFC822Size)
|
||||||
|
already, err := deps.IsMigrated(key)
|
||||||
|
if err != nil {
|
||||||
|
res.Errors++
|
||||||
|
} else if already {
|
||||||
res.Skipped++
|
res.Skipped++
|
||||||
if deps.OnProgress != nil {
|
if deps.OnProgress != nil {
|
||||||
deps.OnProgress(res.Copied, res.Skipped)
|
deps.OnProgress(res.Copied, res.Skipped)
|
||||||
}
|
}
|
||||||
continue
|
} else {
|
||||||
|
todo = append(todo, queued{uid: buf.UID, key: key, flags: buf.Flags, internalDate: buf.InternalDate})
|
||||||
}
|
}
|
||||||
if err := streamOne(src, dst, dstFolder, m.UID, m.Flags, m.InternalDate); err != nil {
|
if deps.OnScan != nil {
|
||||||
|
deps.OnScan(scanned, total)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := fc.Close(); err != nil {
|
||||||
|
return res, fmt.Errorf("fetch meta %q: %w", srcFolder, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pass 2: fetch bodies for the queued (new) messages, one at a time.
|
||||||
|
for _, q := range todo {
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
if err := streamOne(src, dst, dstFolder, q.uid, q.flags, q.internalDate); err != nil {
|
||||||
res.Errors++
|
res.Errors++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := deps.MarkMigrated(dstFolder, key); err != nil {
|
if err := deps.MarkMigrated(dstFolder, q.key); err != nil {
|
||||||
res.Errors++
|
res.Errors++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -301,7 +301,7 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|||||||
var baseCopied, baseSkipped int64
|
var baseCopied, baseSkipped int64
|
||||||
var curFolder string
|
var curFolder string
|
||||||
var curTotal int64
|
var curTotal int64
|
||||||
var lastEmit time.Time
|
var lastEmit, lastScanEmit time.Time
|
||||||
deps := imapx.CopyDeps{
|
deps := imapx.CopyDeps{
|
||||||
IsMigrated: func(k string) (bool, error) { return o.store.IsMigrated(ctx, a.ID, k) },
|
IsMigrated: func(k string) (bool, error) { return o.store.IsMigrated(ctx, a.ID, k) },
|
||||||
MarkMigrated: func(folder, k string) error { return o.store.MarkMigrated(ctx, a.ID, folder, k) },
|
MarkMigrated: func(folder, k string) error { return o.store.MarkMigrated(ctx, a.ID, folder, k) },
|
||||||
@@ -331,6 +331,18 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|||||||
"folder": srcFolder, "dst_folder": dstFolder, "messages": total,
|
"folder": srcFolder, "dst_folder": dstFolder, "messages": total,
|
||||||
}})
|
}})
|
||||||
},
|
},
|
||||||
|
// Fires while streaming metadata (dedup scan) so the UI shows movement
|
||||||
|
// before bodies start copying. Throttled to ~4/sec, always emit the last.
|
||||||
|
OnScan: func(scanned, total int64) {
|
||||||
|
now := time.Now()
|
||||||
|
if now.Sub(lastScanEmit) < 250*time.Millisecond && scanned < total {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
lastScanEmit = now
|
||||||
|
o.hub.Publish(wshub.Event{Type: "scan", TaskID: task.ID, Data: map[string]any{
|
||||||
|
"account_id": a.ID, "folder": curFolder, "scanned": scanned, "folder_total": total,
|
||||||
|
}})
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for _, fp := range plan {
|
for _, fp := range plan {
|
||||||
if actx.Err() != nil {
|
if actx.Err() != nil {
|
||||||
|
|||||||
@@ -282,6 +282,10 @@
|
|||||||
letter-spacing: 0.04em;
|
letter-spacing: 0.04em;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
.pscan {
|
||||||
|
color: var(--info);
|
||||||
|
}
|
||||||
|
|
||||||
/* clear-log button: mirrors the .panel-label tab on the right edge */
|
/* clear-log button: mirrors the .panel-label tab on the right edge */
|
||||||
.log-clear {
|
.log-clear {
|
||||||
position: absolute;
|
position: absolute;
|
||||||
|
|||||||
@@ -15,6 +15,9 @@ type LiveProgress = {
|
|||||||
startTs: number
|
startTs: number
|
||||||
startCount: number
|
startCount: number
|
||||||
speed: number // messages/sec, averaged since the account's run started
|
speed: number // messages/sec, averaged since the account's run started
|
||||||
|
scanFolder?: string
|
||||||
|
scanned?: number
|
||||||
|
scanTotal?: number
|
||||||
}
|
}
|
||||||
|
|
||||||
function fmtDuration(sec: number): string {
|
function fmtDuration(sec: number): string {
|
||||||
@@ -89,11 +92,20 @@ export function TaskDetail({ id }: { id: number }) {
|
|||||||
useEffect(
|
useEffect(
|
||||||
() =>
|
() =>
|
||||||
connectTaskWS(id, (ev: TaskEvent) => {
|
connectTaskWS(id, (ev: TaskEvent) => {
|
||||||
|
// `scan` is high-frequency and shown in the progress cell, not the log.
|
||||||
|
if (ev.type !== 'scan') {
|
||||||
setLog((l) => [{ type: ev.type, text: describeEvent(ev) }, ...l].slice(0, 300))
|
setLog((l) => [{ type: ev.type, text: describeEvent(ev) }, ...l].slice(0, 300))
|
||||||
|
}
|
||||||
const d = (ev.data ?? {}) as Record<string, number | string | undefined>
|
const d = (ev.data ?? {}) as Record<string, number | string | undefined>
|
||||||
const accId = typeof d.account_id === 'number' ? d.account_id : undefined
|
const accId = typeof d.account_id === 'number' ? d.account_id : undefined
|
||||||
|
|
||||||
if (ev.type === 'plan' && accId != null) {
|
if (ev.type === 'scan' && accId != null) {
|
||||||
|
setLive((prev) => {
|
||||||
|
const cur = prev[accId]
|
||||||
|
const base: LiveProgress = cur ?? { copied: 0, skipped: 0, total: 0, startTs: Date.now(), startCount: 0, speed: 0 }
|
||||||
|
return { ...prev, [accId]: { ...base, scanFolder: d.folder as string | undefined, scanned: Number(d.scanned ?? 0), scanTotal: Number(d.folder_total ?? 0) } }
|
||||||
|
})
|
||||||
|
} else if (ev.type === 'plan' && accId != null) {
|
||||||
const total = Number(d.total ?? 0)
|
const total = Number(d.total ?? 0)
|
||||||
const now = Date.now()
|
const now = Date.now()
|
||||||
setLive((prev) => ({
|
setLive((prev) => ({
|
||||||
@@ -453,6 +465,7 @@ export function TaskDetail({ id }: { id: number }) {
|
|||||||
const done = lv.copied + lv.skipped
|
const done = lv.copied + lv.skipped
|
||||||
const pct = Math.min(100, Math.floor((done / lv.total) * 100))
|
const pct = Math.min(100, Math.floor((done / lv.total) * 100))
|
||||||
const eta = lv.speed > 0 ? (lv.total - done) / lv.speed : Infinity
|
const eta = lv.speed > 0 ? (lv.total - done) / lv.speed : Infinity
|
||||||
|
const scanning = lv.scanned != null && lv.scanTotal != null && lv.scanned < lv.scanTotal
|
||||||
return (
|
return (
|
||||||
<div className="acct-progress">
|
<div className="acct-progress">
|
||||||
<div className="pbar">
|
<div className="pbar">
|
||||||
@@ -462,6 +475,11 @@ export function TaskDetail({ id }: { id: number }) {
|
|||||||
{done}/{lv.total} ({pct}%) · {lv.speed >= 1 ? Math.round(lv.speed) : lv.speed.toFixed(1)}/s · ETA {fmtDuration(eta)}
|
{done}/{lv.total} ({pct}%) · {lv.speed >= 1 ? Math.round(lv.speed) : lv.speed.toFixed(1)}/s · ETA {fmtDuration(eta)}
|
||||||
{lv.folder ? ` · ${lv.folder}` : ''}
|
{lv.folder ? ` · ${lv.folder}` : ''}
|
||||||
</span>
|
</span>
|
||||||
|
{scanning && (
|
||||||
|
<span className="pmeta pscan mono-num">
|
||||||
|
scanning {lv.scanFolder}: {lv.scanned}/{lv.scanTotal}
|
||||||
|
</span>
|
||||||
|
)}
|
||||||
</div>
|
</div>
|
||||||
)
|
)
|
||||||
})()}
|
})()}
|
||||||
|
|||||||
Reference in New Issue
Block a user