summaryrefslogtreecommitdiff
path: root/core/bindings.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/bindings.rs')
-rw-r--r--core/bindings.rs125
1 files changed, 125 insertions, 0 deletions
diff --git a/core/bindings.rs b/core/bindings.rs
index bee3ecf6d..143ccda9b 100644
--- a/core/bindings.rs
+++ b/core/bindings.rs
@@ -9,15 +9,18 @@ use crate::OpId;
use crate::OpPayload;
use crate::OpTable;
use crate::PromiseId;
+use crate::ResourceId;
use crate::ZeroCopyBuf;
use log::debug;
use rusty_v8 as v8;
use serde::Deserialize;
use serde::Serialize;
use serde_v8::to_v8;
+use std::cell::RefCell;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::option::Option;
+use std::rc::Rc;
use url::Url;
use v8::MapFnTo;
@@ -63,6 +66,12 @@ lazy_static::lazy_static! {
v8::ExternalReference {
function: call_console.map_fn_to(),
},
+ v8::ExternalReference {
+ function: set_wasm_streaming_callback.map_fn_to()
+ },
+ v8::ExternalReference {
+ function: wasm_streaming_feed.map_fn_to()
+ }
]);
}
@@ -140,6 +149,13 @@ pub fn initialize_context<'s>(
set_func(scope, core_val, "memoryUsage", memory_usage);
set_func(scope, core_val, "callConsole", call_console);
set_func(scope, core_val, "createHostObject", create_host_object);
+ set_func(
+ scope,
+ core_val,
+ "setWasmStreamingCallback",
+ set_wasm_streaming_callback,
+ );
+ set_func(scope, core_val, "wasmStreamingFeed", wasm_streaming_feed);
// Direct bindings on `window`.
set_func(scope, global, "queueMicrotask", queue_microtask);
@@ -514,6 +530,115 @@ fn call_console(
deno_console_method.call(scope, receiver.into(), &call_args);
}
+struct WasmStreamingResource(RefCell<v8::WasmStreaming>);
+impl crate::Resource for WasmStreamingResource {}
+
+fn set_wasm_streaming_callback(
+ scope: &mut v8::HandleScope,
+ args: v8::FunctionCallbackArguments,
+ _rv: v8::ReturnValue,
+) {
+ let state_rc = JsRuntime::state(scope);
+ let mut state = state_rc.borrow_mut();
+
+ let cb = match v8::Local::<v8::Function>::try_from(args.get(0)) {
+ Ok(cb) => cb,
+ Err(err) => return throw_type_error(scope, err.to_string()),
+ };
+
+ // The callback to pass to the v8 API has to be a unit type, so it can't
+ // borrow or move any local variables. Therefore, we're storing the JS
+ // callback in a JsRuntimeState slot.
+ if let slot @ None = &mut state.js_wasm_streaming_cb {
+ slot.replace(v8::Global::new(scope, cb));
+ } else {
+ return throw_type_error(
+ scope,
+ "Deno.core.setWasmStreamingCallback() already called",
+ );
+ }
+
+ scope.set_wasm_streaming_callback(|scope, arg, wasm_streaming| {
+ let (cb_handle, streaming_rid) = {
+ let state_rc = JsRuntime::state(scope);
+ let state = state_rc.borrow();
+ let cb_handle = state.js_wasm_streaming_cb.as_ref().unwrap().clone();
+ let streaming_rid = state
+ .op_state
+ .borrow_mut()
+ .resource_table
+ .add(WasmStreamingResource(RefCell::new(wasm_streaming)));
+ (cb_handle, streaming_rid)
+ };
+
+ let undefined = v8::undefined(scope);
+ let rid = serde_v8::to_v8(scope, streaming_rid).unwrap();
+ cb_handle
+ .get(scope)
+ .call(scope, undefined.into(), &[arg, rid]);
+ });
+}
+
+fn wasm_streaming_feed(
+ scope: &mut v8::HandleScope,
+ args: v8::FunctionCallbackArguments,
+ _rv: v8::ReturnValue,
+) {
+ #[derive(Deserialize)]
+ #[serde(rename_all = "snake_case")]
+ enum MessageType {
+ Bytes,
+ Abort,
+ Finish,
+ }
+
+ let rid: ResourceId = match serde_v8::from_v8(scope, args.get(0)) {
+ Ok(rid) => rid,
+ Err(_) => return throw_type_error(scope, "Invalid argument"),
+ };
+ let message_type = match serde_v8::from_v8(scope, args.get(1)) {
+ Ok(message_type) => message_type,
+ Err(_) => return throw_type_error(scope, "Invalid argument"),
+ };
+
+ let wasm_streaming = {
+ let state_rc = JsRuntime::state(scope);
+ let state = state_rc.borrow();
+ // If message_type is not Bytes, we'll be consuming the WasmStreaming
+ // instance, so let's also remove it from the resource table.
+ let wasm_streaming: Option<Rc<WasmStreamingResource>> = match message_type {
+ MessageType::Bytes => state.op_state.borrow().resource_table.get(rid),
+ _ => state.op_state.borrow_mut().resource_table.take(rid),
+ };
+ match wasm_streaming {
+ Some(wasm_streaming) => wasm_streaming,
+ None => return throw_type_error(scope, "Invalid resource ID."),
+ }
+ };
+
+ match message_type {
+ MessageType::Bytes => {
+ let bytes: ZeroCopyBuf = match serde_v8::from_v8(scope, args.get(2)) {
+ Ok(bytes) => bytes,
+ Err(_) => return throw_type_error(scope, "Invalid resource ID."),
+ };
+ wasm_streaming.0.borrow_mut().on_bytes_received(&bytes);
+ }
+ _ => {
+ // These types need to consume the WasmStreaming instance.
+ let wasm_streaming = match Rc::try_unwrap(wasm_streaming) {
+ Ok(streaming) => streaming.0.into_inner(),
+ Err(_) => panic!("Couldn't consume WasmStreamingResource."),
+ };
+ match message_type {
+ MessageType::Bytes => unreachable!(),
+ MessageType::Finish => wasm_streaming.finish(),
+ MessageType::Abort => wasm_streaming.abort(Some(args.get(2))),
+ }
+ }
+ }
+}
+
fn encode(
scope: &mut v8::HandleScope,
args: v8::FunctionCallbackArguments,