From 06c0598b805c4617c5de13e1b0433eaca40687a9 Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Wed, 1 Jul 2026 17:39:43 +0700 Subject: [PATCH] feat(imapx): streaming per-folder copy with dedup, idempotent --- internal/imapx/copy.go | 152 ++++++++++++++++++++++++++++++++++++ internal/imapx/copy_test.go | 92 ++++++++++++++++++++++ 2 files changed, 244 insertions(+) create mode 100644 internal/imapx/copy.go create mode 100644 internal/imapx/copy_test.go diff --git a/internal/imapx/copy.go b/internal/imapx/copy.go new file mode 100644 index 0000000..c43a4bb --- /dev/null +++ b/internal/imapx/copy.go @@ -0,0 +1,152 @@ +package imapx + +import ( + "bytes" + "context" + "fmt" + "io" + + "github.com/emersion/go-imap/v2" + "github.com/emersion/go-imap/v2/imapclient" +) + +// CopyDeps injects the dedup/progress hooks used by CopyFolder. APPEND to +// dst always happens before MarkMigrated is called, so a crash between the +// two only ever causes a message to be re-copied (never lost) on the next +// run. +type CopyDeps struct { + IsMigrated func(key string) (bool, error) + MarkMigrated func(folder, key string) error + OnProgress func(copied, skipped int) +} + +// CopyResult summarizes the outcome of one CopyFolder run. +type CopyResult struct { + Copied int + Skipped int + Errors int +} + +// CopyFolder streams messages from srcFolder on src to dstFolder on dst. +// +// The source folder is opened read-only (EXAMINE) and is never mutated: +// no \Deleted flags are set and no EXPUNGE is issued. Each message body is +// held in memory only for the duration of a single FETCH->APPEND and is +// never written to disk. Messages already migrated (per deps.IsMigrated) +// are skipped without re-fetching their bodies. +func CopyFolder(ctx context.Context, src, dst *imapclient.Client, srcFolder, dstFolder string, deps CopyDeps) (CopyResult, error) { + var res CopyResult + + sel, err := src.Select(srcFolder, &imap.SelectOptions{ReadOnly: true}).Wait() + if err != nil { + return res, fmt.Errorf("examine src %q: %w", srcFolder, err) + } + if sel.NumMessages == 0 { + return res, nil + } + + // 1) Collect envelope+uid+size for every message (cheap pass, no bodies). + metaSet := imap.SeqSet{imap.SeqRange{Start: 1, Stop: sel.NumMessages}} + metas, err := src.Fetch(metaSet, &imap.FetchOptions{ + UID: true, Envelope: true, RFC822Size: true, Flags: true, + }).Collect() + if err != nil { + return res, fmt.Errorf("fetch meta: %w", err) + } + + // dst folder must exist (idempotent create; ignore "already exists"). + _ = dst.Create(dstFolder, nil).Wait() + + for _, m := range metas { + if err := ctx.Err(); err != nil { + return res, err + } + + key := MessageKey(m.Envelope, m.RFC822Size) + already, err := deps.IsMigrated(key) + if err != nil { + res.Errors++ + continue + } + if already { + res.Skipped++ + if deps.OnProgress != nil { + deps.OnProgress(res.Copied, res.Skipped) + } + continue + } + if err := streamOne(src, dst, dstFolder, m.UID, m.Flags); err != nil { + res.Errors++ + continue + } + if err := deps.MarkMigrated(dstFolder, key); err != nil { + res.Errors++ + continue + } + res.Copied++ + if deps.OnProgress != nil { + deps.OnProgress(res.Copied, res.Skipped) + } + } + return res, nil +} + +// streamOne FETCHes BODY[] for one message and APPENDs it into dst without +// spooling to disk. The body is buffered in RAM only for the duration of +// this single FETCH->APPEND round trip. +func streamOne(src, dst *imapclient.Client, dstFolder string, uid imap.UID, flags []imap.Flag) error { + bodySection := &imap.FetchItemBodySection{} + fetchCmd := src.Fetch(imap.UIDSetNum(uid), &imap.FetchOptions{ + BodySection: []*imap.FetchItemBodySection{bodySection}, + }) + defer fetchCmd.Close() + + msg := fetchCmd.Next() + if msg == nil { + return fmt.Errorf("no message for uid %v", uid) + } + var body []byte + for { + item := msg.Next() + if item == nil { + break + } + if d, ok := item.(imapclient.FetchItemDataBodySection); ok { + b, err := io.ReadAll(d.Literal) + if err != nil { + return err + } + body = b + } + } + if err := fetchCmd.Close(); err != nil { + return err + } + if body == nil { + return fmt.Errorf("empty body uid %v", uid) + } + + appendCmd := dst.Append(dstFolder, int64(len(body)), &imap.AppendOptions{Flags: keepFlags(flags)}) + if _, err := io.Copy(appendCmd, bytes.NewReader(body)); err != nil { + return err + } + if err := appendCmd.Close(); err != nil { + return err + } + _, err := appendCmd.Wait() + return err +} + +// keepFlags drops \Recent: it cannot be set via APPEND. go-imap v2 beta.8 +// no longer defines an imap.FlagRecent constant (RFC 9051 dropped \Recent +// from IMAP4rev2), so match it by its literal wire form instead. +func keepFlags(flags []imap.Flag) []imap.Flag { + out := make([]imap.Flag, 0, len(flags)) + for _, f := range flags { + if f == "\\Recent" { + continue + } + out = append(out, f) + } + return out +} diff --git a/internal/imapx/copy_test.go b/internal/imapx/copy_test.go new file mode 100644 index 0000000..f6c1dbe --- /dev/null +++ b/internal/imapx/copy_test.go @@ -0,0 +1,92 @@ +package imapx + +import ( + "context" + "fmt" + "testing" +) + +// seedInbox logs in as login/pass and APPENDs n minimal messages with unique +// Message-IDs into INBOX via a dedicated connection. +func seedInbox(t *testing.T, ep Endpoint, login, pass string, n int) { + t.Helper() + ctx := context.Background() + + c, err := Connect(ctx, ep) + if err != nil { + t.Fatalf("seedInbox connect: %v", err) + } + defer func() { _ = c.Logout().Wait() }() + + if err := c.Login(login, pass).Wait(); err != nil { + t.Fatalf("seedInbox login: %v", err) + } + + for i := 0; i < n; i++ { + msg := fmt.Sprintf( + "From: sender@localhost\r\nTo: %s\r\nSubject: seed %d\r\nMessage-Id: \r\n\r\nBody %d\r\n", + login, i, i, &i, i, + ) + buf := []byte(msg) + appendCmd := c.Append("INBOX", int64(len(buf)), nil) + if _, err := appendCmd.Write(buf); err != nil { + t.Fatalf("seedInbox write %d: %v", i, err) + } + if err := appendCmd.Close(); err != nil { + t.Fatalf("seedInbox close %d: %v", i, err) + } + if _, err := appendCmd.Wait(); err != nil { + t.Fatalf("seedInbox append %d: %v", i, err) + } + } +} + +// Требует два ящика на greenmail. Первый запуск копирует N, второй — 0 (все skipped). +func TestCopyFolderIdempotent(t *testing.T) { + ep := testEP(t) // plain greenmail + ctx := context.Background() + + // подготовка: APPEND 2 письма в INBOX источника через отдельное соединение + seedInbox(t, ep, "src@localhost", "p", 2) + + src, err := Connect(ctx, ep) + if err != nil { + t.Fatal(err) + } + defer func() { _ = src.Logout().Wait() }() + if err := src.Login("src@localhost", "p").Wait(); err != nil { + t.Fatal(err) + } + + dst, err := Connect(ctx, ep) + if err != nil { + t.Fatal(err) + } + defer func() { _ = dst.Logout().Wait() }() + if err := dst.Login("dst@localhost", "p").Wait(); err != nil { + t.Fatal(err) + } + + seen := map[string]bool{} + deps := CopyDeps{ + IsMigrated: func(k string) (bool, error) { return seen[k], nil }, + MarkMigrated: func(_, k string) error { seen[k] = true; return nil }, + OnProgress: func(_, _ int) {}, + } + + r1, err := CopyFolder(ctx, src, dst, "INBOX", "INBOX", deps) + if err != nil { + t.Fatalf("run1: %v", err) + } + if r1.Copied != 2 { + t.Fatalf("run1 copied=%d want 2", r1.Copied) + } + + r2, err := CopyFolder(ctx, src, dst, "INBOX", "INBOX", deps) + if err != nil { + t.Fatalf("run2: %v", err) + } + if r2.Copied != 0 || r2.Skipped != 2 { + t.Fatalf("run2 copied=%d skipped=%d want 0/2", r2.Copied, r2.Skipped) + } +}