summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/node/polyfills/worker_threads.ts25
-rw-r--r--ext/web/13_message_port.js2
-rw-r--r--ext/web/internal.d.ts6
-rw-r--r--ext/web/lib.rs2
-rw-r--r--ext/web/message_port.rs20
5 files changed, 52 insertions, 3 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts
index fe3425972..74abf5bb5 100644
--- a/ext/node/polyfills/worker_threads.ts
+++ b/ext/node/polyfills/worker_threads.ts
@@ -8,21 +8,25 @@ import {
op_host_recv_ctrl,
op_host_recv_message,
op_host_terminate_worker,
+ op_message_port_recv_message_sync,
op_require_read_closest_package_json,
} from "ext:core/ops";
-import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
import {
deserializeJsMessageData,
MessageChannel,
MessagePort,
+ MessagePortIdSymbol,
+ MessagePortPrototype,
serializeJsMessageData,
} from "ext:deno_web/13_message_port.js";
import * as webidl from "ext:deno_webidl/00_webidl.js";
import { log } from "ext:runtime/06_util.js";
import { notImplemented } from "ext:deno_node/_utils.ts";
import { EventEmitter, once } from "node:events";
+import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
import { isAbsolute, resolve } from "node:path";
+const { ObjectPrototypeIsPrototypeOf } = primordials;
const {
Error,
Symbol,
@@ -496,9 +500,24 @@ export function markAsUntransferable() {
export function moveMessagePortToContext() {
notImplemented("moveMessagePortToContext");
}
-export function receiveMessageOnPort() {
- notImplemented("receiveMessageOnPort");
+
+/**
+ * @param { MessagePort } port
+ * @returns {object | undefined}
+ */
+export function receiveMessageOnPort(port: MessagePort): object | undefined {
+ if (!(ObjectPrototypeIsPrototypeOf(MessagePortPrototype, port))) {
+ const err = new TypeError(
+ 'The "port" argument must be a MessagePort instance',
+ );
+ err["code"] = "ERR_INVALID_ARG_TYPE";
+ throw err;
+ }
+ const data = op_message_port_recv_message_sync(port[MessagePortIdSymbol]);
+ if (data === null) return undefined;
+ return { message: deserializeJsMessageData(data)[0] };
}
+
export {
BroadcastChannel,
MessageChannel,
diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js
index b6a308716..bd7712c5f 100644
--- a/ext/web/13_message_port.js
+++ b/ext/web/13_message_port.js
@@ -83,6 +83,7 @@ webidl.configureInterface(MessageChannel);
const MessageChannelPrototype = MessageChannel.prototype;
const _id = Symbol("id");
+const MessagePortIdSymbol = _id;
const _enabled = Symbol("enabled");
/**
@@ -380,6 +381,7 @@ export {
deserializeJsMessageData,
MessageChannel,
MessagePort,
+ MessagePortIdSymbol,
MessagePortPrototype,
serializeJsMessageData,
structuredClone,
diff --git a/ext/web/internal.d.ts b/ext/web/internal.d.ts
index 4af04b071..b2aea80d9 100644
--- a/ext/web/internal.d.ts
+++ b/ext/web/internal.d.ts
@@ -110,4 +110,10 @@ declare module "ext:deno_web/13_message_port.js" {
data: Uint8Array;
transferables: Transferable[];
}
+ const MessageChannel: typeof MessageChannel;
+ const MessagePort: typeof MessagePort;
+ const MessagePortIdSymbol: typeof MessagePortIdSymbol;
+ function deserializeJsMessageData(
+ messageData: messagePort.MessageData,
+ ): [object, object[]];
}
diff --git a/ext/web/lib.rs b/ext/web/lib.rs
index 332102fcc..60a0cc0d7 100644
--- a/ext/web/lib.rs
+++ b/ext/web/lib.rs
@@ -46,6 +46,7 @@ pub use crate::message_port::create_entangled_message_port;
use crate::message_port::op_message_port_create_entangled;
use crate::message_port::op_message_port_post_message;
use crate::message_port::op_message_port_recv_message;
+use crate::message_port::op_message_port_recv_message_sync;
pub use crate::message_port::JsMessageData;
pub use crate::message_port::MessagePort;
@@ -78,6 +79,7 @@ deno_core::extension!(deno_web,
op_message_port_create_entangled,
op_message_port_post_message,
op_message_port_recv_message,
+ op_message_port_recv_message_sync,
compression::op_compression_new,
compression::op_compression_write,
compression::op_compression_finish,
diff --git a/ext/web/message_port.rs b/ext/web/message_port.rs
index 556030965..18429a179 100644
--- a/ext/web/message_port.rs
+++ b/ext/web/message_port.rs
@@ -17,6 +17,7 @@ use deno_core::Resource;
use deno_core::ResourceId;
use serde::Deserialize;
use serde::Serialize;
+use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
@@ -227,3 +228,22 @@ pub async fn op_message_port_recv_message(
let cancel = RcRef::map(resource.clone(), |r| &r.cancel);
resource.port.recv(state).or_cancel(cancel).await?
}
+
+#[op2]
+#[serde]
+pub fn op_message_port_recv_message_sync(
+ state: &mut OpState, // Rc<RefCell<OpState>>,
+ #[smi] rid: ResourceId,
+) -> Result<Option<JsMessageData>, AnyError> {
+ let resource = state.resource_table.get::<MessagePortResource>(rid)?;
+ let mut rx = resource.port.rx.borrow_mut();
+
+ match rx.try_recv() {
+ Ok((d, t)) => Ok(Some(JsMessageData {
+ data: d,
+ transferables: serialize_transferables(state, t),
+ })),
+ Err(TryRecvError::Empty) => Ok(None),
+ Err(TryRecvError::Disconnected) => Ok(None),
+ }
+}