summaryrefslogtreecommitdiff
path: root/core/modules.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/modules.rs')
-rw-r--r--core/modules.rs198
1 files changed, 104 insertions, 94 deletions
diff --git a/core/modules.rs b/core/modules.rs
index 85de79cca..9f3434a4f 100644
--- a/core/modules.rs
+++ b/core/modules.rs
@@ -14,21 +14,22 @@ use crate::isolate::SourceCodeInfo;
use crate::libdeno::deno_dyn_import_id;
use crate::libdeno::deno_mod;
use crate::module_specifier::ModuleSpecifier;
-use futures::future::loop_fn;
-use futures::future::Loop;
+use futures::future::FutureExt;
use futures::stream::FuturesUnordered;
use futures::stream::Stream;
-use futures::Async::*;
-use futures::Future;
-use futures::Poll;
+use futures::stream::TryStreamExt;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
+use std::task::Context;
+use std::task::Poll;
pub type SourceCodeInfoFuture =
- dyn Future<Item = SourceCodeInfo, Error = ErrBox> + Send;
+ dyn Future<Output = Result<SourceCodeInfo, ErrBox>> + Send;
pub trait Loader: Send + Sync {
/// Returns an absolute URL.
@@ -47,7 +48,7 @@ pub trait Loader: Send + Sync {
fn load(
&self,
module_specifier: &ModuleSpecifier,
- ) -> Box<SourceCodeInfoFuture>;
+ ) -> Pin<Box<SourceCodeInfoFuture>>;
}
#[derive(Debug, Eq, PartialEq)]
@@ -68,16 +69,16 @@ enum State {
/// This future is used to implement parallel async module loading without
/// complicating the Isolate API.
/// TODO: RecursiveLoad desperately needs to be merged with Modules.
-pub struct RecursiveLoad<L: Loader> {
+pub struct RecursiveLoad<L: Loader + Unpin> {
kind: Kind,
state: State,
loader: L,
modules: Arc<Mutex<Modules>>,
- pending: FuturesUnordered<Box<SourceCodeInfoFuture>>,
+ pending: FuturesUnordered<Pin<Box<SourceCodeInfoFuture>>>,
is_pending: HashSet<ModuleSpecifier>,
}
-impl<L: Loader> RecursiveLoad<L> {
+impl<L: Loader + Unpin> RecursiveLoad<L> {
/// Starts a new parallel load of the given URL of the main module.
pub fn main(
specifier: &str,
@@ -153,7 +154,7 @@ impl<L: Loader> RecursiveLoad<L> {
// integrated into one thing.
self
.pending
- .push(Box::new(self.loader.load(&module_specifier)));
+ .push(self.loader.load(&module_specifier).boxed());
self.state = State::LoadingRoot;
Ok(())
@@ -182,7 +183,7 @@ impl<L: Loader> RecursiveLoad<L> {
{
self
.pending
- .push(Box::new(self.loader.load(&module_specifier)));
+ .push(self.loader.load(&module_specifier).boxed());
self.is_pending.insert(module_specifier);
}
@@ -194,26 +195,24 @@ impl<L: Loader> RecursiveLoad<L> {
pub fn get_future(
self,
isolate: Arc<Mutex<Isolate>>,
- ) -> impl Future<Item = deno_mod, Error = ErrBox> {
- loop_fn(self, move |load| {
- let isolate = isolate.clone();
- load.into_future().map_err(|(e, _)| e).and_then(
- move |(event, mut load)| {
- Ok(match event.unwrap() {
- Event::Fetch(info) => {
- let mut isolate = isolate.lock().unwrap();
- load.register(info, &mut isolate)?;
- Loop::Continue(load)
- }
- Event::Instantiate(id) => Loop::Break(id),
- })
- },
- )
- })
+ ) -> impl Future<Output = Result<deno_mod, ErrBox>> {
+ async move {
+ let mut load = self;
+ loop {
+ let event = load.try_next().await?;
+ match event.unwrap() {
+ Event::Fetch(info) => {
+ let mut isolate = isolate.lock().unwrap();
+ load.register(info, &mut isolate)?;
+ }
+ Event::Instantiate(id) => return Ok(id),
+ }
+ }
+ }
}
}
-impl<L: Loader> ImportStream for RecursiveLoad<L> {
+impl<L: Loader + Unpin> ImportStream for RecursiveLoad<L> {
// TODO: this should not be part of RecursiveLoad.
fn register(
&mut self,
@@ -308,40 +307,45 @@ impl<L: Loader> ImportStream for RecursiveLoad<L> {
}
}
-impl<L: Loader> Stream for RecursiveLoad<L> {
- type Item = Event;
- type Error = ErrBox;
+impl<L: Loader + Unpin> Stream for RecursiveLoad<L> {
+ type Item = Result<Event, ErrBox>;
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- Ok(match self.state {
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Option<Self::Item>> {
+ let inner = self.get_mut();
+ match inner.state {
State::ResolveMain(ref specifier, Some(ref code)) => {
- let module_specifier = self.loader.resolve(
+ let module_specifier = inner.loader.resolve(
specifier,
".",
true,
- self.dyn_import_id().is_some(),
+ inner.dyn_import_id().is_some(),
)?;
let info = SourceCodeInfo {
code: code.to_owned(),
module_url_specified: module_specifier.to_string(),
module_url_found: module_specifier.to_string(),
};
- self.state = State::LoadingRoot;
- Ready(Some(Event::Fetch(info)))
+ inner.state = State::LoadingRoot;
+ Poll::Ready(Some(Ok(Event::Fetch(info))))
}
State::ResolveMain(..) | State::ResolveImport(..) => {
- self.add_root()?;
- self.poll()?
+ if let Err(e) = inner.add_root() {
+ return Poll::Ready(Some(Err(e)));
+ }
+ inner.try_poll_next_unpin(cx)
}
State::LoadingRoot | State::LoadingImports(..) => {
- match self.pending.poll()? {
- Ready(None) => unreachable!(),
- Ready(Some(info)) => Ready(Some(Event::Fetch(info))),
- NotReady => NotReady,
+ match inner.pending.try_poll_next_unpin(cx)? {
+ Poll::Ready(None) => unreachable!(),
+ Poll::Ready(Some(info)) => Poll::Ready(Some(Ok(Event::Fetch(info)))),
+ Poll::Pending => Poll::Pending,
}
}
- State::Instantiated(id) => Ready(Some(Event::Instantiate(id))),
- })
+ State::Instantiated(id) => Poll::Ready(Some(Ok(Event::Instantiate(id)))),
+ }
}
}
@@ -603,9 +607,11 @@ mod tests {
use super::*;
use crate::isolate::js_check;
use crate::isolate::tests::*;
- use futures::Async;
+ use futures::future::FutureExt;
+ use futures::stream::StreamExt;
use std::error::Error;
use std::fmt;
+ use std::future::Future;
struct MockLoader {
pub loads: Arc<Mutex<Vec<String>>>,
@@ -676,27 +682,27 @@ mod tests {
}
impl Future for DelayedSourceCodeFuture {
- type Item = SourceCodeInfo;
- type Error = ErrBox;
+ type Output = Result<SourceCodeInfo, ErrBox>;
- fn poll(&mut self) -> Poll<Self::Item, ErrBox> {
- self.counter += 1;
- if self.url == "file:///never_ready.js" {
- return Ok(Async::NotReady);
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ inner.counter += 1;
+ if inner.url == "file:///never_ready.js" {
+ return Poll::Pending;
}
- if self.url == "file:///slow.js" && self.counter < 2 {
+ if inner.url == "file:///slow.js" && inner.counter < 2 {
// TODO(ry) Hopefully in the future we can remove current task
// notification. See comment above run_in_task.
- futures::task::current().notify();
- return Ok(Async::NotReady);
+ cx.waker().wake_by_ref();
+ return Poll::Pending;
}
- match mock_source_code(&self.url) {
- Some(src) => Ok(Async::Ready(SourceCodeInfo {
+ match mock_source_code(&inner.url) {
+ Some(src) => Poll::Ready(Ok(SourceCodeInfo {
code: src.0.to_owned(),
- module_url_specified: self.url.clone(),
+ module_url_specified: inner.url.clone(),
module_url_found: src.1.to_owned(),
})),
- None => Err(MockError::LoadErr.into()),
+ None => Poll::Ready(Err(MockError::LoadErr.into())),
}
}
}
@@ -733,11 +739,11 @@ mod tests {
fn load(
&self,
module_specifier: &ModuleSpecifier,
- ) -> Box<SourceCodeInfoFuture> {
+ ) -> Pin<Box<SourceCodeInfoFuture>> {
let mut loads = self.loads.lock().unwrap();
loads.push(module_specifier.to_string());
let url = module_specifier.to_string();
- Box::new(DelayedSourceCodeFuture { url, counter: 0 })
+ DelayedSourceCodeFuture { url, counter: 0 }.boxed()
}
}
@@ -780,7 +786,7 @@ mod tests {
#[test]
fn test_recursive_load() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let loader = MockLoader::new();
let modules = loader.modules.clone();
let modules_ = modules.clone();
@@ -791,12 +797,12 @@ mod tests {
RecursiveLoad::main("/a.js", None, loader, modules);
let a_id = loop {
- match recursive_load.poll() {
- Ok(Ready(Some(Event::Fetch(info)))) => {
+ match recursive_load.try_poll_next_unpin(&mut cx) {
+ Poll::Ready(Some(Ok(Event::Fetch(info)))) => {
let mut isolate = isolate.lock().unwrap();
recursive_load.register(info, &mut isolate).unwrap();
}
- Ok(Ready(Some(Event::Instantiate(id)))) => break id,
+ Poll::Ready(Some(Ok(Event::Instantiate(id)))) => break id,
_ => panic!("unexpected result"),
};
};
@@ -859,7 +865,7 @@ mod tests {
#[test]
fn test_circular_load() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let loader = MockLoader::new();
let isolate = loader.isolate.clone();
let isolate_ = isolate.clone();
@@ -868,9 +874,10 @@ mod tests {
let loads = loader.loads.clone();
let recursive_load =
RecursiveLoad::main("/circular1.js", None, loader, modules);
- let result = recursive_load.get_future(isolate.clone()).poll();
- assert!(result.is_ok());
- if let Async::Ready(circular1_id) = result.ok().unwrap() {
+ let mut load_fut = recursive_load.get_future(isolate.clone()).boxed();
+ let result = Pin::new(&mut load_fut).poll(&mut cx);
+ assert!(result.is_ready());
+ if let Poll::Ready(Ok(circular1_id)) = result {
let mut isolate = isolate_.lock().unwrap();
js_check(isolate.mod_evaluate(circular1_id));
@@ -930,7 +937,7 @@ mod tests {
#[test]
fn test_redirect_load() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let loader = MockLoader::new();
let isolate = loader.isolate.clone();
let isolate_ = isolate.clone();
@@ -939,10 +946,11 @@ mod tests {
let loads = loader.loads.clone();
let recursive_load =
RecursiveLoad::main("/redirect1.js", None, loader, modules);
- let result = recursive_load.get_future(isolate.clone()).poll();
+ let mut load_fut = recursive_load.get_future(isolate.clone()).boxed();
+ let result = Pin::new(&mut load_fut).poll(&mut cx);
println!(">> result {:?}", result);
- assert!(result.is_ok());
- if let Async::Ready(redirect1_id) = result.ok().unwrap() {
+ assert!(result.is_ready());
+ if let Poll::Ready(Ok(redirect1_id)) = result {
let mut isolate = isolate_.lock().unwrap();
js_check(isolate.mod_evaluate(redirect1_id));
let l = loads.lock().unwrap();
@@ -995,18 +1003,18 @@ mod tests {
#[test]
fn slow_never_ready_modules() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let loader = MockLoader::new();
let isolate = loader.isolate.clone();
let modules = loader.modules.clone();
let loads = loader.loads.clone();
let mut recursive_load =
RecursiveLoad::main("/main.js", None, loader, modules)
- .get_future(isolate);
+ .get_future(isolate)
+ .boxed();
- let result = recursive_load.poll();
- assert!(result.is_ok());
- assert!(result.ok().unwrap().is_not_ready());
+ let result = recursive_load.poll_unpin(&mut cx);
+ assert!(result.is_pending());
// TODO(ry) Arguably the first time we poll only the following modules
// should be loaded:
@@ -1018,9 +1026,8 @@ mod tests {
// run_in_task.
for _ in 0..10 {
- let result = recursive_load.poll();
- assert!(result.is_ok());
- assert!(result.ok().unwrap().is_not_ready());
+ let result = recursive_load.poll_unpin(&mut cx);
+ assert!(result.is_pending());
let l = loads.lock().unwrap();
assert_eq!(
l.to_vec(),
@@ -1045,19 +1052,22 @@ mod tests {
#[test]
fn loader_disappears_after_error() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let loader = MockLoader::new();
let isolate = loader.isolate.clone();
let modules = loader.modules.clone();
let recursive_load =
RecursiveLoad::main("/bad_import.js", None, loader, modules);
- let result = recursive_load.get_future(isolate).poll();
- assert!(result.is_err());
- let err = result.err().unwrap();
- assert_eq!(
- err.downcast_ref::<MockError>().unwrap(),
- &MockError::ResolveErr
- );
+ let mut load_fut = recursive_load.get_future(isolate).boxed();
+ let result = load_fut.poll_unpin(&mut cx);
+ if let Poll::Ready(Err(err)) = result {
+ assert_eq!(
+ err.downcast_ref::<MockError>().unwrap(),
+ &MockError::ResolveErr
+ );
+ } else {
+ unreachable!();
+ }
})
}
@@ -1072,7 +1082,7 @@ mod tests {
#[test]
fn recursive_load_main_with_code() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let loader = MockLoader::new();
let modules = loader.modules.clone();
let modules_ = modules.clone();
@@ -1090,12 +1100,12 @@ mod tests {
);
let main_id = loop {
- match recursive_load.poll() {
- Ok(Ready(Some(Event::Fetch(info)))) => {
+ match recursive_load.poll_next_unpin(&mut cx) {
+ Poll::Ready(Some(Ok(Event::Fetch(info)))) => {
let mut isolate = isolate.lock().unwrap();
recursive_load.register(info, &mut isolate).unwrap();
}
- Ok(Ready(Some(Event::Instantiate(id)))) => break id,
+ Poll::Ready(Some(Ok(Event::Instantiate(id)))) => break id,
_ => panic!("unexpected result"),
};
};