summaryrefslogtreecommitdiff
path: root/ext/web/06_streams.js
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-08-17 07:52:37 -0600
committerGitHub <noreply@github.com>2023-08-17 07:52:37 -0600
commit23ff0e722e3c4b0827940853c53c5ee2ede5ec9f (patch)
tree1521ffd2ac5e803224546cb349b3905925b9b5ff /ext/web/06_streams.js
parent0960e895da1275792c1f38999f6a185c864edb3f (diff)
feat(ext/web): resourceForReadableStream (#20180)
Extracted from fast streams work. This is a resource wrapper for `ReadableStream`, allowing us to treat all `ReadableStream` instances as resources, and remove special paths in both `fetch` and `serve`. Performance with a ReadableStream response yields ~18% improvement: ``` return new Response(new ReadableStream({ start(controller) { controller.enqueue(new Uint8Array([104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100])); controller.close(); } }) ``` This patch: ``` 12:36 $ third_party/prebuilt/mac/wrk http://localhost:8080 Running 10s test @ http://localhost:8080 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 99.96us 100.03us 6.65ms 98.84% Req/Sec 47.73k 2.43k 51.02k 89.11% 959308 requests in 10.10s, 117.10MB read Requests/sec: 94978.71 Transfer/sec: 11.59MB ``` main: ``` Running 10s test @ http://localhost:8080 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 163.03us 685.51us 19.73ms 99.27% Req/Sec 39.50k 3.98k 66.11k 95.52% 789582 requests in 10.10s, 82.83MB read Requests/sec: 78182.65 Transfer/sec: 8.20MB ```
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r--ext/web/06_streams.js83
1 files changed, 80 insertions, 3 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 01f84aa2c..0849d221d 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -1,4 +1,5 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// deno-lint-ignore-file camelcase
// @ts-check
/// <reference path="../webidl/internal.d.ts" />
@@ -7,7 +8,17 @@
/// <reference lib="esnext" />
const core = globalThis.Deno.core;
-const ops = core.ops;
+const internals = globalThis.__bootstrap.internals;
+const {
+ op_arraybuffer_was_detached,
+ op_transfer_arraybuffer,
+ op_readable_stream_resource_allocate,
+ op_readable_stream_resource_get_sink,
+ op_readable_stream_resource_write_error,
+ op_readable_stream_resource_write_buf,
+ op_readable_stream_resource_close,
+ op_readable_stream_resource_await_close,
+} = core.ensureFastOps();
import * as webidl from "ext:deno_webidl/00_webidl.js";
import { structuredClone } from "ext:deno_web/02_structured_clone.js";
import {
@@ -61,6 +72,7 @@ const {
SafeWeakMap,
// TODO(lucacasonato): add SharedArrayBuffer to primordials
// SharedArrayBufferPrototype,
+ String,
Symbol,
SymbolAsyncIterator,
SymbolIterator,
@@ -218,7 +230,7 @@ function isDetachedBuffer(O) {
return false;
}
return ArrayBufferPrototypeGetByteLength(O) === 0 &&
- ops.op_arraybuffer_was_detached(O);
+ op_arraybuffer_was_detached(O);
}
/**
@@ -244,7 +256,7 @@ function canTransferArrayBuffer(O) {
* @returns {ArrayBufferLike}
*/
function transferArrayBuffer(O) {
- return ops.op_transfer_arraybuffer(O);
+ return op_transfer_arraybuffer(O);
}
/**
@@ -695,6 +707,68 @@ function isReadableStreamDisturbed(stream) {
return stream[_disturbed];
}
+/**
+ * Create a new resource that wraps a ReadableStream. The resource will support
+ * read operations, and those read operations will be fed by the output of the
+ * ReadableStream source.
+ * @param {ReadableStream<Uint8Array>} stream
+ * @returns {number}
+ */
+function resourceForReadableStream(stream) {
+ const reader = acquireReadableStreamDefaultReader(stream);
+
+ // Allocate the resource
+ const rid = op_readable_stream_resource_allocate();
+
+ // Close the Reader we get from the ReadableStream when the resource is closed, ignoring any errors
+ PromisePrototypeCatch(
+ PromisePrototypeThen(
+ op_readable_stream_resource_await_close(rid),
+ () => reader.cancel(),
+ ),
+ () => {},
+ );
+
+ // The ops here look like op_write_all/op_close, but we're not actually writing to a
+ // real resource.
+ (async () => {
+ try {
+ // This allocation is freed in the finally block below, guaranteeing it won't leak
+ const sink = op_readable_stream_resource_get_sink(rid);
+ try {
+ while (true) {
+ let value;
+ try {
+ const read = await reader.read();
+ value = read.value;
+ if (read.done) {
+ break;
+ }
+ } catch (err) {
+ const message = err.message;
+ if (message) {
+ await op_readable_stream_resource_write_error(sink, err.message);
+ } else {
+ await op_readable_stream_resource_write_error(sink, String(err));
+ }
+ break;
+ }
+ // If the chunk has non-zero length, write it
+ if (value.length > 0) {
+ await op_readable_stream_resource_write_buf(sink, value);
+ }
+ }
+ } finally {
+ op_readable_stream_resource_close(sink);
+ }
+ } catch (err) {
+ // Something went terribly wrong with this stream -- log and continue
+ console.error("Unexpected internal error on stream", err);
+ }
+ })();
+ return rid;
+}
+
const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
// A finalization registry to clean up underlying resources that are GC'ed.
@@ -6454,6 +6528,8 @@ webidl.converters.StreamPipeOptions = webidl
{ key: "signal", converter: webidl.converters.AbortSignal },
]);
+internals.resourceForReadableStream = resourceForReadableStream;
+
export {
// Non-Public
_state,
@@ -6482,6 +6558,7 @@ export {
ReadableStreamPrototype,
readableStreamTee,
readableStreamThrowIfErrored,
+ resourceForReadableStream,
TransformStream,
TransformStreamDefaultController,
WritableStream,