diff --git a/crates/spaceshd/src/snapshot_store.rs b/crates/spaceshd/src/snapshot_store.rs index fdacf9d..137c630 100644 --- a/crates/spaceshd/src/snapshot_store.rs +++ b/crates/spaceshd/src/snapshot_store.rs @@ -59,6 +59,20 @@ impl SnapshotStore for JsonSnapshotStore { } } +/// Spawn the writer task; returns the sender used by the router and actors. +pub fn spawn_writer(store: std::sync::Arc) -> tokio::sync::mpsc::UnboundedSender { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); + tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + match msg { + SnapshotMsg::Save(sid, snap) => store.save(&sid, &snap), + SnapshotMsg::Remove(sid) => store.remove(&sid), + } + } + }); + tx +} + #[cfg(test)] mod tests { use super::*; @@ -120,4 +134,29 @@ mod tests { assert_eq!(store.load(&sid), None); store.remove(&sid); } + + #[tokio::test] + async fn writer_saves_and_removes() { + let dir = tmp_dir("writer"); + let store: std::sync::Arc = std::sync::Arc::new(JsonSnapshotStore::new(dir.clone())); + let tx = spawn_writer(store.clone()); + let sid = SurfaceId("s_w".into()); + + tx.send(SnapshotMsg::Save(sid.clone(), sample())).unwrap(); + let mut saved = None; + for _ in 0..50 { + if let Some(s) = store.load(&sid) { saved = Some(s); break; } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + assert_eq!(saved, Some(sample())); + + tx.send(SnapshotMsg::Remove(sid.clone())).unwrap(); + let mut gone = false; + for _ in 0..50 { + if store.load(&sid).is_none() { gone = true; break; } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + assert!(gone, "writer should have removed the snapshot file"); + let _ = std::fs::remove_dir_all(dir); + } }