summaryrefslogtreecommitdiff
path: root/cli/state.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/state.rs')
-rw-r--r--cli/state.rs353
1 files changed, 88 insertions, 265 deletions
diff --git a/cli/state.rs b/cli/state.rs
index 9f225e522..c85701bac 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -1,31 +1,30 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::errors::get_error_class_name;
use crate::file_fetcher::SourceFileFetcher;
use crate::global_state::GlobalState;
use crate::global_timer::GlobalTimer;
use crate::http_util::create_http_client;
use crate::import_map::ImportMap;
use crate::metrics::Metrics;
-use crate::ops::serialize_result;
-use crate::ops::JsonOp;
-use crate::ops::MinimalOp;
use crate::permissions::Permissions;
use crate::tsc::TargetLib;
use crate::web_worker::WebWorkerHandle;
-use deno_core::Buf;
use deno_core::BufVec;
-use deno_core::CoreIsolateState;
use deno_core::ErrBox;
use deno_core::ModuleLoadId;
use deno_core::ModuleLoader;
use deno_core::ModuleSpecifier;
use deno_core::Op;
+use deno_core::OpId;
+use deno_core::OpRegistry;
+use deno_core::OpRouter;
+use deno_core::OpTable;
use deno_core::ResourceTable;
-use deno_core::ZeroCopyBuf;
use futures::future::FutureExt;
use futures::Future;
use rand::rngs::StdRng;
use rand::SeedableRng;
-use serde_json::Value;
use std::cell::Cell;
use std::cell::RefCell;
use std::collections::HashMap;
@@ -55,266 +54,11 @@ pub struct State {
pub is_main: bool,
pub is_internal: bool,
pub http_client: RefCell<reqwest::Client>,
+ pub resource_table: RefCell<ResourceTable>,
+ pub op_table: RefCell<OpTable<Self>>,
}
impl State {
- pub fn stateful_json_op_sync<D>(
- self: &Rc<Self>,
- resource_table: &Rc<RefCell<ResourceTable>>,
- dispatcher: D,
- ) -> impl Fn(&mut deno_core::CoreIsolateState, &mut [ZeroCopyBuf]) -> Op
- where
- D: Fn(
- &State,
- &mut ResourceTable,
- Value,
- &mut [ZeroCopyBuf],
- ) -> Result<Value, ErrBox>,
- {
- let state = self.clone();
- let resource_table = resource_table.clone();
-
- let f = move |isolate_state: &mut CoreIsolateState,
- bufs: &mut [ZeroCopyBuf]| {
- let get_error_class_fn = isolate_state.get_error_class_fn;
-
- // The first buffer should contain JSON encoded op arguments; parse them.
- let args: Value = match serde_json::from_slice(&bufs[0]) {
- Ok(v) => v,
- Err(e) => {
- return Op::Sync(serialize_result(
- None,
- Err(e.into()),
- get_error_class_fn,
- ));
- }
- };
-
- // Make a slice containing all buffers except for the first one.
- let zero_copy = &mut bufs[1..];
-
- let result =
- dispatcher(&state, &mut *resource_table.borrow_mut(), args, zero_copy);
-
- // Convert to Op.
- Op::Sync(serialize_result(None, result, get_error_class_fn))
- };
- self.core_op(f)
- }
-
- pub fn stateful_json_op_async<D, F>(
- self: &Rc<Self>,
- resource_table: &Rc<RefCell<ResourceTable>>,
- dispatcher: D,
- ) -> impl Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op
- where
- D:
- FnOnce(Rc<State>, Rc<RefCell<ResourceTable>>, Value, BufVec) -> F + Clone,
- F: Future<Output = Result<Value, ErrBox>> + 'static,
- {
- let state = self.clone();
- let resource_table = resource_table.clone();
-
- let f = move |isolate_state: &mut CoreIsolateState,
- bufs: &mut [ZeroCopyBuf]| {
- let get_error_class_fn = isolate_state.get_error_class_fn;
-
- // The first buffer should contain JSON encoded op arguments; parse them.
- let args: Value = match serde_json::from_slice(&bufs[0]) {
- Ok(v) => v,
- Err(e) => {
- let e = e.into();
- return Op::Sync(serialize_result(None, Err(e), get_error_class_fn));
- }
- };
-
- // `args` should have a `promiseId` property with positive integer value.
- let promise_id = match args.get("promiseId").and_then(|v| v.as_u64()) {
- Some(i) => i,
- None => {
- let e = ErrBox::new("TypeError", "`promiseId` invalid/missing");
- return Op::Sync(serialize_result(None, Err(e), get_error_class_fn));
- }
- };
-
- // Take ownership of all buffers after the first one.
- let zero_copy: BufVec = bufs[1..].into();
-
- // Call dispatcher to obtain op future.
- let fut = (dispatcher.clone())(
- state.clone(),
- resource_table.clone(),
- args,
- zero_copy,
- );
-
- // Convert to Op.
- Op::Async(
- async move {
- serialize_result(Some(promise_id), fut.await, get_error_class_fn)
- }
- .boxed_local(),
- )
- };
- self.core_op(f)
- }
-
- // TODO(bartlomieju): remove me - still used by `op_open_plugin` which
- // needs access to isolate_state
- pub fn stateful_json_op2<D>(
- self: &Rc<Self>,
- dispatcher: D,
- ) -> impl Fn(&mut deno_core::CoreIsolateState, &mut [ZeroCopyBuf]) -> Op
- where
- D: Fn(
- &mut deno_core::CoreIsolateState,
- &Rc<State>,
- Value,
- &mut [ZeroCopyBuf],
- ) -> Result<JsonOp, ErrBox>,
- {
- use crate::ops::json_op;
- self.core_op(json_op(self.stateful_op2(dispatcher)))
- }
-
- /// Wrap core `OpDispatcher` to collect metrics.
- // TODO(ry) this should be private. Is called by stateful_json_op or
- // stateful_minimal_op
- pub(crate) fn core_op<D>(
- self: &Rc<Self>,
- dispatcher: D,
- ) -> impl Fn(&mut deno_core::CoreIsolateState, &mut [ZeroCopyBuf]) -> Op
- where
- D: Fn(&mut deno_core::CoreIsolateState, &mut [ZeroCopyBuf]) -> Op,
- {
- let state = self.clone();
-
- move |isolate_state: &mut deno_core::CoreIsolateState,
- zero_copy: &mut [ZeroCopyBuf]|
- -> Op {
- let bytes_sent_control =
- zero_copy.get(0).map(|s| s.len()).unwrap_or(0) as u64;
- let bytes_sent_zero_copy =
- zero_copy[1..].iter().map(|b| b.len()).sum::<usize>() as u64;
-
- let op = dispatcher(isolate_state, zero_copy);
-
- match op {
- Op::Sync(buf) => {
- state.metrics.borrow_mut().op_sync(
- bytes_sent_control,
- bytes_sent_zero_copy,
- buf.len() as u64,
- );
- Op::Sync(buf)
- }
- Op::Async(fut) => {
- state
- .metrics
- .borrow_mut()
- .op_dispatched_async(bytes_sent_control, bytes_sent_zero_copy);
- let state = state.clone();
- let result_fut = fut.map(move |buf: Buf| {
- state
- .metrics
- .borrow_mut()
- .op_completed_async(buf.len() as u64);
- buf
- });
- Op::Async(result_fut.boxed_local())
- }
- Op::AsyncUnref(fut) => {
- state.metrics.borrow_mut().op_dispatched_async_unref(
- bytes_sent_control,
- bytes_sent_zero_copy,
- );
- let state = state.clone();
- let result_fut = fut.map(move |buf: Buf| {
- state
- .metrics
- .borrow_mut()
- .op_completed_async_unref(buf.len() as u64);
- buf
- });
- Op::AsyncUnref(result_fut.boxed_local())
- }
- }
- }
- }
-
- pub fn stateful_minimal_op2<D>(
- self: &Rc<Self>,
- dispatcher: D,
- ) -> impl Fn(&mut deno_core::CoreIsolateState, &mut [ZeroCopyBuf]) -> Op
- where
- D: Fn(
- &mut deno_core::CoreIsolateState,
- &Rc<State>,
- bool,
- i32,
- &mut [ZeroCopyBuf],
- ) -> MinimalOp,
- {
- let state = self.clone();
- self.core_op(crate::ops::minimal_op(
- move |isolate_state: &mut deno_core::CoreIsolateState,
- is_sync: bool,
- rid: i32,
- zero_copy: &mut [ZeroCopyBuf]|
- -> MinimalOp {
- dispatcher(isolate_state, &state, is_sync, rid, zero_copy)
- },
- ))
- }
-
- /// This is a special function that provides `state` argument to dispatcher.
- ///
- /// NOTE: This only works with JSON dispatcher.
- /// This is a band-aid for transition to `CoreIsolate.register_op` API as most of our
- /// ops require `state` argument.
- pub fn stateful_op<D>(
- self: &Rc<Self>,
- dispatcher: D,
- ) -> impl Fn(
- &mut deno_core::CoreIsolateState,
- Value,
- &mut [ZeroCopyBuf],
- ) -> Result<JsonOp, ErrBox>
- where
- D: Fn(&Rc<State>, Value, &mut [ZeroCopyBuf]) -> Result<JsonOp, ErrBox>,
- {
- let state = self.clone();
- move |_isolate_state: &mut deno_core::CoreIsolateState,
- args: Value,
- zero_copy: &mut [ZeroCopyBuf]|
- -> Result<JsonOp, ErrBox> { dispatcher(&state, args, zero_copy) }
- }
-
- pub fn stateful_op2<D>(
- self: &Rc<Self>,
- dispatcher: D,
- ) -> impl Fn(
- &mut deno_core::CoreIsolateState,
- Value,
- &mut [ZeroCopyBuf],
- ) -> Result<JsonOp, ErrBox>
- where
- D: Fn(
- &mut deno_core::CoreIsolateState,
- &Rc<State>,
- Value,
- &mut [ZeroCopyBuf],
- ) -> Result<JsonOp, ErrBox>,
- {
- let state = self.clone();
- move |isolate_state: &mut deno_core::CoreIsolateState,
- args: Value,
- zero_copy: &mut [ZeroCopyBuf]|
- -> Result<JsonOp, ErrBox> {
- dispatcher(isolate_state, &state, args, zero_copy)
- }
- }
-
/// Quits the process if the --unstable flag was not provided.
///
/// This is intentionally a non-recoverable check so that people cannot probe
@@ -458,6 +202,8 @@ impl State {
is_main: true,
is_internal,
http_client: create_http_client(fl.ca_file.as_deref())?.into(),
+ resource_table: Default::default(),
+ op_table: Default::default(),
};
Ok(Rc::new(state))
}
@@ -486,6 +232,8 @@ impl State {
is_main: false,
is_internal: false,
http_client: create_http_client(fl.ca_file.as_deref())?.into(),
+ resource_table: Default::default(),
+ op_table: Default::default(),
};
Ok(Rc::new(state))
}
@@ -570,7 +318,7 @@ impl State {
}
#[cfg(test)]
- pub fn mock(main_module: &str) -> Rc<State> {
+ pub fn mock(main_module: &str) -> Rc<Self> {
let module_specifier = ModuleSpecifier::resolve_url_or_path(main_module)
.expect("Invalid entry module");
State::new(
@@ -583,3 +331,78 @@ impl State {
.unwrap()
}
}
+
+impl OpRouter for State {
+ fn route_op(self: Rc<Self>, op_id: OpId, bufs: BufVec) -> Op {
+ // TODOs:
+ // * The 'bytes' metrics seem pretty useless, especially now that the
+ // distinction between 'control' and 'data' buffers has become blurry.
+ // * Tracking completion of async ops currently makes us put the boxed
+ // future into _another_ box. Keeping some counters may not be expensive
+ // in itself, but adding a heap allocation for every metric seems bad.
+ let mut buf_len_iter = bufs.iter().map(|buf| buf.len());
+ let bytes_sent_control = buf_len_iter.next().unwrap_or(0);
+ let bytes_sent_data = buf_len_iter.sum();
+
+ let op_fn = self
+ .op_table
+ .borrow()
+ .get_index(op_id)
+ .map(|(_, op_fn)| op_fn.clone())
+ .unwrap();
+
+ let self_ = self.clone();
+ let op = (op_fn)(self_, bufs);
+
+ let self_ = self.clone();
+ let mut metrics = self_.metrics.borrow_mut();
+ match op {
+ Op::Sync(buf) => {
+ metrics.op_sync(bytes_sent_control, bytes_sent_data, buf.len());
+ Op::Sync(buf)
+ }
+ Op::Async(fut) => {
+ metrics.op_dispatched_async(bytes_sent_control, bytes_sent_data);
+ let fut = fut
+ .inspect(move |buf| {
+ self.metrics.borrow_mut().op_completed_async(buf.len());
+ })
+ .boxed_local();
+ Op::Async(fut)
+ }
+ Op::AsyncUnref(fut) => {
+ metrics.op_dispatched_async_unref(bytes_sent_control, bytes_sent_data);
+ let fut = fut
+ .inspect(move |buf| {
+ self
+ .metrics
+ .borrow_mut()
+ .op_completed_async_unref(buf.len());
+ })
+ .boxed_local();
+ Op::AsyncUnref(fut)
+ }
+ other => other,
+ }
+ }
+}
+
+impl OpRegistry for State {
+ fn get_op_catalog(self: Rc<Self>) -> HashMap<String, OpId> {
+ self.op_table.borrow().get_op_catalog()
+ }
+
+ fn register_op<F>(&self, name: &str, op_fn: F) -> OpId
+ where
+ F: Fn(Rc<Self>, BufVec) -> Op + 'static,
+ {
+ let mut op_table = self.op_table.borrow_mut();
+ let (op_id, prev) = op_table.insert_full(name.to_owned(), Rc::new(op_fn));
+ assert!(prev.is_none());
+ op_id
+ }
+
+ fn get_error_class_name(&self, err: &ErrBox) -> &'static str {
+ get_error_class_name(err)
+ }
+}