feat: planning pass counts all folders up front for account-wide progress
Before copying, EXAMINE every folder to sum the account's total message count and emit a 'plan' event; progress events now carry account_total so the UI shows a real overall bar, percent and ETA (not just per-folder). - imapx.FolderMessageCount: read-only count of a folder - orchestrator: plan pass + grandTotal, plan event, account_total in progress - web: live progress keyed on account total; PLAN log line; overall bar/ETA Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01MMHQTtnQtQqL8muAXHr9kd
This commit is contained in:
@@ -3,9 +3,21 @@ package imapx
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"github.com/emersion/go-imap/v2"
|
||||||
"github.com/emersion/go-imap/v2/imapclient"
|
"github.com/emersion/go-imap/v2/imapclient"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// FolderMessageCount opens a folder read-only (EXAMINE) and returns how many
|
||||||
|
// messages it holds — used to plan an accurate overall progress total before
|
||||||
|
// copying begins. It does not fetch any message bodies.
|
||||||
|
func FolderMessageCount(c *imapclient.Client, folder string) (int64, error) {
|
||||||
|
sel, err := c.Select(folder, &imap.SelectOptions{ReadOnly: true}).Wait()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return int64(sel.NumMessages), nil
|
||||||
|
}
|
||||||
|
|
||||||
// ListFolders returns the mailbox names visible on an already-connected, logged-in client.
|
// ListFolders returns the mailbox names visible on an already-connected, logged-in client.
|
||||||
func ListFolders(c *imapclient.Client) ([]string, error) {
|
func ListFolders(c *imapclient.Client) ([]string, error) {
|
||||||
mboxes, err := c.List("", "*", nil).Collect()
|
mboxes, err := c.List("", "*", nil).Collect()
|
||||||
|
|||||||
@@ -267,6 +267,33 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|||||||
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
|
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Planning pass: EXAMINE every folder up front to learn the total message
|
||||||
|
// count, so the UI can show an accurate overall bar / ETA before copying.
|
||||||
|
type folderPlan struct {
|
||||||
|
src, dst string
|
||||||
|
total int64
|
||||||
|
}
|
||||||
|
plan := make([]folderPlan, 0, len(folders))
|
||||||
|
var grandTotal int64
|
||||||
|
for _, folder := range folders {
|
||||||
|
if actx.Err() != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
df := folder
|
||||||
|
if m, ok := task.FolderMapping[folder]; ok {
|
||||||
|
df = m
|
||||||
|
}
|
||||||
|
n, cerr := imapx.FolderMessageCount(src, folder)
|
||||||
|
if cerr != nil && actx.Err() == nil {
|
||||||
|
slog.Warn("count folder failed", "account", a.ID, "folder", folder, "err", cerr)
|
||||||
|
}
|
||||||
|
plan = append(plan, folderPlan{src: folder, dst: df, total: n})
|
||||||
|
grandTotal += n
|
||||||
|
}
|
||||||
|
o.hub.Publish(wshub.Event{Type: "plan", TaskID: task.ID, Data: map[string]any{
|
||||||
|
"account_id": a.ID, "src_login": a.SrcLogin, "folders": len(plan), "total": grandTotal,
|
||||||
|
}})
|
||||||
|
|
||||||
var copied, skipped, errs int64
|
var copied, skipped, errs int64
|
||||||
// Account-level live progress state (all callbacks run on this goroutine,
|
// Account-level live progress state (all callbacks run on this goroutine,
|
||||||
// so plain vars are race-free). base* = totals from completed folders;
|
// so plain vars are race-free). base* = totals from completed folders;
|
||||||
@@ -287,12 +314,13 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|||||||
}
|
}
|
||||||
lastEmit = now
|
lastEmit = now
|
||||||
o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID, Data: map[string]any{
|
o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID, Data: map[string]any{
|
||||||
"account_id": a.ID,
|
"account_id": a.ID,
|
||||||
"copied": baseCopied + int64(c),
|
"copied": baseCopied + int64(c),
|
||||||
"skipped": baseSkipped + int64(s),
|
"skipped": baseSkipped + int64(s),
|
||||||
"folder": curFolder,
|
"folder": curFolder,
|
||||||
"folder_done": done,
|
"folder_done": done,
|
||||||
"folder_total": curTotal,
|
"folder_total": curTotal,
|
||||||
|
"account_total": grandTotal,
|
||||||
}})
|
}})
|
||||||
},
|
},
|
||||||
// Fires after EXAMINE (before the long fetch) with the folder's message count.
|
// Fires after EXAMINE (before the long fetch) with the folder's message count.
|
||||||
@@ -304,20 +332,16 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|||||||
}})
|
}})
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, folder := range folders {
|
for _, fp := range plan {
|
||||||
if actx.Err() != nil {
|
if actx.Err() != nil {
|
||||||
break // cancelled — stop scheduling more folders
|
break // cancelled — stop scheduling more folders
|
||||||
}
|
}
|
||||||
dstFolder := folder
|
res, err := imapx.CopyFolder(actx, src, dst, fp.src, fp.dst, deps)
|
||||||
if m, ok := task.FolderMapping[folder]; ok {
|
|
||||||
dstFolder = m
|
|
||||||
}
|
|
||||||
res, err := imapx.CopyFolder(actx, src, dst, folder, dstFolder, deps)
|
|
||||||
if err != nil && actx.Err() == nil {
|
if err != nil && actx.Err() == nil {
|
||||||
slog.Warn("folder copy error", "account", a.ID, "src_login", a.SrcLogin, "folder", folder, "err", err)
|
slog.Warn("folder copy error", "account", a.ID, "src_login", a.SrcLogin, "folder", fp.src, "err", err)
|
||||||
errs++
|
errs++
|
||||||
o.hub.Publish(wshub.Event{Type: "error", TaskID: task.ID, Data: map[string]any{
|
o.hub.Publish(wshub.Event{Type: "error", TaskID: task.ID, Data: map[string]any{
|
||||||
"account_id": a.ID, "src_login": a.SrcLogin, "folder": folder, "error": err.Error(),
|
"account_id": a.ID, "src_login": a.SrcLogin, "folder": fp.src, "error": err.Error(),
|
||||||
}})
|
}})
|
||||||
}
|
}
|
||||||
copied += int64(res.Copied)
|
copied += int64(res.Copied)
|
||||||
|
|||||||
@@ -10,9 +10,8 @@ const emptyAccount = { src_login: '', src_pass: '', dst_login: '', dst_pass: ''
|
|||||||
type LiveProgress = {
|
type LiveProgress = {
|
||||||
copied: number
|
copied: number
|
||||||
skipped: number
|
skipped: number
|
||||||
|
total: number // account-wide message total from the planning pass (0 if unknown)
|
||||||
folder?: string
|
folder?: string
|
||||||
folderDone: number
|
|
||||||
folderTotal: number
|
|
||||||
startTs: number
|
startTs: number
|
||||||
startCount: number
|
startCount: number
|
||||||
speed: number // messages/sec, averaged since the account's run started
|
speed: number // messages/sec, averaged since the account's run started
|
||||||
@@ -37,6 +36,8 @@ function describeEvent(ev: TaskEvent): string {
|
|||||||
}
|
}
|
||||||
case 'account_started':
|
case 'account_started':
|
||||||
return `START #${d.account_id}: ${d.src_login}@${d.src_host}:${d.src_port} → ${d.dst_login}@${d.dst_host}:${d.dst_port}`
|
return `START #${d.account_id}: ${d.src_login}@${d.src_host}:${d.src_port} → ${d.dst_login}@${d.dst_host}:${d.dst_port}`
|
||||||
|
case 'plan':
|
||||||
|
return `PLAN #${d.account_id} (${d.src_login}): ${d.folders} folders, ${d.total} messages total`
|
||||||
case 'account_done':
|
case 'account_done':
|
||||||
return `DONE #${d.account_id} (${d.src_login} → ${d.dst_login}): copied ${d.copied}, skipped ${d.skipped}, errors ${d.errors}`
|
return `DONE #${d.account_id} (${d.src_login} → ${d.dst_login}): copied ${d.copied}, skipped ${d.skipped}, errors ${d.errors}`
|
||||||
case 'progress': {
|
case 'progress': {
|
||||||
@@ -92,7 +93,22 @@ export function TaskDetail({ id }: { id: number }) {
|
|||||||
const d = (ev.data ?? {}) as Record<string, number | string | undefined>
|
const d = (ev.data ?? {}) as Record<string, number | string | undefined>
|
||||||
const accId = typeof d.account_id === 'number' ? d.account_id : undefined
|
const accId = typeof d.account_id === 'number' ? d.account_id : undefined
|
||||||
|
|
||||||
if (ev.type === 'progress' && accId != null) {
|
if (ev.type === 'plan' && accId != null) {
|
||||||
|
const total = Number(d.total ?? 0)
|
||||||
|
const now = Date.now()
|
||||||
|
setLive((prev) => ({
|
||||||
|
...prev,
|
||||||
|
[accId]: {
|
||||||
|
copied: prev[accId]?.copied ?? 0,
|
||||||
|
skipped: prev[accId]?.skipped ?? 0,
|
||||||
|
total,
|
||||||
|
folder: prev[accId]?.folder,
|
||||||
|
startTs: prev[accId]?.startTs ?? now,
|
||||||
|
startCount: prev[accId]?.startCount ?? 0,
|
||||||
|
speed: prev[accId]?.speed ?? 0,
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
} else if (ev.type === 'progress' && accId != null) {
|
||||||
const now = Date.now()
|
const now = Date.now()
|
||||||
const copied = Number(d.copied ?? 0)
|
const copied = Number(d.copied ?? 0)
|
||||||
const skipped = Number(d.skipped ?? 0)
|
const skipped = Number(d.skipped ?? 0)
|
||||||
@@ -108,9 +124,8 @@ export function TaskDetail({ id }: { id: number }) {
|
|||||||
[accId]: {
|
[accId]: {
|
||||||
copied,
|
copied,
|
||||||
skipped,
|
skipped,
|
||||||
|
total: Number(d.account_total ?? cur?.total ?? 0),
|
||||||
folder: d.folder as string | undefined,
|
folder: d.folder as string | undefined,
|
||||||
folderDone: Number(d.folder_done ?? 0),
|
|
||||||
folderTotal: Number(d.folder_total ?? 0),
|
|
||||||
startTs,
|
startTs,
|
||||||
startCount,
|
startCount,
|
||||||
speed,
|
speed,
|
||||||
@@ -128,7 +143,7 @@ export function TaskDetail({ id }: { id: number }) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Structural events refresh the persisted view; `progress` is covered by live state.
|
// Structural events refresh the persisted view; `progress` is covered by live state.
|
||||||
if (['account_started', 'account_test', 'account_done', 'run_started', 'run_done', 'error', 'folder', 'cancelled'].includes(ev.type)) {
|
if (['account_started', 'account_test', 'account_done', 'run_started', 'run_done', 'error', 'folder', 'cancelled', 'plan'].includes(ev.type)) {
|
||||||
reload()
|
reload()
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
@@ -434,17 +449,18 @@ export function TaskDetail({ id }: { id: number }) {
|
|||||||
<td className="progress-cell">
|
<td className="progress-cell">
|
||||||
{(() => {
|
{(() => {
|
||||||
const lv = live[a.id]
|
const lv = live[a.id]
|
||||||
if (!lv || !lv.folderTotal) return <span className="muted-note">—</span>
|
if (!lv || !lv.total) return <span className="muted-note">—</span>
|
||||||
const pct = Math.min(100, Math.floor((lv.folderDone / lv.folderTotal) * 100))
|
const done = lv.copied + lv.skipped
|
||||||
const eta = lv.speed > 0 ? (lv.folderTotal - lv.folderDone) / lv.speed : Infinity
|
const pct = Math.min(100, Math.floor((done / lv.total) * 100))
|
||||||
|
const eta = lv.speed > 0 ? (lv.total - done) / lv.speed : Infinity
|
||||||
return (
|
return (
|
||||||
<div className="acct-progress">
|
<div className="acct-progress">
|
||||||
<div className="pbar">
|
<div className="pbar">
|
||||||
<span className="pbar-fill" style={{ width: `${pct}%` }} />
|
<span className="pbar-fill" style={{ width: `${pct}%` }} />
|
||||||
</div>
|
</div>
|
||||||
<span className="pmeta mono-num">
|
<span className="pmeta mono-num">
|
||||||
{lv.folder ? `${lv.folder} · ` : ''}
|
{done}/{lv.total} ({pct}%) · {lv.speed >= 1 ? Math.round(lv.speed) : lv.speed.toFixed(1)}/s · ETA {fmtDuration(eta)}
|
||||||
{pct}% · {lv.speed >= 1 ? Math.round(lv.speed) : lv.speed.toFixed(1)}/s · ETA {fmtDuration(eta)}
|
{lv.folder ? ` · ${lv.folder}` : ''}
|
||||||
</span>
|
</span>
|
||||||
</div>
|
</div>
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user