diff options
Diffstat (limited to 'tests/unit/streams_test.ts')
-rw-r--r-- | tests/unit/streams_test.ts | 478 |
1 files changed, 478 insertions, 0 deletions
diff --git a/tests/unit/streams_test.ts b/tests/unit/streams_test.ts new file mode 100644 index 000000000..6db9f666c --- /dev/null +++ b/tests/unit/streams_test.ts @@ -0,0 +1,478 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. +import { assertEquals, fail } from "./test_util.ts"; + +const { + core, + resourceForReadableStream, + // @ts-expect-error TypeScript (as of 3.7) does not support indexing namespaces by symbol +} = Deno[Deno.internal]; + +const LOREM = + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."; + +// Hello world, with optional close +function helloWorldStream( + close?: boolean, + cancelResolve?: (value: unknown) => void, +) { + return new ReadableStream({ + start(controller) { + controller.enqueue("hello, world"); + if (close == true) { + controller.close(); + } + }, + cancel(reason) { + if (cancelResolve != undefined) { + cancelResolve(reason); + } + }, + }).pipeThrough(new TextEncoderStream()); +} + +// Hello world, with optional close +function errorStream(type: "string" | "controller" | "TypeError") { + return new ReadableStream({ + start(controller) { + controller.enqueue("hello, world"); + }, + pull(controller) { + if (type == "string") { + throw "Uh oh (string)!"; + } + if (type == "TypeError") { + throw TypeError("Uh oh (TypeError)!"); + } + controller.error("Uh oh (controller)!"); + }, + }).pipeThrough(new TextEncoderStream()); +} + +// Long stream with Lorem Ipsum text. +function longStream() { + return new ReadableStream({ + start(controller) { + for (let i = 0; i < 4; i++) { + setTimeout(() => { + controller.enqueue(LOREM); + if (i == 3) { + controller.close(); + } + }, i * 100); + } + }, + }).pipeThrough(new TextEncoderStream()); +} + +// Long stream with Lorem Ipsum text. +function longAsyncStream(cancelResolve?: (value: unknown) => void) { + let currentTimeout: number | undefined = undefined; + return new ReadableStream({ + async start(controller) { + for (let i = 0; i < 100; i++) { + await new Promise((r) => currentTimeout = setTimeout(r, 1)); + currentTimeout = undefined; + controller.enqueue(LOREM); + } + controller.close(); + }, + cancel(reason) { + if (cancelResolve != undefined) { + cancelResolve(reason); + } + if (currentTimeout !== undefined) { + clearTimeout(currentTimeout); + } + }, + }).pipeThrough(new TextEncoderStream()); +} + +// Empty stream, closes either immediately or on a call to pull. +function emptyStream(onPull: boolean) { + return new ReadableStream({ + start(controller) { + if (!onPull) { + controller.close(); + } + }, + pull(controller) { + if (onPull) { + controller.close(); + } + }, + }).pipeThrough(new TextEncoderStream()); +} + +function largePacketStream(packetSize: number, count: number) { + return new ReadableStream({ + pull(controller) { + if (count-- > 0) { + const buffer = new Uint8Array(packetSize); + for (let i = 0; i < 256; i++) { + buffer[i * (packetSize / 256)] = i; + } + controller.enqueue(buffer); + } else { + controller.close(); + } + }, + }); +} + +// Include an empty chunk +function emptyChunkStream() { + return new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([1])); + controller.enqueue(new Uint8Array([])); + controller.enqueue(new Uint8Array([2])); + controller.close(); + }, + }); +} + +// Try to blow up any recursive reads. +function veryLongTinyPacketStream(length: number) { + return new ReadableStream({ + start(controller) { + for (let i = 0; i < length; i++) { + controller.enqueue(new Uint8Array([1])); + } + controller.close(); + }, + }); +} + +// Creates a stream with the given number of packets, a configurable delay between packets, and a final +// action (either "Throw" or "Close"). +function makeStreamWithCount( + count: number, + delay: number, + action: "Throw" | "Close", +): ReadableStream { + function doAction(controller: ReadableStreamDefaultController, i: number) { + if (i == count) { + if (action == "Throw") { + controller.error(new Error("Expected error!")); + } else { + controller.close(); + } + } else { + controller.enqueue(String.fromCharCode("a".charCodeAt(0) + i)); + + if (delay == 0) { + doAction(controller, i + 1); + } else { + setTimeout(() => doAction(controller, i + 1), delay); + } + } + } + + return new ReadableStream({ + start(controller) { + if (delay == 0) { + doAction(controller, 0); + } else { + setTimeout(() => doAction(controller, 0), delay); + } + }, + }).pipeThrough(new TextEncoderStream()); +} + +// Normal stream operation +Deno.test(async function readableStream() { + const rid = resourceForReadableStream(helloWorldStream()); + const buffer = new Uint8Array(1024); + const nread = await core.read(rid, buffer); + assertEquals(nread, 12); + core.close(rid); +}); + +// Close the stream after reading everything +Deno.test(async function readableStreamClose() { + const cancel = Promise.withResolvers(); + const rid = resourceForReadableStream( + helloWorldStream(false, cancel.resolve), + ); + const buffer = new Uint8Array(1024); + const nread = await core.read(rid, buffer); + assertEquals(nread, 12); + core.close(rid); + assertEquals(await cancel.promise, "resource closed"); +}); + +// Close the stream without reading everything +Deno.test(async function readableStreamClosePartialRead() { + const cancel = Promise.withResolvers(); + const rid = resourceForReadableStream( + helloWorldStream(false, cancel.resolve), + ); + const buffer = new Uint8Array(5); + const nread = await core.read(rid, buffer); + assertEquals(nread, 5); + core.close(rid); + assertEquals(await cancel.promise, "resource closed"); +}); + +// Close the stream without reading anything +Deno.test(async function readableStreamCloseWithoutRead() { + const cancel = Promise.withResolvers(); + const rid = resourceForReadableStream( + helloWorldStream(false, cancel.resolve), + ); + core.close(rid); + assertEquals(await cancel.promise, "resource closed"); +}); + +// Close the stream without reading anything +Deno.test(async function readableStreamCloseWithoutRead2() { + const cancel = Promise.withResolvers(); + const rid = resourceForReadableStream(longAsyncStream(cancel.resolve)); + core.close(rid); + assertEquals(await cancel.promise, "resource closed"); +}); + +Deno.test(async function readableStreamPartial() { + const rid = resourceForReadableStream(helloWorldStream()); + const buffer = new Uint8Array(5); + const nread = await core.read(rid, buffer); + assertEquals(nread, 5); + const buffer2 = new Uint8Array(1024); + const nread2 = await core.read(rid, buffer2); + assertEquals(nread2, 7); + core.close(rid); +}); + +Deno.test(async function readableStreamLongReadAll() { + const rid = resourceForReadableStream(longStream()); + const buffer = await core.readAll(rid); + assertEquals(buffer.length, LOREM.length * 4); + core.close(rid); +}); + +Deno.test(async function readableStreamLongAsyncReadAll() { + const rid = resourceForReadableStream(longAsyncStream()); + const buffer = await core.readAll(rid); + assertEquals(buffer.length, LOREM.length * 100); + core.close(rid); +}); + +Deno.test(async function readableStreamVeryLongReadAll() { + const rid = resourceForReadableStream(veryLongTinyPacketStream(1_000_000)); + const buffer = await core.readAll(rid); + assertEquals(buffer.length, 1_000_000); + core.close(rid); +}); + +Deno.test(async function readableStreamLongByPiece() { + const rid = resourceForReadableStream(longStream()); + let total = 0; + for (let i = 0; i < 100; i++) { + const length = await core.read(rid, new Uint8Array(16)); + total += length; + if (length == 0) { + break; + } + } + assertEquals(total, LOREM.length * 4); + core.close(rid); +}); + +for ( + const type of [ + "string", + "TypeError", + "controller", + ] as ("string" | "TypeError" | "controller")[] +) { + Deno.test(`readableStreamError_${type}`, async function () { + const rid = resourceForReadableStream(errorStream(type)); + let nread; + try { + nread = await core.read(rid, new Uint8Array(16)); + } catch (_) { + fail("Should not have thrown"); + } + assertEquals(12, nread); + try { + await core.read(rid, new Uint8Array(1)); + fail(); + } catch (e) { + assertEquals(e.message, `Uh oh (${type})!`); + } + core.close(rid); + }); +} + +Deno.test(async function readableStreamEmptyOnStart() { + const rid = resourceForReadableStream(emptyStream(true)); + const buffer = new Uint8Array(1024); + const nread = await core.read(rid, buffer); + assertEquals(nread, 0); + core.close(rid); +}); + +Deno.test(async function readableStreamEmptyOnPull() { + const rid = resourceForReadableStream(emptyStream(false)); + const buffer = new Uint8Array(1024); + const nread = await core.read(rid, buffer); + assertEquals(nread, 0); + core.close(rid); +}); + +Deno.test(async function readableStreamEmptyReadAll() { + const rid = resourceForReadableStream(emptyStream(false)); + const buffer = await core.readAll(rid); + assertEquals(buffer.length, 0); + core.close(rid); +}); + +Deno.test(async function readableStreamWithEmptyChunk() { + const rid = resourceForReadableStream(emptyChunkStream()); + const buffer = await core.readAll(rid); + assertEquals(buffer, new Uint8Array([1, 2])); + core.close(rid); +}); + +Deno.test(async function readableStreamWithEmptyChunkOneByOne() { + const rid = resourceForReadableStream(emptyChunkStream()); + assertEquals(1, await core.read(rid, new Uint8Array(1))); + assertEquals(1, await core.read(rid, new Uint8Array(1))); + assertEquals(0, await core.read(rid, new Uint8Array(1))); + core.close(rid); +}); + +// Ensure that we correctly transmit all the sub-chunks of the larger chunks. +Deno.test(async function readableStreamReadSmallerChunks() { + const packetSize = 16 * 1024; + const rid = resourceForReadableStream(largePacketStream(packetSize, 1)); + const buffer = new Uint8Array(packetSize); + for (let i = 0; i < packetSize / 1024; i++) { + await core.read(rid, buffer.subarray(i * 1024, i * 1024 + 1024)); + } + for (let i = 0; i < 256; i++) { + assertEquals( + i, + buffer[i * (packetSize / 256)], + `at index ${i * (packetSize / 256)}`, + ); + } + core.close(rid); +}); + +Deno.test(async function readableStreamLargePackets() { + const packetSize = 128 * 1024; + const rid = resourceForReadableStream(largePacketStream(packetSize, 1024)); + for (let i = 0; i < 1024; i++) { + const buffer = new Uint8Array(packetSize); + assertEquals(packetSize, await core.read(rid, buffer)); + for (let i = 0; i < 256; i++) { + assertEquals( + i, + buffer[i * (packetSize / 256)], + `at index ${i * (packetSize / 256)}`, + ); + } + } + assertEquals(0, await core.read(rid, new Uint8Array(1))); + core.close(rid); +}); + +Deno.test(async function readableStreamVeryLargePackets() { + // 1024 packets of 1MB + const rid = resourceForReadableStream(largePacketStream(1024 * 1024, 1024)); + let total = 0; + // Read 96kB up to 12,288 times (96kB is not an even multiple of the 1MB packet size to test this) + const readCounts: Record<number, number> = {}; + for (let i = 0; i < 12 * 1024; i++) { + const nread = await core.read(rid, new Uint8Array(96 * 1024)); + total += nread; + readCounts[nread] = (readCounts[nread] || 0) + 1; + if (nread == 0) { + break; + } + } + assertEquals({ 0: 1, 65536: 1024, 98304: 10 * 1024 }, readCounts); + assertEquals(total, 1024 * 1024 * 1024); + core.close(rid); +}); + +for (const count of [0, 1, 2, 3]) { + for (const delay of [0, 1, 10]) { + // Creating a stream that errors in start will throw + if (delay > 0) { + createStreamTest(count, delay, "Throw"); + } + createStreamTest(count, delay, "Close"); + } +} + +function createStreamTest( + count: number, + delay: number, + action: "Throw" | "Close", +) { + Deno.test(`streamCount${count}Delay${delay}${action}`, async () => { + let rid; + try { + rid = resourceForReadableStream( + makeStreamWithCount(count, delay, action), + ); + for (let i = 0; i < count; i++) { + const buffer = new Uint8Array(1); + await core.read(rid, buffer); + } + if (action == "Throw") { + try { + const buffer = new Uint8Array(1); + assertEquals(1, await core.read(rid, buffer)); + fail(); + } catch (e) { + // We expect this to be thrown + assertEquals(e.message, "Expected error!"); + } + } else { + const buffer = new Uint8Array(1); + assertEquals(0, await core.read(rid, buffer)); + } + } finally { + core.close(rid); + } + }); +} + +// 1024 is the size of the internal packet buffer -- we want to make sure we fill the internal pipe fully. +for (const packetCount of [1, 1024]) { + Deno.test(`readableStreamWithAggressiveResourceClose_${packetCount}`, async function () { + let first = true; + const { promise, resolve } = Promise.withResolvers(); + const rid = resourceForReadableStream( + new ReadableStream({ + pull(controller) { + if (first) { + // We queue this up and then immediately close the resource (not the reader) + for (let i = 0; i < packetCount; i++) { + controller.enqueue(new Uint8Array(1)); + } + core.close(rid); + // This doesn't throw, even though the resource is closed + controller.enqueue(new Uint8Array(1)); + first = false; + } + }, + cancel(reason) { + resolve(reason); + }, + }), + ); + try { + for (let i = 0; i < packetCount; i++) { + await core.read(rid, new Uint8Array(1)); + } + fail(); + } catch (e) { + assertEquals(e.message, "operation canceled"); + } + assertEquals(await promise, "resource closed"); + }); +} |