feat(store): pgx pool and endpoints CRUD
This commit is contained in:
@@ -0,0 +1,46 @@
|
||||
package store
|
||||
|
||||
import "context"
|
||||
|
||||
type Endpoint struct {
|
||||
ID int64
|
||||
RoleLabel string
|
||||
Host string
|
||||
Port int
|
||||
TLSMode string
|
||||
}
|
||||
|
||||
func (s *Store) CreateEndpoint(ctx context.Context, e Endpoint) (int64, error) {
|
||||
var id int64
|
||||
err := s.Pool.QueryRow(ctx,
|
||||
`INSERT INTO endpoints (role_label, host, port, tls_mode)
|
||||
VALUES ($1,$2,$3,$4) RETURNING id`,
|
||||
e.RoleLabel, e.Host, e.Port, e.TLSMode).Scan(&id)
|
||||
return id, err
|
||||
}
|
||||
|
||||
func (s *Store) GetEndpoint(ctx context.Context, id int64) (Endpoint, error) {
|
||||
var e Endpoint
|
||||
err := s.Pool.QueryRow(ctx,
|
||||
`SELECT id, role_label, host, port, tls_mode FROM endpoints WHERE id=$1`, id).
|
||||
Scan(&e.ID, &e.RoleLabel, &e.Host, &e.Port, &e.TLSMode)
|
||||
return e, err
|
||||
}
|
||||
|
||||
func (s *Store) ListEndpoints(ctx context.Context) ([]Endpoint, error) {
|
||||
rows, err := s.Pool.Query(ctx,
|
||||
`SELECT id, role_label, host, port, tls_mode FROM endpoints ORDER BY id`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var out []Endpoint
|
||||
for rows.Next() {
|
||||
var e Endpoint
|
||||
if err := rows.Scan(&e.ID, &e.RoleLabel, &e.Host, &e.Port, &e.TLSMode); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, e)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
Pool *pgxpool.Pool
|
||||
}
|
||||
|
||||
func New(ctx context.Context, dsn string) (*Store, error) {
|
||||
pool, err := pgxpool.New(ctx, dsn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := pool.Ping(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Store{Pool: pool}, nil
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func testStore(t *testing.T) *Store {
|
||||
dsn := os.Getenv("TEST_DATABASE_URL")
|
||||
if dsn == "" {
|
||||
t.Skip("TEST_DATABASE_URL not set")
|
||||
}
|
||||
s, err := New(context.Background(), dsn)
|
||||
if err != nil {
|
||||
t.Fatalf("New: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
s.Pool.Exec(context.Background(),
|
||||
`TRUNCATE endpoints, tasks, accounts, runs, migrated_messages RESTART IDENTITY CASCADE`)
|
||||
s.Pool.Close()
|
||||
})
|
||||
return s
|
||||
}
|
||||
|
||||
func TestCreateAndGetEndpoint(t *testing.T) {
|
||||
s := testStore(t)
|
||||
ctx := context.Background()
|
||||
id, err := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "imap.a.com", Port: 993, TLSMode: "ssl"})
|
||||
if err != nil {
|
||||
t.Fatalf("create: %v", err)
|
||||
}
|
||||
got, err := s.GetEndpoint(ctx, id)
|
||||
if err != nil {
|
||||
t.Fatalf("get: %v", err)
|
||||
}
|
||||
if got.Host != "imap.a.com" || got.Port != 993 {
|
||||
t.Fatalf("got %+v", got)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user