diff --git a/crates/spaceshd/src/event_log.rs b/crates/spaceshd/src/event_log.rs index 4c5906b..d60b614 100644 --- a/crates/spaceshd/src/event_log.rs +++ b/crates/spaceshd/src/event_log.rs @@ -22,6 +22,7 @@ pub struct EventLog { } impl EventLog { + #[cfg(test)] pub fn new(cap: usize) -> Self { Self { records: VecDeque::new(), next_id: 1, cap } } diff --git a/crates/spaceshd/src/server.rs b/crates/spaceshd/src/server.rs index 500c100..ee3f740 100644 --- a/crates/spaceshd/src/server.rs +++ b/crates/spaceshd/src/server.rs @@ -550,7 +550,14 @@ async fn handle_request( let _ = out.send(ok(id, serde_json::Value::Null)).await; } - Cmd::Focus { surface_id: _ } => { let _ = out.send(ok(id, serde_json::Value::Null)).await; } + Cmd::Focus { surface_id } => { + let ids = event_log.mark_read(&spacesh_proto::event::MarkReadTarget::Surface(surface_id)); + if !ids.is_empty() { + event_persister.mark_dirty(event_log.snapshot()); + broadcast_evt(clients, &Envelope::Evt(Evt::EventsRead { ids })); + } + let _ = out.send(ok(id, serde_json::Value::Null)).await; + } Cmd::Close { surface_id } => { if reg.surface_spec(&surface_id).is_some() { @@ -589,15 +596,18 @@ async fn handle_request( } Cmd::EventLog { limit } => { - // TODO(SP2-T4): wire to EventLog once the shared state is plumbed in. - let _ = limit; - let _ = out.send(err(id, "NOT_IMPLEMENTED", "event log not yet wired")).await; + let events = event_log.recent(limit); + let unread = event_log.unread_count(); + let _ = out.send(ok(id, serde_json::json!({ "events": events, "unread": unread }))).await; } Cmd::MarkRead { target } => { - // TODO(SP2-T4): wire to EventLog once the shared state is plumbed in. - let _ = target; - let _ = out.send(err(id, "NOT_IMPLEMENTED", "mark_read not yet wired")).await; + let ids = event_log.mark_read(&target); + if !ids.is_empty() { + event_persister.mark_dirty(event_log.snapshot()); + broadcast_evt(clients, &Envelope::Evt(Evt::EventsRead { ids })); + } + let _ = out.send(ok(id, serde_json::Value::Null)).await; } Cmd::Shutdown => { @@ -1090,4 +1100,128 @@ mod tests { assert!(!rec.read); assert_eq!(rec.workspace_id.0, ws); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn event_log_query_and_mark_read() { + let _serial = crate::test_support::serial(); + let dir = tempdir_path(); + let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); + let sock_for_task = sock.clone(); + let store2 = store.clone(); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + wait_for_socket(&sock).await; + + // Observer connection to catch the EventsRead broadcast. + let mut obs = UnixStream::connect(&sock).await.unwrap(); + // Control connection. + let mut s = UnixStream::connect(&sock).await.unwrap(); + + let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + let r = req(&mut s, 2, Cmd::NewSurface { + workspace_id: spacesh_proto::WorkspaceId(ws), + command: Some("/bin/sh".into()), + args: vec!["-c".into(), "sleep 5".into()], + cols: 80, rows: 24, + }).await; + let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); + + let _ = req(&mut s, 3, Cmd::SetState { + surface_id: spacesh_proto::SurfaceId(sid.clone()), + state: spacesh_proto::status::SurfaceState::Error, + }).await; + + // Query the log. + let log = req(&mut s, 4, Cmd::EventLog { limit: None }).await; + let data = res_data(&log); + assert_eq!(data["unread"].as_u64().unwrap(), 1); + let first_id = data["events"][0]["id"].as_u64().unwrap(); + + // Mark it read by id. + let _ = req(&mut s, 5, Cmd::MarkRead { + target: spacesh_proto::event::MarkReadTarget::Ids(vec![first_id]), + }).await; + + // Observer should see EventsRead { ids: [first_id] }. + let mut saw_read = false; + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3); + while tokio::time::Instant::now() < deadline { + if let Ok(Ok(Some(env))) = + tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut obs)).await { + if let Envelope::Evt(Evt::EventsRead { ids }) = env { + if ids == vec![first_id] { saw_read = true; break; } + } + } + } + assert!(saw_read, "expected an EventsRead broadcast for the marked id"); + + // Unread is now 0. + let log = req(&mut s, 6, Cmd::EventLog { limit: None }).await; + assert_eq!(res_data(&log)["unread"].as_u64().unwrap(), 0); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn focus_marks_surface_events_read() { + let _serial = crate::test_support::serial(); + let dir = tempdir_path(); + let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); + let sock_for_task = sock.clone(); + let store2 = store.clone(); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + wait_for_socket(&sock).await; + + // Observer connection. + let mut obs = UnixStream::connect(&sock).await.unwrap(); + // Control connection. + let mut s = UnixStream::connect(&sock).await.unwrap(); + + let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + let r = req(&mut s, 2, Cmd::NewSurface { + workspace_id: spacesh_proto::WorkspaceId(ws), + command: Some("/bin/sh".into()), + args: vec!["-c".into(), "sleep 5".into()], + cols: 80, rows: 24, + }).await; + let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); + + // Drive an Error event for this surface. + let _ = req(&mut s, 3, Cmd::SetState { + surface_id: spacesh_proto::SurfaceId(sid.clone()), + state: spacesh_proto::status::SurfaceState::Error, + }).await; + + // Verify unread == 1 before Focus. + let log = req(&mut s, 4, Cmd::EventLog { limit: None }).await; + assert_eq!(res_data(&log)["unread"].as_u64().unwrap(), 1); + let first_id = res_data(&log)["events"][0]["id"].as_u64().unwrap(); + + // Focus that surface — should mark its events read. + let _ = req(&mut s, 5, Cmd::Focus { + surface_id: spacesh_proto::SurfaceId(sid.clone()), + }).await; + + // Observer should receive EventsRead with the id from the Error event. + let mut saw_read = false; + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3); + while tokio::time::Instant::now() < deadline { + if let Ok(Ok(Some(env))) = + tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut obs)).await { + if let Envelope::Evt(Evt::EventsRead { ids }) = env { + if ids.contains(&first_id) { saw_read = true; break; } + } + } + } + assert!(saw_read, "expected an EventsRead broadcast after Focus"); + + // Unread drops to 0. + let log = req(&mut s, 6, Cmd::EventLog { limit: None }).await; + assert_eq!(res_data(&log)["unread"].as_u64().unwrap(), 0); + } }