summaryrefslogtreecommitdiff
path: root/ext/fetch/26_fetch.js
diff options
context:
space:
mode:
Diffstat (limited to 'ext/fetch/26_fetch.js')
-rw-r--r--ext/fetch/26_fetch.js57
1 files changed, 6 insertions, 51 deletions
diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js
index 3e90429ce..169db2bbf 100644
--- a/ext/fetch/26_fetch.js
+++ b/ext/fetch/26_fetch.js
@@ -17,7 +17,7 @@
const webidl = window.__bootstrap.webidl;
const { byteLowerCase } = window.__bootstrap.infra;
const { BlobPrototype } = window.__bootstrap.file;
- const { errorReadableStream, ReadableStreamPrototype } =
+ const { errorReadableStream, ReadableStreamPrototype, readableStreamForRid } =
window.__bootstrap.streams;
const { InnerBody, extractBody } = window.__bootstrap.fetchBody;
const {
@@ -44,7 +44,6 @@
String,
StringPrototypeStartsWith,
StringPrototypeToLowerCase,
- TypedArrayPrototypeSubarray,
TypeError,
Uint8Array,
Uint8ArrayPrototype,
@@ -89,65 +88,22 @@
return core.opAsync("op_fetch_send", rid);
}
- // A finalization registry to clean up underlying fetch resources that are GC'ed.
- const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => {
- core.tryClose(rid);
- });
-
/**
* @param {number} responseBodyRid
* @param {AbortSignal} [terminator]
* @returns {ReadableStream<Uint8Array>}
*/
function createResponseBodyStream(responseBodyRid, terminator) {
+ const readable = readableStreamForRid(responseBodyRid);
+
function onAbort() {
- if (readable) {
- errorReadableStream(readable, terminator.reason);
- }
+ errorReadableStream(readable, terminator.reason);
core.tryClose(responseBodyRid);
}
+
// TODO(lucacasonato): clean up registration
terminator[abortSignal.add](onAbort);
- const readable = new ReadableStream({
- type: "bytes",
- async pull(controller) {
- try {
- // This is the largest possible size for a single packet on a TLS
- // stream.
- const chunk = new Uint8Array(16 * 1024 + 256);
- // TODO(@AaronO): switch to handle nulls if that's moved to core
- const read = await core.read(
- responseBodyRid,
- chunk,
- );
- if (read > 0) {
- // We read some data. Enqueue it onto the stream.
- controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
- } else {
- RESOURCE_REGISTRY.unregister(readable);
- // We have reached the end of the body, so we close the stream.
- controller.close();
- core.tryClose(responseBodyRid);
- }
- } catch (err) {
- RESOURCE_REGISTRY.unregister(readable);
- if (terminator.aborted) {
- controller.error(terminator.reason);
- } else {
- // There was an error while reading a chunk of the body, so we
- // error.
- controller.error(err);
- }
- core.tryClose(responseBodyRid);
- }
- },
- cancel() {
- if (!terminator.aborted) {
- terminator[abortSignal.signalAbort]();
- }
- },
- });
- RESOURCE_REGISTRY.register(readable, responseBodyRid, readable);
+
return readable;
}
@@ -338,7 +294,6 @@
} else {
response.body = new InnerBody(
createResponseBodyStream(resp.responseRid, terminator),
- resp.contentLength,
);
}
}