diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/Cargo.toml | 2 | ||||
-rw-r--r-- | core/examples/http_bench.rs | 140 | ||||
-rw-r--r-- | core/isolate.rs | 314 | ||||
-rw-r--r-- | core/modules.rs | 198 | ||||
-rw-r--r-- | core/ops.rs | 6 |
5 files changed, 385 insertions, 275 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml index b1f08d234..ee15308ba 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -15,7 +15,7 @@ path = "lib.rs" [dependencies] downcast-rs = "1.1.1" -futures = "0.1.29" +futures = { version = "0.3", features = [ "thread-pool", "compat" ] } lazy_static = "1.4.0" libc = "0.2.65" log = "0.4.8" diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 8635d4f23..6a9213cbe 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -13,14 +13,20 @@ extern crate log; extern crate lazy_static; use deno::*; -use futures::future::lazy; +use futures::future::FutureExt; +use futures::future::TryFutureExt; use std::env; +use std::future::Future; use std::io::Error; use std::io::ErrorKind; use std::net::SocketAddr; +use std::pin::Pin; use std::sync::Mutex; use std::sync::MutexGuard; -use tokio::prelude::*; +use std::task::Poll; +use tokio::prelude::Async; +use tokio::prelude::AsyncRead; +use tokio::prelude::AsyncWrite; static LOGGER: Logger = Logger; struct Logger; @@ -98,10 +104,10 @@ fn test_record_from() { // TODO test From<&[u8]> for Record } -pub type HttpOp = dyn Future<Item = i32, Error = std::io::Error> + Send; +pub type HttpOp = dyn Future<Output = Result<i32, std::io::Error>> + Send; pub type HttpOpHandler = - fn(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp>; + fn(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Pin<Box<HttpOp>>; fn http_op( handler: HttpOpHandler, @@ -117,53 +123,28 @@ fn http_op( let fut = Box::new( op.and_then(move |result| { record_a.result = result; - Ok(record_a) + futures::future::ok(record_a) }) - .or_else(|err| -> Result<Record, ()> { + .or_else(|err| { eprintln!("unexpected err {}", err); record_b.result = -1; - Ok(record_b) + futures::future::ok(record_b) }) - .then(|result| -> Result<Buf, ()> { + .then(|result: Result<Record, ()>| { let record = result.unwrap(); - Ok(record.into()) + futures::future::ok(record.into()) }), ); if is_sync { - Op::Sync(fut.wait().unwrap()) + Op::Sync(futures::executor::block_on(fut).unwrap()) } else { - Op::Async(fut) + Op::Async(fut.boxed()) } } } fn main() { - let main_future = lazy(move || { - // TODO currently isolate.execute() must be run inside tokio, hence the - // lazy(). It would be nice to not have that contraint. Probably requires - // using v8::MicrotasksPolicy::kExplicit - - let js_source = include_str!("http_bench.js"); - - let startup_data = StartupData::Script(Script { - source: js_source, - filename: "http_bench.js", - }); - - let mut isolate = deno::Isolate::new(startup_data, false); - isolate.register_op("listen", http_op(op_listen)); - isolate.register_op("accept", http_op(op_accept)); - isolate.register_op("read", http_op(op_read)); - isolate.register_op("write", http_op(op_write)); - isolate.register_op("close", http_op(op_close)); - - isolate.then(|r| { - js_check(r); - Ok(()) - }) - }); - let args: Vec<String> = env::args().collect(); // NOTE: `--help` arg will display V8 help and exit let args = deno::v8_set_flags(args); @@ -175,12 +156,33 @@ fn main() { log::LevelFilter::Warn }); + let js_source = include_str!("http_bench.js"); + + let startup_data = StartupData::Script(Script { + source: js_source, + filename: "http_bench.js", + }); + + let mut isolate = deno::Isolate::new(startup_data, false); + isolate.register_op("listen", http_op(op_listen)); + isolate.register_op("accept", http_op(op_accept)); + isolate.register_op("read", http_op(op_read)); + isolate.register_op("write", http_op(op_write)); + isolate.register_op("close", http_op(op_close)); + + let main_future = isolate + .then(|r| { + js_check(r); + futures::future::ok(()) + }) + .boxed(); + if args.iter().any(|a| a == "--multi-thread") { println!("multi-thread"); - tokio::run(main_future); + tokio::run(main_future.compat()); } else { println!("single-thread"); - tokio::runtime::current_thread::run(main_future); + tokio::runtime::current_thread::run(main_future.compat()); } } @@ -205,37 +207,47 @@ fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> { RESOURCE_TABLE.lock().unwrap() } -fn op_accept(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { +fn op_accept( + record: Record, + _zero_copy_buf: Option<PinnedBuf>, +) -> Pin<Box<HttpOp>> { let rid = record.arg as u32; debug!("accept {}", rid); - let fut = futures::future::poll_fn(move || { + let fut = futures::future::poll_fn(move |_cx| { let mut table = lock_resource_table(); let listener = table.get_mut::<TcpListener>(rid).ok_or_else(bad_resource)?; - listener.0.poll_accept() + match listener.0.poll_accept() { + Err(e) => Poll::Ready(Err(e)), + Ok(Async::Ready(v)) => Poll::Ready(Ok(v)), + Ok(Async::NotReady) => Poll::Pending, + } }) .and_then(move |(stream, addr)| { debug!("accept success {}", addr); let mut table = lock_resource_table(); let rid = table.add("tcpStream", Box::new(TcpStream(stream))); - Ok(rid as i32) + futures::future::ok(rid as i32) }); - Box::new(fut) + fut.boxed() } fn op_listen( _record: Record, _zero_copy_buf: Option<PinnedBuf>, -) -> Box<HttpOp> { +) -> Pin<Box<HttpOp>> { debug!("listen"); let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); let listener = tokio::net::TcpListener::bind(&addr).unwrap(); let mut table = lock_resource_table(); let rid = table.add("tcpListener", Box::new(TcpListener(listener))); - Box::new(futures::future::ok(rid as i32)) + futures::future::ok(rid as i32).boxed() } -fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { +fn op_close( + record: Record, + _zero_copy_buf: Option<PinnedBuf>, +) -> Pin<Box<HttpOp>> { debug!("close"); let rid = record.arg as u32; let mut table = lock_resource_table(); @@ -243,39 +255,53 @@ fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { Some(_) => futures::future::ok(0), None => futures::future::err(bad_resource()), }; - Box::new(fut) + fut.boxed() } -fn op_read(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { +fn op_read( + record: Record, + zero_copy_buf: Option<PinnedBuf>, +) -> Pin<Box<HttpOp>> { let rid = record.arg as u32; debug!("read rid={}", rid); let mut zero_copy_buf = zero_copy_buf.unwrap(); - let fut = futures::future::poll_fn(move || { + let fut = futures::future::poll_fn(move |_cx| { let mut table = lock_resource_table(); let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?; - stream.0.poll_read(&mut zero_copy_buf) + match stream.0.poll_read(&mut zero_copy_buf) { + Err(e) => Poll::Ready(Err(e)), + Ok(Async::Ready(v)) => Poll::Ready(Ok(v)), + Ok(Async::NotReady) => Poll::Pending, + } }) .and_then(move |nread| { debug!("read success {}", nread); - Ok(nread as i32) + futures::future::ok(nread as i32) }); - Box::new(fut) + fut.boxed() } -fn op_write(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { +fn op_write( + record: Record, + zero_copy_buf: Option<PinnedBuf>, +) -> Pin<Box<HttpOp>> { let rid = record.arg as u32; debug!("write rid={}", rid); let zero_copy_buf = zero_copy_buf.unwrap(); - let fut = futures::future::poll_fn(move || { + let fut = futures::future::poll_fn(move |_cx| { let mut table = lock_resource_table(); let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?; - stream.0.poll_write(&zero_copy_buf) + match stream.0.poll_write(&zero_copy_buf) { + Err(e) => Poll::Ready(Err(e)), + Ok(Async::Ready(v)) => Poll::Ready(Ok(v)), + Ok(Async::NotReady) => Poll::Pending, + } }) .and_then(move |nwritten| { debug!("write success {}", nwritten); - Ok(nwritten as i32) + futures::future::ok(nwritten as i32) }); - Box::new(fut) + fut.boxed() } fn js_check(r: Result<(), ErrBox>) { diff --git a/core/isolate.rs b/core/isolate.rs index 8b15befa3..079ab5dcf 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -19,20 +19,27 @@ use crate::libdeno::Snapshot2; use crate::ops::*; use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; +use futures::future::FutureExt; +use futures::future::TryFutureExt; use futures::stream::FuturesUnordered; +use futures::stream::IntoStream; use futures::stream::Stream; +use futures::stream::StreamExt; use futures::stream::StreamFuture; -use futures::task; -use futures::Async::*; -use futures::Future; -use futures::Poll; +use futures::stream::TryStream; +use futures::stream::TryStreamExt; +use futures::task::AtomicWaker; use libc::c_char; use libc::c_void; use std::ffi::CStr; use std::ffi::CString; use std::fmt; +use std::future::Future; +use std::pin::Pin; use std::ptr::null; use std::sync::{Arc, Mutex, Once}; +use std::task::Context; +use std::task::Poll; /// Stores a script used to initalize a Isolate pub struct Script<'a> { @@ -59,7 +66,7 @@ pub enum RecursiveLoadEvent { Instantiate(deno_mod), } -pub trait ImportStream: Stream { +pub trait ImportStream: TryStream { fn register( &mut self, source_code_info: SourceCodeInfo, @@ -67,8 +74,14 @@ pub trait ImportStream: Stream { ) -> Result<(), ErrBox>; } -type DynImportStream = - Box<dyn ImportStream<Item = RecursiveLoadEvent, Error = ErrBox> + Send>; +type DynImportStream = Box< + dyn ImportStream< + Ok = RecursiveLoadEvent, + Error = ErrBox, + Item = Result<RecursiveLoadEvent, ErrBox>, + > + Send + + Unpin, +>; type DynImportFn = dyn Fn(deno_dyn_import_id, &str, &str) -> DynImportStream; @@ -87,15 +100,23 @@ impl fmt::Debug for DynImportStream { } impl Stream for DynImport { - type Item = (deno_dyn_import_id, RecursiveLoadEvent); - type Error = (deno_dyn_import_id, ErrBox); - - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - match self.inner.poll() { - Ok(Ready(Some(event))) => Ok(Ready(Some((self.id, event)))), - Ok(Ready(None)) => unreachable!(), - Err(e) => Err((self.id, e)), - Ok(NotReady) => Ok(NotReady), + type Item = Result< + (deno_dyn_import_id, RecursiveLoadEvent), + (deno_dyn_import_id, ErrBox), + >; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<Option<Self::Item>> { + let self_inner = self.get_mut(); + match self_inner.inner.try_poll_next_unpin(cx) { + Poll::Ready(Some(Ok(event))) => { + Poll::Ready(Some(Ok((self_inner.id, event)))) + } + Poll::Ready(None) => unreachable!(), + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err((self_inner.id, e)))), + Poll::Pending => Poll::Pending, } } } @@ -154,11 +175,12 @@ pub struct Isolate { needs_init: bool, shared: SharedQueue, pending_ops: FuturesUnordered<PendingOpFuture>, - pending_dyn_imports: FuturesUnordered<StreamFuture<DynImport>>, + pending_dyn_imports: FuturesUnordered<StreamFuture<IntoStream<DynImport>>>, have_unpolled_ops: bool, startup_script: Option<OwnedScript>, op_registry: OpRegistry, eager_poll_count: u32, + waker: AtomicWaker, } unsafe impl Send for Isolate {} @@ -225,6 +247,7 @@ impl Isolate { startup_script, op_registry: OpRegistry::new(), eager_poll_count: 0, + waker: AtomicWaker::new(), } } @@ -296,8 +319,10 @@ impl Isolate { if let Some(ref f) = isolate.dyn_import { let inner = f(id, specifier, referrer); let stream = DynImport { inner, id }; - task::current().notify(); - isolate.pending_dyn_imports.push(stream.into_future()); + isolate.waker.wake(); + isolate + .pending_dyn_imports + .push(stream.into_stream().into_future()); } else { panic!("dyn_import callback not set") } @@ -334,10 +359,11 @@ impl Isolate { // which case they can be turned into a sync op before we return to V8. This // can save a boundary crossing. #[allow(clippy::match_wild_err_arm)] - match fut.poll() { - Err(_) => panic!("unexpected op error"), - Ok(Ready(buf)) => Op::Sync(buf), - Ok(NotReady) => Op::Async(fut), + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + match fut.poll_unpin(&mut cx) { + Poll::Ready(Err(_)) => panic!("unexpected op error"), + Poll::Ready(Ok(buf)) => Op::Sync(buf), + Poll::Pending => Op::Async(fut), } } Op::Sync(buf) => Op::Sync(buf), @@ -359,8 +385,8 @@ impl Isolate { .expect("unexpected error"); } Op::Async(fut) => { - let fut2 = fut.map(move |buf| (op_id, buf)); - isolate.pending_ops.push(Box::new(fut2)); + let fut2 = fut.map_ok(move |buf| (op_id, buf)); + isolate.pending_ops.push(fut2.boxed()); isolate.have_unpolled_ops = true; } } @@ -522,42 +548,45 @@ impl Isolate { self.check_last_exception() } - fn poll_dyn_imports(&mut self) -> Poll<(), ErrBox> { + fn poll_dyn_imports(&mut self, cx: &mut Context) -> Poll<Result<(), ErrBox>> { use RecursiveLoadEvent::*; loop { - match self.pending_dyn_imports.poll() { - Ok(NotReady) | Ok(Ready(None)) => { + match self.pending_dyn_imports.poll_next_unpin(cx) { + Poll::Pending | Poll::Ready(None) => { // There are no active dynamic import loaders, or none are ready. - return Ok(futures::Async::Ready(())); + return Poll::Ready(Ok(())); } - Ok(Ready(Some(( - Some((dyn_import_id, Fetch(source_code_info))), + Poll::Ready(Some(( + Some(Ok((dyn_import_id, Fetch(source_code_info)))), mut stream, - )))) => { + ))) => { // A module (not necessarily the one dynamically imported) has been // fetched. Create and register it, and if successful, poll for the // next recursive-load event related to this dynamic import. - match stream.register(source_code_info, self) { + match stream.get_mut().register(source_code_info, self) { Ok(()) => self.pending_dyn_imports.push(stream.into_future()), Err(err) => { self.dyn_import_done(dyn_import_id, Err(Some(err.to_string())))? } } } - Ok(Ready(Some((Some((dyn_import_id, Instantiate(module_id))), _)))) => { + Poll::Ready(Some(( + Some(Ok((dyn_import_id, Instantiate(module_id)))), + _, + ))) => { // The top-level module from a dynamic import has been instantiated. match self.mod_evaluate(module_id) { Ok(()) => self.dyn_import_done(dyn_import_id, Ok(module_id))?, Err(..) => self.dyn_import_done(dyn_import_id, Err(None))?, } } - Err(((dyn_import_id, err), _)) => { + Poll::Ready(Some((Some(Err((dyn_import_id, err))), _))) => { // A non-javascript error occurred; this could be due to a an invalid // module specifier, or a problem with the source map, or a failure // to fetch the module source code. self.dyn_import_done(dyn_import_id, Err(Some(err.to_string())))? } - Ok(Ready(Some((None, _)))) => unreachable!(), + Poll::Ready(Some((None, _))) => unreachable!(), } } } @@ -654,30 +683,34 @@ impl Drop for LockerScope { } impl Future for Isolate { - type Item = (); - type Error = ErrBox; + type Output = Result<(), ErrBox>; - fn poll(&mut self) -> Poll<(), ErrBox> { - self.shared_init(); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + + inner.waker.register(cx.waker()); + + inner.shared_init(); let mut overflow_response: Option<(OpId, Buf)> = None; loop { // If there are any pending dyn_import futures, do those first. - if !self.pending_dyn_imports.is_empty() { - self.poll_dyn_imports()?; + if !inner.pending_dyn_imports.is_empty() { + let poll_imports = inner.poll_dyn_imports(cx)?; + assert!(poll_imports.is_ready()); } // Now handle actual ops. - self.have_unpolled_ops = false; - self.eager_poll_count = 0; + inner.have_unpolled_ops = false; + inner.eager_poll_count = 0; #[allow(clippy::match_wild_err_arm)] - match self.pending_ops.poll() { - Err(_) => panic!("unexpected op error"), - Ok(Ready(None)) => break, - Ok(NotReady) => break, - Ok(Ready(Some((op_id, buf)))) => { - let successful_push = self.shared.push(op_id, &buf); + match inner.pending_ops.poll_next_unpin(cx) { + Poll::Ready(Some(Err(_))) => panic!("unexpected op error"), + Poll::Ready(None) => break, + Poll::Pending => break, + Poll::Ready(Some(Ok((op_id, buf)))) => { + let successful_push = inner.shared.push(op_id, &buf); if !successful_push { // If we couldn't push the response to the shared queue, because // there wasn't enough size, we will return the buffer via the @@ -689,34 +722,34 @@ impl Future for Isolate { } } - if self.shared.size() > 0 { + if inner.shared.size() > 0 { // Lock the current thread for V8. - let locker = LockerScope::new(self.libdeno_isolate); - self.respond(None)?; + let locker = LockerScope::new(inner.libdeno_isolate); + inner.respond(None)?; // The other side should have shifted off all the messages. - assert_eq!(self.shared.size(), 0); + assert_eq!(inner.shared.size(), 0); drop(locker); } if overflow_response.is_some() { // Lock the current thread for V8. - let locker = LockerScope::new(self.libdeno_isolate); + let locker = LockerScope::new(inner.libdeno_isolate); let (op_id, buf) = overflow_response.take().unwrap(); - self.respond(Some((op_id, &buf)))?; + inner.respond(Some((op_id, &buf)))?; drop(locker); } - self.check_promise_errors(); - self.check_last_exception()?; + inner.check_promise_errors(); + inner.check_last_exception()?; // We're idle if pending_ops is empty. - if self.pending_ops.is_empty() && self.pending_dyn_imports.is_empty() { - Ok(futures::Async::Ready(())) + if inner.pending_ops.is_empty() && inner.pending_dyn_imports.is_empty() { + Poll::Ready(Ok(())) } else { - if self.have_unpolled_ops { - task::current().notify(); + if inner.have_unpolled_ops { + inner.waker.wake(); } - Ok(futures::Async::NotReady) + Poll::Pending } } } @@ -752,33 +785,29 @@ pub fn js_check<T>(r: Result<T, ErrBox>) -> T { #[cfg(test)] pub mod tests { use super::*; - use futures::executor::spawn; + use futures::executor::ThreadPool; use futures::future::lazy; - use futures::future::ok; - use futures::Async; use std::io; use std::ops::FnOnce; use std::sync::atomic::{AtomicUsize, Ordering}; - pub fn run_in_task<F, R>(f: F) -> R + pub fn run_in_task<F>(f: F) where - F: FnOnce() -> R, + F: FnOnce(&mut Context) + Send + 'static, { - spawn(lazy(move || ok::<R, ()>(f()))).wait_future().unwrap() + let poll = ThreadPool::new().unwrap(); + poll.spawn_ok(lazy(move |cx| f(cx))); } - fn poll_until_ready<F>( - future: &mut F, - max_poll_count: usize, - ) -> Result<F::Item, F::Error> + fn poll_until_ready<F>(future: &mut F, max_poll_count: usize) -> F::Output where - F: Future, + F: Future + Unpin, { + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); for _ in 0..max_poll_count { - match future.poll() { - Ok(NotReady) => continue, - Ok(Ready(val)) => return Ok(val), - Err(err) => return Err(err), + match future.poll_unpin(&mut cx) { + Poll::Pending => continue, + Poll::Ready(val) => return val, } } panic!( @@ -799,16 +828,16 @@ pub mod tests { } impl Future for DelayedFuture { - type Item = Box<[u8]>; - type Error = (); + type Output = Result<Box<[u8]>, ()>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.counter > 0 { - return Ok(Async::Ready(self.buf.clone())); + fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + if inner.counter > 0 { + return Poll::Ready(Ok(inner.buf.clone())); } - self.counter += 1; - Ok(Async::NotReady) + inner.counter += 1; + Poll::Pending } } @@ -835,13 +864,13 @@ pub mod tests { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(Box::new(futures::future::ok(buf))) + Op::Async(futures::future::ok(buf).boxed()) } Mode::AsyncDelayed => { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(Box::new(DelayedFuture::new(buf))) + Op::Async(DelayedFuture::new(buf).boxed()) } Mode::OverflowReqSync => { assert_eq!(control.len(), 100 * 1024 * 1024); @@ -860,7 +889,7 @@ pub mod tests { Mode::OverflowReqAsync => { assert_eq!(control.len(), 100 * 1024 * 1024); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(Box::new(DelayedFuture::new(buf))) + Op::Async(DelayedFuture::new(buf).boxed()) } Mode::OverflowResAsync => { assert_eq!(control.len(), 1); @@ -869,7 +898,7 @@ pub mod tests { vec.resize(100 * 1024 * 1024, 0); vec[0] = 4; let buf = vec.into_boxed_slice(); - Op::Async(Box::new(DelayedFuture::new(buf))) + Op::Async(DelayedFuture::new(buf).boxed()) } } }; @@ -957,7 +986,7 @@ pub mod tests { #[test] fn test_poll_async_immediate_ops() { - run_in_task(|| { + run_in_task(|cx| { let (mut isolate, dispatch_count) = setup(Mode::AsyncImmediate); js_check(isolate.execute( @@ -992,16 +1021,22 @@ pub mod tests { )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); js_check(isolate.execute("check3.js", "assert(nrecv == 0)")); // We are idle, so the next poll should be the last. - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); }); } #[test] fn test_poll_async_delayed_ops() { - run_in_task(|| { + run_in_task(|cx| { let (mut isolate, dispatch_count) = setup(Mode::AsyncDelayed); js_check(isolate.execute( @@ -1024,7 +1059,10 @@ pub mod tests { "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); js_check(isolate.execute( "check2.js", @@ -1035,26 +1073,36 @@ pub mod tests { "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); js_check(isolate.execute("check3.js", "assert(nrecv == 2)")); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); // We are idle, so the next poll should be the last. - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); }); } struct MockImportStream(Vec<Result<RecursiveLoadEvent, ErrBox>>); impl Stream for MockImportStream { - type Item = RecursiveLoadEvent; - type Error = ErrBox; - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - let event = if self.0.is_empty() { + type Item = Result<RecursiveLoadEvent, ErrBox>; + + fn poll_next( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll<Option<Self::Item>> { + let inner = self.get_mut(); + let event = if inner.0.is_empty() { None } else { - Some(self.0.remove(0)?) + Some(inner.0.remove(0)) }; - Ok(Ready(event)) + Poll::Ready(event) } } @@ -1080,7 +1128,7 @@ pub mod tests { #[test] fn dyn_import_err() { // Test an erroneous dynamic import where the specified module isn't found. - run_in_task(|| { + run_in_task(|cx| { let count = Arc::new(AtomicUsize::new(0)); let count_ = count.clone(); let mut isolate = Isolate::new(StartupData::None, false); @@ -1103,8 +1151,10 @@ pub mod tests { assert_eq!(count.load(Ordering::Relaxed), 1); // We should get an error here. - let result = isolate.poll(); - assert!(result.is_err()); + let result = isolate.poll_unpin(cx); + if let Poll::Ready(Ok(_)) = result { + unreachable!(); + } }) } @@ -1113,7 +1163,7 @@ pub mod tests { use std::convert::TryInto; // Import multiple modules to demonstrate that after failed dynamic import // another dynamic import can still be run - run_in_task(|| { + run_in_task(|cx| { let count = Arc::new(AtomicUsize::new(0)); let count_ = count.clone(); let mut isolate = Isolate::new(StartupData::None, false); @@ -1156,15 +1206,24 @@ pub mod tests { assert_eq!(count.load(Ordering::Relaxed), 3); // Now each poll should return error - assert!(isolate.poll().is_err()); - assert!(isolate.poll().is_err()); - assert!(isolate.poll().is_err()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Err(_)) => true, + _ => false, + }); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Err(_)) => true, + _ => false, + }); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Err(_)) => true, + _ => false, + }); }) } #[test] fn dyn_import_ok() { - run_in_task(|| { + run_in_task(|cx| { let count = Arc::new(AtomicUsize::new(0)); let count_ = count.clone(); @@ -1224,9 +1283,15 @@ pub mod tests { )); assert_eq!(count.load(Ordering::Relaxed), 1); - assert_eq!(Ready(()), isolate.poll().unwrap()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); assert_eq!(count.load(Ordering::Relaxed), 2); - assert_eq!(Ready(()), isolate.poll().unwrap()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); assert_eq!(count.load(Ordering::Relaxed), 2); }) } @@ -1247,7 +1312,7 @@ pub mod tests { shared.terminate_execution(); // allow shutdown - std::thread::sleep(std::time::Duration::from_millis(100)); + std::thread::sleep(std::time::Duration::from_millis(200)); // unless reported otherwise the test should fail after this point tx_clone.send(false).ok(); @@ -1345,7 +1410,7 @@ pub mod tests { #[test] fn overflow_req_async() { - run_in_task(|| { + run_in_task(|cx| { let (mut isolate, dispatch_count) = setup(Mode::OverflowReqAsync); js_check(isolate.execute( "overflow_req_async.js", @@ -1366,14 +1431,17 @@ pub mod tests { "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); - assert_eq!(Async::Ready(()), js_check(isolate.poll())); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); js_check(isolate.execute("check.js", "assert(asyncRecv == 1);")); }); } #[test] fn overflow_res_async() { - run_in_task(|| { + run_in_task(|_cx| { // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We // should optimize this. let (mut isolate, dispatch_count) = setup(Mode::OverflowResAsync); @@ -1404,7 +1472,7 @@ pub mod tests { fn overflow_res_multiple_dispatch_async() { // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We // should optimize this. - run_in_task(|| { + run_in_task(|_cx| { let (mut isolate, dispatch_count) = setup(Mode::OverflowResAsync); js_check(isolate.execute( "overflow_res_multiple_dispatch_async.js", @@ -1434,7 +1502,7 @@ pub mod tests { #[test] fn test_pre_dispatch() { - run_in_task(|| { + run_in_task(|mut cx| { let (mut isolate, _dispatch_count) = setup(Mode::OverflowResAsync); js_check(isolate.execute( "bad_op_id.js", @@ -1448,13 +1516,15 @@ pub mod tests { assert(thrown == "Unknown op id: 100"); "#, )); - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) { + unreachable!(); + } }); } #[test] fn test_js() { - run_in_task(|| { + run_in_task(|mut cx| { let (mut isolate, _dispatch_count) = setup(Mode::AsyncImmediate); js_check( isolate.execute( @@ -1462,7 +1532,9 @@ pub mod tests { include_str!("shared_queue_test.js"), ), ); - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) { + unreachable!(); + } }); } diff --git a/core/modules.rs b/core/modules.rs index 85de79cca..9f3434a4f 100644 --- a/core/modules.rs +++ b/core/modules.rs @@ -14,21 +14,22 @@ use crate::isolate::SourceCodeInfo; use crate::libdeno::deno_dyn_import_id; use crate::libdeno::deno_mod; use crate::module_specifier::ModuleSpecifier; -use futures::future::loop_fn; -use futures::future::Loop; +use futures::future::FutureExt; use futures::stream::FuturesUnordered; use futures::stream::Stream; -use futures::Async::*; -use futures::Future; -use futures::Poll; +use futures::stream::TryStreamExt; use std::collections::HashMap; use std::collections::HashSet; use std::fmt; +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; use std::sync::Mutex; +use std::task::Context; +use std::task::Poll; pub type SourceCodeInfoFuture = - dyn Future<Item = SourceCodeInfo, Error = ErrBox> + Send; + dyn Future<Output = Result<SourceCodeInfo, ErrBox>> + Send; pub trait Loader: Send + Sync { /// Returns an absolute URL. @@ -47,7 +48,7 @@ pub trait Loader: Send + Sync { fn load( &self, module_specifier: &ModuleSpecifier, - ) -> Box<SourceCodeInfoFuture>; + ) -> Pin<Box<SourceCodeInfoFuture>>; } #[derive(Debug, Eq, PartialEq)] @@ -68,16 +69,16 @@ enum State { /// This future is used to implement parallel async module loading without /// complicating the Isolate API. /// TODO: RecursiveLoad desperately needs to be merged with Modules. -pub struct RecursiveLoad<L: Loader> { +pub struct RecursiveLoad<L: Loader + Unpin> { kind: Kind, state: State, loader: L, modules: Arc<Mutex<Modules>>, - pending: FuturesUnordered<Box<SourceCodeInfoFuture>>, + pending: FuturesUnordered<Pin<Box<SourceCodeInfoFuture>>>, is_pending: HashSet<ModuleSpecifier>, } -impl<L: Loader> RecursiveLoad<L> { +impl<L: Loader + Unpin> RecursiveLoad<L> { /// Starts a new parallel load of the given URL of the main module. pub fn main( specifier: &str, @@ -153,7 +154,7 @@ impl<L: Loader> RecursiveLoad<L> { // integrated into one thing. self .pending - .push(Box::new(self.loader.load(&module_specifier))); + .push(self.loader.load(&module_specifier).boxed()); self.state = State::LoadingRoot; Ok(()) @@ -182,7 +183,7 @@ impl<L: Loader> RecursiveLoad<L> { { self .pending - .push(Box::new(self.loader.load(&module_specifier))); + .push(self.loader.load(&module_specifier).boxed()); self.is_pending.insert(module_specifier); } @@ -194,26 +195,24 @@ impl<L: Loader> RecursiveLoad<L> { pub fn get_future( self, isolate: Arc<Mutex<Isolate>>, - ) -> impl Future<Item = deno_mod, Error = ErrBox> { - loop_fn(self, move |load| { - let isolate = isolate.clone(); - load.into_future().map_err(|(e, _)| e).and_then( - move |(event, mut load)| { - Ok(match event.unwrap() { - Event::Fetch(info) => { - let mut isolate = isolate.lock().unwrap(); - load.register(info, &mut isolate)?; - Loop::Continue(load) - } - Event::Instantiate(id) => Loop::Break(id), - }) - }, - ) - }) + ) -> impl Future<Output = Result<deno_mod, ErrBox>> { + async move { + let mut load = self; + loop { + let event = load.try_next().await?; + match event.unwrap() { + Event::Fetch(info) => { + let mut isolate = isolate.lock().unwrap(); + load.register(info, &mut isolate)?; + } + Event::Instantiate(id) => return Ok(id), + } + } + } } } -impl<L: Loader> ImportStream for RecursiveLoad<L> { +impl<L: Loader + Unpin> ImportStream for RecursiveLoad<L> { // TODO: this should not be part of RecursiveLoad. fn register( &mut self, @@ -308,40 +307,45 @@ impl<L: Loader> ImportStream for RecursiveLoad<L> { } } -impl<L: Loader> Stream for RecursiveLoad<L> { - type Item = Event; - type Error = ErrBox; +impl<L: Loader + Unpin> Stream for RecursiveLoad<L> { + type Item = Result<Event, ErrBox>; - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - Ok(match self.state { + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<Option<Self::Item>> { + let inner = self.get_mut(); + match inner.state { State::ResolveMain(ref specifier, Some(ref code)) => { - let module_specifier = self.loader.resolve( + let module_specifier = inner.loader.resolve( specifier, ".", true, - self.dyn_import_id().is_some(), + inner.dyn_import_id().is_some(), )?; let info = SourceCodeInfo { code: code.to_owned(), module_url_specified: module_specifier.to_string(), module_url_found: module_specifier.to_string(), }; - self.state = State::LoadingRoot; - Ready(Some(Event::Fetch(info))) + inner.state = State::LoadingRoot; + Poll::Ready(Some(Ok(Event::Fetch(info)))) } State::ResolveMain(..) | State::ResolveImport(..) => { - self.add_root()?; - self.poll()? + if let Err(e) = inner.add_root() { + return Poll::Ready(Some(Err(e))); + } + inner.try_poll_next_unpin(cx) } State::LoadingRoot | State::LoadingImports(..) => { - match self.pending.poll()? { - Ready(None) => unreachable!(), - Ready(Some(info)) => Ready(Some(Event::Fetch(info))), - NotReady => NotReady, + match inner.pending.try_poll_next_unpin(cx)? { + Poll::Ready(None) => unreachable!(), + Poll::Ready(Some(info)) => Poll::Ready(Some(Ok(Event::Fetch(info)))), + Poll::Pending => Poll::Pending, } } - State::Instantiated(id) => Ready(Some(Event::Instantiate(id))), - }) + State::Instantiated(id) => Poll::Ready(Some(Ok(Event::Instantiate(id)))), + } } } @@ -603,9 +607,11 @@ mod tests { use super::*; use crate::isolate::js_check; use crate::isolate::tests::*; - use futures::Async; + use futures::future::FutureExt; + use futures::stream::StreamExt; use std::error::Error; use std::fmt; + use std::future::Future; struct MockLoader { pub loads: Arc<Mutex<Vec<String>>>, @@ -676,27 +682,27 @@ mod tests { } impl Future for DelayedSourceCodeFuture { - type Item = SourceCodeInfo; - type Error = ErrBox; + type Output = Result<SourceCodeInfo, ErrBox>; - fn poll(&mut self) -> Poll<Self::Item, ErrBox> { - self.counter += 1; - if self.url == "file:///never_ready.js" { - return Ok(Async::NotReady); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + inner.counter += 1; + if inner.url == "file:///never_ready.js" { + return Poll::Pending; } - if self.url == "file:///slow.js" && self.counter < 2 { + if inner.url == "file:///slow.js" && inner.counter < 2 { // TODO(ry) Hopefully in the future we can remove current task // notification. See comment above run_in_task. - futures::task::current().notify(); - return Ok(Async::NotReady); + cx.waker().wake_by_ref(); + return Poll::Pending; } - match mock_source_code(&self.url) { - Some(src) => Ok(Async::Ready(SourceCodeInfo { + match mock_source_code(&inner.url) { + Some(src) => Poll::Ready(Ok(SourceCodeInfo { code: src.0.to_owned(), - module_url_specified: self.url.clone(), + module_url_specified: inner.url.clone(), module_url_found: src.1.to_owned(), })), - None => Err(MockError::LoadErr.into()), + None => Poll::Ready(Err(MockError::LoadErr.into())), } } } @@ -733,11 +739,11 @@ mod tests { fn load( &self, module_specifier: &ModuleSpecifier, - ) -> Box<SourceCodeInfoFuture> { + ) -> Pin<Box<SourceCodeInfoFuture>> { let mut loads = self.loads.lock().unwrap(); loads.push(module_specifier.to_string()); let url = module_specifier.to_string(); - Box::new(DelayedSourceCodeFuture { url, counter: 0 }) + DelayedSourceCodeFuture { url, counter: 0 }.boxed() } } @@ -780,7 +786,7 @@ mod tests { #[test] fn test_recursive_load() { - run_in_task(|| { + run_in_task(|mut cx| { let loader = MockLoader::new(); let modules = loader.modules.clone(); let modules_ = modules.clone(); @@ -791,12 +797,12 @@ mod tests { RecursiveLoad::main("/a.js", None, loader, modules); let a_id = loop { - match recursive_load.poll() { - Ok(Ready(Some(Event::Fetch(info)))) => { + match recursive_load.try_poll_next_unpin(&mut cx) { + Poll::Ready(Some(Ok(Event::Fetch(info)))) => { let mut isolate = isolate.lock().unwrap(); recursive_load.register(info, &mut isolate).unwrap(); } - Ok(Ready(Some(Event::Instantiate(id)))) => break id, + Poll::Ready(Some(Ok(Event::Instantiate(id)))) => break id, _ => panic!("unexpected result"), }; }; @@ -859,7 +865,7 @@ mod tests { #[test] fn test_circular_load() { - run_in_task(|| { + run_in_task(|mut cx| { let loader = MockLoader::new(); let isolate = loader.isolate.clone(); let isolate_ = isolate.clone(); @@ -868,9 +874,10 @@ mod tests { let loads = loader.loads.clone(); let recursive_load = RecursiveLoad::main("/circular1.js", None, loader, modules); - let result = recursive_load.get_future(isolate.clone()).poll(); - assert!(result.is_ok()); - if let Async::Ready(circular1_id) = result.ok().unwrap() { + let mut load_fut = recursive_load.get_future(isolate.clone()).boxed(); + let result = Pin::new(&mut load_fut).poll(&mut cx); + assert!(result.is_ready()); + if let Poll::Ready(Ok(circular1_id)) = result { let mut isolate = isolate_.lock().unwrap(); js_check(isolate.mod_evaluate(circular1_id)); @@ -930,7 +937,7 @@ mod tests { #[test] fn test_redirect_load() { - run_in_task(|| { + run_in_task(|mut cx| { let loader = MockLoader::new(); let isolate = loader.isolate.clone(); let isolate_ = isolate.clone(); @@ -939,10 +946,11 @@ mod tests { let loads = loader.loads.clone(); let recursive_load = RecursiveLoad::main("/redirect1.js", None, loader, modules); - let result = recursive_load.get_future(isolate.clone()).poll(); + let mut load_fut = recursive_load.get_future(isolate.clone()).boxed(); + let result = Pin::new(&mut load_fut).poll(&mut cx); println!(">> result {:?}", result); - assert!(result.is_ok()); - if let Async::Ready(redirect1_id) = result.ok().unwrap() { + assert!(result.is_ready()); + if let Poll::Ready(Ok(redirect1_id)) = result { let mut isolate = isolate_.lock().unwrap(); js_check(isolate.mod_evaluate(redirect1_id)); let l = loads.lock().unwrap(); @@ -995,18 +1003,18 @@ mod tests { #[test] fn slow_never_ready_modules() { - run_in_task(|| { + run_in_task(|mut cx| { let loader = MockLoader::new(); let isolate = loader.isolate.clone(); let modules = loader.modules.clone(); let loads = loader.loads.clone(); let mut recursive_load = RecursiveLoad::main("/main.js", None, loader, modules) - .get_future(isolate); + .get_future(isolate) + .boxed(); - let result = recursive_load.poll(); - assert!(result.is_ok()); - assert!(result.ok().unwrap().is_not_ready()); + let result = recursive_load.poll_unpin(&mut cx); + assert!(result.is_pending()); // TODO(ry) Arguably the first time we poll only the following modules // should be loaded: @@ -1018,9 +1026,8 @@ mod tests { // run_in_task. for _ in 0..10 { - let result = recursive_load.poll(); - assert!(result.is_ok()); - assert!(result.ok().unwrap().is_not_ready()); + let result = recursive_load.poll_unpin(&mut cx); + assert!(result.is_pending()); let l = loads.lock().unwrap(); assert_eq!( l.to_vec(), @@ -1045,19 +1052,22 @@ mod tests { #[test] fn loader_disappears_after_error() { - run_in_task(|| { + run_in_task(|mut cx| { let loader = MockLoader::new(); let isolate = loader.isolate.clone(); let modules = loader.modules.clone(); let recursive_load = RecursiveLoad::main("/bad_import.js", None, loader, modules); - let result = recursive_load.get_future(isolate).poll(); - assert!(result.is_err()); - let err = result.err().unwrap(); - assert_eq!( - err.downcast_ref::<MockError>().unwrap(), - &MockError::ResolveErr - ); + let mut load_fut = recursive_load.get_future(isolate).boxed(); + let result = load_fut.poll_unpin(&mut cx); + if let Poll::Ready(Err(err)) = result { + assert_eq!( + err.downcast_ref::<MockError>().unwrap(), + &MockError::ResolveErr + ); + } else { + unreachable!(); + } }) } @@ -1072,7 +1082,7 @@ mod tests { #[test] fn recursive_load_main_with_code() { - run_in_task(|| { + run_in_task(|mut cx| { let loader = MockLoader::new(); let modules = loader.modules.clone(); let modules_ = modules.clone(); @@ -1090,12 +1100,12 @@ mod tests { ); let main_id = loop { - match recursive_load.poll() { - Ok(Ready(Some(Event::Fetch(info)))) => { + match recursive_load.poll_next_unpin(&mut cx) { + Poll::Ready(Some(Ok(Event::Fetch(info)))) => { let mut isolate = isolate.lock().unwrap(); recursive_load.register(info, &mut isolate).unwrap(); } - Ok(Ready(Some(Event::Instantiate(id)))) => break id, + Poll::Ready(Some(Ok(Event::Instantiate(id)))) => break id, _ => panic!("unexpected result"), }; }; diff --git a/core/ops.rs b/core/ops.rs index cce454348..3a4f51b83 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -3,13 +3,15 @@ pub use crate::libdeno::OpId; use crate::PinnedBuf; use futures::Future; use std::collections::HashMap; +use std::pin::Pin; pub type Buf = Box<[u8]>; -pub type OpAsyncFuture<E> = Box<dyn Future<Item = Buf, Error = E> + Send>; +pub type OpAsyncFuture<E> = + Pin<Box<dyn Future<Output = Result<Buf, E>> + Send>>; pub(crate) type PendingOpFuture = - Box<dyn Future<Item = (OpId, Buf), Error = CoreError> + Send>; + Pin<Box<dyn Future<Output = Result<(OpId, Buf), CoreError>> + Send>>; pub type OpResult<E> = Result<Op<E>, E>; |