summaryrefslogtreecommitdiff
path: root/core/isolate.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/isolate.rs')
-rw-r--r--core/isolate.rs314
1 files changed, 193 insertions, 121 deletions
diff --git a/core/isolate.rs b/core/isolate.rs
index 8b15befa3..079ab5dcf 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -19,20 +19,27 @@ use crate::libdeno::Snapshot2;
use crate::ops::*;
use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
+use futures::future::FutureExt;
+use futures::future::TryFutureExt;
use futures::stream::FuturesUnordered;
+use futures::stream::IntoStream;
use futures::stream::Stream;
+use futures::stream::StreamExt;
use futures::stream::StreamFuture;
-use futures::task;
-use futures::Async::*;
-use futures::Future;
-use futures::Poll;
+use futures::stream::TryStream;
+use futures::stream::TryStreamExt;
+use futures::task::AtomicWaker;
use libc::c_char;
use libc::c_void;
use std::ffi::CStr;
use std::ffi::CString;
use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
use std::ptr::null;
use std::sync::{Arc, Mutex, Once};
+use std::task::Context;
+use std::task::Poll;
/// Stores a script used to initalize a Isolate
pub struct Script<'a> {
@@ -59,7 +66,7 @@ pub enum RecursiveLoadEvent {
Instantiate(deno_mod),
}
-pub trait ImportStream: Stream {
+pub trait ImportStream: TryStream {
fn register(
&mut self,
source_code_info: SourceCodeInfo,
@@ -67,8 +74,14 @@ pub trait ImportStream: Stream {
) -> Result<(), ErrBox>;
}
-type DynImportStream =
- Box<dyn ImportStream<Item = RecursiveLoadEvent, Error = ErrBox> + Send>;
+type DynImportStream = Box<
+ dyn ImportStream<
+ Ok = RecursiveLoadEvent,
+ Error = ErrBox,
+ Item = Result<RecursiveLoadEvent, ErrBox>,
+ > + Send
+ + Unpin,
+>;
type DynImportFn = dyn Fn(deno_dyn_import_id, &str, &str) -> DynImportStream;
@@ -87,15 +100,23 @@ impl fmt::Debug for DynImportStream {
}
impl Stream for DynImport {
- type Item = (deno_dyn_import_id, RecursiveLoadEvent);
- type Error = (deno_dyn_import_id, ErrBox);
-
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- match self.inner.poll() {
- Ok(Ready(Some(event))) => Ok(Ready(Some((self.id, event)))),
- Ok(Ready(None)) => unreachable!(),
- Err(e) => Err((self.id, e)),
- Ok(NotReady) => Ok(NotReady),
+ type Item = Result<
+ (deno_dyn_import_id, RecursiveLoadEvent),
+ (deno_dyn_import_id, ErrBox),
+ >;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Option<Self::Item>> {
+ let self_inner = self.get_mut();
+ match self_inner.inner.try_poll_next_unpin(cx) {
+ Poll::Ready(Some(Ok(event))) => {
+ Poll::Ready(Some(Ok((self_inner.id, event))))
+ }
+ Poll::Ready(None) => unreachable!(),
+ Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err((self_inner.id, e)))),
+ Poll::Pending => Poll::Pending,
}
}
}
@@ -154,11 +175,12 @@ pub struct Isolate {
needs_init: bool,
shared: SharedQueue,
pending_ops: FuturesUnordered<PendingOpFuture>,
- pending_dyn_imports: FuturesUnordered<StreamFuture<DynImport>>,
+ pending_dyn_imports: FuturesUnordered<StreamFuture<IntoStream<DynImport>>>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
op_registry: OpRegistry,
eager_poll_count: u32,
+ waker: AtomicWaker,
}
unsafe impl Send for Isolate {}
@@ -225,6 +247,7 @@ impl Isolate {
startup_script,
op_registry: OpRegistry::new(),
eager_poll_count: 0,
+ waker: AtomicWaker::new(),
}
}
@@ -296,8 +319,10 @@ impl Isolate {
if let Some(ref f) = isolate.dyn_import {
let inner = f(id, specifier, referrer);
let stream = DynImport { inner, id };
- task::current().notify();
- isolate.pending_dyn_imports.push(stream.into_future());
+ isolate.waker.wake();
+ isolate
+ .pending_dyn_imports
+ .push(stream.into_stream().into_future());
} else {
panic!("dyn_import callback not set")
}
@@ -334,10 +359,11 @@ impl Isolate {
// which case they can be turned into a sync op before we return to V8. This
// can save a boundary crossing.
#[allow(clippy::match_wild_err_arm)]
- match fut.poll() {
- Err(_) => panic!("unexpected op error"),
- Ok(Ready(buf)) => Op::Sync(buf),
- Ok(NotReady) => Op::Async(fut),
+ let mut cx = Context::from_waker(futures::task::noop_waker_ref());
+ match fut.poll_unpin(&mut cx) {
+ Poll::Ready(Err(_)) => panic!("unexpected op error"),
+ Poll::Ready(Ok(buf)) => Op::Sync(buf),
+ Poll::Pending => Op::Async(fut),
}
}
Op::Sync(buf) => Op::Sync(buf),
@@ -359,8 +385,8 @@ impl Isolate {
.expect("unexpected error");
}
Op::Async(fut) => {
- let fut2 = fut.map(move |buf| (op_id, buf));
- isolate.pending_ops.push(Box::new(fut2));
+ let fut2 = fut.map_ok(move |buf| (op_id, buf));
+ isolate.pending_ops.push(fut2.boxed());
isolate.have_unpolled_ops = true;
}
}
@@ -522,42 +548,45 @@ impl Isolate {
self.check_last_exception()
}
- fn poll_dyn_imports(&mut self) -> Poll<(), ErrBox> {
+ fn poll_dyn_imports(&mut self, cx: &mut Context) -> Poll<Result<(), ErrBox>> {
use RecursiveLoadEvent::*;
loop {
- match self.pending_dyn_imports.poll() {
- Ok(NotReady) | Ok(Ready(None)) => {
+ match self.pending_dyn_imports.poll_next_unpin(cx) {
+ Poll::Pending | Poll::Ready(None) => {
// There are no active dynamic import loaders, or none are ready.
- return Ok(futures::Async::Ready(()));
+ return Poll::Ready(Ok(()));
}
- Ok(Ready(Some((
- Some((dyn_import_id, Fetch(source_code_info))),
+ Poll::Ready(Some((
+ Some(Ok((dyn_import_id, Fetch(source_code_info)))),
mut stream,
- )))) => {
+ ))) => {
// A module (not necessarily the one dynamically imported) has been
// fetched. Create and register it, and if successful, poll for the
// next recursive-load event related to this dynamic import.
- match stream.register(source_code_info, self) {
+ match stream.get_mut().register(source_code_info, self) {
Ok(()) => self.pending_dyn_imports.push(stream.into_future()),
Err(err) => {
self.dyn_import_done(dyn_import_id, Err(Some(err.to_string())))?
}
}
}
- Ok(Ready(Some((Some((dyn_import_id, Instantiate(module_id))), _)))) => {
+ Poll::Ready(Some((
+ Some(Ok((dyn_import_id, Instantiate(module_id)))),
+ _,
+ ))) => {
// The top-level module from a dynamic import has been instantiated.
match self.mod_evaluate(module_id) {
Ok(()) => self.dyn_import_done(dyn_import_id, Ok(module_id))?,
Err(..) => self.dyn_import_done(dyn_import_id, Err(None))?,
}
}
- Err(((dyn_import_id, err), _)) => {
+ Poll::Ready(Some((Some(Err((dyn_import_id, err))), _))) => {
// A non-javascript error occurred; this could be due to a an invalid
// module specifier, or a problem with the source map, or a failure
// to fetch the module source code.
self.dyn_import_done(dyn_import_id, Err(Some(err.to_string())))?
}
- Ok(Ready(Some((None, _)))) => unreachable!(),
+ Poll::Ready(Some((None, _))) => unreachable!(),
}
}
}
@@ -654,30 +683,34 @@ impl Drop for LockerScope {
}
impl Future for Isolate {
- type Item = ();
- type Error = ErrBox;
+ type Output = Result<(), ErrBox>;
- fn poll(&mut self) -> Poll<(), ErrBox> {
- self.shared_init();
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+
+ inner.waker.register(cx.waker());
+
+ inner.shared_init();
let mut overflow_response: Option<(OpId, Buf)> = None;
loop {
// If there are any pending dyn_import futures, do those first.
- if !self.pending_dyn_imports.is_empty() {
- self.poll_dyn_imports()?;
+ if !inner.pending_dyn_imports.is_empty() {
+ let poll_imports = inner.poll_dyn_imports(cx)?;
+ assert!(poll_imports.is_ready());
}
// Now handle actual ops.
- self.have_unpolled_ops = false;
- self.eager_poll_count = 0;
+ inner.have_unpolled_ops = false;
+ inner.eager_poll_count = 0;
#[allow(clippy::match_wild_err_arm)]
- match self.pending_ops.poll() {
- Err(_) => panic!("unexpected op error"),
- Ok(Ready(None)) => break,
- Ok(NotReady) => break,
- Ok(Ready(Some((op_id, buf)))) => {
- let successful_push = self.shared.push(op_id, &buf);
+ match inner.pending_ops.poll_next_unpin(cx) {
+ Poll::Ready(Some(Err(_))) => panic!("unexpected op error"),
+ Poll::Ready(None) => break,
+ Poll::Pending => break,
+ Poll::Ready(Some(Ok((op_id, buf)))) => {
+ let successful_push = inner.shared.push(op_id, &buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
@@ -689,34 +722,34 @@ impl Future for Isolate {
}
}
- if self.shared.size() > 0 {
+ if inner.shared.size() > 0 {
// Lock the current thread for V8.
- let locker = LockerScope::new(self.libdeno_isolate);
- self.respond(None)?;
+ let locker = LockerScope::new(inner.libdeno_isolate);
+ inner.respond(None)?;
// The other side should have shifted off all the messages.
- assert_eq!(self.shared.size(), 0);
+ assert_eq!(inner.shared.size(), 0);
drop(locker);
}
if overflow_response.is_some() {
// Lock the current thread for V8.
- let locker = LockerScope::new(self.libdeno_isolate);
+ let locker = LockerScope::new(inner.libdeno_isolate);
let (op_id, buf) = overflow_response.take().unwrap();
- self.respond(Some((op_id, &buf)))?;
+ inner.respond(Some((op_id, &buf)))?;
drop(locker);
}
- self.check_promise_errors();
- self.check_last_exception()?;
+ inner.check_promise_errors();
+ inner.check_last_exception()?;
// We're idle if pending_ops is empty.
- if self.pending_ops.is_empty() && self.pending_dyn_imports.is_empty() {
- Ok(futures::Async::Ready(()))
+ if inner.pending_ops.is_empty() && inner.pending_dyn_imports.is_empty() {
+ Poll::Ready(Ok(()))
} else {
- if self.have_unpolled_ops {
- task::current().notify();
+ if inner.have_unpolled_ops {
+ inner.waker.wake();
}
- Ok(futures::Async::NotReady)
+ Poll::Pending
}
}
}
@@ -752,33 +785,29 @@ pub fn js_check<T>(r: Result<T, ErrBox>) -> T {
#[cfg(test)]
pub mod tests {
use super::*;
- use futures::executor::spawn;
+ use futures::executor::ThreadPool;
use futures::future::lazy;
- use futures::future::ok;
- use futures::Async;
use std::io;
use std::ops::FnOnce;
use std::sync::atomic::{AtomicUsize, Ordering};
- pub fn run_in_task<F, R>(f: F) -> R
+ pub fn run_in_task<F>(f: F)
where
- F: FnOnce() -> R,
+ F: FnOnce(&mut Context) + Send + 'static,
{
- spawn(lazy(move || ok::<R, ()>(f()))).wait_future().unwrap()
+ let poll = ThreadPool::new().unwrap();
+ poll.spawn_ok(lazy(move |cx| f(cx)));
}
- fn poll_until_ready<F>(
- future: &mut F,
- max_poll_count: usize,
- ) -> Result<F::Item, F::Error>
+ fn poll_until_ready<F>(future: &mut F, max_poll_count: usize) -> F::Output
where
- F: Future,
+ F: Future + Unpin,
{
+ let mut cx = Context::from_waker(futures::task::noop_waker_ref());
for _ in 0..max_poll_count {
- match future.poll() {
- Ok(NotReady) => continue,
- Ok(Ready(val)) => return Ok(val),
- Err(err) => return Err(err),
+ match future.poll_unpin(&mut cx) {
+ Poll::Pending => continue,
+ Poll::Ready(val) => return val,
}
}
panic!(
@@ -799,16 +828,16 @@ pub mod tests {
}
impl Future for DelayedFuture {
- type Item = Box<[u8]>;
- type Error = ();
+ type Output = Result<Box<[u8]>, ()>;
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.counter > 0 {
- return Ok(Async::Ready(self.buf.clone()));
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ if inner.counter > 0 {
+ return Poll::Ready(Ok(inner.buf.clone()));
}
- self.counter += 1;
- Ok(Async::NotReady)
+ inner.counter += 1;
+ Poll::Pending
}
}
@@ -835,13 +864,13 @@ pub mod tests {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
- Op::Async(Box::new(futures::future::ok(buf)))
+ Op::Async(futures::future::ok(buf).boxed())
}
Mode::AsyncDelayed => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
- Op::Async(Box::new(DelayedFuture::new(buf)))
+ Op::Async(DelayedFuture::new(buf).boxed())
}
Mode::OverflowReqSync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
@@ -860,7 +889,7 @@ pub mod tests {
Mode::OverflowReqAsync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
- Op::Async(Box::new(DelayedFuture::new(buf)))
+ Op::Async(DelayedFuture::new(buf).boxed())
}
Mode::OverflowResAsync => {
assert_eq!(control.len(), 1);
@@ -869,7 +898,7 @@ pub mod tests {
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 4;
let buf = vec.into_boxed_slice();
- Op::Async(Box::new(DelayedFuture::new(buf)))
+ Op::Async(DelayedFuture::new(buf).boxed())
}
}
};
@@ -957,7 +986,7 @@ pub mod tests {
#[test]
fn test_poll_async_immediate_ops() {
- run_in_task(|| {
+ run_in_task(|cx| {
let (mut isolate, dispatch_count) = setup(Mode::AsyncImmediate);
js_check(isolate.execute(
@@ -992,16 +1021,22 @@ pub mod tests {
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
js_check(isolate.execute("check3.js", "assert(nrecv == 0)"));
// We are idle, so the next poll should be the last.
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
});
}
#[test]
fn test_poll_async_delayed_ops() {
- run_in_task(|| {
+ run_in_task(|cx| {
let (mut isolate, dispatch_count) = setup(Mode::AsyncDelayed);
js_check(isolate.execute(
@@ -1024,7 +1059,10 @@ pub mod tests {
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
js_check(isolate.execute(
"check2.js",
@@ -1035,26 +1073,36 @@ pub mod tests {
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
js_check(isolate.execute("check3.js", "assert(nrecv == 2)"));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
// We are idle, so the next poll should be the last.
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
});
}
struct MockImportStream(Vec<Result<RecursiveLoadEvent, ErrBox>>);
impl Stream for MockImportStream {
- type Item = RecursiveLoadEvent;
- type Error = ErrBox;
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- let event = if self.0.is_empty() {
+ type Item = Result<RecursiveLoadEvent, ErrBox>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ _cx: &mut Context,
+ ) -> Poll<Option<Self::Item>> {
+ let inner = self.get_mut();
+ let event = if inner.0.is_empty() {
None
} else {
- Some(self.0.remove(0)?)
+ Some(inner.0.remove(0))
};
- Ok(Ready(event))
+ Poll::Ready(event)
}
}
@@ -1080,7 +1128,7 @@ pub mod tests {
#[test]
fn dyn_import_err() {
// Test an erroneous dynamic import where the specified module isn't found.
- run_in_task(|| {
+ run_in_task(|cx| {
let count = Arc::new(AtomicUsize::new(0));
let count_ = count.clone();
let mut isolate = Isolate::new(StartupData::None, false);
@@ -1103,8 +1151,10 @@ pub mod tests {
assert_eq!(count.load(Ordering::Relaxed), 1);
// We should get an error here.
- let result = isolate.poll();
- assert!(result.is_err());
+ let result = isolate.poll_unpin(cx);
+ if let Poll::Ready(Ok(_)) = result {
+ unreachable!();
+ }
})
}
@@ -1113,7 +1163,7 @@ pub mod tests {
use std::convert::TryInto;
// Import multiple modules to demonstrate that after failed dynamic import
// another dynamic import can still be run
- run_in_task(|| {
+ run_in_task(|cx| {
let count = Arc::new(AtomicUsize::new(0));
let count_ = count.clone();
let mut isolate = Isolate::new(StartupData::None, false);
@@ -1156,15 +1206,24 @@ pub mod tests {
assert_eq!(count.load(Ordering::Relaxed), 3);
// Now each poll should return error
- assert!(isolate.poll().is_err());
- assert!(isolate.poll().is_err());
- assert!(isolate.poll().is_err());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Err(_)) => true,
+ _ => false,
+ });
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Err(_)) => true,
+ _ => false,
+ });
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Err(_)) => true,
+ _ => false,
+ });
})
}
#[test]
fn dyn_import_ok() {
- run_in_task(|| {
+ run_in_task(|cx| {
let count = Arc::new(AtomicUsize::new(0));
let count_ = count.clone();
@@ -1224,9 +1283,15 @@ pub mod tests {
));
assert_eq!(count.load(Ordering::Relaxed), 1);
- assert_eq!(Ready(()), isolate.poll().unwrap());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
assert_eq!(count.load(Ordering::Relaxed), 2);
- assert_eq!(Ready(()), isolate.poll().unwrap());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
assert_eq!(count.load(Ordering::Relaxed), 2);
})
}
@@ -1247,7 +1312,7 @@ pub mod tests {
shared.terminate_execution();
// allow shutdown
- std::thread::sleep(std::time::Duration::from_millis(100));
+ std::thread::sleep(std::time::Duration::from_millis(200));
// unless reported otherwise the test should fail after this point
tx_clone.send(false).ok();
@@ -1345,7 +1410,7 @@ pub mod tests {
#[test]
fn overflow_req_async() {
- run_in_task(|| {
+ run_in_task(|cx| {
let (mut isolate, dispatch_count) = setup(Mode::OverflowReqAsync);
js_check(isolate.execute(
"overflow_req_async.js",
@@ -1366,14 +1431,17 @@ pub mod tests {
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- assert_eq!(Async::Ready(()), js_check(isolate.poll()));
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
js_check(isolate.execute("check.js", "assert(asyncRecv == 1);"));
});
}
#[test]
fn overflow_res_async() {
- run_in_task(|| {
+ run_in_task(|_cx| {
// TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We
// should optimize this.
let (mut isolate, dispatch_count) = setup(Mode::OverflowResAsync);
@@ -1404,7 +1472,7 @@ pub mod tests {
fn overflow_res_multiple_dispatch_async() {
// TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We
// should optimize this.
- run_in_task(|| {
+ run_in_task(|_cx| {
let (mut isolate, dispatch_count) = setup(Mode::OverflowResAsync);
js_check(isolate.execute(
"overflow_res_multiple_dispatch_async.js",
@@ -1434,7 +1502,7 @@ pub mod tests {
#[test]
fn test_pre_dispatch() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let (mut isolate, _dispatch_count) = setup(Mode::OverflowResAsync);
js_check(isolate.execute(
"bad_op_id.js",
@@ -1448,13 +1516,15 @@ pub mod tests {
assert(thrown == "Unknown op id: 100");
"#,
));
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) {
+ unreachable!();
+ }
});
}
#[test]
fn test_js() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let (mut isolate, _dispatch_count) = setup(Mode::AsyncImmediate);
js_check(
isolate.execute(
@@ -1462,7 +1532,9 @@ pub mod tests {
include_str!("shared_queue_test.js"),
),
);
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) {
+ unreachable!();
+ }
});
}