summaryrefslogtreecommitdiff
path: root/cli/worker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/worker.rs')
-rw-r--r--cli/worker.rs125
1 files changed, 75 insertions, 50 deletions
diff --git a/cli/worker.rs b/cli/worker.rs
index 5d4675d00..aca822888 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -8,15 +8,18 @@ use deno::ErrBox;
use deno::ModuleSpecifier;
use deno::RecursiveLoad;
use deno::StartupData;
-use futures::Async;
-use futures::Future;
-use futures::Poll;
-use futures::Sink;
-use futures::Stream;
+use futures::channel::mpsc;
+use futures::future::FutureExt;
+use futures::future::TryFutureExt;
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
use std::env;
+use std::future::Future;
+use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
-use tokio::sync::mpsc;
+use std::task::Context;
+use std::task::Poll;
use url::Url;
/// Wraps mpsc channels so they can be referenced
@@ -115,7 +118,7 @@ impl Worker {
module_specifier: &ModuleSpecifier,
maybe_code: Option<String>,
is_prefetch: bool,
- ) -> impl Future<Item = (), Error = ErrBox> {
+ ) -> impl Future<Output = Result<(), ErrBox>> {
let worker = self.clone();
let loader = self.state.clone();
let isolate = self.isolate.clone();
@@ -127,13 +130,13 @@ impl Worker {
modules,
)
.get_future(isolate);
- recursive_load.and_then(move |id| -> Result<(), ErrBox> {
+ recursive_load.and_then(move |id| {
worker.state.global_state.progress.done();
if is_prefetch {
- Ok(())
+ futures::future::ok(())
} else {
let mut isolate = worker.isolate.lock().unwrap();
- isolate.mod_evaluate(id)
+ futures::future::ready(isolate.mod_evaluate(id))
}
})
}
@@ -141,10 +144,17 @@ impl Worker {
/// Post message to worker as a host.
///
/// This method blocks current thread.
- pub fn post_message(self: &Self, buf: Buf) -> Result<(), ErrBox> {
- let mut channels = self.external_channels.lock().unwrap();
- let sender = &mut channels.sender;
- sender.send(buf).wait().map(|_| ()).map_err(ErrBox::from)
+ pub fn post_message(
+ self: &Self,
+ buf: Buf,
+ ) -> impl Future<Output = Result<(), ErrBox>> {
+ let channels = self.external_channels.lock().unwrap();
+ let mut sender = channels.sender.clone();
+ async move {
+ let result = sender.send(buf).map_err(ErrBox::from).await;
+ drop(sender);
+ result
+ }
}
/// Get message from worker as a host.
@@ -156,12 +166,12 @@ impl Worker {
}
impl Future for Worker {
- type Item = ();
- type Error = ErrBox;
+ type Output = Result<(), ErrBox>;
- fn poll(&mut self) -> Result<Async<()>, ErrBox> {
- let mut isolate = self.isolate.lock().unwrap();
- isolate.poll()
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let mut isolate = inner.isolate.lock().unwrap();
+ isolate.poll_unpin(cx)
}
}
@@ -173,12 +183,14 @@ pub struct WorkerReceiver {
}
impl Future for WorkerReceiver {
- type Item = Option<Buf>;
- type Error = ErrBox;
+ type Output = Result<Option<Buf>, ErrBox>;
- fn poll(&mut self) -> Poll<Option<Buf>, ErrBox> {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut channels = self.channels.lock().unwrap();
- channels.receiver.poll().map_err(ErrBox::from)
+ match channels.receiver.poll_next_unpin(cx) {
+ Poll::Ready(v) => Poll::Ready(Ok(v)),
+ Poll::Pending => Poll::Pending,
+ }
}
}
@@ -192,7 +204,6 @@ mod tests {
use crate::startup_data;
use crate::state::ThreadSafeState;
use crate::tokio_util;
- use futures::future::lazy;
use std::sync::atomic::Ordering;
#[test]
@@ -220,7 +231,7 @@ mod tests {
)
.unwrap();
let state_ = state.clone();
- tokio_util::run(lazy(move || {
+ tokio_util::run(async move {
let mut worker =
Worker::new("TEST".to_string(), StartupData::None, state, ext);
worker
@@ -231,7 +242,8 @@ mod tests {
}
tokio_util::panic_on_error(worker)
})
- }));
+ .await
+ });
let metrics = &state_.metrics;
assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2);
@@ -261,7 +273,7 @@ mod tests {
)
.unwrap();
let state_ = state.clone();
- tokio_util::run(lazy(move || {
+ tokio_util::run(async move {
let mut worker =
Worker::new("TEST".to_string(), StartupData::None, state, ext);
worker
@@ -272,7 +284,8 @@ mod tests {
}
tokio_util::panic_on_error(worker)
})
- }));
+ .await
+ });
let metrics = &state_.metrics;
// TODO assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2);
@@ -306,7 +319,7 @@ mod tests {
.unwrap();
let global_state_ = global_state.clone();
let state_ = state.clone();
- tokio_util::run(lazy(move || {
+ tokio_util::run(async move {
let mut worker = Worker::new(
"TEST".to_string(),
startup_data::deno_isolate_init(),
@@ -322,7 +335,8 @@ mod tests {
}
tokio_util::panic_on_error(worker)
})
- }));
+ .await
+ });
assert_eq!(state_.metrics.resolve_count.load(Ordering::SeqCst), 3);
// Check that we've only invoked the compiler once.
@@ -371,19 +385,22 @@ mod tests {
let worker_ = worker.clone();
- tokio::spawn(lazy(move || {
- worker.then(move |r| -> Result<(), ()> {
- r.unwrap();
- Ok(())
- })
- }));
+ tokio::spawn(
+ worker
+ .then(move |r| {
+ r.unwrap();
+ futures::future::ok(())
+ })
+ .compat(),
+ );
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = worker_.post_message(msg);
+ let r = futures::executor::block_on(worker_.post_message(msg).boxed());
assert!(r.is_ok());
- let maybe_msg = worker_.get_message().wait().unwrap();
+ let maybe_msg =
+ futures::executor::block_on(worker_.get_message()).unwrap();
assert!(maybe_msg.is_some());
// Check if message received is [1, 2, 3] in json
assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
@@ -392,7 +409,7 @@ mod tests {
.to_string()
.into_boxed_str()
.into_boxed_bytes();
- let r = worker_.post_message(msg);
+ let r = futures::executor::block_on(worker_.post_message(msg).boxed());
assert!(r.is_ok());
})
}
@@ -407,21 +424,25 @@ mod tests {
let worker_ = worker.clone();
let worker_future = worker
- .then(move |r| -> Result<(), ()> {
+ .then(move |r| {
println!("workers.rs after resource close");
r.unwrap();
- Ok(())
+ futures::future::ok(())
})
.shared();
let worker_future_ = worker_future.clone();
- tokio::spawn(lazy(move || worker_future_.then(|_| Ok(()))));
+ tokio::spawn(
+ worker_future_
+ .then(|_: Result<(), ()>| futures::future::ok(()))
+ .compat(),
+ );
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = worker_.post_message(msg);
+ let r = futures::executor::block_on(worker_.post_message(msg));
assert!(r.is_ok());
- worker_future.wait().unwrap();
+ futures::executor::block_on(worker_future).unwrap();
})
}
@@ -432,9 +453,11 @@ mod tests {
let mut worker = create_test_worker();
let module_specifier =
ModuleSpecifier::resolve_url_or_path("does-not-exist").unwrap();
- let result = worker
- .execute_mod_async(&module_specifier, None, false)
- .wait();
+ let result = futures::executor::block_on(worker.execute_mod_async(
+ &module_specifier,
+ None,
+ false,
+ ));
assert!(result.is_err());
})
}
@@ -452,9 +475,11 @@ mod tests {
.to_owned();
let module_specifier =
ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap();
- let result = worker
- .execute_mod_async(&module_specifier, None, false)
- .wait();
+ let result = futures::executor::block_on(worker.execute_mod_async(
+ &module_specifier,
+ None,
+ false,
+ ));
assert!(result.is_ok());
})
}