feat(daemon): EventLog/MarkRead commands; Focus marks surface read

Wire Cmd::EventLog and Cmd::MarkRead to the live EventLog (replacing
NOT_IMPLEMENTED stubs). Cmd::Focus now calls mark_read for the surface,
persists the updated snapshot, and broadcasts Evt::EventsRead.
Add integration tests: event_log_query_and_mark_read and
focus_marks_surface_events_read.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-10 08:08:59 +07:00
parent e83524a076
commit 21fe1308cc
2 changed files with 142 additions and 7 deletions
+141 -7
View File
@@ -550,7 +550,14 @@ async fn handle_request(
let _ = out.send(ok(id, serde_json::Value::Null)).await;
}
Cmd::Focus { surface_id: _ } => { let _ = out.send(ok(id, serde_json::Value::Null)).await; }
Cmd::Focus { surface_id } => {
let ids = event_log.mark_read(&spacesh_proto::event::MarkReadTarget::Surface(surface_id));
if !ids.is_empty() {
event_persister.mark_dirty(event_log.snapshot());
broadcast_evt(clients, &Envelope::Evt(Evt::EventsRead { ids }));
}
let _ = out.send(ok(id, serde_json::Value::Null)).await;
}
Cmd::Close { surface_id } => {
if reg.surface_spec(&surface_id).is_some() {
@@ -589,15 +596,18 @@ async fn handle_request(
}
Cmd::EventLog { limit } => {
// TODO(SP2-T4): wire to EventLog once the shared state is plumbed in.
let _ = limit;
let _ = out.send(err(id, "NOT_IMPLEMENTED", "event log not yet wired")).await;
let events = event_log.recent(limit);
let unread = event_log.unread_count();
let _ = out.send(ok(id, serde_json::json!({ "events": events, "unread": unread }))).await;
}
Cmd::MarkRead { target } => {
// TODO(SP2-T4): wire to EventLog once the shared state is plumbed in.
let _ = target;
let _ = out.send(err(id, "NOT_IMPLEMENTED", "mark_read not yet wired")).await;
let ids = event_log.mark_read(&target);
if !ids.is_empty() {
event_persister.mark_dirty(event_log.snapshot());
broadcast_evt(clients, &Envelope::Evt(Evt::EventsRead { ids }));
}
let _ = out.send(ok(id, serde_json::Value::Null)).await;
}
Cmd::Shutdown => {
@@ -1090,4 +1100,128 @@ mod tests {
assert!(!rec.read);
assert_eq!(rec.workspace_id.0, ws);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn event_log_query_and_mark_read() {
let _serial = crate::test_support::serial();
let dir = tempdir_path();
let sock = dir.join("sock");
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
let event_store = make_event_store(&dir);
let sock_for_task = sock.clone();
let store2 = store.clone();
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
wait_for_socket(&sock).await;
// Observer connection to catch the EventsRead broadcast.
let mut obs = UnixStream::connect(&sock).await.unwrap();
// Control connection.
let mut s = UnixStream::connect(&sock).await.unwrap();
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
let r = req(&mut s, 2, Cmd::NewSurface {
workspace_id: spacesh_proto::WorkspaceId(ws),
command: Some("/bin/sh".into()),
args: vec!["-c".into(), "sleep 5".into()],
cols: 80, rows: 24,
}).await;
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
let _ = req(&mut s, 3, Cmd::SetState {
surface_id: spacesh_proto::SurfaceId(sid.clone()),
state: spacesh_proto::status::SurfaceState::Error,
}).await;
// Query the log.
let log = req(&mut s, 4, Cmd::EventLog { limit: None }).await;
let data = res_data(&log);
assert_eq!(data["unread"].as_u64().unwrap(), 1);
let first_id = data["events"][0]["id"].as_u64().unwrap();
// Mark it read by id.
let _ = req(&mut s, 5, Cmd::MarkRead {
target: spacesh_proto::event::MarkReadTarget::Ids(vec![first_id]),
}).await;
// Observer should see EventsRead { ids: [first_id] }.
let mut saw_read = false;
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3);
while tokio::time::Instant::now() < deadline {
if let Ok(Ok(Some(env))) =
tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut obs)).await {
if let Envelope::Evt(Evt::EventsRead { ids }) = env {
if ids == vec![first_id] { saw_read = true; break; }
}
}
}
assert!(saw_read, "expected an EventsRead broadcast for the marked id");
// Unread is now 0.
let log = req(&mut s, 6, Cmd::EventLog { limit: None }).await;
assert_eq!(res_data(&log)["unread"].as_u64().unwrap(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn focus_marks_surface_events_read() {
let _serial = crate::test_support::serial();
let dir = tempdir_path();
let sock = dir.join("sock");
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
let event_store = make_event_store(&dir);
let sock_for_task = sock.clone();
let store2 = store.clone();
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
wait_for_socket(&sock).await;
// Observer connection.
let mut obs = UnixStream::connect(&sock).await.unwrap();
// Control connection.
let mut s = UnixStream::connect(&sock).await.unwrap();
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
let r = req(&mut s, 2, Cmd::NewSurface {
workspace_id: spacesh_proto::WorkspaceId(ws),
command: Some("/bin/sh".into()),
args: vec!["-c".into(), "sleep 5".into()],
cols: 80, rows: 24,
}).await;
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
// Drive an Error event for this surface.
let _ = req(&mut s, 3, Cmd::SetState {
surface_id: spacesh_proto::SurfaceId(sid.clone()),
state: spacesh_proto::status::SurfaceState::Error,
}).await;
// Verify unread == 1 before Focus.
let log = req(&mut s, 4, Cmd::EventLog { limit: None }).await;
assert_eq!(res_data(&log)["unread"].as_u64().unwrap(), 1);
let first_id = res_data(&log)["events"][0]["id"].as_u64().unwrap();
// Focus that surface — should mark its events read.
let _ = req(&mut s, 5, Cmd::Focus {
surface_id: spacesh_proto::SurfaceId(sid.clone()),
}).await;
// Observer should receive EventsRead with the id from the Error event.
let mut saw_read = false;
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3);
while tokio::time::Instant::now() < deadline {
if let Ok(Ok(Some(env))) =
tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut obs)).await {
if let Envelope::Evt(Evt::EventsRead { ids }) = env {
if ids.contains(&first_id) { saw_read = true; break; }
}
}
}
assert!(saw_read, "expected an EventsRead broadcast after Focus");
// Unread drops to 0.
let log = req(&mut s, 6, Cmd::EventLog { limit: None }).await;
assert_eq!(res_data(&log)["unread"].as_u64().unwrap(), 0);
}
}