summaryrefslogtreecommitdiff
path: root/ext/broadcast_channel/in_memory_broadcast_channel.rs
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2021-08-11 12:27:05 +0200
committerGitHub <noreply@github.com>2021-08-11 12:27:05 +0200
commita0285e2eb88f6254f6494b0ecd1878db3a3b2a58 (patch)
tree90671b004537e20f9493fd3277ffd21d30b39a0e /ext/broadcast_channel/in_memory_broadcast_channel.rs
parent3a6994115176781b3a93d70794b1b81bc95e42b4 (diff)
Rename extensions/ directory to ext/ (#11643)
Diffstat (limited to 'ext/broadcast_channel/in_memory_broadcast_channel.rs')
-rw-r--r--ext/broadcast_channel/in_memory_broadcast_channel.rs97
1 files changed, 97 insertions, 0 deletions
diff --git a/ext/broadcast_channel/in_memory_broadcast_channel.rs b/ext/broadcast_channel/in_memory_broadcast_channel.rs
new file mode 100644
index 000000000..879a3dbd5
--- /dev/null
+++ b/ext/broadcast_channel/in_memory_broadcast_channel.rs
@@ -0,0 +1,97 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use crate::BroadcastChannel;
+use async_trait::async_trait;
+use deno_core::error::AnyError;
+use deno_core::parking_lot::Mutex;
+use std::sync::Arc;
+use tokio::sync::broadcast;
+use tokio::sync::mpsc;
+use uuid::Uuid;
+
+#[derive(Clone)]
+pub struct InMemoryBroadcastChannel(Arc<Mutex<broadcast::Sender<Message>>>);
+
+pub struct InMemoryBroadcastChannelResource {
+ rx: tokio::sync::Mutex<(
+ broadcast::Receiver<Message>,
+ mpsc::UnboundedReceiver<()>,
+ )>,
+ cancel_tx: mpsc::UnboundedSender<()>,
+ uuid: Uuid,
+}
+
+#[derive(Clone, Debug)]
+struct Message {
+ name: Arc<String>,
+ data: Arc<Vec<u8>>,
+ uuid: Uuid,
+}
+
+impl Default for InMemoryBroadcastChannel {
+ fn default() -> Self {
+ let (tx, _) = broadcast::channel(256);
+ Self(Arc::new(Mutex::new(tx)))
+ }
+}
+
+#[async_trait]
+impl BroadcastChannel for InMemoryBroadcastChannel {
+ type Resource = InMemoryBroadcastChannelResource;
+
+ fn subscribe(&self) -> Result<Self::Resource, AnyError> {
+ let (cancel_tx, cancel_rx) = mpsc::unbounded_channel();
+ let broadcast_rx = self.0.lock().subscribe();
+ let rx = tokio::sync::Mutex::new((broadcast_rx, cancel_rx));
+ let uuid = Uuid::new_v4();
+ Ok(Self::Resource {
+ rx,
+ cancel_tx,
+ uuid,
+ })
+ }
+
+ fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError> {
+ Ok(resource.cancel_tx.send(())?)
+ }
+
+ async fn send(
+ &self,
+ resource: &Self::Resource,
+ name: String,
+ data: Vec<u8>,
+ ) -> Result<(), AnyError> {
+ let name = Arc::new(name);
+ let data = Arc::new(data);
+ let uuid = resource.uuid;
+ self.0.lock().send(Message { name, data, uuid })?;
+ Ok(())
+ }
+
+ async fn recv(
+ &self,
+ resource: &Self::Resource,
+ ) -> Result<Option<crate::Message>, AnyError> {
+ let mut g = resource.rx.lock().await;
+ let (broadcast_rx, cancel_rx) = &mut *g;
+ loop {
+ let result = tokio::select! {
+ r = broadcast_rx.recv() => r,
+ _ = cancel_rx.recv() => return Ok(None),
+ };
+ use tokio::sync::broadcast::error::RecvError::*;
+ match result {
+ Err(Closed) => return Ok(None),
+ Err(Lagged(_)) => (), // Backlogged, messages dropped.
+ Ok(message) if message.uuid == resource.uuid => (), // Self-send.
+ Ok(message) => {
+ let name = String::clone(&message.name);
+ let data = Vec::clone(&message.data);
+ return Ok(Some((name, data)));
+ }
+ }
+ }
+ }
+}
+
+impl deno_core::Resource for InMemoryBroadcastChannelResource {}