diff --git a/crates/spaceshd/src/event_store.rs b/crates/spaceshd/src/event_store.rs new file mode 100644 index 0000000..202f7fe --- /dev/null +++ b/crates/spaceshd/src/event_store.rs @@ -0,0 +1,200 @@ +use std::path::PathBuf; +use std::sync::Arc; +use anyhow::Result; +use tokio::sync::mpsc; +use tokio::time::{Duration, Instant}; +use crate::event_log::EventLogState; + +pub trait EventStore: Send + Sync { + fn load(&self) -> Result; + fn save(&self, state: &EventLogState) -> Result<()>; +} + +/// JSON file store with atomic write (temp + fsync + rename) and corrupt backup. +pub struct JsonEventStore { + path: PathBuf, +} + +impl JsonEventStore { + pub fn new(path: PathBuf) -> Self { + Self { path } + } + + fn backup_corrupt(&self, ts: u128) { + let bak = self.path.with_extension(format!("corrupt-{ts}")); + let _ = std::fs::rename(&self.path, bak); + } +} + +impl EventStore for JsonEventStore { + fn load(&self) -> Result { + if !self.path.exists() { + return Ok(EventLogState { version: 1, next_id: 1, records: vec![] }); + } + let bytes = std::fs::read(&self.path)?; + match serde_json::from_slice::(&bytes) { + Ok(state) => Ok(state), + Err(_) => { + let ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos()) + .unwrap_or(0); + self.backup_corrupt(ts); + Ok(EventLogState { version: 1, next_id: 1, records: vec![] }) + } + } + } + + fn save(&self, state: &EventLogState) -> Result<()> { + if let Some(parent) = self.path.parent() { + std::fs::create_dir_all(parent)?; + } + let tmp = self.path.with_extension("json.tmp"); + let bytes = serde_json::to_vec_pretty(state)?; + std::fs::write(&tmp, &bytes)?; + let f = std::fs::File::open(&tmp)?; + // fsync the temp file before rename for durability. + f.sync_all()?; + std::fs::rename(&tmp, &self.path)?; + Ok(()) + } +} + +/// Handle the recorder uses to request a debounced persist. +#[derive(Clone)] +pub struct EventPersister { + tx: mpsc::Sender, +} + +impl EventPersister { + pub fn mark_dirty(&self, state: EventLogState) { + // Best-effort; dropping a snapshot is fine because a newer one will arrive. + let _ = self.tx.try_send(state); + } +} + +/// Spawn the debounce task; coalesces a burst into one save. +pub fn spawn(store: Arc, debounce: Duration) -> EventPersister { + let (tx, mut rx) = mpsc::channel::(64); + tokio::spawn(async move { + let mut latest: Option = None; + let mut deadline: Option = None; + loop { + let timer = async { + match deadline { + Some(d) => tokio::time::sleep_until(d).await, + None => std::future::pending::<()>().await, + } + }; + tokio::select! { + msg = rx.recv() => { + match msg { + Some(state) => { + latest = Some(state); + deadline = Some(Instant::now() + debounce); + } + None => { + // channel closed: final flush then exit + if let Some(s) = latest.take() { let _ = store.save(&s); } + break; + } + } + } + _ = timer => { + if let Some(s) = latest.take() { let _ = store.save(&s); } + deadline = None; + } + } + } + }); + EventPersister { tx } +} + +#[cfg(test)] +mod tests { + use super::*; + use spacesh_proto::event::{EventKind, EventRecord}; + use spacesh_proto::ids::{SurfaceId, WorkspaceId}; + + fn tmp_file(name: &str) -> PathBuf { + let mut p = std::env::temp_dir(); + let n = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos(); + p.push(format!("spacesh-events-{name}-{n}.json")); + p + } + + fn sample() -> EventLogState { + EventLogState { + version: 1, + next_id: 2, + records: vec![EventRecord { + id: 1, + surface_id: SurfaceId("s_1".into()), + workspace_id: WorkspaceId("w_1".into()), + workspace_name: "infra".into(), + agent_label: Some("claude".into()), + kind: EventKind::Done, + ts: 1, + read: false, + }], + } + } + + #[test] + fn save_then_load_round_trips() { + let path = tmp_file("roundtrip"); + let store = JsonEventStore::new(path.clone()); + store.save(&sample()).unwrap(); + assert_eq!(store.load().unwrap(), sample()); + let _ = std::fs::remove_file(path); + } + + #[test] + fn missing_file_loads_empty() { + let store = JsonEventStore::new(tmp_file("missing")); + let s = store.load().unwrap(); + assert_eq!(s.next_id, 1); + assert!(s.records.is_empty()); + } + + #[test] + fn corrupt_file_is_backed_up_and_load_returns_empty() { + let path = tmp_file("corrupt"); + std::fs::write(&path, b"{ not valid json").unwrap(); + let store = JsonEventStore::new(path.clone()); + let s = store.load().unwrap(); + assert!(s.records.is_empty()); + assert!(!path.exists()); + let _ = std::fs::remove_file(path); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn burst_coalesces_to_one_save() { + struct Counting { + saves: std::sync::atomic::AtomicUsize, + last: std::sync::Mutex>, + } + impl EventStore for Counting { + fn load(&self) -> Result { Ok(EventLogState::default()) } + fn save(&self, s: &EventLogState) -> Result<()> { + self.saves.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + *self.last.lock().unwrap() = Some(s.clone()); + Ok(()) + } + } + let store = Arc::new(Counting { + saves: std::sync::atomic::AtomicUsize::new(0), + last: std::sync::Mutex::new(None), + }); + let p = spawn(store.clone(), Duration::from_millis(80)); + for v in 1..=5u64 { + let mut s = EventLogState::default(); + s.next_id = v; + p.mark_dirty(s); + tokio::time::sleep(Duration::from_millis(10)).await; + } + tokio::time::sleep(Duration::from_millis(200)).await; + assert_eq!(store.saves.load(std::sync::atomic::Ordering::SeqCst), 1, "burst should coalesce to one save"); + assert_eq!(store.last.lock().unwrap().as_ref().unwrap().next_id, 5, "save uses the latest snapshot"); + } +} diff --git a/crates/spaceshd/src/main.rs b/crates/spaceshd/src/main.rs index 44997e8..80384de 100644 --- a/crates/spaceshd/src/main.rs +++ b/crates/spaceshd/src/main.rs @@ -1,4 +1,5 @@ mod event_log; +mod event_store; mod hooks; mod launchd; mod lifecycle;