diff --git a/internal/notify/dispatch.go b/internal/notify/dispatch.go index 357e772..670ddb5 100644 --- a/internal/notify/dispatch.go +++ b/internal/notify/dispatch.go @@ -49,14 +49,23 @@ func NewDispatcher(store ChannelStore, cipher Decryptor) *Dispatcher { } } +// ChannelResult is the per-channel delivery outcome, so callers can record +// success/failure metrics per channel type instead of one aggregate blob. +type ChannelResult struct { + Type string + Err error +} + // Send delivers ev to every enabled channel of projectID. Errors from // individual channels are aggregated (via errors.Join) rather than aborting -// delivery to the remaining channels. -func (d *Dispatcher) Send(ctx context.Context, projectID uuid.UUID, ev Event) error { +// delivery to the remaining channels; the per-channel outcome is also +// returned so callers can record accurate per-channel/status metrics. +func (d *Dispatcher) Send(ctx context.Context, projectID uuid.UUID, ev Event) ([]ChannelResult, error) { channels, err := d.store.ListEnabledChannels(ctx, projectID) if err != nil { - return err + return nil, err } + var results []ChannelResult var errs []error for _, ch := range channels { n, ok := d.byType[ch.Type] @@ -65,18 +74,21 @@ func (d *Dispatcher) Send(ctx context.Context, projectID uuid.UUID, ev Event) er } secret := "" if ch.SecretEnc != "" { - b, err := d.cipher.Decrypt(ch.SecretEnc) - if err != nil { - errs = append(errs, err) + b, derr := d.cipher.Decrypt(ch.SecretEnc) + if derr != nil { + errs = append(errs, derr) + results = append(results, ChannelResult{Type: ch.Type, Err: derr}) continue } secret = string(b) } - if err := n.Send(ctx, ch.Config, secret, ev); err != nil { - errs = append(errs, err) + serr := n.Send(ctx, ch.Config, secret, ev) + if serr != nil { + errs = append(errs, serr) } + results = append(results, ChannelResult{Type: ch.Type, Err: serr}) } - return errors.Join(errs...) + return results, errors.Join(errs...) } // SendTest sends a single synthetic Event directly through the Notifier for diff --git a/internal/notify/notify_test.go b/internal/notify/notify_test.go index 62a7c0d..63ea1db 100644 --- a/internal/notify/notify_test.go +++ b/internal/notify/notify_test.go @@ -330,7 +330,7 @@ func TestDispatcherSendsToAllChannelsAndAggregatesErrors(t *testing.T) { d.byType["webhook"] = &Webhook{HTTP: whSrv.Client(), allowPrivate: true} ev := Event{Project: "proj", Domain: "example.com", Status: "drift", Summary: "changed", At: time.Now()} - err := d.Send(context.Background(), projectID, ev) + results, err := d.Send(context.Background(), projectID, ev) if !tgCalled { t.Error("expected telegram notifier to be called") @@ -344,6 +344,54 @@ func TestDispatcherSendsToAllChannelsAndAggregatesErrors(t *testing.T) { if tgSecret != "decrypted-enc-token" { t.Fatalf("expected decrypted secret to be passed to telegram, got %q", tgSecret) } + + if len(results) != 2 { + t.Fatalf("results = %d, want 2", len(results)) + } + byType := make(map[string]ChannelResult, len(results)) + for _, r := range results { + byType[r.Type] = r + } + if tg, ok := byType["telegram"]; !ok || tg.Err != nil { + t.Fatalf("telegram result = %+v, want ok result", tg) + } + if wh, ok := byType["webhook"]; !ok || wh.Err == nil { + t.Fatalf("webhook result = %+v, want error result", wh) + } +} + +// TestDispatcherSendReturnsPerChannelResults exercises the exact scenario +// from the plan: one telegram channel succeeding, one webhook channel +// failing at the Notifier — the metric consumer (scheduler) needs a result +// per channel, not one aggregate blob, to record accurate per-channel/status +// metrics. +func TestDispatcherSendReturnsPerChannelResults(t *testing.T) { + projectID := uuid.New() + channels := []store.Channel{ + {ID: uuid.New(), ProjectID: projectID, Type: "telegram", Config: json.RawMessage(`{"chat_id":"1"}`), Enabled: true}, + {ID: uuid.New(), ProjectID: projectID, Type: "webhook", Config: json.RawMessage(`{"url":"http://x"}`), Enabled: true}, + } + d := NewDispatcher(&mockChannelStore{channels: channels}, &mockDecryptor{}) + d.byType["telegram"] = notifierFunc(func(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error { + return nil + }) + d.byType["webhook"] = notifierFunc(func(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error { + return errBoom + }) + + results, err := d.Send(context.Background(), projectID, Event{Project: "p", Domain: "d", Status: "drift"}) + if err == nil { + t.Fatal("expected aggregated error because webhook failed") + } + if len(results) != 2 { + t.Fatalf("results = %d, want 2", len(results)) + } + if results[0].Type != "telegram" || results[0].Err != nil { + t.Fatalf("results[0] = %+v, want telegram/nil", results[0]) + } + if results[1].Type != "webhook" || results[1].Err == nil { + t.Fatalf("results[1] = %+v, want webhook/error", results[1]) + } } func TestDispatcherSkipsUnknownChannelType(t *testing.T) { @@ -352,7 +400,7 @@ func TestDispatcherSkipsUnknownChannelType(t *testing.T) { {ID: uuid.New(), ProjectID: projectID, Type: "carrier-pigeon", Config: json.RawMessage(`{}`), Enabled: true}, } d := NewDispatcher(&mockChannelStore{channels: channels}, &mockDecryptor{}) - if err := d.Send(context.Background(), projectID, Event{Project: "p", Domain: "d", Status: "drift"}); err != nil { + if _, err := d.Send(context.Background(), projectID, Event{Project: "p", Domain: "d", Status: "drift"}); err != nil { t.Fatalf("unexpected error for unknown channel type: %v", err) } } @@ -375,7 +423,7 @@ func TestDispatcherDecryptFailureIsAggregatedNotFatal(t *testing.T) { // default; swap in an allowPrivate webhook so this test can still hit it. d.byType["webhook"] = &Webhook{HTTP: whSrv.Client(), allowPrivate: true} - err := d.Send(context.Background(), projectID, Event{Project: "p", Domain: "d", Status: "drift"}) + _, err := d.Send(context.Background(), projectID, Event{Project: "p", Domain: "d", Status: "drift"}) if err == nil { t.Fatal("expected error due to decrypt failure") } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 240ad9d..47b443b 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -49,7 +49,7 @@ type Checker interface { // NotifySender delivers a status-change event to a project's notification // channels. internal/notify.Dispatcher satisfies this. type NotifySender interface { - Send(ctx context.Context, projectID uuid.UUID, ev notify.Event) error + Send(ctx context.Context, projectID uuid.UUID, ev notify.Event) ([]notify.ChannelResult, error) } // Scheduler drives periodic domain checks for every due project schedule. @@ -172,10 +172,17 @@ func (s *Scheduler) checkDomain(ctx context.Context, projectID uuid.UUID, d stor Summary: summarize(newStatus, cs, checkErr), At: now, } - if err := s.notifier.Send(ctx, projectID, ev); err != nil { + results, err := s.notifier.Send(ctx, projectID, ev) + if err != nil { log.Printf("scheduler: notify send for project %s domain %s failed: %v", projectID, d.ID, err) } - s.metrics.IncNotification("dispatch", newStatus) + for _, r := range results { + status := "sent" + if r.Err != nil { + status = "failed" + } + s.metrics.IncNotification(r.Type, status) + } } return newStatus diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 280e1d6..d6a6c75 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -94,17 +94,21 @@ func (c *mockChecker) Check(ctx context.Context, projectID, domainID uuid.UUID) return c.results[domainID], nil } -// mockNotifier records every Event it is asked to Send. +// mockNotifier records every Event it is asked to Send, and returns a +// canned set of per-channel results (results defaults to nil, i.e. no +// channels) so tests can assert on scheduler.checkDomain's per-channel +// metric recording. type mockNotifier struct { - mu sync.Mutex - events []notify.Event + mu sync.Mutex + events []notify.Event + results []notify.ChannelResult } -func (n *mockNotifier) Send(ctx context.Context, projectID uuid.UUID, ev notify.Event) error { +func (n *mockNotifier) Send(ctx context.Context, projectID uuid.UUID, ev notify.Event) ([]notify.ChannelResult, error) { n.mu.Lock() defer n.mu.Unlock() n.events = append(n.events, ev) - return nil + return n.results, nil } func (n *mockNotifier) count() int { @@ -291,6 +295,45 @@ func TestRunOnce_SkipsDomainWithoutTemplate(t *testing.T) { } } +// TestRunOnce_RecordsPerChannelNotificationMetrics exercises the fix for the +// "IncNotification('dispatch', newStatus) unconditionally" bug: the +// scheduler must record one NotificationsTotal increment per channel result, +// labeled by that channel's actual type and its actual sent/failed outcome +// — not a single "dispatch" placeholder blind to per-channel delivery. +func TestRunOnce_RecordsPerChannelNotificationMetrics(t *testing.T) { + projectID := uuid.New() + templateID := uuid.New() + domainA := store.Domain{ID: uuid.New(), ProjectID: projectID, TemplateID: &templateID} + + st := newMockStore() + st.schedules = []store.Schedule{{ID: uuid.New(), ProjectID: projectID, IntervalSeconds: 3600, Enabled: true}} + st.domains[projectID] = []store.Domain{domainA} + + checker := &mockChecker{ + results: map[uuid.UUID]diff.Changeset{domainA.ID: driftChangeset()}, + } + notifier := &mockNotifier{ + results: []notify.ChannelResult{ + {Type: "telegram"}, + {Type: "webhook", Err: errors.New("x")}, + }, + } + m := metrics.New() + sched := New(st, checker, notifier, m) + + // unknown -> drift: shouldNotify is true, so notifier.Send fires once. + if err := sched.RunOnce(context.Background(), time.Now()); err != nil { + t.Fatalf("RunOnce: %v", err) + } + + if got := testutil.ToFloat64(m.NotificationsTotal.WithLabelValues("telegram", "sent")); got != 1 { + t.Fatalf("NotificationsTotal{telegram,sent} = %v, want 1", got) + } + if got := testutil.ToFloat64(m.NotificationsTotal.WithLabelValues("webhook", "failed")); got != 1 { + t.Fatalf("NotificationsTotal{webhook,failed} = %v, want 1", got) + } +} + func TestShouldNotify(t *testing.T) { cases := []struct { name string