diff --git a/crates/spaceshd/src/server.rs b/crates/spaceshd/src/server.rs index 2818641..75d5749 100644 --- a/crates/spaceshd/src/server.rs +++ b/crates/spaceshd/src/server.rs @@ -183,7 +183,7 @@ async fn handle_request( }; match PtyHandle::spawn(spec) { Ok(pty) => { - let handle = spawn_surface(sid.clone(), workspace_id.clone(), pty, exit_tx.clone()); + let handle = spawn_surface(sid.clone(), workspace_id.clone(), pty, cols, rows, exit_tx.clone()); // Bridge the surface's broadcast into the router as Output messages. spawn_output_bridge(sid.clone(), &handle, router_tx.clone()); reg.insert_surface(handle); @@ -219,10 +219,22 @@ async fn handle_request( } } Cmd::Attach { surface_id } => { - // M0 attach: register subscription, no snapshot yet (snapshot added in Task 13). - if reg.surface(&surface_id).is_some() { - subs.entry(surface_id.clone()).or_default().push(client); - let _ = out.send(ok(id, serde_json::json!({ "snapshot": "", "cols": 0, "rows": 0 }))).await; + if let Some(s) = reg.surface(&surface_id) { + let (reply_tx, reply_rx) = oneshot::channel(); + if s.tx.send(SurfaceMsg::AttachSnapshot { reply: reply_tx }).await.is_ok() { + if let Ok((snap, _sub)) = reply_rx.await { + subs.entry(surface_id.clone()).or_default().push(client); + let _ = out.send(ok(id, serde_json::json!({ + "snapshot": snap.ansi, + "cols": snap.cols, + "rows": snap.rows, + "cursor_row": snap.cursor_row, + "cursor_col": snap.cursor_col, + }))).await; + return; + } + } + let _ = out.send(err(id, "INTERNAL", "attach failed")).await; } else { let _ = out.send(err(id, "NOT_FOUND", "surface")).await; } diff --git a/crates/spaceshd/src/surface.rs b/crates/spaceshd/src/surface.rs index 95d0b9a..eb8b28a 100644 --- a/crates/spaceshd/src/surface.rs +++ b/crates/spaceshd/src/surface.rs @@ -1,31 +1,35 @@ +use spacesh_core::{snapshot::snapshot_ansi, GridSurface}; +use spacesh_core::snapshot::Snapshot; use spacesh_proto::{SurfaceId, WorkspaceId}; use spacesh_pty::PtyHandle; use tokio::sync::{broadcast, mpsc, oneshot}; +use tokio::time::{Duration, Instant}; -/// Output broadcast capacity (chunks). Lagging subscribers drop intermediate frames. const BROADCAST_CAP: usize = 1024; +const FLUSH_INTERVAL: Duration = Duration::from_millis(6); +const FLUSH_BYTES: usize = 16 * 1024; -/// Messages sent to a surface actor. pub enum SurfaceMsg { Input(Vec), Resize { cols: u16, rows: u16 }, - /// Subscribe to the output stream. Reply carries a fresh receiver. Attach { reply: oneshot::Sender>> }, + /// Attach with snapshot: subscribe AND capture the grid in one actor turn. + AttachSnapshot { reply: oneshot::Sender<(Snapshot, broadcast::Receiver>)> }, Close, } -/// Handle the daemon keeps for a live surface. pub struct SurfaceHandle { pub id: SurfaceId, pub workspace_id: WorkspaceId, pub tx: mpsc::Sender, } -/// Spawn the actor; returns its handle. `exit_tx` is fired once with the exit code. pub fn spawn_surface( id: SurfaceId, workspace_id: WorkspaceId, mut pty: PtyHandle, + cols: u16, + rows: u16, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, ) -> SurfaceHandle { let (tx, mut rx) = mpsc::channel::(64); @@ -33,22 +37,76 @@ pub fn spawn_surface( let actor_id = id.clone(); tokio::spawn(async move { + let mut grid = GridSurface::new(cols, rows); + let mut pending: Vec = Vec::with_capacity(FLUSH_BYTES); + let mut flush_deadline: Option = None; + + // Helper closure can't borrow across awaits cleanly; inline the flush logic. loop { + // Copy the deadline into an owned local so the timer future doesn't + // hold a borrow of `flush_deadline` across the select! (other arms mutate it). + let next_flush = flush_deadline; + let timer = async move { + match next_flush { + Some(d) => tokio::time::sleep_until(d).await, + None => std::future::pending::<()>().await, + } + }; + tokio::select! { msg = rx.recv() => { match msg { Some(SurfaceMsg::Input(bytes)) => { let _ = pty.write_input(&bytes); } - Some(SurfaceMsg::Resize { cols, rows }) => { let _ = pty.resize(cols, rows); } + Some(SurfaceMsg::Resize { cols, rows }) => { + grid.resize(cols, rows); + let _ = pty.resize(cols, rows); + } Some(SurfaceMsg::Attach { reply }) => { let _ = reply.send(bcast.subscribe()); } + Some(SurfaceMsg::AttachSnapshot { reply }) => { + // Flush pending into the grid first so the snapshot is current, + // but DO NOT broadcast here; subscribe before any further output. + if !pending.is_empty() { + grid.feed(&pending); + let _ = bcast.send(std::mem::take(&mut pending)); + flush_deadline = None; + } + let sub = bcast.subscribe(); + let snap = snapshot_ansi(&grid); + let _ = reply.send((snap, sub)); + } Some(SurfaceMsg::Close) | None => { pty.kill(); break; } } } chunk = pty.output.recv() => { match chunk { - Some(bytes) => { let _ = bcast.send(bytes); } - None => { break; } // PTY EOF + Some(bytes) => { + pending.extend_from_slice(&bytes); + if flush_deadline.is_none() { + flush_deadline = Some(Instant::now() + FLUSH_INTERVAL); + } + if pending.len() >= FLUSH_BYTES { + grid.feed(&pending); + let _ = bcast.send(std::mem::take(&mut pending)); + flush_deadline = None; + } + } + None => { + // Final flush on EOF. + if !pending.is_empty() { + grid.feed(&pending); + let _ = bcast.send(std::mem::take(&mut pending)); + } + break; + } } } + _ = timer => { + if !pending.is_empty() { + grid.feed(&pending); + let _ = bcast.send(std::mem::take(&mut pending)); + } + flush_deadline = None; + } } } let code = pty.wait(); @@ -78,7 +136,7 @@ mod tests { async fn attach_receives_output() { let pty = PtyHandle::spawn(spec("printf HELLO; sleep 0.3")).unwrap(); let (exit_tx, _exit_rx) = mpsc::unbounded_channel(); - let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, exit_tx); + let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, 80, 24, exit_tx); let (reply_tx, reply_rx) = oneshot::channel(); handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap(); @@ -101,10 +159,25 @@ mod tests { async fn exit_is_reported() { let pty = PtyHandle::spawn(spec("exit 7")).unwrap(); let (exit_tx, mut exit_rx) = mpsc::unbounded_channel(); - let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, exit_tx); + let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, 80, 24, exit_tx); let (sid, code) = tokio::time::timeout(tokio::time::Duration::from_secs(3), exit_rx.recv()) .await.unwrap().unwrap(); assert_eq!(sid, SurfaceId("s_2".into())); assert_eq!(code, 7); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn attach_snapshot_reflects_prior_output() { + let pty = PtyHandle::spawn(spec("printf SNAPME; sleep 0.5")).unwrap(); + let (exit_tx, _exit_rx) = mpsc::unbounded_channel(); + let handle = spawn_surface(SurfaceId("s_s".into()), WorkspaceId("w_1".into()), pty, 80, 24, exit_tx); + + // Give the child time to write and the actor time to flush into the grid. + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + let (reply_tx, reply_rx) = oneshot::channel(); + handle.tx.send(SurfaceMsg::AttachSnapshot { reply: reply_tx }).await.unwrap(); + let (snap, _sub) = reply_rx.await.unwrap(); + assert!(snap.ansi.contains("SNAPME"), "snapshot: {:?}", snap.ansi); + } }