diff --git a/crates/spaceshd/src/surface.rs b/crates/spaceshd/src/surface.rs index a81e345..e8aea9c 100644 --- a/crates/spaceshd/src/surface.rs +++ b/crates/spaceshd/src/surface.rs @@ -10,6 +10,12 @@ use tokio::time::{Duration, Instant}; /// Spawn (or restart) a surface actor from a persisted spec. Injects /// SPACESH_SURFACE_ID into the child env, mirroring `new_surface`. +/// +/// The child process is spawned lazily — see `spawn_surface_deferred`. This +/// lets the first `Resize` from an attaching GUI fix the PTY geometry *before* +/// the shell prints its first prompt, so prompts (e.g. powerlevel10k instant +/// prompt) render at the correct size instead of being drawn at the 80x24 +/// default and then reflowed. pub fn spawn_from_spec( id: SurfaceId, workspace_id: WorkspaceId, @@ -21,21 +27,25 @@ pub fn spawn_from_spec( ) -> std::io::Result { let mut env = vec![("SPACESH_SURFACE_ID".to_string(), id.0.clone())]; env.extend(extra_env); - let pty = PtyHandle::spawn(SpawnSpec { + let spawn_spec = SpawnSpec { command: spec.command.clone(), args: spec.args.clone(), cwd: std::path::PathBuf::from(&spec.cwd), cols: spec.cols, rows: spec.rows, env, - }) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?; - Ok(spawn_surface(id, workspace_id, pty, spec.cols, spec.rows, hooks_active, state_tx, exit_tx)) + }; + Ok(spawn_surface_deferred(id, workspace_id, spawn_spec, spec.cols, spec.rows, hooks_active, state_tx, exit_tx)) } const BROADCAST_CAP: usize = 1024; const FLUSH_INTERVAL: Duration = Duration::from_millis(6); const FLUSH_BYTES: usize = 16 * 1024; +/// How long a deferred surface waits for the first `Resize` before spawning the +/// child at its default geometry. The GUI's fit-driven resize lands within a +/// frame (<16ms); this only bounds the wait for headless/CLI surfaces that +/// never attach a GUI. +const SPAWN_FALLBACK: Duration = Duration::from_millis(250); pub enum SurfaceMsg { Input(Vec), @@ -53,22 +63,108 @@ pub struct SurfaceHandle { pub tx: mpsc::Sender, } +/// Eager variant: spawn the actor over an already-spawned PTY. Retained for +/// tests and callers that have a live PTY in hand; production goes through +/// `spawn_surface_deferred` via `spawn_from_spec`. +#[allow(dead_code)] pub fn spawn_surface( id: SurfaceId, workspace_id: WorkspaceId, - mut pty: PtyHandle, + pty: PtyHandle, cols: u16, rows: u16, hooks_active: bool, state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, +) -> SurfaceHandle { + let (tx, rx) = mpsc::channel::(64); + let (bcast, _) = broadcast::channel::>(BROADCAST_CAP); + tokio::spawn(run_actor(id.clone(), pty, cols, rows, hooks_active, bcast, rx, state_tx, exit_tx, Vec::new())); + SurfaceHandle { id, workspace_id, tx } +} + +/// Create a surface whose child process is spawned lazily: on the first +/// `Resize` (using its geometry) or after `SPAWN_FALLBACK` (using `def_*`). +/// Attaches received before the spawn get an empty-grid snapshot and a live +/// subscription; input received before the spawn is buffered and replayed. +#[allow(clippy::too_many_arguments)] +pub fn spawn_surface_deferred( + id: SurfaceId, + workspace_id: WorkspaceId, + spawn_spec: SpawnSpec, + def_cols: u16, + def_rows: u16, + hooks_active: bool, + state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, + exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, ) -> SurfaceHandle { let (tx, mut rx) = mpsc::channel::(64); let (bcast, _) = broadcast::channel::>(BROADCAST_CAP); let actor_id = id.clone(); - let detect_id = id.clone(); tokio::spawn(async move { + let mut cols = def_cols; + let mut rows = def_rows; + let mut prebuf: Vec = Vec::new(); + let fallback = tokio::time::sleep(SPAWN_FALLBACK); + tokio::pin!(fallback); + + // Pre-spawn phase: hold the child until we know the real geometry. + let proceed = loop { + tokio::select! { + _ = &mut fallback => break true, + msg = rx.recv() => match msg { + Some(SurfaceMsg::Resize { cols: c, rows: r }) => { cols = c; rows = r; break true; } + Some(SurfaceMsg::Input(bytes)) => prebuf.extend_from_slice(&bytes), + Some(SurfaceMsg::Attach { reply }) => { let _ = reply.send(bcast.subscribe()); } + Some(SurfaceMsg::AttachSnapshot { reply }) => { + let sub = bcast.subscribe(); + let snap = snapshot_ansi(&GridSurface::new(cols, rows)); + let _ = reply.send((snap, sub)); + } + Some(SurfaceMsg::Close) | None => break false, + } + } + }; + if !proceed { + let _ = exit_tx.send((actor_id, 0)); + return; + } + + let mut spec = spawn_spec; + spec.cols = cols; + spec.rows = rows; + match PtyHandle::spawn(spec) { + Ok(pty) => run_actor(actor_id, pty, cols, rows, hooks_active, bcast, rx, state_tx, exit_tx, prebuf).await, + Err(_) => { let _ = exit_tx.send((actor_id, 127)); } + } + }); + + SurfaceHandle { id, workspace_id, tx } +} + +/// The surface actor's main loop: owns the PTY, fans output out to subscribers +/// through the batching `flush`, services input/resize/attach, and reports exit. +#[allow(clippy::too_many_arguments)] +async fn run_actor( + id: SurfaceId, + mut pty: PtyHandle, + cols: u16, + rows: u16, + hooks_active: bool, + bcast: broadcast::Sender>, + mut rx: mpsc::Receiver, + state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, + exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, + prebuffered_input: Vec, +) { + let actor_id = id.clone(); + let detect_id = id; + if !prebuffered_input.is_empty() { + let _ = pty.write_input(&prebuffered_input); + } + + { let mut grid = GridSurface::new(cols, rows); let mut pending: Vec = Vec::with_capacity(FLUSH_BYTES); let mut flush_deadline: Option = None; @@ -137,9 +233,7 @@ pub fn spawn_surface( } let code = pty.wait(); let _ = exit_tx.send((actor_id, code)); - }); - - SurfaceHandle { id, workspace_id, tx } + } } /// Feed pending bytes into the grid, run detectors, broadcast output, and emit a @@ -280,6 +374,63 @@ mod tests { assert!(got.contains("RESPAWN"), "got: {got:?}"); } + fn stty_spec() -> SpawnSpec { + // `stty size` prints " " — lets a test assert the geometry + // the child actually started with. + SpawnSpec { + command: "/bin/sh".into(), + args: vec!["-c".into(), "stty size; sleep 0.3".into()], + cwd: std::env::temp_dir(), + cols: 80, + rows: 24, + env: vec![], + } + } + + async fn collect_until(sub: &mut broadcast::Receiver>, needle: &str, ms: u64) -> String { + let mut got = String::new(); + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(ms); + while tokio::time::Instant::now() < deadline { + if let Ok(Ok(b)) = tokio::time::timeout(tokio::time::Duration::from_millis(100), sub.recv()).await { + got.push_str(&String::from_utf8_lossy(&b)); + if got.contains(needle) { break; } + } + } + got + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn deferred_spawn_uses_resize_geometry() { + let _serial = crate::test_support::serial(); + let (state_tx, _s) = mpsc::unbounded_channel(); + let (exit_tx, _e) = mpsc::unbounded_channel(); + let handle = spawn_surface_deferred(SurfaceId("s_d".into()), WorkspaceId("w_1".into()), stty_spec(), 80, 24, false, state_tx, exit_tx); + + let (rtx, rrx) = oneshot::channel(); + handle.tx.send(SurfaceMsg::Attach { reply: rtx }).await.unwrap(); + let mut sub = rrx.await.unwrap(); + // Resize before the fallback fires -> child must start at 100x40. + handle.tx.send(SurfaceMsg::Resize { cols: 100, rows: 40 }).await.unwrap(); + + let got = collect_until(&mut sub, "40 100", 2000).await; + assert!(got.contains("40 100"), "expected '40 100' (rows cols), got: {got:?}"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn deferred_spawn_falls_back_to_default_geometry() { + let _serial = crate::test_support::serial(); + let (state_tx, _s) = mpsc::unbounded_channel(); + let (exit_tx, _e) = mpsc::unbounded_channel(); + let handle = spawn_surface_deferred(SurfaceId("s_f".into()), WorkspaceId("w_1".into()), stty_spec(), 80, 24, false, state_tx, exit_tx); + + let (rtx, rrx) = oneshot::channel(); + handle.tx.send(SurfaceMsg::Attach { reply: rtx }).await.unwrap(); + let mut sub = rrx.await.unwrap(); + // No resize: after SPAWN_FALLBACK the child starts at the default 80x24. + let got = collect_until(&mut sub, "24 80", 3000).await; + assert!(got.contains("24 80"), "expected '24 80' (rows cols), got: {got:?}"); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn osc133_output_drives_state_detection() { let _serial = crate::test_support::serial();