summaryrefslogtreecommitdiff
path: root/core/ops.rs
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2020-09-06 02:34:02 +0200
committerGitHub <noreply@github.com>2020-09-06 02:34:02 +0200
commitc821e8f2f1fb8ad5e9eb00854277cafc8c80b2f5 (patch)
treec429a3c2707a4047fb512443a8468b7e15e5730d /core/ops.rs
parent849431eb1d112d1f79f4a327830dc1a5bf22dd47 (diff)
Move JSON ops to deno_core (#7336)
Diffstat (limited to 'core/ops.rs')
-rw-r--r--core/ops.rs245
1 files changed, 106 insertions, 139 deletions
diff --git a/core/ops.rs b/core/ops.rs
index 65a0f325b..838596dc0 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -1,179 +1,146 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use crate::core_isolate::CoreIsolateState;
+
+use crate::BufVec;
+use crate::ErrBox;
use crate::ZeroCopyBuf;
use futures::Future;
+use futures::FutureExt;
+use indexmap::IndexMap;
+use serde_json::json;
+use serde_json::Value;
use std::collections::HashMap;
+use std::iter::once;
+use std::ops::Deref;
+use std::ops::DerefMut;
use std::pin::Pin;
use std::rc::Rc;
-pub type OpId = u32;
-
-pub type Buf = Box<[u8]>;
-
-pub type OpAsyncFuture = Pin<Box<dyn Future<Output = Buf>>>;
+pub type OpAsyncFuture = Pin<Box<dyn Future<Output = Box<[u8]>>>>;
+pub type OpFn<S> = dyn Fn(Rc<S>, BufVec) -> Op + 'static;
+pub type OpId = usize;
pub enum Op {
- Sync(Buf),
+ Sync(Box<[u8]>),
Async(OpAsyncFuture),
/// AsyncUnref is the variation of Async, which doesn't block the program
/// exiting.
AsyncUnref(OpAsyncFuture),
+ NotFound,
}
-/// Main type describing op
-pub type OpDispatcher =
- dyn Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op + 'static;
-
-#[derive(Default)]
-pub struct OpRegistry {
- dispatchers: Vec<Rc<OpDispatcher>>,
- name_to_id: HashMap<String, OpId>,
+pub trait OpRouter {
+ fn route_op(self: Rc<Self>, op_id: OpId, bufs: BufVec) -> Op;
}
-impl OpRegistry {
- pub fn new() -> Self {
- let mut registry = Self::default();
- let op_id = registry.register("ops", |state, _| {
- let buf = state.op_registry.json_map();
+pub trait OpRegistry: OpRouter + 'static {
+ fn get_op_catalog(self: Rc<Self>) -> HashMap<String, OpId>;
+
+ fn register_op<F>(&self, name: &str, op_fn: F) -> OpId
+ where
+ F: Fn(Rc<Self>, BufVec) -> Op + 'static;
+
+ fn register_op_json_sync<F>(self: &Rc<Self>, name: &str, op_fn: F) -> OpId
+ where
+ F: Fn(&Self, Value, &mut [ZeroCopyBuf]) -> Result<Value, ErrBox> + 'static,
+ {
+ let base_op_fn = move |state: Rc<Self>, mut bufs: BufVec| -> Op {
+ let result = serde_json::from_slice(&bufs[0])
+ .map_err(ErrBox::from)
+ .and_then(|args| op_fn(&state, args, &mut bufs[1..]));
+ let buf = state.json_serialize_op_result(None, result);
Op::Sync(buf)
- });
- assert_eq!(op_id, 0);
- registry
+ };
+
+ self.register_op(name, base_op_fn)
}
- pub fn register<F>(&mut self, name: &str, op: F) -> OpId
+ fn register_op_json_async<F, R>(self: &Rc<Self>, name: &str, op_fn: F) -> OpId
where
- F: Fn(&mut CoreIsolateState, &mut [ZeroCopyBuf]) -> Op + 'static,
+ F: Fn(Rc<Self>, Value, BufVec) -> R + 'static,
+ R: Future<Output = Result<Value, ErrBox>> + 'static,
{
- let op_id = self.dispatchers.len() as u32;
-
- let existing = self.name_to_id.insert(name.to_string(), op_id);
- assert!(
- existing.is_none(),
- format!("Op already registered: {}", name)
- );
- self.dispatchers.push(Rc::new(op));
- op_id
- }
+ let try_dispatch_op = move |state: Rc<Self>,
+ bufs: BufVec|
+ -> Result<Op, ErrBox> {
+ let args: Value = serde_json::from_slice(&bufs[0])?;
+ let promise_id = args
+ .get("promiseId")
+ .and_then(Value::as_u64)
+ .ok_or_else(|| ErrBox::type_error("missing or invalid `promiseId`"))?;
+ let bufs = bufs[1..].into();
+ let fut = op_fn(state.clone(), args, bufs).map(move |result| {
+ state.json_serialize_op_result(Some(promise_id), result)
+ });
+ Ok(Op::Async(Box::pin(fut)))
+ };
- fn json_map(&self) -> Buf {
- let op_map_json = serde_json::to_string(&self.name_to_id).unwrap();
- op_map_json.as_bytes().to_owned().into_boxed_slice()
- }
+ let base_op_fn = move |state: Rc<Self>, bufs: BufVec| -> Op {
+ match try_dispatch_op(state.clone(), bufs) {
+ Ok(op) => op,
+ Err(err) => Op::Sync(state.json_serialize_op_result(None, Err(err))),
+ }
+ };
- pub fn get(&self, op_id: OpId) -> Option<Rc<OpDispatcher>> {
- self.dispatchers.get(op_id as usize).map(Rc::clone)
+ self.register_op(name, base_op_fn)
}
- pub fn unregister_op(&mut self, name: &str) {
- let id = self.name_to_id.remove(name).unwrap();
- drop(self.dispatchers.remove(id as usize));
+ fn json_serialize_op_result(
+ &self,
+ promise_id: Option<u64>,
+ result: Result<Value, ErrBox>,
+ ) -> Box<[u8]> {
+ let value = match result {
+ Ok(v) => json!({ "ok": v, "promiseId": promise_id }),
+ Err(err) => json!({
+ "promiseId": promise_id ,
+ "err": {
+ "className": self.get_error_class_name(&err),
+ "message": err.to_string(),
+ }
+ }),
+ };
+ serde_json::to_vec(&value).unwrap().into_boxed_slice()
}
-}
-#[test]
-fn test_op_registry() {
- use crate::CoreIsolate;
- use std::sync::atomic;
- use std::sync::Arc;
- let mut op_registry = OpRegistry::new();
-
- let c = Arc::new(atomic::AtomicUsize::new(0));
- let c_ = c.clone();
-
- let test_id = op_registry.register("test", move |_, _| {
- c_.fetch_add(1, atomic::Ordering::SeqCst);
- Op::Sync(Box::new([]))
- });
- assert!(test_id != 0);
-
- let mut expected = HashMap::new();
- expected.insert("ops".to_string(), 0);
- expected.insert("test".to_string(), 1);
- assert_eq!(op_registry.name_to_id, expected);
-
- let isolate = CoreIsolate::new(crate::StartupData::None, false);
-
- let dispatch = op_registry.get(test_id).unwrap();
- let state_rc = CoreIsolate::state(&isolate);
- let mut state = state_rc.borrow_mut();
- let res = dispatch(&mut state, &mut []);
- if let Op::Sync(buf) = res {
- assert_eq!(buf.len(), 0);
- } else {
- unreachable!();
+ fn get_error_class_name(&self, _err: &ErrBox) -> &'static str {
+ "Error"
}
- assert_eq!(c.load(atomic::Ordering::SeqCst), 1);
-
- assert!(op_registry.get(100).is_none());
- op_registry.unregister_op("test");
- expected.remove("test");
- assert_eq!(op_registry.name_to_id, expected);
- assert!(op_registry.get(1).is_none());
}
-#[test]
-fn register_op_during_call() {
- use crate::CoreIsolate;
- use std::sync::atomic;
- use std::sync::Arc;
- use std::sync::Mutex;
- let op_registry = Arc::new(Mutex::new(OpRegistry::new()));
-
- let c = Arc::new(atomic::AtomicUsize::new(0));
- let c_ = c.clone();
-
- let op_registry_ = op_registry.clone();
-
- let test_id = {
- let mut g = op_registry.lock().unwrap();
- g.register("dynamic_register_op", move |_, _| {
- let c__ = c_.clone();
- let mut g = op_registry_.lock().unwrap();
- g.register("test", move |_, _| {
- c__.fetch_add(1, atomic::Ordering::SeqCst);
- Op::Sync(Box::new([]))
- });
- Op::Sync(Box::new([]))
- })
- };
- assert!(test_id != 0);
+/// Collection for storing registered ops. The special 'get_op_catalog'
+/// op with OpId `0` is automatically added when the OpTable is created.
+pub struct OpTable<S>(IndexMap<String, Rc<OpFn<S>>>);
- let isolate = CoreIsolate::new(crate::StartupData::None, false);
+impl<S: OpRegistry> OpTable<S> {
+ pub fn get_op_catalog(&self) -> HashMap<String, OpId> {
+ self.keys().cloned().zip(0..).collect()
+ }
- let dispatcher1 = {
- let g = op_registry.lock().unwrap();
- g.get(test_id).unwrap()
- };
- {
- let state_rc = CoreIsolate::state(&isolate);
- let mut state = state_rc.borrow_mut();
- dispatcher1(&mut state, &mut []);
+ fn op_get_op_catalog(state: Rc<S>, _bufs: BufVec) -> Op {
+ let ops = state.get_op_catalog();
+ let buf = serde_json::to_vec(&ops).map(Into::into).unwrap();
+ Op::Sync(buf)
}
+}
- let mut expected = HashMap::new();
- expected.insert("ops".to_string(), 0);
- expected.insert("dynamic_register_op".to_string(), 1);
- expected.insert("test".to_string(), 2);
- {
- let g = op_registry.lock().unwrap();
- assert_eq!(g.name_to_id, expected);
+impl<S: OpRegistry> Default for OpTable<S> {
+ fn default() -> Self {
+ Self(
+ once(("ops".to_owned(), Rc::new(Self::op_get_op_catalog) as _)).collect(),
+ )
}
+}
- let dispatcher2 = {
- let g = op_registry.lock().unwrap();
- g.get(2).unwrap()
- };
- let state_rc = CoreIsolate::state(&isolate);
- let mut state = state_rc.borrow_mut();
- let res = dispatcher2(&mut state, &mut []);
- if let Op::Sync(buf) = res {
- assert_eq!(buf.len(), 0);
- } else {
- unreachable!();
+impl<S> Deref for OpTable<S> {
+ type Target = IndexMap<String, Rc<OpFn<S>>>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
}
- assert_eq!(c.load(atomic::Ordering::SeqCst), 1);
+}
- let g = op_registry.lock().unwrap();
- assert!(g.get(100).is_none());
+impl<S> DerefMut for OpTable<S> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
}