Skip to content

Commit f943312

Browse files
authored
fs: support io-uring in AsyncRead for File (#7907)
1 parent 5db10f5 commit f943312

9 files changed

Lines changed: 766 additions & 40 deletions

File tree

spellcheck.dic

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
317
1+
318
22
&
33
+
44
<
@@ -244,6 +244,7 @@ spawner
244244
Splitter
245245
spmc
246246
spsc
247+
SQE
247248
src
248249
stabilised
249250
startup

tokio/src/fs/file.rs

Lines changed: 125 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
3030
#[cfg(not(test))]
3131
use std::fs::File as StdFile;
3232

33+
cfg_io_uring! {
34+
#[cfg(not(test))]
35+
use crate::spawn;
36+
}
37+
3338
/// A reference to an open file on the filesystem.
3439
///
3540
/// This is a specialized version of [`std::fs::File`] for usage from the
@@ -613,13 +618,7 @@ impl AsyncRead for File {
613618
let std = me.std.clone();
614619

615620
let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size);
616-
inner.state = State::Busy(spawn_blocking(move || {
617-
// SAFETY: the `Read` implementation of `std` does not
618-
// read from the buffer it is borrowing and correctly
619-
// reports the length of the data written into the buffer.
620-
let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
621-
(Operation::Read(res), buf)
622-
}));
621+
inner.state = State::Busy(Inner::poll_read_inner(std, buf, max_buf_size)?);
623622
}
624623
State::Busy(ref mut rx) => {
625624
let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
@@ -952,6 +951,125 @@ cfg_windows! {
952951
}
953952

954953
impl Inner {
954+
fn poll_read_inner(
955+
std: Arc<StdFile>,
956+
buf: Buf,
957+
max_buf_size: usize,
958+
) -> io::Result<JoinHandle<(Operation, Buf)>> {
959+
// Unit tests use `MockFile` and the mock `spawn_blocking` infrastructure,
960+
// which can't drive real io_uring operations. The io_uring read path
961+
// is tested through integration tests in `tests/fs_uring_file_read.rs`.
962+
#[cfg(all(
963+
not(test),
964+
tokio_unstable,
965+
feature = "io-uring",
966+
feature = "rt",
967+
feature = "fs",
968+
target_os = "linux",
969+
))]
970+
{
971+
if let Ok(handle) = crate::runtime::Handle::try_current() {
972+
let driver_handle = handle.inner.driver().io();
973+
974+
if driver_handle.is_uring_ready(io_uring::opcode::Read::CODE) {
975+
// Fast path: uring already initialized and Read supported.
976+
let fd: crate::io::uring::utils::ArcFd = std;
977+
return Ok(spawn(Self::uring_read(fd, buf, max_buf_size)));
978+
}
979+
980+
if !driver_handle.is_uring_probed() {
981+
// Not yet probed: lazy init inside an async task so
982+
// `File::from_std()` can still benefit from io-uring.
983+
return Ok(spawn(Self::lazy_init_read(std, buf, max_buf_size)));
984+
}
985+
// Probed but unsupported: fall through to spawn_blocking.
986+
}
987+
}
988+
989+
// Fallback: spawn_blocking
990+
let join = Self::spawn_blocking_read(buf, std, max_buf_size);
991+
Ok(join)
992+
}
993+
994+
/// Perform an io-uring read with interrupt retry.
995+
#[cfg(all(
996+
not(test),
997+
tokio_unstable,
998+
feature = "io-uring",
999+
feature = "rt",
1000+
feature = "fs",
1001+
target_os = "linux",
1002+
))]
1003+
async fn uring_read(
1004+
mut fd: crate::io::uring::utils::ArcFd,
1005+
mut buf: Buf,
1006+
max_buf_size: usize,
1007+
) -> (Operation, Buf) {
1008+
use crate::runtime::driver::op::Op;
1009+
1010+
loop {
1011+
let (res, r_fd, r_buf) =
1012+
// u64::MAX to use and advance the file position
1013+
Op::read_at(fd, buf, max_buf_size, u64::MAX).await;
1014+
match res {
1015+
Err(e) if e.kind() == io::ErrorKind::Interrupted => {
1016+
buf = r_buf;
1017+
fd = r_fd;
1018+
continue;
1019+
}
1020+
Err(e) => break (Operation::Read(Err(e)), r_buf),
1021+
Ok(n) => break (Operation::Read(Ok(n as usize)), r_buf),
1022+
}
1023+
}
1024+
}
1025+
1026+
/// Attempt lazy io-uring initialization, then read via uring or fall back
1027+
/// to a blocking read. Covers the `File::from_std()` path where
1028+
/// `check_and_init()` hasn't been called yet.
1029+
#[cfg(all(
1030+
not(test),
1031+
tokio_unstable,
1032+
feature = "io-uring",
1033+
feature = "rt",
1034+
feature = "fs",
1035+
target_os = "linux",
1036+
))]
1037+
async fn lazy_init_read(std: Arc<StdFile>, buf: Buf, max_buf_size: usize) -> (Operation, Buf) {
1038+
let handle = crate::runtime::Handle::current();
1039+
let driver_handle = handle.inner.driver().io();
1040+
if driver_handle
1041+
.check_and_init(io_uring::opcode::Read::CODE)
1042+
.await
1043+
.unwrap_or(false)
1044+
{
1045+
let fd: crate::io::uring::utils::ArcFd = std;
1046+
Self::uring_read(fd, buf, max_buf_size).await
1047+
} else {
1048+
match Self::spawn_blocking_read(buf, std, max_buf_size).await {
1049+
Ok(result) => result,
1050+
Err(e) => (
1051+
Operation::Read(Err(io::Error::new(io::ErrorKind::Other, e))),
1052+
Buf::with_capacity(0),
1053+
),
1054+
}
1055+
}
1056+
}
1057+
1058+
fn spawn_blocking_read(
1059+
buf: Buf,
1060+
std: Arc<StdFile>,
1061+
max_buf_size: usize,
1062+
) -> JoinHandle<(Operation, Buf)> {
1063+
spawn_blocking(move || {
1064+
let mut buf = buf;
1065+
// SAFETY: the `Read` implementation of `std` does not
1066+
// read from the buffer it is borrowing and correctly
1067+
// reports the length of the data written into the buffer.
1068+
let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
1069+
(Operation::Read(res), buf)
1070+
})
1071+
}
1072+
9551073
async fn complete_inflight(&mut self) {
9561074
use std::future::poll_fn;
9571075

tokio/src/fs/read_uring.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ async fn op_read(
116116
read_len: u32,
117117
) -> io::Result<(OwnedFd, Vec<u8>, bool)> {
118118
loop {
119-
let (res, r_fd, r_buf) = Op::read(fd, buf, read_len, *offset).await;
119+
let (res, r_fd, r_buf) = Op::read_at(fd, buf, read_len as usize, *offset).await;
120120

121121
match res {
122122
Err(e) if e.kind() == ErrorKind::Interrupted => {

tokio/src/io/blocking.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,39 @@ impl Buf {
277277
}
278278
}
279279

280+
cfg_io_uring! {
281+
impl Buf {
282+
/// Prepare the internal buffer for an io-uring read operation.
283+
///
284+
/// Returns a pointer to the spare capacity and the length available
285+
/// for the kernel to write into.
286+
pub(crate) fn prepare_uring_read(&mut self, max_buf_size: usize) -> (*mut u8, u32) {
287+
assert!(self.is_empty());
288+
self.buf.reserve(max_buf_size);
289+
let spare = self.buf.spare_capacity_mut();
290+
let len = std::cmp::min(spare.len(), max_buf_size);
291+
let ptr = spare.as_mut_ptr().cast::<u8>();
292+
(ptr, len as u32)
293+
}
294+
295+
/// Complete an io-uring read operation.
296+
///
297+
/// # Safety
298+
///
299+
/// The caller must ensure that the kernel wrote exactly `n` bytes
300+
/// into the buffer that was returned by `prepare_uring_read`.
301+
pub(crate) unsafe fn complete_uring_read(&mut self, n: usize) {
302+
assert_eq!(self.pos, 0);
303+
// SAFETY: `prepare_uring_read` handed out a pointer to
304+
// `self.buf.spare_capacity_mut()` after asserting it's empty.
305+
// The caller guarantees the kernel initialised exactly `n` bytes
306+
// starting at that pointer, so bytes `0..n` are now initialised and
307+
// it is sound to set the Vec length to `n`.
308+
unsafe { self.buf.set_len(n) };
309+
}
310+
}
311+
}
312+
280313
cfg_fs! {
281314
impl Buf {
282315
pub(crate) fn discard_read(&mut self) -> i64 {

tokio/src/io/uring/read.rs

Lines changed: 83 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,74 @@
1+
use crate::io::blocking::Buf;
2+
use crate::io::uring::utils::{ArcFd, UringFd};
13
use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};
24

35
use io_uring::{opcode, types};
6+
use std::fmt;
47
use std::io::{self, Error};
5-
use std::os::fd::{AsRawFd, OwnedFd};
8+
use std::os::fd::OwnedFd;
69

7-
#[derive(Debug)]
8-
pub(crate) struct Read {
9-
fd: OwnedFd,
10-
buf: Vec<u8>,
10+
/// Trait for buffers that can be used with io-uring read operations.
11+
pub(crate) trait ReadBuffer: Send + 'static {
12+
/// Prepare the buffer for a read operation.
13+
/// Returns a pointer and length for the io-uring SQE.
14+
fn uring_read_prepare(&mut self, max_len: usize) -> (*mut u8, u32);
15+
16+
/// Complete a read of `n` bytes.
17+
///
18+
/// # Safety
19+
///
20+
/// The caller must ensure the kernel wrote exactly `n` bytes
21+
/// into the buffer at the pointer returned by `uring_read_prepare`.
22+
unsafe fn uring_read_complete(&mut self, n: u32);
1123
}
1224

13-
impl Completable for Read {
14-
type Output = (io::Result<u32>, OwnedFd, Vec<u8>);
25+
impl ReadBuffer for Vec<u8> {
26+
fn uring_read_prepare(&mut self, max_len: usize) -> (*mut u8, u32) {
27+
assert!(self.spare_capacity_mut().len() >= max_len);
28+
let ptr = self.spare_capacity_mut().as_mut_ptr().cast();
29+
(ptr, max_len as u32)
30+
}
31+
32+
unsafe fn uring_read_complete(&mut self, n: u32) {
33+
// SAFETY: the kernel wrote `n` bytes into spare capacity starting
34+
// at the old self.len(), so self.len() + n bytes are now initialized.
35+
unsafe { self.set_len(self.len() + n as usize) };
36+
}
37+
}
38+
39+
impl ReadBuffer for Buf {
40+
fn uring_read_prepare(&mut self, max_len: usize) -> (*mut u8, u32) {
41+
self.prepare_uring_read(max_len)
42+
}
43+
44+
unsafe fn uring_read_complete(&mut self, n: u32) {
45+
// SAFETY: caller guarantees kernel wrote exactly n bytes.
46+
unsafe { self.complete_uring_read(n as usize) };
47+
}
48+
}
49+
50+
pub(crate) struct Read<B, F = ArcFd> {
51+
fd: F,
52+
buf: B,
53+
}
54+
55+
impl<B: fmt::Debug, F> fmt::Debug for Read<B, F> {
56+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57+
f.debug_struct("Read")
58+
.field("buf", &self.buf)
59+
.finish_non_exhaustive()
60+
}
61+
}
62+
63+
impl<B: ReadBuffer, F: UringFd> Completable for Read<B, F> {
64+
type Output = (io::Result<u32>, F, B);
1565

1666
fn complete(self, cqe: CqeResult) -> Self::Output {
1767
let mut buf = self.buf;
18-
1968
if let Ok(len) = cqe.result {
20-
let new_len = buf.len() + len as usize;
21-
// SAFETY: Kernel read len bytes
22-
unsafe { buf.set_len(new_len) };
69+
// SAFETY: kernel wrote exactly `len` bytes into the prepared buffer.
70+
unsafe { buf.uring_read_complete(len) };
2371
}
24-
2572
(cqe.result, self.fd, buf)
2673
}
2774

@@ -30,32 +77,38 @@ impl Completable for Read {
3077
}
3178
}
3279

33-
impl Cancellable for Read {
80+
impl Cancellable for Read<Vec<u8>, OwnedFd> {
3481
fn cancel(self) -> CancelData {
35-
CancelData::Read(self)
82+
CancelData::ReadVec(self)
3683
}
3784
}
3885

39-
impl Op<Read> {
40-
// Submit a request to read a FD at given length and offset into a
41-
// dynamic buffer with uninitialized memory. The read happens on uninitialized
42-
// buffer and no overwriting happens.
86+
impl Cancellable for Read<Buf, ArcFd> {
87+
fn cancel(self) -> CancelData {
88+
CancelData::ReadBuf(self)
89+
}
90+
}
4391

44-
// SAFETY: The `len` of the amount to be read and the buffer that is passed
45-
// should have capacity > len.
46-
//
47-
// If `len` read is higher than vector capacity then setting its length by
48-
// the caller in terms of size_read can be unsound.
49-
pub(crate) fn read(fd: OwnedFd, mut buf: Vec<u8>, len: u32, offset: u64) -> Self {
50-
// don't overwrite on already written part
51-
assert!(buf.spare_capacity_mut().len() >= len as usize);
52-
let buf_mut_ptr = buf.spare_capacity_mut().as_mut_ptr().cast();
92+
impl<B, F> Op<Read<B, F>>
93+
where
94+
B: ReadBuffer + fmt::Debug,
95+
F: UringFd,
96+
Read<B, F>: Cancellable,
97+
{
98+
/// Submit a read operation via io-uring.
99+
///
100+
/// `max_len` is the maximum number of bytes to read.
101+
/// `offset` is the file offset; use `u64::MAX` for the current cursor.
102+
pub(crate) fn read_at(fd: F, mut buf: B, max_len: usize, offset: u64) -> Self {
103+
let (ptr, len) = buf.uring_read_prepare(max_len);
53104

54-
let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, len)
105+
let sqe = opcode::Read::new(types::Fd(UringFd::as_raw_fd(&fd)), ptr, len)
55106
.offset(offset)
56107
.build();
57108

58-
// SAFETY: Parameters are valid for the entire duration of the operation
59-
unsafe { Op::new(read_op, Read { fd, buf }) }
109+
// SAFETY: `fd` and `buf`, which owns the heap buffer, are moved into `Read`,
110+
// which is held by the `Op` for the entire duration of the io-uring operation.
111+
// The buffer pointer remains valid because Vec heap data doesn't move.
112+
unsafe { Op::new(sqe, Read { fd, buf }) }
60113
}
61114
}

tokio/src/io/uring/utils.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,31 @@
1+
use std::os::fd::{AsRawFd, OwnedFd, RawFd};
12
use std::os::unix::ffi::OsStrExt;
3+
use std::sync::Arc;
24
use std::{ffi::CString, io, path::Path};
35

6+
pub(crate) type ArcFd = Arc<dyn AsRawFd + Send + Sync + 'static>;
7+
8+
/// Raw file descriptor trait for io-uring operations.
9+
///
10+
/// `Arc<dyn AsRawFd>` does not satisfy `AsRawFd` because the blanket impl
11+
/// for `Arc<T>` requires `T: Sized`. This trait bridges that gap so both
12+
/// `OwnedFd` and `ArcFd` can be used generically with `Op::read_at`.
13+
pub(crate) trait UringFd: Send + Sync + 'static {
14+
fn as_raw_fd(&self) -> RawFd;
15+
}
16+
17+
impl UringFd for OwnedFd {
18+
fn as_raw_fd(&self) -> RawFd {
19+
AsRawFd::as_raw_fd(self)
20+
}
21+
}
22+
23+
impl UringFd for ArcFd {
24+
fn as_raw_fd(&self) -> RawFd {
25+
(**self).as_raw_fd()
26+
}
27+
}
28+
429
pub(crate) fn cstr(p: &Path) -> io::Result<CString> {
530
Ok(CString::new(p.as_os_str().as_bytes())?)
631
}

0 commit comments

Comments
 (0)