From 56ba1723b9fbaeb8da91607f46343b6ac5b0b292 Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Wed, 10 Jun 2026 07:17:29 +0700 Subject: [PATCH] feat(daemon): EventLog ring model with read-flags Add event_log.rs with EventLog (capped VecDeque, monotonic ids, mark_read, snapshot/restore) and EventLogState. Register mod in main.rs. Stub Cmd::EventLog and Cmd::MarkRead arms in server.rs to keep the exhaustive match compiling; full wiring follows in Task 4. Co-Authored-By: Claude Sonnet 4.6 --- crates/spaceshd/src/event_log.rs | 211 +++++++++++++++++++++++++++++++ crates/spaceshd/src/main.rs | 1 + crates/spaceshd/src/server.rs | 12 ++ 3 files changed, 224 insertions(+) create mode 100644 crates/spaceshd/src/event_log.rs diff --git a/crates/spaceshd/src/event_log.rs b/crates/spaceshd/src/event_log.rs new file mode 100644 index 0000000..4c5906b --- /dev/null +++ b/crates/spaceshd/src/event_log.rs @@ -0,0 +1,211 @@ +use std::collections::VecDeque; +use serde::{Deserialize, Serialize}; +use spacesh_proto::event::{EventKind, EventRecord, MarkReadTarget}; +use spacesh_proto::ids::{SurfaceId, WorkspaceId}; + +const SNAPSHOT_VERSION: u32 = 1; + +/// Serializable form of the log, used for persistence. +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] +pub struct EventLogState { + pub version: u32, + pub next_id: u64, + #[serde(default)] + pub records: Vec, +} + +/// In-memory event log: a capped ring with monotonic ids. +pub struct EventLog { + records: VecDeque, + next_id: u64, + cap: usize, +} + +impl EventLog { + pub fn new(cap: usize) -> Self { + Self { records: VecDeque::new(), next_id: 1, cap } + } + + /// Rebuild from a persisted snapshot, clamping to `cap` (keeping newest). + pub fn restore(state: EventLogState, cap: usize) -> Self { + let mut records: VecDeque = state.records.into_iter().collect(); + while records.len() > cap { + records.pop_front(); + } + let max_record_id = records.iter().map(|r| r.id).max().unwrap_or(0); + let next_id = state.next_id.max(max_record_id + 1).max(1); + Self { records, next_id, cap } + } + + /// Append a new event. Evicts the oldest when over capacity. Returns the + /// stored record (with its assigned id) for broadcasting. + #[allow(clippy::too_many_arguments)] + pub fn record( + &mut self, + surface_id: SurfaceId, + workspace_id: WorkspaceId, + workspace_name: String, + agent_label: Option, + kind: EventKind, + ts: u64, + ) -> EventRecord { + let rec = EventRecord { + id: self.next_id, + surface_id, + workspace_id, + workspace_name, + agent_label, + kind, + ts, + read: false, + }; + self.next_id += 1; + self.records.push_back(rec.clone()); + while self.records.len() > self.cap { + self.records.pop_front(); + } + rec + } + + /// Flip matching records to read. Returns the ids that actually changed. + pub fn mark_read(&mut self, target: &MarkReadTarget) -> Vec { + let mut changed = Vec::new(); + for r in self.records.iter_mut() { + if r.read { + continue; + } + let hit = match target { + MarkReadTarget::All => true, + MarkReadTarget::Ids(ids) => ids.contains(&r.id), + MarkReadTarget::Surface(sid) => &r.surface_id == sid, + }; + if hit { + r.read = true; + changed.push(r.id); + } + } + changed + } + + pub fn unread_count(&self) -> u32 { + self.records.iter().filter(|r| !r.read).count() as u32 + } + + /// Most-recent-first, optionally capped to `limit`. + pub fn recent(&self, limit: Option) -> Vec { + let iter = self.records.iter().rev().cloned(); + match limit { + Some(n) => iter.take(n as usize).collect(), + None => iter.collect(), + } + } + + pub fn snapshot(&self) -> EventLogState { + EventLogState { + version: SNAPSHOT_VERSION, + next_id: self.next_id, + records: self.records.iter().cloned().collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn rec(log: &mut EventLog, sid: &str, kind: EventKind) -> EventRecord { + log.record( + SurfaceId(sid.into()), + WorkspaceId("w_1".into()), + "infra".into(), + Some("claude".into()), + kind, + 1, + ) + } + + #[test] + fn record_assigns_monotonic_ids() { + let mut log = EventLog::new(10); + assert_eq!(rec(&mut log, "s_1", EventKind::Done).id, 1); + assert_eq!(rec(&mut log, "s_1", EventKind::Wait).id, 2); + } + + #[test] + fn push_beyond_cap_evicts_oldest() { + let mut log = EventLog::new(2); + rec(&mut log, "s_1", EventKind::Done); // id 1 + rec(&mut log, "s_2", EventKind::Done); // id 2 + rec(&mut log, "s_3", EventKind::Done); // id 3, evicts id 1 + let ids: Vec = log.recent(None).iter().map(|r| r.id).collect(); + assert_eq!(ids, vec![3, 2]); // newest first, id 1 gone + } + + #[test] + fn mark_read_by_surface_then_ids_then_all() { + let mut log = EventLog::new(10); + rec(&mut log, "s_1", EventKind::Done); // 1 + rec(&mut log, "s_2", EventKind::Error); // 2 + rec(&mut log, "s_1", EventKind::Wait); // 3 + assert_eq!(log.unread_count(), 3); + + let changed = log.mark_read(&MarkReadTarget::Surface(SurfaceId("s_1".into()))); + assert_eq!(changed, vec![1, 3]); + assert_eq!(log.unread_count(), 1); + + // Re-marking the same surface changes nothing. + assert!(log.mark_read(&MarkReadTarget::Surface(SurfaceId("s_1".into()))).is_empty()); + + let changed = log.mark_read(&MarkReadTarget::Ids(vec![2, 999])); + assert_eq!(changed, vec![2]); + assert_eq!(log.unread_count(), 0); + + assert!(log.mark_read(&MarkReadTarget::All).is_empty()); + } + + #[test] + fn snapshot_restore_preserves_next_id_and_records() { + let mut log = EventLog::new(10); + rec(&mut log, "s_1", EventKind::Done); + rec(&mut log, "s_2", EventKind::Done); + let snap = log.snapshot(); + assert_eq!(snap.next_id, 3); + + let restored = EventLog::restore(snap, 10); + assert_eq!(restored.recent(None).len(), 2); + // Next recorded id continues from 3, no reuse. + let mut restored = restored; + assert_eq!(rec(&mut restored, "s_3", EventKind::Done).id, 3); + } + + #[test] + fn restore_clamps_to_cap_keeping_newest() { + let state = EventLogState { + version: 1, + next_id: 4, + records: vec![ + EventRecord { id: 1, surface_id: SurfaceId("a".into()), workspace_id: WorkspaceId("w".into()), workspace_name: "x".into(), agent_label: None, kind: EventKind::Done, ts: 1, read: false }, + EventRecord { id: 2, surface_id: SurfaceId("a".into()), workspace_id: WorkspaceId("w".into()), workspace_name: "x".into(), agent_label: None, kind: EventKind::Done, ts: 1, read: false }, + EventRecord { id: 3, surface_id: SurfaceId("a".into()), workspace_id: WorkspaceId("w".into()), workspace_name: "x".into(), agent_label: None, kind: EventKind::Done, ts: 1, read: false }, + ], + }; + let log = EventLog::restore(state, 2); + let ids: Vec = log.recent(None).iter().map(|r| r.id).collect(); + assert_eq!(ids, vec![3, 2]); + } + + #[test] + fn restore_reconciles_next_id_against_records() { + // Snapshot claims next_id=1 but already holds id=5 → next must jump past 5. + let state = EventLogState { + version: 1, + next_id: 1, + records: vec![EventRecord { + id: 5, surface_id: SurfaceId("a".into()), workspace_id: WorkspaceId("w".into()), + workspace_name: "x".into(), agent_label: None, kind: EventKind::Done, ts: 1, read: false, + }], + }; + let mut log = EventLog::restore(state, 10); + assert_eq!(rec(&mut log, "s_1", EventKind::Done).id, 6); + } +} diff --git a/crates/spaceshd/src/main.rs b/crates/spaceshd/src/main.rs index 3715507..44997e8 100644 --- a/crates/spaceshd/src/main.rs +++ b/crates/spaceshd/src/main.rs @@ -1,3 +1,4 @@ +mod event_log; mod hooks; mod launchd; mod lifecycle; diff --git a/crates/spaceshd/src/server.rs b/crates/spaceshd/src/server.rs index 062aac5..a062c93 100644 --- a/crates/spaceshd/src/server.rs +++ b/crates/spaceshd/src/server.rs @@ -526,6 +526,18 @@ async fn handle_request( let _ = out.send(ok(id, serde_json::json!({ "groups": groups, "workspaces": workspaces }))).await; } + Cmd::EventLog { limit } => { + // TODO(SP2-T4): wire to EventLog once the shared state is plumbed in. + let _ = limit; + let _ = out.send(err(id, "NOT_IMPLEMENTED", "event log not yet wired")).await; + } + + Cmd::MarkRead { target } => { + // TODO(SP2-T4): wire to EventLog once the shared state is plumbed in. + let _ = target; + let _ = out.send(err(id, "NOT_IMPLEMENTED", "mark_read not yet wired")).await; + } + Cmd::Shutdown => { let _ = out.send(ok(id, serde_json::Value::Null)).await; std::process::exit(0);