fix(httpapi): detect ws client disconnect via CloseRead to prevent subscriber leak

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01MMHQTtnQtQqL8muAXHr9kd
This commit is contained in:
2026-07-01 18:47:54 +07:00
parent 9ec6acd414
commit 4c57848c35
3 changed files with 74 additions and 1 deletions
+4 -1
View File
@@ -25,7 +25,10 @@ func (s *Server) handleWS(w http.ResponseWriter, r *http.Request) {
subID, ch := s.hub.Subscribe(taskID) subID, ch := s.hub.Subscribe(taskID)
defer s.hub.Unsubscribe(taskID, subID) defer s.hub.Unsubscribe(taskID, subID)
ctx := r.Context() // websocket.Accept хайджекает соединение, поэтому r.Context() не отменяется
// при обрыве связи клиентом. CloseRead запускает фоновое чтение control-фреймов
// и отменяет возвращаемый контекст, когда соединение действительно умирает.
ctx := c.CloseRead(r.Context())
for { for {
select { select {
case ev, ok := <-ch: case ev, ok := <-ch:
+63
View File
@@ -0,0 +1,63 @@
package httpapi
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/coder/websocket"
"github.com/vasyansk/imap-copier/internal/config"
"github.com/vasyansk/imap-copier/internal/crypto"
"github.com/vasyansk/imap-copier/internal/wshub"
)
func TestWSRequiresAuth(t *testing.T) {
s := &Server{cfg: config.Config{SessionSecret: []byte("x")}, hub: wshub.New()}
srv := httptest.NewServer(s.Router())
defer srv.Close()
// no cookie -> upgrade rejected (401)
_, resp, err := websocket.Dial(context.Background(), "ws"+srv.URL[4:]+"/ws?task_id=1", nil)
if err == nil {
t.Fatal("expected auth rejection")
}
if resp != nil && resp.StatusCode != http.StatusUnauthorized {
t.Fatalf("want 401, got %d", resp.StatusCode)
}
}
func TestWSUnsubscribesOnClientDisconnect(t *testing.T) {
hub := wshub.New()
secret := []byte("sekret")
s := &Server{cfg: config.Config{AuthUser: "admin", SessionSecret: secret}, hub: hub}
srv := httptest.NewServer(s.Router())
defer srv.Close()
tok := crypto.SignSession(secret, "admin", time.Now().Add(time.Hour))
hdr := http.Header{}
hdr.Set("Cookie", cookieName+"="+tok)
ctx := context.Background()
c, _, err := websocket.Dial(ctx, "ws"+srv.URL[4:]+"/ws?task_id=7", &websocket.DialOptions{HTTPHeader: hdr})
if err != nil {
t.Fatalf("dial: %v", err)
}
// wait until subscribed
deadline := time.Now().Add(2 * time.Second)
for hub.SubscriberCount(7) == 0 {
if time.Now().After(deadline) {
t.Fatal("never subscribed")
}
time.Sleep(10 * time.Millisecond)
}
// abrupt client close -> server must detect and unsubscribe
c.CloseNow()
deadline = time.Now().Add(3 * time.Second)
for hub.SubscriberCount(7) != 0 {
if time.Now().After(deadline) {
t.Fatal("subscription leaked after client disconnect")
}
time.Sleep(20 * time.Millisecond)
}
}
+7
View File
@@ -45,6 +45,13 @@ func (h *Hub) Unsubscribe(taskID, id int64) {
} }
} }
// SubscriberCount returns the number of active subscribers for a task (for tests/metrics).
func (h *Hub) SubscriberCount(taskID int64) int {
h.mu.Lock()
defer h.mu.Unlock()
return len(h.subs[taskID])
}
func (h *Hub) Publish(ev Event) { func (h *Hub) Publish(ev Event) {
h.mu.Lock() h.mu.Lock()
defer h.mu.Unlock() defer h.mu.Unlock()