summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Ramlot <42113979+inteon@users.noreply.github.com>2021-05-11 21:09:09 +0200
committerGitHub <noreply@github.com>2021-05-11 21:09:09 +0200
commit635253bd3a3895f49e6c9606beb852da22fee205 (patch)
treecec9d75354b4e985a376f888564ecb63c99f2643
parent0d319161bc19a520df653bc0c8386f14a68efbdb (diff)
feat(runtime/worker): Structured cloning worker message passing (#9323)
This commit upgrade "Worker.postMessage()" implementation to use structured clone algorithm instead of non-spec compliant JSON serialization.
-rw-r--r--cli/bench/main.rs2
-rw-r--r--cli/tests/workers/bench_large_message.ts (renamed from cli/tests/workers_large_message_bench.ts)12
-rw-r--r--cli/tests/workers/racy_worker.js34
-rw-r--r--cli/tests/workers/test.ts40
-rw-r--r--cli/tests/workers/worker_large_message.js (renamed from cli/tests/workers/large_message_worker.js)0
-rw-r--r--cli/tests/workers/worker_structured_cloning.ts15
-rw-r--r--core/bindings.rs2
-rw-r--r--runtime/js/11_workers.js102
-rw-r--r--runtime/js/99_main.js103
-rw-r--r--runtime/ops/web_worker.rs92
-rw-r--r--runtime/ops/worker_host.rs125
-rw-r--r--runtime/web_worker.rs309
-rw-r--r--runtime/worker.rs2
-rw-r--r--serde_v8/src/magic/buffer.rs17
14 files changed, 454 insertions, 401 deletions
diff --git a/cli/bench/main.rs b/cli/bench/main.rs
index b98a9d141..e15f76277 100644
--- a/cli/bench/main.rs
+++ b/cli/bench/main.rs
@@ -75,7 +75,7 @@ const EXEC_TIME_BENCHMARKS: &[(&str, &[&str], Option<i32>)] = &[
&[
"run",
"--allow-read",
- "cli/tests/workers_large_message_bench.ts",
+ "cli/tests/workers/bench_large_message.ts",
],
None,
),
diff --git a/cli/tests/workers_large_message_bench.ts b/cli/tests/workers/bench_large_message.ts
index 9cda5a40d..53076e711 100644
--- a/cli/tests/workers_large_message_bench.ts
+++ b/cli/tests/workers/bench_large_message.ts
@@ -1,14 +1,10 @@
// Copyright 2020 the Deno authors. All rights reserved. MIT license.
-// deno-lint-ignore-file
-
-import { deferred } from "../../test_util/std/async/deferred.ts";
-
-function oneWorker(i: any): Promise<void> {
+function oneWorker(i: number) {
return new Promise<void>((resolve) => {
let countDown = 10;
const worker = new Worker(
- new URL("workers/large_message_worker.js", import.meta.url).href,
+ new URL("worker_large_message.js", import.meta.url).href,
{ type: "module" },
);
worker.onmessage = (e): void => {
@@ -23,8 +19,8 @@ function oneWorker(i: any): Promise<void> {
});
}
-function bench(): Promise<any> {
- let promises = [];
+function bench() {
+ const promises = [];
for (let i = 0; i < 50; i++) {
promises.push(oneWorker(i));
}
diff --git a/cli/tests/workers/racy_worker.js b/cli/tests/workers/racy_worker.js
index 83756b791..0f66c6278 100644
--- a/cli/tests/workers/racy_worker.js
+++ b/cli/tests/workers/racy_worker.js
@@ -1,21 +1,25 @@
// See issue for details
// https://github.com/denoland/deno/issues/4080
//
-// After first call to `postMessage() this worker schedules
-// [close(), postMessage()] ops on the same turn of microtask queue
-// (because message is rather big).
-// Only single `postMessage()` call should make it
-// to host, ie. after calling `close()` no more code should be run.
+// After first received message, this worker schedules
+// [assert(), close(), assert()] ops on the same turn of microtask queue
+// All tasks after close should not make it
-setTimeout(() => {
- close();
-}, 50);
-
-while (true) {
- await new Promise((done) => {
+onmessage = async function () {
+ let stage = 0;
+ await new Promise((_) => {
+ setTimeout(() => {
+ if (stage !== 0) throw "Unexpected stage";
+ stage = 1;
+ }, 50);
+ setTimeout(() => {
+ if (stage !== 1) throw "Unexpected stage";
+ stage = 2;
+ postMessage("DONE");
+ close();
+ }, 50);
setTimeout(() => {
- postMessage({ buf: new Array(999999) });
- done();
- });
+ throw "This should not be run";
+ }, 50);
});
-}
+};
diff --git a/cli/tests/workers/test.ts b/cli/tests/workers/test.ts
index c3ccebfbb..41988d204 100644
--- a/cli/tests/workers/test.ts
+++ b/cli/tests/workers/test.ts
@@ -198,15 +198,12 @@ Deno.test({
);
racyWorker.onmessage = (e): void => {
- assertEquals(e.data.buf.length, 999999);
- racyWorker.onmessage = (_e): void => {
- throw new Error("unreachable");
- };
setTimeout(() => {
promise.resolve();
}, 100);
};
+ racyWorker.postMessage("START");
await promise;
},
});
@@ -726,3 +723,38 @@ Deno.test({
worker.terminate();
},
});
+
+Deno.test({
+ name: "structured cloning postMessage",
+ fn: async function (): Promise<void> {
+ const result = deferred();
+ const worker = new Worker(
+ new URL("worker_structured_cloning.ts", import.meta.url).href,
+ { type: "module" },
+ );
+
+ worker.onmessage = (e): void => {
+ // self field should reference itself (circular ref)
+ const value = e.data.self.self.self;
+
+ // fields a and b refer to the same array
+ assertEquals(value.a, ["a", true, 432]);
+ assertEquals(value.a, ["a", true, 432]);
+ value.b[0] = "b";
+ value.a[2] += 5;
+ assertEquals(value.a, ["b", true, 437]);
+ assertEquals(value.b, ["b", true, 437]);
+
+ const len = value.c.size;
+ value.c.add(1); // This value is already in the set.
+ value.c.add(2);
+ assertEquals(len + 1, value.c.size);
+
+ result.resolve();
+ };
+
+ worker.postMessage("START");
+ await result;
+ worker.terminate();
+ },
+});
diff --git a/cli/tests/workers/large_message_worker.js b/cli/tests/workers/worker_large_message.js
index f7b7da8a0..f7b7da8a0 100644
--- a/cli/tests/workers/large_message_worker.js
+++ b/cli/tests/workers/worker_large_message.js
diff --git a/cli/tests/workers/worker_structured_cloning.ts b/cli/tests/workers/worker_structured_cloning.ts
new file mode 100644
index 000000000..eb1719a9a
--- /dev/null
+++ b/cli/tests/workers/worker_structured_cloning.ts
@@ -0,0 +1,15 @@
+// More info on structured cloning can be found here:
+// https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
+
+self.onmessage = () => {
+ const arr = ["a", true, 432];
+ const set = new Set([1, 3, 5, 7, 9]);
+ const selfReference = {
+ a: arr,
+ b: arr,
+ c: set,
+ };
+ // deno-lint-ignore no-explicit-any
+ (selfReference as any).self = selfReference;
+ self.postMessage(selfReference);
+};
diff --git a/core/bindings.rs b/core/bindings.rs
index edf115d27..f6c94b335 100644
--- a/core/bindings.rs
+++ b/core/bindings.rs
@@ -553,7 +553,7 @@ fn deserialize(
match value {
Some(deserialized) => rv.set(deserialized),
None => {
- let msg = v8::String::new(scope, "string too long").unwrap();
+ let msg = v8::String::new(scope, "could not deserialize value").unwrap();
let exception = v8::Exception::range_error(scope, msg);
scope.throw_exception(exception);
}
diff --git a/runtime/js/11_workers.js b/runtime/js/11_workers.js
index 508dd46d4..dca83c818 100644
--- a/runtime/js/11_workers.js
+++ b/runtime/js/11_workers.js
@@ -39,26 +39,8 @@
return core.opAsync("op_host_get_message", id);
}
- const encoder = new TextEncoder();
const decoder = new TextDecoder();
- function encodeMessage(data) {
- const dataJson = JSON.stringify(data);
- return encoder.encode(dataJson);
- }
-
- function decodeMessage(dataIntArray) {
- // Temporary solution until structured clone arrives in v8.
- // Current clone is made by parsing json to byte array and from byte array back to json.
- // In that case "undefined" transforms to empty byte array, but empty byte array does not transform back to undefined.
- // Thats why this special is statement is needed.
- if (dataIntArray.length == 0) {
- return undefined;
- }
- const dataJson = decoder.decode(dataIntArray);
- return JSON.parse(dataJson);
- }
-
/**
* @param {string} permission
* @return {boolean}
@@ -211,18 +193,7 @@
this.#poll();
}
- #handleMessage = (msgData) => {
- let data;
- try {
- data = decodeMessage(new Uint8Array(msgData));
- } catch (e) {
- const msgErrorEvent = new MessageEvent("messageerror", {
- cancelable: false,
- data,
- });
- return;
- }
-
+ #handleMessage = (data) => {
const msgEvent = new MessageEvent("message", {
cancelable: false,
data,
@@ -253,56 +224,44 @@
#poll = async () => {
while (!this.#terminated) {
- const event = await hostGetMessage(this.#id);
+ const [type, data] = await hostGetMessage(this.#id);
// If terminate was called then we ignore all messages
if (this.#terminated) {
return;
}
- const type = event.type;
-
- if (type === "terminalError") {
- this.#terminated = true;
- if (!this.#handleError(event.error)) {
- if (globalThis instanceof Window) {
- throw new Error("Unhandled error event reached main worker.");
- } else {
- core.opSync(
- "op_host_unhandled_error",
- event.error.message,
- );
- }
+ switch (type) {
+ case 0: { // Message
+ const msg = core.deserialize(data);
+ this.#handleMessage(msg);
+ break;
}
- continue;
- }
-
- if (type === "msg") {
- this.#handleMessage(event.data);
- continue;
- }
-
- if (type === "error") {
- if (!this.#handleError(event.error)) {
- if (globalThis instanceof Window) {
- throw new Error("Unhandled error event reached main worker.");
- } else {
- core.opSync(
- "op_host_unhandled_error",
- event.error.message,
- );
+ case 1: { // TerminalError
+ this.#terminated = true;
+ } /* falls through */
+ case 2: { // Error
+ if (!this.#handleError(data)) {
+ if (globalThis instanceof Window) {
+ throw new Error("Unhandled error event reached main worker.");
+ } else {
+ core.opSync(
+ "op_worker_unhandled_error",
+ data.message,
+ );
+ }
}
+ break;
+ }
+ case 3: { // Close
+ log(`Host got "close" message from worker: ${this.#name}`);
+ this.#terminated = true;
+ return;
+ }
+ default: {
+ throw new Error(`Unknown worker event: "${type}"`);
}
- continue;
- }
-
- if (type === "close") {
- log(`Host got "close" message from worker: ${this.#name}`);
- this.#terminated = true;
- return;
}
-
- throw new Error(`Unknown worker event: "${type}"`);
}
};
@@ -317,7 +276,8 @@
return;
}
- hostPostMessage(this.#id, encodeMessage(message));
+ const bufferMsg = core.serialize(message);
+ hostPostMessage(this.#id, bufferMsg);
}
terminate() {
diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js
index d2926bb1f..082c83593 100644
--- a/runtime/js/99_main.js
+++ b/runtime/js/99_main.js
@@ -67,7 +67,7 @@ delete Object.prototype.__proto__;
}
isClosing = true;
- opCloseWorker();
+ core.opSync("op_worker_close");
}
// TODO(bartlomieju): remove these functions
@@ -76,68 +76,64 @@ delete Object.prototype.__proto__;
const onerror = () => {};
function postMessage(data) {
- const dataJson = JSON.stringify(data);
- const dataIntArray = encoder.encode(dataJson);
- opPostMessage(dataIntArray);
+ const dataIntArray = core.serialize(data);
+ core.opSync("op_worker_post_message", null, dataIntArray);
}
let isClosing = false;
- async function workerMessageRecvCallback(data) {
- const msgEvent = new MessageEvent("message", {
- cancelable: false,
- data,
- });
-
- try {
- if (globalThis["onmessage"]) {
- const result = globalThis.onmessage(msgEvent);
- if (result && "then" in result) {
- await result;
- }
- }
- globalThis.dispatchEvent(msgEvent);
- } catch (e) {
- let handled = false;
-
- const errorEvent = new ErrorEvent("error", {
- cancelable: true,
- message: e.message,
- lineno: e.lineNumber ? e.lineNumber + 1 : undefined,
- colno: e.columnNumber ? e.columnNumber + 1 : undefined,
- filename: e.fileName,
- error: null,
+ async function pollForMessages() {
+ while (!isClosing) {
+ const bufferMsg = await core.opAsync("op_worker_get_message");
+ const data = core.deserialize(bufferMsg);
+
+ const msgEvent = new MessageEvent("message", {
+ cancelable: false,
+ data,
});
- if (globalThis["onerror"]) {
- const ret = globalThis.onerror(
- e.message,
- e.fileName,
- e.lineNumber,
- e.columnNumber,
- e,
- );
- handled = ret === true;
- }
+ try {
+ if (globalThis.onmessage) {
+ await globalThis.onmessage(msgEvent);
+ }
+ globalThis.dispatchEvent(msgEvent);
+ } catch (e) {
+ let handled = false;
+
+ const errorEvent = new ErrorEvent("error", {
+ cancelable: true,
+ message: e.message,
+ lineno: e.lineNumber ? e.lineNumber + 1 : undefined,
+ colno: e.columnNumber ? e.columnNumber + 1 : undefined,
+ filename: e.fileName,
+ error: null,
+ });
+
+ if (globalThis["onerror"]) {
+ const ret = globalThis.onerror(
+ e.message,
+ e.fileName,
+ e.lineNumber,
+ e.columnNumber,
+ e,
+ );
+ handled = ret === true;
+ }
- globalThis.dispatchEvent(errorEvent);
- if (errorEvent.defaultPrevented) {
- handled = true;
- }
+ globalThis.dispatchEvent(errorEvent);
+ if (errorEvent.defaultPrevented) {
+ handled = true;
+ }
- if (!handled) {
- throw e;
+ if (!handled) {
+ core.opSync(
+ "op_worker_unhandled_error",
+ e.message,
+ );
+ }
}
}
}
- function opPostMessage(data) {
- core.opSync("op_worker_post_message", null, data);
- }
-
- function opCloseWorker() {
- core.opSync("op_worker_close");
- }
-
function opMainModule() {
return core.opSync("op_main_module");
}
@@ -395,7 +391,6 @@ delete Object.prototype.__proto__;
// TODO(bartlomieju): should be readonly?
close: util.nonEnumerable(workerClose),
postMessage: util.writable(postMessage),
- workerMessageRecvCallback: util.nonEnumerable(workerMessageRecvCallback),
};
let hasBootstrapped = false;
@@ -506,6 +501,8 @@ delete Object.prototype.__proto__;
location.setLocationHref(locationHref);
registerErrors();
+ pollForMessages();
+
const internalSymbol = Symbol("Deno.internal");
const finalDenoNs = {
diff --git a/runtime/ops/web_worker.rs b/runtime/ops/web_worker.rs
index 1689b2587..e3ede869d 100644
--- a/runtime/ops/web_worker.rs
+++ b/runtime/ops/web_worker.rs
@@ -1,41 +1,85 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-use crate::web_worker::WebWorkerHandle;
+use crate::web_worker::WebWorkerInternalHandle;
use crate::web_worker::WorkerEvent;
+use deno_core::error::generic_error;
use deno_core::error::null_opbuf;
-use deno_core::futures::channel::mpsc;
+use deno_core::error::AnyError;
+use deno_core::op_async;
use deno_core::op_sync;
use deno_core::Extension;
+use deno_core::OpState;
use deno_core::ZeroCopyBuf;
+use std::cell::RefCell;
+use std::rc::Rc;
pub fn init() -> Extension {
Extension::builder()
.ops(vec![
- (
- "op_worker_post_message",
- op_sync(move |state, _args: (), buf: Option<ZeroCopyBuf>| {
- let buf = buf.ok_or_else(null_opbuf)?;
- let msg_buf: Box<[u8]> = (*buf).into();
- let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
- sender
- .try_send(WorkerEvent::Message(msg_buf))
- .expect("Failed to post message to host");
- Ok(())
- }),
- ),
+ ("op_worker_post_message", op_sync(op_worker_post_message)),
+ ("op_worker_get_message", op_async(op_worker_get_message)),
// Notify host that guest worker closes.
+ ("op_worker_close", op_sync(op_worker_close)),
+ // Notify host that guest worker has unhandled error.
(
- "op_worker_close",
- op_sync(move |state, _: (), _: ()| {
- // Notify parent that we're finished
- let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
- sender.close_channel();
- // Terminate execution of current worker
- let handle = state.borrow::<WebWorkerHandle>();
- handle.terminate();
- Ok(())
- }),
+ "op_worker_unhandled_error",
+ op_sync(op_worker_unhandled_error),
),
])
.build()
}
+
+fn op_worker_post_message(
+ state: &mut OpState,
+ _: (),
+ buf: Option<ZeroCopyBuf>,
+) -> Result<(), AnyError> {
+ let buf = buf.ok_or_else(null_opbuf)?;
+ let handle = state.borrow::<WebWorkerInternalHandle>().clone();
+ handle
+ .post_event(WorkerEvent::Message(buf))
+ .expect("Failed to post message to host");
+ Ok(())
+}
+
+async fn op_worker_get_message(
+ state: Rc<RefCell<OpState>>,
+ _: (),
+ _: (),
+) -> Result<ZeroCopyBuf, AnyError> {
+ let temp = {
+ let a = state.borrow();
+ a.borrow::<WebWorkerInternalHandle>().clone()
+ };
+
+ let maybe_data = temp.get_message().await;
+
+ Ok(maybe_data.unwrap_or_else(ZeroCopyBuf::empty))
+}
+
+#[allow(clippy::unnecessary_wraps)]
+fn op_worker_close(state: &mut OpState, _: (), _: ()) -> Result<(), AnyError> {
+ // Notify parent that we're finished
+ let mut handle = state.borrow_mut::<WebWorkerInternalHandle>().clone();
+
+ handle.terminate();
+ Ok(())
+}
+
+/// A worker that encounters an uncaught error will pass this error
+/// to its parent worker using this op. The parent worker will use
+/// this same op to pass the error to its own parent (in case
+/// `e.preventDefault()` was not called in `worker.onerror`). This
+/// is done until the error reaches the root/ main worker.
+#[allow(clippy::unnecessary_wraps)]
+fn op_worker_unhandled_error(
+ state: &mut OpState,
+ message: String,
+ _: (),
+) -> Result<(), AnyError> {
+ let sender = state.borrow::<WebWorkerInternalHandle>().clone();
+ sender
+ .post_event(WorkerEvent::Error(generic_error(message)))
+ .expect("Failed to propagate error event to parent worker");
+ Ok(())
+}
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs
index f8d03850d..a5698fa6e 100644
--- a/runtime/ops/worker_host.rs
+++ b/runtime/ops/worker_host.rs
@@ -15,12 +15,11 @@ use crate::web_worker::run_web_worker;
use crate::web_worker::WebWorker;
use crate::web_worker::WebWorkerHandle;
use crate::web_worker::WorkerEvent;
+use crate::web_worker::WorkerId;
use deno_core::error::custom_error;
-use deno_core::error::generic_error;
use deno_core::error::null_opbuf;
use deno_core::error::AnyError;
use deno_core::error::JsError;
-use deno_core::futures::channel::mpsc;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::serde::de;
@@ -28,7 +27,6 @@ use deno_core::serde::de::SeqAccess;
use deno_core::serde::Deserialize;
use deno_core::serde::Deserializer;
use deno_core::serde_json::json;
-use deno_core::serde_json::Value;
use deno_core::Extension;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
@@ -46,7 +44,7 @@ use std::thread::JoinHandle;
pub struct CreateWebWorkerArgs {
pub name: String,
- pub worker_id: u32,
+ pub worker_id: WorkerId,
pub parent_permissions: Permissions,
pub permissions: Permissions,
pub main_module: ModuleSpecifier,
@@ -68,13 +66,9 @@ pub struct WorkerThread {
worker_handle: WebWorkerHandle,
}
-pub type WorkersTable = HashMap<u32, WorkerThread>;
-pub type WorkerId = u32;
+pub type WorkersTable = HashMap<WorkerId, WorkerThread>;
-pub fn init(
- is_main_worker: bool,
- create_web_worker_cb: Arc<CreateWebWorkerCb>,
-) -> Extension {
+pub fn init(create_web_worker_cb: Arc<CreateWebWorkerCb>) -> Extension {
Extension::builder()
.state(move |state| {
state.put::<WorkersTable>(WorkersTable::default());
@@ -94,20 +88,6 @@ pub fn init(
),
("op_host_post_message", op_sync(op_host_post_message)),
("op_host_get_message", op_async(op_host_get_message)),
- (
- "op_host_unhandled_error",
- op_sync(move |state, message: String, _: ()| {
- if is_main_worker {
- return Err(generic_error("Cannot be called from main worker."));
- }
-
- let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
- sender
- .try_send(WorkerEvent::Error(generic_error(message)))
- .expect("Failed to propagate error event to parent worker");
- Ok(true)
- }),
- ),
])
.build()
}
@@ -473,7 +453,7 @@ fn op_create_worker(
let worker_id = state.take::<WorkerId>();
let create_module_loader = state.take::<CreateWebWorkerCbHolder>();
state.put::<CreateWebWorkerCbHolder>(create_module_loader.clone());
- state.put::<WorkerId>(worker_id + 1);
+ state.put::<WorkerId>(worker_id.next().unwrap());
let module_specifier = deno_core::resolve_url(&specifier)?;
let worker_name = args_name.unwrap_or_else(|| "".to_string());
@@ -483,7 +463,7 @@ fn op_create_worker(
// Setup new thread
let thread_builder =
- std::thread::Builder::new().name(format!("deno-worker-{}", worker_id));
+ std::thread::Builder::new().name(format!("{}", worker_id));
// Spawn it
let join_handle = thread_builder.spawn(move || {
@@ -501,7 +481,7 @@ fn op_create_worker(
use_deno_namespace,
});
- // Send thread safe handle to newly created worker to host thread
+ // Send thread safe handle from newly created worker to host thread
handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
drop(handle_sender);
@@ -512,6 +492,7 @@ fn op_create_worker(
run_web_worker(worker, module_specifier, maybe_source_code)
})?;
+ // Receive WebWorkerHandle from newly created worker
let worker_handle = handle_receiver.recv().unwrap()?;
let worker_thread = WorkerThread {
@@ -534,7 +515,7 @@ fn op_host_terminate_worker(
id: WorkerId,
_: (),
) -> Result<(), AnyError> {
- let worker_thread = state
+ let mut worker_thread = state
.borrow_mut::<WorkersTable>()
.remove(&id)
.expect("No worker handle found");
@@ -547,54 +528,53 @@ fn op_host_terminate_worker(
Ok(())
}
-fn serialize_worker_event(event: WorkerEvent) -> Value {
- match event {
- WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }),
- WorkerEvent::TerminalError(error) => match error.downcast::<JsError>() {
- Ok(js_error) => json!({
- "type": "terminalError",
- "error": {
- "message": js_error.message,
- "fileName": js_error.script_resource_name,
- "lineNumber": js_error.line_number,
- "columnNumber": js_error.start_column,
- }
- }),
- Err(error) => json!({
- "type": "terminalError",
- "error": {
- "message": error.to_string(),
- }
- }),
- },
- WorkerEvent::Error(error) => match error.downcast::<JsError>() {
- Ok(js_error) => json!({
- "type": "error",
- "error": {
- "message": js_error.message,
- "fileName": js_error.script_resource_name,
- "lineNumber": js_error.line_number,
- "columnNumber": js_error.start_column,
- }
- }),
- Err(error) => json!({
- "type": "error",
- "error": {
- "message": error.to_string(),
- }
- }),
- },
+use deno_core::serde::Serialize;
+use deno_core::serde::Serializer;
+
+impl Serialize for WorkerEvent {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ let type_id = match &self {
+ WorkerEvent::Message(_) => 0_i32,
+ WorkerEvent::TerminalError(_) => 1_i32,
+ WorkerEvent::Error(_) => 2_i32,
+ WorkerEvent::Close => 3_i32,
+ };
+
+ match self {
+ WorkerEvent::Message(buf) => {
+ Serialize::serialize(&(type_id, buf), serializer)
+ }
+ WorkerEvent::TerminalError(error) | WorkerEvent::Error(error) => {
+ let value = match error.downcast_ref::<JsError>() {
+ Some(js_error) => json!({
+ "message": js_error.message,
+ "fileName": js_error.script_resource_name,
+ "lineNumber": js_error.line_number,
+ "columnNumber": js_error.start_column,
+ }),
+ None => json!({
+ "message": error.to_string(),
+ }),
+ };
+
+ Serialize::serialize(&(type_id, value), serializer)
+ }
+ _ => Serialize::serialize(&(type_id, ()), serializer),
+ }
}
}
/// Try to remove worker from workers table - NOTE: `Worker.terminate()`
/// might have been called already meaning that we won't find worker in
/// table - in that case ignore.
-fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: u32) {
+fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) {
let mut s = state.borrow_mut();
let workers = s.borrow_mut::<WorkersTable>();
if let Some(mut worker_thread) = workers.remove(&id) {
- worker_thread.worker_handle.sender.close_channel();
+ worker_thread.worker_handle.terminate();
worker_thread
.join_handle
.join()
@@ -608,7 +588,7 @@ async fn op_host_get_message(
state: Rc<RefCell<OpState>>,
id: WorkerId,
_: (),
-) -> Result<Value, AnyError> {
+) -> Result<WorkerEvent, AnyError> {
let worker_handle = {
let s = state.borrow();
let workers_table = s.borrow::<WorkersTable>();
@@ -617,7 +597,7 @@ async fn op_host_get_message(
handle.worker_handle.clone()
} else {
// If handle was not found it means worker has already shutdown
- return Ok(json!({ "type": "close" }));
+ return Ok(WorkerEvent::Close);
}
};
@@ -627,12 +607,12 @@ async fn op_host_get_message(
if let WorkerEvent::TerminalError(_) = &event {
try_remove_and_close(state, id);
}
- return Ok(serialize_worker_event(event));
+ return Ok(event);
}
// If there was no event from worker it means it has already been closed.
try_remove_and_close(state, id);
- Ok(json!({ "type": "close" }))
+ Ok(WorkerEvent::Close)
}
/// Post message to guest worker as host
@@ -641,8 +621,7 @@ fn op_host_post_message(
id: WorkerId,
data: Option<ZeroCopyBuf>,
) -> Result<(), AnyError> {
- let data = data.ok_or_else(null_opbuf)?;
- let msg = Vec::from(&*data).into_boxed_slice();
+ let msg = data.ok_or_else(null_opbuf)?;
debug!("post message to worker {}", id);
let worker_thread = state
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index 690b6fb58..5b731a0f5 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -13,7 +13,8 @@ use deno_core::futures::channel::mpsc;
use deno_core::futures::future::poll_fn;
use deno_core::futures::future::FutureExt;
use deno_core::futures::stream::StreamExt;
-use deno_core::futures::task::AtomicWaker;
+use deno_core::serde::Deserialize;
+use deno_core::serde::Serialize;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::url::Url;
@@ -22,12 +23,16 @@ use deno_core::Extension;
use deno_core::GetErrorClassFn;
use deno_core::JsErrorCreateFn;
use deno_core::JsRuntime;
+use deno_core::ModuleId;
use deno_core::ModuleLoader;
use deno_core::ModuleSpecifier;
use deno_core::RuntimeOptions;
+use deno_core::ZeroCopyBuf;
use deno_file::BlobUrlStore;
use log::debug;
+use std::cell::RefCell;
use std::env;
+use std::fmt;
use std::rc::Rc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
@@ -36,38 +41,98 @@ use std::task::Context;
use std::task::Poll;
use tokio::sync::Mutex as AsyncMutex;
+#[derive(
+ Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize,
+)]
+pub struct WorkerId(u32);
+impl fmt::Display for WorkerId {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "worker-{}", self.0)
+ }
+}
+impl WorkerId {
+ pub fn next(&self) -> Option<WorkerId> {
+ self.0.checked_add(1).map(WorkerId)
+ }
+}
+
+type WorkerMessage = ZeroCopyBuf;
+
/// Events that are sent to host from child
/// worker.
pub enum WorkerEvent {
- Message(Box<[u8]>),
+ Message(WorkerMessage),
Error(AnyError),
TerminalError(AnyError),
+ Close,
}
-pub struct WorkerChannelsInternal {
- pub sender: mpsc::Sender<WorkerEvent>,
- pub receiver: mpsc::Receiver<Box<[u8]>>,
+// Channels used for communication with worker's parent
+#[derive(Clone)]
+pub struct WebWorkerInternalHandle {
+ sender: mpsc::Sender<WorkerEvent>,
+ receiver: Rc<RefCell<mpsc::Receiver<WorkerMessage>>>,
+ terminated: Arc<AtomicBool>,
+ isolate_handle: v8::IsolateHandle,
+}
+
+impl WebWorkerInternalHandle {
+ /// Post WorkerEvent to parent as a worker
+ pub fn post_event(&self, event: WorkerEvent) -> Result<(), AnyError> {
+ let mut sender = self.sender.clone();
+ // If the channel is closed,
+ // the worker must have terminated but the termination message has not yet been received.
+ //
+ // Therefore just treat it as if the worker has terminated and return.
+ if sender.is_closed() {
+ self.terminated.store(true, Ordering::SeqCst);
+ return Ok(());
+ }
+ sender.try_send(event)?;
+ Ok(())
+ }
+
+ /// Get the WorkerEvent with lock
+ /// Panic if more than one listener tries to get event
+ pub async fn get_message(&self) -> Option<WorkerMessage> {
+ let mut receiver = self.receiver.borrow_mut();
+ receiver.next().await
+ }
+
+ /// Check if this worker is terminated or being terminated
+ pub fn is_terminated(&self) -> bool {
+ self.terminated.load(Ordering::SeqCst)
+ }
+
+ /// Terminate the worker
+ /// This function will set terminated to true, terminate the isolate and close the message channel
+ pub fn terminate(&mut self) {
+ // This function can be called multiple times by whomever holds
+ // the handle. However only a single "termination" should occur so
+ // we need a guard here.
+ let already_terminated = self.terminated.swap(true, Ordering::SeqCst);
+
+ if !already_terminated {
+ // Stop javascript execution
+ self.isolate_handle.terminate_execution();
+ }
+
+ // Wake parent by closing the channel
+ self.sender.close_channel();
+ }
}
-/// Wrapper for `WorkerHandle` that adds functionality
-/// for terminating workers.
-///
-/// This struct is used by host as well as worker itself.
-///
-/// Host uses it to communicate with worker and terminate it,
-/// while worker uses it only to finish execution on `self.close()`.
#[derive(Clone)]
pub struct WebWorkerHandle {
- pub sender: mpsc::Sender<Box<[u8]>>,
- pub receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>,
- terminate_tx: mpsc::Sender<()>,
+ sender: mpsc::Sender<WorkerMessage>,
+ receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>,
terminated: Arc<AtomicBool>,
isolate_handle: v8::IsolateHandle,
}
impl WebWorkerHandle {
- /// Post message to worker as a host.
- pub fn post_message(&self, buf: Box<[u8]>) -> Result<(), AnyError> {
+ /// Post WorkerMessage to worker as a host
+ pub fn post_message(&self, buf: WorkerMessage) -> Result<(), AnyError> {
let mut sender = self.sender.clone();
// If the channel is closed,
// the worker must have terminated but the termination message has not yet been recieved.
@@ -81,47 +146,50 @@ impl WebWorkerHandle {
Ok(())
}
- /// Get the event with lock.
+ /// Get the WorkerEvent with lock
/// Return error if more than one listener tries to get event
pub async fn get_event(&self) -> Result<Option<WorkerEvent>, AnyError> {
let mut receiver = self.receiver.try_lock()?;
Ok(receiver.next().await)
}
- pub fn terminate(&self) {
+ /// Terminate the worker
+ /// This function will set terminated to true, terminate the isolate and close the message channel
+ pub fn terminate(&mut self) {
// This function can be called multiple times by whomever holds
// the handle. However only a single "termination" should occur so
// we need a guard here.
let already_terminated = self.terminated.swap(true, Ordering::SeqCst);
if !already_terminated {
+ // Stop javascript execution
self.isolate_handle.terminate_execution();
- let mut sender = self.terminate_tx.clone();
- // This call should be infallible hence the `expect`.
- // This might change in the future.
- sender.try_send(()).expect("Failed to terminate");
}
+
+ // Wake web worker by closing the channel
+ self.sender.close_channel();
}
}
-fn create_channels(
+fn create_handles(
isolate_handle: v8::IsolateHandle,
- terminate_tx: mpsc::Sender<()>,
-) -> (WorkerChannelsInternal, WebWorkerHandle) {
- let (in_tx, in_rx) = mpsc::channel::<Box<[u8]>>(1);
+) -> (WebWorkerInternalHandle, WebWorkerHandle) {
+ let (in_tx, in_rx) = mpsc::channel::<WorkerMessage>(1);
let (out_tx, out_rx) = mpsc::channel::<WorkerEvent>(1);
- let internal_channels = WorkerChannelsInternal {
+ let terminated = Arc::new(AtomicBool::new(false));
+ let internal_handle = WebWorkerInternalHandle {
sender: out_tx,
- receiver: in_rx,
+ receiver: Rc::new(RefCell::new(in_rx)),
+ terminated: terminated.clone(),
+ isolate_handle: isolate_handle.clone(),
};
- let external_channels = WebWorkerHandle {
+ let external_handle = WebWorkerHandle {
sender: in_tx,
receiver: Arc::new(AsyncMutex::new(out_rx)),
- terminated: Arc::new(AtomicBool::new(false)),
- terminate_tx,
+ terminated,
isolate_handle,
};
- (internal_channels, external_channels)
+ (internal_handle, external_handle)
}
/// This struct is an implementation of `Worker` Web API
@@ -129,17 +197,12 @@ fn create_channels(
/// Each `WebWorker` is either a child of `MainWorker` or other
/// `WebWorker`.
pub struct WebWorker {
- id: u32,
+ id: WorkerId,
inspector: Option<Box<DenoInspector>>,
- // Following fields are pub because they are accessed
- // when creating a new WebWorker instance.
- pub(crate) internal_channels: WorkerChannelsInternal,
pub js_runtime: JsRuntime,
pub name: String,
- waker: AtomicWaker,
- event_loop_idle: bool,
- terminate_rx: mpsc::Receiver<()>,
- handle: WebWorkerHandle,
+ internal_handle: WebWorkerInternalHandle,
+ external_handle: WebWorkerHandle,
pub use_deno_namespace: bool,
pub main_module: ModuleSpecifier,
}
@@ -174,7 +237,7 @@ impl WebWorker {
name: String,
permissions: Permissions,
main_module: ModuleSpecifier,
- worker_id: u32,
+ worker_id: WorkerId,
options: &WebWorkerOptions,
) -> Self {
// Permissions: many ops depend on this
@@ -218,7 +281,7 @@ impl WebWorker {
let runtime_exts = vec![
ops::web_worker::init(),
ops::runtime::init(main_module.clone()),
- ops::worker_host::init(false, options.create_web_worker_cb.clone()),
+ ops::worker_host::init(options.create_web_worker_cb.clone()),
ops::io::init(),
];
@@ -264,38 +327,24 @@ impl WebWorker {
None
};
- let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1);
- let isolate_handle = js_runtime.v8_isolate().thread_safe_handle();
- let (internal_channels, handle) =
- create_channels(isolate_handle, terminate_tx);
+ let (internal_handle, external_handle) = {
+ let handle = js_runtime.v8_isolate().thread_safe_handle();
+ let (internal_handle, external_handle) = create_handles(handle);
+ let op_state = js_runtime.op_state();
+ let mut op_state = op_state.borrow_mut();
+ op_state.put(internal_handle.clone());
+ (internal_handle, external_handle)
+ };
- let mut worker = Self {
+ Self {
id: worker_id,
inspector,
- internal_channels,
js_runtime,
name,
- waker: AtomicWaker::new(),
- event_loop_idle: false,
- terminate_rx,
- handle,
+ internal_handle,
+ external_handle,
use_deno_namespace: options.use_deno_namespace,
main_module,
- };
-
- // Setup worker-dependant OpState and return worker
- {
- let handle = worker.thread_safe_handle();
- let sender = worker.internal_channels.sender.clone();
- let js_runtime = &mut worker.js_runtime;
- let op_state = js_runtime.op_state();
- let mut op_state = op_state.borrow_mut();
-
- // Required by runtime::ops::worker_host/web_worker
- op_state.put(handle);
- op_state.put(sender);
-
- worker
}
}
@@ -321,7 +370,7 @@ impl WebWorker {
// Instead of using name for log we use `worker-${id}` because
// WebWorkers can have empty string as name.
let script = format!(
- "bootstrap.workerRuntime({}, \"{}\", {}, \"worker-{}\")",
+ "bootstrap.workerRuntime({}, \"{}\", {}, \"{}\")",
runtime_options_str, self.name, options.use_deno_namespace, self.id
);
self
@@ -338,12 +387,20 @@ impl WebWorker {
self.js_runtime.execute(url.as_str(), js_source)
}
+ /// Loads and instantiates specified JavaScript module.
+ pub async fn preload_module(
+ &mut self,
+ module_specifier: &ModuleSpecifier,
+ ) -> Result<ModuleId, AnyError> {
+ self.js_runtime.load_module(module_specifier, None).await
+ }
+
/// Loads, instantiates and executes specified JavaScript module.
pub async fn execute_module(
&mut self,
module_specifier: &ModuleSpecifier,
) -> Result<(), AnyError> {
- let id = self.js_runtime.load_module(module_specifier, None).await?;
+ let id = self.preload_module(module_specifier).await?;
let mut receiver = self.js_runtime.mod_evaluate(id);
tokio::select! {
@@ -357,8 +414,8 @@ impl WebWorker {
}
event_loop_result = self.run_event_loop() => {
- if self.has_been_terminated() {
- return Ok(());
+ if self.internal_handle.is_terminated() {
+ return Ok(());
}
event_loop_result?;
let maybe_result = receiver.next().await;
@@ -370,82 +427,44 @@ impl WebWorker {
/// Returns a way to communicate with the Worker from other threads.
pub fn thread_safe_handle(&self) -> WebWorkerHandle {
- self.handle.clone()
- }
-
- pub fn has_been_terminated(&self) -> bool {
- self.handle.terminated.load(Ordering::SeqCst)
+ self.external_handle.clone()
}
pub fn poll_event_loop(
&mut self,
cx: &mut Context,
) -> Poll<Result<(), AnyError>> {
- if self.has_been_terminated() {
+ // If awakened because we are terminating, just return Ok
+ if self.internal_handle.is_terminated() {
return Poll::Ready(Ok(()));
}
- if !self.event_loop_idle {
- let poll_result = {
- // We always poll the inspector if it exists.
- let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx));
- self.waker.register(cx.waker());
- self.js_runtime.poll_event_loop(cx)
- };
-
- if let Poll::Ready(r) = poll_result {
- if self.has_been_terminated() {
+ // We always poll the inspector if it exists.
+ let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx));
+ match self.js_runtime.poll_event_loop(cx) {
+ Poll::Ready(r) => {
+ // If js ended because we are terminating, just return Ok
+ if self.internal_handle.is_terminated() {
return Poll::Ready(Ok(()));
}
+ // In case of an error, pass to parent without terminating worker
if let Err(e) = r {
print_worker_error(e.to_string(), &self.name);
- let mut sender = self.internal_channels.sender.clone();
- sender
- .try_send(WorkerEvent::Error(e))
+ let handle = self.internal_handle.clone();
+ handle
+ .post_event(WorkerEvent::Error(e))
.expect("Failed to post message to host");
- }
- self.event_loop_idle = true;
- }
- }
-
- if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) {
- // terminate_rx should never be closed
- assert!(r.is_some());
- return Poll::Ready(Ok(()));
- }
- let maybe_msg_poll_result =
- self.internal_channels.receiver.poll_next_unpin(cx);
-
- if let Poll::Ready(maybe_msg) = maybe_msg_poll_result {
- let msg =
- maybe_msg.expect("Received `None` instead of message in worker");
- let msg = String::from_utf8(msg.to_vec()).unwrap();
- let script = format!("workerMessageRecvCallback({})", msg);
-
- // TODO(bartlomieju): set proper script name like "deno:runtime/web_worker.js"
- // so it's dimmed in stack trace instead of using "__anonymous__"
- if let Err(e) = self.execute(&script) {
- // If execution was terminated during message callback then
- // just ignore it
- if self.has_been_terminated() {
- return Poll::Ready(Ok(()));
+ return Poll::Pending;
}
- // Otherwise forward error to host
- let mut sender = self.internal_channels.sender.clone();
- sender
- .try_send(WorkerEvent::Error(e))
- .expect("Failed to post message to host");
+ panic!(
+ "coding error: either js is polling or the worker is terminated"
+ );
}
-
- // Let event loop be polled again
- self.event_loop_idle = false;
- self.waker.wake();
+ Poll::Pending => Poll::Pending,
}
-
- Poll::Pending
}
pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
@@ -495,18 +514,18 @@ pub fn run_web_worker(
rt.block_on(load_future)
};
- let mut sender = worker.internal_channels.sender.clone();
+ let internal_handle = worker.internal_handle.clone();
// If sender is closed it means that worker has already been closed from
// within using "globalThis.close()"
- if sender.is_closed() {
+ if internal_handle.is_terminated() {
return Ok(());
}
if let Err(e) = result {
print_worker_error(e.to_string(), &name);
- sender
- .try_send(WorkerEvent::TerminalError(e))
+ internal_handle
+ .post_event(WorkerEvent::TerminalError(e))
.expect("Failed to post message to host");
// Failure to execute script is a terminal error, bye, bye.
@@ -522,7 +541,6 @@ pub fn run_web_worker(
mod tests {
use super::*;
use crate::tokio_util;
- use deno_core::serde_json::json;
fn create_test_web_worker() -> WebWorker {
let main_module = deno_core::resolve_url_or_path("./hello.js").unwrap();
@@ -554,7 +572,7 @@ mod tests {
"TEST".to_string(),
Permissions::allow_all(),
main_module,
- 1,
+ WorkerId(1),
&options,
);
worker.bootstrap(&options);
@@ -589,30 +607,30 @@ mod tests {
let mut handle = handle_receiver.recv().unwrap();
- let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = handle.post_message(msg.clone());
+ // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
+ let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded
+ let r = handle.post_message(msg.clone().into());
assert!(r.is_ok());
let maybe_msg = handle.get_event().await.unwrap();
assert!(maybe_msg.is_some());
- let r = handle.post_message(msg.clone());
+ let r = handle.post_message(msg.clone().into());
assert!(r.is_ok());
let maybe_msg = handle.get_event().await.unwrap();
assert!(maybe_msg.is_some());
match maybe_msg {
Some(WorkerEvent::Message(buf)) => {
- assert_eq!(*buf, *b"[1,2,3]");
+ // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
+ assert_eq!(*buf, [65, 3, 73, 2, 73, 4, 73, 6, 36, 0, 3]);
}
_ => unreachable!(),
}
- let msg = json!("exit")
- .to_string()
- .into_boxed_str()
- .into_boxed_bytes();
- let r = handle.post_message(msg);
+ // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
+ let msg = vec![34, 4, 101, 120, 105, 116].into_boxed_slice(); // "exit" encoded
+ let r = handle.post_message(msg.into());
assert!(r.is_ok());
let event = handle.get_event().await.unwrap();
assert!(event.is_none());
@@ -636,8 +654,9 @@ mod tests {
let mut handle = handle_receiver.recv().unwrap();
- let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = handle.post_message(msg.clone());
+ // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
+ let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded
+ let r = handle.post_message(msg.clone().into());
assert!(r.is_ok());
let event = handle.get_event().await.unwrap();
assert!(event.is_none());
diff --git a/runtime/worker.rs b/runtime/worker.rs
index ab54e2153..c75f09dc8 100644
--- a/runtime/worker.rs
+++ b/runtime/worker.rs
@@ -113,7 +113,7 @@ impl MainWorker {
metrics::init(),
// Runtime ops
ops::runtime::init(main_module),
- ops::worker_host::init(true, options.create_web_worker_cb.clone()),
+ ops::worker_host::init(options.create_web_worker_cb.clone()),
ops::fs_events::init(),
ops::fs::init(),
ops::http::init(),
diff --git a/serde_v8/src/magic/buffer.rs b/serde_v8/src/magic/buffer.rs
index 893bf35e1..1fcfffc72 100644
--- a/serde_v8/src/magic/buffer.rs
+++ b/serde_v8/src/magic/buffer.rs
@@ -1,9 +1,9 @@
use rusty_v8 as v8;
-use std::cell::Cell;
use std::fmt;
use std::ops::Deref;
use std::ops::DerefMut;
+use std::sync::Mutex;
use super::zero_copy_buf::ZeroCopyBuf;
@@ -11,7 +11,7 @@ use super::zero_copy_buf::ZeroCopyBuf;
// allowing us to use a single type for familiarity
pub enum MagicBuffer {
FromV8(ZeroCopyBuf),
- ToV8(Cell<Option<Box<[u8]>>>),
+ ToV8(Mutex<Option<Box<[u8]>>>),
}
impl MagicBuffer {
@@ -21,6 +21,10 @@ impl MagicBuffer {
) -> Self {
Self::FromV8(ZeroCopyBuf::new(scope, view))
}
+
+ pub fn empty() -> Self {
+ MagicBuffer::ToV8(Mutex::new(Some(vec![0_u8; 0].into_boxed_slice())))
+ }
}
impl Clone for MagicBuffer {
@@ -65,7 +69,7 @@ impl DerefMut for MagicBuffer {
impl From<Box<[u8]>> for MagicBuffer {
fn from(buf: Box<[u8]>) -> Self {
- MagicBuffer::ToV8(Cell::new(Some(buf)))
+ MagicBuffer::ToV8(Mutex::new(Some(buf)))
}
}
@@ -88,8 +92,11 @@ impl serde::Serialize for MagicBuffer {
let mut s = serializer.serialize_struct(BUF_NAME, 1)?;
let boxed: Box<[u8]> = match self {
- Self::FromV8(_) => unreachable!(),
- Self::ToV8(x) => x.take().expect("MagicBuffer was empty"),
+ Self::FromV8(buf) => {
+ let value: &[u8] = &buf;
+ value.into()
+ }
+ Self::ToV8(x) => x.lock().unwrap().take().expect("MagicBuffer was empty"),
};
let hack: [usize; 2] = unsafe { std::mem::transmute(boxed) };
let f1: u64 = hack[0] as u64;