diff --git a/DOCS/RUNNING.md b/DOCS/RUNNING.md index f42c8f4..c36a1ca 100644 --- a/DOCS/RUNNING.md +++ b/DOCS/RUNNING.md @@ -150,6 +150,16 @@ S shutdown 4. **Нативные уведомления:** сверни/расфокусь окно GUI, доведи панель до `done/wait/error` → придёт macOS-уведомление (первый раз спросит разрешение). Клик по записи в **Event Center** фокусит панель. 5. **Авто-unread:** событие статуса в **неактивном** воркспейсе ставит ему синюю точку «не забыть»; выбор воркспейса снимает. +### SP2 — персистентный журнал событий / read-model +1. Доведи панель до `done`/`error` (или `spacesh notify --surface --state error`). В Event Center появляется запись; бейдж на `bell` (в топ-баре) растёт. +2. **Перезапусти GUI** (демон жив): лента на месте — она берётся из демона, не из памяти GUI. +3. **Холодный рестарт демона** (`spacesh shutdown`, затем снова открой GUI): лента всё ещё на месте — восстановлена из `~/.spacesh/events.json`. +4. Клик по записи (или фокус её панели) помечает её прочитанной — запись тускнеет, бейдж уменьшается. `Mark all read` гасит бейдж полностью. +5. Табы фильтруют по реальным данным: `Unread` — только непрочитанные, `Errors` — только события `error`. +6. **Явное закрытие панели не логируется** (это намеренно — пользователь сам её закрыл); в журнал попадают только сами-завершившиеся/упавшие процессы и переходы статуса `done/wait/error`. + +Файл журнала: `~/.spacesh/events.json` (кольцо на 1000 записей, атомарная запись + corrupt-backup, как у `state.json`). + ### M4 — CLI - `spacesh status --json` против живого демона; `spacesh notify` без демона → молча `exit 0`; `spacesh completions zsh` печатает скрипт. @@ -191,7 +201,7 @@ rm -rf ~/.spacesh # сбрасывает сокет, лок, state.json, - **Playwright/headless-браузер** видит только Vite-фронт (`npm run dev`, :1420) — Tauri-IPC там недоступен, живой daemon-флоу не тестируется. Полный e2e — только `npm run tauri dev` на дисплее. - **OSC 133 — только zsh** (через `ZDOTDIR`); bash/fish работают на fallback-эвристике. - **Клик по нативному уведомлению** не фокусит конкретную панель (клик по записи в Event Center — фокусит). -- **Event Center** — плоская лента (вкладки All/Unread/Errors пока без фильтра); живёт в памяти GUI, очищается при перезапуске GUI. +- **Event Center** — лента хранится в демоне и персистируется в `~/.spacesh/events.json` (переживает перезапуск GUI и холодный рестарт демона). Вкладки `Unread`/`Errors` и бейдж `bell` работают по реальным данным (флаги прочтения на уровне события). По-прежнему не реализованы: каналы Telegram/MAX в футере Event Center (SP5), а также `search`/`settings` и меню аккаунта в топ-баре. - **Статус эфемерен** (work/wait/done/error/idle) — не персистится; после холодного рестарта демона панель `stopped`, статус `idle`. - Авторизация / личный кабинет / внешние нотификации (Telegram/MAX) / зум / поиск по скроллбэку / diff-вьюер / remote — **не реализованы** (M5/M6/auth, см. `DOCS/MAIN.md`). diff --git a/app/package-lock.json b/app/package-lock.json index ce96560..3ed1d59 100644 --- a/app/package-lock.json +++ b/app/package-lock.json @@ -8,10 +8,13 @@ "name": "spacesh-app", "version": "0.1.0", "dependencies": { + "@fontsource-variable/jetbrains-mono": "^5.2.8", + "@fontsource/inter": "^5.2.8", "@tauri-apps/api": "^2", "@tauri-apps/plugin-notification": "^2", "@xterm/addon-webgl": "^0.18.0", "@xterm/xterm": "^5.5.0", + "lucide-react": "^1.17.0", "react": "^18.3.1", "react-dom": "^18.3.1" }, @@ -697,6 +700,24 @@ "node": ">=12" } }, + "node_modules/@fontsource-variable/jetbrains-mono": { + "version": "5.2.8", + "resolved": "https://registry.npmjs.org/@fontsource-variable/jetbrains-mono/-/jetbrains-mono-5.2.8.tgz", + "integrity": "sha512-WBA9elru6Jdp5df2mES55wuOO0WIrn3kpXnI4+W2ek5u3ZgLS9XS4gmIlcQhiZOWEKl95meYdvK7xI+ETLCq/Q==", + "license": "OFL-1.1", + "funding": { + "url": "https://github.com/sponsors/ayuhito" + } + }, + "node_modules/@fontsource/inter": { + "version": "5.2.8", + "resolved": "https://registry.npmjs.org/@fontsource/inter/-/inter-5.2.8.tgz", + "integrity": "sha512-P6r5WnJoKiNVV+zvW2xM13gNdFhAEpQ9dQJHt3naLvfg+LkF2ldgSLiF4T41lf1SQCM9QmkqPTn4TH568IRagg==", + "license": "OFL-1.1", + "funding": { + "url": "https://github.com/sponsors/ayuhito" + } + }, "node_modules/@jridgewell/gen-mapping": { "version": "0.3.13", "resolved": "https://registry.npmjs.org/@jridgewell/gen-mapping/-/gen-mapping-0.3.13.tgz", @@ -1691,6 +1712,15 @@ "yallist": "^3.0.2" } }, + "node_modules/lucide-react": { + "version": "1.17.0", + "resolved": "https://registry.npmjs.org/lucide-react/-/lucide-react-1.17.0.tgz", + "integrity": "sha512-9FA9evdox/JQL5PT57fdA1x/yg8T7knJ98+zjTL3UfKza6pflQUUh3XtaQIHKvnsJw1lmsEyHVlt5jchYxOQ5w==", + "license": "ISC", + "peerDependencies": { + "react": "^16.5.1 || ^17.0.0 || ^18.0.0 || ^19.0.0" + } + }, "node_modules/ms": { "version": "2.1.3", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", diff --git a/app/package.json b/app/package.json index f916090..62e4672 100644 --- a/app/package.json +++ b/app/package.json @@ -9,10 +9,13 @@ "tauri": "tauri" }, "dependencies": { + "@fontsource-variable/jetbrains-mono": "^5.2.8", + "@fontsource/inter": "^5.2.8", "@tauri-apps/api": "^2", "@tauri-apps/plugin-notification": "^2", - "@xterm/xterm": "^5.5.0", "@xterm/addon-webgl": "^0.18.0", + "@xterm/xterm": "^5.5.0", + "lucide-react": "^1.17.0", "react": "^18.3.1", "react-dom": "^18.3.1" }, diff --git a/app/src-tauri/src/bridge.rs b/app/src-tauri/src/bridge.rs index 5dae7b0..8f6e5b8 100644 --- a/app/src-tauri/src/bridge.rs +++ b/app/src-tauri/src/bridge.rs @@ -298,3 +298,16 @@ pub async fn delete_group(state: BridgeState<'_>, group_id: String) -> Result, surface_id: String) -> Result { data_of(state.request(Cmd::Focus { surface_id: SurfaceId(surface_id) }).await.map_err(|e| e.to_string())?) } + +// ---- M3 event log commands ---- + +#[tauri::command] +pub async fn event_log(state: BridgeState<'_>, limit: Option) -> Result { + data_of(state.request(Cmd::EventLog { limit }).await.map_err(|e| e.to_string())?) +} + +#[tauri::command] +pub async fn mark_read(state: BridgeState<'_>, target: Value) -> Result { + let target: spacesh_proto::MarkReadTarget = serde_json::from_value(target).map_err(|e| format!("invalid mark_read target: {e}"))?; + data_of(state.request(Cmd::MarkRead { target }).await.map_err(|e| e.to_string())?) +} diff --git a/app/src-tauri/src/lib.rs b/app/src-tauri/src/lib.rs index f2bf9d4..7904d80 100644 --- a/app/src-tauri/src/lib.rs +++ b/app/src-tauri/src/lib.rs @@ -47,6 +47,8 @@ pub fn run() { bridge::set_group, bridge::delete_group, bridge::focus, + bridge::event_log, + bridge::mark_read, ]) .run(tauri::generate_context!()) .expect("error while running spacesh"); diff --git a/app/src/App.tsx b/app/src/App.tsx index 0887d81..5aaac1f 100644 --- a/app/src/App.tsx +++ b/app/src/App.tsx @@ -1,11 +1,15 @@ -import { useEffect, useState, useCallback, useRef } from "react"; +import { useEffect, useState, useCallback, useMemo, useRef } from "react"; import { LayoutEngine } from "./LayoutEngine"; import { Sidebar } from "./Sidebar"; -import { PresetPicker } from "./PresetPicker"; +import { TopBar } from "./TopBar"; +import { CenterToolbar } from "./CenterToolbar"; import { Wizard } from "./Wizard"; -import { EventCenter, type FeedEntry } from "./EventCenter"; +import { EventCenter } from "./EventCenter"; import { maybeNotify } from "./notify"; -import { getStatusFull, applyPreset, onDaemonEvent, onDaemonRawEvent, setWorkspaceMeta, focusSurface } from "./socketBridge"; +import { COLORS } from "./theme"; +import { getStatusFull, applyPreset, onDaemonEvent, onDaemonRawEvent, setWorkspaceMeta, focusSurface, getEventLog, markEventsRead } from "./socketBridge"; +import type { EventRecord } from "./socketBridge"; +import { leafIds } from "./layoutTypes"; import type { Group, WorkspaceView, SurfaceState } from "./layoutTypes"; export function App() { @@ -14,14 +18,25 @@ export function App() { const [activeId, setActiveId] = useState(null); const [running, setRunning] = useState>({}); const [states, setStates] = useState>({}); - const [feed, setFeed] = useState([]); + const [events, setEvents] = useState([]); const [wizard, setWizard] = useState(false); - const feedId = useRef(0); + const [eventsOpen, setEventsOpen] = useState(true); + const [focusedId, setFocusedId] = useState(null); const activeRef = useRef(null); const wsRef = useRef([]); activeRef.current = activeId; wsRef.current = workspaces; + const seedEvents = useCallback(async () => { + const log = await getEventLog(); + setEvents((existing) => { + const byId = new Map(); + for (const e of log.events) byId.set(e.id, e); // daemon is authoritative for overlapping ids + for (const e of existing) if (!byId.has(e.id)) byId.set(e.id, e); // keep live events not in the snapshot + return [...byId.values()].sort((a, b) => b.id - a.id).slice(0, 1000); + }); + }, []); + const refresh = useCallback(async () => { const st = await getStatusFull(); setGroups(st.groups); @@ -39,55 +54,64 @@ export function App() { useEffect(() => { void refresh(); + void seedEvents(); const unlisten = onDaemonEvent((evt) => { - if (evt.evt === "state") { - const { surface_id, state } = evt.data; - setStates((m) => ({ ...m, [surface_id]: state })); - const w = wsOf(surface_id); - const agent = w?.surfaces[surface_id]?.spec.agent_label ?? "shell"; - if (["done", "wait", "error"].includes(state)) { - const entry: FeedEntry = { id: feedId.current++, surfaceId: surface_id, workspace: w?.name ?? "?", agent, kind: state, time: "now" }; - setFeed((f) => [entry, ...f].slice(0, 200)); - if (w && w.id !== activeRef.current) void setWorkspaceMeta(w.id, { unread: true }); - void maybeNotify(surface_id, agent, w?.name ?? "?", state); - } + if (evt.evt === "event") { + const rec = evt.data.record; + setEvents((es) => [rec, ...es].slice(0, 1000)); + const w = wsOf(rec.surface_id); + if (w && w.id !== activeRef.current) void setWorkspaceMeta(w.id, { unread: true }); + void maybeNotify(rec.surface_id, rec.agent_label ?? "shell", rec.workspace_name, rec.kind); + } else if (evt.evt === "events_read") { + const ids = new Set(evt.data.ids); + setEvents((es) => es.map((e) => (ids.has(e.id) ? { ...e, read: true } : e))); + } else if (evt.evt === "state") { + setStates((m) => ({ ...m, [evt.data.surface_id]: evt.data.state })); void refresh(); } else if (evt.evt === "exit") { - const w = wsOf(evt.data.surface_id); - const exitEntry: FeedEntry = { id: feedId.current++, surfaceId: evt.data.surface_id, workspace: w?.name ?? "?", agent: w?.surfaces[evt.data.surface_id]?.spec.agent_label ?? "shell", kind: "exit", time: "now" }; - setFeed((f) => [exitEntry, ...f].slice(0, 200)); void refresh(); } else { void refresh(); } }); - const reconnect = onDaemonRawEvent("spacesh:disconnected", () => { void refresh(); }); + const reconnect = onDaemonRawEvent("spacesh:disconnected", () => { void refresh(); void seedEvents(); }); return () => { void unlisten.then((f) => f()); void reconnect.then((f) => f()); }; - }, [refresh]); + }, [refresh, seedEvents]); + const unread = useMemo(() => events.filter((e) => !e.read).length, [events]); const active = workspaces.find((w) => w.id === activeId) ?? null; + const leaves = active ? leafIds(active.layout) : []; + const effectiveFocus = focusedId && leaves.includes(focusedId) ? focusedId : leaves[0] ?? null; function selectWorkspace(id: string) { setActiveId(id); + setFocusedId(null); void setWorkspaceMeta(id, { unread: false }); } return ( -
- setWizard(true)} /> -
- {active && ( -
- { if (active) void applyPreset(active.id, p, []); }} /> +
+ setEventsOpen((v) => !v)} unread={unread} /> +
+ setWizard(true)} /> +
+ {active && ( + { if (active) void applyPreset(active.id, p, []); }} /> + )} +
+ {active + ? + :
No workspace — create one to begin.
}
- )} -
- {active - ? - :
No workspace — create one to begin.
}
+ {eventsOpen && ( + { void markEventsRead({ target: "all" }); }} + onSelect={(sid, id) => { void focusSurface(sid); void markEventsRead({ target: "ids", value: [id] }); }} + /> + )}
- setFeed([])} onSelect={(sid) => { void focusSurface(sid); }} /> {wizard && { setWizard(false); setActiveId(id); void refresh(); }} onCancel={() => setWizard(false)} />}
); diff --git a/app/src/CenterToolbar.tsx b/app/src/CenterToolbar.tsx new file mode 100644 index 0000000..699e5ca --- /dev/null +++ b/app/src/CenterToolbar.tsx @@ -0,0 +1,23 @@ +import { Search } from "lucide-react"; +import { COLORS, FONT } from "./theme"; +import { PresetPicker } from "./PresetPicker"; + +/** Top-of-grid toolbar: layout presets on the left, scrollback search on the right (search is a mock). */ +export function CenterToolbar({ selected, onSelect }: { selected: string; onSelect: (id: string) => void }) { + return ( +
+ +
+
+ + Search scrollback + ⌘F +
+
+ ); +} diff --git a/app/src/EventCenter.tsx b/app/src/EventCenter.tsx index 2415252..561d348 100644 --- a/app/src/EventCenter.tsx +++ b/app/src/EventCenter.tsx @@ -1,37 +1,97 @@ -import type { SurfaceState } from "./layoutTypes"; +import { useState } from "react"; +import { Check, Hourglass, X, Power, Send, MessageSquare } from "lucide-react"; +import { COLORS, FONT } from "./theme"; +import type { EventRecord } from "./socketBridge"; -export interface FeedEntry { - id: number; - surfaceId: string; - workspace: string; - agent: string; - kind: SurfaceState | "exit"; - time: string; +const ICON: Record = { + done: , wait: , error: , exit: , +}; +const COLOR: Record = { + done: COLORS.stDone, wait: COLORS.stWait, error: COLORS.stError, exit: COLORS.textMuted, +}; + +type Tab = "all" | "unread" | "errors"; +const TABS: { id: Tab; label: string }[] = [ + { id: "all", label: "All" }, + { id: "unread", label: "Unread" }, + { id: "errors", label: "Errors" }, +]; + +function rel(ts: number): string { + const s = Math.max(0, Math.floor((Date.now() - ts) / 1000)); + if (s < 60) return `${s}s`; + if (s < 3600) return `${Math.floor(s / 60)}m`; + if (s < 86400) return `${Math.floor(s / 3600)}h`; + return `${Math.floor(s / 86400)}d`; } -const ICON: Record = { done: "✓", wait: "⌛", error: "✕", work: "●", idle: "·", exit: "⏻" }; -const COLOR: Record = { done: "#3FB950", wait: "#F2B84B", error: "#F4544E", work: "#4C8DFF", idle: "#5A6573", exit: "#5A6573" }; +export function EventCenter({ + events, onMarkAllRead, onSelect, +}: { + events: EventRecord[]; + onMarkAllRead: () => void; + onSelect: (surfaceId: string, id: number) => void; +}) { + const [tab, setTab] = useState("all"); + const shown = tab === "unread" ? events.filter((e) => !e.read) + : tab === "errors" ? events.filter((e) => e.kind === "error") + : events; -export function EventCenter({ feed, onMarkRead, onSelect }: { feed: FeedEntry[]; onMarkRead: () => void; onSelect: (surfaceId: string) => void }) { return ( -
+
- Event Center - Mark all read + Event Center + Mark all read
-
- {feed.length === 0 &&
No events yet.
} - {feed.map((e) => ( -
onSelect(e.surfaceId)} - style={{ display: "flex", gap: 9, padding: 10, borderRadius: 8, border: "1px solid #232A33", cursor: "pointer" }}> - {ICON[e.kind]} -
-
{e.workspace} · {e.agent}
-
{e.kind} {e.time}
+ +
+ {TABS.map((t) => { + const on = t.id === tab; + return ( + + ); + })} +
+ +
+ {shown.length === 0 &&
No events yet.
} + {shown.map((e) => ( +
onSelect(e.surface_id, e.id)} + style={{ display: "flex", gap: 9, padding: 10, borderRadius: 8, border: `1px solid ${COLORS.borderSubtle}`, cursor: "pointer", opacity: e.read ? 0.55 : 1 }}> + {ICON[e.kind] ?? null} +
+
{e.workspace_name} · {e.agent_label ?? "shell"}
+
{e.kind} {rel(e.ts)}
+ {!e.read && }
))}
+ + {/* External notification channels — mocked until the daemon subscriber lands (SP5). */} +
+ EXTERNAL NOTIFY +
+ {[ + { name: "Telegram", icon: }, + { name: "MAX", icon: }, + ].map((c) => ( +
+ {c.icon} + {c.name} + +
+ ))} +
+
); } diff --git a/app/src/LayoutEngine.tsx b/app/src/LayoutEngine.tsx index af8c6b5..b528116 100644 --- a/app/src/LayoutEngine.tsx +++ b/app/src/LayoutEngine.tsx @@ -1,7 +1,9 @@ import { useRef } from "react"; +import { Maximize2, RotateCw } from "lucide-react"; import { TerminalView } from "./TerminalView"; import { StatusRing } from "./StatusRing"; -import type { LayoutNode, SurfaceState } from "./layoutTypes"; +import { COLORS, FONT, STATE_COLOR } from "./theme"; +import type { LayoutNode, SurfaceState, SurfaceView } from "./layoutTypes"; import { setRatios, restartSurface } from "./socketBridge"; interface Props { @@ -10,36 +12,81 @@ interface Props { /** surface_id -> running flag, from the latest status/events. */ running: Record; states: Record; + surfaces: Record; + focusedId: string | null; + onFocus: (id: string) => void; } -export function LayoutEngine({ workspaceId, layout, running, states }: Props) { +/** Collapse an absolute cwd into a ~/ style label for the panel header. */ +function shortPath(cwd: string): string { + const leaf = cwd.split("/").filter(Boolean).pop(); + return leaf ? `~/${leaf}` : cwd; +} + +export function LayoutEngine({ workspaceId, layout, running, states, surfaces, focusedId, onFocus }: Props) { if (!layout) { - return
Empty workspace — apply a preset to add panels.
; + return
Empty workspace — apply a preset to add panels.
; } - return ; + return ( +
+ +
+ ); } -function Node({ workspaceId, node, path, running, states }: { workspaceId: string; node: LayoutNode; path: number[]; running: Record; states: Record }) { +function Node({ workspaceId, node, path, running, states, surfaces, focusedId, onFocus }: { + workspaceId: string; node: LayoutNode; path: number[]; + running: Record; states: Record; + surfaces: Record; focusedId: string | null; onFocus: (id: string) => void; +}) { if ("leaf" in node) { const id = node.leaf.surface_id; + const focused = focusedId === id; + const card = (inner: React.ReactNode) => ( +
onFocus(id)} + style={{ + display: "flex", flexDirection: "column", width: "100%", height: "100%", + background: COLORS.bgPanel, borderRadius: 8, overflow: "hidden", + border: focused ? `2px solid ${COLORS.accent}` : `1px solid ${COLORS.borderSubtle}`, + boxSizing: "border-box", + }} + > + {inner} +
+ ); + if (running[id] === false) { - return ( -
-
Process exited
- + return card( +
+
Process exited
+
); } - return ( -
-
- - {id} + + const spec = surfaces[id]?.spec; + const agent = spec?.agent_label ?? "shell"; + const state = states[id] ?? "idle"; + return card( + <> +
+ + {agent} + {spec?.cwd && {shortPath(spec.cwd)}} + + + {state} + +
-
+ ); } @@ -55,7 +102,7 @@ function Node({ workspaceId, node, path, running, states }: { workspaceId: strin next[i + 1] = Math.max(0.05, (next[i + 1] ?? 1) - deltaFrac); void setRatios(workspaceId, path, next); }}> - + ))}
@@ -69,8 +116,7 @@ function Pane({ grow, isLast, orient, onResize, children }: { grow: number; isLa const parent = ref.current?.parentElement; if (!parent) return; const total = orient === "h" ? parent.clientWidth : parent.clientHeight; - const start = orient === "h" ? e.clientX : e.clientY; - let last = start; + let last = orient === "h" ? e.clientX : e.clientY; const move = (ev: MouseEvent) => { const cur = orient === "h" ? ev.clientX : ev.clientY; const delta = (cur - last) / total; @@ -92,9 +138,9 @@ function Pane({ grow, isLast, orient, onResize, children }: { grow: number; isLa {!isLast && (
)} diff --git a/app/src/PresetPicker.tsx b/app/src/PresetPicker.tsx index 5b25e13..040642d 100644 --- a/app/src/PresetPicker.tsx +++ b/app/src/PresetPicker.tsx @@ -11,20 +11,25 @@ export const PRESETS: { id: string; label: string; slots: number }[] = [ { id: "2x4", label: "2×4", slots: 8 }, ]; +import { COLORS, FONT } from "./theme"; + export function PresetPicker({ selected, onSelect }: { selected: string; onSelect: (id: string) => void }) { return ( -
- {PRESETS.map((p) => ( - - ))} +
+ {PRESETS.map((p) => { + const on = p.id === selected; + return ( + + ); + })}
); } diff --git a/app/src/Sidebar.tsx b/app/src/Sidebar.tsx index 6f8794a..ce3954a 100644 --- a/app/src/Sidebar.tsx +++ b/app/src/Sidebar.tsx @@ -1,9 +1,8 @@ +import { useState } from "react"; +import { Plus, ChevronDown, ChevronRight } from "lucide-react"; +import { COLORS, FONT, STATE_COLOR } from "./theme"; import type { Group, WorkspaceView, SurfaceState } from "./layoutTypes"; -const RING: Record = { - error: "#F4544E", wait: "#F2B84B", work: "#4C8DFF", done: "#3FB950", idle: "#5A6573", stopped: "#5A6573", -}; - function aggregate(w: WorkspaceView): SurfaceState | "stopped" { const order: SurfaceState[] = ["error", "wait", "work", "done", "idle"]; const running = Object.values(w.surfaces).filter((s) => s.running); @@ -23,36 +22,67 @@ export function Sidebar({ onSelect: (id: string) => void; onNew: () => void; }) { + const [collapsed, setCollapsed] = useState>({}); const byGroup = (gid: string | null) => workspaces.filter((w) => (w.group_id ?? null) === gid).sort((a, b) => a.order - b.order); const ungrouped = byGroup(null); - const row = (w: WorkspaceView) => ( -
onSelect(w.id)} - style={{ - display: "flex", alignItems: "center", gap: 9, padding: "6px 8px", borderRadius: 6, cursor: "pointer", - background: w.id === activeId ? "#1A2029" : "transparent", fontFamily: "Inter", fontSize: 13, - color: w.id === activeId ? "#E6EDF3" : "#8B97A6", - }}> - - {w.name} - {w.unread && } - {Object.keys(w.surfaces).length} -
- ); + const row = (w: WorkspaceView) => { + const isActive = w.id === activeId; + return ( +
onSelect(w.id)} + style={{ + display: "flex", alignItems: "center", gap: 10, height: 34, padding: "0 8px", borderRadius: 6, cursor: "pointer", + background: isActive ? COLORS.bgElevated : "transparent", fontFamily: FONT.ui, fontSize: 13, + color: isActive ? COLORS.textPrimary : COLORS.textSecondary, + }}> + + {w.name} + {w.unread && } + + {Object.keys(w.surfaces).length} + +
+ ); + }; return ( -
- - {groups.sort((a, b) => a.order - b.order).map((g) => ( -
-
- - {g.name.toUpperCase()} -
- {byGroup(g.id).map(row)} -
- ))} - {ungrouped.length > 0 &&
{ungrouped.map(row)}
} +
+ + +
+ {groups.sort((a, b) => a.order - b.order).map((g) => { + const open = !collapsed[g.id]; + return ( +
+
setCollapsed((c) => ({ ...c, [g.id]: open }))} + style={{ display: "flex", alignItems: "center", gap: 7, height: 24, padding: "0 4px", cursor: "pointer" }}> + {open ? : } + + {g.name.toUpperCase()} +
+ {open && byGroup(g.id).map(row)} +
+ ); + })} + {ungrouped.length > 0 &&
{ungrouped.map(row)}
} +
+ + {/* Daemon status footer — uptime is mocked until the daemon reports it. */} +
+ + spaceshd · live + + 3d 4h +
); } diff --git a/app/src/TerminalView.tsx b/app/src/TerminalView.tsx index f6172f8..fd070de 100644 --- a/app/src/TerminalView.tsx +++ b/app/src/TerminalView.tsx @@ -11,7 +11,7 @@ export function TerminalView({ surfaceId }: { surfaceId: string }) { useEffect(() => { if (!ref.current) return; - const term = new Terminal({ fontFamily: "monospace", fontSize: 13, convertEol: false }); + const term = new Terminal({ fontFamily: "'JetBrains Mono Variable', 'JetBrains Mono', monospace", fontSize: 13, convertEol: false }); try { term.loadAddon(new WebglAddon()); } catch { diff --git a/app/src/TopBar.tsx b/app/src/TopBar.tsx new file mode 100644 index 0000000..72e3342 --- /dev/null +++ b/app/src/TopBar.tsx @@ -0,0 +1,102 @@ +import { FolderGit2, PanelRight, Search, Bell, Settings, ChevronDown } from "lucide-react"; +import { COLORS, FONT } from "./theme"; +import type { WorkspaceView } from "./layoutTypes"; +import { leafIds } from "./layoutTypes"; + +/** Human-readable descriptor of the active workspace layout (mock until a real preset id is tracked). */ +function describeLayout(w: WorkspaceView | null): string { + if (!w || !w.layout) return "no layout"; + const n = leafIds(w.layout).length; + return n === 1 ? "1 pane" : `${n} panes`; +} + +function IconBtn({ icon, onClick, active, title }: { icon: React.ReactNode; onClick?: () => void; active?: boolean; title?: string }) { + return ( + + ); +} + +export function TopBar({ + active, eventsOpen, onToggleEvents, unread, +}: { + active: WorkspaceView | null; + eventsOpen: boolean; + onToggleEvents: () => void; + unread: number; +}) { + return ( +
+ {/* macOS traffic-light spacer — real lights are drawn by the window chrome. */} +
+ + {/* Workspace breadcrumb */} +
+ + + {active?.name ?? "spacesh"} + + {active && ( + <> + / + + {describeLayout(active)} + + + )} +
+ +
+ + {/* Right cluster */} +
+ } onClick={onToggleEvents} active={eventsOpen} title="Toggle Event Center" /> + } title="Search (mock)" /> +
+ } title="Notifications (mock)" /> + {unread > 0 && ( + + {unread > 99 ? "99+" : unread} + + )} +
+ } title="Settings (mock)" /> + + +
+
+ ); +} diff --git a/app/src/main.tsx b/app/src/main.tsx index 5d115fc..4340d68 100644 --- a/app/src/main.tsx +++ b/app/src/main.tsx @@ -1,7 +1,13 @@ import React from "react"; import ReactDOM from "react-dom/client"; import { App } from "./App"; +import "@fontsource/inter/400.css"; +import "@fontsource/inter/500.css"; +import "@fontsource/inter/600.css"; +import "@fontsource/inter/700.css"; +import "@fontsource-variable/jetbrains-mono"; import "@xterm/xterm/css/xterm.css"; +import "./styles.css"; ReactDOM.createRoot(document.getElementById("root")!).render( diff --git a/app/src/notify.ts b/app/src/notify.ts index bb63f7e..5a18f77 100644 --- a/app/src/notify.ts +++ b/app/src/notify.ts @@ -1,12 +1,11 @@ import { isPermissionGranted, requestPermission, sendNotification } from "@tauri-apps/plugin-notification"; import { getCurrentWindow } from "@tauri-apps/api/window"; -import type { SurfaceState } from "./layoutTypes"; -const NOTIFY_STATES: SurfaceState[] = ["done", "wait", "error"]; -let lastBySurface: Record = {}; +const NOTIFY_STATES = ["done", "wait", "error"]; +let lastBySurface: Record = {}; /// Fire a native notification for a status change when the window is unfocused. -export async function maybeNotify(surfaceId: string, agent: string, workspace: string, state: SurfaceState): Promise { +export async function maybeNotify(surfaceId: string, agent: string, workspace: string, state: string): Promise { if (!NOTIFY_STATES.includes(state)) return; if (lastBySurface[surfaceId] === state) return; // dedup repeats lastBySurface[surfaceId] = state; diff --git a/app/src/socketBridge.ts b/app/src/socketBridge.ts index 8dd29bd..a6470ee 100644 --- a/app/src/socketBridge.ts +++ b/app/src/socketBridge.ts @@ -62,6 +62,30 @@ export async function getStatus(): Promise { return data.workspaces; } +export interface EventRecord { + id: number; + surface_id: string; + workspace_id: string; + workspace_name: string; + agent_label: string | null; + kind: "done" | "wait" | "error" | "exit"; + ts: number; + read: boolean; +} + +export type MarkReadTarget = + | { target: "all" } + | { target: "ids"; value: number[] } + | { target: "surface"; value: string }; + +export async function getEventLog(limit?: number): Promise<{ events: EventRecord[]; unread: number }> { + return await invoke<{ events: EventRecord[]; unread: number }>("event_log", { limit: limit ?? null }); +} + +export async function markEventsRead(target: MarkReadTarget): Promise { + await invoke("mark_read", { target }); +} + export type DaemonEvt = | { evt: "exit"; data: { surface_id: string; code: number } } | { evt: "surface_created"; data: { surface_id: string; workspace_id: string } } @@ -69,7 +93,9 @@ export type DaemonEvt = | { evt: "state"; data: { surface_id: string; state: import("./layoutTypes").SurfaceState } } | { evt: "layout_changed"; data: { workspace_id: string } } | { evt: "workspace_changed"; data: unknown } - | { evt: "groups_changed"; data: unknown }; + | { evt: "groups_changed"; data: unknown } + | { evt: "event"; data: { record: EventRecord } } + | { evt: "events_read"; data: { ids: number[] } }; export function onDaemonEvent(handler: (evt: DaemonEvt) => void): Promise<() => void> { return listen("spacesh:evt", (e) => handler(e.payload)); diff --git a/app/src/styles.css b/app/src/styles.css new file mode 100644 index 0000000..5851bf2 --- /dev/null +++ b/app/src/styles.css @@ -0,0 +1,39 @@ +:root { + color-scheme: dark; +} + +* { + box-sizing: border-box; +} + +html, +body, +#root { + margin: 0; + height: 100%; +} + +body { + font-family: "Inter", system-ui, sans-serif; + background: #0e1116; + color: #e6edf3; + -webkit-font-smoothing: antialiased; +} + +button { + font-family: inherit; + cursor: pointer; +} + +/* Thin, unobtrusive scrollbars to match the dark chrome. */ +::-webkit-scrollbar { + width: 8px; + height: 8px; +} +::-webkit-scrollbar-thumb { + background: #232a33; + border-radius: 4px; +} +::-webkit-scrollbar-track { + background: transparent; +} diff --git a/app/src/theme.ts b/app/src/theme.ts new file mode 100644 index 0000000..700b335 --- /dev/null +++ b/app/src/theme.ts @@ -0,0 +1,36 @@ +import type { SurfaceState } from "./layoutTypes"; + +/** Design tokens — mirror of DOCS/space-sh.pen variables. Single source for the UI. */ +export const COLORS = { + accent: "#4C8DFF", + bgApp: "#0E1116", + bgElevated: "#1A2029", + bgHover: "#222A35", + bgPanel: "#0A0D12", + bgSidebar: "#13171F", + borderStrong: "#323C49", + borderSubtle: "#232A33", + textPrimary: "#E6EDF3", + textSecondary: "#8B97A6", + textMuted: "#5A6573", + stWork: "#4C8DFF", + stWait: "#F2B84B", + stDone: "#3FB950", + stError: "#F4544E", + stIdle: "#5A6573", +} as const; + +export const FONT = { + ui: "Inter, system-ui, sans-serif", + mono: "'JetBrains Mono Variable', 'JetBrains Mono', monospace", +} as const; + +/** Status color by surface state, plus the stopped pseudo-state. */ +export const STATE_COLOR: Record = { + work: COLORS.stWork, + wait: COLORS.stWait, + done: COLORS.stDone, + error: COLORS.stError, + idle: COLORS.stIdle, + stopped: COLORS.stIdle, +}; diff --git a/crates/spacesh-proto/src/event.rs b/crates/spacesh-proto/src/event.rs new file mode 100644 index 0000000..0a1ca41 --- /dev/null +++ b/crates/spacesh-proto/src/event.rs @@ -0,0 +1,76 @@ +use serde::{Deserialize, Serialize}; +use crate::ids::{SurfaceId, WorkspaceId}; + +/// The subset of activity that lands in the event log. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum EventKind { + Done, + Wait, + Error, + Exit, +} + +/// One logged event. Workspace name and agent label are denormalized so the +/// feed stays displayable after the surface or workspace is closed. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct EventRecord { + pub id: u64, + pub surface_id: SurfaceId, + pub workspace_id: WorkspaceId, + pub workspace_name: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub agent_label: Option, + pub kind: EventKind, + pub ts: u64, + pub read: bool, +} + +/// What a `mark_read` request targets. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "target", content = "value", rename_all = "snake_case")] +pub enum MarkReadTarget { + All, + Ids(Vec), + Surface(SurfaceId), +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn event_kind_serializes_lowercase() { + assert_eq!(serde_json::to_string(&EventKind::Done).unwrap(), r#""done""#); + assert_eq!(serde_json::to_string(&EventKind::Exit).unwrap(), r#""exit""#); + } + + #[test] + fn event_record_round_trips() { + let r = EventRecord { + id: 7, + surface_id: SurfaceId("s_1".into()), + workspace_id: WorkspaceId("w_1".into()), + workspace_name: "infra".into(), + agent_label: Some("claude".into()), + kind: EventKind::Error, + ts: 1_700_000_000_000, + read: false, + }; + let back: EventRecord = serde_json::from_str(&serde_json::to_string(&r).unwrap()).unwrap(); + assert_eq!(back, r); + } + + #[test] + fn mark_read_target_variants_serialize() { + assert_eq!(serde_json::to_string(&MarkReadTarget::All).unwrap(), r#"{"target":"all"}"#); + assert_eq!( + serde_json::to_string(&MarkReadTarget::Ids(vec![1, 2])).unwrap(), + r#"{"target":"ids","value":[1,2]}"# + ); + let s = MarkReadTarget::Surface(SurfaceId("s_9".into())); + assert_eq!(serde_json::to_string(&s).unwrap(), r#"{"target":"surface","value":"s_9"}"#); + let back: MarkReadTarget = serde_json::from_str(&serde_json::to_string(&s).unwrap()).unwrap(); + assert_eq!(back, s); + } +} diff --git a/crates/spacesh-proto/src/lib.rs b/crates/spacesh-proto/src/lib.rs index 987b410..addc79f 100644 --- a/crates/spacesh-proto/src/lib.rs +++ b/crates/spacesh-proto/src/lib.rs @@ -1,10 +1,12 @@ pub mod codec; +pub mod event; pub mod ids; pub mod layout; pub mod message; pub mod status; pub mod workspace; +pub use event::{EventKind, EventRecord, MarkReadTarget}; pub use ids::{GroupId, SurfaceId, WorkspaceId}; pub use layout::{LayoutNode, Orient}; pub use message::{Cmd, Envelope, ErrorBody, Evt}; diff --git a/crates/spacesh-proto/src/message.rs b/crates/spacesh-proto/src/message.rs index dc26265..3c682ba 100644 --- a/crates/spacesh-proto/src/message.rs +++ b/crates/spacesh-proto/src/message.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use crate::event::{EventRecord, MarkReadTarget}; use crate::ids::{GroupId, SurfaceId, WorkspaceId}; use crate::layout::LayoutNode; use crate::status::SurfaceState; @@ -116,6 +117,11 @@ pub enum Cmd { }, DeleteGroup { group_id: GroupId }, SetState { surface_id: SurfaceId, state: SurfaceState }, + EventLog { + #[serde(default, skip_serializing_if = "Option::is_none")] + limit: Option, + }, + MarkRead { target: MarkReadTarget }, Status, Shutdown, } @@ -134,6 +140,8 @@ pub enum Evt { GroupsChanged { groups: Vec }, SurfaceRestarted { surface_id: SurfaceId }, State { surface_id: SurfaceId, state: SurfaceState }, + Event { record: EventRecord }, + EventsRead { ids: Vec }, } #[cfg(test)] @@ -262,4 +270,77 @@ mod tests { let back: Envelope = serde_json::from_str(&j).unwrap(); assert_eq!(back, evt); } + + #[test] + fn event_log_cmd_round_trips() { + let env = Envelope::Req { id: 1, cmd: Cmd::EventLog { limit: Some(50) } }; + let j = serde_json::to_string(&env).unwrap(); + assert!(j.contains(r#""cmd":"event_log""#)); + let back: Envelope = serde_json::from_str(&j).unwrap(); + assert_eq!(back, env); + } + + #[test] + fn mark_read_cmd_round_trips() { + let env = Envelope::Req { + id: 2, + cmd: Cmd::MarkRead { target: crate::event::MarkReadTarget::All }, + }; + let j = serde_json::to_string(&env).unwrap(); + assert!(j.contains(r#""cmd":"mark_read""#)); + let back: Envelope = serde_json::from_str(&j).unwrap(); + assert_eq!(back, env); + } + + #[test] + fn event_evt_round_trips() { + let evt = Envelope::Evt(Evt::Event { + record: crate::event::EventRecord { + id: 3, + surface_id: SurfaceId("s_1".into()), + workspace_id: WorkspaceId("w_1".into()), + workspace_name: "p".into(), + agent_label: None, + kind: crate::event::EventKind::Done, + ts: 1, + read: false, + }, + }); + let j = serde_json::to_string(&evt).unwrap(); + assert!(j.contains(r#""evt":"event""#)); + let back: Envelope = serde_json::from_str(&j).unwrap(); + assert_eq!(back, evt); + } + + #[test] + fn events_read_evt_round_trips() { + let evt = Envelope::Evt(Evt::EventsRead { ids: vec![1, 2, 3] }); + let j = serde_json::to_string(&evt).unwrap(); + assert!(j.contains(r#""evt":"events_read""#)); + let back: Envelope = serde_json::from_str(&j).unwrap(); + assert_eq!(back, evt); + } + + #[test] + fn event_log_cmd_no_limit_round_trips() { + let env = Envelope::Req { id: 9, cmd: Cmd::EventLog { limit: None } }; + let j = serde_json::to_string(&env).unwrap(); + assert!(j.contains(r#""cmd":"event_log""#)); + assert!(j.contains(r#""args":{}"#), "no-limit serializes to empty args, got: {j}"); + let back: Envelope = serde_json::from_str(&j).unwrap(); + assert_eq!(back, env); + } + + #[test] + fn mark_read_cmd_ids_and_surface_round_trip() { + let ids = Envelope::Req { id: 10, cmd: Cmd::MarkRead { target: crate::event::MarkReadTarget::Ids(vec![1, 2]) } }; + let j = serde_json::to_string(&ids).unwrap(); + assert!(j.contains(r#""target":"ids""#)); + assert_eq!(serde_json::from_str::(&j).unwrap(), ids); + + let surf = Envelope::Req { id: 11, cmd: Cmd::MarkRead { target: crate::event::MarkReadTarget::Surface(SurfaceId("s_3".into())) } }; + let j = serde_json::to_string(&surf).unwrap(); + assert!(j.contains(r#""target":"surface""#)); + assert_eq!(serde_json::from_str::(&j).unwrap(), surf); + } } diff --git a/crates/spaceshd/src/event_log.rs b/crates/spaceshd/src/event_log.rs new file mode 100644 index 0000000..d60b614 --- /dev/null +++ b/crates/spaceshd/src/event_log.rs @@ -0,0 +1,212 @@ +use std::collections::VecDeque; +use serde::{Deserialize, Serialize}; +use spacesh_proto::event::{EventKind, EventRecord, MarkReadTarget}; +use spacesh_proto::ids::{SurfaceId, WorkspaceId}; + +const SNAPSHOT_VERSION: u32 = 1; + +/// Serializable form of the log, used for persistence. +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] +pub struct EventLogState { + pub version: u32, + pub next_id: u64, + #[serde(default)] + pub records: Vec, +} + +/// In-memory event log: a capped ring with monotonic ids. +pub struct EventLog { + records: VecDeque, + next_id: u64, + cap: usize, +} + +impl EventLog { + #[cfg(test)] + pub fn new(cap: usize) -> Self { + Self { records: VecDeque::new(), next_id: 1, cap } + } + + /// Rebuild from a persisted snapshot, clamping to `cap` (keeping newest). + pub fn restore(state: EventLogState, cap: usize) -> Self { + let mut records: VecDeque = state.records.into_iter().collect(); + while records.len() > cap { + records.pop_front(); + } + let max_record_id = records.iter().map(|r| r.id).max().unwrap_or(0); + let next_id = state.next_id.max(max_record_id + 1).max(1); + Self { records, next_id, cap } + } + + /// Append a new event. Evicts the oldest when over capacity. Returns the + /// stored record (with its assigned id) for broadcasting. + #[allow(clippy::too_many_arguments)] + pub fn record( + &mut self, + surface_id: SurfaceId, + workspace_id: WorkspaceId, + workspace_name: String, + agent_label: Option, + kind: EventKind, + ts: u64, + ) -> EventRecord { + let rec = EventRecord { + id: self.next_id, + surface_id, + workspace_id, + workspace_name, + agent_label, + kind, + ts, + read: false, + }; + self.next_id += 1; + self.records.push_back(rec.clone()); + while self.records.len() > self.cap { + self.records.pop_front(); + } + rec + } + + /// Flip matching records to read. Returns the ids that actually changed. + pub fn mark_read(&mut self, target: &MarkReadTarget) -> Vec { + let mut changed = Vec::new(); + for r in self.records.iter_mut() { + if r.read { + continue; + } + let hit = match target { + MarkReadTarget::All => true, + MarkReadTarget::Ids(ids) => ids.contains(&r.id), + MarkReadTarget::Surface(sid) => &r.surface_id == sid, + }; + if hit { + r.read = true; + changed.push(r.id); + } + } + changed + } + + pub fn unread_count(&self) -> u32 { + self.records.iter().filter(|r| !r.read).count() as u32 + } + + /// Most-recent-first, optionally capped to `limit`. + pub fn recent(&self, limit: Option) -> Vec { + let iter = self.records.iter().rev().cloned(); + match limit { + Some(n) => iter.take(n as usize).collect(), + None => iter.collect(), + } + } + + pub fn snapshot(&self) -> EventLogState { + EventLogState { + version: SNAPSHOT_VERSION, + next_id: self.next_id, + records: self.records.iter().cloned().collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn rec(log: &mut EventLog, sid: &str, kind: EventKind) -> EventRecord { + log.record( + SurfaceId(sid.into()), + WorkspaceId("w_1".into()), + "infra".into(), + Some("claude".into()), + kind, + 1, + ) + } + + #[test] + fn record_assigns_monotonic_ids() { + let mut log = EventLog::new(10); + assert_eq!(rec(&mut log, "s_1", EventKind::Done).id, 1); + assert_eq!(rec(&mut log, "s_1", EventKind::Wait).id, 2); + } + + #[test] + fn push_beyond_cap_evicts_oldest() { + let mut log = EventLog::new(2); + rec(&mut log, "s_1", EventKind::Done); // id 1 + rec(&mut log, "s_2", EventKind::Done); // id 2 + rec(&mut log, "s_3", EventKind::Done); // id 3, evicts id 1 + let ids: Vec = log.recent(None).iter().map(|r| r.id).collect(); + assert_eq!(ids, vec![3, 2]); // newest first, id 1 gone + } + + #[test] + fn mark_read_by_surface_then_ids_then_all() { + let mut log = EventLog::new(10); + rec(&mut log, "s_1", EventKind::Done); // 1 + rec(&mut log, "s_2", EventKind::Error); // 2 + rec(&mut log, "s_1", EventKind::Wait); // 3 + assert_eq!(log.unread_count(), 3); + + let changed = log.mark_read(&MarkReadTarget::Surface(SurfaceId("s_1".into()))); + assert_eq!(changed, vec![1, 3]); + assert_eq!(log.unread_count(), 1); + + // Re-marking the same surface changes nothing. + assert!(log.mark_read(&MarkReadTarget::Surface(SurfaceId("s_1".into()))).is_empty()); + + let changed = log.mark_read(&MarkReadTarget::Ids(vec![2, 999])); + assert_eq!(changed, vec![2]); + assert_eq!(log.unread_count(), 0); + + assert!(log.mark_read(&MarkReadTarget::All).is_empty()); + } + + #[test] + fn snapshot_restore_preserves_next_id_and_records() { + let mut log = EventLog::new(10); + rec(&mut log, "s_1", EventKind::Done); + rec(&mut log, "s_2", EventKind::Done); + let snap = log.snapshot(); + assert_eq!(snap.next_id, 3); + + let restored = EventLog::restore(snap, 10); + assert_eq!(restored.recent(None).len(), 2); + // Next recorded id continues from 3, no reuse. + let mut restored = restored; + assert_eq!(rec(&mut restored, "s_3", EventKind::Done).id, 3); + } + + #[test] + fn restore_clamps_to_cap_keeping_newest() { + let state = EventLogState { + version: 1, + next_id: 4, + records: vec![ + EventRecord { id: 1, surface_id: SurfaceId("a".into()), workspace_id: WorkspaceId("w".into()), workspace_name: "x".into(), agent_label: None, kind: EventKind::Done, ts: 1, read: false }, + EventRecord { id: 2, surface_id: SurfaceId("a".into()), workspace_id: WorkspaceId("w".into()), workspace_name: "x".into(), agent_label: None, kind: EventKind::Done, ts: 1, read: false }, + EventRecord { id: 3, surface_id: SurfaceId("a".into()), workspace_id: WorkspaceId("w".into()), workspace_name: "x".into(), agent_label: None, kind: EventKind::Done, ts: 1, read: false }, + ], + }; + let log = EventLog::restore(state, 2); + let ids: Vec = log.recent(None).iter().map(|r| r.id).collect(); + assert_eq!(ids, vec![3, 2]); + } + + #[test] + fn restore_reconciles_next_id_against_records() { + // Snapshot claims next_id=1 but already holds id=5 → next must jump past 5. + let state = EventLogState { + version: 1, + next_id: 1, + records: vec![EventRecord { + id: 5, surface_id: SurfaceId("a".into()), workspace_id: WorkspaceId("w".into()), + workspace_name: "x".into(), agent_label: None, kind: EventKind::Done, ts: 1, read: false, + }], + }; + let mut log = EventLog::restore(state, 10); + assert_eq!(rec(&mut log, "s_1", EventKind::Done).id, 6); + } +} diff --git a/crates/spaceshd/src/event_store.rs b/crates/spaceshd/src/event_store.rs new file mode 100644 index 0000000..202f7fe --- /dev/null +++ b/crates/spaceshd/src/event_store.rs @@ -0,0 +1,200 @@ +use std::path::PathBuf; +use std::sync::Arc; +use anyhow::Result; +use tokio::sync::mpsc; +use tokio::time::{Duration, Instant}; +use crate::event_log::EventLogState; + +pub trait EventStore: Send + Sync { + fn load(&self) -> Result; + fn save(&self, state: &EventLogState) -> Result<()>; +} + +/// JSON file store with atomic write (temp + fsync + rename) and corrupt backup. +pub struct JsonEventStore { + path: PathBuf, +} + +impl JsonEventStore { + pub fn new(path: PathBuf) -> Self { + Self { path } + } + + fn backup_corrupt(&self, ts: u128) { + let bak = self.path.with_extension(format!("corrupt-{ts}")); + let _ = std::fs::rename(&self.path, bak); + } +} + +impl EventStore for JsonEventStore { + fn load(&self) -> Result { + if !self.path.exists() { + return Ok(EventLogState { version: 1, next_id: 1, records: vec![] }); + } + let bytes = std::fs::read(&self.path)?; + match serde_json::from_slice::(&bytes) { + Ok(state) => Ok(state), + Err(_) => { + let ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos()) + .unwrap_or(0); + self.backup_corrupt(ts); + Ok(EventLogState { version: 1, next_id: 1, records: vec![] }) + } + } + } + + fn save(&self, state: &EventLogState) -> Result<()> { + if let Some(parent) = self.path.parent() { + std::fs::create_dir_all(parent)?; + } + let tmp = self.path.with_extension("json.tmp"); + let bytes = serde_json::to_vec_pretty(state)?; + std::fs::write(&tmp, &bytes)?; + let f = std::fs::File::open(&tmp)?; + // fsync the temp file before rename for durability. + f.sync_all()?; + std::fs::rename(&tmp, &self.path)?; + Ok(()) + } +} + +/// Handle the recorder uses to request a debounced persist. +#[derive(Clone)] +pub struct EventPersister { + tx: mpsc::Sender, +} + +impl EventPersister { + pub fn mark_dirty(&self, state: EventLogState) { + // Best-effort; dropping a snapshot is fine because a newer one will arrive. + let _ = self.tx.try_send(state); + } +} + +/// Spawn the debounce task; coalesces a burst into one save. +pub fn spawn(store: Arc, debounce: Duration) -> EventPersister { + let (tx, mut rx) = mpsc::channel::(64); + tokio::spawn(async move { + let mut latest: Option = None; + let mut deadline: Option = None; + loop { + let timer = async { + match deadline { + Some(d) => tokio::time::sleep_until(d).await, + None => std::future::pending::<()>().await, + } + }; + tokio::select! { + msg = rx.recv() => { + match msg { + Some(state) => { + latest = Some(state); + deadline = Some(Instant::now() + debounce); + } + None => { + // channel closed: final flush then exit + if let Some(s) = latest.take() { let _ = store.save(&s); } + break; + } + } + } + _ = timer => { + if let Some(s) = latest.take() { let _ = store.save(&s); } + deadline = None; + } + } + } + }); + EventPersister { tx } +} + +#[cfg(test)] +mod tests { + use super::*; + use spacesh_proto::event::{EventKind, EventRecord}; + use spacesh_proto::ids::{SurfaceId, WorkspaceId}; + + fn tmp_file(name: &str) -> PathBuf { + let mut p = std::env::temp_dir(); + let n = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos(); + p.push(format!("spacesh-events-{name}-{n}.json")); + p + } + + fn sample() -> EventLogState { + EventLogState { + version: 1, + next_id: 2, + records: vec![EventRecord { + id: 1, + surface_id: SurfaceId("s_1".into()), + workspace_id: WorkspaceId("w_1".into()), + workspace_name: "infra".into(), + agent_label: Some("claude".into()), + kind: EventKind::Done, + ts: 1, + read: false, + }], + } + } + + #[test] + fn save_then_load_round_trips() { + let path = tmp_file("roundtrip"); + let store = JsonEventStore::new(path.clone()); + store.save(&sample()).unwrap(); + assert_eq!(store.load().unwrap(), sample()); + let _ = std::fs::remove_file(path); + } + + #[test] + fn missing_file_loads_empty() { + let store = JsonEventStore::new(tmp_file("missing")); + let s = store.load().unwrap(); + assert_eq!(s.next_id, 1); + assert!(s.records.is_empty()); + } + + #[test] + fn corrupt_file_is_backed_up_and_load_returns_empty() { + let path = tmp_file("corrupt"); + std::fs::write(&path, b"{ not valid json").unwrap(); + let store = JsonEventStore::new(path.clone()); + let s = store.load().unwrap(); + assert!(s.records.is_empty()); + assert!(!path.exists()); + let _ = std::fs::remove_file(path); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn burst_coalesces_to_one_save() { + struct Counting { + saves: std::sync::atomic::AtomicUsize, + last: std::sync::Mutex>, + } + impl EventStore for Counting { + fn load(&self) -> Result { Ok(EventLogState::default()) } + fn save(&self, s: &EventLogState) -> Result<()> { + self.saves.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + *self.last.lock().unwrap() = Some(s.clone()); + Ok(()) + } + } + let store = Arc::new(Counting { + saves: std::sync::atomic::AtomicUsize::new(0), + last: std::sync::Mutex::new(None), + }); + let p = spawn(store.clone(), Duration::from_millis(80)); + for v in 1..=5u64 { + let mut s = EventLogState::default(); + s.next_id = v; + p.mark_dirty(s); + tokio::time::sleep(Duration::from_millis(10)).await; + } + tokio::time::sleep(Duration::from_millis(200)).await; + assert_eq!(store.saves.load(std::sync::atomic::Ordering::SeqCst), 1, "burst should coalesce to one save"); + assert_eq!(store.last.lock().unwrap().as_ref().unwrap().next_id, 5, "save uses the latest snapshot"); + } +} diff --git a/crates/spaceshd/src/main.rs b/crates/spaceshd/src/main.rs index 3715507..054364c 100644 --- a/crates/spaceshd/src/main.rs +++ b/crates/spaceshd/src/main.rs @@ -1,3 +1,5 @@ +mod event_log; +mod event_store; mod hooks; mod launchd; mod lifecycle; @@ -54,6 +56,9 @@ async fn run_daemon() -> Result<()> { let state_path = lifecycle::spacesh_dir()?.join("state.json"); let store: std::sync::Arc = std::sync::Arc::new(state_store::JsonStateStore::new(state_path)); + let events_path = lifecycle::spacesh_dir()?.join("events.json"); + let event_store: std::sync::Arc = + std::sync::Arc::new(event_store::JsonEventStore::new(events_path)); eprintln!("spaceshd listening on {}", sock.display()); - server::serve(&sock, store).await + server::serve(&sock, store, event_store).await } diff --git a/crates/spaceshd/src/server.rs b/crates/spaceshd/src/server.rs index 062aac5..ac3c214 100644 --- a/crates/spaceshd/src/server.rs +++ b/crates/spaceshd/src/server.rs @@ -9,6 +9,8 @@ use spacesh_proto::{Cmd, Envelope, ErrorBody, Evt, SurfaceId, WorkspaceId}; use spacesh_proto::status::SurfaceState; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::{mpsc, oneshot}; +use crate::event_log::EventLog; +use crate::event_store::{self, EventPersister, EventStore}; use crate::persist::{self, Persister}; use crate::registry::Registry; use crate::state_store::StateStore; @@ -35,7 +37,7 @@ enum ServerMsg { type ClientId = u64; -pub async fn serve(socket: &Path, store: Arc) -> Result<()> { +pub async fn serve(socket: &Path, store: Arc, event_store: Arc) -> Result<()> { let listener = UnixListener::bind(socket)?; let (router_tx, router_rx) = mpsc::channel::(256); @@ -58,7 +60,12 @@ pub async fn serve(socket: &Path, store: Arc) -> Result<()> { let persister = persist::spawn(store.clone(), Duration::from_millis(500)); let initial = store.load().unwrap_or_default(); - let shutdown = tokio::spawn(router(router_rx, router_tx.clone(), exit_tx, state_tx, persister, initial)); + let event_persister = event_store::spawn(event_store.clone(), Duration::from_millis(500)); + let event_initial = event_store.load().unwrap_or_default(); + let shutdown = tokio::spawn(router( + router_rx, router_tx.clone(), exit_tx, state_tx, + persister, initial, event_persister, event_initial, + )); let mut next_client: ClientId = 0; loop { @@ -116,9 +123,12 @@ async fn router( state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, persister: Persister, initial: crate::state_store::PersistState, + event_persister: EventPersister, + event_initial: crate::event_log::EventLogState, ) { let mut reg = Registry::new(); reg.restore(initial); + let mut event_log = EventLog::restore(event_initial, 1000); let mut clients: HashMap = HashMap::new(); // surface_id → set of client ids subscribed (attached). let mut subs: HashMap> = HashMap::new(); @@ -147,17 +157,24 @@ async fn router( ServerMsg::Exit { surface_id, code } => { reg.mark_stopped(&surface_id); reg.drop_state(&surface_id); + record_event(®, &mut event_log, &event_persister, &clients, + &surface_id, spacesh_proto::event::EventKind::Exit); let evt = Envelope::Evt(Evt::Exit { surface_id: surface_id.clone(), code }); broadcast_evt(&clients, &evt); } ServerMsg::StateDetected { surface_id, state } => { if reg.is_running(&surface_id) { reg.set_state(&surface_id, state); - broadcast_evt(&clients, &Envelope::Evt(Evt::State { surface_id, state })); + broadcast_evt(&clients, &Envelope::Evt(Evt::State { surface_id: surface_id.clone(), state })); + if let Some(kind) = kind_for_state(state) { + record_event(®, &mut event_log, &event_persister, &clients, &surface_id, kind); + } } } ServerMsg::Request { id, cmd, client, out } => { - handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, &router_tx, &exit_tx, &state_tx, &persister).await; + handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, + &router_tx, &exit_tx, &state_tx, &persister, + &mut event_log, &event_persister).await; } } } @@ -169,6 +186,46 @@ fn broadcast_evt(clients: &HashMap, evt: &Envelope) { } } +/// Current unix-epoch milliseconds. `as u64` is safe — epoch millis fit u64 for ~584M years. +fn now_millis() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0) +} + +/// Which state transitions are worth logging. work/idle are noise → None. +fn kind_for_state(state: SurfaceState) -> Option { + use spacesh_proto::event::EventKind; + match state { + SurfaceState::Done => Some(EventKind::Done), + SurfaceState::Wait => Some(EventKind::Wait), + SurfaceState::Error => Some(EventKind::Error), + SurfaceState::Work | SurfaceState::Idle => None, + } +} + +/// Record one event (denormalizing workspace name + agent label), persist, broadcast. +fn record_event( + reg: &Registry, + log: &mut EventLog, + persister: &EventPersister, + clients: &HashMap, + sid: &SurfaceId, + kind: spacesh_proto::event::EventKind, +) { + // No workspace → the surface was already removed (user-initiated Close / ApplyPreset / + // CloseWorkspace remove it synchronously before the async Exit arrives). Such deliberate + // closes are intentionally NOT logged — only spontaneous process exits and status + // transitions become events. + let Some(ws_id) = reg.workspace_of(sid) else { return }; + let ws_name = reg.workspace(&ws_id).map(|w| w.name.clone()).unwrap_or_default(); + let agent = reg.surface_spec(sid).and_then(|s| s.agent_label); + let rec = log.record(sid.clone(), ws_id, ws_name, agent, kind, now_millis()); + persister.mark_dirty(log.snapshot()); + broadcast_evt(clients, &Envelope::Evt(Evt::Event { record: rec })); +} + fn ok(id: u64, data: serde_json::Value) -> Envelope { Envelope::Res { id, ok: true, data, error: None } } @@ -213,6 +270,8 @@ async fn handle_request( exit_tx: &mpsc::UnboundedSender<(SurfaceId, i32)>, state_tx: &mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, persister: &Persister, + event_log: &mut EventLog, + event_persister: &EventPersister, ) { use spacesh_proto::message::SplitDir; use spacesh_proto::layout::{LayoutNode, Orient}; @@ -491,7 +550,14 @@ async fn handle_request( let _ = out.send(ok(id, serde_json::Value::Null)).await; } - Cmd::Focus { surface_id: _ } => { let _ = out.send(ok(id, serde_json::Value::Null)).await; } + Cmd::Focus { surface_id } => { + let ids = event_log.mark_read(&spacesh_proto::event::MarkReadTarget::Surface(surface_id)); + if !ids.is_empty() { + event_persister.mark_dirty(event_log.snapshot()); + broadcast_evt(clients, &Envelope::Evt(Evt::EventsRead { ids })); + } + let _ = out.send(ok(id, serde_json::Value::Null)).await; + } Cmd::Close { surface_id } => { if reg.surface_spec(&surface_id).is_some() { @@ -514,6 +580,9 @@ async fn handle_request( if reg.is_running(&surface_id) { reg.set_state(&surface_id, state); broadcast_evt(clients, &Envelope::Evt(Evt::State { surface_id: surface_id.clone(), state })); + if let Some(kind) = kind_for_state(state) { + record_event(reg, event_log, event_persister, clients, &surface_id, kind); + } let _ = out.send(ok(id, serde_json::Value::Null)).await; } else { // unknown or stopped surface — status is only meaningful while running. @@ -526,6 +595,21 @@ async fn handle_request( let _ = out.send(ok(id, serde_json::json!({ "groups": groups, "workspaces": workspaces }))).await; } + Cmd::EventLog { limit } => { + let events = event_log.recent(limit); + let unread = event_log.unread_count(); + let _ = out.send(ok(id, serde_json::json!({ "events": events, "unread": unread }))).await; + } + + Cmd::MarkRead { target } => { + let ids = event_log.mark_read(&target); + if !ids.is_empty() { + event_persister.mark_dirty(event_log.snapshot()); + broadcast_evt(clients, &Envelope::Evt(Evt::EventsRead { ids })); + } + let _ = out.send(ok(id, serde_json::Value::Null)).await; + } + Cmd::Shutdown => { let _ = out.send(ok(id, serde_json::Value::Null)).await; std::process::exit(0); @@ -589,6 +673,12 @@ mod tests { p } + /// Build an event store whose file lives inside the per-test temp dir so it is + /// cleaned up with the rest of the test fixtures (not left in the global temp root). + fn make_event_store(dir: &Path) -> std::sync::Arc { + std::sync::Arc::new(crate::event_store::JsonEventStore::new(dir.join("events.json"))) + } + async fn wait_for_socket(sock: &Path) { for _ in 0..300 { if UnixStream::connect(sock).await.is_ok() { return; } @@ -604,9 +694,10 @@ mod tests { let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); @@ -645,9 +736,10 @@ mod tests { let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Input { @@ -670,9 +762,10 @@ mod tests { let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; // First client: open, new surface that prints a marker, attach, then disconnect. @@ -709,8 +802,9 @@ mod tests { let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); let sock2 = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock2, store).await; }); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); @@ -738,8 +832,9 @@ mod tests { let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); let sock2 = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock2, store).await; }); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); @@ -779,8 +874,11 @@ mod tests { { let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone())); + // Both daemon instances in this test share ONE event-store file under the + // per-test dir so instance B reads from disk what instance A persisted. + let event_store = make_event_store(&dir); let sock2 = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock2, store).await; }); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; @@ -795,8 +893,9 @@ mod tests { let sock_b = dir.join("sock2"); let store_b: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone())); + let event_store_b = make_event_store(&dir); let sb2 = sock_b.clone(); - tokio::spawn(async move { let _ = serve(&sock_b, store_b).await; }); + tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b).await; }); wait_for_socket(&sb2).await; let mut s2 = UnixStream::connect(&sb2).await.unwrap(); let r = req(&mut s2, 1, Cmd::Status).await; @@ -816,8 +915,9 @@ mod tests { let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); let sock2 = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock2, store).await; }); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); @@ -844,4 +944,360 @@ mod tests { } assert!(saw_done, "expected a Done state event from OSC 133"); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn set_state_done_emits_event_record() { + let _serial = crate::test_support::serial(); + let dir = tempdir_path(); + let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); + let sock_for_task = sock.clone(); + let store2 = store.clone(); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + wait_for_socket(&sock).await; + + // Control connection: open workspace and spawn surface. + let mut ctrl = UnixStream::connect(&sock).await.unwrap(); + let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + let r = req(&mut ctrl, 2, Cmd::NewSurface { + workspace_id: spacesh_proto::WorkspaceId(ws.clone()), + command: Some("/bin/sh".into()), + args: vec!["-c".into(), "sleep 5".into()], + cols: 80, rows: 24, + }).await; + let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); + + // Observer connection: a second client that receives all broadcast events + // without its read loop consuming them via req(). + let mut observer = UnixStream::connect(&sock).await.unwrap(); + + // Trigger Done via SetState on the control connection. + let _ = req(&mut ctrl, 3, Cmd::SetState { + surface_id: spacesh_proto::SurfaceId(sid.clone()), + state: spacesh_proto::status::SurfaceState::Done, + }).await; + + // Expect an Evt::Event for this surface on the observer within a short window. + let mut found = None; + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3); + while tokio::time::Instant::now() < deadline { + if let Ok(Ok(Some(env))) = + tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await { + if let Envelope::Evt(Evt::Event { record }) = env { + if record.surface_id.0 == sid { found = Some(record); break; } + } + } + } + let rec = found.expect("expected an Evt::Event for the surface"); + assert_eq!(rec.kind, spacesh_proto::event::EventKind::Done); + assert!(!rec.read); + assert_eq!(rec.workspace_id.0, ws); + assert!(!rec.workspace_name.is_empty(), "workspace name should be denormalized into the record"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn close_does_not_emit_event_record() { + let _serial = crate::test_support::serial(); + let dir = tempdir_path(); + let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); + let sock_for_task = sock.clone(); + let store2 = store.clone(); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + wait_for_socket(&sock).await; + + // Control connection: open workspace and spawn surface. + let mut ctrl = UnixStream::connect(&sock).await.unwrap(); + let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + let r = req(&mut ctrl, 2, Cmd::NewSurface { + workspace_id: spacesh_proto::WorkspaceId(ws.clone()), + command: Some("/bin/sh".into()), + args: vec!["-c".into(), "sleep 5".into()], + cols: 80, rows: 24, + }).await; + let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); + + // Observer connection: receives all broadcast events. + let mut observer = UnixStream::connect(&sock).await.unwrap(); + + // User-initiated Close on the control connection. + let _ = req(&mut ctrl, 3, Cmd::Close { + surface_id: spacesh_proto::SurfaceId(sid.clone()), + }).await; + + // A deliberate Close must surface an Evt::Exit but NEVER an Evt::Event for it. + let mut saw_exit = false; + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2); + while tokio::time::Instant::now() < deadline { + if let Ok(Ok(Some(env))) = + tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await { + match env { + Envelope::Evt(Evt::Event { record }) if record.surface_id.0 == sid => { + panic!("user-initiated Close must not produce an Evt::Event"); + } + Envelope::Evt(Evt::Exit { surface_id, .. }) if surface_id.0 == sid => { + saw_exit = true; + } + _ => {} + } + } + } + assert!(saw_exit, "expected an Evt::Exit for the closed surface"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn osc133_detected_state_emits_event_record() { + let _serial = crate::test_support::serial(); + let dir = tempdir_path(); + let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); + let sock2 = sock.clone(); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); + wait_for_socket(&sock).await; + + // Control connection: open workspace and spawn a surface that emits OSC 133. + let mut ctrl = UnixStream::connect(&sock).await.unwrap(); + let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + let r = req(&mut ctrl, 2, Cmd::NewSurface { + workspace_id: spacesh_proto::WorkspaceId(ws.clone()), + command: Some("/bin/sh".into()), + args: vec!["-c".into(), "printf '\\033]133;C\\007'; printf hi; printf '\\033]133;D;0\\007'; sleep 1".into()], + cols: 80, rows: 24, + }).await; + let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); + + // Observer connection: receives all broadcast events (the detected-state path + // flows through ServerMsg::StateDetected → record_event → Evt::Event). + let mut observer = UnixStream::connect(&sock).await.unwrap(); + + // Drive the PTY output by attaching the control connection. + let _ = req(&mut ctrl, 3, Cmd::Attach { + surface_id: spacesh_proto::SurfaceId(sid.clone()), + }).await; + + // Expect an Evt::Event (kind=done) for this surface from the OSC 133 Done detection. + let mut found = None; + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3); + while tokio::time::Instant::now() < deadline { + if let Ok(Ok(Some(env))) = + tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await { + if let Envelope::Evt(Evt::Event { record }) = env { + if record.surface_id.0 == sid { found = Some(record); break; } + } + } + } + let rec = found.expect("expected an Evt::Event from the OSC 133 detected state"); + assert_eq!(rec.kind, spacesh_proto::event::EventKind::Done); + assert!(!rec.read); + assert_eq!(rec.workspace_id.0, ws); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn event_log_query_and_mark_read() { + let _serial = crate::test_support::serial(); + let dir = tempdir_path(); + let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); + let sock_for_task = sock.clone(); + let store2 = store.clone(); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + wait_for_socket(&sock).await; + + // Observer connection to catch the EventsRead broadcast. + let mut obs = UnixStream::connect(&sock).await.unwrap(); + // Control connection. + let mut s = UnixStream::connect(&sock).await.unwrap(); + + let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + let r = req(&mut s, 2, Cmd::NewSurface { + workspace_id: spacesh_proto::WorkspaceId(ws), + command: Some("/bin/sh".into()), + args: vec!["-c".into(), "sleep 5".into()], + cols: 80, rows: 24, + }).await; + let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); + + let _ = req(&mut s, 3, Cmd::SetState { + surface_id: spacesh_proto::SurfaceId(sid.clone()), + state: spacesh_proto::status::SurfaceState::Error, + }).await; + + // Query the log. + let log = req(&mut s, 4, Cmd::EventLog { limit: None }).await; + let data = res_data(&log); + assert_eq!(data["unread"].as_u64().unwrap(), 1); + let first_id = data["events"][0]["id"].as_u64().unwrap(); + + // Mark it read by id. + let _ = req(&mut s, 5, Cmd::MarkRead { + target: spacesh_proto::event::MarkReadTarget::Ids(vec![first_id]), + }).await; + + // Observer should see EventsRead { ids: [first_id] }. + let mut saw_read = false; + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3); + while tokio::time::Instant::now() < deadline { + if let Ok(Ok(Some(env))) = + tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut obs)).await { + if let Envelope::Evt(Evt::EventsRead { ids }) = env { + if ids == vec![first_id] { saw_read = true; break; } + } + } + } + assert!(saw_read, "expected an EventsRead broadcast for the marked id"); + + // Unread is now 0. + let log = req(&mut s, 6, Cmd::EventLog { limit: None }).await; + assert_eq!(res_data(&log)["unread"].as_u64().unwrap(), 0); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn event_log_persists_across_daemon_restart() { + let _serial = crate::test_support::serial(); + let dir = tempdir_path(); + let state_path = dir.join("state.json"); + let sock = dir.join("sock"); + + let event_id: u64; + let ws_id: String; + + // ── Instance A ──────────────────────────────────────────────────────── + { + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone())); + let event_store = make_event_store(&dir); + let sock2 = sock.clone(); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); + wait_for_socket(&sock).await; + + let mut s = UnixStream::connect(&sock).await.unwrap(); + + // Open workspace, spawn surface. + let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + ws_id = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + + let r = req(&mut s, 2, Cmd::NewSurface { + workspace_id: spacesh_proto::WorkspaceId(ws_id.clone()), + command: Some("/bin/sh".into()), + args: vec!["-c".into(), "sleep 5".into()], + cols: 80, rows: 24, + }).await; + let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); + + // Drive an Error state → one unread event is logged. + let _ = req(&mut s, 3, Cmd::SetState { + surface_id: spacesh_proto::SurfaceId(sid.clone()), + state: spacesh_proto::status::SurfaceState::Error, + }).await; + + // Query and assert unread == 1 before restart. + let log = req(&mut s, 4, Cmd::EventLog { limit: None }).await; + let data = res_data(&log); + assert_eq!(data["unread"].as_u64().unwrap(), 1, "instance A: expected 1 unread event"); + assert_eq!(data["events"][0]["kind"].as_str().unwrap(), "error"); + assert_eq!(data["events"][0]["workspace_id"].as_str().unwrap(), ws_id); + event_id = data["events"][0]["id"].as_u64().unwrap(); + + // Wait comfortably longer than the 500 ms debounce so events.json is flushed. + tokio::time::sleep(tokio::time::Duration::from_millis(900)).await; + // Drop `s` (and instance A's task) by falling out of scope. + } + + // ── Instance B (same dir, fresh socket path) ────────────────────────── + let sock_b = dir.join("sock2"); + let store_b: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone())); + let event_store_b = make_event_store(&dir); + let sb2 = sock_b.clone(); + tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b).await; }); + wait_for_socket(&sb2).await; + + let mut s2 = UnixStream::connect(&sb2).await.unwrap(); + + // Query event log on instance B — the persisted event must survive the restart. + let log = req(&mut s2, 1, Cmd::EventLog { limit: None }).await; + let data = res_data(&log); + assert_eq!(data["unread"].as_u64().unwrap(), 1, + "instance B: event log unread count must survive cold restart"); + assert_eq!(data["events"][0]["id"].as_u64().unwrap(), event_id, + "instance B: event id must match"); + assert_eq!(data["events"][0]["kind"].as_str().unwrap(), "error", + "instance B: event kind must be 'error'"); + assert_eq!(data["events"][0]["workspace_id"].as_str().unwrap(), ws_id, + "instance B: workspace_id must match"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn focus_marks_surface_events_read() { + let _serial = crate::test_support::serial(); + let dir = tempdir_path(); + let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); + let sock_for_task = sock.clone(); + let store2 = store.clone(); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + wait_for_socket(&sock).await; + + // Observer connection. + let mut obs = UnixStream::connect(&sock).await.unwrap(); + // Control connection. + let mut s = UnixStream::connect(&sock).await.unwrap(); + + let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + let r = req(&mut s, 2, Cmd::NewSurface { + workspace_id: spacesh_proto::WorkspaceId(ws), + command: Some("/bin/sh".into()), + args: vec!["-c".into(), "sleep 5".into()], + cols: 80, rows: 24, + }).await; + let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); + + // Drive an Error event for this surface. + let _ = req(&mut s, 3, Cmd::SetState { + surface_id: spacesh_proto::SurfaceId(sid.clone()), + state: spacesh_proto::status::SurfaceState::Error, + }).await; + + // Verify unread == 1 before Focus. + let log = req(&mut s, 4, Cmd::EventLog { limit: None }).await; + assert_eq!(res_data(&log)["unread"].as_u64().unwrap(), 1); + let first_id = res_data(&log)["events"][0]["id"].as_u64().unwrap(); + + // Focus that surface — should mark its events read. + let _ = req(&mut s, 5, Cmd::Focus { + surface_id: spacesh_proto::SurfaceId(sid.clone()), + }).await; + + // Observer should receive EventsRead with the id from the Error event. + let mut saw_read = false; + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3); + while tokio::time::Instant::now() < deadline { + if let Ok(Ok(Some(env))) = + tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut obs)).await { + if let Envelope::Evt(Evt::EventsRead { ids }) = env { + if ids.contains(&first_id) { saw_read = true; break; } + } + } + } + assert!(saw_read, "expected an EventsRead broadcast after Focus"); + + // Unread drops to 0. + let log = req(&mut s, 6, Cmd::EventLog { limit: None }).await; + assert_eq!(res_data(&log)["unread"].as_u64().unwrap(), 0); + } }