summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2021-10-09 22:37:19 +0200
committerBert Belder <bertbelder@gmail.com>2021-10-17 19:50:42 +0200
commitff932b411d63269dbd4d30ea6bd0aa5160fd8aff (patch)
tree5dad617aea815c4145262860d6e3b5115224ab92 /core
parentff95fc167d7124f3c7f2c6951070e2c40701cf32 (diff)
fix(core): poll async ops eagerly (#12385)
Currently all async ops are polled lazily, which means that op initialization code is postponed until control is yielded to the event loop. This has some weird consequences, e.g. ```js let listener = Deno.listen(...); let conn_promise = listener.accept(); listener.close(); // `BadResource` is thrown. A reasonable error would be `Interrupted`. let conn = await conn_promise; ``` JavaScript promises are expected to be eagerly evaluated. This patch makes ops actually do that.
Diffstat (limited to 'core')
-rw-r--r--core/lib.rs1
-rw-r--r--core/modules.rs3
-rw-r--r--core/ops.rs66
-rw-r--r--core/ops_json.rs7
-rw-r--r--core/runtime.rs13
5 files changed, 76 insertions, 14 deletions
diff --git a/core/lib.rs b/core/lib.rs
index ea7b322e2..c0419f8ab 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -68,6 +68,7 @@ pub use crate::normalize_path::normalize_path;
pub use crate::ops::serialize_op_result;
pub use crate::ops::Op;
pub use crate::ops::OpAsyncFuture;
+pub use crate::ops::OpCall;
pub use crate::ops::OpFn;
pub use crate::ops::OpId;
pub use crate::ops::OpPayload;
diff --git a/core/modules.rs b/core/modules.rs
index 2af09057f..31e03196a 100644
--- a/core/modules.rs
+++ b/core/modules.rs
@@ -722,6 +722,7 @@ impl ModuleMap {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::ops::OpCall;
use crate::serialize_op_result;
use crate::JsRuntime;
use crate::Op;
@@ -1009,7 +1010,7 @@ mod tests {
let (control, _): (u8, ()) = payload.deserialize().unwrap();
assert_eq!(control, 42);
let resp = (0, 1, serialize_op_result(Ok(43), state));
- Op::Async(Box::pin(futures::future::ready(resp)))
+ Op::Async(OpCall::ready(resp))
};
let mut runtime = JsRuntime::new(RuntimeOptions {
diff --git a/core/ops.rs b/core/ops.rs
index 80bb30eda..05b91f32f 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -6,6 +6,11 @@ use crate::gotham_state::GothamState;
use crate::ops_metrics::OpsTracker;
use crate::resources::ResourceTable;
use crate::runtime::GetErrorClassFn;
+use futures::future::maybe_done;
+use futures::future::FusedFuture;
+use futures::future::MaybeDone;
+use futures::ready;
+use futures::task::noop_waker;
use futures::Future;
use indexmap::IndexMap;
use rusty_v8 as v8;
@@ -17,10 +22,67 @@ use std::ops::Deref;
use std::ops::DerefMut;
use std::pin::Pin;
use std::rc::Rc;
+use std::task::Context;
+use std::task::Poll;
+
+/// Wrapper around a Future, which causes that Future to be polled immediately.
+/// (Background: ops are stored in a `FuturesUnordered` structure which polls
+/// them, but without the `OpCall` wrapper this doesn't happen until the next
+/// turn of the event loop, which is too late for certain ops.)
+pub struct OpCall<T>(MaybeDone<Pin<Box<dyn Future<Output = T>>>>);
+
+impl<T> OpCall<T> {
+ /// Wraps a future, and polls the inner future immediately.
+ /// This should be the default choice for ops.
+ pub fn eager(fut: impl Future<Output = T> + 'static) -> Self {
+ let boxed = Box::pin(fut) as Pin<Box<dyn Future<Output = T>>>;
+ let mut inner = maybe_done(boxed);
+ let waker = noop_waker();
+ let mut cx = Context::from_waker(&waker);
+ let mut pinned = Pin::new(&mut inner);
+ let _ = pinned.as_mut().poll(&mut cx);
+ Self(inner)
+ }
+
+ /// Wraps a future; the inner future is polled the usual way (lazily).
+ pub fn lazy(fut: impl Future<Output = T> + 'static) -> Self {
+ let boxed = Box::pin(fut) as Pin<Box<dyn Future<Output = T>>>;
+ let inner = maybe_done(boxed);
+ Self(inner)
+ }
+
+ /// Create a future by specifying its output. This is basically the same as
+ /// `async { value }` or `futures::future::ready(value)`.
+ pub fn ready(value: T) -> Self {
+ Self(MaybeDone::Done(value))
+ }
+}
+
+impl<T> Future for OpCall<T> {
+ type Output = T;
+
+ fn poll(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ let inner = unsafe { &mut self.get_unchecked_mut().0 };
+ let mut pinned = Pin::new(inner);
+ ready!(pinned.as_mut().poll(cx));
+ Poll::Ready(pinned.as_mut().take_output().unwrap())
+ }
+}
+
+impl<F> FusedFuture for OpCall<F>
+where
+ F: Future,
+{
+ fn is_terminated(&self) -> bool {
+ self.0.is_terminated()
+ }
+}
pub type PromiseId = u64;
-pub type OpAsyncFuture =
- Pin<Box<dyn Future<Output = (PromiseId, OpId, OpResult)>>>;
+pub type OpAsyncFuture = OpCall<(PromiseId, OpId, OpResult)>;
pub type OpFn = dyn Fn(Rc<RefCell<OpState>>, OpPayload) -> Op + 'static;
pub type OpId = usize;
diff --git a/core/ops_json.rs b/core/ops_json.rs
index 0ca7e5ce4..dca9a9a77 100644
--- a/core/ops_json.rs
+++ b/core/ops_json.rs
@@ -1,6 +1,7 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use crate::error::AnyError;
+use crate::ops::OpCall;
use crate::serialize_op_result;
use crate::Op;
use crate::OpFn;
@@ -35,7 +36,7 @@ pub fn void_op_async() -> Box<OpFn> {
let op_id = payload.op_id;
let pid = payload.promise_id;
let op_result = serialize_op_result(Ok(()), state);
- Op::Async(Box::pin(futures::future::ready((pid, op_id, op_result))))
+ Op::Async(OpCall::ready((pid, op_id, op_result)))
})
}
@@ -127,7 +128,7 @@ where
use crate::futures::FutureExt;
let fut = op_fn(state.clone(), a, b)
.map(move |result| (pid, op_id, serialize_op_result(result, state)));
- Op::Async(Box::pin(fut))
+ Op::Async(OpCall::eager(fut))
})
}
@@ -159,7 +160,7 @@ where
use crate::futures::FutureExt;
let fut = op_fn(state.clone(), a, b)
.map(move |result| (pid, op_id, serialize_op_result(result, state)));
- Op::AsyncUnref(Box::pin(fut))
+ Op::AsyncUnref(OpCall::eager(fut))
})
}
diff --git a/core/runtime.rs b/core/runtime.rs
index 1928ff31c..873dcd3f5 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -28,7 +28,6 @@ use futures::future::FutureExt;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::task::AtomicWaker;
-use futures::Future;
use std::any::Any;
use std::cell::RefCell;
use std::collections::HashMap;
@@ -36,7 +35,6 @@ use std::convert::TryFrom;
use std::ffi::c_void;
use std::mem::forget;
use std::option::Option;
-use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::Mutex;
@@ -44,8 +42,7 @@ use std::sync::Once;
use std::task::Context;
use std::task::Poll;
-type PendingOpFuture =
- Pin<Box<dyn Future<Output = (PromiseId, OpId, OpResult)>>>;
+type PendingOpFuture = OpCall<(PromiseId, OpId, OpResult)>;
pub enum Snapshot {
Static(&'static [u8]),
@@ -1613,6 +1610,7 @@ pub mod tests {
use crate::ZeroCopyBuf;
use futures::future::lazy;
use std::ops::FnOnce;
+ use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
@@ -1645,16 +1643,15 @@ pub mod tests {
Mode::Async => {
assert_eq!(control, 42);
let resp = (0, 1, serialize_op_result(Ok(43), rc_op_state));
- Op::Async(Box::pin(futures::future::ready(resp)))
+ Op::Async(OpCall::ready(resp))
}
Mode::AsyncZeroCopy(has_buffer) => {
assert_eq!(buf.is_some(), has_buffer);
if let Some(buf) = buf {
assert_eq!(buf.len(), 1);
}
-
- let resp = serialize_op_result(Ok(43), rc_op_state);
- Op::Async(Box::pin(futures::future::ready((0, 1, resp))))
+ let resp = (0, 1, serialize_op_result(Ok(43), rc_op_state));
+ Op::Async(OpCall::ready(resp))
}
}
}