diff options
author | mash-graz <mash-graz@users.noreply.github.com> | 2024-03-11 00:23:06 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-11 00:23:06 +0100 |
commit | 80dbcd3ddf589d20ae7786c716959c6d48ea1f1e (patch) | |
tree | 55967de66db8f4654539f462e6eea33385179aa3 | |
parent | 16dbbfa64a5d2905580535c52c1db51d1cf5b89f (diff) |
fix(ext/node) implement receiveMessageOnPort for node:worker_threads (#22766)
Implementation of `receiveMessageOnPort` for `node:worker_threads`
Fixes: #22702
-rw-r--r-- | ext/node/polyfills/worker_threads.ts | 25 | ||||
-rw-r--r-- | ext/web/13_message_port.js | 2 | ||||
-rw-r--r-- | ext/web/internal.d.ts | 6 | ||||
-rw-r--r-- | ext/web/lib.rs | 2 | ||||
-rw-r--r-- | ext/web/message_port.rs | 20 | ||||
-rw-r--r-- | tests/node_compat/config.jsonc | 3 | ||||
-rw-r--r-- | tests/node_compat/test/parallel/test-worker-message-port-receive-message.js | 40 | ||||
-rw-r--r-- | tools/node_compat/TODO.md | 1 |
8 files changed, 95 insertions, 4 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index fe3425972..74abf5bb5 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -8,21 +8,25 @@ import { op_host_recv_ctrl, op_host_recv_message, op_host_terminate_worker, + op_message_port_recv_message_sync, op_require_read_closest_package_json, } from "ext:core/ops"; -import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js"; import { deserializeJsMessageData, MessageChannel, MessagePort, + MessagePortIdSymbol, + MessagePortPrototype, serializeJsMessageData, } from "ext:deno_web/13_message_port.js"; import * as webidl from "ext:deno_webidl/00_webidl.js"; import { log } from "ext:runtime/06_util.js"; import { notImplemented } from "ext:deno_node/_utils.ts"; import { EventEmitter, once } from "node:events"; +import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js"; import { isAbsolute, resolve } from "node:path"; +const { ObjectPrototypeIsPrototypeOf } = primordials; const { Error, Symbol, @@ -496,9 +500,24 @@ export function markAsUntransferable() { export function moveMessagePortToContext() { notImplemented("moveMessagePortToContext"); } -export function receiveMessageOnPort() { - notImplemented("receiveMessageOnPort"); + +/** + * @param { MessagePort } port + * @returns {object | undefined} + */ +export function receiveMessageOnPort(port: MessagePort): object | undefined { + if (!(ObjectPrototypeIsPrototypeOf(MessagePortPrototype, port))) { + const err = new TypeError( + 'The "port" argument must be a MessagePort instance', + ); + err["code"] = "ERR_INVALID_ARG_TYPE"; + throw err; + } + const data = op_message_port_recv_message_sync(port[MessagePortIdSymbol]); + if (data === null) return undefined; + return { message: deserializeJsMessageData(data)[0] }; } + export { BroadcastChannel, MessageChannel, diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js index b6a308716..bd7712c5f 100644 --- a/ext/web/13_message_port.js +++ b/ext/web/13_message_port.js @@ -83,6 +83,7 @@ webidl.configureInterface(MessageChannel); const MessageChannelPrototype = MessageChannel.prototype; const _id = Symbol("id"); +const MessagePortIdSymbol = _id; const _enabled = Symbol("enabled"); /** @@ -380,6 +381,7 @@ export { deserializeJsMessageData, MessageChannel, MessagePort, + MessagePortIdSymbol, MessagePortPrototype, serializeJsMessageData, structuredClone, diff --git a/ext/web/internal.d.ts b/ext/web/internal.d.ts index 4af04b071..b2aea80d9 100644 --- a/ext/web/internal.d.ts +++ b/ext/web/internal.d.ts @@ -110,4 +110,10 @@ declare module "ext:deno_web/13_message_port.js" { data: Uint8Array; transferables: Transferable[]; } + const MessageChannel: typeof MessageChannel; + const MessagePort: typeof MessagePort; + const MessagePortIdSymbol: typeof MessagePortIdSymbol; + function deserializeJsMessageData( + messageData: messagePort.MessageData, + ): [object, object[]]; } diff --git a/ext/web/lib.rs b/ext/web/lib.rs index 332102fcc..60a0cc0d7 100644 --- a/ext/web/lib.rs +++ b/ext/web/lib.rs @@ -46,6 +46,7 @@ pub use crate::message_port::create_entangled_message_port; use crate::message_port::op_message_port_create_entangled; use crate::message_port::op_message_port_post_message; use crate::message_port::op_message_port_recv_message; +use crate::message_port::op_message_port_recv_message_sync; pub use crate::message_port::JsMessageData; pub use crate::message_port::MessagePort; @@ -78,6 +79,7 @@ deno_core::extension!(deno_web, op_message_port_create_entangled, op_message_port_post_message, op_message_port_recv_message, + op_message_port_recv_message_sync, compression::op_compression_new, compression::op_compression_write, compression::op_compression_finish, diff --git a/ext/web/message_port.rs b/ext/web/message_port.rs index 556030965..18429a179 100644 --- a/ext/web/message_port.rs +++ b/ext/web/message_port.rs @@ -17,6 +17,7 @@ use deno_core::Resource; use deno_core::ResourceId; use serde::Deserialize; use serde::Serialize; +use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::unbounded_channel; use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::UnboundedSender; @@ -227,3 +228,22 @@ pub async fn op_message_port_recv_message( let cancel = RcRef::map(resource.clone(), |r| &r.cancel); resource.port.recv(state).or_cancel(cancel).await? } + +#[op2] +#[serde] +pub fn op_message_port_recv_message_sync( + state: &mut OpState, // Rc<RefCell<OpState>>, + #[smi] rid: ResourceId, +) -> Result<Option<JsMessageData>, AnyError> { + let resource = state.resource_table.get::<MessagePortResource>(rid)?; + let mut rx = resource.port.rx.borrow_mut(); + + match rx.try_recv() { + Ok((d, t)) => Ok(Some(JsMessageData { + data: d, + transferables: serialize_transferables(state, t), + })), + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Disconnected) => Ok(None), + } +} diff --git a/tests/node_compat/config.jsonc b/tests/node_compat/config.jsonc index 3fde8be5a..c5902f6f9 100644 --- a/tests/node_compat/config.jsonc +++ b/tests/node_compat/config.jsonc @@ -104,6 +104,8 @@ "test-util.js", "test-webcrypto-sign-verify.js", "test-whatwg-url-properties.js", + // needs replace ".on" => ".addEventListener" in L29 + "test-worker-message-port-receive-message.js", "test-zlib-convenience-methods.js", "test-zlib-empty-buffer.js", "test-zlib-invalid-input.js", @@ -665,6 +667,7 @@ "test-whatwg-url-custom-tostringtag.js", "test-whatwg-url-override-hostname.js", "test-whatwg-url-properties.js", + "test-worker-message-port-receive-message.js", "test-zlib-close-after-error.js", "test-zlib-close-after-write.js", "test-zlib-convenience-methods.js", diff --git a/tests/node_compat/test/parallel/test-worker-message-port-receive-message.js b/tests/node_compat/test/parallel/test-worker-message-port-receive-message.js new file mode 100644 index 000000000..3945a8a1f --- /dev/null +++ b/tests/node_compat/test/parallel/test-worker-message-port-receive-message.js @@ -0,0 +1,40 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file + +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// Taken from Node 18.12.1 +// This file is automatically generated by `tools/node_compat/setup.ts`. Do not modify this file manually. + +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { MessageChannel, receiveMessageOnPort } = require('worker_threads'); + +const { port1, port2 } = new MessageChannel(); + +const message1 = { hello: 'world' }; +const message2 = { foo: 'bar' }; + +// Make sure receiveMessageOnPort() works in a FIFO way, the same way it does +// when we’re using events. +assert.strictEqual(receiveMessageOnPort(port2), undefined); +port1.postMessage(message1); +port1.postMessage(message2); +assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message1 }); +assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message2 }); +assert.strictEqual(receiveMessageOnPort(port2), undefined); +assert.strictEqual(receiveMessageOnPort(port2), undefined); + +// Make sure message handlers aren’t called. +port2.addEventListener('message', common.mustNotCall()); +port1.postMessage(message1); +assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message1 }); +port1.close(); + +for (const value of [null, 0, -1, {}, []]) { + assert.throws(() => receiveMessageOnPort(value), { + name: 'TypeError', + code: 'ERR_INVALID_ARG_TYPE', + message: 'The "port" argument must be a MessagePort instance' + }); +} diff --git a/tools/node_compat/TODO.md b/tools/node_compat/TODO.md index d47ba9d9d..5e1cd9784 100644 --- a/tools/node_compat/TODO.md +++ b/tools/node_compat/TODO.md @@ -2710,7 +2710,6 @@ Total: 2998 - [parallel/test-worker-message-port-message-port-transferring.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-message-port-transferring.js) - [parallel/test-worker-message-port-move.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-move.js) - [parallel/test-worker-message-port-multiple-sharedarraybuffers.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-multiple-sharedarraybuffers.js) -- [parallel/test-worker-message-port-receive-message.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-receive-message.js) - [parallel/test-worker-message-port-terminate-transfer-list.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-terminate-transfer-list.js) - [parallel/test-worker-message-port-transfer-closed.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-transfer-closed.js) - [parallel/test-worker-message-port-transfer-duplicate.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-transfer-duplicate.js) |