diff --git a/crates/spaceshd/src/main.rs b/crates/spaceshd/src/main.rs index 6ea05e2..41f4914 100644 --- a/crates/spaceshd/src/main.rs +++ b/crates/spaceshd/src/main.rs @@ -61,6 +61,9 @@ async fn run_daemon() -> Result<()> { let events_path = lifecycle::spacesh_dir()?.join("events.json"); let event_store: std::sync::Arc = std::sync::Arc::new(event_store::JsonEventStore::new(events_path)); + let snapshots_dir = lifecycle::spacesh_dir()?.join("snapshots"); + let snapshot_store: std::sync::Arc = + std::sync::Arc::new(snapshot_store::JsonSnapshotStore::new(snapshots_dir)); eprintln!("spaceshd listening on {}", sock.display()); - server::serve(&sock, store, event_store).await + server::serve(&sock, store, event_store, snapshot_store).await } diff --git a/crates/spaceshd/src/registry.rs b/crates/spaceshd/src/registry.rs index a121cb3..3bd5480 100644 --- a/crates/spaceshd/src/registry.rs +++ b/crates/spaceshd/src/registry.rs @@ -114,6 +114,10 @@ impl Registry { pub fn is_running(&self, sid: &SurfaceId) -> bool { self.live.contains_key(sid) } + /// Ids of all currently-live surfaces. + pub fn live_ids(&self) -> Vec { + self.live.keys().cloned().collect() + } // ---- surface state ---- diff --git a/crates/spaceshd/src/server.rs b/crates/spaceshd/src/server.rs index 192fd78..1d68456 100644 --- a/crates/spaceshd/src/server.rs +++ b/crates/spaceshd/src/server.rs @@ -13,6 +13,7 @@ use crate::event_log::EventLog; use crate::event_store::{self, EventPersister, EventStore}; use crate::persist::{self, Persister}; use crate::registry::Registry; +use crate::snapshot_store::{SnapshotStore, SnapshotMsg, spawn_writer}; use crate::state_store::StateStore; use crate::surface::{SurfaceMsg}; @@ -33,11 +34,13 @@ enum ServerMsg { ClientDisconnected { client: ClientId }, /// A status change detected internally (OSC 133 / fallback) by a surface actor. StateDetected { surface_id: SurfaceId, state: SurfaceState }, + /// Periodic snapshot tick: ask all live surfaces for a snapshot and persist dirty ones. + SnapshotTick, } type ClientId = u64; -pub async fn serve(socket: &Path, store: Arc, event_store: Arc) -> Result<()> { +pub async fn serve(socket: &Path, store: Arc, event_store: Arc, snapshot_store: Arc) -> Result<()> { let listener = UnixListener::bind(socket)?; let (router_tx, router_rx) = mpsc::channel::(256); @@ -58,6 +61,20 @@ pub async fn serve(socket: &Path, store: Arc, event_store: Arc, event_store: Arc, + snapshot_tx: mpsc::UnboundedSender, ) { let mut reg = Registry::new(); reg.restore(initial); @@ -175,10 +194,24 @@ async fn router( } } } + ServerMsg::SnapshotTick => { + let ids = reg.live_ids(); + for sid in ids { + let Some(handle) = reg.live(&sid) else { continue }; + let (reply_tx, reply_rx) = oneshot::channel(); + if handle.tx.send(SurfaceMsg::Snapshot { reply: reply_tx }).await.is_err() { continue; } + if let Ok((snap, dirty)) = reply_rx.await { + if dirty { + let _ = snapshot_tx.send(SnapshotMsg::Save(sid.clone(), snap)); + } + } + } + } ServerMsg::Request { id, cmd, client, out } => { handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, &router_tx, &exit_tx, &state_tx, &persister, - &mut event_log, &event_persister, started_at_ms, &mut config).await; + &mut event_log, &event_persister, started_at_ms, &mut config, + &snapshot_store, &snapshot_tx).await; } } } @@ -284,6 +317,8 @@ async fn handle_request( event_persister: &EventPersister, started_at_ms: u64, config: &mut crate::config::Config, + snapshot_store: &Arc, + snapshot_tx: &mpsc::UnboundedSender, ) { use spacesh_proto::message::SplitDir; use spacesh_proto::layout::{LayoutNode, Orient}; @@ -312,7 +347,7 @@ async fn handle_request( agent_label: command, cols, rows, autostart: false, }; let (env, hooks_active) = spawn_env(&sid, &spec); - match crate::surface::spawn_from_spec(sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) { + match crate::surface::spawn_from_spec(sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone(), snapshot_tx.clone()) { Ok(handle) => { spawn_output_bridge(sid.clone(), &handle, router_tx.clone()); reg.set_live(handle); @@ -344,7 +379,7 @@ async fn handle_request( let shell = command.clone().unwrap_or_else(|| config.resolved_shell()); let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false }; let (env, hooks_active) = spawn_env(&new_sid, &spec); - match crate::surface::spawn_from_spec(new_sid.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) { + match crate::surface::spawn_from_spec(new_sid.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone(), snapshot_tx.clone()) { Ok(handle) => { spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone()); reg.set_live(handle); @@ -418,7 +453,7 @@ async fn handle_request( let args = slot.map(|s| s.args.clone()).unwrap_or_default(); let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false }; let (env, hooks_active) = spawn_env(&new_sid, &spec); - match crate::surface::spawn_from_spec(new_sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) { + match crate::surface::spawn_from_spec(new_sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone(), snapshot_tx.clone()) { Ok(handle) => { spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone()); reg.set_live(handle); @@ -449,7 +484,7 @@ async fn handle_request( }; let ws_id = reg.workspace_of(&surface_id).unwrap(); let (env, hooks_active) = spawn_env(&surface_id, &spec); - match crate::surface::spawn_from_spec(surface_id.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) { + match crate::surface::spawn_from_spec(surface_id.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone(), snapshot_tx.clone()) { Ok(handle) => { spawn_output_bridge(surface_id.clone(), &handle, router_tx.clone()); reg.set_live(handle); @@ -464,6 +499,7 @@ async fn handle_request( Cmd::CloseWorkspace { workspace_id } => { let ids = reg.close_workspace(&workspace_id); for sid in &ids { crate::hooks::cleanup(sid); crate::hooks::cleanup_shell(sid); subs.remove(sid); } + for sid in &ids { let _ = snapshot_tx.send(SnapshotMsg::Remove(sid.clone())); } broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceClosed { workspace_id: workspace_id.clone() })); persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::Value::Null)).await; @@ -553,8 +589,18 @@ async fn handle_request( } let _ = out.send(err(id, "INTERNAL", "attach failed")).await; } else { - // stopped panel: no live stream, return an empty snapshot so the GUI shows the restart overlay. - let _ = out.send(ok(id, serde_json::json!({ "snapshot": "", "cols": 0, "rows": 0, "stopped": true }))).await; + // stopped panel: no live stream. Paint the last on-disk screen if we have one. + match snapshot_store.load(&surface_id) { + Some(snap) => { + let _ = out.send(ok(id, serde_json::json!({ + "snapshot": snap.ansi, "cols": snap.cols, "rows": snap.rows, + "cursor_row": snap.cursor_row, "cursor_col": snap.cursor_col, "stopped": true, + }))).await; + } + None => { + let _ = out.send(ok(id, serde_json::json!({ "snapshot": "", "cols": 0, "rows": 0, "stopped": true }))).await; + } + } } } @@ -580,6 +626,7 @@ async fn handle_request( subs.remove(&surface_id); crate::hooks::cleanup(&surface_id); crate::hooks::cleanup_shell(&surface_id); + let _ = snapshot_tx.send(SnapshotMsg::Remove(surface_id.clone())); broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceClosed { surface_id: surface_id.clone() })); if let Some(ws_id) = ws_id { emit_layout(reg, &ws_id, clients); @@ -739,6 +786,7 @@ fn spawn_output_bridge( mod tests { use super::*; use base64::Engine; + use crate::snapshot_store::NullSnapshotStore; async fn req(stream: &mut UnixStream, id: u64, cmd: Cmd) -> Envelope { write_frame(stream, &Envelope::Req { id, cmd }).await.unwrap(); @@ -787,7 +835,7 @@ mod tests { let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); @@ -829,7 +877,7 @@ mod tests { let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Input { @@ -855,7 +903,7 @@ mod tests { let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; // First client: open, new surface that prints a marker, attach, then disconnect. @@ -894,7 +942,7 @@ mod tests { std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock2 = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); @@ -924,7 +972,7 @@ mod tests { std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock2 = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); @@ -968,7 +1016,7 @@ mod tests { // per-test dir so instance B reads from disk what instance A persisted. let event_store = make_event_store(&dir); let sock2 = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; @@ -985,7 +1033,7 @@ mod tests { std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone())); let event_store_b = make_event_store(&dir); let sb2 = sock_b.clone(); - tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b).await; }); + tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sb2).await; let mut s2 = UnixStream::connect(&sb2).await.unwrap(); let r = req(&mut s2, 1, Cmd::Status).await; @@ -1007,7 +1055,7 @@ mod tests { std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock2 = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); @@ -1045,7 +1093,7 @@ mod tests { let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; // Control connection: open workspace and spawn surface. @@ -1098,7 +1146,7 @@ mod tests { let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; // Control connection: open workspace and spawn surface. @@ -1150,7 +1198,7 @@ mod tests { std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock2 = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; // Control connection: open workspace and spawn a surface that emits OSC 133. @@ -1201,7 +1249,7 @@ mod tests { let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; // Observer connection to catch the EventsRead broadcast. @@ -1269,7 +1317,7 @@ mod tests { std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone())); let event_store = make_event_store(&dir); let sock2 = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); @@ -1311,7 +1359,7 @@ mod tests { std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone())); let event_store_b = make_event_store(&dir); let sb2 = sock_b.clone(); - tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b).await; }); + tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sb2).await; let mut s2 = UnixStream::connect(&sb2).await.unwrap(); @@ -1339,7 +1387,7 @@ mod tests { let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; // Observer connection. @@ -1401,7 +1449,7 @@ mod tests { let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); @@ -1424,7 +1472,7 @@ mod tests { let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); @@ -1471,7 +1519,7 @@ mod tests { let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; }); wait_for_socket(&sock).await; // Control connection: open, spawn, zoom. @@ -1511,4 +1559,52 @@ mod tests { } assert!(saw_cleared, "expected a WorkspaceChanged broadcast with cleared zoom after closing the zoomed surface"); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn stopped_attach_returns_disk_snapshot() { + let _serial = crate::test_support::serial(); + let dir = tempdir_path(); + let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let event_store = make_event_store(&dir); + // Use a real JsonSnapshotStore so the on-exit dump actually lands on disk. + let snap_dir = dir.join("snapshots"); + let snapshot_store: std::sync::Arc = + std::sync::Arc::new(crate::snapshot_store::JsonSnapshotStore::new(snap_dir.clone())); + let sock_for_task = sock.clone(); + let store2 = store.clone(); + let snap_store2 = snapshot_store.clone(); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, snap_store2).await; }); + wait_for_socket(&sock).await; + + // Open workspace, spawn a surface that prints a unique marker then exits quickly. + let surface_id; + { + let mut s = UnixStream::connect(&sock).await.unwrap(); + let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + let r = req(&mut s, 2, Cmd::NewSurface { + workspace_id: spacesh_proto::WorkspaceId(ws), + command: Some("/bin/sh".into()), + args: vec!["-c".into(), "printf SNAPDISK; sleep 0.2".into()], + cols: 80, rows: 24, + }).await; + surface_id = spacesh_proto::SurfaceId(res_data(&r)["surface_id"].as_str().unwrap().to_string()); + // Give the process time to run, exit, and the actor to dump its snapshot to the writer. + tokio::time::sleep(tokio::time::Duration::from_millis(1500)).await; + // s drops here + } + + // Re-verify the socket is still alive. + wait_for_socket(&sock).await; + + // Fresh client: attach to the now-stopped surface. + let mut s2 = UnixStream::connect(&sock).await.unwrap(); + let r = req(&mut s2, 1, Cmd::Attach { surface_id: surface_id.clone() }).await; + let data = res_data(&r); + assert_eq!(data["stopped"].as_bool(), Some(true), "surface should be stopped"); + let snap = data["snapshot"].as_str().unwrap_or(""); + assert!(snap.contains("SNAPDISK"), "on-disk snapshot should contain SNAPDISK, got: {snap:?}"); + } }