summaryrefslogtreecommitdiff
path: root/core/runtime.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/runtime.rs')
-rw-r--r--core/runtime.rs512
1 files changed, 87 insertions, 425 deletions
diff --git a/core/runtime.rs b/core/runtime.rs
index 80fe90d2f..1f9e62f4f 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -20,10 +20,11 @@ use crate::modules::NoopModuleLoader;
use crate::modules::PrepareLoadFuture;
use crate::modules::RecursiveModuleLoad;
use crate::ops::*;
-use crate::shared_queue::SharedQueue;
-use crate::shared_queue::RECOMMENDED_SIZE;
-use crate::BufVec;
+use crate::OpPayload;
+use crate::OpResponse;
use crate::OpState;
+use crate::PromiseId;
+use crate::ZeroCopyBuf;
use futures::channel::mpsc;
use futures::future::poll_fn;
use futures::stream::FuturesUnordered;
@@ -45,7 +46,7 @@ use std::sync::Once;
use std::task::Context;
use std::task::Poll;
-type PendingOpFuture = Pin<Box<dyn Future<Output = (OpId, Box<[u8]>)>>>;
+type PendingOpFuture = Pin<Box<dyn Future<Output = (PromiseId, OpResponse)>>>;
pub enum Snapshot {
Static(&'static [u8]),
@@ -99,7 +100,6 @@ struct ModEvaluate {
/// embedder slots.
pub(crate) struct JsRuntimeState {
pub global_context: Option<v8::Global<v8::Context>>,
- pub(crate) shared_ab: Option<v8::Global<v8::SharedArrayBuffer>>,
pub(crate) js_recv_cb: Option<v8::Global<v8::Function>>,
pub(crate) js_macrotask_cb: Option<v8::Global<v8::Function>>,
pub(crate) pending_promise_exceptions:
@@ -107,7 +107,6 @@ pub(crate) struct JsRuntimeState {
pending_dyn_mod_evaluate: HashMap<ModuleLoadId, DynImportModEvaluate>,
pending_mod_evaluate: Option<ModEvaluate>,
pub(crate) js_error_create_fn: Rc<JsErrorCreateFn>,
- pub(crate) shared: SharedQueue,
pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>,
pub(crate) have_unpolled_ops: bool,
@@ -276,11 +275,9 @@ impl JsRuntime {
pending_promise_exceptions: HashMap::new(),
pending_dyn_mod_evaluate: HashMap::new(),
pending_mod_evaluate: None,
- shared_ab: None,
js_recv_cb: None,
js_macrotask_cb: None,
js_error_create_fn,
- shared: SharedQueue::new(RECOMMENDED_SIZE),
pending_ops: FuturesUnordered::new(),
pending_unref_ops: FuturesUnordered::new(),
op_state: Rc::new(RefCell::new(op_state)),
@@ -305,7 +302,7 @@ impl JsRuntime {
}
if !options.will_snapshot {
- js_runtime.shared_queue_init();
+ js_runtime.core_js_init();
}
js_runtime
@@ -350,16 +347,13 @@ impl JsRuntime {
.unwrap();
}
- /// Executes a JavaScript code to initialize shared queue binding
- /// between Rust and JS.
+ /// Executes JavaScript code to initialize core.js,
+ /// specifically the js_recv_cb setter
///
/// This function mustn't be called during snapshotting.
- fn shared_queue_init(&mut self) {
+ fn core_js_init(&mut self) {
self
- .execute(
- "deno:core/shared_queue_init.js",
- "Deno.core.sharedQueueInit()",
- )
+ .execute("deno:core/init.js", "Deno.core.init()")
.unwrap();
}
@@ -448,7 +442,7 @@ impl JsRuntime {
/// * [json_op_async()](fn.json_op_async.html)
pub fn register_op<F>(&mut self, name: &str, op_fn: F) -> OpId
where
- F: Fn(Rc<RefCell<OpState>>, BufVec) -> Op + 'static,
+ F: Fn(Rc<RefCell<OpState>>, OpPayload, Option<ZeroCopyBuf>) -> Op + 'static,
{
Self::state(self.v8_isolate())
.borrow_mut()
@@ -516,8 +510,8 @@ impl JsRuntime {
// Ops
{
- let overflow_response = self.poll_pending_ops(cx);
- self.async_op_response(overflow_response)?;
+ let async_responses = self.poll_pending_ops(cx);
+ self.async_op_response(async_responses)?;
self.drain_macrotasks()?;
self.check_promise_exceptions()?;
}
@@ -1325,9 +1319,12 @@ impl JsRuntime {
self.mod_instantiate(root_id).map(|_| root_id)
}
- fn poll_pending_ops(&mut self, cx: &mut Context) -> Vec<(OpId, Box<[u8]>)> {
+ fn poll_pending_ops(
+ &mut self,
+ cx: &mut Context,
+ ) -> Vec<(PromiseId, OpResponse)> {
let state_rc = Self::state(self.v8_isolate());
- let mut overflow_response: Vec<(OpId, Box<[u8]>)> = Vec::new();
+ let mut async_responses: Vec<(PromiseId, OpResponse)> = Vec::new();
let mut state = state_rc.borrow_mut();
@@ -1339,11 +1336,8 @@ impl JsRuntime {
match pending_r {
Poll::Ready(None) => break,
Poll::Pending => break,
- Poll::Ready(Some((op_id, buf))) => {
- let successful_push = state.shared.push(op_id, &buf);
- if !successful_push {
- overflow_response.push((op_id, buf));
- }
+ Poll::Ready(Some((promise_id, resp))) => {
+ async_responses.push((promise_id, resp));
}
};
}
@@ -1353,16 +1347,13 @@ impl JsRuntime {
match unref_r {
Poll::Ready(None) => break,
Poll::Pending => break,
- Poll::Ready(Some((op_id, buf))) => {
- let successful_push = state.shared.push(op_id, &buf);
- if !successful_push {
- overflow_response.push((op_id, buf));
- }
+ Poll::Ready(Some((promise_id, resp))) => {
+ async_responses.push((promise_id, resp));
}
};
}
- overflow_response
+ async_responses
}
fn check_promise_exceptions(&mut self) -> Result<(), AnyError> {
@@ -1391,17 +1382,15 @@ impl JsRuntime {
exception_to_err_result(scope, exception, true)
}
- // Respond using shared queue and optionally overflown response
+ // Send finished responses to JS
fn async_op_response(
&mut self,
- overflown_responses: Vec<(OpId, Box<[u8]>)>,
+ async_responses: Vec<(PromiseId, OpResponse)>,
) -> Result<(), AnyError> {
let state_rc = Self::state(self.v8_isolate());
- let shared_queue_size = state_rc.borrow().shared.size();
- let overflown_responses_size = overflown_responses.len();
-
- if shared_queue_size == 0 && overflown_responses_size == 0 {
+ let async_responses_size = async_responses.len();
+ if async_responses_size == 0 {
return Ok(());
}
@@ -1422,26 +1411,32 @@ impl JsRuntime {
let tc_scope = &mut v8::TryCatch::new(scope);
+ // We return async responses to JS in unbounded batches (may change),
+ // each batch is a flat vector of tuples:
+ // `[promise_id1, op_result1, promise_id2, op_result2, ...]`
+ // promise_id is a simple integer, op_result is an ops::OpResult
+ // which contains a value OR an error, encoded as a tuple.
+ // This batch is received in JS via the special `arguments` variable
+ // and then each tuple is used to resolve or reject promises
let mut args: Vec<v8::Local<v8::Value>> =
- Vec::with_capacity(2 * overflown_responses_size);
- for overflown_response in overflown_responses {
- let (op_id, buf) = overflown_response;
- args.push(v8::Integer::new(tc_scope, op_id as i32).into());
- args.push(bindings::boxed_slice_to_uint8array(tc_scope, buf).into());
+ Vec::with_capacity(2 * async_responses_size);
+ for overflown_response in async_responses {
+ let (promise_id, resp) = overflown_response;
+ args.push(v8::Integer::new(tc_scope, promise_id as i32).into());
+ args.push(match resp {
+ OpResponse::Value(value) => serde_v8::to_v8(tc_scope, value).unwrap(),
+ OpResponse::Buffer(buf) => {
+ bindings::boxed_slice_to_uint8array(tc_scope, buf).into()
+ }
+ });
}
- if shared_queue_size > 0 || overflown_responses_size > 0 {
+ if async_responses_size > 0 {
js_recv_cb.call(tc_scope, global, args.as_slice());
}
match tc_scope.exception() {
- None => {
- // The other side should have shifted off all the messages.
- let shared_queue_size = state_rc.borrow().shared.size();
- assert_eq!(shared_queue_size, 0);
-
- Ok(())
- }
+ None => Ok(()),
Some(exception) => exception_to_err_result(tc_scope, exception, false),
}
}
@@ -1485,7 +1480,6 @@ impl JsRuntime {
pub mod tests {
use super::*;
use crate::modules::ModuleSourceFuture;
- use crate::BufVec;
use futures::future::lazy;
use futures::FutureExt;
use std::io;
@@ -1501,31 +1495,10 @@ pub mod tests {
futures::executor::block_on(lazy(move |cx| f(cx)));
}
- fn poll_until_ready(
- runtime: &mut JsRuntime,
- max_poll_count: usize,
- ) -> Result<(), AnyError> {
- let mut cx = Context::from_waker(futures::task::noop_waker_ref());
- for _ in 0..max_poll_count {
- match runtime.poll_event_loop(&mut cx) {
- Poll::Pending => continue,
- Poll::Ready(val) => return val,
- }
- }
- panic!(
- "JsRuntime still not ready after polling {} times.",
- max_poll_count
- )
- }
-
enum Mode {
Async,
AsyncUnref,
- AsyncZeroCopy(u8),
- OverflowReqSync,
- OverflowResSync,
- OverflowReqAsync,
- OverflowResAsync,
+ AsyncZeroCopy(bool),
}
struct TestState {
@@ -1533,68 +1506,39 @@ pub mod tests {
dispatch_count: Arc<AtomicUsize>,
}
- fn dispatch(op_state: Rc<RefCell<OpState>>, bufs: BufVec) -> Op {
+ fn dispatch(
+ op_state: Rc<RefCell<OpState>>,
+ payload: OpPayload,
+ buf: Option<ZeroCopyBuf>,
+ ) -> Op {
let op_state_ = op_state.borrow();
let test_state = op_state_.borrow::<TestState>();
test_state.dispatch_count.fetch_add(1, Ordering::Relaxed);
match test_state.mode {
Mode::Async => {
- assert_eq!(bufs.len(), 1);
- assert_eq!(bufs[0].len(), 1);
- assert_eq!(bufs[0][0], 42);
- let buf = vec![43u8].into_boxed_slice();
- Op::Async(futures::future::ready(buf).boxed())
+ let control: u8 = payload.deserialize().unwrap();
+ assert_eq!(control, 42);
+ let resp = OpResponse::Value(Box::new(43));
+ Op::Async(Box::pin(futures::future::ready(resp)))
}
Mode::AsyncUnref => {
- assert_eq!(bufs.len(), 1);
- assert_eq!(bufs[0].len(), 1);
- assert_eq!(bufs[0][0], 42);
+ let control: u8 = payload.deserialize().unwrap();
+ assert_eq!(control, 42);
let fut = async {
// This future never finish.
futures::future::pending::<()>().await;
- vec![43u8].into_boxed_slice()
+ OpResponse::Value(Box::new(43))
};
- Op::AsyncUnref(fut.boxed())
+ Op::AsyncUnref(Box::pin(fut))
}
- Mode::AsyncZeroCopy(count) => {
- assert_eq!(bufs.len(), count as usize);
- bufs.iter().enumerate().for_each(|(idx, buf)| {
+ Mode::AsyncZeroCopy(has_buffer) => {
+ assert_eq!(buf.is_some(), has_buffer);
+ if let Some(buf) = buf {
assert_eq!(buf.len(), 1);
- assert_eq!(idx, buf[0] as usize);
- });
+ }
- let buf = vec![43u8].into_boxed_slice();
- Op::Async(futures::future::ready(buf).boxed())
- }
- Mode::OverflowReqSync => {
- assert_eq!(bufs.len(), 1);
- assert_eq!(bufs[0].len(), 100 * 1024 * 1024);
- let buf = vec![43u8].into_boxed_slice();
- Op::Sync(buf)
- }
- Mode::OverflowResSync => {
- assert_eq!(bufs.len(), 1);
- assert_eq!(bufs[0].len(), 1);
- assert_eq!(bufs[0][0], 42);
- let mut vec = vec![0u8; 100 * 1024 * 1024];
- vec[0] = 99;
- let buf = vec.into_boxed_slice();
- Op::Sync(buf)
- }
- Mode::OverflowReqAsync => {
- assert_eq!(bufs.len(), 1);
- assert_eq!(bufs[0].len(), 100 * 1024 * 1024);
- let buf = vec![43u8].into_boxed_slice();
- Op::Async(futures::future::ready(buf).boxed())
- }
- Mode::OverflowResAsync => {
- assert_eq!(bufs.len(), 1);
- assert_eq!(bufs[0].len(), 1);
- assert_eq!(bufs[0][0], 42);
- let mut vec = vec![0u8; 100 * 1024 * 1024];
- vec[0] = 4;
- let buf = vec.into_boxed_slice();
- Op::Async(futures::future::ready(buf).boxed())
+ let resp = OpResponse::Value(Box::new(43));
+ Op::Async(Box::pin(futures::future::ready(resp)))
}
}
}
@@ -1633,10 +1577,10 @@ pub mod tests {
.execute(
"filename.js",
r#"
- let control = new Uint8Array([42]);
- Deno.core.send(1, control);
+ let control = 42;
+ Deno.core.send(1, null, control);
async function main() {
- Deno.core.send(1, control);
+ Deno.core.send(1, null, control);
}
main();
"#,
@@ -1647,7 +1591,7 @@ pub mod tests {
#[test]
fn test_dispatch_no_zero_copy_buf() {
- let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(0));
+ let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(false));
runtime
.execute(
"filename.js",
@@ -1661,14 +1605,13 @@ pub mod tests {
#[test]
fn test_dispatch_stack_zero_copy_bufs() {
- let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(2));
+ let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(true));
runtime
.execute(
"filename.js",
r#"
let zero_copy_a = new Uint8Array([0]);
- let zero_copy_b = new Uint8Array([1]);
- Deno.core.send(1, zero_copy_a, zero_copy_b);
+ Deno.core.send(1, null, null, zero_copy_a);
"#,
)
.unwrap();
@@ -1676,23 +1619,7 @@ pub mod tests {
}
#[test]
- fn test_dispatch_heap_zero_copy_bufs() {
- let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(5));
- runtime.execute(
- "filename.js",
- r#"
- let zero_copy_a = new Uint8Array([0]);
- let zero_copy_b = new Uint8Array([1]);
- let zero_copy_c = new Uint8Array([2]);
- let zero_copy_d = new Uint8Array([3]);
- let zero_copy_e = new Uint8Array([4]);
- Deno.core.send(1, zero_copy_a, zero_copy_b, zero_copy_c, zero_copy_d, zero_copy_e);
- "#,
- ).unwrap();
- assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- }
-
- #[test]
+ #[ignore] // TODO(ry) re-enable? setAsyncHandler has been removed
fn test_poll_async_delayed_ops() {
run_in_task(|cx| {
let (mut runtime, dispatch_count) = setup(Mode::Async);
@@ -1714,8 +1641,8 @@ pub mod tests {
"check1.js",
r#"
assert(nrecv == 0);
- let control = new Uint8Array([42]);
- Deno.core.send(1, control);
+ let control = 42;
+ Deno.core.send(1, null, control);
assert(nrecv == 0);
"#,
)
@@ -1728,7 +1655,7 @@ pub mod tests {
"check2.js",
r#"
assert(nrecv == 1);
- Deno.core.send(1, control);
+ Deno.core.send(1, null, control);
assert(nrecv == 1);
"#,
)
@@ -1743,6 +1670,7 @@ pub mod tests {
}
#[test]
+ #[ignore] // TODO(ry) re-enable? setAsyncHandler has been removed
fn test_poll_async_optional_ops() {
run_in_task(|cx| {
let (mut runtime, dispatch_count) = setup(Mode::AsyncUnref);
@@ -1754,8 +1682,8 @@ pub mod tests {
// This handler will never be called
assert(false);
});
- let control = new Uint8Array([42]);
- Deno.core.send(1, control);
+ let control = 42;
+ Deno.core.send(1, null, control);
"#,
)
.unwrap();
@@ -1818,261 +1746,9 @@ pub mod tests {
}
#[test]
- fn overflow_req_sync() {
- let (mut runtime, dispatch_count) = setup(Mode::OverflowReqSync);
- runtime
- .execute(
- "overflow_req_sync.js",
- r#"
- let asyncRecv = 0;
- Deno.core.setAsyncHandler(1, (buf) => { asyncRecv++ });
- // Large message that will overflow the shared space.
- let control = new Uint8Array(100 * 1024 * 1024);
- let response = Deno.core.dispatch(1, control);
- assert(response instanceof Uint8Array);
- assert(response.length == 1);
- assert(response[0] == 43);
- assert(asyncRecv == 0);
- "#,
- )
- .unwrap();
- assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- }
-
- #[test]
- fn overflow_res_sync() {
- // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We
- // should optimize this.
- let (mut runtime, dispatch_count) = setup(Mode::OverflowResSync);
- runtime
- .execute(
- "overflow_res_sync.js",
- r#"
- let asyncRecv = 0;
- Deno.core.setAsyncHandler(1, (buf) => { asyncRecv++ });
- // Large message that will overflow the shared space.
- let control = new Uint8Array([42]);
- let response = Deno.core.dispatch(1, control);
- assert(response instanceof Uint8Array);
- assert(response.length == 100 * 1024 * 1024);
- assert(response[0] == 99);
- assert(asyncRecv == 0);
- "#,
- )
- .unwrap();
- assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- }
-
- #[test]
- fn overflow_req_async() {
- run_in_task(|cx| {
- let (mut runtime, dispatch_count) = setup(Mode::OverflowReqAsync);
- runtime
- .execute(
- "overflow_req_async.js",
- r#"
- let asyncRecv = 0;
- Deno.core.setAsyncHandler(1, (buf) => {
- assert(buf.byteLength === 1);
- assert(buf[0] === 43);
- asyncRecv++;
- });
- // Large message that will overflow the shared space.
- let control = new Uint8Array(100 * 1024 * 1024);
- let response = Deno.core.dispatch(1, control);
- // Async messages always have null response.
- assert(response == null);
- assert(asyncRecv == 0);
- "#,
- )
- .unwrap();
- assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
- runtime
- .execute("check.js", "assert(asyncRecv == 1);")
- .unwrap();
- });
- }
-
- #[test]
- fn overflow_res_async_combined_with_unref() {
- run_in_task(|cx| {
- let mut runtime = JsRuntime::new(Default::default());
-
- runtime.register_op(
- "test1",
- |_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op {
- let mut vec = vec![0u8; 100 * 1024 * 1024];
- vec[0] = 4;
- let buf = vec.into_boxed_slice();
- Op::Async(futures::future::ready(buf).boxed())
- },
- );
-
- runtime.register_op(
- "test2",
- |_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op {
- let mut vec = vec![0u8; 100 * 1024 * 1024];
- vec[0] = 4;
- let buf = vec.into_boxed_slice();
- Op::AsyncUnref(futures::future::ready(buf).boxed())
- },
- );
-
- runtime
- .execute(
- "overflow_res_async_combined_with_unref.js",
- r#"
- function assert(cond) {
- if (!cond) {
- throw Error("assert");
- }
- }
-
- let asyncRecv = 0;
- Deno.core.setAsyncHandler(1, (buf) => {
- assert(buf.byteLength === 100 * 1024 * 1024);
- assert(buf[0] === 4);
- asyncRecv++;
- });
- Deno.core.setAsyncHandler(2, (buf) => {
- assert(buf.byteLength === 100 * 1024 * 1024);
- assert(buf[0] === 4);
- asyncRecv++;
- });
- let control = new Uint8Array(1);
- let response1 = Deno.core.dispatch(1, control);
- // Async messages always have null response.
- assert(response1 == null);
- assert(asyncRecv == 0);
- let response2 = Deno.core.dispatch(2, control);
- // Async messages always have null response.
- assert(response2 == null);
- assert(asyncRecv == 0);
- "#,
- )
- .unwrap();
- assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
- runtime
- .execute("check.js", "assert(asyncRecv == 2);")
- .unwrap();
- });
- }
-
- #[test]
- fn overflow_res_async() {
- run_in_task(|_cx| {
- // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We
- // should optimize this.
- let (mut runtime, dispatch_count) = setup(Mode::OverflowResAsync);
- runtime
- .execute(
- "overflow_res_async.js",
- r#"
- let asyncRecv = 0;
- Deno.core.setAsyncHandler(1, (buf) => {
- assert(buf.byteLength === 100 * 1024 * 1024);
- assert(buf[0] === 4);
- asyncRecv++;
- });
- // Large message that will overflow the shared space.
- let control = new Uint8Array([42]);
- let response = Deno.core.dispatch(1, control);
- assert(response == null);
- assert(asyncRecv == 0);
- "#,
- )
- .unwrap();
- assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- poll_until_ready(&mut runtime, 3).unwrap();
- runtime
- .execute("check.js", "assert(asyncRecv == 1);")
- .unwrap();
- });
- }
-
- #[test]
- fn overflow_res_multiple_dispatch_async() {
- // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We
- // should optimize this.
- run_in_task(|_cx| {
- let (mut runtime, dispatch_count) = setup(Mode::OverflowResAsync);
- runtime
- .execute(
- "overflow_res_multiple_dispatch_async.js",
- r#"
- let asyncRecv = 0;
- Deno.core.setAsyncHandler(1, (buf) => {
- assert(buf.byteLength === 100 * 1024 * 1024);
- assert(buf[0] === 4);
- asyncRecv++;
- });
- // Large message that will overflow the shared space.
- let control = new Uint8Array([42]);
- let response = Deno.core.dispatch(1, control);
- assert(response == null);
- assert(asyncRecv == 0);
- // Dispatch another message to verify that pending ops
- // are done even if shared space overflows
- Deno.core.dispatch(1, control);
- "#,
- )
- .unwrap();
- assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
- poll_until_ready(&mut runtime, 3).unwrap();
- runtime
- .execute("check.js", "assert(asyncRecv == 2);")
- .unwrap();
- });
- }
-
- #[test]
- fn shared_queue_not_empty_when_js_error() {
- run_in_task(|_cx| {
- let dispatch_count = Arc::new(AtomicUsize::new(0));
- let mut runtime = JsRuntime::new(Default::default());
- let op_state = runtime.op_state();
- op_state.borrow_mut().put(TestState {
- mode: Mode::Async,
- dispatch_count: dispatch_count.clone(),
- });
-
- runtime.register_op("test", dispatch);
- runtime
- .execute(
- "shared_queue_not_empty_when_js_error.js",
- r#"
- const assert = (cond) => {if (!cond) throw Error("assert")};
- let asyncRecv = 0;
- Deno.core.setAsyncHandler(1, (buf) => {
- asyncRecv++;
- throw Error('x');
- });
-
- Deno.core.dispatch(1, new Uint8Array([42]));
- Deno.core.dispatch(1, new Uint8Array([42]));
- "#,
- )
- .unwrap();
-
- assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
- if poll_until_ready(&mut runtime, 3).is_ok() {
- panic!("Thrown error was not detected!")
- }
- runtime
- .execute("check.js", "assert(asyncRecv == 1);")
- .unwrap();
-
- let state_rc = JsRuntime::state(runtime.v8_isolate());
- let shared_queue_size = state_rc.borrow().shared.size();
- assert_eq!(shared_queue_size, 1);
- });
- }
-
- #[test]
fn test_pre_dispatch() {
run_in_task(|mut cx| {
- let (mut runtime, _dispatch_count) = setup(Mode::OverflowResAsync);
+ let (mut runtime, _dispatch_count) = setup(Mode::Async);
runtime
.execute(
"bad_op_id.js",
@@ -2094,19 +1770,6 @@ pub mod tests {
}
#[test]
- fn core_test_js() {
- run_in_task(|mut cx| {
- let (mut runtime, _dispatch_count) = setup(Mode::Async);
- runtime
- .execute("core_test.js", include_str!("core_test.js"))
- .unwrap();
- if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) {
- unreachable!();
- }
- });
- }
-
- #[test]
fn syntax_error() {
let mut runtime = JsRuntime::new(Default::default());
let src = "hocuspocus(";
@@ -2315,13 +1978,12 @@ pub mod tests {
let dispatch_count = Arc::new(AtomicUsize::new(0));
let dispatch_count_ = dispatch_count.clone();
- let dispatcher = move |_state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
+ let dispatcher = move |_state, payload: OpPayload, _buf| -> Op {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
- assert_eq!(bufs.len(), 1);
- assert_eq!(bufs[0].len(), 1);
- assert_eq!(bufs[0][0], 42);
- let buf = [43u8, 0, 0, 0][..].into();
- Op::Async(futures::future::ready(buf).boxed())
+ let control: u8 = payload.deserialize().unwrap();
+ assert_eq!(control, 42);
+ let resp = OpResponse::Value(Box::new(43));
+ Op::Async(Box::pin(futures::future::ready(resp)))
};
let mut runtime = JsRuntime::new(RuntimeOptions {
@@ -2353,8 +2015,8 @@ pub mod tests {
r#"
import { b } from './b.js'
if (b() != 'b') throw Error();
- let control = new Uint8Array([42]);
- Deno.core.send(1, control);
+ let control = 42;
+ Deno.core.send(1, null, control);
"#,
)
.unwrap();