diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-06-14 16:22:54 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-14 16:22:54 -0600 |
commit | 48c6f7178703d448da229a5baf19efb403416da0 (patch) | |
tree | 176fb4b767cfe71d18d2f24d9d8ec0fa0f4bf171 | |
parent | fc4e4c3e93c337ae2b549cf618f69c87a9647a4f (diff) |
refactor(core): Remove MaybeDone from ops to eventually remove the box (#19508)
This removes MaybeDone from op resolution. While it would be nice to avoid the box, most of the work for that future task is done here.
-rw-r--r-- | core/ops.rs | 46 | ||||
-rw-r--r-- | core/runtime/ops.rs | 97 |
2 files changed, 52 insertions, 91 deletions
diff --git a/core/ops.rs b/core/ops.rs index 372ffe5b2..a5c76e412 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -9,16 +9,13 @@ use crate::runtime::JsRuntimeState; use crate::OpDecl; use crate::OpsTracker; use anyhow::Error; -use futures::future::MaybeDone; use futures::task::AtomicWaker; use futures::Future; -use futures::FutureExt; use pin_project::pin_project; use serde::Serialize; use std::cell::RefCell; use std::ops::Deref; use std::ops::DerefMut; -use std::pin::Pin; use std::ptr::NonNull; use std::rc::Rc; use std::rc::Weak; @@ -30,40 +27,26 @@ pub type PromiseId = i32; pub type OpId = u16; #[pin_project] -pub struct OpCall { +pub struct OpCall<F: Future<Output = OpResult>> { promise_id: PromiseId, op_id: OpId, /// Future is not necessarily Unpin, so we need to pin_project. #[pin] - fut: MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>>, + fut: F, } -impl OpCall { +impl<F: Future<Output = OpResult>> OpCall<F> { /// Wraps a future; the inner future is polled the usual way (lazily). - pub fn pending( - op_ctx: &OpCtx, - promise_id: PromiseId, - fut: Pin<Box<dyn Future<Output = OpResult> + 'static>>, - ) -> Self { - Self { - op_id: op_ctx.id, - promise_id, - fut: MaybeDone::Future(fut), - } - } - - /// Create a future by specifying its output. This is basically the same as - /// `async { value }` or `futures::future::ready(value)`. - pub fn ready(op_ctx: &OpCtx, promise_id: PromiseId, value: OpResult) -> Self { + pub fn new(op_ctx: &OpCtx, promise_id: PromiseId, fut: F) -> Self { Self { op_id: op_ctx.id, promise_id, - fut: MaybeDone::Done(value), + fut, } } } -impl Future for OpCall { +impl<F: Future<Output = OpResult>> Future for OpCall<F> { type Output = (PromiseId, OpId, OpResult); fn poll( @@ -72,21 +55,8 @@ impl Future for OpCall { ) -> std::task::Poll<Self::Output> { let promise_id = self.promise_id; let op_id = self.op_id; - let fut = &mut *self.project().fut; - match fut { - MaybeDone::Done(_) => { - // Let's avoid using take_output as it keeps our Pin::box - let res = std::mem::replace(fut, MaybeDone::Gone); - let MaybeDone::Done(res) = res - else { - unreachable!() - }; - std::task::Poll::Ready(res) - } - MaybeDone::Future(f) => f.poll_unpin(cx), - MaybeDone::Gone => std::task::Poll::Pending, - } - .map(move |res| (promise_id, op_id, res)) + let fut = self.project().fut; + fut.poll(cx).map(move |res| (promise_id, op_id, res)) } } diff --git a/core/runtime/ops.rs b/core/runtime/ops.rs index c9e7fa6c7..1a67a1d66 100644 --- a/core/runtime/ops.rs +++ b/core/runtime/ops.rs @@ -3,13 +3,13 @@ use crate::ops::*; use crate::OpResult; use crate::PromiseId; use anyhow::Error; +use futures::future::Either; use futures::future::Future; use futures::future::FutureExt; -use futures::future::MaybeDone; -use futures::task::noop_waker; +use futures::task::noop_waker_ref; use std::cell::RefCell; +use std::future::ready; use std::option::Option; -use std::pin::Pin; use std::task::Context; use std::task::Poll; @@ -24,12 +24,10 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>( state.tracker.track_async(ctx.id); state.get_error_class_fn }; - let fut = op - .map(|result| crate::_ops::to_op_result(get_class, result)) - .boxed_local(); - // SAFETY: this this is guaranteed to be running on a current-thread executor + let fut = op.map(|result| crate::_ops::to_op_result(get_class, result)); + // SAFETY: this is guaranteed to be running on a current-thread executor ctx.context_state.borrow_mut().pending_ops.spawn(unsafe { - crate::task::MaskFutureAsSend::new(OpCall::pending(ctx, promise_id, fut)) + crate::task::MaskFutureAsSend::new(OpCall::new(ctx, promise_id, fut)) }); } @@ -37,36 +35,32 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>( pub fn map_async_op1<R: serde::Serialize + 'static>( ctx: &OpCtx, op: impl Future<Output = Result<R, Error>> + 'static, -) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> { +) -> impl Future<Output = OpResult> { let get_class = { let state = RefCell::borrow(&ctx.state); state.tracker.track_async(ctx.id); state.get_error_class_fn }; - let fut = op - .map(|result| crate::_ops::to_op_result(get_class, result)) - .boxed_local(); - MaybeDone::Future(fut) + op.map(|res| crate::_ops::to_op_result(get_class, res)) } #[inline] pub fn map_async_op2<R: serde::Serialize + 'static>( ctx: &OpCtx, op: impl Future<Output = R> + 'static, -) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> { +) -> impl Future<Output = OpResult> { let state = RefCell::borrow(&ctx.state); state.tracker.track_async(ctx.id); - let fut = op.map(|result| OpResult::Ok(result.into())).boxed_local(); - MaybeDone::Future(fut) + op.map(|res| OpResult::Ok(res.into())) } #[inline] pub fn map_async_op3<R: serde::Serialize + 'static>( ctx: &OpCtx, op: Result<impl Future<Output = Result<R, Error>> + 'static, Error>, -) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> { +) -> impl Future<Output = OpResult> { let get_class = { let state = RefCell::borrow(&ctx.state); state.tracker.track_async(ctx.id); @@ -74,12 +68,12 @@ pub fn map_async_op3<R: serde::Serialize + 'static>( }; match op { - Err(err) => MaybeDone::Done(OpResult::Err(OpError::new(get_class, err))), - Ok(fut) => MaybeDone::Future( - fut - .map(|result| crate::_ops::to_op_result(get_class, result)) - .boxed_local(), - ), + Err(err) => { + Either::Left(ready(OpResult::Err(OpError::new(get_class, err)))) + } + Ok(fut) => { + Either::Right(fut.map(|res| crate::_ops::to_op_result(get_class, res))) + } } } @@ -87,7 +81,7 @@ pub fn map_async_op3<R: serde::Serialize + 'static>( pub fn map_async_op4<R: serde::Serialize + 'static>( ctx: &OpCtx, op: Result<impl Future<Output = R> + 'static, Error>, -) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> { +) -> impl Future<Output = OpResult> { let get_class = { let state = RefCell::borrow(&ctx.state); state.tracker.track_async(ctx.id); @@ -95,10 +89,10 @@ pub fn map_async_op4<R: serde::Serialize + 'static>( }; match op { - Err(err) => MaybeDone::Done(OpResult::Err(OpError::new(get_class, err))), - Ok(fut) => MaybeDone::Future( - fut.map(|result| OpResult::Ok(result.into())).boxed_local(), - ), + Err(err) => { + Either::Left(ready(OpResult::Err(OpError::new(get_class, err)))) + } + Ok(fut) => Either::Right(fut.map(|r| OpResult::Ok(r.into()))), } } @@ -107,7 +101,7 @@ pub fn queue_async_op<'s>( scope: &'s mut v8::HandleScope, deferred: bool, promise_id: PromiseId, - mut op: MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>>, + op: impl Future<Output = OpResult> + 'static, ) -> Option<v8::Local<'s, v8::Value>> { // An op's realm (as given by `OpCtx::realm_idx`) must match the realm in // which it is invoked. Otherwise, we might have cross-realm object exposure. @@ -119,38 +113,35 @@ pub fn queue_async_op<'s>( // Some(scope.get_current_context()) // ); - // All ops are polled immediately - let waker = noop_waker(); - let mut cx = Context::from_waker(&waker); + let id = ctx.id; - // Note that MaybeDone returns () from the future - let op_call = match op.poll_unpin(&mut cx) { - Poll::Pending => { - let MaybeDone::Future(fut) = op else { - unreachable!() - }; - OpCall::pending(ctx, promise_id, fut) - } - Poll::Ready(_) => { - let mut op_result = Pin::new(&mut op).take_output().unwrap(); - // If the op is ready and is not marked as deferred we can immediately return - // the result. - if !deferred { + // TODO(mmastrac): We have to poll every future here because that assumption is baked into a large number + // of ops. If we can figure out a way around this, we can remove this call to boxed_local and save a malloc per future. + let mut pinned = op.map(move |res| (promise_id, id, res)).boxed_local(); + + match pinned.poll_unpin(&mut Context::from_waker(noop_waker_ref())) { + Poll::Pending => {} + Poll::Ready(mut res) => { + if deferred { + ctx + .context_state + .borrow_mut() + .pending_ops + // SAFETY: this is guaranteed to be running on a current-thread executor + .spawn(unsafe { crate::task::MaskFutureAsSend::new(ready(res)) }); + return None; + } else { ctx.state.borrow_mut().tracker.track_async_completed(ctx.id); - return Some(op_result.to_v8(scope).unwrap()); + return Some(res.2.to_v8(scope).unwrap()); } - - OpCall::ready(ctx, promise_id, op_result) } - }; + } - // Otherwise we will push it to the `pending_ops` and let it be polled again - // or resolved on the next tick of the event loop. ctx .context_state .borrow_mut() .pending_ops - // SAFETY: this this is guaranteed to be running on a current-thread executor - .spawn(unsafe { crate::task::MaskFutureAsSend::new(op_call) }); + // SAFETY: this is guaranteed to be running on a current-thread executor + .spawn(unsafe { crate::task::MaskFutureAsSend::new(pinned) }); None } |