summaryrefslogtreecommitdiff
path: root/extensions/web/message_port.rs
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2021-06-22 16:30:16 +0200
committerGitHub <noreply@github.com>2021-06-22 16:30:16 +0200
commit6261c89e04b8f1a3aabc771dbc8cddad904710e9 (patch)
tree6b1814991d77b8f238e507aa2f52e93589f3c0a1 /extensions/web/message_port.rs
parent0a2ced57285aa0ee4b47426382c32fb53c4e07cd (diff)
feat: transfer MessagePort between workers (#11076)
Add support for transferring `MessagePort`s between workers.
Diffstat (limited to 'extensions/web/message_port.rs')
-rw-r--r--extensions/web/message_port.rs19
1 files changed, 14 insertions, 5 deletions
diff --git a/extensions/web/message_port.rs b/extensions/web/message_port.rs
index d10b455d5..f73d0486a 100644
--- a/extensions/web/message_port.rs
+++ b/extensions/web/message_port.rs
@@ -23,7 +23,7 @@ type MessagePortMessage = (Vec<u8>, Vec<Transferable>);
pub struct MessagePort {
rx: RefCell<UnboundedReceiver<MessagePortMessage>>,
- tx: UnboundedSender<MessagePortMessage>,
+ tx: RefCell<Option<UnboundedSender<MessagePortMessage>>>,
}
impl MessagePort {
@@ -37,7 +37,9 @@ impl MessagePort {
// Swallow the failed to send error. It means the channel was disentangled,
// but not cleaned up.
- self.tx.send((data.data.to_vec(), transferables)).ok();
+ if let Some(tx) = &*self.tx.borrow() {
+ tx.send((data.data.to_vec(), transferables)).ok();
+ }
Ok(())
}
@@ -60,6 +62,13 @@ impl MessagePort {
}
Ok(None)
}
+
+ /// This forcefully disconnects the message port from its paired port. This
+ /// will wake up the `.recv` on the paired port, which will return `Ok(None)`.
+ pub fn disentangle(&self) {
+ let mut tx = self.tx.borrow_mut();
+ tx.take();
+ }
}
pub fn create_entangled_message_port() -> (MessagePort, MessagePort) {
@@ -68,12 +77,12 @@ pub fn create_entangled_message_port() -> (MessagePort, MessagePort) {
let port1 = MessagePort {
rx: RefCell::new(port1_rx),
- tx: port1_tx,
+ tx: RefCell::new(Some(port1_tx)),
};
let port2 = MessagePort {
rx: RefCell::new(port2_rx),
- tx: port2_tx,
+ tx: RefCell::new(Some(port2_tx)),
};
(port1, port2)
@@ -204,5 +213,5 @@ pub async fn op_message_port_recv_message(
}
};
let cancel = RcRef::map(resource.clone(), |r| &r.cancel);
- resource.port.recv(state.clone()).or_cancel(cancel).await?
+ resource.port.recv(state).or_cancel(cancel).await?
}