-
-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Implement uring in AsyncWrite for fs::File
#7713
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
This should be ready for merging i think, is there anything else blocking it? |
|
Sorry I've been too focused on #7696. I will try to review this PR too. |
tokio/src/fs/file.rs
Outdated
| #[derive(Debug)] | ||
| enum JoinHandleInner<T> { | ||
| Blocking(JoinHandle<T>), | ||
| #[cfg(all( | ||
| tokio_unstable, | ||
| feature = "io-uring", | ||
| feature = "rt", | ||
| feature = "fs", | ||
| target_os = "linux" | ||
| ))] | ||
| Async(BoxedOp<T>), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is an idea:
What if you perform the io-uring logic in a tokio::spawn task? That way, you can use JoinHandle in both cases.
I think it will simply code a lot.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Blocking(JoinHandle) is crate::blocking::JoinHandle not tokio::runtime::task::join::JoinHandle so some enum would be required anyway, and at that point might as well just have this kind of a boxed future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is a re-export of the same thing.
Line 9 in 4714ca1
| pub(crate) use crate::task::JoinHandle; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One reason that I suggest this is that the io-uring logic performs multiple writes in a loop to write the entire buffer, which requires the end-user to await flush or write on the file to make progress. However, the implementation we have today does not require the user to interact with the file to make progress - it happens in the background automatically.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah got it, I'll change it.
Out of curiosity, general rule is that for a future to keep making progress, user needs to keep polling it. I feel like not having to do that specifically for fs::File is just an implementation detail, and users shouldn't rely on that always being the case. So i guess my question is how much is it worth preserving old behavior like this when implementing new features, if it comes at the cost of (small) performance overhead? relevant XKCD
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had to adjust mocks to accommodate non-blocking spawning https://github.com/tokio-rs/tokio/actions/runs/19464918802/job/55697554687 b91836c - can you doublecheck im doing it right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This kind of thing is always a hard question. Though I would say that I'm pretty sure people rely on writes continuing even if the file is dropped, and it would be very hard to change that.
|
If someone can doublecheck that what I added to Also looks like GCP is breaking the CI :/ |
| // Handle not present in some tests? | ||
| if let Ok(handle) = Handle::try_current() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Surely if the handle is not present, then spawn_mandatory_blocking would also fail. What tests is this?
| /// Issue a write at `file_offset` from the provided `buf`. To use current file cursor, set `file_offset` to `-1` or `u64::MAX`. | ||
| pub(crate) fn write_at(fd: ArcFd, buf: blocking::Buf, file_offset: u64) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to be a feature that relies on a flag IORING_FEAT_RW_CUR_POS being present. We should probably make sure to check this feature somehow.
| use std::sync::Arc; | ||
| use std::{ffi::CString, io, path::Path}; | ||
|
|
||
| pub(crate) type ArcFd = Arc<dyn AsRawFd + Send + Sync + 'static>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of introducing this, could we make the Write op generic instead? This avoids allocating a useless Arc in fs::write.
| #[allow(unused_mut)] | ||
| let mut task_join_handle = inner.poll_write_inner((std, buf), seek)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| #[allow(unused_mut)] | |
| let mut task_join_handle = inner.poll_write_inner((std, buf), seek)?; | |
| let task_join_handle = inner.poll_write_inner((std, buf), seek)?; |
| let handle = spawn(async move { | ||
| loop { | ||
| let op = Op::write_at(fd, buf, u64::MAX); | ||
| let (r, _buf, _fd) = op.await; | ||
| buf = _buf; | ||
| fd = _fd; | ||
| match r { | ||
| Ok(_) if buf.is_empty() => { | ||
| break (Operation::Write(Ok(())), buf); | ||
| } | ||
| Ok(0) => { | ||
| break ( | ||
| Operation::Write(Err( | ||
| io::ErrorKind::WriteZero.into(), | ||
| )), | ||
| buf, | ||
| ); | ||
| } | ||
| Ok(_) => continue, // more to write | ||
| Err(e) => break (Operation::Write(Err(e)), buf), | ||
| } | ||
| } | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is indented like 8 or 9 times. Can we move the loop into a separate function to reduce this?
| } | ||
|
|
||
| inner.state = State::Busy(blocking_task_join_handle); | ||
| inner.state = State::Busy(task_join_handle.unwrap()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The existing code returns io::Error::new(io::ErrorKind::Other, "background task failed") instead of panicking here. Please follow that pattern. If there are not tests for this, please add a test that would have caught this mistake.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this isn't a panic on the io error, its an unwrap for task_join_handle: Option<_>, which should always be present. It's an Option to allow falling back from uring to regular writes
| fn poll_write_inner( | ||
| &self, | ||
| data: (Arc<StdFile>, Buf), | ||
| seek: Option<SeekFrom>, | ||
| ) -> io::Result<JoinHandle<(Operation, Buf)>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a lot of duplication between normal and vectored writes. The logic that actually writes is the same in both cases, so let's avoid this duplication. Please refactor to avoid this.
Motivation
Currently IO operations for
fs::Fileis implemented usingspawn_blockingto perform IO, which is not great for an async runtime. This PR implements IO operations usingio_uring. Thefs::writefunction already usesio_uringfor async operations, and this PR extends the uring functionality intoimpl AsyncWrite for fs::File.For full discussion see here #7684 and subsequently on discord https://discord.com/channels/500028886025895936/810724255046172692/1427735341003051079
Solution
The
fs::Filestruct already implements an internal buffer when writing usingspawn_blocking, which is similar to whatio_uringrequires. Therefore inpoll_write, we can check ifio_uringis initialized and supported, and if not fall back to thespawn_blockingimplementation, without requiring any API changes.This PR only implements
AsyncWrite, and once this PR is accepted, I am happy to also open a PR forAsyncRead for Fileusing the same structure.