diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-12-16 17:14:12 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-16 17:14:12 +0100 |
commit | 6984b63f2f3c8d0819fe2dced8252a81f3400ae7 (patch) | |
tree | 5201bc962f913927409ae2770aca48ffa3aaaa34 /runtime/ops/io.rs | |
parent | 9fe26f8ca189ac81d9c20c454b9dbfa5e1011c3f (diff) |
refactor: rewrite ops to use ResourceTable2 (#8512)
This commit migrates all ops to use new resource table
and "AsyncRefCell".
Old implementation of resource table was completely
removed and all code referencing it was updated to use
new system.
Diffstat (limited to 'runtime/ops/io.rs')
-rw-r--r-- | runtime/ops/io.rs | 633 |
1 files changed, 386 insertions, 247 deletions
diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 0f8af905a..de56f5b55 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -7,26 +7,29 @@ use deno_core::error::bad_resource_id; use deno_core::error::resource_unavailable; use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures; -use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; -use deno_core::futures::ready; +use deno_core::AsyncMutFuture; +use deno_core::AsyncRefCell; use deno_core::BufVec; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::JsRuntime; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use std::borrow::Cow; use std::cell::RefCell; -use std::collections::HashMap; -use std::pin::Pin; use std::rc::Rc; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::task::Context; -use std::task::Poll; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWrite; +use tokio::io::AsyncWriteExt; +use tokio::net::tcp; use tokio::net::TcpStream; use tokio_rustls::client::TlsStream as ClientTlsStream; use tokio_rustls::server::TlsStream as ServerTlsStream; -#[cfg(not(windows))] +#[cfg(unix)] use std::os::unix::io::FromRawFd; #[cfg(windows)] @@ -94,26 +97,28 @@ pub fn init(rt: &mut JsRuntime) { } pub fn get_stdio() -> ( - Option<StreamResourceHolder>, - Option<StreamResourceHolder>, - Option<StreamResourceHolder>, + Option<StreamResource>, + Option<StreamResource>, + Option<StreamResource>, ) { - let stdin = get_stdio_stream(&STDIN_HANDLE); - let stdout = get_stdio_stream(&STDOUT_HANDLE); - let stderr = get_stdio_stream(&STDERR_HANDLE); + let stdin = get_stdio_stream(&STDIN_HANDLE, "stdin"); + let stdout = get_stdio_stream(&STDOUT_HANDLE, "stdout"); + let stderr = get_stdio_stream(&STDERR_HANDLE, "stderr"); (stdin, stdout, stderr) } fn get_stdio_stream( handle: &Option<std::fs::File>, -) -> Option<StreamResourceHolder> { + name: &str, +) -> Option<StreamResource> { match handle { None => None, Some(file_handle) => match file_handle.try_clone() { - Ok(clone) => Some(StreamResourceHolder::new(StreamResource::FsFile( - Some((tokio::fs::File::from_std(clone), FileMetadata::default())), - ))), + Ok(clone) => { + let tokio_file = tokio::fs::File::from_std(clone); + Some(StreamResource::stdio(tokio_file, name)) + } Err(_e) => None, }, } @@ -137,100 +142,317 @@ pub struct FileMetadata { pub tty: TTYMetadata, } -pub struct StreamResourceHolder { - pub resource: StreamResource, - waker: HashMap<usize, futures::task::AtomicWaker>, - waker_counter: AtomicUsize, +#[derive(Debug)] +pub struct FullDuplexResource<R, W> { + rd: AsyncRefCell<R>, + wr: AsyncRefCell<W>, + // When a full-duplex resource is closed, all pending 'read' ops are + // canceled, while 'write' ops are allowed to complete. Therefore only + // 'read' futures should be attached to this cancel handle. + cancel_handle: CancelHandle, } -impl StreamResourceHolder { - pub fn new(resource: StreamResource) -> StreamResourceHolder { - StreamResourceHolder { - resource, - // Atleast one task is expecter for the resource - waker: HashMap::with_capacity(1), - // Tracks wakers Ids - waker_counter: AtomicUsize::new(0), +impl<R: 'static, W: 'static> FullDuplexResource<R, W> { + pub fn new((rd, wr): (R, W)) -> Self { + Self { + rd: rd.into(), + wr: wr.into(), + cancel_handle: Default::default(), } } -} -impl Drop for StreamResourceHolder { - fn drop(&mut self) { - self.wake_tasks(); + pub fn into_inner(self) -> (R, W) { + (self.rd.into_inner(), self.wr.into_inner()) + } + + pub fn rd_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<R> { + RcRef::map(self, |r| &r.rd).borrow_mut() + } + + pub fn wr_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<W> { + RcRef::map(self, |r| &r.wr).borrow_mut() + } + + pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> { + RcRef::map(self, |r| &r.cancel_handle) + } + + pub fn cancel_read_ops(&self) { + self.cancel_handle.cancel() } } -impl StreamResourceHolder { - pub fn track_task(&mut self, cx: &Context) -> Result<usize, AnyError> { - let waker = futures::task::AtomicWaker::new(); - waker.register(cx.waker()); - // Its OK if it overflows - let task_waker_id = self.waker_counter.fetch_add(1, Ordering::Relaxed); - self.waker.insert(task_waker_id, waker); - Ok(task_waker_id) +impl<R, W> FullDuplexResource<R, W> +where + R: AsyncRead + Unpin + 'static, + W: AsyncWrite + Unpin + 'static, +{ + async fn read(self: &Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> { + let mut rd = self.rd_borrow_mut().await; + let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?; + Ok(nread) } - pub fn wake_tasks(&mut self) { - for waker in self.waker.values() { - waker.wake(); - } + async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { + let mut wr = self.wr_borrow_mut().await; + let nwritten = wr.write(buf).await?; + Ok(nwritten) } +} - pub fn untrack_task(&mut self, task_waker_id: usize) { - self.waker.remove(&task_waker_id); +pub type TcpStreamResource = + FullDuplexResource<tcp::OwnedReadHalf, tcp::OwnedWriteHalf>; + +impl Resource for TcpStreamResource { + fn name(&self) -> Cow<str> { + "tcpStream".into() + } + + fn close(self: Rc<Self>) { + self.cancel_read_ops(); } } -pub enum StreamResource { - FsFile(Option<(tokio::fs::File, FileMetadata)>), - TcpStream(Option<tokio::net::TcpStream>), - #[cfg(not(windows))] - UnixStream(tokio::net::UnixStream), - ServerTlsStream(Box<ServerTlsStream<TcpStream>>), - ClientTlsStream(Box<ClientTlsStream<TcpStream>>), - ChildStdin(tokio::process::ChildStdin), - ChildStdout(tokio::process::ChildStdout), - ChildStderr(tokio::process::ChildStderr), +#[derive(Default)] +pub struct StreamResource { + pub fs_file: + Option<AsyncRefCell<(Option<tokio::fs::File>, Option<FileMetadata>)>>, + + #[cfg(unix)] + pub unix_stream: Option<AsyncRefCell<tokio::net::UnixStream>>, + + child_stdin: Option<AsyncRefCell<tokio::process::ChildStdin>>, + + child_stdout: Option<AsyncRefCell<tokio::process::ChildStdout>>, + + child_stderr: Option<AsyncRefCell<tokio::process::ChildStderr>>, + + client_tls_stream: Option<AsyncRefCell<ClientTlsStream<TcpStream>>>, + + server_tls_stream: Option<AsyncRefCell<ServerTlsStream<TcpStream>>>, + + cancel: CancelHandle, + name: String, +} + +impl std::fmt::Debug for StreamResource { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "StreamResource") + } } -trait UnpinAsyncRead: AsyncRead + Unpin {} -trait UnpinAsyncWrite: AsyncWrite + Unpin {} +impl StreamResource { + pub fn stdio(fs_file: tokio::fs::File, name: &str) -> Self { + Self { + fs_file: Some(AsyncRefCell::new(( + Some(fs_file), + Some(FileMetadata::default()), + ))), + name: name.to_string(), + ..Default::default() + } + } + + pub fn fs_file(fs_file: tokio::fs::File) -> Self { + Self { + fs_file: Some(AsyncRefCell::new(( + Some(fs_file), + Some(FileMetadata::default()), + ))), + name: "fsFile".to_string(), + ..Default::default() + } + } + + #[cfg(unix)] + pub fn unix_stream(unix_stream: tokio::net::UnixStream) -> Self { + Self { + unix_stream: Some(AsyncRefCell::new(unix_stream)), + name: "unixStream".to_string(), + ..Default::default() + } + } + + pub fn child_stdout(child: tokio::process::ChildStdout) -> Self { + Self { + child_stdout: Some(AsyncRefCell::new(child)), + name: "childStdout".to_string(), + ..Default::default() + } + } -impl<T: AsyncRead + Unpin> UnpinAsyncRead for T {} -impl<T: AsyncWrite + Unpin> UnpinAsyncWrite for T {} + pub fn child_stderr(child: tokio::process::ChildStderr) -> Self { + Self { + child_stderr: Some(AsyncRefCell::new(child)), + name: "childStderr".to_string(), + ..Default::default() + } + } + + pub fn child_stdin(child: tokio::process::ChildStdin) -> Self { + Self { + child_stdin: Some(AsyncRefCell::new(child)), + name: "childStdin".to_string(), + ..Default::default() + } + } + + pub fn client_tls_stream(stream: ClientTlsStream<TcpStream>) -> Self { + Self { + client_tls_stream: Some(AsyncRefCell::new(stream)), + name: "clientTlsStream".to_string(), + ..Default::default() + } + } -/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait -/// but uses an `AnyError` error instead of `std::io:Error` -pub trait DenoAsyncRead { - fn poll_read( - &mut self, - cx: &mut Context, - buf: &mut [u8], - ) -> Poll<Result<usize, AnyError>>; + pub fn server_tls_stream(stream: ServerTlsStream<TcpStream>) -> Self { + Self { + server_tls_stream: Some(AsyncRefCell::new(stream)), + name: "serverTlsStream".to_string(), + ..Default::default() + } + } + + async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> { + // TODO(bartlomieju): in the future, it would be better for `StreamResource` + // to be an enum instead a struct with many `Option` fields, however I + // wasn't able to get it to work with `AsyncRefCell`s. + if self.fs_file.is_some() { + debug_assert!(self.child_stdin.is_none()); + debug_assert!(self.child_stdout.is_none()); + debug_assert!(self.child_stderr.is_none()); + debug_assert!(self.server_tls_stream.is_none()); + debug_assert!(self.client_tls_stream.is_none()); + let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap()) + .borrow_mut() + .await; + let nwritten = (*fs_file).0.as_mut().unwrap().read(buf).await?; + return Ok(nwritten); + } else if self.child_stdout.is_some() { + debug_assert!(self.child_stdin.is_none()); + debug_assert!(self.child_stderr.is_none()); + debug_assert!(self.server_tls_stream.is_none()); + debug_assert!(self.client_tls_stream.is_none()); + let mut child_stdout = + RcRef::map(&self, |r| r.child_stdout.as_ref().unwrap()) + .borrow_mut() + .await; + let cancel = RcRef::map(self, |r| &r.cancel); + let nread = child_stdout.read(buf).try_or_cancel(cancel).await?; + return Ok(nread); + } else if self.child_stderr.is_some() { + debug_assert!(self.child_stdin.is_none()); + debug_assert!(self.server_tls_stream.is_none()); + debug_assert!(self.client_tls_stream.is_none()); + let mut child_stderr = + RcRef::map(&self, |r| r.child_stderr.as_ref().unwrap()) + .borrow_mut() + .await; + let cancel = RcRef::map(self, |r| &r.cancel); + let nread = child_stderr.read(buf).try_or_cancel(cancel).await?; + return Ok(nread); + } else if self.client_tls_stream.is_some() { + debug_assert!(self.server_tls_stream.is_none()); + let mut client_tls_stream = + RcRef::map(&self, |r| r.client_tls_stream.as_ref().unwrap()) + .borrow_mut() + .await; + let cancel = RcRef::map(self, |r| &r.cancel); + let nread = client_tls_stream.read(buf).try_or_cancel(cancel).await?; + return Ok(nread); + } else if self.server_tls_stream.is_some() { + let mut server_tls_stream = + RcRef::map(&self, |r| r.server_tls_stream.as_ref().unwrap()) + .borrow_mut() + .await; + let cancel = RcRef::map(self, |r| &r.cancel); + let nread = server_tls_stream.read(buf).try_or_cancel(cancel).await?; + return Ok(nread); + } + + #[cfg(unix)] + if self.unix_stream.is_some() { + let mut unix_stream = + RcRef::map(&self, |r| r.unix_stream.as_ref().unwrap()) + .borrow_mut() + .await; + let cancel = RcRef::map(self, |r| &r.cancel); + let nread = unix_stream.read(buf).try_or_cancel(cancel).await?; + return Ok(nread); + } + + Err(bad_resource_id()) + } + + async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { + // TODO(bartlomieju): in the future, it would be better for `StreamResource` + // to be an enum instead a struct with many `Option` fields, however I + // wasn't able to get it to work with `AsyncRefCell`s. + if self.fs_file.is_some() { + debug_assert!(self.child_stdin.is_none()); + debug_assert!(self.child_stdout.is_none()); + debug_assert!(self.child_stderr.is_none()); + debug_assert!(self.server_tls_stream.is_none()); + debug_assert!(self.client_tls_stream.is_none()); + let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap()) + .borrow_mut() + .await; + let nwritten = (*fs_file).0.as_mut().unwrap().write(buf).await?; + (*fs_file).0.as_mut().unwrap().flush().await?; + return Ok(nwritten); + } else if self.child_stdin.is_some() { + debug_assert!(self.child_stdout.is_none()); + debug_assert!(self.child_stderr.is_none()); + debug_assert!(self.server_tls_stream.is_none()); + debug_assert!(self.client_tls_stream.is_none()); + let mut child_stdin = + RcRef::map(&self, |r| r.child_stdin.as_ref().unwrap()) + .borrow_mut() + .await; + let nwritten = child_stdin.write(buf).await?; + child_stdin.flush().await?; + return Ok(nwritten); + } else if self.client_tls_stream.is_some() { + debug_assert!(self.server_tls_stream.is_none()); + let mut client_tls_stream = + RcRef::map(&self, |r| r.client_tls_stream.as_ref().unwrap()) + .borrow_mut() + .await; + let nwritten = client_tls_stream.write(buf).await?; + client_tls_stream.flush().await?; + return Ok(nwritten); + } else if self.server_tls_stream.is_some() { + let mut server_tls_stream = + RcRef::map(&self, |r| r.server_tls_stream.as_ref().unwrap()) + .borrow_mut() + .await; + let nwritten = server_tls_stream.write(buf).await?; + server_tls_stream.flush().await?; + return Ok(nwritten); + } + + #[cfg(unix)] + if self.unix_stream.is_some() { + let mut unix_stream = + RcRef::map(&self, |r| r.unix_stream.as_ref().unwrap()) + .borrow_mut() + .await; + let nwritten = unix_stream.write(buf).await?; + unix_stream.flush().await?; + return Ok(nwritten); + } + + Err(bad_resource_id()) + } } -impl DenoAsyncRead for StreamResource { - fn poll_read( - &mut self, - cx: &mut Context, - buf: &mut [u8], - ) -> Poll<Result<usize, AnyError>> { - use StreamResource::*; - let f: &mut dyn UnpinAsyncRead = match self { - FsFile(Some((f, _))) => f, - FsFile(None) => return Poll::Ready(Err(resource_unavailable())), - TcpStream(Some(f)) => f, - #[cfg(not(windows))] - UnixStream(f) => f, - ClientTlsStream(f) => f, - ServerTlsStream(f) => f, - ChildStdout(f) => f, - ChildStderr(f) => f, - _ => return Err(bad_resource_id()).into(), - }; - let v = ready!(Pin::new(f).poll_read(cx, buf))?; - Ok(v).into() +impl Resource for StreamResource { + fn name(&self) -> Cow<str> { + self.name.clone().into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel() } } @@ -263,92 +485,26 @@ pub fn op_read( }) } else { let mut zero_copy = zero_copy[0].clone(); - MinimalOp::Async( - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let resource_holder = state + MinimalOp::Async({ + async move { + let resource = state + .borrow() .resource_table - .get_mut::<StreamResourceHolder>(rid as u32) + .get_any(rid as u32) .ok_or_else(bad_resource_id)?; - - let mut task_tracker_id: Option<usize> = None; - let nread = match resource_holder.resource.poll_read(cx, &mut zero_copy) + let nread = if let Some(stream) = + resource.downcast_rc::<TcpStreamResource>() { - Poll::Ready(t) => { - if let Some(id) = task_tracker_id { - resource_holder.untrack_task(id); - } - t - } - Poll::Pending => { - task_tracker_id.replace(resource_holder.track_task(cx)?); - return Poll::Pending; - } - }?; - Poll::Ready(Ok(nread as i32)) - }) - .boxed_local(), - ) - } -} - -/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait -/// but uses an `AnyError` error instead of `std::io:Error` -pub trait DenoAsyncWrite { - fn poll_write( - &mut self, - cx: &mut Context, - buf: &[u8], - ) -> Poll<Result<usize, AnyError>>; - - fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>; - - fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>; -} - -impl DenoAsyncWrite for StreamResource { - fn poll_write( - &mut self, - cx: &mut Context, - buf: &[u8], - ) -> Poll<Result<usize, AnyError>> { - use StreamResource::*; - let f: &mut dyn UnpinAsyncWrite = match self { - FsFile(Some((f, _))) => f, - FsFile(None) => return Poll::Pending, - TcpStream(Some(f)) => f, - #[cfg(not(windows))] - UnixStream(f) => f, - ClientTlsStream(f) => f, - ServerTlsStream(f) => f, - ChildStdin(f) => f, - _ => return Err(bad_resource_id()).into(), - }; - - let v = ready!(Pin::new(f).poll_write(cx, buf))?; - Ok(v).into() - } - - fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>> { - use StreamResource::*; - let f: &mut dyn UnpinAsyncWrite = match self { - FsFile(Some((f, _))) => f, - FsFile(None) => return Poll::Pending, - TcpStream(Some(f)) => f, - #[cfg(not(windows))] - UnixStream(f) => f, - ClientTlsStream(f) => f, - ServerTlsStream(f) => f, - ChildStdin(f) => f, - _ => return Err(bad_resource_id()).into(), - }; - - ready!(Pin::new(f).poll_flush(cx))?; - Ok(()).into() - } - - fn poll_close(&mut self, _cx: &mut Context) -> Poll<Result<(), AnyError>> { - unimplemented!() + stream.read(&mut zero_copy).await? + } else if let Some(stream) = resource.downcast_rc::<StreamResource>() { + stream.clone().read(&mut zero_copy).await? + } else { + return Err(bad_resource_id()); + }; + Ok(nread as i32) + } + .boxed_local() + }) } } @@ -381,93 +537,76 @@ pub fn op_write( }) } else { let zero_copy = zero_copy[0].clone(); - MinimalOp::Async( + MinimalOp::Async({ async move { - let nwritten = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let resource_holder = state - .resource_table - .get_mut::<StreamResourceHolder>(rid as u32) - .ok_or_else(bad_resource_id)?; - resource_holder.resource.poll_write(cx, &zero_copy) - }) - .await?; - - // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2 - // and the reasons for the need to explicitly flush are not fully known. - // Figure out why it's needed and preferably remove it. - // https://github.com/denoland/deno/issues/3565 - poll_fn(|cx| { - let mut state = state.borrow_mut(); - let resource_holder = state - .resource_table - .get_mut::<StreamResourceHolder>(rid as u32) - .ok_or_else(bad_resource_id)?; - resource_holder.resource.poll_flush(cx) - }) - .await?; - + let resource = state + .borrow() + .resource_table + .get_any(rid as u32) + .ok_or_else(bad_resource_id)?; + let nwritten = if let Some(stream) = + resource.downcast_rc::<TcpStreamResource>() + { + stream.write(&zero_copy).await? + } else if let Some(stream) = resource.downcast_rc::<StreamResource>() { + stream.clone().write(&zero_copy).await? + } else { + return Err(bad_resource_id()); + }; Ok(nwritten as i32) } - .boxed_local(), - ) + .boxed_local() + }) } } -/// Helper function for operating on a std::fs::File stored in the resource table. -/// -/// We store file system file resources as tokio::fs::File, so this is a little -/// utility function that gets a std::fs:File when you need to do blocking -/// operations. -/// -/// Returns ErrorKind::Busy if the resource is being used by another op. pub fn std_file_resource<F, T>( state: &mut OpState, rid: u32, mut f: F, ) -> Result<T, AnyError> where - F: FnMut( - Result<&mut std::fs::File, &mut StreamResource>, - ) -> Result<T, AnyError>, + F: FnMut(Result<&mut std::fs::File, ()>) -> Result<T, AnyError>, { // First we look up the rid in the resource table. - let mut r = state.resource_table.get_mut::<StreamResourceHolder>(rid); - if let Some(ref mut resource_holder) = r { - // Sync write only works for FsFile. It doesn't make sense to do this - // for non-blocking sockets. So we error out if not FsFile. - match &mut resource_holder.resource { - StreamResource::FsFile(option_file_metadata) => { - // The object in the resource table is a tokio::fs::File - but in - // order to do a blocking write on it, we must turn it into a - // std::fs::File. Hopefully this code compiles down to nothing. - if let Some((tokio_file, metadata)) = option_file_metadata.take() { - match tokio_file.try_into_std() { - Ok(mut std_file) => { - let result = f(Ok(&mut std_file)); - // Turn the std_file handle back into a tokio file, put it back - // in the resource table. - let tokio_file = tokio::fs::File::from_std(std_file); - resource_holder.resource = - StreamResource::FsFile(Some((tokio_file, metadata))); - // return the result. - result - } - Err(tokio_file) => { - // This function will return an error containing the file if - // some operation is in-flight. - resource_holder.resource = - StreamResource::FsFile(Some((tokio_file, metadata))); - Err(resource_unavailable()) - } - } - } else { - Err(resource_unavailable()) - } + let resource = state + .resource_table + .get::<StreamResource>(rid) + .ok_or_else(bad_resource_id)?; + + // Sync write only works for FsFile. It doesn't make sense to do this + // for non-blocking sockets. So we error out if not FsFile. + if resource.fs_file.is_none() { + return f(Err(())); + } + + // The object in the resource table is a tokio::fs::File - but in + // order to do a blocking write on it, we must turn it into a + // std::fs::File. Hopefully this code compiles down to nothing. + + let fs_file_resource = + RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut(); + + if let Some(mut fs_file) = fs_file_resource { + let tokio_file = fs_file.0.take().unwrap(); + match tokio_file.try_into_std() { + Ok(mut std_file) => { + let result = f(Ok(&mut std_file)); + // Turn the std_file handle back into a tokio file, put it back + // in the resource table. + let tokio_file = tokio::fs::File::from_std(std_file); + fs_file.0 = Some(tokio_file); + // return the result. + result + } + Err(tokio_file) => { + // This function will return an error containing the file if + // some operation is in-flight. + fs_file.0 = Some(tokio_file); + Err(resource_unavailable()) } - _ => f(Err(&mut resource_holder.resource)), } } else { - Err(bad_resource_id()) + Err(resource_unavailable()) } } |