use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::{Duration, Instant}; use crate::state_store::{PersistState, StateStore}; /// A handle the registry uses to request a persist. `mark_dirty(state)` records /// the latest snapshot and (re)arms the debounce timer. #[derive(Clone)] pub struct Persister { tx: mpsc::Sender, } impl Persister { pub fn mark_dirty(&self, state: PersistState) { // Best-effort; dropping a snapshot is fine because a newer one will arrive. let _ = self.tx.try_send(state); } } /// Spawn the debounce task. Returns the `Persister` handle. /// `debounce` is configurable so tests can use a short window. pub fn spawn(store: Arc, debounce: Duration) -> Persister { let (tx, mut rx) = mpsc::channel::(64); tokio::spawn(async move { let mut latest: Option = None; let mut deadline: Option = None; loop { let timer = async { match deadline { Some(d) => tokio::time::sleep_until(d).await, None => std::future::pending::<()>().await, } }; tokio::select! { msg = rx.recv() => { match msg { Some(state) => { latest = Some(state); deadline = Some(Instant::now() + debounce); } None => { // channel closed: final flush then exit if let Some(s) = latest.take() { let _ = store.save(&s); } break; } } } _ = timer => { if let Some(s) = latest.take() { let _ = store.save(&s); } deadline = None; } } } }); Persister { tx } } #[cfg(test)] mod tests { use super::*; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; struct CountingStore { saves: AtomicUsize, last: Mutex>, } impl StateStore for CountingStore { fn load(&self) -> anyhow::Result { Ok(PersistState::default()) } fn save(&self, state: &PersistState) -> anyhow::Result<()> { self.saves.fetch_add(1, Ordering::SeqCst); *self.last.lock().unwrap() = Some(state.clone()); Ok(()) } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn burst_coalesces_to_one_save() { let store = Arc::new(CountingStore { saves: AtomicUsize::new(0), last: Mutex::new(None) }); let p = spawn(store.clone(), Duration::from_millis(80)); // Fire several dirty signals rapidly. for v in 1..=5u32 { let mut s = PersistState::default(); s.version = v; p.mark_dirty(s); tokio::time::sleep(Duration::from_millis(10)).await; } // Wait past the debounce window. tokio::time::sleep(Duration::from_millis(200)).await; assert_eq!(store.saves.load(Ordering::SeqCst), 1, "burst should coalesce to one save"); assert_eq!(store.last.lock().unwrap().as_ref().unwrap().version, 5, "save uses the latest snapshot"); } }