Skip to content

Commit d0c2bf6

Browse files
committed
Allow the use of unsound but faster BytesMut::set_len
1 parent 56d758b commit d0c2bf6

2 files changed

Lines changed: 112 additions & 8 deletions

File tree

src/protocol/frame/mod.rs

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,20 @@ where
6666
{
6767
/// Read a frame from stream.
6868
pub fn read(&mut self, max_size: Option<usize>) -> Result<Option<Frame>> {
69-
self.codec.read_frame(&mut self.stream, max_size, false, true)
69+
self.codec.read_frame(&mut self.stream, max_size, false, true, &FrameCodec::read_in)
70+
}
71+
72+
/// Read a frame from stream.
73+
///
74+
/// # Safety
75+
///
76+
/// This function uses `FrameCodec::read_in_unsound` which can be unsound.
77+
/// It is up to the caller of this function to ensure that the underlying `Read` implementation does not
78+
/// attempt to read the uninitialized buffer and only writes to it.
79+
pub unsafe fn read_unsound(&mut self, max_size: Option<usize>) -> Result<Option<Frame>> {
80+
self.codec.read_frame(&mut self.stream, max_size, false, true, &|fc, s| unsafe {
81+
fc.read_in_unsound(s)
82+
})
7083
}
7184
}
7285

@@ -155,13 +168,18 @@ impl FrameCodec {
155168
}
156169

157170
/// Read a frame from the provided stream.
158-
pub(super) fn read_frame(
171+
pub(super) fn read_frame<Stream, Reader>(
159172
&mut self,
160-
stream: &mut impl Read,
173+
stream: &mut Stream,
161174
max_size: Option<usize>,
162175
unmask: bool,
163176
accept_unmasked: bool,
164-
) -> Result<Option<Frame>> {
177+
reader: &Reader,
178+
) -> Result<Option<Frame>>
179+
where
180+
Stream: Read,
181+
Reader: Fn(&mut FrameCodec, &mut Stream) -> io::Result<usize>,
182+
{
165183
let max_size = max_size.unwrap_or_else(usize::max_value);
166184

167185
let mut payload = loop {
@@ -193,7 +211,7 @@ impl FrameCodec {
193211

194212
// Not enough data in buffer.
195213
self.in_buffer.reserve(self.header.as_ref().map(|(_, l)| *l as usize).unwrap_or(6));
196-
if self.read_in(stream)? == 0 {
214+
if reader(self, stream)? == 0 {
197215
trace!("no frame received");
198216
return Ok(None);
199217
}
@@ -222,7 +240,7 @@ impl FrameCodec {
222240
}
223241

224242
/// Read into available `in_buffer` capacity.
225-
fn read_in(&mut self, stream: &mut impl Read) -> io::Result<usize> {
243+
pub(super) fn read_in(&mut self, stream: &mut impl Read) -> io::Result<usize> {
226244
let len = self.in_buffer.len();
227245
debug_assert!(self.in_buffer.capacity() > len);
228246
self.in_buffer.resize(self.in_buffer.capacity(), 0);
@@ -231,6 +249,25 @@ impl FrameCodec {
231249
size
232250
}
233251

252+
/// Read into available `in_buffer` capacity.
253+
///
254+
/// # Safety
255+
///
256+
/// This function uses `BytesMut::set_len` to set the reading buffer length without initializing memory.
257+
/// This is generally unsound.
258+
/// It is up to the caller of this function to ensure that the underlying `Read` implementation does not
259+
/// attempt to read the uninitialized buffer and only writes to it.
260+
pub(super) unsafe fn read_in_unsound(&mut self, stream: &mut impl Read) -> io::Result<usize> {
261+
let len = self.in_buffer.len();
262+
debug_assert!(self.in_buffer.capacity() > len);
263+
unsafe {
264+
self.in_buffer.set_len(self.in_buffer.capacity());
265+
}
266+
let size = stream.read(&mut self.in_buffer[len..]);
267+
self.in_buffer.truncate(len + size.as_ref().copied().unwrap_or(0));
268+
size
269+
}
270+
234271
/// Writes a frame into the `out_buffer`.
235272
/// If the out buffer size is over the `out_buffer_write_len` will also write
236273
/// the out buffer into the provided `stream`.

src/protocol/mod.rs

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,29 @@ impl<Stream: Read + Write> WebSocket<Stream> {
255255
self.context.read(&mut self.socket)
256256
}
257257

258+
/// Read a message from stream, if possible.
259+
///
260+
/// This will also queue responses to ping and close messages. These responses
261+
/// will be written and flushed on the next call to [`read`](Self::read),
262+
/// [`write`](Self::write) or [`flush`](Self::flush).
263+
///
264+
/// # Closing the connection
265+
/// When the remote endpoint decides to close the connection this will return
266+
/// the close message with an optional close frame.
267+
///
268+
/// You should continue calling [`read`](Self::read), [`write`](Self::write) or
269+
/// [`flush`](Self::flush) to drive the reply to the close frame until [`Error::ConnectionClosed`]
270+
/// is returned. Once that happens it is safe to drop the underlying connection.
271+
///
272+
/// # Safety
273+
///
274+
/// This function uses `FrameCodec::read_in_unsound` which can be unsound.
275+
/// It is up to the caller of this function to ensure that the underlying `Read` implementation does not
276+
/// attempt to read the uninitialized buffer and only writes to it.
277+
pub unsafe fn read_unsound(&mut self) -> Result<Message> {
278+
unsafe { self.context.read_unsound(&mut self.socket) }
279+
}
280+
258281
/// Writes and immediately flushes a message.
259282
/// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush).
260283
pub fn send(&mut self, message: Message) -> Result<()> {
@@ -441,9 +464,44 @@ impl WebSocketContext {
441464
///
442465
/// This function sends pong and close responses automatically.
443466
/// However, it never blocks on write.
467+
#[inline]
444468
pub fn read<Stream>(&mut self, stream: &mut Stream) -> Result<Message>
445469
where
446470
Stream: Read + Write,
471+
{
472+
self.read_inner(stream, &FrameCodec::read_in)
473+
}
474+
475+
/// Read a message from the provided stream, if possible.
476+
///
477+
/// This function sends pong and close responses automatically.
478+
/// However, it never blocks on write.
479+
///
480+
/// # Safety
481+
///
482+
/// This function uses `FrameCodec::read_in_unsound` which can be unsound.
483+
/// It is up to the caller of this function to ensure that the underlying `Read` implementation does not
484+
/// attempt to read the uninitialized buffer and only writes to it.
485+
#[inline]
486+
pub unsafe fn read_unsound<Stream>(&mut self, stream: &mut Stream) -> Result<Message>
487+
where
488+
Stream: Read + Write,
489+
{
490+
self.read_inner(stream, &|fc, s| unsafe { fc.read_in_unsound(s) })
491+
}
492+
493+
/// Read a message from the provided stream, if possible.
494+
///
495+
/// This function sends pong and close responses automatically.
496+
/// However, it never blocks on write.
497+
fn read_inner<Stream, Reader>(
498+
&mut self,
499+
stream: &mut Stream,
500+
reader: &Reader,
501+
) -> Result<Message>
502+
where
503+
Stream: Read + Write,
504+
Reader: Fn(&mut FrameCodec, &mut Stream) -> io::Result<usize>,
447505
{
448506
// Do not read from already closed connections.
449507
self.state.check_not_terminated()?;
@@ -466,7 +524,7 @@ impl WebSocketContext {
466524

467525
// If we get here, either write blocks or we have nothing to write.
468526
// Thus if read blocks, just let it return WouldBlock.
469-
if let Some(message) = self.read_message_frame(stream)? {
527+
if let Some(message) = self.read_message_frame(stream, reader)? {
470528
trace!("Received message {message}");
471529
return Ok(message);
472530
}
@@ -598,14 +656,23 @@ impl WebSocketContext {
598656
}
599657

600658
/// Try to decode one message frame. May return None.
601-
fn read_message_frame(&mut self, stream: &mut impl Read) -> Result<Option<Message>> {
659+
fn read_message_frame<Stream, Reader>(
660+
&mut self,
661+
stream: &mut Stream,
662+
reader: &Reader,
663+
) -> Result<Option<Message>>
664+
where
665+
Stream: Read,
666+
Reader: Fn(&mut FrameCodec, &mut Stream) -> io::Result<usize>,
667+
{
602668
if let Some(frame) = self
603669
.frame
604670
.read_frame(
605671
stream,
606672
self.config.max_frame_size,
607673
matches!(self.role, Role::Server),
608674
self.config.accept_unmasked_frames,
675+
reader,
609676
)
610677
.check_connection_reset(self.state)?
611678
{

0 commit comments

Comments
 (0)