diff options
-rw-r--r-- | core/01_core.js | 9 | ||||
-rw-r--r-- | core/bindings.rs | 57 | ||||
-rw-r--r-- | core/lib.deno_core.d.ts | 8 | ||||
-rw-r--r-- | core/lib.rs | 1 | ||||
-rw-r--r-- | core/ops.rs | 5 | ||||
-rw-r--r-- | core/ops_json.rs | 33 | ||||
-rw-r--r-- | core/ops_metrics.rs | 14 | ||||
-rw-r--r-- | core/runtime.rs | 120 | ||||
-rw-r--r-- | runtime/js/40_signals.js | 5 | ||||
-rw-r--r-- | runtime/ops/signal.rs | 4 |
10 files changed, 160 insertions, 96 deletions
diff --git a/core/01_core.js b/core/01_core.js index 75bfc884f..c3fd7cf9d 100644 --- a/core/01_core.js +++ b/core/01_core.js @@ -23,6 +23,7 @@ MapPrototypeSet, PromisePrototypeThen, ObjectAssign, + SymbolFor, } = window.__bootstrap.primordials; // Available on start due to bindings. @@ -43,6 +44,9 @@ const RING_SIZE = 4 * 1024; const NO_PROMISE = null; // Alias to null is faster than plain nulls const promiseRing = ArrayPrototypeFill(new Array(RING_SIZE), NO_PROMISE); + // TODO(bartlomieju): it future use `v8::Private` so it's not visible + // to users. Currently missing bindings. + const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId"); function setPromise(promiseId) { const idx = promiseId % RING_SIZE; @@ -135,7 +139,10 @@ const maybeError = opcallAsync(opsCache[opName], promiseId, arg1, arg2); // Handle sync error (e.g: error parsing args) if (maybeError) return unwrapOpResult(maybeError); - return PromisePrototypeThen(setPromise(promiseId), unwrapOpResult); + const p = PromisePrototypeThen(setPromise(promiseId), unwrapOpResult); + // Save the id on the promise so it can later be ref'ed or unref'ed + p[promiseIdSymbol] = promiseId; + return p; } function opSync(opName, arg1 = null, arg2 = null) { diff --git a/core/bindings.rs b/core/bindings.rs index f5eb1705e..b18d24c79 100644 --- a/core/bindings.rs +++ b/core/bindings.rs @@ -36,6 +36,12 @@ lazy_static::lazy_static! { function: opcall_sync.map_fn_to() }, v8::ExternalReference { + function: ref_op.map_fn_to() + }, + v8::ExternalReference { + function: unref_op.map_fn_to() + }, + v8::ExternalReference { function: set_macrotask_callback.map_fn_to() }, v8::ExternalReference { @@ -151,6 +157,8 @@ pub fn initialize_context<'s>( // Bind functions to Deno.core.* set_func(scope, core_val, "opcallSync", opcall_sync); set_func(scope, core_val, "opcallAsync", opcall_async); + set_func(scope, core_val, "refOp", ref_op); + set_func(scope, core_val, "unrefOp", unref_op); set_func( scope, core_val, @@ -453,17 +461,56 @@ fn opcall_async<'s>( state.pending_ops.push(fut); state.have_unpolled_ops = true; } - Op::AsyncUnref(fut) => { - state.op_state.borrow().tracker.track_unref(op_id); - state.pending_unref_ops.push(fut); - state.have_unpolled_ops = true; - } Op::NotFound => { throw_type_error(scope, format!("Unknown op id: {}", op_id)); } } } +fn ref_op<'s>( + scope: &mut v8::HandleScope<'s>, + args: v8::FunctionCallbackArguments, + _rv: v8::ReturnValue, +) { + let state_rc = JsRuntime::state(scope); + let mut state = state_rc.borrow_mut(); + + let promise_id = match v8::Local::<v8::Integer>::try_from(args.get(0)) + .map(|l| l.value() as PromiseId) + .map_err(Error::from) + { + Ok(promise_id) => promise_id, + Err(err) => { + throw_type_error(scope, format!("invalid promise id: {}", err)); + return; + } + }; + + state.unrefed_ops.remove(&promise_id); +} + +fn unref_op<'s>( + scope: &mut v8::HandleScope<'s>, + args: v8::FunctionCallbackArguments, + _rv: v8::ReturnValue, +) { + let state_rc = JsRuntime::state(scope); + let mut state = state_rc.borrow_mut(); + + let promise_id = match v8::Local::<v8::Integer>::try_from(args.get(0)) + .map(|l| l.value() as PromiseId) + .map_err(Error::from) + { + Ok(promise_id) => promise_id, + Err(err) => { + throw_type_error(scope, format!("invalid promise id: {}", err)); + return; + } + }; + + state.unrefed_ops.insert(promise_id); +} + fn has_tick_scheduled( scope: &mut v8::HandleScope, _args: v8::FunctionCallbackArguments, diff --git a/core/lib.deno_core.d.ts b/core/lib.deno_core.d.ts index f33f6164a..59b2df542 100644 --- a/core/lib.deno_core.d.ts +++ b/core/lib.deno_core.d.ts @@ -21,6 +21,14 @@ declare namespace Deno { b?: any, ): Promise<any>; + /** Mark following promise as "ref", ie. event loop won't exit + * until all "ref" promises are resolved. All async ops are "ref" by default. */ + function refOp(promiseId: number): void; + + /** Mark following promise as "unref", ie. event loop will exit + * if there are only "unref" promises left. */ + function unrefOps(promiseId: number): void; + /** * Retrieve a list of all registered ops, in the form of a map that maps op * name to internal numerical op id. diff --git a/core/lib.rs b/core/lib.rs index 87994720f..f47db7f2e 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -83,7 +83,6 @@ pub use crate::ops_builtin::op_close; pub use crate::ops_builtin::op_print; pub use crate::ops_builtin::op_resources; pub use crate::ops_json::op_async; -pub use crate::ops_json::op_async_unref; pub use crate::ops_json::op_sync; pub use crate::ops_json::void_op_async; pub use crate::ops_json::void_op_sync; diff --git a/core/ops.rs b/core/ops.rs index 13f001146..6b2c06397 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -80,7 +80,7 @@ where } } -pub type PromiseId = u64; +pub type PromiseId = i32; pub type OpAsyncFuture = OpCall<(PromiseId, OpId, OpResult)>; pub type OpFn = dyn Fn(Rc<RefCell<OpState>>, OpPayload) -> Op + 'static; pub type OpId = usize; @@ -111,9 +111,6 @@ impl<'a, 'b, 'c> OpPayload<'a, 'b, 'c> { pub enum Op { Sync(OpResult), Async(OpAsyncFuture), - /// AsyncUnref is the variation of Async, which doesn't block the program - /// exiting. - AsyncUnref(OpAsyncFuture), NotFound, } diff --git a/core/ops_json.rs b/core/ops_json.rs index b3153763e..ad4aeeb47 100644 --- a/core/ops_json.rs +++ b/core/ops_json.rs @@ -67,7 +67,6 @@ where /// Creates an op that passes data asynchronously using JSON. /// /// When this op is dispatched, the runtime doesn't exit while processing it. -/// Use op_async_unref instead if you want to make the runtime exit while processing it. /// /// The provided function `op_fn` has the following parameters: /// * `Rc<RefCell<OpState>`: the op state, can be used to read/write resources in the runtime from an op. @@ -118,38 +117,6 @@ where }) } -/// Creates an op that passes data asynchronously using JSON. -/// -/// When this op is dispatched, the runtime still can exit while processing it. -/// -/// The other usages are the same as `op_async`. -pub fn op_async_unref<F, A, B, R, RV>(op_fn: F) -> Box<OpFn> -where - F: Fn(Rc<RefCell<OpState>>, A, B) -> R + 'static, - A: DeserializeOwned, - B: DeserializeOwned, - R: Future<Output = Result<RV, Error>> + 'static, - RV: Serialize + 'static, -{ - Box::new(move |state, payload| -> Op { - let op_id = payload.op_id; - let pid = payload.promise_id; - // Deserialize args, sync error on failure - let args = match payload.deserialize() { - Ok(args) => args, - Err(err) => { - return Op::Sync(serialize_op_result(Err::<(), Error>(err), state)) - } - }; - let (a, b) = args; - - use crate::futures::FutureExt; - let fut = op_fn(state.clone(), a, b) - .map(move |result| (pid, op_id, serialize_op_result(result, state))); - Op::AsyncUnref(OpCall::eager(fut)) - }) -} - #[cfg(test)] mod tests { use super::*; diff --git a/core/ops_metrics.rs b/core/ops_metrics.rs index 5c40a47ca..f2e9bc255 100644 --- a/core/ops_metrics.rs +++ b/core/ops_metrics.rs @@ -11,10 +11,12 @@ pub struct OpMetrics { pub ops_dispatched: u64, pub ops_dispatched_sync: u64, pub ops_dispatched_async: u64, + // TODO(bartlomieju): this field is never updated pub ops_dispatched_async_unref: u64, pub ops_completed: u64, pub ops_completed_sync: u64, pub ops_completed_async: u64, + // TODO(bartlomieju): this field is never updated pub ops_completed_async_unref: u64, pub bytes_sent_control: u64, pub bytes_sent_data: u64, @@ -84,16 +86,4 @@ impl OpsTracker { metrics.ops_completed += 1; metrics.ops_completed_async += 1; } - - pub fn track_unref(&self, id: OpId) { - let metrics = &mut self.metrics_mut(id); - metrics.ops_dispatched += 1; - metrics.ops_dispatched_async_unref += 1; - } - - pub fn track_unref_completed(&self, id: OpId) { - let metrics = &mut self.metrics_mut(id); - metrics.ops_completed += 1; - metrics.ops_completed_async_unref += 1; - } } diff --git a/core/runtime.rs b/core/runtime.rs index 3af090a1c..ad7f16886 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -29,6 +29,7 @@ use futures::task::AtomicWaker; use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; +use std::collections::HashSet; use std::ffi::c_void; use std::mem::forget; use std::option::Option; @@ -135,23 +136,6 @@ pub type SharedArrayBufferStore = pub type CompiledWasmModuleStore = CrossIsolateStore<v8::CompiledWasmModule>; -struct AsyncOpIterator<'a, 'b, 'c> { - ops: &'b mut FuturesUnordered<PendingOpFuture>, - cx: &'a mut Context<'c>, -} - -impl Iterator for AsyncOpIterator<'_, '_, '_> { - type Item = (PromiseId, OpId, OpResult); - - #[inline] - fn next(&mut self) -> Option<Self::Item> { - match self.ops.poll_next_unpin(self.cx) { - Poll::Ready(Some(item)) => Some(item), - _ => None, - } - } -} - /// Internal state for JsRuntime which is stored in one of v8::Isolate's /// embedder slots. pub(crate) struct JsRuntimeState { @@ -171,7 +155,7 @@ pub(crate) struct JsRuntimeState { dyn_module_evaluate_idle_counter: u32, pub(crate) js_error_create_fn: Rc<JsErrorCreateFn>, pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>, - pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>, + pub(crate) unrefed_ops: HashSet<i32>, pub(crate) have_unpolled_ops: bool, pub(crate) op_state: Rc<RefCell<OpState>>, pub(crate) shared_array_buffer_store: Option<SharedArrayBufferStore>, @@ -371,7 +355,7 @@ impl JsRuntime { js_wasm_streaming_cb: None, js_error_create_fn, pending_ops: FuturesUnordered::new(), - pending_unref_ops: FuturesUnordered::new(), + unrefed_ops: HashSet::new(), shared_array_buffer_store: options.shared_array_buffer_store, compiled_wasm_module_store: options.compiled_wasm_module_store, op_state: op_state.clone(), @@ -801,7 +785,8 @@ impl JsRuntime { let mut state = state_rc.borrow_mut(); let module_map = module_map_rc.borrow(); - let has_pending_ops = !state.pending_ops.is_empty(); + let has_pending_refed_ops = + state.pending_ops.len() > state.unrefed_ops.len(); let has_pending_dyn_imports = module_map.has_pending_dynamic_imports(); let has_pending_dyn_module_evaluation = !state.pending_dyn_mod_evaluate.is_empty(); @@ -815,7 +800,7 @@ impl JsRuntime { .map(|i| i.has_active_sessions()) .unwrap_or(false); - if !has_pending_ops + if !has_pending_refed_ops && !has_pending_dyn_imports && !has_pending_dyn_module_evaluation && !has_pending_module_evaluation @@ -841,7 +826,7 @@ impl JsRuntime { } if has_pending_module_evaluation { - if has_pending_ops + if has_pending_refed_ops || has_pending_dyn_imports || has_pending_dyn_module_evaluation || has_pending_background_tasks @@ -854,7 +839,7 @@ impl JsRuntime { } if has_pending_dyn_module_evaluation { - if has_pending_ops + if has_pending_refed_ops || has_pending_dyn_imports || has_pending_background_tasks { @@ -1529,21 +1514,12 @@ impl JsRuntime { state.have_unpolled_ops = false; let op_state = state.op_state.clone(); - let ops = AsyncOpIterator { - ops: &mut state.pending_ops, - cx, - }; - for (promise_id, op_id, resp) in ops { + + while let Poll::Ready(Some(item)) = state.pending_ops.poll_next_unpin(cx) + { + let (promise_id, op_id, resp) = item; op_state.borrow().tracker.track_async_completed(op_id); - args.push(v8::Integer::new(scope, promise_id as i32).into()); - args.push(resp.to_v8(scope).unwrap()); - } - let ops = AsyncOpIterator { - ops: &mut state.pending_unref_ops, - cx, - }; - for (promise_id, op_id, resp) in ops { - op_state.borrow().tracker.track_unref_completed(op_id); + state.unrefed_ops.remove(&promise_id); args.push(v8::Integer::new(scope, promise_id as i32).into()); args.push(resp.to_v8(scope).unwrap()); } @@ -1744,6 +1720,76 @@ pub mod tests { } #[test] + fn test_op_async_promise_id() { + let (mut runtime, _dispatch_count) = setup(Mode::Async); + runtime + .execute_script( + "filename.js", + r#" + const p = Deno.core.opAsync("op_test", 42); + if (p[Symbol.for("Deno.core.internalPromiseId")] == undefined) { + throw new Error("missing id on returned promise"); + } + "#, + ) + .unwrap(); + } + + #[test] + fn test_ref_unref_ops() { + let (mut runtime, _dispatch_count) = setup(Mode::Async); + runtime + .execute_script( + "filename.js", + r#" + var promiseIdSymbol = Symbol.for("Deno.core.internalPromiseId"); + var p1 = Deno.core.opAsync("op_test", 42); + var p2 = Deno.core.opAsync("op_test", 42); + "#, + ) + .unwrap(); + { + let isolate = runtime.v8_isolate(); + let state_rc = JsRuntime::state(isolate); + let state = state_rc.borrow(); + assert_eq!(state.pending_ops.len(), 2); + assert_eq!(state.unrefed_ops.len(), 0); + } + runtime + .execute_script( + "filename.js", + r#" + Deno.core.unrefOp(p1[promiseIdSymbol]); + Deno.core.unrefOp(p2[promiseIdSymbol]); + "#, + ) + .unwrap(); + { + let isolate = runtime.v8_isolate(); + let state_rc = JsRuntime::state(isolate); + let state = state_rc.borrow(); + assert_eq!(state.pending_ops.len(), 2); + assert_eq!(state.unrefed_ops.len(), 2); + } + runtime + .execute_script( + "filename.js", + r#" + Deno.core.refOp(p1[promiseIdSymbol]); + Deno.core.refOp(p2[promiseIdSymbol]); + "#, + ) + .unwrap(); + { + let isolate = runtime.v8_isolate(); + let state_rc = JsRuntime::state(isolate); + let state = state_rc.borrow(); + assert_eq!(state.pending_ops.len(), 2); + assert_eq!(state.unrefed_ops.len(), 0); + } + } + + #[test] fn test_dispatch_no_zero_copy_buf() { let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(false)); runtime diff --git a/runtime/js/40_signals.js b/runtime/js/40_signals.js index a4f3a6ccd..2498c40d7 100644 --- a/runtime/js/40_signals.js +++ b/runtime/js/40_signals.js @@ -5,6 +5,7 @@ const core = window.Deno.core; const { Set, + SymbolFor, TypeError, } = window.__bootstrap.primordials; @@ -13,7 +14,9 @@ } function pollSignal(rid) { - return core.opAsync("op_signal_poll", rid); + const promise = core.opAsync("op_signal_poll", rid); + core.unrefOp(promise[SymbolFor("Deno.core.internalPromiseId")]); + return promise; } function unbindSignal(rid) { diff --git a/runtime/ops/signal.rs b/runtime/ops/signal.rs index aa419c6c8..db30c66d8 100644 --- a/runtime/ops/signal.rs +++ b/runtime/ops/signal.rs @@ -4,7 +4,7 @@ use deno_core::error::generic_error; #[cfg(not(target_os = "windows"))] use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::op_async_unref; +use deno_core::op_async; use deno_core::op_sync; use deno_core::Extension; use deno_core::OpState; @@ -33,7 +33,7 @@ pub fn init() -> Extension { .ops(vec![ ("op_signal_bind", op_sync(op_signal_bind)), ("op_signal_unbind", op_sync(op_signal_unbind)), - ("op_signal_poll", op_async_unref(op_signal_poll)), + ("op_signal_poll", op_async(op_signal_poll)), ]) .build() } |