use spacesh_core::{snapshot::snapshot_ansi, GridSurface}; use spacesh_core::snapshot::Snapshot; use spacesh_proto::{SurfaceId, WorkspaceId}; use spacesh_pty::PtyHandle; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::{Duration, Instant}; const BROADCAST_CAP: usize = 1024; const FLUSH_INTERVAL: Duration = Duration::from_millis(6); const FLUSH_BYTES: usize = 16 * 1024; pub enum SurfaceMsg { Input(Vec), Resize { cols: u16, rows: u16 }, Attach { reply: oneshot::Sender>> }, /// Attach with snapshot: subscribe AND capture the grid in one actor turn. AttachSnapshot { reply: oneshot::Sender<(Snapshot, broadcast::Receiver>)> }, Close, } pub struct SurfaceHandle { pub id: SurfaceId, pub workspace_id: WorkspaceId, pub tx: mpsc::Sender, } pub fn spawn_surface( id: SurfaceId, workspace_id: WorkspaceId, mut pty: PtyHandle, cols: u16, rows: u16, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, ) -> SurfaceHandle { let (tx, mut rx) = mpsc::channel::(64); let (bcast, _) = broadcast::channel::>(BROADCAST_CAP); let actor_id = id.clone(); tokio::spawn(async move { let mut grid = GridSurface::new(cols, rows); let mut pending: Vec = Vec::with_capacity(FLUSH_BYTES); let mut flush_deadline: Option = None; // Helper closure can't borrow across awaits cleanly; inline the flush logic. loop { // Copy the deadline into an owned local so the timer future doesn't // hold a borrow of `flush_deadline` across the select! (other arms mutate it). let next_flush = flush_deadline; let timer = async move { match next_flush { Some(d) => tokio::time::sleep_until(d).await, None => std::future::pending::<()>().await, } }; tokio::select! { msg = rx.recv() => { match msg { Some(SurfaceMsg::Input(bytes)) => { let _ = pty.write_input(&bytes); } Some(SurfaceMsg::Resize { cols, rows }) => { grid.resize(cols, rows); let _ = pty.resize(cols, rows); } Some(SurfaceMsg::Attach { reply }) => { let _ = reply.send(bcast.subscribe()); } Some(SurfaceMsg::AttachSnapshot { reply }) => { // Subscribe-then-snapshot is atomic within this actor turn (no await, // no flush can interleave). Any not-yet-flushed `pending` stays in // `pending` and is delivered to ALL subscribers — including this new // one — exactly once by the normal 6ms/16KiB flush path. It is NOT in // this snapshot. Broadcasting here would double-render on reattach. let sub = bcast.subscribe(); let snap = snapshot_ansi(&grid); let _ = reply.send((snap, sub)); } Some(SurfaceMsg::Close) | None => { pty.kill(); break; } } } chunk = pty.output.recv() => { match chunk { Some(bytes) => { pending.extend_from_slice(&bytes); if flush_deadline.is_none() { flush_deadline = Some(Instant::now() + FLUSH_INTERVAL); } if pending.len() >= FLUSH_BYTES { grid.feed(&pending); let _ = bcast.send(std::mem::take(&mut pending)); flush_deadline = None; } } None => { // Final flush on EOF. if !pending.is_empty() { grid.feed(&pending); let _ = bcast.send(std::mem::take(&mut pending)); } break; } } } _ = timer => { if !pending.is_empty() { grid.feed(&pending); let _ = bcast.send(std::mem::take(&mut pending)); } flush_deadline = None; } } } let code = pty.wait(); let _ = exit_tx.send((actor_id, code)); }); SurfaceHandle { id, workspace_id, tx } } #[cfg(test)] mod tests { use super::*; use spacesh_pty::SpawnSpec; fn spec(script: &str) -> SpawnSpec { SpawnSpec { command: "/bin/sh".into(), args: vec!["-c".into(), script.into()], cwd: std::env::temp_dir(), cols: 80, rows: 24, env: vec![], } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn attach_receives_output() { let _serial = crate::test_support::serial(); let pty = PtyHandle::spawn(spec("printf HELLO; sleep 0.3")).unwrap(); let (exit_tx, _exit_rx) = mpsc::unbounded_channel(); let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, 80, 24, exit_tx); let (reply_tx, reply_rx) = oneshot::channel(); handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap(); let mut sub = reply_rx.await.unwrap(); let mut collected = Vec::new(); // Collect for a short bounded window. let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(500); while tokio::time::Instant::now() < deadline { match tokio::time::timeout(tokio::time::Duration::from_millis(100), sub.recv()).await { Ok(Ok(bytes)) => collected.extend_from_slice(&bytes), _ => {} } if String::from_utf8_lossy(&collected).contains("HELLO") { break; } } assert!(String::from_utf8_lossy(&collected).contains("HELLO")); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn exit_is_reported() { let _serial = crate::test_support::serial(); let pty = PtyHandle::spawn(spec("exit 7")).unwrap(); let (exit_tx, mut exit_rx) = mpsc::unbounded_channel(); let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, 80, 24, exit_tx); let (sid, code) = tokio::time::timeout(tokio::time::Duration::from_secs(3), exit_rx.recv()) .await.unwrap().unwrap(); assert_eq!(sid, SurfaceId("s_2".into())); assert_eq!(code, 7); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn attach_snapshot_reflects_prior_output() { let _serial = crate::test_support::serial(); let pty = PtyHandle::spawn(spec("printf SNAPME; sleep 0.5")).unwrap(); let (exit_tx, _exit_rx) = mpsc::unbounded_channel(); let handle = spawn_surface(SurfaceId("s_s".into()), WorkspaceId("w_1".into()), pty, 80, 24, exit_tx); // Give the child time to write and the actor time to flush into the grid. tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; let (reply_tx, reply_rx) = oneshot::channel(); handle.tx.send(SurfaceMsg::AttachSnapshot { reply: reply_tx }).await.unwrap(); let (snap, _sub) = reply_rx.await.unwrap(); assert!(snap.ansi.contains("SNAPME"), "snapshot: {:?}", snap.ansi); } }