diff options
author | haturau <135221985+haturatu@users.noreply.github.com> | 2024-11-20 01:20:47 +0900 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-20 01:20:47 +0900 |
commit | 85719a67e59c7aa45bead26e4942d7df8b1b42d4 (patch) | |
tree | face0aecaac53e93ce2f23b53c48859bcf1a36ec /ext/web/stream_resource.rs | |
parent | 67697bc2e4a62a9670699fd18ad0dd8efc5bd955 (diff) | |
parent | 186b52731c6bb326c4d32905c5e732d082e83465 (diff) |
Merge branch 'denoland:main' into main
Diffstat (limited to 'ext/web/stream_resource.rs')
-rw-r--r-- | ext/web/stream_resource.rs | 36 |
1 files changed, 26 insertions, 10 deletions
diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs index 78487883b..c44a385ea 100644 --- a/ext/web/stream_resource.rs +++ b/ext/web/stream_resource.rs @@ -1,7 +1,5 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. use bytes::BytesMut; -use deno_core::error::type_error; -use deno_core::error::AnyError; use deno_core::external; use deno_core::op2; use deno_core::serde_v8::V8Slice; @@ -18,6 +16,7 @@ use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use futures::future::poll_fn; +use futures::TryFutureExt; use std::borrow::Cow; use std::cell::RefCell; use std::cell::RefMut; @@ -31,6 +30,14 @@ use std::task::Context; use std::task::Poll; use std::task::Waker; +#[derive(Debug, thiserror::Error)] +pub enum StreamResourceError { + #[error(transparent)] + Canceled(#[from] deno_core::Canceled), + #[error("{0}")] + Js(String), +} + // How many buffers we'll allow in the channel before we stop allowing writes. const BUFFER_CHANNEL_SIZE: u16 = 1024; @@ -48,7 +55,7 @@ struct BoundedBufferChannelInner { buffers: [MaybeUninit<V8Slice<u8>>; BUFFER_CHANNEL_SIZE as _], ring_producer: u16, ring_consumer: u16, - error: Option<AnyError>, + error: Option<StreamResourceError>, current_size: usize, // TODO(mmastrac): we can math this field instead of accounting for it len: usize, @@ -141,7 +148,10 @@ impl BoundedBufferChannelInner { self.len = 0; } - pub fn read(&mut self, limit: usize) -> Result<Option<BufView>, AnyError> { + pub fn read( + &mut self, + limit: usize, + ) -> Result<Option<BufView>, StreamResourceError> { // Empty buffers will return the error, if one exists, or None if self.len == 0 { if let Some(error) = self.error.take() { @@ -230,7 +240,7 @@ impl BoundedBufferChannelInner { Ok(()) } - pub fn write_error(&mut self, error: AnyError) { + pub fn write_error(&mut self, error: StreamResourceError) { self.error = Some(error); if let Some(waker) = self.read_waker.take() { waker.wake(); @@ -306,7 +316,10 @@ impl BoundedBufferChannel { self.inner.borrow_mut() } - pub fn read(&self, limit: usize) -> Result<Option<BufView>, AnyError> { + pub fn read( + &self, + limit: usize, + ) -> Result<Option<BufView>, StreamResourceError> { self.inner().read(limit) } @@ -314,7 +327,7 @@ impl BoundedBufferChannel { self.inner().write(buffer) } - pub fn write_error(&self, error: AnyError) { + pub fn write_error(&self, error: StreamResourceError) { self.inner().write_error(error) } @@ -358,7 +371,10 @@ impl ReadableStreamResource { RcRef::map(self, |s| &s.cancel_handle).clone() } - async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, AnyError> { + async fn read( + self: Rc<Self>, + limit: usize, + ) -> Result<BufView, StreamResourceError> { let cancel_handle = self.cancel_handle(); // Serialize all the reads using a task queue. let _read_permit = self.read_queue.acquire().await; @@ -387,7 +403,7 @@ impl Resource for ReadableStreamResource { } fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> { - Box::pin(ReadableStreamResource::read(self, limit)) + Box::pin(ReadableStreamResource::read(self, limit).map_err(|e| e.into())) } fn close(self: Rc<Self>) { @@ -550,7 +566,7 @@ pub fn op_readable_stream_resource_write_error( ) -> bool { let sender = get_sender(sender); // We can always write an error, no polling required - sender.write_error(type_error(Cow::Owned(error))); + sender.write_error(StreamResourceError::Js(error)); !sender.closed() } |