From 67a2367baa5d80999643e7dc633e64a850bf10a5 Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Wed, 1 Jul 2026 16:58:33 +0700 Subject: [PATCH] feat(store): tasks, accounts, runs, dedup journal --- internal/store/accounts.go | 74 +++++++++++++++++++++++++++++++++ internal/store/accounts_test.go | 30 +++++++++++++ internal/store/migrated.go | 30 +++++++++++++ internal/store/runs.go | 27 ++++++++++++ internal/store/tasks.go | 57 +++++++++++++++++++++++++ 5 files changed, 218 insertions(+) create mode 100644 internal/store/accounts.go create mode 100644 internal/store/accounts_test.go create mode 100644 internal/store/migrated.go create mode 100644 internal/store/runs.go create mode 100644 internal/store/tasks.go diff --git a/internal/store/accounts.go b/internal/store/accounts.go new file mode 100644 index 0000000..80041d6 --- /dev/null +++ b/internal/store/accounts.go @@ -0,0 +1,74 @@ +package store + +import ( + "context" + "fmt" +) + +type Account struct { + ID int64 + TaskID int64 + SrcLogin string + SrcPassEnc string + DstLogin string + DstPassEnc string + TestSrcStatus string + TestDstStatus string + Status string + Copied int64 + Skipped int64 + Errors int64 +} + +func (s *Store) CreateAccount(ctx context.Context, a Account) (int64, error) { + var id int64 + err := s.Pool.QueryRow(ctx, + `INSERT INTO accounts (task_id, src_login, src_pass_enc, dst_login, dst_pass_enc) + VALUES ($1,$2,$3,$4,$5) RETURNING id`, + a.TaskID, a.SrcLogin, a.SrcPassEnc, a.DstLogin, a.DstPassEnc).Scan(&id) + return id, err +} + +func (s *Store) ListAccountsByTask(ctx context.Context, taskID int64) ([]Account, error) { + rows, err := s.Pool.Query(ctx, + `SELECT id, task_id, src_login, src_pass_enc, dst_login, dst_pass_enc, + test_src_status, test_dst_status, status, copied_count, skipped_count, error_count + FROM accounts WHERE task_id=$1 ORDER BY id`, taskID) + if err != nil { + return nil, err + } + defer rows.Close() + var out []Account + for rows.Next() { + var a Account + if err := rows.Scan(&a.ID, &a.TaskID, &a.SrcLogin, &a.SrcPassEnc, &a.DstLogin, &a.DstPassEnc, + &a.TestSrcStatus, &a.TestDstStatus, &a.Status, &a.Copied, &a.Skipped, &a.Errors); err != nil { + return nil, err + } + out = append(out, a) + } + return out, rows.Err() +} + +// side = "src" | "dst" +func (s *Store) SetAccountTestStatus(ctx context.Context, id int64, side, status string) error { + col := "test_src_status" + if side == "dst" { + col = "test_dst_status" + } + _, err := s.Pool.Exec(ctx, fmt.Sprintf(`UPDATE accounts SET %s=$2 WHERE id=$1`, col), id, status) + return err +} + +func (s *Store) SetAccountStatus(ctx context.Context, id int64, status string) error { + _, err := s.Pool.Exec(ctx, `UPDATE accounts SET status=$2 WHERE id=$1`, id, status) + return err +} + +func (s *Store) IncAccountCounters(ctx context.Context, id, copied, skipped, errs int64) error { + _, err := s.Pool.Exec(ctx, + `UPDATE accounts SET copied_count=copied_count+$2, + skipped_count=skipped_count+$3, error_count=error_count+$4 WHERE id=$1`, + id, copied, skipped, errs) + return err +} diff --git a/internal/store/accounts_test.go b/internal/store/accounts_test.go new file mode 100644 index 0000000..07b4fa9 --- /dev/null +++ b/internal/store/accounts_test.go @@ -0,0 +1,30 @@ +package store + +import ( + "context" + "testing" +) + +func TestMigratedIdempotency(t *testing.T) { + s := testStore(t) + ctx := context.Background() + epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"}) + epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"}) + taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst}) + accID, _ := s.CreateAccount(ctx, Account{TaskID: taskID, SrcLogin: "u", SrcPassEnc: "x", DstLogin: "u2", DstPassEnc: "y"}) + + if err := s.MarkMigrated(ctx, accID, "INBOX", ""); err != nil { + t.Fatalf("mark: %v", err) + } + if err := s.MarkMigrated(ctx, accID, "INBOX", ""); err != nil { + t.Fatalf("second mark must not error (ON CONFLICT): %v", err) + } + ok, err := s.IsMigrated(ctx, accID, "") + if err != nil || !ok { + t.Fatalf("IsMigrated = %v,%v want true,nil", ok, err) + } + absent, _ := s.IsMigrated(ctx, accID, "") + if absent { + t.Fatal("unknown key must be false") + } +} diff --git a/internal/store/migrated.go b/internal/store/migrated.go new file mode 100644 index 0000000..5fb1d49 --- /dev/null +++ b/internal/store/migrated.go @@ -0,0 +1,30 @@ +package store + +import ( + "context" + "errors" + + "github.com/jackc/pgx/v5" +) + +func (s *Store) IsMigrated(ctx context.Context, accountID int64, key string) (bool, error) { + var one int + err := s.Pool.QueryRow(ctx, + `SELECT 1 FROM migrated_messages WHERE account_id=$1 AND message_key=$2`, + accountID, key).Scan(&one) + if errors.Is(err, pgx.ErrNoRows) { + return false, nil + } + if err != nil { + return false, err + } + return true, nil +} + +func (s *Store) MarkMigrated(ctx context.Context, accountID int64, folder, key string) error { + _, err := s.Pool.Exec(ctx, + `INSERT INTO migrated_messages (account_id, folder, message_key) + VALUES ($1,$2,$3) ON CONFLICT (account_id, message_key) DO NOTHING`, + accountID, folder, key) + return err +} diff --git a/internal/store/runs.go b/internal/store/runs.go new file mode 100644 index 0000000..ac0075a --- /dev/null +++ b/internal/store/runs.go @@ -0,0 +1,27 @@ +package store + +import "context" + +type Run struct { + ID int64 + TaskID int64 + Status string + TotalCopied int64 + TotalSkipped int64 + TotalErrors int64 +} + +func (s *Store) CreateRun(ctx context.Context, taskID int64) (int64, error) { + var id int64 + err := s.Pool.QueryRow(ctx, + `INSERT INTO runs (task_id) VALUES ($1) RETURNING id`, taskID).Scan(&id) + return id, err +} + +func (s *Store) FinishRun(ctx context.Context, id int64, status string, copied, skipped, errs int64) error { + _, err := s.Pool.Exec(ctx, + `UPDATE runs SET status=$2, finished_at=now(), + total_copied=$3, total_skipped=$4, total_errors=$5 WHERE id=$1`, + id, status, copied, skipped, errs) + return err +} diff --git a/internal/store/tasks.go b/internal/store/tasks.go new file mode 100644 index 0000000..2d48fea --- /dev/null +++ b/internal/store/tasks.go @@ -0,0 +1,57 @@ +package store + +import "context" + +type Task struct { + ID int64 + Name string + SrcEndpointID int64 + DstEndpointID int64 + Status string + FolderMapping map[string]string +} + +func (s *Store) CreateTask(ctx context.Context, t Task) (int64, error) { + if t.FolderMapping == nil { + t.FolderMapping = map[string]string{} + } + var id int64 + err := s.Pool.QueryRow(ctx, + `INSERT INTO tasks (name, src_endpoint_id, dst_endpoint_id, folder_mapping) + VALUES ($1,$2,$3,$4) RETURNING id`, + t.Name, t.SrcEndpointID, t.DstEndpointID, t.FolderMapping).Scan(&id) + return id, err +} + +func (s *Store) GetTask(ctx context.Context, id int64) (Task, error) { + var t Task + err := s.Pool.QueryRow(ctx, + `SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping + FROM tasks WHERE id=$1`, id). + Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping) + return t, err +} + +func (s *Store) ListTasks(ctx context.Context) ([]Task, error) { + rows, err := s.Pool.Query(ctx, + `SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping + FROM tasks ORDER BY id DESC`) + if err != nil { + return nil, err + } + defer rows.Close() + var out []Task + for rows.Next() { + var t Task + if err := rows.Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping); err != nil { + return nil, err + } + out = append(out, t) + } + return out, rows.Err() +} + +func (s *Store) SetTaskStatus(ctx context.Context, id int64, status string) error { + _, err := s.Pool.Exec(ctx, `UPDATE tasks SET status=$2 WHERE id=$1`, id, status) + return err +}