From f7bf9fc5c682ce70cd29cbcb7b140a2b3df28b49 Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Wed, 10 Jun 2026 07:31:40 +0700 Subject: [PATCH] wire(daemon): thread EventLog + EventPersister through serve/router Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/spaceshd/src/main.rs | 5 +++- crates/spaceshd/src/server.rs | 48 +++++++++++++++++++++++++++-------- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/crates/spaceshd/src/main.rs b/crates/spaceshd/src/main.rs index 80384de..054364c 100644 --- a/crates/spaceshd/src/main.rs +++ b/crates/spaceshd/src/main.rs @@ -56,6 +56,9 @@ async fn run_daemon() -> Result<()> { let state_path = lifecycle::spacesh_dir()?.join("state.json"); let store: std::sync::Arc = std::sync::Arc::new(state_store::JsonStateStore::new(state_path)); + let events_path = lifecycle::spacesh_dir()?.join("events.json"); + let event_store: std::sync::Arc = + std::sync::Arc::new(event_store::JsonEventStore::new(events_path)); eprintln!("spaceshd listening on {}", sock.display()); - server::serve(&sock, store).await + server::serve(&sock, store, event_store).await } diff --git a/crates/spaceshd/src/server.rs b/crates/spaceshd/src/server.rs index a062c93..86b5fb9 100644 --- a/crates/spaceshd/src/server.rs +++ b/crates/spaceshd/src/server.rs @@ -9,6 +9,8 @@ use spacesh_proto::{Cmd, Envelope, ErrorBody, Evt, SurfaceId, WorkspaceId}; use spacesh_proto::status::SurfaceState; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::{mpsc, oneshot}; +use crate::event_log::EventLog; +use crate::event_store::{self, EventPersister, EventStore}; use crate::persist::{self, Persister}; use crate::registry::Registry; use crate::state_store::StateStore; @@ -35,7 +37,7 @@ enum ServerMsg { type ClientId = u64; -pub async fn serve(socket: &Path, store: Arc) -> Result<()> { +pub async fn serve(socket: &Path, store: Arc, event_store: Arc) -> Result<()> { let listener = UnixListener::bind(socket)?; let (router_tx, router_rx) = mpsc::channel::(256); @@ -58,7 +60,12 @@ pub async fn serve(socket: &Path, store: Arc) -> Result<()> { let persister = persist::spawn(store.clone(), Duration::from_millis(500)); let initial = store.load().unwrap_or_default(); - let shutdown = tokio::spawn(router(router_rx, router_tx.clone(), exit_tx, state_tx, persister, initial)); + let event_persister = event_store::spawn(event_store.clone(), Duration::from_millis(500)); + let event_initial = event_store.load().unwrap_or_default(); + let shutdown = tokio::spawn(router( + router_rx, router_tx.clone(), exit_tx, state_tx, + persister, initial, event_persister, event_initial, + )); let mut next_client: ClientId = 0; loop { @@ -116,9 +123,14 @@ async fn router( state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, persister: Persister, initial: crate::state_store::PersistState, + event_persister: EventPersister, + event_initial: crate::event_log::EventLogState, ) { let mut reg = Registry::new(); reg.restore(initial); + let mut event_log = EventLog::restore(event_initial, 1000); + let _ = &event_persister; // consumed in Task 6/7 + let _ = &mut event_log; // consumed in Task 6/7 let mut clients: HashMap = HashMap::new(); // surface_id → set of client ids subscribed (attached). let mut subs: HashMap> = HashMap::new(); @@ -601,6 +613,12 @@ mod tests { p } + /// Build an event store whose file lives inside the per-test temp dir so it is + /// cleaned up with the rest of the test fixtures (not left in the global temp root). + fn make_event_store(dir: &Path) -> std::sync::Arc { + std::sync::Arc::new(crate::event_store::JsonEventStore::new(dir.join("events.json"))) + } + async fn wait_for_socket(sock: &Path) { for _ in 0..300 { if UnixStream::connect(sock).await.is_ok() { return; } @@ -616,9 +634,10 @@ mod tests { let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); @@ -657,9 +676,10 @@ mod tests { let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Input { @@ -682,9 +702,10 @@ mod tests { let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; // First client: open, new surface that prints a marker, attach, then disconnect. @@ -721,8 +742,9 @@ mod tests { let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); let sock2 = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock2, store).await; }); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); @@ -750,8 +772,9 @@ mod tests { let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); let sock2 = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock2, store).await; }); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); @@ -791,8 +814,11 @@ mod tests { { let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone())); + // Both daemon instances in this test share ONE event-store file under the + // per-test dir so instance B reads from disk what instance A persisted. + let event_store = make_event_store(&dir); let sock2 = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock2, store).await; }); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; @@ -807,8 +833,9 @@ mod tests { let sock_b = dir.join("sock2"); let store_b: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone())); + let event_store_b = make_event_store(&dir); let sb2 = sock_b.clone(); - tokio::spawn(async move { let _ = serve(&sock_b, store_b).await; }); + tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b).await; }); wait_for_socket(&sb2).await; let mut s2 = UnixStream::connect(&sb2).await.unwrap(); let r = req(&mut s2, 1, Cmd::Status).await; @@ -828,8 +855,9 @@ mod tests { let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); let sock2 = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock2, store).await; }); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap();