Files
dns-autoresolver/internal/notify/dispatch.go
T
vasyansk f14916396c feat(notify): per-channel delivery results + accurate notification metrics
Dispatcher.Send now returns []ChannelResult{Type, Err} alongside the
aggregated error, and scheduler.checkDomain increments
NotificationsTotal per channel type/status instead of a single
unconditional IncNotification("dispatch", newStatus) placeholder that
ignored per-channel delivery outcome.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01BwxdSt4reTm7Dj1oxRvpP3
2026-07-04 15:56:15 +07:00

114 lines
3.5 KiB
Go

package notify
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
"github.com/google/uuid"
"github.com/vasyakrg/dns-autoresolver/internal/store"
)
// ChannelStore is the slice of the persistence layer that Dispatcher
// depends on. Keeping it to a single method lets callers and tests supply
// any implementation that can list a project's enabled channels.
type ChannelStore interface {
	// ListEnabledChannels returns the enabled notification channels that
	// belong to projectID, or an error if they cannot be loaded.
	ListEnabledChannels(ctx context.Context, projectID uuid.UUID) ([]store.Channel, error)
}
// Decryptor turns the encrypted form of a channel secret (bot token,
// signing key, ...) back into its plaintext bytes.
type Decryptor interface {
	// Decrypt returns the plaintext for enc, or an error if enc cannot be
	// decrypted.
	Decrypt(enc string) ([]byte, error)
}
// Dispatcher delivers an Event to each enabled channel of a project. The
// Notifier used for a channel is looked up by the channel's type. Delivery
// is independent per channel: one channel failing does not prevent attempts
// on the rest, and the individual failures are combined with errors.Join.
type Dispatcher struct {
	// store lists the enabled channels of a project.
	store ChannelStore
	// cipher decrypts a channel's stored secret before delivery.
	cipher Decryptor
	// byType maps a channel type name to the Notifier that handles it.
	byType map[string]Notifier
}
// NewDispatcher returns a Dispatcher that reads channels from store,
// decrypts secrets with cipher, and has the two built-in notifiers
// registered under the type names "telegram" and "webhook".
//
// Both notifiers get their own http.Client with a 15 second timeout. The
// Telegram notifier targets https://api.telegram.org; the webhook client
// additionally uses the transport returned by newWebhookTransport(false).
func NewDispatcher(store ChannelStore, cipher Decryptor) *Dispatcher {
	const httpTimeout = 15 * time.Second

	telegram := &Telegram{
		BaseURL: "https://api.telegram.org",
		HTTP:    &http.Client{Timeout: httpTimeout},
	}
	webhook := &Webhook{
		HTTP: &http.Client{
			Timeout:   httpTimeout,
			Transport: newWebhookTransport(false),
		},
	}

	d := &Dispatcher{store: store, cipher: cipher}
	d.byType = map[string]Notifier{
		"telegram": telegram,
		"webhook":  webhook,
	}
	return d
}
// ChannelResult records how delivery to one channel went. Returning one of
// these per channel lets callers count successes and failures per channel
// type rather than working from a single combined error.
type ChannelResult struct {
	// Type is the channel type the attempt was made for.
	Type string
	// Err is nil when delivery succeeded and the channel's failure otherwise.
	Err error
}
// Send delivers ev to every enabled channel of projectID.
//
// Delivery is attempted independently per channel: a failure is recorded
// and the loop moves on to the next channel. The returned slice holds one
// ChannelResult per enabled channel, in the order the store returned them,
// so callers can record per-channel/status metrics. The returned error is
// the errors.Join of every per-channel failure and is nil when all
// deliveries succeeded. If the channel list itself cannot be loaded, Send
// returns nil results together with that error.
//
// A channel whose type has no registered Notifier is reported as a failed
// result instead of being skipped, so an enabled channel that can never be
// delivered shows up in the caller's metrics and in the aggregated error.
func (d *Dispatcher) Send(ctx context.Context, projectID uuid.UUID, ev Event) ([]ChannelResult, error) {
	channels, err := d.store.ListEnabledChannels(ctx, projectID)
	if err != nil {
		return nil, err
	}

	var results []ChannelResult
	var errs []error
	for _, ch := range channels {
		cerr := d.deliverToChannel(ctx, ch, ev)
		if cerr != nil {
			errs = append(errs, cerr)
		}
		results = append(results, ChannelResult{Type: ch.Type, Err: cerr})
	}
	return results, errors.Join(errs...)
}

// deliverToChannel makes the single delivery attempt for ch: it resolves the
// Notifier for the channel type, decrypts the stored secret when one is
// present, and hands the event to the Notifier. It returns nil on success.
func (d *Dispatcher) deliverToChannel(ctx context.Context, ch store.Channel, ev Event) error {
	n, ok := d.byType[ch.Type]
	if !ok {
		return fmt.Errorf("notify: unknown channel type %q", ch.Type)
	}

	// An empty SecretEnc means the channel has no secret; pass "" through
	// without involving the cipher.
	secret := ""
	if ch.SecretEnc != "" {
		b, err := d.cipher.Decrypt(ch.SecretEnc)
		if err != nil {
			// Wrap so the joined error says which channel failed while
			// errors.Is/As still reach the underlying cipher error.
			return fmt.Errorf("notify: decrypt secret for %q channel: %w", ch.Type, err)
		}
		secret = string(b)
	}
	return n.Send(ctx, ch.Config, secret, ev)
}
// SendTest pushes one synthetic Event through the Notifier registered for
// channelType. No project or channel lookup happens here and nothing is
// decrypted: the caller passes the channel config and the plaintext secret.
//
// It satisfies api.TestSender and backs POST /channels/{cid}/test, so a user
// can check that a bot_token/chat_id pair or a webhook URL works before
// enabling the schedule. The api layer resolves the channel and decrypts its
// secret; this method only makes the delivery attempt.
//
// The event uses "test" for Project, Domain and Status, "test notification"
// for Summary, and the current time for At. An error naming channelType is
// returned when no Notifier is registered for it; otherwise the result is
// whatever the Notifier's Send returns.
func (d *Dispatcher) SendTest(ctx context.Context, channelType string, config json.RawMessage, secret string) error {
	notifier, known := d.byType[channelType]
	if !known {
		return fmt.Errorf("notify: unknown channel type %q", channelType)
	}

	return notifier.Send(ctx, config, secret, Event{
		Project: "test",
		Domain:  "test",
		Status:  "test",
		Summary: "test notification",
		At:      time.Now(),
	})
}