From e82fb0b13d8c9606138bbb3a6ae806d61325f476 Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Sat, 4 Jul 2026 13:19:21 +0700 Subject: [PATCH] =?UTF-8?q?feat(notify):=20Telegram/Webhook=20=D0=BD=D0=BE?= =?UTF-8?q?=D1=82=D0=B8=D1=84=D0=B8=D0=BA=D0=B0=D1=82=D0=BE=D1=80=D1=8B=20?= =?UTF-8?q?+=20Dispatcher=20=D0=BF=D0=BE=20=D0=BA=D0=B0=D0=BD=D0=B0=D0=BB?= =?UTF-8?q?=D0=B0=D0=BC=20=D0=BF=D1=80=D0=BE=D0=B5=D0=BA=D1=82=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/notify/dispatch.go | 75 ++++++++++++ internal/notify/notify.go | 27 ++++ internal/notify/notify_test.go | 217 +++++++++++++++++++++++++++++++++ internal/notify/telegram.go | 45 +++++++ internal/notify/webhook.go | 43 +++++++ 5 files changed, 407 insertions(+) create mode 100644 internal/notify/dispatch.go create mode 100644 internal/notify/notify.go create mode 100644 internal/notify/notify_test.go create mode 100644 internal/notify/telegram.go create mode 100644 internal/notify/webhook.go diff --git a/internal/notify/dispatch.go b/internal/notify/dispatch.go new file mode 100644 index 0000000..3d2329e --- /dev/null +++ b/internal/notify/dispatch.go @@ -0,0 +1,75 @@ +package notify + +import ( + "context" + "errors" + "net/http" + "time" + + "github.com/google/uuid" + "github.com/vasyakrg/dns-autoresolver/internal/store" +) + +// ChannelStore is the narrow store dependency Dispatcher needs: the set of +// enabled notification channels for a project. +type ChannelStore interface { + ListEnabledChannels(ctx context.Context, projectID uuid.UUID) ([]store.Channel, error) +} + +// Decryptor decrypts a channel's stored secret (bot token, signing key, ...). +type Decryptor interface { + Decrypt(enc string) ([]byte, error) +} + +// Dispatcher fans an Event out to every enabled channel of a project, +// picking the Notifier implementation by channel type. A failure on one +// channel does not stop delivery to the others; all errors are aggregated +// via errors.Join. +type Dispatcher struct { + store ChannelStore + cipher Decryptor + byType map[string]Notifier +} + +// NewDispatcher builds a Dispatcher wired with the default Telegram and +// Webhook notifiers. +func NewDispatcher(store ChannelStore, cipher Decryptor) *Dispatcher { + return &Dispatcher{ + store: store, + cipher: cipher, + byType: map[string]Notifier{ + "telegram": &Telegram{BaseURL: "https://api.telegram.org", HTTP: &http.Client{Timeout: 15 * time.Second}}, + "webhook": &Webhook{HTTP: &http.Client{Timeout: 15 * time.Second}}, + }, + } +} + +// Send delivers ev to every enabled channel of projectID. Errors from +// individual channels are aggregated (via errors.Join) rather than aborting +// delivery to the remaining channels. +func (d *Dispatcher) Send(ctx context.Context, projectID uuid.UUID, ev Event) error { + channels, err := d.store.ListEnabledChannels(ctx, projectID) + if err != nil { + return err + } + var errs []error + for _, ch := range channels { + n, ok := d.byType[ch.Type] + if !ok { + continue + } + secret := "" + if ch.SecretEnc != "" { + b, err := d.cipher.Decrypt(ch.SecretEnc) + if err != nil { + errs = append(errs, err) + continue + } + secret = string(b) + } + if err := n.Send(ctx, ch.Config, secret, ev); err != nil { + errs = append(errs, err) + } + } + return errors.Join(errs...) +} diff --git a/internal/notify/notify.go b/internal/notify/notify.go new file mode 100644 index 0000000..cc1e510 --- /dev/null +++ b/internal/notify/notify.go @@ -0,0 +1,27 @@ +// Package notify sends drift/error notifications to project-configured +// channels (Telegram, generic webhooks, ...). Notifier implementations must +// never log the secret they receive (bot tokens, HMAC keys, etc.). +package notify + +import ( + "context" + "encoding/json" + "time" +) + +// Event describes a single notification-worthy occurrence for a domain +// belonging to a project (e.g. a status change detected by the scheduler). +type Event struct { + Project string + Domain string + Status string + Summary string + At time.Time +} + +// Notifier delivers an Event to a channel described by cfg (channel-type +// specific JSON config) and secret (decrypted credential, e.g. a bot token). +// Implementations must not log secret. +type Notifier interface { + Send(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error +} diff --git a/internal/notify/notify_test.go b/internal/notify/notify_test.go new file mode 100644 index 0000000..835d4b4 --- /dev/null +++ b/internal/notify/notify_test.go @@ -0,0 +1,217 @@ +package notify + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/google/uuid" + "github.com/vasyakrg/dns-autoresolver/internal/store" +) + +func TestTelegramSendSuccess(t *testing.T) { + var gotPath string + var gotBody map[string]string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotPath = r.URL.Path + _ = json.NewDecoder(r.Body).Decode(&gotBody) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + tg := &Telegram{BaseURL: srv.URL, HTTP: srv.Client()} + ev := Event{Project: "proj", Domain: "example.com", Status: "drift", Summary: "A record changed", At: time.Now()} + + err := tg.Send(context.Background(), json.RawMessage(`{"chat_id":"12345"}`), "sekret-token", ev) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if gotPath != "/botsekret-token/sendMessage" { + t.Fatalf("unexpected path: %s", gotPath) + } + if gotBody["chat_id"] != "12345" { + t.Fatalf("unexpected chat_id: %+v", gotBody) + } + if !strings.Contains(gotBody["text"], "example.com") || !strings.Contains(gotBody["text"], "drift") { + t.Fatalf("unexpected text: %+v", gotBody) + } +} + +func TestTelegramSendServerError(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + + tg := &Telegram{BaseURL: srv.URL, HTTP: srv.Client()} + ev := Event{Project: "proj", Domain: "example.com", Status: "drift", Summary: "x", At: time.Now()} + + if err := tg.Send(context.Background(), json.RawMessage(`{"chat_id":"1"}`), "tok", ev); err == nil { + t.Fatal("expected error on 500 response") + } +} + +func TestWebhookSendSuccess(t *testing.T) { + var gotEvent Event + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("unexpected method: %s", r.Method) + } + _ = json.NewDecoder(r.Body).Decode(&gotEvent) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + wh := &Webhook{HTTP: srv.Client()} + ev := Event{Project: "proj", Domain: "example.com", Status: "in_sync", Summary: "resolved", At: time.Now()} + + cfg, _ := json.Marshal(map[string]string{"url": srv.URL}) + if err := wh.Send(context.Background(), cfg, "", ev); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if gotEvent.Domain != "example.com" || gotEvent.Status != "in_sync" { + t.Fatalf("unexpected event delivered: %+v", gotEvent) + } +} + +func TestWebhookSendNonSuccessStatus(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + })) + defer srv.Close() + + wh := &Webhook{HTTP: srv.Client()} + ev := Event{Project: "proj", Domain: "example.com", Status: "drift", Summary: "x", At: time.Now()} + cfg, _ := json.Marshal(map[string]string{"url": srv.URL}) + + if err := wh.Send(context.Background(), cfg, "", ev); err == nil { + t.Fatal("expected error on 400 response") + } +} + +// --- Dispatcher --- + +type mockChannelStore struct { + channels []store.Channel + err error +} + +func (m *mockChannelStore) ListEnabledChannels(ctx context.Context, projectID uuid.UUID) ([]store.Channel, error) { + return m.channels, m.err +} + +type mockDecryptor struct { + fail bool +} + +func (m *mockDecryptor) Decrypt(enc string) ([]byte, error) { + if m.fail { + return nil, errBoom + } + return []byte("decrypted-" + enc), nil +} + +var errBoom = &boomErr{} + +type boomErr struct{} + +func (*boomErr) Error() string { return "decrypt boom" } + +func TestDispatcherSendsToAllChannelsAndAggregatesErrors(t *testing.T) { + var tgCalled, whCalled bool + var tgSecret string + + tgSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + tgCalled = true + w.WriteHeader(http.StatusOK) + })) + defer tgSrv.Close() + + whSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + whCalled = true + w.WriteHeader(http.StatusInternalServerError) // webhook fails + })) + defer whSrv.Close() + + projectID := uuid.New() + channels := []store.Channel{ + { + ID: uuid.New(), ProjectID: projectID, Type: "telegram", + Config: json.RawMessage(`{"chat_id":"1"}`), SecretEnc: "enc-token", Enabled: true, + }, + { + ID: uuid.New(), ProjectID: projectID, Type: "webhook", + Config: json.RawMessage(`{"url":"` + whSrv.URL + `"}`), SecretEnc: "", Enabled: true, + }, + } + + d := NewDispatcher(&mockChannelStore{channels: channels}, &mockDecryptor{}) + // Redirect telegram to the httptest server and capture the decrypted secret. + d.byType["telegram"] = notifierFunc(func(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error { + tgSecret = secret + tg := &Telegram{BaseURL: tgSrv.URL, HTTP: tgSrv.Client()} + return tg.Send(ctx, cfg, secret, ev) + }) + + ev := Event{Project: "proj", Domain: "example.com", Status: "drift", Summary: "changed", At: time.Now()} + err := d.Send(context.Background(), projectID, ev) + + if !tgCalled { + t.Error("expected telegram notifier to be called") + } + if !whCalled { + t.Error("expected webhook notifier to be called") + } + if err == nil { + t.Fatal("expected aggregated error because webhook failed") + } + if tgSecret != "decrypted-enc-token" { + t.Fatalf("expected decrypted secret to be passed to telegram, got %q", tgSecret) + } +} + +func TestDispatcherSkipsUnknownChannelType(t *testing.T) { + projectID := uuid.New() + channels := []store.Channel{ + {ID: uuid.New(), ProjectID: projectID, Type: "carrier-pigeon", Config: json.RawMessage(`{}`), Enabled: true}, + } + d := NewDispatcher(&mockChannelStore{channels: channels}, &mockDecryptor{}) + if err := d.Send(context.Background(), projectID, Event{Project: "p", Domain: "d", Status: "drift"}); err != nil { + t.Fatalf("unexpected error for unknown channel type: %v", err) + } +} + +func TestDispatcherDecryptFailureIsAggregatedNotFatal(t *testing.T) { + var whCalled bool + whSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + whCalled = true + w.WriteHeader(http.StatusOK) + })) + defer whSrv.Close() + + projectID := uuid.New() + channels := []store.Channel{ + {ID: uuid.New(), ProjectID: projectID, Type: "telegram", Config: json.RawMessage(`{"chat_id":"1"}`), SecretEnc: "enc", Enabled: true}, + {ID: uuid.New(), ProjectID: projectID, Type: "webhook", Config: json.RawMessage(`{"url":"` + whSrv.URL + `"}`), Enabled: true}, + } + d := NewDispatcher(&mockChannelStore{channels: channels}, &mockDecryptor{fail: true}) + + err := d.Send(context.Background(), projectID, Event{Project: "p", Domain: "d", Status: "drift"}) + if err == nil { + t.Fatal("expected error due to decrypt failure") + } + if !whCalled { + t.Error("expected webhook channel to still be attempted after telegram decrypt failure") + } +} + +// notifierFunc adapts a function to the Notifier interface for tests. +type notifierFunc func(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error + +func (f notifierFunc) Send(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error { + return f(ctx, cfg, secret, ev) +} diff --git a/internal/notify/telegram.go b/internal/notify/telegram.go new file mode 100644 index 0000000..57d85b0 --- /dev/null +++ b/internal/notify/telegram.go @@ -0,0 +1,45 @@ +package notify + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" +) + +// Telegram delivers notifications via the Telegram Bot API sendMessage +// endpoint. Config is {"chat_id": "..."}; secret is the bot token and is +// never logged. +type Telegram struct { + BaseURL string + HTTP *http.Client +} + +func (t *Telegram) Send(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error { + var c struct { + ChatID string `json:"chat_id"` + } + if err := json.Unmarshal(cfg, &c); err != nil { + return err + } + body, _ := json.Marshal(map[string]string{ + "chat_id": c.ChatID, + "text": fmt.Sprintf("[%s] %s → %s\n%s", ev.Project, ev.Domain, ev.Status, ev.Summary), + }) + url := t.BaseURL + "/bot" + secret + "/sendMessage" + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + resp, err := t.HTTP.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + return fmt.Errorf("telegram: status %d", resp.StatusCode) + } + return nil +} diff --git a/internal/notify/webhook.go b/internal/notify/webhook.go new file mode 100644 index 0000000..06e62c2 --- /dev/null +++ b/internal/notify/webhook.go @@ -0,0 +1,43 @@ +package notify + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" +) + +// Webhook delivers notifications as a JSON POST of the Event to a +// project-configured URL. Config is {"url": "..."}. secret is currently +// unused (reserved for future request signing) and is never logged. +type Webhook struct { + HTTP *http.Client +} + +func (w *Webhook) Send(ctx context.Context, cfg json.RawMessage, secret string, ev Event) error { + var c struct { + URL string `json:"url"` + } + if err := json.Unmarshal(cfg, &c); err != nil { + return err + } + body, err := json.Marshal(ev) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.URL, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + resp, err := w.HTTP.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("webhook: status %d", resp.StatusCode) + } + return nil +}