diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/bindings.rs | 73 | ||||
-rw-r--r-- | core/lib.deno_core.d.ts | 25 | ||||
-rw-r--r-- | core/ops_builtin.rs | 56 |
3 files changed, 67 insertions, 87 deletions
diff --git a/core/bindings.rs b/core/bindings.rs index 58de6a38a..5d9b0bdb8 100644 --- a/core/bindings.rs +++ b/core/bindings.rs @@ -9,7 +9,6 @@ use crate::OpId; use crate::OpPayload; use crate::OpTable; use crate::PromiseId; -use crate::ResourceId; use crate::ZeroCopyBuf; use log::debug; use rusty_v8 as v8; @@ -20,7 +19,6 @@ use std::cell::RefCell; use std::convert::TryFrom; use std::convert::TryInto; use std::option::Option; -use std::rc::Rc; use url::Url; use v8::HandleScope; use v8::Local; @@ -73,9 +71,6 @@ lazy_static::lazy_static! { }, v8::ExternalReference { function: set_wasm_streaming_callback.map_fn_to() - }, - v8::ExternalReference { - function: wasm_streaming_feed.map_fn_to() } ]); } @@ -160,8 +155,6 @@ pub fn initialize_context<'s>( "setWasmStreamingCallback", set_wasm_streaming_callback, ); - set_func(scope, core_val, "wasmStreamingFeed", wasm_streaming_feed); - // Direct bindings on `window`. set_func(scope, global, "queueMicrotask", queue_microtask); @@ -535,14 +528,13 @@ fn call_console( deno_console_method.call(scope, receiver.into(), &call_args); } -struct WasmStreamingResource(RefCell<v8::WasmStreaming>); -impl crate::Resource for WasmStreamingResource {} - fn set_wasm_streaming_callback( scope: &mut v8::HandleScope, args: v8::FunctionCallbackArguments, _rv: v8::ReturnValue, ) { + use crate::ops_builtin::WasmStreamingResource; + let state_rc = JsRuntime::state(scope); let mut state = state_rc.borrow_mut(); @@ -584,67 +576,6 @@ fn set_wasm_streaming_callback( }); } -fn wasm_streaming_feed( - scope: &mut v8::HandleScope, - args: v8::FunctionCallbackArguments, - _rv: v8::ReturnValue, -) { - #[derive(Deserialize)] - #[serde(rename_all = "snake_case")] - enum MessageType { - Bytes, - Abort, - Finish, - } - - let rid: ResourceId = match serde_v8::from_v8(scope, args.get(0)) { - Ok(rid) => rid, - Err(_) => return throw_type_error(scope, "Invalid argument"), - }; - let message_type = match serde_v8::from_v8(scope, args.get(1)) { - Ok(message_type) => message_type, - Err(_) => return throw_type_error(scope, "Invalid argument"), - }; - - let wasm_streaming = { - let state_rc = JsRuntime::state(scope); - let state = state_rc.borrow(); - // If message_type is not Bytes, we'll be consuming the WasmStreaming - // instance, so let's also remove it from the resource table. - let wasm_streaming: Result<Rc<WasmStreamingResource>, AnyError> = - match message_type { - MessageType::Bytes => state.op_state.borrow().resource_table.get(rid), - _ => state.op_state.borrow_mut().resource_table.take(rid), - }; - match wasm_streaming { - Ok(wasm_streaming) => wasm_streaming, - Err(e) => return throw_type_error(scope, e.to_string()), - } - }; - - match message_type { - MessageType::Bytes => { - let bytes: ZeroCopyBuf = match serde_v8::from_v8(scope, args.get(2)) { - Ok(bytes) => bytes, - Err(_) => return throw_type_error(scope, "Invalid resource ID."), - }; - wasm_streaming.0.borrow_mut().on_bytes_received(&bytes); - } - _ => { - // These types need to consume the WasmStreaming instance. - let wasm_streaming = match Rc::try_unwrap(wasm_streaming) { - Ok(streaming) => streaming.0.into_inner(), - Err(_) => panic!("Couldn't consume WasmStreamingResource."), - }; - match message_type { - MessageType::Bytes => unreachable!(), - MessageType::Finish => wasm_streaming.finish(), - MessageType::Abort => wasm_streaming.abort(Some(args.get(2))), - } - } - } -} - fn encode( scope: &mut v8::HandleScope, args: v8::FunctionCallbackArguments, diff --git a/core/lib.deno_core.d.ts b/core/lib.deno_core.d.ts index 46cf6b42c..efa138d77 100644 --- a/core/lib.deno_core.d.ts +++ b/core/lib.deno_core.d.ts @@ -56,25 +56,18 @@ declare namespace Deno { * (`WebAssembly.compileStreaming` and `WebAssembly.instantiateStreaming`) * are called in order to feed the source's bytes to the wasm compiler. * The callback is called with the source argument passed to the streaming - * APIs and an rid to use with `Deno.core.wasmStreamingFeed`. + * APIs and an rid to use with the wasm streaming ops. + * + * The callback should eventually invoke the following ops: + * - `op_wasm_streaming_feed`. Feeds bytes from the wasm resource to the + * compiler. Takes the rid and a `Uint8Array`. + * - `op_wasm_streaming_abort`. Aborts the wasm compilation. Takes the rid + * and an exception. Invalidates the resource. + * - To indicate the end of the resource, use `Deno.core.close()` with the + * rid. */ function setWasmStreamingCallback( cb: (source: any, rid: number) => void, ): void; - - /** - * Affect the state of the WebAssembly streaming compiler, by either passing - * it bytes, aborting it, or indicating that all bytes were received. - * `rid` must be a resource ID that was passed to the callback set with - * `Deno.core.setWasmStreamingCallback`. Calling this function with `type` - * set to either "abort" or "finish" invalidates the rid. - */ - function wasmStreamingFeed( - rid: number, - type: "bytes", - bytes: Uint8Array, - ): void; - function wasmStreamingFeed(rid: number, type: "abort", error: any): void; - function wasmStreamingFeed(rid: number, type: "finish"): void; } } diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs index 06de59054..e1313fa32 100644 --- a/core/ops_builtin.rs +++ b/core/ops_builtin.rs @@ -5,7 +5,11 @@ use crate::op_sync; use crate::resources::ResourceId; use crate::Extension; use crate::OpState; +use crate::Resource; +use crate::ZeroCopyBuf; +use std::cell::RefCell; use std::io::{stderr, stdout, Write}; +use std::rc::Rc; pub(crate) fn init_builtins() -> Extension { Extension::builder() @@ -20,6 +24,8 @@ pub(crate) fn init_builtins() -> Extension { ("op_try_close", op_sync(op_try_close)), ("op_print", op_sync(op_print)), ("op_resources", op_sync(op_resources)), + ("op_wasm_streaming_feed", op_sync(op_wasm_streaming_feed)), + ("op_wasm_streaming_abort", op_sync(op_wasm_streaming_abort)), ]) .build() } @@ -81,3 +87,53 @@ pub fn op_print( } Ok(()) } + +pub struct WasmStreamingResource(pub(crate) RefCell<rusty_v8::WasmStreaming>); + +impl Resource for WasmStreamingResource { + fn close(self: Rc<Self>) { + // At this point there are no clones of Rc<WasmStreamingResource> on the + // resource table, and no one should own a reference outside of the stack. + // Therefore, we can be sure `self` is the only reference. + if let Ok(wsr) = Rc::try_unwrap(self) { + wsr.0.into_inner().finish(); + } else { + panic!("Couldn't consume WasmStreamingResource."); + } + } +} + +/// Feed bytes to WasmStreamingResource. +pub fn op_wasm_streaming_feed( + state: &mut OpState, + rid: ResourceId, + bytes: ZeroCopyBuf, +) -> Result<(), AnyError> { + let wasm_streaming = + state.resource_table.get::<WasmStreamingResource>(rid)?; + + wasm_streaming.0.borrow_mut().on_bytes_received(&bytes); + + Ok(()) +} + +/// Abort a WasmStreamingResource. +pub fn op_wasm_streaming_abort( + state: &mut OpState, + rid: ResourceId, + exception: serde_v8::Value, +) -> Result<(), AnyError> { + let wasm_streaming = + state.resource_table.take::<WasmStreamingResource>(rid)?; + + // At this point there are no clones of Rc<WasmStreamingResource> on the + // resource table, and no one should own a reference because we're never + // cloning them. So we can be sure `wasm_streaming` is the only reference. + if let Ok(wsr) = Rc::try_unwrap(wasm_streaming) { + wsr.0.into_inner().abort(Some(exception.v8_value)); + } else { + panic!("Couldn't consume WasmStreamingResource."); + } + + Ok(()) +} |