summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--BUILD.gn1
-rw-r--r--js/deno.ts1
-rw-r--r--js/metrics.ts39
-rw-r--r--js/metrics_test.ts24
-rw-r--r--js/unit_tests.ts1
-rw-r--r--src/isolate.rs154
-rw-r--r--src/msg.fbs12
-rw-r--r--src/ops.rs36
-rwxr-xr-xtools/http_server.py3
9 files changed, 266 insertions, 5 deletions
diff --git a/BUILD.gn b/BUILD.gn
index 4f2bf2a5b..16941c003 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -84,6 +84,7 @@ ts_sources = [
"js/libdeno.ts",
"js/main.ts",
"js/make_temp_dir.ts",
+ "js/metrics.ts",
"js/mkdir.ts",
"js/mock_builtin.js",
"js/net.ts",
diff --git a/js/deno.ts b/js/deno.ts
index a7350e03e..212544541 100644
--- a/js/deno.ts
+++ b/js/deno.ts
@@ -35,6 +35,7 @@ export { trace } from "./trace";
export { truncateSync, truncate } from "./truncate";
export { FileInfo } from "./file_info";
export { connect, dial, listen, Listener, Conn } from "./net";
+export { metrics } from "./metrics";
export const args: string[] = [];
// Provide the compiler API in an obfuscated way
diff --git a/js/metrics.ts b/js/metrics.ts
new file mode 100644
index 000000000..d76b781db
--- /dev/null
+++ b/js/metrics.ts
@@ -0,0 +1,39 @@
+// Copyright 2018 the Deno authors. All rights reserved. MIT license.
+import * as msg from "gen/msg_generated";
+import { flatbuffers } from "flatbuffers";
+import { assert } from "./util";
+import * as dispatch from "./dispatch";
+
+interface Metrics {
+ opsDispatched: number;
+ opsCompleted: number;
+ bytesSentControl: number;
+ bytesSentData: number;
+ bytesReceived: number;
+}
+
+export function metrics(): Metrics {
+ return res(dispatch.sendSync(...req()));
+}
+
+function req(): [flatbuffers.Builder, msg.Any, flatbuffers.Offset] {
+ const builder = new flatbuffers.Builder();
+ msg.Metrics.startMetrics(builder);
+ const inner = msg.Metrics.endMetrics(builder);
+ return [builder, msg.Any.Metrics, inner];
+}
+
+function res(baseRes: null | msg.Base): Metrics {
+ assert(baseRes !== null);
+ assert(msg.Any.MetricsRes === baseRes!.innerType());
+ const res = new msg.MetricsRes();
+ assert(baseRes!.inner(res) !== null);
+
+ return {
+ opsDispatched: res.opsDispatched().toFloat64(),
+ opsCompleted: res.opsCompleted().toFloat64(),
+ bytesSentControl: res.bytesSentControl().toFloat64(),
+ bytesSentData: res.bytesSentData().toFloat64(),
+ bytesReceived: res.bytesReceived().toFloat64()
+ };
+}
diff --git a/js/metrics_test.ts b/js/metrics_test.ts
new file mode 100644
index 000000000..6954ae2ce
--- /dev/null
+++ b/js/metrics_test.ts
@@ -0,0 +1,24 @@
+// Copyright 2018 the Deno authors. All rights reserved. MIT license.
+import { test, assert } from "./test_util.ts";
+import * as deno from "deno";
+
+test(function metrics() {
+ const m1 = deno.metrics();
+ assert(m1.opsDispatched > 0);
+ assert(m1.opsCompleted > 0);
+ assert(m1.bytesSentControl > 0);
+ assert(m1.bytesSentData >= 0);
+ assert(m1.bytesReceived > 0);
+
+ // Write to stdout to ensure a "data" message gets sent instead of just
+ // control messages.
+ const dataMsg = new Uint8Array([41, 42, 43]);
+ deno.stdout.write(dataMsg);
+
+ const m2 = deno.metrics();
+ assert(m2.opsDispatched > m1.opsDispatched);
+ assert(m2.opsCompleted > m1.opsCompleted);
+ assert(m2.bytesSentControl > m1.bytesSentControl);
+ assert(m2.bytesSentData >= m1.bytesSentData + dataMsg.byteLength);
+ assert(m2.bytesReceived > m1.bytesReceived);
+});
diff --git a/js/unit_tests.ts b/js/unit_tests.ts
index ca152ad39..24fdac823 100644
--- a/js/unit_tests.ts
+++ b/js/unit_tests.ts
@@ -25,3 +25,4 @@ import "./trace_test.ts";
import "./truncate_test.ts";
import "./v8_source_maps_test.ts";
import "../website/app_test.js";
+import "./metrics_test.ts";
diff --git a/src/isolate.rs b/src/isolate.rs
index 2dec64501..408259c5f 100644
--- a/src/isolate.rs
+++ b/src/isolate.rs
@@ -55,6 +55,7 @@ pub struct IsolateState {
pub argv: Vec<String>,
pub flags: flags::DenoFlags,
tx: Mutex<Option<mpsc::Sender<(i32, Buf)>>>,
+ pub metrics: Mutex<Metrics>,
}
impl IsolateState {
@@ -66,6 +67,32 @@ impl IsolateState {
let tx = maybe_tx.unwrap();
tx.send((req_id, buf)).expect("tx.send error");
}
+
+ fn metrics_op_dispatched(
+ &self,
+ bytes_sent_control: u64,
+ bytes_sent_data: u64,
+ ) {
+ let mut metrics = self.metrics.lock().unwrap();
+ metrics.ops_dispatched += 1;
+ metrics.bytes_sent_control += bytes_sent_control;
+ metrics.bytes_sent_data += bytes_sent_data;
+ }
+
+ fn metrics_op_completed(&self, bytes_received: u64) {
+ let mut metrics = self.metrics.lock().unwrap();
+ metrics.ops_completed += 1;
+ metrics.bytes_received += bytes_received;
+ }
+}
+
+#[derive(Default)]
+pub struct Metrics {
+ pub ops_dispatched: u64,
+ pub ops_completed: u64,
+ pub bytes_sent_control: u64,
+ pub bytes_sent_data: u64,
+ pub bytes_received: u64,
}
static DENO_INIT: std::sync::Once = std::sync::ONCE_INIT;
@@ -92,6 +119,7 @@ impl Isolate {
argv: argv_rest,
flags,
tx: Mutex::new(Some(tx)),
+ metrics: Mutex::new(Metrics::default()),
}),
}
}
@@ -221,6 +249,10 @@ extern "C" fn pre_dispatch(
control_buf: libdeno::deno_buf,
data_buf: libdeno::deno_buf,
) {
+ // for metrics
+ let bytes_sent_control = control_buf.data_len as u64;
+ let bytes_sent_data = data_buf.data_len as u64;
+
// control_buf is only valid for the lifetime of this call, thus is
// interpretted as a slice.
let control_slice = unsafe {
@@ -240,16 +272,23 @@ extern "C" fn pre_dispatch(
let dispatch = isolate.dispatch;
let (is_sync, op) = dispatch(isolate, control_slice, data_slice);
+ isolate
+ .state
+ .metrics_op_dispatched(bytes_sent_control, bytes_sent_data);
+
if is_sync {
// Execute op synchronously.
let buf = tokio_util::block_on(op).unwrap();
- if buf.len() != 0 {
+ let buf_size = buf.len();
+ if buf_size != 0 {
// Set the synchronous response, the value returned from isolate.send().
isolate.respond(req_id, buf);
}
+
+ isolate.state.metrics_op_completed(buf_size as u64);
} else {
// Execute op asynchronously.
- let state = isolate.state.clone();
+ let state = Arc::clone(&isolate.state);
// TODO Ideally Tokio would could tell us how many tasks are executing, but
// it cannot currently. Therefore we track top-level promises/tasks
@@ -258,7 +297,9 @@ extern "C" fn pre_dispatch(
let task = op
.and_then(move |buf| {
+ let buf_size = buf.len();
state.send_to_js(req_id, buf);
+ state.metrics_op_completed(buf_size as u64);
Ok(())
}).map_err(|_| ());
tokio::spawn(task);
@@ -330,4 +371,113 @@ mod tests {
let op = Box::new(futures::future::ok(control));
(true, op)
}
+
+ #[test]
+ fn test_metrics_sync() {
+ let argv = vec![String::from("./deno"), String::from("hello.js")];
+ let mut isolate = Isolate::new(argv, metrics_dispatch_sync);
+ tokio_util::init(|| {
+ // Verify that metrics have been properly initialized.
+ {
+ let metrics = isolate.state.metrics.lock().unwrap();
+ assert_eq!(metrics.ops_dispatched, 0);
+ assert_eq!(metrics.ops_completed, 0);
+ assert_eq!(metrics.bytes_sent_control, 0);
+ assert_eq!(metrics.bytes_sent_data, 0);
+ assert_eq!(metrics.bytes_received, 0);
+ }
+
+ isolate
+ .execute(
+ "y.js",
+ r#"
+ const control = new Uint8Array([4, 5, 6]);
+ const data = new Uint8Array([42, 43, 44, 45, 46]);
+ libdeno.send(control, data);
+ "#,
+ ).expect("execute error");
+ isolate.event_loop();
+ let metrics = isolate.state.metrics.lock().unwrap();
+ assert_eq!(metrics.ops_dispatched, 1);
+ assert_eq!(metrics.ops_completed, 1);
+ assert_eq!(metrics.bytes_sent_control, 3);
+ assert_eq!(metrics.bytes_sent_data, 5);
+ assert_eq!(metrics.bytes_received, 4);
+ });
+ }
+
+ #[test]
+ fn test_metrics_async() {
+ let argv = vec![String::from("./deno"), String::from("hello.js")];
+ let mut isolate = Isolate::new(argv, metrics_dispatch_async);
+ tokio_util::init(|| {
+ // Verify that metrics have been properly initialized.
+ {
+ let metrics = isolate.state.metrics.lock().unwrap();
+ assert_eq!(metrics.ops_dispatched, 0);
+ assert_eq!(metrics.ops_completed, 0);
+ assert_eq!(metrics.bytes_sent_control, 0);
+ assert_eq!(metrics.bytes_sent_data, 0);
+ assert_eq!(metrics.bytes_received, 0);
+ }
+
+ isolate
+ .execute(
+ "y.js",
+ r#"
+ const control = new Uint8Array([4, 5, 6]);
+ const data = new Uint8Array([42, 43, 44, 45, 46]);
+ let r = libdeno.send(control, data);
+ if (r != null) throw Error("expected null");
+ "#,
+ ).expect("execute error");
+
+ // Make sure relevant metrics are updated before task is executed.
+ {
+ let metrics = isolate.state.metrics.lock().unwrap();
+ assert_eq!(metrics.ops_dispatched, 1);
+ assert_eq!(metrics.bytes_sent_control, 3);
+ assert_eq!(metrics.bytes_sent_data, 5);
+ // Note we cannot check ops_completed nor bytes_received because that
+ // would be a race condition. It might be nice to have use a oneshot
+ // with metrics_dispatch_async() to properly validate them.
+ }
+
+ isolate.event_loop();
+
+ // Make sure relevant metrics are updated after task is executed.
+ {
+ let metrics = isolate.state.metrics.lock().unwrap();
+ assert_eq!(metrics.ops_dispatched, 1);
+ assert_eq!(metrics.ops_completed, 1);
+ assert_eq!(metrics.bytes_sent_control, 3);
+ assert_eq!(metrics.bytes_sent_data, 5);
+ assert_eq!(metrics.bytes_received, 4);
+ }
+ });
+ }
+
+ fn metrics_dispatch_sync(
+ _isolate: &mut Isolate,
+ _control: &[u8],
+ _data: &'static mut [u8],
+ ) -> (bool, Box<Op>) {
+ // Send back some sync response
+ let vec: Vec<u8> = vec![1, 2, 3, 4];
+ let control = vec.into_boxed_slice();
+ let op = Box::new(futures::future::ok(control));
+ (true, op)
+ }
+
+ fn metrics_dispatch_async(
+ _isolate: &mut Isolate,
+ _control: &[u8],
+ _data: &'static mut [u8],
+ ) -> (bool, Box<Op>) {
+ // Send back some sync response
+ let vec: Vec<u8> = vec![1, 2, 3, 4];
+ let control = vec.into_boxed_slice();
+ let op = Box::new(futures::future::ok(control));
+ (false, op)
+ }
}
diff --git a/src/msg.fbs b/src/msg.fbs
index 869452ced..40179fc14 100644
--- a/src/msg.fbs
+++ b/src/msg.fbs
@@ -41,6 +41,8 @@ union Any {
Accept,
Dial,
NewConn,
+ Metrics,
+ MetricsRes,
}
enum ErrorKind: byte {
@@ -321,4 +323,14 @@ table NewConn {
local_addr: string;
}
+table Metrics {}
+
+table MetricsRes {
+ ops_dispatched: uint64;
+ ops_completed: uint64;
+ bytes_sent_control: uint64;
+ bytes_sent_data: uint64;
+ bytes_received: uint64;
+}
+
root_type Base;
diff --git a/src/ops.rs b/src/ops.rs
index 3aad572bd..c32f53634 100644
--- a/src/ops.rs
+++ b/src/ops.rs
@@ -101,6 +101,7 @@ pub fn dispatch(
msg::Any::Listen => op_listen,
msg::Any::Accept => op_accept,
msg::Any::Dial => op_dial,
+ msg::Any::Metrics => op_metrics,
_ => panic!(format!(
"Unhandled message {}",
msg::enum_name_any(inner_type)
@@ -465,7 +466,7 @@ where
// fn blocking<F>(is_sync: bool, f: F) -> Box<Op>
// where F: FnOnce() -> DenoResult<Buf>
macro_rules! blocking {
- ($is_sync:expr,$fn:expr) => {
+ ($is_sync:expr, $fn:expr) => {
if $is_sync {
// If synchronous, execute the function immediately on the main thread.
Box::new(futures::future::result($fn()))
@@ -1153,3 +1154,36 @@ fn op_dial(
.and_then(move |tcp_stream| new_conn(cmd_id, tcp_stream));
Box::new(op)
}
+
+fn op_metrics(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
+ let cmd_id = base.cmd_id();
+
+ let metrics = state.metrics.lock().unwrap();
+
+ let builder = &mut FlatBufferBuilder::new();
+ let inner = msg::MetricsRes::create(
+ builder,
+ &msg::MetricsResArgs {
+ ops_dispatched: metrics.ops_dispatched,
+ ops_completed: metrics.ops_completed,
+ bytes_sent_control: metrics.bytes_sent_control,
+ bytes_sent_data: metrics.bytes_sent_data,
+ bytes_received: metrics.bytes_received,
+ ..Default::default()
+ },
+ );
+ ok_future(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(inner.as_union_value()),
+ inner_type: msg::Any::MetricsRes,
+ ..Default::default()
+ },
+ ))
+}
diff --git a/tools/http_server.py b/tools/http_server.py
index d33f24d5d..c627dfd5f 100755
--- a/tools/http_server.py
+++ b/tools/http_server.py
@@ -55,5 +55,4 @@ def spawn():
if __name__ == '__main__':
- s = server()
- s.serve_forever()
+ spawn().join()