summaryrefslogtreecommitdiff
path: root/ext/web/06_streams.js
diff options
context:
space:
mode:
authorMarcos Casagrande <marcoscvp90@gmail.com>2022-10-04 15:48:50 +0200
committerGitHub <noreply@github.com>2022-10-04 15:48:50 +0200
commit569287b15b6482a39f2c816f103574c3b35351f8 (patch)
treeff8433fc87613e3016ff7a188ee34aa3fc7d81c4 /ext/web/06_streams.js
parent0b4a6c4d084df54e827bc7767ce8653e06c45e93 (diff)
perf(ext/fetch): consume body using ops (#16038)
This commit adds a fast path to `Request` and `Response` that make consuming request bodies much faster when using `Body#text`, `Body#arrayBuffer`, and `Body#blob`, if the body is a FastStream. Because the response bodies for `fetch` are FastStream, this speeds up consuming `fetch` response bodies significantly.
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r--ext/web/06_streams.js76
1 files changed, 73 insertions, 3 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 412c58c3c..ba422b71d 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -48,6 +48,7 @@
SymbolAsyncIterator,
SymbolFor,
TypeError,
+ TypedArrayPrototypeSet,
Uint8Array,
Uint8ArrayPrototype,
Uint16ArrayPrototype,
@@ -647,6 +648,10 @@
const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
+ // A finalization registry to clean up underlying resources that are GC'ed.
+ const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => {
+ core.tryClose(rid);
+ });
/**
* Create a new ReadableStream object that is backed by a Resource that
* implements `Resource::read_return`. This object contains enough metadata to
@@ -660,6 +665,17 @@
function readableStreamForRid(rid, autoClose = true) {
const stream = webidl.createBranded(ReadableStream);
stream[_resourceBacking] = { rid, autoClose };
+
+ const tryClose = () => {
+ if (!autoClose) return;
+ RESOURCE_REGISTRY.unregister(stream);
+ core.tryClose(rid);
+ };
+
+ if (autoClose) {
+ RESOURCE_REGISTRY.register(stream, rid, stream);
+ }
+
const underlyingSource = {
type: "bytes",
async pull(controller) {
@@ -667,7 +683,7 @@
try {
const bytesRead = await core.read(rid, v);
if (bytesRead === 0) {
- if (autoClose) core.tryClose(rid);
+ tryClose();
controller.close();
controller.byobRequest.respond(0);
} else {
@@ -675,11 +691,11 @@
}
} catch (e) {
controller.error(e);
- if (autoClose) core.tryClose(rid);
+ tryClose();
}
},
cancel() {
- if (autoClose) core.tryClose(rid);
+ tryClose();
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
};
@@ -766,6 +782,59 @@
return stream[_resourceBacking];
}
+ async function readableStreamCollectIntoUint8Array(stream) {
+ const resourceBacking = getReadableStreamResourceBacking(stream);
+ const reader = acquireReadableStreamDefaultReader(stream);
+
+ if (resourceBacking) {
+ // fast path, read whole body in a single op call
+ try {
+ readableStreamDisturb(stream);
+ const buf = await core.opAsync("op_read_all", resourceBacking.rid);
+ readableStreamThrowIfErrored(stream);
+ readableStreamClose(stream);
+ return buf;
+ } catch (err) {
+ readableStreamThrowIfErrored(stream);
+ readableStreamError(stream, err);
+ throw err;
+ } finally {
+ if (resourceBacking.autoClose) {
+ core.tryClose(resourceBacking.rid);
+ }
+ }
+ }
+
+ // slow path
+ /** @type {Uint8Array[]} */
+ const chunks = [];
+ let totalLength = 0;
+ while (true) {
+ const { value: chunk, done } = await reader.read();
+ if (done) break;
+
+ ArrayPrototypePush(chunks, chunk);
+ totalLength += chunk.byteLength;
+ }
+
+ const finalBuffer = new Uint8Array(totalLength);
+ let i = 0;
+ for (const chunk of chunks) {
+ TypedArrayPrototypeSet(finalBuffer, chunk, i);
+ i += chunk.byteLength;
+ }
+ return finalBuffer;
+ }
+
+ /*
+ * @param {ReadableStream} stream
+ */
+ function readableStreamThrowIfErrored(stream) {
+ if (stream[_state] === "errored") {
+ throw stream[_storedError];
+ }
+ }
+
/**
* @param {unknown} value
* @returns {value is WritableStream}
@@ -5982,6 +6051,7 @@
createProxy,
writableStreamClose,
readableStreamClose,
+ readableStreamCollectIntoUint8Array,
readableStreamDisturb,
readableStreamForRid,
readableStreamForRidUnrefable,