summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock20
-rw-r--r--Cargo.toml4
-rw-r--r--cli/bench/http/deno_http_flash_ops.js37
-rw-r--r--cli/bench/http/deno_http_flash_ops_spawn.js18
-rw-r--r--cli/bench/http/deno_http_flash_post_bin.js16
-rw-r--r--cli/bench/http/deno_http_flash_post_bin.lua5
-rw-r--r--cli/bench/http/deno_http_flash_spawn.js18
-rw-r--r--cli/bench/http/deno_http_serve.js (renamed from cli/bench/http/deno_http_flash.js)0
-rw-r--r--cli/build.rs1
-rw-r--r--cli/tests/integration/run_tests.rs7
-rw-r--r--cli/tests/testdata/run/flash_shutdown/main.ts23
-rw-r--r--ext/flash/01_http.js793
-rw-r--r--ext/flash/Cargo.toml30
-rw-r--r--ext/flash/README.md7
-rw-r--r--ext/flash/chunked.rs273
-rw-r--r--ext/flash/lib.rs1543
-rw-r--r--ext/flash/request.rs49
-rw-r--r--ext/flash/sendfile.rs82
-rw-r--r--ext/flash/socket.rs151
-rw-r--r--runtime/Cargo.toml2
-rw-r--r--runtime/build.rs12
-rw-r--r--runtime/js/90_deno_ns.js2
-rw-r--r--runtime/lib.rs1
-rw-r--r--runtime/ops/http.rs19
-rw-r--r--runtime/permissions/mod.rs11
-rw-r--r--runtime/web_worker.rs1
-rw-r--r--runtime/worker.rs1
27 files changed, 1 insertions, 3125 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 10d7f335c..15275f911 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -958,25 +958,6 @@ dependencies = [
]
[[package]]
-name = "deno_flash"
-version = "0.33.0"
-dependencies = [
- "deno_core",
- "deno_tls",
- "deno_websocket",
- "http",
- "httparse",
- "libc",
- "log",
- "mio",
- "rustls",
- "rustls-pemfile",
- "serde",
- "socket2",
- "tokio",
-]
-
-[[package]]
name = "deno_fs"
version = "0.7.0"
dependencies = [
@@ -1181,7 +1162,6 @@ dependencies = [
"deno_crypto",
"deno_fetch",
"deno_ffi",
- "deno_flash",
"deno_fs",
"deno_http",
"deno_io",
diff --git a/Cargo.toml b/Cargo.toml
index 88fc8d2ce..c12b14af1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,7 +18,6 @@ members = [
"ext/console",
"ext/crypto",
"ext/fetch",
- "ext/flash",
"ext/ffi",
"ext/fs",
"ext/http",
@@ -62,7 +61,6 @@ deno_console = { version = "0.97.0", path = "./ext/console" }
deno_crypto = { version = "0.111.0", path = "./ext/crypto" }
deno_fetch = { version = "0.121.0", path = "./ext/fetch" }
deno_ffi = { version = "0.84.0", path = "./ext/ffi" }
-deno_flash = { version = "0.33.0", path = "./ext/flash" }
deno_fs = { version = "0.7.0", path = "./ext/fs" }
deno_http = { version = "0.92.0", path = "./ext/http" }
deno_io = { version = "0.7.0", path = "./ext/io" }
@@ -265,8 +263,6 @@ opt-level = 3
opt-level = 3
[profile.release.package.deno_http]
opt-level = 3
-[profile.release.package.deno_flash]
-opt-level = 3
[profile.release.package.deno_net]
opt-level = 3
[profile.release.package.deno_web]
diff --git a/cli/bench/http/deno_http_flash_ops.js b/cli/bench/http/deno_http_flash_ops.js
deleted file mode 100644
index 7b024f9af..000000000
--- a/cli/bench/http/deno_http_flash_ops.js
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-// deno-lint-ignore-file
-
-const {
- opAsync,
- ops: { op_flash_make_request, op_flash_serve },
- encode,
-} = Deno[Deno.internal].core;
-const addr = Deno.args[0] || "127.0.0.1:4500";
-const [hostname, port] = addr.split(":");
-const serverId = op_flash_serve({ hostname, port, reuseport: true });
-const serverPromise = opAsync("op_flash_drive_server", serverId);
-
-const fastOps = op_flash_make_request();
-function nextRequest() {
- return fastOps.nextRequest();
-}
-function respond(token, response) {
- return fastOps.respond(token, response, true);
-}
-
-const response = encode(
- "HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHello World",
-);
-let offset = 0;
-while (true) {
- let token = nextRequest();
- if (token === 0) token = await opAsync("op_flash_next_async", serverId);
- for (let i = offset; i < offset + token; i++) {
- respond(
- i,
- response,
- );
- }
- offset += token;
-}
diff --git a/cli/bench/http/deno_http_flash_ops_spawn.js b/cli/bench/http/deno_http_flash_ops_spawn.js
deleted file mode 100644
index b9d11462f..000000000
--- a/cli/bench/http/deno_http_flash_ops_spawn.js
+++ /dev/null
@@ -1,18 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-if (Deno.build.os !== "linux") {
- throw new Error("SO_REUSEPORT is only supported on Linux");
-}
-
-const executable = Deno.execPath();
-const path = new URL("./deno_http_flash_ops.js", import.meta.url).pathname;
-// single flash instance runs on ~1.8 cores
-const cpus = navigator.hardwareConcurrency / 2;
-const processes = new Array(cpus);
-for (let i = 0; i < cpus; i++) {
- const proc = Deno.run({
- cmd: [executable, "run", "-A", "--unstable", path, Deno.args[0]],
- });
- processes.push(proc.status());
-}
-await Promise.all(processes);
diff --git a/cli/bench/http/deno_http_flash_post_bin.js b/cli/bench/http/deno_http_flash_post_bin.js
deleted file mode 100644
index b81553dcd..000000000
--- a/cli/bench/http/deno_http_flash_post_bin.js
+++ /dev/null
@@ -1,16 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-const addr = Deno.args[0] || "127.0.0.1:4500";
-const [hostname, port] = addr.split(":");
-const { serve } = Deno;
-
-async function handler(request) {
- try {
- const buffer = await request.arrayBuffer();
- return new Response(buffer.byteLength);
- } catch (e) {
- console.log(e);
- }
-}
-
-serve(handler, { hostname, port });
diff --git a/cli/bench/http/deno_http_flash_post_bin.lua b/cli/bench/http/deno_http_flash_post_bin.lua
deleted file mode 100644
index c8f5d3e3f..000000000
--- a/cli/bench/http/deno_http_flash_post_bin.lua
+++ /dev/null
@@ -1,5 +0,0 @@
-wrk.method = "POST"
-wrk.headers["Content-Type"] = "application/octet-stream"
-
-file = io.open("./cli/bench/testdata/128k.bin", "rb")
-wrk.body = file:read("*a") \ No newline at end of file
diff --git a/cli/bench/http/deno_http_flash_spawn.js b/cli/bench/http/deno_http_flash_spawn.js
deleted file mode 100644
index e47acffc5..000000000
--- a/cli/bench/http/deno_http_flash_spawn.js
+++ /dev/null
@@ -1,18 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-if (Deno.build.os !== "linux") {
- throw new Error("SO_REUSEPORT is only supported on Linux");
-}
-
-const executable = Deno.execPath();
-const path = new URL("./deno_http_flash.js", import.meta.url).pathname;
-// single flash instance runs on ~1.8 cores
-const cpus = navigator.hardwareConcurrency / 2;
-const processes = new Array(cpus);
-for (let i = 0; i < cpus; i++) {
- const proc = Deno.run({
- cmd: [executable, "run", "-A", "--unstable", path, Deno.args[0]],
- });
- processes.push(proc.status());
-}
-await Promise.all(processes);
diff --git a/cli/bench/http/deno_http_flash.js b/cli/bench/http/deno_http_serve.js
index a0db62630..a0db62630 100644
--- a/cli/bench/http/deno_http_flash.js
+++ b/cli/bench/http/deno_http_serve.js
diff --git a/cli/build.rs b/cli/build.rs
index 2a7327a90..ddd942593 100644
--- a/cli/build.rs
+++ b/cli/build.rs
@@ -362,7 +362,6 @@ fn create_cli_snapshot(snapshot_path: PathBuf) {
deno_http::deno_http::init_ops(),
deno_io::deno_io::init_ops(Default::default()),
deno_fs::deno_fs::init_ops::<PermissionsContainer>(false),
- deno_flash::deno_flash::init_ops::<PermissionsContainer>(false), // No --unstable
deno_node::deno_node::init_ops::<deno_runtime::RuntimeNodeEnv>(None),
cli::init_ops_and_esm(), // NOTE: This needs to be init_ops_and_esm!
];
diff --git a/cli/tests/integration/run_tests.rs b/cli/tests/integration/run_tests.rs
index b661e135a..1d70a9cb7 100644
--- a/cli/tests/integration/run_tests.rs
+++ b/cli/tests/integration/run_tests.rs
@@ -4230,13 +4230,6 @@ itest!(config_file_lock_true {
exit_code: 10,
});
-// TODO(bartlomieju): this test is flaky on CI, reenable it after debugging
-// // Check https://github.com/denoland/deno_std/issues/2882
-// itest!(flash_shutdown {
-// args: "run --unstable --allow-net run/flash_shutdown/main.ts",
-// exit_code: 0,
-// });
-
itest!(permission_args {
args: "run run/001_hello.js --allow-net",
output: "run/permission_args.out",
diff --git a/cli/tests/testdata/run/flash_shutdown/main.ts b/cli/tests/testdata/run/flash_shutdown/main.ts
deleted file mode 100644
index 5e0908efb..000000000
--- a/cli/tests/testdata/run/flash_shutdown/main.ts
+++ /dev/null
@@ -1,23 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-// Deno.serve caused segfault with this example after #16383
-// refs:
-// - https://github.com/denoland/deno/pull/16383
-// - https://github.com/denoland/deno_std/issues/2882
-// - revert https://github.com/denoland/deno/pull/16610
-
-const ctl = new AbortController();
-Deno.serve(() =>
- new Promise((resolve) => {
- resolve(new Response(new TextEncoder().encode("ok")));
- ctl.abort();
- }), {
- signal: ctl.signal,
- async onListen({ port }) {
- const a = await fetch(`http://localhost:${port}`, {
- method: "POST",
- body: "",
- });
- await a.text();
- },
-});
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js
deleted file mode 100644
index fe503ed05..000000000
--- a/ext/flash/01_http.js
+++ /dev/null
@@ -1,793 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-const core = globalThis.Deno.core;
-const ops = core.ops;
-const primordials = globalThis.__bootstrap.primordials;
-import { BlobPrototype } from "ext:deno_web/09_file.js";
-import { TcpConn } from "ext:deno_net/01_net.js";
-import { toInnerResponse } from "ext:deno_fetch/23_response.js";
-import { _flash, fromFlashRequest } from "ext:deno_fetch/23_request.js";
-import { Event } from "ext:deno_web/02_event.js";
-import {
- _state,
- getReadableStreamResourceBacking,
- ReadableStream,
- readableStreamClose,
- ReadableStreamPrototype,
-} from "ext:deno_web/06_streams.js";
-import {
- _eventLoop,
- _idleTimeoutDuration,
- _idleTimeoutTimeout,
- _protocol,
- _readyState,
- _rid,
- _serverHandleIdleTimeout,
- WebSocket,
-} from "ext:deno_websocket/01_websocket.js";
-import { _ws } from "ext:deno_http/01_http.js";
-const {
- ObjectPrototypeIsPrototypeOf,
- PromisePrototype,
- PromisePrototypeCatch,
- PromisePrototypeThen,
- SafePromiseAll,
- TypedArrayPrototypeGetByteLength,
- TypedArrayPrototypeGetSymbolToStringTag,
- TypedArrayPrototypeSet,
- TypedArrayPrototypeSubarray,
- TypeError,
- Uint8Array,
-} = primordials;
-
-const statusCodes = {
- 100: "Continue",
- 101: "Switching Protocols",
- 102: "Processing",
- 200: "OK",
- 201: "Created",
- 202: "Accepted",
- 203: "Non Authoritative Information",
- 204: "No Content",
- 205: "Reset Content",
- 206: "Partial Content",
- 207: "Multi-Status",
- 208: "Already Reported",
- 226: "IM Used",
- 300: "Multiple Choices",
- 301: "Moved Permanently",
- 302: "Found",
- 303: "See Other",
- 304: "Not Modified",
- 305: "Use Proxy",
- 307: "Temporary Redirect",
- 308: "Permanent Redirect",
- 400: "Bad Request",
- 401: "Unauthorized",
- 402: "Payment Required",
- 403: "Forbidden",
- 404: "Not Found",
- 405: "Method Not Allowed",
- 406: "Not Acceptable",
- 407: "Proxy Authentication Required",
- 408: "Request Timeout",
- 409: "Conflict",
- 410: "Gone",
- 411: "Length Required",
- 412: "Precondition Failed",
- 413: "Payload Too Large",
- 414: "URI Too Long",
- 415: "Unsupported Media Type",
- 416: "Range Not Satisfiable",
- 418: "I'm a teapot",
- 421: "Misdirected Request",
- 422: "Unprocessable Entity",
- 423: "Locked",
- 424: "Failed Dependency",
- 426: "Upgrade Required",
- 428: "Precondition Required",
- 429: "Too Many Requests",
- 431: "Request Header Fields Too Large",
- 451: "Unavailable For Legal Reasons",
- 500: "Internal Server Error",
- 501: "Not Implemented",
- 502: "Bad Gateway",
- 503: "Service Unavailable",
- 504: "Gateway Timeout",
- 505: "HTTP Version Not Supported",
- 506: "Variant Also Negotiates",
- 507: "Insufficient Storage",
- 508: "Loop Detected",
- 510: "Not Extended",
- 511: "Network Authentication Required",
-};
-
-const methods = {
- 0: "GET",
- 1: "HEAD",
- 2: "CONNECT",
- 3: "PUT",
- 4: "DELETE",
- 5: "OPTIONS",
- 6: "TRACE",
- 7: "POST",
- 8: "PATCH",
-};
-
-let dateInterval;
-let date;
-
-/**
- * Construct an HTTP response message.
- * All HTTP/1.1 messages consist of a start-line followed by a sequence
- * of octets.
- *
- * HTTP-message = start-line
- * *( header-field CRLF )
- * CRLF
- * [ message-body ]
- *
- * @param {keyof typeof methods} method
- * @param {keyof typeof statusCodes} status
- * @param {[name: string, value: string][]} headerList
- * @param {Uint8Array | string | null} body
- * @param {number} bodyLen
- * @param {boolean} earlyEnd
- * @returns {Uint8Array | string}
- */
-function http1Response(
- method,
- status,
- headerList,
- body,
- bodyLen,
- earlyEnd = false,
-) {
- // HTTP uses a "<major>.<minor>" numbering scheme
- // HTTP-version = HTTP-name "/" DIGIT "." DIGIT
- // HTTP-name = %x48.54.54.50 ; "HTTP", case-sensitive
- //
- // status-line = HTTP-version SP status-code SP reason-phrase CRLF
- // Date header: https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.1.2
- let str = `HTTP/1.1 ${status} ${statusCodes[status]}\r\nDate: ${date}\r\n`;
- for (let i = 0; i < headerList.length; ++i) {
- const { 0: name, 1: value } = headerList[i];
- // header-field = field-name ":" OWS field-value OWS
- str += `${name}: ${value}\r\n`;
- }
-
- // https://datatracker.ietf.org/doc/html/rfc7231#section-6.3.6
- if (status === 205 || status === 304) {
- // MUST NOT generate a payload in a 205 response.
- // indicate a zero-length body for the response by
- // including a Content-Length header field with a value of 0.
- str += "Content-Length: 0\r\n\r\n";
- return str;
- }
-
- // MUST NOT send Content-Length or Transfer-Encoding if status code is 1xx or 204.
- if (status === 204 || status < 200) {
- str += "\r\n";
- return str;
- }
-
- if (earlyEnd === true) {
- return str;
- }
-
- // null body status is validated by inititalizeAResponse in ext/fetch
- if (body !== null && body !== undefined) {
- str += `Content-Length: ${bodyLen}\r\n\r\n`;
- } else {
- str += "Transfer-Encoding: chunked\r\n\r\n";
- return str;
- }
-
- // A HEAD request.
- if (method === 1) return str;
-
- if (typeof body === "string") {
- str += body ?? "";
- } else {
- const head = core.encode(str);
- const response = new Uint8Array(
- TypedArrayPrototypeGetByteLength(head) + bodyLen,
- );
- TypedArrayPrototypeSet(response, head, 0);
- TypedArrayPrototypeSet(
- response,
- body,
- TypedArrayPrototypeGetByteLength(head),
- );
- return response;
- }
-
- return str;
-}
-
-function prepareFastCalls() {
- return ops.op_flash_make_request();
-}
-
-function hostnameForDisplay(hostname) {
- // If the hostname is "0.0.0.0", we display "localhost" in console
- // because browsers in Windows don't resolve "0.0.0.0".
- // See the discussion in https://github.com/denoland/deno_std/issues/1165
- return hostname === "0.0.0.0" ? "localhost" : hostname;
-}
-
-function writeFixedResponse(
- server,
- requestId,
- response,
- responseLen,
- end,
- respondFast,
-) {
- let nwritten = 0;
- // TypedArray
- if (typeof response !== "string") {
- nwritten = respondFast(requestId, response, end);
- } else {
- // string
- nwritten = ops.op_flash_respond(
- server,
- requestId,
- response,
- end,
- );
- }
-
- if (nwritten < responseLen) {
- core.opAsync(
- "op_flash_respond_async",
- server,
- requestId,
- response.slice(nwritten),
- end,
- );
- }
-}
-
-// TODO(@littledivy): Woah woah, cut down the number of arguments.
-async function handleResponse(
- req,
- resp,
- body,
- hasBody,
- method,
- serverId,
- i,
- respondFast,
- respondChunked,
- tryRespondChunked,
-) {
- // there might've been an HTTP upgrade.
- if (resp === undefined) {
- return;
- }
- const innerResp = toInnerResponse(resp);
- // If response body length is known, it will be sent synchronously in a
- // single op, in other case a "response body" resource will be created and
- // we'll be streaming it.
- /** @type {ReadableStream<Uint8Array> | Uint8Array | string | null} */
- let respBody = null;
- let isStreamingResponseBody = false;
- if (innerResp.body !== null) {
- if (typeof innerResp.body.streamOrStatic?.body === "string") {
- if (innerResp.body.streamOrStatic.consumed === true) {
- throw new TypeError("Body is unusable.");
- }
- innerResp.body.streamOrStatic.consumed = true;
- respBody = innerResp.body.streamOrStatic.body;
- isStreamingResponseBody = false;
- } else if (
- ObjectPrototypeIsPrototypeOf(
- ReadableStreamPrototype,
- innerResp.body.streamOrStatic,
- )
- ) {
- if (innerResp.body.unusable()) {
- throw new TypeError("Body is unusable.");
- }
- if (
- innerResp.body.length === null ||
- ObjectPrototypeIsPrototypeOf(
- BlobPrototype,
- innerResp.body.source,
- )
- ) {
- respBody = innerResp.body.stream;
- } else {
- const reader = innerResp.body.stream.getReader();
- const r1 = await reader.read();
- if (r1.done) {
- respBody = new Uint8Array(0);
- } else {
- respBody = r1.value;
- const r2 = await reader.read();
- if (!r2.done) throw new TypeError("Unreachable");
- }
- }
- isStreamingResponseBody = !(
- typeof respBody === "string" ||
- TypedArrayPrototypeGetSymbolToStringTag(respBody) === "Uint8Array"
- );
- } else {
- if (innerResp.body.streamOrStatic.consumed === true) {
- throw new TypeError("Body is unusable.");
- }
- innerResp.body.streamOrStatic.consumed = true;
- respBody = innerResp.body.streamOrStatic.body;
- }
- } else {
- respBody = new Uint8Array(0);
- }
-
- const ws = resp[_ws];
- if (isStreamingResponseBody === false) {
- const length = typeof respBody === "string"
- ? core.byteLength(respBody)
- : TypedArrayPrototypeGetByteLength(respBody);
- const responseStr = http1Response(
- method,
- innerResp.status ?? 200,
- innerResp.headerList,
- respBody,
- length,
- );
- // A HEAD request always ignores body, but includes the correct content-length size.
- const responseLen = method === 1 ? core.byteLength(responseStr) : length;
- writeFixedResponse(
- serverId,
- i,
- responseStr,
- responseLen,
- !ws, // Don't close socket if there is a deferred websocket upgrade.
- respondFast,
- );
- }
-
- return (async () => {
- if (!ws) {
- if (hasBody && body[_state] !== "closed") {
- // TODO(@littledivy): Optimize by draining in a single op.
- try {
- await req.arrayBuffer();
- } catch { /* pass */ }
- }
- }
-
- if (isStreamingResponseBody === true) {
- const resourceBacking = getReadableStreamResourceBacking(respBody);
- if (resourceBacking) {
- if (respBody.locked) {
- throw new TypeError("ReadableStream is locked.");
- }
- const reader = respBody.getReader(); // Aquire JS lock.
- try {
- PromisePrototypeThen(
- core.opAsync(
- "op_flash_write_resource",
- http1Response(
- method,
- innerResp.status ?? 200,
- innerResp.headerList,
- null,
- 0, // Content-Length will be set by the op.
- true,
- ),
- serverId,
- i,
- resourceBacking.rid,
- resourceBacking.autoClose,
- ),
- () => {
- // Release JS lock.
- readableStreamClose(respBody);
- },
- );
- } catch (error) {
- await reader.cancel(error);
- throw error;
- }
- } else {
- const reader = respBody.getReader();
-
- // Best case: sends headers + first chunk in a single go.
- const { value, done } = await reader.read();
- writeFixedResponse(
- serverId,
- i,
- http1Response(
- method,
- innerResp.status ?? 200,
- innerResp.headerList,
- null,
- // deno-lint-ignore prefer-primordials
- respBody.byteLength,
- ),
- // deno-lint-ignore prefer-primordials
- respBody.byteLength,
- false,
- respondFast,
- );
-
- await tryRespondChunked(
- i,
- value,
- done,
- );
-
- if (!done) {
- while (true) {
- const chunk = await reader.read();
- await respondChunked(
- i,
- chunk.value,
- chunk.done,
- );
- if (chunk.done) break;
- }
- }
- }
- }
-
- if (ws) {
- const wsRid = await core.opAsync(
- "op_flash_upgrade_websocket",
- serverId,
- i,
- );
- ws[_rid] = wsRid;
- ws[_protocol] = resp.headers.get("sec-websocket-protocol");
-
- ws[_readyState] = WebSocket.OPEN;
- const event = new Event("open");
- ws.dispatchEvent(event);
-
- ws[_eventLoop]();
- if (ws[_idleTimeoutDuration]) {
- ws.addEventListener(
- "close",
- () => clearTimeout(ws[_idleTimeoutTimeout]),
- );
- }
- ws[_serverHandleIdleTimeout]();
- }
- })();
-}
-
-function createServe(opFn) {
- return async function serve(arg1, arg2) {
- let options = undefined;
- let handler = undefined;
- if (typeof arg1 === "function") {
- handler = arg1;
- options = arg2;
- } else if (typeof arg2 === "function") {
- handler = arg2;
- options = arg1;
- } else {
- options = arg1;
- }
- if (handler === undefined) {
- if (options === undefined) {
- throw new TypeError(
- "No handler was provided, so an options bag is mandatory.",
- );
- }
- handler = options.handler;
- }
- if (typeof handler !== "function") {
- throw new TypeError("A handler function must be provided.");
- }
- if (options === undefined) {
- options = {};
- }
-
- const signal = options.signal;
-
- const onError = options.onError ?? function (error) {
- console.error(error);
- return new Response("Internal Server Error", { status: 500 });
- };
-
- const onListen = options.onListen ?? function ({ port }) {
- console.log(
- `Listening on http://${
- hostnameForDisplay(listenOpts.hostname)
- }:${port}/`,
- );
- };
-
- const listenOpts = {
- hostname: options.hostname ?? "127.0.0.1",
- port: options.port ?? 9000,
- reuseport: options.reusePort ?? false,
- };
- if (options.cert || options.key) {
- if (!options.cert || !options.key) {
- throw new TypeError(
- "Both cert and key must be provided to enable HTTPS.",
- );
- }
- listenOpts.cert = options.cert;
- listenOpts.key = options.key;
- }
-
- const serverId = opFn(listenOpts);
- const serverPromise = core.opAsync("op_flash_drive_server", serverId);
-
- PromisePrototypeCatch(
- PromisePrototypeThen(
- core.opAsync("op_flash_wait_for_listening", serverId),
- (port) => {
- onListen({ hostname: listenOpts.hostname, port });
- },
- ),
- () => {},
- );
- const finishedPromise = PromisePrototypeCatch(serverPromise, () => {});
-
- const server = {
- id: serverId,
- transport: listenOpts.cert && listenOpts.key ? "https" : "http",
- hostname: listenOpts.hostname,
- port: listenOpts.port,
- closed: false,
- finished: finishedPromise,
- async close() {
- if (server.closed) {
- return;
- }
- server.closed = true;
- await core.opAsync("op_flash_close_server", serverId);
- await server.finished;
- },
- async serve() {
- let offset = 0;
- while (true) {
- if (server.closed) {
- break;
- }
-
- let tokens = nextRequestSync();
- if (tokens === 0) {
- tokens = await core.opAsync("op_flash_next_async", serverId);
- if (server.closed) {
- break;
- }
- }
-
- for (let i = offset; i < offset + tokens; i++) {
- let body = null;
- // There might be a body, but we don't expose it for GET/HEAD requests.
- // It will be closed automatically once the request has been handled and
- // the response has been sent.
- const method = getMethodSync(i);
- let hasBody = method > 2; // Not GET/HEAD/CONNECT
- if (hasBody) {
- body = createRequestBodyStream(serverId, i);
- if (body === null) {
- hasBody = false;
- }
- }
-
- const req = fromFlashRequest(
- serverId,
- /* streamRid */
- i,
- body,
- /* methodCb */
- () => methods[method],
- /* urlCb */
- () => {
- const path = ops.op_flash_path(serverId, i);
- return `${server.transport}://${server.hostname}:${server.port}${path}`;
- },
- /* headersCb */
- () => ops.op_flash_headers(serverId, i),
- );
-
- let resp;
- let remoteAddr;
- try {
- resp = handler(req, {
- get remoteAddr() {
- if (!remoteAddr) {
- const { 0: hostname, 1: port } = core.ops.op_flash_addr(
- serverId,
- i,
- );
- remoteAddr = { hostname, port };
- }
- return remoteAddr;
- },
- });
- if (ObjectPrototypeIsPrototypeOf(PromisePrototype, resp)) {
- PromisePrototypeCatch(
- PromisePrototypeThen(
- resp,
- (resp) =>
- handleResponse(
- req,
- resp,
- body,
- hasBody,
- method,
- serverId,
- i,
- respondFast,
- respondChunked,
- tryRespondChunked,
- ),
- ),
- onError,
- );
- } else if (typeof resp?.then === "function") {
- resp.then((resp) =>
- handleResponse(
- req,
- resp,
- body,
- hasBody,
- method,
- serverId,
- i,
- respondFast,
- respondChunked,
- tryRespondChunked,
- )
- ).catch(onError);
- } else {
- handleResponse(
- req,
- resp,
- body,
- hasBody,
- method,
- serverId,
- i,
- respondFast,
- respondChunked,
- tryRespondChunked,
- ).catch(onError);
- }
- } catch (e) {
- resp = await onError(e);
- }
- }
-
- offset += tokens;
- }
- await server.finished;
- },
- };
-
- signal?.addEventListener("abort", () => {
- clearInterval(dateInterval);
- PromisePrototypeThen(server.close(), () => {}, () => {});
- }, {
- once: true,
- });
-
- function tryRespondChunked(token, chunk, shutdown) {
- const nwritten = ops.op_try_flash_respond_chunked(
- serverId,
- token,
- chunk ?? new Uint8Array(),
- shutdown,
- );
- if (nwritten > 0) {
- return core.opAsync(
- "op_flash_respond_chunked",
- serverId,
- token,
- chunk,
- shutdown,
- nwritten,
- );
- }
- }
-
- function respondChunked(token, chunk, shutdown) {
- return core.opAsync(
- "op_flash_respond_chunked",
- serverId,
- token,
- chunk,
- shutdown,
- );
- }
-
- const fastOp = prepareFastCalls();
- let nextRequestSync = () => fastOp.nextRequest();
- let getMethodSync = (token) => fastOp.getMethod(token);
- let respondFast = (token, response, shutdown) =>
- fastOp.respond(token, response, shutdown);
- if (serverId > 0) {
- nextRequestSync = () => ops.op_flash_next_server(serverId);
- getMethodSync = (token) => ops.op_flash_method(serverId, token);
- respondFast = (token, response, shutdown) =>
- ops.op_flash_respond(serverId, token, response, null, shutdown);
- }
-
- if (!dateInterval) {
- date = new Date().toUTCString();
- dateInterval = setInterval(() => {
- date = new Date().toUTCString();
- }, 1000);
- }
-
- await SafePromiseAll([
- PromisePrototypeCatch(server.serve(), console.error),
- serverPromise,
- ]);
- };
-}
-
-function createRequestBodyStream(serverId, token) {
- // The first packet is left over bytes after parsing the request
- const firstRead = ops.op_flash_first_packet(
- serverId,
- token,
- );
- if (!firstRead) return null;
- let firstEnqueued = TypedArrayPrototypeGetByteLength(firstRead) === 0;
-
- return new ReadableStream({
- type: "bytes",
- async pull(controller) {
- try {
- if (firstEnqueued === false) {
- controller.enqueue(firstRead);
- firstEnqueued = true;
- return;
- }
- // This is the largest possible size for a single packet on a TLS
- // stream.
- const chunk = new Uint8Array(16 * 1024 + 256);
- const read = await core.opAsync(
- "op_flash_read_body",
- serverId,
- token,
- chunk,
- );
- if (read > 0) {
- // We read some data. Enqueue it onto the stream.
- controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
- } else {
- // We have reached the end of the body, so we close the stream.
- controller.close();
- }
- } catch (err) {
- // There was an error while reading a chunk of the body, so we
- // error.
- controller.error(err);
- controller.close();
- }
- },
- });
-}
-
-function upgradeHttpRaw(req) {
- if (!req[_flash]) {
- throw new TypeError(
- "Non-flash requests can not be upgraded with `upgradeHttpRaw`. Use `upgradeHttp` instead.",
- );
- }
-
- // NOTE(bartlomieju):
- // Access these fields so they are cached on `req` object, otherwise
- // they wouldn't be available after the connection gets upgraded.
- req.url;
- req.method;
- req.headers;
-
- const { serverId, streamRid } = req[_flash];
- const connRid = ops.op_flash_upgrade_http(streamRid, serverId);
- // TODO(@littledivy): return already read first packet too.
- return [new TcpConn(connRid), new Uint8Array()];
-}
-
-export { createServe, upgradeHttpRaw };
diff --git a/ext/flash/Cargo.toml b/ext/flash/Cargo.toml
deleted file mode 100644
index 2bde23826..000000000
--- a/ext/flash/Cargo.toml
+++ /dev/null
@@ -1,30 +0,0 @@
-# Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-[package]
-name = "deno_flash"
-version = "0.33.0"
-authors.workspace = true
-edition.workspace = true
-license.workspace = true
-readme = "README.md"
-repository.workspace = true
-description = "Fast HTTP/1 server implementation for Deno"
-
-[lib]
-path = "lib.rs"
-
-[dependencies]
-deno_core.workspace = true
-deno_tls.workspace = true
-# For HTTP/2 and websocket upgrades
-deno_websocket.workspace = true
-http.workspace = true
-httparse = "1.8"
-libc.workspace = true
-log.workspace = true
-mio = { version = "0.8.1", features = ["os-poll", "net"] }
-rustls.workspace = true
-rustls-pemfile.workspace = true
-serde.workspace = true
-socket2.workspace = true
-tokio.workspace = true
diff --git a/ext/flash/README.md b/ext/flash/README.md
deleted file mode 100644
index bc3c12065..000000000
--- a/ext/flash/README.md
+++ /dev/null
@@ -1,7 +0,0 @@
-# flash
-
-Flash is a fast HTTP/1.1 server implementation for Deno.
-
-```js
-serve({ fetch: (req) => new Response("Hello World") });
-```
diff --git a/ext/flash/chunked.rs b/ext/flash/chunked.rs
deleted file mode 100644
index 711dd717d..000000000
--- a/ext/flash/chunked.rs
+++ /dev/null
@@ -1,273 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-//
-// Based on https://github.com/frewsxcv/rust-chunked-transfer/blob/5c08614458580f9e7a85124021006d83ce1ed6e9/src/decoder.rs
-// Copyright 2015 The tiny-http Contributors
-// Copyright 2015 The rust-chunked-transfer Contributors
-
-use std::error::Error;
-use std::fmt;
-use std::io::Error as IoError;
-use std::io::ErrorKind;
-use std::io::Read;
-use std::io::Result as IoResult;
-
-pub struct Decoder<R> {
- pub source: R,
-
- // remaining size of the chunk being read
- // none if we are not in a chunk
- pub remaining_chunks_size: Option<usize>,
- pub end: bool,
-}
-
-impl<R> Decoder<R>
-where
- R: Read,
-{
- pub fn new(source: R, remaining_chunks_size: Option<usize>) -> Decoder<R> {
- Decoder {
- source,
- remaining_chunks_size,
- end: false,
- }
- }
-
- fn read_chunk_size(&mut self) -> IoResult<usize> {
- let mut chunk_size_bytes = Vec::new();
- let mut has_ext = false;
-
- loop {
- let byte = match self.source.by_ref().bytes().next() {
- Some(b) => b?,
- None => {
- return Err(IoError::new(ErrorKind::InvalidInput, DecoderError))
- }
- };
-
- if byte == b'\r' {
- break;
- }
-
- if byte == b';' {
- has_ext = true;
- break;
- }
-
- chunk_size_bytes.push(byte);
- }
-
- // Ignore extensions for now
- if has_ext {
- loop {
- let byte = match self.source.by_ref().bytes().next() {
- Some(b) => b?,
- None => {
- return Err(IoError::new(ErrorKind::InvalidInput, DecoderError))
- }
- };
- if byte == b'\r' {
- break;
- }
- }
- }
-
- self.read_line_feed()?;
-
- let chunk_size = String::from_utf8(chunk_size_bytes)
- .ok()
- .and_then(|c| usize::from_str_radix(c.trim(), 16).ok())
- .ok_or_else(|| IoError::new(ErrorKind::InvalidInput, DecoderError))?;
-
- Ok(chunk_size)
- }
-
- fn read_carriage_return(&mut self) -> IoResult<()> {
- match self.source.by_ref().bytes().next() {
- Some(Ok(b'\r')) => Ok(()),
- _ => Err(IoError::new(ErrorKind::InvalidInput, DecoderError)),
- }
- }
-
- fn read_line_feed(&mut self) -> IoResult<()> {
- match self.source.by_ref().bytes().next() {
- Some(Ok(b'\n')) => Ok(()),
- _ => Err(IoError::new(ErrorKind::InvalidInput, DecoderError)),
- }
- }
-}
-
-impl<R> Read for Decoder<R>
-where
- R: Read,
-{
- fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
- let remaining_chunks_size = match self.remaining_chunks_size {
- Some(c) => c,
- None => {
- // first possibility: we are not in a chunk, so we'll attempt to determine
- // the chunks size
- let chunk_size = self.read_chunk_size()?;
-
- // if the chunk size is 0, we are at EOF
- if chunk_size == 0 {
- self.read_carriage_return()?;
- self.read_line_feed()?;
- self.end = true;
- return Ok(0);
- }
-
- chunk_size
- }
- };
-
- // second possibility: we continue reading from a chunk
- if buf.len() < remaining_chunks_size {
- let read = self.source.read(buf)?;
- self.remaining_chunks_size = Some(remaining_chunks_size - read);
- return Ok(read);
- }
-
- // third possibility: the read request goes further than the current chunk
- // we simply read until the end of the chunk and return
- let buf = &mut buf[..remaining_chunks_size];
- let read = self.source.read(buf)?;
- self.remaining_chunks_size = if read == remaining_chunks_size {
- self.read_carriage_return()?;
- self.read_line_feed()?;
- None
- } else {
- Some(remaining_chunks_size - read)
- };
-
- Ok(read)
- }
-}
-
-#[derive(Debug, Copy, Clone)]
-struct DecoderError;
-
-impl fmt::Display for DecoderError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
- write!(fmt, "Error while decoding chunks")
- }
-}
-
-impl Error for DecoderError {
- fn description(&self) -> &str {
- "Error while decoding chunks"
- }
-}
-
-#[cfg(test)]
-mod test {
- use super::Decoder;
- use std::io;
- use std::io::Read;
-
- /// This unit test is taken from from Hyper
- /// https://github.com/hyperium/hyper
- /// Copyright (c) 2014 Sean McArthur
- #[test]
- fn test_read_chunk_size() {
- fn read(s: &str, expected: usize) {
- let mut decoded = Decoder::new(s.as_bytes(), None);
- let actual = decoded.read_chunk_size().unwrap();
- assert_eq!(expected, actual);
- }
-
- fn read_err(s: &str) {
- let mut decoded = Decoder::new(s.as_bytes(), None);
- let err_kind = decoded.read_chunk_size().unwrap_err().kind();
- assert_eq!(err_kind, io::ErrorKind::InvalidInput);
- }
-
- read("1\r\n", 1);
- read("01\r\n", 1);
- read("0\r\n", 0);
- read("00\r\n", 0);
- read("A\r\n", 10);
- read("a\r\n", 10);
- read("Ff\r\n", 255);
- read("Ff \r\n", 255);
- // Missing LF or CRLF
- read_err("F\rF");
- read_err("F");
- // Invalid hex digit
- read_err("X\r\n");
- read_err("1X\r\n");
- read_err("-\r\n");
- read_err("-1\r\n");
- // Acceptable (if not fully valid) extensions do not influence the size
- read("1;extension\r\n", 1);
- read("a;ext name=value\r\n", 10);
- read("1;extension;extension2\r\n", 1);
- read("1;;; ;\r\n", 1);
- read("2; extension...\r\n", 2);
- read("3 ; extension=123\r\n", 3);
- read("3 ;\r\n", 3);
- read("3 ; \r\n", 3);
- // Invalid extensions cause an error
- read_err("1 invalid extension\r\n");
- read_err("1 A\r\n");
- read_err("1;no CRLF");
- }
-
- #[test]
- fn test_valid_chunk_decode() {
- let source = io::Cursor::new(
- "3\r\nhel\r\nb\r\nlo world!!!\r\n0\r\n\r\n"
- .to_string()
- .into_bytes(),
- );
- let mut decoded = Decoder::new(source, None);
-
- let mut string = String::new();
- decoded.read_to_string(&mut string).unwrap();
-
- assert_eq!(string, "hello world!!!");
- }
-
- #[test]
- fn test_decode_zero_length() {
- let mut decoder = Decoder::new(b"0\r\n\r\n" as &[u8], None);
-
- let mut decoded = String::new();
- decoder.read_to_string(&mut decoded).unwrap();
-
- assert_eq!(decoded, "");
- }
-
- #[test]
- fn test_decode_invalid_chunk_length() {
- let mut decoder = Decoder::new(b"m\r\n\r\n" as &[u8], None);
-
- let mut decoded = String::new();
- assert!(decoder.read_to_string(&mut decoded).is_err());
- }
-
- #[test]
- fn invalid_input1() {
- let source = io::Cursor::new(
- "2\r\nhel\r\nb\r\nlo world!!!\r\n0\r\n"
- .to_string()
- .into_bytes(),
- );
- let mut decoded = Decoder::new(source, None);
-
- let mut string = String::new();
- assert!(decoded.read_to_string(&mut string).is_err());
- }
-
- #[test]
- fn invalid_input2() {
- let source = io::Cursor::new(
- "3\rhel\r\nb\r\nlo world!!!\r\n0\r\n"
- .to_string()
- .into_bytes(),
- );
- let mut decoded = Decoder::new(source, None);
-
- let mut string = String::new();
- assert!(decoded.read_to_string(&mut string).is_err());
- }
-}
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs
deleted file mode 100644
index 647358a4f..000000000
--- a/ext/flash/lib.rs
+++ /dev/null
@@ -1,1543 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-// False positive lint for explicit drops.
-// https://github.com/rust-lang/rust-clippy/issues/6446
-#![allow(clippy::await_holding_lock)]
-// https://github.com/rust-lang/rust-clippy/issues/6353
-#![allow(clippy::await_holding_refcell_ref)]
-
-use deno_core::error::generic_error;
-use deno_core::error::type_error;
-use deno_core::error::AnyError;
-use deno_core::op;
-use deno_core::serde_v8;
-use deno_core::v8;
-use deno_core::v8::fast_api;
-use deno_core::ByteString;
-use deno_core::CancelFuture;
-use deno_core::CancelHandle;
-use deno_core::OpState;
-use deno_core::StringOrBuffer;
-use deno_core::ZeroCopyBuf;
-use deno_core::V8_WRAPPER_OBJECT_INDEX;
-use deno_tls::load_certs;
-use deno_tls::load_private_keys;
-use http::header::CONNECTION;
-use http::header::CONTENT_LENGTH;
-use http::header::EXPECT;
-use http::header::TRANSFER_ENCODING;
-use http::HeaderName;
-use http::HeaderValue;
-use log::trace;
-use mio::net::TcpListener;
-use mio::Events;
-use mio::Interest;
-use mio::Poll;
-use mio::Token;
-use serde::Deserialize;
-use serde::Serialize;
-use socket2::Socket;
-use std::cell::RefCell;
-use std::cell::UnsafeCell;
-use std::collections::HashMap;
-use std::ffi::c_void;
-use std::future::Future;
-use std::intrinsics::transmute;
-use std::io::BufReader;
-use std::io::Read;
-use std::io::Write;
-use std::mem::replace;
-use std::net::SocketAddr;
-use std::net::ToSocketAddrs;
-use std::pin::Pin;
-use std::rc::Rc;
-use std::sync::Arc;
-use std::sync::Mutex;
-use std::task::Context;
-use std::time::Duration;
-use tokio::sync::mpsc;
-use tokio::task::JoinHandle;
-
-mod chunked;
-mod request;
-#[cfg(unix)]
-mod sendfile;
-mod socket;
-
-use request::InnerRequest;
-use request::Request;
-use socket::InnerStream;
-use socket::Stream;
-
-pub struct FlashContext {
- next_server_id: u32,
- join_handles: HashMap<u32, JoinHandle<Result<(), AnyError>>>,
- pub servers: HashMap<u32, ServerContext>,
-}
-
-pub struct ServerContext {
- _addr: SocketAddr,
- tx: mpsc::Sender<Request>,
- rx: mpsc::Receiver<Request>,
- requests: HashMap<u32, Request>,
- next_token: u32,
- listening_rx: Option<mpsc::Receiver<u16>>,
- close_tx: mpsc::Sender<()>,
- cancel_handle: Rc<CancelHandle>,
-}
-
-#[derive(Debug, Eq, PartialEq)]
-pub enum ParseStatus {
- None,
- Ongoing(usize),
-}
-
-#[op]
-fn op_flash_respond(
- op_state: &mut OpState,
- server_id: u32,
- token: u32,
- response: StringOrBuffer,
- shutdown: bool,
-) -> u32 {
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
- flash_respond(ctx, token, shutdown, &response)
-}
-
-#[op(fast)]
-fn op_try_flash_respond_chunked(
- op_state: &mut OpState,
- server_id: u32,
- token: u32,
- response: &[u8],
- shutdown: bool,
-) -> u32 {
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
- let tx = ctx.requests.get(&token).unwrap();
- let sock = tx.socket();
-
- // TODO(@littledivy): Use writev when `UnixIoSlice` lands.
- // https://github.com/denoland/deno/pull/15629
- let h = format!("{:x}\r\n", response.len());
-
- let concat = [h.as_bytes(), response, b"\r\n"].concat();
- let expected = sock.try_write(&concat);
- if expected != concat.len() {
- if expected > 2 {
- return expected as u32;
- }
- return expected as u32;
- }
-
- if shutdown {
- // Best case: We've written everything and the stream is done too.
- let _ = ctx.requests.remove(&token).unwrap();
- }
- 0
-}
-
-#[op]
-async fn op_flash_respond_async(
- state: Rc<RefCell<OpState>>,
- server_id: u32,
- token: u32,
- response: StringOrBuffer,
- shutdown: bool,
-) -> Result<(), AnyError> {
- trace!("op_flash_respond_async");
-
- let mut close = false;
- let sock = {
- let mut op_state = state.borrow_mut();
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
-
- match shutdown {
- true => {
- let tx = ctx.requests.remove(&token).unwrap();
- close = !tx.keep_alive;
- tx.socket()
- }
- // In case of a websocket upgrade or streaming response.
- false => {
- let tx = ctx.requests.get(&token).unwrap();
- tx.socket()
- }
- }
- };
-
- sock
- .with_async_stream(|stream| {
- Box::pin(async move {
- Ok(tokio::io::AsyncWriteExt::write(stream, &response).await?)
- })
- })
- .await?;
- // server is done writing and request doesn't want to kept alive.
- if shutdown && close {
- sock.shutdown();
- }
- Ok(())
-}
-
-#[op]
-async fn op_flash_respond_chunked(
- op_state: Rc<RefCell<OpState>>,
- server_id: u32,
- token: u32,
- response: Option<ZeroCopyBuf>,
- shutdown: bool,
- nwritten: u32,
-) -> Result<(), AnyError> {
- let mut op_state = op_state.borrow_mut();
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
- let sock = match shutdown {
- true => {
- let tx = ctx.requests.remove(&token).unwrap();
- tx.socket()
- }
- // In case of a websocket upgrade or streaming response.
- false => {
- let tx = ctx.requests.get(&token).unwrap();
- tx.socket()
- }
- };
-
- drop(op_state);
- sock
- .with_async_stream(|stream| {
- Box::pin(async move {
- use tokio::io::AsyncWriteExt;
- // TODO(@littledivy): Use writev when `UnixIoSlice` lands.
- // https://github.com/denoland/deno/pull/15629
- macro_rules! write_whats_not_written {
- ($e:expr) => {
- let e = $e;
- let n = nwritten as usize;
- if n < e.len() {
- stream.write_all(&e[n..]).await?;
- }
- };
- }
- if let Some(response) = response {
- let h = format!("{:x}\r\n", response.len());
- write_whats_not_written!(h.as_bytes());
- write_whats_not_written!(&response);
- write_whats_not_written!(b"\r\n");
- }
-
- // The last chunk
- if shutdown {
- write_whats_not_written!(b"0\r\n\r\n");
- }
-
- Ok(())
- })
- })
- .await?;
- Ok(())
-}
-
-#[op]
-async fn op_flash_write_resource(
- op_state: Rc<RefCell<OpState>>,
- response: StringOrBuffer,
- server_id: u32,
- token: u32,
- resource_id: deno_core::ResourceId,
- auto_close: bool,
-) -> Result<(), AnyError> {
- let (resource, sock) = {
- let op_state = &mut op_state.borrow_mut();
- let resource = if auto_close {
- op_state.resource_table.take_any(resource_id)?
- } else {
- op_state.resource_table.get_any(resource_id)?
- };
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
- (resource, ctx.requests.remove(&token).unwrap().socket())
- };
-
- let _ = sock.write(&response);
-
- #[cfg(unix)]
- {
- use std::os::unix::io::AsRawFd;
- if let InnerStream::Tcp(stream_handle) = &sock.inner {
- let stream_handle = stream_handle.as_raw_fd();
- if let Some(fd) = resource.clone().backing_fd() {
- // SAFETY: all-zero byte-pattern is a valid value for libc::stat.
- let mut stat: libc::stat = unsafe { std::mem::zeroed() };
- // SAFETY: call to libc::fstat.
- if unsafe { libc::fstat(fd, &mut stat) } >= 0 {
- let _ = sock.write(
- format!("Content-Length: {}\r\n\r\n", stat.st_size).as_bytes(),
- );
- let tx = sendfile::SendFile {
- io: (fd, stream_handle),
- written: 0,
- };
- tx.await?;
- return Ok(());
- }
- }
- }
- }
-
- sock
- .with_async_stream(|stream| {
- Box::pin(async move {
- use tokio::io::AsyncWriteExt;
- stream
- .write_all(b"Transfer-Encoding: chunked\r\n\r\n")
- .await?;
- loop {
- let view = resource.clone().read(64 * 1024).await?; // 64KB
- if view.is_empty() {
- stream.write_all(b"0\r\n\r\n").await?;
- break;
- }
- // TODO(@littledivy): use vectored writes.
- stream
- .write_all(format!("{:x}\r\n", view.len()).as_bytes())
- .await?;
- stream.write_all(&view).await?;
- stream.write_all(b"\r\n").await?;
- }
- resource.close();
- Ok(())
- })
- })
- .await?;
- Ok(())
-}
-
-pub const RESPOND_FAST: fast_api::FastFunction = fast_api::FastFunction::new(
- &[
- fast_api::Type::V8Value,
- fast_api::Type::Uint32,
- fast_api::Type::TypedArray(fast_api::CType::Uint8),
- fast_api::Type::Bool,
- ],
- fast_api::CType::Uint32,
- op_flash_respond_fast as *const c_void,
-);
-
-fn flash_respond(
- ctx: &mut ServerContext,
- token: u32,
- shutdown: bool,
- response: &[u8],
-) -> u32 {
- let tx = ctx.requests.get(&token).unwrap();
- let sock = tx.socket();
-
- sock.read_tx.take();
- sock.read_rx.take();
-
- let nwritten = sock.try_write(response);
-
- if shutdown && nwritten == response.len() {
- if !tx.keep_alive {
- sock.shutdown();
- }
- ctx.requests.remove(&token).unwrap();
- }
-
- nwritten as u32
-}
-
-unsafe fn op_flash_respond_fast(
- recv: v8::Local<v8::Object>,
- token: u32,
- response: *const fast_api::FastApiTypedArray<u8>,
- shutdown: bool,
-) -> u32 {
- let ptr =
- recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
- let ctx = &mut *(ptr as *mut ServerContext);
-
- let response = &*response;
- // Uint8Array is always byte-aligned.
- let response = response.get_storage_if_aligned().unwrap_unchecked();
- flash_respond(ctx, token, shutdown, response)
-}
-
-macro_rules! get_request {
- ($op_state: ident, $token: ident) => {
- get_request!($op_state, 0, $token)
- };
- ($op_state: ident, $server_id: expr, $token: ident) => {{
- let flash_ctx = $op_state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&$server_id).unwrap();
- ctx.requests.get_mut(&$token).unwrap()
- }};
-}
-
-#[repr(u32)]
-pub enum Method {
- GET = 0,
- HEAD,
- CONNECT,
- PUT,
- DELETE,
- OPTIONS,
- TRACE,
- POST,
- PATCH,
-}
-
-#[inline]
-fn get_method(req: &mut Request) -> u32 {
- let method = match req.method() {
- "GET" => Method::GET,
- "POST" => Method::POST,
- "PUT" => Method::PUT,
- "DELETE" => Method::DELETE,
- "OPTIONS" => Method::OPTIONS,
- "HEAD" => Method::HEAD,
- "PATCH" => Method::PATCH,
- "TRACE" => Method::TRACE,
- "CONNECT" => Method::CONNECT,
- _ => Method::GET,
- };
- method as u32
-}
-
-#[op]
-fn op_flash_method(state: &mut OpState, server_id: u32, token: u32) -> u32 {
- let req = get_request!(state, server_id, token);
- get_method(req)
-}
-
-#[op]
-async fn op_flash_close_server(state: Rc<RefCell<OpState>>, server_id: u32) {
- let close_tx = {
- let mut op_state = state.borrow_mut();
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
- ctx.cancel_handle.cancel();
- ctx.close_tx.clone()
- };
- let _ = close_tx.send(()).await;
-}
-
-#[op]
-fn op_flash_path(
- state: Rc<RefCell<OpState>>,
- server_id: u32,
- token: u32,
-) -> String {
- let mut op_state = state.borrow_mut();
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
- ctx
- .requests
- .get(&token)
- .unwrap()
- .inner
- .req
- .path
- .unwrap()
- .to_string()
-}
-
-#[inline]
-fn next_request_sync(ctx: &mut ServerContext) -> u32 {
- let offset = ctx.next_token;
-
- while let Ok(token) = ctx.rx.try_recv() {
- ctx.requests.insert(ctx.next_token, token);
- ctx.next_token += 1;
- }
-
- ctx.next_token - offset
-}
-
-const NEXT_REQUEST_FAST: fast_api::FastFunction = fast_api::FastFunction::new(
- &[fast_api::Type::V8Value],
- fast_api::CType::Uint32,
- op_flash_next_fast as *const c_void,
-);
-
-unsafe fn op_flash_next_fast(recv: v8::Local<v8::Object>) -> u32 {
- let ptr =
- recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
- let ctx = &mut *(ptr as *mut ServerContext);
- next_request_sync(ctx)
-}
-
-const GET_METHOD_FAST: fast_api::FastFunction = fast_api::FastFunction::new(
- &[fast_api::Type::V8Value, fast_api::Type::Uint32],
- fast_api::CType::Uint32,
- op_flash_get_method_fast as *const c_void,
-);
-
-unsafe fn op_flash_get_method_fast(
- recv: v8::Local<v8::Object>,
- token: u32,
-) -> u32 {
- let ptr =
- recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
- let ctx = &mut *(ptr as *mut ServerContext);
- let req = ctx.requests.get_mut(&token).unwrap();
- get_method(req)
-}
-
-// Fast calls
-#[op(v8)]
-fn op_flash_make_request<'scope>(
- scope: &mut v8::HandleScope<'scope>,
- state: &mut OpState,
-) -> serde_v8::Value<'scope> {
- let object_template = v8::ObjectTemplate::new(scope);
- assert!(object_template
- .set_internal_field_count((V8_WRAPPER_OBJECT_INDEX + 1) as usize));
- let obj = object_template.new_instance(scope).unwrap();
- let ctx = {
- let flash_ctx = state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&0).unwrap();
- ctx as *mut ServerContext
- };
- obj.set_aligned_pointer_in_internal_field(V8_WRAPPER_OBJECT_INDEX, ctx as _);
-
- // nextRequest
- {
- let builder = v8::FunctionTemplate::builder(
- |_: &mut v8::HandleScope,
- args: v8::FunctionCallbackArguments,
- mut rv: v8::ReturnValue| {
- let external: v8::Local<v8::External> = args.data().try_into().unwrap();
- // SAFETY: This external is guaranteed to be a pointer to a ServerContext
- let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
- rv.set_uint32(next_request_sync(ctx));
- },
- )
- .data(v8::External::new(scope, ctx as *mut _).into());
-
- let func = builder.build_fast(scope, &NEXT_REQUEST_FAST, None, None, None);
- let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
-
- let key = v8::String::new(scope, "nextRequest").unwrap();
- obj.set(scope, key.into(), func).unwrap();
- }
-
- // getMethod
- {
- let builder = v8::FunctionTemplate::builder(
- |scope: &mut v8::HandleScope,
- args: v8::FunctionCallbackArguments,
- mut rv: v8::ReturnValue| {
- let external: v8::Local<v8::External> = args.data().try_into().unwrap();
- // SAFETY: This external is guaranteed to be a pointer to a ServerContext
- let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
- let token = args.get(0).uint32_value(scope).unwrap();
- let req = ctx.requests.get_mut(&token).unwrap();
- rv.set_uint32(get_method(req));
- },
- )
- .data(v8::External::new(scope, ctx as *mut _).into());
-
- let func = builder.build_fast(scope, &GET_METHOD_FAST, None, None, None);
- let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
-
- let key = v8::String::new(scope, "getMethod").unwrap();
- obj.set(scope, key.into(), func).unwrap();
- }
-
- // respond
- {
- let builder = v8::FunctionTemplate::builder(
- |scope: &mut v8::HandleScope,
- args: v8::FunctionCallbackArguments,
- mut rv: v8::ReturnValue| {
- let external: v8::Local<v8::External> = args.data().try_into().unwrap();
- // SAFETY: This external is guaranteed to be a pointer to a ServerContext
- let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
-
- let token = args.get(0).uint32_value(scope).unwrap();
-
- let response: v8::Local<v8::ArrayBufferView> =
- args.get(1).try_into().unwrap();
- let ab = response.buffer(scope).unwrap();
- let store = ab.get_backing_store();
- let (offset, len) = (response.byte_offset(), response.byte_length());
- // SAFETY: v8::SharedRef<v8::BackingStore> is similar to Arc<[u8]>,
- // it points to a fixed continuous slice of bytes on the heap.
- // We assume it's initialized and thus safe to read (though may not contain meaningful data)
- let response = unsafe {
- &*(&store[offset..offset + len] as *const _ as *const [u8])
- };
-
- let shutdown = args.get(2).boolean_value(scope);
-
- rv.set_uint32(flash_respond(ctx, token, shutdown, response));
- },
- )
- .data(v8::External::new(scope, ctx as *mut _).into());
-
- let func = builder.build_fast(scope, &RESPOND_FAST, None, None, None);
- let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
-
- let key = v8::String::new(scope, "respond").unwrap();
- obj.set(scope, key.into(), func).unwrap();
- }
-
- let value: v8::Local<v8::Value> = obj.into();
- value.into()
-}
-
-#[inline]
-fn has_body_stream(req: &Request) -> bool {
- let sock = req.socket();
- sock.read_rx.is_some()
-}
-
-#[op]
-fn op_flash_has_body_stream(
- op_state: &mut OpState,
- server_id: u32,
- token: u32,
-) -> bool {
- let req = get_request!(op_state, server_id, token);
- has_body_stream(req)
-}
-
-#[op]
-fn op_flash_headers(
- state: Rc<RefCell<OpState>>,
- server_id: u32,
- token: u32,
-) -> Result<Vec<(ByteString, ByteString)>, AnyError> {
- let mut op_state = state.borrow_mut();
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx
- .servers
- .get_mut(&server_id)
- .ok_or_else(|| type_error("server closed"))?;
- let inner_req = &ctx
- .requests
- .get(&token)
- .ok_or_else(|| type_error("request closed"))?
- .inner
- .req;
- Ok(
- inner_req
- .headers
- .iter()
- .map(|h| (h.name.as_bytes().into(), h.value.into()))
- .collect(),
- )
-}
-
-#[op]
-fn op_flash_addr(
- state: Rc<RefCell<OpState>>,
- server_id: u32,
- token: u32,
-) -> Result<(String, u16), AnyError> {
- let mut op_state = state.borrow_mut();
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx
- .servers
- .get_mut(&server_id)
- .ok_or_else(|| type_error("server closed"))?;
- let req = &ctx
- .requests
- .get(&token)
- .ok_or_else(|| type_error("request closed"))?;
- let socket = req.socket();
- Ok((socket.addr.ip().to_string(), socket.addr.port()))
-}
-
-// Remember the first packet we read? It probably also has some body data. This op quickly copies it into
-// a buffer and sets up channels for streaming the rest.
-#[op]
-fn op_flash_first_packet(
- op_state: &mut OpState,
- server_id: u32,
- token: u32,
-) -> Result<Option<ZeroCopyBuf>, AnyError> {
- let tx = get_request!(op_state, server_id, token);
- let sock = tx.socket();
-
- if !tx.te_chunked && tx.content_length.is_none() {
- return Ok(None);
- }
-
- if tx.expect_continue {
- let _ = sock.write(b"HTTP/1.1 100 Continue\r\n\r\n");
- tx.expect_continue = false;
- }
-
- let buffer = &tx.inner.buffer[tx.inner.body_offset..tx.inner.body_len];
- // Oh there is nothing here.
- if buffer.is_empty() {
- return Ok(Some(ZeroCopyBuf::empty()));
- }
-
- if tx.te_chunked {
- let mut buf = vec![0; 1024];
- let mut offset = 0;
- let mut decoder = chunked::Decoder::new(
- std::io::Cursor::new(buffer),
- tx.remaining_chunk_size,
- );
-
- loop {
- match decoder.read(&mut buf[offset..]) {
- Ok(n) => {
- tx.remaining_chunk_size = decoder.remaining_chunks_size;
- offset += n;
-
- if n == 0 {
- tx.te_chunked = false;
- buf.truncate(offset);
- return Ok(Some(buf.into()));
- }
-
- if offset < buf.len()
- && decoder.source.position() < buffer.len() as u64
- {
- continue;
- }
-
- buf.truncate(offset);
- return Ok(Some(buf.into()));
- }
- Err(e) => {
- return Err(type_error(format!("{e}")));
- }
- }
- }
- }
-
- tx.content_length
- .ok_or_else(|| type_error("no content-length"))?;
- tx.content_read += buffer.len();
-
- Ok(Some(buffer.to_vec().into()))
-}
-
-#[op]
-async fn op_flash_read_body(
- state: Rc<RefCell<OpState>>,
- server_id: u32,
- token: u32,
- mut buf: ZeroCopyBuf,
-) -> usize {
- // SAFETY: we cannot hold op_state borrow across the await point. The JS caller
- // is responsible for ensuring this is not called concurrently.
- let ctx = unsafe {
- {
- let op_state = &mut state.borrow_mut();
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- flash_ctx.servers.get_mut(&server_id).unwrap() as *mut ServerContext
- }
- .as_mut()
- .unwrap()
- };
- let tx = match ctx.requests.get_mut(&token) {
- Some(tx) => tx,
- // request was already consumed by caller
- None => return 0,
- };
-
- if tx.te_chunked {
- let mut decoder =
- chunked::Decoder::new(tx.socket(), tx.remaining_chunk_size);
- loop {
- let sock = tx.socket();
-
- let _lock = sock.read_lock.lock().unwrap();
- match decoder.read(&mut buf) {
- Ok(n) => {
- tx.remaining_chunk_size = decoder.remaining_chunks_size;
- return n;
- }
- Err(e) if e.kind() == std::io::ErrorKind::InvalidInput => {
- panic!("chunked read error: {e}");
- }
- Err(_) => {
- drop(_lock);
- sock.read_rx.as_mut().unwrap().recv().await.unwrap();
- }
- }
- }
- }
-
- if let Some(content_length) = tx.content_length {
- let sock = tx.socket();
- let l = sock.read_lock.clone();
-
- loop {
- let _lock = l.lock().unwrap();
- if tx.content_read >= content_length as usize {
- return 0;
- }
- match sock.read(&mut buf) {
- Ok(n) => {
- tx.content_read += n;
- return n;
- }
- _ => {
- drop(_lock);
- sock.read_rx.as_mut().unwrap().recv().await.unwrap();
- }
- }
- }
- }
-
- 0
-}
-
-// https://github.com/hyperium/hyper/blob/0c8ee93d7f557afc63ca2a5686d19071813ab2b7/src/headers.rs#L67
-#[inline]
-fn from_digits(bytes: &[u8]) -> Option<u64> {
- // cannot use FromStr for u64, since it allows a signed prefix
- let mut result = 0u64;
- const RADIX: u64 = 10;
- if bytes.is_empty() {
- return None;
- }
- for &b in bytes {
- // can't use char::to_digit, since we haven't verified these bytes
- // are utf-8.
- match b {
- b'0'..=b'9' => {
- result = result.checked_mul(RADIX)?;
- result = result.checked_add((b - b'0') as u64)?;
- }
- _ => {
- return None;
- }
- }
- }
- Some(result)
-}
-
-#[inline]
-fn connection_has(value: &HeaderValue, needle: &str) -> bool {
- if let Ok(s) = value.to_str() {
- for val in s.split(',') {
- if val.trim().eq_ignore_ascii_case(needle) {
- return true;
- }
- }
- }
- false
-}
-
-#[derive(Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ListenOpts {
- cert: Option<String>,
- key: Option<String>,
- hostname: String,
- port: u16,
- reuseport: bool,
-}
-
-fn run_server(
- tx: mpsc::Sender<Request>,
- listening_tx: mpsc::Sender<u16>,
- mut close_rx: mpsc::Receiver<()>,
- addr: SocketAddr,
- maybe_cert: Option<String>,
- maybe_key: Option<String>,
- reuseport: bool,
-) -> Result<(), AnyError> {
- let domain = if addr.is_ipv4() {
- socket2::Domain::IPV4
- } else {
- socket2::Domain::IPV6
- };
- let socket = Socket::new(domain, socket2::Type::STREAM, None)?;
-
- #[cfg(not(windows))]
- socket.set_reuse_address(true)?;
- if reuseport {
- #[cfg(target_os = "linux")]
- socket.set_reuse_port(true)?;
- }
-
- let socket_addr = socket2::SockAddr::from(addr);
- socket.bind(&socket_addr)?;
- socket.listen(128)?;
- socket.set_nonblocking(true)?;
- let std_listener: std::net::TcpListener = socket.into();
- let mut listener = TcpListener::from_std(std_listener);
-
- let mut poll = Poll::new()?;
- let token = Token(0);
- poll
- .registry()
- .register(&mut listener, token, Interest::READABLE)
- .unwrap();
-
- let tls_context: Option<Arc<rustls::ServerConfig>> = {
- if let Some(cert) = maybe_cert {
- let key = maybe_key.unwrap();
- let certificate_chain: Vec<rustls::Certificate> =
- load_certs(&mut BufReader::new(cert.as_bytes()))?;
- let private_key = load_private_keys(key.as_bytes())?.remove(0);
-
- let config = rustls::ServerConfig::builder()
- .with_safe_defaults()
- .with_no_client_auth()
- .with_single_cert(certificate_chain, private_key)
- .expect("invalid key or certificate");
- Some(Arc::new(config))
- } else {
- None
- }
- };
-
- listening_tx
- .blocking_send(listener.local_addr().unwrap().port())
- .unwrap();
- let mut sockets = HashMap::with_capacity(1000);
- let mut counter: usize = 1;
- let mut events = Events::with_capacity(1024);
- 'outer: loop {
- let result = close_rx.try_recv();
- if result.is_ok() {
- break 'outer;
- }
- // FIXME(bartlomieju): how does Tokio handle it? I just put random 100ms
- // timeout here to handle close signal.
- match poll.poll(&mut events, Some(Duration::from_millis(100))) {
- Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
- Err(e) => panic!("{}", e),
- Ok(()) => (),
- }
- 'events: for event in &events {
- if close_rx.try_recv().is_ok() {
- break 'outer;
- }
- let token = event.token();
- match token {
- Token(0) => loop {
- match listener.accept() {
- Ok((mut socket, addr)) => {
- counter += 1;
- let token = Token(counter);
- poll
- .registry()
- .register(&mut socket, token, Interest::READABLE)
- .unwrap();
-
- let socket = match tls_context {
- Some(ref tls_conf) => {
- let connection =
- rustls::ServerConnection::new(tls_conf.clone()).unwrap();
- InnerStream::Tls(Box::new(rustls::StreamOwned::new(
- connection, socket,
- )))
- }
- None => InnerStream::Tcp(socket),
- };
- let stream = Box::pin(Stream {
- inner: socket,
- detached: false,
- read_rx: None,
- read_tx: None,
- read_lock: Arc::new(Mutex::new(())),
- parse_done: ParseStatus::None,
- buffer: UnsafeCell::new(vec![0_u8; 1024]),
- addr,
- });
-
- trace!("New connection: {}", token.0);
- sockets.insert(token, stream);
- }
- Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
- Err(_) => break,
- }
- },
- token => {
- let socket = sockets.get_mut(&token).unwrap();
- // SAFETY: guarantee that we will never move the data out of the mutable reference.
- let socket = unsafe {
- let mut_ref: Pin<&mut Stream> = Pin::as_mut(socket);
- Pin::get_unchecked_mut(mut_ref)
- };
- let sock_ptr = socket as *mut _;
-
- if socket.detached {
- match &mut socket.inner {
- InnerStream::Tcp(ref mut socket) => {
- poll.registry().deregister(socket).unwrap();
- }
- InnerStream::Tls(_) => {
- todo!("upgrade tls not implemented");
- }
- }
-
- let boxed = sockets.remove(&token).unwrap();
- std::mem::forget(boxed);
- trace!("Socket detached: {}", token.0);
- continue;
- }
-
- debug_assert!(event.is_readable());
-
- trace!("Socket readable: {}", token.0);
- if let Some(tx) = &socket.read_tx {
- {
- let _l = socket.read_lock.lock().unwrap();
- }
- trace!("Sending readiness notification: {}", token.0);
- let _ = tx.blocking_send(());
-
- continue;
- }
-
- let mut headers = vec![httparse::EMPTY_HEADER; 40];
- let mut req = httparse::Request::new(&mut headers);
- let body_offset;
- let body_len;
- loop {
- // SAFETY: It is safe for the read buf to be mutable here.
- let buffer = unsafe { &mut *socket.buffer.get() };
- let offset = match socket.parse_done {
- ParseStatus::None => 0,
- ParseStatus::Ongoing(offset) => offset,
- };
- if offset >= buffer.len() {
- buffer.resize(offset * 2, 0);
- }
- let nread = socket.read(&mut buffer[offset..]);
-
- match nread {
- Ok(0) => {
- trace!("Socket closed: {}", token.0);
- // FIXME: don't remove while JS is writing!
- // sockets.remove(&token);
- continue 'events;
- }
- Ok(read) => {
- match req.parse(&buffer[..offset + read]) {
- Ok(httparse::Status::Complete(n)) => {
- body_offset = n;
- body_len = offset + read;
- socket.parse_done = ParseStatus::None;
- // On Windows, We must keep calling socket.read() until it fails with WouldBlock.
- //
- // Mio tries to emulate edge triggered events on Windows.
- // AFAICT it only rearms the event on WouldBlock, but it doesn't when a partial read happens.
- // https://github.com/denoland/deno/issues/15549
- #[cfg(target_os = "windows")]
- match &mut socket.inner {
- InnerStream::Tcp(ref mut socket) => {
- poll
- .registry()
- .reregister(socket, token, Interest::READABLE)
- .unwrap();
- }
- InnerStream::Tls(ref mut socket) => {
- poll
- .registry()
- .reregister(
- &mut socket.sock,
- token,
- Interest::READABLE,
- )
- .unwrap();
- }
- };
- break;
- }
- Ok(httparse::Status::Partial) => {
- socket.parse_done = ParseStatus::Ongoing(offset + read);
- continue;
- }
- Err(_) => {
- let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
- continue 'events;
- }
- }
- }
- Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
- break 'events
- }
- Err(_) => break 'events,
- }
- }
-
- debug_assert_eq!(socket.parse_done, ParseStatus::None);
- if let Some(method) = &req.method {
- if method == &"POST" || method == &"PUT" {
- let (tx, rx) = mpsc::channel(100);
- socket.read_tx = Some(tx);
- socket.read_rx = Some(rx);
- }
- }
-
- // SAFETY: It is safe for the read buf to be mutable here.
- let buffer = unsafe { &mut *socket.buffer.get() };
- let inner_req = InnerRequest {
- // SAFETY: backing buffer is pinned and lives as long as the request.
- req: unsafe { transmute::<httparse::Request<'_, '_>, _>(req) },
- // SAFETY: backing buffer is pinned and lives as long as the request.
- _headers: unsafe {
- transmute::<Vec<httparse::Header<'_>>, _>(headers)
- },
- buffer: Pin::new(
- replace(buffer, vec![0_u8; 1024]).into_boxed_slice(),
- ),
- body_offset,
- body_len,
- };
- // h1
- // https://github.com/tiny-http/tiny-http/blob/master/src/client.rs#L177
- // https://github.com/hyperium/hyper/blob/4545c3ef191ce9b5f5d250ee27c4c96f9b71d2c6/src/proto/h1/role.rs#L127
- let mut keep_alive = inner_req.req.version.unwrap() == 1;
- let mut expect_continue = false;
- let mut te = false;
- let mut te_chunked = false;
- let mut content_length = None;
- for header in inner_req.req.headers.iter() {
- match HeaderName::from_bytes(header.name.as_bytes()) {
- Ok(CONNECTION) => {
- // SAFETY: illegal bytes are validated by httparse.
- let value = unsafe {
- HeaderValue::from_maybe_shared_unchecked(header.value)
- };
- if keep_alive {
- // 1.1
- keep_alive = !connection_has(&value, "close");
- } else {
- // 1.0
- keep_alive = connection_has(&value, "keep-alive");
- }
- }
- Ok(TRANSFER_ENCODING) => {
- // https://tools.ietf.org/html/rfc7230#section-3.3.3
- debug_assert!(inner_req.req.version.unwrap() == 1);
- // Two states for Transfer-Encoding because we want to make sure Content-Length handling knows it.
- te = true;
- content_length = None;
- // SAFETY: illegal bytes are validated by httparse.
- let value = unsafe {
- HeaderValue::from_maybe_shared_unchecked(header.value)
- };
- if let Ok(Some(encoding)) =
- value.to_str().map(|s| s.rsplit(',').next())
- {
- // Chunked must always be the last encoding
- if encoding.trim().eq_ignore_ascii_case("chunked") {
- te_chunked = true;
- }
- }
- }
- // Transfer-Encoding overrides the Content-Length.
- Ok(CONTENT_LENGTH) if !te => {
- if let Some(len) = from_digits(header.value) {
- if let Some(prev) = content_length {
- if prev != len {
- let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
- continue 'events;
- }
- continue;
- }
- content_length = Some(len);
- } else {
- let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
- continue 'events;
- }
- }
- Ok(EXPECT) if inner_req.req.version.unwrap() != 0 => {
- expect_continue =
- header.value.eq_ignore_ascii_case(b"100-continue");
- }
- _ => {}
- }
- }
-
- // There is Transfer-Encoding but its not chunked.
- if te && !te_chunked {
- let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
- continue 'events;
- }
-
- tx.blocking_send(Request {
- socket: sock_ptr,
- // SAFETY: headers backing buffer outlives the mio event loop ('static)
- inner: inner_req,
- keep_alive,
- te_chunked,
- remaining_chunk_size: None,
- content_read: 0,
- content_length,
- expect_continue,
- })
- .ok();
- }
- }
- }
- }
-
- Ok(())
-}
-
-fn make_addr_port_pair(hostname: &str, port: u16) -> (&str, u16) {
- // Default to localhost if given just the port. Example: ":80"
- if hostname.is_empty() {
- return ("0.0.0.0", port);
- }
-
- // If this looks like an ipv6 IP address. Example: "[2001:db8::1]"
- // Then we remove the brackets.
- let addr = hostname.trim_start_matches('[').trim_end_matches(']');
- (addr, port)
-}
-
-/// Resolve network address *synchronously*.
-pub fn resolve_addr_sync(
- hostname: &str,
- port: u16,
-) -> Result<impl Iterator<Item = SocketAddr>, AnyError> {
- let addr_port_pair = make_addr_port_pair(hostname, port);
- let result = addr_port_pair.to_socket_addrs()?;
- Ok(result)
-}
-
-fn flash_serve<P>(
- state: &mut OpState,
- opts: ListenOpts,
-) -> Result<u32, AnyError>
-where
- P: FlashPermissions + 'static,
-{
- state
- .borrow_mut::<P>()
- .check_net(&(&opts.hostname, Some(opts.port)), "Deno.serve()")?;
-
- let addr = resolve_addr_sync(&opts.hostname, opts.port)?
- .next()
- .ok_or_else(|| generic_error("No resolved address found"))?;
- let (tx, rx) = mpsc::channel(100);
- let (close_tx, close_rx) = mpsc::channel(1);
- let (listening_tx, listening_rx) = mpsc::channel(1);
- let ctx = ServerContext {
- _addr: addr,
- tx,
- rx,
- requests: HashMap::with_capacity(1000),
- next_token: 0,
- close_tx,
- listening_rx: Some(listening_rx),
- cancel_handle: CancelHandle::new_rc(),
- };
- let tx = ctx.tx.clone();
- let maybe_cert = opts.cert;
- let maybe_key = opts.key;
- let reuseport = opts.reuseport;
- let join_handle = tokio::task::spawn_blocking(move || {
- run_server(
- tx,
- listening_tx,
- close_rx,
- addr,
- maybe_cert,
- maybe_key,
- reuseport,
- )
- });
- let flash_ctx = state.borrow_mut::<FlashContext>();
- let server_id = flash_ctx.next_server_id;
- flash_ctx.next_server_id += 1;
- flash_ctx.join_handles.insert(server_id, join_handle);
- flash_ctx.servers.insert(server_id, ctx);
- Ok(server_id)
-}
-
-#[op]
-fn op_flash_serve<P>(
- state: &mut OpState,
- opts: ListenOpts,
-) -> Result<u32, AnyError>
-where
- P: FlashPermissions + 'static,
-{
- check_unstable(state, "Deno.serve");
- flash_serve::<P>(state, opts)
-}
-
-#[op]
-fn op_node_unstable_flash_serve<P>(
- state: &mut OpState,
- opts: ListenOpts,
-) -> Result<u32, AnyError>
-where
- P: FlashPermissions + 'static,
-{
- flash_serve::<P>(state, opts)
-}
-
-#[op]
-fn op_flash_wait_for_listening(
- state: Rc<RefCell<OpState>>,
- server_id: u32,
-) -> Result<impl Future<Output = Result<u16, AnyError>> + 'static, AnyError> {
- let mut listening_rx = {
- let mut state = state.borrow_mut();
- let flash_ctx = state.borrow_mut::<FlashContext>();
- let server_ctx = flash_ctx
- .servers
- .get_mut(&server_id)
- .ok_or_else(|| type_error("server not found"))?;
- server_ctx.listening_rx.take().unwrap()
- };
- Ok(async move {
- if let Some(port) = listening_rx.recv().await {
- Ok(port)
- } else {
- Err(generic_error("This error will be discarded"))
- }
- })
-}
-
-#[op]
-fn op_flash_drive_server(
- state: Rc<RefCell<OpState>>,
- server_id: u32,
-) -> Result<impl Future<Output = Result<(), AnyError>> + 'static, AnyError> {
- let join_handle = {
- let mut state = state.borrow_mut();
- let flash_ctx = state.borrow_mut::<FlashContext>();
- flash_ctx
- .join_handles
- .remove(&server_id)
- .ok_or_else(|| type_error("server not found"))?
- };
- Ok(async move {
- join_handle
- .await
- .map_err(|_| type_error("server join error"))??;
- Ok(())
- })
-}
-
-// Asychronous version of op_flash_next. This can be a bottleneck under
-// heavy load, it should be used as a fallback if there are no buffered
-// requests i.e `op_flash_next() == 0`.
-#[op]
-async fn op_flash_next_async(
- op_state: Rc<RefCell<OpState>>,
- server_id: u32,
-) -> u32 {
- let ctx = {
- let mut op_state = op_state.borrow_mut();
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
- ctx as *mut ServerContext
- };
- // SAFETY: we cannot hold op_state borrow across the await point. The JS caller
- // is responsible for ensuring this is not called concurrently.
- let ctx = unsafe { &mut *ctx };
- let cancel_handle = &ctx.cancel_handle;
-
- if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await {
- ctx.requests.insert(ctx.next_token, req);
- ctx.next_token += 1;
- return 1;
- }
-
- 0
-}
-
-// Synchronous version of op_flash_next_async. Under heavy load,
-// this can collect buffered requests from rx channel and return tokens in a single batch.
-//
-// perf: please do not add any arguments to this op. With optimizations enabled,
-// the ContextScope creation is optimized away and the op is as simple as:
-// f(info: *const v8::FunctionCallbackInfo) { let rv = ...; rv.set_uint32(op_flash_next()); }
-#[op]
-fn op_flash_next(state: &mut OpState) -> u32 {
- let flash_ctx = state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&0).unwrap();
- next_request_sync(ctx)
-}
-
-// Syncrhonous version of op_flash_next_async. Under heavy load,
-// this can collect buffered requests from rx channel and return tokens in a single batch.
-#[op]
-fn op_flash_next_server(state: &mut OpState, server_id: u32) -> u32 {
- let flash_ctx = state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
- next_request_sync(ctx)
-}
-
-// Wrapper type for tokio::net::TcpStream that implements
-// deno_websocket::UpgradedStream
-struct UpgradedStream(tokio::net::TcpStream);
-impl tokio::io::AsyncRead for UpgradedStream {
- fn poll_read(
- self: Pin<&mut Self>,
- cx: &mut Context,
- buf: &mut tokio::io::ReadBuf,
- ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
- Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
- }
-}
-
-impl tokio::io::AsyncWrite for UpgradedStream {
- fn poll_write(
- self: Pin<&mut Self>,
- cx: &mut Context,
- buf: &[u8],
- ) -> std::task::Poll<Result<usize, std::io::Error>> {
- Pin::new(&mut self.get_mut().0).poll_write(cx, buf)
- }
- fn poll_flush(
- self: Pin<&mut Self>,
- cx: &mut Context,
- ) -> std::task::Poll<Result<(), std::io::Error>> {
- Pin::new(&mut self.get_mut().0).poll_flush(cx)
- }
- fn poll_shutdown(
- self: Pin<&mut Self>,
- cx: &mut Context,
- ) -> std::task::Poll<Result<(), std::io::Error>> {
- Pin::new(&mut self.get_mut().0).poll_shutdown(cx)
- }
-}
-
-impl deno_websocket::Upgraded for UpgradedStream {}
-
-#[inline]
-pub fn detach_socket(
- ctx: &mut ServerContext,
- token: u32,
-) -> Result<tokio::net::TcpStream, AnyError> {
- // Two main 'hacks' to get this working:
- // * make server thread forget about the socket. `detach_ownership` prevents the socket from being
- // dropped on the server thread.
- // * conversion from mio::net::TcpStream -> tokio::net::TcpStream. There is no public API so we
- // use raw fds.
- let tx = ctx
- .requests
- .remove(&token)
- .ok_or_else(|| type_error("request closed"))?;
- let stream = tx.socket();
- // prevent socket from being dropped on server thread.
- // TODO(@littledivy): Box-ify, since there is no overhead.
- stream.detach_ownership();
-
- #[cfg(unix)]
- let std_stream = {
- use std::os::unix::prelude::AsRawFd;
- use std::os::unix::prelude::FromRawFd;
- let fd = match stream.inner {
- InnerStream::Tcp(ref tcp) => tcp.as_raw_fd(),
- _ => todo!(),
- };
- // SAFETY: `fd` is a valid file descriptor.
- unsafe { std::net::TcpStream::from_raw_fd(fd) }
- };
- #[cfg(windows)]
- let std_stream = {
- use std::os::windows::prelude::AsRawSocket;
- use std::os::windows::prelude::FromRawSocket;
- let fd = match stream.inner {
- InnerStream::Tcp(ref tcp) => tcp.as_raw_socket(),
- _ => todo!(),
- };
- // SAFETY: `fd` is a valid file descriptor.
- unsafe { std::net::TcpStream::from_raw_socket(fd) }
- };
- let stream = tokio::net::TcpStream::from_std(std_stream)?;
- Ok(stream)
-}
-
-#[op]
-async fn op_flash_upgrade_websocket(
- state: Rc<RefCell<OpState>>,
- server_id: u32,
- token: u32,
-) -> Result<deno_core::ResourceId, AnyError> {
- let stream = {
- let op_state = &mut state.borrow_mut();
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- detach_socket(flash_ctx.servers.get_mut(&server_id).unwrap(), token)?
- };
- deno_websocket::ws_create_server_stream(
- &state,
- Box::pin(UpgradedStream(stream)),
- )
- .await
-}
-
-pub struct Unstable(pub bool);
-
-fn check_unstable(state: &OpState, api_name: &str) {
- let unstable = state.borrow::<Unstable>();
-
- if !unstable.0 {
- eprintln!(
- "Unstable API '{api_name}'. The --unstable flag must be provided."
- );
- std::process::exit(70);
- }
-}
-
-pub trait FlashPermissions {
- fn check_net<T: AsRef<str>>(
- &mut self,
- _host: &(T, Option<u16>),
- _api_name: &str,
- ) -> Result<(), AnyError>;
-}
-
-deno_core::extension!(deno_flash,
- deps = [
- deno_web,
- deno_net,
- deno_fetch,
- deno_websocket,
- deno_http
- ],
- parameters = [P: FlashPermissions],
- ops = [
- op_flash_serve<P>,
- op_node_unstable_flash_serve<P>,
- op_flash_respond,
- op_flash_respond_async,
- op_flash_respond_chunked,
- op_flash_method,
- op_flash_path,
- op_flash_headers,
- op_flash_addr,
- op_flash_next,
- op_flash_next_server,
- op_flash_next_async,
- op_flash_read_body,
- op_flash_upgrade_websocket,
- op_flash_drive_server,
- op_flash_wait_for_listening,
- op_flash_first_packet,
- op_flash_has_body_stream,
- op_flash_close_server,
- op_flash_make_request,
- op_flash_write_resource,
- op_try_flash_respond_chunked,
- ],
- esm = [ "01_http.js" ],
- options = {
- unstable: bool,
- },
- state = |state, options| {
- state.put(Unstable(options.unstable));
- state.put(FlashContext {
- next_server_id: 0,
- join_handles: HashMap::default(),
- servers: HashMap::default(),
- });
- },
-);
diff --git a/ext/flash/request.rs b/ext/flash/request.rs
deleted file mode 100644
index 32ab46ca2..000000000
--- a/ext/flash/request.rs
+++ /dev/null
@@ -1,49 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-use crate::Stream;
-use std::pin::Pin;
-
-#[derive(Debug)]
-pub struct InnerRequest {
- /// Backing buffer for the request.
- pub buffer: Pin<Box<[u8]>>,
- /// Owned headers, we have to keep it around since its referenced in `req`.
- pub _headers: Vec<httparse::Header<'static>>,
- /// Fully parsed request.
- pub req: httparse::Request<'static, 'static>,
- pub body_offset: usize,
- pub body_len: usize,
-}
-
-#[derive(Debug)]
-pub struct Request {
- pub inner: InnerRequest,
- // Pointer to stream owned by the server loop thread.
- //
- // Dereferencing is safe until server thread finishes and
- // op_flash_serve resolves or websocket upgrade is performed.
- pub socket: *mut Stream,
- pub keep_alive: bool,
- pub content_read: usize,
- pub content_length: Option<u64>,
- pub remaining_chunk_size: Option<usize>,
- pub te_chunked: bool,
- pub expect_continue: bool,
-}
-
-// SAFETY: Sent from server thread to JS thread.
-// See comment above for `socket`.
-unsafe impl Send for Request {}
-
-impl Request {
- #[inline(always)]
- pub fn socket<'a>(&self) -> &'a mut Stream {
- // SAFETY: Dereferencing is safe until server thread detaches socket or finishes.
- unsafe { &mut *self.socket }
- }
-
- #[inline(always)]
- pub fn method(&self) -> &str {
- self.inner.req.method.unwrap()
- }
-}
diff --git a/ext/flash/sendfile.rs b/ext/flash/sendfile.rs
deleted file mode 100644
index 18dc3a39d..000000000
--- a/ext/flash/sendfile.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-// Forked from https://github.com/Thomasdezeeuw/sendfile/blob/024f82cd4dede9048392a5bd6d8afcd4d5aa83d5/src/lib.rs
-
-use std::future::Future;
-use std::io;
-use std::os::unix::io::RawFd;
-use std::pin::Pin;
-use std::task::Poll;
-use std::task::{self};
-
-pub struct SendFile {
- pub io: (RawFd, RawFd),
- pub written: usize,
-}
-
-impl SendFile {
- #[inline]
- pub fn try_send(&mut self) -> Result<usize, std::io::Error> {
- #[cfg(target_os = "linux")]
- {
- // This is the maximum the Linux kernel will write in a single call.
- let count = 0x7ffff000;
- let mut offset = self.written as libc::off_t;
-
- let res =
- // SAFETY: call to libc::sendfile()
- unsafe { libc::sendfile(self.io.1, self.io.0, &mut offset, count) };
- if res == -1 {
- Err(io::Error::last_os_error())
- } else {
- self.written = offset as usize;
- Ok(res as usize)
- }
- }
- #[cfg(target_os = "macos")]
- {
- // Send all bytes.
- let mut length = 0;
- // On macOS `length` is value-result parameter. It determines the number
- // of bytes to write and returns the number of bytes written also in
- // case of `EAGAIN` errors.
- // SAFETY: call to libc::sendfile()
- let res = unsafe {
- libc::sendfile(
- self.io.0,
- self.io.1,
- self.written as libc::off_t,
- &mut length,
- std::ptr::null_mut(),
- 0,
- )
- };
- self.written += length as usize;
- if res == -1 {
- Err(io::Error::last_os_error())
- } else {
- Ok(length as usize)
- }
- }
- }
-}
-
-impl Future for SendFile {
- type Output = Result<(), std::io::Error>;
-
- fn poll(
- mut self: Pin<&mut Self>,
- _: &mut task::Context<'_>,
- ) -> Poll<Self::Output> {
- loop {
- match self.try_send() {
- Ok(0) => break Poll::Ready(Ok(())),
- Ok(_) => continue,
- Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
- break Poll::Pending
- }
- Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue, // Try again.
- Err(err) => break Poll::Ready(Err(err)),
- }
- }
- }
-}
diff --git a/ext/flash/socket.rs b/ext/flash/socket.rs
deleted file mode 100644
index cf9501634..000000000
--- a/ext/flash/socket.rs
+++ /dev/null
@@ -1,151 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-use std::cell::UnsafeCell;
-use std::future::Future;
-use std::io::Read;
-use std::io::Write;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::sync::Mutex;
-
-use deno_core::error::AnyError;
-use mio::net::TcpStream;
-use tokio::sync::mpsc;
-
-use crate::ParseStatus;
-
-type TlsTcpStream = rustls::StreamOwned<rustls::ServerConnection, TcpStream>;
-
-pub enum InnerStream {
- Tcp(TcpStream),
- Tls(Box<TlsTcpStream>),
-}
-
-pub struct Stream {
- pub inner: InnerStream,
- pub detached: bool,
- pub read_rx: Option<mpsc::Receiver<()>>,
- pub read_tx: Option<mpsc::Sender<()>>,
- pub parse_done: ParseStatus,
- pub buffer: UnsafeCell<Vec<u8>>,
- pub read_lock: Arc<Mutex<()>>,
- pub addr: std::net::SocketAddr,
-}
-
-impl Stream {
- pub fn detach_ownership(&mut self) {
- self.detached = true;
- }
-
- /// Try to write to the socket.
- #[inline]
- pub fn try_write(&mut self, buf: &[u8]) -> usize {
- let mut nwritten = 0;
- while nwritten < buf.len() {
- match self.write(&buf[nwritten..]) {
- Ok(n) => nwritten += n,
- Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
- break;
- }
- Err(e) => {
- log::trace!("Error writing to socket: {}", e);
- break;
- }
- }
- }
- nwritten
- }
-
- #[inline]
- pub fn shutdown(&mut self) {
- match &mut self.inner {
- InnerStream::Tcp(stream) => {
- // Typically shutdown shouldn't fail.
- let _ = stream.shutdown(std::net::Shutdown::Both);
- }
- InnerStream::Tls(stream) => {
- let _ = stream.sock.shutdown(std::net::Shutdown::Both);
- }
- }
- }
-
- pub fn as_std(&mut self) -> std::net::TcpStream {
- #[cfg(unix)]
- let std_stream = {
- use std::os::unix::prelude::AsRawFd;
- use std::os::unix::prelude::FromRawFd;
- let fd = match self.inner {
- InnerStream::Tcp(ref tcp) => tcp.as_raw_fd(),
- _ => todo!(),
- };
- // SAFETY: `fd` is a valid file descriptor.
- unsafe { std::net::TcpStream::from_raw_fd(fd) }
- };
- #[cfg(windows)]
- let std_stream = {
- use std::os::windows::prelude::AsRawSocket;
- use std::os::windows::prelude::FromRawSocket;
- let fd = match self.inner {
- InnerStream::Tcp(ref tcp) => tcp.as_raw_socket(),
- _ => todo!(),
- };
- // SAFETY: `fd` is a valid file descriptor.
- unsafe { std::net::TcpStream::from_raw_socket(fd) }
- };
- std_stream
- }
-
- #[inline]
- pub async fn with_async_stream<F, T>(&mut self, f: F) -> Result<T, AnyError>
- where
- F: FnOnce(
- &mut tokio::net::TcpStream,
- ) -> Pin<Box<dyn '_ + Future<Output = Result<T, AnyError>>>>,
- {
- let mut async_stream = tokio::net::TcpStream::from_std(self.as_std())?;
- let result = f(&mut async_stream).await?;
- forget_stream(async_stream.into_std()?);
- Ok(result)
- }
-}
-
-#[inline]
-pub fn forget_stream(stream: std::net::TcpStream) {
- #[cfg(unix)]
- {
- use std::os::unix::prelude::IntoRawFd;
- let _ = stream.into_raw_fd();
- }
- #[cfg(windows)]
- {
- use std::os::windows::prelude::IntoRawSocket;
- let _ = stream.into_raw_socket();
- }
-}
-
-impl Write for Stream {
- #[inline]
- fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
- match self.inner {
- InnerStream::Tcp(ref mut stream) => stream.write(buf),
- InnerStream::Tls(ref mut stream) => stream.write(buf),
- }
- }
- #[inline]
- fn flush(&mut self) -> std::io::Result<()> {
- match self.inner {
- InnerStream::Tcp(ref mut stream) => stream.flush(),
- InnerStream::Tls(ref mut stream) => stream.flush(),
- }
- }
-}
-
-impl Read for Stream {
- #[inline]
- fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
- match self.inner {
- InnerStream::Tcp(ref mut stream) => stream.read(buf),
- InnerStream::Tls(ref mut stream) => stream.read(buf),
- }
- }
-}
diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml
index 13bbfa34b..01675c120 100644
--- a/runtime/Cargo.toml
+++ b/runtime/Cargo.toml
@@ -41,7 +41,6 @@ deno_core.workspace = true
deno_crypto.workspace = true
deno_fetch.workspace = true
deno_ffi.workspace = true
-deno_flash.workspace = true
deno_fs.workspace = true
deno_http.workspace = true
deno_io.workspace = true
@@ -68,7 +67,6 @@ deno_core.workspace = true
deno_crypto.workspace = true
deno_fetch.workspace = true
deno_ffi.workspace = true
-deno_flash.workspace = true
deno_fs.workspace = true
deno_http.workspace = true
deno_io.workspace = true
diff --git a/runtime/build.rs b/runtime/build.rs
index abdd0e584..809e32a76 100644
--- a/runtime/build.rs
+++ b/runtime/build.rs
@@ -120,16 +120,6 @@ mod startup_snapshot {
}
}
- impl deno_flash::FlashPermissions for Permissions {
- fn check_net<T: AsRef<str>>(
- &mut self,
- _host: &(T, Option<u16>),
- _api_name: &str,
- ) -> Result<(), deno_core::error::AnyError> {
- unreachable!("snapshotting!")
- }
- }
-
impl deno_node::NodePermissions for Permissions {
fn check_read(
&mut self,
@@ -244,7 +234,6 @@ mod startup_snapshot {
deno_net,
deno_napi,
deno_http,
- deno_flash,
deno_io,
deno_fs
],
@@ -322,7 +311,6 @@ mod startup_snapshot {
deno_http::deno_http::init_ops_and_esm(),
deno_io::deno_io::init_ops_and_esm(Default::default()),
deno_fs::deno_fs::init_ops_and_esm::<Permissions>(false),
- deno_flash::deno_flash::init_ops_and_esm::<Permissions>(false), // No --unstable
runtime::init_ops_and_esm(),
// FIXME(bartlomieju): these extensions are specified last, because they
// depend on `runtime`, even though it should be other way around
diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js
index 1eb479114..bb6ba3b08 100644
--- a/runtime/js/90_deno_ns.js
+++ b/runtime/js/90_deno_ns.js
@@ -9,7 +9,6 @@ import * as ffi from "ext:deno_ffi/00_ffi.js";
import * as net from "ext:deno_net/01_net.js";
import * as tls from "ext:deno_net/02_tls.js";
import * as http from "ext:deno_http/01_http.js";
-import * as flash from "ext:deno_flash/01_http.js";
import * as errors from "ext:runtime/01_errors.js";
import * as version from "ext:runtime/01_version.ts";
import * as permissions from "ext:runtime/10_permissions.js";
@@ -172,7 +171,6 @@ const denoNsUnstable = {
funlock: fs.funlock,
funlockSync: fs.funlockSync,
upgradeHttp: http.upgradeHttp,
- upgradeHttpRaw: flash.upgradeHttpRaw,
serve: http.serve,
openKv: kv.openKv,
Kv: kv.Kv,
diff --git a/runtime/lib.rs b/runtime/lib.rs
index 994e043fd..6745c4a56 100644
--- a/runtime/lib.rs
+++ b/runtime/lib.rs
@@ -7,7 +7,6 @@ pub use deno_core;
pub use deno_crypto;
pub use deno_fetch;
pub use deno_ffi;
-pub use deno_flash;
pub use deno_fs;
pub use deno_http;
pub use deno_io;
diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs
index 3a316d800..767fc3ae0 100644
--- a/runtime/ops/http.rs
+++ b/runtime/ops/http.rs
@@ -29,7 +29,7 @@ use tokio::net::UnixStream;
deno_core::extension!(
deno_http_runtime,
- ops = [op_http_start, op_http_upgrade, op_flash_upgrade_http],
+ ops = [op_http_start, op_http_upgrade],
customizer = |ext: &mut deno_core::ExtensionBuilder| {
ext.force_op_registration();
},
@@ -91,23 +91,6 @@ fn op_http_start(
Err(bad_resource_id())
}
-#[op]
-fn op_flash_upgrade_http(
- state: &mut OpState,
- token: u32,
- server_id: u32,
-) -> Result<deno_core::ResourceId, AnyError> {
- let flash_ctx = state.borrow_mut::<deno_flash::FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
-
- let tcp_stream = deno_flash::detach_socket(ctx, token)?;
- Ok(
- state
- .resource_table
- .add(TcpStreamResource::new(tcp_stream.into_split())),
- )
-}
-
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct HttpUpgradeResult {
diff --git a/runtime/permissions/mod.rs b/runtime/permissions/mod.rs
index 2093b08f9..7e1772ee3 100644
--- a/runtime/permissions/mod.rs
+++ b/runtime/permissions/mod.rs
@@ -1826,17 +1826,6 @@ impl PermissionsContainer {
}
}
-impl deno_flash::FlashPermissions for PermissionsContainer {
- #[inline(always)]
- fn check_net<T: AsRef<str>>(
- &mut self,
- host: &(T, Option<u16>),
- api_name: &str,
- ) -> Result<(), AnyError> {
- self.0.lock().net.check(host, Some(api_name))
- }
-}
-
impl deno_node::NodePermissions for PermissionsContainer {
#[inline(always)]
fn check_read(&mut self, path: &Path) -> Result<(), AnyError> {
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index 399b22912..8bd5cf21e 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -440,7 +440,6 @@ impl WebWorker {
deno_http::deno_http::init_ops(),
deno_io::deno_io::init_ops(Some(options.stdio)),
deno_fs::deno_fs::init_ops::<PermissionsContainer>(unstable),
- deno_flash::deno_flash::init_ops::<PermissionsContainer>(unstable),
deno_node::deno_node::init_ops::<crate::RuntimeNodeEnv>(
options.npm_resolver,
),
diff --git a/runtime/worker.rs b/runtime/worker.rs
index 296e9c4b1..48bf7b09f 100644
--- a/runtime/worker.rs
+++ b/runtime/worker.rs
@@ -264,7 +264,6 @@ impl MainWorker {
deno_http::deno_http::init_ops(),
deno_io::deno_io::init_ops(Some(options.stdio)),
deno_fs::deno_fs::init_ops::<PermissionsContainer>(unstable),
- deno_flash::deno_flash::init_ops::<PermissionsContainer>(unstable),
deno_node::deno_node::init_ops::<crate::RuntimeNodeEnv>(
options.npm_resolver,
),