diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/01_core.js | 12 | ||||
-rw-r--r-- | core/bindings.rs | 7 | ||||
-rw-r--r-- | core/lib.rs | 1 | ||||
-rw-r--r-- | core/modules.rs | 2 | ||||
-rw-r--r-- | core/ops.rs | 9 | ||||
-rw-r--r-- | core/ops_builtin.rs | 12 | ||||
-rw-r--r-- | core/ops_json.rs | 9 | ||||
-rw-r--r-- | core/ops_metrics.rs | 96 | ||||
-rw-r--r-- | core/runtime.rs | 15 |
9 files changed, 152 insertions, 11 deletions
diff --git a/core/01_core.js b/core/01_core.js index e1b0529f7..24b844453 100644 --- a/core/01_core.js +++ b/core/01_core.js @@ -12,8 +12,10 @@ Map, Array, ArrayPrototypeFill, + ArrayPrototypeMap, ErrorCaptureStackTrace, Promise, + ObjectEntries, ObjectFreeze, ObjectFromEntries, MapPrototypeGet, @@ -152,6 +154,15 @@ opSync("op_print", str, isErr); } + function metrics() { + const [aggregate, perOps] = opSync("op_metrics"); + aggregate.ops = ObjectFromEntries(ArrayPrototypeMap( + ObjectEntries(opsCache), + ([opName, opId]) => [opName, perOps[opId]], + )); + return aggregate; + } + // Some "extensions" rely on "BadResource" and "Interrupted" errors in the // JS code (eg. "deno_net") so they are provided in "Deno.core" but later // reexported on "Deno.errors" @@ -178,6 +189,7 @@ tryClose, print, resources, + metrics, registerErrorBuilder, registerErrorClass, opresolve, diff --git a/core/bindings.rs b/core/bindings.rs index cf4fd07f6..2fc6b5092 100644 --- a/core/bindings.rs +++ b/core/bindings.rs @@ -315,7 +315,7 @@ fn opcall_sync<'s>( mut rv: v8::ReturnValue, ) { let state_rc = JsRuntime::state(scope); - let state = state_rc.borrow(); + let state = state_rc.borrow_mut(); let op_id = match v8::Local::<v8::Integer>::try_from(args.get(0)) .map(|l| l.value() as OpId) @@ -344,11 +344,13 @@ fn opcall_sync<'s>( scope, a, b, + op_id, promise_id: 0, }; let op = OpTable::route_op(op_id, state.op_state.clone(), payload); match op { Op::Sync(result) => { + state.op_state.borrow_mut().tracker.track_sync(op_id); rv.set(result.to_v8(scope).unwrap()); } Op::NotFound => { @@ -405,6 +407,7 @@ fn opcall_async<'s>( scope, a, b, + op_id, promise_id, }; let op = OpTable::route_op(op_id, state.op_state.clone(), payload); @@ -417,10 +420,12 @@ fn opcall_async<'s>( OpResult::Err(_) => rv.set(result.to_v8(scope).unwrap()), }, Op::Async(fut) => { + state.op_state.borrow_mut().tracker.track_async(op_id); state.pending_ops.push(fut); state.have_unpolled_ops = true; } Op::AsyncUnref(fut) => { + state.op_state.borrow_mut().tracker.track_unref(op_id); state.pending_unref_ops.push(fut); state.have_unpolled_ops = true; } diff --git a/core/lib.rs b/core/lib.rs index 22509c462..ea7b322e2 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -13,6 +13,7 @@ mod normalize_path; mod ops; mod ops_builtin; mod ops_json; +mod ops_metrics; mod resources; mod runtime; diff --git a/core/modules.rs b/core/modules.rs index 18399b9b4..2af09057f 100644 --- a/core/modules.rs +++ b/core/modules.rs @@ -1008,7 +1008,7 @@ mod tests { dispatch_count_.fetch_add(1, Ordering::Relaxed); let (control, _): (u8, ()) = payload.deserialize().unwrap(); assert_eq!(control, 42); - let resp = (0, serialize_op_result(Ok(43), state)); + let resp = (0, 1, serialize_op_result(Ok(43), state)); Op::Async(Box::pin(futures::future::ready(resp))) }; diff --git a/core/ops.rs b/core/ops.rs index 0e85bcf47..80bb30eda 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -3,6 +3,7 @@ use crate::error::type_error; use crate::error::AnyError; use crate::gotham_state::GothamState; +use crate::ops_metrics::OpsTracker; use crate::resources::ResourceTable; use crate::runtime::GetErrorClassFn; use futures::Future; @@ -18,7 +19,8 @@ use std::pin::Pin; use std::rc::Rc; pub type PromiseId = u64; -pub type OpAsyncFuture = Pin<Box<dyn Future<Output = (PromiseId, OpResult)>>>; +pub type OpAsyncFuture = + Pin<Box<dyn Future<Output = (PromiseId, OpId, OpResult)>>>; pub type OpFn = dyn Fn(Rc<RefCell<OpState>>, OpPayload) -> Op + 'static; pub type OpId = usize; @@ -26,6 +28,7 @@ pub struct OpPayload<'a, 'b, 'c> { pub(crate) scope: &'a mut v8::HandleScope<'b>, pub(crate) a: v8::Local<'c, v8::Value>, pub(crate) b: v8::Local<'c, v8::Value>, + pub(crate) op_id: OpId, pub(crate) promise_id: PromiseId, } @@ -96,6 +99,7 @@ pub struct OpState { pub resource_table: ResourceTable, pub op_table: OpTable, pub get_error_class_fn: GetErrorClassFn, + pub(crate) tracker: OpsTracker, gotham_state: GothamState, } @@ -105,6 +109,9 @@ impl OpState { resource_table: Default::default(), op_table: OpTable::default(), get_error_class_fn: &|_| "Error", + tracker: OpsTracker { + ops: Vec::with_capacity(256), + }, gotham_state: Default::default(), } } diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs index 83f2e504a..c830f8eff 100644 --- a/core/ops_builtin.rs +++ b/core/ops_builtin.rs @@ -2,6 +2,7 @@ use crate::error::type_error; use crate::error::AnyError; use crate::include_js_files; use crate::op_sync; +use crate::ops_metrics::OpMetrics; use crate::resources::ResourceId; use crate::void_op_async; use crate::void_op_sync; @@ -32,6 +33,7 @@ pub(crate) fn init_builtins() -> Extension { "op_wasm_streaming_set_url", op_sync(op_wasm_streaming_set_url), ), + ("op_metrics", op_sync(op_metrics)), ("op_void_sync", void_op_sync()), ("op_void_async", void_op_async()), ]) @@ -158,3 +160,13 @@ pub fn op_wasm_streaming_set_url( Ok(()) } + +pub fn op_metrics( + state: &mut OpState, + _: (), + _: (), +) -> Result<(OpMetrics, Vec<OpMetrics>), AnyError> { + let aggregate = state.tracker.aggregate(); + let per_op = state.tracker.per_op(); + Ok((aggregate, per_op)) +} diff --git a/core/ops_json.rs b/core/ops_json.rs index 22a84154d..0ca7e5ce4 100644 --- a/core/ops_json.rs +++ b/core/ops_json.rs @@ -32,9 +32,10 @@ pub fn void_op_async() -> Box<OpFn> { // to deserialize to the unit type instead of failing with `ExpectedNull` // op_async(|_, _: (), _: ()| futures::future::ok(())) Box::new(move |state, payload| -> Op { + let op_id = payload.op_id; let pid = payload.promise_id; let op_result = serialize_op_result(Ok(()), state); - Op::Async(Box::pin(futures::future::ready((pid, op_result)))) + Op::Async(Box::pin(futures::future::ready((pid, op_id, op_result)))) }) } @@ -112,6 +113,7 @@ where RV: Serialize + 'static, { Box::new(move |state, payload| -> Op { + let op_id = payload.op_id; let pid = payload.promise_id; // Deserialize args, sync error on failure let args = match payload.deserialize() { @@ -124,7 +126,7 @@ where use crate::futures::FutureExt; let fut = op_fn(state.clone(), a, b) - .map(move |result| (pid, serialize_op_result(result, state))); + .map(move |result| (pid, op_id, serialize_op_result(result, state))); Op::Async(Box::pin(fut)) }) } @@ -143,6 +145,7 @@ where RV: Serialize + 'static, { Box::new(move |state, payload| -> Op { + let op_id = payload.op_id; let pid = payload.promise_id; // Deserialize args, sync error on failure let args = match payload.deserialize() { @@ -155,7 +158,7 @@ where use crate::futures::FutureExt; let fut = op_fn(state.clone(), a, b) - .map(move |result| (pid, serialize_op_result(result, state))); + .map(move |result| (pid, op_id, serialize_op_result(result, state))); Op::AsyncUnref(Box::pin(fut)) }) } diff --git a/core/ops_metrics.rs b/core/ops_metrics.rs new file mode 100644 index 000000000..9dd5e2edf --- /dev/null +++ b/core/ops_metrics.rs @@ -0,0 +1,96 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +use crate::serde::Serialize; +use crate::OpId; + +// TODO(@AaronO): split into AggregateMetrics & PerOpMetrics +#[derive(Clone, Default, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct OpMetrics { + pub ops_dispatched: u64, + pub ops_dispatched_sync: u64, + pub ops_dispatched_async: u64, + pub ops_dispatched_async_unref: u64, + pub ops_completed: u64, + pub ops_completed_sync: u64, + pub ops_completed_async: u64, + pub ops_completed_async_unref: u64, + pub bytes_sent_control: u64, + pub bytes_sent_data: u64, + pub bytes_received: u64, +} + +// TODO(@AaronO): track errors +#[derive(Default, Debug)] +pub struct OpsTracker { + pub ops: Vec<OpMetrics>, +} + +impl OpsTracker { + pub fn per_op(&self) -> Vec<OpMetrics> { + self.ops.clone() + } + + pub fn aggregate(&self) -> OpMetrics { + let mut sum = OpMetrics::default(); + + for metrics in self.ops.iter() { + sum.ops_dispatched += metrics.ops_dispatched; + sum.ops_dispatched_sync += metrics.ops_dispatched_sync; + sum.ops_dispatched_async += metrics.ops_dispatched_async; + sum.ops_dispatched_async_unref += metrics.ops_dispatched_async_unref; + sum.ops_completed += metrics.ops_completed; + sum.ops_completed_sync += metrics.ops_completed_sync; + sum.ops_completed_async += metrics.ops_completed_async; + sum.ops_completed_async_unref += metrics.ops_completed_async_unref; + sum.bytes_sent_control += metrics.bytes_sent_control; + sum.bytes_sent_data += metrics.bytes_sent_data; + sum.bytes_received += metrics.bytes_received; + } + + sum + } + + fn ensure_capacity(&mut self, op_id: OpId) { + if op_id >= self.ops.len() { + let delta_len = 1 + op_id - self.ops.len(); + self.ops.extend(vec![OpMetrics::default(); delta_len]) + } + } + + fn metrics_mut(&mut self, id: OpId) -> &mut OpMetrics { + self.ensure_capacity(id); + self.ops.get_mut(id).unwrap() + } + + pub fn track_sync(&mut self, id: OpId) { + let metrics = self.metrics_mut(id); + metrics.ops_dispatched += 1; + metrics.ops_completed += 1; + metrics.ops_dispatched_sync += 1; + metrics.ops_completed_sync += 1; + } + + pub fn track_async(&mut self, id: OpId) { + let metrics = self.metrics_mut(id); + metrics.ops_dispatched += 1; + metrics.ops_dispatched_async += 1; + } + + pub fn track_async_completed(&mut self, id: OpId) { + let metrics = self.metrics_mut(id); + metrics.ops_completed += 1; + metrics.ops_completed_async += 1; + } + + pub fn track_unref(&mut self, id: OpId) { + let metrics = self.metrics_mut(id); + metrics.ops_dispatched += 1; + metrics.ops_dispatched_async_unref += 1; + } + + pub fn track_unref_completed(&mut self, id: OpId) { + let metrics = self.metrics_mut(id); + metrics.ops_completed += 1; + metrics.ops_completed_async_unref += 1; + } +} diff --git a/core/runtime.rs b/core/runtime.rs index 1150746f3..1928ff31c 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -44,7 +44,8 @@ use std::sync::Once; use std::task::Context; use std::task::Poll; -type PendingOpFuture = Pin<Box<dyn Future<Output = (PromiseId, OpResult)>>>; +type PendingOpFuture = + Pin<Box<dyn Future<Output = (PromiseId, OpId, OpResult)>>>; pub enum Snapshot { Static(&'static [u8]), @@ -1477,7 +1478,9 @@ impl JsRuntime { match pending_r { Poll::Ready(None) => break, Poll::Pending => break, - Poll::Ready(Some((promise_id, resp))) => { + Poll::Ready(Some((promise_id, op_id, resp))) => { + let tracker = &mut state.op_state.borrow_mut().tracker; + tracker.track_async_completed(op_id); async_responses.push((promise_id, resp)); } }; @@ -1488,7 +1491,9 @@ impl JsRuntime { match unref_r { Poll::Ready(None) => break, Poll::Pending => break, - Poll::Ready(Some((promise_id, resp))) => { + Poll::Ready(Some((promise_id, op_id, resp))) => { + let tracker = &mut state.op_state.borrow_mut().tracker; + tracker.track_unref_completed(op_id); async_responses.push((promise_id, resp)); } }; @@ -1639,7 +1644,7 @@ pub mod tests { match test_state.mode { Mode::Async => { assert_eq!(control, 42); - let resp = (0, serialize_op_result(Ok(43), rc_op_state)); + let resp = (0, 1, serialize_op_result(Ok(43), rc_op_state)); Op::Async(Box::pin(futures::future::ready(resp))) } Mode::AsyncZeroCopy(has_buffer) => { @@ -1649,7 +1654,7 @@ pub mod tests { } let resp = serialize_op_result(Ok(43), rc_op_state); - Op::Async(Box::pin(futures::future::ready((0, resp)))) + Op::Async(Box::pin(futures::future::ready((0, 1, resp)))) } } } |