feat(daemon): debounced persist scheduler coalescing bursts into one save

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-09 21:20:58 +07:00
parent 4f7ed2a5a3
commit 7515516699
+99
View File
@@ -0,0 +1,99 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};
use crate::state_store::{PersistState, StateStore};
/// Debounce window: coalesce a burst of dirty signals into one save.
const DEBOUNCE: Duration = Duration::from_millis(500);
/// A handle the registry uses to request a persist. `mark_dirty(state)` records
/// the latest snapshot and (re)arms the debounce timer.
#[derive(Clone)]
pub struct Persister {
tx: mpsc::Sender<PersistState>,
}
impl Persister {
pub fn mark_dirty(&self, state: PersistState) {
// Best-effort; dropping a snapshot is fine because a newer one will arrive.
let _ = self.tx.try_send(state);
}
}
/// Spawn the debounce task. Returns the `Persister` handle.
/// `debounce` is configurable so tests can use a short window.
pub fn spawn(store: Arc<dyn StateStore>, debounce: Duration) -> Persister {
let (tx, mut rx) = mpsc::channel::<PersistState>(64);
tokio::spawn(async move {
let mut latest: Option<PersistState> = None;
let mut deadline: Option<Instant> = None;
loop {
let timer = async {
match deadline {
Some(d) => tokio::time::sleep_until(d).await,
None => std::future::pending::<()>().await,
}
};
tokio::select! {
msg = rx.recv() => {
match msg {
Some(state) => {
latest = Some(state);
deadline = Some(Instant::now() + debounce);
}
None => {
// channel closed: final flush then exit
if let Some(s) = latest.take() { let _ = store.save(&s); }
break;
}
}
}
_ = timer => {
if let Some(s) = latest.take() { let _ = store.save(&s); }
deadline = None;
}
}
}
});
Persister { tx }
}
#[allow(dead_code)]
fn _unused(_c: &AtomicUsize) {}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
struct CountingStore {
saves: AtomicUsize,
last: Mutex<Option<PersistState>>,
}
impl StateStore for CountingStore {
fn load(&self) -> anyhow::Result<PersistState> { Ok(PersistState::default()) }
fn save(&self, state: &PersistState) -> anyhow::Result<()> {
self.saves.fetch_add(1, Ordering::SeqCst);
*self.last.lock().unwrap() = Some(state.clone());
Ok(())
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn burst_coalesces_to_one_save() {
let store = Arc::new(CountingStore { saves: AtomicUsize::new(0), last: Mutex::new(None) });
let p = spawn(store.clone(), Duration::from_millis(80));
// Fire several dirty signals rapidly.
for v in 1..=5u32 {
let mut s = PersistState::default();
s.version = v;
p.mark_dirty(s);
tokio::time::sleep(Duration::from_millis(10)).await;
}
// Wait past the debounce window.
tokio::time::sleep(Duration::from_millis(200)).await;
assert_eq!(store.saves.load(Ordering::SeqCst), 1, "burst should coalesce to one save");
assert_eq!(store.last.lock().unwrap().as_ref().unwrap().version, 5, "save uses the latest snapshot");
}
}