diff --git a/crates/spaceshd/src/main.rs b/crates/spaceshd/src/main.rs index 2715dc4..4950730 100644 --- a/crates/spaceshd/src/main.rs +++ b/crates/spaceshd/src/main.rs @@ -1,5 +1,6 @@ mod lifecycle; mod registry; +mod server; mod surface; fn main() { diff --git a/crates/spaceshd/src/server.rs b/crates/spaceshd/src/server.rs new file mode 100644 index 0000000..dde3eaf --- /dev/null +++ b/crates/spaceshd/src/server.rs @@ -0,0 +1,389 @@ +use std::collections::HashMap; +use std::path::Path; +use anyhow::Result; +use base64::Engine; +use spacesh_proto::codec::{read_frame, write_frame}; +use spacesh_proto::{Cmd, Envelope, ErrorBody, Evt, SurfaceId}; +use spacesh_pty::{PtyHandle, SpawnSpec}; +use tokio::net::{UnixListener, UnixStream}; +use tokio::sync::{mpsc, oneshot}; +use crate::registry::Registry; +use crate::surface::{spawn_surface, SurfaceMsg}; + +/// Per-client outbound channel: the router pushes envelopes the client task writes out. +type ClientTx = mpsc::Sender; + +/// Messages into the single router task. +enum ServerMsg { + /// A request from a client; reply routed to that client's `out`. + Request { id: u64, cmd: Cmd, client: ClientId, out: ClientTx }, + /// Forward an output chunk to all subscribers of `surface_id`. + Output { surface_id: SurfaceId, bytes: Vec }, + /// A surface process exited. + Exit { surface_id: SurfaceId, code: i32 }, + /// Register a new client's event sink. + ClientConnected { client: ClientId, out: ClientTx }, + /// Drop a client and all its subscriptions. + ClientDisconnected { client: ClientId }, +} + +type ClientId = u64; + +pub async fn serve(socket: &Path) -> Result<()> { + let listener = UnixListener::bind(socket)?; + let (router_tx, router_rx) = mpsc::channel::(256); + + // Exit events from surfaces are funneled into the router. + let (exit_tx, mut exit_rx) = mpsc::unbounded_channel::<(SurfaceId, i32)>(); + let router_for_exit = router_tx.clone(); + tokio::spawn(async move { + while let Some((sid, code)) = exit_rx.recv().await { + let _ = router_for_exit.send(ServerMsg::Exit { surface_id: sid, code }).await; + } + }); + + let shutdown = tokio::spawn(router(router_rx, router_tx.clone(), exit_tx)); + + let mut next_client: ClientId = 0; + loop { + let (stream, _addr) = listener.accept().await?; + let client_id = next_client; + next_client += 1; + let router_tx = router_tx.clone(); + tokio::spawn(handle_client(stream, client_id, router_tx)); + if shutdown.is_finished() { + break; + } + } + Ok(()) +} + +async fn handle_client(stream: UnixStream, client_id: ClientId, router_tx: mpsc::Sender) { + let (mut read_half, mut write_half) = stream.into_split(); + let (out_tx, mut out_rx) = mpsc::channel::(256); + + let _ = router_tx + .send(ServerMsg::ClientConnected { client: client_id, out: out_tx.clone() }) + .await; + + // Writer task: drain outbound envelopes to the socket. + let writer = tokio::spawn(async move { + while let Some(env) = out_rx.recv().await { + if write_frame(&mut write_half, &env).await.is_err() { + break; + } + } + }); + + // Reader loop: parse frames and forward requests to the router. + loop { + match read_frame(&mut read_half).await { + Ok(Some(Envelope::Req { id, cmd })) => { + let _ = router_tx + .send(ServerMsg::Request { id, cmd, client: client_id, out: out_tx.clone() }) + .await; + } + Ok(Some(_)) => { /* clients don't send res/evt; ignore */ } + Ok(None) => break, // EOF + Err(_) => break, + } + } + + let _ = router_tx.send(ServerMsg::ClientDisconnected { client: client_id }).await; + writer.abort(); +} + +async fn router( + mut rx: mpsc::Receiver, + router_tx: mpsc::Sender, + exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, +) { + let mut reg = Registry::new(); + let mut clients: HashMap = HashMap::new(); + // surface_id → set of client ids subscribed (attached). + let mut subs: HashMap> = HashMap::new(); + + while let Some(msg) = rx.recv().await { + match msg { + ServerMsg::ClientConnected { client, out } => { + clients.insert(client, out); + } + ServerMsg::ClientDisconnected { client } => { + clients.remove(&client); + for list in subs.values_mut() { + list.retain(|c| *c != client); + } + } + ServerMsg::Output { surface_id, bytes } => { + if let Some(list) = subs.get(&surface_id) { + let evt = Envelope::Evt(Evt::Output { surface_id: surface_id.clone(), bytes }); + for c in list { + if let Some(out) = clients.get(c) { + let _ = out.try_send(evt.clone()); + } + } + } + } + ServerMsg::Exit { surface_id, code } => { + let evt = Envelope::Evt(Evt::Exit { surface_id: surface_id.clone(), code }); + broadcast_evt(&clients, &evt); + } + ServerMsg::Request { id, cmd, client, out } => { + handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, &router_tx, &exit_tx).await; + } + } + } +} + +fn broadcast_evt(clients: &HashMap, evt: &Envelope) { + for out in clients.values() { + let _ = out.try_send(evt.clone()); + } +} + +fn ok(id: u64, data: serde_json::Value) -> Envelope { + Envelope::Res { id, ok: true, data, error: None } +} +fn err(id: u64, code: &str, msg: &str) -> Envelope { + Envelope::Res { id, ok: false, data: serde_json::Value::Null, + error: Some(ErrorBody { code: code.into(), msg: msg.into() }) } +} + +#[allow(clippy::too_many_arguments)] +async fn handle_request( + id: u64, + cmd: Cmd, + client: ClientId, + out: ClientTx, + reg: &mut Registry, + subs: &mut HashMap>, + clients: &HashMap, + router_tx: &mpsc::Sender, + exit_tx: &mpsc::UnboundedSender<(SurfaceId, i32)>, +) { + match cmd { + Cmd::Open { path } => { + let meta = reg.open_workspace(path.into()); + let _ = out.send(ok(id, serde_json::json!({ "workspace_id": meta.id.0 }))).await; + } + Cmd::NewSurface { workspace_id, command, args, cols, rows } => { + let Some(ws) = reg.workspace(&workspace_id).cloned() else { + let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; + return; + }; + let sid = reg.new_surface_id(); + let shell = command.unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into())); + let spec = SpawnSpec { + command: shell, + args, + cwd: ws.path.clone(), + cols, + rows, + env: vec![("SPACESH_SURFACE_ID".into(), sid.0.clone())], + }; + match PtyHandle::spawn(spec) { + Ok(pty) => { + let handle = spawn_surface(sid.clone(), workspace_id.clone(), pty, exit_tx.clone()); + // Bridge the surface's broadcast into the router as Output messages. + spawn_output_bridge(sid.clone(), &handle, router_tx.clone()); + reg.insert_surface(handle); + let created = Envelope::Evt(Evt::SurfaceCreated { + surface_id: sid.clone(), workspace_id: workspace_id.clone(), + }); + broadcast_evt(clients, &created); + let _ = out.send(ok(id, serde_json::json!({ "surface_id": sid.0 }))).await; + } + Err(e) => { + let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; + } + } + } + Cmd::Input { surface_id, bytes } => { + let Ok(decoded) = base64::engine::general_purpose::STANDARD.decode(&bytes) else { + let _ = out.send(err(id, "BAD_REQUEST", "invalid base64")).await; + return; + }; + if let Some(s) = reg.surface(&surface_id) { + let _ = s.tx.send(SurfaceMsg::Input(decoded)).await; + let _ = out.send(ok(id, serde_json::Value::Null)).await; + } else { + let _ = out.send(err(id, "NOT_FOUND", "surface")).await; + } + } + Cmd::Resize { surface_id, cols, rows } => { + if let Some(s) = reg.surface(&surface_id) { + let _ = s.tx.send(SurfaceMsg::Resize { cols, rows }).await; + let _ = out.send(ok(id, serde_json::Value::Null)).await; + } else { + let _ = out.send(err(id, "NOT_FOUND", "surface")).await; + } + } + Cmd::Attach { surface_id } => { + // M0 attach: register subscription, no snapshot yet (snapshot added in Task 13). + if reg.surface(&surface_id).is_some() { + subs.entry(surface_id.clone()).or_default().push(client); + let _ = out.send(ok(id, serde_json::json!({ "snapshot": "", "cols": 0, "rows": 0 }))).await; + } else { + let _ = out.send(err(id, "NOT_FOUND", "surface")).await; + } + } + Cmd::Detach { surface_id } => { + if let Some(list) = subs.get_mut(&surface_id) { + list.retain(|c| *c != client); + } + let _ = out.send(ok(id, serde_json::Value::Null)).await; + } + Cmd::Focus { surface_id: _ } => { + // Focus is a no-op in this slice (window raise is GUI-side; CLI parity later). + let _ = out.send(ok(id, serde_json::Value::Null)).await; + } + Cmd::Close { surface_id } => { + if let Some(handle) = reg.remove_surface(&surface_id) { + let _ = handle.tx.send(SurfaceMsg::Close).await; + subs.remove(&surface_id); + let closed = Envelope::Evt(Evt::SurfaceClosed { surface_id: surface_id.clone() }); + broadcast_evt(clients, &closed); + let _ = out.send(ok(id, serde_json::Value::Null)).await; + } else { + let _ = out.send(err(id, "NOT_FOUND", "surface")).await; + } + } + Cmd::Status => { + let workspaces: Vec<_> = reg.status().into_iter().map(|(w, sids)| { + serde_json::json!({ + "workspace_id": w.id.0, + "path": w.path.to_string_lossy(), + "surfaces": sids.iter().map(|s| s.0.clone()).collect::>(), + }) + }).collect(); + let _ = out.send(ok(id, serde_json::json!({ "workspaces": workspaces }))).await; + } + Cmd::Shutdown => { + let _ = out.send(ok(id, serde_json::Value::Null)).await; + std::process::exit(0); + } + } +} + +/// Pump a surface's broadcast output into the router as `ServerMsg::Output`. +fn spawn_output_bridge( + surface_id: SurfaceId, + handle: &crate::surface::SurfaceHandle, + router_tx: mpsc::Sender, +) { + let tx = handle.tx.clone(); + tokio::spawn(async move { + // Ask the actor for a subscription receiver. + let (reply_tx, reply_rx) = oneshot::channel(); + if tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.is_err() { + return; + } + let Ok(mut sub) = reply_rx.await else { return }; + loop { + match sub.recv().await { + Ok(bytes) => { + if router_tx.send(ServerMsg::Output { surface_id: surface_id.clone(), bytes }).await.is_err() { + break; + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + Err(_) => break, // surface closed + } + } + }); +} + +#[cfg(test)] +mod tests { + use super::*; + use base64::Engine; + + async fn req(stream: &mut UnixStream, id: u64, cmd: Cmd) -> Envelope { + write_frame(stream, &Envelope::Req { id, cmd }).await.unwrap(); + // Read until we see the matching res (skip interleaved evts). + loop { + let env = read_frame(stream).await.unwrap().unwrap(); + if let Envelope::Res { id: rid, .. } = &env { + if *rid == id { return env; } + } + } + } + + #[tokio::test] + async fn open_new_surface_attach_streams_output() { + let dir = tempdir_path(); + let sock = dir.join("sock"); + let sock_for_task = sock.clone(); + tokio::spawn(async move { let _ = serve(&sock_for_task).await; }); + wait_for_socket(&sock).await; + + let mut s = UnixStream::connect(&sock).await.unwrap(); + + let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + + let r = req(&mut s, 2, Cmd::NewSurface { + workspace_id: spacesh_proto::WorkspaceId(ws), + command: Some("/bin/sh".into()), + args: vec!["-c".into(), "printf STREAM_OK; sleep 0.5".into()], + cols: 80, rows: 24, + }).await; + let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); + let surface_id = spacesh_proto::SurfaceId(sid); + + let _ = req(&mut s, 3, Cmd::Attach { surface_id: surface_id.clone() }).await; + + // Now read frames looking for an Output evt containing STREAM_OK. + let mut got = String::new(); + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2); + while tokio::time::Instant::now() < deadline { + if let Ok(Ok(Some(Envelope::Evt(Evt::Output { bytes, .. })))) = + tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut s)).await { + got.push_str(&String::from_utf8_lossy(&bytes)); + if got.contains("STREAM_OK") { break; } + } + } + assert!(got.contains("STREAM_OK"), "got: {got:?}"); + } + + #[tokio::test] + async fn unknown_surface_returns_not_found() { + let dir = tempdir_path(); + let sock = dir.join("sock"); + let sock_for_task = sock.clone(); + tokio::spawn(async move { let _ = serve(&sock_for_task).await; }); + wait_for_socket(&sock).await; + let mut s = UnixStream::connect(&sock).await.unwrap(); + let r = req(&mut s, 1, Cmd::Input { + surface_id: spacesh_proto::SurfaceId("s_nope".into()), + bytes: base64::engine::general_purpose::STANDARD.encode(b"x"), + }).await; + match r { + Envelope::Res { ok, error, .. } => { + assert!(!ok); + assert_eq!(error.unwrap().code, "NOT_FOUND"); + } + _ => panic!(), + } + } + + fn res_data(env: &Envelope) -> &serde_json::Value { + match env { Envelope::Res { data, .. } => data, _ => panic!("not a res") } + } + + fn tempdir_path() -> std::path::PathBuf { + let mut p = std::env::temp_dir(); + let n = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos(); + p.push(format!("spaceshd-test-{n}")); + std::fs::create_dir_all(&p).unwrap(); + p + } + + async fn wait_for_socket(sock: &Path) { + for _ in 0..100 { + if UnixStream::connect(sock).await.is_ok() { return; } + tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; + } + panic!("socket never came up"); + } +}