diff options
Diffstat (limited to 'core/isolate.rs')
-rw-r--r-- | core/isolate.rs | 314 |
1 files changed, 193 insertions, 121 deletions
diff --git a/core/isolate.rs b/core/isolate.rs index 8b15befa3..079ab5dcf 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -19,20 +19,27 @@ use crate::libdeno::Snapshot2; use crate::ops::*; use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; +use futures::future::FutureExt; +use futures::future::TryFutureExt; use futures::stream::FuturesUnordered; +use futures::stream::IntoStream; use futures::stream::Stream; +use futures::stream::StreamExt; use futures::stream::StreamFuture; -use futures::task; -use futures::Async::*; -use futures::Future; -use futures::Poll; +use futures::stream::TryStream; +use futures::stream::TryStreamExt; +use futures::task::AtomicWaker; use libc::c_char; use libc::c_void; use std::ffi::CStr; use std::ffi::CString; use std::fmt; +use std::future::Future; +use std::pin::Pin; use std::ptr::null; use std::sync::{Arc, Mutex, Once}; +use std::task::Context; +use std::task::Poll; /// Stores a script used to initalize a Isolate pub struct Script<'a> { @@ -59,7 +66,7 @@ pub enum RecursiveLoadEvent { Instantiate(deno_mod), } -pub trait ImportStream: Stream { +pub trait ImportStream: TryStream { fn register( &mut self, source_code_info: SourceCodeInfo, @@ -67,8 +74,14 @@ pub trait ImportStream: Stream { ) -> Result<(), ErrBox>; } -type DynImportStream = - Box<dyn ImportStream<Item = RecursiveLoadEvent, Error = ErrBox> + Send>; +type DynImportStream = Box< + dyn ImportStream< + Ok = RecursiveLoadEvent, + Error = ErrBox, + Item = Result<RecursiveLoadEvent, ErrBox>, + > + Send + + Unpin, +>; type DynImportFn = dyn Fn(deno_dyn_import_id, &str, &str) -> DynImportStream; @@ -87,15 +100,23 @@ impl fmt::Debug for DynImportStream { } impl Stream for DynImport { - type Item = (deno_dyn_import_id, RecursiveLoadEvent); - type Error = (deno_dyn_import_id, ErrBox); - - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - match self.inner.poll() { - Ok(Ready(Some(event))) => Ok(Ready(Some((self.id, event)))), - Ok(Ready(None)) => unreachable!(), - Err(e) => Err((self.id, e)), - Ok(NotReady) => Ok(NotReady), + type Item = Result< + (deno_dyn_import_id, RecursiveLoadEvent), + (deno_dyn_import_id, ErrBox), + >; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<Option<Self::Item>> { + let self_inner = self.get_mut(); + match self_inner.inner.try_poll_next_unpin(cx) { + Poll::Ready(Some(Ok(event))) => { + Poll::Ready(Some(Ok((self_inner.id, event)))) + } + Poll::Ready(None) => unreachable!(), + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err((self_inner.id, e)))), + Poll::Pending => Poll::Pending, } } } @@ -154,11 +175,12 @@ pub struct Isolate { needs_init: bool, shared: SharedQueue, pending_ops: FuturesUnordered<PendingOpFuture>, - pending_dyn_imports: FuturesUnordered<StreamFuture<DynImport>>, + pending_dyn_imports: FuturesUnordered<StreamFuture<IntoStream<DynImport>>>, have_unpolled_ops: bool, startup_script: Option<OwnedScript>, op_registry: OpRegistry, eager_poll_count: u32, + waker: AtomicWaker, } unsafe impl Send for Isolate {} @@ -225,6 +247,7 @@ impl Isolate { startup_script, op_registry: OpRegistry::new(), eager_poll_count: 0, + waker: AtomicWaker::new(), } } @@ -296,8 +319,10 @@ impl Isolate { if let Some(ref f) = isolate.dyn_import { let inner = f(id, specifier, referrer); let stream = DynImport { inner, id }; - task::current().notify(); - isolate.pending_dyn_imports.push(stream.into_future()); + isolate.waker.wake(); + isolate + .pending_dyn_imports + .push(stream.into_stream().into_future()); } else { panic!("dyn_import callback not set") } @@ -334,10 +359,11 @@ impl Isolate { // which case they can be turned into a sync op before we return to V8. This // can save a boundary crossing. #[allow(clippy::match_wild_err_arm)] - match fut.poll() { - Err(_) => panic!("unexpected op error"), - Ok(Ready(buf)) => Op::Sync(buf), - Ok(NotReady) => Op::Async(fut), + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + match fut.poll_unpin(&mut cx) { + Poll::Ready(Err(_)) => panic!("unexpected op error"), + Poll::Ready(Ok(buf)) => Op::Sync(buf), + Poll::Pending => Op::Async(fut), } } Op::Sync(buf) => Op::Sync(buf), @@ -359,8 +385,8 @@ impl Isolate { .expect("unexpected error"); } Op::Async(fut) => { - let fut2 = fut.map(move |buf| (op_id, buf)); - isolate.pending_ops.push(Box::new(fut2)); + let fut2 = fut.map_ok(move |buf| (op_id, buf)); + isolate.pending_ops.push(fut2.boxed()); isolate.have_unpolled_ops = true; } } @@ -522,42 +548,45 @@ impl Isolate { self.check_last_exception() } - fn poll_dyn_imports(&mut self) -> Poll<(), ErrBox> { + fn poll_dyn_imports(&mut self, cx: &mut Context) -> Poll<Result<(), ErrBox>> { use RecursiveLoadEvent::*; loop { - match self.pending_dyn_imports.poll() { - Ok(NotReady) | Ok(Ready(None)) => { + match self.pending_dyn_imports.poll_next_unpin(cx) { + Poll::Pending | Poll::Ready(None) => { // There are no active dynamic import loaders, or none are ready. - return Ok(futures::Async::Ready(())); + return Poll::Ready(Ok(())); } - Ok(Ready(Some(( - Some((dyn_import_id, Fetch(source_code_info))), + Poll::Ready(Some(( + Some(Ok((dyn_import_id, Fetch(source_code_info)))), mut stream, - )))) => { + ))) => { // A module (not necessarily the one dynamically imported) has been // fetched. Create and register it, and if successful, poll for the // next recursive-load event related to this dynamic import. - match stream.register(source_code_info, self) { + match stream.get_mut().register(source_code_info, self) { Ok(()) => self.pending_dyn_imports.push(stream.into_future()), Err(err) => { self.dyn_import_done(dyn_import_id, Err(Some(err.to_string())))? } } } - Ok(Ready(Some((Some((dyn_import_id, Instantiate(module_id))), _)))) => { + Poll::Ready(Some(( + Some(Ok((dyn_import_id, Instantiate(module_id)))), + _, + ))) => { // The top-level module from a dynamic import has been instantiated. match self.mod_evaluate(module_id) { Ok(()) => self.dyn_import_done(dyn_import_id, Ok(module_id))?, Err(..) => self.dyn_import_done(dyn_import_id, Err(None))?, } } - Err(((dyn_import_id, err), _)) => { + Poll::Ready(Some((Some(Err((dyn_import_id, err))), _))) => { // A non-javascript error occurred; this could be due to a an invalid // module specifier, or a problem with the source map, or a failure // to fetch the module source code. self.dyn_import_done(dyn_import_id, Err(Some(err.to_string())))? } - Ok(Ready(Some((None, _)))) => unreachable!(), + Poll::Ready(Some((None, _))) => unreachable!(), } } } @@ -654,30 +683,34 @@ impl Drop for LockerScope { } impl Future for Isolate { - type Item = (); - type Error = ErrBox; + type Output = Result<(), ErrBox>; - fn poll(&mut self) -> Poll<(), ErrBox> { - self.shared_init(); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + + inner.waker.register(cx.waker()); + + inner.shared_init(); let mut overflow_response: Option<(OpId, Buf)> = None; loop { // If there are any pending dyn_import futures, do those first. - if !self.pending_dyn_imports.is_empty() { - self.poll_dyn_imports()?; + if !inner.pending_dyn_imports.is_empty() { + let poll_imports = inner.poll_dyn_imports(cx)?; + assert!(poll_imports.is_ready()); } // Now handle actual ops. - self.have_unpolled_ops = false; - self.eager_poll_count = 0; + inner.have_unpolled_ops = false; + inner.eager_poll_count = 0; #[allow(clippy::match_wild_err_arm)] - match self.pending_ops.poll() { - Err(_) => panic!("unexpected op error"), - Ok(Ready(None)) => break, - Ok(NotReady) => break, - Ok(Ready(Some((op_id, buf)))) => { - let successful_push = self.shared.push(op_id, &buf); + match inner.pending_ops.poll_next_unpin(cx) { + Poll::Ready(Some(Err(_))) => panic!("unexpected op error"), + Poll::Ready(None) => break, + Poll::Pending => break, + Poll::Ready(Some(Ok((op_id, buf)))) => { + let successful_push = inner.shared.push(op_id, &buf); if !successful_push { // If we couldn't push the response to the shared queue, because // there wasn't enough size, we will return the buffer via the @@ -689,34 +722,34 @@ impl Future for Isolate { } } - if self.shared.size() > 0 { + if inner.shared.size() > 0 { // Lock the current thread for V8. - let locker = LockerScope::new(self.libdeno_isolate); - self.respond(None)?; + let locker = LockerScope::new(inner.libdeno_isolate); + inner.respond(None)?; // The other side should have shifted off all the messages. - assert_eq!(self.shared.size(), 0); + assert_eq!(inner.shared.size(), 0); drop(locker); } if overflow_response.is_some() { // Lock the current thread for V8. - let locker = LockerScope::new(self.libdeno_isolate); + let locker = LockerScope::new(inner.libdeno_isolate); let (op_id, buf) = overflow_response.take().unwrap(); - self.respond(Some((op_id, &buf)))?; + inner.respond(Some((op_id, &buf)))?; drop(locker); } - self.check_promise_errors(); - self.check_last_exception()?; + inner.check_promise_errors(); + inner.check_last_exception()?; // We're idle if pending_ops is empty. - if self.pending_ops.is_empty() && self.pending_dyn_imports.is_empty() { - Ok(futures::Async::Ready(())) + if inner.pending_ops.is_empty() && inner.pending_dyn_imports.is_empty() { + Poll::Ready(Ok(())) } else { - if self.have_unpolled_ops { - task::current().notify(); + if inner.have_unpolled_ops { + inner.waker.wake(); } - Ok(futures::Async::NotReady) + Poll::Pending } } } @@ -752,33 +785,29 @@ pub fn js_check<T>(r: Result<T, ErrBox>) -> T { #[cfg(test)] pub mod tests { use super::*; - use futures::executor::spawn; + use futures::executor::ThreadPool; use futures::future::lazy; - use futures::future::ok; - use futures::Async; use std::io; use std::ops::FnOnce; use std::sync::atomic::{AtomicUsize, Ordering}; - pub fn run_in_task<F, R>(f: F) -> R + pub fn run_in_task<F>(f: F) where - F: FnOnce() -> R, + F: FnOnce(&mut Context) + Send + 'static, { - spawn(lazy(move || ok::<R, ()>(f()))).wait_future().unwrap() + let poll = ThreadPool::new().unwrap(); + poll.spawn_ok(lazy(move |cx| f(cx))); } - fn poll_until_ready<F>( - future: &mut F, - max_poll_count: usize, - ) -> Result<F::Item, F::Error> + fn poll_until_ready<F>(future: &mut F, max_poll_count: usize) -> F::Output where - F: Future, + F: Future + Unpin, { + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); for _ in 0..max_poll_count { - match future.poll() { - Ok(NotReady) => continue, - Ok(Ready(val)) => return Ok(val), - Err(err) => return Err(err), + match future.poll_unpin(&mut cx) { + Poll::Pending => continue, + Poll::Ready(val) => return val, } } panic!( @@ -799,16 +828,16 @@ pub mod tests { } impl Future for DelayedFuture { - type Item = Box<[u8]>; - type Error = (); + type Output = Result<Box<[u8]>, ()>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.counter > 0 { - return Ok(Async::Ready(self.buf.clone())); + fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + if inner.counter > 0 { + return Poll::Ready(Ok(inner.buf.clone())); } - self.counter += 1; - Ok(Async::NotReady) + inner.counter += 1; + Poll::Pending } } @@ -835,13 +864,13 @@ pub mod tests { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(Box::new(futures::future::ok(buf))) + Op::Async(futures::future::ok(buf).boxed()) } Mode::AsyncDelayed => { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(Box::new(DelayedFuture::new(buf))) + Op::Async(DelayedFuture::new(buf).boxed()) } Mode::OverflowReqSync => { assert_eq!(control.len(), 100 * 1024 * 1024); @@ -860,7 +889,7 @@ pub mod tests { Mode::OverflowReqAsync => { assert_eq!(control.len(), 100 * 1024 * 1024); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(Box::new(DelayedFuture::new(buf))) + Op::Async(DelayedFuture::new(buf).boxed()) } Mode::OverflowResAsync => { assert_eq!(control.len(), 1); @@ -869,7 +898,7 @@ pub mod tests { vec.resize(100 * 1024 * 1024, 0); vec[0] = 4; let buf = vec.into_boxed_slice(); - Op::Async(Box::new(DelayedFuture::new(buf))) + Op::Async(DelayedFuture::new(buf).boxed()) } } }; @@ -957,7 +986,7 @@ pub mod tests { #[test] fn test_poll_async_immediate_ops() { - run_in_task(|| { + run_in_task(|cx| { let (mut isolate, dispatch_count) = setup(Mode::AsyncImmediate); js_check(isolate.execute( @@ -992,16 +1021,22 @@ pub mod tests { )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); js_check(isolate.execute("check3.js", "assert(nrecv == 0)")); // We are idle, so the next poll should be the last. - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); }); } #[test] fn test_poll_async_delayed_ops() { - run_in_task(|| { + run_in_task(|cx| { let (mut isolate, dispatch_count) = setup(Mode::AsyncDelayed); js_check(isolate.execute( @@ -1024,7 +1059,10 @@ pub mod tests { "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); js_check(isolate.execute( "check2.js", @@ -1035,26 +1073,36 @@ pub mod tests { "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); js_check(isolate.execute("check3.js", "assert(nrecv == 2)")); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); // We are idle, so the next poll should be the last. - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); }); } struct MockImportStream(Vec<Result<RecursiveLoadEvent, ErrBox>>); impl Stream for MockImportStream { - type Item = RecursiveLoadEvent; - type Error = ErrBox; - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - let event = if self.0.is_empty() { + type Item = Result<RecursiveLoadEvent, ErrBox>; + + fn poll_next( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll<Option<Self::Item>> { + let inner = self.get_mut(); + let event = if inner.0.is_empty() { None } else { - Some(self.0.remove(0)?) + Some(inner.0.remove(0)) }; - Ok(Ready(event)) + Poll::Ready(event) } } @@ -1080,7 +1128,7 @@ pub mod tests { #[test] fn dyn_import_err() { // Test an erroneous dynamic import where the specified module isn't found. - run_in_task(|| { + run_in_task(|cx| { let count = Arc::new(AtomicUsize::new(0)); let count_ = count.clone(); let mut isolate = Isolate::new(StartupData::None, false); @@ -1103,8 +1151,10 @@ pub mod tests { assert_eq!(count.load(Ordering::Relaxed), 1); // We should get an error here. - let result = isolate.poll(); - assert!(result.is_err()); + let result = isolate.poll_unpin(cx); + if let Poll::Ready(Ok(_)) = result { + unreachable!(); + } }) } @@ -1113,7 +1163,7 @@ pub mod tests { use std::convert::TryInto; // Import multiple modules to demonstrate that after failed dynamic import // another dynamic import can still be run - run_in_task(|| { + run_in_task(|cx| { let count = Arc::new(AtomicUsize::new(0)); let count_ = count.clone(); let mut isolate = Isolate::new(StartupData::None, false); @@ -1156,15 +1206,24 @@ pub mod tests { assert_eq!(count.load(Ordering::Relaxed), 3); // Now each poll should return error - assert!(isolate.poll().is_err()); - assert!(isolate.poll().is_err()); - assert!(isolate.poll().is_err()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Err(_)) => true, + _ => false, + }); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Err(_)) => true, + _ => false, + }); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Err(_)) => true, + _ => false, + }); }) } #[test] fn dyn_import_ok() { - run_in_task(|| { + run_in_task(|cx| { let count = Arc::new(AtomicUsize::new(0)); let count_ = count.clone(); @@ -1224,9 +1283,15 @@ pub mod tests { )); assert_eq!(count.load(Ordering::Relaxed), 1); - assert_eq!(Ready(()), isolate.poll().unwrap()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); assert_eq!(count.load(Ordering::Relaxed), 2); - assert_eq!(Ready(()), isolate.poll().unwrap()); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); assert_eq!(count.load(Ordering::Relaxed), 2); }) } @@ -1247,7 +1312,7 @@ pub mod tests { shared.terminate_execution(); // allow shutdown - std::thread::sleep(std::time::Duration::from_millis(100)); + std::thread::sleep(std::time::Duration::from_millis(200)); // unless reported otherwise the test should fail after this point tx_clone.send(false).ok(); @@ -1345,7 +1410,7 @@ pub mod tests { #[test] fn overflow_req_async() { - run_in_task(|| { + run_in_task(|cx| { let (mut isolate, dispatch_count) = setup(Mode::OverflowReqAsync); js_check(isolate.execute( "overflow_req_async.js", @@ -1366,14 +1431,17 @@ pub mod tests { "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); - assert_eq!(Async::Ready(()), js_check(isolate.poll())); + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); js_check(isolate.execute("check.js", "assert(asyncRecv == 1);")); }); } #[test] fn overflow_res_async() { - run_in_task(|| { + run_in_task(|_cx| { // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We // should optimize this. let (mut isolate, dispatch_count) = setup(Mode::OverflowResAsync); @@ -1404,7 +1472,7 @@ pub mod tests { fn overflow_res_multiple_dispatch_async() { // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We // should optimize this. - run_in_task(|| { + run_in_task(|_cx| { let (mut isolate, dispatch_count) = setup(Mode::OverflowResAsync); js_check(isolate.execute( "overflow_res_multiple_dispatch_async.js", @@ -1434,7 +1502,7 @@ pub mod tests { #[test] fn test_pre_dispatch() { - run_in_task(|| { + run_in_task(|mut cx| { let (mut isolate, _dispatch_count) = setup(Mode::OverflowResAsync); js_check(isolate.execute( "bad_op_id.js", @@ -1448,13 +1516,15 @@ pub mod tests { assert(thrown == "Unknown op id: 100"); "#, )); - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) { + unreachable!(); + } }); } #[test] fn test_js() { - run_in_task(|| { + run_in_task(|mut cx| { let (mut isolate, _dispatch_count) = setup(Mode::AsyncImmediate); js_check( isolate.execute( @@ -1462,7 +1532,9 @@ pub mod tests { include_str!("shared_queue_test.js"), ), ); - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) { + unreachable!(); + } }); } |