diff options
Diffstat (limited to 'cli/tests/unit/streams_test.ts')
-rw-r--r-- | cli/tests/unit/streams_test.ts | 98 |
1 files changed, 53 insertions, 45 deletions
diff --git a/cli/tests/unit/streams_test.ts b/cli/tests/unit/streams_test.ts index bb8099efb..c488f214a 100644 --- a/cli/tests/unit/streams_test.ts +++ b/cli/tests/unit/streams_test.ts @@ -190,44 +190,46 @@ Deno.test(async function readableStream() { // Close the stream after reading everything Deno.test(async function readableStreamClose() { - const { promise: cancelPromise, resolve: cancelResolve } = Promise - .withResolvers(); - const rid = resourceForReadableStream(helloWorldStream(false, cancelResolve)); + const cancel = Promise.withResolvers(); + const rid = resourceForReadableStream( + helloWorldStream(false, cancel.resolve), + ); const buffer = new Uint8Array(1024); const nread = await core.ops.op_read(rid, buffer); assertEquals(nread, 12); core.ops.op_close(rid); - assertEquals(await cancelPromise, "resource closed"); + assertEquals(await cancel.promise, "resource closed"); }); // Close the stream without reading everything Deno.test(async function readableStreamClosePartialRead() { - const { promise: cancelPromise, resolve: cancelResolve } = Promise - .withResolvers(); - const rid = resourceForReadableStream(helloWorldStream(false, cancelResolve)); + const cancel = Promise.withResolvers(); + const rid = resourceForReadableStream( + helloWorldStream(false, cancel.resolve), + ); const buffer = new Uint8Array(5); const nread = await core.ops.op_read(rid, buffer); assertEquals(nread, 5); core.ops.op_close(rid); - assertEquals(await cancelPromise, "resource closed"); + assertEquals(await cancel.promise, "resource closed"); }); // Close the stream without reading anything Deno.test(async function readableStreamCloseWithoutRead() { - const { promise: cancelPromise, resolve: cancelResolve } = Promise - .withResolvers(); - const rid = resourceForReadableStream(helloWorldStream(false, cancelResolve)); + const cancel = Promise.withResolvers(); + const rid = resourceForReadableStream( + helloWorldStream(false, cancel.resolve), + ); core.ops.op_close(rid); - assertEquals(await cancelPromise, "resource closed"); + assertEquals(await cancel.promise, "resource closed"); }); // Close the stream without reading anything Deno.test(async function readableStreamCloseWithoutRead2() { - const { promise: cancelPromise, resolve: cancelResolve } = Promise - .withResolvers(); - const rid = resourceForReadableStream(longAsyncStream(cancelResolve)); + const cancel = Promise.withResolvers(); + const rid = resourceForReadableStream(longAsyncStream(cancel.resolve)); core.ops.op_close(rid); - assertEquals(await cancelPromise, "resource closed"); + assertEquals(await cancel.promise, "resource closed"); }); Deno.test(async function readableStreamPartial() { @@ -439,32 +441,38 @@ function createStreamTest( }); } -Deno.test(async function readableStreamWithAggressiveResourceClose() { - let first = true; - const { promise: reasonPromise, resolve: reasonResolve } = Promise - .withResolvers(); - const rid = resourceForReadableStream( - new ReadableStream({ - pull(controller) { - if (first) { - // We queue this up and then immediately close the resource (not the reader) - controller.enqueue(new Uint8Array(1)); - core.close(rid); - // This doesn't throw, even though the resource is closed - controller.enqueue(new Uint8Array(1)); - first = false; - } - }, - cancel(reason) { - reasonResolve(reason); - }, - }), - ); - try { - await core.ops.op_read(rid, new Uint8Array(1)); - fail(); - } catch (e) { - assertEquals(e.message, "operation canceled"); - } - assertEquals(await reasonPromise, "resource closed"); -}); +// 1024 is the size of the internal packet buffer -- we want to make sure we fill the internal pipe fully. +for (const packetCount of [1, 1024]) { + Deno.test(`readableStreamWithAggressiveResourceClose_${packetCount}`, async function () { + let first = true; + const { promise, resolve } = Promise.withResolvers(); + const rid = resourceForReadableStream( + new ReadableStream({ + pull(controller) { + if (first) { + // We queue this up and then immediately close the resource (not the reader) + for (let i = 0; i < packetCount; i++) { + controller.enqueue(new Uint8Array(1)); + } + core.close(rid); + // This doesn't throw, even though the resource is closed + controller.enqueue(new Uint8Array(1)); + first = false; + } + }, + cancel(reason) { + resolve(reason); + }, + }), + ); + try { + for (let i = 0; i < packetCount; i++) { + await core.ops.op_read(rid, new Uint8Array(1)); + } + fail(); + } catch (e) { + assertEquals(e.message, "operation canceled"); + } + assertEquals(await promise, "resource closed"); + }); +} |