feat(notify): Telegram/Webhook нотификаторы + Dispatcher по каналам проекта
This commit is contained in:
@@ -0,0 +1,75 @@
|
|||||||
|
package notify
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/vasyakrg/dns-autoresolver/internal/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ChannelStore is the narrow store dependency Dispatcher needs: the set of
|
||||||
|
// enabled notification channels for a project.
|
||||||
|
type ChannelStore interface {
|
||||||
|
ListEnabledChannels(ctx context.Context, projectID uuid.UUID) ([]store.Channel, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decryptor decrypts a channel's stored secret (bot token, signing key, ...).
|
||||||
|
type Decryptor interface {
|
||||||
|
Decrypt(enc string) ([]byte, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dispatcher fans an Event out to every enabled channel of a project,
|
||||||
|
// picking the Notifier implementation by channel type. A failure on one
|
||||||
|
// channel does not stop delivery to the others; all errors are aggregated
|
||||||
|
// via errors.Join.
|
||||||
|
type Dispatcher struct {
|
||||||
|
store ChannelStore
|
||||||
|
cipher Decryptor
|
||||||
|
byType map[string]Notifier
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDispatcher builds a Dispatcher wired with the default Telegram and
|
||||||
|
// Webhook notifiers.
|
||||||
|
func NewDispatcher(store ChannelStore, cipher Decryptor) *Dispatcher {
|
||||||
|
return &Dispatcher{
|
||||||
|
store: store,
|
||||||
|
cipher: cipher,
|
||||||
|
byType: map[string]Notifier{
|
||||||
|
"telegram": &Telegram{BaseURL: "https://api.telegram.org", HTTP: &http.Client{Timeout: 15 * time.Second}},
|
||||||
|
"webhook": &Webhook{HTTP: &http.Client{Timeout: 15 * time.Second}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send delivers ev to every enabled channel of projectID. Errors from
|
||||||
|
// individual channels are aggregated (via errors.Join) rather than aborting
|
||||||
|
// delivery to the remaining channels.
|
||||||
|
func (d *Dispatcher) Send(ctx context.Context, projectID uuid.UUID, ev Event) error {
|
||||||
|
channels, err := d.store.ListEnabledChannels(ctx, projectID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var errs []error
|
||||||
|
for _, ch := range channels {
|
||||||
|
n, ok := d.byType[ch.Type]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
secret := ""
|
||||||
|
if ch.SecretEnc != "" {
|
||||||
|
b, err := d.cipher.Decrypt(ch.SecretEnc)
|
||||||
|
if err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
secret = string(b)
|
||||||
|
}
|
||||||
|
if err := n.Send(ctx, ch.Config, secret, ev); err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return errors.Join(errs...)
|
||||||
|
}
|
||||||
@@ -0,0 +1,27 @@
|
|||||||
|
// Package notify sends drift/error notifications to project-configured
|
||||||
|
// channels (Telegram, generic webhooks, ...). Notifier implementations must
|
||||||
|
// never log the secret they receive (bot tokens, HMAC keys, etc.).
|
||||||
|
package notify
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Event describes a single notification-worthy occurrence for a domain
|
||||||
|
// belonging to a project (e.g. a status change detected by the scheduler).
|
||||||
|
type Event struct {
|
||||||
|
Project string
|
||||||
|
Domain string
|
||||||
|
Status string
|
||||||
|
Summary string
|
||||||
|
At time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notifier delivers an Event to a channel described by cfg (channel-type
|
||||||
|
// specific JSON config) and secret (decrypted credential, e.g. a bot token).
|
||||||
|
// Implementations must not log secret.
|
||||||
|
type Notifier interface {
|
||||||
|
Send(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error
|
||||||
|
}
|
||||||
@@ -0,0 +1,217 @@
|
|||||||
|
package notify
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/vasyakrg/dns-autoresolver/internal/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTelegramSendSuccess(t *testing.T) {
|
||||||
|
var gotPath string
|
||||||
|
var gotBody map[string]string
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
gotPath = r.URL.Path
|
||||||
|
_ = json.NewDecoder(r.Body).Decode(&gotBody)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
tg := &Telegram{BaseURL: srv.URL, HTTP: srv.Client()}
|
||||||
|
ev := Event{Project: "proj", Domain: "example.com", Status: "drift", Summary: "A record changed", At: time.Now()}
|
||||||
|
|
||||||
|
err := tg.Send(context.Background(), json.RawMessage(`{"chat_id":"12345"}`), "sekret-token", ev)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if gotPath != "/botsekret-token/sendMessage" {
|
||||||
|
t.Fatalf("unexpected path: %s", gotPath)
|
||||||
|
}
|
||||||
|
if gotBody["chat_id"] != "12345" {
|
||||||
|
t.Fatalf("unexpected chat_id: %+v", gotBody)
|
||||||
|
}
|
||||||
|
if !strings.Contains(gotBody["text"], "example.com") || !strings.Contains(gotBody["text"], "drift") {
|
||||||
|
t.Fatalf("unexpected text: %+v", gotBody)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTelegramSendServerError(t *testing.T) {
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
tg := &Telegram{BaseURL: srv.URL, HTTP: srv.Client()}
|
||||||
|
ev := Event{Project: "proj", Domain: "example.com", Status: "drift", Summary: "x", At: time.Now()}
|
||||||
|
|
||||||
|
if err := tg.Send(context.Background(), json.RawMessage(`{"chat_id":"1"}`), "tok", ev); err == nil {
|
||||||
|
t.Fatal("expected error on 500 response")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWebhookSendSuccess(t *testing.T) {
|
||||||
|
var gotEvent Event
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
t.Errorf("unexpected method: %s", r.Method)
|
||||||
|
}
|
||||||
|
_ = json.NewDecoder(r.Body).Decode(&gotEvent)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
wh := &Webhook{HTTP: srv.Client()}
|
||||||
|
ev := Event{Project: "proj", Domain: "example.com", Status: "in_sync", Summary: "resolved", At: time.Now()}
|
||||||
|
|
||||||
|
cfg, _ := json.Marshal(map[string]string{"url": srv.URL})
|
||||||
|
if err := wh.Send(context.Background(), cfg, "", ev); err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if gotEvent.Domain != "example.com" || gotEvent.Status != "in_sync" {
|
||||||
|
t.Fatalf("unexpected event delivered: %+v", gotEvent)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWebhookSendNonSuccessStatus(t *testing.T) {
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
wh := &Webhook{HTTP: srv.Client()}
|
||||||
|
ev := Event{Project: "proj", Domain: "example.com", Status: "drift", Summary: "x", At: time.Now()}
|
||||||
|
cfg, _ := json.Marshal(map[string]string{"url": srv.URL})
|
||||||
|
|
||||||
|
if err := wh.Send(context.Background(), cfg, "", ev); err == nil {
|
||||||
|
t.Fatal("expected error on 400 response")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Dispatcher ---
|
||||||
|
|
||||||
|
type mockChannelStore struct {
|
||||||
|
channels []store.Channel
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockChannelStore) ListEnabledChannels(ctx context.Context, projectID uuid.UUID) ([]store.Channel, error) {
|
||||||
|
return m.channels, m.err
|
||||||
|
}
|
||||||
|
|
||||||
|
type mockDecryptor struct {
|
||||||
|
fail bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockDecryptor) Decrypt(enc string) ([]byte, error) {
|
||||||
|
if m.fail {
|
||||||
|
return nil, errBoom
|
||||||
|
}
|
||||||
|
return []byte("decrypted-" + enc), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var errBoom = &boomErr{}
|
||||||
|
|
||||||
|
type boomErr struct{}
|
||||||
|
|
||||||
|
func (*boomErr) Error() string { return "decrypt boom" }
|
||||||
|
|
||||||
|
func TestDispatcherSendsToAllChannelsAndAggregatesErrors(t *testing.T) {
|
||||||
|
var tgCalled, whCalled bool
|
||||||
|
var tgSecret string
|
||||||
|
|
||||||
|
tgSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
tgCalled = true
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}))
|
||||||
|
defer tgSrv.Close()
|
||||||
|
|
||||||
|
whSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
whCalled = true
|
||||||
|
w.WriteHeader(http.StatusInternalServerError) // webhook fails
|
||||||
|
}))
|
||||||
|
defer whSrv.Close()
|
||||||
|
|
||||||
|
projectID := uuid.New()
|
||||||
|
channels := []store.Channel{
|
||||||
|
{
|
||||||
|
ID: uuid.New(), ProjectID: projectID, Type: "telegram",
|
||||||
|
Config: json.RawMessage(`{"chat_id":"1"}`), SecretEnc: "enc-token", Enabled: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: uuid.New(), ProjectID: projectID, Type: "webhook",
|
||||||
|
Config: json.RawMessage(`{"url":"` + whSrv.URL + `"}`), SecretEnc: "", Enabled: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
d := NewDispatcher(&mockChannelStore{channels: channels}, &mockDecryptor{})
|
||||||
|
// Redirect telegram to the httptest server and capture the decrypted secret.
|
||||||
|
d.byType["telegram"] = notifierFunc(func(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error {
|
||||||
|
tgSecret = secret
|
||||||
|
tg := &Telegram{BaseURL: tgSrv.URL, HTTP: tgSrv.Client()}
|
||||||
|
return tg.Send(ctx, cfg, secret, ev)
|
||||||
|
})
|
||||||
|
|
||||||
|
ev := Event{Project: "proj", Domain: "example.com", Status: "drift", Summary: "changed", At: time.Now()}
|
||||||
|
err := d.Send(context.Background(), projectID, ev)
|
||||||
|
|
||||||
|
if !tgCalled {
|
||||||
|
t.Error("expected telegram notifier to be called")
|
||||||
|
}
|
||||||
|
if !whCalled {
|
||||||
|
t.Error("expected webhook notifier to be called")
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected aggregated error because webhook failed")
|
||||||
|
}
|
||||||
|
if tgSecret != "decrypted-enc-token" {
|
||||||
|
t.Fatalf("expected decrypted secret to be passed to telegram, got %q", tgSecret)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDispatcherSkipsUnknownChannelType(t *testing.T) {
|
||||||
|
projectID := uuid.New()
|
||||||
|
channels := []store.Channel{
|
||||||
|
{ID: uuid.New(), ProjectID: projectID, Type: "carrier-pigeon", Config: json.RawMessage(`{}`), Enabled: true},
|
||||||
|
}
|
||||||
|
d := NewDispatcher(&mockChannelStore{channels: channels}, &mockDecryptor{})
|
||||||
|
if err := d.Send(context.Background(), projectID, Event{Project: "p", Domain: "d", Status: "drift"}); err != nil {
|
||||||
|
t.Fatalf("unexpected error for unknown channel type: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDispatcherDecryptFailureIsAggregatedNotFatal(t *testing.T) {
|
||||||
|
var whCalled bool
|
||||||
|
whSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
whCalled = true
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}))
|
||||||
|
defer whSrv.Close()
|
||||||
|
|
||||||
|
projectID := uuid.New()
|
||||||
|
channels := []store.Channel{
|
||||||
|
{ID: uuid.New(), ProjectID: projectID, Type: "telegram", Config: json.RawMessage(`{"chat_id":"1"}`), SecretEnc: "enc", Enabled: true},
|
||||||
|
{ID: uuid.New(), ProjectID: projectID, Type: "webhook", Config: json.RawMessage(`{"url":"` + whSrv.URL + `"}`), Enabled: true},
|
||||||
|
}
|
||||||
|
d := NewDispatcher(&mockChannelStore{channels: channels}, &mockDecryptor{fail: true})
|
||||||
|
|
||||||
|
err := d.Send(context.Background(), projectID, Event{Project: "p", Domain: "d", Status: "drift"})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error due to decrypt failure")
|
||||||
|
}
|
||||||
|
if !whCalled {
|
||||||
|
t.Error("expected webhook channel to still be attempted after telegram decrypt failure")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// notifierFunc adapts a function to the Notifier interface for tests.
|
||||||
|
type notifierFunc func(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error
|
||||||
|
|
||||||
|
func (f notifierFunc) Send(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error {
|
||||||
|
return f(ctx, cfg, secret, ev)
|
||||||
|
}
|
||||||
@@ -0,0 +1,45 @@
|
|||||||
|
package notify
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Telegram delivers notifications via the Telegram Bot API sendMessage
|
||||||
|
// endpoint. Config is {"chat_id": "..."}; secret is the bot token and is
|
||||||
|
// never logged.
|
||||||
|
type Telegram struct {
|
||||||
|
BaseURL string
|
||||||
|
HTTP *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Telegram) Send(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error {
|
||||||
|
var c struct {
|
||||||
|
ChatID string `json:"chat_id"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(cfg, &c); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
body, _ := json.Marshal(map[string]string{
|
||||||
|
"chat_id": c.ChatID,
|
||||||
|
"text": fmt.Sprintf("[%s] %s → %s\n%s", ev.Project, ev.Domain, ev.Status, ev.Summary),
|
||||||
|
})
|
||||||
|
url := t.BaseURL + "/bot" + secret + "/sendMessage"
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
resp, err := t.HTTP.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode >= 300 {
|
||||||
|
return fmt.Errorf("telegram: status %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,43 @@
|
|||||||
|
package notify
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Webhook delivers notifications as a JSON POST of the Event to a
|
||||||
|
// project-configured URL. Config is {"url": "..."}. secret is currently
|
||||||
|
// unused (reserved for future request signing) and is never logged.
|
||||||
|
type Webhook struct {
|
||||||
|
HTTP *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Webhook) Send(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error {
|
||||||
|
var c struct {
|
||||||
|
URL string `json:"url"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(cfg, &c); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
body, err := json.Marshal(ev)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.URL, bytes.NewReader(body))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
resp, err := w.HTTP.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||||
|
return fmt.Errorf("webhook: status %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user