diff options
Diffstat (limited to 'core/runtime.rs')
-rw-r--r-- | core/runtime.rs | 152 |
1 files changed, 120 insertions, 32 deletions
diff --git a/core/runtime.rs b/core/runtime.rs index 6820df6bc..3723a917a 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -35,8 +35,10 @@ use futures::channel::oneshot; use futures::future::poll_fn; use futures::future::Future; use futures::future::FutureExt; +use futures::future::MaybeDone; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; +use futures::task::noop_waker; use futures::task::AtomicWaker; use smallvec::SmallVec; use std::any::Any; @@ -45,6 +47,7 @@ use std::collections::HashMap; use std::collections::VecDeque; use std::ffi::c_void; use std::option::Option; +use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; use std::sync::Mutex; @@ -53,8 +56,6 @@ use std::task::Context; use std::task::Poll; use v8::OwnedIsolate; -type PendingOpFuture = OpCall<(RealmIdx, PromiseId, OpId, OpResult)>; - pub enum Snapshot { Static(&'static [u8]), JustCreated(v8::StartupData), @@ -165,7 +166,7 @@ pub struct JsRuntimeState { dyn_module_evaluate_idle_counter: u32, pub(crate) source_map_getter: Option<Rc<Box<dyn SourceMapGetter>>>, pub(crate) source_map_cache: Rc<RefCell<SourceMapCache>>, - pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>, + pub(crate) pending_ops: FuturesUnordered<OpCall>, pub(crate) have_unpolled_ops: bool, pub(crate) op_state: Rc<RefCell<OpState>>, pub(crate) shared_array_buffer_store: Option<SharedArrayBufferStore>, @@ -360,7 +361,7 @@ impl JsRuntime { .into_iter() .enumerate() .map(|(id, decl)| { - OpCtx::new(id, 0, Rc::new(decl), op_state.clone(), weak.clone()) + OpCtx::new(id as u16, 0, Rc::new(decl), op_state.clone(), weak.clone()) }) .collect::<Vec<_>>() .into_boxed_slice(); @@ -610,7 +611,7 @@ impl JsRuntime { /// constructed. pub fn create_realm(&mut self) -> Result<JsRealm, Error> { let realm = { - let realm_idx = self.state.borrow().known_realms.len(); + let realm_idx = self.state.borrow().known_realms.len() as u16; let op_ctxs: Box<[OpCtx]> = self .global_realm() @@ -2231,7 +2232,7 @@ impl JsRuntime { { let (realm_idx, promise_id, op_id, resp) = item; state.op_state.borrow().tracker.track_async_completed(op_id); - responses_per_realm[realm_idx].push((promise_id, resp)); + responses_per_realm[realm_idx as usize].push((promise_id, resp)); } } @@ -2335,7 +2336,7 @@ impl JsRuntime { { let (realm_idx, promise_id, op_id, mut resp) = item; debug_assert_eq!( - state.known_realms[realm_idx], + state.known_realms[realm_idx as usize], state.global_realm.as_ref().unwrap().context() ); realm_state.unrefed_ops.remove(&promise_id); @@ -2382,27 +2383,106 @@ impl JsRuntime { } #[inline] -pub fn queue_fast_async_op( +pub fn queue_fast_async_op<R: serde::Serialize + 'static>( ctx: &OpCtx, - op: impl Future<Output = (RealmIdx, PromiseId, OpId, OpResult)> + 'static, + promise_id: PromiseId, + op: impl Future<Output = Result<R, Error>> + 'static, ) { let runtime_state = match ctx.runtime_state.upgrade() { Some(rc_state) => rc_state, // atleast 1 Rc is held by the JsRuntime. None => unreachable!(), }; - + let get_class = { + let state = RefCell::borrow(&ctx.state); + state.tracker.track_async(ctx.id); + state.get_error_class_fn + }; + let fut = op + .map(|result| crate::_ops::to_op_result(get_class, result)) + .boxed_local(); let mut state = runtime_state.borrow_mut(); - state.pending_ops.push(OpCall::lazy(op)); + state + .pending_ops + .push(OpCall::pending(ctx, promise_id, fut)); state.have_unpolled_ops = true; } #[inline] +pub fn map_async_op1<R: serde::Serialize + 'static>( + ctx: &OpCtx, + op: impl Future<Output = Result<R, Error>> + 'static, +) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> { + let get_class = { + let state = RefCell::borrow(&ctx.state); + state.tracker.track_async(ctx.id); + state.get_error_class_fn + }; + + let fut = op + .map(|result| crate::_ops::to_op_result(get_class, result)) + .boxed_local(); + MaybeDone::Future(fut) +} + +#[inline] +pub fn map_async_op2<R: serde::Serialize + 'static>( + ctx: &OpCtx, + op: impl Future<Output = R> + 'static, +) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> { + let state = RefCell::borrow(&ctx.state); + state.tracker.track_async(ctx.id); + + let fut = op.map(|result| OpResult::Ok(result.into())).boxed_local(); + MaybeDone::Future(fut) +} + +#[inline] +pub fn map_async_op3<R: serde::Serialize + 'static>( + ctx: &OpCtx, + op: Result<impl Future<Output = Result<R, Error>> + 'static, Error>, +) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> { + let get_class = { + let state = RefCell::borrow(&ctx.state); + state.tracker.track_async(ctx.id); + state.get_error_class_fn + }; + + match op { + Err(err) => MaybeDone::Done(OpResult::Err(OpError::new(get_class, err))), + Ok(fut) => MaybeDone::Future( + fut + .map(|result| crate::_ops::to_op_result(get_class, result)) + .boxed_local(), + ), + } +} + +#[inline] +pub fn map_async_op4<R: serde::Serialize + 'static>( + ctx: &OpCtx, + op: Result<impl Future<Output = R> + 'static, Error>, +) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> { + let get_class = { + let state = RefCell::borrow(&ctx.state); + state.tracker.track_async(ctx.id); + state.get_error_class_fn + }; + + match op { + Err(err) => MaybeDone::Done(OpResult::Err(OpError::new(get_class, err))), + Ok(fut) => MaybeDone::Future( + fut.map(|result| OpResult::Ok(result.into())).boxed_local(), + ), + } +} + pub fn queue_async_op<'s>( ctx: &OpCtx, scope: &'s mut v8::HandleScope, deferred: bool, - op: impl Future<Output = (RealmIdx, PromiseId, OpId, OpResult)> + 'static, + promise_id: PromiseId, + mut op: MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>>, ) -> Option<v8::Local<'s, v8::Value>> { let runtime_state = match ctx.runtime_state.upgrade() { Some(rc_state) => rc_state, @@ -2415,32 +2495,40 @@ pub fn queue_async_op<'s>( // deno_core doesn't currently support such exposure, even though embedders // can cause them, so we panic in debug mode (since the check is expensive). debug_assert_eq!( - runtime_state.borrow().known_realms[ctx.realm_idx].to_local(scope), + runtime_state.borrow().known_realms[ctx.realm_idx as usize].to_local(scope), Some(scope.get_current_context()) ); - match OpCall::eager(op) { - // If the result is ready we'll just return it straight to the caller, so - // we don't have to invoke a JS callback to respond. // This works under the - // assumption that `()` return value is serialized as `null`. - EagerPollResult::Ready((_, _, op_id, mut resp)) if !deferred => { - let resp = resp.to_v8(scope).unwrap(); - ctx.state.borrow_mut().tracker.track_async_completed(op_id); - return Some(resp); - } - EagerPollResult::Ready(op) => { - let ready = OpCall::ready(op); - let mut state = runtime_state.borrow_mut(); - state.pending_ops.push(ready); - state.have_unpolled_ops = true; + // All ops are polled immediately + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + // Note that MaybeDone returns () from the future + let op_call = match op.poll_unpin(&mut cx) { + Poll::Pending => { + let MaybeDone::Future(fut) = op else { + unreachable!() + }; + OpCall::pending(ctx, promise_id, fut) } - EagerPollResult::Pending(op) => { - let mut state = runtime_state.borrow_mut(); - state.pending_ops.push(op); - state.have_unpolled_ops = true; + Poll::Ready(_) => { + let mut op_result = Pin::new(&mut op).take_output().unwrap(); + // If the op is ready and is not marked as deferred we can immediately return + // the result. + if !deferred { + ctx.state.borrow_mut().tracker.track_async_completed(ctx.id); + return Some(op_result.to_v8(scope).unwrap()); + } + + OpCall::ready(ctx, promise_id, op_result) } - } + }; + // Otherwise we will push it to the `pending_ops` and let it be polled again + // or resolved on the next tick of the event loop. + let mut state = runtime_state.borrow_mut(); + state.pending_ops.push(op_call); + state.have_unpolled_ops = true; None } |