feat(notify): per-channel delivery results + accurate notification metrics
Dispatcher.Send now returns a []ChannelResult of per-channel {Type, Err}
outcomes alongside the aggregated error. scheduler.checkDomain uses those
results to increment NotificationsTotal once per channel, labeled with the
channel type and a "sent"/"failed" status, replacing the single
unconditional IncNotification("dispatch", newStatus) placeholder that
ignored per-channel delivery outcomes.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01BwxdSt4reTm7Dj1oxRvpP3
This commit is contained in:
@@ -49,14 +49,23 @@ func NewDispatcher(store ChannelStore, cipher Decryptor) *Dispatcher {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ChannelResult is the per-channel delivery outcome, so callers can record
|
||||||
|
// success/failure metrics per channel type instead of one aggregate blob.
|
||||||
|
type ChannelResult struct {
|
||||||
|
Type string
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
// Send delivers ev to every enabled channel of projectID. Errors from
|
// Send delivers ev to every enabled channel of projectID. Errors from
|
||||||
// individual channels are aggregated (via errors.Join) rather than aborting
|
// individual channels are aggregated (via errors.Join) rather than aborting
|
||||||
// delivery to the remaining channels.
|
// delivery to the remaining channels; the per-channel outcome is also
|
||||||
func (d *Dispatcher) Send(ctx context.Context, projectID uuid.UUID, ev Event) error {
|
// returned so callers can record accurate per-channel/status metrics.
|
||||||
|
func (d *Dispatcher) Send(ctx context.Context, projectID uuid.UUID, ev Event) ([]ChannelResult, error) {
|
||||||
channels, err := d.store.ListEnabledChannels(ctx, projectID)
|
channels, err := d.store.ListEnabledChannels(ctx, projectID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
var results []ChannelResult
|
||||||
var errs []error
|
var errs []error
|
||||||
for _, ch := range channels {
|
for _, ch := range channels {
|
||||||
n, ok := d.byType[ch.Type]
|
n, ok := d.byType[ch.Type]
|
||||||
@@ -65,18 +74,21 @@ func (d *Dispatcher) Send(ctx context.Context, projectID uuid.UUID, ev Event) er
|
|||||||
}
|
}
|
||||||
secret := ""
|
secret := ""
|
||||||
if ch.SecretEnc != "" {
|
if ch.SecretEnc != "" {
|
||||||
b, err := d.cipher.Decrypt(ch.SecretEnc)
|
b, derr := d.cipher.Decrypt(ch.SecretEnc)
|
||||||
if err != nil {
|
if derr != nil {
|
||||||
errs = append(errs, err)
|
errs = append(errs, derr)
|
||||||
|
results = append(results, ChannelResult{Type: ch.Type, Err: derr})
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
secret = string(b)
|
secret = string(b)
|
||||||
}
|
}
|
||||||
if err := n.Send(ctx, ch.Config, secret, ev); err != nil {
|
serr := n.Send(ctx, ch.Config, secret, ev)
|
||||||
errs = append(errs, err)
|
if serr != nil {
|
||||||
|
errs = append(errs, serr)
|
||||||
}
|
}
|
||||||
|
results = append(results, ChannelResult{Type: ch.Type, Err: serr})
|
||||||
}
|
}
|
||||||
return errors.Join(errs...)
|
return results, errors.Join(errs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendTest sends a single synthetic Event directly through the Notifier for
|
// SendTest sends a single synthetic Event directly through the Notifier for
|
||||||
|
|||||||
@@ -330,7 +330,7 @@ func TestDispatcherSendsToAllChannelsAndAggregatesErrors(t *testing.T) {
|
|||||||
d.byType["webhook"] = &Webhook{HTTP: whSrv.Client(), allowPrivate: true}
|
d.byType["webhook"] = &Webhook{HTTP: whSrv.Client(), allowPrivate: true}
|
||||||
|
|
||||||
ev := Event{Project: "proj", Domain: "example.com", Status: "drift", Summary: "changed", At: time.Now()}
|
ev := Event{Project: "proj", Domain: "example.com", Status: "drift", Summary: "changed", At: time.Now()}
|
||||||
err := d.Send(context.Background(), projectID, ev)
|
results, err := d.Send(context.Background(), projectID, ev)
|
||||||
|
|
||||||
if !tgCalled {
|
if !tgCalled {
|
||||||
t.Error("expected telegram notifier to be called")
|
t.Error("expected telegram notifier to be called")
|
||||||
@@ -344,6 +344,54 @@ func TestDispatcherSendsToAllChannelsAndAggregatesErrors(t *testing.T) {
|
|||||||
if tgSecret != "decrypted-enc-token" {
|
if tgSecret != "decrypted-enc-token" {
|
||||||
t.Fatalf("expected decrypted secret to be passed to telegram, got %q", tgSecret)
|
t.Fatalf("expected decrypted secret to be passed to telegram, got %q", tgSecret)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(results) != 2 {
|
||||||
|
t.Fatalf("results = %d, want 2", len(results))
|
||||||
|
}
|
||||||
|
byType := make(map[string]ChannelResult, len(results))
|
||||||
|
for _, r := range results {
|
||||||
|
byType[r.Type] = r
|
||||||
|
}
|
||||||
|
if tg, ok := byType["telegram"]; !ok || tg.Err != nil {
|
||||||
|
t.Fatalf("telegram result = %+v, want ok result", tg)
|
||||||
|
}
|
||||||
|
if wh, ok := byType["webhook"]; !ok || wh.Err == nil {
|
||||||
|
t.Fatalf("webhook result = %+v, want error result", wh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDispatcherSendReturnsPerChannelResults exercises the exact scenario
|
||||||
|
// from the plan: one telegram channel succeeding, one webhook channel
|
||||||
|
// failing at the Notifier — the metric consumer (scheduler) needs a result
|
||||||
|
// per channel, not one aggregate blob, to record accurate per-channel/status
|
||||||
|
// metrics.
|
||||||
|
func TestDispatcherSendReturnsPerChannelResults(t *testing.T) {
|
||||||
|
projectID := uuid.New()
|
||||||
|
channels := []store.Channel{
|
||||||
|
{ID: uuid.New(), ProjectID: projectID, Type: "telegram", Config: json.RawMessage(`{"chat_id":"1"}`), Enabled: true},
|
||||||
|
{ID: uuid.New(), ProjectID: projectID, Type: "webhook", Config: json.RawMessage(`{"url":"http://x"}`), Enabled: true},
|
||||||
|
}
|
||||||
|
d := NewDispatcher(&mockChannelStore{channels: channels}, &mockDecryptor{})
|
||||||
|
d.byType["telegram"] = notifierFunc(func(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error {
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
d.byType["webhook"] = notifierFunc(func(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error {
|
||||||
|
return errBoom
|
||||||
|
})
|
||||||
|
|
||||||
|
results, err := d.Send(context.Background(), projectID, Event{Project: "p", Domain: "d", Status: "drift"})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected aggregated error because webhook failed")
|
||||||
|
}
|
||||||
|
if len(results) != 2 {
|
||||||
|
t.Fatalf("results = %d, want 2", len(results))
|
||||||
|
}
|
||||||
|
if results[0].Type != "telegram" || results[0].Err != nil {
|
||||||
|
t.Fatalf("results[0] = %+v, want telegram/nil", results[0])
|
||||||
|
}
|
||||||
|
if results[1].Type != "webhook" || results[1].Err == nil {
|
||||||
|
t.Fatalf("results[1] = %+v, want webhook/error", results[1])
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDispatcherSkipsUnknownChannelType(t *testing.T) {
|
func TestDispatcherSkipsUnknownChannelType(t *testing.T) {
|
||||||
@@ -352,7 +400,7 @@ func TestDispatcherSkipsUnknownChannelType(t *testing.T) {
|
|||||||
{ID: uuid.New(), ProjectID: projectID, Type: "carrier-pigeon", Config: json.RawMessage(`{}`), Enabled: true},
|
{ID: uuid.New(), ProjectID: projectID, Type: "carrier-pigeon", Config: json.RawMessage(`{}`), Enabled: true},
|
||||||
}
|
}
|
||||||
d := NewDispatcher(&mockChannelStore{channels: channels}, &mockDecryptor{})
|
d := NewDispatcher(&mockChannelStore{channels: channels}, &mockDecryptor{})
|
||||||
if err := d.Send(context.Background(), projectID, Event{Project: "p", Domain: "d", Status: "drift"}); err != nil {
|
if _, err := d.Send(context.Background(), projectID, Event{Project: "p", Domain: "d", Status: "drift"}); err != nil {
|
||||||
t.Fatalf("unexpected error for unknown channel type: %v", err)
|
t.Fatalf("unexpected error for unknown channel type: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -375,7 +423,7 @@ func TestDispatcherDecryptFailureIsAggregatedNotFatal(t *testing.T) {
|
|||||||
// default; swap in an allowPrivate webhook so this test can still hit it.
|
// default; swap in an allowPrivate webhook so this test can still hit it.
|
||||||
d.byType["webhook"] = &Webhook{HTTP: whSrv.Client(), allowPrivate: true}
|
d.byType["webhook"] = &Webhook{HTTP: whSrv.Client(), allowPrivate: true}
|
||||||
|
|
||||||
err := d.Send(context.Background(), projectID, Event{Project: "p", Domain: "d", Status: "drift"})
|
_, err := d.Send(context.Background(), projectID, Event{Project: "p", Domain: "d", Status: "drift"})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatal("expected error due to decrypt failure")
|
t.Fatal("expected error due to decrypt failure")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ type Checker interface {
|
|||||||
// NotifySender delivers a status-change event to a project's notification
|
// NotifySender delivers a status-change event to a project's notification
|
||||||
// channels. internal/notify.Dispatcher satisfies this.
|
// channels. internal/notify.Dispatcher satisfies this.
|
||||||
type NotifySender interface {
|
type NotifySender interface {
|
||||||
Send(ctx context.Context, projectID uuid.UUID, ev notify.Event) error
|
Send(ctx context.Context, projectID uuid.UUID, ev notify.Event) ([]notify.ChannelResult, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Scheduler drives periodic domain checks for every due project schedule.
|
// Scheduler drives periodic domain checks for every due project schedule.
|
||||||
@@ -172,10 +172,17 @@ func (s *Scheduler) checkDomain(ctx context.Context, projectID uuid.UUID, d stor
|
|||||||
Summary: summarize(newStatus, cs, checkErr),
|
Summary: summarize(newStatus, cs, checkErr),
|
||||||
At: now,
|
At: now,
|
||||||
}
|
}
|
||||||
if err := s.notifier.Send(ctx, projectID, ev); err != nil {
|
results, err := s.notifier.Send(ctx, projectID, ev)
|
||||||
|
if err != nil {
|
||||||
log.Printf("scheduler: notify send for project %s domain %s failed: %v", projectID, d.ID, err)
|
log.Printf("scheduler: notify send for project %s domain %s failed: %v", projectID, d.ID, err)
|
||||||
}
|
}
|
||||||
s.metrics.IncNotification("dispatch", newStatus)
|
for _, r := range results {
|
||||||
|
status := "sent"
|
||||||
|
if r.Err != nil {
|
||||||
|
status = "failed"
|
||||||
|
}
|
||||||
|
s.metrics.IncNotification(r.Type, status)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return newStatus
|
return newStatus
|
||||||
|
|||||||
@@ -94,17 +94,21 @@ func (c *mockChecker) Check(ctx context.Context, projectID, domainID uuid.UUID)
|
|||||||
return c.results[domainID], nil
|
return c.results[domainID], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// mockNotifier records every Event it is asked to Send.
|
// mockNotifier records every Event it is asked to Send, and returns a
|
||||||
|
// canned set of per-channel results (results defaults to nil, i.e. no
|
||||||
|
// channels) so tests can assert on scheduler.checkDomain's per-channel
|
||||||
|
// metric recording.
|
||||||
type mockNotifier struct {
|
type mockNotifier struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
events []notify.Event
|
events []notify.Event
|
||||||
|
results []notify.ChannelResult
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *mockNotifier) Send(ctx context.Context, projectID uuid.UUID, ev notify.Event) error {
|
func (n *mockNotifier) Send(ctx context.Context, projectID uuid.UUID, ev notify.Event) ([]notify.ChannelResult, error) {
|
||||||
n.mu.Lock()
|
n.mu.Lock()
|
||||||
defer n.mu.Unlock()
|
defer n.mu.Unlock()
|
||||||
n.events = append(n.events, ev)
|
n.events = append(n.events, ev)
|
||||||
return nil
|
return n.results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *mockNotifier) count() int {
|
func (n *mockNotifier) count() int {
|
||||||
@@ -291,6 +295,45 @@ func TestRunOnce_SkipsDomainWithoutTemplate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestRunOnce_RecordsPerChannelNotificationMetrics exercises the fix for the
|
||||||
|
// "IncNotification('dispatch', newStatus) unconditionally" bug: the
|
||||||
|
// scheduler must record one NotificationsTotal increment per channel result,
|
||||||
|
// labeled by that channel's actual type and its actual sent/failed outcome
|
||||||
|
// — not a single "dispatch" placeholder blind to per-channel delivery.
|
||||||
|
func TestRunOnce_RecordsPerChannelNotificationMetrics(t *testing.T) {
|
||||||
|
projectID := uuid.New()
|
||||||
|
templateID := uuid.New()
|
||||||
|
domainA := store.Domain{ID: uuid.New(), ProjectID: projectID, TemplateID: &templateID}
|
||||||
|
|
||||||
|
st := newMockStore()
|
||||||
|
st.schedules = []store.Schedule{{ID: uuid.New(), ProjectID: projectID, IntervalSeconds: 3600, Enabled: true}}
|
||||||
|
st.domains[projectID] = []store.Domain{domainA}
|
||||||
|
|
||||||
|
checker := &mockChecker{
|
||||||
|
results: map[uuid.UUID]diff.Changeset{domainA.ID: driftChangeset()},
|
||||||
|
}
|
||||||
|
notifier := &mockNotifier{
|
||||||
|
results: []notify.ChannelResult{
|
||||||
|
{Type: "telegram"},
|
||||||
|
{Type: "webhook", Err: errors.New("x")},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
m := metrics.New()
|
||||||
|
sched := New(st, checker, notifier, m)
|
||||||
|
|
||||||
|
// unknown -> drift: shouldNotify is true, so notifier.Send fires once.
|
||||||
|
if err := sched.RunOnce(context.Background(), time.Now()); err != nil {
|
||||||
|
t.Fatalf("RunOnce: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := testutil.ToFloat64(m.NotificationsTotal.WithLabelValues("telegram", "sent")); got != 1 {
|
||||||
|
t.Fatalf("NotificationsTotal{telegram,sent} = %v, want 1", got)
|
||||||
|
}
|
||||||
|
if got := testutil.ToFloat64(m.NotificationsTotal.WithLabelValues("webhook", "failed")); got != 1 {
|
||||||
|
t.Fatalf("NotificationsTotal{webhook,failed} = %v, want 1", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestShouldNotify(t *testing.T) {
|
func TestShouldNotify(t *testing.T) {
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
name string
|
name string
|
||||||
|
|||||||
Reference in New Issue
Block a user