feat(daemon): socket server with router task, command dispatch, event fan-out (M0)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
mod lifecycle;
|
mod lifecycle;
|
||||||
mod registry;
|
mod registry;
|
||||||
|
mod server;
|
||||||
mod surface;
|
mod surface;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
|
|||||||
@@ -0,0 +1,389 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
use std::path::Path;
|
||||||
|
use anyhow::Result;
|
||||||
|
use base64::Engine;
|
||||||
|
use spacesh_proto::codec::{read_frame, write_frame};
|
||||||
|
use spacesh_proto::{Cmd, Envelope, ErrorBody, Evt, SurfaceId};
|
||||||
|
use spacesh_pty::{PtyHandle, SpawnSpec};
|
||||||
|
use tokio::net::{UnixListener, UnixStream};
|
||||||
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
use crate::registry::Registry;
|
||||||
|
use crate::surface::{spawn_surface, SurfaceMsg};
|
||||||
|
|
||||||
|
/// Per-client outbound channel: the router pushes envelopes the client task writes out.
|
||||||
|
type ClientTx = mpsc::Sender<Envelope>;
|
||||||
|
|
||||||
|
/// Messages into the single router task.
|
||||||
|
enum ServerMsg {
|
||||||
|
/// A request from a client; reply routed to that client's `out`.
|
||||||
|
Request { id: u64, cmd: Cmd, client: ClientId, out: ClientTx },
|
||||||
|
/// Forward an output chunk to all subscribers of `surface_id`.
|
||||||
|
Output { surface_id: SurfaceId, bytes: Vec<u8> },
|
||||||
|
/// A surface process exited.
|
||||||
|
Exit { surface_id: SurfaceId, code: i32 },
|
||||||
|
/// Register a new client's event sink.
|
||||||
|
ClientConnected { client: ClientId, out: ClientTx },
|
||||||
|
/// Drop a client and all its subscriptions.
|
||||||
|
ClientDisconnected { client: ClientId },
|
||||||
|
}
|
||||||
|
|
||||||
|
type ClientId = u64;
|
||||||
|
|
||||||
|
pub async fn serve(socket: &Path) -> Result<()> {
|
||||||
|
let listener = UnixListener::bind(socket)?;
|
||||||
|
let (router_tx, router_rx) = mpsc::channel::<ServerMsg>(256);
|
||||||
|
|
||||||
|
// Exit events from surfaces are funneled into the router.
|
||||||
|
let (exit_tx, mut exit_rx) = mpsc::unbounded_channel::<(SurfaceId, i32)>();
|
||||||
|
let router_for_exit = router_tx.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Some((sid, code)) = exit_rx.recv().await {
|
||||||
|
let _ = router_for_exit.send(ServerMsg::Exit { surface_id: sid, code }).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let shutdown = tokio::spawn(router(router_rx, router_tx.clone(), exit_tx));
|
||||||
|
|
||||||
|
let mut next_client: ClientId = 0;
|
||||||
|
loop {
|
||||||
|
let (stream, _addr) = listener.accept().await?;
|
||||||
|
let client_id = next_client;
|
||||||
|
next_client += 1;
|
||||||
|
let router_tx = router_tx.clone();
|
||||||
|
tokio::spawn(handle_client(stream, client_id, router_tx));
|
||||||
|
if shutdown.is_finished() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_client(stream: UnixStream, client_id: ClientId, router_tx: mpsc::Sender<ServerMsg>) {
|
||||||
|
let (mut read_half, mut write_half) = stream.into_split();
|
||||||
|
let (out_tx, mut out_rx) = mpsc::channel::<Envelope>(256);
|
||||||
|
|
||||||
|
let _ = router_tx
|
||||||
|
.send(ServerMsg::ClientConnected { client: client_id, out: out_tx.clone() })
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Writer task: drain outbound envelopes to the socket.
|
||||||
|
let writer = tokio::spawn(async move {
|
||||||
|
while let Some(env) = out_rx.recv().await {
|
||||||
|
if write_frame(&mut write_half, &env).await.is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Reader loop: parse frames and forward requests to the router.
|
||||||
|
loop {
|
||||||
|
match read_frame(&mut read_half).await {
|
||||||
|
Ok(Some(Envelope::Req { id, cmd })) => {
|
||||||
|
let _ = router_tx
|
||||||
|
.send(ServerMsg::Request { id, cmd, client: client_id, out: out_tx.clone() })
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
Ok(Some(_)) => { /* clients don't send res/evt; ignore */ }
|
||||||
|
Ok(None) => break, // EOF
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = router_tx.send(ServerMsg::ClientDisconnected { client: client_id }).await;
|
||||||
|
writer.abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn router(
|
||||||
|
mut rx: mpsc::Receiver<ServerMsg>,
|
||||||
|
router_tx: mpsc::Sender<ServerMsg>,
|
||||||
|
exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
|
||||||
|
) {
|
||||||
|
let mut reg = Registry::new();
|
||||||
|
let mut clients: HashMap<ClientId, ClientTx> = HashMap::new();
|
||||||
|
// surface_id → set of client ids subscribed (attached).
|
||||||
|
let mut subs: HashMap<SurfaceId, Vec<ClientId>> = HashMap::new();
|
||||||
|
|
||||||
|
while let Some(msg) = rx.recv().await {
|
||||||
|
match msg {
|
||||||
|
ServerMsg::ClientConnected { client, out } => {
|
||||||
|
clients.insert(client, out);
|
||||||
|
}
|
||||||
|
ServerMsg::ClientDisconnected { client } => {
|
||||||
|
clients.remove(&client);
|
||||||
|
for list in subs.values_mut() {
|
||||||
|
list.retain(|c| *c != client);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ServerMsg::Output { surface_id, bytes } => {
|
||||||
|
if let Some(list) = subs.get(&surface_id) {
|
||||||
|
let evt = Envelope::Evt(Evt::Output { surface_id: surface_id.clone(), bytes });
|
||||||
|
for c in list {
|
||||||
|
if let Some(out) = clients.get(c) {
|
||||||
|
let _ = out.try_send(evt.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ServerMsg::Exit { surface_id, code } => {
|
||||||
|
let evt = Envelope::Evt(Evt::Exit { surface_id: surface_id.clone(), code });
|
||||||
|
broadcast_evt(&clients, &evt);
|
||||||
|
}
|
||||||
|
ServerMsg::Request { id, cmd, client, out } => {
|
||||||
|
handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, &router_tx, &exit_tx).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn broadcast_evt(clients: &HashMap<ClientId, ClientTx>, evt: &Envelope) {
|
||||||
|
for out in clients.values() {
|
||||||
|
let _ = out.try_send(evt.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ok(id: u64, data: serde_json::Value) -> Envelope {
|
||||||
|
Envelope::Res { id, ok: true, data, error: None }
|
||||||
|
}
|
||||||
|
fn err(id: u64, code: &str, msg: &str) -> Envelope {
|
||||||
|
Envelope::Res { id, ok: false, data: serde_json::Value::Null,
|
||||||
|
error: Some(ErrorBody { code: code.into(), msg: msg.into() }) }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
async fn handle_request(
|
||||||
|
id: u64,
|
||||||
|
cmd: Cmd,
|
||||||
|
client: ClientId,
|
||||||
|
out: ClientTx,
|
||||||
|
reg: &mut Registry,
|
||||||
|
subs: &mut HashMap<SurfaceId, Vec<ClientId>>,
|
||||||
|
clients: &HashMap<ClientId, ClientTx>,
|
||||||
|
router_tx: &mpsc::Sender<ServerMsg>,
|
||||||
|
exit_tx: &mpsc::UnboundedSender<(SurfaceId, i32)>,
|
||||||
|
) {
|
||||||
|
match cmd {
|
||||||
|
Cmd::Open { path } => {
|
||||||
|
let meta = reg.open_workspace(path.into());
|
||||||
|
let _ = out.send(ok(id, serde_json::json!({ "workspace_id": meta.id.0 }))).await;
|
||||||
|
}
|
||||||
|
Cmd::NewSurface { workspace_id, command, args, cols, rows } => {
|
||||||
|
let Some(ws) = reg.workspace(&workspace_id).cloned() else {
|
||||||
|
let _ = out.send(err(id, "NOT_FOUND", "workspace")).await;
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let sid = reg.new_surface_id();
|
||||||
|
let shell = command.unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into()));
|
||||||
|
let spec = SpawnSpec {
|
||||||
|
command: shell,
|
||||||
|
args,
|
||||||
|
cwd: ws.path.clone(),
|
||||||
|
cols,
|
||||||
|
rows,
|
||||||
|
env: vec![("SPACESH_SURFACE_ID".into(), sid.0.clone())],
|
||||||
|
};
|
||||||
|
match PtyHandle::spawn(spec) {
|
||||||
|
Ok(pty) => {
|
||||||
|
let handle = spawn_surface(sid.clone(), workspace_id.clone(), pty, exit_tx.clone());
|
||||||
|
// Bridge the surface's broadcast into the router as Output messages.
|
||||||
|
spawn_output_bridge(sid.clone(), &handle, router_tx.clone());
|
||||||
|
reg.insert_surface(handle);
|
||||||
|
let created = Envelope::Evt(Evt::SurfaceCreated {
|
||||||
|
surface_id: sid.clone(), workspace_id: workspace_id.clone(),
|
||||||
|
});
|
||||||
|
broadcast_evt(clients, &created);
|
||||||
|
let _ = out.send(ok(id, serde_json::json!({ "surface_id": sid.0 }))).await;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Cmd::Input { surface_id, bytes } => {
|
||||||
|
let Ok(decoded) = base64::engine::general_purpose::STANDARD.decode(&bytes) else {
|
||||||
|
let _ = out.send(err(id, "BAD_REQUEST", "invalid base64")).await;
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
if let Some(s) = reg.surface(&surface_id) {
|
||||||
|
let _ = s.tx.send(SurfaceMsg::Input(decoded)).await;
|
||||||
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
|
} else {
|
||||||
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Cmd::Resize { surface_id, cols, rows } => {
|
||||||
|
if let Some(s) = reg.surface(&surface_id) {
|
||||||
|
let _ = s.tx.send(SurfaceMsg::Resize { cols, rows }).await;
|
||||||
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
|
} else {
|
||||||
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Cmd::Attach { surface_id } => {
|
||||||
|
// M0 attach: register subscription, no snapshot yet (snapshot added in Task 13).
|
||||||
|
if reg.surface(&surface_id).is_some() {
|
||||||
|
subs.entry(surface_id.clone()).or_default().push(client);
|
||||||
|
let _ = out.send(ok(id, serde_json::json!({ "snapshot": "", "cols": 0, "rows": 0 }))).await;
|
||||||
|
} else {
|
||||||
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Cmd::Detach { surface_id } => {
|
||||||
|
if let Some(list) = subs.get_mut(&surface_id) {
|
||||||
|
list.retain(|c| *c != client);
|
||||||
|
}
|
||||||
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
|
}
|
||||||
|
Cmd::Focus { surface_id: _ } => {
|
||||||
|
// Focus is a no-op in this slice (window raise is GUI-side; CLI parity later).
|
||||||
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
|
}
|
||||||
|
Cmd::Close { surface_id } => {
|
||||||
|
if let Some(handle) = reg.remove_surface(&surface_id) {
|
||||||
|
let _ = handle.tx.send(SurfaceMsg::Close).await;
|
||||||
|
subs.remove(&surface_id);
|
||||||
|
let closed = Envelope::Evt(Evt::SurfaceClosed { surface_id: surface_id.clone() });
|
||||||
|
broadcast_evt(clients, &closed);
|
||||||
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
|
} else {
|
||||||
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Cmd::Status => {
|
||||||
|
let workspaces: Vec<_> = reg.status().into_iter().map(|(w, sids)| {
|
||||||
|
serde_json::json!({
|
||||||
|
"workspace_id": w.id.0,
|
||||||
|
"path": w.path.to_string_lossy(),
|
||||||
|
"surfaces": sids.iter().map(|s| s.0.clone()).collect::<Vec<_>>(),
|
||||||
|
})
|
||||||
|
}).collect();
|
||||||
|
let _ = out.send(ok(id, serde_json::json!({ "workspaces": workspaces }))).await;
|
||||||
|
}
|
||||||
|
Cmd::Shutdown => {
|
||||||
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
|
std::process::exit(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Pump a surface's broadcast output into the router as `ServerMsg::Output`.
|
||||||
|
fn spawn_output_bridge(
|
||||||
|
surface_id: SurfaceId,
|
||||||
|
handle: &crate::surface::SurfaceHandle,
|
||||||
|
router_tx: mpsc::Sender<ServerMsg>,
|
||||||
|
) {
|
||||||
|
let tx = handle.tx.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// Ask the actor for a subscription receiver.
|
||||||
|
let (reply_tx, reply_rx) = oneshot::channel();
|
||||||
|
if tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let Ok(mut sub) = reply_rx.await else { return };
|
||||||
|
loop {
|
||||||
|
match sub.recv().await {
|
||||||
|
Ok(bytes) => {
|
||||||
|
if router_tx.send(ServerMsg::Output { surface_id: surface_id.clone(), bytes }).await.is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
||||||
|
Err(_) => break, // surface closed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use base64::Engine;
|
||||||
|
|
||||||
|
async fn req(stream: &mut UnixStream, id: u64, cmd: Cmd) -> Envelope {
|
||||||
|
write_frame(stream, &Envelope::Req { id, cmd }).await.unwrap();
|
||||||
|
// Read until we see the matching res (skip interleaved evts).
|
||||||
|
loop {
|
||||||
|
let env = read_frame(stream).await.unwrap().unwrap();
|
||||||
|
if let Envelope::Res { id: rid, .. } = &env {
|
||||||
|
if *rid == id { return env; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn open_new_surface_attach_streams_output() {
|
||||||
|
let dir = tempdir_path();
|
||||||
|
let sock = dir.join("sock");
|
||||||
|
let sock_for_task = sock.clone();
|
||||||
|
tokio::spawn(async move { let _ = serve(&sock_for_task).await; });
|
||||||
|
wait_for_socket(&sock).await;
|
||||||
|
|
||||||
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
|
|
||||||
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
||||||
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
||||||
|
|
||||||
|
let r = req(&mut s, 2, Cmd::NewSurface {
|
||||||
|
workspace_id: spacesh_proto::WorkspaceId(ws),
|
||||||
|
command: Some("/bin/sh".into()),
|
||||||
|
args: vec!["-c".into(), "printf STREAM_OK; sleep 0.5".into()],
|
||||||
|
cols: 80, rows: 24,
|
||||||
|
}).await;
|
||||||
|
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
|
||||||
|
let surface_id = spacesh_proto::SurfaceId(sid);
|
||||||
|
|
||||||
|
let _ = req(&mut s, 3, Cmd::Attach { surface_id: surface_id.clone() }).await;
|
||||||
|
|
||||||
|
// Now read frames looking for an Output evt containing STREAM_OK.
|
||||||
|
let mut got = String::new();
|
||||||
|
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2);
|
||||||
|
while tokio::time::Instant::now() < deadline {
|
||||||
|
if let Ok(Ok(Some(Envelope::Evt(Evt::Output { bytes, .. })))) =
|
||||||
|
tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut s)).await {
|
||||||
|
got.push_str(&String::from_utf8_lossy(&bytes));
|
||||||
|
if got.contains("STREAM_OK") { break; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert!(got.contains("STREAM_OK"), "got: {got:?}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn unknown_surface_returns_not_found() {
|
||||||
|
let dir = tempdir_path();
|
||||||
|
let sock = dir.join("sock");
|
||||||
|
let sock_for_task = sock.clone();
|
||||||
|
tokio::spawn(async move { let _ = serve(&sock_for_task).await; });
|
||||||
|
wait_for_socket(&sock).await;
|
||||||
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
|
let r = req(&mut s, 1, Cmd::Input {
|
||||||
|
surface_id: spacesh_proto::SurfaceId("s_nope".into()),
|
||||||
|
bytes: base64::engine::general_purpose::STANDARD.encode(b"x"),
|
||||||
|
}).await;
|
||||||
|
match r {
|
||||||
|
Envelope::Res { ok, error, .. } => {
|
||||||
|
assert!(!ok);
|
||||||
|
assert_eq!(error.unwrap().code, "NOT_FOUND");
|
||||||
|
}
|
||||||
|
_ => panic!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn res_data(env: &Envelope) -> &serde_json::Value {
|
||||||
|
match env { Envelope::Res { data, .. } => data, _ => panic!("not a res") }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn tempdir_path() -> std::path::PathBuf {
|
||||||
|
let mut p = std::env::temp_dir();
|
||||||
|
let n = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos();
|
||||||
|
p.push(format!("spaceshd-test-{n}"));
|
||||||
|
std::fs::create_dir_all(&p).unwrap();
|
||||||
|
p
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn wait_for_socket(sock: &Path) {
|
||||||
|
for _ in 0..100 {
|
||||||
|
if UnixStream::connect(sock).await.is_ok() { return; }
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
|
||||||
|
}
|
||||||
|
panic!("socket never came up");
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user