summaryrefslogtreecommitdiff
path: root/ext/fetch
diff options
context:
space:
mode:
Diffstat (limited to 'ext/fetch')
-rw-r--r--ext/fetch/22_body.js48
-rw-r--r--ext/fetch/26_fetch.js57
-rw-r--r--ext/fetch/lib.rs6
3 files changed, 17 insertions, 94 deletions
diff --git a/ext/fetch/22_body.js b/ext/fetch/22_body.js
index 6e9a57447..429b56ae1 100644
--- a/ext/fetch/22_body.js
+++ b/ext/fetch/22_body.js
@@ -30,19 +30,18 @@
errorReadableStream,
readableStreamClose,
readableStreamDisturb,
+ readableStreamCollectIntoUint8Array,
createProxy,
ReadableStreamPrototype,
} = globalThis.__bootstrap.streams;
const {
ArrayBufferPrototype,
ArrayBufferIsView,
- ArrayPrototypePush,
ArrayPrototypeMap,
JSONParse,
ObjectDefineProperties,
ObjectPrototypeIsPrototypeOf,
PromiseResolve,
- TypedArrayPrototypeSet,
TypedArrayPrototypeSlice,
TypeError,
Uint8Array,
@@ -66,12 +65,10 @@
}
class InnerBody {
- #knownExactLength = null;
-
/**
* @param {ReadableStream<Uint8Array> | { body: Uint8Array | string, consumed: boolean }} stream
*/
- constructor(stream, knownExactLength) {
+ constructor(stream) {
/** @type {ReadableStream<Uint8Array> | { body: Uint8Array | string, consumed: boolean }} */
this.streamOrStatic = stream ??
{ body: new Uint8Array(), consumed: false };
@@ -79,8 +76,6 @@
this.source = null;
/** @type {null | number} */
this.length = null;
-
- this.#knownExactLength = knownExactLength;
}
get stream() {
@@ -144,7 +139,7 @@
* https://fetch.spec.whatwg.org/#concept-body-consume-body
* @returns {Promise<Uint8Array>}
*/
- async consume() {
+ consume() {
if (this.unusable()) throw new TypeError("Body already consumed.");
if (
ObjectPrototypeIsPrototypeOf(
@@ -152,40 +147,7 @@
this.streamOrStatic,
)
) {
- const reader = this.stream.getReader();
- /** @type {Uint8Array[]} */
- const chunks = [];
-
- let finalBuffer = this.#knownExactLength
- ? new Uint8Array(this.#knownExactLength)
- : null;
-
- let totalLength = 0;
- while (true) {
- const { value: chunk, done } = await reader.read();
- if (done) break;
-
- if (finalBuffer) {
- // fast path, content-length is present
- TypedArrayPrototypeSet(finalBuffer, chunk, totalLength);
- } else {
- // slow path, content-length is not present
- ArrayPrototypePush(chunks, chunk);
- }
- totalLength += chunk.byteLength;
- }
-
- if (finalBuffer) {
- return finalBuffer;
- }
-
- finalBuffer = new Uint8Array(totalLength);
- let i = 0;
- for (const chunk of chunks) {
- TypedArrayPrototypeSet(finalBuffer, chunk, i);
- i += chunk.byteLength;
- }
- return finalBuffer;
+ return readableStreamCollectIntoUint8Array(this.stream);
} else {
this.streamOrStatic.consumed = true;
return this.streamOrStatic.body;
@@ -224,7 +186,7 @@
clone() {
const [out1, out2] = this.stream.tee();
this.streamOrStatic = out1;
- const second = new InnerBody(out2, this.#knownExactLength);
+ const second = new InnerBody(out2);
second.source = core.deserialize(core.serialize(this.source));
second.length = this.length;
return second;
diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js
index 3e90429ce..169db2bbf 100644
--- a/ext/fetch/26_fetch.js
+++ b/ext/fetch/26_fetch.js
@@ -17,7 +17,7 @@
const webidl = window.__bootstrap.webidl;
const { byteLowerCase } = window.__bootstrap.infra;
const { BlobPrototype } = window.__bootstrap.file;
- const { errorReadableStream, ReadableStreamPrototype } =
+ const { errorReadableStream, ReadableStreamPrototype, readableStreamForRid } =
window.__bootstrap.streams;
const { InnerBody, extractBody } = window.__bootstrap.fetchBody;
const {
@@ -44,7 +44,6 @@
String,
StringPrototypeStartsWith,
StringPrototypeToLowerCase,
- TypedArrayPrototypeSubarray,
TypeError,
Uint8Array,
Uint8ArrayPrototype,
@@ -89,65 +88,22 @@
return core.opAsync("op_fetch_send", rid);
}
- // A finalization registry to clean up underlying fetch resources that are GC'ed.
- const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => {
- core.tryClose(rid);
- });
-
/**
* @param {number} responseBodyRid
* @param {AbortSignal} [terminator]
* @returns {ReadableStream<Uint8Array>}
*/
function createResponseBodyStream(responseBodyRid, terminator) {
+ const readable = readableStreamForRid(responseBodyRid);
+
function onAbort() {
- if (readable) {
- errorReadableStream(readable, terminator.reason);
- }
+ errorReadableStream(readable, terminator.reason);
core.tryClose(responseBodyRid);
}
+
// TODO(lucacasonato): clean up registration
terminator[abortSignal.add](onAbort);
- const readable = new ReadableStream({
- type: "bytes",
- async pull(controller) {
- try {
- // This is the largest possible size for a single packet on a TLS
- // stream.
- const chunk = new Uint8Array(16 * 1024 + 256);
- // TODO(@AaronO): switch to handle nulls if that's moved to core
- const read = await core.read(
- responseBodyRid,
- chunk,
- );
- if (read > 0) {
- // We read some data. Enqueue it onto the stream.
- controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
- } else {
- RESOURCE_REGISTRY.unregister(readable);
- // We have reached the end of the body, so we close the stream.
- controller.close();
- core.tryClose(responseBodyRid);
- }
- } catch (err) {
- RESOURCE_REGISTRY.unregister(readable);
- if (terminator.aborted) {
- controller.error(terminator.reason);
- } else {
- // There was an error while reading a chunk of the body, so we
- // error.
- controller.error(err);
- }
- core.tryClose(responseBodyRid);
- }
- },
- cancel() {
- if (!terminator.aborted) {
- terminator[abortSignal.signalAbort]();
- }
- },
- });
- RESOURCE_REGISTRY.register(readable, responseBodyRid, readable);
+
return readable;
}
@@ -338,7 +294,6 @@
} else {
response.body = new InnerBody(
createResponseBodyStream(resp.responseRid, terminator),
- resp.contentLength,
);
}
}
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index a7daaa63a..0adc32343 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -408,6 +408,7 @@ pub async fn op_fetch_send(
.add(FetchResponseBodyResource {
reader: AsyncRefCell::new(stream_reader),
cancel: CancelHandle::default(),
+ size: content_length,
});
Ok(FetchResponse {
@@ -479,6 +480,7 @@ type BytesStream =
struct FetchResponseBodyResource {
reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>,
cancel: CancelHandle,
+ size: Option<u64>,
}
impl Resource for FetchResponseBodyResource {
@@ -498,6 +500,10 @@ impl Resource for FetchResponseBodyResource {
})
}
+ fn size_hint(&self) -> (u64, Option<u64>) {
+ (0, self.size)
+ }
+
fn close(self: Rc<Self>) {
self.cancel.cancel()
}