diff options
-rw-r--r-- | cli/ops/dispatch_json.rs | 10 | ||||
-rw-r--r-- | cli/state.rs | 8 | ||||
-rw-r--r-- | core/isolate.rs | 50 | ||||
-rw-r--r-- | core/ops.rs | 3 |
4 files changed, 70 insertions, 1 deletions
diff --git a/cli/ops/dispatch_json.rs b/cli/ops/dispatch_json.rs index de1f0a752..87cfff51d 100644 --- a/cli/ops/dispatch_json.rs +++ b/cli/ops/dispatch_json.rs @@ -14,6 +14,9 @@ pub type AsyncJsonOp = pub enum JsonOp { Sync(Value), Async(AsyncJsonOp), + /// AsyncUnref is the variation of Async, which doesn't block the program + /// exiting. + AsyncUnref(AsyncJsonOp), } fn json_err(err: ErrBox) -> Value { @@ -77,6 +80,13 @@ where }); CoreOp::Async(fut2.boxed()) } + Ok(JsonOp::AsyncUnref(fut)) => { + assert!(promise_id.is_some()); + let fut2 = fut.then(move |result| { + futures::future::ok(serialize_result(promise_id, result)) + }); + CoreOp::AsyncUnref(fut2.boxed()) + } Err(sync_err) => { let buf = serialize_result(promise_id, Err(sync_err)); if is_sync { diff --git a/cli/state.rs b/cli/state.rs index 02c258280..c07b3d5d0 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -110,6 +110,14 @@ impl ThreadSafeState { }); Op::Async(result_fut.boxed()) } + Op::AsyncUnref(fut) => { + let state = state.clone(); + let result_fut = fut.map_ok(move |buf: Buf| { + state.metrics_op_completed(buf.len()); + buf + }); + Op::AsyncUnref(result_fut.boxed()) + } } } } diff --git a/core/isolate.rs b/core/isolate.rs index f734f687c..e4405b704 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -16,6 +16,7 @@ use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; use futures::future::FutureExt; use futures::future::TryFutureExt; +use futures::stream::select; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::task::AtomicWaker; @@ -178,6 +179,7 @@ pub struct Isolate { needs_init: bool, pub(crate) shared: SharedQueue, pending_ops: FuturesUnordered<PendingOpFuture>, + pending_unref_ops: FuturesUnordered<PendingOpFuture>, have_unpolled_ops: bool, startup_script: Option<OwnedScript>, pub op_registry: Arc<OpRegistry>, @@ -340,6 +342,7 @@ impl Isolate { shared, needs_init, pending_ops: FuturesUnordered::new(), + pending_unref_ops: FuturesUnordered::new(), have_unpolled_ops: false, startup_script, op_registry: Arc::new(OpRegistry::new()), @@ -519,6 +522,12 @@ impl Isolate { self.have_unpolled_ops = true; None } + Op::AsyncUnref(fut) => { + let fut2 = fut.map_ok(move |buf| (op_id, buf)); + self.pending_unref_ops.push(fut2.boxed()); + self.have_unpolled_ops = true; + None + } } } @@ -713,7 +722,9 @@ impl Future for Isolate { // Now handle actual ops. inner.have_unpolled_ops = false; #[allow(clippy::match_wild_err_arm)] - match inner.pending_ops.poll_next_unpin(cx) { + match select(&mut inner.pending_ops, &mut inner.pending_unref_ops) + .poll_next_unpin(cx) + { Poll::Ready(Some(Err(_))) => panic!("unexpected op error"), Poll::Ready(None) => break, Poll::Pending => break, @@ -816,6 +827,7 @@ pub mod tests { pub enum Mode { Async, + AsyncUnref, OverflowReqSync, OverflowResSync, OverflowReqAsync, @@ -838,6 +850,17 @@ pub mod tests { let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); Op::Async(futures::future::ok(buf).boxed()) } + Mode::AsyncUnref => { + assert_eq!(control.len(), 1); + assert_eq!(control[0], 42); + let fut = async { + // This future never finish. + futures::future::pending::<()>().await; + let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); + Ok(buf) + }; + Op::AsyncUnref(fut.boxed()) + } Mode::OverflowReqSync => { assert_eq!(control.len(), 100 * 1024 * 1024); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); @@ -956,6 +979,31 @@ pub mod tests { } #[test] + fn test_poll_async_optional_ops() { + run_in_task(|cx| { + let (mut isolate, dispatch_count) = setup(Mode::AsyncUnref); + js_check(isolate.execute( + "check1.js", + r#" + Deno.core.setAsyncHandler(1, (buf) => { + // This handler will never be called + assert(false); + }); + let control = new Uint8Array([42]); + Deno.core.send(1, control); + "#, + )); + assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + // The above op never finish, but isolate can finish + // because the op is an unreffed async op. + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); + }) + } + + #[test] fn terminate_execution() { let (tx, rx) = std::sync::mpsc::channel::<bool>(); let tx_clone = tx.clone(); diff --git a/core/ops.rs b/core/ops.rs index 7ed142682..e0bdb0184 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -21,6 +21,9 @@ pub type OpResult<E> = Result<Op<E>, E>; pub enum Op<E> { Sync(Buf), Async(OpAsyncFuture<E>), + /// AsyncUnref is the variation of Async, which doesn't block the program + /// exiting. + AsyncUnref(OpAsyncFuture<E>), } pub type CoreError = (); |