summaryrefslogtreecommitdiff
path: root/core/isolate.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/isolate.rs')
-rw-r--r--core/isolate.rs130
1 files changed, 57 insertions, 73 deletions
diff --git a/core/isolate.rs b/core/isolate.rs
index 4a44f4439..497522356 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -12,11 +12,12 @@ use crate::libdeno::Snapshot1;
use crate::libdeno::Snapshot2;
use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
-use futures::Async;
+use futures::stream::{FuturesUnordered, Stream};
+use futures::task;
+use futures::Async::*;
use futures::Future;
use futures::Poll;
use libc::c_void;
-use std::collections::VecDeque;
use std::ffi::CStr;
use std::ffi::CString;
use std::ptr::null;
@@ -27,27 +28,28 @@ pub type Op = dyn Future<Item = Buf, Error = ()> + Send;
struct PendingOp {
op: Box<Op>,
- polled_recently: bool,
zero_copy_id: usize, // non-zero if associated zero-copy buffer.
}
+struct OpResult {
+ buf: Buf,
+ zero_copy_id: usize,
+}
+
impl Future for PendingOp {
- type Item = Buf;
+ type Item = OpResult;
type Error = ();
- fn poll(&mut self) -> Poll<Buf, ()> {
- // Do not call poll on ops we've already polled this turn.
- if self.polled_recently {
- Ok(Async::NotReady)
- } else {
- self.polled_recently = true;
- let op = &mut self.op;
- op.poll().map_err(|()| {
- // Ops should not error. If an op experiences an error it needs to
- // encode that error into a buf, so it can be returned to JS.
- panic!("ops should not error")
- })
- }
+ fn poll(&mut self) -> Poll<OpResult, ()> {
+ // Ops should not error. If an op experiences an error it needs to
+ // encode that error into a buf, so it can be returned to JS.
+ Ok(match self.op.poll().expect("ops should not error") {
+ NotReady => NotReady,
+ Ready(buf) => Ready(OpResult {
+ buf,
+ zero_copy_id: self.zero_copy_id,
+ }),
+ })
}
}
@@ -91,8 +93,8 @@ pub struct Isolate<B: Dispatch> {
dispatcher: B,
needs_init: bool,
shared: SharedQueue,
- pending_ops: VecDeque<PendingOp>,
- polled_recently: bool,
+ pending_ops: FuturesUnordered<PendingOp>,
+ have_unpolled_ops: bool,
}
unsafe impl<B: Dispatch> Send for Isolate<B> {}
@@ -142,8 +144,8 @@ impl<B: Dispatch> Isolate<B> {
dispatcher,
shared,
needs_init,
- pending_ops: VecDeque::new(),
- polled_recently: false,
+ pending_ops: FuturesUnordered::new(),
+ have_unpolled_ops: false,
};
// If we want to use execute this has to happen here sadly.
@@ -209,12 +211,8 @@ impl<B: Dispatch> Isolate<B> {
// picked up.
let _ = isolate.respond(Some(&res_record));
} else {
- isolate.pending_ops.push_back(PendingOp {
- op,
- polled_recently: false,
- zero_copy_id,
- });
- isolate.polled_recently = false;
+ isolate.pending_ops.push(PendingOp { op, zero_copy_id });
+ isolate.have_unpolled_ops = true;
}
}
@@ -438,58 +436,41 @@ impl<B: Dispatch> Future for Isolate<B> {
// Lock the current thread for V8.
let _locker = LockerScope::new(self.libdeno_isolate);
- // Clear poll_recently state both on the Isolate itself and
- // on the pending ops.
- self.polled_recently = false;
- for pending in self.pending_ops.iter_mut() {
- pending.polled_recently = false;
- }
-
- while !self.polled_recently {
- let mut completed_count = 0;
- self.polled_recently = true;
- assert_eq!(self.shared.size(), 0);
-
- let mut overflow_response: Option<Buf> = None;
-
- for _ in 0..self.pending_ops.len() {
- assert!(overflow_response.is_none());
- let mut op = self.pending_ops.pop_front().unwrap();
- match op.poll() {
- Err(()) => panic!("unexpected error"),
- Ok(Async::NotReady) => self.pending_ops.push_back(op),
- Ok(Async::Ready(buf)) => {
- if op.zero_copy_id > 0 {
- self.zero_copy_release(op.zero_copy_id);
- }
-
- let successful_push = self.shared.push(&buf);
- if !successful_push {
- // If we couldn't push the response to the shared queue, because
- // there wasn't enough size, we will return the buffer via the
- // legacy route, using the argument of deno_respond.
- overflow_response = Some(buf);
- // reset `polled_recently` so pending ops can be
- // done even if shared space overflows
- self.polled_recently = false;
- break;
- }
+ let mut overflow_response: Option<Buf> = None;
+
+ loop {
+ self.have_unpolled_ops = false;
+ #[allow(clippy::match_wild_err_arm)]
+ match self.pending_ops.poll() {
+ Err(_) => panic!("unexpected op error"),
+ Ok(Ready(None)) => break,
+ Ok(NotReady) => break,
+ Ok(Ready(Some(r))) => {
+ if r.zero_copy_id > 0 {
+ self.zero_copy_release(r.zero_copy_id);
+ }
- completed_count += 1;
+ let successful_push = self.shared.push(&r.buf);
+ if !successful_push {
+ // If we couldn't push the response to the shared queue, because
+ // there wasn't enough size, we will return the buffer via the
+ // legacy route, using the argument of deno_respond.
+ overflow_response = Some(r.buf);
+ break;
}
}
}
+ }
- if completed_count > 0 {
- self.respond(None)?;
- // The other side should have shifted off all the messages.
- assert_eq!(self.shared.size(), 0);
- }
+ if self.shared.size() > 0 {
+ self.respond(None)?;
+ // The other side should have shifted off all the messages.
+ assert_eq!(self.shared.size(), 0);
+ }
- if overflow_response.is_some() {
- let buf = overflow_response.take().unwrap();
- self.respond(Some(&buf))?;
- }
+ if overflow_response.is_some() {
+ let buf = overflow_response.take().unwrap();
+ self.respond(Some(&buf))?;
}
self.check_promise_errors();
@@ -501,6 +482,9 @@ impl<B: Dispatch> Future for Isolate<B> {
if self.pending_ops.is_empty() {
Ok(futures::Async::Ready(()))
} else {
+ if self.have_unpolled_ops {
+ task::current().notify();
+ }
Ok(futures::Async::NotReady)
}
}