diff options
Diffstat (limited to 'core/isolate.rs')
-rw-r--r-- | core/isolate.rs | 50 |
1 files changed, 49 insertions, 1 deletions
diff --git a/core/isolate.rs b/core/isolate.rs index f734f687c..e4405b704 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -16,6 +16,7 @@ use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; use futures::future::FutureExt; use futures::future::TryFutureExt; +use futures::stream::select; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::task::AtomicWaker; @@ -178,6 +179,7 @@ pub struct Isolate { needs_init: bool, pub(crate) shared: SharedQueue, pending_ops: FuturesUnordered<PendingOpFuture>, + pending_unref_ops: FuturesUnordered<PendingOpFuture>, have_unpolled_ops: bool, startup_script: Option<OwnedScript>, pub op_registry: Arc<OpRegistry>, @@ -340,6 +342,7 @@ impl Isolate { shared, needs_init, pending_ops: FuturesUnordered::new(), + pending_unref_ops: FuturesUnordered::new(), have_unpolled_ops: false, startup_script, op_registry: Arc::new(OpRegistry::new()), @@ -519,6 +522,12 @@ impl Isolate { self.have_unpolled_ops = true; None } + Op::AsyncUnref(fut) => { + let fut2 = fut.map_ok(move |buf| (op_id, buf)); + self.pending_unref_ops.push(fut2.boxed()); + self.have_unpolled_ops = true; + None + } } } @@ -713,7 +722,9 @@ impl Future for Isolate { // Now handle actual ops. inner.have_unpolled_ops = false; #[allow(clippy::match_wild_err_arm)] - match inner.pending_ops.poll_next_unpin(cx) { + match select(&mut inner.pending_ops, &mut inner.pending_unref_ops) + .poll_next_unpin(cx) + { Poll::Ready(Some(Err(_))) => panic!("unexpected op error"), Poll::Ready(None) => break, Poll::Pending => break, @@ -816,6 +827,7 @@ pub mod tests { pub enum Mode { Async, + AsyncUnref, OverflowReqSync, OverflowResSync, OverflowReqAsync, @@ -838,6 +850,17 @@ pub mod tests { let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); Op::Async(futures::future::ok(buf).boxed()) } + Mode::AsyncUnref => { + assert_eq!(control.len(), 1); + assert_eq!(control[0], 42); + let fut = async { + // This future never finish. + futures::future::pending::<()>().await; + let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); + Ok(buf) + }; + Op::AsyncUnref(fut.boxed()) + } Mode::OverflowReqSync => { assert_eq!(control.len(), 100 * 1024 * 1024); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); @@ -956,6 +979,31 @@ pub mod tests { } #[test] + fn test_poll_async_optional_ops() { + run_in_task(|cx| { + let (mut isolate, dispatch_count) = setup(Mode::AsyncUnref); + js_check(isolate.execute( + "check1.js", + r#" + Deno.core.setAsyncHandler(1, (buf) => { + // This handler will never be called + assert(false); + }); + let control = new Uint8Array([42]); + Deno.core.send(1, control); + "#, + )); + assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + // The above op never finish, but isolate can finish + // because the op is an unreffed async op. + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); + }) + } + + #[test] fn terminate_execution() { let (tx, rx) = std::sync::mpsc::channel::<bool>(); let tx_clone = tx.clone(); |