summaryrefslogtreecommitdiff
path: root/tests/unit/streams_test.ts
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unit/streams_test.ts')
-rw-r--r--tests/unit/streams_test.ts478
1 files changed, 478 insertions, 0 deletions
diff --git a/tests/unit/streams_test.ts b/tests/unit/streams_test.ts
new file mode 100644
index 000000000..6db9f666c
--- /dev/null
+++ b/tests/unit/streams_test.ts
@@ -0,0 +1,478 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+import { assertEquals, fail } from "./test_util.ts";
+
+const {
+ core,
+ resourceForReadableStream,
+ // @ts-expect-error TypeScript (as of 3.7) does not support indexing namespaces by symbol
+} = Deno[Deno.internal];
+
+const LOREM =
+ "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.";
+
+// Hello world, with optional close
+function helloWorldStream(
+ close?: boolean,
+ cancelResolve?: (value: unknown) => void,
+) {
+ return new ReadableStream({
+ start(controller) {
+ controller.enqueue("hello, world");
+ if (close == true) {
+ controller.close();
+ }
+ },
+ cancel(reason) {
+ if (cancelResolve != undefined) {
+ cancelResolve(reason);
+ }
+ },
+ }).pipeThrough(new TextEncoderStream());
+}
+
+// Hello world, with optional close
+function errorStream(type: "string" | "controller" | "TypeError") {
+ return new ReadableStream({
+ start(controller) {
+ controller.enqueue("hello, world");
+ },
+ pull(controller) {
+ if (type == "string") {
+ throw "Uh oh (string)!";
+ }
+ if (type == "TypeError") {
+ throw TypeError("Uh oh (TypeError)!");
+ }
+ controller.error("Uh oh (controller)!");
+ },
+ }).pipeThrough(new TextEncoderStream());
+}
+
+// Long stream with Lorem Ipsum text.
+function longStream() {
+ return new ReadableStream({
+ start(controller) {
+ for (let i = 0; i < 4; i++) {
+ setTimeout(() => {
+ controller.enqueue(LOREM);
+ if (i == 3) {
+ controller.close();
+ }
+ }, i * 100);
+ }
+ },
+ }).pipeThrough(new TextEncoderStream());
+}
+
+// Long stream with Lorem Ipsum text.
+function longAsyncStream(cancelResolve?: (value: unknown) => void) {
+ let currentTimeout: number | undefined = undefined;
+ return new ReadableStream({
+ async start(controller) {
+ for (let i = 0; i < 100; i++) {
+ await new Promise((r) => currentTimeout = setTimeout(r, 1));
+ currentTimeout = undefined;
+ controller.enqueue(LOREM);
+ }
+ controller.close();
+ },
+ cancel(reason) {
+ if (cancelResolve != undefined) {
+ cancelResolve(reason);
+ }
+ if (currentTimeout !== undefined) {
+ clearTimeout(currentTimeout);
+ }
+ },
+ }).pipeThrough(new TextEncoderStream());
+}
+
+// Empty stream, closes either immediately or on a call to pull.
+function emptyStream(onPull: boolean) {
+ return new ReadableStream({
+ start(controller) {
+ if (!onPull) {
+ controller.close();
+ }
+ },
+ pull(controller) {
+ if (onPull) {
+ controller.close();
+ }
+ },
+ }).pipeThrough(new TextEncoderStream());
+}
+
+function largePacketStream(packetSize: number, count: number) {
+ return new ReadableStream({
+ pull(controller) {
+ if (count-- > 0) {
+ const buffer = new Uint8Array(packetSize);
+ for (let i = 0; i < 256; i++) {
+ buffer[i * (packetSize / 256)] = i;
+ }
+ controller.enqueue(buffer);
+ } else {
+ controller.close();
+ }
+ },
+ });
+}
+
+// Include an empty chunk
+function emptyChunkStream() {
+ return new ReadableStream({
+ start(controller) {
+ controller.enqueue(new Uint8Array([1]));
+ controller.enqueue(new Uint8Array([]));
+ controller.enqueue(new Uint8Array([2]));
+ controller.close();
+ },
+ });
+}
+
+// Try to blow up any recursive reads.
+function veryLongTinyPacketStream(length: number) {
+ return new ReadableStream({
+ start(controller) {
+ for (let i = 0; i < length; i++) {
+ controller.enqueue(new Uint8Array([1]));
+ }
+ controller.close();
+ },
+ });
+}
+
+// Creates a stream with the given number of packets, a configurable delay between packets, and a final
+// action (either "Throw" or "Close").
+function makeStreamWithCount(
+ count: number,
+ delay: number,
+ action: "Throw" | "Close",
+): ReadableStream {
+ function doAction(controller: ReadableStreamDefaultController, i: number) {
+ if (i == count) {
+ if (action == "Throw") {
+ controller.error(new Error("Expected error!"));
+ } else {
+ controller.close();
+ }
+ } else {
+ controller.enqueue(String.fromCharCode("a".charCodeAt(0) + i));
+
+ if (delay == 0) {
+ doAction(controller, i + 1);
+ } else {
+ setTimeout(() => doAction(controller, i + 1), delay);
+ }
+ }
+ }
+
+ return new ReadableStream({
+ start(controller) {
+ if (delay == 0) {
+ doAction(controller, 0);
+ } else {
+ setTimeout(() => doAction(controller, 0), delay);
+ }
+ },
+ }).pipeThrough(new TextEncoderStream());
+}
+
+// Normal stream operation
+Deno.test(async function readableStream() {
+ const rid = resourceForReadableStream(helloWorldStream());
+ const buffer = new Uint8Array(1024);
+ const nread = await core.read(rid, buffer);
+ assertEquals(nread, 12);
+ core.close(rid);
+});
+
+// Close the stream after reading everything
+Deno.test(async function readableStreamClose() {
+ const cancel = Promise.withResolvers();
+ const rid = resourceForReadableStream(
+ helloWorldStream(false, cancel.resolve),
+ );
+ const buffer = new Uint8Array(1024);
+ const nread = await core.read(rid, buffer);
+ assertEquals(nread, 12);
+ core.close(rid);
+ assertEquals(await cancel.promise, "resource closed");
+});
+
+// Close the stream without reading everything
+Deno.test(async function readableStreamClosePartialRead() {
+ const cancel = Promise.withResolvers();
+ const rid = resourceForReadableStream(
+ helloWorldStream(false, cancel.resolve),
+ );
+ const buffer = new Uint8Array(5);
+ const nread = await core.read(rid, buffer);
+ assertEquals(nread, 5);
+ core.close(rid);
+ assertEquals(await cancel.promise, "resource closed");
+});
+
+// Close the stream without reading anything
+Deno.test(async function readableStreamCloseWithoutRead() {
+ const cancel = Promise.withResolvers();
+ const rid = resourceForReadableStream(
+ helloWorldStream(false, cancel.resolve),
+ );
+ core.close(rid);
+ assertEquals(await cancel.promise, "resource closed");
+});
+
+// Close the stream without reading anything
+Deno.test(async function readableStreamCloseWithoutRead2() {
+ const cancel = Promise.withResolvers();
+ const rid = resourceForReadableStream(longAsyncStream(cancel.resolve));
+ core.close(rid);
+ assertEquals(await cancel.promise, "resource closed");
+});
+
+Deno.test(async function readableStreamPartial() {
+ const rid = resourceForReadableStream(helloWorldStream());
+ const buffer = new Uint8Array(5);
+ const nread = await core.read(rid, buffer);
+ assertEquals(nread, 5);
+ const buffer2 = new Uint8Array(1024);
+ const nread2 = await core.read(rid, buffer2);
+ assertEquals(nread2, 7);
+ core.close(rid);
+});
+
+Deno.test(async function readableStreamLongReadAll() {
+ const rid = resourceForReadableStream(longStream());
+ const buffer = await core.readAll(rid);
+ assertEquals(buffer.length, LOREM.length * 4);
+ core.close(rid);
+});
+
+Deno.test(async function readableStreamLongAsyncReadAll() {
+ const rid = resourceForReadableStream(longAsyncStream());
+ const buffer = await core.readAll(rid);
+ assertEquals(buffer.length, LOREM.length * 100);
+ core.close(rid);
+});
+
+Deno.test(async function readableStreamVeryLongReadAll() {
+ const rid = resourceForReadableStream(veryLongTinyPacketStream(1_000_000));
+ const buffer = await core.readAll(rid);
+ assertEquals(buffer.length, 1_000_000);
+ core.close(rid);
+});
+
+Deno.test(async function readableStreamLongByPiece() {
+ const rid = resourceForReadableStream(longStream());
+ let total = 0;
+ for (let i = 0; i < 100; i++) {
+ const length = await core.read(rid, new Uint8Array(16));
+ total += length;
+ if (length == 0) {
+ break;
+ }
+ }
+ assertEquals(total, LOREM.length * 4);
+ core.close(rid);
+});
+
+for (
+ const type of [
+ "string",
+ "TypeError",
+ "controller",
+ ] as ("string" | "TypeError" | "controller")[]
+) {
+ Deno.test(`readableStreamError_${type}`, async function () {
+ const rid = resourceForReadableStream(errorStream(type));
+ let nread;
+ try {
+ nread = await core.read(rid, new Uint8Array(16));
+ } catch (_) {
+ fail("Should not have thrown");
+ }
+ assertEquals(12, nread);
+ try {
+ await core.read(rid, new Uint8Array(1));
+ fail();
+ } catch (e) {
+ assertEquals(e.message, `Uh oh (${type})!`);
+ }
+ core.close(rid);
+ });
+}
+
+Deno.test(async function readableStreamEmptyOnStart() {
+ const rid = resourceForReadableStream(emptyStream(true));
+ const buffer = new Uint8Array(1024);
+ const nread = await core.read(rid, buffer);
+ assertEquals(nread, 0);
+ core.close(rid);
+});
+
+Deno.test(async function readableStreamEmptyOnPull() {
+ const rid = resourceForReadableStream(emptyStream(false));
+ const buffer = new Uint8Array(1024);
+ const nread = await core.read(rid, buffer);
+ assertEquals(nread, 0);
+ core.close(rid);
+});
+
+Deno.test(async function readableStreamEmptyReadAll() {
+ const rid = resourceForReadableStream(emptyStream(false));
+ const buffer = await core.readAll(rid);
+ assertEquals(buffer.length, 0);
+ core.close(rid);
+});
+
+Deno.test(async function readableStreamWithEmptyChunk() {
+ const rid = resourceForReadableStream(emptyChunkStream());
+ const buffer = await core.readAll(rid);
+ assertEquals(buffer, new Uint8Array([1, 2]));
+ core.close(rid);
+});
+
+Deno.test(async function readableStreamWithEmptyChunkOneByOne() {
+ const rid = resourceForReadableStream(emptyChunkStream());
+ assertEquals(1, await core.read(rid, new Uint8Array(1)));
+ assertEquals(1, await core.read(rid, new Uint8Array(1)));
+ assertEquals(0, await core.read(rid, new Uint8Array(1)));
+ core.close(rid);
+});
+
+// Ensure that we correctly transmit all the sub-chunks of the larger chunks.
+Deno.test(async function readableStreamReadSmallerChunks() {
+ const packetSize = 16 * 1024;
+ const rid = resourceForReadableStream(largePacketStream(packetSize, 1));
+ const buffer = new Uint8Array(packetSize);
+ for (let i = 0; i < packetSize / 1024; i++) {
+ await core.read(rid, buffer.subarray(i * 1024, i * 1024 + 1024));
+ }
+ for (let i = 0; i < 256; i++) {
+ assertEquals(
+ i,
+ buffer[i * (packetSize / 256)],
+ `at index ${i * (packetSize / 256)}`,
+ );
+ }
+ core.close(rid);
+});
+
+Deno.test(async function readableStreamLargePackets() {
+ const packetSize = 128 * 1024;
+ const rid = resourceForReadableStream(largePacketStream(packetSize, 1024));
+ for (let i = 0; i < 1024; i++) {
+ const buffer = new Uint8Array(packetSize);
+ assertEquals(packetSize, await core.read(rid, buffer));
+ for (let i = 0; i < 256; i++) {
+ assertEquals(
+ i,
+ buffer[i * (packetSize / 256)],
+ `at index ${i * (packetSize / 256)}`,
+ );
+ }
+ }
+ assertEquals(0, await core.read(rid, new Uint8Array(1)));
+ core.close(rid);
+});
+
+Deno.test(async function readableStreamVeryLargePackets() {
+ // 1024 packets of 1MB
+ const rid = resourceForReadableStream(largePacketStream(1024 * 1024, 1024));
+ let total = 0;
+ // Read 96kB up to 12,288 times (96kB is not an even multiple of the 1MB packet size to test this)
+ const readCounts: Record<number, number> = {};
+ for (let i = 0; i < 12 * 1024; i++) {
+ const nread = await core.read(rid, new Uint8Array(96 * 1024));
+ total += nread;
+ readCounts[nread] = (readCounts[nread] || 0) + 1;
+ if (nread == 0) {
+ break;
+ }
+ }
+ assertEquals({ 0: 1, 65536: 1024, 98304: 10 * 1024 }, readCounts);
+ assertEquals(total, 1024 * 1024 * 1024);
+ core.close(rid);
+});
+
+for (const count of [0, 1, 2, 3]) {
+ for (const delay of [0, 1, 10]) {
+ // Creating a stream that errors in start will throw
+ if (delay > 0) {
+ createStreamTest(count, delay, "Throw");
+ }
+ createStreamTest(count, delay, "Close");
+ }
+}
+
+function createStreamTest(
+ count: number,
+ delay: number,
+ action: "Throw" | "Close",
+) {
+ Deno.test(`streamCount${count}Delay${delay}${action}`, async () => {
+ let rid;
+ try {
+ rid = resourceForReadableStream(
+ makeStreamWithCount(count, delay, action),
+ );
+ for (let i = 0; i < count; i++) {
+ const buffer = new Uint8Array(1);
+ await core.read(rid, buffer);
+ }
+ if (action == "Throw") {
+ try {
+ const buffer = new Uint8Array(1);
+ assertEquals(1, await core.read(rid, buffer));
+ fail();
+ } catch (e) {
+ // We expect this to be thrown
+ assertEquals(e.message, "Expected error!");
+ }
+ } else {
+ const buffer = new Uint8Array(1);
+ assertEquals(0, await core.read(rid, buffer));
+ }
+ } finally {
+ core.close(rid);
+ }
+ });
+}
+
+// 1024 is the size of the internal packet buffer -- we want to make sure we fill the internal pipe fully.
+for (const packetCount of [1, 1024]) {
+ Deno.test(`readableStreamWithAggressiveResourceClose_${packetCount}`, async function () {
+ let first = true;
+ const { promise, resolve } = Promise.withResolvers();
+ const rid = resourceForReadableStream(
+ new ReadableStream({
+ pull(controller) {
+ if (first) {
+ // We queue this up and then immediately close the resource (not the reader)
+ for (let i = 0; i < packetCount; i++) {
+ controller.enqueue(new Uint8Array(1));
+ }
+ core.close(rid);
+ // This doesn't throw, even though the resource is closed
+ controller.enqueue(new Uint8Array(1));
+ first = false;
+ }
+ },
+ cancel(reason) {
+ resolve(reason);
+ },
+ }),
+ );
+ try {
+ for (let i = 0; i < packetCount; i++) {
+ await core.read(rid, new Uint8Array(1));
+ }
+ fail();
+ } catch (e) {
+ assertEquals(e.message, "operation canceled");
+ }
+ assertEquals(await promise, "resource closed");
+ });
+}