summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/01_core.js12
-rw-r--r--core/bindings.rs7
-rw-r--r--core/lib.rs1
-rw-r--r--core/modules.rs2
-rw-r--r--core/ops.rs9
-rw-r--r--core/ops_builtin.rs12
-rw-r--r--core/ops_json.rs9
-rw-r--r--core/ops_metrics.rs96
-rw-r--r--core/runtime.rs15
9 files changed, 152 insertions, 11 deletions
diff --git a/core/01_core.js b/core/01_core.js
index e1b0529f7..24b844453 100644
--- a/core/01_core.js
+++ b/core/01_core.js
@@ -12,8 +12,10 @@
Map,
Array,
ArrayPrototypeFill,
+ ArrayPrototypeMap,
ErrorCaptureStackTrace,
Promise,
+ ObjectEntries,
ObjectFreeze,
ObjectFromEntries,
MapPrototypeGet,
@@ -152,6 +154,15 @@
opSync("op_print", str, isErr);
}
+ function metrics() {
+ const [aggregate, perOps] = opSync("op_metrics");
+ aggregate.ops = ObjectFromEntries(ArrayPrototypeMap(
+ ObjectEntries(opsCache),
+ ([opName, opId]) => [opName, perOps[opId]],
+ ));
+ return aggregate;
+ }
+
// Some "extensions" rely on "BadResource" and "Interrupted" errors in the
// JS code (eg. "deno_net") so they are provided in "Deno.core" but later
// reexported on "Deno.errors"
@@ -178,6 +189,7 @@
tryClose,
print,
resources,
+ metrics,
registerErrorBuilder,
registerErrorClass,
opresolve,
diff --git a/core/bindings.rs b/core/bindings.rs
index cf4fd07f6..2fc6b5092 100644
--- a/core/bindings.rs
+++ b/core/bindings.rs
@@ -315,7 +315,7 @@ fn opcall_sync<'s>(
mut rv: v8::ReturnValue,
) {
let state_rc = JsRuntime::state(scope);
- let state = state_rc.borrow();
+ let state = state_rc.borrow_mut();
let op_id = match v8::Local::<v8::Integer>::try_from(args.get(0))
.map(|l| l.value() as OpId)
@@ -344,11 +344,13 @@ fn opcall_sync<'s>(
scope,
a,
b,
+ op_id,
promise_id: 0,
};
let op = OpTable::route_op(op_id, state.op_state.clone(), payload);
match op {
Op::Sync(result) => {
+ state.op_state.borrow_mut().tracker.track_sync(op_id);
rv.set(result.to_v8(scope).unwrap());
}
Op::NotFound => {
@@ -405,6 +407,7 @@ fn opcall_async<'s>(
scope,
a,
b,
+ op_id,
promise_id,
};
let op = OpTable::route_op(op_id, state.op_state.clone(), payload);
@@ -417,10 +420,12 @@ fn opcall_async<'s>(
OpResult::Err(_) => rv.set(result.to_v8(scope).unwrap()),
},
Op::Async(fut) => {
+ state.op_state.borrow_mut().tracker.track_async(op_id);
state.pending_ops.push(fut);
state.have_unpolled_ops = true;
}
Op::AsyncUnref(fut) => {
+ state.op_state.borrow_mut().tracker.track_unref(op_id);
state.pending_unref_ops.push(fut);
state.have_unpolled_ops = true;
}
diff --git a/core/lib.rs b/core/lib.rs
index 22509c462..ea7b322e2 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -13,6 +13,7 @@ mod normalize_path;
mod ops;
mod ops_builtin;
mod ops_json;
+mod ops_metrics;
mod resources;
mod runtime;
diff --git a/core/modules.rs b/core/modules.rs
index 18399b9b4..2af09057f 100644
--- a/core/modules.rs
+++ b/core/modules.rs
@@ -1008,7 +1008,7 @@ mod tests {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
let (control, _): (u8, ()) = payload.deserialize().unwrap();
assert_eq!(control, 42);
- let resp = (0, serialize_op_result(Ok(43), state));
+ let resp = (0, 1, serialize_op_result(Ok(43), state));
Op::Async(Box::pin(futures::future::ready(resp)))
};
diff --git a/core/ops.rs b/core/ops.rs
index 0e85bcf47..80bb30eda 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -3,6 +3,7 @@
use crate::error::type_error;
use crate::error::AnyError;
use crate::gotham_state::GothamState;
+use crate::ops_metrics::OpsTracker;
use crate::resources::ResourceTable;
use crate::runtime::GetErrorClassFn;
use futures::Future;
@@ -18,7 +19,8 @@ use std::pin::Pin;
use std::rc::Rc;
pub type PromiseId = u64;
-pub type OpAsyncFuture = Pin<Box<dyn Future<Output = (PromiseId, OpResult)>>>;
+pub type OpAsyncFuture =
+ Pin<Box<dyn Future<Output = (PromiseId, OpId, OpResult)>>>;
pub type OpFn = dyn Fn(Rc<RefCell<OpState>>, OpPayload) -> Op + 'static;
pub type OpId = usize;
@@ -26,6 +28,7 @@ pub struct OpPayload<'a, 'b, 'c> {
pub(crate) scope: &'a mut v8::HandleScope<'b>,
pub(crate) a: v8::Local<'c, v8::Value>,
pub(crate) b: v8::Local<'c, v8::Value>,
+ pub(crate) op_id: OpId,
pub(crate) promise_id: PromiseId,
}
@@ -96,6 +99,7 @@ pub struct OpState {
pub resource_table: ResourceTable,
pub op_table: OpTable,
pub get_error_class_fn: GetErrorClassFn,
+ pub(crate) tracker: OpsTracker,
gotham_state: GothamState,
}
@@ -105,6 +109,9 @@ impl OpState {
resource_table: Default::default(),
op_table: OpTable::default(),
get_error_class_fn: &|_| "Error",
+ tracker: OpsTracker {
+ ops: Vec::with_capacity(256),
+ },
gotham_state: Default::default(),
}
}
diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs
index 83f2e504a..c830f8eff 100644
--- a/core/ops_builtin.rs
+++ b/core/ops_builtin.rs
@@ -2,6 +2,7 @@ use crate::error::type_error;
use crate::error::AnyError;
use crate::include_js_files;
use crate::op_sync;
+use crate::ops_metrics::OpMetrics;
use crate::resources::ResourceId;
use crate::void_op_async;
use crate::void_op_sync;
@@ -32,6 +33,7 @@ pub(crate) fn init_builtins() -> Extension {
"op_wasm_streaming_set_url",
op_sync(op_wasm_streaming_set_url),
),
+ ("op_metrics", op_sync(op_metrics)),
("op_void_sync", void_op_sync()),
("op_void_async", void_op_async()),
])
@@ -158,3 +160,13 @@ pub fn op_wasm_streaming_set_url(
Ok(())
}
+
+pub fn op_metrics(
+ state: &mut OpState,
+ _: (),
+ _: (),
+) -> Result<(OpMetrics, Vec<OpMetrics>), AnyError> {
+ let aggregate = state.tracker.aggregate();
+ let per_op = state.tracker.per_op();
+ Ok((aggregate, per_op))
+}
diff --git a/core/ops_json.rs b/core/ops_json.rs
index 22a84154d..0ca7e5ce4 100644
--- a/core/ops_json.rs
+++ b/core/ops_json.rs
@@ -32,9 +32,10 @@ pub fn void_op_async() -> Box<OpFn> {
// to deserialize to the unit type instead of failing with `ExpectedNull`
// op_async(|_, _: (), _: ()| futures::future::ok(()))
Box::new(move |state, payload| -> Op {
+ let op_id = payload.op_id;
let pid = payload.promise_id;
let op_result = serialize_op_result(Ok(()), state);
- Op::Async(Box::pin(futures::future::ready((pid, op_result))))
+ Op::Async(Box::pin(futures::future::ready((pid, op_id, op_result))))
})
}
@@ -112,6 +113,7 @@ where
RV: Serialize + 'static,
{
Box::new(move |state, payload| -> Op {
+ let op_id = payload.op_id;
let pid = payload.promise_id;
// Deserialize args, sync error on failure
let args = match payload.deserialize() {
@@ -124,7 +126,7 @@ where
use crate::futures::FutureExt;
let fut = op_fn(state.clone(), a, b)
- .map(move |result| (pid, serialize_op_result(result, state)));
+ .map(move |result| (pid, op_id, serialize_op_result(result, state)));
Op::Async(Box::pin(fut))
})
}
@@ -143,6 +145,7 @@ where
RV: Serialize + 'static,
{
Box::new(move |state, payload| -> Op {
+ let op_id = payload.op_id;
let pid = payload.promise_id;
// Deserialize args, sync error on failure
let args = match payload.deserialize() {
@@ -155,7 +158,7 @@ where
use crate::futures::FutureExt;
let fut = op_fn(state.clone(), a, b)
- .map(move |result| (pid, serialize_op_result(result, state)));
+ .map(move |result| (pid, op_id, serialize_op_result(result, state)));
Op::AsyncUnref(Box::pin(fut))
})
}
diff --git a/core/ops_metrics.rs b/core/ops_metrics.rs
new file mode 100644
index 000000000..9dd5e2edf
--- /dev/null
+++ b/core/ops_metrics.rs
@@ -0,0 +1,96 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+use crate::serde::Serialize;
+use crate::OpId;
+
+// TODO(@AaronO): split into AggregateMetrics & PerOpMetrics
+#[derive(Clone, Default, Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct OpMetrics {
+ pub ops_dispatched: u64,
+ pub ops_dispatched_sync: u64,
+ pub ops_dispatched_async: u64,
+ pub ops_dispatched_async_unref: u64,
+ pub ops_completed: u64,
+ pub ops_completed_sync: u64,
+ pub ops_completed_async: u64,
+ pub ops_completed_async_unref: u64,
+ pub bytes_sent_control: u64,
+ pub bytes_sent_data: u64,
+ pub bytes_received: u64,
+}
+
+// TODO(@AaronO): track errors
+#[derive(Default, Debug)]
+pub struct OpsTracker {
+ pub ops: Vec<OpMetrics>,
+}
+
+impl OpsTracker {
+ pub fn per_op(&self) -> Vec<OpMetrics> {
+ self.ops.clone()
+ }
+
+ pub fn aggregate(&self) -> OpMetrics {
+ let mut sum = OpMetrics::default();
+
+ for metrics in self.ops.iter() {
+ sum.ops_dispatched += metrics.ops_dispatched;
+ sum.ops_dispatched_sync += metrics.ops_dispatched_sync;
+ sum.ops_dispatched_async += metrics.ops_dispatched_async;
+ sum.ops_dispatched_async_unref += metrics.ops_dispatched_async_unref;
+ sum.ops_completed += metrics.ops_completed;
+ sum.ops_completed_sync += metrics.ops_completed_sync;
+ sum.ops_completed_async += metrics.ops_completed_async;
+ sum.ops_completed_async_unref += metrics.ops_completed_async_unref;
+ sum.bytes_sent_control += metrics.bytes_sent_control;
+ sum.bytes_sent_data += metrics.bytes_sent_data;
+ sum.bytes_received += metrics.bytes_received;
+ }
+
+ sum
+ }
+
+ fn ensure_capacity(&mut self, op_id: OpId) {
+ if op_id >= self.ops.len() {
+ let delta_len = 1 + op_id - self.ops.len();
+ self.ops.extend(vec![OpMetrics::default(); delta_len])
+ }
+ }
+
+ fn metrics_mut(&mut self, id: OpId) -> &mut OpMetrics {
+ self.ensure_capacity(id);
+ self.ops.get_mut(id).unwrap()
+ }
+
+ pub fn track_sync(&mut self, id: OpId) {
+ let metrics = self.metrics_mut(id);
+ metrics.ops_dispatched += 1;
+ metrics.ops_completed += 1;
+ metrics.ops_dispatched_sync += 1;
+ metrics.ops_completed_sync += 1;
+ }
+
+ pub fn track_async(&mut self, id: OpId) {
+ let metrics = self.metrics_mut(id);
+ metrics.ops_dispatched += 1;
+ metrics.ops_dispatched_async += 1;
+ }
+
+ pub fn track_async_completed(&mut self, id: OpId) {
+ let metrics = self.metrics_mut(id);
+ metrics.ops_completed += 1;
+ metrics.ops_completed_async += 1;
+ }
+
+ pub fn track_unref(&mut self, id: OpId) {
+ let metrics = self.metrics_mut(id);
+ metrics.ops_dispatched += 1;
+ metrics.ops_dispatched_async_unref += 1;
+ }
+
+ pub fn track_unref_completed(&mut self, id: OpId) {
+ let metrics = self.metrics_mut(id);
+ metrics.ops_completed += 1;
+ metrics.ops_completed_async_unref += 1;
+ }
+}
diff --git a/core/runtime.rs b/core/runtime.rs
index 1150746f3..1928ff31c 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -44,7 +44,8 @@ use std::sync::Once;
use std::task::Context;
use std::task::Poll;
-type PendingOpFuture = Pin<Box<dyn Future<Output = (PromiseId, OpResult)>>>;
+type PendingOpFuture =
+ Pin<Box<dyn Future<Output = (PromiseId, OpId, OpResult)>>>;
pub enum Snapshot {
Static(&'static [u8]),
@@ -1477,7 +1478,9 @@ impl JsRuntime {
match pending_r {
Poll::Ready(None) => break,
Poll::Pending => break,
- Poll::Ready(Some((promise_id, resp))) => {
+ Poll::Ready(Some((promise_id, op_id, resp))) => {
+ let tracker = &mut state.op_state.borrow_mut().tracker;
+ tracker.track_async_completed(op_id);
async_responses.push((promise_id, resp));
}
};
@@ -1488,7 +1491,9 @@ impl JsRuntime {
match unref_r {
Poll::Ready(None) => break,
Poll::Pending => break,
- Poll::Ready(Some((promise_id, resp))) => {
+ Poll::Ready(Some((promise_id, op_id, resp))) => {
+ let tracker = &mut state.op_state.borrow_mut().tracker;
+ tracker.track_unref_completed(op_id);
async_responses.push((promise_id, resp));
}
};
@@ -1639,7 +1644,7 @@ pub mod tests {
match test_state.mode {
Mode::Async => {
assert_eq!(control, 42);
- let resp = (0, serialize_op_result(Ok(43), rc_op_state));
+ let resp = (0, 1, serialize_op_result(Ok(43), rc_op_state));
Op::Async(Box::pin(futures::future::ready(resp)))
}
Mode::AsyncZeroCopy(has_buffer) => {
@@ -1649,7 +1654,7 @@ pub mod tests {
}
let resp = serialize_op_result(Ok(43), rc_op_state);
- Op::Async(Box::pin(futures::future::ready((0, resp))))
+ Op::Async(Box::pin(futures::future::ready((0, 1, resp))))
}
}
}