Changes from 3 commits
235 changes: 199 additions & 36 deletions tokio/src/fs/file.rs
@@ -107,7 +107,61 @@ struct Inner {
#[derive(Debug)]
enum State {
Idle(Option<Buf>),
Busy(JoinHandle<(Operation, Buf)>),
Busy(JoinHandleInner<(Operation, Buf)>),
}

#[derive(Debug)]
enum JoinHandleInner<T> {
Blocking(JoinHandle<T>),
#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]
Async(BoxedOp<T>),
Contributor:

Here is an idea:

What if you perform the io-uring logic in a tokio::spawn task? That way, you can use JoinHandle in both cases.

I think it will simplify the code a lot.

Author:

The Blocking(JoinHandle) is crate::blocking::JoinHandle, not tokio::runtime::task::join::JoinHandle, so some enum would be required anyway, and at that point we might as well just have this kind of boxed future.

Contributor:

That is a re-export of the same thing.

pub(crate) use crate::task::JoinHandle;

Contributor:

One reason that I suggest this is that the io-uring logic performs multiple writes in a loop to write the entire buffer, which requires the end-user to await flush or write on the file to make progress. However, the implementation we have today does not require the user to interact with the file to make progress - it happens in the background automatically.

Author:

Ah got it, I'll change it.

Out of curiosity: the general rule is that for a future to keep making progress, the user needs to keep polling it. I feel like not having to do that specifically for fs::File is just an implementation detail, and users shouldn't rely on it always being the case. So I guess my question is: how much is it worth preserving old behavior like this when implementing new features, if it comes at the cost of (small) performance overhead? relevant XKCD

Author:

Had to adjust the mocks to accommodate non-blocking spawning: https://github.com/tokio-rs/tokio/actions/runs/19464918802/job/55697554687 b91836c - can you double-check I'm doing it right?

Contributor:

This kind of thing is always a hard question. Though I would say that I'm pretty sure people rely on writes continuing even if the file is dropped, and it would be very hard to change that.

}

cfg_io_uring! {
struct BoxedOp<T>(Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>>);

impl<T> std::fmt::Debug for BoxedOp<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Formats as BoxedFuture("<type name of T>")
f.debug_tuple("BoxedFuture")
.field(&std::any::type_name::<T>())
.finish()
}
}

impl<T> Future for BoxedOp<T> {
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.as_mut().poll(cx)
}
}
}
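The Debug impl above sidesteps the fact that the boxed future itself is not Debug by printing only the name of its output type. A standalone, std-only sketch of the same pattern (the `7u32` value and `BoxedOp<u32>` instantiation are illustrative):

```rust
use std::any::type_name;
use std::fmt;
use std::future::Future;
use std::pin::Pin;

// Newtype over a boxed future whose Debug impl shows only the output
// type's name, since `dyn Future` has no Debug implementation.
struct BoxedOp<T>(Pin<Box<dyn Future<Output = T> + Send + Sync + 'static>>);

impl<T> fmt::Debug for BoxedOp<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("BoxedFuture")
            .field(&type_name::<T>())
            .finish()
    }
}

fn main() {
    let op: BoxedOp<u32> = BoxedOp(Box::pin(std::future::ready(7u32)));
    // The output type, not the opaque future, is what gets printed.
    assert_eq!(format!("{op:?}"), "BoxedFuture(\"u32\")");
}
```

Because `type_name` is resolved per monomorphization, each instantiation of the wrapper gets a distinct, readable Debug output for free.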

impl Future for JoinHandleInner<(Operation, Buf)> {
type Output = io::Result<(Operation, Buf)>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.get_mut() {
JoinHandleInner::Blocking(ref mut jh) => Pin::new(jh)
.poll(cx)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "background task failed")),
#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]
JoinHandleInner::Async(ref mut jh) => Pin::new(jh).poll(cx).map(Ok),
}
}
}
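The Future impl above delegates poll to whichever variant is active and maps each arm's output into a common io::Result. The same delegating-enum shape can be sketched with only std (the `Either` name, `&'static str` error type, and the hand-rolled no-op waker are illustrative, not from the PR):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Enum future that forwards poll to the active variant and maps both
// arms into one common Result output type.
enum Either<A, B> {
    Left(A),
    Right(B),
}

impl<A, B, T> Future for Either<A, B>
where
    A: Future<Output = T> + Unpin,
    B: Future<Output = T> + Unpin,
{
    type Output = Result<T, &'static str>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.get_mut() {
            Either::Left(a) => Pin::new(a).poll(cx).map(Ok),
            Either::Right(b) => Pin::new(b).poll(cx).map(Ok),
        }
    }
}

// Minimal no-op waker so the example can poll without an executor.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut: Either<_, std::future::Ready<u32>> = Either::Left(std::future::ready(5u32));
    // `ready` resolves immediately, so a single poll yields the value.
    assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Ready(Ok(5)));
}
```

Requiring `Unpin` on both variants keeps the delegation simple: `get_mut` plus `Pin::new` is enough, with no unsafe pin projection.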

#[derive(Debug)]
@@ -399,7 +453,7 @@ impl File {

let std = self.std.clone();

inner.state = State::Busy(spawn_blocking(move || {
inner.state = State::Busy(JoinHandleInner::Blocking(spawn_blocking(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| std.set_len(size))
} else {
@@ -409,7 +463,7 @@ impl File {

// Return the result as a seek
(Operation::Seek(res), buf)
}));
})));

let (op, buf) = match inner.state {
State::Idle(_) => unreachable!(),
@@ -613,13 +667,14 @@ impl AsyncRead for File {
let std = me.std.clone();

let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size);
inner.state = State::Busy(spawn_blocking(move || {
// SAFETY: the `Read` implementation of `std` does not
// read from the buffer it is borrowing and correctly
// reports the length of the data written into the buffer.
let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
(Operation::Read(res), buf)
}));
inner.state =
State::Busy(JoinHandleInner::Blocking(spawn_blocking(move || {
// SAFETY: the `Read` implementation of `std` does not
// read from the buffer it is borrowing and correctly
// reports the length of the data written into the buffer.
let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
(Operation::Read(res), buf)
})));
}
State::Busy(ref mut rx) => {
let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
@@ -685,10 +740,10 @@ impl AsyncSeek for File {

let std = me.std.clone();

inner.state = State::Busy(spawn_blocking(move || {
inner.state = State::Busy(JoinHandleInner::Blocking(spawn_blocking(move || {
let res = (&*std).seek(pos);
(Operation::Seek(res), buf)
}));
})));
Ok(())
}
}
@@ -753,20 +808,75 @@ impl AsyncWrite for File {
let n = buf.copy_from(src, me.max_buf_size);
let std = me.std.clone();

let blocking_task_join_handle = spawn_mandatory_blocking(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
} else {
buf.write_to(&mut &*std)
};
#[allow(unused_mut)]
let mut data = Some((std, buf));

let mut task_join_handle = None;

#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]
{
use crate::runtime::Handle;

// Handle not present in some tests?
if let Ok(handle) = Handle::try_current() {
if handle.inner.driver().io().check_and_init()? {
task_join_handle = {
use crate::runtime::driver::op::Op;

let (std, buf) = data.take().unwrap();
if let Some(seek) = seek {
// we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset
// seeking only modifies kernel metadata and does not block, so we can do it here
(&*std).seek(seek).map_err(|e| {
io::Error::new(
e.kind(),
format!("failed to seek before write: {e}"),
)
})?;
}

let op = Op::write_at(std, buf, u64::MAX);

let handle = BoxedOp(Box::pin(async move {
let (r, buf, _fd) = op.await;
match r {
Ok(_n) => (Operation::Write(Ok(())), buf),
Err(e) => (Operation::Write(Err(e)), buf),
}
}));

Some(JoinHandleInner::Async(handle))
};
}
}
}

(Operation::Write(res), buf)
})
.ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "background task failed")
})?;
if let Some((std, mut buf)) = data {
task_join_handle = {
let handle = spawn_mandatory_blocking(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
} else {
buf.write_to(&mut &*std)
};

(Operation::Write(res), buf)
})
.ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "background task failed")
})?;

Some(JoinHandleInner::Blocking(handle))
};
}

inner.state = State::Busy(blocking_task_join_handle);
inner.state = State::Busy(task_join_handle.unwrap());

return Poll::Ready(Ok(n));
}
@@ -824,20 +934,73 @@ impl AsyncWrite for File {
let n = buf.copy_from_bufs(bufs, me.max_buf_size);
let std = me.std.clone();

let blocking_task_join_handle = spawn_mandatory_blocking(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
} else {
buf.write_to(&mut &*std)
};
#[allow(unused_mut)]
let mut data = Some((std, buf));

let mut task_join_handle = None;

#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]
{
use crate::runtime::Handle;

// Handle not present in some tests?
if let Ok(handle) = Handle::try_current() {
Contributor (commenting on lines +839 to +840):

Surely if the handle is not present, then spawn_mandatory_blocking would also fail. Which tests are these?

if handle.inner.driver().io().check_and_init()? {
task_join_handle = {
use crate::runtime::driver::op::Op;

let (std, buf) = data.take().unwrap();
if let Some(seek) = seek {
// we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset
// seeking only modifies kernel metadata and does not block, so we can do it here
(&*std).seek(seek).map_err(|e| {
io::Error::new(
e.kind(),
format!("failed to seek before write: {e}"),
)
})?;
}

let op = Op::write_at(std, buf, u64::MAX);

let handle = BoxedOp(Box::pin(async move {
let (r, buf, _fd) = op.await;
match r {
Ok(_n) => (Operation::Write(Ok(())), buf),
Err(e) => (Operation::Write(Err(e)), buf),
}
}));

Some(JoinHandleInner::Async(handle))
};
}
}
}

(Operation::Write(res), buf)
})
.ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "background task failed")
})?;
if let Some((std, mut buf)) = data {
task_join_handle = Some(JoinHandleInner::Blocking(
spawn_mandatory_blocking(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
} else {
buf.write_to(&mut &*std)
};

(Operation::Write(res), buf)
})
.ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "background task failed")
})?,
));
}

inner.state = State::Busy(blocking_task_join_handle);
inner.state = State::Busy(task_join_handle.unwrap());
Contributor:

The existing code returns io::Error::new(io::ErrorKind::Other, "background task failed") instead of panicking here. Please follow that pattern. If there are not tests for this, please add a test that would have caught this mistake.

Author:

This isn't a panic on the I/O error; it's an unwrap of task_join_handle: Option<_>, which should always be present. It's an Option to allow falling back from io-uring to regular blocking writes.
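The invariant described here — the fast path `take()`s the data, the fallback runs only if the data is still present, so exactly one branch populates the handle — can be sketched standalone. The `pick_backend` function and string tags below are hypothetical, for illustration only:

```rust
// Sketch of the fallback pattern under review: `data` starts as Some,
// the fast path may take() it, and the slow path runs only when `data`
// survives, so `handle` is always Some by the time it is unwrapped.
fn pick_backend(fast_path_available: bool) -> &'static str {
    let mut data = Some("buffer");
    let mut handle: Option<&'static str> = None;

    if fast_path_available {
        let _buf = data.take().unwrap(); // fast path consumes the data
        handle = Some("uring");
    }

    if let Some(_buf) = data {
        handle = Some("blocking"); // fallback: data was not consumed
    }

    // Exactly one branch ran, so this cannot panic.
    handle.unwrap()
}

fn main() {
    assert_eq!(pick_backend(true), "uring");
    assert_eq!(pick_backend(false), "blocking");
}
```

Threading the ownership of `data` through `Option::take` is what makes the final `unwrap` safe by construction rather than by convention.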


return Poll::Ready(Ok(n));
}
37 changes: 23 additions & 14 deletions tokio/src/fs/write.rs
@@ -1,3 +1,11 @@
#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]
use crate::io::blocking;
use crate::{fs::asyncify, util::as_ref::OwnedBuf};

use std::{io, path::Path};
@@ -25,7 +33,6 @@ use std::{io, path::Path};
/// ```
pub async fn write(path: impl AsRef<Path>, contents: impl AsRef<[u8]>) -> io::Result<()> {
let path = path.as_ref();
let contents = crate::util::as_ref::upgrade(contents);

#[cfg(all(
tokio_unstable,
@@ -38,10 +45,15 @@ pub async fn write(path: impl AsRef<Path>, contents: impl AsRef<[u8]>) -> io::Re
let handle = crate::runtime::Handle::current();
let driver_handle = handle.inner.driver().io();
if driver_handle.check_and_init()? {
return write_uring(path, contents).await;
use crate::io::blocking;

let mut buf = blocking::Buf::with_capacity(contents.as_ref().len());
buf.copy_from(contents.as_ref(), contents.as_ref().len());
return write_uring(path, buf).await;
}
}

let contents = crate::util::as_ref::upgrade(contents);
write_spawn_blocking(path, contents).await
}

@@ -52,9 +64,9 @@ pub async fn write(path: impl AsRef<Path>, contents: impl AsRef<[u8]>) -> io::Re
feature = "fs",
target_os = "linux"
))]
async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> {
use crate::{fs::OpenOptions, runtime::driver::op::Op};
use std::os::fd::OwnedFd;
async fn write_uring(path: &Path, mut buf: blocking::Buf) -> io::Result<()> {
use crate::{fs::OpenOptions, io::uring::utils::ArcFd, runtime::driver::op::Op};
use std::sync::Arc;

let file = OpenOptions::new()
.write(true)
@@ -63,16 +75,14 @@ async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> {
.open(path)
.await?;

let mut fd: OwnedFd = file
.try_into_std()
.expect("unexpected in-flight operation detected")
.into();
let mut fd: ArcFd = Arc::new(
file.try_into_std()
.expect("unexpected in-flight operation detected"),
);

let total: usize = buf.as_ref().len();
let mut buf_offset: usize = 0;
let mut file_offset: u64 = 0;
while buf_offset < total {
let (n, _buf, _fd) = Op::write_at(fd, buf, buf_offset, file_offset)?.await;
while !buf.is_empty() {
let (n, _buf, _fd) = Op::write_at(fd, buf, file_offset).await;
// TODO: handle EINTR here
let n = n?;
if n == 0 {
@@ -81,7 +91,6 @@ async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> {

buf = _buf;
fd = _fd;
buf_offset += n as usize;
file_offset += n as u64;
}

19 changes: 19 additions & 0 deletions tokio/src/io/blocking.rs
@@ -234,6 +234,25 @@ impl Buf {
&self.buf[self.pos..]
}

#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]
pub(crate) fn advance(&mut self, n: usize) {
if n > self.len() {
panic!("advance past end of buffer");
}

self.pos += n;
if self.pos == self.buf.len() {
self.buf.truncate(0);
self.pos = 0;
}
}
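The `advance` method added above consumes `n` bytes from the front of the buffer and resets it once fully drained, so the allocation can be reused from position 0. A minimal standalone sketch of the same logic (this simplified `Buf` only mirrors the fields `advance` touches):

```rust
// Simplified stand-in for tokio's internal blocking::Buf, showing only
// the pos/len bookkeeping that `advance` relies on.
struct Buf {
    buf: Vec<u8>,
    pos: usize,
}

impl Buf {
    fn len(&self) -> usize {
        self.buf.len() - self.pos
    }

    fn advance(&mut self, n: usize) {
        if n > self.len() {
            panic!("advance past end of buffer");
        }
        self.pos += n;
        // Fully drained: reset so the allocation is reused from the start.
        if self.pos == self.buf.len() {
            self.buf.truncate(0);
            self.pos = 0;
        }
    }
}

fn main() {
    let mut b = Buf { buf: b"abcdef".to_vec(), pos: 0 };
    b.advance(4);
    assert_eq!(b.len(), 2);
    b.advance(2); // drains the buffer, resetting both fields
    assert_eq!((b.buf.len(), b.pos), (0, 0));
}
```

`truncate(0)` keeps the Vec's capacity, so repeated fill/drain cycles on the same `Buf` avoid reallocating.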

/// # Safety
///
/// `rd` must not read from the buffer `read` is borrowing and must correctly