diff --git a/crates/spaceshd/src/persist.rs b/crates/spaceshd/src/persist.rs new file mode 100644 index 0000000..3e6278f --- /dev/null +++ b/crates/spaceshd/src/persist.rs @@ -0,0 +1,99 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio::sync::mpsc; +use tokio::time::{Duration, Instant}; +use crate::state_store::{PersistState, StateStore}; + +/// Debounce window: coalesce a burst of dirty signals into one save. +const DEBOUNCE: Duration = Duration::from_millis(500); + +/// A handle the registry uses to request a persist. `mark_dirty(state)` records +/// the latest snapshot and (re)arms the debounce timer. +#[derive(Clone)] +pub struct Persister { + tx: mpsc::Sender, +} + +impl Persister { + pub fn mark_dirty(&self, state: PersistState) { + // Best-effort; dropping a snapshot is fine because a newer one will arrive. + let _ = self.tx.try_send(state); + } +} + +/// Spawn the debounce task. Returns the `Persister` handle. +/// `debounce` is configurable so tests can use a short window. +pub fn spawn(store: Arc, debounce: Duration) -> Persister { + let (tx, mut rx) = mpsc::channel::(64); + tokio::spawn(async move { + let mut latest: Option = None; + let mut deadline: Option = None; + loop { + let timer = async { + match deadline { + Some(d) => tokio::time::sleep_until(d).await, + None => std::future::pending::<()>().await, + } + }; + tokio::select! { + msg = rx.recv() => { + match msg { + Some(state) => { + latest = Some(state); + deadline = Some(Instant::now() + debounce); + } + None => { + // channel closed: final flush then exit + if let Some(s) = latest.take() { let _ = store.save(&s); } + break; + } + } + } + _ = timer => { + if let Some(s) = latest.take() { let _ = store.save(&s); } + deadline = None; + } + } + } + }); + Persister { tx } +} + +#[allow(dead_code)] +fn _unused(_c: &AtomicUsize) {} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Mutex; + + struct CountingStore { + saves: AtomicUsize, + last: Mutex>, + } + impl StateStore for CountingStore { + fn load(&self) -> anyhow::Result { Ok(PersistState::default()) } + fn save(&self, state: &PersistState) -> anyhow::Result<()> { + self.saves.fetch_add(1, Ordering::SeqCst); + *self.last.lock().unwrap() = Some(state.clone()); + Ok(()) + } + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn burst_coalesces_to_one_save() { + let store = Arc::new(CountingStore { saves: AtomicUsize::new(0), last: Mutex::new(None) }); + let p = spawn(store.clone(), Duration::from_millis(80)); + // Fire several dirty signals rapidly. + for v in 1..=5u32 { + let mut s = PersistState::default(); + s.version = v; + p.mark_dirty(s); + tokio::time::sleep(Duration::from_millis(10)).await; + } + // Wait past the debounce window. + tokio::time::sleep(Duration::from_millis(200)).await; + assert_eq!(store.saves.load(Ordering::SeqCst), 1, "burst should coalesce to one save"); + assert_eq!(store.last.lock().unwrap().as_ref().unwrap().version, 5, "save uses the latest snapshot"); + } +}