diff options
author | Andy Finch <andyfinch7@gmail.com> | 2019-11-18 21:13:04 -0500 |
---|---|---|
committer | Ry Dahl <ry@tinyclouds.org> | 2019-11-18 21:13:04 -0500 |
commit | b6b813cbfcf1a440a4a3b5eb10f55cfc85c4e3f3 (patch) | |
tree | 7fe68a6c94ad700d2bf2074274cfcdbe9e8dd87e /core/ops.rs | |
parent | f437521afb5df52fc9d4b7ee5664dacbf5881e3e (diff) |
feat: op registration during calls (#3375)
Diffstat (limited to 'core/ops.rs')
-rw-r--r-- | core/ops.rs | 88 |
1 files changed, 70 insertions, 18 deletions
diff --git a/core/ops.rs b/core/ops.rs index 3a4f51b83..6dc0a7323 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -4,6 +4,8 @@ use crate::PinnedBuf; use futures::Future; use std::collections::HashMap; use std::pin::Pin; +use std::sync::Arc; +use std::sync::RwLock; pub type Buf = Box<[u8]>; @@ -25,17 +27,18 @@ pub type CoreError = (); pub type CoreOp = Op<CoreError>; /// Main type describing op -type OpDispatcher = dyn Fn(&[u8], Option<PinnedBuf>) -> CoreOp; +type OpDispatcher = + dyn Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static; #[derive(Default)] pub struct OpRegistry { - dispatchers: Vec<Box<OpDispatcher>>, - name_to_id: HashMap<String, OpId>, + dispatchers: RwLock<Vec<Arc<Box<OpDispatcher>>>>, + name_to_id: RwLock<HashMap<String, OpId>>, } impl OpRegistry { pub fn new() -> Self { - let mut registry = Self::default(); + let registry = Self::default(); let op_id = registry.register("ops", |_, _| { // ops is a special op which is handled in call. unreachable!() @@ -44,24 +47,29 @@ impl OpRegistry { registry } - pub fn register<F>(&mut self, name: &str, op: F) -> OpId + pub fn register<F>(&self, name: &str, op: F) -> OpId where F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static, { - let op_id = self.dispatchers.len() as u32; + let mut lock = self.dispatchers.write().unwrap(); + let op_id = lock.len() as u32; - let existing = self.name_to_id.insert(name.to_string(), op_id); + let mut name_lock = self.name_to_id.write().unwrap(); + let existing = name_lock.insert(name.to_string(), op_id); assert!( existing.is_none(), format!("Op already registered: {}", name) ); - self.dispatchers.push(Box::new(op)); + lock.push(Arc::new(Box::new(op))); + drop(name_lock); + drop(lock); op_id } fn json_map(&self) -> Buf { - let op_map_json = serde_json::to_string(&self.name_to_id).unwrap(); + let lock = self.name_to_id.read().unwrap(); + let op_map_json = serde_json::to_string(&*lock).unwrap(); op_map_json.as_bytes().to_owned().into_boxed_slice() } @@ -78,13 +86,15 @@ impl OpRegistry { if op_id == 0 { return Some(Op::Sync(self.json_map())); } - - let d = match self.dispatchers.get(op_id as usize) { - Some(handler) => &*handler, - None => return None, - }; - - Some(d(control, zero_copy_buf)) + let lock = self.dispatchers.read().unwrap(); + if let Some(op) = lock.get(op_id as usize) { + let op_ = Arc::clone(&op); + // This should allow for changes to the dispatcher list during a call. + drop(lock); + Some(op_(control, zero_copy_buf)) + } else { + None + } } } @@ -92,7 +102,7 @@ impl OpRegistry { fn test_op_registry() { use std::sync::atomic; use std::sync::Arc; - let mut op_registry = OpRegistry::new(); + let op_registry = OpRegistry::new(); let c = Arc::new(atomic::AtomicUsize::new(0)); let c_ = c.clone(); @@ -106,7 +116,8 @@ fn test_op_registry() { let mut expected = HashMap::new(); expected.insert("ops".to_string(), 0); expected.insert("test".to_string(), 1); - assert_eq!(op_registry.name_to_id, expected); + let name_to_id = op_registry.name_to_id.read().unwrap(); + assert_eq!(*name_to_id, expected); let res = op_registry.call(test_id, &[], None).unwrap(); if let Op::Sync(buf) = res { @@ -119,3 +130,44 @@ fn test_op_registry() { let res = op_registry.call(100, &[], None); assert!(res.is_none()); } + +#[test] +fn register_op_during_call() { + use std::sync::atomic; + use std::sync::Arc; + let op_registry = Arc::new(OpRegistry::new()); + + let c = Arc::new(atomic::AtomicUsize::new(0)); + let c_ = c.clone(); + + let op_registry_ = op_registry.clone(); + let test_id = op_registry.register("dynamic_register_op", move |_, _| { + let c__ = c_.clone(); + op_registry_.register("test", move |_, _| { + c__.fetch_add(1, atomic::Ordering::SeqCst); + CoreOp::Sync(Box::new([])) + }); + CoreOp::Sync(Box::new([])) + }); + assert!(test_id != 0); + + op_registry.call(test_id, &[], None); + + let mut expected = HashMap::new(); + expected.insert("ops".to_string(), 0); + expected.insert("dynamic_register_op".to_string(), 1); + expected.insert("test".to_string(), 2); + let name_to_id = op_registry.name_to_id.read().unwrap(); + assert_eq!(*name_to_id, expected); + + let res = op_registry.call(2, &[], None).unwrap(); + if let Op::Sync(buf) = res { + assert_eq!(buf.len(), 0); + } else { + unreachable!(); + } + assert_eq!(c.load(atomic::Ordering::SeqCst), 1); + + let res = op_registry.call(100, &[], None); + assert!(res.is_none()); +} |