Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Unreleased
- Fix large message read performance by enforcing max `read_buffer_size` read chunks.

# 0.26.2
- Add `WebSocketConfig::read_buffer_size` docs explaining performance/memory tradeoff.
- Implement traits and add helper methods for the UTF8 payloads making them comparable and more ergonomic.
Expand Down
4 changes: 4 additions & 0 deletions src/protocol/frame/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ impl Default for FrameHeader {
}

impl FrameHeader {
/// > longest possible header is 14 bytes, which would represent a message sent from
/// > the client to the server with a payload greater then 64KB.
pub(crate) const MAX_SIZE: usize = 14;
Comment thread
alexheretic marked this conversation as resolved.
Outdated

/// Parse a header from an input stream.
/// Returns `None` if insufficient data and does not consume anything in this case.
/// Payload size is returned along with the header.
Expand Down
15 changes: 13 additions & 2 deletions src/protocol/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ where
pub(super) struct FrameCodec {
/// Buffer to read data from the stream.
in_buffer: BytesMut,
in_buf_max_read: usize,
/// Buffer to send packets to the network.
out_buffer: Vec<u8>,
/// Capacity limit for `out_buffer`.
Expand All @@ -123,6 +124,7 @@ impl FrameCodec {
pub(super) fn new(in_buf_len: usize) -> Self {
Self {
in_buffer: BytesMut::with_capacity(in_buf_len),
in_buf_max_read: in_buf_len.max(FrameHeader::MAX_SIZE),
out_buffer: <_>::default(),
max_out_buffer_len: usize::MAX,
out_buffer_write_len: 0,
Expand All @@ -136,6 +138,7 @@ impl FrameCodec {
in_buffer.reserve(min_in_buf_len.saturating_sub(in_buffer.len()));
Self {
in_buffer,
in_buf_max_read: min_in_buf_len.max(FrameHeader::MAX_SIZE),
out_buffer: <_>::default(),
max_out_buffer_len: usize::MAX,
out_buffer_write_len: 0,
Expand Down Expand Up @@ -164,6 +167,7 @@ impl FrameCodec {
) -> Result<Option<Frame>> {
let max_size = max_size.unwrap_or_else(usize::max_value);

let mut reserved_full_msg_len = false;
let mut payload = loop {
{
if self.header.is_none() {
Expand Down Expand Up @@ -192,7 +196,14 @@ impl FrameCodec {
}

// Not enough data in buffer.
self.in_buffer.reserve(self.header.as_ref().map(|(_, l)| *l as usize).unwrap_or(6));
if let Some((_, len)) = &self.header {
if !reserved_full_msg_len {
self.in_buffer.reserve(*len as usize);
reserved_full_msg_len = true;
}
} else {
self.in_buffer.reserve(FrameHeader::MAX_SIZE);
}
Comment thread
alexheretic marked this conversation as resolved.
if self.read_in(stream)? == 0 {
trace!("no frame received");
return Ok(None);
Expand Down Expand Up @@ -225,7 +236,7 @@ impl FrameCodec {
fn read_in(&mut self, stream: &mut impl Read) -> io::Result<usize> {
let len = self.in_buffer.len();
debug_assert!(self.in_buffer.capacity() > len);
self.in_buffer.resize(self.in_buffer.capacity(), 0);
self.in_buffer.resize(self.in_buffer.capacity().min(len + self.in_buf_max_read), 0);
let size = stream.read(&mut self.in_buffer[len..]);
self.in_buffer.truncate(len + size.as_ref().copied().unwrap_or(0));
size
Expand Down