summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/web/06_streams.js22
-rw-r--r--ext/web/stream_resource.rs30
2 files changed, 30 insertions, 22 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 0849d221d..9dde03b7f 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -724,7 +724,7 @@ function resourceForReadableStream(stream) {
PromisePrototypeCatch(
PromisePrototypeThen(
op_readable_stream_resource_await_close(rid),
- () => reader.cancel(),
+ () => reader.cancel("resource closed"),
),
() => {},
);
@@ -745,17 +745,25 @@ function resourceForReadableStream(stream) {
break;
}
} catch (err) {
- const message = err.message;
- if (message) {
- await op_readable_stream_resource_write_error(sink, err.message);
- } else {
- await op_readable_stream_resource_write_error(sink, String(err));
+ const message = err?.message;
+ const success = (message && (typeof message == "string"))
+ ? await op_readable_stream_resource_write_error(sink, message)
+ : await op_readable_stream_resource_write_error(
+ sink,
+ String(err),
+ );
+ // We don't cancel the reader if there was an error reading. We'll let the downstream
+ // consumer close the resource after it receives the error.
+ if (!success) {
+ reader.cancel("resource closed");
}
break;
}
// If the chunk has non-zero length, write it
if (value.length > 0) {
- await op_readable_stream_resource_write_buf(sink, value);
+ if (!await op_readable_stream_resource_write_buf(sink, value)) {
+ reader.cancel("resource closed");
+ }
}
}
} finally {
diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs
index 4c2a75648..0c483eccc 100644
--- a/ext/web/stream_resource.rs
+++ b/ext/web/stream_resource.rs
@@ -111,6 +111,10 @@ impl Resource for ReadableStreamResource {
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
Box::pin(ReadableStreamResource::read(self, limit))
}
+
+ fn close(self: Rc<Self>) {
+ self.cancel_handle.cancel();
+ }
}
// TODO(mmastrac): Move this to deno_core
@@ -155,10 +159,6 @@ impl Future for CompletionHandle {
}
}
-fn sender_closed() -> Error {
- type_error("sender closed")
-}
-
/// Allocate a resource that wraps a ReadableStream.
#[op2(fast)]
#[smi]
@@ -210,15 +210,13 @@ fn drop_sender(sender: *const c_void) {
pub fn op_readable_stream_resource_write_buf(
sender: *const c_void,
#[buffer] buffer: JsBuffer,
-) -> impl Future<Output = Result<(), Error>> {
+) -> impl Future<Output = bool> {
let sender = get_sender(sender);
async move {
- let sender = sender.ok_or_else(sender_closed)?;
- sender
- .send(Ok(buffer.into()))
- .await
- .map_err(|_| sender_closed())?;
- Ok(())
+ let Some(sender) = sender else {
+ return false;
+ };
+ sender.send(Ok(buffer.into())).await.ok().is_some()
}
}
@@ -226,15 +224,17 @@ pub fn op_readable_stream_resource_write_buf(
pub fn op_readable_stream_resource_write_error(
sender: *const c_void,
#[string] error: String,
-) -> impl Future<Output = Result<(), Error>> {
+) -> impl Future<Output = bool> {
let sender = get_sender(sender);
async move {
- let sender = sender.ok_or_else(sender_closed)?;
+ let Some(sender) = sender else {
+ return false;
+ };
sender
.send(Err(type_error(Cow::Owned(error))))
.await
- .map_err(|_| sender_closed())?;
- Ok(())
+ .ok()
+ .is_some()
}
}