package imapx

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"time"

	"github.com/emersion/go-imap/v2"
	"github.com/emersion/go-imap/v2/imapclient"
)

// CopyDeps injects the dedup/progress hooks used by CopyFolder. APPEND to
// dst always happens before MarkMigrated is called, so a crash between the
// two only ever causes a message to be re-copied (never lost) on the next
// run.
//
// IsMigrated and MarkMigrated are invoked without a nil check and must be
// set; the On* hooks are optional and are skipped when nil.
type CopyDeps struct {
	// IsMigrated reports whether the message identified by key has already
	// been copied. CopyFolder calls it during the metadata pass with the key
	// produced by MessageKey from the message's envelope and RFC822 size. A
	// true result skips the message; an error is counted in
	// CopyResult.Errors and the message is neither skipped nor queued.
	IsMigrated func(key string) (bool, error)
	// MarkMigrated records key as migrated. CopyFolder calls it only after
	// the message has been copied successfully, passing the destination
	// folder name as folder. An error is counted in CopyResult.Errors and
	// the message is not counted as copied.
	MarkMigrated func(folder, key string) error
	// OnProgress is called after every skipped and every copied message with
	// the running totals for the current folder.
	OnProgress func(copied, skipped int)
	// OnFolder is called once per folder right after EXAMINE, before the
	// (potentially long) envelope fetch, with the message count in the source
	// folder — for progress visibility.
	OnFolder func(srcFolder, dstFolder string, total int64)
	// OnScan is called during the streaming metadata pass with how many of the
	// folder's messages have been examined so far — so the UI shows movement
	// while dedup decisions are made, before bodies start copying.
	OnScan func(scanned, total int64)
}

// CopyResult summarizes the outcome of one CopyFolder run.
type CopyResult struct {
	// Copied counts messages that were copied to the destination and then
	// successfully recorded through MarkMigrated.
	Copied int
	// Skipped counts messages that IsMigrated reported as already migrated.
	Skipped int
	// Errors counts per-message failures: metadata collection, IsMigrated,
	// the body copy itself, or MarkMigrated. A message that was copied but
	// could not be marked is counted here, not in Copied.
	Errors int
}

// CopyFolder streams messages from srcFolder on src to dstFolder on dst.
//
// The source folder is opened read-only (EXAMINE) and is never mutated:
// no \Deleted flags are set and no EXPUNGE is issued. Each message body is
// held in memory only for the duration of a single FETCH->APPEND and is
// never written to disk. Messages already migrated (per deps.IsMigrated)
// are skipped without re-fetching their bodies.
func CopyFolder(ctx context.Context, src, dst *imapclient.Client, srcFolder, dstFolder string, deps CopyDeps) (CopyResult, error) { var res CopyResult sel, err := src.Select(srcFolder, &imap.SelectOptions{ReadOnly: true}).Wait() if err != nil { return res, fmt.Errorf("examine src %q: %w", srcFolder, err) } total := int64(sel.NumMessages) if deps.OnFolder != nil { deps.OnFolder(srcFolder, dstFolder, total) } if total == 0 { return res, nil } // dst folder must exist (idempotent create; ignore "already exists"). _ = dst.Create(dstFolder, nil).Wait() // Pass 1: STREAM metadata (no bodies) via Next(), dedup as we go, and queue // only the new messages. Streaming (not Collect) means progress shows during // the scan and memory stays flat — we hold small meta for new messages only. type queued struct { uid imap.UID key string flags []imap.Flag internalDate time.Time } var todo []queued metaSet := imap.SeqSet{imap.SeqRange{Start: 1, Stop: sel.NumMessages}} fc := src.Fetch(metaSet, &imap.FetchOptions{ UID: true, Envelope: true, RFC822Size: true, Flags: true, InternalDate: true, }) var scanned int64 for { if err := ctx.Err(); err != nil { _ = fc.Close() return res, err } msg := fc.Next() if msg == nil { break } buf, err := msg.Collect() if err != nil { res.Errors++ continue } scanned++ key := MessageKey(buf.Envelope, buf.RFC822Size) already, err := deps.IsMigrated(key) if err != nil { res.Errors++ } else if already { res.Skipped++ if deps.OnProgress != nil { deps.OnProgress(res.Copied, res.Skipped) } } else { todo = append(todo, queued{uid: buf.UID, key: key, flags: buf.Flags, internalDate: buf.InternalDate}) } if deps.OnScan != nil { deps.OnScan(scanned, total) } } if err := fc.Close(); err != nil { return res, fmt.Errorf("fetch meta %q: %w", srcFolder, err) } // Pass 2: fetch bodies for the queued (new) messages, one at a time. 
for _, q := range todo { if err := ctx.Err(); err != nil { return res, err } if err := streamOne(src, dst, dstFolder, q.uid, q.flags, q.internalDate); err != nil { res.Errors++ continue } if err := deps.MarkMigrated(dstFolder, q.key); err != nil { res.Errors++ continue } res.Copied++ if deps.OnProgress != nil { deps.OnProgress(res.Copied, res.Skipped) } } return res, nil } // streamOne FETCHes BODY[] for one message and APPENDs it into dst without // spooling to disk. The body is buffered in RAM only for the duration of // this single FETCH->APPEND round trip. func streamOne(src, dst *imapclient.Client, dstFolder string, uid imap.UID, flags []imap.Flag, internalDate time.Time) error { bodySection := &imap.FetchItemBodySection{} fetchCmd := src.Fetch(imap.UIDSetNum(uid), &imap.FetchOptions{ BodySection: []*imap.FetchItemBodySection{bodySection}, }) defer fetchCmd.Close() msg := fetchCmd.Next() if msg == nil { return fmt.Errorf("no message for uid %v", uid) } var body []byte for { item := msg.Next() if item == nil { break } if d, ok := item.(imapclient.FetchItemDataBodySection); ok { b, err := io.ReadAll(d.Literal) if err != nil { return err } body = b } } if err := fetchCmd.Close(); err != nil { return err } if body == nil { return fmt.Errorf("empty body uid %v", uid) } appendCmd := dst.Append(dstFolder, int64(len(body)), &imap.AppendOptions{Flags: keepFlags(flags), Time: internalDate}) if _, err := io.Copy(appendCmd, bytes.NewReader(body)); err != nil { return err } if err := appendCmd.Close(); err != nil { return err } _, err := appendCmd.Wait() return err } // keepFlags drops \Recent: it cannot be set via APPEND. go-imap v2 beta.8 // no longer defines an imap.FlagRecent constant (RFC 9051 dropped \Recent // from IMAP4rev2), so match it by its literal wire form instead. func keepFlags(flags []imap.Flag) []imap.Flag { out := make([]imap.Flag, 0, len(flags)) for _, f := range flags { if f == "\\Recent" { continue } out = append(out, f) } return out }