diff options
Diffstat (limited to 'cli/state.rs')
-rw-r--r-- | cli/state.rs | 353 |
1 files changed, 88 insertions, 265 deletions
diff --git a/cli/state.rs b/cli/state.rs index 9f225e522..c85701bac 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -1,31 +1,30 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use crate::errors::get_error_class_name; use crate::file_fetcher::SourceFileFetcher; use crate::global_state::GlobalState; use crate::global_timer::GlobalTimer; use crate::http_util::create_http_client; use crate::import_map::ImportMap; use crate::metrics::Metrics; -use crate::ops::serialize_result; -use crate::ops::JsonOp; -use crate::ops::MinimalOp; use crate::permissions::Permissions; use crate::tsc::TargetLib; use crate::web_worker::WebWorkerHandle; -use deno_core::Buf; use deno_core::BufVec; -use deno_core::CoreIsolateState; use deno_core::ErrBox; use deno_core::ModuleLoadId; use deno_core::ModuleLoader; use deno_core::ModuleSpecifier; use deno_core::Op; +use deno_core::OpId; +use deno_core::OpRegistry; +use deno_core::OpRouter; +use deno_core::OpTable; use deno_core::ResourceTable; -use deno_core::ZeroCopyBuf; use futures::future::FutureExt; use futures::Future; use rand::rngs::StdRng; use rand::SeedableRng; -use serde_json::Value; use std::cell::Cell; use std::cell::RefCell; use std::collections::HashMap; @@ -55,266 +54,11 @@ pub struct State { pub is_main: bool, pub is_internal: bool, pub http_client: RefCell<reqwest::Client>, + pub resource_table: RefCell<ResourceTable>, + pub op_table: RefCell<OpTable<Self>>, } impl State { - pub fn stateful_json_op_sync<D>( - self: &Rc<Self>, - resource_table: &Rc<RefCell<ResourceTable>>, - dispatcher: D, - ) -> impl Fn(&mut deno_core::CoreIsolateState, &mut [ZeroCopyBuf]) -> Op - where - D: Fn( - &State, - &mut ResourceTable, - Value, - &mut [ZeroCopyBuf], - ) -> Result<Value, ErrBox>, - { - let state = self.clone(); - let resource_table = resource_table.clone(); - - let f = move |isolate_state: &mut CoreIsolateState, - bufs: &mut [ZeroCopyBuf]| { - let get_error_class_fn = isolate_state.get_error_class_fn; - - // The first buffer should contain JSON encoded op arguments; parse them. - let args: Value = match serde_json::from_slice(&bufs[0]) { - Ok(v) => v, - Err(e) => { - return Op::Sync(serialize_result( - None, - Err(e.into()), - get_error_class_fn, - )); - } - }; - - // Make a slice containing all buffers except for the first one. - let zero_copy = &mut bufs[1..]; - - let result = - dispatcher(&state, &mut *resource_table.borrow_mut(), args, zero_copy); - - // Convert to Op. - Op::Sync(serialize_result(None, result, get_error_class_fn)) - }; - self.core_op(f) - } - - pub fn stateful_json_op_async<D, F>( - self: &Rc<Self>, - resource_table: &Rc<RefCell<ResourceTable>>, - dispatcher: D, - ) -> impl Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op - where - D: - FnOnce(Rc<State>, Rc<RefCell<ResourceTable>>, Value, BufVec) -> F + Clone, - F: Future<Output = Result<Value, ErrBox>> + 'static, - { - let state = self.clone(); - let resource_table = resource_table.clone(); - - let f = move |isolate_state: &mut CoreIsolateState, - bufs: &mut [ZeroCopyBuf]| { - let get_error_class_fn = isolate_state.get_error_class_fn; - - // The first buffer should contain JSON encoded op arguments; parse them. - let args: Value = match serde_json::from_slice(&bufs[0]) { - Ok(v) => v, - Err(e) => { - let e = e.into(); - return Op::Sync(serialize_result(None, Err(e), get_error_class_fn)); - } - }; - - // `args` should have a `promiseId` property with positive integer value. - let promise_id = match args.get("promiseId").and_then(|v| v.as_u64()) { - Some(i) => i, - None => { - let e = ErrBox::new("TypeError", "`promiseId` invalid/missing"); - return Op::Sync(serialize_result(None, Err(e), get_error_class_fn)); - } - }; - - // Take ownership of all buffers after the first one. - let zero_copy: BufVec = bufs[1..].into(); - - // Call dispatcher to obtain op future. - let fut = (dispatcher.clone())( - state.clone(), - resource_table.clone(), - args, - zero_copy, - ); - - // Convert to Op. - Op::Async( - async move { - serialize_result(Some(promise_id), fut.await, get_error_class_fn) - } - .boxed_local(), - ) - }; - self.core_op(f) - } - - // TODO(bartlomieju): remove me - still used by `op_open_plugin` which - // needs access to isolate_state - pub fn stateful_json_op2<D>( - self: &Rc<Self>, - dispatcher: D, - ) -> impl Fn(&mut deno_core::CoreIsolateState, &mut [ZeroCopyBuf]) -> Op - where - D: Fn( - &mut deno_core::CoreIsolateState, - &Rc<State>, - Value, - &mut [ZeroCopyBuf], - ) -> Result<JsonOp, ErrBox>, - { - use crate::ops::json_op; - self.core_op(json_op(self.stateful_op2(dispatcher))) - } - - /// Wrap core `OpDispatcher` to collect metrics. - // TODO(ry) this should be private. Is called by stateful_json_op or - // stateful_minimal_op - pub(crate) fn core_op<D>( - self: &Rc<Self>, - dispatcher: D, - ) -> impl Fn(&mut deno_core::CoreIsolateState, &mut [ZeroCopyBuf]) -> Op - where - D: Fn(&mut deno_core::CoreIsolateState, &mut [ZeroCopyBuf]) -> Op, - { - let state = self.clone(); - - move |isolate_state: &mut deno_core::CoreIsolateState, - zero_copy: &mut [ZeroCopyBuf]| - -> Op { - let bytes_sent_control = - zero_copy.get(0).map(|s| s.len()).unwrap_or(0) as u64; - let bytes_sent_zero_copy = - zero_copy[1..].iter().map(|b| b.len()).sum::<usize>() as u64; - - let op = dispatcher(isolate_state, zero_copy); - - match op { - Op::Sync(buf) => { - state.metrics.borrow_mut().op_sync( - bytes_sent_control, - bytes_sent_zero_copy, - buf.len() as u64, - ); - Op::Sync(buf) - } - Op::Async(fut) => { - state - .metrics - .borrow_mut() - .op_dispatched_async(bytes_sent_control, bytes_sent_zero_copy); - let state = state.clone(); - let result_fut = fut.map(move |buf: Buf| { - state - .metrics - .borrow_mut() - .op_completed_async(buf.len() as u64); - buf - }); - Op::Async(result_fut.boxed_local()) - } - Op::AsyncUnref(fut) => { - state.metrics.borrow_mut().op_dispatched_async_unref( - bytes_sent_control, - bytes_sent_zero_copy, - ); - let state = state.clone(); - let result_fut = fut.map(move |buf: Buf| { - state - .metrics - .borrow_mut() - .op_completed_async_unref(buf.len() as u64); - buf - }); - Op::AsyncUnref(result_fut.boxed_local()) - } - } - } - } - - pub fn stateful_minimal_op2<D>( - self: &Rc<Self>, - dispatcher: D, - ) -> impl Fn(&mut deno_core::CoreIsolateState, &mut [ZeroCopyBuf]) -> Op - where - D: Fn( - &mut deno_core::CoreIsolateState, - &Rc<State>, - bool, - i32, - &mut [ZeroCopyBuf], - ) -> MinimalOp, - { - let state = self.clone(); - self.core_op(crate::ops::minimal_op( - move |isolate_state: &mut deno_core::CoreIsolateState, - is_sync: bool, - rid: i32, - zero_copy: &mut [ZeroCopyBuf]| - -> MinimalOp { - dispatcher(isolate_state, &state, is_sync, rid, zero_copy) - }, - )) - } - - /// This is a special function that provides `state` argument to dispatcher. - /// - /// NOTE: This only works with JSON dispatcher. - /// This is a band-aid for transition to `CoreIsolate.register_op` API as most of our - /// ops require `state` argument. - pub fn stateful_op<D>( - self: &Rc<Self>, - dispatcher: D, - ) -> impl Fn( - &mut deno_core::CoreIsolateState, - Value, - &mut [ZeroCopyBuf], - ) -> Result<JsonOp, ErrBox> - where - D: Fn(&Rc<State>, Value, &mut [ZeroCopyBuf]) -> Result<JsonOp, ErrBox>, - { - let state = self.clone(); - move |_isolate_state: &mut deno_core::CoreIsolateState, - args: Value, - zero_copy: &mut [ZeroCopyBuf]| - -> Result<JsonOp, ErrBox> { dispatcher(&state, args, zero_copy) } - } - - pub fn stateful_op2<D>( - self: &Rc<Self>, - dispatcher: D, - ) -> impl Fn( - &mut deno_core::CoreIsolateState, - Value, - &mut [ZeroCopyBuf], - ) -> Result<JsonOp, ErrBox> - where - D: Fn( - &mut deno_core::CoreIsolateState, - &Rc<State>, - Value, - &mut [ZeroCopyBuf], - ) -> Result<JsonOp, ErrBox>, - { - let state = self.clone(); - move |isolate_state: &mut deno_core::CoreIsolateState, - args: Value, - zero_copy: &mut [ZeroCopyBuf]| - -> Result<JsonOp, ErrBox> { - dispatcher(isolate_state, &state, args, zero_copy) - } - } - /// Quits the process if the --unstable flag was not provided. /// /// This is intentionally a non-recoverable check so that people cannot probe @@ -458,6 +202,8 @@ impl State { is_main: true, is_internal, http_client: create_http_client(fl.ca_file.as_deref())?.into(), + resource_table: Default::default(), + op_table: Default::default(), }; Ok(Rc::new(state)) } @@ -486,6 +232,8 @@ impl State { is_main: false, is_internal: false, http_client: create_http_client(fl.ca_file.as_deref())?.into(), + resource_table: Default::default(), + op_table: Default::default(), }; Ok(Rc::new(state)) } @@ -570,7 +318,7 @@ impl State { } #[cfg(test)] - pub fn mock(main_module: &str) -> Rc<State> { + pub fn mock(main_module: &str) -> Rc<Self> { let module_specifier = ModuleSpecifier::resolve_url_or_path(main_module) .expect("Invalid entry module"); State::new( @@ -583,3 +331,78 @@ impl State { .unwrap() } } + +impl OpRouter for State { + fn route_op(self: Rc<Self>, op_id: OpId, bufs: BufVec) -> Op { + // TODOs: + // * The 'bytes' metrics seem pretty useless, especially now that the + // distinction between 'control' and 'data' buffers has become blurry. + // * Tracking completion of async ops currently makes us put the boxed + // future into _another_ box. Keeping some counters may not be expensive + // in itself, but adding a heap allocation for every metric seems bad. + let mut buf_len_iter = bufs.iter().map(|buf| buf.len()); + let bytes_sent_control = buf_len_iter.next().unwrap_or(0); + let bytes_sent_data = buf_len_iter.sum(); + + let op_fn = self + .op_table + .borrow() + .get_index(op_id) + .map(|(_, op_fn)| op_fn.clone()) + .unwrap(); + + let self_ = self.clone(); + let op = (op_fn)(self_, bufs); + + let self_ = self.clone(); + let mut metrics = self_.metrics.borrow_mut(); + match op { + Op::Sync(buf) => { + metrics.op_sync(bytes_sent_control, bytes_sent_data, buf.len()); + Op::Sync(buf) + } + Op::Async(fut) => { + metrics.op_dispatched_async(bytes_sent_control, bytes_sent_data); + let fut = fut + .inspect(move |buf| { + self.metrics.borrow_mut().op_completed_async(buf.len()); + }) + .boxed_local(); + Op::Async(fut) + } + Op::AsyncUnref(fut) => { + metrics.op_dispatched_async_unref(bytes_sent_control, bytes_sent_data); + let fut = fut + .inspect(move |buf| { + self + .metrics + .borrow_mut() + .op_completed_async_unref(buf.len()); + }) + .boxed_local(); + Op::AsyncUnref(fut) + } + other => other, + } + } +} + +impl OpRegistry for State { + fn get_op_catalog(self: Rc<Self>) -> HashMap<String, OpId> { + self.op_table.borrow().get_op_catalog() + } + + fn register_op<F>(&self, name: &str, op_fn: F) -> OpId + where + F: Fn(Rc<Self>, BufVec) -> Op + 'static, + { + let mut op_table = self.op_table.borrow_mut(); + let (op_id, prev) = op_table.insert_full(name.to_owned(), Rc::new(op_fn)); + assert!(prev.is_none()); + op_id + } + + fn get_error_class_name(&self, err: &ErrBox) -> &'static str { + get_error_class_name(err) + } +} |