feat(daemon): JsonEventStore + debounced EventPersister
Adds disk persistence for the event log: atomic temp+fsync+rename write, corrupt-file backup, and a debounced coalescing persister task mirroring state_store/persist patterns. 4 tests: round-trip, missing, corrupt, burst. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,200 @@
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use anyhow::Result;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::{Duration, Instant};
|
||||
use crate::event_log::EventLogState;
|
||||
|
||||
pub trait EventStore: Send + Sync {
|
||||
fn load(&self) -> Result<EventLogState>;
|
||||
fn save(&self, state: &EventLogState) -> Result<()>;
|
||||
}
|
||||
|
||||
/// JSON file store with atomic write (temp + fsync + rename) and corrupt backup.
|
||||
pub struct JsonEventStore {
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl JsonEventStore {
|
||||
pub fn new(path: PathBuf) -> Self {
|
||||
Self { path }
|
||||
}
|
||||
|
||||
fn backup_corrupt(&self, ts: u128) {
|
||||
let bak = self.path.with_extension(format!("corrupt-{ts}"));
|
||||
let _ = std::fs::rename(&self.path, bak);
|
||||
}
|
||||
}
|
||||
|
||||
impl EventStore for JsonEventStore {
|
||||
fn load(&self) -> Result<EventLogState> {
|
||||
if !self.path.exists() {
|
||||
return Ok(EventLogState { version: 1, next_id: 1, records: vec![] });
|
||||
}
|
||||
let bytes = std::fs::read(&self.path)?;
|
||||
match serde_json::from_slice::<EventLogState>(&bytes) {
|
||||
Ok(state) => Ok(state),
|
||||
Err(_) => {
|
||||
let ts = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.map(|d| d.as_nanos())
|
||||
.unwrap_or(0);
|
||||
self.backup_corrupt(ts);
|
||||
Ok(EventLogState { version: 1, next_id: 1, records: vec![] })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn save(&self, state: &EventLogState) -> Result<()> {
|
||||
if let Some(parent) = self.path.parent() {
|
||||
std::fs::create_dir_all(parent)?;
|
||||
}
|
||||
let tmp = self.path.with_extension("json.tmp");
|
||||
let bytes = serde_json::to_vec_pretty(state)?;
|
||||
std::fs::write(&tmp, &bytes)?;
|
||||
let f = std::fs::File::open(&tmp)?;
|
||||
// fsync the temp file before rename for durability.
|
||||
f.sync_all()?;
|
||||
std::fs::rename(&tmp, &self.path)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle the recorder uses to request a debounced persist.
|
||||
#[derive(Clone)]
|
||||
pub struct EventPersister {
|
||||
tx: mpsc::Sender<EventLogState>,
|
||||
}
|
||||
|
||||
impl EventPersister {
|
||||
pub fn mark_dirty(&self, state: EventLogState) {
|
||||
// Best-effort; dropping a snapshot is fine because a newer one will arrive.
|
||||
let _ = self.tx.try_send(state);
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn the debounce task; coalesces a burst into one save.
|
||||
pub fn spawn(store: Arc<dyn EventStore>, debounce: Duration) -> EventPersister {
|
||||
let (tx, mut rx) = mpsc::channel::<EventLogState>(64);
|
||||
tokio::spawn(async move {
|
||||
let mut latest: Option<EventLogState> = None;
|
||||
let mut deadline: Option<Instant> = None;
|
||||
loop {
|
||||
let timer = async {
|
||||
match deadline {
|
||||
Some(d) => tokio::time::sleep_until(d).await,
|
||||
None => std::future::pending::<()>().await,
|
||||
}
|
||||
};
|
||||
tokio::select! {
|
||||
msg = rx.recv() => {
|
||||
match msg {
|
||||
Some(state) => {
|
||||
latest = Some(state);
|
||||
deadline = Some(Instant::now() + debounce);
|
||||
}
|
||||
None => {
|
||||
// channel closed: final flush then exit
|
||||
if let Some(s) = latest.take() { let _ = store.save(&s); }
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = timer => {
|
||||
if let Some(s) = latest.take() { let _ = store.save(&s); }
|
||||
deadline = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
EventPersister { tx }
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use spacesh_proto::event::{EventKind, EventRecord};
|
||||
use spacesh_proto::ids::{SurfaceId, WorkspaceId};
|
||||
|
||||
fn tmp_file(name: &str) -> PathBuf {
|
||||
let mut p = std::env::temp_dir();
|
||||
let n = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos();
|
||||
p.push(format!("spacesh-events-{name}-{n}.json"));
|
||||
p
|
||||
}
|
||||
|
||||
fn sample() -> EventLogState {
|
||||
EventLogState {
|
||||
version: 1,
|
||||
next_id: 2,
|
||||
records: vec![EventRecord {
|
||||
id: 1,
|
||||
surface_id: SurfaceId("s_1".into()),
|
||||
workspace_id: WorkspaceId("w_1".into()),
|
||||
workspace_name: "infra".into(),
|
||||
agent_label: Some("claude".into()),
|
||||
kind: EventKind::Done,
|
||||
ts: 1,
|
||||
read: false,
|
||||
}],
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn save_then_load_round_trips() {
|
||||
let path = tmp_file("roundtrip");
|
||||
let store = JsonEventStore::new(path.clone());
|
||||
store.save(&sample()).unwrap();
|
||||
assert_eq!(store.load().unwrap(), sample());
|
||||
let _ = std::fs::remove_file(path);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn missing_file_loads_empty() {
|
||||
let store = JsonEventStore::new(tmp_file("missing"));
|
||||
let s = store.load().unwrap();
|
||||
assert_eq!(s.next_id, 1);
|
||||
assert!(s.records.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn corrupt_file_is_backed_up_and_load_returns_empty() {
|
||||
let path = tmp_file("corrupt");
|
||||
std::fs::write(&path, b"{ not valid json").unwrap();
|
||||
let store = JsonEventStore::new(path.clone());
|
||||
let s = store.load().unwrap();
|
||||
assert!(s.records.is_empty());
|
||||
assert!(!path.exists());
|
||||
let _ = std::fs::remove_file(path);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn burst_coalesces_to_one_save() {
|
||||
struct Counting {
|
||||
saves: std::sync::atomic::AtomicUsize,
|
||||
last: std::sync::Mutex<Option<EventLogState>>,
|
||||
}
|
||||
impl EventStore for Counting {
|
||||
fn load(&self) -> Result<EventLogState> { Ok(EventLogState::default()) }
|
||||
fn save(&self, s: &EventLogState) -> Result<()> {
|
||||
self.saves.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||
*self.last.lock().unwrap() = Some(s.clone());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
let store = Arc::new(Counting {
|
||||
saves: std::sync::atomic::AtomicUsize::new(0),
|
||||
last: std::sync::Mutex::new(None),
|
||||
});
|
||||
let p = spawn(store.clone(), Duration::from_millis(80));
|
||||
for v in 1..=5u64 {
|
||||
let mut s = EventLogState::default();
|
||||
s.next_id = v;
|
||||
p.mark_dirty(s);
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
assert_eq!(store.saves.load(std::sync::atomic::Ordering::SeqCst), 1, "burst should coalesce to one save");
|
||||
assert_eq!(store.last.lock().unwrap().as_ref().unwrap().next_id, 5, "save uses the latest snapshot");
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
mod event_log;
|
||||
mod event_store;
|
||||
mod hooks;
|
||||
mod launchd;
|
||||
mod lifecycle;
|
||||
|
||||
Reference in New Issue
Block a user