summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/examples/http_bench.rs2
-rw-r--r--core/isolate.rs6
-rw-r--r--core/ops.rs88
3 files changed, 74 insertions, 22 deletions
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs
index e82fcc119..a7b26f4b1 100644
--- a/core/examples/http_bench.rs
+++ b/core/examples/http_bench.rs
@@ -165,7 +165,7 @@ fn main() {
filename: "http_bench.js",
});
- let mut isolate = deno::Isolate::new(startup_data, false);
+ let isolate = deno::Isolate::new(startup_data, false);
isolate.register_op("listen", http_op(op_listen));
isolate.register_op("accept", http_op(op_accept));
isolate.register_op("read", http_op(op_read));
diff --git a/core/isolate.rs b/core/isolate.rs
index 079ab5dcf..41c8b02fd 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -178,7 +178,7 @@ pub struct Isolate {
pending_dyn_imports: FuturesUnordered<StreamFuture<IntoStream<DynImport>>>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
- op_registry: OpRegistry,
+ pub op_registry: Arc<OpRegistry>,
eager_poll_count: u32,
waker: AtomicWaker,
}
@@ -245,7 +245,7 @@ impl Isolate {
have_unpolled_ops: false,
pending_dyn_imports: FuturesUnordered::new(),
startup_script,
- op_registry: OpRegistry::new(),
+ op_registry: Arc::new(OpRegistry::new()),
eager_poll_count: 0,
waker: AtomicWaker::new(),
}
@@ -256,7 +256,7 @@ impl Isolate {
/// corresponds to the second argument of Deno.core.dispatch().
///
/// Requires runtime to explicitly ask for op ids before using any of the ops.
- pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId
+ pub fn register_op<F>(&self, name: &str, op: F) -> OpId
where
F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
{
diff --git a/core/ops.rs b/core/ops.rs
index 3a4f51b83..6dc0a7323 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -4,6 +4,8 @@ use crate::PinnedBuf;
use futures::Future;
use std::collections::HashMap;
use std::pin::Pin;
+use std::sync::Arc;
+use std::sync::RwLock;
pub type Buf = Box<[u8]>;
@@ -25,17 +27,18 @@ pub type CoreError = ();
pub type CoreOp = Op<CoreError>;
/// Main type describing op
-type OpDispatcher = dyn Fn(&[u8], Option<PinnedBuf>) -> CoreOp;
+type OpDispatcher =
+ dyn Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static;
#[derive(Default)]
pub struct OpRegistry {
- dispatchers: Vec<Box<OpDispatcher>>,
- name_to_id: HashMap<String, OpId>,
+ dispatchers: RwLock<Vec<Arc<Box<OpDispatcher>>>>,
+ name_to_id: RwLock<HashMap<String, OpId>>,
}
impl OpRegistry {
pub fn new() -> Self {
- let mut registry = Self::default();
+ let registry = Self::default();
let op_id = registry.register("ops", |_, _| {
// ops is a special op which is handled in call.
unreachable!()
@@ -44,24 +47,29 @@ impl OpRegistry {
registry
}
- pub fn register<F>(&mut self, name: &str, op: F) -> OpId
+ pub fn register<F>(&self, name: &str, op: F) -> OpId
where
F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
{
- let op_id = self.dispatchers.len() as u32;
+ let mut lock = self.dispatchers.write().unwrap();
+ let op_id = lock.len() as u32;
- let existing = self.name_to_id.insert(name.to_string(), op_id);
+ let mut name_lock = self.name_to_id.write().unwrap();
+ let existing = name_lock.insert(name.to_string(), op_id);
assert!(
existing.is_none(),
format!("Op already registered: {}", name)
);
- self.dispatchers.push(Box::new(op));
+ lock.push(Arc::new(Box::new(op)));
+ drop(name_lock);
+ drop(lock);
op_id
}
fn json_map(&self) -> Buf {
- let op_map_json = serde_json::to_string(&self.name_to_id).unwrap();
+ let lock = self.name_to_id.read().unwrap();
+ let op_map_json = serde_json::to_string(&*lock).unwrap();
op_map_json.as_bytes().to_owned().into_boxed_slice()
}
@@ -78,13 +86,15 @@ impl OpRegistry {
if op_id == 0 {
return Some(Op::Sync(self.json_map()));
}
-
- let d = match self.dispatchers.get(op_id as usize) {
- Some(handler) => &*handler,
- None => return None,
- };
-
- Some(d(control, zero_copy_buf))
+ let lock = self.dispatchers.read().unwrap();
+ if let Some(op) = lock.get(op_id as usize) {
+ let op_ = Arc::clone(&op);
+ // This should allow for changes to the dispatcher list during a call.
+ drop(lock);
+ Some(op_(control, zero_copy_buf))
+ } else {
+ None
+ }
}
}
@@ -92,7 +102,7 @@ impl OpRegistry {
fn test_op_registry() {
use std::sync::atomic;
use std::sync::Arc;
- let mut op_registry = OpRegistry::new();
+ let op_registry = OpRegistry::new();
let c = Arc::new(atomic::AtomicUsize::new(0));
let c_ = c.clone();
@@ -106,7 +116,8 @@ fn test_op_registry() {
let mut expected = HashMap::new();
expected.insert("ops".to_string(), 0);
expected.insert("test".to_string(), 1);
- assert_eq!(op_registry.name_to_id, expected);
+ let name_to_id = op_registry.name_to_id.read().unwrap();
+ assert_eq!(*name_to_id, expected);
let res = op_registry.call(test_id, &[], None).unwrap();
if let Op::Sync(buf) = res {
@@ -119,3 +130,44 @@ fn test_op_registry() {
let res = op_registry.call(100, &[], None);
assert!(res.is_none());
}
+
+#[test]
+fn register_op_during_call() {
+ use std::sync::atomic;
+ use std::sync::Arc;
+ let op_registry = Arc::new(OpRegistry::new());
+
+ let c = Arc::new(atomic::AtomicUsize::new(0));
+ let c_ = c.clone();
+
+ let op_registry_ = op_registry.clone();
+ let test_id = op_registry.register("dynamic_register_op", move |_, _| {
+ let c__ = c_.clone();
+ op_registry_.register("test", move |_, _| {
+ c__.fetch_add(1, atomic::Ordering::SeqCst);
+ CoreOp::Sync(Box::new([]))
+ });
+ CoreOp::Sync(Box::new([]))
+ });
+ assert!(test_id != 0);
+
+ op_registry.call(test_id, &[], None);
+
+ let mut expected = HashMap::new();
+ expected.insert("ops".to_string(), 0);
+ expected.insert("dynamic_register_op".to_string(), 1);
+ expected.insert("test".to_string(), 2);
+ let name_to_id = op_registry.name_to_id.read().unwrap();
+ assert_eq!(*name_to_id, expected);
+
+ let res = op_registry.call(2, &[], None).unwrap();
+ if let Op::Sync(buf) = res {
+ assert_eq!(buf.len(), 0);
+ } else {
+ unreachable!();
+ }
+ assert_eq!(c.load(atomic::Ordering::SeqCst), 1);
+
+ let res = op_registry.call(100, &[], None);
+ assert!(res.is_none());
+}