summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/fetch/22_body.js16
-rw-r--r--ext/fetch/23_request.js77
-rw-r--r--ext/fetch/23_response.js20
-rw-r--r--ext/flash/01_http.js569
-rw-r--r--ext/flash/Cargo.toml29
-rw-r--r--ext/flash/README.md7
-rw-r--r--ext/flash/chunked.rs272
-rw-r--r--ext/flash/lib.rs1567
-rw-r--r--ext/flash/sendfile.rs81
-rw-r--r--ext/http/01_http.js16
-rw-r--r--ext/http/lib.rs39
-rw-r--r--ext/web/06_streams.js1
-rw-r--r--ext/websocket/lib.rs23
13 files changed, 2691 insertions, 26 deletions
diff --git a/ext/fetch/22_body.js b/ext/fetch/22_body.js
index 10ddb7603..a51cdc184 100644
--- a/ext/fetch/22_body.js
+++ b/ext/fetch/22_body.js
@@ -388,7 +388,10 @@
let source = null;
let length = null;
let contentType = null;
- if (ObjectPrototypeIsPrototypeOf(BlobPrototype, object)) {
+ if (typeof object === "string") {
+ source = object;
+ contentType = "text/plain;charset=UTF-8";
+ } else if (ObjectPrototypeIsPrototypeOf(BlobPrototype, object)) {
stream = object.stream();
source = object;
length = object.size;
@@ -424,24 +427,21 @@
// TODO(@satyarohith): not sure what primordial here.
source = object.toString();
contentType = "application/x-www-form-urlencoded;charset=UTF-8";
- } else if (typeof object === "string") {
- source = object;
- contentType = "text/plain;charset=UTF-8";
} else if (ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, object)) {
stream = object;
if (object.locked || isReadableStreamDisturbed(object)) {
throw new TypeError("ReadableStream is locked or disturbed");
}
}
- if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, source)) {
- stream = { body: source, consumed: false };
- length = source.byteLength;
- } else if (typeof source === "string") {
+ if (typeof source === "string") {
// WARNING: this deviates from spec (expects length to be set)
// https://fetch.spec.whatwg.org/#bodyinit > 7.
// no observable side-effect for users so far, but could change
stream = { body: source, consumed: false };
length = null; // NOTE: string length != byte length
+ } else if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, source)) {
+ stream = { body: source, consumed: false };
+ length = source.byteLength;
}
const body = new InnerBody(stream);
body.source = source;
diff --git a/ext/fetch/23_request.js b/ext/fetch/23_request.js
index 63fc6a26e..5221d5ca9 100644
--- a/ext/fetch/23_request.js
+++ b/ext/fetch/23_request.js
@@ -16,7 +16,7 @@
const { HTTP_TOKEN_CODE_POINT_RE, byteUpperCase } = window.__bootstrap.infra;
const { URL } = window.__bootstrap.url;
const { guardFromHeaders } = window.__bootstrap.headers;
- const { mixinBody, extractBody } = window.__bootstrap.fetchBody;
+ const { mixinBody, extractBody, InnerBody } = window.__bootstrap.fetchBody;
const { getLocationHref } = window.__bootstrap.location;
const { extractMimeType } = window.__bootstrap.mimesniff;
const { blobFromObjectUrl } = window.__bootstrap.file;
@@ -48,6 +48,9 @@
const _signal = Symbol("signal");
const _mimeType = Symbol("mime type");
const _body = Symbol("body");
+ const _flash = Symbol("flash");
+ const _url = Symbol("url");
+ const _method = Symbol("method");
/**
* @param {(() => string)[]} urlList
@@ -266,7 +269,11 @@
return extractMimeType(values);
}
get [_body]() {
- return this[_request].body;
+ if (this[_flash]) {
+ return this[_flash].body;
+ } else {
+ return this[_request].body;
+ }
}
/**
@@ -427,12 +434,31 @@
get method() {
webidl.assertBranded(this, RequestPrototype);
- return this[_request].method;
+ if (this[_method]) {
+ return this[_method];
+ }
+ if (this[_flash]) {
+ this[_method] = this[_flash].methodCb();
+ return this[_method];
+ } else {
+ this[_method] = this[_request].method;
+ return this[_method];
+ }
}
get url() {
webidl.assertBranded(this, RequestPrototype);
- return this[_request].url();
+ if (this[_url]) {
+ return this[_url];
+ }
+
+ if (this[_flash]) {
+ this[_url] = this[_flash].urlCb();
+ return this[_url];
+ } else {
+ this[_url] = this[_request].url();
+ return this[_url];
+ }
}
get headers() {
@@ -442,6 +468,9 @@
get redirect() {
webidl.assertBranded(this, RequestPrototype);
+ if (this[_flash]) {
+ return this[_flash].redirectMode;
+ }
return this[_request].redirectMode;
}
@@ -455,7 +484,12 @@
if (this[_body] && this[_body].unusable()) {
throw new TypeError("Body is unusable.");
}
- const newReq = cloneInnerRequest(this[_request]);
+ let newReq;
+ if (this[_flash]) {
+ newReq = cloneInnerRequest(this[_flash]);
+ } else {
+ newReq = cloneInnerRequest(this[_request]);
+ }
const newSignal = abortSignal.newSignal();
abortSignal.follow(newSignal, this[_signal]);
return fromInnerRequest(
@@ -549,10 +583,43 @@
return request;
}
+ /**
+ * @param {number} serverId
+ * @param {number} streamRid
+ * @param {ReadableStream} body
+ * @param {() => string} methodCb
+ * @param {() => string} urlCb
+ * @param {() => [string, string][]} headersCb
+ * @returns {Request}
+ */
+ function fromFlashRequest(
+ serverId,
+ streamRid,
+ body,
+ methodCb,
+ urlCb,
+ headersCb,
+ ) {
+ const request = webidl.createBranded(Request);
+ request[_flash] = {
+ body: body !== null ? new InnerBody(body) : null,
+ methodCb,
+ urlCb,
+ streamRid,
+ serverId,
+ redirectMode: "follow",
+ redirectCount: 0,
+ };
+ request[_getHeaders] = () => headersFromHeaderList(headersCb(), "request");
+ return request;
+ }
+
window.__bootstrap.fetch ??= {};
window.__bootstrap.fetch.Request = Request;
window.__bootstrap.fetch.toInnerRequest = toInnerRequest;
+ window.__bootstrap.fetch.fromFlashRequest = fromFlashRequest;
window.__bootstrap.fetch.fromInnerRequest = fromInnerRequest;
window.__bootstrap.fetch.newInnerRequest = newInnerRequest;
window.__bootstrap.fetch.processUrlList = processUrlList;
+ window.__bootstrap.fetch._flash = _flash;
})(globalThis);
diff --git a/ext/fetch/23_response.js b/ext/fetch/23_response.js
index 226a751bd..3c19f963a 100644
--- a/ext/fetch/23_response.js
+++ b/ext/fetch/23_response.js
@@ -15,6 +15,9 @@
const { isProxy } = Deno.core;
const webidl = window.__bootstrap.webidl;
const consoleInternal = window.__bootstrap.console;
+ const {
+ byteLowerCase,
+ } = window.__bootstrap.infra;
const { HTTP_TAB_OR_SPACE, regexMatcher, serializeJSValueToJSONString } =
window.__bootstrap.infra;
const { extractBody, mixinBody } = window.__bootstrap.fetchBody;
@@ -185,7 +188,6 @@
// 4.
response[_response].statusMessage = init.statusText;
-
// 5.
/** @type {__bootstrap.headers.Headers} */
const headers = response[_headers];
@@ -200,10 +202,22 @@
"Response with null body status cannot have body",
);
}
+
const { body, contentType } = bodyWithType;
response[_response].body = body;
- if (contentType !== null && !headers.has("content-type")) {
- headers.append("Content-Type", contentType);
+
+ if (contentType !== null) {
+ let hasContentType = false;
+ const list = headerListFromHeaders(headers);
+ for (let i = 0; i < list.length; i++) {
+ if (byteLowerCase(list[i][0]) === "content-type") {
+ hasContentType = true;
+ break;
+ }
+ }
+ if (!hasContentType) {
+ ArrayPrototypePush(list, ["Content-Type", contentType]);
+ }
}
}
}
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js
new file mode 100644
index 000000000..fd817219e
--- /dev/null
+++ b/ext/flash/01_http.js
@@ -0,0 +1,569 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+"use strict";
+
+((window) => {
+ const { BlobPrototype } = window.__bootstrap.file;
+ const { fromFlashRequest, toInnerResponse } = window.__bootstrap.fetch;
+ const core = window.Deno.core;
+ const {
+ ReadableStream,
+ ReadableStreamPrototype,
+ getReadableStreamRid,
+ readableStreamClose,
+ _state,
+ } = window.__bootstrap.streams;
+ const {
+ WebSocket,
+ _rid,
+ _readyState,
+ _eventLoop,
+ _protocol,
+ _server,
+ _idleTimeoutDuration,
+ _idleTimeoutTimeout,
+ _serverHandleIdleTimeout,
+ } = window.__bootstrap.webSocket;
+ const { _ws } = window.__bootstrap.http;
+ const {
+ ObjectPrototypeIsPrototypeOf,
+ TypedArrayPrototypeSubarray,
+ TypeError,
+ Uint8Array,
+ Uint8ArrayPrototype,
+ } = window.__bootstrap.primordials;
+
+ const statusCodes = {
+ 100: "Continue",
+ 101: "Switching Protocols",
+ 102: "Processing",
+ 200: "OK",
+ 201: "Created",
+ 202: "Accepted",
+ 203: "Non Authoritative Information",
+ 204: "No Content",
+ 205: "Reset Content",
+ 206: "Partial Content",
+ 207: "Multi-Status",
+ 208: "Already Reported",
+ 226: "IM Used",
+ 300: "Multiple Choices",
+ 301: "Moved Permanently",
+ 302: "Found",
+ 303: "See Other",
+ 304: "Not Modified",
+ 305: "Use Proxy",
+ 307: "Temporary Redirect",
+ 308: "Permanent Redirect",
+ 400: "Bad Request",
+ 401: "Unauthorized",
+ 402: "Payment Required",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 405: "Method Not Allowed",
+ 406: "Not Acceptable",
+ 407: "Proxy Authentication Required",
+ 408: "Request Timeout",
+ 409: "Conflict",
+ 410: "Gone",
+ 411: "Length Required",
+ 412: "Precondition Failed",
+ 413: "Payload Too Large",
+ 414: "URI Too Long",
+ 415: "Unsupported Media Type",
+ 416: "Range Not Satisfiable",
+ 418: "I'm a teapot",
+ 421: "Misdirected Request",
+ 422: "Unprocessable Entity",
+ 423: "Locked",
+ 424: "Failed Dependency",
+ 426: "Upgrade Required",
+ 428: "Precondition Required",
+ 429: "Too Many Requests",
+ 431: "Request Header Fields Too Large",
+ 451: "Unavailable For Legal Reasons",
+ 500: "Internal Server Error",
+ 501: "Not Implemented",
+ 502: "Bad Gateway",
+ 503: "Service Unavailable",
+ 504: "Gateway Timeout",
+ 505: "HTTP Version Not Supported",
+ 506: "Variant Also Negotiates",
+ 507: "Insufficient Storage",
+ 508: "Loop Detected",
+ 510: "Not Extended",
+ 511: "Network Authentication Required",
+ };
+
+ const methods = {
+ 0: "GET",
+ 1: "HEAD",
+ 2: "CONNECT",
+ 3: "PUT",
+ 4: "DELETE",
+ 5: "OPTIONS",
+ 6: "TRACE",
+ 7: "POST",
+ 8: "PATCH",
+ };
+
+ let dateInterval;
+ let date;
+ let stringResources = {};
+
+ // Construct an HTTP response message.
+ // All HTTP/1.1 messages consist of a start-line followed by a sequence
+ // of octets.
+ //
+ // HTTP-message = start-line
+ // *( header-field CRLF )
+ // CRLF
+ // [ message-body ]
+ //
+ function http1Response(method, status, headerList, body, earlyEnd = false) {
+ // HTTP uses a "<major>.<minor>" numbering scheme
+ // HTTP-version = HTTP-name "/" DIGIT "." DIGIT
+ // HTTP-name = %x48.54.54.50 ; "HTTP", case-sensitive
+ //
+ // status-line = HTTP-version SP status-code SP reason-phrase CRLF
+ // Date header: https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.1.2
+ let str = `HTTP/1.1 ${status} ${statusCodes[status]}\r\nDate: ${date}\r\n`;
+ for (const [name, value] of headerList) {
+ // header-field = field-name ":" OWS field-value OWS
+ str += `${name}: ${value}\r\n`;
+ }
+
+ // https://datatracker.ietf.org/doc/html/rfc7231#section-6.3.6
+ if (status === 205 || status === 304) {
+ // MUST NOT generate a payload in a 205 response.
+ // indicate a zero-length body for the response by
+ // including a Content-Length header field with a value of 0.
+ str += "Content-Length: 0\r\n";
+ return str;
+ }
+
+ // MUST NOT send Content-Length or Transfer-Encoding if status code is 1xx or 204.
+ if (status == 204 && status <= 100) {
+ return str;
+ }
+
+ if (earlyEnd === true) {
+ return str;
+ }
+
+ // null body status is validated by inititalizeAResponse in ext/fetch
+ if (body !== null && body !== undefined) {
+ str += `Content-Length: ${body.length}\r\n\r\n`;
+ } else {
+ str += "Transfer-Encoding: chunked\r\n\r\n";
+ return str;
+ }
+
+ // A HEAD request.
+ if (method === 1) return str;
+
+ if (typeof body === "string") {
+ str += body ?? "";
+ } else {
+ const head = core.encode(str);
+ const response = new Uint8Array(head.byteLength + body.byteLength);
+ response.set(head, 0);
+ response.set(body, head.byteLength);
+ return response;
+ }
+
+ return str;
+ }
+
+ function prepareFastCalls() {
+ return core.opSync("op_flash_make_request");
+ }
+
+ function hostnameForDisplay(hostname) {
+ // If the hostname is "0.0.0.0", we display "localhost" in console
+ // because browsers in Windows don't resolve "0.0.0.0".
+ // See the discussion in https://github.com/denoland/deno_std/issues/1165
+ return hostname === "0.0.0.0" ? "localhost" : hostname;
+ }
+
+ function serve(handler, opts = {}) {
+ delete opts.key;
+ delete opts.cert;
+ return serveInner(handler, opts, false);
+ }
+
+ function serveTls(handler, opts = {}) {
+ return serveInner(handler, opts, true);
+ }
+
+ function serveInner(handler, opts, useTls) {
+ opts = { hostname: "127.0.0.1", port: 9000, useTls, ...opts };
+ const signal = opts.signal;
+ delete opts.signal;
+ const onError = opts.onError ?? function (error) {
+ console.error(error);
+ return new Response("Internal Server Error", { status: 500 });
+ };
+ delete opts.onError;
+ const onListen = opts.onListen ?? function () {
+ console.log(
+ `Listening on http://${
+ hostnameForDisplay(opts.hostname)
+ }:${opts.port}/`,
+ );
+ };
+ delete opts.onListen;
+ const serverId = core.ops.op_flash_serve(opts);
+ const serverPromise = core.opAsync("op_flash_drive_server", serverId);
+
+ core.opAsync("op_flash_wait_for_listening", serverId).then(() => {
+ onListen({ hostname: opts.hostname, port: opts.port });
+ });
+
+ const server = {
+ id: serverId,
+ transport: opts.cert && opts.key ? "https" : "http",
+ hostname: opts.hostname,
+ port: opts.port,
+ closed: false,
+ finished: (async () => {
+ return await serverPromise;
+ })(),
+ async close() {
+ if (server.closed) {
+ return;
+ }
+ server.closed = true;
+ await core.opAsync("op_flash_close_server", serverId);
+ await server.finished;
+ },
+ async serve() {
+ while (true) {
+ if (server.closed) {
+ break;
+ }
+
+ let token = nextRequestSync();
+ if (token === 0) {
+ token = await core.opAsync("op_flash_next_async", serverId);
+ if (server.closed) {
+ break;
+ }
+ }
+
+ for (let i = 0; i < token; i++) {
+ let body = null;
+ // There might be a body, but we don't expose it for GET/HEAD requests.
+ // It will be closed automatically once the request has been handled and
+ // the response has been sent.
+ const method = getMethodSync(i);
+ let hasBody = method > 2; // Not GET/HEAD/CONNECT
+ if (hasBody) {
+ body = createRequestBodyStream(serverId, i);
+ if (body === null) {
+ hasBody = false;
+ }
+ }
+
+ const req = fromFlashRequest(
+ serverId,
+ /* streamRid */
+ i,
+ body,
+ /* methodCb */
+ () => methods[method],
+ /* urlCb */
+ () => {
+ const path = core.ops.op_flash_path(serverId, i);
+ return `${server.transport}://${server.hostname}:${server.port}${path}`;
+ },
+ /* headersCb */
+ () => core.ops.op_flash_headers(serverId, i),
+ );
+
+ let resp;
+ try {
+ resp = await handler(req);
+ } catch (e) {
+ resp = await onError(e);
+ }
+ // there might've been an HTTP upgrade.
+ if (resp === undefined) {
+ continue;
+ }
+
+ const ws = resp[_ws];
+ if (!ws) {
+ if (hasBody && body[_state] !== "closed") {
+ // TODO(@littledivy): Optimize by draining in a single op.
+ try {
+ await req.arrayBuffer();
+ } catch { /* pass */ }
+ }
+ }
+
+ const innerResp = toInnerResponse(resp);
+
+ // If response body length is known, it will be sent synchronously in a
+ // single op, in other case a "response body" resource will be created and
+ // we'll be streaming it.
+ /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
+ let respBody = null;
+ let isStreamingResponseBody = false;
+ if (innerResp.body !== null) {
+ if (typeof innerResp.body.streamOrStatic?.body === "string") {
+ if (innerResp.body.streamOrStatic.consumed === true) {
+ throw new TypeError("Body is unusable.");
+ }
+ innerResp.body.streamOrStatic.consumed = true;
+ respBody = innerResp.body.streamOrStatic.body;
+ isStreamingResponseBody = false;
+ } else if (
+ ObjectPrototypeIsPrototypeOf(
+ ReadableStreamPrototype,
+ innerResp.body.streamOrStatic,
+ )
+ ) {
+ if (innerResp.body.unusable()) {
+ throw new TypeError("Body is unusable.");
+ }
+ if (
+ innerResp.body.length === null ||
+ ObjectPrototypeIsPrototypeOf(
+ BlobPrototype,
+ innerResp.body.source,
+ )
+ ) {
+ respBody = innerResp.body.stream;
+ } else {
+ const reader = innerResp.body.stream.getReader();
+ const r1 = await reader.read();
+ if (r1.done) {
+ respBody = new Uint8Array(0);
+ } else {
+ respBody = r1.value;
+ const r2 = await reader.read();
+ if (!r2.done) throw new TypeError("Unreachable");
+ }
+ }
+ isStreamingResponseBody = !(
+ typeof respBody === "string" ||
+ ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
+ );
+ } else {
+ if (innerResp.body.streamOrStatic.consumed === true) {
+ throw new TypeError("Body is unusable.");
+ }
+ innerResp.body.streamOrStatic.consumed = true;
+ respBody = innerResp.body.streamOrStatic.body;
+ }
+ } else {
+ respBody = new Uint8Array(0);
+ }
+
+ if (isStreamingResponseBody === true) {
+ const resourceRid = getReadableStreamRid(respBody);
+ if (resourceRid) {
+ if (respBody.locked) {
+ throw new TypeError("ReadableStream is locked.");
+ }
+ const reader = respBody.getReader(); // Aquire JS lock.
+ try {
+ core.opAsync(
+ "op_flash_write_resource",
+ http1Response(
+ method,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ null,
+ true,
+ ),
+ serverId,
+ i,
+ resourceRid,
+ ).then(() => {
+ // Release JS lock.
+ readableStreamClose(respBody);
+ });
+ } catch (error) {
+ await reader.cancel(error);
+ throw error;
+ }
+ } else {
+ const reader = respBody.getReader();
+ let first = true;
+ a:
+ while (true) {
+ const { value, done } = await reader.read();
+ if (first) {
+ first = false;
+ core.ops.op_flash_respond(
+ serverId,
+ i,
+ http1Response(
+ method,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ null,
+ ),
+ value ?? new Uint8Array(),
+ false,
+ );
+ } else {
+ if (value === undefined) {
+ core.ops.op_flash_respond_chuncked(
+ serverId,
+ i,
+ undefined,
+ done,
+ );
+ } else {
+ respondChunked(
+ i,
+ value,
+ done,
+ );
+ }
+ }
+ if (done) break a;
+ }
+ }
+ } else {
+ const responseStr = http1Response(
+ method,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ respBody,
+ );
+
+ // TypedArray
+ if (typeof responseStr !== "string") {
+ respondFast(i, responseStr, !ws);
+ } else {
+ // string
+ const maybeResponse = stringResources[responseStr];
+ if (maybeResponse === undefined) {
+ stringResources[responseStr] = core.encode(responseStr);
+ core.ops.op_flash_respond(
+ serverId,
+ i,
+ stringResources[responseStr],
+ null,
+ !ws, // Don't close socket if there is a deferred websocket upgrade.
+ );
+ } else {
+ respondFast(i, maybeResponse, !ws);
+ }
+ }
+ }
+
+ if (ws) {
+ const wsRid = await core.opAsync(
+ "op_flash_upgrade_websocket",
+ serverId,
+ i,
+ );
+ ws[_rid] = wsRid;
+ ws[_protocol] = resp.headers.get("sec-websocket-protocol");
+
+ ws[_readyState] = WebSocket.OPEN;
+ const event = new Event("open");
+ ws.dispatchEvent(event);
+
+ ws[_eventLoop]();
+ if (ws[_idleTimeoutDuration]) {
+ ws.addEventListener(
+ "close",
+ () => clearTimeout(ws[_idleTimeoutTimeout]),
+ );
+ }
+ ws[_serverHandleIdleTimeout]();
+ }
+ }
+ }
+ await server.finished;
+ },
+ };
+
+ signal?.addEventListener("abort", () => {
+ clearInterval(dateInterval);
+ server.close().then(() => {}, () => {});
+ }, {
+ once: true,
+ });
+
+ const fastOp = prepareFastCalls();
+ let nextRequestSync = () => fastOp.nextRequest();
+ let getMethodSync = (token) => fastOp.getMethod(token);
+ let respondChunked = (token, chunk, shutdown) =>
+ fastOp.respondChunked(token, chunk, shutdown);
+ let respondFast = (token, response, shutdown) =>
+ fastOp.respond(token, response, shutdown);
+ if (serverId > 0) {
+ nextRequestSync = () => core.ops.op_flash_next_server(serverId);
+ getMethodSync = (token) => core.ops.op_flash_method(serverId, token);
+ respondChunked = (token, chunk, shutdown) =>
+ core.ops.op_flash_respond_chuncked(serverId, token, chunk, shutdown);
+ respondFast = (token, response, shutdown) =>
+ core.ops.op_flash_respond(serverId, token, response, null, shutdown);
+ }
+
+ if (!dateInterval) {
+ date = new Date().toUTCString();
+ dateInterval = setInterval(() => {
+ date = new Date().toUTCString();
+ stringResources = {};
+ }, 1000);
+ }
+
+ return server.serve().catch(console.error);
+ }
+
+ function createRequestBodyStream(serverId, token) {
+ // The first packet is left over bytes after parsing the request
+ const firstRead = core.ops.op_flash_first_packet(
+ serverId,
+ token,
+ );
+ if (!firstRead) return null;
+ let firstEnqueued = firstRead.byteLength == 0;
+
+ return new ReadableStream({
+ type: "bytes",
+ async pull(controller) {
+ try {
+ if (firstEnqueued === false) {
+ controller.enqueue(firstRead);
+ firstEnqueued = true;
+ return;
+ }
+ // This is the largest possible size for a single packet on a TLS
+ // stream.
+ const chunk = new Uint8Array(16 * 1024 + 256);
+ const read = await core.opAsync(
+ "op_flash_read_body",
+ serverId,
+ token,
+ chunk,
+ );
+ if (read > 0) {
+ // We read some data. Enqueue it onto the stream.
+ controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
+ } else {
+ // We have reached the end of the body, so we close the stream.
+ controller.close();
+ }
+ } catch (err) {
+ // There was an error while reading a chunk of the body, so we
+ // error.
+ controller.error(err);
+ controller.close();
+ }
+ },
+ });
+ }
+
+ window.__bootstrap.flash = {
+ serve,
+ serveTls,
+ };
+})(this);
diff --git a/ext/flash/Cargo.toml b/ext/flash/Cargo.toml
new file mode 100644
index 000000000..f61bc025d
--- /dev/null
+++ b/ext/flash/Cargo.toml
@@ -0,0 +1,29 @@
+# Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+[package]
+name = "deno_flash"
+version = "0.1.0"
+authors = ["the Deno authors"]
+edition = "2021"
+license = "MIT"
+readme = "README.md"
+repository = "https://github.com/denoland/deno"
+description = "Fast HTTP/1 server implementation for Deno"
+
+[lib]
+path = "lib.rs"
+
+[dependencies]
+deno_core = { path = "../../core", version = "0.147.0" }
+deno_tls = { version = "0.52.0", path = "../tls" }
+# For HTTP/2 and websocket upgrades
+deno_websocket = { version = "0.70.0", path = "../websocket" }
+http = "0.2.6"
+httparse = "1.7"
+libc = "0.2"
+log = "0.4.17"
+mio = { version = "0.8.1", features = ["os-poll", "net"] }
+rustls = { version = "0.20" }
+rustls-pemfile = { version = "0.2.1" }
+serde = { version = "1.0.136", features = ["derive"] }
+tokio = { version = "1.19", features = ["full"] }
diff --git a/ext/flash/README.md b/ext/flash/README.md
new file mode 100644
index 000000000..465c60d47
--- /dev/null
+++ b/ext/flash/README.md
@@ -0,0 +1,7 @@
+# flash
+
+Flash is a fast HTTP/1.1 server implementation for Deno.
+
+```js
+serve((req) => new Response("Hello World"));
+```
diff --git a/ext/flash/chunked.rs b/ext/flash/chunked.rs
new file mode 100644
index 000000000..86417807d
--- /dev/null
+++ b/ext/flash/chunked.rs
@@ -0,0 +1,272 @@
+// Based on https://github.com/frewsxcv/rust-chunked-transfer/blob/5c08614458580f9e7a85124021006d83ce1ed6e9/src/decoder.rs
+// Copyright 2015 The tiny-http Contributors
+// Copyright 2015 The rust-chunked-transfer Contributors
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+use std::error::Error;
+use std::fmt;
+use std::io::Error as IoError;
+use std::io::ErrorKind;
+use std::io::Read;
+use std::io::Result as IoResult;
+
+pub struct Decoder<R> {
+ pub source: R,
+
+ // remaining size of the chunk being read
+ // none if we are not in a chunk
+ pub remaining_chunks_size: Option<usize>,
+ pub end: bool,
+}
+
+impl<R> Decoder<R>
+where
+ R: Read,
+{
+ pub fn new(source: R, remaining_chunks_size: Option<usize>) -> Decoder<R> {
+ Decoder {
+ source,
+ remaining_chunks_size,
+ end: false,
+ }
+ }
+
+ fn read_chunk_size(&mut self) -> IoResult<usize> {
+ let mut chunk_size_bytes = Vec::new();
+ let mut has_ext = false;
+
+ loop {
+ let byte = match self.source.by_ref().bytes().next() {
+ Some(b) => b?,
+ None => {
+ return Err(IoError::new(ErrorKind::InvalidInput, DecoderError))
+ }
+ };
+
+ if byte == b'\r' {
+ break;
+ }
+
+ if byte == b';' {
+ has_ext = true;
+ break;
+ }
+
+ chunk_size_bytes.push(byte);
+ }
+
+ // Ignore extensions for now
+ if has_ext {
+ loop {
+ let byte = match self.source.by_ref().bytes().next() {
+ Some(b) => b?,
+ None => {
+ return Err(IoError::new(ErrorKind::InvalidInput, DecoderError))
+ }
+ };
+ if byte == b'\r' {
+ break;
+ }
+ }
+ }
+
+ self.read_line_feed()?;
+
+ let chunk_size = String::from_utf8(chunk_size_bytes)
+ .ok()
+ .and_then(|c| usize::from_str_radix(c.trim(), 16).ok())
+ .ok_or_else(|| IoError::new(ErrorKind::InvalidInput, DecoderError))?;
+
+ Ok(chunk_size)
+ }
+
+ fn read_carriage_return(&mut self) -> IoResult<()> {
+ match self.source.by_ref().bytes().next() {
+ Some(Ok(b'\r')) => Ok(()),
+ _ => Err(IoError::new(ErrorKind::InvalidInput, DecoderError)),
+ }
+ }
+
+ fn read_line_feed(&mut self) -> IoResult<()> {
+ match self.source.by_ref().bytes().next() {
+ Some(Ok(b'\n')) => Ok(()),
+ _ => Err(IoError::new(ErrorKind::InvalidInput, DecoderError)),
+ }
+ }
+}
+
+impl<R> Read for Decoder<R>
+where
+ R: Read,
+{
+ fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
+ let remaining_chunks_size = match self.remaining_chunks_size {
+ Some(c) => c,
+ None => {
+ // first possibility: we are not in a chunk, so we'll attempt to determine
+ // the chunks size
+ let chunk_size = self.read_chunk_size()?;
+
+ // if the chunk size is 0, we are at EOF
+ if chunk_size == 0 {
+ self.read_carriage_return()?;
+ self.read_line_feed()?;
+ self.end = true;
+ return Ok(0);
+ }
+
+ chunk_size
+ }
+ };
+
+ // second possibility: we continue reading from a chunk
+ if buf.len() < remaining_chunks_size {
+ let read = self.source.read(buf)?;
+ self.remaining_chunks_size = Some(remaining_chunks_size - read);
+ return Ok(read);
+ }
+
+ // third possibility: the read request goes further than the current chunk
+ // we simply read until the end of the chunk and return
+ let buf = &mut buf[..remaining_chunks_size];
+ let read = self.source.read(buf)?;
+ self.remaining_chunks_size = if read == remaining_chunks_size {
+ self.read_carriage_return()?;
+ self.read_line_feed()?;
+ None
+ } else {
+ Some(remaining_chunks_size - read)
+ };
+
+ Ok(read)
+ }
+}
+
+#[derive(Debug, Copy, Clone)]
+struct DecoderError;
+
+impl fmt::Display for DecoderError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
+ write!(fmt, "Error while decoding chunks")
+ }
+}
+
+impl Error for DecoderError {
+ fn description(&self) -> &str {
+ "Error while decoding chunks"
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::Decoder;
+ use std::io;
+ use std::io::Read;
+
+ /// This unit test is taken from from Hyper
+ /// https://github.com/hyperium/hyper
+ /// Copyright (c) 2014 Sean McArthur
+ #[test]
+ fn test_read_chunk_size() {
+ fn read(s: &str, expected: usize) {
+ let mut decoded = Decoder::new(s.as_bytes(), None);
+ let actual = decoded.read_chunk_size().unwrap();
+ assert_eq!(expected, actual);
+ }
+
+ fn read_err(s: &str) {
+ let mut decoded = Decoder::new(s.as_bytes(), None);
+ let err_kind = decoded.read_chunk_size().unwrap_err().kind();
+ assert_eq!(err_kind, io::ErrorKind::InvalidInput);
+ }
+
+ read("1\r\n", 1);
+ read("01\r\n", 1);
+ read("0\r\n", 0);
+ read("00\r\n", 0);
+ read("A\r\n", 10);
+ read("a\r\n", 10);
+ read("Ff\r\n", 255);
+ read("Ff \r\n", 255);
+ // Missing LF or CRLF
+ read_err("F\rF");
+ read_err("F");
+ // Invalid hex digit
+ read_err("X\r\n");
+ read_err("1X\r\n");
+ read_err("-\r\n");
+ read_err("-1\r\n");
+ // Acceptable (if not fully valid) extensions do not influence the size
+ read("1;extension\r\n", 1);
+ read("a;ext name=value\r\n", 10);
+ read("1;extension;extension2\r\n", 1);
+ read("1;;; ;\r\n", 1);
+ read("2; extension...\r\n", 2);
+ read("3 ; extension=123\r\n", 3);
+ read("3 ;\r\n", 3);
+ read("3 ; \r\n", 3);
+ // Invalid extensions cause an error
+ read_err("1 invalid extension\r\n");
+ read_err("1 A\r\n");
+ read_err("1;no CRLF");
+ }
+
+ #[test]
+ fn test_valid_chunk_decode() {
+ let source = io::Cursor::new(
+ "3\r\nhel\r\nb\r\nlo world!!!\r\n0\r\n\r\n"
+ .to_string()
+ .into_bytes(),
+ );
+ let mut decoded = Decoder::new(source, None);
+
+ let mut string = String::new();
+ decoded.read_to_string(&mut string).unwrap();
+
+ assert_eq!(string, "hello world!!!");
+ }
+
+ #[test]
+ fn test_decode_zero_length() {
+ let mut decoder = Decoder::new(b"0\r\n\r\n" as &[u8], None);
+
+ let mut decoded = String::new();
+ decoder.read_to_string(&mut decoded).unwrap();
+
+ assert_eq!(decoded, "");
+ }
+
+ #[test]
+ fn test_decode_invalid_chunk_length() {
+ let mut decoder = Decoder::new(b"m\r\n\r\n" as &[u8], None);
+
+ let mut decoded = String::new();
+ assert!(decoder.read_to_string(&mut decoded).is_err());
+ }
+
+ #[test]
+ fn invalid_input1() {
+ let source = io::Cursor::new(
+ "2\r\nhel\r\nb\r\nlo world!!!\r\n0\r\n"
+ .to_string()
+ .into_bytes(),
+ );
+ let mut decoded = Decoder::new(source, None);
+
+ let mut string = String::new();
+ assert!(decoded.read_to_string(&mut string).is_err());
+ }
+
+ #[test]
+ fn invalid_input2() {
+ let source = io::Cursor::new(
+ "3\rhel\r\nb\r\nlo world!!!\r\n0\r\n"
+ .to_string()
+ .into_bytes(),
+ );
+ let mut decoded = Decoder::new(source, None);
+
+ let mut string = String::new();
+ assert!(decoded.read_to_string(&mut string).is_err());
+ }
+}
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs
new file mode 100644
index 000000000..2c0cc548a
--- /dev/null
+++ b/ext/flash/lib.rs
@@ -0,0 +1,1567 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+// False positive lint for explicit drops.
+// https://github.com/rust-lang/rust-clippy/issues/6446
+#![allow(clippy::await_holding_lock)]
+
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::op;
+use deno_core::serde_v8;
+use deno_core::v8;
+use deno_core::v8::fast_api;
+use deno_core::ByteString;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
+use deno_core::Extension;
+use deno_core::OpState;
+use deno_core::StringOrBuffer;
+use deno_core::ZeroCopyBuf;
+use deno_core::V8_WRAPPER_OBJECT_INDEX;
+use deno_tls::load_certs;
+use deno_tls::load_private_keys;
+use http::header::HeaderName;
+use http::header::CONNECTION;
+use http::header::CONTENT_LENGTH;
+use http::header::EXPECT;
+use http::header::TRANSFER_ENCODING;
+use http::header::UPGRADE;
+use http::HeaderValue;
+use log::trace;
+use mio::net::TcpListener;
+use mio::net::TcpStream;
+use mio::Events;
+use mio::Interest;
+use mio::Poll;
+use mio::Token;
+use serde::Deserialize;
+use serde::Serialize;
+use std::cell::RefCell;
+use std::cell::UnsafeCell;
+use std::collections::HashMap;
+use std::ffi::c_void;
+use std::future::Future;
+use std::intrinsics::transmute;
+use std::io::BufReader;
+use std::io::Read;
+use std::io::Write;
+use std::marker::PhantomPinned;
+use std::mem::replace;
+use std::net::SocketAddr;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::sync::Mutex;
+use std::task::Context;
+use std::time::Duration;
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
+
+mod chunked;
+
+#[cfg(unix)]
+mod sendfile;
+
+pub struct FlashContext {
+ next_server_id: u32,
+ join_handles: HashMap<u32, JoinHandle<Result<(), AnyError>>>,
+ pub servers: HashMap<u32, ServerContext>,
+}
+
+pub struct ServerContext {
+ _addr: SocketAddr,
+ tx: mpsc::Sender<NextRequest>,
+ rx: mpsc::Receiver<NextRequest>,
+ response: HashMap<u32, NextRequest>,
+ listening_rx: Option<mpsc::Receiver<()>>,
+ close_tx: mpsc::Sender<()>,
+ cancel_handle: Rc<CancelHandle>,
+}
+
+struct InnerRequest {
+ _headers: Vec<httparse::Header<'static>>,
+ req: httparse::Request<'static, 'static>,
+ body_offset: usize,
+ body_len: usize,
+ buffer: Pin<Box<[u8]>>,
+}
+
+#[derive(Debug, PartialEq)]
+enum ParseStatus {
+ None,
+ Ongoing(usize),
+}
+
+type TlsTcpStream = rustls::StreamOwned<rustls::ServerConnection, TcpStream>;
+
+enum InnerStream {
+ Tcp(TcpStream),
+ Tls(Box<TlsTcpStream>),
+}
+
+struct Stream {
+ inner: InnerStream,
+ detached: bool,
+ read_rx: Option<mpsc::Receiver<()>>,
+ read_tx: Option<mpsc::Sender<()>>,
+ parse_done: ParseStatus,
+ buffer: UnsafeCell<Vec<u8>>,
+ read_lock: Arc<Mutex<()>>,
+ _pin: PhantomPinned,
+}
+
+impl Stream {
+ pub fn detach_ownership(&mut self) {
+ self.detached = true;
+ }
+
+ fn reattach_ownership(&mut self) {
+ self.detached = false;
+ }
+}
+
+impl Write for Stream {
+ #[inline]
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ match self.inner {
+ InnerStream::Tcp(ref mut stream) => stream.write(buf),
+ InnerStream::Tls(ref mut stream) => stream.write(buf),
+ }
+ }
+ #[inline]
+ fn flush(&mut self) -> std::io::Result<()> {
+ match self.inner {
+ InnerStream::Tcp(ref mut stream) => stream.flush(),
+ InnerStream::Tls(ref mut stream) => stream.flush(),
+ }
+ }
+}
+
+impl Read for Stream {
+ #[inline]
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ match self.inner {
+ InnerStream::Tcp(ref mut stream) => stream.read(buf),
+ InnerStream::Tls(ref mut stream) => stream.read(buf),
+ }
+ }
+}
+
+struct NextRequest {
+ // Pointer to stream owned by the server loop thread.
+ //
+ // Why not Arc<Mutex<Stream>>? Performance. The stream
+ // is never written to by the server loop thread.
+ //
+ // Dereferencing is safe until server thread finishes and
+ // op_flash_serve resolves or websocket upgrade is performed.
+ socket: *mut Stream,
+ inner: InnerRequest,
+ keep_alive: bool,
+ #[allow(dead_code)]
+ upgrade: bool,
+ content_read: usize,
+ content_length: Option<u64>,
+ remaining_chunk_size: Option<usize>,
+ te_chunked: bool,
+ expect_continue: bool,
+}
+
+// SAFETY: Sent from server thread to JS thread.
+// See comment above for `socket`.
+unsafe impl Send for NextRequest {}
+
+impl NextRequest {
+ #[inline(always)]
+ pub fn socket<'a>(&self) -> &'a mut Stream {
+ // SAFETY: Dereferencing is safe until server thread detaches socket or finishes.
+ unsafe { &mut *self.socket }
+ }
+}
+
+#[op]
+fn op_flash_respond(
+ op_state: &mut OpState,
+ server_id: u32,
+ token: u32,
+ response: StringOrBuffer,
+ maybe_body: Option<ZeroCopyBuf>,
+ shutdown: bool,
+) {
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+
+ let mut close = false;
+ let sock = match shutdown {
+ true => {
+ let tx = ctx.response.remove(&token).unwrap();
+ close = !tx.keep_alive;
+ tx.socket()
+ }
+ // In case of a websocket upgrade or streaming response.
+ false => {
+ let tx = ctx.response.get(&token).unwrap();
+ tx.socket()
+ }
+ };
+
+ sock.read_tx.take();
+ sock.read_rx.take();
+
+ let _ = sock.write(&response);
+ if let Some(response) = maybe_body {
+ let _ = sock.write(format!("{:x}", response.len()).as_bytes());
+ let _ = sock.write(b"\r\n");
+ let _ = sock.write(&response);
+ let _ = sock.write(b"\r\n");
+ }
+
+ // server is done writing and request doesn't want to kept alive.
+ if shutdown && close {
+ match &mut sock.inner {
+ InnerStream::Tcp(stream) => {
+ // Typically shutdown shouldn't fail.
+ let _ = stream.shutdown(std::net::Shutdown::Both);
+ }
+ InnerStream::Tls(stream) => {
+ let _ = stream.sock.shutdown(std::net::Shutdown::Both);
+ }
+ }
+ }
+}
+
+#[op]
+fn op_flash_respond_chuncked(
+ op_state: &mut OpState,
+ server_id: u32,
+ token: u32,
+ response: Option<ZeroCopyBuf>,
+ shutdown: bool,
+) {
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ match response {
+ Some(response) => {
+ respond_chunked(ctx, token, shutdown, Some(&response));
+ }
+ None => {
+ respond_chunked(ctx, token, shutdown, None);
+ }
+ }
+}
+
+#[op]
+async fn op_flash_write_resource(
+ op_state: Rc<RefCell<OpState>>,
+ response: StringOrBuffer,
+ server_id: u32,
+ token: u32,
+ resource_id: deno_core::ResourceId,
+) -> Result<(), AnyError> {
+ let resource = op_state.borrow_mut().resource_table.take_any(resource_id)?;
+ let sock = {
+ let op_state = &mut op_state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ ctx.response.remove(&token).unwrap().socket()
+ };
+
+ drop(op_state);
+ let _ = sock.write(&response);
+
+ #[cfg(unix)]
+ {
+ use std::os::unix::io::AsRawFd;
+ if let InnerStream::Tcp(stream_handle) = &sock.inner {
+ let stream_handle = stream_handle.as_raw_fd();
+ if let Some(fd) = resource.clone().backing_fd() {
+ // SAFETY: all-zero byte-pattern is a valid value for libc::stat.
+ let mut stat: libc::stat = unsafe { std::mem::zeroed() };
+ // SAFETY: call to libc::fstat.
+ if unsafe { libc::fstat(fd, &mut stat) } >= 0 {
+ let _ = sock.write(
+ format!("Content-Length: {}\r\n\r\n", stat.st_size).as_bytes(),
+ );
+ let tx = sendfile::SendFile {
+ io: (fd, stream_handle),
+ written: 0,
+ };
+ tx.await?;
+ return Ok(());
+ }
+ }
+ }
+ }
+
+ let _ = sock.write(b"Transfer-Encoding: chunked\r\n\r\n");
+ loop {
+ let vec = vec![0u8; 64 * 1024]; // 64KB
+ let buf = ZeroCopyBuf::new_temp(vec);
+ let (nread, buf) = resource.clone().read_return(buf).await?;
+ if nread == 0 {
+ let _ = sock.write(b"0\r\n\r\n");
+ break;
+ }
+ let response = &buf[..nread];
+
+ let _ = sock.write(format!("{:x}", response.len()).as_bytes());
+ let _ = sock.write(b"\r\n");
+ let _ = sock.write(response);
+ let _ = sock.write(b"\r\n");
+ }
+
+ resource.close();
+ Ok(())
+}
+
+pub struct RespondFast;
+
+impl fast_api::FastFunction for RespondFast {
+ fn function(&self) -> *const c_void {
+ op_flash_respond_fast as *const c_void
+ }
+
+ fn args(&self) -> &'static [fast_api::Type] {
+ &[
+ fast_api::Type::V8Value,
+ fast_api::Type::Uint32,
+ fast_api::Type::TypedArray(fast_api::CType::Uint8),
+ fast_api::Type::Bool,
+ ]
+ }
+
+ fn return_type(&self) -> fast_api::CType {
+ fast_api::CType::Void
+ }
+}
+
+fn flash_respond(
+ ctx: &mut ServerContext,
+ token: u32,
+ shutdown: bool,
+ response: &[u8],
+) {
+ let mut close = false;
+ let sock = match shutdown {
+ true => {
+ let tx = ctx.response.remove(&token).unwrap();
+ close = !tx.keep_alive;
+ tx.socket()
+ }
+ // In case of a websocket upgrade or streaming response.
+ false => {
+ let tx = ctx.response.get(&token).unwrap();
+ tx.socket()
+ }
+ };
+
+ sock.read_tx.take();
+ sock.read_rx.take();
+
+ let _ = sock.write(response);
+ // server is done writing and request doesn't want to kept alive.
+ if shutdown && close {
+ match &mut sock.inner {
+ InnerStream::Tcp(stream) => {
+ // Typically shutdown shouldn't fail.
+ let _ = stream.shutdown(std::net::Shutdown::Both);
+ }
+ InnerStream::Tls(stream) => {
+ let _ = stream.sock.shutdown(std::net::Shutdown::Both);
+ }
+ }
+ }
+}
+
+unsafe fn op_flash_respond_fast(
+ recv: v8::Local<v8::Object>,
+ token: u32,
+ response: *const fast_api::FastApiTypedArray<u8>,
+ shutdown: bool,
+) {
+ let ptr =
+ recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
+ let ctx = &mut *(ptr as *mut ServerContext);
+
+ let response = &*response;
+ if let Some(response) = response.get_storage_if_aligned() {
+ flash_respond(ctx, token, shutdown, response);
+ } else {
+ todo!();
+ }
+}
+
+pub struct RespondChunkedFast;
+
+impl fast_api::FastFunction for RespondChunkedFast {
+ fn function(&self) -> *const c_void {
+ op_flash_respond_chunked_fast as *const c_void
+ }
+
+ fn args(&self) -> &'static [fast_api::Type] {
+ &[
+ fast_api::Type::V8Value,
+ fast_api::Type::Uint32,
+ fast_api::Type::TypedArray(fast_api::CType::Uint8),
+ fast_api::Type::Bool,
+ ]
+ }
+
+ fn return_type(&self) -> fast_api::CType {
+ fast_api::CType::Void
+ }
+}
+
+unsafe fn op_flash_respond_chunked_fast(
+ recv: v8::Local<v8::Object>,
+ token: u32,
+ response: *const fast_api::FastApiTypedArray<u8>,
+ shutdown: bool,
+) {
+ let ptr =
+ recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
+ let ctx = &mut *(ptr as *mut ServerContext);
+
+ let response = &*response;
+ if let Some(response) = response.get_storage_if_aligned() {
+ respond_chunked(ctx, token, shutdown, Some(response));
+ } else {
+ todo!();
+ }
+}
+
+fn respond_chunked(
+ ctx: &mut ServerContext,
+ token: u32,
+ shutdown: bool,
+ response: Option<&[u8]>,
+) {
+ let sock = match shutdown {
+ true => {
+ let tx = ctx.response.remove(&token).unwrap();
+ tx.socket()
+ }
+ // In case of a websocket upgrade or streaming response.
+ false => {
+ let tx = ctx.response.get(&token).unwrap();
+ tx.socket()
+ }
+ };
+
+ if let Some(response) = response {
+ let _ = sock.write(format!("{:x}", response.len()).as_bytes());
+ let _ = sock.write(b"\r\n");
+ let _ = sock.write(response);
+ let _ = sock.write(b"\r\n");
+ }
+
+ // The last chunk
+ if shutdown {
+ let _ = sock.write(b"0\r\n\r\n");
+ }
+ sock.reattach_ownership();
+}
+
+macro_rules! get_request {
+ ($op_state: ident, $token: ident) => {
+ get_request!($op_state, 0, $token)
+ };
+ ($op_state: ident, $server_id: expr, $token: ident) => {{
+ let flash_ctx = $op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&$server_id).unwrap();
+ ctx.response.get_mut(&$token).unwrap()
+ }};
+}
+
+#[repr(u32)]
+pub enum Method {
+ GET = 0,
+ HEAD,
+ CONNECT,
+ PUT,
+ DELETE,
+ OPTIONS,
+ TRACE,
+ POST,
+ PATCH,
+}
+
+#[inline]
+fn get_method(req: &mut NextRequest) -> u32 {
+ let method = match req.inner.req.method.unwrap() {
+ "GET" => Method::GET,
+ "POST" => Method::POST,
+ "PUT" => Method::PUT,
+ "DELETE" => Method::DELETE,
+ "OPTIONS" => Method::OPTIONS,
+ "HEAD" => Method::HEAD,
+ "PATCH" => Method::PATCH,
+ "TRACE" => Method::TRACE,
+ "CONNECT" => Method::CONNECT,
+ _ => Method::GET,
+ };
+ method as u32
+}
+
+#[op]
+fn op_flash_method(state: &mut OpState, server_id: u32, token: u32) -> u32 {
+ let req = get_request!(state, server_id, token);
+ get_method(req)
+}
+
+#[op]
+async fn op_flash_close_server(state: Rc<RefCell<OpState>>, server_id: u32) {
+ let close_tx = {
+ let mut op_state = state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ ctx.cancel_handle.cancel();
+ ctx.close_tx.clone()
+ };
+ let _ = close_tx.send(()).await;
+}
+
+#[op]
+fn op_flash_path(
+ state: Rc<RefCell<OpState>>,
+ server_id: u32,
+ token: u32,
+) -> String {
+ let mut op_state = state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ ctx
+ .response
+ .get(&token)
+ .unwrap()
+ .inner
+ .req
+ .path
+ .unwrap()
+ .to_string()
+}
+
+#[inline]
+fn next_request_sync(ctx: &mut ServerContext) -> u32 {
+ let mut tokens = 0;
+ while let Ok(token) = ctx.rx.try_recv() {
+ ctx.response.insert(tokens, token);
+ tokens += 1;
+ }
+ tokens
+}
+
+pub struct NextRequestFast;
+
+impl fast_api::FastFunction for NextRequestFast {
+ fn function(&self) -> *const c_void {
+ op_flash_next_fast as *const c_void
+ }
+
+ fn args(&self) -> &'static [fast_api::Type] {
+ &[fast_api::Type::V8Value]
+ }
+
+ fn return_type(&self) -> fast_api::CType {
+ fast_api::CType::Uint32
+ }
+}
+
+unsafe fn op_flash_next_fast(recv: v8::Local<v8::Object>) -> u32 {
+ let ptr =
+ recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
+ let ctx = &mut *(ptr as *mut ServerContext);
+ next_request_sync(ctx)
+}
+
+pub struct GetMethodFast;
+
+impl fast_api::FastFunction for GetMethodFast {
+ fn function(&self) -> *const c_void {
+ op_flash_get_method_fast as *const c_void
+ }
+
+ fn args(&self) -> &'static [fast_api::Type] {
+ &[fast_api::Type::V8Value, fast_api::Type::Uint32]
+ }
+
+ fn return_type(&self) -> fast_api::CType {
+ fast_api::CType::Uint32
+ }
+}
+
+unsafe fn op_flash_get_method_fast(
+ recv: v8::Local<v8::Object>,
+ token: u32,
+) -> u32 {
+ let ptr =
+ recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
+ let ctx = &mut *(ptr as *mut ServerContext);
+ let req = ctx.response.get_mut(&token).unwrap();
+ get_method(req)
+}
+
+// Fast calls
+#[op(v8)]
+fn op_flash_make_request<'scope>(
+ scope: &mut v8::HandleScope<'scope>,
+ state: &mut OpState,
+) -> serde_v8::Value<'scope> {
+ let object_template = v8::ObjectTemplate::new(scope);
+ assert!(object_template
+ .set_internal_field_count((V8_WRAPPER_OBJECT_INDEX + 1) as usize));
+ let obj = object_template.new_instance(scope).unwrap();
+ let ctx = {
+ let flash_ctx = state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&0).unwrap();
+ ctx as *mut ServerContext
+ };
+ obj.set_aligned_pointer_in_internal_field(V8_WRAPPER_OBJECT_INDEX, ctx as _);
+
+ // nextRequest
+ {
+ let builder = v8::FunctionTemplate::builder(
+ |_: &mut v8::HandleScope,
+ args: v8::FunctionCallbackArguments,
+ mut rv: v8::ReturnValue| {
+ let external: v8::Local<v8::External> =
+ args.data().unwrap().try_into().unwrap();
+ // SAFETY: This external is guaranteed to be a pointer to a ServerContext
+ let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
+ rv.set_uint32(next_request_sync(ctx));
+ },
+ )
+ .data(v8::External::new(scope, ctx as *mut _).into());
+
+ let func = builder.build_fast(scope, &NextRequestFast, None);
+ let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
+
+ let key = v8::String::new(scope, "nextRequest").unwrap();
+ obj.set(scope, key.into(), func).unwrap();
+ }
+
+ // getMethod
+ {
+ let builder = v8::FunctionTemplate::builder(
+ |scope: &mut v8::HandleScope,
+ args: v8::FunctionCallbackArguments,
+ mut rv: v8::ReturnValue| {
+ let external: v8::Local<v8::External> =
+ args.data().unwrap().try_into().unwrap();
+ // SAFETY: This external is guaranteed to be a pointer to a ServerContext
+ let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
+ let token = args.get(0).uint32_value(scope).unwrap();
+ let req = ctx.response.get_mut(&token).unwrap();
+ rv.set_uint32(get_method(req));
+ },
+ )
+ .data(v8::External::new(scope, ctx as *mut _).into());
+
+ let func = builder.build_fast(scope, &GetMethodFast, None);
+ let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
+
+ let key = v8::String::new(scope, "getMethod").unwrap();
+ obj.set(scope, key.into(), func).unwrap();
+ }
+
+ // respondChunked
+ {
+ let builder = v8::FunctionTemplate::builder(
+ |scope: &mut v8::HandleScope,
+ args: v8::FunctionCallbackArguments,
+ _: v8::ReturnValue| {
+ let external: v8::Local<v8::External> =
+ args.data().unwrap().try_into().unwrap();
+ // SAFETY: This external is guaranteed to be a pointer to a ServerContext
+ let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
+
+ let token = args.get(0).uint32_value(scope).unwrap();
+
+ let response: v8::Local<v8::ArrayBufferView> =
+ args.get(1).try_into().unwrap();
+ let ab = response.buffer(scope).unwrap();
+ let store = ab.get_backing_store();
+ let (offset, len) = (response.byte_offset(), response.byte_length());
+ // SAFETY: v8::SharedRef<v8::BackingStore> is similar to Arc<[u8]>,
+ // it points to a fixed continuous slice of bytes on the heap.
+ // We assume it's initialized and thus safe to read (though may not contain meaningful data)
+ let response = unsafe {
+ &*(&store[offset..offset + len] as *const _ as *const [u8])
+ };
+
+ let shutdown = args.get(2).boolean_value(scope);
+
+ respond_chunked(ctx, token, shutdown, Some(response));
+ },
+ )
+ .data(v8::External::new(scope, ctx as *mut _).into());
+
+ let func = builder.build_fast(scope, &RespondChunkedFast, None);
+ let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
+
+ let key = v8::String::new(scope, "respondChunked").unwrap();
+ obj.set(scope, key.into(), func).unwrap();
+ }
+
+ // respond
+ {
+ let builder = v8::FunctionTemplate::builder(
+ |scope: &mut v8::HandleScope,
+ args: v8::FunctionCallbackArguments,
+ _: v8::ReturnValue| {
+ let external: v8::Local<v8::External> =
+ args.data().unwrap().try_into().unwrap();
+ // SAFETY: This external is guaranteed to be a pointer to a ServerContext
+ let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
+
+ let token = args.get(0).uint32_value(scope).unwrap();
+
+ let response: v8::Local<v8::ArrayBufferView> =
+ args.get(1).try_into().unwrap();
+ let ab = response.buffer(scope).unwrap();
+ let store = ab.get_backing_store();
+ let (offset, len) = (response.byte_offset(), response.byte_length());
+ // SAFETY: v8::SharedRef<v8::BackingStore> is similar to Arc<[u8]>,
+ // it points to a fixed continuous slice of bytes on the heap.
+ // We assume it's initialized and thus safe to read (though may not contain meaningful data)
+ let response = unsafe {
+ &*(&store[offset..offset + len] as *const _ as *const [u8])
+ };
+
+ let shutdown = args.get(2).boolean_value(scope);
+
+ flash_respond(ctx, token, shutdown, response);
+ },
+ )
+ .data(v8::External::new(scope, ctx as *mut _).into());
+
+ let func = builder.build_fast(scope, &RespondFast, None);
+ let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
+
+ let key = v8::String::new(scope, "respond").unwrap();
+ obj.set(scope, key.into(), func).unwrap();
+ }
+
+ let value: v8::Local<v8::Value> = obj.into();
+ value.into()
+}
+
+#[inline]
+fn has_body_stream(req: &NextRequest) -> bool {
+ let sock = req.socket();
+ sock.read_rx.is_some()
+}
+
+#[op]
+fn op_flash_has_body_stream(
+ op_state: &mut OpState,
+ server_id: u32,
+ token: u32,
+) -> bool {
+ let req = get_request!(op_state, server_id, token);
+ has_body_stream(req)
+}
+
+#[op]
+fn op_flash_headers(
+ state: Rc<RefCell<OpState>>,
+ server_id: u32,
+ token: u32,
+) -> Result<Vec<(ByteString, ByteString)>, AnyError> {
+ let mut op_state = state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx
+ .servers
+ .get_mut(&server_id)
+ .ok_or_else(|| type_error("server closed"))?;
+ let inner_req = &ctx
+ .response
+ .get(&token)
+ .ok_or_else(|| type_error("request closed"))?
+ .inner
+ .req;
+ Ok(
+ inner_req
+ .headers
+ .iter()
+ .map(|h| (h.name.as_bytes().into(), h.value.into()))
+ .collect(),
+ )
+}
+
+// Remember the first packet we read? It probably also has some body data. This op quickly copies it into
+// a buffer and sets up channels for streaming the rest.
+#[op]
+fn op_flash_first_packet(
+ op_state: &mut OpState,
+ server_id: u32,
+ token: u32,
+) -> Result<Option<ZeroCopyBuf>, AnyError> {
+ let tx = get_request!(op_state, server_id, token);
+ let sock = tx.socket();
+
+ if !tx.te_chunked && tx.content_length.is_none() {
+ return Ok(None);
+ }
+
+ if tx.expect_continue {
+ let _ = sock.write(b"HTTP/1.1 100 Continue\r\n\r\n");
+ tx.expect_continue = false;
+ }
+
+ let buffer = &tx.inner.buffer[tx.inner.body_offset..tx.inner.body_len];
+ // Oh there is nothing here.
+ if buffer.is_empty() {
+ return Ok(Some(ZeroCopyBuf::empty()));
+ }
+
+ if tx.te_chunked {
+ let mut buf = vec![0; 1024];
+ let mut offset = 0;
+ let mut decoder = chunked::Decoder::new(
+ std::io::Cursor::new(buffer),
+ tx.remaining_chunk_size,
+ );
+
+ loop {
+ match decoder.read(&mut buf[offset..]) {
+ Ok(n) => {
+ tx.remaining_chunk_size = decoder.remaining_chunks_size;
+ offset += n;
+
+ if n == 0 {
+ tx.te_chunked = false;
+ buf.truncate(offset);
+ return Ok(Some(buf.into()));
+ }
+
+ if offset < buf.len()
+ && decoder.source.position() < buffer.len() as u64
+ {
+ continue;
+ }
+
+ buf.truncate(offset);
+ return Ok(Some(buf.into()));
+ }
+ Err(e) => {
+ return Err(type_error(format!("{}", e)));
+ }
+ }
+ }
+ }
+
+ tx.content_length
+ .ok_or_else(|| type_error("no content-length"))?;
+ tx.content_read += buffer.len();
+
+ Ok(Some(buffer.to_vec().into()))
+}
+
+#[op]
+async fn op_flash_read_body(
+ state: Rc<RefCell<OpState>>,
+ server_id: u32,
+ token: u32,
+ mut buf: ZeroCopyBuf,
+) -> usize {
+ // SAFETY: we cannot hold op_state borrow across the await point. The JS caller
+ // is responsible for ensuring this is not called concurrently.
+ let ctx = unsafe {
+ {
+ let op_state = &mut state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ flash_ctx.servers.get_mut(&server_id).unwrap() as *mut ServerContext
+ }
+ .as_mut()
+ .unwrap()
+ };
+ let tx = ctx.response.get_mut(&token).unwrap();
+
+ if tx.te_chunked {
+ let mut decoder =
+ chunked::Decoder::new(tx.socket(), tx.remaining_chunk_size);
+ loop {
+ let sock = tx.socket();
+
+ let _lock = sock.read_lock.lock().unwrap();
+ match decoder.read(&mut buf) {
+ Ok(n) => {
+ tx.remaining_chunk_size = decoder.remaining_chunks_size;
+ return n;
+ }
+ Err(e) if e.kind() == std::io::ErrorKind::InvalidInput => {
+ panic!("chunked read error: {}", e);
+ }
+ Err(_) => {
+ drop(_lock);
+ sock.read_rx.as_mut().unwrap().recv().await.unwrap();
+ }
+ }
+ }
+ }
+
+ if let Some(content_length) = tx.content_length {
+ let sock = tx.socket();
+ let l = sock.read_lock.clone();
+
+ loop {
+ let _lock = l.lock().unwrap();
+ if tx.content_read >= content_length as usize {
+ return 0;
+ }
+ match sock.read(&mut buf) {
+ Ok(n) => {
+ tx.content_read += n;
+ return n;
+ }
+ _ => {
+ drop(_lock);
+ sock.read_rx.as_mut().unwrap().recv().await.unwrap();
+ }
+ }
+ }
+ }
+
+ 0
+}
+
+// https://github.com/hyperium/hyper/blob/0c8ee93d7f557afc63ca2a5686d19071813ab2b7/src/headers.rs#L67
+#[inline]
+fn from_digits(bytes: &[u8]) -> Option<u64> {
+ // cannot use FromStr for u64, since it allows a signed prefix
+ let mut result = 0u64;
+ const RADIX: u64 = 10;
+ if bytes.is_empty() {
+ return None;
+ }
+ for &b in bytes {
+ // can't use char::to_digit, since we haven't verified these bytes
+ // are utf-8.
+ match b {
+ b'0'..=b'9' => {
+ result = result.checked_mul(RADIX)?;
+ result = result.checked_add((b - b'0') as u64)?;
+ }
+ _ => {
+ return None;
+ }
+ }
+ }
+ Some(result)
+}
+
+#[inline]
+fn connection_has(value: &HeaderValue, needle: &str) -> bool {
+ if let Ok(s) = value.to_str() {
+ for val in s.split(',') {
+ if val.trim().eq_ignore_ascii_case(needle) {
+ return true;
+ }
+ }
+ }
+ false
+}
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ListenOpts {
+ cert: Option<String>,
+ key: Option<String>,
+ hostname: String,
+ port: u16,
+ use_tls: bool,
+}
+
+fn run_server(
+ tx: mpsc::Sender<NextRequest>,
+ listening_tx: mpsc::Sender<()>,
+ mut close_rx: mpsc::Receiver<()>,
+ addr: SocketAddr,
+ maybe_cert: Option<String>,
+ maybe_key: Option<String>,
+) -> Result<(), AnyError> {
+ let mut listener = TcpListener::bind(addr)?;
+ let mut poll = Poll::new()?;
+ let token = Token(0);
+ poll
+ .registry()
+ .register(&mut listener, token, Interest::READABLE)
+ .unwrap();
+
+ let tls_context: Option<Arc<rustls::ServerConfig>> = {
+ if let Some(cert) = maybe_cert {
+ let key = maybe_key.unwrap();
+ let certificate_chain: Vec<rustls::Certificate> =
+ load_certs(&mut BufReader::new(cert.as_bytes()))?;
+ let private_key = load_private_keys(key.as_bytes())?.remove(0);
+
+ let config = rustls::ServerConfig::builder()
+ .with_safe_defaults()
+ .with_no_client_auth()
+ .with_single_cert(certificate_chain, private_key)
+ .expect("invalid key or certificate");
+ Some(Arc::new(config))
+ } else {
+ None
+ }
+ };
+
+ listening_tx.blocking_send(()).unwrap();
+ let mut sockets = HashMap::with_capacity(1000);
+ let mut counter: usize = 1;
+ let mut events = Events::with_capacity(1024);
+ 'outer: loop {
+ let result = close_rx.try_recv();
+ if result.is_ok() {
+ break 'outer;
+ }
+ // FIXME(bartlomieju): how does Tokio handle it? I just put random 100ms
+ // timeout here to handle close signal.
+ match poll.poll(&mut events, Some(Duration::from_millis(100))) {
+ Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
+ Err(e) => panic!("{}", e),
+ Ok(()) => (),
+ }
+ 'events: for event in &events {
+ if close_rx.try_recv().is_ok() {
+ break 'outer;
+ }
+ let token = event.token();
+ match token {
+ Token(0) => loop {
+ match listener.accept() {
+ Ok((mut socket, _)) => {
+ counter += 1;
+ let token = Token(counter);
+ poll
+ .registry()
+ .register(&mut socket, token, Interest::READABLE)
+ .unwrap();
+ let socket = match tls_context {
+ Some(ref tls_conf) => {
+ let connection =
+ rustls::ServerConnection::new(tls_conf.clone()).unwrap();
+ InnerStream::Tls(Box::new(rustls::StreamOwned::new(
+ connection, socket,
+ )))
+ }
+ None => InnerStream::Tcp(socket),
+ };
+ let stream = Box::pin(Stream {
+ inner: socket,
+ detached: false,
+ read_rx: None,
+ read_tx: None,
+ read_lock: Arc::new(Mutex::new(())),
+ parse_done: ParseStatus::None,
+ buffer: UnsafeCell::new(vec![0_u8; 1024]),
+ _pin: PhantomPinned,
+ });
+
+ trace!("New connection: {}", token.0);
+ sockets.insert(token, stream);
+ }
+ Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
+ Err(_) => break,
+ }
+ },
+ token => {
+ let socket = sockets.get_mut(&token).unwrap();
+ // SAFETY: guarantee that we will never move the data out of the mutable reference.
+ let socket = unsafe {
+ let mut_ref: Pin<&mut Stream> = Pin::as_mut(socket);
+ Pin::get_unchecked_mut(mut_ref)
+ };
+ let sock_ptr = socket as *mut _;
+
+ if socket.detached {
+ match &mut socket.inner {
+ InnerStream::Tcp(ref mut socket) => {
+ poll.registry().deregister(socket).unwrap();
+ }
+ InnerStream::Tls(_) => {
+ todo!("upgrade tls not implemented");
+ }
+ }
+
+ let boxed = sockets.remove(&token).unwrap();
+ std::mem::forget(boxed);
+ trace!("Socket detached: {}", token.0);
+ continue;
+ }
+
+ debug_assert!(event.is_readable());
+
+ trace!("Socket readable: {}", token.0);
+ if let Some(tx) = &socket.read_tx {
+ {
+ let _l = socket.read_lock.lock().unwrap();
+ }
+ trace!("Sending readiness notification: {}", token.0);
+ let _ = tx.blocking_send(());
+
+ continue;
+ }
+
+ let mut headers = vec![httparse::EMPTY_HEADER; 40];
+ let mut req = httparse::Request::new(&mut headers);
+ let body_offset;
+ let body_len;
+ loop {
+ // SAFETY: It is safe for the read buf to be mutable here.
+ let buffer = unsafe { &mut *socket.buffer.get() };
+ let offset = match socket.parse_done {
+ ParseStatus::None => 0,
+ ParseStatus::Ongoing(offset) => offset,
+ };
+ if offset >= buffer.len() {
+ buffer.resize(offset * 2, 0);
+ }
+ let nread = socket.read(&mut buffer[offset..]);
+
+ match nread {
+ Ok(0) => {
+ trace!("Socket closed: {}", token.0);
+ // FIXME: don't remove while JS is writing!
+ // sockets.remove(&token);
+ continue 'events;
+ }
+ Ok(read) => match req.parse(&buffer[..offset + read]) {
+ Ok(httparse::Status::Complete(n)) => {
+ body_offset = n;
+ body_len = offset + read;
+ socket.parse_done = ParseStatus::None;
+ break;
+ }
+ Ok(httparse::Status::Partial) => {
+ socket.parse_done = ParseStatus::Ongoing(offset + read);
+ continue;
+ }
+ Err(_) => {
+ let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
+ continue 'events;
+ }
+ },
+ Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+ break 'events
+ }
+ Err(_) => break 'events,
+ }
+ }
+
+ debug_assert_eq!(socket.parse_done, ParseStatus::None);
+ if let Some(method) = &req.method {
+ if method == &"POST" || method == &"PUT" {
+ let (tx, rx) = mpsc::channel(100);
+ socket.read_tx = Some(tx);
+ socket.read_rx = Some(rx);
+ }
+ }
+
+ // SAFETY: It is safe for the read buf to be mutable here.
+ let buffer = unsafe { &mut *socket.buffer.get() };
+ let inner_req = InnerRequest {
+ // SAFETY: backing buffer is pinned and lives as long as the request.
+ req: unsafe { transmute::<httparse::Request<'_, '_>, _>(req) },
+ // SAFETY: backing buffer is pinned and lives as long as the request.
+ _headers: unsafe {
+ transmute::<Vec<httparse::Header<'_>>, _>(headers)
+ },
+ buffer: Pin::new(
+ replace(buffer, vec![0_u8; 1024]).into_boxed_slice(),
+ ),
+ body_offset,
+ body_len,
+ };
+ // h1
+ // https://github.com/tiny-http/tiny-http/blob/master/src/client.rs#L177
+ // https://github.com/hyperium/hyper/blob/4545c3ef191ce9b5f5d250ee27c4c96f9b71d2c6/src/proto/h1/role.rs#L127
+ let mut keep_alive = inner_req.req.version.unwrap() == 1;
+ let mut upgrade = false;
+ let mut expect_continue = false;
+ let mut te = false;
+ let mut te_chunked = false;
+ let mut content_length = None;
+ for header in inner_req.req.headers.iter() {
+ match HeaderName::from_bytes(header.name.as_bytes()) {
+ Ok(CONNECTION) => {
+ // SAFETY: illegal bytes are validated by httparse.
+ let value = unsafe {
+ HeaderValue::from_maybe_shared_unchecked(header.value)
+ };
+ if keep_alive {
+ // 1.1
+ keep_alive = !connection_has(&value, "close");
+ } else {
+ // 1.0
+ keep_alive = connection_has(&value, "keep-alive");
+ }
+ }
+ Ok(UPGRADE) => {
+ upgrade = inner_req.req.version.unwrap() == 1;
+ }
+ Ok(TRANSFER_ENCODING) => {
+ // https://tools.ietf.org/html/rfc7230#section-3.3.3
+ debug_assert!(inner_req.req.version.unwrap() == 1);
+ // Two states for Transfer-Encoding because we want to make sure Content-Length handling knows it.
+ te = true;
+ content_length = None;
+ // SAFETY: illegal bytes are validated by httparse.
+ let value = unsafe {
+ HeaderValue::from_maybe_shared_unchecked(header.value)
+ };
+ if let Ok(Some(encoding)) =
+ value.to_str().map(|s| s.rsplit(',').next())
+ {
+ // Chunked must always be the last encoding
+ if encoding.trim().eq_ignore_ascii_case("chunked") {
+ te_chunked = true;
+ }
+ }
+ }
+ // Transfer-Encoding overrides the Content-Length.
+ Ok(CONTENT_LENGTH) if !te => {
+ if let Some(len) = from_digits(header.value) {
+ if let Some(prev) = content_length {
+ if prev != len {
+ let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
+ continue 'events;
+ }
+ continue;
+ }
+ content_length = Some(len);
+ } else {
+ let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
+ continue 'events;
+ }
+ }
+ Ok(EXPECT) if inner_req.req.version.unwrap() != 0 => {
+ expect_continue =
+ header.value.eq_ignore_ascii_case(b"100-continue");
+ }
+ _ => {}
+ }
+ }
+
+ // There is Transfer-Encoding but its not chunked.
+ if te && !te_chunked {
+ let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
+ continue 'events;
+ }
+
+ tx.blocking_send(NextRequest {
+ socket: sock_ptr,
+ // SAFETY: headers backing buffer outlives the mio event loop ('static)
+ inner: inner_req,
+ keep_alive,
+ upgrade,
+ te_chunked,
+ remaining_chunk_size: None,
+ content_read: 0,
+ content_length,
+ expect_continue,
+ })
+ .ok();
+ }
+ }
+ }
+ }
+
+ Ok(())
+}
+
+#[op]
+fn op_flash_serve<P>(
+ state: &mut OpState,
+ opts: ListenOpts,
+) -> Result<u32, AnyError>
+where
+ P: FlashPermissions + 'static,
+{
+ if opts.use_tls {
+ check_unstable(state, "Deno.serveTls");
+ } else {
+ check_unstable(state, "Deno.serve");
+ }
+ state
+ .borrow_mut::<P>()
+ .check_net(&(&opts.hostname, Some(opts.port)))?;
+ let addr = SocketAddr::new(opts.hostname.parse()?, opts.port);
+ let (tx, rx) = mpsc::channel(100);
+ let (close_tx, close_rx) = mpsc::channel(1);
+ let (listening_tx, listening_rx) = mpsc::channel(1);
+ let ctx = ServerContext {
+ _addr: addr,
+ tx,
+ rx,
+ response: HashMap::with_capacity(1000),
+ close_tx,
+ listening_rx: Some(listening_rx),
+ cancel_handle: CancelHandle::new_rc(),
+ };
+ let tx = ctx.tx.clone();
+ let maybe_cert = opts.cert;
+ let maybe_key = opts.key;
+ let join_handle = tokio::task::spawn_blocking(move || {
+ run_server(tx, listening_tx, close_rx, addr, maybe_cert, maybe_key)
+ });
+ let flash_ctx = state.borrow_mut::<FlashContext>();
+ let server_id = flash_ctx.next_server_id;
+ flash_ctx.next_server_id += 1;
+ flash_ctx.join_handles.insert(server_id, join_handle);
+ flash_ctx.servers.insert(server_id, ctx);
+ Ok(server_id)
+}
+
+#[op]
+fn op_flash_wait_for_listening(
+ state: &mut OpState,
+ server_id: u32,
+) -> Result<impl Future<Output = Result<(), AnyError>> + 'static, AnyError> {
+ let mut listening_rx = {
+ let flash_ctx = state.borrow_mut::<FlashContext>();
+ let server_ctx = flash_ctx
+ .servers
+ .get_mut(&server_id)
+ .ok_or_else(|| type_error("server not found"))?;
+ server_ctx.listening_rx.take().unwrap()
+ };
+ Ok(async move {
+ listening_rx.recv().await;
+ Ok(())
+ })
+}
+
+#[op]
+fn op_flash_drive_server(
+ state: &mut OpState,
+ server_id: u32,
+) -> Result<impl Future<Output = Result<(), AnyError>> + 'static, AnyError> {
+ let join_handle = {
+ let flash_ctx = state.borrow_mut::<FlashContext>();
+ flash_ctx
+ .join_handles
+ .remove(&server_id)
+ .ok_or_else(|| type_error("server not found"))?
+ };
+ Ok(async move {
+ join_handle
+ .await
+ .map_err(|_| type_error("server join error"))??;
+ Ok(())
+ })
+}
+
+// Asychronous version of op_flash_next. This can be a bottleneck under
+// heavy load, it should be used as a fallback if there are no buffered
+// requests i.e `op_flash_next() == 0`.
+#[op]
+async fn op_flash_next_async(
+ op_state: Rc<RefCell<OpState>>,
+ server_id: u32,
+) -> u32 {
+ let ctx = {
+ let mut op_state = op_state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ ctx as *mut ServerContext
+ };
+ // SAFETY: we cannot hold op_state borrow across the await point. The JS caller
+ // is responsible for ensuring this is not called concurrently.
+ let ctx = unsafe { &mut *ctx };
+ let cancel_handle = &ctx.cancel_handle;
+ let mut tokens = 0;
+ while let Ok(token) = ctx.rx.try_recv() {
+ ctx.response.insert(tokens, token);
+ tokens += 1;
+ }
+ if tokens == 0 {
+ if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await {
+ ctx.response.insert(tokens, req);
+ tokens += 1;
+ }
+ }
+ tokens
+}
+
+// Syncrhonous version of op_flash_next_async. Under heavy load,
+// this can collect buffered requests from rx channel and return tokens in a single batch.
+//
+// perf: please do not add any arguments to this op. With optimizations enabled,
+// the ContextScope creation is optimized away and the op is as simple as:
+// f(info: *const v8::FunctionCallbackInfo) { let rv = ...; rv.set_uint32(op_flash_next()); }
+#[op]
+fn op_flash_next(op_state: &mut OpState) -> u32 {
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&0).unwrap();
+ next_request_sync(ctx)
+}
+
+// Syncrhonous version of op_flash_next_async. Under heavy load,
+// this can collect buffered requests from rx channel and return tokens in a single batch.
+#[op]
+fn op_flash_next_server(op_state: &mut OpState, server_id: u32) -> u32 {
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ next_request_sync(ctx)
+}
+
+// Wrapper type for tokio::net::TcpStream that implements
+// deno_websocket::UpgradedStream
+struct UpgradedStream(tokio::net::TcpStream);
+impl tokio::io::AsyncRead for UpgradedStream {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut tokio::io::ReadBuf,
+ ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
+ }
+}
+
+impl tokio::io::AsyncWrite for UpgradedStream {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &[u8],
+ ) -> std::task::Poll<Result<usize, std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_write(cx, buf)
+ }
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_flush(cx)
+ }
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_shutdown(cx)
+ }
+}
+
+impl deno_websocket::Upgraded for UpgradedStream {}
+
+#[inline]
+pub fn detach_socket(
+ ctx: &mut ServerContext,
+ token: u32,
+) -> Result<tokio::net::TcpStream, AnyError> {
+ // Two main 'hacks' to get this working:
+ // * make server thread forget about the socket. `detach_ownership` prevents the socket from being
+ // dropped on the server thread.
+ // * conversion from mio::net::TcpStream -> tokio::net::TcpStream. There is no public API so we
+ // use raw fds.
+ let tx = ctx
+ .response
+ .remove(&token)
+ .ok_or_else(|| type_error("request closed"))?;
+ let stream = tx.socket();
+ // prevent socket from being dropped on server thread.
+ // TODO(@littledivy): Box-ify, since there is no overhead.
+ stream.detach_ownership();
+
+ #[cfg(unix)]
+ let std_stream = {
+ use std::os::unix::prelude::AsRawFd;
+ use std::os::unix::prelude::FromRawFd;
+ let fd = match stream.inner {
+ InnerStream::Tcp(ref tcp) => tcp.as_raw_fd(),
+ _ => todo!(),
+ };
+ // SAFETY: `fd` is a valid file descriptor.
+ unsafe { std::net::TcpStream::from_raw_fd(fd) }
+ };
+ #[cfg(windows)]
+ let std_stream = {
+ use std::os::windows::prelude::AsRawSocket;
+ use std::os::windows::prelude::FromRawSocket;
+ let fd = match stream.inner {
+ InnerStream::Tcp(ref tcp) => tcp.as_raw_socket(),
+ _ => todo!(),
+ };
+ // SAFETY: `fd` is a valid file descriptor.
+ unsafe { std::net::TcpStream::from_raw_socket(fd) }
+ };
+ let stream = tokio::net::TcpStream::from_std(std_stream)?;
+ Ok(stream)
+}
+
+#[op]
+async fn op_flash_upgrade_websocket(
+ state: Rc<RefCell<OpState>>,
+ server_id: u32,
+ token: u32,
+) -> Result<deno_core::ResourceId, AnyError> {
+ let stream = {
+ let op_state = &mut state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ detach_socket(flash_ctx.servers.get_mut(&server_id).unwrap(), token)?
+ };
+ deno_websocket::ws_create_server_stream(
+ &state,
+ Box::pin(UpgradedStream(stream)),
+ )
+ .await
+}
+
+pub struct Unstable(pub bool);
+
+fn check_unstable(state: &OpState, api_name: &str) {
+ let unstable = state.borrow::<Unstable>();
+
+ if !unstable.0 {
+ eprintln!(
+ "Unstable API '{}'. The --unstable flag must be provided.",
+ api_name
+ );
+ std::process::exit(70);
+ }
+}
+
+pub trait FlashPermissions {
+ fn check_net<T: AsRef<str>>(
+ &mut self,
+ _host: &(T, Option<u16>),
+ ) -> Result<(), AnyError>;
+}
+
+pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
+ Extension::builder()
+ .js(deno_core::include_js_files!(
+ prefix "deno:ext/flash",
+ "01_http.js",
+ ))
+ .ops(vec![
+ op_flash_serve::decl::<P>(),
+ op_flash_respond::decl(),
+ op_flash_respond_chuncked::decl(),
+ op_flash_method::decl(),
+ op_flash_path::decl(),
+ op_flash_headers::decl(),
+ op_flash_next::decl(),
+ op_flash_next_server::decl(),
+ op_flash_next_async::decl(),
+ op_flash_read_body::decl(),
+ op_flash_upgrade_websocket::decl(),
+ op_flash_drive_server::decl(),
+ op_flash_wait_for_listening::decl(),
+ op_flash_first_packet::decl(),
+ op_flash_has_body_stream::decl(),
+ op_flash_close_server::decl(),
+ op_flash_make_request::decl(),
+ op_flash_write_resource::decl(),
+ ])
+ .state(move |op_state| {
+ op_state.put(Unstable(unstable));
+ op_state.put(FlashContext {
+ next_server_id: 0,
+ join_handles: HashMap::default(),
+ servers: HashMap::default(),
+ });
+ Ok(())
+ })
+ .build()
+}
diff --git a/ext/flash/sendfile.rs b/ext/flash/sendfile.rs
new file mode 100644
index 000000000..4efa7bc35
--- /dev/null
+++ b/ext/flash/sendfile.rs
@@ -0,0 +1,81 @@
+// Forked from https://github.com/Thomasdezeeuw/sendfile/blob/024f82cd4dede9048392a5bd6d8afcd4d5aa83d5/src/lib.rs
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+use std::future::Future;
+use std::io;
+use std::os::unix::io::RawFd;
+use std::pin::Pin;
+use std::task::{self, Poll};
+
+pub struct SendFile {
+ pub io: (RawFd, RawFd),
+ pub written: usize,
+}
+
+impl SendFile {
+ #[inline]
+ pub fn try_send(&mut self) -> Result<usize, std::io::Error> {
+ #[cfg(target_os = "linux")]
+ {
+ // This is the maximum the Linux kernel will write in a single call.
+ let count = 0x7ffff000;
+ let mut offset = self.written as libc::off_t;
+
+ // SAFETY: call to libc::sendfile()
+ let res =
+ unsafe { libc::sendfile(self.io.1, self.io.0, &mut offset, count) };
+ if res == -1 {
+ Err(io::Error::last_os_error())
+ } else {
+ self.written = offset as usize;
+ Ok(res as usize)
+ }
+ }
+ #[cfg(target_os = "macos")]
+ {
+ // Send all bytes.
+ let mut length = 0;
+ // On macOS `length` is value-result parameter. It determines the number
+ // of bytes to write and returns the number of bytes written also in
+ // case of `EAGAIN` errors.
+ // SAFETY: call to libc::sendfile()
+ let res = unsafe {
+ libc::sendfile(
+ self.io.0,
+ self.io.1,
+ self.written as libc::off_t,
+ &mut length,
+ std::ptr::null_mut(),
+ 0,
+ )
+ };
+ self.written += length as usize;
+ if res == -1 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(length as usize)
+ }
+ }
+ }
+}
+
+impl Future for SendFile {
+ type Output = Result<(), std::io::Error>;
+
+ fn poll(
+ mut self: Pin<&mut Self>,
+ _: &mut task::Context<'_>,
+ ) -> Poll<Self::Output> {
+ loop {
+ match self.try_send() {
+ Ok(0) => break Poll::Ready(Ok(())),
+ Ok(_) => continue,
+ Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
+ break Poll::Pending
+ }
+ Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue, // Try again.
+ Err(err) => break Poll::Ready(Err(err)),
+ }
+ }
+ }
+}
diff --git a/ext/http/01_http.js b/ext/http/01_http.js
index 7dfe86a75..6df26d09f 100644
--- a/ext/http/01_http.js
+++ b/ext/http/01_http.js
@@ -13,6 +13,7 @@
newInnerRequest,
newInnerResponse,
fromInnerResponse,
+ _flash,
} = window.__bootstrap.fetch;
const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype, ops } = core;
@@ -475,6 +476,20 @@
}
function upgradeHttp(req) {
+ if (req[_flash]) {
+ // NOTE(bartlomieju):
+ // Access these fields so they are cached on `req` object, otherwise
+ // they wouldn't be available after the connection gets upgraded.
+ req.url;
+ req.method;
+ req.headers;
+
+ const { serverId, streamRid } = req[_flash];
+ const connRid = core.ops.op_flash_upgrade_http(streamRid, serverId);
+ // TODO(@littledivy): return already read first packet too.
+ return [new TcpConn(connRid), new Uint8Array()];
+ }
+
req[_deferred] = new Deferred();
return req[_deferred].promise;
}
@@ -483,5 +498,6 @@
HttpConn,
upgradeWebSocket,
upgradeHttp,
+ _ws,
};
})(this);
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 27d277654..d1b38fb42 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -874,6 +874,41 @@ fn op_http_websocket_accept_header(key: String) -> Result<String, AnyError> {
Ok(base64::encode(digest))
}
+struct UpgradedStream(hyper::upgrade::Upgraded);
+impl tokio::io::AsyncRead for UpgradedStream {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut tokio::io::ReadBuf,
+ ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
+ }
+}
+
+impl tokio::io::AsyncWrite for UpgradedStream {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &[u8],
+ ) -> std::task::Poll<Result<usize, std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_write(cx, buf)
+ }
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_flush(cx)
+ }
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_shutdown(cx)
+ }
+}
+
+impl deno_websocket::Upgraded for UpgradedStream {}
+
#[op]
async fn op_http_upgrade_websocket(
state: Rc<RefCell<OpState>>,
@@ -893,7 +928,9 @@ async fn op_http_upgrade_websocket(
};
let transport = hyper::upgrade::on(request).await?;
- let ws_rid = ws_create_server_stream(&state, transport).await?;
+ let ws_rid =
+ ws_create_server_stream(&state, Box::pin(UpgradedStream(transport)))
+ .await?;
Ok(ws_rid)
}
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 7f67e81ed..1b753572b 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -5897,6 +5897,7 @@
window.__bootstrap.streams = {
// Non-Public
+ _state,
isReadableStreamDisturbed,
errorReadableStream,
createProxy,
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index 0e642be3f..515f798ac 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -34,8 +34,11 @@ use std::cell::RefCell;
use std::convert::TryFrom;
use std::fmt;
use std::path::PathBuf;
+use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
+use tokio::io::AsyncRead;
+use tokio::io::AsyncWrite;
use tokio::net::TcpStream;
use tokio_rustls::rustls::RootCertStore;
use tokio_rustls::rustls::ServerName;
@@ -67,23 +70,25 @@ pub trait WebSocketPermissions {
/// would override previously used alias.
pub struct UnsafelyIgnoreCertificateErrors(Option<Vec<String>>);
-type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
+type ClientWsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
+type ServerWsStream = WebSocketStream<Pin<Box<dyn Upgraded>>>;
+
pub enum WebSocketStreamType {
Client {
- tx: AsyncRefCell<SplitSink<WsStream, Message>>,
- rx: AsyncRefCell<SplitStream<WsStream>>,
+ tx: AsyncRefCell<SplitSink<ClientWsStream, Message>>,
+ rx: AsyncRefCell<SplitStream<ClientWsStream>>,
},
Server {
- tx: AsyncRefCell<
- SplitSink<WebSocketStream<hyper::upgrade::Upgraded>, Message>,
- >,
- rx: AsyncRefCell<SplitStream<WebSocketStream<hyper::upgrade::Upgraded>>>,
+ tx: AsyncRefCell<SplitSink<ServerWsStream, Message>>,
+ rx: AsyncRefCell<SplitStream<ServerWsStream>>,
},
}
+pub trait Upgraded: AsyncRead + AsyncWrite + Unpin {}
+
pub async fn ws_create_server_stream(
state: &Rc<RefCell<OpState>>,
- transport: hyper::upgrade::Upgraded,
+ transport: Pin<Box<dyn Upgraded>>,
) -> Result<ResourceId, AnyError> {
let ws_stream = WebSocketStream::from_raw_socket(
transport,
@@ -340,7 +345,7 @@ where
..Default::default()
}),
);
- let (stream, response): (WsStream, Response) =
+ let (stream, response): (ClientWsStream, Response) =
if let Some(cancel_resource) = cancel_resource {
client.or_cancel(cancel_resource.0.to_owned()).await?
} else {