use std::collections::HashMap; use std::path::Path; use anyhow::Result; use base64::Engine; use spacesh_proto::codec::{read_frame, write_frame}; use spacesh_proto::{Cmd, Envelope, ErrorBody, Evt, SurfaceId}; use spacesh_pty::{PtyHandle, SpawnSpec}; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::{mpsc, oneshot}; use crate::registry::Registry; use crate::surface::{spawn_surface, SurfaceMsg}; /// Per-client outbound channel: the router pushes envelopes the client task writes out. type ClientTx = mpsc::Sender; /// Messages into the single router task. enum ServerMsg { /// A request from a client; reply routed to that client's `out`. Request { id: u64, cmd: Cmd, client: ClientId, out: ClientTx }, /// Forward an output chunk to all subscribers of `surface_id`. Output { surface_id: SurfaceId, bytes: Vec }, /// A surface process exited. Exit { surface_id: SurfaceId, code: i32 }, /// Register a new client's event sink. ClientConnected { client: ClientId, out: ClientTx }, /// Drop a client and all its subscriptions. ClientDisconnected { client: ClientId }, } type ClientId = u64; pub async fn serve(socket: &Path) -> Result<()> { let listener = UnixListener::bind(socket)?; let (router_tx, router_rx) = mpsc::channel::(256); // Exit events from surfaces are funneled into the router. let (exit_tx, mut exit_rx) = mpsc::unbounded_channel::<(SurfaceId, i32)>(); let router_for_exit = router_tx.clone(); tokio::spawn(async move { while let Some((sid, code)) = exit_rx.recv().await { let _ = router_for_exit.send(ServerMsg::Exit { surface_id: sid, code }).await; } }); let shutdown = tokio::spawn(router(router_rx, router_tx.clone(), exit_tx)); let mut next_client: ClientId = 0; loop { let (stream, _addr) = listener.accept().await?; let client_id = next_client; next_client += 1; let router_tx = router_tx.clone(); tokio::spawn(handle_client(stream, client_id, router_tx)); if shutdown.is_finished() { break; } } Ok(()) } async fn handle_client(stream: UnixStream, client_id: ClientId, router_tx: mpsc::Sender) { let (mut read_half, mut write_half) = stream.into_split(); let (out_tx, mut out_rx) = mpsc::channel::(256); let _ = router_tx .send(ServerMsg::ClientConnected { client: client_id, out: out_tx.clone() }) .await; // Writer task: drain outbound envelopes to the socket. let writer = tokio::spawn(async move { while let Some(env) = out_rx.recv().await { if write_frame(&mut write_half, &env).await.is_err() { break; } } }); // Reader loop: parse frames and forward requests to the router. loop { match read_frame(&mut read_half).await { Ok(Some(Envelope::Req { id, cmd })) => { let _ = router_tx .send(ServerMsg::Request { id, cmd, client: client_id, out: out_tx.clone() }) .await; } Ok(Some(_)) => { /* clients don't send res/evt; ignore */ } Ok(None) => break, // EOF Err(_) => break, } } let _ = router_tx.send(ServerMsg::ClientDisconnected { client: client_id }).await; writer.abort(); } async fn router( mut rx: mpsc::Receiver, router_tx: mpsc::Sender, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, ) { let mut reg = Registry::new(); let mut clients: HashMap = HashMap::new(); // surface_id → set of client ids subscribed (attached). let mut subs: HashMap> = HashMap::new(); while let Some(msg) = rx.recv().await { match msg { ServerMsg::ClientConnected { client, out } => { clients.insert(client, out); } ServerMsg::ClientDisconnected { client } => { clients.remove(&client); for list in subs.values_mut() { list.retain(|c| *c != client); } } ServerMsg::Output { surface_id, bytes } => { if let Some(list) = subs.get(&surface_id) { let evt = Envelope::Evt(Evt::Output { surface_id: surface_id.clone(), bytes }); for c in list { if let Some(out) = clients.get(c) { let _ = out.try_send(evt.clone()); } } } } ServerMsg::Exit { surface_id, code } => { let evt = Envelope::Evt(Evt::Exit { surface_id: surface_id.clone(), code }); broadcast_evt(&clients, &evt); } ServerMsg::Request { id, cmd, client, out } => { handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, &router_tx, &exit_tx).await; } } } } fn broadcast_evt(clients: &HashMap, evt: &Envelope) { for out in clients.values() { let _ = out.try_send(evt.clone()); } } fn ok(id: u64, data: serde_json::Value) -> Envelope { Envelope::Res { id, ok: true, data, error: None } } fn err(id: u64, code: &str, msg: &str) -> Envelope { Envelope::Res { id, ok: false, data: serde_json::Value::Null, error: Some(ErrorBody { code: code.into(), msg: msg.into() }) } } #[allow(clippy::too_many_arguments)] async fn handle_request( id: u64, cmd: Cmd, client: ClientId, out: ClientTx, reg: &mut Registry, subs: &mut HashMap>, clients: &HashMap, router_tx: &mpsc::Sender, exit_tx: &mpsc::UnboundedSender<(SurfaceId, i32)>, ) { match cmd { Cmd::Open { path } => { let meta = reg.open_workspace(path.into()); let _ = out.send(ok(id, serde_json::json!({ "workspace_id": meta.id.0 }))).await; } Cmd::NewSurface { workspace_id, command, args, cols, rows } => { let Some(ws) = reg.workspace(&workspace_id).cloned() else { let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; return; }; let sid = reg.new_surface_id(); let shell = command.unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into())); let spec = SpawnSpec { command: shell, args, cwd: ws.path.clone(), cols, rows, env: vec![("SPACESH_SURFACE_ID".into(), sid.0.clone())], }; match PtyHandle::spawn(spec) { Ok(pty) => { let handle = spawn_surface(sid.clone(), workspace_id.clone(), pty, cols, rows, exit_tx.clone()); // Bridge the surface's broadcast into the router as Output messages. spawn_output_bridge(sid.clone(), &handle, router_tx.clone()); reg.insert_surface(handle); let created = Envelope::Evt(Evt::SurfaceCreated { surface_id: sid.clone(), workspace_id: workspace_id.clone(), }); broadcast_evt(clients, &created); let _ = out.send(ok(id, serde_json::json!({ "surface_id": sid.0 }))).await; } Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; } } } Cmd::Input { surface_id, bytes } => { let Ok(decoded) = base64::engine::general_purpose::STANDARD.decode(&bytes) else { let _ = out.send(err(id, "BAD_REQUEST", "invalid base64")).await; return; }; if let Some(s) = reg.surface(&surface_id) { let _ = s.tx.send(SurfaceMsg::Input(decoded)).await; let _ = out.send(ok(id, serde_json::Value::Null)).await; } else { let _ = out.send(err(id, "NOT_FOUND", "surface")).await; } } Cmd::Resize { surface_id, cols, rows } => { if let Some(s) = reg.surface(&surface_id) { let _ = s.tx.send(SurfaceMsg::Resize { cols, rows }).await; let _ = out.send(ok(id, serde_json::Value::Null)).await; } else { let _ = out.send(err(id, "NOT_FOUND", "surface")).await; } } Cmd::Attach { surface_id } => { if let Some(s) = reg.surface(&surface_id) { let (reply_tx, reply_rx) = oneshot::channel(); if s.tx.send(SurfaceMsg::AttachSnapshot { reply: reply_tx }).await.is_ok() { if let Ok((snap, _sub)) = reply_rx.await { subs.entry(surface_id.clone()).or_default().push(client); let _ = out.send(ok(id, serde_json::json!({ "snapshot": snap.ansi, "cols": snap.cols, "rows": snap.rows, "cursor_row": snap.cursor_row, "cursor_col": snap.cursor_col, }))).await; return; } } let _ = out.send(err(id, "INTERNAL", "attach failed")).await; } else { let _ = out.send(err(id, "NOT_FOUND", "surface")).await; } } Cmd::Detach { surface_id } => { if let Some(list) = subs.get_mut(&surface_id) { list.retain(|c| *c != client); } let _ = out.send(ok(id, serde_json::Value::Null)).await; } Cmd::Focus { surface_id: _ } => { // Focus is a no-op in this slice (window raise is GUI-side; CLI parity later). let _ = out.send(ok(id, serde_json::Value::Null)).await; } Cmd::Close { surface_id } => { if let Some(handle) = reg.remove_surface(&surface_id) { let _ = handle.tx.send(SurfaceMsg::Close).await; subs.remove(&surface_id); let closed = Envelope::Evt(Evt::SurfaceClosed { surface_id: surface_id.clone() }); broadcast_evt(clients, &closed); let _ = out.send(ok(id, serde_json::Value::Null)).await; } else { let _ = out.send(err(id, "NOT_FOUND", "surface")).await; } } Cmd::Status => { let workspaces: Vec<_> = reg.status().into_iter().map(|(w, sids)| { serde_json::json!({ "workspace_id": w.id.0, "path": w.path.to_string_lossy(), "surfaces": sids.iter().map(|s| s.0.clone()).collect::>(), }) }).collect(); let _ = out.send(ok(id, serde_json::json!({ "workspaces": workspaces }))).await; } Cmd::Shutdown => { let _ = out.send(ok(id, serde_json::Value::Null)).await; std::process::exit(0); } } } /// Pump a surface's broadcast output into the router as `ServerMsg::Output`. fn spawn_output_bridge( surface_id: SurfaceId, handle: &crate::surface::SurfaceHandle, router_tx: mpsc::Sender, ) { let tx = handle.tx.clone(); tokio::spawn(async move { // Ask the actor for a subscription receiver. let (reply_tx, reply_rx) = oneshot::channel(); if tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.is_err() { return; } let Ok(mut sub) = reply_rx.await else { return }; loop { match sub.recv().await { Ok(bytes) => { if router_tx.send(ServerMsg::Output { surface_id: surface_id.clone(), bytes }).await.is_err() { break; } } Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, Err(_) => break, // surface closed } } }); } #[cfg(test)] mod tests { use super::*; use base64::Engine; async fn req(stream: &mut UnixStream, id: u64, cmd: Cmd) -> Envelope { write_frame(stream, &Envelope::Req { id, cmd }).await.unwrap(); // Read until we see the matching res (skip interleaved evts). loop { let env = read_frame(stream).await.unwrap().unwrap(); if let Envelope::Res { id: rid, .. } = &env { if *rid == id { return env; } } } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn open_new_surface_attach_streams_output() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let sock_for_task = sock.clone(); tokio::spawn(async move { let _ = serve(&sock_for_task).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); let r = req(&mut s, 2, Cmd::NewSurface { workspace_id: spacesh_proto::WorkspaceId(ws), command: Some("/bin/sh".into()), args: vec!["-c".into(), "printf STREAM_OK; sleep 0.5".into()], cols: 80, rows: 24, }).await; let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); let surface_id = spacesh_proto::SurfaceId(sid); let _ = req(&mut s, 3, Cmd::Attach { surface_id: surface_id.clone() }).await; // Now read frames looking for an Output evt containing STREAM_OK. let mut got = String::new(); let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(6); while tokio::time::Instant::now() < deadline { if let Ok(Ok(Some(Envelope::Evt(Evt::Output { bytes, .. })))) = tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut s)).await { got.push_str(&String::from_utf8_lossy(&bytes)); if got.contains("STREAM_OK") { break; } } } assert!(got.contains("STREAM_OK"), "got: {got:?}"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn unknown_surface_returns_not_found() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let sock_for_task = sock.clone(); tokio::spawn(async move { let _ = serve(&sock_for_task).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Input { surface_id: spacesh_proto::SurfaceId("s_nope".into()), bytes: base64::engine::general_purpose::STANDARD.encode(b"x"), }).await; match r { Envelope::Res { ok, error, .. } => { assert!(!ok); assert_eq!(error.unwrap().code, "NOT_FOUND"); } _ => panic!(), } } fn res_data(env: &Envelope) -> &serde_json::Value { match env { Envelope::Res { data, .. } => data, _ => panic!("not a res") } } fn tempdir_path() -> std::path::PathBuf { let mut p = std::env::temp_dir(); let n = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos(); p.push(format!("spaceshd-test-{n}")); std::fs::create_dir_all(&p).unwrap(); p } async fn wait_for_socket(sock: &Path) { for _ in 0..300 { if UnixStream::connect(sock).await.is_ok() { return; } tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; } panic!("socket never came up"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn reattach_returns_snapshot_with_prior_output() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let sock_for_task = sock.clone(); tokio::spawn(async move { let _ = serve(&sock_for_task).await; }); wait_for_socket(&sock).await; // First client: open, new surface that prints a marker, attach, then disconnect. let surface_id; { let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); let r = req(&mut s, 2, Cmd::NewSurface { workspace_id: spacesh_proto::WorkspaceId(ws), command: Some("/bin/sh".into()), args: vec!["-c".into(), "printf REPAINT_ME; sleep 2".into()], cols: 80, rows: 24, }).await; surface_id = spacesh_proto::SurfaceId(res_data(&r)["surface_id"].as_str().unwrap().to_string()); // Give the actor time to flush output into the grid. tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; // disconnect by dropping `s` } // Second client: attach to the same surface, expect snapshot to contain the marker. // Re-verify the socket is still up before connecting (handles any scheduling jitter). wait_for_socket(&sock).await; let mut s2 = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s2, 1, Cmd::Attach { surface_id: surface_id.clone() }).await; let snap = res_data(&r)["snapshot"].as_str().unwrap(); assert!(snap.contains("REPAINT_ME"), "snapshot was: {snap:?}"); } }