summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/01_core.js9
-rw-r--r--core/bindings.rs57
-rw-r--r--core/lib.deno_core.d.ts8
-rw-r--r--core/lib.rs1
-rw-r--r--core/ops.rs5
-rw-r--r--core/ops_json.rs33
-rw-r--r--core/ops_metrics.rs14
-rw-r--r--core/runtime.rs120
-rw-r--r--runtime/js/40_signals.js5
-rw-r--r--runtime/ops/signal.rs4
10 files changed, 160 insertions, 96 deletions
diff --git a/core/01_core.js b/core/01_core.js
index 75bfc884f..c3fd7cf9d 100644
--- a/core/01_core.js
+++ b/core/01_core.js
@@ -23,6 +23,7 @@
MapPrototypeSet,
PromisePrototypeThen,
ObjectAssign,
+ SymbolFor,
} = window.__bootstrap.primordials;
// Available on start due to bindings.
@@ -43,6 +44,9 @@
const RING_SIZE = 4 * 1024;
const NO_PROMISE = null; // Alias to null is faster than plain nulls
const promiseRing = ArrayPrototypeFill(new Array(RING_SIZE), NO_PROMISE);
+ // TODO(bartlomieju): it future use `v8::Private` so it's not visible
+ // to users. Currently missing bindings.
+ const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
function setPromise(promiseId) {
const idx = promiseId % RING_SIZE;
@@ -135,7 +139,10 @@
const maybeError = opcallAsync(opsCache[opName], promiseId, arg1, arg2);
// Handle sync error (e.g: error parsing args)
if (maybeError) return unwrapOpResult(maybeError);
- return PromisePrototypeThen(setPromise(promiseId), unwrapOpResult);
+ const p = PromisePrototypeThen(setPromise(promiseId), unwrapOpResult);
+ // Save the id on the promise so it can later be ref'ed or unref'ed
+ p[promiseIdSymbol] = promiseId;
+ return p;
}
function opSync(opName, arg1 = null, arg2 = null) {
diff --git a/core/bindings.rs b/core/bindings.rs
index f5eb1705e..b18d24c79 100644
--- a/core/bindings.rs
+++ b/core/bindings.rs
@@ -36,6 +36,12 @@ lazy_static::lazy_static! {
function: opcall_sync.map_fn_to()
},
v8::ExternalReference {
+ function: ref_op.map_fn_to()
+ },
+ v8::ExternalReference {
+ function: unref_op.map_fn_to()
+ },
+ v8::ExternalReference {
function: set_macrotask_callback.map_fn_to()
},
v8::ExternalReference {
@@ -151,6 +157,8 @@ pub fn initialize_context<'s>(
// Bind functions to Deno.core.*
set_func(scope, core_val, "opcallSync", opcall_sync);
set_func(scope, core_val, "opcallAsync", opcall_async);
+ set_func(scope, core_val, "refOp", ref_op);
+ set_func(scope, core_val, "unrefOp", unref_op);
set_func(
scope,
core_val,
@@ -453,17 +461,56 @@ fn opcall_async<'s>(
state.pending_ops.push(fut);
state.have_unpolled_ops = true;
}
- Op::AsyncUnref(fut) => {
- state.op_state.borrow().tracker.track_unref(op_id);
- state.pending_unref_ops.push(fut);
- state.have_unpolled_ops = true;
- }
Op::NotFound => {
throw_type_error(scope, format!("Unknown op id: {}", op_id));
}
}
}
+fn ref_op<'s>(
+ scope: &mut v8::HandleScope<'s>,
+ args: v8::FunctionCallbackArguments,
+ _rv: v8::ReturnValue,
+) {
+ let state_rc = JsRuntime::state(scope);
+ let mut state = state_rc.borrow_mut();
+
+ let promise_id = match v8::Local::<v8::Integer>::try_from(args.get(0))
+ .map(|l| l.value() as PromiseId)
+ .map_err(Error::from)
+ {
+ Ok(promise_id) => promise_id,
+ Err(err) => {
+ throw_type_error(scope, format!("invalid promise id: {}", err));
+ return;
+ }
+ };
+
+ state.unrefed_ops.remove(&promise_id);
+}
+
+fn unref_op<'s>(
+ scope: &mut v8::HandleScope<'s>,
+ args: v8::FunctionCallbackArguments,
+ _rv: v8::ReturnValue,
+) {
+ let state_rc = JsRuntime::state(scope);
+ let mut state = state_rc.borrow_mut();
+
+ let promise_id = match v8::Local::<v8::Integer>::try_from(args.get(0))
+ .map(|l| l.value() as PromiseId)
+ .map_err(Error::from)
+ {
+ Ok(promise_id) => promise_id,
+ Err(err) => {
+ throw_type_error(scope, format!("invalid promise id: {}", err));
+ return;
+ }
+ };
+
+ state.unrefed_ops.insert(promise_id);
+}
+
fn has_tick_scheduled(
scope: &mut v8::HandleScope,
_args: v8::FunctionCallbackArguments,
diff --git a/core/lib.deno_core.d.ts b/core/lib.deno_core.d.ts
index f33f6164a..59b2df542 100644
--- a/core/lib.deno_core.d.ts
+++ b/core/lib.deno_core.d.ts
@@ -21,6 +21,14 @@ declare namespace Deno {
b?: any,
): Promise<any>;
+ /** Mark following promise as "ref", ie. event loop won't exit
+ * until all "ref" promises are resolved. All async ops are "ref" by default. */
+ function refOp(promiseId: number): void;
+
+ /** Mark following promise as "unref", ie. event loop will exit
+ * if there are only "unref" promises left. */
+ function unrefOps(promiseId: number): void;
+
/**
* Retrieve a list of all registered ops, in the form of a map that maps op
* name to internal numerical op id.
diff --git a/core/lib.rs b/core/lib.rs
index 87994720f..f47db7f2e 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -83,7 +83,6 @@ pub use crate::ops_builtin::op_close;
pub use crate::ops_builtin::op_print;
pub use crate::ops_builtin::op_resources;
pub use crate::ops_json::op_async;
-pub use crate::ops_json::op_async_unref;
pub use crate::ops_json::op_sync;
pub use crate::ops_json::void_op_async;
pub use crate::ops_json::void_op_sync;
diff --git a/core/ops.rs b/core/ops.rs
index 13f001146..6b2c06397 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -80,7 +80,7 @@ where
}
}
-pub type PromiseId = u64;
+pub type PromiseId = i32;
pub type OpAsyncFuture = OpCall<(PromiseId, OpId, OpResult)>;
pub type OpFn = dyn Fn(Rc<RefCell<OpState>>, OpPayload) -> Op + 'static;
pub type OpId = usize;
@@ -111,9 +111,6 @@ impl<'a, 'b, 'c> OpPayload<'a, 'b, 'c> {
pub enum Op {
Sync(OpResult),
Async(OpAsyncFuture),
- /// AsyncUnref is the variation of Async, which doesn't block the program
- /// exiting.
- AsyncUnref(OpAsyncFuture),
NotFound,
}
diff --git a/core/ops_json.rs b/core/ops_json.rs
index b3153763e..ad4aeeb47 100644
--- a/core/ops_json.rs
+++ b/core/ops_json.rs
@@ -67,7 +67,6 @@ where
/// Creates an op that passes data asynchronously using JSON.
///
/// When this op is dispatched, the runtime doesn't exit while processing it.
-/// Use op_async_unref instead if you want to make the runtime exit while processing it.
///
/// The provided function `op_fn` has the following parameters:
/// * `Rc<RefCell<OpState>`: the op state, can be used to read/write resources in the runtime from an op.
@@ -118,38 +117,6 @@ where
})
}
-/// Creates an op that passes data asynchronously using JSON.
-///
-/// When this op is dispatched, the runtime still can exit while processing it.
-///
-/// The other usages are the same as `op_async`.
-pub fn op_async_unref<F, A, B, R, RV>(op_fn: F) -> Box<OpFn>
-where
- F: Fn(Rc<RefCell<OpState>>, A, B) -> R + 'static,
- A: DeserializeOwned,
- B: DeserializeOwned,
- R: Future<Output = Result<RV, Error>> + 'static,
- RV: Serialize + 'static,
-{
- Box::new(move |state, payload| -> Op {
- let op_id = payload.op_id;
- let pid = payload.promise_id;
- // Deserialize args, sync error on failure
- let args = match payload.deserialize() {
- Ok(args) => args,
- Err(err) => {
- return Op::Sync(serialize_op_result(Err::<(), Error>(err), state))
- }
- };
- let (a, b) = args;
-
- use crate::futures::FutureExt;
- let fut = op_fn(state.clone(), a, b)
- .map(move |result| (pid, op_id, serialize_op_result(result, state)));
- Op::AsyncUnref(OpCall::eager(fut))
- })
-}
-
#[cfg(test)]
mod tests {
use super::*;
diff --git a/core/ops_metrics.rs b/core/ops_metrics.rs
index 5c40a47ca..f2e9bc255 100644
--- a/core/ops_metrics.rs
+++ b/core/ops_metrics.rs
@@ -11,10 +11,12 @@ pub struct OpMetrics {
pub ops_dispatched: u64,
pub ops_dispatched_sync: u64,
pub ops_dispatched_async: u64,
+ // TODO(bartlomieju): this field is never updated
pub ops_dispatched_async_unref: u64,
pub ops_completed: u64,
pub ops_completed_sync: u64,
pub ops_completed_async: u64,
+ // TODO(bartlomieju): this field is never updated
pub ops_completed_async_unref: u64,
pub bytes_sent_control: u64,
pub bytes_sent_data: u64,
@@ -84,16 +86,4 @@ impl OpsTracker {
metrics.ops_completed += 1;
metrics.ops_completed_async += 1;
}
-
- pub fn track_unref(&self, id: OpId) {
- let metrics = &mut self.metrics_mut(id);
- metrics.ops_dispatched += 1;
- metrics.ops_dispatched_async_unref += 1;
- }
-
- pub fn track_unref_completed(&self, id: OpId) {
- let metrics = &mut self.metrics_mut(id);
- metrics.ops_completed += 1;
- metrics.ops_completed_async_unref += 1;
- }
}
diff --git a/core/runtime.rs b/core/runtime.rs
index 3af090a1c..ad7f16886 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -29,6 +29,7 @@ use futures::task::AtomicWaker;
use std::any::Any;
use std::cell::RefCell;
use std::collections::HashMap;
+use std::collections::HashSet;
use std::ffi::c_void;
use std::mem::forget;
use std::option::Option;
@@ -135,23 +136,6 @@ pub type SharedArrayBufferStore =
pub type CompiledWasmModuleStore = CrossIsolateStore<v8::CompiledWasmModule>;
-struct AsyncOpIterator<'a, 'b, 'c> {
- ops: &'b mut FuturesUnordered<PendingOpFuture>,
- cx: &'a mut Context<'c>,
-}
-
-impl Iterator for AsyncOpIterator<'_, '_, '_> {
- type Item = (PromiseId, OpId, OpResult);
-
- #[inline]
- fn next(&mut self) -> Option<Self::Item> {
- match self.ops.poll_next_unpin(self.cx) {
- Poll::Ready(Some(item)) => Some(item),
- _ => None,
- }
- }
-}
-
/// Internal state for JsRuntime which is stored in one of v8::Isolate's
/// embedder slots.
pub(crate) struct JsRuntimeState {
@@ -171,7 +155,7 @@ pub(crate) struct JsRuntimeState {
dyn_module_evaluate_idle_counter: u32,
pub(crate) js_error_create_fn: Rc<JsErrorCreateFn>,
pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
- pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>,
+ pub(crate) unrefed_ops: HashSet<i32>,
pub(crate) have_unpolled_ops: bool,
pub(crate) op_state: Rc<RefCell<OpState>>,
pub(crate) shared_array_buffer_store: Option<SharedArrayBufferStore>,
@@ -371,7 +355,7 @@ impl JsRuntime {
js_wasm_streaming_cb: None,
js_error_create_fn,
pending_ops: FuturesUnordered::new(),
- pending_unref_ops: FuturesUnordered::new(),
+ unrefed_ops: HashSet::new(),
shared_array_buffer_store: options.shared_array_buffer_store,
compiled_wasm_module_store: options.compiled_wasm_module_store,
op_state: op_state.clone(),
@@ -801,7 +785,8 @@ impl JsRuntime {
let mut state = state_rc.borrow_mut();
let module_map = module_map_rc.borrow();
- let has_pending_ops = !state.pending_ops.is_empty();
+ let has_pending_refed_ops =
+ state.pending_ops.len() > state.unrefed_ops.len();
let has_pending_dyn_imports = module_map.has_pending_dynamic_imports();
let has_pending_dyn_module_evaluation =
!state.pending_dyn_mod_evaluate.is_empty();
@@ -815,7 +800,7 @@ impl JsRuntime {
.map(|i| i.has_active_sessions())
.unwrap_or(false);
- if !has_pending_ops
+ if !has_pending_refed_ops
&& !has_pending_dyn_imports
&& !has_pending_dyn_module_evaluation
&& !has_pending_module_evaluation
@@ -841,7 +826,7 @@ impl JsRuntime {
}
if has_pending_module_evaluation {
- if has_pending_ops
+ if has_pending_refed_ops
|| has_pending_dyn_imports
|| has_pending_dyn_module_evaluation
|| has_pending_background_tasks
@@ -854,7 +839,7 @@ impl JsRuntime {
}
if has_pending_dyn_module_evaluation {
- if has_pending_ops
+ if has_pending_refed_ops
|| has_pending_dyn_imports
|| has_pending_background_tasks
{
@@ -1529,21 +1514,12 @@ impl JsRuntime {
state.have_unpolled_ops = false;
let op_state = state.op_state.clone();
- let ops = AsyncOpIterator {
- ops: &mut state.pending_ops,
- cx,
- };
- for (promise_id, op_id, resp) in ops {
+
+ while let Poll::Ready(Some(item)) = state.pending_ops.poll_next_unpin(cx)
+ {
+ let (promise_id, op_id, resp) = item;
op_state.borrow().tracker.track_async_completed(op_id);
- args.push(v8::Integer::new(scope, promise_id as i32).into());
- args.push(resp.to_v8(scope).unwrap());
- }
- let ops = AsyncOpIterator {
- ops: &mut state.pending_unref_ops,
- cx,
- };
- for (promise_id, op_id, resp) in ops {
- op_state.borrow().tracker.track_unref_completed(op_id);
+ state.unrefed_ops.remove(&promise_id);
args.push(v8::Integer::new(scope, promise_id as i32).into());
args.push(resp.to_v8(scope).unwrap());
}
@@ -1744,6 +1720,76 @@ pub mod tests {
}
#[test]
+ fn test_op_async_promise_id() {
+ let (mut runtime, _dispatch_count) = setup(Mode::Async);
+ runtime
+ .execute_script(
+ "filename.js",
+ r#"
+ const p = Deno.core.opAsync("op_test", 42);
+ if (p[Symbol.for("Deno.core.internalPromiseId")] == undefined) {
+ throw new Error("missing id on returned promise");
+ }
+ "#,
+ )
+ .unwrap();
+ }
+
+ #[test]
+ fn test_ref_unref_ops() {
+ let (mut runtime, _dispatch_count) = setup(Mode::Async);
+ runtime
+ .execute_script(
+ "filename.js",
+ r#"
+ var promiseIdSymbol = Symbol.for("Deno.core.internalPromiseId");
+ var p1 = Deno.core.opAsync("op_test", 42);
+ var p2 = Deno.core.opAsync("op_test", 42);
+ "#,
+ )
+ .unwrap();
+ {
+ let isolate = runtime.v8_isolate();
+ let state_rc = JsRuntime::state(isolate);
+ let state = state_rc.borrow();
+ assert_eq!(state.pending_ops.len(), 2);
+ assert_eq!(state.unrefed_ops.len(), 0);
+ }
+ runtime
+ .execute_script(
+ "filename.js",
+ r#"
+ Deno.core.unrefOp(p1[promiseIdSymbol]);
+ Deno.core.unrefOp(p2[promiseIdSymbol]);
+ "#,
+ )
+ .unwrap();
+ {
+ let isolate = runtime.v8_isolate();
+ let state_rc = JsRuntime::state(isolate);
+ let state = state_rc.borrow();
+ assert_eq!(state.pending_ops.len(), 2);
+ assert_eq!(state.unrefed_ops.len(), 2);
+ }
+ runtime
+ .execute_script(
+ "filename.js",
+ r#"
+ Deno.core.refOp(p1[promiseIdSymbol]);
+ Deno.core.refOp(p2[promiseIdSymbol]);
+ "#,
+ )
+ .unwrap();
+ {
+ let isolate = runtime.v8_isolate();
+ let state_rc = JsRuntime::state(isolate);
+ let state = state_rc.borrow();
+ assert_eq!(state.pending_ops.len(), 2);
+ assert_eq!(state.unrefed_ops.len(), 0);
+ }
+ }
+
+ #[test]
fn test_dispatch_no_zero_copy_buf() {
let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(false));
runtime
diff --git a/runtime/js/40_signals.js b/runtime/js/40_signals.js
index a4f3a6ccd..2498c40d7 100644
--- a/runtime/js/40_signals.js
+++ b/runtime/js/40_signals.js
@@ -5,6 +5,7 @@
const core = window.Deno.core;
const {
Set,
+ SymbolFor,
TypeError,
} = window.__bootstrap.primordials;
@@ -13,7 +14,9 @@
}
function pollSignal(rid) {
- return core.opAsync("op_signal_poll", rid);
+ const promise = core.opAsync("op_signal_poll", rid);
+ core.unrefOp(promise[SymbolFor("Deno.core.internalPromiseId")]);
+ return promise;
}
function unbindSignal(rid) {
diff --git a/runtime/ops/signal.rs b/runtime/ops/signal.rs
index aa419c6c8..db30c66d8 100644
--- a/runtime/ops/signal.rs
+++ b/runtime/ops/signal.rs
@@ -4,7 +4,7 @@ use deno_core::error::generic_error;
#[cfg(not(target_os = "windows"))]
use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::op_async_unref;
+use deno_core::op_async;
use deno_core::op_sync;
use deno_core::Extension;
use deno_core::OpState;
@@ -33,7 +33,7 @@ pub fn init() -> Extension {
.ops(vec![
("op_signal_bind", op_sync(op_signal_bind)),
("op_signal_unbind", op_sync(op_signal_unbind)),
- ("op_signal_poll", op_async_unref(op_signal_poll)),
+ ("op_signal_poll", op_async(op_signal_poll)),
])
.build()
}