Files
dns-autoresolver/internal/scheduler/scheduler.go
T
vasyansk f14916396c feat(notify): per-channel delivery results + accurate notification metrics
Dispatcher.Send now returns []ChannelResult{Type, Err} alongside the
aggregated error, and scheduler.checkDomain increments
NotificationsTotal per channel type/status instead of a single
unconditional IncNotification("dispatch", newStatus) placeholder that
ignored per-channel delivery outcome.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01BwxdSt4reTm7Dj1oxRvpP3
2026-07-04 15:56:15 +07:00

219 lines
7.5 KiB
Go

// Package scheduler runs an in-process loop that periodically checks every
// domain of every due project schedule, records the resulting status, and
// notifies configured channels on meaningful status transitions.
package scheduler
import (
"context"
"fmt"
"log"
"time"
"github.com/google/uuid"
"github.com/vasyakrg/dns-autoresolver/internal/diff"
"github.com/vasyakrg/dns-autoresolver/internal/metrics"
"github.com/vasyakrg/dns-autoresolver/internal/notify"
"github.com/vasyakrg/dns-autoresolver/internal/store"
)
// Domain check statuses persisted via SchedStore.SetDomainStatus and read
// back via SchedStore.GetDomainStatus.
const (
	// StatusUnknown is the DB default for a domain that has never been
	// checked (see migrations/0004_schedule_notify.sql).
	StatusUnknown = "unknown"

	// StatusInSync means the last check found no actionable differences
	// between the domain's template and its zone.
	StatusInSync = "in_sync"

	// StatusDrift means the last check found at least one actionable
	// difference.
	StatusDrift = "drift"

	// StatusError means the last check itself failed.
	StatusError = "error"
)
// SchedStore is the scheduler's deliberately small view of persistence. It
// covers three concerns: finding and stamping due schedules, enumerating a
// project's domains, and per-domain status bookkeeping. Writing the check
// result itself (check_runs) is intentionally absent: that is the Checker's
// responsibility, not the scheduler's.
type SchedStore interface {
	// ListDueSchedules yields the schedules RunOnce should process for the
	// pass running at now.
	ListDueSchedules(ctx context.Context, now time.Time) ([]store.Schedule, error)

	// TouchScheduleRun is called once per due project, after all of that
	// project's domains have been visited, with the pass time.
	TouchScheduleRun(ctx context.Context, projectID uuid.UUID, at time.Time) error

	// ListDomains yields every domain belonging to projectID.
	ListDomains(ctx context.Context, projectID uuid.UUID) ([]store.Domain, error)

	// GetDomainStatus reads the status last stored for domainID
	// ("unknown" for a never-checked domain).
	GetDomainStatus(ctx context.Context, domainID uuid.UUID) (string, error)

	// SetDomainStatus stores the status produced by the latest check.
	SetDomainStatus(ctx context.Context, domainID uuid.UUID, status string) error

	// CountDriftDomains reports the system-wide number of domains in drift,
	// which feeds the drift gauge.
	CountDriftDomains(ctx context.Context) (int, error)
}
type (
	// Checker produces the changeset between a domain's desired template and
	// the actual state of its zone. internal/service.DomainService
	// implements it.
	Checker interface {
		Check(ctx context.Context, projectID, domainID uuid.UUID) (diff.Changeset, error)
	}

	// NotifySender pushes a status-change event to every notification
	// channel of a project, reporting one result per channel plus an
	// aggregated error. internal/notify.Dispatcher implements it.
	NotifySender interface {
		Send(ctx context.Context, projectID uuid.UUID, ev notify.Event) ([]notify.ChannelResult, error)
	}
)
// Scheduler drives periodic domain checks for every due project schedule.
// Construct it with New: a zero Scheduler has nil dependencies and its
// methods would panic on first use.
type Scheduler struct {
	// store supplies due schedules, project domains and per-domain status.
	store SchedStore

	// checker diffs a domain's template against its zone.
	checker Checker

	// notifier delivers status-change events to a project's channels.
	notifier NotifySender

	// metrics receives check durations, per-channel notification outcomes
	// and the drift gauge.
	metrics *metrics.Metrics
}

// New assembles a Scheduler from its four collaborators: the store it reads
// schedules and statuses from, the checker it diffs domains with, the
// notifier it alerts through, and the metrics sink it reports to.
func New(store SchedStore, checker Checker, notifier NotifySender, m *metrics.Metrics) *Scheduler {
	s := &Scheduler{
		store:    store,
		checker:  checker,
		notifier: notifier,
		metrics:  m,
	}
	return s
}
// Run ticks every `tick` and calls RunOnce until ctx is cancelled. A failed
// iteration is logged, never fatal — the loop keeps ticking so a transient
// store/provider outage does not permanently stop future checks.
//
// Cancellation takes priority over a pending tick: no new pass is started
// once ctx is done, and a pass that fails because ctx was cancelled is not
// logged as a failure (it is the expected shutdown path).
//
// tick must be positive; time.NewTicker panics otherwise.
func (s *Scheduler) Run(ctx context.Context, tick time.Duration) {
	ticker := time.NewTicker(tick)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		// select chooses pseudo-randomly among ready cases, so a tick that
		// is ready at the same moment as cancellation may win. Re-check so
		// we never start a whole pass on an already-cancelled context.
		if ctx.Err() != nil {
			return
		}
		if err := s.RunOnce(ctx, time.Now()); err != nil {
			if ctx.Err() != nil {
				// The pass was cut short by shutdown; nothing to report.
				return
			}
			log.Printf("scheduler: run once failed: %v", err)
		}
	}
}
// RunOnce performs a single scheduling pass: every due project schedule is
// checked, each of its domains is diffed against its template, its status
// is updated, and channels are notified on a meaningful status transition.
//
// Per-project and per-domain failures are logged and do not abort the pass.
// RunOnce returns an error only when listing due schedules fails or when ctx
// is cancelled mid-pass. On cancellation it stops before the next domain and
// does not touch the run timestamp of the interrupted schedule, so that
// schedule is not stamped as run although some of its domains were never
// checked. The returned error wraps ctx.Err().
func (s *Scheduler) RunOnce(ctx context.Context, now time.Time) error {
	due, err := s.store.ListDueSchedules(ctx, now)
	if err != nil {
		return fmt.Errorf("list due schedules: %w", err)
	}
	for _, sch := range due {
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("scheduler pass interrupted: %w", err)
		}
		domains, err := s.store.ListDomains(ctx, sch.ProjectID)
		if err != nil {
			log.Printf("scheduler: list domains for project %s failed: %v", sch.ProjectID, err)
			continue
		}
		for _, d := range domains {
			// Without this guard every remaining domain would still be
			// "checked" against a dead context, each failing fast and being
			// recorded/logged as a check error.
			if ctx.Err() != nil {
				break
			}
			// A domain with no template attached is not yet configured for
			// checking (a valid, expected state right after import) — not a
			// failure. Checking it would make LoadDomain return "domain has
			// no template", turning into a StatusError that spams a
			// notification and shows a red badge for a domain the user
			// simply hasn't set up yet. Skip it silently: no check, no
			// status change, no notification.
			if d.TemplateID == nil {
				continue
			}
			s.checkDomain(ctx, sch.ProjectID, d, now)
		}
		// Cancelled during this project's domains (including the last
		// check): its schedule was not fully processed, so don't stamp it.
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("scheduler pass interrupted: %w", err)
		}
		if err := s.store.TouchScheduleRun(ctx, sch.ProjectID, now); err != nil {
			log.Printf("scheduler: touch schedule run for project %s failed: %v", sch.ProjectID, err)
		}
	}
	// The real, system-wide count of drift domains — not a local
	// accumulator scoped to this tick's due projects — so the gauge
	// reflects reality even across ticks where different projects are due.
	count, err := s.store.CountDriftDomains(ctx)
	if err != nil {
		log.Printf("scheduler: count drift domains failed: %v", err)
	} else {
		s.metrics.SetDrift(count)
	}
	return nil
}
// checkDomain runs a single domain's check, persists the outcome, and fires
// a notification if the status transition warrants one. It returns the new
// status.
//
// Failures of the side effects (reading the previous status, persisting the
// new one, delivering notifications) are logged and do not change the
// returned status. If the check fails while ctx is cancelled, the failure
// reflects the shutdown rather than the domain. In that case nothing is
// observed, persisted or sent, and StatusUnknown is returned.
func (s *Scheduler) checkDomain(ctx context.Context, projectID uuid.UUID, d store.Domain, now time.Time) string {
	start := time.Now()
	cs, checkErr := s.checker.Check(ctx, projectID, d.ID)
	dur := time.Since(start)

	if checkErr != nil && ctx.Err() != nil {
		// Treating a cancellation as StatusError would bump the error
		// metric and could persist a red status and send a "check failed"
		// notification for a domain that may be perfectly fine.
		log.Printf("scheduler: check for domain %s aborted: %v", d.ID, checkErr)
		return StatusUnknown
	}

	newStatus := StatusInSync
	switch {
	case checkErr != nil:
		newStatus = StatusError
	case len(cs.Actionable()) > 0:
		newStatus = StatusDrift
	}
	s.metrics.ObserveCheck(newStatus, dur)

	// On a failed read, fall back to StatusUnknown. shouldNotify still fires
	// for drift/error from unknown but not for unknown -> in_sync.
	prev, err := s.store.GetDomainStatus(ctx, d.ID)
	if err != nil {
		log.Printf("scheduler: get domain status for %s failed: %v", d.ID, err)
		prev = StatusUnknown
	}

	// Persisting the check_runs row is the Checker's job: DomainService.Check
	// already calls Recorder.SaveCheckRun internally on every successful
	// check (drift or in_sync). Calling it again here would double-write
	// check_runs history for the same check.
	if err := s.store.SetDomainStatus(ctx, d.ID, newStatus); err != nil {
		log.Printf("scheduler: set domain status for %s failed: %v", d.ID, err)
	}

	if shouldNotify(prev, newStatus) {
		ev := notify.Event{
			Project: projectID.String(),
			Domain:  d.ID.String(),
			Status:  newStatus,
			Summary: summarize(newStatus, cs, checkErr),
			At:      now,
		}
		results, err := s.notifier.Send(ctx, projectID, ev)
		if err != nil {
			log.Printf("scheduler: notify send for project %s domain %s failed: %v", projectID, d.ID, err)
		}
		// One metric sample per channel actually attempted, so a partial
		// delivery failure is visible per channel type.
		for _, r := range results {
			status := "sent"
			if r.Err != nil {
				status = "failed"
			}
			s.metrics.IncNotification(r.Type, status)
		}
	}
	return newStatus
}
// shouldNotify reports whether moving from status prev to newStatus is worth
// an alert:
//   - an unchanged status is never news;
//   - entering drift or error (from anything else) always alerts;
//   - reaching in_sync alerts only as a recovery, i.e. when coming from
//     drift or error — so the first successful check of a fresh domain
//     (unknown -> in_sync) stays silent, as that is the expected steady state;
//   - any other target status is ignored.
func shouldNotify(prev, newStatus string) bool {
	if prev == newStatus {
		return false
	}
	switch newStatus {
	case StatusDrift, StatusError:
		return true
	case StatusInSync:
		return prev == StatusDrift || prev == StatusError
	default:
		return false
	}
}
// summarize renders the one-line, human-readable Summary of an Event. It
// uses only the check error text and the number of actionable diffs; a
// check error wins over the status, drift reports the diff count, and
// anything else is phrased as a recovery.
func summarize(status string, cs diff.Changeset, checkErr error) string {
	switch {
	case checkErr != nil:
		return fmt.Sprintf("check failed: %v", checkErr)
	case status == StatusDrift:
		return fmt.Sprintf("%d actionable diff(s) detected", len(cs.Actionable()))
	default:
		return "zone back in sync with template"
	}
}