summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2024-03-16 00:59:18 +0000
committerGitHub <noreply@github.com>2024-03-16 01:59:18 +0100
commit92576fdcfd3e32dce63b533ab20d4974136b097d (patch)
tree6f3b61c4d1d10569e4bb54f8ee59fb5f1ff274da
parentebbc897b69f906d88a99768a2fff7661e2894670 (diff)
fix(ext/node): support MessagePort in `WorkerOptions.workerData` (#22950)
This commit fixes passing `MessagePort` instances to `WorkerOptions.workerData`. Before they were not serialized and deserialized properly when spawning a worker thread. Closes https://github.com/denoland/deno/issues/22935
-rw-r--r--ext/web/lib.rs3
-rw-r--r--ext/web/message_port.rs11
-rw-r--r--runtime/ops/worker_host.rs15
-rw-r--r--runtime/web_worker.rs26
-rw-r--r--tests/unit_node/worker_threads_test.ts49
5 files changed, 92 insertions, 12 deletions
diff --git a/ext/web/lib.rs b/ext/web/lib.rs
index 60a0cc0d7..74ed78c7e 100644
--- a/ext/web/lib.rs
+++ b/ext/web/lib.rs
@@ -43,12 +43,15 @@ pub use crate::blob::BlobStore;
pub use crate::blob::InMemoryBlobPart;
pub use crate::message_port::create_entangled_message_port;
+pub use crate::message_port::deserialize_js_transferables;
use crate::message_port::op_message_port_create_entangled;
use crate::message_port::op_message_port_post_message;
use crate::message_port::op_message_port_recv_message;
use crate::message_port::op_message_port_recv_message_sync;
+pub use crate::message_port::serialize_transferables;
pub use crate::message_port::JsMessageData;
pub use crate::message_port::MessagePort;
+pub use crate::message_port::Transferable;
use crate::timers::op_defer;
use crate::timers::op_now;
diff --git a/ext/web/message_port.rs b/ext/web/message_port.rs
index 18429a179..1cd29c64d 100644
--- a/ext/web/message_port.rs
+++ b/ext/web/message_port.rs
@@ -22,7 +22,7 @@ use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
-enum Transferable {
+pub enum Transferable {
MessagePort(MessagePort),
ArrayBuffer(u32),
}
@@ -140,7 +140,7 @@ pub enum JsTransferable {
ArrayBuffer(u32),
}
-fn deserialize_js_transferables(
+pub fn deserialize_js_transferables(
state: &mut OpState,
js_transferables: Vec<JsTransferable>,
) -> Result<Vec<Transferable>, AnyError> {
@@ -165,7 +165,7 @@ fn deserialize_js_transferables(
Ok(transferables)
}
-fn serialize_transferables(
+pub fn serialize_transferables(
state: &mut OpState,
transferables: Vec<Transferable>,
) -> Vec<JsTransferable> {
@@ -189,8 +189,8 @@ fn serialize_transferables(
#[derive(Deserialize, Serialize)]
pub struct JsMessageData {
- data: DetachedBuffer,
- transferables: Vec<JsTransferable>,
+ pub data: DetachedBuffer,
+ pub transferables: Vec<JsTransferable>,
}
#[op2]
@@ -208,7 +208,6 @@ pub fn op_message_port_post_message(
}
let resource = state.resource_table.get::<MessagePortResource>(rid)?;
-
resource.port.send(state, data)
}
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs
index 242d3bcda..e3360b830 100644
--- a/runtime/ops/worker_host.rs
+++ b/runtime/ops/worker_host.rs
@@ -11,6 +11,7 @@ use crate::web_worker::WebWorkerHandle;
use crate::web_worker::WebWorkerType;
use crate::web_worker::WorkerControlEvent;
use crate::web_worker::WorkerId;
+use crate::web_worker::WorkerMetadata;
use crate::worker::FormatJsErrorFn;
use deno_core::error::AnyError;
use deno_core::op2;
@@ -19,6 +20,7 @@ use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
+use deno_web::deserialize_js_transferables;
use deno_web::JsMessageData;
use log::debug;
use std::cell::RefCell;
@@ -36,7 +38,7 @@ pub struct CreateWebWorkerArgs {
pub main_module: ModuleSpecifier,
pub worker_type: WebWorkerType,
pub close_on_idle: bool,
- pub maybe_worker_metadata: Option<JsMessageData>,
+ pub maybe_worker_metadata: Option<WorkerMetadata>,
}
pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle)
@@ -175,7 +177,16 @@ fn op_create_worker(
// Setup new thread
let thread_builder = std::thread::Builder::new().name(format!("{worker_id}"));
-
+ let maybe_worker_metadata = if let Some(data) = maybe_worker_metadata {
+ let transferables =
+ deserialize_js_transferables(state, data.transferables)?;
+ Some(WorkerMetadata {
+ buffer: data.data,
+ transferables,
+ })
+ } else {
+ None
+ };
// Spawn it
thread_builder.spawn(move || {
// Any error inside this block is terminal:
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index 55749ca27..27fe633ad 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -27,6 +27,7 @@ use deno_core::serde_json::json;
use deno_core::v8;
use deno_core::CancelHandle;
use deno_core::CompiledWasmModuleStore;
+use deno_core::DetachedBuffer;
use deno_core::Extension;
use deno_core::FeatureChecker;
use deno_core::GetErrorClassFn;
@@ -47,9 +48,11 @@ use deno_kv::dynamic::MultiBackendDbHandler;
use deno_terminal::colors;
use deno_tls::RootCertStoreProvider;
use deno_web::create_entangled_message_port;
+use deno_web::serialize_transferables;
use deno_web::BlobStore;
use deno_web::JsMessageData;
use deno_web::MessagePort;
+use deno_web::Transferable;
use log::debug;
use std::cell::RefCell;
use std::fmt;
@@ -61,6 +64,11 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
+pub struct WorkerMetadata {
+ pub buffer: DetachedBuffer,
+ pub transferables: Vec<Transferable>,
+}
+
static WORKER_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
@@ -343,7 +351,7 @@ pub struct WebWorker {
has_message_event_listener_fn: Option<v8::Global<v8::Value>>,
bootstrap_fn_global: Option<v8::Global<v8::Function>>,
// Consumed when `bootstrap_fn` is called
- maybe_worker_metadata: Option<JsMessageData>,
+ maybe_worker_metadata: Option<WorkerMetadata>,
}
pub struct WebWorkerOptions {
@@ -371,7 +379,7 @@ pub struct WebWorkerOptions {
pub feature_checker: Arc<FeatureChecker>,
pub strace_ops: Option<Vec<String>>,
pub close_on_idle: bool,
- pub maybe_worker_metadata: Option<JsMessageData>,
+ pub maybe_worker_metadata: Option<WorkerMetadata>,
}
impl WebWorker {
@@ -622,7 +630,8 @@ impl WebWorker {
}
pub fn bootstrap(&mut self, options: &BootstrapOptions) {
- self.js_runtime.op_state().borrow_mut().put(options.clone());
+ let op_state = self.js_runtime.op_state();
+ op_state.borrow_mut().put(options.clone());
// Instead of using name for log we use `worker-${id}` because
// WebWorkers can have empty string as name.
{
@@ -633,7 +642,16 @@ impl WebWorker {
let undefined = v8::undefined(scope);
let mut worker_data: v8::Local<v8::Value> = v8::undefined(scope).into();
if let Some(data) = self.maybe_worker_metadata.take() {
- worker_data = deno_core::serde_v8::to_v8(scope, data).unwrap();
+ let js_transferables = serialize_transferables(
+ &mut op_state.borrow_mut(),
+ data.transferables,
+ );
+ let js_message_data = JsMessageData {
+ data: data.buffer,
+ transferables: js_transferables,
+ };
+ worker_data =
+ deno_core::serde_v8::to_v8(scope, js_message_data).unwrap();
}
let name_str: v8::Local<v8::Value> =
v8::String::new(scope, &self.name).unwrap().into();
diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts
index 1ded9a591..f2ce00c84 100644
--- a/tests/unit_node/worker_threads_test.ts
+++ b/tests/unit_node/worker_threads_test.ts
@@ -238,3 +238,52 @@ Deno.test({
},
sanitizeResources: false,
});
+
+Deno.test({
+ name: "[worker_threads] Worker workerData with MessagePort",
+ async fn() {
+ const { port1: mainPort, port2: workerPort } = new workerThreads
+ .MessageChannel();
+ const deferred = Promise.withResolvers<void>();
+ const worker = new workerThreads.Worker(
+ `
+ import {
+ isMainThread,
+ MessageChannel,
+ parentPort,
+ receiveMessageOnPort,
+ Worker,
+ workerData,
+ } from "node:worker_threads";
+ parentPort.on("message", (msg) => {
+ console.log("message from main", msg);
+ parentPort.postMessage("Hello from worker on parentPort!");
+ workerData.workerPort.postMessage("Hello from worker on workerPort!");
+ });
+ `,
+ {
+ eval: true,
+ workerData: { workerPort },
+ transferList: [workerPort],
+ },
+ );
+
+ worker.on("message", (data) => {
+ assertEquals(data, "Hello from worker on parentPort!");
+ // TODO(bartlomieju): it would be better to use `mainPort.on("message")`,
+ // but we currently don't support it.
+ // https://github.com/denoland/deno/issues/22951
+ // Wait a bit so the message can arrive.
+ setTimeout(() => {
+ const msg = workerThreads.receiveMessageOnPort(mainPort)!.message;
+ assertEquals(msg, "Hello from worker on workerPort!");
+ deferred.resolve();
+ }, 500);
+ });
+
+ worker.postMessage("Hello from parent");
+ await deferred.promise;
+ await worker.terminate();
+ mainPort.close();
+ },
+});