From eca83fc9b45ab1e5a73bd7b13b05ee42ab1a4dcc Mon Sep 17 00:00:00 2001 From: Leo Kettmeir Date: Thu, 17 Oct 2024 12:05:38 -0700 Subject: refactor(ext/web): use concrete error types (#26185) --- ext/web/stream_resource.rs | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) (limited to 'ext/web/stream_resource.rs') diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs index 78487883b..c44a385ea 100644 --- a/ext/web/stream_resource.rs +++ b/ext/web/stream_resource.rs @@ -1,7 +1,5 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. use bytes::BytesMut; -use deno_core::error::type_error; -use deno_core::error::AnyError; use deno_core::external; use deno_core::op2; use deno_core::serde_v8::V8Slice; @@ -18,6 +16,7 @@ use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use futures::future::poll_fn; +use futures::TryFutureExt; use std::borrow::Cow; use std::cell::RefCell; use std::cell::RefMut; @@ -31,6 +30,14 @@ use std::task::Context; use std::task::Poll; use std::task::Waker; +#[derive(Debug, thiserror::Error)] +pub enum StreamResourceError { + #[error(transparent)] + Canceled(#[from] deno_core::Canceled), + #[error("{0}")] + Js(String), +} + // How many buffers we'll allow in the channel before we stop allowing writes. const BUFFER_CHANNEL_SIZE: u16 = 1024; @@ -48,7 +55,7 @@ struct BoundedBufferChannelInner { buffers: [MaybeUninit>; BUFFER_CHANNEL_SIZE as _], ring_producer: u16, ring_consumer: u16, - error: Option, + error: Option, current_size: usize, // TODO(mmastrac): we can math this field instead of accounting for it len: usize, @@ -141,7 +148,10 @@ impl BoundedBufferChannelInner { self.len = 0; } - pub fn read(&mut self, limit: usize) -> Result, AnyError> { + pub fn read( + &mut self, + limit: usize, + ) -> Result, StreamResourceError> { // Empty buffers will return the error, if one exists, or None if self.len == 0 { if let Some(error) = self.error.take() { @@ -230,7 +240,7 @@ impl BoundedBufferChannelInner { Ok(()) } - pub fn write_error(&mut self, error: AnyError) { + pub fn write_error(&mut self, error: StreamResourceError) { self.error = Some(error); if let Some(waker) = self.read_waker.take() { waker.wake(); @@ -306,7 +316,10 @@ impl BoundedBufferChannel { self.inner.borrow_mut() } - pub fn read(&self, limit: usize) -> Result, AnyError> { + pub fn read( + &self, + limit: usize, + ) -> Result, StreamResourceError> { self.inner().read(limit) } @@ -314,7 +327,7 @@ impl BoundedBufferChannel { self.inner().write(buffer) } - pub fn write_error(&self, error: AnyError) { + pub fn write_error(&self, error: StreamResourceError) { self.inner().write_error(error) } @@ -358,7 +371,10 @@ impl ReadableStreamResource { RcRef::map(self, |s| &s.cancel_handle).clone() } - async fn read(self: Rc, limit: usize) -> Result { + async fn read( + self: Rc, + limit: usize, + ) -> Result { let cancel_handle = self.cancel_handle(); // Serialize all the reads using a task queue. let _read_permit = self.read_queue.acquire().await; @@ -387,7 +403,7 @@ impl Resource for ReadableStreamResource { } fn read(self: Rc, limit: usize) -> AsyncResult { - Box::pin(ReadableStreamResource::read(self, limit)) + Box::pin(ReadableStreamResource::read(self, limit).map_err(|e| e.into())) } fn close(self: Rc) { @@ -550,7 +566,7 @@ pub fn op_readable_stream_resource_write_error( ) -> bool { let sender = get_sender(sender); // We can always write an error, no polling required - sender.write_error(type_error(Cow::Owned(error))); + sender.write_error(StreamResourceError::Js(error)); !sender.closed() } -- cgit v1.2.3