summaryrefslogtreecommitdiff
path: root/op_crates/fetch/26_fetch.js
diff options
context:
space:
mode:
authorLuca Casonato <lucacasonato@yahoo.com>2021-01-10 20:54:29 +0100
committerGitHub <noreply@github.com>2021-01-10 20:54:29 +0100
commit1a6ce29f3d11e5f0d0d022914e3f9fbcfa487294 (patch)
tree9e1940a9a7a7392e6818341eea67becfa26ebbfa /op_crates/fetch/26_fetch.js
parent2c1f74402c00a2975cdaf9199b6487e5fd8175ba (diff)
feat(fetch): req streaming + 0-copy resp streaming (#9036)
* feat(fetch): req streaming + 0-copy resp streaming * lint * lint * fix test * rm test.js * explicitly use CancelHandle::default() * Apply review suggestions Co-authored-by: Ben Noordhuis <info@bnoordhuis.nl> * fix test * Merge remote-tracking branch 'origin/master' into fetch_real_streaming * fix test * retrigger ci Co-authored-by: Ben Noordhuis <info@bnoordhuis.nl>
Diffstat (limited to 'op_crates/fetch/26_fetch.js')
-rw-r--r--op_crates/fetch/26_fetch.js88
1 files changed, 62 insertions, 26 deletions
diff --git a/op_crates/fetch/26_fetch.js b/op_crates/fetch/26_fetch.js
index 0d405d4ec..2d50f1142 100644
--- a/op_crates/fetch/26_fetch.js
+++ b/op_crates/fetch/26_fetch.js
@@ -897,8 +897,20 @@
if (body != null) {
zeroCopy = new Uint8Array(body.buffer, body.byteOffset, body.byteLength);
}
+ return core.jsonOpSync("op_fetch", args, ...(zeroCopy ? [zeroCopy] : []));
+ }
- return core.jsonOpAsync("op_fetch", args, ...(zeroCopy ? [zeroCopy] : []));
+ function opFetchSend(args) {
+ return core.jsonOpAsync("op_fetch_send", args);
+ }
+
+ function opFetchRequestWrite(args, body) {
+ const zeroCopy = new Uint8Array(
+ body.buffer,
+ body.byteOffset,
+ body.byteLength,
+ );
+ return core.jsonOpAsync("op_fetch_request_write", args, zeroCopy);
}
const NULL_BODY_STATUS = [101, 204, 205, 304];
@@ -1184,19 +1196,41 @@
baseUrl = href;
}
- function sendFetchReq(url, method, headers, body, clientRid) {
+ async function sendFetchReq(url, method, headers, body, clientRid) {
let headerArray = [];
if (headers) {
headerArray = Array.from(headers.entries());
}
- return opFetch({
- method,
- url,
- baseUrl,
- headers: headerArray,
- clientRid,
- }, body);
+ const { requestRid, requestBodyRid } = opFetch(
+ {
+ method,
+ url,
+ baseUrl,
+ headers: headerArray,
+ clientRid,
+ hasBody: !!body,
+ },
+ body instanceof Uint8Array ? body : undefined,
+ );
+ if (requestBodyRid) {
+ const writer = new WritableStream({
+ async write(chunk, controller) {
+ try {
+ await opFetchRequestWrite({ rid: requestBodyRid }, chunk);
+ } catch (err) {
+ controller.error(err);
+ controller.close();
+ }
+ },
+ close() {
+ core.close(requestBodyRid);
+ },
+ });
+ body.pipeTo(writer);
+ }
+
+ return await opFetchSend({ rid: requestRid });
}
async function fetch(input, init) {
@@ -1253,13 +1287,8 @@
);
body = multipartBuilder.getBody();
contentType = multipartBuilder.getContentType();
- } else {
- // TODO(lucacasonato): do this in a streaming fashion once we support it
- const buf = new Buffer();
- for await (const chunk of init.body) {
- buf.write(chunk);
- }
- body = buf.bytes();
+ } else if (init.body instanceof ReadableStream) {
+ body = init.body;
}
if (contentType && !headers.has("content-type")) {
headers.set("content-type", contentType);
@@ -1275,8 +1304,8 @@
method = input.method;
headers = input.headers;
- if (input._bodySource) {
- body = new DataView(await input.arrayBuffer());
+ if (input.body) {
+ body = input.body;
}
}
@@ -1290,7 +1319,7 @@
body,
clientRid,
);
- const rid = fetchResponse.bodyRid;
+ const rid = fetchResponse.responseRid;
if (
NULL_BODY_STATUS.includes(fetchResponse.status) ||
@@ -1298,21 +1327,28 @@
) {
// We won't use body of received response, so close it now
// otherwise it will be kept in resource table.
- core.close(fetchResponse.bodyRid);
+ core.close(rid);
responseBody = null;
} else {
responseBody = new ReadableStream({
type: "bytes",
async pull(controller) {
try {
- const result = await core.jsonOpAsync("op_fetch_read", { rid });
- if (!result || !result.chunk) {
+ const chunk = new Uint8Array(16 * 1024 + 256);
+ const { read } = await core.jsonOpAsync(
+ "op_fetch_response_read",
+ { rid },
+ chunk,
+ );
+ if (read != 0) {
+ if (chunk.length == read) {
+ controller.enqueue(chunk);
+ } else {
+ controller.enqueue(chunk.subarray(0, read));
+ }
+ } else {
controller.close();
core.close(rid);
- } else {
- // TODO(ry) This is terribly inefficient. Make this zero-copy.
- const chunk = new Uint8Array(result.chunk);
- controller.enqueue(chunk);
}
} catch (e) {
controller.error(e);