summaryrefslogtreecommitdiff
path: root/ext/broadcast_channel
diff options
context:
space:
mode:
authorLeo Kettmeir <crowlkats@toaxl.com>2024-10-12 08:20:17 -0700
committerGitHub <noreply@github.com>2024-10-12 08:20:17 -0700
commit3df8f1650039e9453056d516744e755d6be8801b (patch)
tree8963fe708ae5c73c6122bd21d40bd6fcf02de31e /ext/broadcast_channel
parent9117a9a43cba86d3112f86b10c5ea77baa2c6007 (diff)
refactor(ext/broadcastchannel): use concrete error type (#26105)
Diffstat (limited to 'ext/broadcast_channel')
-rw-r--r--ext/broadcast_channel/Cargo.toml1
-rw-r--r--ext/broadcast_channel/in_memory_broadcast_channel.rs13
-rw-r--r--ext/broadcast_channel/lib.rs70
3 files changed, 67 insertions, 17 deletions
diff --git a/ext/broadcast_channel/Cargo.toml b/ext/broadcast_channel/Cargo.toml
index 3caf7b9ef..b19c4ce15 100644
--- a/ext/broadcast_channel/Cargo.toml
+++ b/ext/broadcast_channel/Cargo.toml
@@ -16,5 +16,6 @@ path = "lib.rs"
[dependencies]
async-trait.workspace = true
deno_core.workspace = true
+thiserror.workspace = true
tokio.workspace = true
uuid.workspace = true
diff --git a/ext/broadcast_channel/in_memory_broadcast_channel.rs b/ext/broadcast_channel/in_memory_broadcast_channel.rs
index 00b52a9d6..61dc68e17 100644
--- a/ext/broadcast_channel/in_memory_broadcast_channel.rs
+++ b/ext/broadcast_channel/in_memory_broadcast_channel.rs
@@ -3,13 +3,13 @@
use std::sync::Arc;
use async_trait::async_trait;
-use deno_core::error::AnyError;
use deno_core::parking_lot::Mutex;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::BroadcastChannel;
+use crate::BroadcastChannelError;
#[derive(Clone)]
pub struct InMemoryBroadcastChannel(Arc<Mutex<broadcast::Sender<Message>>>);
@@ -41,7 +41,7 @@ impl Default for InMemoryBroadcastChannel {
impl BroadcastChannel for InMemoryBroadcastChannel {
type Resource = InMemoryBroadcastChannelResource;
- fn subscribe(&self) -> Result<Self::Resource, AnyError> {
+ fn subscribe(&self) -> Result<Self::Resource, BroadcastChannelError> {
let (cancel_tx, cancel_rx) = mpsc::unbounded_channel();
let broadcast_rx = self.0.lock().subscribe();
let rx = tokio::sync::Mutex::new((broadcast_rx, cancel_rx));
@@ -53,7 +53,10 @@ impl BroadcastChannel for InMemoryBroadcastChannel {
})
}
- fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError> {
+ fn unsubscribe(
+ &self,
+ resource: &Self::Resource,
+ ) -> Result<(), BroadcastChannelError> {
Ok(resource.cancel_tx.send(())?)
}
@@ -62,7 +65,7 @@ impl BroadcastChannel for InMemoryBroadcastChannel {
resource: &Self::Resource,
name: String,
data: Vec<u8>,
- ) -> Result<(), AnyError> {
+ ) -> Result<(), BroadcastChannelError> {
let name = Arc::new(name);
let data = Arc::new(data);
let uuid = resource.uuid;
@@ -73,7 +76,7 @@ impl BroadcastChannel for InMemoryBroadcastChannel {
async fn recv(
&self,
resource: &Self::Resource,
- ) -> Result<Option<crate::Message>, AnyError> {
+ ) -> Result<Option<crate::Message>, BroadcastChannelError> {
let mut g = resource.rx.lock().await;
let (broadcast_rx, cancel_rx) = &mut *g;
loop {
diff --git a/ext/broadcast_channel/lib.rs b/ext/broadcast_channel/lib.rs
index 47c48656d..c1de118a3 100644
--- a/ext/broadcast_channel/lib.rs
+++ b/ext/broadcast_channel/lib.rs
@@ -10,34 +10,69 @@ use std::path::PathBuf;
use std::rc::Rc;
use async_trait::async_trait;
-use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::Resource;
use deno_core::ResourceId;
+use tokio::sync::broadcast::error::SendError as BroadcastSendError;
+use tokio::sync::mpsc::error::SendError as MpscSendError;
pub const UNSTABLE_FEATURE_NAME: &str = "broadcast-channel";
+#[derive(Debug, thiserror::Error)]
+pub enum BroadcastChannelError {
+ #[error(transparent)]
+ Resource(deno_core::error::AnyError),
+ #[error(transparent)]
+ MPSCSendError(MpscSendError<Box<dyn std::fmt::Debug + Send + Sync>>),
+ #[error(transparent)]
+ BroadcastSendError(
+ BroadcastSendError<Box<dyn std::fmt::Debug + Send + Sync>>,
+ ),
+ #[error(transparent)]
+ Other(deno_core::error::AnyError),
+}
+
+impl<T: std::fmt::Debug + Send + Sync + 'static> From<MpscSendError<T>>
+ for BroadcastChannelError
+{
+ fn from(value: MpscSendError<T>) -> Self {
+ BroadcastChannelError::MPSCSendError(MpscSendError(Box::new(value.0)))
+ }
+}
+impl<T: std::fmt::Debug + Send + Sync + 'static> From<BroadcastSendError<T>>
+ for BroadcastChannelError
+{
+ fn from(value: BroadcastSendError<T>) -> Self {
+ BroadcastChannelError::BroadcastSendError(BroadcastSendError(Box::new(
+ value.0,
+ )))
+ }
+}
+
#[async_trait]
pub trait BroadcastChannel: Clone {
type Resource: Resource;
- fn subscribe(&self) -> Result<Self::Resource, AnyError>;
+ fn subscribe(&self) -> Result<Self::Resource, BroadcastChannelError>;
- fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError>;
+ fn unsubscribe(
+ &self,
+ resource: &Self::Resource,
+ ) -> Result<(), BroadcastChannelError>;
async fn send(
&self,
resource: &Self::Resource,
name: String,
data: Vec<u8>,
- ) -> Result<(), AnyError>;
+ ) -> Result<(), BroadcastChannelError>;
async fn recv(
&self,
resource: &Self::Resource,
- ) -> Result<Option<Message>, AnyError>;
+ ) -> Result<Option<Message>, BroadcastChannelError>;
}
pub type Message = (String, Vec<u8>);
@@ -46,7 +81,7 @@ pub type Message = (String, Vec<u8>);
#[smi]
pub fn op_broadcast_subscribe<BC>(
state: &mut OpState,
-) -> Result<ResourceId, AnyError>
+) -> Result<ResourceId, BroadcastChannelError>
where
BC: BroadcastChannel + 'static,
{
@@ -62,11 +97,14 @@ where
pub fn op_broadcast_unsubscribe<BC>(
state: &mut OpState,
#[smi] rid: ResourceId,
-) -> Result<(), AnyError>
+) -> Result<(), BroadcastChannelError>
where
BC: BroadcastChannel + 'static,
{
- let resource = state.resource_table.get::<BC::Resource>(rid)?;
+ let resource = state
+ .resource_table
+ .get::<BC::Resource>(rid)
+ .map_err(BroadcastChannelError::Resource)?;
let bc = state.borrow::<BC>();
bc.unsubscribe(&resource)
}
@@ -77,11 +115,15 @@ pub async fn op_broadcast_send<BC>(
#[smi] rid: ResourceId,
#[string] name: String,
#[buffer] buf: JsBuffer,
-) -> Result<(), AnyError>
+) -> Result<(), BroadcastChannelError>
where
BC: BroadcastChannel + 'static,
{
- let resource = state.borrow().resource_table.get::<BC::Resource>(rid)?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<BC::Resource>(rid)
+ .map_err(BroadcastChannelError::Resource)?;
let bc = state.borrow().borrow::<BC>().clone();
bc.send(&resource, name, buf.to_vec()).await
}
@@ -91,11 +133,15 @@ where
pub async fn op_broadcast_recv<BC>(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<Option<Message>, AnyError>
+) -> Result<Option<Message>, BroadcastChannelError>
where
BC: BroadcastChannel + 'static,
{
- let resource = state.borrow().resource_table.get::<BC::Resource>(rid)?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<BC::Resource>(rid)
+ .map_err(BroadcastChannelError::Resource)?;
let bc = state.borrow().borrow::<BC>().clone();
bc.recv(&resource).await
}