summaryrefslogtreecommitdiff
path: root/ext/web/message_port.rs
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2021-08-11 12:27:05 +0200
committerGitHub <noreply@github.com>2021-08-11 12:27:05 +0200
commita0285e2eb88f6254f6494b0ecd1878db3a3b2a58 (patch)
tree90671b004537e20f9493fd3277ffd21d30b39a0e /ext/web/message_port.rs
parent3a6994115176781b3a93d70794b1b81bc95e42b4 (diff)
Rename extensions/ directory to ext/ (#11643)
Diffstat (limited to 'ext/web/message_port.rs')
-rw-r--r--ext/web/message_port.rs217
1 files changed, 217 insertions, 0 deletions
diff --git a/ext/web/message_port.rs b/ext/web/message_port.rs
new file mode 100644
index 000000000..f73d0486a
--- /dev/null
+++ b/ext/web/message_port.rs
@@ -0,0 +1,217 @@
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::rc::Rc;
+
+use deno_core::error::bad_resource_id;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::ZeroCopyBuf;
+use deno_core::{CancelFuture, Resource};
+use deno_core::{CancelHandle, OpState};
+use deno_core::{RcRef, ResourceId};
+use serde::Deserialize;
+use serde::Serialize;
+use tokio::sync::mpsc::unbounded_channel;
+use tokio::sync::mpsc::UnboundedReceiver;
+use tokio::sync::mpsc::UnboundedSender;
+
+enum Transferable {
+ MessagePort(MessagePort),
+}
+
+type MessagePortMessage = (Vec<u8>, Vec<Transferable>);
+
+pub struct MessagePort {
+ rx: RefCell<UnboundedReceiver<MessagePortMessage>>,
+ tx: RefCell<Option<UnboundedSender<MessagePortMessage>>>,
+}
+
+impl MessagePort {
+ pub fn send(
+ &self,
+ state: &mut OpState,
+ data: JsMessageData,
+ ) -> Result<(), AnyError> {
+ let transferables =
+ deserialize_js_transferables(state, data.transferables)?;
+
+ // Swallow the failed to send error. It means the channel was disentangled,
+ // but not cleaned up.
+ if let Some(tx) = &*self.tx.borrow() {
+ tx.send((data.data.to_vec(), transferables)).ok();
+ }
+
+ Ok(())
+ }
+
+ pub async fn recv(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ ) -> Result<Option<JsMessageData>, AnyError> {
+ let mut rx = self
+ .rx
+ .try_borrow_mut()
+ .map_err(|_| type_error("Port receiver is already borrowed"))?;
+ if let Some((data, transferables)) = rx.recv().await {
+ let js_transferables =
+ serialize_transferables(&mut state.borrow_mut(), transferables);
+ return Ok(Some(JsMessageData {
+ data: ZeroCopyBuf::from(data),
+ transferables: js_transferables,
+ }));
+ }
+ Ok(None)
+ }
+
+ /// This forcefully disconnects the message port from its paired port. This
+ /// will wake up the `.recv` on the paired port, which will return `Ok(None)`.
+ pub fn disentangle(&self) {
+ let mut tx = self.tx.borrow_mut();
+ tx.take();
+ }
+}
+
+pub fn create_entangled_message_port() -> (MessagePort, MessagePort) {
+ let (port1_tx, port2_rx) = unbounded_channel::<MessagePortMessage>();
+ let (port2_tx, port1_rx) = unbounded_channel::<MessagePortMessage>();
+
+ let port1 = MessagePort {
+ rx: RefCell::new(port1_rx),
+ tx: RefCell::new(Some(port1_tx)),
+ };
+
+ let port2 = MessagePort {
+ rx: RefCell::new(port2_rx),
+ tx: RefCell::new(Some(port2_tx)),
+ };
+
+ (port1, port2)
+}
+
+pub struct MessagePortResource {
+ port: MessagePort,
+ cancel: CancelHandle,
+}
+
+impl Resource for MessagePortResource {
+ fn name(&self) -> Cow<str> {
+ "messagePort".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
+}
+
+pub fn op_message_port_create_entangled(
+ state: &mut OpState,
+ _: (),
+ _: (),
+) -> Result<(ResourceId, ResourceId), AnyError> {
+ let (port1, port2) = create_entangled_message_port();
+
+ let port1_id = state.resource_table.add(MessagePortResource {
+ port: port1,
+ cancel: CancelHandle::new(),
+ });
+
+ let port2_id = state.resource_table.add(MessagePortResource {
+ port: port2,
+ cancel: CancelHandle::new(),
+ });
+
+ Ok((port1_id, port2_id))
+}
+
+#[derive(Deserialize, Serialize)]
+#[serde(tag = "kind", content = "data", rename_all = "camelCase")]
+pub enum JsTransferable {
+ #[serde(rename_all = "camelCase")]
+ MessagePort(ResourceId),
+}
+
+fn deserialize_js_transferables(
+ state: &mut OpState,
+ js_transferables: Vec<JsTransferable>,
+) -> Result<Vec<Transferable>, AnyError> {
+ let mut transferables = Vec::with_capacity(js_transferables.len());
+ for js_transferable in js_transferables {
+ match js_transferable {
+ JsTransferable::MessagePort(id) => {
+ let resource = state
+ .resource_table
+ .take::<MessagePortResource>(id)
+ .ok_or_else(|| type_error("Invalid message port transfer"))?;
+ resource.cancel.cancel();
+ let resource = Rc::try_unwrap(resource)
+ .map_err(|_| type_error("Message port is not ready for transfer"))?;
+ transferables.push(Transferable::MessagePort(resource.port));
+ }
+ }
+ }
+ Ok(transferables)
+}
+
+fn serialize_transferables(
+ state: &mut OpState,
+ transferables: Vec<Transferable>,
+) -> Vec<JsTransferable> {
+ let mut js_transferables = Vec::with_capacity(transferables.len());
+ for transferable in transferables {
+ match transferable {
+ Transferable::MessagePort(port) => {
+ let rid = state.resource_table.add(MessagePortResource {
+ port,
+ cancel: CancelHandle::new(),
+ });
+ js_transferables.push(JsTransferable::MessagePort(rid));
+ }
+ }
+ }
+ js_transferables
+}
+
+#[derive(Deserialize, Serialize)]
+pub struct JsMessageData {
+ data: ZeroCopyBuf,
+ transferables: Vec<JsTransferable>,
+}
+
+pub fn op_message_port_post_message(
+ state: &mut OpState,
+ rid: ResourceId,
+ data: JsMessageData,
+) -> Result<(), AnyError> {
+ for js_transferable in &data.transferables {
+ match js_transferable {
+ JsTransferable::MessagePort(id) => {
+ if *id == rid {
+ return Err(type_error("Can not transfer self message port"));
+ }
+ }
+ }
+ }
+
+ let resource = state
+ .resource_table
+ .get::<MessagePortResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+
+ resource.port.send(state, data)
+}
+
+pub async fn op_message_port_recv_message(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ _: (),
+) -> Result<Option<JsMessageData>, AnyError> {
+ let resource = {
+ let state = state.borrow();
+ match state.resource_table.get::<MessagePortResource>(rid) {
+ Some(resource) => resource,
+ None => return Ok(None),
+ }
+ };
+ let cancel = RcRef::map(resource.clone(), |r| &r.cancel);
+ resource.port.recv(state).or_cancel(cancel).await?
+}