From e83524a0760cd57db2cf92b664cb034c3fde4de3 Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Wed, 10 Jun 2026 07:46:55 +0700 Subject: [PATCH] feat(daemon): record done/wait/error/exit into the event log Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/spaceshd/src/server.rs | 214 +++++++++++++++++++++++++++++++++- 1 file changed, 210 insertions(+), 4 deletions(-) diff --git a/crates/spaceshd/src/server.rs b/crates/spaceshd/src/server.rs index 86b5fb9..500c100 100644 --- a/crates/spaceshd/src/server.rs +++ b/crates/spaceshd/src/server.rs @@ -129,8 +129,6 @@ async fn router( let mut reg = Registry::new(); reg.restore(initial); let mut event_log = EventLog::restore(event_initial, 1000); - let _ = &event_persister; // consumed in Task 6/7 - let _ = &mut event_log; // consumed in Task 6/7 let mut clients: HashMap = HashMap::new(); // surface_id → set of client ids subscribed (attached). let mut subs: HashMap> = HashMap::new(); @@ -159,17 +157,24 @@ async fn router( ServerMsg::Exit { surface_id, code } => { reg.mark_stopped(&surface_id); reg.drop_state(&surface_id); + record_event(®, &mut event_log, &event_persister, &clients, + &surface_id, spacesh_proto::event::EventKind::Exit); let evt = Envelope::Evt(Evt::Exit { surface_id: surface_id.clone(), code }); broadcast_evt(&clients, &evt); } ServerMsg::StateDetected { surface_id, state } => { if reg.is_running(&surface_id) { reg.set_state(&surface_id, state); - broadcast_evt(&clients, &Envelope::Evt(Evt::State { surface_id, state })); + broadcast_evt(&clients, &Envelope::Evt(Evt::State { surface_id: surface_id.clone(), state })); + if let Some(kind) = kind_for_state(state) { + record_event(®, &mut event_log, &event_persister, &clients, &surface_id, kind); + } } } ServerMsg::Request { id, cmd, client, out } => { - handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, &router_tx, &exit_tx, &state_tx, &persister).await; + handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, + &router_tx, &exit_tx, &state_tx, &persister, + &mut event_log, &event_persister).await; } } } @@ -181,6 +186,46 @@ fn broadcast_evt(clients: &HashMap, evt: &Envelope) { } } +/// Current unix-epoch milliseconds. `as u64` is safe — epoch millis fit u64 for ~584M years. +fn now_millis() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0) +} + +/// Which state transitions are worth logging. work/idle are noise → None. +fn kind_for_state(state: SurfaceState) -> Option { + use spacesh_proto::event::EventKind; + match state { + SurfaceState::Done => Some(EventKind::Done), + SurfaceState::Wait => Some(EventKind::Wait), + SurfaceState::Error => Some(EventKind::Error), + SurfaceState::Work | SurfaceState::Idle => None, + } +} + +/// Record one event (denormalizing workspace name + agent label), persist, broadcast. +fn record_event( + reg: &Registry, + log: &mut EventLog, + persister: &EventPersister, + clients: &HashMap, + sid: &SurfaceId, + kind: spacesh_proto::event::EventKind, +) { + // No workspace → the surface was already removed (user-initiated Close / ApplyPreset / + // CloseWorkspace remove it synchronously before the async Exit arrives). Such deliberate + // closes are intentionally NOT logged — only spontaneous process exits and status + // transitions become events. + let Some(ws_id) = reg.workspace_of(sid) else { return }; + let ws_name = reg.workspace(&ws_id).map(|w| w.name.clone()).unwrap_or_default(); + let agent = reg.surface_spec(sid).and_then(|s| s.agent_label); + let rec = log.record(sid.clone(), ws_id, ws_name, agent, kind, now_millis()); + persister.mark_dirty(log.snapshot()); + broadcast_evt(clients, &Envelope::Evt(Evt::Event { record: rec })); +} + fn ok(id: u64, data: serde_json::Value) -> Envelope { Envelope::Res { id, ok: true, data, error: None } } @@ -225,6 +270,8 @@ async fn handle_request( exit_tx: &mpsc::UnboundedSender<(SurfaceId, i32)>, state_tx: &mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, persister: &Persister, + event_log: &mut EventLog, + event_persister: &EventPersister, ) { use spacesh_proto::message::SplitDir; use spacesh_proto::layout::{LayoutNode, Orient}; @@ -526,6 +573,9 @@ async fn handle_request( if reg.is_running(&surface_id) { reg.set_state(&surface_id, state); broadcast_evt(clients, &Envelope::Evt(Evt::State { surface_id: surface_id.clone(), state })); + if let Some(kind) = kind_for_state(state) { + record_event(reg, event_log, event_persister, clients, &surface_id, kind); + } let _ = out.send(ok(id, serde_json::Value::Null)).await; } else { // unknown or stopped surface — status is only meaningful while running. @@ -884,4 +934,160 @@ mod tests { } assert!(saw_done, "expected a Done state event from OSC 133"); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn set_state_done_emits_event_record() { + let _serial = crate::test_support::serial(); + let dir = tempdir_path(); + let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); + let sock_for_task = sock.clone(); + let store2 = store.clone(); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + wait_for_socket(&sock).await; + + // Control connection: open workspace and spawn surface. + let mut ctrl = UnixStream::connect(&sock).await.unwrap(); + let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + let r = req(&mut ctrl, 2, Cmd::NewSurface { + workspace_id: spacesh_proto::WorkspaceId(ws.clone()), + command: Some("/bin/sh".into()), + args: vec!["-c".into(), "sleep 5".into()], + cols: 80, rows: 24, + }).await; + let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); + + // Observer connection: a second client that receives all broadcast events + // without its read loop consuming them via req(). + let mut observer = UnixStream::connect(&sock).await.unwrap(); + + // Trigger Done via SetState on the control connection. + let _ = req(&mut ctrl, 3, Cmd::SetState { + surface_id: spacesh_proto::SurfaceId(sid.clone()), + state: spacesh_proto::status::SurfaceState::Done, + }).await; + + // Expect an Evt::Event for this surface on the observer within a short window. + let mut found = None; + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3); + while tokio::time::Instant::now() < deadline { + if let Ok(Ok(Some(env))) = + tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await { + if let Envelope::Evt(Evt::Event { record }) = env { + if record.surface_id.0 == sid { found = Some(record); break; } + } + } + } + let rec = found.expect("expected an Evt::Event for the surface"); + assert_eq!(rec.kind, spacesh_proto::event::EventKind::Done); + assert!(!rec.read); + assert_eq!(rec.workspace_id.0, ws); + assert!(!rec.workspace_name.is_empty(), "workspace name should be denormalized into the record"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn close_does_not_emit_event_record() { + let _serial = crate::test_support::serial(); + let dir = tempdir_path(); + let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); + let sock_for_task = sock.clone(); + let store2 = store.clone(); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + wait_for_socket(&sock).await; + + // Control connection: open workspace and spawn surface. + let mut ctrl = UnixStream::connect(&sock).await.unwrap(); + let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + let r = req(&mut ctrl, 2, Cmd::NewSurface { + workspace_id: spacesh_proto::WorkspaceId(ws.clone()), + command: Some("/bin/sh".into()), + args: vec!["-c".into(), "sleep 5".into()], + cols: 80, rows: 24, + }).await; + let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); + + // Observer connection: receives all broadcast events. + let mut observer = UnixStream::connect(&sock).await.unwrap(); + + // User-initiated Close on the control connection. + let _ = req(&mut ctrl, 3, Cmd::Close { + surface_id: spacesh_proto::SurfaceId(sid.clone()), + }).await; + + // A deliberate Close must surface an Evt::Exit but NEVER an Evt::Event for it. + let mut saw_exit = false; + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2); + while tokio::time::Instant::now() < deadline { + if let Ok(Ok(Some(env))) = + tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await { + match env { + Envelope::Evt(Evt::Event { record }) if record.surface_id.0 == sid => { + panic!("user-initiated Close must not produce an Evt::Event"); + } + Envelope::Evt(Evt::Exit { surface_id, .. }) if surface_id.0 == sid => { + saw_exit = true; + } + _ => {} + } + } + } + assert!(saw_exit, "expected an Evt::Exit for the closed surface"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn osc133_detected_state_emits_event_record() { + let _serial = crate::test_support::serial(); + let dir = tempdir_path(); + let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); + let sock2 = sock.clone(); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); + wait_for_socket(&sock).await; + + // Control connection: open workspace and spawn a surface that emits OSC 133. + let mut ctrl = UnixStream::connect(&sock).await.unwrap(); + let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + let r = req(&mut ctrl, 2, Cmd::NewSurface { + workspace_id: spacesh_proto::WorkspaceId(ws.clone()), + command: Some("/bin/sh".into()), + args: vec!["-c".into(), "printf '\\033]133;C\\007'; printf hi; printf '\\033]133;D;0\\007'; sleep 1".into()], + cols: 80, rows: 24, + }).await; + let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); + + // Observer connection: receives all broadcast events (the detected-state path + // flows through ServerMsg::StateDetected → record_event → Evt::Event). + let mut observer = UnixStream::connect(&sock).await.unwrap(); + + // Drive the PTY output by attaching the control connection. + let _ = req(&mut ctrl, 3, Cmd::Attach { + surface_id: spacesh_proto::SurfaceId(sid.clone()), + }).await; + + // Expect an Evt::Event (kind=done) for this surface from the OSC 133 Done detection. + let mut found = None; + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3); + while tokio::time::Instant::now() < deadline { + if let Ok(Ok(Some(env))) = + tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await { + if let Envelope::Evt(Evt::Event { record }) = env { + if record.surface_id.0 == sid { found = Some(record); break; } + } + } + } + let rec = found.expect("expected an Evt::Event from the OSC 133 detected state"); + assert_eq!(rec.kind, spacesh_proto::event::EventKind::Done); + assert!(!rec.read); + assert_eq!(rec.workspace_id.0, ws); + } }