From d9d2ddc5e0be2b871c5cf19faec0022ed7dff9c8 Mon Sep 17 00:00:00 2001 From: red Date: Mon, 27 Oct 2025 14:56:12 +0100 Subject: [PATCH 01/12] implement uring AsyncWrite for File --- tokio/src/fs/file.rs | 235 ++++++++++++++++++++++++++++++------ tokio/src/fs/write.rs | 37 +++--- tokio/src/io/blocking.rs | 21 +++- tokio/src/io/uring/utils.rs | 4 + tokio/src/io/uring/write.rs | 40 +++--- 5 files changed, 269 insertions(+), 68 deletions(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 755b5eabd16..b63e21a4c04 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -107,7 +107,61 @@ struct Inner { #[derive(Debug)] enum State { Idle(Option), - Busy(JoinHandle<(Operation, Buf)>), + Busy(JoinHandleInner<(Operation, Buf)>), +} + +#[derive(Debug)] +enum JoinHandleInner { + Blocking(JoinHandle), + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" + ))] + Async(BoxedOp), +} + +cfg_io_uring! { + struct BoxedOp(Pin + Send + Sync + 'static>>); + + impl std::fmt::Debug for BoxedOp { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // format of BoxedFuture(T::type_name()) + f.debug_tuple("BoxedFuture") + .field(&std::any::type_name::()) + .finish() + } + } + + impl Future for BoxedOp { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.0.as_mut().poll(cx) + } + } +} + +impl Future for JoinHandleInner<(Operation, Buf)> { + type Output = io::Result<(Operation, Buf)>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.get_mut() { + JoinHandleInner::Blocking(ref mut jh) => Pin::new(jh) + .poll(cx) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "background task failed")), + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" + ))] + JoinHandleInner::Async(ref mut jh) => Pin::new(jh).poll(cx).map(Ok), + } + } } #[derive(Debug)] @@ -399,7 +453,7 @@ impl File { let std = self.std.clone(); - inner.state = State::Busy(spawn_blocking(move || { + inner.state = State::Busy(JoinHandleInner::Blocking(spawn_blocking(move || { let res = if let Some(seek) = seek { (&*std).seek(seek).and_then(|_| std.set_len(size)) } else { @@ -409,7 +463,7 @@ impl File { // Return the result as a seek (Operation::Seek(res), buf) - })); + }))); let (op, buf) = match inner.state { State::Idle(_) => unreachable!(), @@ -613,13 +667,14 @@ impl AsyncRead for File { let std = me.std.clone(); let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size); - inner.state = State::Busy(spawn_blocking(move || { - // SAFETY: the `Read` implementation of `std` does not - // read from the buffer it is borrowing and correctly - // reports the length of the data written into the buffer. - let res = unsafe { buf.read_from(&mut &*std, max_buf_size) }; - (Operation::Read(res), buf) - })); + inner.state = + State::Busy(JoinHandleInner::Blocking(spawn_blocking(move || { + // SAFETY: the `Read` implementation of `std` does not + // read from the buffer it is borrowing and correctly + // reports the length of the data written into the buffer. + let res = unsafe { buf.read_from(&mut &*std, max_buf_size) }; + (Operation::Read(res), buf) + }))); } State::Busy(ref mut rx) => { let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?; @@ -685,10 +740,10 @@ impl AsyncSeek for File { let std = me.std.clone(); - inner.state = State::Busy(spawn_blocking(move || { + inner.state = State::Busy(JoinHandleInner::Blocking(spawn_blocking(move || { let res = (&*std).seek(pos); (Operation::Seek(res), buf) - })); + }))); Ok(()) } } @@ -753,20 +808,75 @@ impl AsyncWrite for File { let n = buf.copy_from(src, me.max_buf_size); let std = me.std.clone(); - let blocking_task_join_handle = spawn_mandatory_blocking(move || { - let res = if let Some(seek) = seek { - (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) - } else { - buf.write_to(&mut &*std) - }; + #[allow(unused_mut)] + let mut data = Some((std, buf)); + + let mut task_join_handle = None; + + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" + ))] + { + use crate::runtime::Handle; + + // Handle not present in some tests? + if let Ok(handle) = Handle::try_current() { + if handle.inner.driver().io().check_and_init()? { + task_join_handle = { + use crate::runtime::driver::op::Op; + + let (std, buf) = data.take().unwrap(); + if let Some(seek) = seek { + // we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset + // seeking only modifies kernel metadata and does not block, so we can do it here + (&*std).seek(seek).map_err(|e| { + io::Error::new( + e.kind(), + format!("failed to seek before write: {e}"), + ) + })?; + } + + let op = Op::write_at(std, buf, u64::MAX); + + let handle = BoxedOp(Box::pin(async move { + let (r, buf, _fd) = op.await; + match r { + Ok(_n) => (Operation::Write(Ok(())), buf), + Err(e) => (Operation::Write(Err(e)), buf), + } + })); + + Some(JoinHandleInner::Async(handle)) + }; + } + } + } - (Operation::Write(res), buf) - }) - .ok_or_else(|| { - io::Error::new(io::ErrorKind::Other, "background task failed") - })?; + if let Some((std, mut buf)) = data { + task_join_handle = { + let handle = spawn_mandatory_blocking(move || { + let res = if let Some(seek) = seek { + (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) + } else { + buf.write_to(&mut &*std) + }; + + (Operation::Write(res), buf) + }) + .ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "background task failed") + })?; + + Some(JoinHandleInner::Blocking(handle)) + }; + } - inner.state = State::Busy(blocking_task_join_handle); + inner.state = State::Busy(task_join_handle.unwrap()); return Poll::Ready(Ok(n)); } @@ -824,20 +934,73 @@ impl AsyncWrite for File { let n = buf.copy_from_bufs(bufs, me.max_buf_size); let std = me.std.clone(); - let blocking_task_join_handle = spawn_mandatory_blocking(move || { - let res = if let Some(seek) = seek { - (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) - } else { - buf.write_to(&mut &*std) - }; + #[allow(unused_mut)] + let mut data = Some((std, buf)); + + let mut task_join_handle = None; + + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" + ))] + { + use crate::runtime::Handle; + + // Handle not present in some tests? + if let Ok(handle) = Handle::try_current() { + if handle.inner.driver().io().check_and_init()? { + task_join_handle = { + use crate::runtime::driver::op::Op; + + let (std, buf) = data.take().unwrap(); + if let Some(seek) = seek { + // we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset + // seeking only modifies kernel metadata and does not block, so we can do it here + (&*std).seek(seek).map_err(|e| { + io::Error::new( + e.kind(), + format!("failed to seek before write: {e}"), + ) + })?; + } + + let op = Op::write_at(std, buf, u64::MAX); + + let handle = BoxedOp(Box::pin(async move { + let (r, buf, _fd) = op.await; + match r { + Ok(_n) => (Operation::Write(Ok(())), buf), + Err(e) => (Operation::Write(Err(e)), buf), + } + })); + + Some(JoinHandleInner::Async(handle)) + }; + } + } + } - (Operation::Write(res), buf) - }) - .ok_or_else(|| { - io::Error::new(io::ErrorKind::Other, "background task failed") - })?; + if let Some((std, mut buf)) = data { + task_join_handle = Some(JoinHandleInner::Blocking( + spawn_mandatory_blocking(move || { + let res = if let Some(seek) = seek { + (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) + } else { + buf.write_to(&mut &*std) + }; + + (Operation::Write(res), buf) + }) + .ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "background task failed") + })?, + )); + } - inner.state = State::Busy(blocking_task_join_handle); + inner.state = State::Busy(task_join_handle.unwrap()); return Poll::Ready(Ok(n)); } diff --git a/tokio/src/fs/write.rs b/tokio/src/fs/write.rs index c70a1978811..443905f563b 100644 --- a/tokio/src/fs/write.rs +++ b/tokio/src/fs/write.rs @@ -1,3 +1,11 @@ +#[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" +))] +use crate::io::blocking; use crate::{fs::asyncify, util::as_ref::OwnedBuf}; use std::{io, path::Path}; @@ -25,7 +33,6 @@ use std::{io, path::Path}; /// ``` pub async fn write(path: impl AsRef, contents: impl AsRef<[u8]>) -> io::Result<()> { let path = path.as_ref(); - let contents = crate::util::as_ref::upgrade(contents); #[cfg(all( tokio_unstable, @@ -38,10 +45,15 @@ pub async fn write(path: impl AsRef, contents: impl AsRef<[u8]>) -> io::Re let handle = crate::runtime::Handle::current(); let driver_handle = handle.inner.driver().io(); if driver_handle.check_and_init()? { - return write_uring(path, contents).await; + use crate::io::blocking; + + let mut buf = blocking::Buf::with_capacity(contents.as_ref().len()); + buf.copy_from(contents.as_ref(), contents.as_ref().len()); + return write_uring(path, buf).await; } } + let contents = crate::util::as_ref::upgrade(contents); write_spawn_blocking(path, contents).await } @@ -52,9 +64,9 @@ pub async fn write(path: impl AsRef, contents: impl AsRef<[u8]>) -> io::Re feature = "fs", target_os = "linux" ))] -async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> { - use crate::{fs::OpenOptions, runtime::driver::op::Op}; - use std::os::fd::OwnedFd; +async fn write_uring(path: &Path, mut buf: blocking::Buf) -> io::Result<()> { + use crate::{fs::OpenOptions, io::uring::utils::ArcFd, runtime::driver::op::Op}; + use std::sync::Arc; let file = OpenOptions::new() .write(true) @@ -63,16 +75,14 @@ async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> { .open(path) .await?; - let mut fd: OwnedFd = file - .try_into_std() - .expect("unexpected in-flight operation detected") - .into(); + let mut fd: ArcFd = Arc::new( + file.try_into_std() + .expect("unexpected in-flight operation detected"), + ); - let total: usize = buf.as_ref().len(); - let mut buf_offset: usize = 0; let mut file_offset: u64 = 0; - while buf_offset < total { - let (n, _buf, _fd) = Op::write_at(fd, buf, buf_offset, file_offset)?.await; + while !buf.is_empty() { + let (n, _buf, _fd) = Op::write_at(fd, buf, file_offset).await; // TODO: handle EINT here let n = n?; if n == 0 { @@ -81,7 +91,6 @@ async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> { buf = _buf; fd = _fd; - buf_offset += n as usize; file_offset += n as u64; } diff --git a/tokio/src/io/blocking.rs b/tokio/src/io/blocking.rs index 1af5065456d..34046046b90 100644 --- a/tokio/src/io/blocking.rs +++ b/tokio/src/io/blocking.rs @@ -7,7 +7,7 @@ use std::io; use std::io::prelude::*; use std::mem::MaybeUninit; use std::pin::Pin; -use std::task::{ready, Context, Poll}; +use std::task::{Context, Poll, ready}; /// `T` should not implement _both_ Read and Write. #[derive(Debug)] @@ -234,6 +234,25 @@ impl Buf { &self.buf[self.pos..] } + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" + ))] + pub(crate) fn advance(&mut self, n: usize) { + if n > self.len() { + panic!("advance past end of buffer"); + } + + self.pos += n; + if self.pos == self.buf.len() { + self.buf.truncate(0); + self.pos = 0; + } + } + /// # Safety /// /// `rd` must not read from the buffer `read` is borrowing and must correctly diff --git a/tokio/src/io/uring/utils.rs b/tokio/src/io/uring/utils.rs index e30e7a5ddc4..7b731c8ea46 100644 --- a/tokio/src/io/uring/utils.rs +++ b/tokio/src/io/uring/utils.rs @@ -1,6 +1,10 @@ +use std::os::fd::AsRawFd; use std::os::unix::ffi::OsStrExt; +use std::sync::Arc; use std::{ffi::CString, io, path::Path}; +pub(crate) type ArcFd = Arc; + pub(crate) fn cstr(p: &Path) -> io::Result { Ok(CString::new(p.as_os_str().as_bytes())?) } diff --git a/tokio/src/io/uring/write.rs b/tokio/src/io/uring/write.rs index 7341f7622da..14c61ce2ebe 100644 --- a/tokio/src/io/uring/write.rs +++ b/tokio/src/io/uring/write.rs @@ -1,19 +1,31 @@ +use crate::io::blocking; +use crate::io::uring::utils::ArcFd; use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; -use crate::util::as_ref::OwnedBuf; use io_uring::{opcode, types}; use std::io::{self, Error}; -use std::os::fd::{AsRawFd, OwnedFd}; -#[derive(Debug)] pub(crate) struct Write { - buf: OwnedBuf, - fd: OwnedFd, + buf: blocking::Buf, + fd: ArcFd, +} + +impl std::fmt::Debug for Write { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Write") + .field("buf_len", &self.buf.len()) + .field("fd", &self.fd.as_raw_fd()) + .finish() + } } impl Completable for Write { - type Output = (io::Result, OwnedBuf, OwnedFd); - fn complete(self, cqe: CqeResult) -> Self::Output { + type Output = (io::Result, blocking::Buf, ArcFd); + fn complete(mut self, cqe: CqeResult) -> Self::Output { + if let Ok(n) = cqe.result.as_ref() { + self.buf.advance(*n as usize); + } + (cqe.result, self.buf, self.fd) } @@ -31,17 +43,12 @@ impl Cancellable for Write { impl Op { /// Issue a write that starts at `buf_offset` within `buf` and writes some bytes /// into `file` at `file_offset`. - pub(crate) fn write_at( - fd: OwnedFd, - buf: OwnedBuf, - buf_offset: usize, - file_offset: u64, - ) -> io::Result { + pub(crate) fn write_at(fd: ArcFd, buf: blocking::Buf, file_offset: u64) -> Self { // There is a cap on how many bytes we can write in a single uring write operation. // ref: https://github.com/axboe/liburing/discussions/497 - let len = u32::try_from(buf.as_ref().len() - buf_offset).unwrap_or(u32::MAX); + let len = u32::try_from(buf.len()).unwrap_or(u32::MAX); - let ptr = buf.as_ref()[buf_offset..buf_offset + len as usize].as_ptr(); + let ptr = buf.bytes().as_ptr(); let sqe = opcode::Write::new(types::Fd(fd.as_raw_fd()), ptr, len) .offset(file_offset) @@ -49,7 +56,6 @@ impl Op { // SAFETY: parameters of the entry, such as `fd` and `buf`, are valid // until this operation completes. - let op = unsafe { Op::new(sqe, Write { buf, fd }) }; - Ok(op) + unsafe { Op::new(sqe, Write { buf, fd }) } } } From 093070a8243dcb34b4a517f4f48fc548bd422c80 Mon Sep 17 00:00:00 2001 From: red Date: Mon, 27 Oct 2025 15:08:07 +0100 Subject: [PATCH 02/12] add basic uring File write tests --- tokio/tests/fs_uring.rs | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tokio/tests/fs_uring.rs b/tokio/tests/fs_uring.rs index cd0d207d278..e914cdf6112 100644 --- a/tokio/tests/fs_uring.rs +++ b/tokio/tests/fs_uring.rs @@ -14,6 +14,9 @@ use std::task::Poll; use std::time::Duration; use std::{future::poll_fn, path::PathBuf}; use tempfile::NamedTempFile; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; +use tokio::io::AsyncWriteExt; use tokio::{ fs::OpenOptions, runtime::{Builder, Runtime}, @@ -145,6 +148,41 @@ async fn cancel_op_future() { assert!(res.is_cancelled()); } +#[tokio::test] +async fn test_file_write() { + let (_tmp_file, path): (Vec, Vec) = create_tmp_files(1); + + let mut file = OpenOptions::new().write(true).open(&path[0]).await.unwrap(); + + let data = b"hello io_uring"; + file.write_all(data).await.unwrap(); +} + +#[tokio::test] +async fn test_file_write_seek() { + let (_tmp_file, path): (Vec, Vec) = create_tmp_files(1); + + let mut file = OpenOptions::new() + .write(true) + .read(true) + .open(&path[0]) + .await + .unwrap(); + + let data = b"hello uring"; + file.write_all(data).await.unwrap(); + + file.seek(std::io::SeekFrom::Start(6)).await.unwrap(); + + let data2 = b"world"; + file.write_all(data2).await.unwrap(); + + let mut content = vec![0u8; 11]; + file.seek(std::io::SeekFrom::Start(0)).await.unwrap(); + file.read_exact(&mut content).await.unwrap(); + assert_eq!(&content, b"hello world"); +} + fn create_tmp_files(num_files: usize) -> (Vec, Vec) { let mut files = Vec::with_capacity(num_files); for _ in 0..num_files { From 6856207f952fa00a5b5755b2fda6526f69ab5312 Mon Sep 17 00:00:00 2001 From: red Date: Mon, 27 Oct 2025 15:23:50 +0100 Subject: [PATCH 03/12] formatting --- tokio/src/io/blocking.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/io/blocking.rs b/tokio/src/io/blocking.rs index 34046046b90..5c9307d5614 100644 --- a/tokio/src/io/blocking.rs +++ b/tokio/src/io/blocking.rs @@ -7,7 +7,7 @@ use std::io; use std::io::prelude::*; use std::mem::MaybeUninit; use std::pin::Pin; -use std::task::{Context, Poll, ready}; +use std::task::{ready, Context, Poll}; /// `T` should not implement _both_ Read and Write. #[derive(Debug)] From 863a69c8599e390dd273115693e3349c2432bdb0 Mon Sep 17 00:00:00 2001 From: red Date: Wed, 29 Oct 2025 14:30:01 +0100 Subject: [PATCH 04/12] remove sync bound from BoxedOp --- tokio/src/fs/file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index b63e21a4c04..44733db8c4a 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -124,7 +124,7 @@ enum JoinHandleInner { } cfg_io_uring! { - struct BoxedOp(Pin + Send + Sync + 'static>>); + struct BoxedOp(Pin + Send + 'static>>); impl std::fmt::Debug for BoxedOp { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { From ea4040d0c902781d2af4ed6b98afb540eb1abfa9 Mon Sep 17 00:00:00 2001 From: red Date: Wed, 29 Oct 2025 14:35:20 +0100 Subject: [PATCH 05/12] make sure to write the full buffer in uring file writes --- tokio/src/fs/file.rs | 62 ++++++++++++++++++++++++++++++++------------ 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 44733db8c4a..a88bc62d9eb 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -827,9 +827,9 @@ impl AsyncWrite for File { if let Ok(handle) = Handle::try_current() { if handle.inner.driver().io().check_and_init()? { task_join_handle = { - use crate::runtime::driver::op::Op; + use crate::{io::uring::utils::ArcFd, runtime::driver::op::Op}; - let (std, buf) = data.take().unwrap(); + let (std, mut buf) = data.take().unwrap(); if let Some(seek) = seek { // we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset // seeking only modifies kernel metadata and does not block, so we can do it here @@ -841,13 +841,28 @@ impl AsyncWrite for File { })?; } - let op = Op::write_at(std, buf, u64::MAX); - + let mut fd: ArcFd = std; let handle = BoxedOp(Box::pin(async move { - let (r, buf, _fd) = op.await; - match r { - Ok(_n) => (Operation::Write(Ok(())), buf), - Err(e) => (Operation::Write(Err(e)), buf), + loop { + let op = Op::write_at(fd, buf, u64::MAX); + let (r, _buf, _fd) = op.await; + buf = _buf; + fd = _fd; + match r { + Ok(0) => { + break ( + Operation::Write(Err( + io::ErrorKind::WriteZero.into(), + )), + buf, + ); + } + Ok(_) if buf.is_empty() => { + break (Operation::Write(Ok(())), buf); + } + Ok(_) => continue, // more to write + Err(e) => break (Operation::Write(Err(e)), buf), + } } })); @@ -953,9 +968,9 @@ impl AsyncWrite for File { if let Ok(handle) = Handle::try_current() { if handle.inner.driver().io().check_and_init()? { task_join_handle = { - use crate::runtime::driver::op::Op; + use crate::{io::uring::utils::ArcFd, runtime::driver::op::Op}; - let (std, buf) = data.take().unwrap(); + let (std, mut buf) = data.take().unwrap(); if let Some(seek) = seek { // we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset // seeking only modifies kernel metadata and does not block, so we can do it here @@ -967,13 +982,28 @@ impl AsyncWrite for File { })?; } - let op = Op::write_at(std, buf, u64::MAX); - + let mut fd: ArcFd = std; let handle = BoxedOp(Box::pin(async move { - let (r, buf, _fd) = op.await; - match r { - Ok(_n) => (Operation::Write(Ok(())), buf), - Err(e) => (Operation::Write(Err(e)), buf), + loop { + let op = Op::write_at(fd, buf, u64::MAX); + let (r, _buf, _fd) = op.await; + buf = _buf; + fd = _fd; + match r { + Ok(0) => { + break ( + Operation::Write(Err( + io::ErrorKind::WriteZero.into(), + )), + buf, + ); + } + Ok(_) if buf.is_empty() => { + break (Operation::Write(Ok(())), buf); + } + Ok(_) => continue, // more to write + Err(e) => break (Operation::Write(Err(e)), buf), + } } })); From 226fcf8e700cdc524b91c3422d9f1252f8e5a665 Mon Sep 17 00:00:00 2001 From: red Date: Wed, 29 Oct 2025 14:54:46 +0100 Subject: [PATCH 06/12] Revert "remove sync bound from BoxedOp" This reverts commit 863a69c8599e390dd273115693e3349c2432bdb0. --- tokio/src/fs/file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index a88bc62d9eb..d7e9121c6e3 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -124,7 +124,7 @@ enum JoinHandleInner { } cfg_io_uring! { - struct BoxedOp(Pin + Send + 'static>>); + struct BoxedOp(Pin + Send + Sync + 'static>>); impl std::fmt::Debug for BoxedOp { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { From 937da2df80e75ca36e0e20335c535f04c537c0b5 Mon Sep 17 00:00:00 2001 From: red Date: Tue, 18 Nov 2025 01:51:45 +0100 Subject: [PATCH 07/12] move part of poll_write to its own function --- tokio/src/fs/file.rs | 171 ++++++++++++++++++++++--------------------- 1 file changed, 88 insertions(+), 83 deletions(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index d7e9121c6e3..d95bbfa5daf 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -137,7 +137,6 @@ cfg_io_uring! { impl Future for BoxedOp { type Output = T; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.0.as_mut().poll(cx) } @@ -809,89 +808,9 @@ impl AsyncWrite for File { let std = me.std.clone(); #[allow(unused_mut)] - let mut data = Some((std, buf)); - - let mut task_join_handle = None; + let mut task_join_handle = inner.poll_write_inner((std, buf), seek)?; - #[cfg(all( - tokio_unstable, - feature = "io-uring", - feature = "rt", - feature = "fs", - target_os = "linux" - ))] - { - use crate::runtime::Handle; - - // Handle not present in some tests? - if let Ok(handle) = Handle::try_current() { - if handle.inner.driver().io().check_and_init()? { - task_join_handle = { - use crate::{io::uring::utils::ArcFd, runtime::driver::op::Op}; - - let (std, mut buf) = data.take().unwrap(); - if let Some(seek) = seek { - // we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset - // seeking only modifies kernel metadata and does not block, so we can do it here - (&*std).seek(seek).map_err(|e| { - io::Error::new( - e.kind(), - format!("failed to seek before write: {e}"), - ) - })?; - } - - let mut fd: ArcFd = std; - let handle = BoxedOp(Box::pin(async move { - loop { - let op = Op::write_at(fd, buf, u64::MAX); - let (r, _buf, _fd) = op.await; - buf = _buf; - fd = _fd; - match r { - Ok(0) => { - break ( - Operation::Write(Err( - io::ErrorKind::WriteZero.into(), - )), - buf, - ); - } - Ok(_) if buf.is_empty() => { - break (Operation::Write(Ok(())), buf); - } - Ok(_) => continue, // more to write - Err(e) => break (Operation::Write(Err(e)), buf), - } - } - })); - - Some(JoinHandleInner::Async(handle)) - }; - } - } - } - - if let Some((std, mut buf)) = data { - task_join_handle = { - let handle = spawn_mandatory_blocking(move || { - let res = if let Some(seek) = seek { - (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) - } else { - buf.write_to(&mut &*std) - }; - - (Operation::Write(res), buf) - }) - .ok_or_else(|| { - io::Error::new(io::ErrorKind::Other, "background task failed") - })?; - - Some(JoinHandleInner::Blocking(handle)) - }; - } - - inner.state = State::Busy(task_join_handle.unwrap()); + inner.state = State::Busy(task_join_handle); return Poll::Ready(Ok(n)); } @@ -1178,6 +1097,92 @@ impl Inner { Operation::Seek(_) => Poll::Ready(Ok(())), } } + + fn poll_write_inner( + &self, + data: (Arc, Buf), + seek: Option, + ) -> io::Result> { + #[allow(unused_mut)] + let mut data = Some(data); + let mut task_join_handle = None; + + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" + ))] + { + use crate::runtime::Handle; + + // Handle not present in some tests? + if let Ok(handle) = Handle::try_current() { + if handle.inner.driver().io().check_and_init()? { + task_join_handle = { + use crate::{io::uring::utils::ArcFd, runtime::driver::op::Op}; + + let (std, mut buf) = data.take().unwrap(); + if let Some(seek) = seek { + // we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset + // seeking only modifies kernel metadata and does not block, so we can do it here + (&*std).seek(seek).map_err(|e| { + io::Error::new( + e.kind(), + format!("failed to seek before write: {e}"), + ) + })?; + } + + let mut fd: ArcFd = std; + let handle = BoxedOp(Box::pin(async move { + loop { + let op = Op::write_at(fd, buf, u64::MAX); + let (r, _buf, _fd) = op.await; + buf = _buf; + fd = _fd; + match r { + Ok(0) => { + break ( + Operation::Write(Err(io::ErrorKind::WriteZero.into())), + buf, + ); + } + Ok(_) if buf.is_empty() => { + break (Operation::Write(Ok(())), buf); + } + Ok(_) => continue, // more to write + Err(e) => break (Operation::Write(Err(e)), buf), + } + } + })); + + Some(JoinHandleInner::Async(handle)) + }; + } + } + } + + if let Some((std, mut buf)) = data { + task_join_handle = { + let handle = spawn_mandatory_blocking(move || { + let res = if let Some(seek) = seek { + (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) + } else { + buf.write_to(&mut &*std) + }; + + (Operation::Write(res), buf) + }) + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "background task failed"))?; + + Some(JoinHandleInner::Blocking(handle)) + }; + } + + Ok(task_join_handle.unwrap()) + } } #[cfg(test)] From c30a4dd8d4b717b85bc61b67511eff390830d279 Mon Sep 17 00:00:00 2001 From: red Date: Tue, 18 Nov 2025 12:42:40 +0100 Subject: [PATCH 08/12] implement review comment suggestions --- tokio/src/fs/file.rs | 113 ++++++++++-------------------------- tokio/src/fs/write.rs | 2 - tokio/src/io/uring/write.rs | 3 +- 3 files changed, 31 insertions(+), 87 deletions(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index d95bbfa5daf..48a056238a1 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -107,60 +107,7 @@ struct Inner { #[derive(Debug)] enum State { Idle(Option), - Busy(JoinHandleInner<(Operation, Buf)>), -} - -#[derive(Debug)] -enum JoinHandleInner { - Blocking(JoinHandle), - #[cfg(all( - tokio_unstable, - feature = "io-uring", - feature = "rt", - feature = "fs", - target_os = "linux" - ))] - Async(BoxedOp), -} - -cfg_io_uring! { - struct BoxedOp(Pin + Send + Sync + 'static>>); - - impl std::fmt::Debug for BoxedOp { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // format of BoxedFuture(T::type_name()) - f.debug_tuple("BoxedFuture") - .field(&std::any::type_name::()) - .finish() - } - } - - impl Future for BoxedOp { - type Output = T; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.0.as_mut().poll(cx) - } - } -} - -impl Future for JoinHandleInner<(Operation, Buf)> { - type Output = io::Result<(Operation, Buf)>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.get_mut() { - JoinHandleInner::Blocking(ref mut jh) => Pin::new(jh) - .poll(cx) - .map_err(|_| io::Error::new(io::ErrorKind::Other, "background task failed")), - #[cfg(all( - tokio_unstable, - feature = "io-uring", - feature = "rt", - feature = "fs", - target_os = "linux" - ))] - JoinHandleInner::Async(ref mut jh) => Pin::new(jh).poll(cx).map(Ok), - } - } + Busy(JoinHandle<(Operation, Buf)>), } #[derive(Debug)] @@ -452,7 +399,7 @@ impl File { let std = self.std.clone(); - inner.state = State::Busy(JoinHandleInner::Blocking(spawn_blocking(move || { + inner.state = State::Busy(spawn_blocking(move || { let res = if let Some(seek) = seek { (&*std).seek(seek).and_then(|_| std.set_len(size)) } else { @@ -462,7 +409,7 @@ impl File { // Return the result as a seek (Operation::Seek(res), buf) - }))); + })); let (op, buf) = match inner.state { State::Idle(_) => unreachable!(), @@ -666,14 +613,13 @@ impl AsyncRead for File { let std = me.std.clone(); let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size); - inner.state = - State::Busy(JoinHandleInner::Blocking(spawn_blocking(move || { - // SAFETY: the `Read` implementation of `std` does not - // read from the buffer it is borrowing and correctly - // reports the length of the data written into the buffer. - let res = unsafe { buf.read_from(&mut &*std, max_buf_size) }; - (Operation::Read(res), buf) - }))); + inner.state = State::Busy(spawn_blocking(move || { + // SAFETY: the `Read` implementation of `std` does not + // read from the buffer it is borrowing and correctly + // reports the length of the data written into the buffer. + let res = unsafe { buf.read_from(&mut &*std, max_buf_size) }; + (Operation::Read(res), buf) + })); } State::Busy(ref mut rx) => { let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?; @@ -739,10 +685,10 @@ impl AsyncSeek for File { let std = me.std.clone(); - inner.state = State::Busy(JoinHandleInner::Blocking(spawn_blocking(move || { + inner.state = State::Busy(spawn_blocking(move || { let res = (&*std).seek(pos); (Operation::Seek(res), buf) - }))); + })); Ok(()) } } @@ -871,7 +817,7 @@ impl AsyncWrite for File { #[allow(unused_mut)] let mut data = Some((std, buf)); - let mut task_join_handle = None; + let mut task_join_handle: Option> = None; #[cfg(all( tokio_unstable, @@ -902,13 +848,16 @@ impl AsyncWrite for File { } let mut fd: ArcFd = std; - let handle = BoxedOp(Box::pin(async move { + let handle = crate::spawn(async move { loop { let op = Op::write_at(fd, buf, u64::MAX); let (r, _buf, _fd) = op.await; buf = _buf; fd = _fd; match r { + Ok(_) if buf.is_empty() => { + break (Operation::Write(Ok(())), buf); + } Ok(0) => { break ( Operation::Write(Err( @@ -917,23 +866,20 @@ impl AsyncWrite for File { buf, ); } - Ok(_) if buf.is_empty() => { - break (Operation::Write(Ok(())), buf); - } Ok(_) => continue, // more to write Err(e) => break (Operation::Write(Err(e)), buf), } } - })); + }); - Some(JoinHandleInner::Async(handle)) + Some(handle) }; } } } if let Some((std, mut buf)) = data { - task_join_handle = Some(JoinHandleInner::Blocking( + task_join_handle = Some( spawn_mandatory_blocking(move || { let res = if let Some(seek) = seek { (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) @@ -946,7 +892,7 @@ impl AsyncWrite for File { .ok_or_else(|| { io::Error::new(io::ErrorKind::Other, "background task failed") })?, - )); + ); } inner.state = State::Busy(task_join_handle.unwrap()); @@ -1102,7 +1048,7 @@ impl Inner { &self, data: (Arc, Buf), seek: Option, - ) -> io::Result> { + ) -> io::Result> { #[allow(unused_mut)] let mut data = Some(data); let mut task_join_handle = None; @@ -1136,29 +1082,30 @@ impl Inner { } let mut fd: ArcFd = std; - let handle = BoxedOp(Box::pin(async move { + let handle = crate::spawn(async move { loop { let op = Op::write_at(fd, buf, u64::MAX); let (r, _buf, _fd) = op.await; buf = _buf; fd = _fd; match r { + Ok(_) if buf.is_empty() => { + break (Operation::Write(Ok(())), buf); + } Ok(0) => { break ( Operation::Write(Err(io::ErrorKind::WriteZero.into())), buf, ); } - Ok(_) if buf.is_empty() => { - break (Operation::Write(Ok(())), buf); - } + Ok(_) => continue, // more to write Err(e) => break (Operation::Write(Err(e)), buf), } } - })); + }); - Some(JoinHandleInner::Async(handle)) + Some(handle) }; } } @@ -1177,7 +1124,7 @@ impl Inner { }) .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "background task failed"))?; - Some(JoinHandleInner::Blocking(handle)) + Some(handle) }; } diff --git a/tokio/src/fs/write.rs b/tokio/src/fs/write.rs index 443905f563b..c4cf81aca7c 100644 --- a/tokio/src/fs/write.rs +++ b/tokio/src/fs/write.rs @@ -45,8 +45,6 @@ pub async fn write(path: impl AsRef, contents: impl AsRef<[u8]>) -> io::Re let handle = crate::runtime::Handle::current(); let driver_handle = handle.inner.driver().io(); if driver_handle.check_and_init()? { - use crate::io::blocking; - let mut buf = blocking::Buf::with_capacity(contents.as_ref().len()); buf.copy_from(contents.as_ref(), contents.as_ref().len()); return write_uring(path, buf).await; diff --git a/tokio/src/io/uring/write.rs b/tokio/src/io/uring/write.rs index 14c61ce2ebe..eb999ba0003 100644 --- a/tokio/src/io/uring/write.rs +++ b/tokio/src/io/uring/write.rs @@ -41,8 +41,7 @@ impl Cancellable for Write { } impl Op { - /// Issue a write that starts at `buf_offset` within `buf` and writes some bytes - /// into `file` at `file_offset`. + /// Issue a write at `file_offset` from the provided `buf`. To use current file cursor, set `file_offset` to `-1` or `u64::MAX`. pub(crate) fn write_at(fd: ArcFd, buf: blocking::Buf, file_offset: u64) -> Self { // There is a cap on how many bytes we can write in a single uring write operation. // ref: https://github.com/axboe/liburing/discussions/497 From b91836c93f3e0a1d669848d3e807536a07aa92e4 Mon Sep 17 00:00:00 2001 From: red Date: Tue, 18 Nov 2025 13:04:10 +0100 Subject: [PATCH 09/12] add non-blocking spawn to mocks --- tokio/src/fs/file.rs | 11 +++++++++-- tokio/src/fs/mocks.rs | 14 ++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 48a056238a1..42bad2897b6 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -30,6 +30,13 @@ use crate::blocking::{spawn_blocking, spawn_mandatory_blocking}; #[cfg(not(test))] use std::fs::File as StdFile; +cfg_io_uring! { + #[cfg(test)] + use super::mocks::spawn; + #[cfg(not(test))] + use crate::spawn; +} + /// A reference to an open file on the filesystem. /// /// This is a specialized version of [`std::fs::File`] for usage from the @@ -848,7 +855,7 @@ impl AsyncWrite for File { } let mut fd: ArcFd = std; - let handle = crate::spawn(async move { + let handle = spawn(async move { loop { let op = Op::write_at(fd, buf, u64::MAX); let (r, _buf, _fd) = op.await; @@ -1082,7 +1089,7 @@ impl Inner { } let mut fd: ArcFd = std; - let handle = crate::spawn(async move { + let handle = spawn(async move { loop { let op = Op::write_at(fd, buf, u64::MAX); let (r, _buf, _fd) = op.await; diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index ae5d7e5368e..f07796818aa 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -145,6 +145,20 @@ where Some(JoinHandle { rx }) } +pub(super) fn spawn(f: F) -> JoinHandle +where + F: Future + Send + 'static, +{ + let (tx, rx) = oneshot::channel(); + + let task = crate::spawn(async move { + let res = f.await; + let _ = tx.send(res); + }); + + Some(JoinHandle { rx }) +} + impl Future for JoinHandle { type Output = Result; From 58e9b0b03bc8e27cad30f613de53c1c41de582e8 Mon Sep 17 00:00:00 2001 From: red Date: Tue, 18 Nov 2025 13:37:18 +0100 Subject: [PATCH 10/12] fix type issue in mock spawn --- tokio/src/fs/mocks.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index f07796818aa..ec620ddb94d 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -156,7 +156,7 @@ where let _ = tx.send(res); }); - Some(JoinHandle { rx }) + JoinHandle { rx } } impl Future for JoinHandle { From 9504e533fe1c5ebe24add3801f6c6d7b26fb663f Mon Sep 17 00:00:00 2001 From: red Date: Tue, 18 Nov 2025 13:43:00 +0100 Subject: [PATCH 11/12] fix spawn signature 2.0 --- tokio/src/fs/mocks.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index ec620ddb94d..cb2044fa659 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -145,13 +145,15 @@ where Some(JoinHandle { rx }) } -pub(super) fn spawn(f: F) -> JoinHandle +#[allow(dead_code)] +pub(super) fn spawn(f: F) -> JoinHandle where - F: Future + Send + 'static, + F: Future + Send + 'static, + F::Output: Send + 'static, { let (tx, rx) = oneshot::channel(); - let task = crate::spawn(async move { + let _ = crate::spawn(async move { let res = f.await; let _ = tx.send(res); }); From daa0532793e3de7c2f545549f9e60436ceacacf2 Mon Sep 17 00:00:00 2001 From: red Date: Tue, 18 Nov 2025 13:45:28 +0100 Subject: [PATCH 12/12] appease clippy --- tokio/src/fs/mocks.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index cb2044fa659..2bdeedbfce8 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -153,7 +153,7 @@ where { let (tx, rx) = oneshot::channel(); - let _ = crate::spawn(async move { + let _task = crate::spawn(async move { let res = f.await; let _ = tx.send(res); });