Skip to content

Commit b9f2acc

Browse files
committed
Fix large message read performance by enforcing max read_buffer_size read chunks
1 parent 255aaa2 commit b9f2acc

3 files changed

Lines changed: 20 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
# Unreleased
2+
- Fix large message read performance by enforcing max `read_buffer_size` read chunks.
3+
14
# 0.26.2
25
- Add `WebSocketConfig::read_buffer_size` docs explaining performance/memory tradeoff.
36
- Implement traits and add helper methods for the UTF8 payloads making them comparable and more ergonomic.

src/protocol/frame/frame.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ impl Default for FrameHeader {
6666
}
6767

6868
impl FrameHeader {
69+
/// > longest possible header is 14 bytes, which would represent a message sent from
70+
/// > the client to the server with a payload greater then 64KB.
71+
pub(crate) const MAX_SIZE: usize = 14;
72+
6973
/// Parse a header from an input stream.
7074
/// Returns `None` if insufficient data and does not consume anything in this case.
7175
/// Payload size is returned along with the header.

src/protocol/frame/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ where
104104
pub(super) struct FrameCodec {
105105
/// Buffer to read data from the stream.
106106
in_buffer: BytesMut,
107+
in_buf_max_read: usize,
107108
/// Buffer to send packets to the network.
108109
out_buffer: Vec<u8>,
109110
/// Capacity limit for `out_buffer`.
@@ -123,6 +124,7 @@ impl FrameCodec {
123124
pub(super) fn new(in_buf_len: usize) -> Self {
124125
Self {
125126
in_buffer: BytesMut::with_capacity(in_buf_len),
127+
in_buf_max_read: in_buf_len,
126128
out_buffer: <_>::default(),
127129
max_out_buffer_len: usize::MAX,
128130
out_buffer_write_len: 0,
@@ -136,6 +138,7 @@ impl FrameCodec {
136138
in_buffer.reserve(min_in_buf_len.saturating_sub(in_buffer.len()));
137139
Self {
138140
in_buffer,
141+
in_buf_max_read: min_in_buf_len,
139142
out_buffer: <_>::default(),
140143
max_out_buffer_len: usize::MAX,
141144
out_buffer_write_len: 0,
@@ -164,6 +167,7 @@ impl FrameCodec {
164167
) -> Result<Option<Frame>> {
165168
let max_size = max_size.unwrap_or_else(usize::max_value);
166169

170+
let mut reserved_full_msg_len = false;
167171
let mut payload = loop {
168172
{
169173
if self.header.is_none() {
@@ -192,7 +196,14 @@ impl FrameCodec {
192196
}
193197

194198
// Not enough data in buffer.
195-
self.in_buffer.reserve(self.header.as_ref().map(|(_, l)| *l as usize).unwrap_or(6));
199+
if let Some((_, len)) = &self.header {
200+
if !reserved_full_msg_len {
201+
self.in_buffer.reserve(*len as usize);
202+
reserved_full_msg_len = true;
203+
}
204+
} else {
205+
self.in_buffer.reserve(FrameHeader::MAX_SIZE);
206+
}
196207
if self.read_in(stream)? == 0 {
197208
trace!("no frame received");
198209
return Ok(None);
@@ -225,7 +236,7 @@ impl FrameCodec {
225236
fn read_in(&mut self, stream: &mut impl Read) -> io::Result<usize> {
226237
let len = self.in_buffer.len();
227238
debug_assert!(self.in_buffer.capacity() > len);
228-
self.in_buffer.resize(self.in_buffer.capacity(), 0);
239+
self.in_buffer.resize(self.in_buffer.capacity().min(len + self.in_buf_max_read), 0);
229240
let size = stream.read(&mut self.in_buffer[len..]);
230241
self.in_buffer.truncate(len + size.as_ref().copied().unwrap_or(0));
231242
size

0 commit comments

Comments
 (0)