feat(imapx): streaming per-folder copy with dedup, idempotent

This commit is contained in:
2026-07-01 17:39:43 +07:00
parent 79f5c5b93a
commit 06c0598b80
2 changed files with 244 additions and 0 deletions
+152
View File
@@ -0,0 +1,152 @@
package imapx
import (
"bytes"
"context"
"fmt"
"io"
"github.com/emersion/go-imap/v2"
"github.com/emersion/go-imap/v2/imapclient"
)
// CopyDeps injects the dedup/progress hooks used by CopyFolder. APPEND to
// dst always happens before MarkMigrated is called, so a crash between the
// two only ever causes a message to be re-copied (never lost) on the next
// run.
type CopyDeps struct {
IsMigrated func(key string) (bool, error)
MarkMigrated func(folder, key string) error
OnProgress func(copied, skipped int)
}
// CopyResult summarizes the outcome of one CopyFolder run.
type CopyResult struct {
Copied int
Skipped int
Errors int
}
// CopyFolder streams messages from srcFolder on src to dstFolder on dst.
//
// The source folder is opened read-only (EXAMINE) and is never mutated:
// no \Deleted flags are set and no EXPUNGE is issued. Each message body is
// held in memory only for the duration of a single FETCH->APPEND and is
// never written to disk. Messages already migrated (per deps.IsMigrated)
// are skipped without re-fetching their bodies.
func CopyFolder(ctx context.Context, src, dst *imapclient.Client, srcFolder, dstFolder string, deps CopyDeps) (CopyResult, error) {
var res CopyResult
sel, err := src.Select(srcFolder, &imap.SelectOptions{ReadOnly: true}).Wait()
if err != nil {
return res, fmt.Errorf("examine src %q: %w", srcFolder, err)
}
if sel.NumMessages == 0 {
return res, nil
}
// 1) Collect envelope+uid+size for every message (cheap pass, no bodies).
metaSet := imap.SeqSet{imap.SeqRange{Start: 1, Stop: sel.NumMessages}}
metas, err := src.Fetch(metaSet, &imap.FetchOptions{
UID: true, Envelope: true, RFC822Size: true, Flags: true,
}).Collect()
if err != nil {
return res, fmt.Errorf("fetch meta: %w", err)
}
// dst folder must exist (idempotent create; ignore "already exists").
_ = dst.Create(dstFolder, nil).Wait()
for _, m := range metas {
if err := ctx.Err(); err != nil {
return res, err
}
key := MessageKey(m.Envelope, m.RFC822Size)
already, err := deps.IsMigrated(key)
if err != nil {
res.Errors++
continue
}
if already {
res.Skipped++
if deps.OnProgress != nil {
deps.OnProgress(res.Copied, res.Skipped)
}
continue
}
if err := streamOne(src, dst, dstFolder, m.UID, m.Flags); err != nil {
res.Errors++
continue
}
if err := deps.MarkMigrated(dstFolder, key); err != nil {
res.Errors++
continue
}
res.Copied++
if deps.OnProgress != nil {
deps.OnProgress(res.Copied, res.Skipped)
}
}
return res, nil
}
// streamOne FETCHes BODY[] for one message and APPENDs it into dst without
// spooling to disk. The body is buffered in RAM only for the duration of
// this single FETCH->APPEND round trip.
func streamOne(src, dst *imapclient.Client, dstFolder string, uid imap.UID, flags []imap.Flag) error {
bodySection := &imap.FetchItemBodySection{}
fetchCmd := src.Fetch(imap.UIDSetNum(uid), &imap.FetchOptions{
BodySection: []*imap.FetchItemBodySection{bodySection},
})
defer fetchCmd.Close()
msg := fetchCmd.Next()
if msg == nil {
return fmt.Errorf("no message for uid %v", uid)
}
var body []byte
for {
item := msg.Next()
if item == nil {
break
}
if d, ok := item.(imapclient.FetchItemDataBodySection); ok {
b, err := io.ReadAll(d.Literal)
if err != nil {
return err
}
body = b
}
}
if err := fetchCmd.Close(); err != nil {
return err
}
if body == nil {
return fmt.Errorf("empty body uid %v", uid)
}
appendCmd := dst.Append(dstFolder, int64(len(body)), &imap.AppendOptions{Flags: keepFlags(flags)})
if _, err := io.Copy(appendCmd, bytes.NewReader(body)); err != nil {
return err
}
if err := appendCmd.Close(); err != nil {
return err
}
_, err := appendCmd.Wait()
return err
}
// keepFlags drops \Recent: it cannot be set via APPEND. go-imap v2 beta.8
// no longer defines an imap.FlagRecent constant (RFC 9051 dropped \Recent
// from IMAP4rev2), so match it by its literal wire form instead.
func keepFlags(flags []imap.Flag) []imap.Flag {
out := make([]imap.Flag, 0, len(flags))
for _, f := range flags {
if f == "\\Recent" {
continue
}
out = append(out, f)
}
return out
}
+92
View File
@@ -0,0 +1,92 @@
package imapx
import (
"context"
"fmt"
"testing"
)
// seedInbox logs in as login/pass and APPENDs n minimal messages with unique
// Message-IDs into INBOX via a dedicated connection.
func seedInbox(t *testing.T, ep Endpoint, login, pass string, n int) {
t.Helper()
ctx := context.Background()
c, err := Connect(ctx, ep)
if err != nil {
t.Fatalf("seedInbox connect: %v", err)
}
defer func() { _ = c.Logout().Wait() }()
if err := c.Login(login, pass).Wait(); err != nil {
t.Fatalf("seedInbox login: %v", err)
}
for i := 0; i < n; i++ {
msg := fmt.Sprintf(
"From: sender@localhost\r\nTo: %s\r\nSubject: seed %d\r\nMessage-Id: <seed-%d-%p@localhost>\r\n\r\nBody %d\r\n",
login, i, i, &i, i,
)
buf := []byte(msg)
appendCmd := c.Append("INBOX", int64(len(buf)), nil)
if _, err := appendCmd.Write(buf); err != nil {
t.Fatalf("seedInbox write %d: %v", i, err)
}
if err := appendCmd.Close(); err != nil {
t.Fatalf("seedInbox close %d: %v", i, err)
}
if _, err := appendCmd.Wait(); err != nil {
t.Fatalf("seedInbox append %d: %v", i, err)
}
}
}
// Требует два ящика на greenmail. Первый запуск копирует N, второй — 0 (все skipped).
func TestCopyFolderIdempotent(t *testing.T) {
ep := testEP(t) // plain greenmail
ctx := context.Background()
// подготовка: APPEND 2 письма в INBOX источника через отдельное соединение
seedInbox(t, ep, "src@localhost", "p", 2)
src, err := Connect(ctx, ep)
if err != nil {
t.Fatal(err)
}
defer func() { _ = src.Logout().Wait() }()
if err := src.Login("src@localhost", "p").Wait(); err != nil {
t.Fatal(err)
}
dst, err := Connect(ctx, ep)
if err != nil {
t.Fatal(err)
}
defer func() { _ = dst.Logout().Wait() }()
if err := dst.Login("dst@localhost", "p").Wait(); err != nil {
t.Fatal(err)
}
seen := map[string]bool{}
deps := CopyDeps{
IsMigrated: func(k string) (bool, error) { return seen[k], nil },
MarkMigrated: func(_, k string) error { seen[k] = true; return nil },
OnProgress: func(_, _ int) {},
}
r1, err := CopyFolder(ctx, src, dst, "INBOX", "INBOX", deps)
if err != nil {
t.Fatalf("run1: %v", err)
}
if r1.Copied != 2 {
t.Fatalf("run1 copied=%d want 2", r1.Copied)
}
r2, err := CopyFolder(ctx, src, dst, "INBOX", "INBOX", deps)
if err != nil {
t.Fatalf("run2: %v", err)
}
if r2.Copied != 0 || r2.Skipped != 2 {
t.Fatalf("run2 copied=%d skipped=%d want 0/2", r2.Copied, r2.Skipped)
}
}