feat(daemon): surface actor owning pty + broadcast fan-out (M0, no grid)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
mod lifecycle;
|
mod lifecycle;
|
||||||
|
mod surface;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
println!("spaceshd skeleton");
|
println!("spaceshd skeleton");
|
||||||
|
|||||||
@@ -0,0 +1,110 @@
|
|||||||
|
use spacesh_proto::{SurfaceId, WorkspaceId};
|
||||||
|
use spacesh_pty::PtyHandle;
|
||||||
|
use tokio::sync::{broadcast, mpsc, oneshot};
|
||||||
|
|
||||||
|
/// Output broadcast capacity (chunks). Lagging subscribers drop intermediate frames.
|
||||||
|
const BROADCAST_CAP: usize = 1024;
|
||||||
|
|
||||||
|
/// Messages sent to a surface actor.
|
||||||
|
pub enum SurfaceMsg {
|
||||||
|
Input(Vec<u8>),
|
||||||
|
Resize { cols: u16, rows: u16 },
|
||||||
|
/// Subscribe to the output stream. Reply carries a fresh receiver.
|
||||||
|
Attach { reply: oneshot::Sender<broadcast::Receiver<Vec<u8>>> },
|
||||||
|
Close,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle the daemon keeps for a live surface.
|
||||||
|
pub struct SurfaceHandle {
|
||||||
|
pub id: SurfaceId,
|
||||||
|
pub workspace_id: WorkspaceId,
|
||||||
|
pub tx: mpsc::Sender<SurfaceMsg>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn the actor; returns its handle. `exit_tx` is fired once with the exit code.
|
||||||
|
pub fn spawn_surface(
|
||||||
|
id: SurfaceId,
|
||||||
|
workspace_id: WorkspaceId,
|
||||||
|
mut pty: PtyHandle,
|
||||||
|
exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
|
||||||
|
) -> SurfaceHandle {
|
||||||
|
let (tx, mut rx) = mpsc::channel::<SurfaceMsg>(64);
|
||||||
|
let (bcast, _) = broadcast::channel::<Vec<u8>>(BROADCAST_CAP);
|
||||||
|
let actor_id = id.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
msg = rx.recv() => {
|
||||||
|
match msg {
|
||||||
|
Some(SurfaceMsg::Input(bytes)) => { let _ = pty.write_input(&bytes); }
|
||||||
|
Some(SurfaceMsg::Resize { cols, rows }) => { let _ = pty.resize(cols, rows); }
|
||||||
|
Some(SurfaceMsg::Attach { reply }) => { let _ = reply.send(bcast.subscribe()); }
|
||||||
|
Some(SurfaceMsg::Close) | None => { pty.kill(); break; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
chunk = pty.output.recv() => {
|
||||||
|
match chunk {
|
||||||
|
Some(bytes) => { let _ = bcast.send(bytes); }
|
||||||
|
None => { break; } // PTY EOF
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let code = pty.wait();
|
||||||
|
let _ = exit_tx.send((actor_id, code));
|
||||||
|
});
|
||||||
|
|
||||||
|
SurfaceHandle { id, workspace_id, tx }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use spacesh_pty::SpawnSpec;
|
||||||
|
|
||||||
|
fn spec(script: &str) -> SpawnSpec {
|
||||||
|
SpawnSpec {
|
||||||
|
command: "/bin/sh".into(),
|
||||||
|
args: vec!["-c".into(), script.into()],
|
||||||
|
cwd: std::env::temp_dir(),
|
||||||
|
cols: 80,
|
||||||
|
rows: 24,
|
||||||
|
env: vec![],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn attach_receives_output() {
|
||||||
|
let pty = PtyHandle::spawn(spec("printf HELLO; sleep 0.3")).unwrap();
|
||||||
|
let (exit_tx, _exit_rx) = mpsc::unbounded_channel();
|
||||||
|
let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, exit_tx);
|
||||||
|
|
||||||
|
let (reply_tx, reply_rx) = oneshot::channel();
|
||||||
|
handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap();
|
||||||
|
let mut sub = reply_rx.await.unwrap();
|
||||||
|
|
||||||
|
let mut collected = Vec::new();
|
||||||
|
// Collect for a short bounded window.
|
||||||
|
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(500);
|
||||||
|
while tokio::time::Instant::now() < deadline {
|
||||||
|
match tokio::time::timeout(tokio::time::Duration::from_millis(100), sub.recv()).await {
|
||||||
|
Ok(Ok(bytes)) => collected.extend_from_slice(&bytes),
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
if String::from_utf8_lossy(&collected).contains("HELLO") { break; }
|
||||||
|
}
|
||||||
|
assert!(String::from_utf8_lossy(&collected).contains("HELLO"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn exit_is_reported() {
|
||||||
|
let pty = PtyHandle::spawn(spec("exit 7")).unwrap();
|
||||||
|
let (exit_tx, mut exit_rx) = mpsc::unbounded_channel();
|
||||||
|
let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, exit_tx);
|
||||||
|
let (sid, code) = tokio::time::timeout(tokio::time::Duration::from_secs(3), exit_rx.recv())
|
||||||
|
.await.unwrap().unwrap();
|
||||||
|
assert_eq!(sid, SurfaceId("s_2".into()));
|
||||||
|
assert_eq!(code, 7);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user