diff options
-rw-r--r-- | cli/ops.rs | 11 | ||||
-rw-r--r-- | cli/state.rs | 6 | ||||
-rw-r--r-- | core/examples/http_bench.rs | 14 | ||||
-rw-r--r-- | core/isolate.rs | 55 |
4 files changed, 48 insertions, 38 deletions
diff --git a/cli/ops.rs b/cli/ops.rs index 5463bac4d..c49ce517c 100644 --- a/cli/ops.rs +++ b/cli/ops.rs @@ -83,7 +83,7 @@ pub fn dispatch_all( control: &[u8], zero_copy: Option<PinnedBuf>, op_selector: OpSelector, -) -> (bool, Box<Op>) { +) -> Op { let bytes_sent_control = control.len(); let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0); let base = msg::get_root_as_base(&control); @@ -101,7 +101,7 @@ pub fn dispatch_all( let state = state.clone(); state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy); - let boxed_op = Box::new( + let fut = Box::new( op.or_else(move |err: DenoError| -> Result<Buf, ()> { debug!("op err {}", err); // No matter whether we got an Err or Ok, we want a serialized message to @@ -143,7 +143,12 @@ pub fn dispatch_all( msg::enum_name_any(inner_type), base.sync() ); - (base.sync(), boxed_op) + + if base.sync() { + Op::Sync(fut.wait().unwrap()) + } else { + Op::Async(fut) + } } pub fn op_selector_compiler(inner_type: msg::Any) -> Option<OpCreator> { diff --git a/cli/state.rs b/cli/state.rs index 2bfc641d5..8a4f4eaee 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -81,11 +81,7 @@ impl Deref for ThreadSafeState { } impl ThreadSafeState { - pub fn dispatch( - &self, - control: &[u8], - zero_copy: Option<PinnedBuf>, - ) -> (bool, Box<Op>) { + pub fn dispatch(&self, control: &[u8], zero_copy: Option<PinnedBuf>) -> Op { ops::dispatch_all(self, control, zero_copy, self.dispatch_selector) } } diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index b355f5568..757e9a3b7 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -111,10 +111,7 @@ fn test_record_from() { pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send; -fn dispatch( - control: &[u8], - zero_copy_buf: Option<PinnedBuf>, -) -> (bool, Box<Op>) { +fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> Op { let record = Record::from(control); let is_sync = record.promise_id == 0; let http_bench_op = match record.op_id { @@ -147,7 +144,7 @@ fn dispatch( let mut record_a = record.clone(); let mut record_b = record.clone(); - let op = Box::new( + let fut = Box::new( http_bench_op .and_then(move |result| { record_a.result = result; @@ -161,7 +158,12 @@ fn dispatch( Ok(record.into()) }), ); - (is_sync, op) + + if is_sync { + Op::Sync(fut.wait().unwrap()) + } else { + Op::Async(fut) + } } fn main() { diff --git a/core/isolate.rs b/core/isolate.rs index 2cafb29b6..96d9dc24b 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -26,7 +26,13 @@ use std::ptr::null; use std::sync::{Arc, Mutex, Once, ONCE_INIT}; pub type Buf = Box<[u8]>; -pub type Op = dyn Future<Item = Buf, Error = ()> + Send; + +pub type OpAsyncFuture = Box<dyn Future<Item = Buf, Error = ()> + Send>; + +pub enum Op { + Sync(Buf), + Async(OpAsyncFuture), +} /// Stores a script used to initalize a Isolate pub struct Script<'a> { @@ -46,8 +52,7 @@ pub enum StartupData<'a> { #[derive(Default)] pub struct Config { - dispatch: - Option<Arc<Fn(&[u8], Option<PinnedBuf>) -> (bool, Box<Op>) + Send + Sync>>, + dispatch: Option<Arc<Fn(&[u8], Option<PinnedBuf>) -> Op>>, pub will_snapshot: bool, } @@ -57,7 +62,7 @@ impl Config { /// corresponds to the second argument of Deno.core.dispatch(). pub fn dispatch<F>(&mut self, f: F) where - F: Fn(&[u8], Option<PinnedBuf>) -> (bool, Box<Op>) + Send + Sync + 'static, + F: Fn(&[u8], Option<PinnedBuf>) -> Op + Send + Sync + 'static, { self.dispatch = Some(Arc::new(f)); } @@ -69,15 +74,15 @@ impl Config { /// pending ops have completed. /// /// Ops are created in JavaScript by calling Deno.core.dispatch(), and in Rust -/// by implementing deno::Dispatch::dispatch. An Op corresponds exactly to a -/// Promise in JavaScript. +/// by implementing deno::Dispatch::dispatch. An async Op corresponds exactly to +/// a Promise in JavaScript. pub struct Isolate { libdeno_isolate: *const libdeno::isolate, shared_libdeno_isolate: Arc<Mutex<Option<*const libdeno::isolate>>>, config: Config, needs_init: bool, shared: SharedQueue, - pending_ops: FuturesUnordered<Box<Op>>, + pending_ops: FuturesUnordered<OpAsyncFuture>, have_unpolled_ops: bool, } @@ -175,7 +180,7 @@ impl Isolate { let isolate = unsafe { Isolate::from_raw_ptr(user_data) }; let control_shared = isolate.shared.shift(); - let (is_sync, op) = if control_argv0.len() > 0 { + let op = if control_argv0.len() > 0 { // The user called Deno.core.send(control) if let Some(ref f) = isolate.config.dispatch { f(control_argv0.as_ref(), PinnedBuf::new(zero_copy_buf)) @@ -201,16 +206,18 @@ impl Isolate { // At this point the SharedQueue should be empty. assert_eq!(isolate.shared.size(), 0); - if is_sync { - let res_record = op.wait().unwrap(); - // For sync messages, we always return the response via Deno.core.send's - // return value. - // TODO(ry) check that if JSError thrown during respond(), that it will be - // picked up. - let _ = isolate.respond(Some(&res_record)); - } else { - isolate.pending_ops.push(op); - isolate.have_unpolled_ops = true; + match op { + Op::Sync(buf) => { + // For sync messages, we always return the response via Deno.core.send's + // return value. + // TODO(ry) check that if JSError thrown during respond(), that it will be + // picked up. + let _ = isolate.respond(Some(&buf)); + } + Op::Async(fut) => { + isolate.pending_ops.push(fut); + isolate.have_unpolled_ops = true; + } } } @@ -555,19 +562,19 @@ pub mod tests { let dispatch_count_ = dispatch_count.clone(); let mut config = Config::default(); - config.dispatch(move |control, _| -> (bool, Box<Op>) { + config.dispatch(move |control, _| -> Op { dispatch_count_.fetch_add(1, Ordering::Relaxed); match mode { Mode::AsyncImmediate => { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); let buf = vec![43u8].into_boxed_slice(); - (false, Box::new(futures::future::ok(buf))) + Op::Async(Box::new(futures::future::ok(buf))) } Mode::OverflowReqSync => { assert_eq!(control.len(), 100 * 1024 * 1024); let buf = vec![43u8].into_boxed_slice(); - (true, Box::new(futures::future::ok(buf))) + Op::Sync(buf) } Mode::OverflowResSync => { assert_eq!(control.len(), 1); @@ -576,12 +583,12 @@ pub mod tests { vec.resize(100 * 1024 * 1024, 0); vec[0] = 99; let buf = vec.into_boxed_slice(); - (true, Box::new(futures::future::ok(buf))) + Op::Sync(buf) } Mode::OverflowReqAsync => { assert_eq!(control.len(), 100 * 1024 * 1024); let buf = vec![43u8].into_boxed_slice(); - (false, Box::new(futures::future::ok(buf))) + Op::Async(Box::new(futures::future::ok(buf))) } Mode::OverflowResAsync => { assert_eq!(control.len(), 1); @@ -590,7 +597,7 @@ pub mod tests { vec.resize(100 * 1024 * 1024, 0); vec[0] = 4; let buf = vec.into_boxed_slice(); - (false, Box::new(futures::future::ok(buf))) + Op::Async(Box::new(futures::future::ok(buf))) } } }); |