From 3df8f1650039e9453056d516744e755d6be8801b Mon Sep 17 00:00:00 2001 From: Leo Kettmeir Date: Sat, 12 Oct 2024 08:20:17 -0700 Subject: refactor(ext/broadcastchannel): use concrete error type (#26105) --- ext/broadcast_channel/Cargo.toml | 1 + .../in_memory_broadcast_channel.rs | 13 ++-- ext/broadcast_channel/lib.rs | 70 ++++++++++++++++++---- 3 files changed, 67 insertions(+), 17 deletions(-) (limited to 'ext/broadcast_channel') diff --git a/ext/broadcast_channel/Cargo.toml b/ext/broadcast_channel/Cargo.toml index 3caf7b9ef..b19c4ce15 100644 --- a/ext/broadcast_channel/Cargo.toml +++ b/ext/broadcast_channel/Cargo.toml @@ -16,5 +16,6 @@ path = "lib.rs" [dependencies] async-trait.workspace = true deno_core.workspace = true +thiserror.workspace = true tokio.workspace = true uuid.workspace = true diff --git a/ext/broadcast_channel/in_memory_broadcast_channel.rs b/ext/broadcast_channel/in_memory_broadcast_channel.rs index 00b52a9d6..61dc68e17 100644 --- a/ext/broadcast_channel/in_memory_broadcast_channel.rs +++ b/ext/broadcast_channel/in_memory_broadcast_channel.rs @@ -3,13 +3,13 @@ use std::sync::Arc; use async_trait::async_trait; -use deno_core::error::AnyError; use deno_core::parking_lot::Mutex; use tokio::sync::broadcast; use tokio::sync::mpsc; use uuid::Uuid; use crate::BroadcastChannel; +use crate::BroadcastChannelError; #[derive(Clone)] pub struct InMemoryBroadcastChannel(Arc>>); @@ -41,7 +41,7 @@ impl Default for InMemoryBroadcastChannel { impl BroadcastChannel for InMemoryBroadcastChannel { type Resource = InMemoryBroadcastChannelResource; - fn subscribe(&self) -> Result { + fn subscribe(&self) -> Result { let (cancel_tx, cancel_rx) = mpsc::unbounded_channel(); let broadcast_rx = self.0.lock().subscribe(); let rx = tokio::sync::Mutex::new((broadcast_rx, cancel_rx)); @@ -53,7 +53,10 @@ impl BroadcastChannel for InMemoryBroadcastChannel { }) } - fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError> { + fn unsubscribe( + &self, + resource: &Self::Resource, + ) -> Result<(), BroadcastChannelError> { Ok(resource.cancel_tx.send(())?) } @@ -62,7 +65,7 @@ impl BroadcastChannel for InMemoryBroadcastChannel { resource: &Self::Resource, name: String, data: Vec, - ) -> Result<(), AnyError> { + ) -> Result<(), BroadcastChannelError> { let name = Arc::new(name); let data = Arc::new(data); let uuid = resource.uuid; @@ -73,7 +76,7 @@ impl BroadcastChannel for InMemoryBroadcastChannel { async fn recv( &self, resource: &Self::Resource, - ) -> Result, AnyError> { + ) -> Result, BroadcastChannelError> { let mut g = resource.rx.lock().await; let (broadcast_rx, cancel_rx) = &mut *g; loop { diff --git a/ext/broadcast_channel/lib.rs b/ext/broadcast_channel/lib.rs index 47c48656d..c1de118a3 100644 --- a/ext/broadcast_channel/lib.rs +++ b/ext/broadcast_channel/lib.rs @@ -10,34 +10,69 @@ use std::path::PathBuf; use std::rc::Rc; use async_trait::async_trait; -use deno_core::error::AnyError; use deno_core::op2; use deno_core::JsBuffer; use deno_core::OpState; use deno_core::Resource; use deno_core::ResourceId; +use tokio::sync::broadcast::error::SendError as BroadcastSendError; +use tokio::sync::mpsc::error::SendError as MpscSendError; pub const UNSTABLE_FEATURE_NAME: &str = "broadcast-channel"; +#[derive(Debug, thiserror::Error)] +pub enum BroadcastChannelError { + #[error(transparent)] + Resource(deno_core::error::AnyError), + #[error(transparent)] + MPSCSendError(MpscSendError>), + #[error(transparent)] + BroadcastSendError( + BroadcastSendError>, + ), + #[error(transparent)] + Other(deno_core::error::AnyError), +} + +impl From> + for BroadcastChannelError +{ + fn from(value: MpscSendError) -> Self { + BroadcastChannelError::MPSCSendError(MpscSendError(Box::new(value.0))) + } +} +impl From> + for BroadcastChannelError +{ + fn from(value: BroadcastSendError) -> Self { + BroadcastChannelError::BroadcastSendError(BroadcastSendError(Box::new( + value.0, + ))) + } +} + #[async_trait] pub trait BroadcastChannel: Clone { type Resource: Resource; - fn subscribe(&self) -> Result; + fn subscribe(&self) -> Result; - fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError>; + fn unsubscribe( + &self, + resource: &Self::Resource, + ) -> Result<(), BroadcastChannelError>; async fn send( &self, resource: &Self::Resource, name: String, data: Vec, - ) -> Result<(), AnyError>; + ) -> Result<(), BroadcastChannelError>; async fn recv( &self, resource: &Self::Resource, - ) -> Result, AnyError>; + ) -> Result, BroadcastChannelError>; } pub type Message = (String, Vec); @@ -46,7 +81,7 @@ pub type Message = (String, Vec); #[smi] pub fn op_broadcast_subscribe( state: &mut OpState, -) -> Result +) -> Result where BC: BroadcastChannel + 'static, { @@ -62,11 +97,14 @@ where pub fn op_broadcast_unsubscribe( state: &mut OpState, #[smi] rid: ResourceId, -) -> Result<(), AnyError> +) -> Result<(), BroadcastChannelError> where BC: BroadcastChannel + 'static, { - let resource = state.resource_table.get::(rid)?; + let resource = state + .resource_table + .get::(rid) + .map_err(BroadcastChannelError::Resource)?; let bc = state.borrow::(); bc.unsubscribe(&resource) } @@ -77,11 +115,15 @@ pub async fn op_broadcast_send( #[smi] rid: ResourceId, #[string] name: String, #[buffer] buf: JsBuffer, -) -> Result<(), AnyError> +) -> Result<(), BroadcastChannelError> where BC: BroadcastChannel + 'static, { - let resource = state.borrow().resource_table.get::(rid)?; + let resource = state + .borrow() + .resource_table + .get::(rid) + .map_err(BroadcastChannelError::Resource)?; let bc = state.borrow().borrow::().clone(); bc.send(&resource, name, buf.to_vec()).await } @@ -91,11 +133,15 @@ where pub async fn op_broadcast_recv( state: Rc>, #[smi] rid: ResourceId, -) -> Result, AnyError> +) -> Result, BroadcastChannelError> where BC: BroadcastChannel + 'static, { - let resource = state.borrow().resource_table.get::(rid)?; + let resource = state + .borrow() + .resource_table + .get::(rid) + .map_err(BroadcastChannelError::Resource)?; let bc = state.borrow().borrow::().clone(); bc.recv(&resource).await } -- cgit v1.2.3