a2234077df
CopyFolder now streams envelope metadata via Next() in a first pass (dedup + queue new messages), then streams bodies for new ones in a second pass — no more blocking Collect of the whole folder with zero feedback, and memory stays flat (only new-message meta is held). - imapx: two-pass streaming CopyFolder + CopyDeps.OnScan(scanned,total) - orchestrator: throttled 'scan' events during the metadata pass - web: per-account 'scanning folder: X/N' line under the progress bar; scan events kept out of the log to avoid flooding Verified on greenmail: idempotency and internal-date preservation still hold. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01MMHQTtnQtQqL8muAXHr9kd
194 lines
5.6 KiB
Go
194 lines
5.6 KiB
Go
package imapx
|
|
|
|
import (
	"bytes"
	"context"
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/emersion/go-imap/v2"
	"github.com/emersion/go-imap/v2/imapclient"
)
|
|
|
|
// CopyDeps bundles the callbacks CopyFolder relies on for deduplication and
// progress reporting.
//
// Ordering guarantee: a message is APPENDed to the destination before
// MarkMigrated is invoked for it. If the process dies between those two
// steps, the message is simply copied again on the next run — it is never
// lost.
type CopyDeps struct {
	// IsMigrated reports whether the message identified by key has already
	// been copied. Required: CopyFolder calls it for every scanned message
	// without a nil check.
	IsMigrated func(key string) (bool, error)

	// MarkMigrated records that key now lives in folder (CopyFolder passes
	// the destination folder name). Required: called without a nil check
	// after each successful APPEND.
	MarkMigrated func(folder, key string) error

	// OnProgress, when non-nil, receives the running copied/skipped totals
	// for the current folder each time one of them increases.
	OnProgress func(copied, skipped int)

	// OnFolder, when non-nil, fires once per folder right after EXAMINE and
	// before the (potentially long) envelope fetch, carrying the number of
	// messages in the source folder so progress is visible early.
	OnFolder func(srcFolder, dstFolder string, total int64)

	// OnScan, when non-nil, fires during the streaming metadata pass with
	// how many of the folder's messages have been examined so far, so the UI
	// shows movement while dedup decisions are made and before any bodies
	// are copied.
	OnScan func(scanned, total int64)
}
|
|
|
|
// CopyResult tallies what a single CopyFolder call did to one folder.
type CopyResult struct {
	Copied  int // messages APPENDed to the destination and then marked migrated
	Skipped int // messages deps.IsMigrated reported as already copied
	Errors  int // per-message failures: metadata decode, dedup lookup, copy, or MarkMigrated
}
|
|
|
|
// CopyFolder streams messages from srcFolder on src to dstFolder on dst.
|
|
//
|
|
// The source folder is opened read-only (EXAMINE) and is never mutated:
|
|
// no \Deleted flags are set and no EXPUNGE is issued. Each message body is
|
|
// held in memory only for the duration of a single FETCH->APPEND and is
|
|
// never written to disk. Messages already migrated (per deps.IsMigrated)
|
|
// are skipped without re-fetching their bodies.
|
|
func CopyFolder(ctx context.Context, src, dst *imapclient.Client, srcFolder, dstFolder string, deps CopyDeps) (CopyResult, error) {
|
|
var res CopyResult
|
|
|
|
sel, err := src.Select(srcFolder, &imap.SelectOptions{ReadOnly: true}).Wait()
|
|
if err != nil {
|
|
return res, fmt.Errorf("examine src %q: %w", srcFolder, err)
|
|
}
|
|
total := int64(sel.NumMessages)
|
|
if deps.OnFolder != nil {
|
|
deps.OnFolder(srcFolder, dstFolder, total)
|
|
}
|
|
if total == 0 {
|
|
return res, nil
|
|
}
|
|
|
|
// dst folder must exist (idempotent create; ignore "already exists").
|
|
_ = dst.Create(dstFolder, nil).Wait()
|
|
|
|
// Pass 1: STREAM metadata (no bodies) via Next(), dedup as we go, and queue
|
|
// only the new messages. Streaming (not Collect) means progress shows during
|
|
// the scan and memory stays flat — we hold small meta for new messages only.
|
|
type queued struct {
|
|
uid imap.UID
|
|
key string
|
|
flags []imap.Flag
|
|
internalDate time.Time
|
|
}
|
|
var todo []queued
|
|
metaSet := imap.SeqSet{imap.SeqRange{Start: 1, Stop: sel.NumMessages}}
|
|
fc := src.Fetch(metaSet, &imap.FetchOptions{
|
|
UID: true, Envelope: true, RFC822Size: true, Flags: true, InternalDate: true,
|
|
})
|
|
var scanned int64
|
|
for {
|
|
if err := ctx.Err(); err != nil {
|
|
_ = fc.Close()
|
|
return res, err
|
|
}
|
|
msg := fc.Next()
|
|
if msg == nil {
|
|
break
|
|
}
|
|
buf, err := msg.Collect()
|
|
if err != nil {
|
|
res.Errors++
|
|
continue
|
|
}
|
|
scanned++
|
|
key := MessageKey(buf.Envelope, buf.RFC822Size)
|
|
already, err := deps.IsMigrated(key)
|
|
if err != nil {
|
|
res.Errors++
|
|
} else if already {
|
|
res.Skipped++
|
|
if deps.OnProgress != nil {
|
|
deps.OnProgress(res.Copied, res.Skipped)
|
|
}
|
|
} else {
|
|
todo = append(todo, queued{uid: buf.UID, key: key, flags: buf.Flags, internalDate: buf.InternalDate})
|
|
}
|
|
if deps.OnScan != nil {
|
|
deps.OnScan(scanned, total)
|
|
}
|
|
}
|
|
if err := fc.Close(); err != nil {
|
|
return res, fmt.Errorf("fetch meta %q: %w", srcFolder, err)
|
|
}
|
|
|
|
// Pass 2: fetch bodies for the queued (new) messages, one at a time.
|
|
for _, q := range todo {
|
|
if err := ctx.Err(); err != nil {
|
|
return res, err
|
|
}
|
|
if err := streamOne(src, dst, dstFolder, q.uid, q.flags, q.internalDate); err != nil {
|
|
res.Errors++
|
|
continue
|
|
}
|
|
if err := deps.MarkMigrated(dstFolder, q.key); err != nil {
|
|
res.Errors++
|
|
continue
|
|
}
|
|
res.Copied++
|
|
if deps.OnProgress != nil {
|
|
deps.OnProgress(res.Copied, res.Skipped)
|
|
}
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
// streamOne FETCHes BODY[] for one message and APPENDs it into dst without
|
|
// spooling to disk. The body is buffered in RAM only for the duration of
|
|
// this single FETCH->APPEND round trip.
|
|
func streamOne(src, dst *imapclient.Client, dstFolder string, uid imap.UID, flags []imap.Flag, internalDate time.Time) error {
|
|
bodySection := &imap.FetchItemBodySection{}
|
|
fetchCmd := src.Fetch(imap.UIDSetNum(uid), &imap.FetchOptions{
|
|
BodySection: []*imap.FetchItemBodySection{bodySection},
|
|
})
|
|
defer fetchCmd.Close()
|
|
|
|
msg := fetchCmd.Next()
|
|
if msg == nil {
|
|
return fmt.Errorf("no message for uid %v", uid)
|
|
}
|
|
var body []byte
|
|
for {
|
|
item := msg.Next()
|
|
if item == nil {
|
|
break
|
|
}
|
|
if d, ok := item.(imapclient.FetchItemDataBodySection); ok {
|
|
b, err := io.ReadAll(d.Literal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
body = b
|
|
}
|
|
}
|
|
if err := fetchCmd.Close(); err != nil {
|
|
return err
|
|
}
|
|
if body == nil {
|
|
return fmt.Errorf("empty body uid %v", uid)
|
|
}
|
|
|
|
appendCmd := dst.Append(dstFolder, int64(len(body)), &imap.AppendOptions{Flags: keepFlags(flags), Time: internalDate})
|
|
if _, err := io.Copy(appendCmd, bytes.NewReader(body)); err != nil {
|
|
return err
|
|
}
|
|
if err := appendCmd.Close(); err != nil {
|
|
return err
|
|
}
|
|
_, err := appendCmd.Wait()
|
|
return err
|
|
}
|
|
|
|
// keepFlags drops \Recent: it cannot be set via APPEND. go-imap v2 beta.8
|
|
// no longer defines an imap.FlagRecent constant (RFC 9051 dropped \Recent
|
|
// from IMAP4rev2), so match it by its literal wire form instead.
|
|
func keepFlags(flags []imap.Flag) []imap.Flag {
|
|
out := make([]imap.Flag, 0, len(flags))
|
|
for _, f := range flags {
|
|
if f == "\\Recent" {
|
|
continue
|
|
}
|
|
out = append(out, f)
|
|
}
|
|
return out
|
|
}
|