summaryrefslogtreecommitdiff
path: root/ext/web/stream_resource.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/web/stream_resource.rs')
-rw-r--r--ext/web/stream_resource.rs36
1 files changed, 26 insertions, 10 deletions
diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs
index 78487883b..c44a385ea 100644
--- a/ext/web/stream_resource.rs
+++ b/ext/web/stream_resource.rs
@@ -1,7 +1,5 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use bytes::BytesMut;
-use deno_core::error::type_error;
-use deno_core::error::AnyError;
use deno_core::external;
use deno_core::op2;
use deno_core::serde_v8::V8Slice;
@@ -18,6 +16,7 @@ use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use futures::future::poll_fn;
+use futures::TryFutureExt;
use std::borrow::Cow;
use std::cell::RefCell;
use std::cell::RefMut;
@@ -31,6 +30,14 @@ use std::task::Context;
use std::task::Poll;
use std::task::Waker;
+#[derive(Debug, thiserror::Error)]
+pub enum StreamResourceError {
+ #[error(transparent)]
+ Canceled(#[from] deno_core::Canceled),
+ #[error("{0}")]
+ Js(String),
+}
+
// How many buffers we'll allow in the channel before we stop allowing writes.
const BUFFER_CHANNEL_SIZE: u16 = 1024;
@@ -48,7 +55,7 @@ struct BoundedBufferChannelInner {
buffers: [MaybeUninit<V8Slice<u8>>; BUFFER_CHANNEL_SIZE as _],
ring_producer: u16,
ring_consumer: u16,
- error: Option<AnyError>,
+ error: Option<StreamResourceError>,
current_size: usize,
// TODO(mmastrac): we can math this field instead of accounting for it
len: usize,
@@ -141,7 +148,10 @@ impl BoundedBufferChannelInner {
self.len = 0;
}
- pub fn read(&mut self, limit: usize) -> Result<Option<BufView>, AnyError> {
+ pub fn read(
+ &mut self,
+ limit: usize,
+ ) -> Result<Option<BufView>, StreamResourceError> {
// Empty buffers will return the error, if one exists, or None
if self.len == 0 {
if let Some(error) = self.error.take() {
@@ -230,7 +240,7 @@ impl BoundedBufferChannelInner {
Ok(())
}
- pub fn write_error(&mut self, error: AnyError) {
+ pub fn write_error(&mut self, error: StreamResourceError) {
self.error = Some(error);
if let Some(waker) = self.read_waker.take() {
waker.wake();
@@ -306,7 +316,10 @@ impl BoundedBufferChannel {
self.inner.borrow_mut()
}
- pub fn read(&self, limit: usize) -> Result<Option<BufView>, AnyError> {
+ pub fn read(
+ &self,
+ limit: usize,
+ ) -> Result<Option<BufView>, StreamResourceError> {
self.inner().read(limit)
}
@@ -314,7 +327,7 @@ impl BoundedBufferChannel {
self.inner().write(buffer)
}
- pub fn write_error(&self, error: AnyError) {
+ pub fn write_error(&self, error: StreamResourceError) {
self.inner().write_error(error)
}
@@ -358,7 +371,10 @@ impl ReadableStreamResource {
RcRef::map(self, |s| &s.cancel_handle).clone()
}
- async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, AnyError> {
+ async fn read(
+ self: Rc<Self>,
+ limit: usize,
+ ) -> Result<BufView, StreamResourceError> {
let cancel_handle = self.cancel_handle();
// Serialize all the reads using a task queue.
let _read_permit = self.read_queue.acquire().await;
@@ -387,7 +403,7 @@ impl Resource for ReadableStreamResource {
}
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
- Box::pin(ReadableStreamResource::read(self, limit))
+ Box::pin(ReadableStreamResource::read(self, limit).map_err(|e| e.into()))
}
fn close(self: Rc<Self>) {
@@ -550,7 +566,7 @@ pub fn op_readable_stream_resource_write_error(
) -> bool {
let sender = get_sender(sender);
// We can always write an error, no polling required
- sender.write_error(type_error(Cow::Owned(error)));
+ sender.write_error(StreamResourceError::Js(error));
!sender.closed()
}