summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/io/lib.rs73
1 files changed, 43 insertions, 30 deletions
diff --git a/ext/io/lib.rs b/ext/io/lib.rs
index eb26c495a..038176b23 100644
--- a/ext/io/lib.rs
+++ b/ext/io/lib.rs
@@ -3,6 +3,7 @@
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::unsync::spawn_blocking;
+use deno_core::unsync::TaskQueue;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
@@ -16,7 +17,6 @@ use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceHandle;
use deno_core::ResourceHandleFd;
-use deno_core::TaskQueue;
use fs::FileResource;
use fs::FsError;
use fs::FsResult;
@@ -26,6 +26,7 @@ use once_cell::sync::Lazy;
use std::borrow::Cow;
use std::cell::RefCell;
use std::fs::File as StdFile;
+use std::future::Future;
use std::io;
use std::io::ErrorKind;
use std::io::Read;
@@ -309,7 +310,7 @@ pub struct StdFileResourceInner {
cell: RefCell<Option<StdFile>>,
// Used to keep async actions in order and only allow one
// to occur at a time
- cell_async_task_queue: TaskQueue,
+ cell_async_task_queue: Rc<TaskQueue>,
handle: ResourceHandleFd,
}
@@ -339,48 +340,60 @@ impl StdFileResourceInner {
}
}
- async fn with_inner_blocking_task<F, R: 'static + Send>(&self, action: F) -> R
+ fn with_inner_blocking_task<F, R: 'static + Send>(
+ &self,
+ action: F,
+ ) -> impl Future<Output = R> + '_
where
F: FnOnce(&mut StdFile) -> R + Send + 'static,
{
// we want to restrict this to one async action at a time
- let _permit = self.cell_async_task_queue.acquire().await;
- // we take the value out of the cell, use it on a blocking task,
- // then put it back into the cell when we're done
- let mut did_take = false;
- let mut cell_value = {
- let mut cell = self.cell.borrow_mut();
- match cell.as_mut().unwrap().try_clone().ok() {
- Some(value) => value,
- None => {
- did_take = true;
- cell.take().unwrap()
+ let acquire_fut = self.cell_async_task_queue.acquire();
+ async move {
+ let permit = acquire_fut.await;
+ // we take the value out of the cell, use it on a blocking task,
+ // then put it back into the cell when we're done
+ let mut did_take = false;
+ let mut cell_value = {
+ let mut cell = self.cell.borrow_mut();
+ match cell.as_mut().unwrap().try_clone().ok() {
+ Some(value) => value,
+ None => {
+ did_take = true;
+ cell.take().unwrap()
+ }
}
+ };
+ let (cell_value, result) = spawn_blocking(move || {
+ let result = action(&mut cell_value);
+ (cell_value, result)
+ })
+ .await
+ .unwrap();
+
+ if did_take {
+ // put it back
+ self.cell.borrow_mut().replace(cell_value);
}
- };
- let (cell_value, result) = spawn_blocking(move || {
- let result = action(&mut cell_value);
- (cell_value, result)
- })
- .await
- .unwrap();
- if did_take {
- // put it back
- self.cell.borrow_mut().replace(cell_value);
+ drop(permit); // explicit for clarity
+ result
}
-
- result
}
- async fn with_blocking_task<F, R: 'static + Send>(&self, action: F) -> R
+ fn with_blocking_task<F, R: 'static + Send>(
+ &self,
+ action: F,
+ ) -> impl Future<Output = R>
where
F: FnOnce() -> R + Send + 'static,
{
// we want to restrict this to one async action at a time
- let _permit = self.cell_async_task_queue.acquire().await;
-
- spawn_blocking(action).await.unwrap()
+ let acquire_fut = self.cell_async_task_queue.acquire();
+ async move {
+ let _permit = acquire_fut.await;
+ spawn_blocking(action).await.unwrap()
+ }
}
}