fix(scheduler): убрать двойной SaveCheckRun (Checker персистит), SetDrift через CountDriftDomains, resolved после error
This commit is contained in:
@@ -28,14 +28,16 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// SchedStore is the narrow store dependency the scheduler needs: due
|
// SchedStore is the narrow store dependency the scheduler needs: due
|
||||||
// schedules, their domains, and per-domain status bookkeeping.
|
// schedules, their domains, and per-domain status bookkeeping. Persisting
|
||||||
|
// the check result itself (check_runs) is the Checker's job — see Checker
|
||||||
|
// below — not the scheduler's.
|
||||||
type SchedStore interface {
|
type SchedStore interface {
|
||||||
ListDueSchedules(ctx context.Context, now time.Time) ([]store.Schedule, error)
|
ListDueSchedules(ctx context.Context, now time.Time) ([]store.Schedule, error)
|
||||||
TouchScheduleRun(ctx context.Context, projectID uuid.UUID, at time.Time) error
|
TouchScheduleRun(ctx context.Context, projectID uuid.UUID, at time.Time) error
|
||||||
ListDomains(ctx context.Context, projectID uuid.UUID) ([]store.Domain, error)
|
ListDomains(ctx context.Context, projectID uuid.UUID) ([]store.Domain, error)
|
||||||
GetDomainStatus(ctx context.Context, domainID uuid.UUID) (string, error)
|
GetDomainStatus(ctx context.Context, domainID uuid.UUID) (string, error)
|
||||||
SetDomainStatus(ctx context.Context, domainID uuid.UUID, status string) error
|
SetDomainStatus(ctx context.Context, domainID uuid.UUID, status string) error
|
||||||
SaveCheckRun(ctx context.Context, domainID uuid.UUID, cs diff.Changeset) error
|
CountDriftDomains(ctx context.Context) (int, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checker computes the diff between a domain's desired template and its
|
// Checker computes the diff between a domain's desired template and its
|
||||||
@@ -92,8 +94,6 @@ func (s *Scheduler) RunOnce(ctx context.Context, now time.Time) error {
|
|||||||
return fmt.Errorf("list due schedules: %w", err)
|
return fmt.Errorf("list due schedules: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
driftCount := 0
|
|
||||||
|
|
||||||
for _, sch := range due {
|
for _, sch := range due {
|
||||||
domains, err := s.store.ListDomains(ctx, sch.ProjectID)
|
domains, err := s.store.ListDomains(ctx, sch.ProjectID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -102,9 +102,7 @@ func (s *Scheduler) RunOnce(ctx context.Context, now time.Time) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, d := range domains {
|
for _, d := range domains {
|
||||||
if s.checkDomain(ctx, sch.ProjectID, d, now) == StatusDrift {
|
s.checkDomain(ctx, sch.ProjectID, d, now)
|
||||||
driftCount++
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.store.TouchScheduleRun(ctx, sch.ProjectID, now); err != nil {
|
if err := s.store.TouchScheduleRun(ctx, sch.ProjectID, now); err != nil {
|
||||||
@@ -112,7 +110,15 @@ func (s *Scheduler) RunOnce(ctx context.Context, now time.Time) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.metrics.SetDrift(driftCount)
|
// The real, system-wide count of drift domains — not a local
|
||||||
|
// accumulator scoped to this tick's due projects — so the gauge
|
||||||
|
// reflects reality even across ticks where different projects are due.
|
||||||
|
count, err := s.store.CountDriftDomains(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("scheduler: count drift domains failed: %v", err)
|
||||||
|
} else {
|
||||||
|
s.metrics.SetDrift(count)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -139,12 +145,10 @@ func (s *Scheduler) checkDomain(ctx context.Context, projectID uuid.UUID, d stor
|
|||||||
prev = StatusUnknown
|
prev = StatusUnknown
|
||||||
}
|
}
|
||||||
|
|
||||||
// A failed Check has no changeset worth recording; a successful one does.
|
// Persisting the check_runs row is the Checker's job: DomainService.Check
|
||||||
if checkErr == nil {
|
// already calls Recorder.SaveCheckRun internally on every successful
|
||||||
if err := s.store.SaveCheckRun(ctx, d.ID, cs); err != nil {
|
// check (drift or in_sync). Calling it again here would double-write
|
||||||
log.Printf("scheduler: save check run for %s failed: %v", d.ID, err)
|
// check_runs history for the same check.
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.store.SetDomainStatus(ctx, d.ID, newStatus); err != nil {
|
if err := s.store.SetDomainStatus(ctx, d.ID, newStatus); err != nil {
|
||||||
log.Printf("scheduler: set domain status for %s failed: %v", d.ID, err)
|
log.Printf("scheduler: set domain status for %s failed: %v", d.ID, err)
|
||||||
@@ -170,15 +174,16 @@ func (s *Scheduler) checkDomain(ctx context.Context, projectID uuid.UUID, d stor
|
|||||||
// shouldNotify decides whether a prev -> new status transition is worth
|
// shouldNotify decides whether a prev -> new status transition is worth
|
||||||
// alerting on:
|
// alerting on:
|
||||||
// - entering drift or error from any other status is always notified;
|
// - entering drift or error from any other status is always notified;
|
||||||
// - recovering from drift back to in_sync ("resolved") is notified;
|
// - recovering from drift OR error back to in_sync ("resolved") is
|
||||||
|
// notified — including recovery after a provider/check failure;
|
||||||
// - the initial unknown -> in_sync transition (first successful check of a
|
// - the initial unknown -> in_sync transition (first successful check of a
|
||||||
// domain that never drifted) is NOT notified — it is not news, it is the
|
// domain that never drifted or errored) is NOT notified — it is not
|
||||||
// expected steady state.
|
// news, it is the expected steady state.
|
||||||
func shouldNotify(prev, newStatus string) bool {
|
func shouldNotify(prev, newStatus string) bool {
|
||||||
if (newStatus == StatusDrift || newStatus == StatusError) && newStatus != prev {
|
if (newStatus == StatusDrift || newStatus == StatusError) && newStatus != prev {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if prev == StatusDrift && newStatus == StatusInSync {
|
if (prev == StatusDrift || prev == StatusError) && newStatus == StatusInSync {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
|
|||||||
@@ -24,8 +24,11 @@ type mockStore struct {
|
|||||||
domains map[uuid.UUID][]store.Domain
|
domains map[uuid.UUID][]store.Domain
|
||||||
status map[uuid.UUID]string
|
status map[uuid.UUID]string
|
||||||
|
|
||||||
savedCheckRuns []uuid.UUID
|
|
||||||
touchedProjects []uuid.UUID
|
touchedProjects []uuid.UUID
|
||||||
|
|
||||||
|
// driftCount is what CountDriftDomains returns — a canned system-wide
|
||||||
|
// count, independent of what this RunOnce's due projects touched.
|
||||||
|
driftCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMockStore() *mockStore {
|
func newMockStore() *mockStore {
|
||||||
@@ -66,11 +69,10 @@ func (m *mockStore) SetDomainStatus(ctx context.Context, domainID uuid.UUID, sta
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockStore) SaveCheckRun(ctx context.Context, domainID uuid.UUID, cs diff.Changeset) error {
|
func (m *mockStore) CountDriftDomains(ctx context.Context) (int, error) {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
m.savedCheckRuns = append(m.savedCheckRuns, domainID)
|
return m.driftCount, nil
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// mockChecker returns a preset Changeset or error per domainID.
|
// mockChecker returns a preset Changeset or error per domainID.
|
||||||
@@ -127,6 +129,12 @@ func TestRunOnce_NotifiesOnDriftNotOnFirstInSync(t *testing.T) {
|
|||||||
notifier := &mockNotifier{}
|
notifier := &mockNotifier{}
|
||||||
m := metrics.New()
|
m := metrics.New()
|
||||||
|
|
||||||
|
// CountDriftDomains is the real system-wide count, independent of what
|
||||||
|
// this tick touched — set it to something that would NOT match a local
|
||||||
|
// per-tick accumulator (only 1 of 2 domains here drifted) to prove the
|
||||||
|
// gauge comes from the store call, not a local tally.
|
||||||
|
st.driftCount = 7
|
||||||
|
|
||||||
sched := New(st, checker, notifier, m)
|
sched := New(st, checker, notifier, m)
|
||||||
|
|
||||||
if err := sched.RunOnce(context.Background(), time.Now()); err != nil {
|
if err := sched.RunOnce(context.Background(), time.Now()); err != nil {
|
||||||
@@ -150,9 +158,6 @@ func TestRunOnce_NotifiesOnDriftNotOnFirstInSync(t *testing.T) {
|
|||||||
t.Fatalf("notified status = %q, want drift", notifier.events[0].Status)
|
t.Fatalf("notified status = %q, want drift", notifier.events[0].Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(st.savedCheckRuns) != 2 {
|
|
||||||
t.Fatalf("SaveCheckRun calls = %d, want 2", len(st.savedCheckRuns))
|
|
||||||
}
|
|
||||||
if len(st.touchedProjects) != 1 || st.touchedProjects[0] != projectID {
|
if len(st.touchedProjects) != 1 || st.touchedProjects[0] != projectID {
|
||||||
t.Fatalf("TouchScheduleRun calls = %v, want [%s]", st.touchedProjects, projectID)
|
t.Fatalf("TouchScheduleRun calls = %v, want [%s]", st.touchedProjects, projectID)
|
||||||
}
|
}
|
||||||
@@ -163,8 +168,8 @@ func TestRunOnce_NotifiesOnDriftNotOnFirstInSync(t *testing.T) {
|
|||||||
if got := testutil.ToFloat64(m.ChecksTotal.WithLabelValues(StatusInSync)); got != 1 {
|
if got := testutil.ToFloat64(m.ChecksTotal.WithLabelValues(StatusInSync)); got != 1 {
|
||||||
t.Fatalf("ChecksTotal{in_sync} = %v, want 1", got)
|
t.Fatalf("ChecksTotal{in_sync} = %v, want 1", got)
|
||||||
}
|
}
|
||||||
if got := testutil.ToFloat64(m.DriftDomains); got != 1 {
|
if got := testutil.ToFloat64(m.DriftDomains); got != float64(st.driftCount) {
|
||||||
t.Fatalf("DriftDomains gauge = %v, want 1", got)
|
t.Fatalf("DriftDomains gauge = %v, want %d (from CountDriftDomains)", got, st.driftCount)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -229,10 +234,6 @@ func TestRunOnce_CheckError_StatusErrorAndNotify(t *testing.T) {
|
|||||||
if got := testutil.ToFloat64(m.ChecksTotal.WithLabelValues(StatusError)); got != 1 {
|
if got := testutil.ToFloat64(m.ChecksTotal.WithLabelValues(StatusError)); got != 1 {
|
||||||
t.Fatalf("ChecksTotal{error} = %v, want 1", got)
|
t.Fatalf("ChecksTotal{error} = %v, want 1", got)
|
||||||
}
|
}
|
||||||
// A failed Check has no changeset worth recording.
|
|
||||||
if len(st.savedCheckRuns) != 0 {
|
|
||||||
t.Fatalf("SaveCheckRun calls on error = %d, want 0", len(st.savedCheckRuns))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestShouldNotify(t *testing.T) {
|
func TestShouldNotify(t *testing.T) {
|
||||||
@@ -252,7 +253,7 @@ func TestShouldNotify(t *testing.T) {
|
|||||||
{"in_sync->error notifies", StatusInSync, StatusError, true},
|
{"in_sync->error notifies", StatusInSync, StatusError, true},
|
||||||
{"in_sync->in_sync is silent", StatusInSync, StatusInSync, false},
|
{"in_sync->in_sync is silent", StatusInSync, StatusInSync, false},
|
||||||
{"error->drift notifies (still bad, different bad)", StatusError, StatusDrift, true},
|
{"error->drift notifies (still bad, different bad)", StatusError, StatusDrift, true},
|
||||||
{"error->in_sync is not the 'resolved' case, per spec", StatusError, StatusInSync, false},
|
{"error->in_sync notifies (resolved after failure)", StatusError, StatusInSync, true},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range cases {
|
for _, tc := range cases {
|
||||||
|
|||||||
@@ -12,6 +12,17 @@ import (
|
|||||||
dto "github.com/vasyakrg/dns-autoresolver/internal/store/dto"
|
dto "github.com/vasyakrg/dns-autoresolver/internal/store/dto"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const countDriftDomains = `-- name: CountDriftDomains :one
|
||||||
|
SELECT count(*) FROM domains WHERE last_check_status = 'drift'
|
||||||
|
`
|
||||||
|
|
||||||
|
func (q *Queries) CountDriftDomains(ctx context.Context) (int64, error) {
|
||||||
|
row := q.db.QueryRow(ctx, countDriftDomains)
|
||||||
|
var count int64
|
||||||
|
err := row.Scan(&count)
|
||||||
|
return count, err
|
||||||
|
}
|
||||||
|
|
||||||
const createDomain = `-- name: CreateDomain :one
|
const createDomain = `-- name: CreateDomain :one
|
||||||
INSERT INTO domains (id, project_id, provider_account_id, zone_name, zone_id, template_id)
|
INSERT INTO domains (id, project_id, provider_account_id, zone_name, zone_id, template_id)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6)
|
VALUES ($1, $2, $3, $4, $5, $6)
|
||||||
|
|||||||
@@ -34,3 +34,6 @@ SELECT last_check_status FROM domains WHERE id = $1;
|
|||||||
|
|
||||||
-- name: SetDomainStatus :exec
|
-- name: SetDomainStatus :exec
|
||||||
UPDATE domains SET last_check_status = $2 WHERE id = $1;
|
UPDATE domains SET last_check_status = $2 WHERE id = $1;
|
||||||
|
|
||||||
|
-- name: CountDriftDomains :one
|
||||||
|
SELECT count(*) FROM domains WHERE last_check_status = 'drift';
|
||||||
|
|||||||
@@ -258,6 +258,14 @@ func (s *Store) SetDomainStatus(ctx context.Context, domainID uuid.UUID, status
|
|||||||
return s.q.SetDomainStatus(ctx, db.SetDomainStatusParams{ID: domainID, LastCheckStatus: status})
|
return s.q.SetDomainStatus(ctx, db.SetDomainStatusParams{ID: domainID, LastCheckStatus: status})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CountDriftDomains returns the current number of domains system-wide whose
|
||||||
|
// last check status is "drift". This is a global count (not per-project) —
|
||||||
|
// it backs the dns_ar_drift_domains gauge, which is a system-level metric.
|
||||||
|
func (s *Store) CountDriftDomains(ctx context.Context) (int, error) {
|
||||||
|
n, err := s.q.CountDriftDomains(ctx)
|
||||||
|
return int(n), err
|
||||||
|
}
|
||||||
|
|
||||||
// User and Project are provider-neutral domain structs for the auth/tenant
|
// User and Project are provider-neutral domain structs for the auth/tenant
|
||||||
// layer (Фаза 2), mirroring the Account/Template/Domain wrappers above so
|
// layer (Фаза 2), mirroring the Account/Template/Domain wrappers above so
|
||||||
// callers never need to import internal/store/db directly.
|
// callers never need to import internal/store/db directly.
|
||||||
|
|||||||
Reference in New Issue
Block a user