diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2021-04-09 00:34:15 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-08 18:34:15 -0400 |
commit | 70af8128767f2fc5a9c59107d3b5ddc00531db55 (patch) | |
tree | 48513959f16273eab2c4b743d61042b00a72deb8 /runtime/js/40_http.js | |
parent | b30ac9c5cf58c34ed71d2f470cdbcd86a6096987 (diff) |
feat: native HTTP bindings (#9935)
Co-authered-by: Luca Casonato <lucacasonato@yahoo.com>
Co-authered-by: Ben Noordhuis <info@bnoordhuis.nl>
Co-authered-by: Ryan Dahl <ry@tinyclouds.org>
Diffstat (limited to 'runtime/js/40_http.js')
-rw-r--r-- | runtime/js/40_http.js | 210 |
1 files changed, 210 insertions, 0 deletions
diff --git a/runtime/js/40_http.js b/runtime/js/40_http.js new file mode 100644 index 000000000..cfb015edd --- /dev/null +++ b/runtime/js/40_http.js @@ -0,0 +1,210 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const { Request, dontValidateUrl, fastBody, Response } = + window.__bootstrap.fetch; + const { Headers } = window.__bootstrap.headers; + const errors = window.__bootstrap.errors.errors; + const core = window.Deno.core; + const { ReadableStream } = window.__bootstrap.streams; + + function flatEntries(obj) { + const entries = []; + for (const key in obj) { + entries.push(key); + entries.push(obj[key]); + } + return entries; + } + + function startHttp(conn) { + const rid = Deno.core.jsonOpSync("op_http_start", conn.rid); + return new HttpConn(rid); + } + + class HttpConn { + #rid = 0; + + constructor(rid) { + this.#rid = rid; + } + + /** @returns {number} */ + get rid() { + return this.#rid; + } + + /** @returns {Promise<ResponseEvent | null>} */ + async nextRequest() { + let nextRequest; + try { + nextRequest = await Deno.core.jsonOpAsync( + "op_http_request_next", + this.#rid, + ); + } catch (error) { + if (error instanceof errors.BadResource) { + return null; + } else if (error instanceof errors.Interrupted) { + return null; + } + throw error; + } + if (nextRequest === null) return null; + + const [ + requestBodyRid, + responseSenderRid, + method, + headersList, + url, + ] = nextRequest; + + /** @type {ReadableStream<Uint8Array> | undefined} */ + let body = undefined; + if (typeof requestBodyRid === "number") { + body = createRequestBodyStream(requestBodyRid); + } + + const request = new Request(url, { + body, + method, + headers: new Headers(headersList), + [dontValidateUrl]: true, + }); + + const respondWith = createRespondWith(responseSenderRid, this.#rid); + + return { request, respondWith }; + } + + /** @returns {void} */ + close() { + core.close(this.#rid); + } + + [Symbol.asyncIterator]() { + const httpConn = this; + return { + async next() { + const reqEvt = await httpConn.nextRequest(); + if (reqEvt === null) return { value: undefined, done: true }; + return { value: reqEvt, done: false }; + }, + }; + } + } + + function readRequest(requestRid, zeroCopyBuf) { + return Deno.core.jsonOpAsync( + "op_http_request_read", + requestRid, + zeroCopyBuf, + ); + } + + function respond(responseSenderRid, resp, zeroCopyBuf) { + return Deno.core.jsonOpSync("op_http_response", [ + responseSenderRid, + resp.status ?? 200, + flatEntries(resp.headers ?? {}), + ], zeroCopyBuf); + } + + function createRespondWith(responseSenderRid, connRid) { + return async function (resp) { + if (resp instanceof Promise) { + resp = await resp; + } + + if (!(resp instanceof Response)) { + throw new TypeError( + "First argument to respondWith must be a Response or a promise resolving to a Response.", + ); + } + // If response body is Uint8Array it will be sent synchronously + // in a single op, in other case a "response body" resource will be + // created and we'll be streaming it. + const body = resp[fastBody](); + let zeroCopyBuf; + if (body instanceof ArrayBuffer) { + zeroCopyBuf = new Uint8Array(body); + } else if (!body) { + zeroCopyBuf = new Uint8Array(0); + } else { + zeroCopyBuf = null; + } + + const responseBodyRid = respond( + responseSenderRid, + resp, + zeroCopyBuf, + ); + + // If `respond` returns a responseBodyRid, we should stream the body + // to that resource. + if (typeof responseBodyRid === "number") { + if (!body || !(body instanceof ReadableStream)) { + throw new Error( + "internal error: recieved responseBodyRid, but response has no body or is not a stream", + ); + } + for await (const chunk of body) { + const data = new Uint8Array( + chunk.buffer, + chunk.byteOffset, + chunk.byteLength, + ); + await Deno.core.jsonOpAsync( + "op_http_response_write", + responseBodyRid, + data, + ); + } + + // Once all chunks are sent, and the request body is closed, we can close + // the response body. + await Deno.core.jsonOpAsync("op_http_response_close", responseBodyRid); + } + }; + } + + function createRequestBodyStream(requestBodyRid) { + return new ReadableStream({ + type: "bytes", + async pull(controller) { + try { + // This is the largest possible size for a single packet on a TLS + // stream. + const chunk = new Uint8Array(16 * 1024 + 256); + const read = await readRequest( + requestBodyRid, + chunk, + ); + if (read > 0) { + // We read some data. Enqueue it onto the stream. + controller.enqueue(chunk.subarray(0, read)); + } else { + // We have reached the end of the body, so we close the stream. + controller.close(); + core.close(requestBodyRid); + } + } catch (err) { + // There was an error while reading a chunk of the body, so we + // error. + controller.error(err); + controller.close(); + core.close(requestBodyRid); + } + }, + cancel() { + core.close(requestBodyRid); + }, + }); + } + + window.__bootstrap.http = { + startHttp, + }; +})(this); |