diff options
Diffstat (limited to 'cli/worker.rs')
-rw-r--r-- | cli/worker.rs | 125 |
1 files changed, 75 insertions, 50 deletions
diff --git a/cli/worker.rs b/cli/worker.rs index 5d4675d00..aca822888 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -8,15 +8,18 @@ use deno::ErrBox; use deno::ModuleSpecifier; use deno::RecursiveLoad; use deno::StartupData; -use futures::Async; -use futures::Future; -use futures::Poll; -use futures::Sink; -use futures::Stream; +use futures::channel::mpsc; +use futures::future::FutureExt; +use futures::future::TryFutureExt; +use futures::sink::SinkExt; +use futures::stream::StreamExt; use std::env; +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; use std::sync::Mutex; -use tokio::sync::mpsc; +use std::task::Context; +use std::task::Poll; use url::Url; /// Wraps mpsc channels so they can be referenced @@ -115,7 +118,7 @@ impl Worker { module_specifier: &ModuleSpecifier, maybe_code: Option<String>, is_prefetch: bool, - ) -> impl Future<Item = (), Error = ErrBox> { + ) -> impl Future<Output = Result<(), ErrBox>> { let worker = self.clone(); let loader = self.state.clone(); let isolate = self.isolate.clone(); @@ -127,13 +130,13 @@ impl Worker { modules, ) .get_future(isolate); - recursive_load.and_then(move |id| -> Result<(), ErrBox> { + recursive_load.and_then(move |id| { worker.state.global_state.progress.done(); if is_prefetch { - Ok(()) + futures::future::ok(()) } else { let mut isolate = worker.isolate.lock().unwrap(); - isolate.mod_evaluate(id) + futures::future::ready(isolate.mod_evaluate(id)) } }) } @@ -141,10 +144,17 @@ impl Worker { /// Post message to worker as a host. /// /// This method blocks current thread. - pub fn post_message(self: &Self, buf: Buf) -> Result<(), ErrBox> { - let mut channels = self.external_channels.lock().unwrap(); - let sender = &mut channels.sender; - sender.send(buf).wait().map(|_| ()).map_err(ErrBox::from) + pub fn post_message( + self: &Self, + buf: Buf, + ) -> impl Future<Output = Result<(), ErrBox>> { + let channels = self.external_channels.lock().unwrap(); + let mut sender = channels.sender.clone(); + async move { + let result = sender.send(buf).map_err(ErrBox::from).await; + drop(sender); + result + } } /// Get message from worker as a host. @@ -156,12 +166,12 @@ impl Worker { } impl Future for Worker { - type Item = (); - type Error = ErrBox; + type Output = Result<(), ErrBox>; - fn poll(&mut self) -> Result<Async<()>, ErrBox> { - let mut isolate = self.isolate.lock().unwrap(); - isolate.poll() + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let mut isolate = inner.isolate.lock().unwrap(); + isolate.poll_unpin(cx) } } @@ -173,12 +183,14 @@ pub struct WorkerReceiver { } impl Future for WorkerReceiver { - type Item = Option<Buf>; - type Error = ErrBox; + type Output = Result<Option<Buf>, ErrBox>; - fn poll(&mut self) -> Poll<Option<Buf>, ErrBox> { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { let mut channels = self.channels.lock().unwrap(); - channels.receiver.poll().map_err(ErrBox::from) + match channels.receiver.poll_next_unpin(cx) { + Poll::Ready(v) => Poll::Ready(Ok(v)), + Poll::Pending => Poll::Pending, + } } } @@ -192,7 +204,6 @@ mod tests { use crate::startup_data; use crate::state::ThreadSafeState; use crate::tokio_util; - use futures::future::lazy; use std::sync::atomic::Ordering; #[test] @@ -220,7 +231,7 @@ mod tests { ) .unwrap(); let state_ = state.clone(); - tokio_util::run(lazy(move || { + tokio_util::run(async move { let mut worker = Worker::new("TEST".to_string(), StartupData::None, state, ext); worker @@ -231,7 +242,8 @@ mod tests { } tokio_util::panic_on_error(worker) }) - })); + .await + }); let metrics = &state_.metrics; assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2); @@ -261,7 +273,7 @@ mod tests { ) .unwrap(); let state_ = state.clone(); - tokio_util::run(lazy(move || { + tokio_util::run(async move { let mut worker = Worker::new("TEST".to_string(), StartupData::None, state, ext); worker @@ -272,7 +284,8 @@ mod tests { } tokio_util::panic_on_error(worker) }) - })); + .await + }); let metrics = &state_.metrics; // TODO assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2); @@ -306,7 +319,7 @@ mod tests { .unwrap(); let global_state_ = global_state.clone(); let state_ = state.clone(); - tokio_util::run(lazy(move || { + tokio_util::run(async move { let mut worker = Worker::new( "TEST".to_string(), startup_data::deno_isolate_init(), @@ -322,7 +335,8 @@ mod tests { } tokio_util::panic_on_error(worker) }) - })); + .await + }); assert_eq!(state_.metrics.resolve_count.load(Ordering::SeqCst), 3); // Check that we've only invoked the compiler once. @@ -371,19 +385,22 @@ mod tests { let worker_ = worker.clone(); - tokio::spawn(lazy(move || { - worker.then(move |r| -> Result<(), ()> { - r.unwrap(); - Ok(()) - }) - })); + tokio::spawn( + worker + .then(move |r| { + r.unwrap(); + futures::future::ok(()) + }) + .compat(), + ); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = worker_.post_message(msg); + let r = futures::executor::block_on(worker_.post_message(msg).boxed()); assert!(r.is_ok()); - let maybe_msg = worker_.get_message().wait().unwrap(); + let maybe_msg = + futures::executor::block_on(worker_.get_message()).unwrap(); assert!(maybe_msg.is_some()); // Check if message received is [1, 2, 3] in json assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); @@ -392,7 +409,7 @@ mod tests { .to_string() .into_boxed_str() .into_boxed_bytes(); - let r = worker_.post_message(msg); + let r = futures::executor::block_on(worker_.post_message(msg).boxed()); assert!(r.is_ok()); }) } @@ -407,21 +424,25 @@ mod tests { let worker_ = worker.clone(); let worker_future = worker - .then(move |r| -> Result<(), ()> { + .then(move |r| { println!("workers.rs after resource close"); r.unwrap(); - Ok(()) + futures::future::ok(()) }) .shared(); let worker_future_ = worker_future.clone(); - tokio::spawn(lazy(move || worker_future_.then(|_| Ok(())))); + tokio::spawn( + worker_future_ + .then(|_: Result<(), ()>| futures::future::ok(())) + .compat(), + ); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = worker_.post_message(msg); + let r = futures::executor::block_on(worker_.post_message(msg)); assert!(r.is_ok()); - worker_future.wait().unwrap(); + futures::executor::block_on(worker_future).unwrap(); }) } @@ -432,9 +453,11 @@ mod tests { let mut worker = create_test_worker(); let module_specifier = ModuleSpecifier::resolve_url_or_path("does-not-exist").unwrap(); - let result = worker - .execute_mod_async(&module_specifier, None, false) - .wait(); + let result = futures::executor::block_on(worker.execute_mod_async( + &module_specifier, + None, + false, + )); assert!(result.is_err()); }) } @@ -452,9 +475,11 @@ mod tests { .to_owned(); let module_specifier = ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap(); - let result = worker - .execute_mod_async(&module_specifier, None, false) - .wait(); + let result = futures::executor::block_on(worker.execute_mod_async( + &module_specifier, + None, + false, + )); assert!(result.is_ok()); }) } |