summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/isolate.rs50
-rw-r--r--core/ops.rs3
2 files changed, 52 insertions, 1 deletions
diff --git a/core/isolate.rs b/core/isolate.rs
index f734f687c..e4405b704 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -16,6 +16,7 @@ use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
+use futures::stream::select;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::task::AtomicWaker;
@@ -178,6 +179,7 @@ pub struct Isolate {
needs_init: bool,
pub(crate) shared: SharedQueue,
pending_ops: FuturesUnordered<PendingOpFuture>,
+ pending_unref_ops: FuturesUnordered<PendingOpFuture>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
pub op_registry: Arc<OpRegistry>,
@@ -340,6 +342,7 @@ impl Isolate {
shared,
needs_init,
pending_ops: FuturesUnordered::new(),
+ pending_unref_ops: FuturesUnordered::new(),
have_unpolled_ops: false,
startup_script,
op_registry: Arc::new(OpRegistry::new()),
@@ -519,6 +522,12 @@ impl Isolate {
self.have_unpolled_ops = true;
None
}
+ Op::AsyncUnref(fut) => {
+ let fut2 = fut.map_ok(move |buf| (op_id, buf));
+ self.pending_unref_ops.push(fut2.boxed());
+ self.have_unpolled_ops = true;
+ None
+ }
}
}
@@ -713,7 +722,9 @@ impl Future for Isolate {
// Now handle actual ops.
inner.have_unpolled_ops = false;
#[allow(clippy::match_wild_err_arm)]
- match inner.pending_ops.poll_next_unpin(cx) {
+ match select(&mut inner.pending_ops, &mut inner.pending_unref_ops)
+ .poll_next_unpin(cx)
+ {
Poll::Ready(Some(Err(_))) => panic!("unexpected op error"),
Poll::Ready(None) => break,
Poll::Pending => break,
@@ -816,6 +827,7 @@ pub mod tests {
pub enum Mode {
Async,
+ AsyncUnref,
OverflowReqSync,
OverflowResSync,
OverflowReqAsync,
@@ -838,6 +850,17 @@ pub mod tests {
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Op::Async(futures::future::ok(buf).boxed())
}
+ Mode::AsyncUnref => {
+ assert_eq!(control.len(), 1);
+ assert_eq!(control[0], 42);
+ let fut = async {
+ // This future never finish.
+ futures::future::pending::<()>().await;
+ let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
+ Ok(buf)
+ };
+ Op::AsyncUnref(fut.boxed())
+ }
Mode::OverflowReqSync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
@@ -956,6 +979,31 @@ pub mod tests {
}
#[test]
+ fn test_poll_async_optional_ops() {
+ run_in_task(|cx| {
+ let (mut isolate, dispatch_count) = setup(Mode::AsyncUnref);
+ js_check(isolate.execute(
+ "check1.js",
+ r#"
+ Deno.core.setAsyncHandler(1, (buf) => {
+ // This handler will never be called
+ assert(false);
+ });
+ let control = new Uint8Array([42]);
+ Deno.core.send(1, control);
+ "#,
+ ));
+ assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
+ // The above op never finish, but isolate can finish
+ // because the op is an unreffed async op.
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
+ })
+ }
+
+ #[test]
fn terminate_execution() {
let (tx, rx) = std::sync::mpsc::channel::<bool>();
let tx_clone = tx.clone();
diff --git a/core/ops.rs b/core/ops.rs
index 7ed142682..e0bdb0184 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -21,6 +21,9 @@ pub type OpResult<E> = Result<Op<E>, E>;
pub enum Op<E> {
Sync(Buf),
Async(OpAsyncFuture<E>),
+ /// AsyncUnref is the variation of Async, which doesn't block the program
+ /// exiting.
+ AsyncUnref(OpAsyncFuture<E>),
}
pub type CoreError = ();