diff options
Diffstat (limited to 'ext')
-rw-r--r-- | ext/io/lib.rs | 73 |
1 files changed, 43 insertions, 30 deletions
diff --git a/ext/io/lib.rs b/ext/io/lib.rs index eb26c495a..038176b23 100644 --- a/ext/io/lib.rs +++ b/ext/io/lib.rs @@ -3,6 +3,7 @@ use deno_core::error::AnyError; use deno_core::op2; use deno_core::unsync::spawn_blocking; +use deno_core::unsync::TaskQueue; use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; use deno_core::AsyncResult; @@ -16,7 +17,6 @@ use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceHandle; use deno_core::ResourceHandleFd; -use deno_core::TaskQueue; use fs::FileResource; use fs::FsError; use fs::FsResult; @@ -26,6 +26,7 @@ use once_cell::sync::Lazy; use std::borrow::Cow; use std::cell::RefCell; use std::fs::File as StdFile; +use std::future::Future; use std::io; use std::io::ErrorKind; use std::io::Read; @@ -309,7 +310,7 @@ pub struct StdFileResourceInner { cell: RefCell<Option<StdFile>>, // Used to keep async actions in order and only allow one // to occur at a time - cell_async_task_queue: TaskQueue, + cell_async_task_queue: Rc<TaskQueue>, handle: ResourceHandleFd, } @@ -339,48 +340,60 @@ impl StdFileResourceInner { } } - async fn with_inner_blocking_task<F, R: 'static + Send>(&self, action: F) -> R + fn with_inner_blocking_task<F, R: 'static + Send>( + &self, + action: F, + ) -> impl Future<Output = R> + '_ where F: FnOnce(&mut StdFile) -> R + Send + 'static, { // we want to restrict this to one async action at a time - let _permit = self.cell_async_task_queue.acquire().await; - // we take the value out of the cell, use it on a blocking task, - // then put it back into the cell when we're done - let mut did_take = false; - let mut cell_value = { - let mut cell = self.cell.borrow_mut(); - match cell.as_mut().unwrap().try_clone().ok() { - Some(value) => value, - None => { - did_take = true; - cell.take().unwrap() + let acquire_fut = self.cell_async_task_queue.acquire(); + async move { + let permit = acquire_fut.await; + // we take the value out of the cell, use it on a blocking task, + // then put it back into the cell when we're done + let mut did_take = false; + let mut cell_value = { + let mut cell = self.cell.borrow_mut(); + match cell.as_mut().unwrap().try_clone().ok() { + Some(value) => value, + None => { + did_take = true; + cell.take().unwrap() + } } + }; + let (cell_value, result) = spawn_blocking(move || { + let result = action(&mut cell_value); + (cell_value, result) + }) + .await + .unwrap(); + + if did_take { + // put it back + self.cell.borrow_mut().replace(cell_value); } - }; - let (cell_value, result) = spawn_blocking(move || { - let result = action(&mut cell_value); - (cell_value, result) - }) - .await - .unwrap(); - if did_take { - // put it back - self.cell.borrow_mut().replace(cell_value); + drop(permit); // explicit for clarity + result } - - result } - async fn with_blocking_task<F, R: 'static + Send>(&self, action: F) -> R + fn with_blocking_task<F, R: 'static + Send>( + &self, + action: F, + ) -> impl Future<Output = R> where F: FnOnce() -> R + Send + 'static, { // we want to restrict this to one async action at a time - let _permit = self.cell_async_task_queue.acquire().await; - - spawn_blocking(action).await.unwrap() + let acquire_fut = self.cell_async_task_queue.acquire(); + async move { + let _permit = acquire_fut.await; + spawn_blocking(action).await.unwrap() + } } } |