Diffstat (limited to 'ext')
-rw-r--r--   ext/fetch/22_body.js    48
-rw-r--r--   ext/fetch/26_fetch.js   57
-rw-r--r--   ext/fetch/lib.rs         6
-rw-r--r--   ext/http/lib.rs          9
-rw-r--r--   ext/web/06_streams.js   76
5 files changed, 99 insertions, 97 deletions
diff --git a/ext/fetch/22_body.js b/ext/fetch/22_body.js
index 6e9a57447..429b56ae1 100644
--- a/ext/fetch/22_body.js
+++ b/ext/fetch/22_body.js
@@ -30,19 +30,18 @@
     errorReadableStream,
     readableStreamClose,
     readableStreamDisturb,
+    readableStreamCollectIntoUint8Array,
     createProxy,
     ReadableStreamPrototype,
   } = globalThis.__bootstrap.streams;
   const {
     ArrayBufferPrototype,
     ArrayBufferIsView,
-    ArrayPrototypePush,
     ArrayPrototypeMap,
     JSONParse,
     ObjectDefineProperties,
     ObjectPrototypeIsPrototypeOf,
     PromiseResolve,
-    TypedArrayPrototypeSet,
     TypedArrayPrototypeSlice,
     TypeError,
     Uint8Array,
@@ -66,12 +65,10 @@
   }

   class InnerBody {
-    #knownExactLength = null;
-
     /**
      * @param {ReadableStream<Uint8Array> | { body: Uint8Array | string, consumed: boolean }} stream
      */
-    constructor(stream, knownExactLength) {
+    constructor(stream) {
       /** @type {ReadableStream<Uint8Array> | { body: Uint8Array | string, consumed: boolean }} */
       this.streamOrStatic = stream ??
         { body: new Uint8Array(), consumed: false };
@@ -79,8 +76,6 @@
       this.source = null;
       /** @type {null | number} */
       this.length = null;
-
-      this.#knownExactLength = knownExactLength;
     }

     get stream() {
@@ -144,7 +139,7 @@
      * https://fetch.spec.whatwg.org/#concept-body-consume-body
      * @returns {Promise<Uint8Array>}
      */
-    async consume() {
+    consume() {
       if (this.unusable()) throw new TypeError("Body already consumed.");
       if (
         ObjectPrototypeIsPrototypeOf(
@@ -152,40 +147,7 @@
           this.streamOrStatic,
         )
       ) {
-        const reader = this.stream.getReader();
-        /** @type {Uint8Array[]} */
-        const chunks = [];
-
-        let finalBuffer = this.#knownExactLength
-          ? new Uint8Array(this.#knownExactLength)
-          : null;
-
-        let totalLength = 0;
-        while (true) {
-          const { value: chunk, done } = await reader.read();
-          if (done) break;
-
-          if (finalBuffer) {
-            // fast path, content-length is present
-            TypedArrayPrototypeSet(finalBuffer, chunk, totalLength);
-          } else {
-            // slow path, content-length is not present
-            ArrayPrototypePush(chunks, chunk);
-          }
-          totalLength += chunk.byteLength;
-        }
-
-        if (finalBuffer) {
-          return finalBuffer;
-        }
-
-        finalBuffer = new Uint8Array(totalLength);
-        let i = 0;
-        for (const chunk of chunks) {
-          TypedArrayPrototypeSet(finalBuffer, chunk, i);
-          i += chunk.byteLength;
-        }
-        return finalBuffer;
+        return readableStreamCollectIntoUint8Array(this.stream);
       } else {
         this.streamOrStatic.consumed = true;
         return this.streamOrStatic.body;
@@ -224,7 +186,7 @@
     clone() {
       const [out1, out2] = this.stream.tee();
       this.streamOrStatic = out1;
-      const second = new InnerBody(out2, this.#knownExactLength);
+      const second = new InnerBody(out2);
       second.source = core.deserialize(core.serialize(this.source));
       second.length = this.length;
       return second;
diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js
index 3e90429ce..169db2bbf 100644
--- a/ext/fetch/26_fetch.js
+++ b/ext/fetch/26_fetch.js
@@ -17,7 +17,7 @@
   const webidl = window.__bootstrap.webidl;
   const { byteLowerCase } = window.__bootstrap.infra;
   const { BlobPrototype } = window.__bootstrap.file;
-  const { errorReadableStream, ReadableStreamPrototype } =
+  const { errorReadableStream, ReadableStreamPrototype, readableStreamForRid } =
     window.__bootstrap.streams;
   const { InnerBody, extractBody } = window.__bootstrap.fetchBody;
   const {
@@ -44,7 +44,6 @@
     String,
     StringPrototypeStartsWith,
     StringPrototypeToLowerCase,
-    TypedArrayPrototypeSubarray,
     TypeError,
     Uint8Array,
     Uint8ArrayPrototype,
@@ -89,65 +88,22 @@
     return core.opAsync("op_fetch_send", rid);
   }

-  // A finalization registry to clean up underlying fetch resources that are GC'ed.
-  const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => {
-    core.tryClose(rid);
-  });
-
   /**
    * @param {number} responseBodyRid
    * @param {AbortSignal} [terminator]
   * @returns {ReadableStream<Uint8Array>}
    */
   function createResponseBodyStream(responseBodyRid, terminator) {
+    const readable = readableStreamForRid(responseBodyRid);
+
     function onAbort() {
-      if (readable) {
-        errorReadableStream(readable, terminator.reason);
-      }
+      errorReadableStream(readable, terminator.reason);
       core.tryClose(responseBodyRid);
     }
+
     // TODO(lucacasonato): clean up registration
     terminator[abortSignal.add](onAbort);
-    const readable = new ReadableStream({
-      type: "bytes",
-      async pull(controller) {
-        try {
-          // This is the largest possible size for a single packet on a TLS
-          // stream.
-          const chunk = new Uint8Array(16 * 1024 + 256);
-          // TODO(@AaronO): switch to handle nulls if that's moved to core
-          const read = await core.read(
-            responseBodyRid,
-            chunk,
-          );
-          if (read > 0) {
-            // We read some data. Enqueue it onto the stream.
-            controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
-          } else {
-            RESOURCE_REGISTRY.unregister(readable);
-            // We have reached the end of the body, so we close the stream.
-            controller.close();
-            core.tryClose(responseBodyRid);
-          }
-        } catch (err) {
-          RESOURCE_REGISTRY.unregister(readable);
-          if (terminator.aborted) {
-            controller.error(terminator.reason);
-          } else {
-            // There was an error while reading a chunk of the body, so we
-            // error.
-            controller.error(err);
-          }
-          core.tryClose(responseBodyRid);
-        }
-      },
-      cancel() {
-        if (!terminator.aborted) {
-          terminator[abortSignal.signalAbort]();
-        }
-      },
-    });
-    RESOURCE_REGISTRY.register(readable, responseBodyRid, readable);
+
     return readable;
   }

@@ -338,7 +294,6 @@
       } else {
         response.body = new InnerBody(
           createResponseBodyStream(resp.responseRid, terminator),
-          resp.contentLength,
         );
       }
     }
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index a7daaa63a..0adc32343 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -408,6 +408,7 @@ pub async fn op_fetch_send(
       .add(FetchResponseBodyResource {
         reader: AsyncRefCell::new(stream_reader),
         cancel: CancelHandle::default(),
+        size: content_length,
       });

     Ok(FetchResponse {
@@ -479,6 +480,7 @@ type BytesStream =
 struct FetchResponseBodyResource {
   reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>,
   cancel: CancelHandle,
+  size: Option<u64>,
 }

 impl Resource for FetchResponseBodyResource {
@@ -498,6 +500,10 @@ impl Resource for FetchResponseBodyResource {
     })
   }

+  fn size_hint(&self) -> (u64, Option<u64>) {
+    (0, self.size)
+  }
+
   fn close(self: Rc<Self>) {
     self.cancel.cancel()
   }
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index bffe3c3d5..a8c2810bc 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -39,6 +39,8 @@ use flate2::write::GzEncoder;
 use flate2::Compression;
 use fly_accept_encoding::Encoding;
 use hyper::body::Bytes;
+use hyper::body::HttpBody;
+use hyper::body::SizeHint;
 use hyper::header::HeaderName;
 use hyper::header::HeaderValue;
 use hyper::server::conn::Http;
@@ -309,6 +311,7 @@ pub struct HttpStreamResource {
   wr: AsyncRefCell<HttpResponseWriter>,
   accept_encoding: Encoding,
   cancel_handle: CancelHandle,
+  size: SizeHint,
 }

 impl HttpStreamResource {
@@ -318,11 +321,13 @@ impl HttpStreamResource {
     response_tx: oneshot::Sender<Response<Body>>,
     accept_encoding: Encoding,
   ) -> Self {
+    let size = request.body().size_hint();
     Self {
       conn: conn.clone(),
       rd: HttpRequestReader::Headers(request).into(),
       wr: HttpResponseWriter::Headers(response_tx).into(),
       accept_encoding,
+      size,
       cancel_handle: CancelHandle::new(),
     }
   }
@@ -388,6 +393,10 @@ impl Resource for HttpStreamResource {
   fn close(self: Rc<Self>) {
     self.cancel_handle.cancel();
   }
+
+  fn size_hint(&self) -> (u64, Option<u64>) {
+    (self.size.lower(), self.size.upper())
+  }
 }

 /// The read half of an HTTP stream.
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 412c58c3c..ba422b71d 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -48,6 +48,7 @@
     SymbolAsyncIterator,
     SymbolFor,
     TypeError,
+    TypedArrayPrototypeSet,
     Uint8Array,
     Uint8ArrayPrototype,
     Uint16ArrayPrototype,
@@ -647,6 +648,10 @@

   const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB

+  // A finalization registry to clean up underlying resources that are GC'ed.
+  const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => {
+    core.tryClose(rid);
+  });
   /**
    * Create a new ReadableStream object that is backed by a Resource that
    * implements `Resource::read_return`. This object contains enough metadata to
@@ -660,6 +665,17 @@
   function readableStreamForRid(rid, autoClose = true) {
     const stream = webidl.createBranded(ReadableStream);
     stream[_resourceBacking] = { rid, autoClose };
+
+    const tryClose = () => {
+      if (!autoClose) return;
+      RESOURCE_REGISTRY.unregister(stream);
+      core.tryClose(rid);
+    };
+
+    if (autoClose) {
+      RESOURCE_REGISTRY.register(stream, rid, stream);
+    }
+
     const underlyingSource = {
       type: "bytes",
       async pull(controller) {
@@ -667,7 +683,7 @@
         try {
           const bytesRead = await core.read(rid, v);
           if (bytesRead === 0) {
-            if (autoClose) core.tryClose(rid);
+            tryClose();
             controller.close();
             controller.byobRequest.respond(0);
           } else {
@@ -675,11 +691,11 @@
           }
         } catch (e) {
           controller.error(e);
-          if (autoClose) core.tryClose(rid);
+          tryClose();
         }
       },
       cancel() {
-        if (autoClose) core.tryClose(rid);
+        tryClose();
       },
       autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
     };
@@ -766,6 +782,59 @@
     return stream[_resourceBacking];
   }

+  async function readableStreamCollectIntoUint8Array(stream) {
+    const resourceBacking = getReadableStreamResourceBacking(stream);
+    const reader = acquireReadableStreamDefaultReader(stream);
+
+    if (resourceBacking) {
+      // fast path, read whole body in a single op call
+      try {
+        readableStreamDisturb(stream);
+        const buf = await core.opAsync("op_read_all", resourceBacking.rid);
+        readableStreamThrowIfErrored(stream);
+        readableStreamClose(stream);
+        return buf;
+      } catch (err) {
+        readableStreamThrowIfErrored(stream);
+        readableStreamError(stream, err);
+        throw err;
+      } finally {
+        if (resourceBacking.autoClose) {
+          core.tryClose(resourceBacking.rid);
+        }
+      }
+    }
+
+    // slow path
+    /** @type {Uint8Array[]} */
+    const chunks = [];
+    let totalLength = 0;
+    while (true) {
+      const { value: chunk, done } = await reader.read();
+      if (done) break;
+
+      ArrayPrototypePush(chunks, chunk);
+      totalLength += chunk.byteLength;
+    }
+
+    const finalBuffer = new Uint8Array(totalLength);
+    let i = 0;
+    for (const chunk of chunks) {
+      TypedArrayPrototypeSet(finalBuffer, chunk, i);
+      i += chunk.byteLength;
+    }
+    return finalBuffer;
+  }
+
+  /*
+   * @param {ReadableStream} stream
+   */
+  function readableStreamThrowIfErrored(stream) {
+    if (stream[_state] === "errored") {
+      throw stream[_storedError];
+    }
+  }
+
   /**
    * @param {unknown} value
    * @returns {value is WritableStream}
    */
@@ -5982,6 +6051,7 @@
     createProxy,
     writableStreamClose,
     readableStreamClose,
+    readableStreamCollectIntoUint8Array,
     readableStreamDisturb,
     readableStreamForRid,
     readableStreamForRidUnrefable,
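The public fetch API is unchanged by this commit; only the internals of body consumption move. As a minimal, illustrative sketch of the path that now takes the fast route (the URL below is a placeholder, not taken from the commit):

// Consuming a resource-backed response body in Deno.
// Response.prototype.arrayBuffer() ends up in InnerBody.consume(), which after
// this change delegates to readableStreamCollectIntoUint8Array(). When the
// stream is backed by a resource rid, the whole body is fetched with a single
// "op_read_all" op instead of a JS loop over 16 KiB reads.
const res = await fetch("https://example.com/data.bin"); // placeholder URL
const bytes = new Uint8Array(await res.arrayBuffer());
console.log(bytes.byteLength);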