feat(daemon): record done/wait/error/exit into the event log

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-10 07:46:55 +07:00
parent f7bf9fc5c6
commit e83524a076
+210 -4
View File
@@ -129,8 +129,6 @@ async fn router(
let mut reg = Registry::new();
reg.restore(initial);
let mut event_log = EventLog::restore(event_initial, 1000);
let _ = &event_persister; // consumed in Task 6/7
let _ = &mut event_log; // consumed in Task 6/7
let mut clients: HashMap<ClientId, ClientTx> = HashMap::new();
// surface_id → set of client ids subscribed (attached).
let mut subs: HashMap<SurfaceId, Vec<ClientId>> = HashMap::new();
@@ -159,17 +157,24 @@ async fn router(
ServerMsg::Exit { surface_id, code } => {
reg.mark_stopped(&surface_id);
reg.drop_state(&surface_id);
record_event(&reg, &mut event_log, &event_persister, &clients,
&surface_id, spacesh_proto::event::EventKind::Exit);
let evt = Envelope::Evt(Evt::Exit { surface_id: surface_id.clone(), code });
broadcast_evt(&clients, &evt);
}
ServerMsg::StateDetected { surface_id, state } => {
if reg.is_running(&surface_id) {
reg.set_state(&surface_id, state);
broadcast_evt(&clients, &Envelope::Evt(Evt::State { surface_id, state }));
broadcast_evt(&clients, &Envelope::Evt(Evt::State { surface_id: surface_id.clone(), state }));
if let Some(kind) = kind_for_state(state) {
record_event(&reg, &mut event_log, &event_persister, &clients, &surface_id, kind);
}
}
}
ServerMsg::Request { id, cmd, client, out } => {
handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, &router_tx, &exit_tx, &state_tx, &persister).await;
handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients,
&router_tx, &exit_tx, &state_tx, &persister,
&mut event_log, &event_persister).await;
}
}
}
@@ -181,6 +186,46 @@ fn broadcast_evt(clients: &HashMap<ClientId, ClientTx>, evt: &Envelope) {
}
}
/// Current unix-epoch milliseconds. `as u64` is safe — epoch millis fit u64 for ~584M years.
fn now_millis() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
/// Which state transitions are worth logging. work/idle are noise → None.
fn kind_for_state(state: SurfaceState) -> Option<spacesh_proto::event::EventKind> {
use spacesh_proto::event::EventKind;
match state {
SurfaceState::Done => Some(EventKind::Done),
SurfaceState::Wait => Some(EventKind::Wait),
SurfaceState::Error => Some(EventKind::Error),
SurfaceState::Work | SurfaceState::Idle => None,
}
}
/// Record one event (denormalizing workspace name + agent label), persist, broadcast.
fn record_event(
reg: &Registry,
log: &mut EventLog,
persister: &EventPersister,
clients: &HashMap<ClientId, ClientTx>,
sid: &SurfaceId,
kind: spacesh_proto::event::EventKind,
) {
// No workspace → the surface was already removed (user-initiated Close / ApplyPreset /
// CloseWorkspace remove it synchronously before the async Exit arrives). Such deliberate
// closes are intentionally NOT logged — only spontaneous process exits and status
// transitions become events.
let Some(ws_id) = reg.workspace_of(sid) else { return };
let ws_name = reg.workspace(&ws_id).map(|w| w.name.clone()).unwrap_or_default();
let agent = reg.surface_spec(sid).and_then(|s| s.agent_label);
let rec = log.record(sid.clone(), ws_id, ws_name, agent, kind, now_millis());
persister.mark_dirty(log.snapshot());
broadcast_evt(clients, &Envelope::Evt(Evt::Event { record: rec }));
}
fn ok(id: u64, data: serde_json::Value) -> Envelope {
Envelope::Res { id, ok: true, data, error: None }
}
@@ -225,6 +270,8 @@ async fn handle_request(
exit_tx: &mpsc::UnboundedSender<(SurfaceId, i32)>,
state_tx: &mpsc::UnboundedSender<(SurfaceId, SurfaceState)>,
persister: &Persister,
event_log: &mut EventLog,
event_persister: &EventPersister,
) {
use spacesh_proto::message::SplitDir;
use spacesh_proto::layout::{LayoutNode, Orient};
@@ -526,6 +573,9 @@ async fn handle_request(
if reg.is_running(&surface_id) {
reg.set_state(&surface_id, state);
broadcast_evt(clients, &Envelope::Evt(Evt::State { surface_id: surface_id.clone(), state }));
if let Some(kind) = kind_for_state(state) {
record_event(reg, event_log, event_persister, clients, &surface_id, kind);
}
let _ = out.send(ok(id, serde_json::Value::Null)).await;
} else {
// unknown or stopped surface — status is only meaningful while running.
@@ -884,4 +934,160 @@ mod tests {
}
assert!(saw_done, "expected a Done state event from OSC 133");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn set_state_done_emits_event_record() {
let _serial = crate::test_support::serial();
let dir = tempdir_path();
let sock = dir.join("sock");
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
let event_store = make_event_store(&dir);
let sock_for_task = sock.clone();
let store2 = store.clone();
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
wait_for_socket(&sock).await;
// Control connection: open workspace and spawn surface.
let mut ctrl = UnixStream::connect(&sock).await.unwrap();
let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
let r = req(&mut ctrl, 2, Cmd::NewSurface {
workspace_id: spacesh_proto::WorkspaceId(ws.clone()),
command: Some("/bin/sh".into()),
args: vec!["-c".into(), "sleep 5".into()],
cols: 80, rows: 24,
}).await;
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
// Observer connection: a second client that receives all broadcast events
// without its read loop consuming them via req().
let mut observer = UnixStream::connect(&sock).await.unwrap();
// Trigger Done via SetState on the control connection.
let _ = req(&mut ctrl, 3, Cmd::SetState {
surface_id: spacesh_proto::SurfaceId(sid.clone()),
state: spacesh_proto::status::SurfaceState::Done,
}).await;
// Expect an Evt::Event for this surface on the observer within a short window.
let mut found = None;
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3);
while tokio::time::Instant::now() < deadline {
if let Ok(Ok(Some(env))) =
tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await {
if let Envelope::Evt(Evt::Event { record }) = env {
if record.surface_id.0 == sid { found = Some(record); break; }
}
}
}
let rec = found.expect("expected an Evt::Event for the surface");
assert_eq!(rec.kind, spacesh_proto::event::EventKind::Done);
assert!(!rec.read);
assert_eq!(rec.workspace_id.0, ws);
assert!(!rec.workspace_name.is_empty(), "workspace name should be denormalized into the record");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn close_does_not_emit_event_record() {
let _serial = crate::test_support::serial();
let dir = tempdir_path();
let sock = dir.join("sock");
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
let event_store = make_event_store(&dir);
let sock_for_task = sock.clone();
let store2 = store.clone();
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
wait_for_socket(&sock).await;
// Control connection: open workspace and spawn surface.
let mut ctrl = UnixStream::connect(&sock).await.unwrap();
let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
let r = req(&mut ctrl, 2, Cmd::NewSurface {
workspace_id: spacesh_proto::WorkspaceId(ws.clone()),
command: Some("/bin/sh".into()),
args: vec!["-c".into(), "sleep 5".into()],
cols: 80, rows: 24,
}).await;
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
// Observer connection: receives all broadcast events.
let mut observer = UnixStream::connect(&sock).await.unwrap();
// User-initiated Close on the control connection.
let _ = req(&mut ctrl, 3, Cmd::Close {
surface_id: spacesh_proto::SurfaceId(sid.clone()),
}).await;
// A deliberate Close must surface an Evt::Exit but NEVER an Evt::Event for it.
let mut saw_exit = false;
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2);
while tokio::time::Instant::now() < deadline {
if let Ok(Ok(Some(env))) =
tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await {
match env {
Envelope::Evt(Evt::Event { record }) if record.surface_id.0 == sid => {
panic!("user-initiated Close must not produce an Evt::Event");
}
Envelope::Evt(Evt::Exit { surface_id, .. }) if surface_id.0 == sid => {
saw_exit = true;
}
_ => {}
}
}
}
assert!(saw_exit, "expected an Evt::Exit for the closed surface");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn osc133_detected_state_emits_event_record() {
let _serial = crate::test_support::serial();
let dir = tempdir_path();
let sock = dir.join("sock");
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
let event_store = make_event_store(&dir);
let sock2 = sock.clone();
tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; });
wait_for_socket(&sock).await;
// Control connection: open workspace and spawn a surface that emits OSC 133.
let mut ctrl = UnixStream::connect(&sock).await.unwrap();
let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
let r = req(&mut ctrl, 2, Cmd::NewSurface {
workspace_id: spacesh_proto::WorkspaceId(ws.clone()),
command: Some("/bin/sh".into()),
args: vec!["-c".into(), "printf '\\033]133;C\\007'; printf hi; printf '\\033]133;D;0\\007'; sleep 1".into()],
cols: 80, rows: 24,
}).await;
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
// Observer connection: receives all broadcast events (the detected-state path
// flows through ServerMsg::StateDetected → record_event → Evt::Event).
let mut observer = UnixStream::connect(&sock).await.unwrap();
// Drive the PTY output by attaching the control connection.
let _ = req(&mut ctrl, 3, Cmd::Attach {
surface_id: spacesh_proto::SurfaceId(sid.clone()),
}).await;
// Expect an Evt::Event (kind=done) for this surface from the OSC 133 Done detection.
let mut found = None;
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3);
while tokio::time::Instant::now() < deadline {
if let Ok(Ok(Some(env))) =
tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await {
if let Envelope::Evt(Evt::Event { record }) = env {
if record.surface_id.0 == sid { found = Some(record); break; }
}
}
}
let rec = found.expect("expected an Evt::Event from the OSC 133 detected state");
assert_eq!(rec.kind, spacesh_proto::event::EventKind::Done);
assert!(!rec.read);
assert_eq!(rec.workspace_id.0, ws);
}
}