diff options
Diffstat (limited to 'core/bindings.rs')
-rw-r--r-- | core/bindings.rs | 125 |
1 files changed, 125 insertions, 0 deletions
diff --git a/core/bindings.rs b/core/bindings.rs index bee3ecf6d..143ccda9b 100644 --- a/core/bindings.rs +++ b/core/bindings.rs @@ -9,15 +9,18 @@ use crate::OpId; use crate::OpPayload; use crate::OpTable; use crate::PromiseId; +use crate::ResourceId; use crate::ZeroCopyBuf; use log::debug; use rusty_v8 as v8; use serde::Deserialize; use serde::Serialize; use serde_v8::to_v8; +use std::cell::RefCell; use std::convert::TryFrom; use std::convert::TryInto; use std::option::Option; +use std::rc::Rc; use url::Url; use v8::MapFnTo; @@ -63,6 +66,12 @@ lazy_static::lazy_static! { v8::ExternalReference { function: call_console.map_fn_to(), }, + v8::ExternalReference { + function: set_wasm_streaming_callback.map_fn_to() + }, + v8::ExternalReference { + function: wasm_streaming_feed.map_fn_to() + } ]); } @@ -140,6 +149,13 @@ pub fn initialize_context<'s>( set_func(scope, core_val, "memoryUsage", memory_usage); set_func(scope, core_val, "callConsole", call_console); set_func(scope, core_val, "createHostObject", create_host_object); + set_func( + scope, + core_val, + "setWasmStreamingCallback", + set_wasm_streaming_callback, + ); + set_func(scope, core_val, "wasmStreamingFeed", wasm_streaming_feed); // Direct bindings on `window`. set_func(scope, global, "queueMicrotask", queue_microtask); @@ -514,6 +530,115 @@ fn call_console( deno_console_method.call(scope, receiver.into(), &call_args); } +struct WasmStreamingResource(RefCell<v8::WasmStreaming>); +impl crate::Resource for WasmStreamingResource {} + +fn set_wasm_streaming_callback( + scope: &mut v8::HandleScope, + args: v8::FunctionCallbackArguments, + _rv: v8::ReturnValue, +) { + let state_rc = JsRuntime::state(scope); + let mut state = state_rc.borrow_mut(); + + let cb = match v8::Local::<v8::Function>::try_from(args.get(0)) { + Ok(cb) => cb, + Err(err) => return throw_type_error(scope, err.to_string()), + }; + + // The callback to pass to the v8 API has to be a unit type, so it can't + // borrow or move any local variables. Therefore, we're storing the JS + // callback in a JsRuntimeState slot. + if let slot @ None = &mut state.js_wasm_streaming_cb { + slot.replace(v8::Global::new(scope, cb)); + } else { + return throw_type_error( + scope, + "Deno.core.setWasmStreamingCallback() already called", + ); + } + + scope.set_wasm_streaming_callback(|scope, arg, wasm_streaming| { + let (cb_handle, streaming_rid) = { + let state_rc = JsRuntime::state(scope); + let state = state_rc.borrow(); + let cb_handle = state.js_wasm_streaming_cb.as_ref().unwrap().clone(); + let streaming_rid = state + .op_state + .borrow_mut() + .resource_table + .add(WasmStreamingResource(RefCell::new(wasm_streaming))); + (cb_handle, streaming_rid) + }; + + let undefined = v8::undefined(scope); + let rid = serde_v8::to_v8(scope, streaming_rid).unwrap(); + cb_handle + .get(scope) + .call(scope, undefined.into(), &[arg, rid]); + }); +} + +fn wasm_streaming_feed( + scope: &mut v8::HandleScope, + args: v8::FunctionCallbackArguments, + _rv: v8::ReturnValue, +) { + #[derive(Deserialize)] + #[serde(rename_all = "snake_case")] + enum MessageType { + Bytes, + Abort, + Finish, + } + + let rid: ResourceId = match serde_v8::from_v8(scope, args.get(0)) { + Ok(rid) => rid, + Err(_) => return throw_type_error(scope, "Invalid argument"), + }; + let message_type = match serde_v8::from_v8(scope, args.get(1)) { + Ok(message_type) => message_type, + Err(_) => return throw_type_error(scope, "Invalid argument"), + }; + + let wasm_streaming = { + let state_rc = JsRuntime::state(scope); + let state = state_rc.borrow(); + // If message_type is not Bytes, we'll be consuming the WasmStreaming + // instance, so let's also remove it from the resource table. + let wasm_streaming: Option<Rc<WasmStreamingResource>> = match message_type { + MessageType::Bytes => state.op_state.borrow().resource_table.get(rid), + _ => state.op_state.borrow_mut().resource_table.take(rid), + }; + match wasm_streaming { + Some(wasm_streaming) => wasm_streaming, + None => return throw_type_error(scope, "Invalid resource ID."), + } + }; + + match message_type { + MessageType::Bytes => { + let bytes: ZeroCopyBuf = match serde_v8::from_v8(scope, args.get(2)) { + Ok(bytes) => bytes, + Err(_) => return throw_type_error(scope, "Invalid resource ID."), + }; + wasm_streaming.0.borrow_mut().on_bytes_received(&bytes); + } + _ => { + // These types need to consume the WasmStreaming instance. + let wasm_streaming = match Rc::try_unwrap(wasm_streaming) { + Ok(streaming) => streaming.0.into_inner(), + Err(_) => panic!("Couldn't consume WasmStreamingResource."), + }; + match message_type { + MessageType::Bytes => unreachable!(), + MessageType::Finish => wasm_streaming.finish(), + MessageType::Abort => wasm_streaming.abort(Some(args.get(2))), + } + } + } +} + fn encode( scope: &mut v8::HandleScope, args: v8::FunctionCallbackArguments, |