diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2021-08-11 12:27:05 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-11 12:27:05 +0200 |
commit | a0285e2eb88f6254f6494b0ecd1878db3a3b2a58 (patch) | |
tree | 90671b004537e20f9493fd3277ffd21d30b39a0e /ext/web/message_port.rs | |
parent | 3a6994115176781b3a93d70794b1b81bc95e42b4 (diff) |
Rename extensions/ directory to ext/ (#11643)
Diffstat (limited to 'ext/web/message_port.rs')
-rw-r--r-- | ext/web/message_port.rs | 217 |
1 files changed, 217 insertions, 0 deletions
diff --git a/ext/web/message_port.rs b/ext/web/message_port.rs new file mode 100644 index 000000000..f73d0486a --- /dev/null +++ b/ext/web/message_port.rs @@ -0,0 +1,217 @@ +use std::borrow::Cow; +use std::cell::RefCell; +use std::rc::Rc; + +use deno_core::error::bad_resource_id; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::ZeroCopyBuf; +use deno_core::{CancelFuture, Resource}; +use deno_core::{CancelHandle, OpState}; +use deno_core::{RcRef, ResourceId}; +use serde::Deserialize; +use serde::Serialize; +use tokio::sync::mpsc::unbounded_channel; +use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::mpsc::UnboundedSender; + +enum Transferable { + MessagePort(MessagePort), +} + +type MessagePortMessage = (Vec<u8>, Vec<Transferable>); + +pub struct MessagePort { + rx: RefCell<UnboundedReceiver<MessagePortMessage>>, + tx: RefCell<Option<UnboundedSender<MessagePortMessage>>>, +} + +impl MessagePort { + pub fn send( + &self, + state: &mut OpState, + data: JsMessageData, + ) -> Result<(), AnyError> { + let transferables = + deserialize_js_transferables(state, data.transferables)?; + + // Swallow the failed to send error. It means the channel was disentangled, + // but not cleaned up. + if let Some(tx) = &*self.tx.borrow() { + tx.send((data.data.to_vec(), transferables)).ok(); + } + + Ok(()) + } + + pub async fn recv( + &self, + state: Rc<RefCell<OpState>>, + ) -> Result<Option<JsMessageData>, AnyError> { + let mut rx = self + .rx + .try_borrow_mut() + .map_err(|_| type_error("Port receiver is already borrowed"))?; + if let Some((data, transferables)) = rx.recv().await { + let js_transferables = + serialize_transferables(&mut state.borrow_mut(), transferables); + return Ok(Some(JsMessageData { + data: ZeroCopyBuf::from(data), + transferables: js_transferables, + })); + } + Ok(None) + } + + /// This forcefully disconnects the message port from its paired port. This + /// will wake up the `.recv` on the paired port, which will return `Ok(None)`. + pub fn disentangle(&self) { + let mut tx = self.tx.borrow_mut(); + tx.take(); + } +} + +pub fn create_entangled_message_port() -> (MessagePort, MessagePort) { + let (port1_tx, port2_rx) = unbounded_channel::<MessagePortMessage>(); + let (port2_tx, port1_rx) = unbounded_channel::<MessagePortMessage>(); + + let port1 = MessagePort { + rx: RefCell::new(port1_rx), + tx: RefCell::new(Some(port1_tx)), + }; + + let port2 = MessagePort { + rx: RefCell::new(port2_rx), + tx: RefCell::new(Some(port2_tx)), + }; + + (port1, port2) +} + +pub struct MessagePortResource { + port: MessagePort, + cancel: CancelHandle, +} + +impl Resource for MessagePortResource { + fn name(&self) -> Cow<str> { + "messagePort".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } +} + +pub fn op_message_port_create_entangled( + state: &mut OpState, + _: (), + _: (), +) -> Result<(ResourceId, ResourceId), AnyError> { + let (port1, port2) = create_entangled_message_port(); + + let port1_id = state.resource_table.add(MessagePortResource { + port: port1, + cancel: CancelHandle::new(), + }); + + let port2_id = state.resource_table.add(MessagePortResource { + port: port2, + cancel: CancelHandle::new(), + }); + + Ok((port1_id, port2_id)) +} + +#[derive(Deserialize, Serialize)] +#[serde(tag = "kind", content = "data", rename_all = "camelCase")] +pub enum JsTransferable { + #[serde(rename_all = "camelCase")] + MessagePort(ResourceId), +} + +fn deserialize_js_transferables( + state: &mut OpState, + js_transferables: Vec<JsTransferable>, +) -> Result<Vec<Transferable>, AnyError> { + let mut transferables = Vec::with_capacity(js_transferables.len()); + for js_transferable in js_transferables { + match js_transferable { + JsTransferable::MessagePort(id) => { + let resource = state + .resource_table + .take::<MessagePortResource>(id) + .ok_or_else(|| type_error("Invalid message port transfer"))?; + resource.cancel.cancel(); + let resource = Rc::try_unwrap(resource) + .map_err(|_| type_error("Message port is not ready for transfer"))?; + transferables.push(Transferable::MessagePort(resource.port)); + } + } + } + Ok(transferables) +} + +fn serialize_transferables( + state: &mut OpState, + transferables: Vec<Transferable>, +) -> Vec<JsTransferable> { + let mut js_transferables = Vec::with_capacity(transferables.len()); + for transferable in transferables { + match transferable { + Transferable::MessagePort(port) => { + let rid = state.resource_table.add(MessagePortResource { + port, + cancel: CancelHandle::new(), + }); + js_transferables.push(JsTransferable::MessagePort(rid)); + } + } + } + js_transferables +} + +#[derive(Deserialize, Serialize)] +pub struct JsMessageData { + data: ZeroCopyBuf, + transferables: Vec<JsTransferable>, +} + +pub fn op_message_port_post_message( + state: &mut OpState, + rid: ResourceId, + data: JsMessageData, +) -> Result<(), AnyError> { + for js_transferable in &data.transferables { + match js_transferable { + JsTransferable::MessagePort(id) => { + if *id == rid { + return Err(type_error("Can not transfer self message port")); + } + } + } + } + + let resource = state + .resource_table + .get::<MessagePortResource>(rid) + .ok_or_else(bad_resource_id)?; + + resource.port.send(state, data) +} + +pub async fn op_message_port_recv_message( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + _: (), +) -> Result<Option<JsMessageData>, AnyError> { + let resource = { + let state = state.borrow(); + match state.resource_table.get::<MessagePortResource>(rid) { + Some(resource) => resource, + None => return Ok(None), + } + }; + let cancel = RcRef::map(resource.clone(), |r| &r.cancel); + resource.port.recv(state).or_cancel(cancel).await? +} |