summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/bindings.rs73
-rw-r--r--core/lib.deno_core.d.ts25
-rw-r--r--core/ops_builtin.rs56
3 files changed, 67 insertions, 87 deletions
diff --git a/core/bindings.rs b/core/bindings.rs
index 58de6a38a..5d9b0bdb8 100644
--- a/core/bindings.rs
+++ b/core/bindings.rs
@@ -9,7 +9,6 @@ use crate::OpId;
use crate::OpPayload;
use crate::OpTable;
use crate::PromiseId;
-use crate::ResourceId;
use crate::ZeroCopyBuf;
use log::debug;
use rusty_v8 as v8;
@@ -20,7 +19,6 @@ use std::cell::RefCell;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::option::Option;
-use std::rc::Rc;
use url::Url;
use v8::HandleScope;
use v8::Local;
@@ -73,9 +71,6 @@ lazy_static::lazy_static! {
},
v8::ExternalReference {
function: set_wasm_streaming_callback.map_fn_to()
- },
- v8::ExternalReference {
- function: wasm_streaming_feed.map_fn_to()
}
]);
}
@@ -160,8 +155,6 @@ pub fn initialize_context<'s>(
"setWasmStreamingCallback",
set_wasm_streaming_callback,
);
- set_func(scope, core_val, "wasmStreamingFeed", wasm_streaming_feed);
-
// Direct bindings on `window`.
set_func(scope, global, "queueMicrotask", queue_microtask);
@@ -535,14 +528,13 @@ fn call_console(
deno_console_method.call(scope, receiver.into(), &call_args);
}
-struct WasmStreamingResource(RefCell<v8::WasmStreaming>);
-impl crate::Resource for WasmStreamingResource {}
-
fn set_wasm_streaming_callback(
scope: &mut v8::HandleScope,
args: v8::FunctionCallbackArguments,
_rv: v8::ReturnValue,
) {
+ use crate::ops_builtin::WasmStreamingResource;
+
let state_rc = JsRuntime::state(scope);
let mut state = state_rc.borrow_mut();
@@ -584,67 +576,6 @@ fn set_wasm_streaming_callback(
});
}
-fn wasm_streaming_feed(
- scope: &mut v8::HandleScope,
- args: v8::FunctionCallbackArguments,
- _rv: v8::ReturnValue,
-) {
- #[derive(Deserialize)]
- #[serde(rename_all = "snake_case")]
- enum MessageType {
- Bytes,
- Abort,
- Finish,
- }
-
- let rid: ResourceId = match serde_v8::from_v8(scope, args.get(0)) {
- Ok(rid) => rid,
- Err(_) => return throw_type_error(scope, "Invalid argument"),
- };
- let message_type = match serde_v8::from_v8(scope, args.get(1)) {
- Ok(message_type) => message_type,
- Err(_) => return throw_type_error(scope, "Invalid argument"),
- };
-
- let wasm_streaming = {
- let state_rc = JsRuntime::state(scope);
- let state = state_rc.borrow();
- // If message_type is not Bytes, we'll be consuming the WasmStreaming
- // instance, so let's also remove it from the resource table.
- let wasm_streaming: Result<Rc<WasmStreamingResource>, AnyError> =
- match message_type {
- MessageType::Bytes => state.op_state.borrow().resource_table.get(rid),
- _ => state.op_state.borrow_mut().resource_table.take(rid),
- };
- match wasm_streaming {
- Ok(wasm_streaming) => wasm_streaming,
- Err(e) => return throw_type_error(scope, e.to_string()),
- }
- };
-
- match message_type {
- MessageType::Bytes => {
- let bytes: ZeroCopyBuf = match serde_v8::from_v8(scope, args.get(2)) {
- Ok(bytes) => bytes,
- Err(_) => return throw_type_error(scope, "Invalid resource ID."),
- };
- wasm_streaming.0.borrow_mut().on_bytes_received(&bytes);
- }
- _ => {
- // These types need to consume the WasmStreaming instance.
- let wasm_streaming = match Rc::try_unwrap(wasm_streaming) {
- Ok(streaming) => streaming.0.into_inner(),
- Err(_) => panic!("Couldn't consume WasmStreamingResource."),
- };
- match message_type {
- MessageType::Bytes => unreachable!(),
- MessageType::Finish => wasm_streaming.finish(),
- MessageType::Abort => wasm_streaming.abort(Some(args.get(2))),
- }
- }
- }
-}
-
fn encode(
scope: &mut v8::HandleScope,
args: v8::FunctionCallbackArguments,
diff --git a/core/lib.deno_core.d.ts b/core/lib.deno_core.d.ts
index 46cf6b42c..efa138d77 100644
--- a/core/lib.deno_core.d.ts
+++ b/core/lib.deno_core.d.ts
@@ -56,25 +56,18 @@ declare namespace Deno {
* (`WebAssembly.compileStreaming` and `WebAssembly.instantiateStreaming`)
* are called in order to feed the source's bytes to the wasm compiler.
* The callback is called with the source argument passed to the streaming
- * APIs and an rid to use with `Deno.core.wasmStreamingFeed`.
+ * APIs and an rid to use with the wasm streaming ops.
+ *
+ * The callback should eventually invoke the following ops:
+ * - `op_wasm_streaming_feed`. Feeds bytes from the wasm resource to the
+ * compiler. Takes the rid and a `Uint8Array`.
+ * - `op_wasm_streaming_abort`. Aborts the wasm compilation. Takes the rid
+ * and an exception. Invalidates the resource.
+ * - To indicate the end of the resource, use `Deno.core.close()` with the
+ * rid.
*/
function setWasmStreamingCallback(
cb: (source: any, rid: number) => void,
): void;
-
- /**
- * Affect the state of the WebAssembly streaming compiler, by either passing
- * it bytes, aborting it, or indicating that all bytes were received.
- * `rid` must be a resource ID that was passed to the callback set with
- * `Deno.core.setWasmStreamingCallback`. Calling this function with `type`
- * set to either "abort" or "finish" invalidates the rid.
- */
- function wasmStreamingFeed(
- rid: number,
- type: "bytes",
- bytes: Uint8Array,
- ): void;
- function wasmStreamingFeed(rid: number, type: "abort", error: any): void;
- function wasmStreamingFeed(rid: number, type: "finish"): void;
}
}
diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs
index 06de59054..e1313fa32 100644
--- a/core/ops_builtin.rs
+++ b/core/ops_builtin.rs
@@ -5,7 +5,11 @@ use crate::op_sync;
use crate::resources::ResourceId;
use crate::Extension;
use crate::OpState;
+use crate::Resource;
+use crate::ZeroCopyBuf;
+use std::cell::RefCell;
use std::io::{stderr, stdout, Write};
+use std::rc::Rc;
pub(crate) fn init_builtins() -> Extension {
Extension::builder()
@@ -20,6 +24,8 @@ pub(crate) fn init_builtins() -> Extension {
("op_try_close", op_sync(op_try_close)),
("op_print", op_sync(op_print)),
("op_resources", op_sync(op_resources)),
+ ("op_wasm_streaming_feed", op_sync(op_wasm_streaming_feed)),
+ ("op_wasm_streaming_abort", op_sync(op_wasm_streaming_abort)),
])
.build()
}
@@ -81,3 +87,53 @@ pub fn op_print(
}
Ok(())
}
+
+pub struct WasmStreamingResource(pub(crate) RefCell<rusty_v8::WasmStreaming>);
+
+impl Resource for WasmStreamingResource {
+ fn close(self: Rc<Self>) {
+ // At this point there are no clones of Rc<WasmStreamingResource> on the
+ // resource table, and no one should own a reference outside of the stack.
+ // Therefore, we can be sure `self` is the only reference.
+ if let Ok(wsr) = Rc::try_unwrap(self) {
+ wsr.0.into_inner().finish();
+ } else {
+ panic!("Couldn't consume WasmStreamingResource.");
+ }
+ }
+}
+
+/// Feed bytes to WasmStreamingResource.
+pub fn op_wasm_streaming_feed(
+ state: &mut OpState,
+ rid: ResourceId,
+ bytes: ZeroCopyBuf,
+) -> Result<(), AnyError> {
+ let wasm_streaming =
+ state.resource_table.get::<WasmStreamingResource>(rid)?;
+
+ wasm_streaming.0.borrow_mut().on_bytes_received(&bytes);
+
+ Ok(())
+}
+
+/// Abort a WasmStreamingResource.
+pub fn op_wasm_streaming_abort(
+ state: &mut OpState,
+ rid: ResourceId,
+ exception: serde_v8::Value,
+) -> Result<(), AnyError> {
+ let wasm_streaming =
+ state.resource_table.take::<WasmStreamingResource>(rid)?;
+
+ // At this point there are no clones of Rc<WasmStreamingResource> on the
+ // resource table, and no one should own a reference because we're never
+ // cloning them. So we can be sure `wasm_streaming` is the only reference.
+ if let Ok(wsr) = Rc::try_unwrap(wasm_streaming) {
+ wsr.0.into_inner().abort(Some(exception.v8_value));
+ } else {
+ panic!("Couldn't consume WasmStreamingResource.");
+ }
+
+ Ok(())
+}