diff options
author | Leo Kettmeir <crowlkats@toaxl.com> | 2024-10-12 08:20:17 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-12 08:20:17 -0700 |
commit | 3df8f1650039e9453056d516744e755d6be8801b (patch) | |
tree | 8963fe708ae5c73c6122bd21d40bd6fcf02de31e /ext/broadcast_channel | |
parent | 9117a9a43cba86d3112f86b10c5ea77baa2c6007 (diff) |
refactor(ext/broadcastchannel): use concrete error type (#26105)
Diffstat (limited to 'ext/broadcast_channel')
-rw-r--r-- | ext/broadcast_channel/Cargo.toml | 1 | ||||
-rw-r--r-- | ext/broadcast_channel/in_memory_broadcast_channel.rs | 13 | ||||
-rw-r--r-- | ext/broadcast_channel/lib.rs | 70 |
3 files changed, 67 insertions, 17 deletions
diff --git a/ext/broadcast_channel/Cargo.toml b/ext/broadcast_channel/Cargo.toml index 3caf7b9ef..b19c4ce15 100644 --- a/ext/broadcast_channel/Cargo.toml +++ b/ext/broadcast_channel/Cargo.toml @@ -16,5 +16,6 @@ path = "lib.rs" [dependencies] async-trait.workspace = true deno_core.workspace = true +thiserror.workspace = true tokio.workspace = true uuid.workspace = true diff --git a/ext/broadcast_channel/in_memory_broadcast_channel.rs b/ext/broadcast_channel/in_memory_broadcast_channel.rs index 00b52a9d6..61dc68e17 100644 --- a/ext/broadcast_channel/in_memory_broadcast_channel.rs +++ b/ext/broadcast_channel/in_memory_broadcast_channel.rs @@ -3,13 +3,13 @@ use std::sync::Arc; use async_trait::async_trait; -use deno_core::error::AnyError; use deno_core::parking_lot::Mutex; use tokio::sync::broadcast; use tokio::sync::mpsc; use uuid::Uuid; use crate::BroadcastChannel; +use crate::BroadcastChannelError; #[derive(Clone)] pub struct InMemoryBroadcastChannel(Arc<Mutex<broadcast::Sender<Message>>>); @@ -41,7 +41,7 @@ impl Default for InMemoryBroadcastChannel { impl BroadcastChannel for InMemoryBroadcastChannel { type Resource = InMemoryBroadcastChannelResource; - fn subscribe(&self) -> Result<Self::Resource, AnyError> { + fn subscribe(&self) -> Result<Self::Resource, BroadcastChannelError> { let (cancel_tx, cancel_rx) = mpsc::unbounded_channel(); let broadcast_rx = self.0.lock().subscribe(); let rx = tokio::sync::Mutex::new((broadcast_rx, cancel_rx)); @@ -53,7 +53,10 @@ impl BroadcastChannel for InMemoryBroadcastChannel { }) } - fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError> { + fn unsubscribe( + &self, + resource: &Self::Resource, + ) -> Result<(), BroadcastChannelError> { Ok(resource.cancel_tx.send(())?) } @@ -62,7 +65,7 @@ impl BroadcastChannel for InMemoryBroadcastChannel { resource: &Self::Resource, name: String, data: Vec<u8>, - ) -> Result<(), AnyError> { + ) -> Result<(), BroadcastChannelError> { let name = Arc::new(name); let data = Arc::new(data); let uuid = resource.uuid; @@ -73,7 +76,7 @@ impl BroadcastChannel for InMemoryBroadcastChannel { async fn recv( &self, resource: &Self::Resource, - ) -> Result<Option<crate::Message>, AnyError> { + ) -> Result<Option<crate::Message>, BroadcastChannelError> { let mut g = resource.rx.lock().await; let (broadcast_rx, cancel_rx) = &mut *g; loop { diff --git a/ext/broadcast_channel/lib.rs b/ext/broadcast_channel/lib.rs index 47c48656d..c1de118a3 100644 --- a/ext/broadcast_channel/lib.rs +++ b/ext/broadcast_channel/lib.rs @@ -10,34 +10,69 @@ use std::path::PathBuf; use std::rc::Rc; use async_trait::async_trait; -use deno_core::error::AnyError; use deno_core::op2; use deno_core::JsBuffer; use deno_core::OpState; use deno_core::Resource; use deno_core::ResourceId; +use tokio::sync::broadcast::error::SendError as BroadcastSendError; +use tokio::sync::mpsc::error::SendError as MpscSendError; pub const UNSTABLE_FEATURE_NAME: &str = "broadcast-channel"; +#[derive(Debug, thiserror::Error)] +pub enum BroadcastChannelError { + #[error(transparent)] + Resource(deno_core::error::AnyError), + #[error(transparent)] + MPSCSendError(MpscSendError<Box<dyn std::fmt::Debug + Send + Sync>>), + #[error(transparent)] + BroadcastSendError( + BroadcastSendError<Box<dyn std::fmt::Debug + Send + Sync>>, + ), + #[error(transparent)] + Other(deno_core::error::AnyError), +} + +impl<T: std::fmt::Debug + Send + Sync + 'static> From<MpscSendError<T>> + for BroadcastChannelError +{ + fn from(value: MpscSendError<T>) -> Self { + BroadcastChannelError::MPSCSendError(MpscSendError(Box::new(value.0))) + } +} +impl<T: std::fmt::Debug + Send + Sync + 'static> From<BroadcastSendError<T>> + for BroadcastChannelError +{ + fn from(value: BroadcastSendError<T>) -> Self { + BroadcastChannelError::BroadcastSendError(BroadcastSendError(Box::new( + value.0, + ))) + } +} + #[async_trait] pub trait BroadcastChannel: Clone { type Resource: Resource; - fn subscribe(&self) -> Result<Self::Resource, AnyError>; + fn subscribe(&self) -> Result<Self::Resource, BroadcastChannelError>; - fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError>; + fn unsubscribe( + &self, + resource: &Self::Resource, + ) -> Result<(), BroadcastChannelError>; async fn send( &self, resource: &Self::Resource, name: String, data: Vec<u8>, - ) -> Result<(), AnyError>; + ) -> Result<(), BroadcastChannelError>; async fn recv( &self, resource: &Self::Resource, - ) -> Result<Option<Message>, AnyError>; + ) -> Result<Option<Message>, BroadcastChannelError>; } pub type Message = (String, Vec<u8>); @@ -46,7 +81,7 @@ pub type Message = (String, Vec<u8>); #[smi] pub fn op_broadcast_subscribe<BC>( state: &mut OpState, -) -> Result<ResourceId, AnyError> +) -> Result<ResourceId, BroadcastChannelError> where BC: BroadcastChannel + 'static, { @@ -62,11 +97,14 @@ where pub fn op_broadcast_unsubscribe<BC>( state: &mut OpState, #[smi] rid: ResourceId, -) -> Result<(), AnyError> +) -> Result<(), BroadcastChannelError> where BC: BroadcastChannel + 'static, { - let resource = state.resource_table.get::<BC::Resource>(rid)?; + let resource = state + .resource_table + .get::<BC::Resource>(rid) + .map_err(BroadcastChannelError::Resource)?; let bc = state.borrow::<BC>(); bc.unsubscribe(&resource) } @@ -77,11 +115,15 @@ pub async fn op_broadcast_send<BC>( #[smi] rid: ResourceId, #[string] name: String, #[buffer] buf: JsBuffer, -) -> Result<(), AnyError> +) -> Result<(), BroadcastChannelError> where BC: BroadcastChannel + 'static, { - let resource = state.borrow().resource_table.get::<BC::Resource>(rid)?; + let resource = state + .borrow() + .resource_table + .get::<BC::Resource>(rid) + .map_err(BroadcastChannelError::Resource)?; let bc = state.borrow().borrow::<BC>().clone(); bc.send(&resource, name, buf.to_vec()).await } @@ -91,11 +133,15 @@ where pub async fn op_broadcast_recv<BC>( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, -) -> Result<Option<Message>, AnyError> +) -> Result<Option<Message>, BroadcastChannelError> where BC: BroadcastChannel + 'static, { - let resource = state.borrow().resource_table.get::<BC::Resource>(rid)?; + let resource = state + .borrow() + .resource_table + .get::<BC::Resource>(rid) + .map_err(BroadcastChannelError::Resource)?; let bc = state.borrow().borrow::<BC>().clone(); bc.recv(&resource).await } |