diff options
Diffstat (limited to 'ext')
-rw-r--r-- | ext/web/06_streams.js | 22 | ||||
-rw-r--r-- | ext/web/stream_resource.rs | 30 |
2 files changed, 30 insertions, 22 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 0849d221d..9dde03b7f 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -724,7 +724,7 @@ function resourceForReadableStream(stream) { PromisePrototypeCatch( PromisePrototypeThen( op_readable_stream_resource_await_close(rid), - () => reader.cancel(), + () => reader.cancel("resource closed"), ), () => {}, ); @@ -745,17 +745,25 @@ function resourceForReadableStream(stream) { break; } } catch (err) { - const message = err.message; - if (message) { - await op_readable_stream_resource_write_error(sink, err.message); - } else { - await op_readable_stream_resource_write_error(sink, String(err)); + const message = err?.message; + const success = (message && (typeof message == "string")) + ? await op_readable_stream_resource_write_error(sink, message) + : await op_readable_stream_resource_write_error( + sink, + String(err), + ); + // We don't cancel the reader if there was an error reading. We'll let the downstream + // consumer close the resource after it receives the error. + if (!success) { + reader.cancel("resource closed"); } break; } // If the chunk has non-zero length, write it if (value.length > 0) { - await op_readable_stream_resource_write_buf(sink, value); + if (!await op_readable_stream_resource_write_buf(sink, value)) { + reader.cancel("resource closed"); + } } } } finally { diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs index 4c2a75648..0c483eccc 100644 --- a/ext/web/stream_resource.rs +++ b/ext/web/stream_resource.rs @@ -111,6 +111,10 @@ impl Resource for ReadableStreamResource { fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> { Box::pin(ReadableStreamResource::read(self, limit)) } + + fn close(self: Rc<Self>) { + self.cancel_handle.cancel(); + } } // TODO(mmastrac): Move this to deno_core @@ -155,10 +159,6 @@ impl Future for CompletionHandle { } } -fn sender_closed() -> Error { - type_error("sender closed") -} - /// Allocate a resource that wraps a ReadableStream. #[op2(fast)] #[smi] @@ -210,15 +210,13 @@ fn drop_sender(sender: *const c_void) { pub fn op_readable_stream_resource_write_buf( sender: *const c_void, #[buffer] buffer: JsBuffer, -) -> impl Future<Output = Result<(), Error>> { +) -> impl Future<Output = bool> { let sender = get_sender(sender); async move { - let sender = sender.ok_or_else(sender_closed)?; - sender - .send(Ok(buffer.into())) - .await - .map_err(|_| sender_closed())?; - Ok(()) + let Some(sender) = sender else { + return false; + }; + sender.send(Ok(buffer.into())).await.ok().is_some() } } @@ -226,15 +224,17 @@ pub fn op_readable_stream_resource_write_buf( pub fn op_readable_stream_resource_write_error( sender: *const c_void, #[string] error: String, -) -> impl Future<Output = Result<(), Error>> { +) -> impl Future<Output = bool> { let sender = get_sender(sender); async move { - let sender = sender.ok_or_else(sender_closed)?; + let Some(sender) = sender else { + return false; + }; sender .send(Err(type_error(Cow::Owned(error)))) .await - .map_err(|_| sender_closed())?; - Ok(()) + .ok() + .is_some() } } |