feat(scheduler): in-process планировщик проверок + смена статуса + уведомления + метрики
This commit is contained in:
@@ -0,0 +1,196 @@
|
||||
// Package scheduler runs an in-process loop that periodically checks every
|
||||
// domain of every due project schedule, records the resulting status, and
|
||||
// notifies configured channels on meaningful status transitions.
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
"github.com/vasyakrg/dns-autoresolver/internal/diff"
|
||||
"github.com/vasyakrg/dns-autoresolver/internal/metrics"
|
||||
"github.com/vasyakrg/dns-autoresolver/internal/notify"
|
||||
"github.com/vasyakrg/dns-autoresolver/internal/store"
|
||||
)
|
||||
|
||||
// Domain check statuses persisted via SchedStore.SetDomainStatus /
|
||||
// surfaced via GetDomainStatus. "unknown" is the DB default for a domain
|
||||
// that has never been checked (see migrations/0004_schedule_notify.sql).
|
||||
const (
|
||||
StatusUnknown = "unknown"
|
||||
StatusInSync = "in_sync"
|
||||
StatusDrift = "drift"
|
||||
StatusError = "error"
|
||||
)
|
||||
|
||||
// SchedStore is the narrow store dependency the scheduler needs: due
|
||||
// schedules, their domains, and per-domain status bookkeeping.
|
||||
type SchedStore interface {
|
||||
ListDueSchedules(ctx context.Context, now time.Time) ([]store.Schedule, error)
|
||||
TouchScheduleRun(ctx context.Context, projectID uuid.UUID, at time.Time) error
|
||||
ListDomains(ctx context.Context, projectID uuid.UUID) ([]store.Domain, error)
|
||||
GetDomainStatus(ctx context.Context, domainID uuid.UUID) (string, error)
|
||||
SetDomainStatus(ctx context.Context, domainID uuid.UUID, status string) error
|
||||
SaveCheckRun(ctx context.Context, domainID uuid.UUID, cs diff.Changeset) error
|
||||
}
|
||||
|
||||
// Checker computes the diff between a domain's desired template and its
|
||||
// actual zone state. internal/service.DomainService satisfies this.
|
||||
type Checker interface {
|
||||
Check(ctx context.Context, projectID, domainID uuid.UUID) (diff.Changeset, error)
|
||||
}
|
||||
|
||||
// NotifySender delivers a status-change event to a project's notification
|
||||
// channels. internal/notify.Dispatcher satisfies this.
|
||||
type NotifySender interface {
|
||||
Send(ctx context.Context, projectID uuid.UUID, ev notify.Event) error
|
||||
}
|
||||
|
||||
// Scheduler drives periodic domain checks for every due project schedule.
|
||||
type Scheduler struct {
|
||||
store SchedStore
|
||||
checker Checker
|
||||
notifier NotifySender
|
||||
metrics *metrics.Metrics
|
||||
}
|
||||
|
||||
// New builds a Scheduler wired with its store, checker, notifier and metrics
|
||||
// dependencies.
|
||||
func New(store SchedStore, checker Checker, notifier NotifySender, m *metrics.Metrics) *Scheduler {
|
||||
return &Scheduler{store: store, checker: checker, notifier: notifier, metrics: m}
|
||||
}
|
||||
|
||||
// Run ticks every `tick` and calls RunOnce until ctx is cancelled. A failed
|
||||
// iteration is logged, never fatal — the loop keeps ticking so a transient
|
||||
// store/provider outage does not permanently stop future checks.
|
||||
func (s *Scheduler) Run(ctx context.Context, tick time.Duration) {
|
||||
ticker := time.NewTicker(tick)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := s.RunOnce(ctx, time.Now()); err != nil {
|
||||
log.Printf("scheduler: run once failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RunOnce performs a single scheduling pass: every due project schedule is
|
||||
// checked, each of its domains is diffed against its template, its status
|
||||
// is updated, and channels are notified on a meaningful status transition.
|
||||
func (s *Scheduler) RunOnce(ctx context.Context, now time.Time) error {
|
||||
due, err := s.store.ListDueSchedules(ctx, now)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list due schedules: %w", err)
|
||||
}
|
||||
|
||||
driftCount := 0
|
||||
|
||||
for _, sch := range due {
|
||||
domains, err := s.store.ListDomains(ctx, sch.ProjectID)
|
||||
if err != nil {
|
||||
log.Printf("scheduler: list domains for project %s failed: %v", sch.ProjectID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, d := range domains {
|
||||
if s.checkDomain(ctx, sch.ProjectID, d, now) == StatusDrift {
|
||||
driftCount++
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.store.TouchScheduleRun(ctx, sch.ProjectID, now); err != nil {
|
||||
log.Printf("scheduler: touch schedule run for project %s failed: %v", sch.ProjectID, err)
|
||||
}
|
||||
}
|
||||
|
||||
s.metrics.SetDrift(driftCount)
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkDomain runs a single domain's check, persists the outcome, and fires
|
||||
// a notification if the status transition warrants one. It returns the new
|
||||
// status.
|
||||
func (s *Scheduler) checkDomain(ctx context.Context, projectID uuid.UUID, d store.Domain, now time.Time) string {
|
||||
start := time.Now()
|
||||
cs, checkErr := s.checker.Check(ctx, projectID, d.ID)
|
||||
dur := time.Since(start)
|
||||
|
||||
newStatus := StatusInSync
|
||||
switch {
|
||||
case checkErr != nil:
|
||||
newStatus = StatusError
|
||||
case len(cs.Actionable()) > 0:
|
||||
newStatus = StatusDrift
|
||||
}
|
||||
s.metrics.ObserveCheck(newStatus, dur)
|
||||
|
||||
prev, err := s.store.GetDomainStatus(ctx, d.ID)
|
||||
if err != nil {
|
||||
log.Printf("scheduler: get domain status for %s failed: %v", d.ID, err)
|
||||
prev = StatusUnknown
|
||||
}
|
||||
|
||||
// A failed Check has no changeset worth recording; a successful one does.
|
||||
if checkErr == nil {
|
||||
if err := s.store.SaveCheckRun(ctx, d.ID, cs); err != nil {
|
||||
log.Printf("scheduler: save check run for %s failed: %v", d.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.store.SetDomainStatus(ctx, d.ID, newStatus); err != nil {
|
||||
log.Printf("scheduler: set domain status for %s failed: %v", d.ID, err)
|
||||
}
|
||||
|
||||
if shouldNotify(prev, newStatus) {
|
||||
ev := notify.Event{
|
||||
Project: projectID.String(),
|
||||
Domain: d.ID.String(),
|
||||
Status: newStatus,
|
||||
Summary: summarize(newStatus, cs, checkErr),
|
||||
At: now,
|
||||
}
|
||||
if err := s.notifier.Send(ctx, projectID, ev); err != nil {
|
||||
log.Printf("scheduler: notify send for project %s domain %s failed: %v", projectID, d.ID, err)
|
||||
}
|
||||
s.metrics.IncNotification("dispatch", newStatus)
|
||||
}
|
||||
|
||||
return newStatus
|
||||
}
|
||||
|
||||
// shouldNotify decides whether a prev -> new status transition is worth
|
||||
// alerting on:
|
||||
// - entering drift or error from any other status is always notified;
|
||||
// - recovering from drift back to in_sync ("resolved") is notified;
|
||||
// - the initial unknown -> in_sync transition (first successful check of a
|
||||
// domain that never drifted) is NOT notified — it is not news, it is the
|
||||
// expected steady state.
|
||||
func shouldNotify(prev, newStatus string) bool {
|
||||
if (newStatus == StatusDrift || newStatus == StatusError) && newStatus != prev {
|
||||
return true
|
||||
}
|
||||
if prev == StatusDrift && newStatus == StatusInSync {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// summarize builds a short, secret-free human-readable message for an Event.
|
||||
func summarize(status string, cs diff.Changeset, checkErr error) string {
|
||||
if checkErr != nil {
|
||||
return fmt.Sprintf("check failed: %v", checkErr)
|
||||
}
|
||||
if status == StatusDrift {
|
||||
return fmt.Sprintf("%d actionable diff(s) detected", len(cs.Actionable()))
|
||||
}
|
||||
return "zone back in sync with template"
|
||||
}
|
||||
Reference in New Issue
Block a user