diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/lib.rs | 1 | ||||
-rw-r--r-- | core/modules.rs | 3 | ||||
-rw-r--r-- | core/ops.rs | 66 | ||||
-rw-r--r-- | core/ops_json.rs | 7 | ||||
-rw-r--r-- | core/runtime.rs | 13 |
5 files changed, 76 insertions, 14 deletions
diff --git a/core/lib.rs b/core/lib.rs index ea7b322e2..c0419f8ab 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -68,6 +68,7 @@ pub use crate::normalize_path::normalize_path; pub use crate::ops::serialize_op_result; pub use crate::ops::Op; pub use crate::ops::OpAsyncFuture; +pub use crate::ops::OpCall; pub use crate::ops::OpFn; pub use crate::ops::OpId; pub use crate::ops::OpPayload; diff --git a/core/modules.rs b/core/modules.rs index 2af09057f..31e03196a 100644 --- a/core/modules.rs +++ b/core/modules.rs @@ -722,6 +722,7 @@ impl ModuleMap { #[cfg(test)] mod tests { use super::*; + use crate::ops::OpCall; use crate::serialize_op_result; use crate::JsRuntime; use crate::Op; @@ -1009,7 +1010,7 @@ mod tests { let (control, _): (u8, ()) = payload.deserialize().unwrap(); assert_eq!(control, 42); let resp = (0, 1, serialize_op_result(Ok(43), state)); - Op::Async(Box::pin(futures::future::ready(resp))) + Op::Async(OpCall::ready(resp)) }; let mut runtime = JsRuntime::new(RuntimeOptions { diff --git a/core/ops.rs b/core/ops.rs index 80bb30eda..05b91f32f 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -6,6 +6,11 @@ use crate::gotham_state::GothamState; use crate::ops_metrics::OpsTracker; use crate::resources::ResourceTable; use crate::runtime::GetErrorClassFn; +use futures::future::maybe_done; +use futures::future::FusedFuture; +use futures::future::MaybeDone; +use futures::ready; +use futures::task::noop_waker; use futures::Future; use indexmap::IndexMap; use rusty_v8 as v8; @@ -17,10 +22,67 @@ use std::ops::Deref; use std::ops::DerefMut; use std::pin::Pin; use std::rc::Rc; +use std::task::Context; +use std::task::Poll; + +/// Wrapper around a Future, which causes that Future to be polled immediately. +/// (Background: ops are stored in a `FuturesUnordered` structure which polls +/// them, but without the `OpCall` wrapper this doesn't happen until the next +/// turn of the event loop, which is too late for certain ops.) +pub struct OpCall<T>(MaybeDone<Pin<Box<dyn Future<Output = T>>>>); + +impl<T> OpCall<T> { + /// Wraps a future, and polls the inner future immediately. + /// This should be the default choice for ops. + pub fn eager(fut: impl Future<Output = T> + 'static) -> Self { + let boxed = Box::pin(fut) as Pin<Box<dyn Future<Output = T>>>; + let mut inner = maybe_done(boxed); + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + let mut pinned = Pin::new(&mut inner); + let _ = pinned.as_mut().poll(&mut cx); + Self(inner) + } + + /// Wraps a future; the inner future is polled the usual way (lazily). + pub fn lazy(fut: impl Future<Output = T> + 'static) -> Self { + let boxed = Box::pin(fut) as Pin<Box<dyn Future<Output = T>>>; + let inner = maybe_done(boxed); + Self(inner) + } + + /// Create a future by specifying its output. This is basically the same as + /// `async { value }` or `futures::future::ready(value)`. + pub fn ready(value: T) -> Self { + Self(MaybeDone::Done(value)) + } +} + +impl<T> Future for OpCall<T> { + type Output = T; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Self::Output> { + let inner = unsafe { &mut self.get_unchecked_mut().0 }; + let mut pinned = Pin::new(inner); + ready!(pinned.as_mut().poll(cx)); + Poll::Ready(pinned.as_mut().take_output().unwrap()) + } +} + +impl<F> FusedFuture for OpCall<F> +where + F: Future, +{ + fn is_terminated(&self) -> bool { + self.0.is_terminated() + } +} pub type PromiseId = u64; -pub type OpAsyncFuture = - Pin<Box<dyn Future<Output = (PromiseId, OpId, OpResult)>>>; +pub type OpAsyncFuture = OpCall<(PromiseId, OpId, OpResult)>; pub type OpFn = dyn Fn(Rc<RefCell<OpState>>, OpPayload) -> Op + 'static; pub type OpId = usize; diff --git a/core/ops_json.rs b/core/ops_json.rs index 0ca7e5ce4..dca9a9a77 100644 --- a/core/ops_json.rs +++ b/core/ops_json.rs @@ -1,6 +1,7 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. use crate::error::AnyError; +use crate::ops::OpCall; use crate::serialize_op_result; use crate::Op; use crate::OpFn; @@ -35,7 +36,7 @@ pub fn void_op_async() -> Box<OpFn> { let op_id = payload.op_id; let pid = payload.promise_id; let op_result = serialize_op_result(Ok(()), state); - Op::Async(Box::pin(futures::future::ready((pid, op_id, op_result)))) + Op::Async(OpCall::ready((pid, op_id, op_result))) }) } @@ -127,7 +128,7 @@ where use crate::futures::FutureExt; let fut = op_fn(state.clone(), a, b) .map(move |result| (pid, op_id, serialize_op_result(result, state))); - Op::Async(Box::pin(fut)) + Op::Async(OpCall::eager(fut)) }) } @@ -159,7 +160,7 @@ where use crate::futures::FutureExt; let fut = op_fn(state.clone(), a, b) .map(move |result| (pid, op_id, serialize_op_result(result, state))); - Op::AsyncUnref(Box::pin(fut)) + Op::AsyncUnref(OpCall::eager(fut)) }) } diff --git a/core/runtime.rs b/core/runtime.rs index 1928ff31c..873dcd3f5 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -28,7 +28,6 @@ use futures::future::FutureExt; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::task::AtomicWaker; -use futures::Future; use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; @@ -36,7 +35,6 @@ use std::convert::TryFrom; use std::ffi::c_void; use std::mem::forget; use std::option::Option; -use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; use std::sync::Mutex; @@ -44,8 +42,7 @@ use std::sync::Once; use std::task::Context; use std::task::Poll; -type PendingOpFuture = - Pin<Box<dyn Future<Output = (PromiseId, OpId, OpResult)>>>; +type PendingOpFuture = OpCall<(PromiseId, OpId, OpResult)>; pub enum Snapshot { Static(&'static [u8]), @@ -1613,6 +1610,7 @@ pub mod tests { use crate::ZeroCopyBuf; use futures::future::lazy; use std::ops::FnOnce; + use std::pin::Pin; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -1645,16 +1643,15 @@ pub mod tests { Mode::Async => { assert_eq!(control, 42); let resp = (0, 1, serialize_op_result(Ok(43), rc_op_state)); - Op::Async(Box::pin(futures::future::ready(resp))) + Op::Async(OpCall::ready(resp)) } Mode::AsyncZeroCopy(has_buffer) => { assert_eq!(buf.is_some(), has_buffer); if let Some(buf) = buf { assert_eq!(buf.len(), 1); } - - let resp = serialize_op_result(Ok(43), rc_op_state); - Op::Async(Box::pin(futures::future::ready((0, 1, resp)))) + let resp = (0, 1, serialize_op_result(Ok(43), rc_op_state)); + Op::Async(OpCall::ready(resp)) } } } |