Skip to content

Commit 4d36aea

Browse files
committed
Rework as using underlying bytes.len as capacity
1 parent 8e0f19d commit 4d36aea

2 files changed

Lines changed: 59 additions & 70 deletions

File tree

src/protocol/frame/init_aware_buf.rs

Lines changed: 54 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,118 +1,113 @@
11
use bytes::{Buf, BytesMut};
2-
use std::{
3-
ops::{Deref, DerefMut},
4-
ptr,
5-
};
2+
use std::ops::{Deref, DerefMut};
63

7-
/// [`BytesMut`] wrapper that tracks initialization state of its spare capacity.
4+
/// Buffer that provides fast & safe [`Self::resize`] & [`Self::truncate`] usage.
85
///
9-
/// Supports safe & efficient repeated calls to [`Self::resize`] + [`Self::truncate`].
6+
/// It is aware of the initialization state of its spare capacity avoiding the
7+
/// need to zero uninitialized bytes on resizing more than once for safe usage.
108
///
119
/// This optimisation is useful for [`std::io::Read`] to safely provide spare
1210
/// capacity as an initialized slice.
1311
///
1412
/// Related, may be obsoleted by: <https://github.com/rust-lang/rust/issues/78485>
1513
#[derive(Debug, Default)]
1614
pub struct InitAwareBuf {
15+
/// Backing buf length is used as capacity. This ensure this extra region
16+
/// is always initialized (initially with zero, but otherwise the last previously
17+
/// set value).
1718
bytes: BytesMut,
18-
/// Capacity that has been initialized.
19-
init_cap: usize,
19+
/// Length of bytes in use (always <= bytes.len).
20+
len: usize,
2021
}
2122

2223
impl InitAwareBuf {
2324
#[inline]
2425
pub fn with_capacity(capacity: usize) -> Self {
25-
Self { bytes: BytesMut::with_capacity(capacity), init_cap: 0 }
26+
Self { bytes: BytesMut::zeroed(capacity), len: 0 }
2627
}
2728

2829
#[inline]
2930
pub fn len(&self) -> usize {
30-
self.bytes.len()
31+
self.len
3132
}
3233

34+
/// Capacity that may be resized to cheaply.
3335
#[inline]
3436
pub fn capacity(&self) -> usize {
35-
self.bytes.capacity()
37+
self.bytes.len()
3638
}
3739

3840
#[inline]
3941
pub fn split_to(&mut self, at: usize) -> BytesMut {
42+
assert!(at <= self.len, "split_to out of bounds: {at} <= {}", self.len);
4043
let split = self.bytes.split_to(at);
41-
self.init_cap -= at;
44+
self.len -= at;
4245
split
4346
}
4447

48+
/// Reserve capacity for `min_additional` more bytes than the current [`Self::len`].
49+
///
50+
/// `max_additional` sets the maximum number of additional bytes zeroed as extra
51+
/// capacity if available after reserving in the underlying buffer. Has no effect
52+
/// if `max_additional <= additional`.
4553
#[inline]
46-
pub fn reserve(&mut self, additional: usize) {
47-
// Increasing capacity doesn't change `init_cap`
48-
self.bytes.reserve(additional);
54+
pub fn reserve(&mut self, additional: usize, max_additional: usize) {
55+
let min_len = self.len + additional;
56+
let cap = self.capacity();
57+
if min_len > cap {
58+
self.bytes.reserve(min_len - cap);
59+
let new_cap = self.bytes.capacity().min(self.len + max_additional.max(additional));
60+
self.bytes.resize(new_cap, 0);
61+
}
4962
}
5063

51-
/// Sets the length of the buffer to `len`. If above the current
52-
/// initialized capacity any uninitialized bytes will be zeroed.
53-
///
54-
/// This is more efficient that [`BytesMut::resize`] as spare capacity
55-
/// is only initialized **once** past the initialized_capacity. This
56-
/// allow the method to be efficiently called after truncating.
64+
/// Resizes the buffer to `new_len`.
5765
///
58-
/// # Panics
59-
/// Panics if `len > capacity`.
66+
/// If greater the new bytes will be either initialized to zero or as
67+
/// they were last set to.
6068
#[inline]
61-
pub fn resize(&mut self, len: usize) {
62-
if len <= self.init_cap {
63-
// SAFETY: init_cap tracks initialised bytes.
64-
unsafe {
65-
self.bytes.set_len(len);
66-
}
67-
} else {
68-
assert!(len <= self.capacity());
69-
let cur_len = self.bytes.len();
70-
let spare = self.bytes.spare_capacity_mut();
71-
let already_init = self.init_cap - cur_len;
72-
let zeroes = len - self.init_cap;
73-
debug_assert!(already_init + zeroes <= spare.len());
74-
unsafe {
75-
// SAFETY: spare capacity is sufficient for `zeroes` extra bytes
76-
ptr::write_bytes(spare[already_init..].as_mut_ptr().cast::<u8>(), 0, zeroes);
77-
// SAFETY: len has been initialized
78-
self.bytes.set_len(len);
79-
}
80-
self.init_cap = len;
69+
pub fn resize(&mut self, new_len: usize) {
70+
if new_len > self.capacity() {
71+
self.bytes.resize(new_len, 0);
8172
}
73+
self.len = new_len;
8274
}
8375

8476
#[inline]
8577
pub fn truncate(&mut self, len: usize) {
86-
// truncating doesn't change `init_cap`
87-
self.bytes.truncate(len);
78+
if len < self.len {
79+
self.len = len;
80+
}
8881
}
8982

9083
#[inline]
9184
pub fn advance(&mut self, cnt: usize) {
85+
assert!(cnt <= self.len, "cannot advance past len: {cnt} <= {}", self.len);
9286
self.bytes.advance(cnt);
93-
self.init_cap -= cnt;
87+
self.len -= cnt;
9488
}
9589
}
9690

9791
impl From<BytesMut> for InitAwareBuf {
9892
#[inline]
9993
fn from(bytes: BytesMut) -> Self {
100-
let init_cap = bytes.len();
101-
Self { bytes, init_cap }
94+
let len = bytes.len();
95+
Self { bytes, len }
10296
}
10397
}
10498

10599
impl From<InitAwareBuf> for BytesMut {
106100
#[inline]
107-
fn from(value: InitAwareBuf) -> Self {
108-
value.bytes
101+
fn from(mut zb: InitAwareBuf) -> Self {
102+
zb.bytes.truncate(zb.len);
103+
zb.bytes
109104
}
110105
}
111106

112107
impl AsRef<[u8]> for InitAwareBuf {
113108
#[inline]
114109
fn as_ref(&self) -> &[u8] {
115-
&self.bytes
110+
&self.bytes[..self.len]
116111
}
117112
}
118113

@@ -121,21 +116,21 @@ impl Deref for InitAwareBuf {
121116

122117
#[inline]
123118
fn deref(&self) -> &[u8] {
124-
&self.bytes
119+
self.as_ref()
125120
}
126121
}
127122

128123
impl AsMut<[u8]> for InitAwareBuf {
129124
#[inline]
130125
fn as_mut(&mut self) -> &mut [u8] {
131-
&mut self.bytes
126+
&mut self.bytes[..self.len]
132127
}
133128
}
134129

135130
impl DerefMut for InitAwareBuf {
136131
#[inline]
137132
fn deref_mut(&mut self) -> &mut [u8] {
138-
&mut self.bytes
133+
self.as_mut()
139134
}
140135
}
141136

@@ -147,18 +142,15 @@ mod test {
147142
fn reserve_resize_truncate() {
148143
let mut buf = InitAwareBuf::default();
149144
assert_eq!(buf.len(), 0);
150-
assert_eq!(buf.init_cap, 0);
151145
assert_eq!(buf.capacity(), 0);
152146

153-
buf.reserve(64);
147+
buf.reserve(64, 0);
154148
assert_eq!(buf.len(), 0);
155-
assert_eq!(buf.init_cap, 0);
156149
let new_capacity = buf.capacity();
157150
assert!(new_capacity >= 64);
158151

159152
buf.resize(10);
160153
assert_eq!(buf.len(), 10);
161-
assert_eq!(buf.init_cap, 10);
162154
assert_eq!(buf.capacity(), new_capacity);
163155
assert_eq!(&*buf, &[0; 10]);
164156

@@ -172,14 +164,12 @@ mod test {
172164
}
173165
buf.truncate(3);
174166
assert_eq!(buf.len(), 3);
175-
assert_eq!(buf.init_cap, 10);
176167
assert_eq!(buf.capacity(), new_capacity);
177168
assert_eq!(&*buf, &[8; 3]);
178169

179170
// resizing should need do nothing now since this has already been initialized once
180171
buf.resize(10);
181172
assert_eq!(buf.len(), 10);
182-
assert_eq!(buf.init_cap, 10);
183173
assert_eq!(buf.capacity(), new_capacity);
184174
assert_eq!(&*buf, &[8, 8, 8, 44, 44, 44, 44, 44, 44, 44]);
185175

@@ -189,7 +179,6 @@ mod test {
189179
// resizing should only init to zero the 3 bytes that hadn't previously been
190180
buf.resize(13);
191181
assert_eq!(buf.len(), 13);
192-
assert_eq!(buf.init_cap, 13);
193182
assert_eq!(buf.capacity(), new_capacity);
194183
assert_eq!(&*buf, &[8, 8, 8, 44, 44, 44, 44, 44, 44, 44, 0, 0, 0]);
195184
}
@@ -198,23 +187,23 @@ mod test {
198187
fn advance() {
199188
let mut buf = InitAwareBuf::from(BytesMut::from(&[0, 1, 2, 3, 4][..]));
200189
assert_eq!(buf.len(), 5);
201-
assert_eq!(buf.init_cap, 5);
190+
assert_eq!(buf.capacity(), 5);
202191

203192
buf.advance(2);
204193
assert_eq!(buf.len(), 3);
205-
assert_eq!(buf.init_cap, 3);
194+
assert_eq!(buf.capacity(), 3);
206195
assert_eq!(&*buf, &[2, 3, 4]);
207196
}
208197

209198
#[test]
210199
fn split_to() {
211200
let mut buf = InitAwareBuf::from(BytesMut::from(&[0, 1, 2, 3, 4][..]));
212201
assert_eq!(buf.len(), 5);
213-
assert_eq!(buf.init_cap, 5);
202+
assert_eq!(buf.capacity(), 5);
214203

215204
let split = buf.split_to(2);
216205
assert_eq!(buf.len(), 3);
217-
assert_eq!(buf.init_cap, 3);
206+
assert_eq!(buf.capacity(), 3);
218207
assert_eq!(&*buf, &[2, 3, 4]);
219208
assert_eq!(&*split, &[0, 1]);
220209
}

src/protocol/frame/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,10 @@ impl FrameCodec {
135135

136136
/// Create a new frame codec from partially read data.
137137
pub(super) fn from_partially_read(part: Vec<u8>, min_in_buf_len: usize) -> Self {
138-
let mut in_buffer = BytesMut::from_iter(part);
139-
in_buffer.reserve(min_in_buf_len.saturating_sub(in_buffer.len()));
138+
let mut in_buffer = InitAwareBuf::from(BytesMut::from_iter(part));
139+
in_buffer.reserve(min_in_buf_len.saturating_sub(in_buffer.len()), min_in_buf_len);
140140
Self {
141-
in_buffer: in_buffer.into(),
141+
in_buffer,
142142
in_buf_max_read: min_in_buf_len.max(FrameHeader::MAX_SIZE),
143143
out_buffer: <_>::default(),
144144
max_out_buffer_len: usize::MAX,
@@ -188,9 +188,9 @@ impl FrameCodec {
188188

189189
// Reserve full message length only once, even for multiple
190190
// loops or if WouldBlock errors cause multiple fn calls.
191-
self.in_buffer.reserve(len);
191+
self.in_buffer.reserve(len, self.in_buf_max_read);
192192
} else {
193-
self.in_buffer.reserve(FrameHeader::MAX_SIZE);
193+
self.in_buffer.reserve(FrameHeader::MAX_SIZE, self.in_buf_max_read);
194194
}
195195
}
196196

0 commit comments

Comments
 (0)