summaryrefslogtreecommitdiff
path: root/cli/tests/unit/streams_test.ts
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-08-17 07:52:37 -0600
committerGitHub <noreply@github.com>2023-08-17 07:52:37 -0600
commit23ff0e722e3c4b0827940853c53c5ee2ede5ec9f (patch)
tree1521ffd2ac5e803224546cb349b3905925b9b5ff /cli/tests/unit/streams_test.ts
parent0960e895da1275792c1f38999f6a185c864edb3f (diff)
feat(ext/web): resourceForReadableStream (#20180)
Extracted from fast streams work. This is a resource wrapper for `ReadableStream`, allowing us to treat all `ReadableStream` instances as resources, and remove special paths in both `fetch` and `serve`. Performance with a ReadableStream response yields ~18% improvement: ``` return new Response(new ReadableStream({ start(controller) { controller.enqueue(new Uint8Array([104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100])); controller.close(); } }) ``` This patch: ``` 12:36 $ third_party/prebuilt/mac/wrk http://localhost:8080 Running 10s test @ http://localhost:8080 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 99.96us 100.03us 6.65ms 98.84% Req/Sec 47.73k 2.43k 51.02k 89.11% 959308 requests in 10.10s, 117.10MB read Requests/sec: 94978.71 Transfer/sec: 11.59MB ``` main: ``` Running 10s test @ http://localhost:8080 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 163.03us 685.51us 19.73ms 99.27% Req/Sec 39.50k 3.98k 66.11k 95.52% 789582 requests in 10.10s, 82.83MB read Requests/sec: 78182.65 Transfer/sec: 8.20MB ```
Diffstat (limited to 'cli/tests/unit/streams_test.ts')
-rw-r--r--cli/tests/unit/streams_test.ts299
1 files changed, 299 insertions, 0 deletions
diff --git a/cli/tests/unit/streams_test.ts b/cli/tests/unit/streams_test.ts
new file mode 100644
index 000000000..4a573c934
--- /dev/null
+++ b/cli/tests/unit/streams_test.ts
@@ -0,0 +1,299 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+import { fail } from "https://deno.land/std@v0.42.0/testing/asserts.ts";
+import { assertEquals, Deferred, deferred } from "./test_util.ts";
+
+const {
+ core,
+ resourceForReadableStream,
+ // @ts-expect-error TypeScript (as of 3.7) does not support indexing namespaces by symbol
+} = Deno[Deno.internal];
+
+const LOREM =
+ "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.";
+
+// Hello world, with optional close
+// deno-lint-ignore no-explicit-any
+function helloWorldStream(close?: boolean, completion?: Deferred<any>) {
+ return new ReadableStream({
+ start(controller) {
+ controller.enqueue("hello, world");
+ if (close == true) {
+ controller.close();
+ }
+ },
+ cancel(reason) {
+ completion?.resolve(reason);
+ },
+ }).pipeThrough(new TextEncoderStream());
+}
+
+// Hello world, with optional close
+function errorStream(type: "string" | "controller" | "TypeError") {
+ return new ReadableStream({
+ start(controller) {
+ controller.enqueue("hello, world");
+ },
+ pull(controller) {
+ if (type == "string") {
+ throw "Uh oh (string)!";
+ }
+ if (type == "TypeError") {
+ throw TypeError("Uh oh (TypeError)!");
+ }
+ controller.error("Uh oh (controller)!");
+ },
+ }).pipeThrough(new TextEncoderStream());
+}
+
+// Long stream with Lorem Ipsum text.
+function longStream() {
+ return new ReadableStream({
+ start(controller) {
+ for (let i = 0; i < 4; i++) {
+ setTimeout(() => {
+ controller.enqueue(LOREM);
+ if (i == 3) {
+ controller.close();
+ }
+ }, i * 100);
+ }
+ },
+ }).pipeThrough(new TextEncoderStream());
+}
+
+// Empty stream, closes either immediately or on a call to pull.
+function emptyStream(onPull: boolean) {
+ return new ReadableStream({
+ start(controller) {
+ if (!onPull) {
+ controller.close();
+ }
+ },
+ pull(controller) {
+ if (onPull) {
+ controller.close();
+ }
+ },
+ }).pipeThrough(new TextEncoderStream());
+}
+
+// Include an empty chunk
+function emptyChunkStream() {
+ return new ReadableStream({
+ start(controller) {
+ controller.enqueue(new Uint8Array([1]));
+ controller.enqueue(new Uint8Array([]));
+ controller.enqueue(new Uint8Array([2]));
+ controller.close();
+ },
+ });
+}
+
+// Creates a stream with the given number of packets, a configurable delay between packets, and a final
+// action (either "Throw" or "Close").
+function makeStreamWithCount(
+ count: number,
+ delay: number,
+ action: "Throw" | "Close",
+): ReadableStream {
+ function doAction(controller: ReadableStreamDefaultController, i: number) {
+ if (i == count) {
+ if (action == "Throw") {
+ controller.error(new Error("Expected error!"));
+ } else {
+ controller.close();
+ }
+ } else {
+ controller.enqueue(String.fromCharCode("a".charCodeAt(0) + i));
+
+ if (delay == 0) {
+ doAction(controller, i + 1);
+ } else {
+ setTimeout(() => doAction(controller, i + 1), delay);
+ }
+ }
+ }
+
+ return new ReadableStream({
+ start(controller) {
+ if (delay == 0) {
+ doAction(controller, 0);
+ } else {
+ setTimeout(() => doAction(controller, 0), delay);
+ }
+ },
+ }).pipeThrough(new TextEncoderStream());
+}
+
+// Normal stream operation
+Deno.test(async function readableStream() {
+ const rid = resourceForReadableStream(helloWorldStream());
+ const buffer = new Uint8Array(1024);
+ const nread = await core.ops.op_read(rid, buffer);
+ assertEquals(nread, 12);
+ core.ops.op_close(rid);
+});
+
+// Close the stream after reading everything
+Deno.test(async function readableStreamClose() {
+ const cancel = deferred();
+ const rid = resourceForReadableStream(helloWorldStream(false, cancel));
+ const buffer = new Uint8Array(1024);
+ const nread = await core.ops.op_read(rid, buffer);
+ assertEquals(nread, 12);
+ core.ops.op_close(rid);
+ assertEquals(await cancel, undefined);
+});
+
+// Close the stream without reading everything
+Deno.test(async function readableStreamClosePartialRead() {
+ const cancel = deferred();
+ const rid = resourceForReadableStream(helloWorldStream(false, cancel));
+ const buffer = new Uint8Array(5);
+ const nread = await core.ops.op_read(rid, buffer);
+ assertEquals(nread, 5);
+ core.ops.op_close(rid);
+ assertEquals(await cancel, undefined);
+});
+
+// Close the stream without reading anything
+Deno.test(async function readableStreamCloseWithoutRead() {
+ const cancel = deferred();
+ const rid = resourceForReadableStream(helloWorldStream(false, cancel));
+ core.ops.op_close(rid);
+ assertEquals(await cancel, undefined);
+});
+
+Deno.test(async function readableStreamPartial() {
+ const rid = resourceForReadableStream(helloWorldStream());
+ const buffer = new Uint8Array(5);
+ const nread = await core.ops.op_read(rid, buffer);
+ assertEquals(nread, 5);
+ const buffer2 = new Uint8Array(1024);
+ const nread2 = await core.ops.op_read(rid, buffer2);
+ assertEquals(nread2, 7);
+ core.ops.op_close(rid);
+});
+
+Deno.test(async function readableStreamLongReadAll() {
+ const rid = resourceForReadableStream(longStream());
+ const buffer = await core.ops.op_read_all(rid);
+ assertEquals(buffer.length, LOREM.length * 4);
+ core.ops.op_close(rid);
+});
+
+Deno.test(async function readableStreamLongByPiece() {
+ const rid = resourceForReadableStream(longStream());
+ let total = 0;
+ for (let i = 0; i < 100; i++) {
+ const length = await core.ops.op_read(rid, new Uint8Array(16));
+ total += length;
+ if (length == 0) {
+ break;
+ }
+ }
+ assertEquals(total, LOREM.length * 4);
+ core.ops.op_close(rid);
+});
+
+for (
+ const type of [
+ "string",
+ "TypeError",
+ "controller",
+ ] as ("string" | "TypeError" | "controller")[]
+) {
+ Deno.test(`readableStreamError_${type}`, async function () {
+ const rid = resourceForReadableStream(errorStream(type));
+ assertEquals(12, await core.ops.op_read(rid, new Uint8Array(16)));
+ try {
+ await core.ops.op_read(rid, new Uint8Array(1));
+ fail();
+ } catch (e) {
+ assertEquals(e.message, `Uh oh (${type})!`);
+ }
+ core.ops.op_close(rid);
+ });
+}
+
+Deno.test(async function readableStreamEmptyOnStart() {
+ const rid = resourceForReadableStream(emptyStream(true));
+ const buffer = new Uint8Array(1024);
+ const nread = await core.ops.op_read(rid, buffer);
+ assertEquals(nread, 0);
+ core.ops.op_close(rid);
+});
+
+Deno.test(async function readableStreamEmptyOnPull() {
+ const rid = resourceForReadableStream(emptyStream(false));
+ const buffer = new Uint8Array(1024);
+ const nread = await core.ops.op_read(rid, buffer);
+ assertEquals(nread, 0);
+ core.ops.op_close(rid);
+});
+
+Deno.test(async function readableStreamEmptyReadAll() {
+ const rid = resourceForReadableStream(emptyStream(false));
+ const buffer = await core.ops.op_read_all(rid);
+ assertEquals(buffer.length, 0);
+ core.ops.op_close(rid);
+});
+
+Deno.test(async function readableStreamWithEmptyChunk() {
+ const rid = resourceForReadableStream(emptyChunkStream());
+ const buffer = await core.ops.op_read_all(rid);
+ assertEquals(buffer, new Uint8Array([1, 2]));
+ core.ops.op_close(rid);
+});
+
+Deno.test(async function readableStreamWithEmptyChunkOneByOne() {
+ const rid = resourceForReadableStream(emptyChunkStream());
+ assertEquals(1, await core.ops.op_read(rid, new Uint8Array(1)));
+ assertEquals(1, await core.ops.op_read(rid, new Uint8Array(1)));
+ assertEquals(0, await core.ops.op_read(rid, new Uint8Array(1)));
+ core.ops.op_close(rid);
+});
+
+for (const count of [0, 1, 2, 3]) {
+ for (const delay of [0, 1, 10]) {
+ // Creating a stream that errors in start will throw
+ if (delay > 0) {
+ createStreamTest(count, delay, "Throw");
+ }
+ createStreamTest(count, delay, "Close");
+ }
+}
+
+function createStreamTest(
+ count: number,
+ delay: number,
+ action: "Throw" | "Close",
+) {
+ Deno.test(`streamCount${count}Delay${delay}${action}`, async () => {
+ let rid;
+ try {
+ rid = resourceForReadableStream(
+ makeStreamWithCount(count, delay, action),
+ );
+ for (let i = 0; i < count; i++) {
+ const buffer = new Uint8Array(1);
+ await core.ops.op_read(rid, buffer);
+ }
+ if (action == "Throw") {
+ try {
+ const buffer = new Uint8Array(1);
+ assertEquals(1, await core.ops.op_read(rid, buffer));
+ fail();
+ } catch (e) {
+ // We expect this to be thrown
+ assertEquals(e.message, "Expected error!");
+ }
+ } else {
+ const buffer = new Uint8Array(1);
+ assertEquals(0, await core.ops.op_read(rid, buffer));
+ }
+ } finally {
+ core.ops.op_close(rid);
+ }
+ });
+}