diff options
Diffstat (limited to 'core/modules.rs')
-rw-r--r-- | core/modules.rs | 198 |
1 files changed, 104 insertions, 94 deletions
diff --git a/core/modules.rs b/core/modules.rs index 85de79cca..9f3434a4f 100644 --- a/core/modules.rs +++ b/core/modules.rs @@ -14,21 +14,22 @@ use crate::isolate::SourceCodeInfo; use crate::libdeno::deno_dyn_import_id; use crate::libdeno::deno_mod; use crate::module_specifier::ModuleSpecifier; -use futures::future::loop_fn; -use futures::future::Loop; +use futures::future::FutureExt; use futures::stream::FuturesUnordered; use futures::stream::Stream; -use futures::Async::*; -use futures::Future; -use futures::Poll; +use futures::stream::TryStreamExt; use std::collections::HashMap; use std::collections::HashSet; use std::fmt; +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; use std::sync::Mutex; +use std::task::Context; +use std::task::Poll; pub type SourceCodeInfoFuture = - dyn Future<Item = SourceCodeInfo, Error = ErrBox> + Send; + dyn Future<Output = Result<SourceCodeInfo, ErrBox>> + Send; pub trait Loader: Send + Sync { /// Returns an absolute URL. @@ -47,7 +48,7 @@ pub trait Loader: Send + Sync { fn load( &self, module_specifier: &ModuleSpecifier, - ) -> Box<SourceCodeInfoFuture>; + ) -> Pin<Box<SourceCodeInfoFuture>>; } #[derive(Debug, Eq, PartialEq)] @@ -68,16 +69,16 @@ enum State { /// This future is used to implement parallel async module loading without /// complicating the Isolate API. /// TODO: RecursiveLoad desperately needs to be merged with Modules. -pub struct RecursiveLoad<L: Loader> { +pub struct RecursiveLoad<L: Loader + Unpin> { kind: Kind, state: State, loader: L, modules: Arc<Mutex<Modules>>, - pending: FuturesUnordered<Box<SourceCodeInfoFuture>>, + pending: FuturesUnordered<Pin<Box<SourceCodeInfoFuture>>>, is_pending: HashSet<ModuleSpecifier>, } -impl<L: Loader> RecursiveLoad<L> { +impl<L: Loader + Unpin> RecursiveLoad<L> { /// Starts a new parallel load of the given URL of the main module. pub fn main( specifier: &str, @@ -153,7 +154,7 @@ impl<L: Loader> RecursiveLoad<L> { // integrated into one thing. self .pending - .push(Box::new(self.loader.load(&module_specifier))); + .push(self.loader.load(&module_specifier).boxed()); self.state = State::LoadingRoot; Ok(()) @@ -182,7 +183,7 @@ impl<L: Loader> RecursiveLoad<L> { { self .pending - .push(Box::new(self.loader.load(&module_specifier))); + .push(self.loader.load(&module_specifier).boxed()); self.is_pending.insert(module_specifier); } @@ -194,26 +195,24 @@ impl<L: Loader> RecursiveLoad<L> { pub fn get_future( self, isolate: Arc<Mutex<Isolate>>, - ) -> impl Future<Item = deno_mod, Error = ErrBox> { - loop_fn(self, move |load| { - let isolate = isolate.clone(); - load.into_future().map_err(|(e, _)| e).and_then( - move |(event, mut load)| { - Ok(match event.unwrap() { - Event::Fetch(info) => { - let mut isolate = isolate.lock().unwrap(); - load.register(info, &mut isolate)?; - Loop::Continue(load) - } - Event::Instantiate(id) => Loop::Break(id), - }) - }, - ) - }) + ) -> impl Future<Output = Result<deno_mod, ErrBox>> { + async move { + let mut load = self; + loop { + let event = load.try_next().await?; + match event.unwrap() { + Event::Fetch(info) => { + let mut isolate = isolate.lock().unwrap(); + load.register(info, &mut isolate)?; + } + Event::Instantiate(id) => return Ok(id), + } + } + } } } -impl<L: Loader> ImportStream for RecursiveLoad<L> { +impl<L: Loader + Unpin> ImportStream for RecursiveLoad<L> { // TODO: this should not be part of RecursiveLoad. fn register( &mut self, @@ -308,40 +307,45 @@ impl<L: Loader> ImportStream for RecursiveLoad<L> { } } -impl<L: Loader> Stream for RecursiveLoad<L> { - type Item = Event; - type Error = ErrBox; +impl<L: Loader + Unpin> Stream for RecursiveLoad<L> { + type Item = Result<Event, ErrBox>; - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - Ok(match self.state { + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<Option<Self::Item>> { + let inner = self.get_mut(); + match inner.state { State::ResolveMain(ref specifier, Some(ref code)) => { - let module_specifier = self.loader.resolve( + let module_specifier = inner.loader.resolve( specifier, ".", true, - self.dyn_import_id().is_some(), + inner.dyn_import_id().is_some(), )?; let info = SourceCodeInfo { code: code.to_owned(), module_url_specified: module_specifier.to_string(), module_url_found: module_specifier.to_string(), }; - self.state = State::LoadingRoot; - Ready(Some(Event::Fetch(info))) + inner.state = State::LoadingRoot; + Poll::Ready(Some(Ok(Event::Fetch(info)))) } State::ResolveMain(..) | State::ResolveImport(..) => { - self.add_root()?; - self.poll()? + if let Err(e) = inner.add_root() { + return Poll::Ready(Some(Err(e))); + } + inner.try_poll_next_unpin(cx) } State::LoadingRoot | State::LoadingImports(..) => { - match self.pending.poll()? { - Ready(None) => unreachable!(), - Ready(Some(info)) => Ready(Some(Event::Fetch(info))), - NotReady => NotReady, + match inner.pending.try_poll_next_unpin(cx)? { + Poll::Ready(None) => unreachable!(), + Poll::Ready(Some(info)) => Poll::Ready(Some(Ok(Event::Fetch(info)))), + Poll::Pending => Poll::Pending, } } - State::Instantiated(id) => Ready(Some(Event::Instantiate(id))), - }) + State::Instantiated(id) => Poll::Ready(Some(Ok(Event::Instantiate(id)))), + } } } @@ -603,9 +607,11 @@ mod tests { use super::*; use crate::isolate::js_check; use crate::isolate::tests::*; - use futures::Async; + use futures::future::FutureExt; + use futures::stream::StreamExt; use std::error::Error; use std::fmt; + use std::future::Future; struct MockLoader { pub loads: Arc<Mutex<Vec<String>>>, @@ -676,27 +682,27 @@ mod tests { } impl Future for DelayedSourceCodeFuture { - type Item = SourceCodeInfo; - type Error = ErrBox; + type Output = Result<SourceCodeInfo, ErrBox>; - fn poll(&mut self) -> Poll<Self::Item, ErrBox> { - self.counter += 1; - if self.url == "file:///never_ready.js" { - return Ok(Async::NotReady); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + inner.counter += 1; + if inner.url == "file:///never_ready.js" { + return Poll::Pending; } - if self.url == "file:///slow.js" && self.counter < 2 { + if inner.url == "file:///slow.js" && inner.counter < 2 { // TODO(ry) Hopefully in the future we can remove current task // notification. See comment above run_in_task. - futures::task::current().notify(); - return Ok(Async::NotReady); + cx.waker().wake_by_ref(); + return Poll::Pending; } - match mock_source_code(&self.url) { - Some(src) => Ok(Async::Ready(SourceCodeInfo { + match mock_source_code(&inner.url) { + Some(src) => Poll::Ready(Ok(SourceCodeInfo { code: src.0.to_owned(), - module_url_specified: self.url.clone(), + module_url_specified: inner.url.clone(), module_url_found: src.1.to_owned(), })), - None => Err(MockError::LoadErr.into()), + None => Poll::Ready(Err(MockError::LoadErr.into())), } } } @@ -733,11 +739,11 @@ mod tests { fn load( &self, module_specifier: &ModuleSpecifier, - ) -> Box<SourceCodeInfoFuture> { + ) -> Pin<Box<SourceCodeInfoFuture>> { let mut loads = self.loads.lock().unwrap(); loads.push(module_specifier.to_string()); let url = module_specifier.to_string(); - Box::new(DelayedSourceCodeFuture { url, counter: 0 }) + DelayedSourceCodeFuture { url, counter: 0 }.boxed() } } @@ -780,7 +786,7 @@ mod tests { #[test] fn test_recursive_load() { - run_in_task(|| { + run_in_task(|mut cx| { let loader = MockLoader::new(); let modules = loader.modules.clone(); let modules_ = modules.clone(); @@ -791,12 +797,12 @@ mod tests { RecursiveLoad::main("/a.js", None, loader, modules); let a_id = loop { - match recursive_load.poll() { - Ok(Ready(Some(Event::Fetch(info)))) => { + match recursive_load.try_poll_next_unpin(&mut cx) { + Poll::Ready(Some(Ok(Event::Fetch(info)))) => { let mut isolate = isolate.lock().unwrap(); recursive_load.register(info, &mut isolate).unwrap(); } - Ok(Ready(Some(Event::Instantiate(id)))) => break id, + Poll::Ready(Some(Ok(Event::Instantiate(id)))) => break id, _ => panic!("unexpected result"), }; }; @@ -859,7 +865,7 @@ mod tests { #[test] fn test_circular_load() { - run_in_task(|| { + run_in_task(|mut cx| { let loader = MockLoader::new(); let isolate = loader.isolate.clone(); let isolate_ = isolate.clone(); @@ -868,9 +874,10 @@ mod tests { let loads = loader.loads.clone(); let recursive_load = RecursiveLoad::main("/circular1.js", None, loader, modules); - let result = recursive_load.get_future(isolate.clone()).poll(); - assert!(result.is_ok()); - if let Async::Ready(circular1_id) = result.ok().unwrap() { + let mut load_fut = recursive_load.get_future(isolate.clone()).boxed(); + let result = Pin::new(&mut load_fut).poll(&mut cx); + assert!(result.is_ready()); + if let Poll::Ready(Ok(circular1_id)) = result { let mut isolate = isolate_.lock().unwrap(); js_check(isolate.mod_evaluate(circular1_id)); @@ -930,7 +937,7 @@ mod tests { #[test] fn test_redirect_load() { - run_in_task(|| { + run_in_task(|mut cx| { let loader = MockLoader::new(); let isolate = loader.isolate.clone(); let isolate_ = isolate.clone(); @@ -939,10 +946,11 @@ mod tests { let loads = loader.loads.clone(); let recursive_load = RecursiveLoad::main("/redirect1.js", None, loader, modules); - let result = recursive_load.get_future(isolate.clone()).poll(); + let mut load_fut = recursive_load.get_future(isolate.clone()).boxed(); + let result = Pin::new(&mut load_fut).poll(&mut cx); println!(">> result {:?}", result); - assert!(result.is_ok()); - if let Async::Ready(redirect1_id) = result.ok().unwrap() { + assert!(result.is_ready()); + if let Poll::Ready(Ok(redirect1_id)) = result { let mut isolate = isolate_.lock().unwrap(); js_check(isolate.mod_evaluate(redirect1_id)); let l = loads.lock().unwrap(); @@ -995,18 +1003,18 @@ mod tests { #[test] fn slow_never_ready_modules() { - run_in_task(|| { + run_in_task(|mut cx| { let loader = MockLoader::new(); let isolate = loader.isolate.clone(); let modules = loader.modules.clone(); let loads = loader.loads.clone(); let mut recursive_load = RecursiveLoad::main("/main.js", None, loader, modules) - .get_future(isolate); + .get_future(isolate) + .boxed(); - let result = recursive_load.poll(); - assert!(result.is_ok()); - assert!(result.ok().unwrap().is_not_ready()); + let result = recursive_load.poll_unpin(&mut cx); + assert!(result.is_pending()); // TODO(ry) Arguably the first time we poll only the following modules // should be loaded: @@ -1018,9 +1026,8 @@ mod tests { // run_in_task. for _ in 0..10 { - let result = recursive_load.poll(); - assert!(result.is_ok()); - assert!(result.ok().unwrap().is_not_ready()); + let result = recursive_load.poll_unpin(&mut cx); + assert!(result.is_pending()); let l = loads.lock().unwrap(); assert_eq!( l.to_vec(), @@ -1045,19 +1052,22 @@ mod tests { #[test] fn loader_disappears_after_error() { - run_in_task(|| { + run_in_task(|mut cx| { let loader = MockLoader::new(); let isolate = loader.isolate.clone(); let modules = loader.modules.clone(); let recursive_load = RecursiveLoad::main("/bad_import.js", None, loader, modules); - let result = recursive_load.get_future(isolate).poll(); - assert!(result.is_err()); - let err = result.err().unwrap(); - assert_eq!( - err.downcast_ref::<MockError>().unwrap(), - &MockError::ResolveErr - ); + let mut load_fut = recursive_load.get_future(isolate).boxed(); + let result = load_fut.poll_unpin(&mut cx); + if let Poll::Ready(Err(err)) = result { + assert_eq!( + err.downcast_ref::<MockError>().unwrap(), + &MockError::ResolveErr + ); + } else { + unreachable!(); + } }) } @@ -1072,7 +1082,7 @@ mod tests { #[test] fn recursive_load_main_with_code() { - run_in_task(|| { + run_in_task(|mut cx| { let loader = MockLoader::new(); let modules = loader.modules.clone(); let modules_ = modules.clone(); @@ -1090,12 +1100,12 @@ mod tests { ); let main_id = loop { - match recursive_load.poll() { - Ok(Ready(Some(Event::Fetch(info)))) => { + match recursive_load.poll_next_unpin(&mut cx) { + Poll::Ready(Some(Ok(Event::Fetch(info)))) => { let mut isolate = isolate.lock().unwrap(); recursive_load.register(info, &mut isolate).unwrap(); } - Ok(Ready(Some(Event::Instantiate(id)))) => break id, + Poll::Ready(Some(Ok(Event::Instantiate(id)))) => break id, _ => panic!("unexpected result"), }; }; |