feat(store): schedules, notification_channels, domain last_check_status + методы

This commit is contained in:
2026-07-04 13:10:42 +07:00
parent 1cdb32b747
commit 6fd847a909
10 changed files with 814 additions and 5 deletions
+174
View File
@@ -2,6 +2,7 @@ package store
import (
"context"
"encoding/json"
"errors"
"time"
@@ -137,12 +138,14 @@ type Domain struct {
ZoneName string
ZoneID string
TemplateID *uuid.UUID
LastCheckStatus string
}
func domainFromDB(d db.Domain) Domain {
return Domain{
ID: d.ID, ProjectID: d.ProjectID, ProviderAccountID: d.ProviderAccountID,
ZoneName: d.ZoneName, ZoneID: d.ZoneID, TemplateID: d.TemplateID,
LastCheckStatus: d.LastCheckStatus,
}
}
@@ -231,6 +234,19 @@ func (s *Store) SetDomainTemplate(ctx context.Context, domainID, projectID uuid.
return domainFromDB(d), nil
}
// GetDomainStatus returns the last known check status for a domain (Фаза 3
// scheduler/checker). Callers scope access to the domain themselves (e.g.
// via a prior GetDomain) — this lookup is by primary key alone.
func (s *Store) GetDomainStatus(ctx context.Context, domainID uuid.UUID) (string, error) {
return s.q.GetDomainStatus(ctx, domainID)
}
// SetDomainStatus records the outcome of the most recent check/apply run for
// a domain (e.g. "ok", "drift", "error").
func (s *Store) SetDomainStatus(ctx context.Context, domainID uuid.UUID, status string) error {
return s.q.SetDomainStatus(ctx, db.SetDomainStatusParams{ID: domainID, LastCheckStatus: status})
}
// User and Project are provider-neutral domain structs for the auth/tenant
// layer (Фаза 2), mirroring the Account/Template/Domain wrappers above so
// callers never need to import internal/store/db directly.
@@ -369,3 +385,161 @@ func (s *Store) RegisterUser(ctx context.Context, email, passwordHash string) (U
}
return toUser(dbu), toProject(dbp), nil
}
// Schedule and Channel are provider-neutral domain structs for the
// scheduler/notifications layer (Фаза 3), mirroring the wrappers above so
// callers never need to import internal/store/db or pgtype directly.
type Schedule struct {
ID uuid.UUID
ProjectID uuid.UUID
IntervalSeconds int32
Enabled bool
LastRunAt *time.Time
}
// timeFromTimestamptz converts a nullable pgtype.Timestamptz (schedules.last_run_at)
// into a *time.Time, nil when the column is NULL (schedule never ran).
func timeFromTimestamptz(t pgtype.Timestamptz) *time.Time {
if !t.Valid {
return nil
}
tt := t.Time
return &tt
}
// timestamptzFromTime is the inverse of timeFromTimestamptz, used to pass a
// Go time.Time (or nil) into a nullable timestamptz query parameter.
func timestamptzFromTime(t *time.Time) pgtype.Timestamptz {
if t == nil {
return pgtype.Timestamptz{}
}
return pgtype.Timestamptz{Time: *t, Valid: true}
}
func scheduleFromDB(s db.Schedule) Schedule {
return Schedule{
ID: s.ID,
ProjectID: s.ProjectID,
IntervalSeconds: s.IntervalSeconds,
Enabled: s.Enabled,
LastRunAt: timeFromTimestamptz(s.LastRunAt),
}
}
// GetSchedule looks up the schedule row for projectID. When no schedule has
// ever been created for the project it returns pgx.ErrNoRows unwrapped —
// the API layer (Task 5) is expected to treat that as the default schedule
// {interval: 3600, enabled: false} rather than an error.
func (s *Store) GetSchedule(ctx context.Context, projectID uuid.UUID) (Schedule, error) {
sc, err := s.q.GetSchedule(ctx, projectID)
if err != nil {
return Schedule{}, err
}
return scheduleFromDB(sc), nil
}
// UpsertSchedule creates or updates the (single, UNIQUE) schedule row for a
// project: an existing row has its interval/enabled flag updated in place
// rather than a second row being inserted.
func (s *Store) UpsertSchedule(ctx context.Context, projectID uuid.UUID, interval int32, enabled bool) (Schedule, error) {
sc, err := s.q.UpsertSchedule(ctx, db.UpsertScheduleParams{
ID: uuid.New(), ProjectID: projectID, IntervalSeconds: interval, Enabled: enabled,
})
if err != nil {
return Schedule{}, err
}
return scheduleFromDB(sc), nil
}
// ListDueSchedules returns every enabled schedule that is due to run at
// `now`: either it has never run (last_run_at IS NULL) or its interval has
// elapsed since the last run.
func (s *Store) ListDueSchedules(ctx context.Context, now time.Time) ([]Schedule, error) {
rows, err := s.q.ListDueSchedules(ctx, pgtype.Timestamptz{Time: now, Valid: true})
if err != nil {
return nil, err
}
out := make([]Schedule, 0, len(rows))
for _, r := range rows {
out = append(out, scheduleFromDB(r))
}
return out, nil
}
// TouchScheduleRun records that a project's schedule ran at `at`, so the
// next ListDueSchedules call excludes it until the interval elapses again.
func (s *Store) TouchScheduleRun(ctx context.Context, projectID uuid.UUID, at time.Time) error {
return s.q.TouchScheduleRun(ctx, db.TouchScheduleRunParams{
ProjectID: projectID, LastRunAt: timestamptzFromTime(&at),
})
}
type Channel struct {
ID uuid.UUID
ProjectID uuid.UUID
Type string
Config json.RawMessage
SecretEnc string
Enabled bool
}
// channelFromDB never logs SecretEnc — callers must not either (secrets are
// encrypted at rest, but the plaintext blob should still stay out of logs).
func channelFromDB(c db.NotificationChannel) Channel {
return Channel{
ID: c.ID, ProjectID: c.ProjectID, Type: c.Type,
Config: json.RawMessage(c.Config), SecretEnc: c.SecretEnc, Enabled: c.Enabled,
}
}
func (s *Store) CreateChannel(ctx context.Context, projectID uuid.UUID, ctype string, config json.RawMessage, secretEnc string) (Channel, error) {
c, err := s.q.CreateChannel(ctx, db.CreateChannelParams{
ID: uuid.New(), ProjectID: projectID, Type: ctype, Config: []byte(config), SecretEnc: secretEnc,
})
if err != nil {
return Channel{}, err
}
return channelFromDB(c), nil
}
func (s *Store) ListChannels(ctx context.Context, projectID uuid.UUID) ([]Channel, error) {
rows, err := s.q.ListChannels(ctx, projectID)
if err != nil {
return nil, err
}
out := make([]Channel, 0, len(rows))
for _, r := range rows {
out = append(out, channelFromDB(r))
}
return out, nil
}
// ListEnabledChannels returns only the channels a project has enabled — used
// by the notification dispatcher (Фаза 3) so disabled channels are silently
// skipped rather than filtered by every caller.
func (s *Store) ListEnabledChannels(ctx context.Context, projectID uuid.UUID) ([]Channel, error) {
rows, err := s.q.ListEnabledChannels(ctx, projectID)
if err != nil {
return nil, err
}
out := make([]Channel, 0, len(rows))
for _, r := range rows {
out = append(out, channelFromDB(r))
}
return out, nil
}
// GetChannel is scoped by projectID: a channel ID belonging to another
// tenant's project returns pgx.ErrNoRows rather than the foreign channel.
func (s *Store) GetChannel(ctx context.Context, id, projectID uuid.UUID) (Channel, error) {
c, err := s.q.GetChannel(ctx, db.GetChannelParams{ID: id, ProjectID: projectID})
if err != nil {
return Channel{}, err
}
return channelFromDB(c), nil
}
func (s *Store) DeleteChannel(ctx context.Context, id, projectID uuid.UUID) error {
return s.q.DeleteChannel(ctx, db.DeleteChannelParams{ID: id, ProjectID: projectID})
}