summaryrefslogtreecommitdiff
path: root/cli/tests/unit/streams_test.ts
diff options
context:
space:
mode:
Diffstat (limited to 'cli/tests/unit/streams_test.ts')
-rw-r--r--cli/tests/unit/streams_test.ts98
1 files changed, 53 insertions, 45 deletions
diff --git a/cli/tests/unit/streams_test.ts b/cli/tests/unit/streams_test.ts
index bb8099efb..c488f214a 100644
--- a/cli/tests/unit/streams_test.ts
+++ b/cli/tests/unit/streams_test.ts
@@ -190,44 +190,46 @@ Deno.test(async function readableStream() {
// Close the stream after reading everything
Deno.test(async function readableStreamClose() {
- const { promise: cancelPromise, resolve: cancelResolve } = Promise
- .withResolvers();
- const rid = resourceForReadableStream(helloWorldStream(false, cancelResolve));
+ const cancel = Promise.withResolvers();
+ const rid = resourceForReadableStream(
+ helloWorldStream(false, cancel.resolve),
+ );
const buffer = new Uint8Array(1024);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 12);
core.ops.op_close(rid);
- assertEquals(await cancelPromise, "resource closed");
+ assertEquals(await cancel.promise, "resource closed");
});
// Close the stream without reading everything
Deno.test(async function readableStreamClosePartialRead() {
- const { promise: cancelPromise, resolve: cancelResolve } = Promise
- .withResolvers();
- const rid = resourceForReadableStream(helloWorldStream(false, cancelResolve));
+ const cancel = Promise.withResolvers();
+ const rid = resourceForReadableStream(
+ helloWorldStream(false, cancel.resolve),
+ );
const buffer = new Uint8Array(5);
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 5);
core.ops.op_close(rid);
- assertEquals(await cancelPromise, "resource closed");
+ assertEquals(await cancel.promise, "resource closed");
});
// Close the stream without reading anything
Deno.test(async function readableStreamCloseWithoutRead() {
- const { promise: cancelPromise, resolve: cancelResolve } = Promise
- .withResolvers();
- const rid = resourceForReadableStream(helloWorldStream(false, cancelResolve));
+ const cancel = Promise.withResolvers();
+ const rid = resourceForReadableStream(
+ helloWorldStream(false, cancel.resolve),
+ );
core.ops.op_close(rid);
- assertEquals(await cancelPromise, "resource closed");
+ assertEquals(await cancel.promise, "resource closed");
});
// Close the stream without reading anything
Deno.test(async function readableStreamCloseWithoutRead2() {
- const { promise: cancelPromise, resolve: cancelResolve } = Promise
- .withResolvers();
- const rid = resourceForReadableStream(longAsyncStream(cancelResolve));
+ const cancel = Promise.withResolvers();
+ const rid = resourceForReadableStream(longAsyncStream(cancel.resolve));
core.ops.op_close(rid);
- assertEquals(await cancelPromise, "resource closed");
+ assertEquals(await cancel.promise, "resource closed");
});
Deno.test(async function readableStreamPartial() {
@@ -439,32 +441,38 @@ function createStreamTest(
});
}
-Deno.test(async function readableStreamWithAggressiveResourceClose() {
- let first = true;
- const { promise: reasonPromise, resolve: reasonResolve } = Promise
- .withResolvers();
- const rid = resourceForReadableStream(
- new ReadableStream({
- pull(controller) {
- if (first) {
- // We queue this up and then immediately close the resource (not the reader)
- controller.enqueue(new Uint8Array(1));
- core.close(rid);
- // This doesn't throw, even though the resource is closed
- controller.enqueue(new Uint8Array(1));
- first = false;
- }
- },
- cancel(reason) {
- reasonResolve(reason);
- },
- }),
- );
- try {
- await core.ops.op_read(rid, new Uint8Array(1));
- fail();
- } catch (e) {
- assertEquals(e.message, "operation canceled");
- }
- assertEquals(await reasonPromise, "resource closed");
-});
+// 1024 is the size of the internal packet buffer -- we want to make sure we fill the internal pipe fully.
+for (const packetCount of [1, 1024]) {
+ Deno.test(`readableStreamWithAggressiveResourceClose_${packetCount}`, async function () {
+ let first = true;
+ const { promise, resolve } = Promise.withResolvers();
+ const rid = resourceForReadableStream(
+ new ReadableStream({
+ pull(controller) {
+ if (first) {
+ // We queue this up and then immediately close the resource (not the reader)
+ for (let i = 0; i < packetCount; i++) {
+ controller.enqueue(new Uint8Array(1));
+ }
+ core.close(rid);
+ // This doesn't throw, even though the resource is closed
+ controller.enqueue(new Uint8Array(1));
+ first = false;
+ }
+ },
+ cancel(reason) {
+ resolve(reason);
+ },
+ }),
+ );
+ try {
+ for (let i = 0; i < packetCount; i++) {
+ await core.ops.op_read(rid, new Uint8Array(1));
+ }
+ fail();
+ } catch (e) {
+ assertEquals(e.message, "operation canceled");
+ }
+ assertEquals(await promise, "resource closed");
+ });
+}