diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-08-17 07:52:37 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-17 07:52:37 -0600 |
commit | 23ff0e722e3c4b0827940853c53c5ee2ede5ec9f (patch) | |
tree | 1521ffd2ac5e803224546cb349b3905925b9b5ff /cli/tests/unit/streams_test.ts | |
parent | 0960e895da1275792c1f38999f6a185c864edb3f (diff) |
feat(ext/web): resourceForReadableStream (#20180)
Extracted from fast streams work.
This is a resource wrapper for `ReadableStream`, allowing us to treat
all `ReadableStream` instances as resources, and remove special paths in
both `fetch` and `serve`.
Performance with a ReadableStream response yields ~18% improvement:
```
return new Response(new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100]));
controller.close();
}
})
```
This patch:
```
12:36 $ third_party/prebuilt/mac/wrk http://localhost:8080
Running 10s test @ http://localhost:8080
2 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 99.96us 100.03us 6.65ms 98.84%
Req/Sec 47.73k 2.43k 51.02k 89.11%
959308 requests in 10.10s, 117.10MB read
Requests/sec: 94978.71
Transfer/sec: 11.59MB
```
main:
```
Running 10s test @ http://localhost:8080
2 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 163.03us 685.51us 19.73ms 99.27%
Req/Sec 39.50k 3.98k 66.11k 95.52%
789582 requests in 10.10s, 82.83MB read
Requests/sec: 78182.65
Transfer/sec: 8.20MB
```
Diffstat (limited to 'cli/tests/unit/streams_test.ts')
-rw-r--r-- | cli/tests/unit/streams_test.ts | 299 |
1 files changed, 299 insertions, 0 deletions
diff --git a/cli/tests/unit/streams_test.ts b/cli/tests/unit/streams_test.ts new file mode 100644 index 000000000..4a573c934 --- /dev/null +++ b/cli/tests/unit/streams_test.ts @@ -0,0 +1,299 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +import { fail } from "https://deno.land/std@v0.42.0/testing/asserts.ts"; +import { assertEquals, Deferred, deferred } from "./test_util.ts"; + +const { + core, + resourceForReadableStream, + // @ts-expect-error TypeScript (as of 3.7) does not support indexing namespaces by symbol +} = Deno[Deno.internal]; + +const LOREM = + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."; + +// Hello world, with optional close +// deno-lint-ignore no-explicit-any +function helloWorldStream(close?: boolean, completion?: Deferred<any>) { + return new ReadableStream({ + start(controller) { + controller.enqueue("hello, world"); + if (close == true) { + controller.close(); + } + }, + cancel(reason) { + completion?.resolve(reason); + }, + }).pipeThrough(new TextEncoderStream()); +} + +// Hello world, with optional close +function errorStream(type: "string" | "controller" | "TypeError") { + return new ReadableStream({ + start(controller) { + controller.enqueue("hello, world"); + }, + pull(controller) { + if (type == "string") { + throw "Uh oh (string)!"; + } + if (type == "TypeError") { + throw TypeError("Uh oh (TypeError)!"); + } + controller.error("Uh oh (controller)!"); + }, + }).pipeThrough(new TextEncoderStream()); +} + +// Long stream with Lorem Ipsum text. +function longStream() { + return new ReadableStream({ + start(controller) { + for (let i = 0; i < 4; i++) { + setTimeout(() => { + controller.enqueue(LOREM); + if (i == 3) { + controller.close(); + } + }, i * 100); + } + }, + }).pipeThrough(new TextEncoderStream()); +} + +// Empty stream, closes either immediately or on a call to pull. +function emptyStream(onPull: boolean) { + return new ReadableStream({ + start(controller) { + if (!onPull) { + controller.close(); + } + }, + pull(controller) { + if (onPull) { + controller.close(); + } + }, + }).pipeThrough(new TextEncoderStream()); +} + +// Include an empty chunk +function emptyChunkStream() { + return new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([1])); + controller.enqueue(new Uint8Array([])); + controller.enqueue(new Uint8Array([2])); + controller.close(); + }, + }); +} + +// Creates a stream with the given number of packets, a configurable delay between packets, and a final +// action (either "Throw" or "Close"). +function makeStreamWithCount( + count: number, + delay: number, + action: "Throw" | "Close", +): ReadableStream { + function doAction(controller: ReadableStreamDefaultController, i: number) { + if (i == count) { + if (action == "Throw") { + controller.error(new Error("Expected error!")); + } else { + controller.close(); + } + } else { + controller.enqueue(String.fromCharCode("a".charCodeAt(0) + i)); + + if (delay == 0) { + doAction(controller, i + 1); + } else { + setTimeout(() => doAction(controller, i + 1), delay); + } + } + } + + return new ReadableStream({ + start(controller) { + if (delay == 0) { + doAction(controller, 0); + } else { + setTimeout(() => doAction(controller, 0), delay); + } + }, + }).pipeThrough(new TextEncoderStream()); +} + +// Normal stream operation +Deno.test(async function readableStream() { + const rid = resourceForReadableStream(helloWorldStream()); + const buffer = new Uint8Array(1024); + const nread = await core.ops.op_read(rid, buffer); + assertEquals(nread, 12); + core.ops.op_close(rid); +}); + +// Close the stream after reading everything +Deno.test(async function readableStreamClose() { + const cancel = deferred(); + const rid = resourceForReadableStream(helloWorldStream(false, cancel)); + const buffer = new Uint8Array(1024); + const nread = await core.ops.op_read(rid, buffer); + assertEquals(nread, 12); + core.ops.op_close(rid); + assertEquals(await cancel, undefined); +}); + +// Close the stream without reading everything +Deno.test(async function readableStreamClosePartialRead() { + const cancel = deferred(); + const rid = resourceForReadableStream(helloWorldStream(false, cancel)); + const buffer = new Uint8Array(5); + const nread = await core.ops.op_read(rid, buffer); + assertEquals(nread, 5); + core.ops.op_close(rid); + assertEquals(await cancel, undefined); +}); + +// Close the stream without reading anything +Deno.test(async function readableStreamCloseWithoutRead() { + const cancel = deferred(); + const rid = resourceForReadableStream(helloWorldStream(false, cancel)); + core.ops.op_close(rid); + assertEquals(await cancel, undefined); +}); + +Deno.test(async function readableStreamPartial() { + const rid = resourceForReadableStream(helloWorldStream()); + const buffer = new Uint8Array(5); + const nread = await core.ops.op_read(rid, buffer); + assertEquals(nread, 5); + const buffer2 = new Uint8Array(1024); + const nread2 = await core.ops.op_read(rid, buffer2); + assertEquals(nread2, 7); + core.ops.op_close(rid); +}); + +Deno.test(async function readableStreamLongReadAll() { + const rid = resourceForReadableStream(longStream()); + const buffer = await core.ops.op_read_all(rid); + assertEquals(buffer.length, LOREM.length * 4); + core.ops.op_close(rid); +}); + +Deno.test(async function readableStreamLongByPiece() { + const rid = resourceForReadableStream(longStream()); + let total = 0; + for (let i = 0; i < 100; i++) { + const length = await core.ops.op_read(rid, new Uint8Array(16)); + total += length; + if (length == 0) { + break; + } + } + assertEquals(total, LOREM.length * 4); + core.ops.op_close(rid); +}); + +for ( + const type of [ + "string", + "TypeError", + "controller", + ] as ("string" | "TypeError" | "controller")[] +) { + Deno.test(`readableStreamError_${type}`, async function () { + const rid = resourceForReadableStream(errorStream(type)); + assertEquals(12, await core.ops.op_read(rid, new Uint8Array(16))); + try { + await core.ops.op_read(rid, new Uint8Array(1)); + fail(); + } catch (e) { + assertEquals(e.message, `Uh oh (${type})!`); + } + core.ops.op_close(rid); + }); +} + +Deno.test(async function readableStreamEmptyOnStart() { + const rid = resourceForReadableStream(emptyStream(true)); + const buffer = new Uint8Array(1024); + const nread = await core.ops.op_read(rid, buffer); + assertEquals(nread, 0); + core.ops.op_close(rid); +}); + +Deno.test(async function readableStreamEmptyOnPull() { + const rid = resourceForReadableStream(emptyStream(false)); + const buffer = new Uint8Array(1024); + const nread = await core.ops.op_read(rid, buffer); + assertEquals(nread, 0); + core.ops.op_close(rid); +}); + +Deno.test(async function readableStreamEmptyReadAll() { + const rid = resourceForReadableStream(emptyStream(false)); + const buffer = await core.ops.op_read_all(rid); + assertEquals(buffer.length, 0); + core.ops.op_close(rid); +}); + +Deno.test(async function readableStreamWithEmptyChunk() { + const rid = resourceForReadableStream(emptyChunkStream()); + const buffer = await core.ops.op_read_all(rid); + assertEquals(buffer, new Uint8Array([1, 2])); + core.ops.op_close(rid); +}); + +Deno.test(async function readableStreamWithEmptyChunkOneByOne() { + const rid = resourceForReadableStream(emptyChunkStream()); + assertEquals(1, await core.ops.op_read(rid, new Uint8Array(1))); + assertEquals(1, await core.ops.op_read(rid, new Uint8Array(1))); + assertEquals(0, await core.ops.op_read(rid, new Uint8Array(1))); + core.ops.op_close(rid); +}); + +for (const count of [0, 1, 2, 3]) { + for (const delay of [0, 1, 10]) { + // Creating a stream that errors in start will throw + if (delay > 0) { + createStreamTest(count, delay, "Throw"); + } + createStreamTest(count, delay, "Close"); + } +} + +function createStreamTest( + count: number, + delay: number, + action: "Throw" | "Close", +) { + Deno.test(`streamCount${count}Delay${delay}${action}`, async () => { + let rid; + try { + rid = resourceForReadableStream( + makeStreamWithCount(count, delay, action), + ); + for (let i = 0; i < count; i++) { + const buffer = new Uint8Array(1); + await core.ops.op_read(rid, buffer); + } + if (action == "Throw") { + try { + const buffer = new Uint8Array(1); + assertEquals(1, await core.ops.op_read(rid, buffer)); + fail(); + } catch (e) { + // We expect this to be thrown + assertEquals(e.message, "Expected error!"); + } + } else { + const buffer = new Uint8Array(1); + assertEquals(0, await core.ops.op_read(rid, buffer)); + } + } finally { + core.ops.op_close(rid); + } + }); +} |