diff options
Diffstat (limited to 'ext/fetch')
-rw-r--r-- | ext/fetch/22_body.js | 48 | ||||
-rw-r--r-- | ext/fetch/26_fetch.js | 57 | ||||
-rw-r--r-- | ext/fetch/lib.rs | 6 |
3 files changed, 17 insertions, 94 deletions
diff --git a/ext/fetch/22_body.js b/ext/fetch/22_body.js index 6e9a57447..429b56ae1 100644 --- a/ext/fetch/22_body.js +++ b/ext/fetch/22_body.js @@ -30,19 +30,18 @@ errorReadableStream, readableStreamClose, readableStreamDisturb, + readableStreamCollectIntoUint8Array, createProxy, ReadableStreamPrototype, } = globalThis.__bootstrap.streams; const { ArrayBufferPrototype, ArrayBufferIsView, - ArrayPrototypePush, ArrayPrototypeMap, JSONParse, ObjectDefineProperties, ObjectPrototypeIsPrototypeOf, PromiseResolve, - TypedArrayPrototypeSet, TypedArrayPrototypeSlice, TypeError, Uint8Array, @@ -66,12 +65,10 @@ } class InnerBody { - #knownExactLength = null; - /** * @param {ReadableStream<Uint8Array> | { body: Uint8Array | string, consumed: boolean }} stream */ - constructor(stream, knownExactLength) { + constructor(stream) { /** @type {ReadableStream<Uint8Array> | { body: Uint8Array | string, consumed: boolean }} */ this.streamOrStatic = stream ?? { body: new Uint8Array(), consumed: false }; @@ -79,8 +76,6 @@ this.source = null; /** @type {null | number} */ this.length = null; - - this.#knownExactLength = knownExactLength; } get stream() { @@ -144,7 +139,7 @@ * https://fetch.spec.whatwg.org/#concept-body-consume-body * @returns {Promise<Uint8Array>} */ - async consume() { + consume() { if (this.unusable()) throw new TypeError("Body already consumed."); if ( ObjectPrototypeIsPrototypeOf( @@ -152,40 +147,7 @@ this.streamOrStatic, ) ) { - const reader = this.stream.getReader(); - /** @type {Uint8Array[]} */ - const chunks = []; - - let finalBuffer = this.#knownExactLength - ? new Uint8Array(this.#knownExactLength) - : null; - - let totalLength = 0; - while (true) { - const { value: chunk, done } = await reader.read(); - if (done) break; - - if (finalBuffer) { - // fast path, content-length is present - TypedArrayPrototypeSet(finalBuffer, chunk, totalLength); - } else { - // slow path, content-length is not present - ArrayPrototypePush(chunks, chunk); - } - totalLength += chunk.byteLength; - } - - if (finalBuffer) { - return finalBuffer; - } - - finalBuffer = new Uint8Array(totalLength); - let i = 0; - for (const chunk of chunks) { - TypedArrayPrototypeSet(finalBuffer, chunk, i); - i += chunk.byteLength; - } - return finalBuffer; + return readableStreamCollectIntoUint8Array(this.stream); } else { this.streamOrStatic.consumed = true; return this.streamOrStatic.body; @@ -224,7 +186,7 @@ clone() { const [out1, out2] = this.stream.tee(); this.streamOrStatic = out1; - const second = new InnerBody(out2, this.#knownExactLength); + const second = new InnerBody(out2); second.source = core.deserialize(core.serialize(this.source)); second.length = this.length; return second; diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index 3e90429ce..169db2bbf 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -17,7 +17,7 @@ const webidl = window.__bootstrap.webidl; const { byteLowerCase } = window.__bootstrap.infra; const { BlobPrototype } = window.__bootstrap.file; - const { errorReadableStream, ReadableStreamPrototype } = + const { errorReadableStream, ReadableStreamPrototype, readableStreamForRid } = window.__bootstrap.streams; const { InnerBody, extractBody } = window.__bootstrap.fetchBody; const { @@ -44,7 +44,6 @@ String, StringPrototypeStartsWith, StringPrototypeToLowerCase, - TypedArrayPrototypeSubarray, TypeError, Uint8Array, Uint8ArrayPrototype, @@ -89,65 +88,22 @@ return core.opAsync("op_fetch_send", rid); } - // A finalization registry to clean up underlying fetch resources that are GC'ed. - const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => { - core.tryClose(rid); - }); - /** * @param {number} responseBodyRid * @param {AbortSignal} [terminator] * @returns {ReadableStream<Uint8Array>} */ function createResponseBodyStream(responseBodyRid, terminator) { + const readable = readableStreamForRid(responseBodyRid); + function onAbort() { - if (readable) { - errorReadableStream(readable, terminator.reason); - } + errorReadableStream(readable, terminator.reason); core.tryClose(responseBodyRid); } + // TODO(lucacasonato): clean up registration terminator[abortSignal.add](onAbort); - const readable = new ReadableStream({ - type: "bytes", - async pull(controller) { - try { - // This is the largest possible size for a single packet on a TLS - // stream. - const chunk = new Uint8Array(16 * 1024 + 256); - // TODO(@AaronO): switch to handle nulls if that's moved to core - const read = await core.read( - responseBodyRid, - chunk, - ); - if (read > 0) { - // We read some data. Enqueue it onto the stream. - controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read)); - } else { - RESOURCE_REGISTRY.unregister(readable); - // We have reached the end of the body, so we close the stream. - controller.close(); - core.tryClose(responseBodyRid); - } - } catch (err) { - RESOURCE_REGISTRY.unregister(readable); - if (terminator.aborted) { - controller.error(terminator.reason); - } else { - // There was an error while reading a chunk of the body, so we - // error. - controller.error(err); - } - core.tryClose(responseBodyRid); - } - }, - cancel() { - if (!terminator.aborted) { - terminator[abortSignal.signalAbort](); - } - }, - }); - RESOURCE_REGISTRY.register(readable, responseBodyRid, readable); + return readable; } @@ -338,7 +294,6 @@ } else { response.body = new InnerBody( createResponseBodyStream(resp.responseRid, terminator), - resp.contentLength, ); } } diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index a7daaa63a..0adc32343 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -408,6 +408,7 @@ pub async fn op_fetch_send( .add(FetchResponseBodyResource { reader: AsyncRefCell::new(stream_reader), cancel: CancelHandle::default(), + size: content_length, }); Ok(FetchResponse { @@ -479,6 +480,7 @@ type BytesStream = struct FetchResponseBodyResource { reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>, cancel: CancelHandle, + size: Option<u64>, } impl Resource for FetchResponseBodyResource { @@ -498,6 +500,10 @@ impl Resource for FetchResponseBodyResource { }) } + fn size_hint(&self) -> (u64, Option<u64>) { + (0, self.size) + } + fn close(self: Rc<Self>) { self.cancel.cancel() } |