From e60cbfadc0c75d48ff004fa8f47d4b5d022cb98c Mon Sep 17 00:00:00 2001 From: David Sherret Date: Wed, 13 Sep 2023 23:36:24 -0400 Subject: refactor: use TaskQueue from deno_unsync (#20485) --- Cargo.lock | 17 ++++++++++---- Cargo.toml | 2 +- ext/io/lib.rs | 73 +++++++++++++++++++++++++++++++++++------------------------ 3 files changed, 57 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2cbcf1368..42b608bb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1074,14 +1074,14 @@ dependencies = [ [[package]] name = "deno_core" -version = "0.210.0" +version = "0.211.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1abaede1f57501d6c92075d678f522aea22794b82523e67db8130ca1a6a1cb5d" +checksum = "2e63f2803555dae13a88f66e0d6b97a5339753fc86651b118f56de3733ba2f57" dependencies = [ "anyhow", "bytes", "deno_ops", - "deno_unsync", + "deno_unsync 0.2.1", "futures", "indexmap 2.0.0", "libc", @@ -1299,7 +1299,7 @@ dependencies = [ "base64 0.13.1", "chrono", "deno_core", - "deno_unsync", + "deno_unsync 0.1.1", "hex", "log", "num-bigint", @@ -1583,6 +1583,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "deno_unsync" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0720e562455d6374a5292baec3fc895ed8bfed0937428e3245e50979924e5b15" +dependencies = [ + "tokio", +] + [[package]] name = "deno_url" version = "0.118.0" diff --git a/Cargo.toml b/Cargo.toml index 595df1b87..707a974ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,7 @@ repository = "https://github.com/denoland/deno" [workspace.dependencies] deno_ast = { version = "0.29.1", features = ["transpiling"] } -deno_core = { version = "0.210.0" } +deno_core = { version = "0.211.0" } deno_runtime = { version = "0.126.0", path = "./runtime" } napi_sym = { version = "0.48.0", path = "./cli/napi/sym" } diff --git a/ext/io/lib.rs b/ext/io/lib.rs index eb26c495a..038176b23 100644 --- a/ext/io/lib.rs +++ b/ext/io/lib.rs @@ -3,6 +3,7 @@ use deno_core::error::AnyError; use deno_core::op2; use deno_core::unsync::spawn_blocking; +use deno_core::unsync::TaskQueue; use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; use deno_core::AsyncResult; @@ -16,7 +17,6 @@ use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceHandle; use deno_core::ResourceHandleFd; -use deno_core::TaskQueue; use fs::FileResource; use fs::FsError; use fs::FsResult; @@ -26,6 +26,7 @@ use once_cell::sync::Lazy; use std::borrow::Cow; use std::cell::RefCell; use std::fs::File as StdFile; +use std::future::Future; use std::io; use std::io::ErrorKind; use std::io::Read; @@ -309,7 +310,7 @@ pub struct StdFileResourceInner { cell: RefCell>, // Used to keep async actions in order and only allow one // to occur at a time - cell_async_task_queue: TaskQueue, + cell_async_task_queue: Rc, handle: ResourceHandleFd, } @@ -339,48 +340,60 @@ impl StdFileResourceInner { } } - async fn with_inner_blocking_task(&self, action: F) -> R + fn with_inner_blocking_task( + &self, + action: F, + ) -> impl Future + '_ where F: FnOnce(&mut StdFile) -> R + Send + 'static, { // we want to restrict this to one async action at a time - let _permit = self.cell_async_task_queue.acquire().await; - // we take the value out of the cell, use it on a blocking task, - // then put it back into the cell when we're done - let mut did_take = false; - let mut cell_value = { - let mut cell = self.cell.borrow_mut(); - match cell.as_mut().unwrap().try_clone().ok() { - Some(value) => value, - None => { - did_take = true; - cell.take().unwrap() + let acquire_fut = self.cell_async_task_queue.acquire(); + async move { + let permit = acquire_fut.await; + // we take the value out of the cell, use it on a blocking task, + // then put it back into the cell when we're done + let mut did_take = false; + let mut cell_value = { + let mut cell = self.cell.borrow_mut(); + match cell.as_mut().unwrap().try_clone().ok() { + Some(value) => value, + None => { + did_take = true; + cell.take().unwrap() + } } + }; + let (cell_value, result) = spawn_blocking(move || { + let result = action(&mut cell_value); + (cell_value, result) + }) + .await + .unwrap(); + + if did_take { + // put it back + self.cell.borrow_mut().replace(cell_value); } - }; - let (cell_value, result) = spawn_blocking(move || { - let result = action(&mut cell_value); - (cell_value, result) - }) - .await - .unwrap(); - if did_take { - // put it back - self.cell.borrow_mut().replace(cell_value); + drop(permit); // explicit for clarity + result } - - result } - async fn with_blocking_task(&self, action: F) -> R + fn with_blocking_task( + &self, + action: F, + ) -> impl Future where F: FnOnce() -> R + Send + 'static, { // we want to restrict this to one async action at a time - let _permit = self.cell_async_task_queue.acquire().await; - - spawn_blocking(action).await.unwrap() + let acquire_fut = self.cell_async_task_queue.acquire(); + async move { + let _permit = acquire_fut.await; + spawn_blocking(action).await.unwrap() + } } } -- cgit v1.2.3