feat(store): tasks, accounts, runs, dedup journal
This commit is contained in:
@@ -0,0 +1,74 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Account struct {
|
||||||
|
ID int64
|
||||||
|
TaskID int64
|
||||||
|
SrcLogin string
|
||||||
|
SrcPassEnc string
|
||||||
|
DstLogin string
|
||||||
|
DstPassEnc string
|
||||||
|
TestSrcStatus string
|
||||||
|
TestDstStatus string
|
||||||
|
Status string
|
||||||
|
Copied int64
|
||||||
|
Skipped int64
|
||||||
|
Errors int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) CreateAccount(ctx context.Context, a Account) (int64, error) {
|
||||||
|
var id int64
|
||||||
|
err := s.Pool.QueryRow(ctx,
|
||||||
|
`INSERT INTO accounts (task_id, src_login, src_pass_enc, dst_login, dst_pass_enc)
|
||||||
|
VALUES ($1,$2,$3,$4,$5) RETURNING id`,
|
||||||
|
a.TaskID, a.SrcLogin, a.SrcPassEnc, a.DstLogin, a.DstPassEnc).Scan(&id)
|
||||||
|
return id, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) ListAccountsByTask(ctx context.Context, taskID int64) ([]Account, error) {
|
||||||
|
rows, err := s.Pool.Query(ctx,
|
||||||
|
`SELECT id, task_id, src_login, src_pass_enc, dst_login, dst_pass_enc,
|
||||||
|
test_src_status, test_dst_status, status, copied_count, skipped_count, error_count
|
||||||
|
FROM accounts WHERE task_id=$1 ORDER BY id`, taskID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var out []Account
|
||||||
|
for rows.Next() {
|
||||||
|
var a Account
|
||||||
|
if err := rows.Scan(&a.ID, &a.TaskID, &a.SrcLogin, &a.SrcPassEnc, &a.DstLogin, &a.DstPassEnc,
|
||||||
|
&a.TestSrcStatus, &a.TestDstStatus, &a.Status, &a.Copied, &a.Skipped, &a.Errors); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out = append(out, a)
|
||||||
|
}
|
||||||
|
return out, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// side = "src" | "dst"
|
||||||
|
func (s *Store) SetAccountTestStatus(ctx context.Context, id int64, side, status string) error {
|
||||||
|
col := "test_src_status"
|
||||||
|
if side == "dst" {
|
||||||
|
col = "test_dst_status"
|
||||||
|
}
|
||||||
|
_, err := s.Pool.Exec(ctx, fmt.Sprintf(`UPDATE accounts SET %s=$2 WHERE id=$1`, col), id, status)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) SetAccountStatus(ctx context.Context, id int64, status string) error {
|
||||||
|
_, err := s.Pool.Exec(ctx, `UPDATE accounts SET status=$2 WHERE id=$1`, id, status)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) IncAccountCounters(ctx context.Context, id, copied, skipped, errs int64) error {
|
||||||
|
_, err := s.Pool.Exec(ctx,
|
||||||
|
`UPDATE accounts SET copied_count=copied_count+$2,
|
||||||
|
skipped_count=skipped_count+$3, error_count=error_count+$4 WHERE id=$1`,
|
||||||
|
id, copied, skipped, errs)
|
||||||
|
return err
|
||||||
|
}
|
||||||
@@ -0,0 +1,30 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMigratedIdempotency(t *testing.T) {
|
||||||
|
s := testStore(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"})
|
||||||
|
epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"})
|
||||||
|
taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst})
|
||||||
|
accID, _ := s.CreateAccount(ctx, Account{TaskID: taskID, SrcLogin: "u", SrcPassEnc: "x", DstLogin: "u2", DstPassEnc: "y"})
|
||||||
|
|
||||||
|
if err := s.MarkMigrated(ctx, accID, "INBOX", "<msg-1>"); err != nil {
|
||||||
|
t.Fatalf("mark: %v", err)
|
||||||
|
}
|
||||||
|
if err := s.MarkMigrated(ctx, accID, "INBOX", "<msg-1>"); err != nil {
|
||||||
|
t.Fatalf("second mark must not error (ON CONFLICT): %v", err)
|
||||||
|
}
|
||||||
|
ok, err := s.IsMigrated(ctx, accID, "<msg-1>")
|
||||||
|
if err != nil || !ok {
|
||||||
|
t.Fatalf("IsMigrated = %v,%v want true,nil", ok, err)
|
||||||
|
}
|
||||||
|
absent, _ := s.IsMigrated(ctx, accID, "<msg-2>")
|
||||||
|
if absent {
|
||||||
|
t.Fatal("unknown key must be false")
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,30 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s *Store) IsMigrated(ctx context.Context, accountID int64, key string) (bool, error) {
|
||||||
|
var one int
|
||||||
|
err := s.Pool.QueryRow(ctx,
|
||||||
|
`SELECT 1 FROM migrated_messages WHERE account_id=$1 AND message_key=$2`,
|
||||||
|
accountID, key).Scan(&one)
|
||||||
|
if errors.Is(err, pgx.ErrNoRows) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) MarkMigrated(ctx context.Context, accountID int64, folder, key string) error {
|
||||||
|
_, err := s.Pool.Exec(ctx,
|
||||||
|
`INSERT INTO migrated_messages (account_id, folder, message_key)
|
||||||
|
VALUES ($1,$2,$3) ON CONFLICT (account_id, message_key) DO NOTHING`,
|
||||||
|
accountID, folder, key)
|
||||||
|
return err
|
||||||
|
}
|
||||||
@@ -0,0 +1,27 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type Run struct {
|
||||||
|
ID int64
|
||||||
|
TaskID int64
|
||||||
|
Status string
|
||||||
|
TotalCopied int64
|
||||||
|
TotalSkipped int64
|
||||||
|
TotalErrors int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) CreateRun(ctx context.Context, taskID int64) (int64, error) {
|
||||||
|
var id int64
|
||||||
|
err := s.Pool.QueryRow(ctx,
|
||||||
|
`INSERT INTO runs (task_id) VALUES ($1) RETURNING id`, taskID).Scan(&id)
|
||||||
|
return id, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) FinishRun(ctx context.Context, id int64, status string, copied, skipped, errs int64) error {
|
||||||
|
_, err := s.Pool.Exec(ctx,
|
||||||
|
`UPDATE runs SET status=$2, finished_at=now(),
|
||||||
|
total_copied=$3, total_skipped=$4, total_errors=$5 WHERE id=$1`,
|
||||||
|
id, status, copied, skipped, errs)
|
||||||
|
return err
|
||||||
|
}
|
||||||
@@ -0,0 +1,57 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type Task struct {
|
||||||
|
ID int64
|
||||||
|
Name string
|
||||||
|
SrcEndpointID int64
|
||||||
|
DstEndpointID int64
|
||||||
|
Status string
|
||||||
|
FolderMapping map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) CreateTask(ctx context.Context, t Task) (int64, error) {
|
||||||
|
if t.FolderMapping == nil {
|
||||||
|
t.FolderMapping = map[string]string{}
|
||||||
|
}
|
||||||
|
var id int64
|
||||||
|
err := s.Pool.QueryRow(ctx,
|
||||||
|
`INSERT INTO tasks (name, src_endpoint_id, dst_endpoint_id, folder_mapping)
|
||||||
|
VALUES ($1,$2,$3,$4) RETURNING id`,
|
||||||
|
t.Name, t.SrcEndpointID, t.DstEndpointID, t.FolderMapping).Scan(&id)
|
||||||
|
return id, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) GetTask(ctx context.Context, id int64) (Task, error) {
|
||||||
|
var t Task
|
||||||
|
err := s.Pool.QueryRow(ctx,
|
||||||
|
`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping
|
||||||
|
FROM tasks WHERE id=$1`, id).
|
||||||
|
Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping)
|
||||||
|
return t, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) ListTasks(ctx context.Context) ([]Task, error) {
|
||||||
|
rows, err := s.Pool.Query(ctx,
|
||||||
|
`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping
|
||||||
|
FROM tasks ORDER BY id DESC`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var out []Task
|
||||||
|
for rows.Next() {
|
||||||
|
var t Task
|
||||||
|
if err := rows.Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out = append(out, t)
|
||||||
|
}
|
||||||
|
return out, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) SetTaskStatus(ctx context.Context, id int64, status string) error {
|
||||||
|
_, err := s.Pool.Exec(ctx, `UPDATE tasks SET status=$2 WHERE id=$1`, id, status)
|
||||||
|
return err
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user