use spacesh_core::{snapshot::snapshot_ansi, GridSurface}; use spacesh_core::snapshot::Snapshot; use spacesh_core::detect::{FallbackScanner, Osc133Scanner}; use spacesh_proto::{SurfaceId, WorkspaceId}; use spacesh_proto::status::SurfaceState; use spacesh_proto::workspace::SurfaceSpec; use spacesh_pty::{PtyHandle, SpawnSpec}; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::{Duration, Instant}; /// Spawn (or restart) a surface actor from a persisted spec. Injects /// SPACESH_SURFACE_ID into the child env, mirroring `new_surface`. /// /// The child process is spawned lazily — see `spawn_surface_deferred`. This /// lets the first `Resize` from an attaching GUI fix the PTY geometry *before* /// the shell prints its first prompt, so prompts (e.g. powerlevel10k instant /// prompt) render at the correct size instead of being drawn at the 80x24 /// default and then reflowed. pub fn spawn_from_spec( id: SurfaceId, workspace_id: WorkspaceId, spec: &SurfaceSpec, extra_env: Vec<(String, String)>, hooks_active: bool, state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, ) -> std::io::Result { let mut env = vec![("SPACESH_SURFACE_ID".to_string(), id.0.clone())]; env.extend(extra_env); let spawn_spec = SpawnSpec { command: spec.command.clone(), args: spec.args.clone(), cwd: std::path::PathBuf::from(&spec.cwd), cols: spec.cols, rows: spec.rows, env, }; Ok(spawn_surface_deferred(id, workspace_id, spawn_spec, spec.cols, spec.rows, hooks_active, state_tx, exit_tx)) } const BROADCAST_CAP: usize = 1024; const FLUSH_INTERVAL: Duration = Duration::from_millis(6); const FLUSH_BYTES: usize = 16 * 1024; /// How long a deferred surface waits for the first `Resize` before spawning the /// child at its default geometry. The GUI's fit-driven resize lands within a /// frame (<16ms); this only bounds the wait for headless/CLI surfaces that /// never attach a GUI. const SPAWN_FALLBACK: Duration = Duration::from_millis(250); pub enum SurfaceMsg { Input(Vec), Resize { cols: u16, rows: u16 }, Attach { reply: oneshot::Sender>> }, /// Attach with snapshot: subscribe AND capture the grid in one actor turn. AttachSnapshot { reply: oneshot::Sender<(Snapshot, broadcast::Receiver>)> }, Close, } pub struct SurfaceHandle { pub id: SurfaceId, #[allow(dead_code)] pub workspace_id: WorkspaceId, pub tx: mpsc::Sender, } /// Eager variant: spawn the actor over an already-spawned PTY. Retained for /// tests and callers that have a live PTY in hand; production goes through /// `spawn_surface_deferred` via `spawn_from_spec`. #[allow(dead_code)] pub fn spawn_surface( id: SurfaceId, workspace_id: WorkspaceId, pty: PtyHandle, cols: u16, rows: u16, hooks_active: bool, state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, ) -> SurfaceHandle { let (tx, rx) = mpsc::channel::(64); let (bcast, _) = broadcast::channel::>(BROADCAST_CAP); tokio::spawn(run_actor(id.clone(), pty, cols, rows, hooks_active, bcast, rx, state_tx, exit_tx, Vec::new())); SurfaceHandle { id, workspace_id, tx } } /// Create a surface whose child process is spawned lazily: on the first /// `Resize` (using its geometry) or after `SPAWN_FALLBACK` (using `def_*`). /// Attaches received before the spawn get an empty-grid snapshot and a live /// subscription; input received before the spawn is buffered and replayed. #[allow(clippy::too_many_arguments)] pub fn spawn_surface_deferred( id: SurfaceId, workspace_id: WorkspaceId, spawn_spec: SpawnSpec, def_cols: u16, def_rows: u16, hooks_active: bool, state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, ) -> SurfaceHandle { let (tx, mut rx) = mpsc::channel::(64); let (bcast, _) = broadcast::channel::>(BROADCAST_CAP); let actor_id = id.clone(); tokio::spawn(async move { let mut cols = def_cols; let mut rows = def_rows; let mut prebuf: Vec = Vec::new(); let fallback = tokio::time::sleep(SPAWN_FALLBACK); tokio::pin!(fallback); // Pre-spawn phase: hold the child until we know the real geometry. let proceed = loop { tokio::select! { _ = &mut fallback => break true, msg = rx.recv() => match msg { Some(SurfaceMsg::Resize { cols: c, rows: r }) => { cols = c; rows = r; break true; } Some(SurfaceMsg::Input(bytes)) => prebuf.extend_from_slice(&bytes), Some(SurfaceMsg::Attach { reply }) => { let _ = reply.send(bcast.subscribe()); } Some(SurfaceMsg::AttachSnapshot { reply }) => { let sub = bcast.subscribe(); let snap = snapshot_ansi(&GridSurface::new(cols, rows)); let _ = reply.send((snap, sub)); } Some(SurfaceMsg::Close) | None => break false, } } }; if !proceed { let _ = exit_tx.send((actor_id, 0)); return; } let mut spec = spawn_spec; spec.cols = cols; spec.rows = rows; match PtyHandle::spawn(spec) { Ok(pty) => run_actor(actor_id, pty, cols, rows, hooks_active, bcast, rx, state_tx, exit_tx, prebuf).await, Err(_) => { let _ = exit_tx.send((actor_id, 127)); } } }); SurfaceHandle { id, workspace_id, tx } } /// The surface actor's main loop: owns the PTY, fans output out to subscribers /// through the batching `flush`, services input/resize/attach, and reports exit. #[allow(clippy::too_many_arguments)] async fn run_actor( id: SurfaceId, mut pty: PtyHandle, cols: u16, rows: u16, hooks_active: bool, bcast: broadcast::Sender>, mut rx: mpsc::Receiver, state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, prebuffered_input: Vec, ) { let actor_id = id.clone(); let detect_id = id; if !prebuffered_input.is_empty() { let _ = pty.write_input(&prebuffered_input); } { let mut grid = GridSurface::new(cols, rows); let mut pending: Vec = Vec::with_capacity(FLUSH_BYTES); let mut flush_deadline: Option = None; let mut osc = Osc133Scanner::new(); // `deterministic` suppresses fallback once a reliable source is seen // (hooks active, or any OSC 133 marker observed). let mut deterministic = hooks_active; let mut last_state = SurfaceState::Idle; loop { // Copy the deadline into an owned local so the timer future doesn't // hold a borrow of `flush_deadline` across the select! (other arms mutate it). let next_flush = flush_deadline; let timer = async move { match next_flush { Some(d) => tokio::time::sleep_until(d).await, None => std::future::pending::<()>().await, } }; tokio::select! { msg = rx.recv() => { match msg { Some(SurfaceMsg::Input(bytes)) => { let _ = pty.write_input(&bytes); } Some(SurfaceMsg::Resize { cols, rows }) => { grid.resize(cols, rows); let _ = pty.resize(cols, rows); } Some(SurfaceMsg::Attach { reply }) => { let _ = reply.send(bcast.subscribe()); } Some(SurfaceMsg::AttachSnapshot { reply }) => { // Subscribe-then-snapshot is atomic within this actor turn (no await, // no flush can interleave). Any not-yet-flushed `pending` stays in // `pending` and is delivered to ALL subscribers — including this new // one — exactly once by the normal 6ms/16KiB flush path. It is NOT in // this snapshot. Broadcasting here would double-render on reattach. let sub = bcast.subscribe(); let snap = snapshot_ansi(&grid); let _ = reply.send((snap, sub)); } Some(SurfaceMsg::Close) | None => { pty.kill(); break; } } } chunk = pty.output.recv() => { match chunk { Some(bytes) => { pending.extend_from_slice(&bytes); if flush_deadline.is_none() { flush_deadline = Some(Instant::now() + FLUSH_INTERVAL); } if pending.len() >= FLUSH_BYTES { flush(&mut pending, &mut grid, &mut osc, &mut deterministic, &mut last_state, &detect_id, &bcast, &state_tx); flush_deadline = None; } } None => { flush(&mut pending, &mut grid, &mut osc, &mut deterministic, &mut last_state, &detect_id, &bcast, &state_tx); break; } } } _ = timer => { flush(&mut pending, &mut grid, &mut osc, &mut deterministic, &mut last_state, &detect_id, &bcast, &state_tx); flush_deadline = None; } } } let code = pty.wait(); let _ = exit_tx.send((actor_id, code)); } } /// Feed pending bytes into the grid, run detectors, broadcast output, and emit a /// state change (if any). No-op when pending is empty. #[allow(clippy::too_many_arguments)] fn flush( pending: &mut Vec, grid: &mut GridSurface, osc: &mut Osc133Scanner, deterministic: &mut bool, last_state: &mut SurfaceState, id: &SurfaceId, bcast: &broadcast::Sender>, state_tx: &mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, ) { if pending.is_empty() { return; } // Deterministic source: OSC 133 markers in this chunk. // Emit each distinct state transition immediately so no marker is dropped // when multiple arrive in a single flush (e.g. C + D in the same buffer). let osc_states = osc.feed(&pending[..]); let had_osc = !osc_states.is_empty(); for st in osc_states { *deterministic = true; if st != *last_state { *last_state = st; let _ = state_tx.send((id.clone(), st)); } } grid.feed(&pending[..]); // Best-effort fallback only when no deterministic source is active. if !had_osc && !*deterministic { if let Some(st) = FallbackScanner::scan(&grid.tail_text(6)) { if st != *last_state { *last_state = st; let _ = state_tx.send((id.clone(), st)); } } } let _ = bcast.send(std::mem::take(pending)); } #[cfg(test)] mod tests { use super::*; use spacesh_pty::SpawnSpec; fn spec(script: &str) -> SpawnSpec { SpawnSpec { command: "/bin/sh".into(), args: vec!["-c".into(), script.into()], cwd: std::env::temp_dir(), cols: 80, rows: 24, env: vec![], } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn attach_receives_output() { let _serial = crate::test_support::serial(); let pty = PtyHandle::spawn(spec("printf HELLO; sleep 0.3")).unwrap(); let (state_tx, _state_rx) = mpsc::unbounded_channel(); let (exit_tx, _exit_rx) = mpsc::unbounded_channel(); let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx); let (reply_tx, reply_rx) = oneshot::channel(); handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap(); let mut sub = reply_rx.await.unwrap(); let mut collected = Vec::new(); // Collect for a short bounded window. let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(500); while tokio::time::Instant::now() < deadline { match tokio::time::timeout(tokio::time::Duration::from_millis(100), sub.recv()).await { Ok(Ok(bytes)) => collected.extend_from_slice(&bytes), _ => {} } if String::from_utf8_lossy(&collected).contains("HELLO") { break; } } assert!(String::from_utf8_lossy(&collected).contains("HELLO")); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn exit_is_reported() { let _serial = crate::test_support::serial(); let pty = PtyHandle::spawn(spec("exit 7")).unwrap(); let (state_tx, _state_rx) = mpsc::unbounded_channel(); let (exit_tx, mut exit_rx) = mpsc::unbounded_channel(); let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx); let (sid, code) = tokio::time::timeout(tokio::time::Duration::from_secs(3), exit_rx.recv()) .await.unwrap().unwrap(); assert_eq!(sid, SurfaceId("s_2".into())); assert_eq!(code, 7); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn attach_snapshot_reflects_prior_output() { let _serial = crate::test_support::serial(); let pty = PtyHandle::spawn(spec("printf SNAPME; sleep 0.5")).unwrap(); let (state_tx, _state_rx) = mpsc::unbounded_channel(); let (exit_tx, _exit_rx) = mpsc::unbounded_channel(); let handle = spawn_surface(SurfaceId("s_s".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx); // Give the child time to write and the actor time to flush into the grid. tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; let (reply_tx, reply_rx) = oneshot::channel(); handle.tx.send(SurfaceMsg::AttachSnapshot { reply: reply_tx }).await.unwrap(); let (snap, _sub) = reply_rx.await.unwrap(); assert!(snap.ansi.contains("SNAPME"), "snapshot: {:?}", snap.ansi); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn spawn_from_spec_runs_the_command() { let _serial = crate::test_support::serial(); let spec = SurfaceSpec { command: "/bin/sh".into(), args: vec!["-c".into(), "printf RESPAWN; sleep 0.3".into()], cwd: std::env::temp_dir().to_string_lossy().into(), agent_label: None, cols: 80, rows: 24, autostart: false, }; let (state_tx, _state_rx) = mpsc::unbounded_channel(); let (exit_tx, _rx) = mpsc::unbounded_channel(); let handle = spawn_from_spec(SurfaceId("s_r".into()), WorkspaceId("w_1".into()), &spec, vec![], false, state_tx, exit_tx).unwrap(); let (reply_tx, reply_rx) = oneshot::channel(); handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap(); let mut sub = reply_rx.await.unwrap(); let mut got = String::new(); let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2); while tokio::time::Instant::now() < deadline { if let Ok(Ok(b)) = tokio::time::timeout(tokio::time::Duration::from_millis(100), sub.recv()).await { got.push_str(&String::from_utf8_lossy(&b)); if got.contains("RESPAWN") { break; } } } assert!(got.contains("RESPAWN"), "got: {got:?}"); } fn stty_spec() -> SpawnSpec { // `stty size` prints " " — lets a test assert the geometry // the child actually started with. SpawnSpec { command: "/bin/sh".into(), args: vec!["-c".into(), "stty size; sleep 0.3".into()], cwd: std::env::temp_dir(), cols: 80, rows: 24, env: vec![], } } async fn collect_until(sub: &mut broadcast::Receiver>, needle: &str, ms: u64) -> String { let mut got = String::new(); let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(ms); while tokio::time::Instant::now() < deadline { if let Ok(Ok(b)) = tokio::time::timeout(tokio::time::Duration::from_millis(100), sub.recv()).await { got.push_str(&String::from_utf8_lossy(&b)); if got.contains(needle) { break; } } } got } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn deferred_spawn_uses_resize_geometry() { let _serial = crate::test_support::serial(); let (state_tx, _s) = mpsc::unbounded_channel(); let (exit_tx, _e) = mpsc::unbounded_channel(); let handle = spawn_surface_deferred(SurfaceId("s_d".into()), WorkspaceId("w_1".into()), stty_spec(), 80, 24, false, state_tx, exit_tx); let (rtx, rrx) = oneshot::channel(); handle.tx.send(SurfaceMsg::Attach { reply: rtx }).await.unwrap(); let mut sub = rrx.await.unwrap(); // Resize before the fallback fires -> child must start at 100x40. handle.tx.send(SurfaceMsg::Resize { cols: 100, rows: 40 }).await.unwrap(); let got = collect_until(&mut sub, "40 100", 2000).await; assert!(got.contains("40 100"), "expected '40 100' (rows cols), got: {got:?}"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn deferred_spawn_falls_back_to_default_geometry() { let _serial = crate::test_support::serial(); let (state_tx, _s) = mpsc::unbounded_channel(); let (exit_tx, _e) = mpsc::unbounded_channel(); let handle = spawn_surface_deferred(SurfaceId("s_f".into()), WorkspaceId("w_1".into()), stty_spec(), 80, 24, false, state_tx, exit_tx); let (rtx, rrx) = oneshot::channel(); handle.tx.send(SurfaceMsg::Attach { reply: rtx }).await.unwrap(); let mut sub = rrx.await.unwrap(); // No resize: after SPAWN_FALLBACK the child starts at the default 80x24. let got = collect_until(&mut sub, "24 80", 3000).await; assert!(got.contains("24 80"), "expected '24 80' (rows cols), got: {got:?}"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn osc133_output_drives_state_detection() { let _serial = crate::test_support::serial(); let pty = PtyHandle::spawn(spec("printf '\\033]133;C\\007'; printf working; printf '\\033]133;D;0\\007'; sleep 0.3")).unwrap(); let (state_tx, mut state_rx) = mpsc::unbounded_channel(); let (exit_tx, _exit_rx) = mpsc::unbounded_channel(); let _h = spawn_surface(SurfaceId("s_o".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx); let mut seen = Vec::new(); let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2); while tokio::time::Instant::now() < deadline { if let Ok(Some((_, st))) = tokio::time::timeout(tokio::time::Duration::from_millis(100), state_rx.recv()).await { seen.push(st); if seen.contains(&SurfaceState::Done) { break; } } } assert!(seen.contains(&SurfaceState::Work), "states: {seen:?}"); assert!(seen.contains(&SurfaceState::Done), "states: {seen:?}"); } }