Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Unreleased
- Fix large message read performance by enforcing max `read_buffer_size` read chunks.

# 0.26.2
- Add `WebSocketConfig::read_buffer_size` docs explaining performance/memory tradeoff.
- Implement traits and add helper methods for the UTF8 payloads making them comparable and more ergonomic.
Expand Down
4 changes: 4 additions & 0 deletions src/protocol/frame/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ impl Default for FrameHeader {
}

impl FrameHeader {
/// > The longest possible header is 14 bytes, which would represent a message sent from
/// > the client to the server with a payload greater than 64KB.
pub(crate) const MAX_SIZE: usize = 14;

/// Parse a header from an input stream.
/// Returns `None` if insufficient data and does not consume anything in this case.
/// Payload size is returned along with the header.
Expand Down
15 changes: 13 additions & 2 deletions src/protocol/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ where
pub(super) struct FrameCodec {
/// Buffer to read data from the stream.
in_buffer: BytesMut,
in_buf_max_read: usize,
/// Buffer to send packets to the network.
out_buffer: Vec<u8>,
/// Capacity limit for `out_buffer`.
Expand All @@ -123,6 +124,7 @@ impl FrameCodec {
pub(super) fn new(in_buf_len: usize) -> Self {
Self {
in_buffer: BytesMut::with_capacity(in_buf_len),
in_buf_max_read: in_buf_len.max(FrameHeader::MAX_SIZE),
out_buffer: <_>::default(),
max_out_buffer_len: usize::MAX,
out_buffer_write_len: 0,
Expand All @@ -136,6 +138,7 @@ impl FrameCodec {
in_buffer.reserve(min_in_buf_len.saturating_sub(in_buffer.len()));
Self {
in_buffer,
in_buf_max_read: min_in_buf_len.max(FrameHeader::MAX_SIZE),
out_buffer: <_>::default(),
max_out_buffer_len: usize::MAX,
out_buffer_write_len: 0,
Expand Down Expand Up @@ -164,6 +167,7 @@ impl FrameCodec {
) -> Result<Option<Frame>> {
let max_size = max_size.unwrap_or_else(usize::max_value);

let mut reserved_full_msg_len = false;
let mut payload = loop {
{
if self.header.is_none() {
Expand Down Expand Up @@ -192,7 +196,14 @@ impl FrameCodec {
}

// Not enough data in buffer.
self.in_buffer.reserve(self.header.as_ref().map(|(_, l)| *l as usize).unwrap_or(6));
if let Some((_, len)) = &self.header {
if !reserved_full_msg_len {
self.in_buffer.reserve(*len as usize);
reserved_full_msg_len = true;
}
} else {
self.in_buffer.reserve(FrameHeader::MAX_SIZE);
}
Comment thread
alexheretic marked this conversation as resolved.
if self.read_in(stream)? == 0 {
trace!("no frame received");
return Ok(None);
Expand Down Expand Up @@ -225,7 +236,7 @@ impl FrameCodec {
fn read_in(&mut self, stream: &mut impl Read) -> io::Result<usize> {
let len = self.in_buffer.len();
debug_assert!(self.in_buffer.capacity() > len);
self.in_buffer.resize(self.in_buffer.capacity(), 0);
self.in_buffer.resize(self.in_buffer.capacity().min(len + self.in_buf_max_read), 0);
let size = stream.read(&mut self.in_buffer[len..]);
self.in_buffer.truncate(len + size.as_ref().copied().unwrap_or(0));
size
Expand Down