feat(proto): length-prefixed frame read/write with EOF handling
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1 +1,80 @@
|
||||
// populated in Task 2
|
||||
use crate::message::Envelope;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||
|
||||
/// Maximum frame size we will accept (16 MiB). Guards against a corrupt length prefix.
|
||||
pub const MAX_FRAME: u32 = 16 * 1024 * 1024;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum CodecError {
|
||||
#[error("io: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
#[error("json: {0}")]
|
||||
Json(#[from] serde_json::Error),
|
||||
#[error("frame too large: {0} bytes")]
|
||||
FrameTooLarge(u32),
|
||||
}
|
||||
|
||||
/// Write one envelope as `u32` BE length prefix + JSON payload.
|
||||
pub async fn write_frame<W: AsyncWrite + Unpin>(w: &mut W, env: &Envelope) -> Result<(), CodecError> {
|
||||
let payload = serde_json::to_vec(env)?;
|
||||
let len = payload.len() as u32;
|
||||
if len > MAX_FRAME {
|
||||
return Err(CodecError::FrameTooLarge(len));
|
||||
}
|
||||
w.write_all(&len.to_be_bytes()).await?;
|
||||
w.write_all(&payload).await?;
|
||||
w.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read one length-prefixed envelope. Returns `Ok(None)` on clean EOF.
|
||||
pub async fn read_frame<R: AsyncRead + Unpin>(r: &mut R) -> Result<Option<Envelope>, CodecError> {
|
||||
let mut len_buf = [0u8; 4];
|
||||
match r.read_exact(&mut len_buf).await {
|
||||
Ok(_) => {}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
let len = u32::from_be_bytes(len_buf);
|
||||
if len > MAX_FRAME {
|
||||
return Err(CodecError::FrameTooLarge(len));
|
||||
}
|
||||
let mut payload = vec![0u8; len as usize];
|
||||
r.read_exact(&mut payload).await?;
|
||||
let env: Envelope = serde_json::from_slice(&payload)?;
|
||||
Ok(Some(env))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::ids::SurfaceId;
|
||||
use crate::message::{Cmd, Envelope};
|
||||
|
||||
#[tokio::test]
|
||||
async fn frame_round_trips_over_a_pipe() {
|
||||
let (mut client, mut server) = tokio::io::duplex(1024);
|
||||
let env = Envelope::Req { id: 9, cmd: Cmd::Status };
|
||||
write_frame(&mut client, &env).await.unwrap();
|
||||
let got = read_frame(&mut server).await.unwrap().unwrap();
|
||||
assert_eq!(got, env);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn two_frames_are_decoded_independently() {
|
||||
let (mut client, mut server) = tokio::io::duplex(4096);
|
||||
let a = Envelope::Req { id: 1, cmd: Cmd::Status };
|
||||
let b = Envelope::Req { id: 2, cmd: Cmd::Close { surface_id: SurfaceId("s_1".into()) } };
|
||||
write_frame(&mut client, &a).await.unwrap();
|
||||
write_frame(&mut client, &b).await.unwrap();
|
||||
assert_eq!(read_frame(&mut server).await.unwrap().unwrap(), a);
|
||||
assert_eq!(read_frame(&mut server).await.unwrap().unwrap(), b);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn clean_eof_returns_none() {
|
||||
let (client, mut server) = tokio::io::duplex(16);
|
||||
drop(client);
|
||||
assert!(read_frame(&mut server).await.unwrap().is_none());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user