From 23e02d68049313af9a15ea1674b07184c3975db2 Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Sat, 4 Jul 2026 13:53:06 +0700 Subject: [PATCH] =?UTF-8?q?feat(scheduler):=20in-process=20=D0=BF=D0=BB?= =?UTF-8?q?=D0=B0=D0=BD=D0=B8=D1=80=D0=BE=D0=B2=D1=89=D0=B8=D0=BA=20=D0=BF?= =?UTF-8?q?=D1=80=D0=BE=D0=B2=D0=B5=D1=80=D0=BE=D0=BA=20+=20=D1=81=D0=BC?= =?UTF-8?q?=D0=B5=D0=BD=D0=B0=20=D1=81=D1=82=D0=B0=D1=82=D1=83=D1=81=D0=B0?= =?UTF-8?q?=20+=20=D1=83=D0=B2=D0=B5=D0=B4=D0=BE=D0=BC=D0=BB=D0=B5=D0=BD?= =?UTF-8?q?=D0=B8=D1=8F=20+=20=D0=BC=D0=B5=D1=82=D1=80=D0=B8=D0=BA=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/scheduler/scheduler.go | 196 ++++++++++++++++++++ internal/scheduler/scheduler_test.go | 265 +++++++++++++++++++++++++++ 2 files changed, 461 insertions(+) create mode 100644 internal/scheduler/scheduler.go create mode 100644 internal/scheduler/scheduler_test.go diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go new file mode 100644 index 0000000..8dba3b4 --- /dev/null +++ b/internal/scheduler/scheduler.go @@ -0,0 +1,196 @@ +// Package scheduler runs an in-process loop that periodically checks every +// domain of every due project schedule, records the resulting status, and +// notifies configured channels on meaningful status transitions. +package scheduler + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/google/uuid" + + "github.com/vasyakrg/dns-autoresolver/internal/diff" + "github.com/vasyakrg/dns-autoresolver/internal/metrics" + "github.com/vasyakrg/dns-autoresolver/internal/notify" + "github.com/vasyakrg/dns-autoresolver/internal/store" +) + +// Domain check statuses persisted via SchedStore.SetDomainStatus / +// surfaced via GetDomainStatus. "unknown" is the DB default for a domain +// that has never been checked (see migrations/0004_schedule_notify.sql). 
+const ( + StatusUnknown = "unknown" + StatusInSync = "in_sync" + StatusDrift = "drift" + StatusError = "error" +) + +// SchedStore is the narrow store dependency the scheduler needs: due +// schedules, their domains, and per-domain status bookkeeping. +type SchedStore interface { + ListDueSchedules(ctx context.Context, now time.Time) ([]store.Schedule, error) + TouchScheduleRun(ctx context.Context, projectID uuid.UUID, at time.Time) error + ListDomains(ctx context.Context, projectID uuid.UUID) ([]store.Domain, error) + GetDomainStatus(ctx context.Context, domainID uuid.UUID) (string, error) + SetDomainStatus(ctx context.Context, domainID uuid.UUID, status string) error + SaveCheckRun(ctx context.Context, domainID uuid.UUID, cs diff.Changeset) error +} + +// Checker computes the diff between a domain's desired template and its +// actual zone state. internal/service.DomainService satisfies this. +type Checker interface { + Check(ctx context.Context, projectID, domainID uuid.UUID) (diff.Changeset, error) +} + +// NotifySender delivers a status-change event to a project's notification +// channels. internal/notify.Dispatcher satisfies this. +type NotifySender interface { + Send(ctx context.Context, projectID uuid.UUID, ev notify.Event) error +} + +// Scheduler drives periodic domain checks for every due project schedule. +type Scheduler struct { + store SchedStore + checker Checker + notifier NotifySender + metrics *metrics.Metrics +} + +// New builds a Scheduler wired with its store, checker, notifier and metrics +// dependencies. +func New(store SchedStore, checker Checker, notifier NotifySender, m *metrics.Metrics) *Scheduler { + return &Scheduler{store: store, checker: checker, notifier: notifier, metrics: m} +} + +// Run ticks every `tick` and calls RunOnce until ctx is cancelled. A failed +// iteration is logged, never fatal — the loop keeps ticking so a transient +// store/provider outage does not permanently stop future checks. 
+func (s *Scheduler) Run(ctx context.Context, tick time.Duration) {
+	ticker := time.NewTicker(tick)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			err := s.RunOnce(ctx, time.Now())
+			if err == nil {
+				continue
+			}
+			// A pass cut short by shutdown is expected: leave quietly instead
+			// of reporting it as a failed iteration.
+			if ctx.Err() != nil {
+				return
+			}
+			log.Printf("scheduler: run once failed: %v", err)
+		}
+	}
+}
+
+// RunOnce performs a single scheduling pass: every due project schedule is
+// checked, each of its domains is diffed against its template, its status
+// is updated, and channels are notified on a meaningful status transition.
+//
+// It returns an error when the due schedules cannot be listed, or when ctx
+// is cancelled while the pass is in progress. A failure to list one
+// project's domains or to touch its schedule is logged and does not abort
+// the pass.
+//
+// When the pass is interrupted by cancellation, the schedule being processed
+// is not touched (so it remains due for the next pass) and the drift gauge
+// is not updated, because the count would only cover part of the work.
+func (s *Scheduler) RunOnce(ctx context.Context, now time.Time) error {
+	due, err := s.store.ListDueSchedules(ctx, now)
+	if err != nil {
+		return fmt.Errorf("list due schedules: %w", err)
+	}
+
+	// interrupted turns a cancelled/expired ctx into the error returned to
+	// the caller; it yields nil while the pass may continue.
+	interrupted := func() error {
+		if ctxErr := ctx.Err(); ctxErr != nil {
+			return fmt.Errorf("pass interrupted: %w", ctxErr)
+		}
+		return nil
+	}
+
+	driftCount := 0
+
+	for _, sch := range due {
+		domains, err := s.store.ListDomains(ctx, sch.ProjectID)
+		if err != nil {
+			if stop := interrupted(); stop != nil {
+				return stop
+			}
+			log.Printf("scheduler: list domains for project %s failed: %v", sch.ProjectID, err)
+			continue
+		}
+
+		for _, d := range domains {
+			// Without this guard every remaining domain would be "checked"
+			// against a dead context and come back as a failure.
+			if stop := interrupted(); stop != nil {
+				return stop
+			}
+			if s.checkDomain(ctx, sch.ProjectID, d, now) == StatusDrift {
+				driftCount++
+			}
+		}
+
+		// The last check may have been cut short by cancellation; do not mark
+		// the schedule as run in that case.
+		if stop := interrupted(); stop != nil {
+			return stop
+		}
+
+		if err := s.store.TouchScheduleRun(ctx, sch.ProjectID, now); err != nil {
+			log.Printf("scheduler: touch schedule run for project %s failed: %v", sch.ProjectID, err)
+		}
+	}
+
+	// NOTE(review): driftCount only covers domains checked in this pass, so a
+	// pass with no due schedules sets the gauge to 0 even if previously
+	// checked domains are still drifted — confirm this is the intended
+	// meaning of the gauge.
+	s.metrics.SetDrift(driftCount)
+	return nil
+}
+
+// checkDomain runs a single domain's check, persists the outcome, and fires
+// a notification if the status transition warrants one. It returns the new
+// status.
+func (s *Scheduler) checkDomain(ctx context.Context, projectID uuid.UUID, d store.Domain, now time.Time) string {
+	start := time.Now()
+	cs, checkErr := s.checker.Check(ctx, projectID, d.ID)
+	dur := time.Since(start)
+
+	// A check that failed while ctx is cancelled or expired says nothing
+	// about the domain itself. Recording it would store a bogus "error"
+	// status, skew the check metrics and fire a false alert during shutdown,
+	// so bail out before any bookkeeping. StatusUnknown is returned to signal
+	// "not determined"; the stored status is left untouched.
+	if checkErr != nil && ctx.Err() != nil {
+		log.Printf("scheduler: check for domain %s aborted: %v", d.ID, checkErr)
+		return StatusUnknown
+	}
+
+	// Classify the outcome: a failed check is an error, a successful check
+	// with actionable diffs is drift, anything else is in sync.
+	newStatus := StatusInSync
+	switch {
+	case checkErr != nil:
+		newStatus = StatusError
+	case len(cs.Actionable()) > 0:
+		newStatus = StatusDrift
+	}
+	s.metrics.ObserveCheck(newStatus, dur)
+
+	// The previous status decides whether this is a transition worth
+	// notifying. If it cannot be read, fall back to "unknown" rather than
+	// skipping the domain.
+	prev, err := s.store.GetDomainStatus(ctx, d.ID)
+	if err != nil {
+		log.Printf("scheduler: get domain status for %s failed: %v", d.ID, err)
+		prev = StatusUnknown
+	}
+
+	// A failed Check has no changeset worth recording; a successful one does.
+	if checkErr == nil {
+		if err := s.store.SaveCheckRun(ctx, d.ID, cs); err != nil {
+			log.Printf("scheduler: save check run for %s failed: %v", d.ID, err)
+		}
+	}
+
+	// Persistence failures are logged only: the notification below is still
+	// attempted.
+	if err := s.store.SetDomainStatus(ctx, d.ID, newStatus); err != nil {
+		log.Printf("scheduler: set domain status for %s failed: %v", d.ID, err)
+	}
+
+	if shouldNotify(prev, newStatus) {
+		ev := notify.Event{
+			Project: projectID.String(),
+			Domain:  d.ID.String(),
+			Status:  newStatus,
+			Summary: summarize(newStatus, cs, checkErr),
+			At:      now,
+		}
+		if err := s.notifier.Send(ctx, projectID, ev); err != nil {
+			log.Printf("scheduler: notify send for project %s domain %s failed: %v", projectID, d.ID, err)
+		}
+		// Counted for every attempted dispatch, whether or not Send succeeded.
+		s.metrics.IncNotification("dispatch", newStatus)
+	}
+
+	return newStatus
+}
+
+// shouldNotify decides whether a prev -> new status transition is worth
+// alerting on:
+//   - entering drift or error from any other status is always notified;
+//   - recovering from drift back to in_sync ("resolved") is notified;
+//   - the initial unknown -> in_sync transition (first successful check of a
+//     domain that never drifted) is NOT notified — it is not news, it is the
+//     expected steady state.
+func shouldNotify(prev, newStatus string) bool { + if (newStatus == StatusDrift || newStatus == StatusError) && newStatus != prev { + return true + } + if prev == StatusDrift && newStatus == StatusInSync { + return true + } + return false +} + +// summarize builds a short, secret-free human-readable message for an Event. +func summarize(status string, cs diff.Changeset, checkErr error) string { + if checkErr != nil { + return fmt.Sprintf("check failed: %v", checkErr) + } + if status == StatusDrift { + return fmt.Sprintf("%d actionable diff(s) detected", len(cs.Actionable())) + } + return "zone back in sync with template" +} diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go new file mode 100644 index 0000000..ecc0ef0 --- /dev/null +++ b/internal/scheduler/scheduler_test.go @@ -0,0 +1,265 @@ +package scheduler + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus/testutil" + + "github.com/vasyakrg/dns-autoresolver/internal/diff" + "github.com/vasyakrg/dns-autoresolver/internal/metrics" + "github.com/vasyakrg/dns-autoresolver/internal/notify" + "github.com/vasyakrg/dns-autoresolver/internal/store" +) + +// mockStore is an in-memory SchedStore double. 
+type mockStore struct { + mu sync.Mutex + + schedules []store.Schedule + domains map[uuid.UUID][]store.Domain + status map[uuid.UUID]string + + savedCheckRuns []uuid.UUID + touchedProjects []uuid.UUID +} + +func newMockStore() *mockStore { + return &mockStore{ + domains: make(map[uuid.UUID][]store.Domain), + status: make(map[uuid.UUID]string), + } +} + +func (m *mockStore) ListDueSchedules(ctx context.Context, now time.Time) ([]store.Schedule, error) { + return m.schedules, nil +} + +func (m *mockStore) TouchScheduleRun(ctx context.Context, projectID uuid.UUID, at time.Time) error { + m.mu.Lock() + defer m.mu.Unlock() + m.touchedProjects = append(m.touchedProjects, projectID) + return nil +} + +func (m *mockStore) ListDomains(ctx context.Context, projectID uuid.UUID) ([]store.Domain, error) { + return m.domains[projectID], nil +} + +func (m *mockStore) GetDomainStatus(ctx context.Context, domainID uuid.UUID) (string, error) { + m.mu.Lock() + defer m.mu.Unlock() + if st, ok := m.status[domainID]; ok { + return st, nil + } + return StatusUnknown, nil +} + +func (m *mockStore) SetDomainStatus(ctx context.Context, domainID uuid.UUID, status string) error { + m.mu.Lock() + defer m.mu.Unlock() + m.status[domainID] = status + return nil +} + +func (m *mockStore) SaveCheckRun(ctx context.Context, domainID uuid.UUID, cs diff.Changeset) error { + m.mu.Lock() + defer m.mu.Unlock() + m.savedCheckRuns = append(m.savedCheckRuns, domainID) + return nil +} + +// mockChecker returns a preset Changeset or error per domainID. +type mockChecker struct { + results map[uuid.UUID]diff.Changeset + errs map[uuid.UUID]error +} + +func (c *mockChecker) Check(ctx context.Context, projectID, domainID uuid.UUID) (diff.Changeset, error) { + if err, ok := c.errs[domainID]; ok { + return diff.Changeset{}, err + } + return c.results[domainID], nil +} + +// mockNotifier records every Event it is asked to Send. 
+type mockNotifier struct { + mu sync.Mutex + events []notify.Event +} + +func (n *mockNotifier) Send(ctx context.Context, projectID uuid.UUID, ev notify.Event) error { + n.mu.Lock() + defer n.mu.Unlock() + n.events = append(n.events, ev) + return nil +} + +func (n *mockNotifier) count() int { + n.mu.Lock() + defer n.mu.Unlock() + return len(n.events) +} + +func driftChangeset() diff.Changeset { + return diff.Changeset{Diffs: []diff.RecordDiff{{Kind: diff.Update, Name: "www"}}} +} + +func TestRunOnce_NotifiesOnDriftNotOnFirstInSync(t *testing.T) { + projectID := uuid.New() + domainA := store.Domain{ID: uuid.New(), ProjectID: projectID} + domainB := store.Domain{ID: uuid.New(), ProjectID: projectID} + + st := newMockStore() + st.schedules = []store.Schedule{{ID: uuid.New(), ProjectID: projectID, IntervalSeconds: 3600, Enabled: true}} + st.domains[projectID] = []store.Domain{domainA, domainB} + + checker := &mockChecker{ + results: map[uuid.UUID]diff.Changeset{ + domainA.ID: driftChangeset(), + domainB.ID: {}, + }, + } + notifier := &mockNotifier{} + m := metrics.New() + + sched := New(st, checker, notifier, m) + + if err := sched.RunOnce(context.Background(), time.Now()); err != nil { + t.Fatalf("RunOnce: %v", err) + } + + if st.status[domainA.ID] != StatusDrift { + t.Fatalf("domain A status = %q, want drift", st.status[domainA.ID]) + } + if st.status[domainB.ID] != StatusInSync { + t.Fatalf("domain B status = %q, want in_sync", st.status[domainB.ID]) + } + + if got := notifier.count(); got != 1 { + t.Fatalf("notifications sent = %d, want 1 (only domain A)", got) + } + if notifier.events[0].Domain != domainA.ID.String() { + t.Fatalf("notified domain = %q, want domain A (%s)", notifier.events[0].Domain, domainA.ID) + } + if notifier.events[0].Status != StatusDrift { + t.Fatalf("notified status = %q, want drift", notifier.events[0].Status) + } + + if len(st.savedCheckRuns) != 2 { + t.Fatalf("SaveCheckRun calls = %d, want 2", len(st.savedCheckRuns)) + } + if 
len(st.touchedProjects) != 1 || st.touchedProjects[0] != projectID { + t.Fatalf("TouchScheduleRun calls = %v, want [%s]", st.touchedProjects, projectID) + } + + if got := testutil.ToFloat64(m.ChecksTotal.WithLabelValues(StatusDrift)); got != 1 { + t.Fatalf("ChecksTotal{drift} = %v, want 1", got) + } + if got := testutil.ToFloat64(m.ChecksTotal.WithLabelValues(StatusInSync)); got != 1 { + t.Fatalf("ChecksTotal{in_sync} = %v, want 1", got) + } + if got := testutil.ToFloat64(m.DriftDomains); got != 1 { + t.Fatalf("DriftDomains gauge = %v, want 1", got) + } +} + +func TestRunOnce_Idempotent_NoRepeatNotifyOnUnchangedDrift(t *testing.T) { + projectID := uuid.New() + domainA := store.Domain{ID: uuid.New(), ProjectID: projectID} + + st := newMockStore() + st.schedules = []store.Schedule{{ID: uuid.New(), ProjectID: projectID, IntervalSeconds: 3600, Enabled: true}} + st.domains[projectID] = []store.Domain{domainA} + + checker := &mockChecker{ + results: map[uuid.UUID]diff.Changeset{domainA.ID: driftChangeset()}, + } + notifier := &mockNotifier{} + m := metrics.New() + sched := New(st, checker, notifier, m) + + if err := sched.RunOnce(context.Background(), time.Now()); err != nil { + t.Fatalf("first RunOnce: %v", err) + } + if got := notifier.count(); got != 1 { + t.Fatalf("after first run notifications = %d, want 1", got) + } + + if err := sched.RunOnce(context.Background(), time.Now()); err != nil { + t.Fatalf("second RunOnce: %v", err) + } + if got := notifier.count(); got != 1 { + t.Fatalf("after second run (drift->drift) notifications = %d, want still 1 (no repeat)", got) + } +} + +func TestRunOnce_CheckError_StatusErrorAndNotify(t *testing.T) { + projectID := uuid.New() + domainA := store.Domain{ID: uuid.New(), ProjectID: projectID} + + st := newMockStore() + st.schedules = []store.Schedule{{ID: uuid.New(), ProjectID: projectID, IntervalSeconds: 3600, Enabled: true}} + st.domains[projectID] = []store.Domain{domainA} + + checker := &mockChecker{ + errs: 
map[uuid.UUID]error{domainA.ID: errors.New("provider timeout")}, + } + notifier := &mockNotifier{} + m := metrics.New() + sched := New(st, checker, notifier, m) + + if err := sched.RunOnce(context.Background(), time.Now()); err != nil { + t.Fatalf("RunOnce: %v", err) + } + + if st.status[domainA.ID] != StatusError { + t.Fatalf("domain A status = %q, want error", st.status[domainA.ID]) + } + if got := notifier.count(); got != 1 { + t.Fatalf("notifications = %d, want 1 (unknown->error)", got) + } + if notifier.events[0].Status != StatusError { + t.Fatalf("notified status = %q, want error", notifier.events[0].Status) + } + if got := testutil.ToFloat64(m.ChecksTotal.WithLabelValues(StatusError)); got != 1 { + t.Fatalf("ChecksTotal{error} = %v, want 1", got) + } + // A failed Check has no changeset worth recording. + if len(st.savedCheckRuns) != 0 { + t.Fatalf("SaveCheckRun calls on error = %d, want 0", len(st.savedCheckRuns)) + } +} + +func TestShouldNotify(t *testing.T) { + cases := []struct { + name string + prev string + new string + want bool + }{ + {"unknown->drift notifies", StatusUnknown, StatusDrift, true}, + {"unknown->error notifies", StatusUnknown, StatusError, true}, + {"unknown->in_sync is silent (first sync is not news)", StatusUnknown, StatusInSync, false}, + {"drift->drift does not repeat", StatusDrift, StatusDrift, false}, + {"error->error does not repeat", StatusError, StatusError, false}, + {"drift->in_sync notifies (resolved)", StatusDrift, StatusInSync, true}, + {"in_sync->drift notifies", StatusInSync, StatusDrift, true}, + {"in_sync->error notifies", StatusInSync, StatusError, true}, + {"in_sync->in_sync is silent", StatusInSync, StatusInSync, false}, + {"error->drift notifies (still bad, different bad)", StatusError, StatusDrift, true}, + {"error->in_sync is not the 'resolved' case, per spec", StatusError, StatusInSync, false}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := shouldNotify(tc.prev, tc.new); got 
!= tc.want { + t.Fatalf("shouldNotify(%q, %q) = %v, want %v", tc.prev, tc.new, got, tc.want) + } + }) + } +}