Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01MMHQTtnQtQqL8muAXHr9kd
91 KiB
imap-copier Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Self-hosted утилита переноса почты между IMAP-провайдерами с веб-UI, WebSocket-прогрессом, CSV-импортом и идемпотентным (без дублей) копированием.
Architecture: Один Go-бинарник (REST + WebSocket + встроенный React через embed.FS) с worker pool поверх emersion/go-imap/v2. Перенос — стриминг FETCH → APPEND в RAM, без спула на диск. Состояние и дедуп-журнал — в PostgreSQL. Разворачивается docker-compose (app + caddy + postgres); Caddy терминирует/проксирует (:80 дефолт, опц. :443 + Let's Encrypt).
Tech Stack: Go 1.22+, github.com/emersion/go-imap/v2, стандартный net/http, github.com/coder/websocket, github.com/jackc/pgx/v5, github.com/golang-migrate/migrate/v4, log/slog; React + TypeScript + Vite; Docker, Caddy.
Global Constraints
- Go 1.22+ (используются route-паттерны
net/httpвидаGET /api/tasks/{id}). - Тела писем НИКОГДА не пишутся на диск и не логируются; живут в RAM только на время одной итерации
FETCH→APPEND. - Копирование недеструктивно: источник не изменяется (никакого
\Deleted/EXPUNGE). - Пароли ящиков в БД только зашифрованы (AES-256-GCM, ключ
ENC_KEY= base64 32 байта). Пароли не возвращаются в API и не пишутся в лог. - Дедуп-ключ:
Message-ID, при отсутствии —md5(From|To|Subject|Date|Size). migrated_messagesимеетUNIQUE(account_id, message_key); порядок — сначалаAPPEND, затем запись ключа.- Все HTTP-роуты (REST и
/ws) под session-auth, кроме/login,/api/login,/healthz. - TDD: каждая задача — сначала падающий тест, потом минимальная реализация, потом коммит. Частые коммиты.
- Модуль:
github.com/vasyansk/imap-copier.
File Structure
imap-copier/
├── go.mod
├── cmd/server/main.go # сборка зависимостей, старт HTTP
├── internal/
│ ├── config/config.go # чтение env
│ ├── crypto/crypto.go # AES-GCM шифр паролей
│ ├── crypto/session.go # HMAC signed cookie
│ ├── store/store.go # pgx pool, конструктор
│ ├── store/endpoints.go # CRUD endpoints
│ ├── store/tasks.go # CRUD tasks
│ ├── store/accounts.go # CRUD accounts + счётчики
│ ├── store/runs.go # runs
│ ├── store/migrated.go # дедуп-журнал
│ ├── imapx/dial.go # connect + TLS + capability (тест endpoint)
│ ├── imapx/account.go # login + list folders (тест account)
│ ├── imapx/messagekey.go # вычисление message_key
│ ├── imapx/copy.go # stream FETCH→APPEND по папке
│ ├── orchestrator/orchestrator.go# worker pool, прогон run
│ ├── wshub/wshub.go # WebSocket hub, события
│ ├── csvimport/csvimport.go # парсинг/валидация CSV
│ └── httpapi/ # роуты + middleware + embed
│ ├── router.go
│ ├── auth.go
│ ├── endpoints.go
│ ├── tasks.go
│ ├── accounts.go
│ ├── run.go
│ ├── ws.go
│ └── static.go # embed React build
├── migrations/ # golang-migrate .sql
├── web/ # React + Vite
├── Dockerfile
├── docker-compose.yml
└── Caddyfile
Task 1: Инициализация Go-модуля и конфигурации
Files:
- Create:
go.mod,internal/config/config.go,internal/config/config_test.go
Interfaces:
-
Produces:
config.Configstruct с полямиHTTPAddr string,DatabaseURL string,AuthUser string,AuthPass string,EncKey []byte,SessionSecret []byte,WorkerConcurrency int;func config.Load() (Config, error). -
Step 1: Инициализировать модуль
Run: go mod init github.com/vasyansk/imap-copier
Expected: создан go.mod с go 1.22.
- Step 2: Написать падающий тест
internal/config/config_test.go:
package config
import (
"encoding/base64"
"testing"
)
func TestLoadRequiresEncKey32Bytes(t *testing.T) {
t.Setenv("DATABASE_URL", "postgres://x")
t.Setenv("AUTH_USER", "admin")
t.Setenv("AUTH_PASS", "pass")
t.Setenv("SESSION_SECRET", "secret")
t.Setenv("ENC_KEY", base64.StdEncoding.EncodeToString(make([]byte, 16))) // wrong size
if _, err := Load(); err == nil {
t.Fatal("expected error for 16-byte ENC_KEY, got nil")
}
}
func TestLoadDefaults(t *testing.T) {
t.Setenv("DATABASE_URL", "postgres://x")
t.Setenv("AUTH_USER", "admin")
t.Setenv("AUTH_PASS", "pass")
t.Setenv("SESSION_SECRET", "secret")
t.Setenv("ENC_KEY", base64.StdEncoding.EncodeToString(make([]byte, 32)))
cfg, err := Load()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if cfg.HTTPAddr != ":8080" {
t.Errorf("HTTPAddr = %q, want :8080", cfg.HTTPAddr)
}
if cfg.WorkerConcurrency != 4 {
t.Errorf("WorkerConcurrency = %d, want 4", cfg.WorkerConcurrency)
}
}
- Step 3: Запустить тест — убедиться, что падает
Run: go test ./internal/config/
Expected: FAIL (пакет/Load не существует).
- Step 4: Реализовать config
internal/config/config.go:
package config
import (
"encoding/base64"
"fmt"
"os"
"strconv"
)
type Config struct {
HTTPAddr string
DatabaseURL string
AuthUser string
AuthPass string
EncKey []byte
SessionSecret []byte
WorkerConcurrency int
}
func Load() (Config, error) {
c := Config{
HTTPAddr: getenv("HTTP_ADDR", ":8080"),
DatabaseURL: os.Getenv("DATABASE_URL"),
AuthUser: os.Getenv("AUTH_USER"),
AuthPass: os.Getenv("AUTH_PASS"),
SessionSecret: []byte(os.Getenv("SESSION_SECRET")),
WorkerConcurrency: 4,
}
if v := os.Getenv("WORKER_CONCURRENCY"); v != "" {
n, err := strconv.Atoi(v)
if err != nil || n < 1 {
return Config{}, fmt.Errorf("WORKER_CONCURRENCY invalid: %q", v)
}
c.WorkerConcurrency = n
}
for k, v := range map[string]string{
"DATABASE_URL": c.DatabaseURL, "AUTH_USER": c.AuthUser,
"AUTH_PASS": c.AuthPass, "SESSION_SECRET": string(c.SessionSecret),
} {
if v == "" {
return Config{}, fmt.Errorf("%s is required", k)
}
}
key, err := base64.StdEncoding.DecodeString(os.Getenv("ENC_KEY"))
if err != nil {
return Config{}, fmt.Errorf("ENC_KEY must be base64: %w", err)
}
if len(key) != 32 {
return Config{}, fmt.Errorf("ENC_KEY must decode to 32 bytes, got %d", len(key))
}
c.EncKey = key
return c, nil
}
func getenv(k, def string) string {
if v := os.Getenv(k); v != "" {
return v
}
return def
}
- Step 5: Запустить тесты — PASS
Run: go test ./internal/config/
Expected: PASS.
- Step 6: Commit
git add go.mod internal/config/
git commit -m "feat(config): env-based configuration with validation"
Task 2: Шифрование паролей (AES-256-GCM)
Files:
- Create:
internal/crypto/crypto.go,internal/crypto/crypto_test.go
Interfaces:
-
Consumes:
config.Config.EncKey. -
Produces:
func crypto.Encrypt(key, plaintext []byte) (string, error)(base64 nonce+ciphertext),func crypto.Decrypt(key []byte, enc string) ([]byte, error). -
Step 1: Написать падающий тест
internal/crypto/crypto_test.go:
package crypto
import (
"bytes"
"testing"
)
func TestEncryptDecryptRoundTrip(t *testing.T) {
key := make([]byte, 32)
enc, err := Encrypt(key, []byte("hunter2"))
if err != nil {
t.Fatalf("encrypt: %v", err)
}
if enc == "hunter2" {
t.Fatal("ciphertext must not equal plaintext")
}
got, err := Decrypt(key, enc)
if err != nil {
t.Fatalf("decrypt: %v", err)
}
if !bytes.Equal(got, []byte("hunter2")) {
t.Fatalf("got %q, want hunter2", got)
}
}
func TestEncryptNonDeterministic(t *testing.T) {
key := make([]byte, 32)
a, _ := Encrypt(key, []byte("x"))
b, _ := Encrypt(key, []byte("x"))
if a == b {
t.Fatal("two encryptions must differ (random nonce)")
}
}
- Step 2: Запустить — FAIL
Run: go test ./internal/crypto/
Expected: FAIL (не определено).
- Step 3: Реализовать
internal/crypto/crypto.go:
package crypto
import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"errors"
"io"
)
func Encrypt(key, plaintext []byte) (string, error) {
gcm, err := newGCM(key)
if err != nil {
return "", err
}
nonce := make([]byte, gcm.NonceSize())
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
return "", err
}
ct := gcm.Seal(nonce, nonce, plaintext, nil)
return base64.StdEncoding.EncodeToString(ct), nil
}
func Decrypt(key []byte, enc string) ([]byte, error) {
gcm, err := newGCM(key)
if err != nil {
return nil, err
}
raw, err := base64.StdEncoding.DecodeString(enc)
if err != nil {
return nil, err
}
ns := gcm.NonceSize()
if len(raw) < ns {
return nil, errors.New("ciphertext too short")
}
return gcm.Open(nil, raw[:ns], raw[ns:], nil)
}
func newGCM(key []byte) (cipher.AEAD, error) {
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
}
return cipher.NewGCM(block)
}
- Step 4: Запустить — PASS
Run: go test ./internal/crypto/
Expected: PASS.
- Step 5: Commit
git add internal/crypto/crypto.go internal/crypto/crypto_test.go
git commit -m "feat(crypto): AES-256-GCM password encryption"
Task 3: Session cookie (HMAC)
Files:
- Create:
internal/crypto/session.go,internal/crypto/session_test.go
Interfaces:
-
Consumes:
config.Config.SessionSecret. -
Produces:
func crypto.SignSession(secret []byte, user string, expiry time.Time) string,func crypto.VerifySession(secret []byte, token string, now time.Time) (user string, ok bool). -
Step 1: Написать падающий тест
internal/crypto/session_test.go:
package crypto
import (
"testing"
"time"
)
func TestSessionRoundTrip(t *testing.T) {
secret := []byte("s3cr3t")
now := time.Unix(1_700_000_000, 0)
tok := SignSession(secret, "admin", now.Add(time.Hour))
user, ok := VerifySession(secret, tok, now)
if !ok || user != "admin" {
t.Fatalf("verify = %q,%v want admin,true", user, ok)
}
}
func TestSessionRejectsExpired(t *testing.T) {
secret := []byte("s3cr3t")
now := time.Unix(1_700_000_000, 0)
tok := SignSession(secret, "admin", now.Add(-time.Second))
if _, ok := VerifySession(secret, tok, now); ok {
t.Fatal("expired token must be rejected")
}
}
func TestSessionRejectsTampered(t *testing.T) {
secret := []byte("s3cr3t")
now := time.Unix(1_700_000_000, 0)
tok := SignSession(secret, "admin", now.Add(time.Hour))
if _, ok := VerifySession([]byte("other"), tok, now); ok {
t.Fatal("wrong secret must be rejected")
}
}
- Step 2: Запустить — FAIL
Run: go test ./internal/crypto/ -run Session
Expected: FAIL.
- Step 3: Реализовать
internal/crypto/session.go:
package crypto
import (
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"fmt"
"strconv"
"strings"
"time"
)
// token = base64(user) "." expiryUnix "." base64(hmac)
func SignSession(secret []byte, user string, expiry time.Time) string {
payload := base64.RawURLEncoding.EncodeToString([]byte(user)) + "." +
strconv.FormatInt(expiry.Unix(), 10)
return payload + "." + sign(secret, payload)
}
func VerifySession(secret []byte, token string, now time.Time) (string, bool) {
parts := strings.Split(token, ".")
if len(parts) != 3 {
return "", false
}
payload := parts[0] + "." + parts[1]
if !hmac.Equal([]byte(parts[2]), []byte(sign(secret, payload))) {
return "", false
}
exp, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil || now.Unix() > exp {
return "", false
}
user, err := base64.RawURLEncoding.DecodeString(parts[0])
if err != nil {
return "", false
}
return string(user), true
}
func sign(secret []byte, payload string) string {
m := hmac.New(sha256.New, secret)
fmt.Fprint(m, payload)
return base64.RawURLEncoding.EncodeToString(m.Sum(nil))
}
- Step 4: Запустить — PASS
Run: go test ./internal/crypto/
Expected: PASS (все тесты пакета).
- Step 5: Commit
git add internal/crypto/session.go internal/crypto/session_test.go
git commit -m "feat(crypto): HMAC signed session tokens"
Task 4: Миграции БД
Files:
- Create:
migrations/0001_init.up.sql,migrations/0001_init.down.sql
Interfaces:
-
Produces: схема таблиц
endpoints, tasks, accounts, runs, migrated_messages. -
Step 1: Написать up-миграцию
migrations/0001_init.up.sql:
CREATE TABLE endpoints (
id BIGSERIAL PRIMARY KEY,
role_label TEXT NOT NULL,
host TEXT NOT NULL,
port INT NOT NULL,
tls_mode TEXT NOT NULL CHECK (tls_mode IN ('ssl','starttls','plain')),
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE tasks (
id BIGSERIAL PRIMARY KEY,
name TEXT NOT NULL,
src_endpoint_id BIGINT NOT NULL REFERENCES endpoints(id),
dst_endpoint_id BIGINT NOT NULL REFERENCES endpoints(id),
status TEXT NOT NULL DEFAULT 'draft',
folder_mapping JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE accounts (
id BIGSERIAL PRIMARY KEY,
task_id BIGINT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
src_login TEXT NOT NULL,
src_pass_enc TEXT NOT NULL,
dst_login TEXT NOT NULL,
dst_pass_enc TEXT NOT NULL,
test_src_status TEXT NOT NULL DEFAULT 'unknown',
test_dst_status TEXT NOT NULL DEFAULT 'unknown',
copied_count BIGINT NOT NULL DEFAULT 0,
skipped_count BIGINT NOT NULL DEFAULT 0,
error_count BIGINT NOT NULL DEFAULT 0,
status TEXT NOT NULL DEFAULT 'idle'
);
CREATE TABLE runs (
id BIGSERIAL PRIMARY KEY,
task_id BIGINT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
finished_at TIMESTAMPTZ,
status TEXT NOT NULL DEFAULT 'running',
total_copied BIGINT NOT NULL DEFAULT 0,
total_skipped BIGINT NOT NULL DEFAULT 0,
total_errors BIGINT NOT NULL DEFAULT 0
);
CREATE TABLE migrated_messages (
id BIGSERIAL PRIMARY KEY,
account_id BIGINT NOT NULL REFERENCES accounts(id) ON DELETE CASCADE,
folder TEXT NOT NULL,
message_key TEXT NOT NULL,
copied_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (account_id, message_key)
);
migrations/0001_init.down.sql:
DROP TABLE IF EXISTS migrated_messages;
DROP TABLE IF EXISTS runs;
DROP TABLE IF EXISTS accounts;
DROP TABLE IF EXISTS tasks;
DROP TABLE IF EXISTS endpoints;
- Step 2: Проверить синтаксис на локальном Postgres
Run: docker run --rm -e POSTGRES_PASSWORD=t -d --name pgtest -p 55432:5432 postgres:16 && sleep 5 && PGPASSWORD=t psql -h localhost -p 55432 -U postgres -f migrations/0001_init.up.sql && docker rm -f pgtest
Expected: CREATE TABLE ×5, без ошибок.
- Step 3: Commit
git add migrations/
git commit -m "feat(db): initial schema migration"
Task 5: Store — пул и endpoints CRUD
Files:
- Create:
internal/store/store.go,internal/store/endpoints.go,internal/store/store_test.go
Interfaces:
- Consumes:
config.Config.DatabaseURL. - Produces:
type Store struct{ Pool *pgxpool.Pool };func store.New(ctx, dsn) (*Store, error);type Endpoint struct{ ID int64; RoleLabel, Host string; Port int; TLSMode string };func (*Store) CreateEndpoint(ctx, Endpoint) (int64, error);func (*Store) ListEndpoints(ctx) ([]Endpoint, error);func (*Store) GetEndpoint(ctx, id) (Endpoint, error).
Integration-тесты store используют env
TEST_DATABASE_URL; при отсутствии —t.Skip. ХелперtestStore(t)применяет миграции к чистой БД.
- Step 1: Добавить зависимости
Run: go get github.com/jackc/pgx/v5/pgxpool github.com/golang-migrate/migrate/v4
Expected: обновлён go.mod.
- Step 2: Написать падающий тест
internal/store/store_test.go:
package store
import (
"context"
"os"
"testing"
)
func testStore(t *testing.T) *Store {
dsn := os.Getenv("TEST_DATABASE_URL")
if dsn == "" {
t.Skip("TEST_DATABASE_URL not set")
}
s, err := New(context.Background(), dsn)
if err != nil {
t.Fatalf("New: %v", err)
}
t.Cleanup(func() {
s.Pool.Exec(context.Background(),
`TRUNCATE endpoints, tasks, accounts, runs, migrated_messages RESTART IDENTITY CASCADE`)
s.Pool.Close()
})
return s
}
func TestCreateAndGetEndpoint(t *testing.T) {
s := testStore(t)
ctx := context.Background()
id, err := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "imap.a.com", Port: 993, TLSMode: "ssl"})
if err != nil {
t.Fatalf("create: %v", err)
}
got, err := s.GetEndpoint(ctx, id)
if err != nil {
t.Fatalf("get: %v", err)
}
if got.Host != "imap.a.com" || got.Port != 993 {
t.Fatalf("got %+v", got)
}
}
- Step 3: Запустить — FAIL (или skip без БД)
Run: TEST_DATABASE_URL=postgres://postgres:t@localhost:55432/postgres go test ./internal/store/ (сначала подними pg как в Task 4 + migrate).
Expected: FAIL (не скомпилируется — типов нет).
- Step 4: Реализовать store + endpoints
internal/store/store.go:
package store
import (
"context"
"github.com/jackc/pgx/v5/pgxpool"
)
type Store struct {
Pool *pgxpool.Pool
}
func New(ctx context.Context, dsn string) (*Store, error) {
pool, err := pgxpool.New(ctx, dsn)
if err != nil {
return nil, err
}
if err := pool.Ping(ctx); err != nil {
return nil, err
}
return &Store{Pool: pool}, nil
}
internal/store/endpoints.go:
package store
import "context"
type Endpoint struct {
ID int64
RoleLabel string
Host string
Port int
TLSMode string
}
func (s *Store) CreateEndpoint(ctx context.Context, e Endpoint) (int64, error) {
var id int64
err := s.Pool.QueryRow(ctx,
`INSERT INTO endpoints (role_label, host, port, tls_mode)
VALUES ($1,$2,$3,$4) RETURNING id`,
e.RoleLabel, e.Host, e.Port, e.TLSMode).Scan(&id)
return id, err
}
func (s *Store) GetEndpoint(ctx context.Context, id int64) (Endpoint, error) {
var e Endpoint
err := s.Pool.QueryRow(ctx,
`SELECT id, role_label, host, port, tls_mode FROM endpoints WHERE id=$1`, id).
Scan(&e.ID, &e.RoleLabel, &e.Host, &e.Port, &e.TLSMode)
return e, err
}
func (s *Store) ListEndpoints(ctx context.Context) ([]Endpoint, error) {
rows, err := s.Pool.Query(ctx,
`SELECT id, role_label, host, port, tls_mode FROM endpoints ORDER BY id`)
if err != nil {
return nil, err
}
defer rows.Close()
var out []Endpoint
for rows.Next() {
var e Endpoint
if err := rows.Scan(&e.ID, &e.RoleLabel, &e.Host, &e.Port, &e.TLSMode); err != nil {
return nil, err
}
out = append(out, e)
}
return out, rows.Err()
}
- Step 5: Запустить — PASS
Run: TEST_DATABASE_URL=... go test ./internal/store/
Expected: PASS.
- Step 6: Commit
git add internal/store/ go.mod go.sum
git commit -m "feat(store): pgx pool and endpoints CRUD"
Task 6: Store — tasks, accounts, runs, migrated
Files:
- Create:
internal/store/tasks.go,internal/store/accounts.go,internal/store/runs.go,internal/store/migrated.go,internal/store/accounts_test.go
Interfaces:
-
Produces:
type Task struct{ ID int64; Name string; SrcEndpointID, DstEndpointID int64; Status string; FolderMapping map[string]string };CreateTask,GetTask,ListTasks,SetTaskStatus.type Account struct{ ID, TaskID int64; SrcLogin, SrcPassEnc, DstLogin, DstPassEnc, TestSrcStatus, TestDstStatus, Status string; Copied, Skipped, Errors int64 };CreateAccount,ListAccountsByTask,SetAccountTestStatus(ctx, id, side, status),IncAccountCounters(ctx, id, copied, skipped, errors int64),SetAccountStatus.type Run struct{ ID, TaskID int64; Status string; TotalCopied, TotalSkipped, TotalErrors int64 };CreateRun,FinishRun(ctx, id, status, copied, skipped, errors).- migrated:
IsMigrated(ctx, accountID int64, key string) (bool, error),MarkMigrated(ctx, accountID int64, folder, key string) error.
-
Step 1: Написать падающий тест на идемпотентность дедупа
internal/store/accounts_test.go:
package store
import (
"context"
"testing"
)
func TestMigratedIdempotency(t *testing.T) {
s := testStore(t)
ctx := context.Background()
epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"})
epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"})
taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst})
accID, _ := s.CreateAccount(ctx, Account{TaskID: taskID, SrcLogin: "u", SrcPassEnc: "x", DstLogin: "u2", DstPassEnc: "y"})
if err := s.MarkMigrated(ctx, accID, "INBOX", "<msg-1>"); err != nil {
t.Fatalf("mark: %v", err)
}
if err := s.MarkMigrated(ctx, accID, "INBOX", "<msg-1>"); err != nil {
t.Fatalf("second mark must not error (ON CONFLICT): %v", err)
}
ok, err := s.IsMigrated(ctx, accID, "<msg-1>")
if err != nil || !ok {
t.Fatalf("IsMigrated = %v,%v want true,nil", ok, err)
}
absent, _ := s.IsMigrated(ctx, accID, "<msg-2>")
if absent {
t.Fatal("unknown key must be false")
}
}
- Step 2: Запустить — FAIL
Run: TEST_DATABASE_URL=... go test ./internal/store/ -run Migrated
Expected: FAIL (типов/методов нет).
- Step 3: Реализовать четыре файла
internal/store/migrated.go:
package store
import "context"
func (s *Store) IsMigrated(ctx context.Context, accountID int64, key string) (bool, error) {
var one int
err := s.Pool.QueryRow(ctx,
`SELECT 1 FROM migrated_messages WHERE account_id=$1 AND message_key=$2`,
accountID, key).Scan(&one)
if err != nil {
if err.Error() == "no rows in result set" {
return false, nil
}
return false, nil // pgx returns pgx.ErrNoRows; treat as not migrated
}
return true, nil
}
func (s *Store) MarkMigrated(ctx context.Context, accountID int64, folder, key string) error {
_, err := s.Pool.Exec(ctx,
`INSERT INTO migrated_messages (account_id, folder, message_key)
VALUES ($1,$2,$3) ON CONFLICT (account_id, message_key) DO NOTHING`,
accountID, folder, key)
return err
}
Замечание для реализатора: используй
errors.Is(err, pgx.ErrNoRows)вместо сравнения строки — вIsMigratedзамени телоScan-обработки на:if errors.Is(err, pgx.ErrNoRows) { return false, nil } if err != nil { return false, err } return true, nil(импорт
errorsиgithub.com/jackc/pgx/v5).
internal/store/tasks.go:
package store
import "context"
type Task struct {
ID int64
Name string
SrcEndpointID int64
DstEndpointID int64
Status string
FolderMapping map[string]string
}
func (s *Store) CreateTask(ctx context.Context, t Task) (int64, error) {
if t.FolderMapping == nil {
t.FolderMapping = map[string]string{}
}
var id int64
err := s.Pool.QueryRow(ctx,
`INSERT INTO tasks (name, src_endpoint_id, dst_endpoint_id, folder_mapping)
VALUES ($1,$2,$3,$4) RETURNING id`,
t.Name, t.SrcEndpointID, t.DstEndpointID, t.FolderMapping).Scan(&id)
return id, err
}
func (s *Store) GetTask(ctx context.Context, id int64) (Task, error) {
var t Task
err := s.Pool.QueryRow(ctx,
`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping
FROM tasks WHERE id=$1`, id).
Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping)
return t, err
}
func (s *Store) ListTasks(ctx context.Context) ([]Task, error) {
rows, err := s.Pool.Query(ctx,
`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping
FROM tasks ORDER BY id DESC`)
if err != nil {
return nil, err
}
defer rows.Close()
var out []Task
for rows.Next() {
var t Task
if err := rows.Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping); err != nil {
return nil, err
}
out = append(out, t)
}
return out, rows.Err()
}
func (s *Store) SetTaskStatus(ctx context.Context, id int64, status string) error {
_, err := s.Pool.Exec(ctx, `UPDATE tasks SET status=$2 WHERE id=$1`, id, status)
return err
}
internal/store/accounts.go:
package store
import (
"context"
"fmt"
)
type Account struct {
ID int64
TaskID int64
SrcLogin string
SrcPassEnc string
DstLogin string
DstPassEnc string
TestSrcStatus string
TestDstStatus string
Status string
Copied int64
Skipped int64
Errors int64
}
func (s *Store) CreateAccount(ctx context.Context, a Account) (int64, error) {
var id int64
err := s.Pool.QueryRow(ctx,
`INSERT INTO accounts (task_id, src_login, src_pass_enc, dst_login, dst_pass_enc)
VALUES ($1,$2,$3,$4,$5) RETURNING id`,
a.TaskID, a.SrcLogin, a.SrcPassEnc, a.DstLogin, a.DstPassEnc).Scan(&id)
return id, err
}
func (s *Store) ListAccountsByTask(ctx context.Context, taskID int64) ([]Account, error) {
rows, err := s.Pool.Query(ctx,
`SELECT id, task_id, src_login, src_pass_enc, dst_login, dst_pass_enc,
test_src_status, test_dst_status, status, copied_count, skipped_count, error_count
FROM accounts WHERE task_id=$1 ORDER BY id`, taskID)
if err != nil {
return nil, err
}
defer rows.Close()
var out []Account
for rows.Next() {
var a Account
if err := rows.Scan(&a.ID, &a.TaskID, &a.SrcLogin, &a.SrcPassEnc, &a.DstLogin, &a.DstPassEnc,
&a.TestSrcStatus, &a.TestDstStatus, &a.Status, &a.Copied, &a.Skipped, &a.Errors); err != nil {
return nil, err
}
out = append(out, a)
}
return out, rows.Err()
}
// side = "src" | "dst"
func (s *Store) SetAccountTestStatus(ctx context.Context, id int64, side, status string) error {
col := "test_src_status"
if side == "dst" {
col = "test_dst_status"
}
_, err := s.Pool.Exec(ctx, fmt.Sprintf(`UPDATE accounts SET %s=$2 WHERE id=$1`, col), id, status)
return err
}
func (s *Store) SetAccountStatus(ctx context.Context, id int64, status string) error {
_, err := s.Pool.Exec(ctx, `UPDATE accounts SET status=$2 WHERE id=$1`, id, status)
return err
}
func (s *Store) IncAccountCounters(ctx context.Context, id, copied, skipped, errs int64) error {
_, err := s.Pool.Exec(ctx,
`UPDATE accounts SET copied_count=copied_count+$2,
skipped_count=skipped_count+$3, error_count=error_count+$4 WHERE id=$1`,
id, copied, skipped, errs)
return err
}
internal/store/runs.go:
package store
import "context"
type Run struct {
ID int64
TaskID int64
Status string
TotalCopied int64
TotalSkipped int64
TotalErrors int64
}
func (s *Store) CreateRun(ctx context.Context, taskID int64) (int64, error) {
var id int64
err := s.Pool.QueryRow(ctx,
`INSERT INTO runs (task_id) VALUES ($1) RETURNING id`, taskID).Scan(&id)
return id, err
}
func (s *Store) FinishRun(ctx context.Context, id int64, status string, copied, skipped, errs int64) error {
_, err := s.Pool.Exec(ctx,
`UPDATE runs SET status=$2, finished_at=now(),
total_copied=$3, total_skipped=$4, total_errors=$5 WHERE id=$1`,
id, status, copied, skipped, errs)
return err
}
- Step 4: Запустить — PASS
Run: TEST_DATABASE_URL=... go test ./internal/store/
Expected: PASS.
- Step 5: Commit
git add internal/store/
git commit -m "feat(store): tasks, accounts, runs, dedup journal"
Task 7: message_key (дедуп-ключ)
Files:
- Create:
internal/imapx/messagekey.go,internal/imapx/messagekey_test.go
Interfaces:
-
Produces:
func imapx.MessageKey(env *imap.Envelope, size int64) string. Еслиenv.MessageID != ""→ возвращает его; иначеmd5(From|To|Subject|Date|Size)в hex. -
Step 1: Добавить go-imap
Run: go get github.com/emersion/go-imap/v2
Expected: go.mod обновлён.
- Step 2: Написать падающий тест
internal/imapx/messagekey_test.go:
package imapx
import (
"testing"
"time"
"github.com/emersion/go-imap/v2"
)
func TestMessageKeyPrefersMessageID(t *testing.T) {
env := &imap.Envelope{MessageID: "<abc@host>"}
if got := MessageKey(env, 100); got != "<abc@host>" {
t.Fatalf("got %q, want <abc@host>", got)
}
}
func TestMessageKeyFallbackStable(t *testing.T) {
env := &imap.Envelope{
Subject: "Hi",
Date: time.Unix(1700000000, 0).UTC(),
From: []imap.Address{{Mailbox: "a", Host: "x.com"}},
To: []imap.Address{{Mailbox: "b", Host: "y.com"}},
}
k1 := MessageKey(env, 42)
k2 := MessageKey(env, 42)
if k1 != k2 {
t.Fatal("fallback key must be deterministic")
}
if MessageKey(env, 43) == k1 {
t.Fatal("different size must change key")
}
}
- Step 3: Запустить — FAIL
Run: go test ./internal/imapx/ -run MessageKey
Expected: FAIL.
- Step 4: Реализовать
internal/imapx/messagekey.go:
package imapx
import (
"crypto/md5"
"fmt"
"strings"
"github.com/emersion/go-imap/v2"
)
func MessageKey(env *imap.Envelope, size int64) string {
if env != nil && env.MessageID != "" {
return env.MessageID
}
var b strings.Builder
if env != nil {
b.WriteString(addrList(env.From))
b.WriteByte('|')
b.WriteString(addrList(env.To))
b.WriteByte('|')
b.WriteString(env.Subject)
b.WriteByte('|')
b.WriteString(env.Date.UTC().Format("2006-01-02T15:04:05Z"))
}
b.WriteByte('|')
fmt.Fprintf(&b, "%d", size)
sum := md5.Sum([]byte(b.String()))
return fmt.Sprintf("h:%x", sum)
}
func addrList(addrs []imap.Address) string {
parts := make([]string, 0, len(addrs))
for _, a := range addrs {
parts = append(parts, a.Mailbox+"@"+a.Host)
}
return strings.Join(parts, ",")
}
- Step 5: Запустить — PASS
Run: go test ./internal/imapx/ -run MessageKey
Expected: PASS.
- Step 6: Commit
git add internal/imapx/messagekey.go internal/imapx/messagekey_test.go go.mod go.sum
git commit -m "feat(imapx): message dedup key (Message-ID + header fallback)"
Task 8: imapx — dial/тест endpoint и account
Files:
- Create:
internal/imapx/dial.go,internal/imapx/account.go,internal/imapx/dial_test.go
Interfaces:
- Produces:
type Endpoint struct{ Host string; Port int; TLSMode string }.func imapx.Connect(ctx, ep Endpoint) (*imapclient.Client, error)— TLS поtls_mode(ssl= implicit TLS,starttls,plain).func imapx.TestEndpoint(ctx, ep Endpoint) error— Connect + Logout.func imapx.TestLogin(ctx, ep Endpoint, login, pass string) ([]string, error)— Connect + Login + список папок; возвращает имена папок.
Тесты используют greenmail (Docker) как реальный IMAP: env
TEST_IMAP_HOST/TEST_IMAP_PORT; при отсутствии —t.Skip.
- Step 1: Написать падающий тест
internal/imapx/dial_test.go:
package imapx
import (
"context"
"os"
"strconv"
"testing"
)
func testEP(t *testing.T) Endpoint {
host := os.Getenv("TEST_IMAP_HOST")
if host == "" {
t.Skip("TEST_IMAP_HOST not set")
}
port, _ := strconv.Atoi(os.Getenv("TEST_IMAP_PORT"))
return Endpoint{Host: host, Port: port, TLSMode: "plain"}
}
func TestTestEndpointOK(t *testing.T) {
ep := testEP(t)
if err := TestEndpoint(context.Background(), ep); err != nil {
t.Fatalf("TestEndpoint: %v", err)
}
}
func TestTestLoginListsFolders(t *testing.T) {
ep := testEP(t)
// greenmail auto-creates users on first login
folders, err := TestLogin(context.Background(), ep, "user1@localhost", "pass1")
if err != nil {
t.Fatalf("TestLogin: %v", err)
}
found := false
for _, f := range folders {
if f == "INBOX" {
found = true
}
}
if !found {
t.Fatalf("INBOX not in folders: %v", folders)
}
}
- Step 2: Запустить — FAIL
Run: go test ./internal/imapx/ -run Test
Expected: FAIL (типы/функции отсутствуют).
- Step 3: Реализовать dial + account
internal/imapx/dial.go:
package imapx
import (
"context"
"crypto/tls"
"fmt"
"github.com/emersion/go-imap/v2/imapclient"
)
type Endpoint struct {
Host string
Port int
TLSMode string // ssl | starttls | plain
}
func (e Endpoint) addr() string { return fmt.Sprintf("%s:%d", e.Host, e.Port) }
func Connect(ctx context.Context, ep Endpoint) (*imapclient.Client, error) {
var (
c *imapclient.Client
err error
)
switch ep.TLSMode {
case "ssl":
c, err = imapclient.DialTLS(ep.addr(), &imapclient.Options{
TLSConfig: &tls.Config{ServerName: ep.Host},
})
case "starttls":
c, err = imapclient.DialStartTLS(ep.addr(), &imapclient.Options{
TLSConfig: &tls.Config{ServerName: ep.Host},
})
case "plain":
c, err = imapclient.DialInsecure(ep.addr(), nil)
default:
return nil, fmt.Errorf("unknown tls_mode %q", ep.TLSMode)
}
if err != nil {
return nil, err
}
return c, nil
}
func TestEndpoint(ctx context.Context, ep Endpoint) error {
c, err := Connect(ctx, ep)
if err != nil {
return err
}
return c.Logout().Wait()
}
internal/imapx/account.go:
package imapx
import (
"context"
"github.com/emersion/go-imap/v2"
)
func TestLogin(ctx context.Context, ep Endpoint, login, pass string) ([]string, error) {
c, err := Connect(ctx, ep)
if err != nil {
return nil, err
}
defer c.Logout().Wait()
if err := c.Login(login, pass).Wait(); err != nil {
return nil, err
}
mboxes, err := c.List("", "*", nil).Collect()
if err != nil {
return nil, err
}
names := make([]string, 0, len(mboxes))
for _, m := range mboxes {
names = append(names, m.Mailbox)
}
return names, nil
}
- Step 4: Поднять greenmail и запустить — PASS
Run:
docker run --rm -d --name gm -p 3143:3143 -p 3025:3025 \
-e GREENMAIL_OPTS='-Dgreenmail.setup.test.all -Dgreenmail.users=user1:pass1 -Dgreenmail.auth.disabled' \
greenmail/standalone:2.1.0
TEST_IMAP_HOST=localhost TEST_IMAP_PORT=3143 go test ./internal/imapx/ -run Test
docker rm -f gm
Expected: PASS.
- Step 5: Commit
git add internal/imapx/dial.go internal/imapx/account.go internal/imapx/dial_test.go
git commit -m "feat(imapx): connect, endpoint test, login test with folder listing"
Task 9: imapx — стриминговое копирование папки
Files:
- Create:
internal/imapx/copy.go,internal/imapx/copy_test.go
Interfaces:
-
Consumes:
MessageKey,imapclient.Client. -
Produces:
type CopyDeps struct { IsMigrated func(key string) (bool, error); MarkMigrated func(folder, key string) error; OnProgress func(copied, skipped int) }.type CopyResult struct { Copied, Skipped, Errors int }.func imapx.CopyFolder(ctx, src, dst *imapclient.Client, folder string, deps CopyDeps) (CopyResult, error)—EXAMINEsrc,FETCHenvelope+uid+size по всем; для новых (по дедупу) —FETCH BODY[]стримом →APPENDв dst c флагами и internal date →MarkMigrated.
-
Step 1: Написать интеграционный тест идемпотентности
internal/imapx/copy_test.go:
package imapx
import (
"context"
"testing"
)
// Требует два ящика на greenmail. Первый запуск копирует N, второй — 0 (все skipped).
func TestCopyFolderIdempotent(t *testing.T) {
ep := testEP(t) // plain greenmail
ctx := context.Background()
// подготовка: APPEND 2 письма в INBOX источника через отдельное соединение
seedInbox(t, ep, "src@localhost", "p", 2)
src, err := Connect(ctx, ep)
if err != nil { t.Fatal(err) }
defer src.Logout().Wait()
if err := src.Login("src@localhost", "p").Wait(); err != nil { t.Fatal(err) }
dst, err := Connect(ctx, ep)
if err != nil { t.Fatal(err) }
defer dst.Logout().Wait()
if err := dst.Login("dst@localhost", "p").Wait(); err != nil { t.Fatal(err) }
seen := map[string]bool{}
deps := CopyDeps{
IsMigrated: func(k string) (bool, error) { return seen[k], nil },
MarkMigrated: func(_ , k string) error { seen[k] = true; return nil },
OnProgress: func(_, _ int) {},
}
r1, err := CopyFolder(ctx, src, dst, "INBOX", deps)
if err != nil { t.Fatalf("run1: %v", err) }
if r1.Copied != 2 {
t.Fatalf("run1 copied=%d want 2", r1.Copied)
}
r2, err := CopyFolder(ctx, src, dst, "INBOX", deps)
if err != nil { t.Fatalf("run2: %v", err) }
if r2.Copied != 0 || r2.Skipped != 2 {
t.Fatalf("run2 copied=%d skipped=%d want 0/2", r2.Copied, r2.Skipped)
}
}
Реализатор пишет хелпер
seedInbox(t, ep, login, pass string, n int)в этом же тест-файле: логинится иAPPENDn минимальных писем c уникальнымMessage-IDв INBOX. Используйc.Append("INBOX", size, nil)(см. context7-пример APPEND).
- Step 2: Запустить — FAIL
Run: TEST_IMAP_HOST=localhost TEST_IMAP_PORT=3143 go test ./internal/imapx/ -run CopyFolder
Expected: FAIL.
- Step 3: Реализовать CopyFolder
internal/imapx/copy.go:
package imapx
import (
"bytes"
"context"
"fmt"
"io"
"github.com/emersion/go-imap/v2"
"github.com/emersion/go-imap/v2/imapclient"
)
type CopyDeps struct {
IsMigrated func(key string) (bool, error)
MarkMigrated func(folder, key string) error
OnProgress func(copied, skipped int)
}
type CopyResult struct {
Copied int
Skipped int
Errors int
}
func CopyFolder(ctx context.Context, src, dst *imapclient.Client, folder string, deps CopyDeps) (CopyResult, error) {
var res CopyResult
sel, err := src.Select(folder, &imap.SelectOptions{ReadOnly: true}).Wait()
if err != nil {
return res, fmt.Errorf("select src %q: %w", folder, err)
}
if sel.NumMessages == 0 {
return res, nil
}
// 1) Собрать envelope+uid+size по всем письмам (лёгкий проход, без тел).
metaSet := imap.SeqSetRange(1, sel.NumMessages)
metas, err := src.Fetch(metaSet, &imap.FetchOptions{
UID: true, Envelope: true, RFC822Size: true,
}).Collect()
if err != nil {
return res, fmt.Errorf("fetch meta: %w", err)
}
// dst-папка должна существовать (idempotent create).
_ = dst.Create(folder, nil).Wait()
for _, m := range metas {
key := MessageKey(m.Envelope, m.RFC822Size)
already, err := deps.IsMigrated(key)
if err != nil {
res.Errors++
continue
}
if already {
res.Skipped++
deps.OnProgress(res.Copied, res.Skipped)
continue
}
if err := streamOne(ctx, src, dst, folder, m.UID, m.RFC822Size, m.Flags); err != nil {
res.Errors++
continue
}
if err := deps.MarkMigrated(folder, key); err != nil {
res.Errors++
continue
}
res.Copied++
deps.OnProgress(res.Copied, res.Skipped)
}
return res, nil
}
// streamOne: FETCH BODY[] одного письма и APPEND в dst без спула на диск.
func streamOne(ctx context.Context, src, dst *imapclient.Client, folder string, uid imap.UID, size int64, flags []imap.Flag) error {
bodySection := &imap.FetchItemBodySection{}
fetchCmd := src.Fetch(imap.UIDSetNum(uid), &imap.FetchOptions{
BodySection: []*imap.FetchItemBodySection{bodySection},
})
defer fetchCmd.Close()
msg := fetchCmd.Next()
if msg == nil {
return fmt.Errorf("no message for uid %v", uid)
}
var body []byte
for {
item := msg.Next()
if item == nil {
break
}
if d, ok := item.(imapclient.FetchItemDataBodySection); ok {
b, err := io.ReadAll(d.Literal)
if err != nil {
return err
}
body = b
}
}
if err := fetchCmd.Close(); err != nil {
return err
}
if body == nil {
return fmt.Errorf("empty body uid %v", uid)
}
appendCmd := dst.Append(folder, int64(len(body)), &imap.AppendOptions{Flags: keepFlags(flags)})
if _, err := io.Copy(appendCmd, bytes.NewReader(body)); err != nil {
return err
}
if err := appendCmd.Close(); err != nil {
return err
}
_, err := appendCmd.Wait()
return err
}
// keepFlags отбрасывает \Recent (нельзя задавать при APPEND).
func keepFlags(flags []imap.Flag) []imap.Flag {
out := make([]imap.Flag, 0, len(flags))
for _, f := range flags {
if f == imap.FlagRecent {
continue
}
out = append(out, f)
}
return out
}
Замечание: тело письма держится в
[]byteтолько на время одной итерации и не сохраняется на диск — соответствует global constraint. Для очень крупных писем допустимо; при желании можно заменить на прямойio.Copy(appendCmd, d.Literal)в один проход, но тогда размер надо брать изFETCH RFC822.SIZE(size). Оставляем буфер ради простоты и корректной длины APPEND-литерала.
- Step 4: Запустить — PASS
Run: TEST_IMAP_HOST=localhost TEST_IMAP_PORT=3143 go test ./internal/imapx/ -run CopyFolder
Expected: PASS (run1 copied=2, run2 skipped=2).
- Step 5: Commit
git add internal/imapx/copy.go internal/imapx/copy_test.go
git commit -m "feat(imapx): streaming per-folder copy with dedup, idempotent"
Task 10: WebSocket hub
Files:
- Create:
internal/wshub/wshub.go,internal/wshub/wshub_test.go
Interfaces:
-
Produces:
type Event struct { Type stringjson:"type"; TaskID int64json:"task_id"; Data anyjson:"data,omitempty"}.type Hub struct{...};func wshub.New() *Hub.func (*Hub) Subscribe(taskID int64) (id int64, ch <-chan Event);func (*Hub) Unsubscribe(taskID, id int64);func (*Hub) Publish(ev Event)(неблокирующая рассылка подписчикам данного task_id).
-
Step 1: Написать падающий тест
internal/wshub/wshub_test.go:
package wshub
import (
"testing"
"time"
)
func TestPublishReachesSubscriber(t *testing.T) {
h := New()
_, ch := h.Subscribe(7)
h.Publish(Event{Type: "progress", TaskID: 7, Data: map[string]int{"copied": 3}})
select {
case ev := <-ch:
if ev.Type != "progress" || ev.TaskID != 7 {
t.Fatalf("bad event %+v", ev)
}
case <-time.After(time.Second):
t.Fatal("no event received")
}
}
func TestPublishIsolatedByTask(t *testing.T) {
h := New()
_, ch := h.Subscribe(1)
h.Publish(Event{Type: "x", TaskID: 2})
select {
case <-ch:
t.Fatal("subscriber for task 1 must not get task 2 event")
case <-time.After(100 * time.Millisecond):
}
}
- Step 2: Запустить — FAIL
Run: go test ./internal/wshub/
Expected: FAIL.
- Step 3: Реализовать
internal/wshub/wshub.go:
package wshub
import "sync"
type Event struct {
Type string `json:"type"`
TaskID int64 `json:"task_id"`
Data any `json:"data,omitempty"`
}
type Hub struct {
mu sync.Mutex
nextID int64
subs map[int64]map[int64]chan Event // taskID -> subID -> ch
}
func New() *Hub {
return &Hub{subs: make(map[int64]map[int64]chan Event)}
}
func (h *Hub) Subscribe(taskID int64) (int64, <-chan Event) {
h.mu.Lock()
defer h.mu.Unlock()
h.nextID++
id := h.nextID
ch := make(chan Event, 64)
if h.subs[taskID] == nil {
h.subs[taskID] = make(map[int64]chan Event)
}
h.subs[taskID][id] = ch
return id, ch
}
func (h *Hub) Unsubscribe(taskID, id int64) {
h.mu.Lock()
defer h.mu.Unlock()
if m := h.subs[taskID]; m != nil {
if ch, ok := m[id]; ok {
close(ch)
delete(m, id)
}
if len(m) == 0 {
delete(h.subs, taskID)
}
}
}
func (h *Hub) Publish(ev Event) {
h.mu.Lock()
defer h.mu.Unlock()
for _, ch := range h.subs[ev.TaskID] {
select {
case ch <- ev:
default: // медленный подписчик — событие дропаем, не блокируем воркер
}
}
}
- Step 4: Запустить — PASS
Run: go test ./internal/wshub/
Expected: PASS.
- Step 5: Commit
git add internal/wshub/
git commit -m "feat(wshub): per-task event hub with non-blocking publish"
Task 11: Orchestrator (worker pool + run)
Files:
- Create:
internal/orchestrator/orchestrator.go,internal/orchestrator/orchestrator_test.go
Interfaces:
-
Consumes:
store.Store,imapx(Connect/CopyFolder/TestLogin),wshub.Hub,crypto.Decrypt,config.EncKey,config.WorkerConcurrency. -
Produces:
type Orchestrator struct{...};func orchestrator.New(s *store.Store, hub *wshub.Hub, encKey []byte, concurrency int) *Orchestrator.func (*Orchestrator) TestAccounts(ctx, taskID int64) error— для каждого account:TestLoginв src и dst, записьSetAccountTestStatus, публикация событий.func (*Orchestrator) Run(ctx, taskID int64) (runID int64, err error)— проверяет гейт (все accounts прошли обе проверки), создаёт run, параллельно (пул) гоняетCopyFolderпо всем папкам каждого account, обновляет счётчики иruns, публикует событияrun_started/account_*/progress/run_done.- Ошибка гейта:
var ErrNotTested = errors.New("accounts not fully tested").
-
Step 1: Написать тест гейта (без БД — через маленький интерфейс)
Для юнит-теста гейта выдели чистую функцию
func gateOK(accs []store.Account) bool(всеtest_src_status=="ok" && test_dst_status=="ok"). Тестируем её изолированно; полныйRunпокрывается E2E в Task 17.
internal/orchestrator/orchestrator_test.go:
package orchestrator
import (
"testing"
"github.com/vasyansk/imap-copier/internal/store"
)
func TestGateOK(t *testing.T) {
ok := []store.Account{
{TestSrcStatus: "ok", TestDstStatus: "ok"},
{TestSrcStatus: "ok", TestDstStatus: "ok"},
}
if !gateOK(ok) {
t.Fatal("all ok must pass gate")
}
bad := []store.Account{{TestSrcStatus: "ok", TestDstStatus: "fail"}}
if gateOK(bad) {
t.Fatal("any non-ok must fail gate")
}
if gateOK(nil) {
t.Fatal("empty accounts must fail gate")
}
}
- Step 2: Запустить — FAIL
Run: go test ./internal/orchestrator/
Expected: FAIL.
- Step 3: Реализовать orchestrator
internal/orchestrator/orchestrator.go:
package orchestrator
import (
"context"
"errors"
"log/slog"
"sync"
"github.com/vasyansk/imap-copier/internal/crypto"
"github.com/vasyansk/imap-copier/internal/imapx"
"github.com/vasyansk/imap-copier/internal/store"
"github.com/vasyansk/imap-copier/internal/wshub"
)
var ErrNotTested = errors.New("accounts not fully tested")
type Orchestrator struct {
store *store.Store
hub *wshub.Hub
encKey []byte
concurrency int
}
func New(s *store.Store, hub *wshub.Hub, encKey []byte, concurrency int) *Orchestrator {
return &Orchestrator{store: s, hub: hub, encKey: encKey, concurrency: concurrency}
}
func gateOK(accs []store.Account) bool {
if len(accs) == 0 {
return false
}
for _, a := range accs {
if a.TestSrcStatus != "ok" || a.TestDstStatus != "ok" {
return false
}
}
return true
}
func (o *Orchestrator) endpoints(ctx context.Context, task store.Task) (imapx.Endpoint, imapx.Endpoint, error) {
src, err := o.store.GetEndpoint(ctx, task.SrcEndpointID)
if err != nil {
return imapx.Endpoint{}, imapx.Endpoint{}, err
}
dst, err := o.store.GetEndpoint(ctx, task.DstEndpointID)
if err != nil {
return imapx.Endpoint{}, imapx.Endpoint{}, err
}
toEP := func(e store.Endpoint) imapx.Endpoint {
return imapx.Endpoint{Host: e.Host, Port: e.Port, TLSMode: e.TLSMode}
}
return toEP(src), toEP(dst), nil
}
func (o *Orchestrator) TestAccounts(ctx context.Context, taskID int64) error {
task, err := o.store.GetTask(ctx, taskID)
if err != nil {
return err
}
srcEP, dstEP, err := o.endpoints(ctx, task)
if err != nil {
return err
}
accs, err := o.store.ListAccountsByTask(ctx, taskID)
if err != nil {
return err
}
for _, a := range accs {
o.testSide(ctx, srcEP, a.ID, "src", a.SrcLogin, a.SrcPassEnc, taskID)
o.testSide(ctx, dstEP, a.ID, "dst", a.DstLogin, a.DstPassEnc, taskID)
}
return nil
}
func (o *Orchestrator) testSide(ctx context.Context, ep imapx.Endpoint, accID int64, side, login, passEnc string, taskID int64) {
status := "ok"
pass, err := crypto.Decrypt(o.encKey, passEnc)
if err == nil {
_, err = imapx.TestLogin(ctx, ep, login, string(pass))
}
if err != nil {
status = "fail"
slog.Warn("account test failed", "account", accID, "side", side, "err", err)
}
_ = o.store.SetAccountTestStatus(ctx, accID, side, status)
o.hub.Publish(wshub.Event{Type: "account_test", TaskID: taskID,
Data: map[string]any{"account_id": accID, "side": side, "status": status}})
}
func (o *Orchestrator) Run(ctx context.Context, taskID int64) (int64, error) {
task, err := o.store.GetTask(ctx, taskID)
if err != nil {
return 0, err
}
accs, err := o.store.ListAccountsByTask(ctx, taskID)
if err != nil {
return 0, err
}
if !gateOK(accs) {
return 0, ErrNotTested
}
srcEP, dstEP, err := o.endpoints(ctx, task)
if err != nil {
return 0, err
}
runID, err := o.store.CreateRun(ctx, taskID)
if err != nil {
return 0, err
}
_ = o.store.SetTaskStatus(ctx, taskID, "running")
o.hub.Publish(wshub.Event{Type: "run_started", TaskID: taskID, Data: map[string]any{"run_id": runID}})
go o.runAll(context.WithoutCancel(ctx), task, runID, accs, srcEP, dstEP)
return runID, nil
}
func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint) {
var (
mu sync.Mutex
totCopied, totSkipped, totErr int64
)
sem := make(chan struct{}, o.concurrency)
var wg sync.WaitGroup
for _, a := range accs {
wg.Add(1)
sem <- struct{}{}
go func(a store.Account) {
defer wg.Done()
defer func() { <-sem }()
c, s, e := o.runAccount(ctx, task, runID, a, srcEP, dstEP)
mu.Lock()
totCopied += c
totSkipped += s
totErr += e
mu.Unlock()
}(a)
}
wg.Wait()
_ = o.store.FinishRun(ctx, runID, "done", totCopied, totSkipped, totErr)
_ = o.store.SetTaskStatus(ctx, task.ID, "done")
o.hub.Publish(wshub.Event{Type: "run_done", TaskID: task.ID,
Data: map[string]any{"run_id": runID, "copied": totCopied, "skipped": totSkipped, "errors": totErr}})
}
func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID int64, a store.Account, srcEP, dstEP imapx.Endpoint) (int64, int64, int64) {
o.hub.Publish(wshub.Event{Type: "account_started", TaskID: task.ID, Data: map[string]any{"account_id": a.ID}})
_ = o.store.SetAccountStatus(ctx, a.ID, "running")
srcPass, err := crypto.Decrypt(o.encKey, a.SrcPassEnc)
if err != nil {
return o.accountFailed(ctx, task.ID, a.ID, err)
}
dstPass, err := crypto.Decrypt(o.encKey, a.DstPassEnc)
if err != nil {
return o.accountFailed(ctx, task.ID, a.ID, err)
}
src, err := imapx.Connect(ctx, srcEP)
if err != nil {
return o.accountFailed(ctx, task.ID, a.ID, err)
}
defer src.Logout().Wait()
if err := src.Login(a.SrcLogin, string(srcPass)).Wait(); err != nil {
return o.accountFailed(ctx, task.ID, a.ID, err)
}
dst, err := imapx.Connect(ctx, dstEP)
if err != nil {
return o.accountFailed(ctx, task.ID, a.ID, err)
}
defer dst.Logout().Wait()
if err := dst.Login(a.DstLogin, string(dstPass)).Wait(); err != nil {
return o.accountFailed(ctx, task.ID, a.ID, err)
}
folders, err := imapx.TestLogin(ctx, srcEP, a.SrcLogin, string(srcPass))
if err != nil {
return o.accountFailed(ctx, task.ID, a.ID, err)
}
var copied, skipped, errs int64
deps := imapx.CopyDeps{
IsMigrated: func(k string) (bool, error) { return o.store.IsMigrated(ctx, a.ID, k) },
MarkMigrated: func(folder, k string) error { return o.store.MarkMigrated(ctx, a.ID, folder, k) },
OnProgress: func(c, s int) {
o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID,
Data: map[string]any{"account_id": a.ID, "copied": c, "skipped": s}})
},
}
for _, folder := range folders {
dstFolder := folder
if m, ok := task.FolderMapping[folder]; ok {
dstFolder = m
}
res, err := imapx.CopyFolder(ctx, src, dst, dstFolder, deps)
if err != nil {
slog.Warn("folder copy error", "account", a.ID, "folder", folder, "err", err)
errs++
}
copied += int64(res.Copied)
skipped += int64(res.Skipped)
errs += int64(res.Errors)
_ = o.store.IncAccountCounters(ctx, a.ID, int64(res.Copied), int64(res.Skipped), int64(res.Errors))
}
_ = o.store.SetAccountStatus(ctx, a.ID, "done")
o.hub.Publish(wshub.Event{Type: "account_done", TaskID: task.ID,
Data: map[string]any{"account_id": a.ID, "copied": copied, "skipped": skipped, "errors": errs}})
slog.Info("account copied", "account", a.ID, "copied", copied, "skipped", skipped, "errors", errs)
return copied, skipped, errs
}
func (o *Orchestrator) accountFailed(ctx context.Context, taskID, accID int64, err error) (int64, int64, int64) {
slog.Error("account failed", "account", accID, "err", err)
_ = o.store.SetAccountStatus(ctx, accID, "error")
o.hub.Publish(wshub.Event{Type: "error", TaskID: taskID,
Data: map[string]any{"account_id": accID, "error": err.Error()}})
return 0, 0, 1
}
Замечание:
CopyFolderвнутри вызываетsrc.Select(folder…)— используем исходное имя папки для чтения. В цикле выше вCopyFolderпередаётсяdstFolderи как read-source, и как append-target — это баг маппинга. Реализатор ДОЛЖЕН расширить сигнатуруCopyFolder(ctx, src, dst, srcFolder, dstFolder string, deps)(Task 9 обновить:Select(srcFolder),Create/Append(dstFolder)), и здесь вызыватьCopyFolder(ctx, src, dst, folder, dstFolder, deps). Тест Task 9 передаёт одинаковые имена. Внести правку при реализации этой задачи.
- Step 4: Запустить — PASS
Run: go test ./internal/orchestrator/
Expected: PASS (gateOK).
- Step 5: Commit
git add internal/orchestrator/ internal/imapx/copy.go internal/imapx/copy_test.go
git commit -m "feat(orchestrator): worker pool run + account testing gate"
Task 12: CSV импорт
Files:
- Create:
internal/csvimport/csvimport.go,internal/csvimport/csvimport_test.go
Interfaces:
-
Produces:
type Row struct{ SrcLogin, SrcPass, DstLogin, DstPass string }.func csvimport.Parse(r io.Reader) ([]Row, error)— 4 колонки на строку, trim, пропуск пустых строк, ошибка при неверном числе колонок или пустых полях; дублиsrc_login→ ошибка.
-
Step 1: Написать падающий тест
internal/csvimport/csvimport_test.go:
package csvimport
import (
"strings"
"testing"
)
func TestParseOK(t *testing.T) {
rows, err := Parse(strings.NewReader("a@x,p1,a@y,p2\nb@x,p3,b@y,p4\n"))
if err != nil {
t.Fatalf("parse: %v", err)
}
if len(rows) != 2 || rows[0].SrcLogin != "a@x" || rows[1].DstPass != "p4" {
t.Fatalf("bad rows: %+v", rows)
}
}
func TestParseRejectsBadColumns(t *testing.T) {
if _, err := Parse(strings.NewReader("a,b,c\n")); err == nil {
t.Fatal("3 columns must error")
}
}
func TestParseRejectsDuplicateSrc(t *testing.T) {
if _, err := Parse(strings.NewReader("a@x,p,a@y,p\na@x,q,c@y,q\n")); err == nil {
t.Fatal("duplicate src_login must error")
}
}
func TestParseRejectsEmptyField(t *testing.T) {
if _, err := Parse(strings.NewReader("a@x,,a@y,p\n")); err == nil {
t.Fatal("empty password must error")
}
}
- Step 2: Запустить — FAIL
Run: go test ./internal/csvimport/
Expected: FAIL.
- Step 3: Реализовать
internal/csvimport/csvimport.go:
package csvimport
import (
"encoding/csv"
"fmt"
"io"
"strings"
)
type Row struct {
SrcLogin string
SrcPass string
DstLogin string
DstPass string
}
func Parse(r io.Reader) ([]Row, error) {
cr := csv.NewReader(r)
cr.FieldsPerRecord = -1 // проверяем сами
cr.TrimLeadingSpace = true
var rows []Row
seen := map[string]bool{}
line := 0
for {
rec, err := cr.Read()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
line++
if len(rec) == 1 && strings.TrimSpace(rec[0]) == "" {
continue // пустая строка
}
if len(rec) != 4 {
return nil, fmt.Errorf("line %d: expected 4 columns, got %d", line, len(rec))
}
for i := range rec {
rec[i] = strings.TrimSpace(rec[i])
if rec[i] == "" {
return nil, fmt.Errorf("line %d: column %d is empty", line, i+1)
}
}
if seen[rec[0]] {
return nil, fmt.Errorf("line %d: duplicate src_login %q", line, rec[0])
}
seen[rec[0]] = true
rows = append(rows, Row{SrcLogin: rec[0], SrcPass: rec[1], DstLogin: rec[2], DstPass: rec[3]})
}
if len(rows) == 0 {
return nil, fmt.Errorf("no rows parsed")
}
return rows, nil
}
- Step 4: Запустить — PASS
Run: go test ./internal/csvimport/
Expected: PASS.
- Step 5: Commit
git add internal/csvimport/
git commit -m "feat(csvimport): validated CSV account parser"
Task 13: HTTP — auth middleware и login
Files:
- Create:
internal/httpapi/auth.go,internal/httpapi/auth_test.go
Interfaces:
-
Consumes:
config.Config(AuthUser/AuthPass/SessionSecret),crypto.SignSession/VerifySession. -
Produces:
type Server struct{ cfg config.Config; store *store.Store; orch *orchestrator.Orchestrator; hub *wshub.Hub }.func (*Server) handleLogin(w, r)— POST JSON{user,pass}; при совпадении — Set-Cookiesession.func (*Server) requireAuth(next http.Handler) http.Handler— проверяет cookie, иначе 401.
-
Step 1: Написать падающий тест
internal/httpapi/auth_test.go:
package httpapi
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/vasyansk/imap-copier/internal/config"
)
func testServer() *Server {
return &Server{cfg: config.Config{
AuthUser: "admin", AuthPass: "pw", SessionSecret: []byte("sekret"),
}}
}
func TestLoginSetsCookie(t *testing.T) {
s := testServer()
req := httptest.NewRequest("POST", "/api/login", strings.NewReader(`{"user":"admin","pass":"pw"}`))
rw := httptest.NewRecorder()
s.handleLogin(rw, req)
if rw.Code != http.StatusOK {
t.Fatalf("code=%d", rw.Code)
}
if len(rw.Result().Cookies()) == 0 {
t.Fatal("no session cookie set")
}
}
func TestRequireAuthBlocksNoCookie(t *testing.T) {
s := testServer()
h := s.requireAuth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) }))
rw := httptest.NewRecorder()
h.ServeHTTP(rw, httptest.NewRequest("GET", "/api/tasks", nil))
if rw.Code != http.StatusUnauthorized {
t.Fatalf("want 401, got %d", rw.Code)
}
}
func TestRequireAuthAllowsValidCookie(t *testing.T) {
s := testServer()
// логинимся, забираем cookie, повторяем запрос
lr := httptest.NewRequest("POST", "/api/login", strings.NewReader(`{"user":"admin","pass":"pw"}`))
lrw := httptest.NewRecorder()
s.handleLogin(lrw, lr)
cookie := lrw.Result().Cookies()[0]
h := s.requireAuth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) }))
req := httptest.NewRequest("GET", "/api/tasks", nil)
req.AddCookie(cookie)
rw := httptest.NewRecorder()
h.ServeHTTP(rw, req)
if rw.Code != 200 {
t.Fatalf("want 200, got %d", rw.Code)
}
}
- Step 2: Запустить — FAIL
Run: go test ./internal/httpapi/ -run Auth
Expected: FAIL.
- Step 3: Реализовать
internal/httpapi/auth.go:
package httpapi
import (
"crypto/subtle"
"encoding/json"
"net/http"
"time"
"github.com/vasyansk/imap-copier/internal/config"
"github.com/vasyansk/imap-copier/internal/crypto"
"github.com/vasyansk/imap-copier/internal/orchestrator"
"github.com/vasyansk/imap-copier/internal/store"
"github.com/vasyansk/imap-copier/internal/wshub"
)
const cookieName = "session"
type Server struct {
cfg config.Config
store *store.Store
orch *orchestrator.Orchestrator
hub *wshub.Hub
}
func NewServer(cfg config.Config, s *store.Store, orch *orchestrator.Orchestrator, hub *wshub.Hub) *Server {
return &Server{cfg: cfg, store: s, orch: orch, hub: hub}
}
func (s *Server) handleLogin(w http.ResponseWriter, r *http.Request) {
var body struct{ User, Pass string }
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "bad json", http.StatusBadRequest)
return
}
uOK := subtle.ConstantTimeCompare([]byte(body.User), []byte(s.cfg.AuthUser)) == 1
pOK := subtle.ConstantTimeCompare([]byte(body.Pass), []byte(s.cfg.AuthPass)) == 1
if !uOK || !pOK {
http.Error(w, "invalid credentials", http.StatusUnauthorized)
return
}
tok := crypto.SignSession(s.cfg.SessionSecret, body.User, time.Now().Add(24*time.Hour))
http.SetCookie(w, &http.Cookie{
Name: cookieName, Value: tok, Path: "/",
HttpOnly: true, SameSite: http.SameSiteLaxMode, MaxAge: 86400,
})
w.WriteHeader(http.StatusOK)
}
func (s *Server) handleLogout(w http.ResponseWriter, r *http.Request) {
http.SetCookie(w, &http.Cookie{Name: cookieName, Value: "", Path: "/", MaxAge: -1})
w.WriteHeader(http.StatusOK)
}
func (s *Server) requireAuth(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := r.Cookie(cookieName)
if err != nil {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
if _, ok := crypto.VerifySession(s.cfg.SessionSecret, c.Value, time.Now()); !ok {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
next.ServeHTTP(w, r)
})
}
- Step 4: Запустить — PASS
Run: go test ./internal/httpapi/ -run Auth
Expected: PASS.
- Step 5: Commit
git add internal/httpapi/auth.go internal/httpapi/auth_test.go
git commit -m "feat(httpapi): env-based login and session auth middleware"
Task 14: HTTP — REST-ресурсы (endpoints, tasks, accounts, csv, run)
Files:
- Create:
internal/httpapi/endpoints.go,internal/httpapi/tasks.go,internal/httpapi/accounts.go,internal/httpapi/run.go,internal/httpapi/dto_test.go
Interfaces:
-
Consumes:
Server,store,crypto.Encrypt,csvimport,orchestrator. -
Produces (handlers, все methodами
Server):handleCreateEndpoint,handleListEndpoints(POST/GET/api/endpoints).handleCreateTask,handleListTasks,handleGetTask(GET возвращает task + accounts со счётчиками, без паролей).handleCreateAccount(POST/api/tasks/{id}/accounts, шифрует пароли).handleImportCSV(POST/api/tasks/{id}/import, multipart file →csvimport.Parse→ создать accounts).handleTestAccounts(POST/api/tasks/{id}/test).handleRun(POST/api/tasks/{id}/run; приErrNotTested→ 409).- Хелпер
accountDTO— マッピングstore.Accountв JSON без*_pass_enc.
-
Step 1: Написать тест DTO без паролей
internal/httpapi/dto_test.go:
package httpapi
import (
"encoding/json"
"strings"
"testing"
"github.com/vasyansk/imap-copier/internal/store"
)
func TestAccountDTOHidesPasswords(t *testing.T) {
a := store.Account{ID: 1, SrcLogin: "u", SrcPassEnc: "SECRET_ENC", DstLogin: "v", DstPassEnc: "SECRET2"}
b, _ := json.Marshal(accountDTO(a))
s := string(b)
if strings.Contains(s, "SECRET_ENC") || strings.Contains(s, "SECRET2") || strings.Contains(strings.ToLower(s), "pass") {
t.Fatalf("DTO leaks password material: %s", s)
}
if !strings.Contains(s, `"src_login":"u"`) {
t.Fatalf("DTO missing login: %s", s)
}
}
- Step 2: Запустить — FAIL
Run: go test ./internal/httpapi/ -run DTO
Expected: FAIL.
- Step 3: Реализовать handlers
internal/httpapi/accounts.go (включает accountDTO и создание аккаунта):
package httpapi
import (
"encoding/json"
"net/http"
"strconv"
"github.com/vasyansk/imap-copier/internal/crypto"
"github.com/vasyansk/imap-copier/internal/store"
)
type AccountView struct {
ID int64 `json:"id"`
SrcLogin string `json:"src_login"`
DstLogin string `json:"dst_login"`
TestSrcStatus string `json:"test_src_status"`
TestDstStatus string `json:"test_dst_status"`
Status string `json:"status"`
Copied int64 `json:"copied"`
Skipped int64 `json:"skipped"`
Errors int64 `json:"errors"`
}
func accountDTO(a store.Account) AccountView {
return AccountView{
ID: a.ID, SrcLogin: a.SrcLogin, DstLogin: a.DstLogin,
TestSrcStatus: a.TestSrcStatus, TestDstStatus: a.TestDstStatus,
Status: a.Status, Copied: a.Copied, Skipped: a.Skipped, Errors: a.Errors,
}
}
func pathID(r *http.Request, name string) (int64, error) {
return strconv.ParseInt(r.PathValue(name), 10, 64)
}
func (s *Server) handleCreateAccount(w http.ResponseWriter, r *http.Request) {
taskID, err := pathID(r, "id")
if err != nil {
http.Error(w, "bad id", http.StatusBadRequest)
return
}
var body struct {
SrcLogin, SrcPass, DstLogin, DstPass string
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "bad json", http.StatusBadRequest)
return
}
srcEnc, err := crypto.Encrypt(s.cfg.EncKey, []byte(body.SrcPass))
if err != nil {
http.Error(w, "encrypt", http.StatusInternalServerError)
return
}
dstEnc, err := crypto.Encrypt(s.cfg.EncKey, []byte(body.DstPass))
if err != nil {
http.Error(w, "encrypt", http.StatusInternalServerError)
return
}
id, err := s.store.CreateAccount(r.Context(), store.Account{
TaskID: taskID, SrcLogin: body.SrcLogin, SrcPassEnc: srcEnc,
DstLogin: body.DstLogin, DstPassEnc: dstEnc,
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusCreated, map[string]int64{"id": id})
}
internal/httpapi/endpoints.go:
package httpapi
import (
"encoding/json"
"net/http"
"github.com/vasyansk/imap-copier/internal/store"
)
func writeJSON(w http.ResponseWriter, code int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_ = json.NewEncoder(w).Encode(v)
}
func (s *Server) handleCreateEndpoint(w http.ResponseWriter, r *http.Request) {
var e store.Endpoint
if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
http.Error(w, "bad json", http.StatusBadRequest)
return
}
if e.TLSMode != "ssl" && e.TLSMode != "starttls" && e.TLSMode != "plain" {
http.Error(w, "tls_mode must be ssl|starttls|plain", http.StatusBadRequest)
return
}
id, err := s.store.CreateEndpoint(r.Context(), e)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusCreated, map[string]int64{"id": id})
}
func (s *Server) handleListEndpoints(w http.ResponseWriter, r *http.Request) {
eps, err := s.store.ListEndpoints(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusOK, eps)
}
internal/httpapi/tasks.go:
package httpapi
import (
"encoding/json"
"net/http"
"github.com/vasyansk/imap-copier/internal/store"
)
func (s *Server) handleCreateTask(w http.ResponseWriter, r *http.Request) {
var t store.Task
if err := json.NewDecoder(r.Body).Decode(&t); err != nil {
http.Error(w, "bad json", http.StatusBadRequest)
return
}
id, err := s.store.CreateTask(r.Context(), t)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusCreated, map[string]int64{"id": id})
}
func (s *Server) handleListTasks(w http.ResponseWriter, r *http.Request) {
tasks, err := s.store.ListTasks(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusOK, tasks)
}
func (s *Server) handleGetTask(w http.ResponseWriter, r *http.Request) {
id, err := pathID(r, "id")
if err != nil {
http.Error(w, "bad id", http.StatusBadRequest)
return
}
task, err := s.store.GetTask(r.Context(), id)
if err != nil {
http.Error(w, "not found", http.StatusNotFound)
return
}
accs, err := s.store.ListAccountsByTask(r.Context(), id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
views := make([]AccountView, 0, len(accs))
for _, a := range accs {
views = append(views, accountDTO(a))
}
writeJSON(w, http.StatusOK, map[string]any{"task": task, "accounts": views})
}
internal/httpapi/run.go (import CSV, test, run):
package httpapi
import (
"errors"
"net/http"
"github.com/vasyansk/imap-copier/internal/crypto"
"github.com/vasyansk/imap-copier/internal/csvimport"
"github.com/vasyansk/imap-copier/internal/orchestrator"
"github.com/vasyansk/imap-copier/internal/store"
)
func (s *Server) handleImportCSV(w http.ResponseWriter, r *http.Request) {
taskID, err := pathID(r, "id")
if err != nil {
http.Error(w, "bad id", http.StatusBadRequest)
return
}
file, _, err := r.FormFile("file")
if err != nil {
http.Error(w, "file required", http.StatusBadRequest)
return
}
defer file.Close()
rows, err := csvimport.Parse(file)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
for _, row := range rows {
srcEnc, _ := crypto.Encrypt(s.cfg.EncKey, []byte(row.SrcPass))
dstEnc, _ := crypto.Encrypt(s.cfg.EncKey, []byte(row.DstPass))
if _, err := s.store.CreateAccount(r.Context(), store.Account{
TaskID: taskID, SrcLogin: row.SrcLogin, SrcPassEnc: srcEnc,
DstLogin: row.DstLogin, DstPassEnc: dstEnc,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
writeJSON(w, http.StatusCreated, map[string]int{"imported": len(rows)})
}
func (s *Server) handleTestAccounts(w http.ResponseWriter, r *http.Request) {
taskID, err := pathID(r, "id")
if err != nil {
http.Error(w, "bad id", http.StatusBadRequest)
return
}
go s.orch.TestAccounts(r.Context(), taskID) // прогресс через WS
w.WriteHeader(http.StatusAccepted)
}
func (s *Server) handleRun(w http.ResponseWriter, r *http.Request) {
taskID, err := pathID(r, "id")
if err != nil {
http.Error(w, "bad id", http.StatusBadRequest)
return
}
runID, err := s.orch.Run(r.Context(), taskID)
if errors.Is(err, orchestrator.ErrNotTested) {
http.Error(w, "accounts must pass connection tests first", http.StatusConflict)
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusAccepted, map[string]int64{"run_id": runID})
}
Примечание:
handleTestAccounts/handleRunзапускают фон черезr.Context(), который отменяется по завершении HTTP-запроса. В реализации использоватьcontext.WithoutCancel(r.Context())или фоновыйcontext.Background()— иначе работа оборвётся. Внести при реализации (вorch.Runуже естьcontext.WithoutCancelдля внутренней горутины, ноTestAccountsи внешний вызов Run нужно защитить аналогично).
- Step 4: Запустить — PASS
Run: go test ./internal/httpapi/ -run DTO
Expected: PASS.
- Step 5: Commit
git add internal/httpapi/endpoints.go internal/httpapi/tasks.go internal/httpapi/accounts.go internal/httpapi/run.go internal/httpapi/dto_test.go
git commit -m "feat(httpapi): REST resources for endpoints/tasks/accounts/csv/run"
Task 15: HTTP — WebSocket endpoint, router, embed static
Files:
- Create:
internal/httpapi/ws.go,internal/httpapi/router.go,internal/httpapi/static.go - Create:
cmd/server/main.go
Interfaces:
-
Consumes: всё выше.
-
Produces:
handleWS— апгрейд/ws?task_id=…, подписка на hub, стрим событий в сокет.func (*Server) Router() http.Handler— маршруты + auth + static.static.go://go:embed all:webdistembed.FS, отдача SPA (fallback наindex.html).cmd/server/main.go: load config → migrate → store → hub → orchestrator → server →http.ListenAndServe.
-
Step 1: Добавить websocket-зависимость
Run: go get github.com/coder/websocket
Expected: go.mod обновлён.
- Step 2: Написать smoke-тест роутера (health + auth-gate)
internal/httpapi/router_test.go:
package httpapi
import (
"net/http"
"net/http/httptest"
"testing"
"github.com/vasyansk/imap-copier/internal/config"
)
func TestHealthzOpen(t *testing.T) {
s := &Server{cfg: config.Config{SessionSecret: []byte("x")}}
rw := httptest.NewRecorder()
s.Router().ServeHTTP(rw, httptest.NewRequest("GET", "/healthz", nil))
if rw.Code != http.StatusOK {
t.Fatalf("healthz=%d", rw.Code)
}
}
func TestTasksRequiresAuth(t *testing.T) {
s := &Server{cfg: config.Config{SessionSecret: []byte("x")}}
rw := httptest.NewRecorder()
s.Router().ServeHTTP(rw, httptest.NewRequest("GET", "/api/tasks", nil))
if rw.Code != http.StatusUnauthorized {
t.Fatalf("want 401, got %d", rw.Code)
}
}
- Step 3: Запустить — FAIL
Run: go test ./internal/httpapi/ -run 'Healthz|RequiresAuth'
Expected: FAIL (нет Router).
- Step 4: Реализовать ws, router, static, main
internal/httpapi/ws.go:
package httpapi
import (
"context"
"net/http"
"strconv"
"time"
"github.com/coder/websocket"
"github.com/coder/websocket/wsjson"
)
func (s *Server) handleWS(w http.ResponseWriter, r *http.Request) {
taskID, err := strconv.ParseInt(r.URL.Query().Get("task_id"), 10, 64)
if err != nil {
http.Error(w, "task_id required", http.StatusBadRequest)
return
}
c, err := websocket.Accept(w, r, nil)
if err != nil {
return
}
defer c.CloseNow()
subID, ch := s.hub.Subscribe(taskID)
defer s.hub.Unsubscribe(taskID, subID)
ctx := r.Context()
for {
select {
case ev, ok := <-ch:
if !ok {
return
}
wctx, cancel := context.WithTimeout(ctx, 5*time.Second)
err := wsjson.Write(wctx, c, ev)
cancel()
if err != nil {
return
}
case <-ctx.Done():
return
}
}
}
internal/httpapi/router.go:
package httpapi
import "net/http"
func (s *Server) Router() http.Handler {
mux := http.NewServeMux()
// открытые
mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) })
mux.HandleFunc("POST /api/login", s.handleLogin)
mux.HandleFunc("POST /api/logout", s.handleLogout)
// защищённые
api := http.NewServeMux()
api.HandleFunc("GET /api/endpoints", s.handleListEndpoints)
api.HandleFunc("POST /api/endpoints", s.handleCreateEndpoint)
api.HandleFunc("GET /api/tasks", s.handleListTasks)
api.HandleFunc("POST /api/tasks", s.handleCreateTask)
api.HandleFunc("GET /api/tasks/{id}", s.handleGetTask)
api.HandleFunc("POST /api/tasks/{id}/accounts", s.handleCreateAccount)
api.HandleFunc("POST /api/tasks/{id}/import", s.handleImportCSV)
api.HandleFunc("POST /api/tasks/{id}/test", s.handleTestAccounts)
api.HandleFunc("POST /api/tasks/{id}/run", s.handleRun)
api.HandleFunc("GET /ws", s.handleWS)
mux.Handle("/api/", s.requireAuth(api))
mux.Handle("/ws", s.requireAuth(http.HandlerFunc(s.handleWS)))
// SPA static (fallback)
mux.Handle("/", s.staticHandler())
return mux
}
internal/httpapi/static.go:
package httpapi
import (
"embed"
"io/fs"
"net/http"
)
//go:embed all:webdist
var webDist embed.FS
func (s *Server) staticHandler() http.Handler {
sub, err := fs.Sub(webDist, "webdist")
if err != nil {
panic(err)
}
fileServer := http.FileServer(http.FS(sub))
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// SPA fallback: если файла нет — отдать index.html
if _, err := fs.Stat(sub, trimLead(r.URL.Path)); err != nil && r.URL.Path != "/" {
r2 := r.Clone(r.Context())
r2.URL.Path = "/"
fileServer.ServeHTTP(w, r2)
return
}
fileServer.ServeHTTP(w, r)
})
}
func trimLead(p string) string {
if len(p) > 0 && p[0] == '/' {
return p[1:]
}
return p
}
Реализатор: создать заглушку
internal/httpapi/webdist/index.html(одна строка) до первой сборки фронта, иначе//go:embedне скомпилируется. Реальный build кладётся сюда в Task 16/17.
cmd/server/main.go:
package main
import (
"context"
"log/slog"
"net/http"
"os"
"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/postgres"
_ "github.com/golang-migrate/migrate/v4/source/file"
"github.com/vasyansk/imap-copier/internal/config"
"github.com/vasyansk/imap-copier/internal/httpapi"
"github.com/vasyansk/imap-copier/internal/orchestrator"
"github.com/vasyansk/imap-copier/internal/store"
"github.com/vasyansk/imap-copier/internal/wshub"
)
func main() {
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil)))
cfg, err := config.Load()
if err != nil {
slog.Error("config", "err", err)
os.Exit(1)
}
if err := runMigrations(cfg.DatabaseURL); err != nil {
slog.Error("migrate", "err", err)
os.Exit(1)
}
st, err := store.New(context.Background(), cfg.DatabaseURL)
if err != nil {
slog.Error("store", "err", err)
os.Exit(1)
}
hub := wshub.New()
orch := orchestrator.New(st, hub, cfg.EncKey, cfg.WorkerConcurrency)
srv := httpapi.NewServer(cfg, st, orch, hub)
slog.Info("listening", "addr", cfg.HTTPAddr)
if err := http.ListenAndServe(cfg.HTTPAddr, srv.Router()); err != nil {
slog.Error("serve", "err", err)
os.Exit(1)
}
}
func runMigrations(dsn string) error {
m, err := migrate.New("file://migrations", dsn)
if err != nil {
return err
}
if err := m.Up(); err != nil && err != migrate.ErrNoChange {
return err
}
return nil
}
- Step 5: Запустить тесты и сборку
Run: go test ./internal/httpapi/ -run 'Healthz|RequiresAuth' && go build ./...
Expected: PASS + успешная сборка (заглушка webdist на месте).
- Step 6: Commit
git add internal/httpapi/ws.go internal/httpapi/router.go internal/httpapi/static.go internal/httpapi/router_test.go internal/httpapi/webdist/index.html cmd/server/main.go go.mod go.sum
git commit -m "feat(httpapi): websocket, router, embed static, main entrypoint"
Task 16: React фронтенд
Files:
- Create:
web/(Vite + React + TS):package.json,vite.config.ts,index.html,src/main.tsx,src/api.ts,src/ws.ts,src/pages/Login.tsx,src/pages/Endpoints.tsx,src/pages/Tasks.tsx,src/pages/TaskDetail.tsx
Interfaces:
- Consumes: REST
/api/*, WS/ws?task_id=. - Produces: SPA-билд в
web/dist, копируемый вinternal/httpapi/webdist.
UI-реализация ведётся через skill frontend-design во время исполнения. Ниже — обязательный каркас и контракты; визуальную полировку добавляет исполнитель.
- Step 1: Скаффолдинг Vite
Run: cd web && npm create vite@latest . -- --template react-ts && npm install
Expected: базовый проект Vite.
- Step 2: Настроить прокси для разработки
web/vite.config.ts:
import { defineConfig } from 'vite'
import react from '@vitejs/plugin-react'
export default defineConfig({
plugins: [react()],
build: { outDir: 'dist' },
server: {
proxy: {
'/api': 'http://localhost:8080',
'/ws': { target: 'ws://localhost:8080', ws: true },
},
},
})
- Step 3: API-клиент
web/src/api.ts:
export async function api(path: string, opts: RequestInit = {}) {
const res = await fetch(path, { credentials: 'include', ...opts })
if (res.status === 401) { location.hash = '#/login'; throw new Error('unauthorized') }
if (!res.ok) throw new Error(await res.text())
const ct = res.headers.get('content-type') || ''
return ct.includes('application/json') ? res.json() : res.text()
}
export const login = (user: string, pass: string) =>
api('/api/login', { method: 'POST', headers: { 'content-type': 'application/json' }, body: JSON.stringify({ user, pass }) })
export const listTasks = () => api('/api/tasks')
export const getTask = (id: number) => api(`/api/tasks/${id}`)
export const createTask = (body: unknown) =>
api('/api/tasks', { method: 'POST', headers: { 'content-type': 'application/json' }, body: JSON.stringify(body) })
export const testAccounts = (id: number) => api(`/api/tasks/${id}/test`, { method: 'POST' })
export const runTask = (id: number) => api(`/api/tasks/${id}/run`, { method: 'POST' })
export const importCSV = (id: number, file: File) => {
const fd = new FormData(); fd.append('file', file)
return api(`/api/tasks/${id}/import`, { method: 'POST', body: fd })
}
- Step 4: WebSocket-хук
web/src/ws.ts:
export function connectTaskWS(taskId: number, onEvent: (ev: any) => void): () => void {
const proto = location.protocol === 'https:' ? 'wss' : 'ws'
const ws = new WebSocket(`${proto}://${location.host}/ws?task_id=${taskId}`)
ws.onmessage = (m) => { try { onEvent(JSON.parse(m.data)) } catch {} }
return () => ws.close()
}
- Step 5: Экраны (Login, Endpoints, Tasks, TaskDetail)
Реализатор создаёт 4 страницы с hash-роутингом (#/login, #/, #/endpoints, #/tasks/:id):
- Login — форма user/pass →
login(). - Endpoints — форма host/port/tls_mode для src и dst, список.
- Tasks — список задач, кнопка «создать», выбор src/dst endpoint, имя.
- TaskDetail — ручное добавление account'ов + загрузка CSV; таблица аккаунтов со статусами тестов и счётчиками (copied/skipped/errors); кнопки Test и Run (Run задизейблена, пока не все
test_*_status==="ok"); подписка на WS обновляет строки в реальном времени; лог событий.
TaskDetail — ядро реалтайма. Скелет:
import { useEffect, useState } from 'react'
import { getTask, testAccounts, runTask } from '../api'
import { connectTaskWS } from '../ws'
export function TaskDetail({ id }: { id: number }) {
const [data, setData] = useState<any>(null)
const [log, setLog] = useState<string[]>([])
const reload = () => getTask(id).then(setData)
useEffect(() => { reload() }, [id])
useEffect(() => connectTaskWS(id, (ev) => {
setLog((l) => [`${ev.type}: ${JSON.stringify(ev.data)}`, ...l].slice(0, 200))
if (['account_done', 'account_test', 'run_done', 'progress'].includes(ev.type)) reload()
}), [id])
if (!data) return <div>loading…</div>
const allOK = data.accounts.length > 0 &&
data.accounts.every((a: any) => a.test_src_status === 'ok' && a.test_dst_status === 'ok')
return (
<div>
<button onClick={() => testAccounts(id)}>Test connections</button>
<button disabled={!allOK} onClick={() => runTask(id)}>Run</button>
<table>{/* строки accounts: login, статусы, copied/skipped/errors */}</table>
<pre>{log.join('\n')}</pre>
</div>
)
}
- Step 6: Сборка и проверка
Run: cd web && npm run build
Expected: web/dist/index.html создан.
- Step 7: Commit
git add web/
git commit -m "feat(web): React SPA with realtime task detail over WebSocket"
Task 17: Docker, Caddy, compose и E2E
Files:
- Create:
Dockerfile,docker-compose.yml,Caddyfile,.dockerignore,Makefile,.env.example,README.md
Interfaces:
-
Produces: рабочий образ, поднимаемый
docker compose up. -
Step 1: Dockerfile (multi-stage: web → go)
Dockerfile:
# 1) build web
FROM node:20-alpine AS web
WORKDIR /web
COPY web/package*.json ./
RUN npm ci
COPY web/ ./
RUN npm run build
# 2) build go (embed web build)
FROM golang:1.22-alpine AS go
WORKDIR /src
COPY go.mod go.sum ./
RUN go mod download
COPY . .
COPY --from=web /web/dist ./internal/httpapi/webdist
RUN CGO_ENABLED=0 go build -o /out/server ./cmd/server
# 3) runtime
FROM alpine:3.20
RUN apk add --no-cache ca-certificates
WORKDIR /app
COPY --from=go /out/server /app/server
COPY migrations /app/migrations
EXPOSE 8080
ENTRYPOINT ["/app/server"]
- Step 2: Caddyfile (профили :80 и :443)
Caddyfile:
{
# email для ACME задаётся через env при включении TLS
email {$ACME_EMAIL}
}
# HTTP-профиль по умолчанию (без терминации TLS)
:80 {
reverse_proxy app:8080
}
# HTTPS-профиль включается, если задан DOMAIN (Let's Encrypt автоматически)
{$DOMAIN} {
reverse_proxy app:8080
tls {$ACME_EMAIL}
}
Реализатор: если
DOMAINпуст, второй блок Caddy можно не активировать — использовать два Caddyfile (Caddyfile.http,Caddyfile.tls) и выбирать через env в compose, ИЛИ шаблон. Простейший вариант — двумя разными compose-профилями (--profile tls). Задокументировать в README.
- Step 3: docker-compose.yml
docker-compose.yml:
services:
postgres:
image: postgres:16-alpine
environment:
POSTGRES_USER: imap
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_DB: imapcopier
volumes:
- pgdata:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U imap"]
interval: 5s
timeout: 3s
retries: 5
app:
build: .
environment:
DATABASE_URL: postgres://imap:${POSTGRES_PASSWORD}@postgres:5432/imapcopier?sslmode=disable
AUTH_USER: ${AUTH_USER}
AUTH_PASS: ${AUTH_PASS}
ENC_KEY: ${ENC_KEY}
SESSION_SECRET: ${SESSION_SECRET}
WORKER_CONCURRENCY: ${WORKER_CONCURRENCY:-4}
depends_on:
postgres:
condition: service_healthy
caddy:
image: caddy:2-alpine
ports:
- "${HTTP_PORT:-80}:80"
- "443:443"
environment:
DOMAIN: ${DOMAIN:-}
ACME_EMAIL: ${ACME_EMAIL:-}
volumes:
- ./Caddyfile:/etc/caddy/Caddyfile:ro
- caddy_data:/data
- caddy_config:/config
depends_on:
- app
volumes:
pgdata:
caddy_data:
caddy_config:
- Step 4: .env.example
.env.example:
POSTGRES_PASSWORD=change-me
AUTH_USER=admin
AUTH_PASS=change-me
# 32 байта в base64: openssl rand -base64 32
ENC_KEY=
SESSION_SECRET=change-me-long-random
WORKER_CONCURRENCY=4
HTTP_PORT=80
# Для HTTPS + Let's Encrypt: указать домен и email
DOMAIN=
ACME_EMAIL=
- Step 5: Собрать образ и прогнать полный стек
Run:
cp .env.example .env
sed -i '' "s|ENC_KEY=|ENC_KEY=$(openssl rand -base64 32)|" .env
docker compose build
docker compose up -d
sleep 10
curl -fsS http://localhost/healthz
Expected: 200/пустой ответ healthz; docker compose logs app без ошибок миграций.
- Step 6: E2E-скрипт (greenmail как src+dst)
Реализатор добавляет scripts/e2e.sh: поднимает два greenmail-инстанса (или один с двумя юзерами), сидит письма, через REST создаёт endpoints/task/accounts, вызывает /test, /run, дожидается run_done по WS/через GET /api/tasks/{id}, проверяет copied>0; повторный /run даёт copied=0, skipped>0. Это подтверждает идемпотентность на полном стеке.
Run: bash scripts/e2e.sh
Expected: первый run — copied>0; второй — copied=0, skipped=первому copied.
- Step 7: README + commit
README.md — краткий запуск: .env, docker compose up, HTTP на :80, включение HTTPS через DOMAIN/ACME_EMAIL.
git add Dockerfile docker-compose.yml Caddyfile .dockerignore Makefile .env.example README.md scripts/
git commit -m "feat(deploy): docker image, caddy, compose, e2e script"
Self-Review (coverage против спеки)
- Архитектура single-container + Caddy → Task 15 (embed), Task 17 (Docker/Caddy/compose). ✅
- Модель данных Postgres → Task 4 (миграции), Task 5–6 (store). ✅
- Шифрование паролей → Task 2; пароли не в API → Task 14 (
accountDTO-тест). ✅ - Session auth из env → Task 3 + Task 13. ✅
- Дедуп Message-ID + fallback → Task 7; идемпотентность → Task 6 (store) + Task 9 (copy) + Task 17 (E2E). ✅
- Тесты подключения обязательны перед run → Task 8 (imapx test) + Task 11 (
gateOK,TestAccounts) + Task 14 (409). ✅ - Стриминг в RAM, без диска → Task 9 (
streamOne), Global Constraints. ✅ - Недеструктивно (только копия) → Task 9 (нет
\Deleted/EXPUNGE), Global Constraints. ✅ - WebSocket реалтайм → Task 10 (hub) + Task 15 (
handleWS) + Task 16 (UI). ✅ - CSV импорт → Task 12 + Task 14 (
handleImportCSV). ✅ - Логирование со счётчиками → slog в orchestrator (Task 11),
runs/accountsсчётчики (Task 6). ✅ - Caddy :80 дефолт + опц. :443 Let's Encrypt → Task 17 (Caddyfile, compose, .env). ✅
Известные правки, встроенные в план (реализатор обязан выполнить):
- Task 9 ↔ Task 11:
CopyFolderрасширить до(srcFolder, dstFolder)для корректного folder-mapping (описано в Task 11, Step 3). - Task 14: фоновые
TestAccounts/Run— обернуть контекст вcontext.WithoutCancel, чтобы работа не обрывалась завершением HTTP-запроса. - Task 6:
IsMigrated— использоватьerrors.Is(err, pgx.ErrNoRows)(указано в замечании).
Type consistency: store.Account, imapx.Endpoint, imapx.CopyDeps, wshub.Event, AccountView — имена и поля согласованы между задачами 5–16.