summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/ops.rs46
-rw-r--r--core/runtime/ops.rs97
2 files changed, 52 insertions, 91 deletions
diff --git a/core/ops.rs b/core/ops.rs
index 372ffe5b2..a5c76e412 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -9,16 +9,13 @@ use crate::runtime::JsRuntimeState;
use crate::OpDecl;
use crate::OpsTracker;
use anyhow::Error;
-use futures::future::MaybeDone;
use futures::task::AtomicWaker;
use futures::Future;
-use futures::FutureExt;
use pin_project::pin_project;
use serde::Serialize;
use std::cell::RefCell;
use std::ops::Deref;
use std::ops::DerefMut;
-use std::pin::Pin;
use std::ptr::NonNull;
use std::rc::Rc;
use std::rc::Weak;
@@ -30,40 +27,26 @@ pub type PromiseId = i32;
pub type OpId = u16;
#[pin_project]
-pub struct OpCall {
+pub struct OpCall<F: Future<Output = OpResult>> {
promise_id: PromiseId,
op_id: OpId,
/// Future is not necessarily Unpin, so we need to pin_project.
#[pin]
- fut: MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>>,
+ fut: F,
}
-impl OpCall {
+impl<F: Future<Output = OpResult>> OpCall<F> {
/// Wraps a future; the inner future is polled the usual way (lazily).
- pub fn pending(
- op_ctx: &OpCtx,
- promise_id: PromiseId,
- fut: Pin<Box<dyn Future<Output = OpResult> + 'static>>,
- ) -> Self {
- Self {
- op_id: op_ctx.id,
- promise_id,
- fut: MaybeDone::Future(fut),
- }
- }
-
- /// Create a future by specifying its output. This is basically the same as
- /// `async { value }` or `futures::future::ready(value)`.
- pub fn ready(op_ctx: &OpCtx, promise_id: PromiseId, value: OpResult) -> Self {
+ pub fn new(op_ctx: &OpCtx, promise_id: PromiseId, fut: F) -> Self {
Self {
op_id: op_ctx.id,
promise_id,
- fut: MaybeDone::Done(value),
+ fut,
}
}
}
-impl Future for OpCall {
+impl<F: Future<Output = OpResult>> Future for OpCall<F> {
type Output = (PromiseId, OpId, OpResult);
fn poll(
@@ -72,21 +55,8 @@ impl Future for OpCall {
) -> std::task::Poll<Self::Output> {
let promise_id = self.promise_id;
let op_id = self.op_id;
- let fut = &mut *self.project().fut;
- match fut {
- MaybeDone::Done(_) => {
- // Let's avoid using take_output as it keeps our Pin::box
- let res = std::mem::replace(fut, MaybeDone::Gone);
- let MaybeDone::Done(res) = res
- else {
- unreachable!()
- };
- std::task::Poll::Ready(res)
- }
- MaybeDone::Future(f) => f.poll_unpin(cx),
- MaybeDone::Gone => std::task::Poll::Pending,
- }
- .map(move |res| (promise_id, op_id, res))
+ let fut = self.project().fut;
+ fut.poll(cx).map(move |res| (promise_id, op_id, res))
}
}
diff --git a/core/runtime/ops.rs b/core/runtime/ops.rs
index c9e7fa6c7..1a67a1d66 100644
--- a/core/runtime/ops.rs
+++ b/core/runtime/ops.rs
@@ -3,13 +3,13 @@ use crate::ops::*;
use crate::OpResult;
use crate::PromiseId;
use anyhow::Error;
+use futures::future::Either;
use futures::future::Future;
use futures::future::FutureExt;
-use futures::future::MaybeDone;
-use futures::task::noop_waker;
+use futures::task::noop_waker_ref;
use std::cell::RefCell;
+use std::future::ready;
use std::option::Option;
-use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
@@ -24,12 +24,10 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>(
state.tracker.track_async(ctx.id);
state.get_error_class_fn
};
- let fut = op
- .map(|result| crate::_ops::to_op_result(get_class, result))
- .boxed_local();
- // SAFETY: this this is guaranteed to be running on a current-thread executor
+ let fut = op.map(|result| crate::_ops::to_op_result(get_class, result));
+ // SAFETY: this is guaranteed to be running on a current-thread executor
ctx.context_state.borrow_mut().pending_ops.spawn(unsafe {
- crate::task::MaskFutureAsSend::new(OpCall::pending(ctx, promise_id, fut))
+ crate::task::MaskFutureAsSend::new(OpCall::new(ctx, promise_id, fut))
});
}
@@ -37,36 +35,32 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>(
pub fn map_async_op1<R: serde::Serialize + 'static>(
ctx: &OpCtx,
op: impl Future<Output = Result<R, Error>> + 'static,
-) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> {
+) -> impl Future<Output = OpResult> {
let get_class = {
let state = RefCell::borrow(&ctx.state);
state.tracker.track_async(ctx.id);
state.get_error_class_fn
};
- let fut = op
- .map(|result| crate::_ops::to_op_result(get_class, result))
- .boxed_local();
- MaybeDone::Future(fut)
+ op.map(|res| crate::_ops::to_op_result(get_class, res))
}
#[inline]
pub fn map_async_op2<R: serde::Serialize + 'static>(
ctx: &OpCtx,
op: impl Future<Output = R> + 'static,
-) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> {
+) -> impl Future<Output = OpResult> {
let state = RefCell::borrow(&ctx.state);
state.tracker.track_async(ctx.id);
- let fut = op.map(|result| OpResult::Ok(result.into())).boxed_local();
- MaybeDone::Future(fut)
+ op.map(|res| OpResult::Ok(res.into()))
}
#[inline]
pub fn map_async_op3<R: serde::Serialize + 'static>(
ctx: &OpCtx,
op: Result<impl Future<Output = Result<R, Error>> + 'static, Error>,
-) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> {
+) -> impl Future<Output = OpResult> {
let get_class = {
let state = RefCell::borrow(&ctx.state);
state.tracker.track_async(ctx.id);
@@ -74,12 +68,12 @@ pub fn map_async_op3<R: serde::Serialize + 'static>(
};
match op {
- Err(err) => MaybeDone::Done(OpResult::Err(OpError::new(get_class, err))),
- Ok(fut) => MaybeDone::Future(
- fut
- .map(|result| crate::_ops::to_op_result(get_class, result))
- .boxed_local(),
- ),
+ Err(err) => {
+ Either::Left(ready(OpResult::Err(OpError::new(get_class, err))))
+ }
+ Ok(fut) => {
+ Either::Right(fut.map(|res| crate::_ops::to_op_result(get_class, res)))
+ }
}
}
@@ -87,7 +81,7 @@ pub fn map_async_op3<R: serde::Serialize + 'static>(
pub fn map_async_op4<R: serde::Serialize + 'static>(
ctx: &OpCtx,
op: Result<impl Future<Output = R> + 'static, Error>,
-) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> {
+) -> impl Future<Output = OpResult> {
let get_class = {
let state = RefCell::borrow(&ctx.state);
state.tracker.track_async(ctx.id);
@@ -95,10 +89,10 @@ pub fn map_async_op4<R: serde::Serialize + 'static>(
};
match op {
- Err(err) => MaybeDone::Done(OpResult::Err(OpError::new(get_class, err))),
- Ok(fut) => MaybeDone::Future(
- fut.map(|result| OpResult::Ok(result.into())).boxed_local(),
- ),
+ Err(err) => {
+ Either::Left(ready(OpResult::Err(OpError::new(get_class, err))))
+ }
+ Ok(fut) => Either::Right(fut.map(|r| OpResult::Ok(r.into()))),
}
}
@@ -107,7 +101,7 @@ pub fn queue_async_op<'s>(
scope: &'s mut v8::HandleScope,
deferred: bool,
promise_id: PromiseId,
- mut op: MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>>,
+ op: impl Future<Output = OpResult> + 'static,
) -> Option<v8::Local<'s, v8::Value>> {
// An op's realm (as given by `OpCtx::realm_idx`) must match the realm in
// which it is invoked. Otherwise, we might have cross-realm object exposure.
@@ -119,38 +113,35 @@ pub fn queue_async_op<'s>(
// Some(scope.get_current_context())
// );
- // All ops are polled immediately
- let waker = noop_waker();
- let mut cx = Context::from_waker(&waker);
+ let id = ctx.id;
- // Note that MaybeDone returns () from the future
- let op_call = match op.poll_unpin(&mut cx) {
- Poll::Pending => {
- let MaybeDone::Future(fut) = op else {
- unreachable!()
- };
- OpCall::pending(ctx, promise_id, fut)
- }
- Poll::Ready(_) => {
- let mut op_result = Pin::new(&mut op).take_output().unwrap();
- // If the op is ready and is not marked as deferred we can immediately return
- // the result.
- if !deferred {
+ // TODO(mmastrac): We have to poll every future here because that assumption is baked into a large number
+ // of ops. If we can figure out a way around this, we can remove this call to boxed_local and save a malloc per future.
+ let mut pinned = op.map(move |res| (promise_id, id, res)).boxed_local();
+
+ match pinned.poll_unpin(&mut Context::from_waker(noop_waker_ref())) {
+ Poll::Pending => {}
+ Poll::Ready(mut res) => {
+ if deferred {
+ ctx
+ .context_state
+ .borrow_mut()
+ .pending_ops
+ // SAFETY: this is guaranteed to be running on a current-thread executor
+ .spawn(unsafe { crate::task::MaskFutureAsSend::new(ready(res)) });
+ return None;
+ } else {
ctx.state.borrow_mut().tracker.track_async_completed(ctx.id);
- return Some(op_result.to_v8(scope).unwrap());
+ return Some(res.2.to_v8(scope).unwrap());
}
-
- OpCall::ready(ctx, promise_id, op_result)
}
- };
+ }
- // Otherwise we will push it to the `pending_ops` and let it be polled again
- // or resolved on the next tick of the event loop.
ctx
.context_state
.borrow_mut()
.pending_ops
- // SAFETY: this this is guaranteed to be running on a current-thread executor
- .spawn(unsafe { crate::task::MaskFutureAsSend::new(op_call) });
+ // SAFETY: this is guaranteed to be running on a current-thread executor
+ .spawn(unsafe { crate::task::MaskFutureAsSend::new(pinned) });
None
}