diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2021-08-11 12:27:05 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-11 12:27:05 +0200 |
commit | a0285e2eb88f6254f6494b0ecd1878db3a3b2a58 (patch) | |
tree | 90671b004537e20f9493fd3277ffd21d30b39a0e /ext/broadcast_channel/in_memory_broadcast_channel.rs | |
parent | 3a6994115176781b3a93d70794b1b81bc95e42b4 (diff) |
Rename extensions/ directory to ext/ (#11643)
Diffstat (limited to 'ext/broadcast_channel/in_memory_broadcast_channel.rs')
-rw-r--r-- | ext/broadcast_channel/in_memory_broadcast_channel.rs | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/ext/broadcast_channel/in_memory_broadcast_channel.rs b/ext/broadcast_channel/in_memory_broadcast_channel.rs new file mode 100644 index 000000000..879a3dbd5 --- /dev/null +++ b/ext/broadcast_channel/in_memory_broadcast_channel.rs @@ -0,0 +1,97 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::BroadcastChannel; +use async_trait::async_trait; +use deno_core::error::AnyError; +use deno_core::parking_lot::Mutex; +use std::sync::Arc; +use tokio::sync::broadcast; +use tokio::sync::mpsc; +use uuid::Uuid; + +#[derive(Clone)] +pub struct InMemoryBroadcastChannel(Arc<Mutex<broadcast::Sender<Message>>>); + +pub struct InMemoryBroadcastChannelResource { + rx: tokio::sync::Mutex<( + broadcast::Receiver<Message>, + mpsc::UnboundedReceiver<()>, + )>, + cancel_tx: mpsc::UnboundedSender<()>, + uuid: Uuid, +} + +#[derive(Clone, Debug)] +struct Message { + name: Arc<String>, + data: Arc<Vec<u8>>, + uuid: Uuid, +} + +impl Default for InMemoryBroadcastChannel { + fn default() -> Self { + let (tx, _) = broadcast::channel(256); + Self(Arc::new(Mutex::new(tx))) + } +} + +#[async_trait] +impl BroadcastChannel for InMemoryBroadcastChannel { + type Resource = InMemoryBroadcastChannelResource; + + fn subscribe(&self) -> Result<Self::Resource, AnyError> { + let (cancel_tx, cancel_rx) = mpsc::unbounded_channel(); + let broadcast_rx = self.0.lock().subscribe(); + let rx = tokio::sync::Mutex::new((broadcast_rx, cancel_rx)); + let uuid = Uuid::new_v4(); + Ok(Self::Resource { + rx, + cancel_tx, + uuid, + }) + } + + fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError> { + Ok(resource.cancel_tx.send(())?) + } + + async fn send( + &self, + resource: &Self::Resource, + name: String, + data: Vec<u8>, + ) -> Result<(), AnyError> { + let name = Arc::new(name); + let data = Arc::new(data); + let uuid = resource.uuid; + self.0.lock().send(Message { name, data, uuid })?; + Ok(()) + } + + async fn recv( + &self, + resource: &Self::Resource, + ) -> Result<Option<crate::Message>, AnyError> { + let mut g = resource.rx.lock().await; + let (broadcast_rx, cancel_rx) = &mut *g; + loop { + let result = tokio::select! { + r = broadcast_rx.recv() => r, + _ = cancel_rx.recv() => return Ok(None), + }; + use tokio::sync::broadcast::error::RecvError::*; + match result { + Err(Closed) => return Ok(None), + Err(Lagged(_)) => (), // Backlogged, messages dropped. + Ok(message) if message.uuid == resource.uuid => (), // Self-send. + Ok(message) => { + let name = String::clone(&message.name); + let data = Vec::clone(&message.data); + return Ok(Some((name, data))); + } + } + } + } +} + +impl deno_core::Resource for InMemoryBroadcastChannelResource {} |