use crate::message::Envelope; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; /// Maximum frame size we will accept (16 MiB). Guards against a corrupt length prefix. pub const MAX_FRAME: u32 = 16 * 1024 * 1024; #[derive(Debug, thiserror::Error)] pub enum CodecError { #[error("io: {0}")] Io(#[from] std::io::Error), #[error("json: {0}")] Json(#[from] serde_json::Error), #[error("frame too large: {0} bytes")] FrameTooLarge(u32), } /// Write one envelope as `u32` BE length prefix + JSON payload. pub async fn write_frame(w: &mut W, env: &Envelope) -> Result<(), CodecError> { let payload = serde_json::to_vec(env)?; let len = payload.len() as u32; if len > MAX_FRAME { return Err(CodecError::FrameTooLarge(len)); } w.write_all(&len.to_be_bytes()).await?; w.write_all(&payload).await?; w.flush().await?; Ok(()) } /// Read one length-prefixed envelope. Returns `Ok(None)` on clean EOF. pub async fn read_frame(r: &mut R) -> Result, CodecError> { let mut len_buf = [0u8; 4]; match r.read_exact(&mut len_buf).await { Ok(_) => {} Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None), Err(e) => return Err(e.into()), } let len = u32::from_be_bytes(len_buf); if len > MAX_FRAME { return Err(CodecError::FrameTooLarge(len)); } let mut payload = vec![0u8; len as usize]; r.read_exact(&mut payload).await?; let env: Envelope = serde_json::from_slice(&payload)?; Ok(Some(env)) } #[cfg(test)] mod tests { use super::*; use crate::ids::SurfaceId; use crate::message::{Cmd, Envelope}; #[tokio::test] async fn frame_round_trips_over_a_pipe() { let (mut client, mut server) = tokio::io::duplex(1024); let env = Envelope::Req { id: 9, cmd: Cmd::Status }; write_frame(&mut client, &env).await.unwrap(); let got = read_frame(&mut server).await.unwrap().unwrap(); assert_eq!(got, env); } #[tokio::test] async fn two_frames_are_decoded_independently() { let (mut client, mut server) = tokio::io::duplex(4096); let a = Envelope::Req { id: 1, cmd: Cmd::Status }; let b = Envelope::Req { id: 2, cmd: Cmd::Close { surface_id: SurfaceId("s_1".into()) } }; write_frame(&mut client, &a).await.unwrap(); write_frame(&mut client, &b).await.unwrap(); assert_eq!(read_frame(&mut server).await.unwrap().unwrap(), a); assert_eq!(read_frame(&mut server).await.unwrap().unwrap(), b); } #[tokio::test] async fn clean_eof_returns_none() { let (client, mut server) = tokio::io::duplex(16); drop(client); assert!(read_frame(&mut server).await.unwrap().is_none()); } }