author | Marcos Casagrande <marcoscvp90@gmail.com> | 2022-10-04 15:48:50 +0200
---|---|---
committer | GitHub <noreply@github.com> | 2022-10-04 15:48:50 +0200
commit | 569287b15b6482a39f2c816f103574c3b35351f8 (patch) |
tree | ff8433fc87613e3016ff7a188ee34aa3fc7d81c4 /ext/web/06_streams.js |
parent | 0b4a6c4d084df54e827bc7767ce8653e06c45e93 (diff) |
perf(ext/fetch): consume body using ops (#16038)
This commit adds a fast path to `Request` and `Response` that
makes consuming request bodies much faster when using `Body#text`,
`Body#arrayBuffer`, and `Body#blob`, if the body is a FastStream.
Because the response bodies for `fetch` are FastStream, this speeds up
consuming `fetch` response bodies significantly.
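For orientation, a minimal usage sketch of the consumers this fast path targets (the URL is illustrative, not part of the change): when the body is a FastStream, each consumer below drains it with a single op call instead of pulling 64 KiB chunks through the stream controller.

```js
// Illustrative only: any URL works; the point is the Body consumers.
const res = await fetch("https://deno.land/");
// These hit the new fast path when the body is a FastStream:
const text = await res.text();          // Body#text
// const buf = await res.arrayBuffer(); // Body#arrayBuffer (alternative consumer)
// const blob = await res.blob();       // Body#blob (alternative consumer)
console.log(text.length);
```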
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r-- | ext/web/06_streams.js | 76 |
1 file changed, 73 insertions, 3 deletions
```diff
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 412c58c3c..ba422b71d 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -48,6 +48,7 @@
     SymbolAsyncIterator,
     SymbolFor,
     TypeError,
+    TypedArrayPrototypeSet,
     Uint8Array,
     Uint8ArrayPrototype,
     Uint16ArrayPrototype,
@@ -647,6 +648,10 @@
 
   const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
 
+  // A finalization registry to clean up underlying resources that are GC'ed.
+  const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => {
+    core.tryClose(rid);
+  });
   /**
    * Create a new ReadableStream object that is backed by a Resource that
    * implements `Resource::read_return`. This object contains enough metadata to
@@ -660,6 +665,17 @@
   function readableStreamForRid(rid, autoClose = true) {
     const stream = webidl.createBranded(ReadableStream);
     stream[_resourceBacking] = { rid, autoClose };
+
+    const tryClose = () => {
+      if (!autoClose) return;
+      RESOURCE_REGISTRY.unregister(stream);
+      core.tryClose(rid);
+    };
+
+    if (autoClose) {
+      RESOURCE_REGISTRY.register(stream, rid, stream);
+    }
+
     const underlyingSource = {
       type: "bytes",
       async pull(controller) {
@@ -667,7 +683,7 @@
         try {
           const bytesRead = await core.read(rid, v);
           if (bytesRead === 0) {
-            if (autoClose) core.tryClose(rid);
+            tryClose();
             controller.close();
             controller.byobRequest.respond(0);
           } else {
@@ -675,11 +691,11 @@
           }
         } catch (e) {
           controller.error(e);
-          if (autoClose) core.tryClose(rid);
+          tryClose();
         }
       },
       cancel() {
-        if (autoClose) core.tryClose(rid);
+        tryClose();
       },
       autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
     };
@@ -766,6 +782,59 @@
     return stream[_resourceBacking];
   }
+  async function readableStreamCollectIntoUint8Array(stream) {
+    const resourceBacking = getReadableStreamResourceBacking(stream);
+    const reader = acquireReadableStreamDefaultReader(stream);
+
+    if (resourceBacking) {
+      // fast path, read whole body in a single op call
+      try {
+        readableStreamDisturb(stream);
+        const buf = await core.opAsync("op_read_all", resourceBacking.rid);
+        readableStreamThrowIfErrored(stream);
+        readableStreamClose(stream);
+        return buf;
+      } catch (err) {
+        readableStreamThrowIfErrored(stream);
+        readableStreamError(stream, err);
+        throw err;
+      } finally {
+        if (resourceBacking.autoClose) {
+          core.tryClose(resourceBacking.rid);
+        }
+      }
+    }
+
+    // slow path
+    /** @type {Uint8Array[]} */
+    const chunks = [];
+    let totalLength = 0;
+    while (true) {
+      const { value: chunk, done } = await reader.read();
+      if (done) break;
+
+      ArrayPrototypePush(chunks, chunk);
+      totalLength += chunk.byteLength;
+    }
+
+    const finalBuffer = new Uint8Array(totalLength);
+    let i = 0;
+    for (const chunk of chunks) {
+      TypedArrayPrototypeSet(finalBuffer, chunk, i);
+      i += chunk.byteLength;
+    }
+    return finalBuffer;
+  }
+
+  /*
+   * @param {ReadableStream} stream
+   */
+  function readableStreamThrowIfErrored(stream) {
+    if (stream[_state] === "errored") {
+      throw stream[_storedError];
+    }
+  }
+
   /**
    * @param {unknown} value
    * @returns {value is WritableStream}
    */
@@ -5982,6 +6051,7 @@
     createProxy,
     writableStreamClose,
     readableStreamClose,
+    readableStreamCollectIntoUint8Array,
     readableStreamDisturb,
     readableStreamForRid,
     readableStreamForRidUnrefable,
```
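Two standalone sketches of the techniques this diff relies on, neither taken from the Deno sources. First, the FinalizationRegistry pattern used for resource-backed streams: if such a stream is garbage-collected before being fully read or cancelled, the registry callback still closes the underlying resource. Here `fakeTryClose` and `streamForResource` are hypothetical stand-ins for `core.tryClose` and `readableStreamForRid`, and the rid value is illustrative.

```js
// Sketch only, not Deno's implementation.
const fakeTryClose = (rid) => console.log(`closing resource ${rid}`);
const registry = new FinalizationRegistry((rid) => fakeTryClose(rid));

function streamForResource(rid) {
  const stream = new ReadableStream({ /* pulls from the resource */ });
  // The stream doubles as the unregister token, so an explicit close path
  // can call registry.unregister(stream) before closing the rid itself.
  registry.register(stream, rid, stream);
  return stream;
}

const s = streamForResource(3); // resource 3 is closed once `s` becomes unreachable and is GC'ed
```

Second, the slow-path collection strategy of `readableStreamCollectIntoUint8Array`, rewritten with plain globals instead of the `ArrayPrototypePush`/`TypedArrayPrototypeSet` primordials; the sample stream at the end is illustrative.

```js
// Sketch of the slow path: gather all chunks, then copy them into one buffer.
async function collectIntoUint8Array(stream) {
  const reader = stream.getReader();
  const chunks = [];
  let totalLength = 0;
  // Drain the stream, remembering every chunk and the combined byte length.
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    chunks.push(value);
    totalLength += value.byteLength;
  }
  // Copy each chunk into one contiguous buffer at its running offset.
  const out = new Uint8Array(totalLength);
  let offset = 0;
  for (const chunk of chunks) {
    out.set(chunk, offset);
    offset += chunk.byteLength;
  }
  return out;
}

const sample = new ReadableStream({
  start(controller) {
    controller.enqueue(new Uint8Array([1, 2, 3]));
    controller.enqueue(new Uint8Array([4, 5]));
    controller.close();
  },
});
console.log(await collectIntoUint8Array(sample)); // Uint8Array(5) [ 1, 2, 3, 4, 5 ]
```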