-
-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Implement uring in AsyncWrite for fs::File
#7713
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
d9d2ddc
093070a
6856207
863a69c
ea4040d
226fcf8
937da2d
c30a4dd
b91836c
58e9b0b
9504e53
daa0532
8cf4ca3
a72bf97
9c7387f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,13 @@ use crate::blocking::{spawn_blocking, spawn_mandatory_blocking}; | |
| #[cfg(not(test))] | ||
| use std::fs::File as StdFile; | ||
|
|
||
| cfg_io_uring! { | ||
| #[cfg(test)] | ||
| use super::mocks::spawn; | ||
| #[cfg(not(test))] | ||
| use crate::spawn; | ||
| } | ||
|
|
||
| /// A reference to an open file on the filesystem. | ||
| /// | ||
| /// This is a specialized version of [`std::fs::File`] for usage from the | ||
|
|
@@ -753,20 +760,10 @@ impl AsyncWrite for File { | |
| let n = buf.copy_from(src, me.max_buf_size); | ||
| let std = me.std.clone(); | ||
|
|
||
| let blocking_task_join_handle = spawn_mandatory_blocking(move || { | ||
| let res = if let Some(seek) = seek { | ||
| (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) | ||
| } else { | ||
| buf.write_to(&mut &*std) | ||
| }; | ||
|
|
||
| (Operation::Write(res), buf) | ||
| }) | ||
| .ok_or_else(|| { | ||
| io::Error::new(io::ErrorKind::Other, "background task failed") | ||
| })?; | ||
| #[allow(unused_mut)] | ||
| let mut task_join_handle = inner.poll_write_inner((std, buf), seek)?; | ||
|
|
||
| inner.state = State::Busy(blocking_task_join_handle); | ||
| inner.state = State::Busy(task_join_handle); | ||
|
|
||
| return Poll::Ready(Ok(n)); | ||
| } | ||
|
|
@@ -824,20 +821,88 @@ impl AsyncWrite for File { | |
| let n = buf.copy_from_bufs(bufs, me.max_buf_size); | ||
| let std = me.std.clone(); | ||
|
|
||
| let blocking_task_join_handle = spawn_mandatory_blocking(move || { | ||
| let res = if let Some(seek) = seek { | ||
| (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) | ||
| } else { | ||
| buf.write_to(&mut &*std) | ||
| }; | ||
| #[allow(unused_mut)] | ||
| let mut data = Some((std, buf)); | ||
|
|
||
| let mut task_join_handle: Option<JoinHandle<_>> = None; | ||
|
|
||
| #[cfg(all( | ||
| tokio_unstable, | ||
| feature = "io-uring", | ||
| feature = "rt", | ||
| feature = "fs", | ||
| target_os = "linux" | ||
| ))] | ||
| { | ||
| use crate::runtime::Handle; | ||
|
|
||
| // Handle not present in some tests? | ||
| if let Ok(handle) = Handle::try_current() { | ||
|
Comment on lines
+839
to
+840
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Surely if the handle is not present, then |
||
| if handle.inner.driver().io().check_and_init()? { | ||
| task_join_handle = { | ||
| use crate::{io::uring::utils::ArcFd, runtime::driver::op::Op}; | ||
|
|
||
| let (std, mut buf) = data.take().unwrap(); | ||
| if let Some(seek) = seek { | ||
| // we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset | ||
| // seeking only modifies kernel metadata and does not block, so we can do it here | ||
| (&*std).seek(seek).map_err(|e| { | ||
| io::Error::new( | ||
| e.kind(), | ||
| format!("failed to seek before write: {e}"), | ||
| ) | ||
| })?; | ||
| } | ||
|
|
||
| let mut fd: ArcFd = std; | ||
| let handle = spawn(async move { | ||
| loop { | ||
| let op = Op::write_at(fd, buf, u64::MAX); | ||
| let (r, _buf, _fd) = op.await; | ||
| buf = _buf; | ||
| fd = _fd; | ||
| match r { | ||
| Ok(_) if buf.is_empty() => { | ||
| break (Operation::Write(Ok(())), buf); | ||
| } | ||
| Ok(0) => { | ||
| break ( | ||
| Operation::Write(Err( | ||
| io::ErrorKind::WriteZero.into(), | ||
| )), | ||
| buf, | ||
| ); | ||
| } | ||
| Ok(_) => continue, // more to write | ||
| Err(e) => break (Operation::Write(Err(e)), buf), | ||
| } | ||
| } | ||
| }); | ||
|
Comment on lines
+858
to
+880
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is indented like 8 or 9 times. Can we move the loop into a separate function to reduce this? |
||
|
|
||
| Some(handle) | ||
| }; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| (Operation::Write(res), buf) | ||
| }) | ||
| .ok_or_else(|| { | ||
| io::Error::new(io::ErrorKind::Other, "background task failed") | ||
| })?; | ||
| if let Some((std, mut buf)) = data { | ||
| task_join_handle = Some( | ||
| spawn_mandatory_blocking(move || { | ||
| let res = if let Some(seek) = seek { | ||
| (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) | ||
| } else { | ||
| buf.write_to(&mut &*std) | ||
| }; | ||
|
|
||
| (Operation::Write(res), buf) | ||
| }) | ||
| .ok_or_else(|| { | ||
| io::Error::new(io::ErrorKind::Other, "background task failed") | ||
| })?, | ||
| ); | ||
| } | ||
|
|
||
| inner.state = State::Busy(blocking_task_join_handle); | ||
| inner.state = State::Busy(task_join_handle.unwrap()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The existing code returns
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this isn't a panic on the io error, its an unwrap for |
||
|
|
||
| return Poll::Ready(Ok(n)); | ||
| } | ||
|
|
@@ -989,6 +1054,93 @@ impl Inner { | |
| Operation::Seek(_) => Poll::Ready(Ok(())), | ||
| } | ||
| } | ||
|
|
||
| fn poll_write_inner( | ||
| &self, | ||
| data: (Arc<StdFile>, Buf), | ||
| seek: Option<SeekFrom>, | ||
| ) -> io::Result<JoinHandle<(Operation, Buf)>> { | ||
|
Comment on lines
+1058
to
+1062
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a lot of duplication between normal and vectored writes. The logic that actually writes is the same in both cases, so let's avoid this duplication. Please refactor to avoid this. |
||
| #[allow(unused_mut)] | ||
| let mut data = Some(data); | ||
| let mut task_join_handle = None; | ||
|
|
||
| #[cfg(all( | ||
| tokio_unstable, | ||
| feature = "io-uring", | ||
| feature = "rt", | ||
| feature = "fs", | ||
| target_os = "linux" | ||
| ))] | ||
| { | ||
| use crate::runtime::Handle; | ||
|
|
||
| // Handle not present in some tests? | ||
| if let Ok(handle) = Handle::try_current() { | ||
| if handle.inner.driver().io().check_and_init()? { | ||
| task_join_handle = { | ||
| use crate::{io::uring::utils::ArcFd, runtime::driver::op::Op}; | ||
|
|
||
| let (std, mut buf) = data.take().unwrap(); | ||
| if let Some(seek) = seek { | ||
| // we do std seek before a write, so we can always use u64::MAX (current cursor) for the file offset | ||
| // seeking only modifies kernel metadata and does not block, so we can do it here | ||
| (&*std).seek(seek).map_err(|e| { | ||
| io::Error::new( | ||
| e.kind(), | ||
| format!("failed to seek before write: {e}"), | ||
| ) | ||
| })?; | ||
| } | ||
|
|
||
| let mut fd: ArcFd = std; | ||
| let handle = spawn(async move { | ||
| loop { | ||
| let op = Op::write_at(fd, buf, u64::MAX); | ||
| let (r, _buf, _fd) = op.await; | ||
| buf = _buf; | ||
| fd = _fd; | ||
| match r { | ||
| Ok(_) if buf.is_empty() => { | ||
| break (Operation::Write(Ok(())), buf); | ||
| } | ||
| Ok(0) => { | ||
| break ( | ||
| Operation::Write(Err(io::ErrorKind::WriteZero.into())), | ||
| buf, | ||
| ); | ||
| } | ||
|
|
||
| Ok(_) => continue, // more to write | ||
| Err(e) => break (Operation::Write(Err(e)), buf), | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| Some(handle) | ||
| }; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if let Some((std, mut buf)) = data { | ||
| task_join_handle = { | ||
| let handle = spawn_mandatory_blocking(move || { | ||
| let res = if let Some(seek) = seek { | ||
| (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) | ||
| } else { | ||
| buf.write_to(&mut &*std) | ||
| }; | ||
|
|
||
| (Operation::Write(res), buf) | ||
| }) | ||
| .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "background task failed"))?; | ||
|
|
||
| Some(handle) | ||
| }; | ||
| } | ||
|
|
||
| Ok(task_join_handle.unwrap()) | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,10 @@ | ||
| use std::os::fd::AsRawFd; | ||
| use std::os::unix::ffi::OsStrExt; | ||
| use std::sync::Arc; | ||
| use std::{ffi::CString, io, path::Path}; | ||
|
|
||
| pub(crate) type ArcFd = Arc<dyn AsRawFd + Send + Sync + 'static>; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of introducing this, could we make the |
||
|
|
||
| pub(crate) fn cstr(p: &Path) -> io::Result<CString> { | ||
| Ok(CString::new(p.as_os_str().as_bytes())?) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.