summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-10-05 11:08:19 +0200
committerGitHub <noreply@github.com>2020-10-05 11:08:19 +0200
commit8d00c32ee2e159ff8b833bf108e6e89c564ef562 (patch)
tree9c8994df53c6c5b2a2133b086987d27ec149ff90
parentf377b611bac0f3c7aaa81f4ec39221d719392922 (diff)
refactor(core): JsRuntime::poll (#7825)
This commit does reorganization of "JsRuntime::poll" to allow fixing of top-level-await bug.
-rw-r--r--core/runtime.rs443
1 files changed, 213 insertions, 230 deletions
diff --git a/core/runtime.rs b/core/runtime.rs
index ecb9828f4..514703f34 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -310,6 +310,10 @@ impl JsRuntime {
state.global_context.clone().unwrap()
}
+ fn v8_isolate(&mut self) -> &mut v8::OwnedIsolate {
+ self.v8_isolate.as_mut().unwrap()
+ }
+
fn setup_isolate(mut isolate: v8::OwnedIsolate) -> v8::OwnedIsolate {
isolate.set_capture_stack_trace_for_uncaught_exceptions(true, 10);
isolate.set_promise_reject_callback(bindings::promise_reject_callback);
@@ -328,7 +332,7 @@ impl JsRuntime {
}
/// Executes a bit of built-in JavaScript to provide Deno.sharedQueue.
- pub(crate) fn shared_init(&mut self) {
+ fn shared_init(&mut self) {
if self.needs_init {
self.needs_init = false;
self.execute("core.js", include_str!("core.js")).unwrap();
@@ -353,15 +357,9 @@ impl JsRuntime {
) -> Result<(), AnyError> {
self.shared_init();
- let state_rc = Self::state(self);
- let state = state_rc.borrow();
-
- let scope = &mut v8::HandleScope::with_context(
- self.v8_isolate.as_mut().unwrap(),
- state.global_context.as_ref().unwrap(),
- );
+ let context = self.global_context();
- drop(state);
+ let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
let source = v8::String::new(scope, js_source).unwrap();
let name = v8::String::new(scope, js_filename).unwrap();
@@ -442,16 +440,12 @@ impl JsRuntime {
.replace((boxed_cb, near_heap_limit_callback::<C>));
if let Some((_, prev_cb)) = prev {
self
- .v8_isolate
- .as_mut()
- .unwrap()
+ .v8_isolate()
.remove_near_heap_limit_callback(prev_cb, 0);
}
self
- .v8_isolate
- .as_mut()
- .unwrap()
+ .v8_isolate()
.add_near_heap_limit_callback(near_heap_limit_callback::<C>, data);
}
@@ -459,9 +453,7 @@ impl JsRuntime {
if let Some((_, cb)) = self.allocations.near_heap_limit_callback_data.take()
{
self
- .v8_isolate
- .as_mut()
- .unwrap()
+ .v8_isolate()
.remove_near_heap_limit_callback(cb, heap_limit);
}
}
@@ -492,109 +484,43 @@ impl Future for JsRuntime {
state.waker.register(cx.waker());
}
- let has_preparing = {
- let state = state_rc.borrow();
- !state.preparing_dyn_imports.is_empty()
- };
- if has_preparing {
+ // Dynamic module loading - ie. modules loaded using "import()"
+ {
let poll_imports = runtime.prepare_dyn_imports(cx)?;
assert!(poll_imports.is_ready());
- }
- let has_pending = {
- let state = state_rc.borrow();
- !state.pending_dyn_imports.is_empty()
- };
- if has_pending {
let poll_imports = runtime.poll_dyn_imports(cx)?;
assert!(poll_imports.is_ready());
- }
-
- let scope = &mut v8::HandleScope::with_context(
- &mut **runtime,
- state_rc.borrow().global_context.as_ref().unwrap(),
- );
-
- check_promise_exceptions(scope)?;
-
- let mut overflow_response: Option<(OpId, Box<[u8]>)> = None;
-
- loop {
- let mut state = state_rc.borrow_mut();
- // Now handle actual ops.
- state.have_unpolled_ops.set(false);
- let pending_r = state.pending_ops.poll_next_unpin(cx);
- match pending_r {
- Poll::Ready(None) => break,
- Poll::Pending => break,
- Poll::Ready(Some((op_id, buf))) => {
- let successful_push = state.shared.push(op_id, &buf);
- if !successful_push {
- // If we couldn't push the response to the shared queue, because
- // there wasn't enough size, we will return the buffer via the
- // legacy route, using the argument of deno_respond.
- overflow_response = Some((op_id, buf));
- break;
- }
- }
- };
- }
-
- loop {
- let mut state = state_rc.borrow_mut();
- let unref_r = state.pending_unref_ops.poll_next_unpin(cx);
- #[allow(clippy::match_wild_err_arm)]
- match unref_r {
- Poll::Ready(None) => break,
- Poll::Pending => break,
- Poll::Ready(Some((op_id, buf))) => {
- let successful_push = state.shared.push(op_id, &buf);
- if !successful_push {
- // If we couldn't push the response to the shared queue, because
- // there wasn't enough size, we will return the buffer via the
- // legacy route, using the argument of deno_respond.
- overflow_response = Some((op_id, buf));
- break;
- }
- }
- };
+ runtime.check_promise_exceptions()?;
}
+ // Ops
{
- let state = state_rc.borrow();
- if state.shared.size() > 0 {
- drop(state);
- async_op_response(scope, None)?;
- // The other side should have shifted off all the messages.
- let state = state_rc.borrow();
- assert_eq!(state.shared.size(), 0);
- }
+ let overflow_response = runtime.poll_pending_ops(cx);
+ runtime.async_op_response(overflow_response)?;
+ runtime.drain_macrotasks()?;
+ runtime.check_promise_exceptions()?;
}
- {
- if let Some((op_id, buf)) = overflow_response.take() {
- async_op_response(scope, Some((op_id, buf)))?;
- }
-
- drain_macrotasks(scope)?;
+ let state = state_rc.borrow();
+ let is_idle = {
+ state.pending_ops.is_empty()
+ && state.pending_dyn_imports.is_empty()
+ && state.preparing_dyn_imports.is_empty()
+ };
- check_promise_exceptions(scope)?;
+ if is_idle {
+ return Poll::Ready(Ok(()));
}
- let state = state_rc.borrow();
- // We're idle if pending_ops is empty.
- if state.pending_ops.is_empty()
- && state.pending_dyn_imports.is_empty()
- && state.preparing_dyn_imports.is_empty()
- {
- Poll::Ready(Ok(()))
- } else {
- if state.have_unpolled_ops.get() {
- state.waker.wake();
- }
- Poll::Pending
+ // Check if more async ops have been dispatched
+ // during this turn of event loop.
+ if state.have_unpolled_ops.get() {
+ state.waker.wake();
}
+
+ Poll::Pending
}
}
@@ -635,74 +561,6 @@ impl JsRuntimeState {
}
}
-fn async_op_response<'s>(
- scope: &mut v8::HandleScope<'s>,
- maybe_buf: Option<(OpId, Box<[u8]>)>,
-) -> Result<(), AnyError> {
- let context = scope.get_current_context();
- let global: v8::Local<v8::Value> = context.global(scope).into();
- let js_recv_cb = JsRuntime::state(scope)
- .borrow()
- .js_recv_cb
- .as_ref()
- .map(|cb| v8::Local::new(scope, cb))
- .expect("Deno.core.recv has not been called.");
-
- let tc_scope = &mut v8::TryCatch::new(scope);
-
- match maybe_buf {
- Some((op_id, buf)) => {
- let op_id: v8::Local<v8::Value> =
- v8::Integer::new(tc_scope, op_id as i32).into();
- let ui8: v8::Local<v8::Value> =
- boxed_slice_to_uint8array(tc_scope, buf).into();
- js_recv_cb.call(tc_scope, global, &[op_id, ui8])
- }
- None => js_recv_cb.call(tc_scope, global, &[]),
- };
-
- match tc_scope.exception() {
- None => Ok(()),
- Some(exception) => exception_to_err_result(tc_scope, exception),
- }
-}
-
-fn drain_macrotasks<'s>(
- scope: &mut v8::HandleScope<'s>,
-) -> Result<(), AnyError> {
- let context = scope.get_current_context();
- let global: v8::Local<v8::Value> = context.global(scope).into();
-
- let js_macrotask_cb = match JsRuntime::state(scope)
- .borrow_mut()
- .js_macrotask_cb
- .as_ref()
- {
- Some(cb) => v8::Local::new(scope, cb),
- None => return Ok(()),
- };
-
- // Repeatedly invoke macrotask callback until it returns true (done),
- // such that ready microtasks would be automatically run before
- // next macrotask is processed.
- let tc_scope = &mut v8::TryCatch::new(scope);
-
- loop {
- let is_done = js_macrotask_cb.call(tc_scope, global, &[]);
-
- if let Some(exception) = tc_scope.exception() {
- return exception_to_err_result(tc_scope, exception);
- }
-
- let is_done = is_done.unwrap();
- if is_done.is_true() {
- break;
- }
- }
-
- Ok(())
-}
-
pub(crate) fn exception_to_err_result<'s, T>(
scope: &mut v8::HandleScope<'s>,
exception: v8::Local<v8::Value>,
@@ -743,35 +601,6 @@ pub(crate) fn exception_to_err_result<'s, T>(
Err(js_error)
}
-fn check_promise_exceptions<'s>(
- scope: &mut v8::HandleScope<'s>,
-) -> Result<(), AnyError> {
- let state_rc = JsRuntime::state(scope);
- let mut state = state_rc.borrow_mut();
-
- if let Some(&key) = state.pending_promise_exceptions.keys().next() {
- let handle = state.pending_promise_exceptions.remove(&key).unwrap();
- drop(state);
- let exception = v8::Local::new(scope, handle);
- exception_to_err_result(scope, exception)
- } else {
- Ok(())
- }
-}
-
-fn boxed_slice_to_uint8array<'sc>(
- scope: &mut v8::HandleScope<'sc>,
- buf: Box<[u8]>,
-) -> v8::Local<'sc, v8::Uint8Array> {
- assert!(!buf.is_empty());
- let buf_len = buf.len();
- let backing_store = v8::ArrayBuffer::new_backing_store_from_boxed_slice(buf);
- let backing_store_shared = backing_store.make_shared();
- let ab = v8::ArrayBuffer::with_backing_store(scope, &backing_store_shared);
- v8::Uint8Array::new(scope, ab, 0, buf_len)
- .expect("Failed to create UintArray8")
-}
-
// Related to module loading
impl JsRuntime {
/// Low-level module creation.
@@ -784,10 +613,8 @@ impl JsRuntime {
source: &str,
) -> Result<ModuleId, AnyError> {
let state_rc = Self::state(self);
- let scope = &mut v8::HandleScope::with_context(
- &mut **self,
- state_rc.borrow().global_context.as_ref().unwrap(),
- );
+ let context = self.global_context();
+ let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
let name_str = v8::String::new(scope, name).unwrap();
let source_str = v8::String::new(scope, source).unwrap();
@@ -840,13 +667,12 @@ impl JsRuntime {
/// be a different type if `RuntimeOptions::js_error_create_fn` has been set.
fn mod_instantiate(&mut self, id: ModuleId) -> Result<(), AnyError> {
let state_rc = Self::state(self);
- let state = state_rc.borrow();
- let scope = &mut v8::HandleScope::with_context(
- &mut **self,
- state.global_context.as_ref().unwrap(),
- );
+ let context = self.global_context();
+
+ let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
let tc_scope = &mut v8::TryCatch::new(scope);
+ let state = state_rc.borrow();
let module = match state.modules.get_info(id) {
Some(info) => v8::Local::new(tc_scope, &info.handle),
None if id == 0 => return Ok(()),
@@ -878,11 +704,9 @@ impl JsRuntime {
self.shared_init();
let state_rc = Self::state(self);
+ let context = self.global_context();
- let scope = &mut v8::HandleScope::with_context(
- &mut **self,
- state_rc.borrow().global_context.as_ref().unwrap(),
- );
+ let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
let module = state_rc
.borrow()
@@ -946,11 +770,9 @@ impl JsRuntime {
err: AnyError,
) -> Result<(), AnyError> {
let state_rc = Self::state(self);
+ let context = self.global_context();
- let scope = &mut v8::HandleScope::with_context(
- &mut **self,
- state_rc.borrow().global_context.as_ref().unwrap(),
- );
+ let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
let resolver_handle = state_rc
.borrow_mut()
@@ -979,13 +801,11 @@ impl JsRuntime {
mod_id: ModuleId,
) -> Result<(), AnyError> {
let state_rc = Self::state(self);
+ let context = self.global_context();
debug!("dyn_import_done {} {:?}", id, mod_id);
assert!(mod_id != 0);
- let scope = &mut v8::HandleScope::with_context(
- &mut **self,
- state_rc.borrow().global_context.as_ref().unwrap(),
- );
+ let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
let resolver_handle = state_rc
.borrow_mut()
@@ -1017,6 +837,10 @@ impl JsRuntime {
) -> Poll<Result<(), AnyError>> {
let state_rc = Self::state(self);
+ if state_rc.borrow().preparing_dyn_imports.is_empty() {
+ return Poll::Ready(Ok(()));
+ }
+
loop {
let r = {
let mut state = state_rc.borrow_mut();
@@ -1050,6 +874,11 @@ impl JsRuntime {
cx: &mut Context,
) -> Poll<Result<(), AnyError>> {
let state_rc = Self::state(self);
+
+ if state_rc.borrow().pending_dyn_imports.is_empty() {
+ return Poll::Ready(Ok(()));
+ }
+
loop {
let poll_result = {
let mut state = state_rc.borrow_mut();
@@ -1222,6 +1051,163 @@ impl JsRuntime {
let root_id = load.root_module_id.expect("Root module id empty");
self.mod_instantiate(root_id).map(|_| root_id)
}
+
+ fn poll_pending_ops(
+ &mut self,
+ cx: &mut Context,
+ ) -> Option<(OpId, Box<[u8]>)> {
+ let state_rc = Self::state(self);
+ let mut overflow_response: Option<(OpId, Box<[u8]>)> = None;
+
+ loop {
+ let mut state = state_rc.borrow_mut();
+ // Now handle actual ops.
+ state.have_unpolled_ops.set(false);
+
+ let pending_r = state.pending_ops.poll_next_unpin(cx);
+ match pending_r {
+ Poll::Ready(None) => break,
+ Poll::Pending => break,
+ Poll::Ready(Some((op_id, buf))) => {
+ let successful_push = state.shared.push(op_id, &buf);
+ if !successful_push {
+ // If we couldn't push the response to the shared queue, because
+ // there wasn't enough size, we will return the buffer via the
+ // legacy route, using the argument of deno_respond.
+ overflow_response = Some((op_id, buf));
+ break;
+ }
+ }
+ };
+ }
+
+ loop {
+ let mut state = state_rc.borrow_mut();
+ let unref_r = state.pending_unref_ops.poll_next_unpin(cx);
+ #[allow(clippy::match_wild_err_arm)]
+ match unref_r {
+ Poll::Ready(None) => break,
+ Poll::Pending => break,
+ Poll::Ready(Some((op_id, buf))) => {
+ let successful_push = state.shared.push(op_id, &buf);
+ if !successful_push {
+ // If we couldn't push the response to the shared queue, because
+ // there wasn't enough size, we will return the buffer via the
+ // legacy route, using the argument of deno_respond.
+ overflow_response = Some((op_id, buf));
+ break;
+ }
+ }
+ };
+ }
+
+ overflow_response
+ }
+
+ fn check_promise_exceptions(&mut self) -> Result<(), AnyError> {
+ let state_rc = Self::state(self);
+ let mut state = state_rc.borrow_mut();
+
+ if state.pending_promise_exceptions.is_empty() {
+ return Ok(());
+ }
+
+ let key = { *state.pending_promise_exceptions.keys().next().unwrap() };
+ let handle = state.pending_promise_exceptions.remove(&key).unwrap();
+ drop(state);
+
+ let context = self.global_context();
+ let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
+
+ let exception = v8::Local::new(scope, handle);
+ exception_to_err_result(scope, exception)
+ }
+
+ // Respond using shared queue and optionally overflown response
+ fn async_op_response(
+ &mut self,
+ maybe_overflown_response: Option<(OpId, Box<[u8]>)>,
+ ) -> Result<(), AnyError> {
+ let state_rc = Self::state(self);
+
+ let shared_queue_size = state_rc.borrow().shared.size();
+
+ if shared_queue_size == 0 && maybe_overflown_response.is_none() {
+ return Ok(());
+ }
+
+ // FIXME(bartlomieju): without check above this call would panic
+ // because of lazy initialization in core.js. It seems this lazy initialization
+ // hides unnecessary complexity.
+ let js_recv_cb_handle = state_rc
+ .borrow()
+ .js_recv_cb
+ .clone()
+ .expect("Deno.core.recv has not been called.");
+
+ let context = self.global_context();
+ let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
+ let context = scope.get_current_context();
+ let global: v8::Local<v8::Value> = context.global(scope).into();
+ let js_recv_cb = js_recv_cb_handle.get(scope);
+
+ let tc_scope = &mut v8::TryCatch::new(scope);
+
+ if shared_queue_size > 0 {
+ js_recv_cb.call(tc_scope, global, &[]);
+ // The other side should have shifted off all the messages.
+ let shared_queue_size = state_rc.borrow().shared.size();
+ assert_eq!(shared_queue_size, 0);
+ }
+
+ if let Some(overflown_response) = maybe_overflown_response {
+ let (op_id, buf) = overflown_response;
+ let op_id: v8::Local<v8::Value> =
+ v8::Integer::new(tc_scope, op_id as i32).into();
+ let ui8: v8::Local<v8::Value> =
+ bindings::boxed_slice_to_uint8array(tc_scope, buf).into();
+ js_recv_cb.call(tc_scope, global, &[op_id, ui8]);
+ }
+
+ match tc_scope.exception() {
+ None => Ok(()),
+ Some(exception) => exception_to_err_result(tc_scope, exception),
+ }
+ }
+
+ fn drain_macrotasks(&mut self) -> Result<(), AnyError> {
+ let js_macrotask_cb_handle =
+ match &Self::state(self).borrow().js_macrotask_cb {
+ Some(handle) => handle.clone(),
+ None => return Ok(()),
+ };
+
+ let context = self.global_context();
+ let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
+ let context = scope.get_current_context();
+ let global: v8::Local<v8::Value> = context.global(scope).into();
+ let js_macrotask_cb = js_macrotask_cb_handle.get(scope);
+
+ // Repeatedly invoke macrotask callback until it returns true (done),
+ // such that ready microtasks would be automatically run before
+ // next macrotask is processed.
+ let tc_scope = &mut v8::TryCatch::new(scope);
+
+ loop {
+ let is_done = js_macrotask_cb.call(tc_scope, global, &[]);
+
+ if let Some(exception) = tc_scope.exception() {
+ return exception_to_err_result(tc_scope, exception);
+ }
+
+ let is_done = is_done.unwrap();
+ if is_done.is_true() {
+ break;
+ }
+ }
+
+ Ok(())
+ }
}
#[cfg(test)]
@@ -1516,8 +1502,7 @@ pub mod tests {
let (mut isolate, _dispatch_count) = setup(Mode::Async);
// TODO(piscisaureus): in rusty_v8, the `thread_safe_handle()` method
// should not require a mutable reference to `struct rusty_v8::Isolate`.
- let v8_isolate_handle =
- isolate.v8_isolate.as_mut().unwrap().thread_safe_handle();
+ let v8_isolate_handle = isolate.v8_isolate().thread_safe_handle();
let terminator_thread = std::thread::spawn(move || {
// allow deno to boot and run
@@ -1541,9 +1526,7 @@ pub mod tests {
// TODO(piscisaureus): in rusty_v8, `cancel_terminate_execution()` should
// also be implemented on `struct Isolate`.
let ok = isolate
- .v8_isolate
- .as_mut()
- .unwrap()
+ .v8_isolate()
.thread_safe_handle()
.cancel_terminate_execution();
assert!(ok);
@@ -1563,7 +1546,7 @@ pub mod tests {
let (mut runtime, _dispatch_count) = setup(Mode::Async);
// TODO(piscisaureus): in rusty_v8, the `thread_safe_handle()` method
// should not require a mutable reference to `struct rusty_v8::Isolate`.
- runtime.v8_isolate.as_mut().unwrap().thread_safe_handle()
+ runtime.v8_isolate().thread_safe_handle()
};
// this should not SEGFAULT