feat(daemon): snapshot writer task (Save/Remove over one channel)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-15 15:49:29 +07:00
parent 0674872c9d
commit 69f2e73832
+39
View File
@@ -59,6 +59,20 @@ impl SnapshotStore for JsonSnapshotStore {
}
}
/// Spawn the writer task; returns the sender used by the router and actors.
pub fn spawn_writer(store: std::sync::Arc<dyn SnapshotStore>) -> tokio::sync::mpsc::UnboundedSender<SnapshotMsg> {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<SnapshotMsg>();
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
match msg {
SnapshotMsg::Save(sid, snap) => store.save(&sid, &snap),
SnapshotMsg::Remove(sid) => store.remove(&sid),
}
}
});
tx
}
#[cfg(test)]
mod tests {
use super::*;
@@ -120,4 +134,29 @@ mod tests {
assert_eq!(store.load(&sid), None);
store.remove(&sid);
}
#[tokio::test]
async fn writer_saves_and_removes() {
let dir = tmp_dir("writer");
let store: std::sync::Arc<dyn SnapshotStore> = std::sync::Arc::new(JsonSnapshotStore::new(dir.clone()));
let tx = spawn_writer(store.clone());
let sid = SurfaceId("s_w".into());
tx.send(SnapshotMsg::Save(sid.clone(), sample())).unwrap();
let mut saved = None;
for _ in 0..50 {
if let Some(s) = store.load(&sid) { saved = Some(s); break; }
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
assert_eq!(saved, Some(sample()));
tx.send(SnapshotMsg::Remove(sid.clone())).unwrap();
let mut gone = false;
for _ in 0..50 {
if store.load(&sid).is_none() { gone = true; break; }
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
assert!(gone, "writer should have removed the snapshot file");
let _ = std::fs::remove_dir_all(dir);
}
}