summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/fetch/22_body.js48
-rw-r--r--ext/fetch/26_fetch.js57
-rw-r--r--ext/fetch/lib.rs6
-rw-r--r--ext/http/lib.rs9
-rw-r--r--ext/web/06_streams.js76
5 files changed, 99 insertions, 97 deletions
diff --git a/ext/fetch/22_body.js b/ext/fetch/22_body.js
index 6e9a57447..429b56ae1 100644
--- a/ext/fetch/22_body.js
+++ b/ext/fetch/22_body.js
@@ -30,19 +30,18 @@
errorReadableStream,
readableStreamClose,
readableStreamDisturb,
+ readableStreamCollectIntoUint8Array,
createProxy,
ReadableStreamPrototype,
} = globalThis.__bootstrap.streams;
const {
ArrayBufferPrototype,
ArrayBufferIsView,
- ArrayPrototypePush,
ArrayPrototypeMap,
JSONParse,
ObjectDefineProperties,
ObjectPrototypeIsPrototypeOf,
PromiseResolve,
- TypedArrayPrototypeSet,
TypedArrayPrototypeSlice,
TypeError,
Uint8Array,
@@ -66,12 +65,10 @@
}
class InnerBody {
- #knownExactLength = null;
-
/**
* @param {ReadableStream<Uint8Array> | { body: Uint8Array | string, consumed: boolean }} stream
*/
- constructor(stream, knownExactLength) {
+ constructor(stream) {
/** @type {ReadableStream<Uint8Array> | { body: Uint8Array | string, consumed: boolean }} */
this.streamOrStatic = stream ??
{ body: new Uint8Array(), consumed: false };
@@ -79,8 +76,6 @@
this.source = null;
/** @type {null | number} */
this.length = null;
-
- this.#knownExactLength = knownExactLength;
}
get stream() {
@@ -144,7 +139,7 @@
* https://fetch.spec.whatwg.org/#concept-body-consume-body
* @returns {Promise<Uint8Array>}
*/
- async consume() {
+ consume() {
if (this.unusable()) throw new TypeError("Body already consumed.");
if (
ObjectPrototypeIsPrototypeOf(
@@ -152,40 +147,7 @@
this.streamOrStatic,
)
) {
- const reader = this.stream.getReader();
- /** @type {Uint8Array[]} */
- const chunks = [];
-
- let finalBuffer = this.#knownExactLength
- ? new Uint8Array(this.#knownExactLength)
- : null;
-
- let totalLength = 0;
- while (true) {
- const { value: chunk, done } = await reader.read();
- if (done) break;
-
- if (finalBuffer) {
- // fast path, content-length is present
- TypedArrayPrototypeSet(finalBuffer, chunk, totalLength);
- } else {
- // slow path, content-length is not present
- ArrayPrototypePush(chunks, chunk);
- }
- totalLength += chunk.byteLength;
- }
-
- if (finalBuffer) {
- return finalBuffer;
- }
-
- finalBuffer = new Uint8Array(totalLength);
- let i = 0;
- for (const chunk of chunks) {
- TypedArrayPrototypeSet(finalBuffer, chunk, i);
- i += chunk.byteLength;
- }
- return finalBuffer;
+ return readableStreamCollectIntoUint8Array(this.stream);
} else {
this.streamOrStatic.consumed = true;
return this.streamOrStatic.body;
@@ -224,7 +186,7 @@
clone() {
const [out1, out2] = this.stream.tee();
this.streamOrStatic = out1;
- const second = new InnerBody(out2, this.#knownExactLength);
+ const second = new InnerBody(out2);
second.source = core.deserialize(core.serialize(this.source));
second.length = this.length;
return second;
diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js
index 3e90429ce..169db2bbf 100644
--- a/ext/fetch/26_fetch.js
+++ b/ext/fetch/26_fetch.js
@@ -17,7 +17,7 @@
const webidl = window.__bootstrap.webidl;
const { byteLowerCase } = window.__bootstrap.infra;
const { BlobPrototype } = window.__bootstrap.file;
- const { errorReadableStream, ReadableStreamPrototype } =
+ const { errorReadableStream, ReadableStreamPrototype, readableStreamForRid } =
window.__bootstrap.streams;
const { InnerBody, extractBody } = window.__bootstrap.fetchBody;
const {
@@ -44,7 +44,6 @@
String,
StringPrototypeStartsWith,
StringPrototypeToLowerCase,
- TypedArrayPrototypeSubarray,
TypeError,
Uint8Array,
Uint8ArrayPrototype,
@@ -89,65 +88,22 @@
return core.opAsync("op_fetch_send", rid);
}
- // A finalization registry to clean up underlying fetch resources that are GC'ed.
- const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => {
- core.tryClose(rid);
- });
-
/**
* @param {number} responseBodyRid
* @param {AbortSignal} [terminator]
* @returns {ReadableStream<Uint8Array>}
*/
function createResponseBodyStream(responseBodyRid, terminator) {
+ const readable = readableStreamForRid(responseBodyRid);
+
function onAbort() {
- if (readable) {
- errorReadableStream(readable, terminator.reason);
- }
+ errorReadableStream(readable, terminator.reason);
core.tryClose(responseBodyRid);
}
+
// TODO(lucacasonato): clean up registration
terminator[abortSignal.add](onAbort);
- const readable = new ReadableStream({
- type: "bytes",
- async pull(controller) {
- try {
- // This is the largest possible size for a single packet on a TLS
- // stream.
- const chunk = new Uint8Array(16 * 1024 + 256);
- // TODO(@AaronO): switch to handle nulls if that's moved to core
- const read = await core.read(
- responseBodyRid,
- chunk,
- );
- if (read > 0) {
- // We read some data. Enqueue it onto the stream.
- controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
- } else {
- RESOURCE_REGISTRY.unregister(readable);
- // We have reached the end of the body, so we close the stream.
- controller.close();
- core.tryClose(responseBodyRid);
- }
- } catch (err) {
- RESOURCE_REGISTRY.unregister(readable);
- if (terminator.aborted) {
- controller.error(terminator.reason);
- } else {
- // There was an error while reading a chunk of the body, so we
- // error.
- controller.error(err);
- }
- core.tryClose(responseBodyRid);
- }
- },
- cancel() {
- if (!terminator.aborted) {
- terminator[abortSignal.signalAbort]();
- }
- },
- });
- RESOURCE_REGISTRY.register(readable, responseBodyRid, readable);
+
return readable;
}
@@ -338,7 +294,6 @@
} else {
response.body = new InnerBody(
createResponseBodyStream(resp.responseRid, terminator),
- resp.contentLength,
);
}
}
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index a7daaa63a..0adc32343 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -408,6 +408,7 @@ pub async fn op_fetch_send(
.add(FetchResponseBodyResource {
reader: AsyncRefCell::new(stream_reader),
cancel: CancelHandle::default(),
+ size: content_length,
});
Ok(FetchResponse {
@@ -479,6 +480,7 @@ type BytesStream =
struct FetchResponseBodyResource {
reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>,
cancel: CancelHandle,
+ size: Option<u64>,
}
impl Resource for FetchResponseBodyResource {
@@ -498,6 +500,10 @@ impl Resource for FetchResponseBodyResource {
})
}
+ fn size_hint(&self) -> (u64, Option<u64>) {
+ (0, self.size)
+ }
+
fn close(self: Rc<Self>) {
self.cancel.cancel()
}
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index bffe3c3d5..a8c2810bc 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -39,6 +39,8 @@ use flate2::write::GzEncoder;
use flate2::Compression;
use fly_accept_encoding::Encoding;
use hyper::body::Bytes;
+use hyper::body::HttpBody;
+use hyper::body::SizeHint;
use hyper::header::HeaderName;
use hyper::header::HeaderValue;
use hyper::server::conn::Http;
@@ -309,6 +311,7 @@ pub struct HttpStreamResource {
wr: AsyncRefCell<HttpResponseWriter>,
accept_encoding: Encoding,
cancel_handle: CancelHandle,
+ size: SizeHint,
}
impl HttpStreamResource {
@@ -318,11 +321,13 @@ impl HttpStreamResource {
response_tx: oneshot::Sender<Response<Body>>,
accept_encoding: Encoding,
) -> Self {
+ let size = request.body().size_hint();
Self {
conn: conn.clone(),
rd: HttpRequestReader::Headers(request).into(),
wr: HttpResponseWriter::Headers(response_tx).into(),
accept_encoding,
+ size,
cancel_handle: CancelHandle::new(),
}
}
@@ -388,6 +393,10 @@ impl Resource for HttpStreamResource {
fn close(self: Rc<Self>) {
self.cancel_handle.cancel();
}
+
+ fn size_hint(&self) -> (u64, Option<u64>) {
+ (self.size.lower(), self.size.upper())
+ }
}
/// The read half of an HTTP stream.
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 412c58c3c..ba422b71d 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -48,6 +48,7 @@
SymbolAsyncIterator,
SymbolFor,
TypeError,
+ TypedArrayPrototypeSet,
Uint8Array,
Uint8ArrayPrototype,
Uint16ArrayPrototype,
@@ -647,6 +648,10 @@
const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
+ // A finalization registry to clean up underlying resources that are GC'ed.
+ const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => {
+ core.tryClose(rid);
+ });
/**
* Create a new ReadableStream object that is backed by a Resource that
* implements `Resource::read_return`. This object contains enough metadata to
@@ -660,6 +665,17 @@
function readableStreamForRid(rid, autoClose = true) {
const stream = webidl.createBranded(ReadableStream);
stream[_resourceBacking] = { rid, autoClose };
+
+ const tryClose = () => {
+ if (!autoClose) return;
+ RESOURCE_REGISTRY.unregister(stream);
+ core.tryClose(rid);
+ };
+
+ if (autoClose) {
+ RESOURCE_REGISTRY.register(stream, rid, stream);
+ }
+
const underlyingSource = {
type: "bytes",
async pull(controller) {
@@ -667,7 +683,7 @@
try {
const bytesRead = await core.read(rid, v);
if (bytesRead === 0) {
- if (autoClose) core.tryClose(rid);
+ tryClose();
controller.close();
controller.byobRequest.respond(0);
} else {
@@ -675,11 +691,11 @@
}
} catch (e) {
controller.error(e);
- if (autoClose) core.tryClose(rid);
+ tryClose();
}
},
cancel() {
- if (autoClose) core.tryClose(rid);
+ tryClose();
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
};
@@ -766,6 +782,59 @@
return stream[_resourceBacking];
}
+ async function readableStreamCollectIntoUint8Array(stream) {
+ const resourceBacking = getReadableStreamResourceBacking(stream);
+ const reader = acquireReadableStreamDefaultReader(stream);
+
+ if (resourceBacking) {
+ // fast path, read whole body in a single op call
+ try {
+ readableStreamDisturb(stream);
+ const buf = await core.opAsync("op_read_all", resourceBacking.rid);
+ readableStreamThrowIfErrored(stream);
+ readableStreamClose(stream);
+ return buf;
+ } catch (err) {
+ readableStreamThrowIfErrored(stream);
+ readableStreamError(stream, err);
+ throw err;
+ } finally {
+ if (resourceBacking.autoClose) {
+ core.tryClose(resourceBacking.rid);
+ }
+ }
+ }
+
+ // slow path
+ /** @type {Uint8Array[]} */
+ const chunks = [];
+ let totalLength = 0;
+ while (true) {
+ const { value: chunk, done } = await reader.read();
+ if (done) break;
+
+ ArrayPrototypePush(chunks, chunk);
+ totalLength += chunk.byteLength;
+ }
+
+ const finalBuffer = new Uint8Array(totalLength);
+ let i = 0;
+ for (const chunk of chunks) {
+ TypedArrayPrototypeSet(finalBuffer, chunk, i);
+ i += chunk.byteLength;
+ }
+ return finalBuffer;
+ }
+
+ /*
+ * @param {ReadableStream} stream
+ */
+ function readableStreamThrowIfErrored(stream) {
+ if (stream[_state] === "errored") {
+ throw stream[_storedError];
+ }
+ }
+
/**
* @param {unknown} value
* @returns {value is WritableStream}
@@ -5982,6 +6051,7 @@
createProxy,
writableStreamClose,
readableStreamClose,
+ readableStreamCollectIntoUint8Array,
readableStreamDisturb,
readableStreamForRid,
readableStreamForRidUnrefable,