summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-08-22 16:16:34 -0600
committerGitHub <noreply@github.com>2023-08-22 16:16:34 -0600
commit9b0130770467ef11f4c0d4acbb42a9f0cf4bfe7f (patch)
treed011b4d1e5d06da86e3c02e83aa12fd965611db5
parentc37b9655b6a6ccff1cedc6e43d245fd55415d76f (diff)
fix(ext/web): better handling of errors in resourceForReadableStream (#20238)
Improves error handling when the Resource is closed in various phases of the ReadableStream. Ensure that we send a consistent `cancel` reason.
-rw-r--r--cli/tests/unit/streams_test.ts43
-rw-r--r--ext/web/06_streams.js22
-rw-r--r--ext/web/stream_resource.rs30
3 files changed, 69 insertions, 26 deletions
diff --git a/cli/tests/unit/streams_test.ts b/cli/tests/unit/streams_test.ts
index 4a573c934..c62c48469 100644
--- a/cli/tests/unit/streams_test.ts
+++ b/cli/tests/unit/streams_test.ts
@@ -142,7 +142,7 @@ Deno.test(async function readableStreamClose() {
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 12);
core.ops.op_close(rid);
- assertEquals(await cancel, undefined);
+ assertEquals(await cancel, "resource closed");
});
// Close the stream without reading everything
@@ -153,7 +153,7 @@ Deno.test(async function readableStreamClosePartialRead() {
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 5);
core.ops.op_close(rid);
- assertEquals(await cancel, undefined);
+ assertEquals(await cancel, "resource closed");
});
// Close the stream without reading anything
@@ -161,7 +161,7 @@ Deno.test(async function readableStreamCloseWithoutRead() {
const cancel = deferred();
const rid = resourceForReadableStream(helloWorldStream(false, cancel));
core.ops.op_close(rid);
- assertEquals(await cancel, undefined);
+ assertEquals(await cancel, "resource closed");
});
Deno.test(async function readableStreamPartial() {
@@ -205,7 +205,13 @@ for (
) {
Deno.test(`readableStreamError_${type}`, async function () {
const rid = resourceForReadableStream(errorStream(type));
- assertEquals(12, await core.ops.op_read(rid, new Uint8Array(16)));
+ let nread;
+ try {
+ nread = await core.ops.op_read(rid, new Uint8Array(16));
+ } catch (_) {
+ fail("Should not have thrown");
+ }
+ assertEquals(12, nread);
try {
await core.ops.op_read(rid, new Uint8Array(1));
fail();
@@ -297,3 +303,32 @@ function createStreamTest(
}
});
}
+
+Deno.test(async function readableStreamWithAggressiveResourceClose() {
+ let first = true;
+ const reasonPromise = deferred();
+ const rid = resourceForReadableStream(
+ new ReadableStream({
+ pull(controller) {
+ if (first) {
+ // We queue this up and then immediately close the resource (not the reader)
+ controller.enqueue(new Uint8Array(1));
+ core.close(rid);
+ // This doesn't throw, even though the resource is closed
+ controller.enqueue(new Uint8Array(1));
+ first = false;
+ }
+ },
+ cancel(reason) {
+ reasonPromise.resolve(reason);
+ },
+ }),
+ );
+ try {
+ await core.ops.op_read(rid, new Uint8Array(1));
+ fail();
+ } catch (e) {
+ assertEquals(e.message, "operation canceled");
+ }
+ assertEquals(await reasonPromise, "resource closed");
+});
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 0849d221d..9dde03b7f 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -724,7 +724,7 @@ function resourceForReadableStream(stream) {
PromisePrototypeCatch(
PromisePrototypeThen(
op_readable_stream_resource_await_close(rid),
- () => reader.cancel(),
+ () => reader.cancel("resource closed"),
),
() => {},
);
@@ -745,17 +745,25 @@ function resourceForReadableStream(stream) {
break;
}
} catch (err) {
- const message = err.message;
- if (message) {
- await op_readable_stream_resource_write_error(sink, err.message);
- } else {
- await op_readable_stream_resource_write_error(sink, String(err));
+ const message = err?.message;
+ const success = (message && (typeof message == "string"))
+ ? await op_readable_stream_resource_write_error(sink, message)
+ : await op_readable_stream_resource_write_error(
+ sink,
+ String(err),
+ );
+ // We don't cancel the reader if there was an error reading. We'll let the downstream
+ // consumer close the resource after it receives the error.
+ if (!success) {
+ reader.cancel("resource closed");
}
break;
}
// If the chunk has non-zero length, write it
if (value.length > 0) {
- await op_readable_stream_resource_write_buf(sink, value);
+ if (!await op_readable_stream_resource_write_buf(sink, value)) {
+ reader.cancel("resource closed");
+ }
}
}
} finally {
diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs
index 4c2a75648..0c483eccc 100644
--- a/ext/web/stream_resource.rs
+++ b/ext/web/stream_resource.rs
@@ -111,6 +111,10 @@ impl Resource for ReadableStreamResource {
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
Box::pin(ReadableStreamResource::read(self, limit))
}
+
+ fn close(self: Rc<Self>) {
+ self.cancel_handle.cancel();
+ }
}
// TODO(mmastrac): Move this to deno_core
@@ -155,10 +159,6 @@ impl Future for CompletionHandle {
}
}
-fn sender_closed() -> Error {
- type_error("sender closed")
-}
-
/// Allocate a resource that wraps a ReadableStream.
#[op2(fast)]
#[smi]
@@ -210,15 +210,13 @@ fn drop_sender(sender: *const c_void) {
pub fn op_readable_stream_resource_write_buf(
sender: *const c_void,
#[buffer] buffer: JsBuffer,
-) -> impl Future<Output = Result<(), Error>> {
+) -> impl Future<Output = bool> {
let sender = get_sender(sender);
async move {
- let sender = sender.ok_or_else(sender_closed)?;
- sender
- .send(Ok(buffer.into()))
- .await
- .map_err(|_| sender_closed())?;
- Ok(())
+ let Some(sender) = sender else {
+ return false;
+ };
+ sender.send(Ok(buffer.into())).await.ok().is_some()
}
}
@@ -226,15 +224,17 @@ pub fn op_readable_stream_resource_write_buf(
pub fn op_readable_stream_resource_write_error(
sender: *const c_void,
#[string] error: String,
-) -> impl Future<Output = Result<(), Error>> {
+) -> impl Future<Output = bool> {
let sender = get_sender(sender);
async move {
- let sender = sender.ok_or_else(sender_closed)?;
+ let Some(sender) = sender else {
+ return false;
+ };
sender
.send(Err(type_error(Cow::Owned(error))))
.await
- .map_err(|_| sender_closed())?;
- Ok(())
+ .ok()
+ .is_some()
}
}