wire(daemon): thread EventLog + EventPersister through serve/router
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -9,6 +9,8 @@ use spacesh_proto::{Cmd, Envelope, ErrorBody, Evt, SurfaceId, WorkspaceId};
|
||||
use spacesh_proto::status::SurfaceState;
|
||||
use tokio::net::{UnixListener, UnixStream};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use crate::event_log::EventLog;
|
||||
use crate::event_store::{self, EventPersister, EventStore};
|
||||
use crate::persist::{self, Persister};
|
||||
use crate::registry::Registry;
|
||||
use crate::state_store::StateStore;
|
||||
@@ -35,7 +37,7 @@ enum ServerMsg {
|
||||
|
||||
type ClientId = u64;
|
||||
|
||||
pub async fn serve(socket: &Path, store: Arc<dyn StateStore>) -> Result<()> {
|
||||
pub async fn serve(socket: &Path, store: Arc<dyn StateStore>, event_store: Arc<dyn EventStore>) -> Result<()> {
|
||||
let listener = UnixListener::bind(socket)?;
|
||||
let (router_tx, router_rx) = mpsc::channel::<ServerMsg>(256);
|
||||
|
||||
@@ -58,7 +60,12 @@ pub async fn serve(socket: &Path, store: Arc<dyn StateStore>) -> Result<()> {
|
||||
|
||||
let persister = persist::spawn(store.clone(), Duration::from_millis(500));
|
||||
let initial = store.load().unwrap_or_default();
|
||||
let shutdown = tokio::spawn(router(router_rx, router_tx.clone(), exit_tx, state_tx, persister, initial));
|
||||
let event_persister = event_store::spawn(event_store.clone(), Duration::from_millis(500));
|
||||
let event_initial = event_store.load().unwrap_or_default();
|
||||
let shutdown = tokio::spawn(router(
|
||||
router_rx, router_tx.clone(), exit_tx, state_tx,
|
||||
persister, initial, event_persister, event_initial,
|
||||
));
|
||||
|
||||
let mut next_client: ClientId = 0;
|
||||
loop {
|
||||
@@ -116,9 +123,14 @@ async fn router(
|
||||
state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>,
|
||||
persister: Persister,
|
||||
initial: crate::state_store::PersistState,
|
||||
event_persister: EventPersister,
|
||||
event_initial: crate::event_log::EventLogState,
|
||||
) {
|
||||
let mut reg = Registry::new();
|
||||
reg.restore(initial);
|
||||
let mut event_log = EventLog::restore(event_initial, 1000);
|
||||
let _ = &event_persister; // consumed in Task 6/7
|
||||
let _ = &mut event_log; // consumed in Task 6/7
|
||||
let mut clients: HashMap<ClientId, ClientTx> = HashMap::new();
|
||||
// surface_id → set of client ids subscribed (attached).
|
||||
let mut subs: HashMap<SurfaceId, Vec<ClientId>> = HashMap::new();
|
||||
@@ -601,6 +613,12 @@ mod tests {
|
||||
p
|
||||
}
|
||||
|
||||
/// Build an event store whose file lives inside the per-test temp dir so it is
|
||||
/// cleaned up with the rest of the test fixtures (not left in the global temp root).
|
||||
fn make_event_store(dir: &Path) -> std::sync::Arc<dyn crate::event_store::EventStore> {
|
||||
std::sync::Arc::new(crate::event_store::JsonEventStore::new(dir.join("events.json")))
|
||||
}
|
||||
|
||||
async fn wait_for_socket(sock: &Path) {
|
||||
for _ in 0..300 {
|
||||
if UnixStream::connect(sock).await.is_ok() { return; }
|
||||
@@ -616,9 +634,10 @@ mod tests {
|
||||
let sock = dir.join("sock");
|
||||
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
||||
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
||||
let event_store = make_event_store(&dir);
|
||||
let sock_for_task = sock.clone();
|
||||
let store2 = store.clone();
|
||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2).await; });
|
||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
|
||||
wait_for_socket(&sock).await;
|
||||
|
||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||
@@ -657,9 +676,10 @@ mod tests {
|
||||
let sock = dir.join("sock");
|
||||
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
||||
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
||||
let event_store = make_event_store(&dir);
|
||||
let sock_for_task = sock.clone();
|
||||
let store2 = store.clone();
|
||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2).await; });
|
||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
|
||||
wait_for_socket(&sock).await;
|
||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||
let r = req(&mut s, 1, Cmd::Input {
|
||||
@@ -682,9 +702,10 @@ mod tests {
|
||||
let sock = dir.join("sock");
|
||||
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
||||
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
||||
let event_store = make_event_store(&dir);
|
||||
let sock_for_task = sock.clone();
|
||||
let store2 = store.clone();
|
||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2).await; });
|
||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
|
||||
wait_for_socket(&sock).await;
|
||||
|
||||
// First client: open, new surface that prints a marker, attach, then disconnect.
|
||||
@@ -721,8 +742,9 @@ mod tests {
|
||||
let sock = dir.join("sock");
|
||||
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
||||
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
||||
let event_store = make_event_store(&dir);
|
||||
let sock2 = sock.clone();
|
||||
tokio::spawn(async move { let _ = serve(&sock2, store).await; });
|
||||
tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; });
|
||||
wait_for_socket(&sock).await;
|
||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||
|
||||
@@ -750,8 +772,9 @@ mod tests {
|
||||
let sock = dir.join("sock");
|
||||
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
||||
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
||||
let event_store = make_event_store(&dir);
|
||||
let sock2 = sock.clone();
|
||||
tokio::spawn(async move { let _ = serve(&sock2, store).await; });
|
||||
tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; });
|
||||
wait_for_socket(&sock).await;
|
||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||
|
||||
@@ -791,8 +814,11 @@ mod tests {
|
||||
{
|
||||
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
||||
std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone()));
|
||||
// Both daemon instances in this test share ONE event-store file under the
|
||||
// per-test dir so instance B reads from disk what instance A persisted.
|
||||
let event_store = make_event_store(&dir);
|
||||
let sock2 = sock.clone();
|
||||
tokio::spawn(async move { let _ = serve(&sock2, store).await; });
|
||||
tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; });
|
||||
wait_for_socket(&sock).await;
|
||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
||||
@@ -807,8 +833,9 @@ mod tests {
|
||||
let sock_b = dir.join("sock2");
|
||||
let store_b: std::sync::Arc<dyn crate::state_store::StateStore> =
|
||||
std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone()));
|
||||
let event_store_b = make_event_store(&dir);
|
||||
let sb2 = sock_b.clone();
|
||||
tokio::spawn(async move { let _ = serve(&sock_b, store_b).await; });
|
||||
tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b).await; });
|
||||
wait_for_socket(&sb2).await;
|
||||
let mut s2 = UnixStream::connect(&sb2).await.unwrap();
|
||||
let r = req(&mut s2, 1, Cmd::Status).await;
|
||||
@@ -828,8 +855,9 @@ mod tests {
|
||||
let sock = dir.join("sock");
|
||||
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
||||
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
||||
let event_store = make_event_store(&dir);
|
||||
let sock2 = sock.clone();
|
||||
tokio::spawn(async move { let _ = serve(&sock2, store).await; });
|
||||
tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; });
|
||||
wait_for_socket(&sock).await;
|
||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user