diff options
author | Luca Casonato <lucacasonato@yahoo.com> | 2021-01-10 20:54:29 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-10 20:54:29 +0100 |
commit | 1a6ce29f3d11e5f0d0d022914e3f9fbcfa487294 (patch) | |
tree | 9e1940a9a7a7392e6818341eea67becfa26ebbfa /op_crates/fetch/26_fetch.js | |
parent | 2c1f74402c00a2975cdaf9199b6487e5fd8175ba (diff) |
feat(fetch): req streaming + 0-copy resp streaming (#9036)
* feat(fetch): req streaming + 0-copy resp streaming
* lint
* lint
* fix test
* rm test.js
* explicitly use CancelHandle::default()
* Apply review suggestions
Co-authored-by: Ben Noordhuis <info@bnoordhuis.nl>
* fix test
* Merge remote-tracking branch 'origin/master' into fetch_real_streaming
* fix test
* retrigger ci
Co-authored-by: Ben Noordhuis <info@bnoordhuis.nl>
Diffstat (limited to 'op_crates/fetch/26_fetch.js')
-rw-r--r-- | op_crates/fetch/26_fetch.js | 88 |
1 files changed, 62 insertions, 26 deletions
diff --git a/op_crates/fetch/26_fetch.js b/op_crates/fetch/26_fetch.js index 0d405d4ec..2d50f1142 100644 --- a/op_crates/fetch/26_fetch.js +++ b/op_crates/fetch/26_fetch.js @@ -897,8 +897,20 @@ if (body != null) { zeroCopy = new Uint8Array(body.buffer, body.byteOffset, body.byteLength); } + return core.jsonOpSync("op_fetch", args, ...(zeroCopy ? [zeroCopy] : [])); + } - return core.jsonOpAsync("op_fetch", args, ...(zeroCopy ? [zeroCopy] : [])); + function opFetchSend(args) { + return core.jsonOpAsync("op_fetch_send", args); + } + + function opFetchRequestWrite(args, body) { + const zeroCopy = new Uint8Array( + body.buffer, + body.byteOffset, + body.byteLength, + ); + return core.jsonOpAsync("op_fetch_request_write", args, zeroCopy); } const NULL_BODY_STATUS = [101, 204, 205, 304]; @@ -1184,19 +1196,41 @@ baseUrl = href; } - function sendFetchReq(url, method, headers, body, clientRid) { + async function sendFetchReq(url, method, headers, body, clientRid) { let headerArray = []; if (headers) { headerArray = Array.from(headers.entries()); } - return opFetch({ - method, - url, - baseUrl, - headers: headerArray, - clientRid, - }, body); + const { requestRid, requestBodyRid } = opFetch( + { + method, + url, + baseUrl, + headers: headerArray, + clientRid, + hasBody: !!body, + }, + body instanceof Uint8Array ? body : undefined, + ); + if (requestBodyRid) { + const writer = new WritableStream({ + async write(chunk, controller) { + try { + await opFetchRequestWrite({ rid: requestBodyRid }, chunk); + } catch (err) { + controller.error(err); + controller.close(); + } + }, + close() { + core.close(requestBodyRid); + }, + }); + body.pipeTo(writer); + } + + return await opFetchSend({ rid: requestRid }); } async function fetch(input, init) { @@ -1253,13 +1287,8 @@ ); body = multipartBuilder.getBody(); contentType = multipartBuilder.getContentType(); - } else { - // TODO(lucacasonato): do this in a streaming fashion once we support it - const buf = new Buffer(); - for await (const chunk of init.body) { - buf.write(chunk); - } - body = buf.bytes(); + } else if (init.body instanceof ReadableStream) { + body = init.body; } if (contentType && !headers.has("content-type")) { headers.set("content-type", contentType); @@ -1275,8 +1304,8 @@ method = input.method; headers = input.headers; - if (input._bodySource) { - body = new DataView(await input.arrayBuffer()); + if (input.body) { + body = input.body; } } @@ -1290,7 +1319,7 @@ body, clientRid, ); - const rid = fetchResponse.bodyRid; + const rid = fetchResponse.responseRid; if ( NULL_BODY_STATUS.includes(fetchResponse.status) || @@ -1298,21 +1327,28 @@ ) { // We won't use body of received response, so close it now // otherwise it will be kept in resource table. - core.close(fetchResponse.bodyRid); + core.close(rid); responseBody = null; } else { responseBody = new ReadableStream({ type: "bytes", async pull(controller) { try { - const result = await core.jsonOpAsync("op_fetch_read", { rid }); - if (!result || !result.chunk) { + const chunk = new Uint8Array(16 * 1024 + 256); + const { read } = await core.jsonOpAsync( + "op_fetch_response_read", + { rid }, + chunk, + ); + if (read != 0) { + if (chunk.length == read) { + controller.enqueue(chunk); + } else { + controller.enqueue(chunk.subarray(0, read)); + } + } else { controller.close(); core.close(rid); - } else { - // TODO(ry) This is terribly inefficient. Make this zero-copy. - const chunk = new Uint8Array(result.chunk); - controller.enqueue(chunk); } } catch (e) { controller.error(e); |