diff --git a/internal/wshub/wshub.go b/internal/wshub/wshub.go new file mode 100644 index 0000000..ba67cb7 --- /dev/null +++ b/internal/wshub/wshub.go @@ -0,0 +1,57 @@ +package wshub + +import "sync" + +type Event struct { + Type string `json:"type"` + TaskID int64 `json:"task_id"` + Data any `json:"data,omitempty"` +} + +type Hub struct { + mu sync.Mutex + nextID int64 + subs map[int64]map[int64]chan Event // taskID -> subID -> ch +} + +func New() *Hub { + return &Hub{subs: make(map[int64]map[int64]chan Event)} +} + +func (h *Hub) Subscribe(taskID int64) (int64, <-chan Event) { + h.mu.Lock() + defer h.mu.Unlock() + h.nextID++ + id := h.nextID + ch := make(chan Event, 64) + if h.subs[taskID] == nil { + h.subs[taskID] = make(map[int64]chan Event) + } + h.subs[taskID][id] = ch + return id, ch +} + +func (h *Hub) Unsubscribe(taskID, id int64) { + h.mu.Lock() + defer h.mu.Unlock() + if m := h.subs[taskID]; m != nil { + if ch, ok := m[id]; ok { + close(ch) + delete(m, id) + } + if len(m) == 0 { + delete(h.subs, taskID) + } + } +} + +func (h *Hub) Publish(ev Event) { + h.mu.Lock() + defer h.mu.Unlock() + for _, ch := range h.subs[ev.TaskID] { + select { + case ch <- ev: + default: // медленный подписчик — событие дропаем, не блокируем воркер + } + } +} diff --git a/internal/wshub/wshub_test.go b/internal/wshub/wshub_test.go new file mode 100644 index 0000000..36bbea4 --- /dev/null +++ b/internal/wshub/wshub_test.go @@ -0,0 +1,31 @@ +package wshub + +import ( + "testing" + "time" +) + +func TestPublishReachesSubscriber(t *testing.T) { + h := New() + _, ch := h.Subscribe(7) + h.Publish(Event{Type: "progress", TaskID: 7, Data: map[string]int{"copied": 3}}) + select { + case ev := <-ch: + if ev.Type != "progress" || ev.TaskID != 7 { + t.Fatalf("bad event %+v", ev) + } + case <-time.After(time.Second): + t.Fatal("no event received") + } +} + +func TestPublishIsolatedByTask(t *testing.T) { + h := New() + _, ch := h.Subscribe(1) + h.Publish(Event{Type: "x", TaskID: 2}) + select { + case <-ch: + t.Fatal("subscriber for task 1 must not get task 2 event") + case <-time.After(100 * time.Millisecond): + } +}