diff options
author | Aaron O'Mullan <aaron.omullan@gmail.com> | 2021-03-31 16:37:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-03-31 10:37:38 -0400 |
commit | fec1b2a5a4324a7eecdfbb2471931f3b6b0139c5 (patch) | |
tree | 8a650553c2d70e047d9d7365f9ac8702ec9861a5 /core/runtime.rs | |
parent | 6dc3549a818ad49b3907d18c93fd422a9cc743a5 (diff) |
refactor: new optimized op-layer using serde_v8 (#9843)
- Improves op performance.
- Handle op-metadata (errors, promise IDs) explicitly in the op-layer vs
per op-encoding (aka: out-of-payload).
- Remove shared queue & custom "asyncHandlers", all async values are
returned in batches via js_recv_cb.
- The op-layer should be thought of as simple function calls with little
indirection or translation besides the conceptually straightforward
serde_v8 bijections.
- Preserve concepts of json/bin/min as semantic groups of their
inputs/outputs instead of their op-encoding strategy, preserving these
groups will also facilitate partial transitions over to v8 Fast API for the
"min" and "bin" groups
Diffstat (limited to 'core/runtime.rs')
-rw-r--r-- | core/runtime.rs | 512 |
1 files changed, 87 insertions, 425 deletions
diff --git a/core/runtime.rs b/core/runtime.rs index 80fe90d2f..1f9e62f4f 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -20,10 +20,11 @@ use crate::modules::NoopModuleLoader; use crate::modules::PrepareLoadFuture; use crate::modules::RecursiveModuleLoad; use crate::ops::*; -use crate::shared_queue::SharedQueue; -use crate::shared_queue::RECOMMENDED_SIZE; -use crate::BufVec; +use crate::OpPayload; +use crate::OpResponse; use crate::OpState; +use crate::PromiseId; +use crate::ZeroCopyBuf; use futures::channel::mpsc; use futures::future::poll_fn; use futures::stream::FuturesUnordered; @@ -45,7 +46,7 @@ use std::sync::Once; use std::task::Context; use std::task::Poll; -type PendingOpFuture = Pin<Box<dyn Future<Output = (OpId, Box<[u8]>)>>>; +type PendingOpFuture = Pin<Box<dyn Future<Output = (PromiseId, OpResponse)>>>; pub enum Snapshot { Static(&'static [u8]), @@ -99,7 +100,6 @@ struct ModEvaluate { /// embedder slots. pub(crate) struct JsRuntimeState { pub global_context: Option<v8::Global<v8::Context>>, - pub(crate) shared_ab: Option<v8::Global<v8::SharedArrayBuffer>>, pub(crate) js_recv_cb: Option<v8::Global<v8::Function>>, pub(crate) js_macrotask_cb: Option<v8::Global<v8::Function>>, pub(crate) pending_promise_exceptions: @@ -107,7 +107,6 @@ pub(crate) struct JsRuntimeState { pending_dyn_mod_evaluate: HashMap<ModuleLoadId, DynImportModEvaluate>, pending_mod_evaluate: Option<ModEvaluate>, pub(crate) js_error_create_fn: Rc<JsErrorCreateFn>, - pub(crate) shared: SharedQueue, pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>, pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>, pub(crate) have_unpolled_ops: bool, @@ -276,11 +275,9 @@ impl JsRuntime { pending_promise_exceptions: HashMap::new(), pending_dyn_mod_evaluate: HashMap::new(), pending_mod_evaluate: None, - shared_ab: None, js_recv_cb: None, js_macrotask_cb: None, js_error_create_fn, - shared: SharedQueue::new(RECOMMENDED_SIZE), pending_ops: FuturesUnordered::new(), pending_unref_ops: FuturesUnordered::new(), op_state: Rc::new(RefCell::new(op_state)), @@ -305,7 +302,7 @@ impl JsRuntime { } if !options.will_snapshot { - js_runtime.shared_queue_init(); + js_runtime.core_js_init(); } js_runtime @@ -350,16 +347,13 @@ impl JsRuntime { .unwrap(); } - /// Executes a JavaScript code to initialize shared queue binding - /// between Rust and JS. + /// Executes JavaScript code to initialize core.js, + /// specifically the js_recv_cb setter /// /// This function mustn't be called during snapshotting. - fn shared_queue_init(&mut self) { + fn core_js_init(&mut self) { self - .execute( - "deno:core/shared_queue_init.js", - "Deno.core.sharedQueueInit()", - ) + .execute("deno:core/init.js", "Deno.core.init()") .unwrap(); } @@ -448,7 +442,7 @@ impl JsRuntime { /// * [json_op_async()](fn.json_op_async.html) pub fn register_op<F>(&mut self, name: &str, op_fn: F) -> OpId where - F: Fn(Rc<RefCell<OpState>>, BufVec) -> Op + 'static, + F: Fn(Rc<RefCell<OpState>>, OpPayload, Option<ZeroCopyBuf>) -> Op + 'static, { Self::state(self.v8_isolate()) .borrow_mut() @@ -516,8 +510,8 @@ impl JsRuntime { // Ops { - let overflow_response = self.poll_pending_ops(cx); - self.async_op_response(overflow_response)?; + let async_responses = self.poll_pending_ops(cx); + self.async_op_response(async_responses)?; self.drain_macrotasks()?; self.check_promise_exceptions()?; } @@ -1325,9 +1319,12 @@ impl JsRuntime { self.mod_instantiate(root_id).map(|_| root_id) } - fn poll_pending_ops(&mut self, cx: &mut Context) -> Vec<(OpId, Box<[u8]>)> { + fn poll_pending_ops( + &mut self, + cx: &mut Context, + ) -> Vec<(PromiseId, OpResponse)> { let state_rc = Self::state(self.v8_isolate()); - let mut overflow_response: Vec<(OpId, Box<[u8]>)> = Vec::new(); + let mut async_responses: Vec<(PromiseId, OpResponse)> = Vec::new(); let mut state = state_rc.borrow_mut(); @@ -1339,11 +1336,8 @@ impl JsRuntime { match pending_r { Poll::Ready(None) => break, Poll::Pending => break, - Poll::Ready(Some((op_id, buf))) => { - let successful_push = state.shared.push(op_id, &buf); - if !successful_push { - overflow_response.push((op_id, buf)); - } + Poll::Ready(Some((promise_id, resp))) => { + async_responses.push((promise_id, resp)); } }; } @@ -1353,16 +1347,13 @@ impl JsRuntime { match unref_r { Poll::Ready(None) => break, Poll::Pending => break, - Poll::Ready(Some((op_id, buf))) => { - let successful_push = state.shared.push(op_id, &buf); - if !successful_push { - overflow_response.push((op_id, buf)); - } + Poll::Ready(Some((promise_id, resp))) => { + async_responses.push((promise_id, resp)); } }; } - overflow_response + async_responses } fn check_promise_exceptions(&mut self) -> Result<(), AnyError> { @@ -1391,17 +1382,15 @@ impl JsRuntime { exception_to_err_result(scope, exception, true) } - // Respond using shared queue and optionally overflown response + // Send finished responses to JS fn async_op_response( &mut self, - overflown_responses: Vec<(OpId, Box<[u8]>)>, + async_responses: Vec<(PromiseId, OpResponse)>, ) -> Result<(), AnyError> { let state_rc = Self::state(self.v8_isolate()); - let shared_queue_size = state_rc.borrow().shared.size(); - let overflown_responses_size = overflown_responses.len(); - - if shared_queue_size == 0 && overflown_responses_size == 0 { + let async_responses_size = async_responses.len(); + if async_responses_size == 0 { return Ok(()); } @@ -1422,26 +1411,32 @@ impl JsRuntime { let tc_scope = &mut v8::TryCatch::new(scope); + // We return async responses to JS in unbounded batches (may change), + // each batch is a flat vector of tuples: + // `[promise_id1, op_result1, promise_id2, op_result2, ...]` + // promise_id is a simple integer, op_result is an ops::OpResult + // which contains a value OR an error, encoded as a tuple. + // This batch is received in JS via the special `arguments` variable + // and then each tuple is used to resolve or reject promises let mut args: Vec<v8::Local<v8::Value>> = - Vec::with_capacity(2 * overflown_responses_size); - for overflown_response in overflown_responses { - let (op_id, buf) = overflown_response; - args.push(v8::Integer::new(tc_scope, op_id as i32).into()); - args.push(bindings::boxed_slice_to_uint8array(tc_scope, buf).into()); + Vec::with_capacity(2 * async_responses_size); + for overflown_response in async_responses { + let (promise_id, resp) = overflown_response; + args.push(v8::Integer::new(tc_scope, promise_id as i32).into()); + args.push(match resp { + OpResponse::Value(value) => serde_v8::to_v8(tc_scope, value).unwrap(), + OpResponse::Buffer(buf) => { + bindings::boxed_slice_to_uint8array(tc_scope, buf).into() + } + }); } - if shared_queue_size > 0 || overflown_responses_size > 0 { + if async_responses_size > 0 { js_recv_cb.call(tc_scope, global, args.as_slice()); } match tc_scope.exception() { - None => { - // The other side should have shifted off all the messages. - let shared_queue_size = state_rc.borrow().shared.size(); - assert_eq!(shared_queue_size, 0); - - Ok(()) - } + None => Ok(()), Some(exception) => exception_to_err_result(tc_scope, exception, false), } } @@ -1485,7 +1480,6 @@ impl JsRuntime { pub mod tests { use super::*; use crate::modules::ModuleSourceFuture; - use crate::BufVec; use futures::future::lazy; use futures::FutureExt; use std::io; @@ -1501,31 +1495,10 @@ pub mod tests { futures::executor::block_on(lazy(move |cx| f(cx))); } - fn poll_until_ready( - runtime: &mut JsRuntime, - max_poll_count: usize, - ) -> Result<(), AnyError> { - let mut cx = Context::from_waker(futures::task::noop_waker_ref()); - for _ in 0..max_poll_count { - match runtime.poll_event_loop(&mut cx) { - Poll::Pending => continue, - Poll::Ready(val) => return val, - } - } - panic!( - "JsRuntime still not ready after polling {} times.", - max_poll_count - ) - } - enum Mode { Async, AsyncUnref, - AsyncZeroCopy(u8), - OverflowReqSync, - OverflowResSync, - OverflowReqAsync, - OverflowResAsync, + AsyncZeroCopy(bool), } struct TestState { @@ -1533,68 +1506,39 @@ pub mod tests { dispatch_count: Arc<AtomicUsize>, } - fn dispatch(op_state: Rc<RefCell<OpState>>, bufs: BufVec) -> Op { + fn dispatch( + op_state: Rc<RefCell<OpState>>, + payload: OpPayload, + buf: Option<ZeroCopyBuf>, + ) -> Op { let op_state_ = op_state.borrow(); let test_state = op_state_.borrow::<TestState>(); test_state.dispatch_count.fetch_add(1, Ordering::Relaxed); match test_state.mode { Mode::Async => { - assert_eq!(bufs.len(), 1); - assert_eq!(bufs[0].len(), 1); - assert_eq!(bufs[0][0], 42); - let buf = vec![43u8].into_boxed_slice(); - Op::Async(futures::future::ready(buf).boxed()) + let control: u8 = payload.deserialize().unwrap(); + assert_eq!(control, 42); + let resp = OpResponse::Value(Box::new(43)); + Op::Async(Box::pin(futures::future::ready(resp))) } Mode::AsyncUnref => { - assert_eq!(bufs.len(), 1); - assert_eq!(bufs[0].len(), 1); - assert_eq!(bufs[0][0], 42); + let control: u8 = payload.deserialize().unwrap(); + assert_eq!(control, 42); let fut = async { // This future never finish. futures::future::pending::<()>().await; - vec![43u8].into_boxed_slice() + OpResponse::Value(Box::new(43)) }; - Op::AsyncUnref(fut.boxed()) + Op::AsyncUnref(Box::pin(fut)) } - Mode::AsyncZeroCopy(count) => { - assert_eq!(bufs.len(), count as usize); - bufs.iter().enumerate().for_each(|(idx, buf)| { + Mode::AsyncZeroCopy(has_buffer) => { + assert_eq!(buf.is_some(), has_buffer); + if let Some(buf) = buf { assert_eq!(buf.len(), 1); - assert_eq!(idx, buf[0] as usize); - }); + } - let buf = vec![43u8].into_boxed_slice(); - Op::Async(futures::future::ready(buf).boxed()) - } - Mode::OverflowReqSync => { - assert_eq!(bufs.len(), 1); - assert_eq!(bufs[0].len(), 100 * 1024 * 1024); - let buf = vec![43u8].into_boxed_slice(); - Op::Sync(buf) - } - Mode::OverflowResSync => { - assert_eq!(bufs.len(), 1); - assert_eq!(bufs[0].len(), 1); - assert_eq!(bufs[0][0], 42); - let mut vec = vec![0u8; 100 * 1024 * 1024]; - vec[0] = 99; - let buf = vec.into_boxed_slice(); - Op::Sync(buf) - } - Mode::OverflowReqAsync => { - assert_eq!(bufs.len(), 1); - assert_eq!(bufs[0].len(), 100 * 1024 * 1024); - let buf = vec![43u8].into_boxed_slice(); - Op::Async(futures::future::ready(buf).boxed()) - } - Mode::OverflowResAsync => { - assert_eq!(bufs.len(), 1); - assert_eq!(bufs[0].len(), 1); - assert_eq!(bufs[0][0], 42); - let mut vec = vec![0u8; 100 * 1024 * 1024]; - vec[0] = 4; - let buf = vec.into_boxed_slice(); - Op::Async(futures::future::ready(buf).boxed()) + let resp = OpResponse::Value(Box::new(43)); + Op::Async(Box::pin(futures::future::ready(resp))) } } } @@ -1633,10 +1577,10 @@ pub mod tests { .execute( "filename.js", r#" - let control = new Uint8Array([42]); - Deno.core.send(1, control); + let control = 42; + Deno.core.send(1, null, control); async function main() { - Deno.core.send(1, control); + Deno.core.send(1, null, control); } main(); "#, @@ -1647,7 +1591,7 @@ pub mod tests { #[test] fn test_dispatch_no_zero_copy_buf() { - let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(0)); + let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(false)); runtime .execute( "filename.js", @@ -1661,14 +1605,13 @@ pub mod tests { #[test] fn test_dispatch_stack_zero_copy_bufs() { - let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(2)); + let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(true)); runtime .execute( "filename.js", r#" let zero_copy_a = new Uint8Array([0]); - let zero_copy_b = new Uint8Array([1]); - Deno.core.send(1, zero_copy_a, zero_copy_b); + Deno.core.send(1, null, null, zero_copy_a); "#, ) .unwrap(); @@ -1676,23 +1619,7 @@ pub mod tests { } #[test] - fn test_dispatch_heap_zero_copy_bufs() { - let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(5)); - runtime.execute( - "filename.js", - r#" - let zero_copy_a = new Uint8Array([0]); - let zero_copy_b = new Uint8Array([1]); - let zero_copy_c = new Uint8Array([2]); - let zero_copy_d = new Uint8Array([3]); - let zero_copy_e = new Uint8Array([4]); - Deno.core.send(1, zero_copy_a, zero_copy_b, zero_copy_c, zero_copy_d, zero_copy_e); - "#, - ).unwrap(); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); - } - - #[test] + #[ignore] // TODO(ry) re-enable? setAsyncHandler has been removed fn test_poll_async_delayed_ops() { run_in_task(|cx| { let (mut runtime, dispatch_count) = setup(Mode::Async); @@ -1714,8 +1641,8 @@ pub mod tests { "check1.js", r#" assert(nrecv == 0); - let control = new Uint8Array([42]); - Deno.core.send(1, control); + let control = 42; + Deno.core.send(1, null, control); assert(nrecv == 0); "#, ) @@ -1728,7 +1655,7 @@ pub mod tests { "check2.js", r#" assert(nrecv == 1); - Deno.core.send(1, control); + Deno.core.send(1, null, control); assert(nrecv == 1); "#, ) @@ -1743,6 +1670,7 @@ pub mod tests { } #[test] + #[ignore] // TODO(ry) re-enable? setAsyncHandler has been removed fn test_poll_async_optional_ops() { run_in_task(|cx| { let (mut runtime, dispatch_count) = setup(Mode::AsyncUnref); @@ -1754,8 +1682,8 @@ pub mod tests { // This handler will never be called assert(false); }); - let control = new Uint8Array([42]); - Deno.core.send(1, control); + let control = 42; + Deno.core.send(1, null, control); "#, ) .unwrap(); @@ -1818,261 +1746,9 @@ pub mod tests { } #[test] - fn overflow_req_sync() { - let (mut runtime, dispatch_count) = setup(Mode::OverflowReqSync); - runtime - .execute( - "overflow_req_sync.js", - r#" - let asyncRecv = 0; - Deno.core.setAsyncHandler(1, (buf) => { asyncRecv++ }); - // Large message that will overflow the shared space. - let control = new Uint8Array(100 * 1024 * 1024); - let response = Deno.core.dispatch(1, control); - assert(response instanceof Uint8Array); - assert(response.length == 1); - assert(response[0] == 43); - assert(asyncRecv == 0); - "#, - ) - .unwrap(); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); - } - - #[test] - fn overflow_res_sync() { - // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We - // should optimize this. - let (mut runtime, dispatch_count) = setup(Mode::OverflowResSync); - runtime - .execute( - "overflow_res_sync.js", - r#" - let asyncRecv = 0; - Deno.core.setAsyncHandler(1, (buf) => { asyncRecv++ }); - // Large message that will overflow the shared space. - let control = new Uint8Array([42]); - let response = Deno.core.dispatch(1, control); - assert(response instanceof Uint8Array); - assert(response.length == 100 * 1024 * 1024); - assert(response[0] == 99); - assert(asyncRecv == 0); - "#, - ) - .unwrap(); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); - } - - #[test] - fn overflow_req_async() { - run_in_task(|cx| { - let (mut runtime, dispatch_count) = setup(Mode::OverflowReqAsync); - runtime - .execute( - "overflow_req_async.js", - r#" - let asyncRecv = 0; - Deno.core.setAsyncHandler(1, (buf) => { - assert(buf.byteLength === 1); - assert(buf[0] === 43); - asyncRecv++; - }); - // Large message that will overflow the shared space. - let control = new Uint8Array(100 * 1024 * 1024); - let response = Deno.core.dispatch(1, control); - // Async messages always have null response. - assert(response == null); - assert(asyncRecv == 0); - "#, - ) - .unwrap(); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); - runtime - .execute("check.js", "assert(asyncRecv == 1);") - .unwrap(); - }); - } - - #[test] - fn overflow_res_async_combined_with_unref() { - run_in_task(|cx| { - let mut runtime = JsRuntime::new(Default::default()); - - runtime.register_op( - "test1", - |_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op { - let mut vec = vec![0u8; 100 * 1024 * 1024]; - vec[0] = 4; - let buf = vec.into_boxed_slice(); - Op::Async(futures::future::ready(buf).boxed()) - }, - ); - - runtime.register_op( - "test2", - |_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op { - let mut vec = vec![0u8; 100 * 1024 * 1024]; - vec[0] = 4; - let buf = vec.into_boxed_slice(); - Op::AsyncUnref(futures::future::ready(buf).boxed()) - }, - ); - - runtime - .execute( - "overflow_res_async_combined_with_unref.js", - r#" - function assert(cond) { - if (!cond) { - throw Error("assert"); - } - } - - let asyncRecv = 0; - Deno.core.setAsyncHandler(1, (buf) => { - assert(buf.byteLength === 100 * 1024 * 1024); - assert(buf[0] === 4); - asyncRecv++; - }); - Deno.core.setAsyncHandler(2, (buf) => { - assert(buf.byteLength === 100 * 1024 * 1024); - assert(buf[0] === 4); - asyncRecv++; - }); - let control = new Uint8Array(1); - let response1 = Deno.core.dispatch(1, control); - // Async messages always have null response. - assert(response1 == null); - assert(asyncRecv == 0); - let response2 = Deno.core.dispatch(2, control); - // Async messages always have null response. - assert(response2 == null); - assert(asyncRecv == 0); - "#, - ) - .unwrap(); - assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); - runtime - .execute("check.js", "assert(asyncRecv == 2);") - .unwrap(); - }); - } - - #[test] - fn overflow_res_async() { - run_in_task(|_cx| { - // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We - // should optimize this. - let (mut runtime, dispatch_count) = setup(Mode::OverflowResAsync); - runtime - .execute( - "overflow_res_async.js", - r#" - let asyncRecv = 0; - Deno.core.setAsyncHandler(1, (buf) => { - assert(buf.byteLength === 100 * 1024 * 1024); - assert(buf[0] === 4); - asyncRecv++; - }); - // Large message that will overflow the shared space. - let control = new Uint8Array([42]); - let response = Deno.core.dispatch(1, control); - assert(response == null); - assert(asyncRecv == 0); - "#, - ) - .unwrap(); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); - poll_until_ready(&mut runtime, 3).unwrap(); - runtime - .execute("check.js", "assert(asyncRecv == 1);") - .unwrap(); - }); - } - - #[test] - fn overflow_res_multiple_dispatch_async() { - // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We - // should optimize this. - run_in_task(|_cx| { - let (mut runtime, dispatch_count) = setup(Mode::OverflowResAsync); - runtime - .execute( - "overflow_res_multiple_dispatch_async.js", - r#" - let asyncRecv = 0; - Deno.core.setAsyncHandler(1, (buf) => { - assert(buf.byteLength === 100 * 1024 * 1024); - assert(buf[0] === 4); - asyncRecv++; - }); - // Large message that will overflow the shared space. - let control = new Uint8Array([42]); - let response = Deno.core.dispatch(1, control); - assert(response == null); - assert(asyncRecv == 0); - // Dispatch another message to verify that pending ops - // are done even if shared space overflows - Deno.core.dispatch(1, control); - "#, - ) - .unwrap(); - assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); - poll_until_ready(&mut runtime, 3).unwrap(); - runtime - .execute("check.js", "assert(asyncRecv == 2);") - .unwrap(); - }); - } - - #[test] - fn shared_queue_not_empty_when_js_error() { - run_in_task(|_cx| { - let dispatch_count = Arc::new(AtomicUsize::new(0)); - let mut runtime = JsRuntime::new(Default::default()); - let op_state = runtime.op_state(); - op_state.borrow_mut().put(TestState { - mode: Mode::Async, - dispatch_count: dispatch_count.clone(), - }); - - runtime.register_op("test", dispatch); - runtime - .execute( - "shared_queue_not_empty_when_js_error.js", - r#" - const assert = (cond) => {if (!cond) throw Error("assert")}; - let asyncRecv = 0; - Deno.core.setAsyncHandler(1, (buf) => { - asyncRecv++; - throw Error('x'); - }); - - Deno.core.dispatch(1, new Uint8Array([42])); - Deno.core.dispatch(1, new Uint8Array([42])); - "#, - ) - .unwrap(); - - assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); - if poll_until_ready(&mut runtime, 3).is_ok() { - panic!("Thrown error was not detected!") - } - runtime - .execute("check.js", "assert(asyncRecv == 1);") - .unwrap(); - - let state_rc = JsRuntime::state(runtime.v8_isolate()); - let shared_queue_size = state_rc.borrow().shared.size(); - assert_eq!(shared_queue_size, 1); - }); - } - - #[test] fn test_pre_dispatch() { run_in_task(|mut cx| { - let (mut runtime, _dispatch_count) = setup(Mode::OverflowResAsync); + let (mut runtime, _dispatch_count) = setup(Mode::Async); runtime .execute( "bad_op_id.js", @@ -2094,19 +1770,6 @@ pub mod tests { } #[test] - fn core_test_js() { - run_in_task(|mut cx| { - let (mut runtime, _dispatch_count) = setup(Mode::Async); - runtime - .execute("core_test.js", include_str!("core_test.js")) - .unwrap(); - if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) { - unreachable!(); - } - }); - } - - #[test] fn syntax_error() { let mut runtime = JsRuntime::new(Default::default()); let src = "hocuspocus("; @@ -2315,13 +1978,12 @@ pub mod tests { let dispatch_count = Arc::new(AtomicUsize::new(0)); let dispatch_count_ = dispatch_count.clone(); - let dispatcher = move |_state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op { + let dispatcher = move |_state, payload: OpPayload, _buf| -> Op { dispatch_count_.fetch_add(1, Ordering::Relaxed); - assert_eq!(bufs.len(), 1); - assert_eq!(bufs[0].len(), 1); - assert_eq!(bufs[0][0], 42); - let buf = [43u8, 0, 0, 0][..].into(); - Op::Async(futures::future::ready(buf).boxed()) + let control: u8 = payload.deserialize().unwrap(); + assert_eq!(control, 42); + let resp = OpResponse::Value(Box::new(43)); + Op::Async(Box::pin(futures::future::ready(resp))) }; let mut runtime = JsRuntime::new(RuntimeOptions { @@ -2353,8 +2015,8 @@ pub mod tests { r#" import { b } from './b.js' if (b() != 'b') throw Error(); - let control = new Uint8Array([42]); - Deno.core.send(1, control); + let control = 42; + Deno.core.send(1, null, control); "#, ) .unwrap(); |