Files
spaceshell/crates/spaceshd/src/surface.rs
T
2026-06-09 21:39:43 +07:00

232 lines
9.8 KiB
Rust

use spacesh_core::{snapshot::snapshot_ansi, GridSurface};
use spacesh_core::snapshot::Snapshot;
use spacesh_proto::{SurfaceId, WorkspaceId};
use spacesh_proto::workspace::SurfaceSpec;
use spacesh_pty::{PtyHandle, SpawnSpec};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::time::{Duration, Instant};
/// Build a surface actor from a persisted [`SurfaceSpec`], for a first start
/// or a restart.
///
/// The child is launched with the spec's command, arguments, working
/// directory and terminal size. `SPACESH_SURFACE_ID` is set to the surface id
/// in the child's environment, mirroring `new_surface`.
///
/// # Errors
/// A failure from `PtyHandle::spawn` is returned as a `std::io::Error` of kind
/// `Other` whose payload is the original error's message.
pub fn spawn_from_spec(
    id: SurfaceId,
    workspace_id: WorkspaceId,
    spec: &SurfaceSpec,
    exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
) -> std::io::Result<SurfaceHandle> {
    let launch = SpawnSpec {
        command: spec.command.clone(),
        args: spec.args.clone(),
        cwd: std::path::PathBuf::from(&spec.cwd),
        cols: spec.cols,
        rows: spec.rows,
        // Let the child discover which surface it is running in.
        env: vec![("SPACESH_SURFACE_ID".into(), id.0.clone())],
    };

    let pty = match PtyHandle::spawn(launch) {
        Ok(pty) => pty,
        Err(err) => {
            let message = err.to_string();
            return Err(std::io::Error::new(std::io::ErrorKind::Other, message));
        }
    };

    let handle = spawn_surface(id, workspace_id, pty, spec.cols, spec.rows, exit_tx);
    Ok(handle)
}
/// Capacity passed to the broadcast channel that carries flushed output
/// batches to attached subscribers.
const BROADCAST_CAP: usize = 1024;
/// How long after the first buffered chunk the actor waits before flushing
/// the pending output.
const FLUSH_INTERVAL: Duration = Duration::from_millis(6);
/// Pending-buffer size (16 KiB) at which the actor flushes immediately
/// instead of waiting for the timer; also the buffer's initial capacity.
const FLUSH_BYTES: usize = 16 * 1024;
/// Requests handled by a surface actor (the task started by `spawn_surface`).
pub enum SurfaceMsg {
/// Bytes to write to the PTY as input; write errors are ignored by the actor.
Input(Vec<u8>),
/// Resize both the actor's grid and the PTY to the given dimensions.
Resize { cols: u16, rows: u16 },
/// Subscribe to the output broadcast; the new receiver is sent on `reply`.
Attach { reply: oneshot::Sender<broadcast::Receiver<Vec<u8>>> },
/// Attach with snapshot: subscribe AND capture the grid in one actor turn.
AttachSnapshot { reply: oneshot::Sender<(Snapshot, broadcast::Receiver<Vec<u8>>)> },
/// Kill the PTY child and stop the actor loop.
Close,
}
/// Handle returned by `spawn_surface` for talking to a running surface actor.
pub struct SurfaceHandle {
/// Identifier of the surface this handle belongs to.
pub id: SurfaceId,
// NOTE(review): this field is only stored, never read, within this file —
// presumably that is why `dead_code` is allowed here; confirm against callers.
#[allow(dead_code)]
pub workspace_id: WorkspaceId,
/// Sender side of the actor's mailbox; send `SurfaceMsg` values here.
pub tx: mpsc::Sender<SurfaceMsg>,
}
pub fn spawn_surface(
id: SurfaceId,
workspace_id: WorkspaceId,
mut pty: PtyHandle,
cols: u16,
rows: u16,
exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
) -> SurfaceHandle {
let (tx, mut rx) = mpsc::channel::<SurfaceMsg>(64);
let (bcast, _) = broadcast::channel::<Vec<u8>>(BROADCAST_CAP);
let actor_id = id.clone();
tokio::spawn(async move {
let mut grid = GridSurface::new(cols, rows);
let mut pending: Vec<u8> = Vec::with_capacity(FLUSH_BYTES);
let mut flush_deadline: Option<Instant> = None;
// Helper closure can't borrow across awaits cleanly; inline the flush logic.
loop {
// Copy the deadline into an owned local so the timer future doesn't
// hold a borrow of `flush_deadline` across the select! (other arms mutate it).
let next_flush = flush_deadline;
let timer = async move {
match next_flush {
Some(d) => tokio::time::sleep_until(d).await,
None => std::future::pending::<()>().await,
}
};
tokio::select! {
msg = rx.recv() => {
match msg {
Some(SurfaceMsg::Input(bytes)) => { let _ = pty.write_input(&bytes); }
Some(SurfaceMsg::Resize { cols, rows }) => {
grid.resize(cols, rows);
let _ = pty.resize(cols, rows);
}
Some(SurfaceMsg::Attach { reply }) => { let _ = reply.send(bcast.subscribe()); }
Some(SurfaceMsg::AttachSnapshot { reply }) => {
// Subscribe-then-snapshot is atomic within this actor turn (no await,
// no flush can interleave). Any not-yet-flushed `pending` stays in
// `pending` and is delivered to ALL subscribers — including this new
// one — exactly once by the normal 6ms/16KiB flush path. It is NOT in
// this snapshot. Broadcasting here would double-render on reattach.
let sub = bcast.subscribe();
let snap = snapshot_ansi(&grid);
let _ = reply.send((snap, sub));
}
Some(SurfaceMsg::Close) | None => { pty.kill(); break; }
}
}
chunk = pty.output.recv() => {
match chunk {
Some(bytes) => {
pending.extend_from_slice(&bytes);
if flush_deadline.is_none() {
flush_deadline = Some(Instant::now() + FLUSH_INTERVAL);
}
if pending.len() >= FLUSH_BYTES {
grid.feed(&pending);
let _ = bcast.send(std::mem::take(&mut pending));
flush_deadline = None;
}
}
None => {
// Final flush on EOF.
if !pending.is_empty() {
grid.feed(&pending);
let _ = bcast.send(std::mem::take(&mut pending));
}
break;
}
}
}
_ = timer => {
if !pending.is_empty() {
grid.feed(&pending);
let _ = bcast.send(std::mem::take(&mut pending));
}
flush_deadline = None;
}
}
}
let code = pty.wait();
let _ = exit_tx.send((actor_id, code));
});
SurfaceHandle { id, workspace_id, tx }
}
#[cfg(test)]
mod tests {
    use super::*;
    use spacesh_pty::SpawnSpec;
    use tokio::time::{timeout, Duration, Instant};

    /// Spawn spec that runs `script` via `/bin/sh -c` in the temp directory
    /// with an 80x24 terminal and no extra environment.
    fn spec(script: &str) -> SpawnSpec {
        SpawnSpec {
            command: "/bin/sh".into(),
            args: vec!["-c".into(), script.into()],
            cwd: std::env::temp_dir(),
            cols: 80,
            rows: 24,
            env: vec![],
        }
    }

    /// True when the lossily decoded `bytes` contain `needle`.
    fn has_text(bytes: &[u8], needle: &str) -> bool {
        String::from_utf8_lossy(bytes).contains(needle)
    }

    /// A plain `Attach` subscriber sees the child's output.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn attach_receives_output() {
        let _serial = crate::test_support::serial();
        let pty = PtyHandle::spawn(spec("printf HELLO; sleep 0.3")).unwrap();
        let (exit_tx, _exit_rx) = mpsc::unbounded_channel();
        let surface = spawn_surface(
            SurfaceId("s_1".into()),
            WorkspaceId("w_1".into()),
            pty,
            80,
            24,
            exit_tx,
        );

        let (reply_tx, reply_rx) = oneshot::channel();
        surface.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap();
        let mut sub = reply_rx.await.unwrap();

        // Poll in 100ms slices for at most 500ms, stopping once HELLO shows up.
        let give_up_at = Instant::now() + Duration::from_millis(500);
        let mut collected = Vec::new();
        while Instant::now() < give_up_at {
            if let Ok(Ok(chunk)) = timeout(Duration::from_millis(100), sub.recv()).await {
                collected.extend_from_slice(&chunk);
            }
            if has_text(&collected, "HELLO") {
                break;
            }
        }
        assert!(has_text(&collected, "HELLO"));
    }

    /// The child's exit code is reported on the exit channel with the surface id.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn exit_is_reported() {
        let _serial = crate::test_support::serial();
        let pty = PtyHandle::spawn(spec("exit 7")).unwrap();
        let (exit_tx, mut exit_rx) = mpsc::unbounded_channel();
        let _surface = spawn_surface(
            SurfaceId("s_2".into()),
            WorkspaceId("w_1".into()),
            pty,
            80,
            24,
            exit_tx,
        );

        let reported = timeout(Duration::from_secs(3), exit_rx.recv()).await;
        let (sid, code) = reported.unwrap().unwrap();
        assert_eq!(sid, SurfaceId("s_2".into()));
        assert_eq!(code, 7);
    }

    /// A snapshot taken after output was flushed contains that output.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn attach_snapshot_reflects_prior_output() {
        let _serial = crate::test_support::serial();
        let pty = PtyHandle::spawn(spec("printf SNAPME; sleep 0.5")).unwrap();
        let (exit_tx, _exit_rx) = mpsc::unbounded_channel();
        let surface = spawn_surface(
            SurfaceId("s_s".into()),
            WorkspaceId("w_1".into()),
            pty,
            80,
            24,
            exit_tx,
        );

        // Let the child write and the actor flush into the grid first.
        tokio::time::sleep(Duration::from_millis(200)).await;

        let (reply_tx, reply_rx) = oneshot::channel();
        surface
            .tx
            .send(SurfaceMsg::AttachSnapshot { reply: reply_tx })
            .await
            .unwrap();
        let (snap, _sub) = reply_rx.await.unwrap();
        assert!(snap.ansi.contains("SNAPME"), "snapshot: {:?}", snap.ansi);
    }

    /// `spawn_from_spec` actually launches the command described by the spec.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn spawn_from_spec_runs_the_command() {
        let _serial = crate::test_support::serial();
        let persisted = SurfaceSpec {
            command: "/bin/sh".into(),
            args: vec!["-c".into(), "printf RESPAWN; sleep 0.3".into()],
            cwd: std::env::temp_dir().to_string_lossy().into(),
            agent_label: None,
            cols: 80,
            rows: 24,
            autostart: false,
        };
        let (exit_tx, _rx) = mpsc::unbounded_channel();
        let surface = spawn_from_spec(
            SurfaceId("s_r".into()),
            WorkspaceId("w_1".into()),
            &persisted,
            exit_tx,
        )
        .unwrap();

        let (reply_tx, reply_rx) = oneshot::channel();
        surface.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap();
        let mut sub = reply_rx.await.unwrap();

        // Poll in 100ms slices for up to 2s; only re-check after a new chunk.
        let give_up_at = Instant::now() + Duration::from_secs(2);
        let mut got = String::new();
        while Instant::now() < give_up_at {
            match timeout(Duration::from_millis(100), sub.recv()).await {
                Ok(Ok(chunk)) => {
                    got.push_str(&String::from_utf8_lossy(&chunk));
                    if got.contains("RESPAWN") {
                        break;
                    }
                }
                _ => continue,
            }
        }
        assert!(got.contains("RESPAWN"), "got: {got:?}");
    }
}