From 482f693835a6ebad66d8934b80324b6a8a113811 Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Tue, 9 Jun 2026 19:56:00 +0700 Subject: [PATCH] feat(proto): length-prefixed frame read/write with EOF handling Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/spacesh-proto/src/codec.rs | 81 ++++++++++++++++++++++++++++++- 1 file changed, 80 insertions(+), 1 deletion(-) diff --git a/crates/spacesh-proto/src/codec.rs b/crates/spacesh-proto/src/codec.rs index 0ca532e..548b484 100644 --- a/crates/spacesh-proto/src/codec.rs +++ b/crates/spacesh-proto/src/codec.rs @@ -1 +1,80 @@ -// populated in Task 2 +use crate::message::Envelope; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +/// Maximum frame size we will accept (16 MiB). Guards against a corrupt length prefix. +pub const MAX_FRAME: u32 = 16 * 1024 * 1024; + +#[derive(Debug, thiserror::Error)] +pub enum CodecError { + #[error("io: {0}")] + Io(#[from] std::io::Error), + #[error("json: {0}")] + Json(#[from] serde_json::Error), + #[error("frame too large: {0} bytes")] + FrameTooLarge(u32), +} + +/// Write one envelope as `u32` BE length prefix + JSON payload. +pub async fn write_frame(w: &mut W, env: &Envelope) -> Result<(), CodecError> { + let payload = serde_json::to_vec(env)?; + let len = payload.len() as u32; + if len > MAX_FRAME { + return Err(CodecError::FrameTooLarge(len)); + } + w.write_all(&len.to_be_bytes()).await?; + w.write_all(&payload).await?; + w.flush().await?; + Ok(()) +} + +/// Read one length-prefixed envelope. Returns `Ok(None)` on clean EOF. +pub async fn read_frame(r: &mut R) -> Result, CodecError> { + let mut len_buf = [0u8; 4]; + match r.read_exact(&mut len_buf).await { + Ok(_) => {} + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None), + Err(e) => return Err(e.into()), + } + let len = u32::from_be_bytes(len_buf); + if len > MAX_FRAME { + return Err(CodecError::FrameTooLarge(len)); + } + let mut payload = vec![0u8; len as usize]; + r.read_exact(&mut payload).await?; + let env: Envelope = serde_json::from_slice(&payload)?; + Ok(Some(env)) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ids::SurfaceId; + use crate::message::{Cmd, Envelope}; + + #[tokio::test] + async fn frame_round_trips_over_a_pipe() { + let (mut client, mut server) = tokio::io::duplex(1024); + let env = Envelope::Req { id: 9, cmd: Cmd::Status }; + write_frame(&mut client, &env).await.unwrap(); + let got = read_frame(&mut server).await.unwrap().unwrap(); + assert_eq!(got, env); + } + + #[tokio::test] + async fn two_frames_are_decoded_independently() { + let (mut client, mut server) = tokio::io::duplex(4096); + let a = Envelope::Req { id: 1, cmd: Cmd::Status }; + let b = Envelope::Req { id: 2, cmd: Cmd::Close { surface_id: SurfaceId("s_1".into()) } }; + write_frame(&mut client, &a).await.unwrap(); + write_frame(&mut client, &b).await.unwrap(); + assert_eq!(read_frame(&mut server).await.unwrap().unwrap(), a); + assert_eq!(read_frame(&mut server).await.unwrap().unwrap(), b); + } + + #[tokio::test] + async fn clean_eof_returns_none() { + let (client, mut server) = tokio::io::duplex(16); + drop(client); + assert!(read_frame(&mut server).await.unwrap().is_none()); + } +}