summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/examples/http_bench.js18
-rw-r--r--core/examples/http_bench.rs99
-rw-r--r--core/isolate.rs52
-rw-r--r--core/lib.rs2
-rw-r--r--core/ops.rs111
-rw-r--r--core/shared_queue.js11
-rw-r--r--core/shared_queue.rs2
7 files changed, 203 insertions, 92 deletions
diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js
index 4c68f2be6..a7142b09d 100644
--- a/core/examples/http_bench.js
+++ b/core/examples/http_bench.js
@@ -1,11 +1,6 @@
// This is not a real HTTP server. We read blindly one time into 'requestBuf',
// then write this fixed 'responseBuf'. The point of this benchmark is to
// exercise the event loop in a simple yet semi-realistic way.
-const OP_LISTEN = 1;
-const OP_ACCEPT = 2;
-const OP_READ = 3;
-const OP_WRITE = 4;
-const OP_CLOSE = 5;
const requestBuf = new Uint8Array(64 * 1024);
const responseBuf = new Uint8Array(
"HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n"
@@ -80,12 +75,12 @@ function handleAsyncMsgFromRust(opId, buf) {
/** Listens on 0.0.0.0:4500, returns rid. */
function listen() {
- return sendSync(OP_LISTEN, -1);
+ return sendSync(ops["listen"], -1);
}
/** Accepts a connection, returns rid. */
async function accept(rid) {
- return await sendAsync(OP_ACCEPT, rid);
+ return await sendAsync(ops["accept"], rid);
}
/**
@@ -93,16 +88,16 @@ async function accept(rid) {
* Returns bytes read.
*/
async function read(rid, data) {
- return await sendAsync(OP_READ, rid, data);
+ return await sendAsync(ops["read"], rid, data);
}
/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */
async function write(rid, data) {
- return await sendAsync(OP_WRITE, rid, data);
+ return await sendAsync(ops["write"], rid, data);
}
function close(rid) {
- return sendSync(OP_CLOSE, rid);
+ return sendSync(ops["close"], rid);
}
async function serve(rid) {
@@ -120,8 +115,11 @@ async function serve(rid) {
close(rid);
}
+let ops;
+
async function main() {
Deno.core.setAsyncHandler(handleAsyncMsgFromRust);
+ ops = Deno.core.ops();
Deno.core.print("http_bench.js start\n");
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs
index 3c077562d..c019d8a11 100644
--- a/core/examples/http_bench.rs
+++ b/core/examples/http_bench.rs
@@ -36,12 +36,6 @@ impl log::Log for Logger {
fn flush(&self) {}
}
-const OP_LISTEN: OpId = 1;
-const OP_ACCEPT: OpId = 2;
-const OP_READ: OpId = 3;
-const OP_WRITE: OpId = 4;
-const OP_CLOSE: OpId = 5;
-
#[derive(Clone, Debug, PartialEq)]
pub struct Record {
pub promise_id: i32,
@@ -104,48 +98,24 @@ fn test_record_from() {
// TODO test From<&[u8]> for Record
}
-pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
-
-fn dispatch(
- op_id: OpId,
- control: &[u8],
- zero_copy_buf: Option<PinnedBuf>,
-) -> CoreOp {
- let record = Record::from(control);
- let is_sync = record.promise_id == 0;
- let http_bench_op = match op_id {
- OP_LISTEN => {
- assert!(is_sync);
- op_listen()
- }
- OP_CLOSE => {
- assert!(is_sync);
- let rid = record.arg;
- op_close(rid)
- }
- OP_ACCEPT => {
- assert!(!is_sync);
- let listener_rid = record.arg;
- op_accept(listener_rid)
- }
- OP_READ => {
- assert!(!is_sync);
- let rid = record.arg;
- op_read(rid, zero_copy_buf)
- }
- OP_WRITE => {
- assert!(!is_sync);
- let rid = record.arg;
- op_write(rid, zero_copy_buf)
- }
- _ => panic!("bad op {}", op_id),
- };
- let mut record_a = record.clone();
- let mut record_b = record.clone();
+pub type HttpOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
- let fut = Box::new(
- http_bench_op
- .and_then(move |result| {
+pub type HttpOpHandler =
+ fn(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp>;
+
+fn http_op(
+ handler: HttpOpHandler,
+) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp {
+ move |control: &[u8], zero_copy_buf: Option<PinnedBuf>| -> CoreOp {
+ let record = Record::from(control);
+ let is_sync = record.promise_id == 0;
+ let op = handler(record.clone(), zero_copy_buf);
+
+ let mut record_a = record.clone();
+ let mut record_b = record.clone();
+
+ let fut = Box::new(
+ op.and_then(move |result| {
record_a.result = result;
Ok(record_a)
})
@@ -158,12 +128,13 @@ fn dispatch(
let record = result.unwrap();
Ok(record.into())
}),
- );
+ );
- if is_sync {
- Op::Sync(fut.wait().unwrap())
- } else {
- Op::Async(fut)
+ if is_sync {
+ Op::Sync(fut.wait().unwrap())
+ } else {
+ Op::Async(fut)
+ }
}
}
@@ -181,7 +152,11 @@ fn main() {
});
let mut isolate = deno::Isolate::new(startup_data, false);
- isolate.set_dispatch(dispatch);
+ isolate.register_op("listen", http_op(op_listen));
+ isolate.register_op("accept", http_op(op_accept));
+ isolate.register_op("read", http_op(op_read));
+ isolate.register_op("write", http_op(op_write));
+ isolate.register_op("close", http_op(op_close));
isolate.then(|r| {
js_check(r);
@@ -225,7 +200,8 @@ fn new_rid() -> i32 {
rid as i32
}
-fn op_accept(listener_rid: i32) -> Box<HttpBenchOp> {
+fn op_accept(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
+ let listener_rid = record.arg;
debug!("accept {}", listener_rid);
Box::new(
futures::future::poll_fn(move || {
@@ -248,9 +224,11 @@ fn op_accept(listener_rid: i32) -> Box<HttpBenchOp> {
)
}
-fn op_listen() -> Box<HttpBenchOp> {
+fn op_listen(
+ _record: Record,
+ _zero_copy_buf: Option<PinnedBuf>,
+) -> Box<HttpOp> {
debug!("listen");
-
Box::new(lazy(move || {
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let listener = tokio::net::TcpListener::bind(&addr).unwrap();
@@ -262,8 +240,9 @@ fn op_listen() -> Box<HttpBenchOp> {
}))
}
-fn op_close(rid: i32) -> Box<HttpBenchOp> {
+fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
debug!("close");
+ let rid = record.arg;
Box::new(lazy(move || {
let mut table = RESOURCE_TABLE.lock().unwrap();
let r = table.remove(&rid);
@@ -272,7 +251,8 @@ fn op_close(rid: i32) -> Box<HttpBenchOp> {
}))
}
-fn op_read(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
+fn op_read(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
+ let rid = record.arg;
debug!("read rid={}", rid);
let mut zero_copy_buf = zero_copy_buf.unwrap();
Box::new(
@@ -293,7 +273,8 @@ fn op_read(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
)
}
-fn op_write(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> {
+fn op_write(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
+ let rid = record.arg;
debug!("write rid={}", rid);
let zero_copy_buf = zero_copy_buf.unwrap();
Box::new(
diff --git a/core/isolate.rs b/core/isolate.rs
index bad79b579..6795f25f0 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -13,10 +13,10 @@ use crate::libdeno::deno_buf;
use crate::libdeno::deno_dyn_import_id;
use crate::libdeno::deno_mod;
use crate::libdeno::deno_pinned_buf;
-use crate::libdeno::OpId;
use crate::libdeno::PinnedBuf;
use crate::libdeno::Snapshot1;
use crate::libdeno::Snapshot2;
+use crate::ops::*;
use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use futures::stream::FuturesUnordered;
@@ -34,24 +34,6 @@ use std::fmt;
use std::ptr::null;
use std::sync::{Arc, Mutex, Once};
-pub type Buf = Box<[u8]>;
-
-pub type OpAsyncFuture<E> = Box<dyn Future<Item = Buf, Error = E> + Send>;
-
-type PendingOpFuture =
- Box<dyn Future<Item = (OpId, Buf), Error = CoreError> + Send>;
-
-pub enum Op<E> {
- Sync(Buf),
- Async(OpAsyncFuture<E>),
-}
-
-pub type CoreError = ();
-
-pub type CoreOp = Op<CoreError>;
-
-pub type OpResult<E> = Result<Op<E>, E>;
-
/// Args: op_id, control_buf, zero_copy_buf
type CoreDispatchFn = dyn Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp;
@@ -179,6 +161,7 @@ pub struct Isolate {
pending_dyn_imports: FuturesUnordered<StreamFuture<DynImport>>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
+ op_registry: OpRegistry,
}
unsafe impl Send for Isolate {}
@@ -244,12 +227,17 @@ impl Isolate {
have_unpolled_ops: false,
pending_dyn_imports: FuturesUnordered::new(),
startup_script,
+ op_registry: OpRegistry::new(),
}
}
/// Defines the how Deno.core.dispatch() acts.
/// Called whenever Deno.core.dispatch() is called in JavaScript. zero_copy_buf
/// corresponds to the second argument of Deno.core.dispatch().
+ ///
+ /// If this method is used then ops registered using `op_register` function are
+ /// ignored and all dispatching must be handled manually in provided callback.
+ // TODO: we want to deprecate and remove this API and move to `register_op` API
pub fn set_dispatch<F>(&mut self, f: F)
where
F: Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
@@ -257,6 +245,22 @@ impl Isolate {
self.dispatch = Some(Arc::new(f));
}
+ /// New dispatch mechanism. Requires runtime to explicitly ask for op ids
+ /// before using any of the ops.
+ ///
+ /// Ops added using this method are only usable if `dispatch` is not set
+ /// (using `set_dispatch` method).
+ pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId
+ where
+ F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
+ {
+ assert!(
+ self.dispatch.is_none(),
+ "set_dispatch should not be used in conjunction with register_op"
+ );
+ self.op_registry.register(name, op)
+ }
+
pub fn set_dyn_import<F>(&mut self, f: F)
where
F: Fn(deno_dyn_import_id, &str, &str) -> DynImportStream
@@ -329,9 +333,17 @@ impl Isolate {
let isolate = unsafe { Isolate::from_raw_ptr(user_data) };
let op = if let Some(ref f) = isolate.dispatch {
+ assert!(
+ op_id != 0,
+ "op_id 0 is a special value that shouldn't be used with dispatch"
+ );
f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf))
} else {
- panic!("isolate.dispatch not set")
+ isolate.op_registry.call(
+ op_id,
+ control_buf.as_ref(),
+ PinnedBuf::new(zero_copy_buf),
+ )
};
debug_assert_eq!(isolate.shared.size(), 0);
diff --git a/core/lib.rs b/core/lib.rs
index 9be1c3891..42a692f1a 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -11,6 +11,7 @@ mod js_errors;
mod libdeno;
mod module_specifier;
mod modules;
+mod ops;
mod shared_queue;
pub use crate::any_error::*;
@@ -22,6 +23,7 @@ pub use crate::libdeno::OpId;
pub use crate::libdeno::PinnedBuf;
pub use crate::module_specifier::*;
pub use crate::modules::*;
+pub use crate::ops::*;
pub fn v8_version() -> &'static str {
use std::ffi::CStr;
diff --git a/core/ops.rs b/core/ops.rs
new file mode 100644
index 000000000..84c15e096
--- /dev/null
+++ b/core/ops.rs
@@ -0,0 +1,111 @@
+// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+pub use crate::libdeno::OpId;
+use crate::PinnedBuf;
+use futures::Future;
+use std::collections::HashMap;
+
+pub type Buf = Box<[u8]>;
+
+pub type OpAsyncFuture<E> = Box<dyn Future<Item = Buf, Error = E> + Send>;
+
+pub(crate) type PendingOpFuture =
+ Box<dyn Future<Item = (OpId, Buf), Error = CoreError> + Send>;
+
+pub type OpResult<E> = Result<Op<E>, E>;
+
+pub enum Op<E> {
+ Sync(Buf),
+ Async(OpAsyncFuture<E>),
+}
+
+pub type CoreError = ();
+
+pub type CoreOp = Op<CoreError>;
+
+/// Main type describing op
+type OpDispatcher = dyn Fn(&[u8], Option<PinnedBuf>) -> CoreOp;
+
+#[derive(Default)]
+pub struct OpRegistry {
+ dispatchers: Vec<Box<OpDispatcher>>,
+ name_to_id: HashMap<String, OpId>,
+}
+
+impl OpRegistry {
+ pub fn new() -> Self {
+ let mut registry = Self::default();
+ let op_id = registry.register("ops", |_, _| {
+ // ops is a special op which is handled in call.
+ unreachable!()
+ });
+ assert_eq!(op_id, 0);
+ registry
+ }
+
+ pub fn register<F>(&mut self, name: &str, op: F) -> OpId
+ where
+ F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
+ {
+ let op_id = self.dispatchers.len() as u32;
+
+ let existing = self.name_to_id.insert(name.to_string(), op_id);
+ assert!(
+ existing.is_none(),
+ format!("Op already registered: {}", name)
+ );
+
+ self.dispatchers.push(Box::new(op));
+ op_id
+ }
+
+ fn json_map(&self) -> Buf {
+ let op_map_json = serde_json::to_string(&self.name_to_id).unwrap();
+ op_map_json.as_bytes().to_owned().into_boxed_slice()
+ }
+
+ pub fn call(
+ &self,
+ op_id: OpId,
+ control: &[u8],
+ zero_copy_buf: Option<PinnedBuf>,
+ ) -> CoreOp {
+ // Op with id 0 has special meaning - it's a special op that is always
+ // provided to retrieve op id map. The map consists of name to `OpId`
+ // mappings.
+ if op_id == 0 {
+ return Op::Sync(self.json_map());
+ }
+
+ let d = &*self.dispatchers.get(op_id as usize).expect("Op not found!");
+ d(control, zero_copy_buf)
+ }
+}
+
+#[test]
+fn test_op_registry() {
+ use std::sync::atomic;
+ use std::sync::Arc;
+ let mut op_registry = OpRegistry::new();
+
+ let c = Arc::new(atomic::AtomicUsize::new(0));
+ let c_ = c.clone();
+
+ let test_id = op_registry.register("test", move |_, _| {
+ c_.fetch_add(1, atomic::Ordering::SeqCst);
+ CoreOp::Sync(Box::new([]))
+ });
+ assert!(test_id != 0);
+
+ let mut expected = HashMap::new();
+ expected.insert("ops".to_string(), 0);
+ expected.insert("test".to_string(), 1);
+ assert_eq!(op_registry.name_to_id, expected);
+
+ let res = op_registry.call(test_id, &[], None);
+ if let Op::Sync(buf) = res {
+ assert_eq!(buf.len(), 0);
+ } else {
+ unreachable!();
+ }
+ assert_eq!(c.load(atomic::Ordering::SeqCst), 1);
+}
diff --git a/core/shared_queue.js b/core/shared_queue.js
index 22a64a312..7eeb61255 100644
--- a/core/shared_queue.js
+++ b/core/shared_queue.js
@@ -58,6 +58,13 @@ SharedQueue Binary Layout
Deno.core.recv(handleAsyncMsgFromRust);
}
+ function ops() {
+ // op id 0 is a special value to retreive the map of registered ops.
+ const opsMapBytes = Deno.core.send(0, new Uint8Array([]), null);
+ const opsMapJson = String.fromCharCode.apply(null, opsMapBytes);
+ return JSON.parse(opsMapJson);
+ }
+
function assert(cond) {
if (!cond) {
throw Error("assert");
@@ -84,7 +91,6 @@ SharedQueue Binary Layout
return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF];
}
- // TODO(ry) rename to setMeta
function setMeta(index, end, opId) {
shared32[INDEX_OFFSETS + 2 * index] = end;
shared32[INDEX_OFFSETS + 2 * index + 1] = opId;
@@ -189,7 +195,8 @@ SharedQueue Binary Layout
push,
reset,
shift
- }
+ },
+ ops
};
assert(window[GLOBAL_NAMESPACE] != null);
diff --git a/core/shared_queue.rs b/core/shared_queue.rs
index 5f9554ad2..dbb738f15 100644
--- a/core/shared_queue.rs
+++ b/core/shared_queue.rs
@@ -196,7 +196,7 @@ impl SharedQueue {
#[cfg(test)]
mod tests {
use super::*;
- use crate::isolate::Buf;
+ use crate::ops::Buf;
#[test]
fn basic() {