From 32560ea364e224fc5087f76052b7692c1e2ab35a Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Tue, 9 Jun 2026 19:57:53 +0700 Subject: [PATCH] feat(daemon): surface actor owning pty + broadcast fan-out (M0, no grid) Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/spaceshd/src/main.rs | 1 + crates/spaceshd/src/surface.rs | 110 +++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100644 crates/spaceshd/src/surface.rs diff --git a/crates/spaceshd/src/main.rs b/crates/spaceshd/src/main.rs index cdb9fd3..e12c513 100644 --- a/crates/spaceshd/src/main.rs +++ b/crates/spaceshd/src/main.rs @@ -1,4 +1,5 @@ mod lifecycle; +mod surface; fn main() { println!("spaceshd skeleton"); diff --git a/crates/spaceshd/src/surface.rs b/crates/spaceshd/src/surface.rs new file mode 100644 index 0000000..e3d06bf --- /dev/null +++ b/crates/spaceshd/src/surface.rs @@ -0,0 +1,110 @@ +use spacesh_proto::{SurfaceId, WorkspaceId}; +use spacesh_pty::PtyHandle; +use tokio::sync::{broadcast, mpsc, oneshot}; + +/// Output broadcast capacity (chunks). Lagging subscribers drop intermediate frames. +const BROADCAST_CAP: usize = 1024; + +/// Messages sent to a surface actor. +pub enum SurfaceMsg { + Input(Vec), + Resize { cols: u16, rows: u16 }, + /// Subscribe to the output stream. Reply carries a fresh receiver. + Attach { reply: oneshot::Sender>> }, + Close, +} + +/// Handle the daemon keeps for a live surface. +pub struct SurfaceHandle { + pub id: SurfaceId, + pub workspace_id: WorkspaceId, + pub tx: mpsc::Sender, +} + +/// Spawn the actor; returns its handle. `exit_tx` is fired once with the exit code. +pub fn spawn_surface( + id: SurfaceId, + workspace_id: WorkspaceId, + mut pty: PtyHandle, + exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, +) -> SurfaceHandle { + let (tx, mut rx) = mpsc::channel::(64); + let (bcast, _) = broadcast::channel::>(BROADCAST_CAP); + let actor_id = id.clone(); + + tokio::spawn(async move { + loop { + tokio::select! { + msg = rx.recv() => { + match msg { + Some(SurfaceMsg::Input(bytes)) => { let _ = pty.write_input(&bytes); } + Some(SurfaceMsg::Resize { cols, rows }) => { let _ = pty.resize(cols, rows); } + Some(SurfaceMsg::Attach { reply }) => { let _ = reply.send(bcast.subscribe()); } + Some(SurfaceMsg::Close) | None => { pty.kill(); break; } + } + } + chunk = pty.output.recv() => { + match chunk { + Some(bytes) => { let _ = bcast.send(bytes); } + None => { break; } // PTY EOF + } + } + } + } + let code = pty.wait(); + let _ = exit_tx.send((actor_id, code)); + }); + + SurfaceHandle { id, workspace_id, tx } +} + +#[cfg(test)] +mod tests { + use super::*; + use spacesh_pty::SpawnSpec; + + fn spec(script: &str) -> SpawnSpec { + SpawnSpec { + command: "/bin/sh".into(), + args: vec!["-c".into(), script.into()], + cwd: std::env::temp_dir(), + cols: 80, + rows: 24, + env: vec![], + } + } + + #[tokio::test] + async fn attach_receives_output() { + let pty = PtyHandle::spawn(spec("printf HELLO; sleep 0.3")).unwrap(); + let (exit_tx, _exit_rx) = mpsc::unbounded_channel(); + let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, exit_tx); + + let (reply_tx, reply_rx) = oneshot::channel(); + handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap(); + let mut sub = reply_rx.await.unwrap(); + + let mut collected = Vec::new(); + // Collect for a short bounded window. + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(500); + while tokio::time::Instant::now() < deadline { + match tokio::time::timeout(tokio::time::Duration::from_millis(100), sub.recv()).await { + Ok(Ok(bytes)) => collected.extend_from_slice(&bytes), + _ => {} + } + if String::from_utf8_lossy(&collected).contains("HELLO") { break; } + } + assert!(String::from_utf8_lossy(&collected).contains("HELLO")); + } + + #[tokio::test] + async fn exit_is_reported() { + let pty = PtyHandle::spawn(spec("exit 7")).unwrap(); + let (exit_tx, mut exit_rx) = mpsc::unbounded_channel(); + let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, exit_tx); + let (sid, code) = tokio::time::timeout(tokio::time::Duration::from_secs(3), exit_rx.recv()) + .await.unwrap().unwrap(); + assert_eq!(sid, SurfaceId("s_2".into())); + assert_eq!(code, 7); + } +}