summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock64
-rw-r--r--Cargo.toml3
-rw-r--r--cli/bench/http/deno_http_serve_https.js18
-rw-r--r--cli/tests/unit/serve_test.ts359
-rw-r--r--cli/tests/unit/websocket_test.ts16
-rw-r--r--core/io.rs33
-rw-r--r--ext/http/00_serve.js534
-rw-r--r--ext/http/01_http.js257
-rw-r--r--ext/http/Cargo.toml8
-rw-r--r--ext/http/http_next.rs765
-rw-r--r--ext/http/lib.rs84
-rw-r--r--ext/http/request_body.rs84
-rw-r--r--ext/http/request_properties.rs249
-rw-r--r--ext/http/response_body.rs253
-rw-r--r--ext/net/Cargo.toml1
-rw-r--r--ext/net/lib.rs1
-rw-r--r--ext/net/ops_tls.rs29
-rw-r--r--ext/net/ops_unix.rs4
-rw-r--r--ext/net/raw.rs304
-rw-r--r--ext/websocket/Cargo.toml4
-rw-r--r--ext/websocket/lib.rs75
-rw-r--r--ext/websocket/stream.rs115
22 files changed, 2918 insertions, 342 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 114a6e0e8..ac188de53 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -728,7 +728,7 @@ dependencies = [
"fwdansi",
"glibc_version",
"http",
- "hyper",
+ "hyper 0.14.26",
"import_map 0.15.0",
"indexmap",
"jsonc-parser",
@@ -1022,11 +1022,14 @@ dependencies = [
"bytes",
"cache_control",
"deno_core",
+ "deno_net",
"deno_websocket",
"flate2",
"fly-accept-encoding",
+ "http",
"httparse",
- "hyper",
+ "hyper 0.14.26",
+ "hyper 1.0.0-rc.3",
"memmem",
"mime",
"once_cell",
@@ -1035,6 +1038,8 @@ dependencies = [
"pin-project",
"ring",
"serde",
+ "slab",
+ "thiserror",
"tokio",
"tokio-util",
]
@@ -1119,6 +1124,7 @@ dependencies = [
"deno_core",
"deno_tls",
"log",
+ "pin-project",
"serde",
"socket2",
"tokio",
@@ -1242,7 +1248,7 @@ dependencies = [
"fs3",
"fwdansi",
"http",
- "hyper",
+ "hyper 0.14.26",
"libc",
"log",
"netif",
@@ -1345,11 +1351,13 @@ dependencies = [
name = "deno_websocket"
version = "0.104.0"
dependencies = [
+ "bytes",
"deno_core",
+ "deno_net",
"deno_tls",
"fastwebsockets",
"http",
- "hyper",
+ "hyper 0.14.26",
"serde",
"tokio",
"tokio-rustls",
@@ -1794,13 +1802,13 @@ dependencies = [
[[package]]
name = "fastwebsockets"
-version = "0.2.5"
+version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a9e973e2bd2dbd77cc9e929ede2ce65984a35ac5481976afbfbd509cb40dc965"
+checksum = "2fbc4aeb6c0ab927a93b5e5fc70d4c7f834260fc414021ac40c58d046ea0e394"
dependencies = [
"base64 0.21.0",
"cc",
- "hyper",
+ "hyper 0.14.26",
"pin-project",
"rand",
"sha1",
@@ -2238,6 +2246,16 @@ dependencies = [
]
[[package]]
+name = "http-body"
+version = "1.0.0-rc.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "951dfc2e32ac02d67c90c0d65bd27009a635dc9b381a2cc7d284ab01e3a0150d"
+dependencies = [
+ "bytes",
+ "http",
+]
+
+[[package]]
name = "httparse"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2267,7 +2285,7 @@ dependencies = [
"futures-util",
"h2",
"http",
- "http-body",
+ "http-body 0.4.5",
"httparse",
"httpdate",
"itoa",
@@ -2280,13 +2298,35 @@ dependencies = [
]
[[package]]
+name = "hyper"
+version = "1.0.0-rc.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b75264b2003a3913f118d35c586e535293b3e22e41f074930762929d071e092"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-core",
+ "futures-util",
+ "h2",
+ "http",
+ "http-body 1.0.0-rc.2",
+ "httparse",
+ "httpdate",
+ "itoa",
+ "pin-project-lite",
+ "tokio",
+ "tracing",
+ "want",
+]
+
+[[package]]
name = "hyper-rustls"
version = "0.23.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c"
dependencies = [
"http",
- "hyper",
+ "hyper 0.14.26",
"rustls",
"tokio",
"tokio-rustls",
@@ -3614,8 +3654,8 @@ dependencies = [
"futures-util",
"h2",
"http",
- "http-body",
- "hyper",
+ "http-body 0.4.5",
+ "hyper 0.14.26",
"hyper-rustls",
"ipnet",
"js-sys",
@@ -4870,7 +4910,7 @@ dependencies = [
"fastwebsockets",
"flate2",
"futures",
- "hyper",
+ "hyper 0.14.26",
"lazy-regex",
"lsp-types",
"nix",
diff --git a/Cargo.toml b/Cargo.toml
index aa12e1629..9edd7f835 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -91,7 +91,7 @@ data-url = "=0.2.0"
dlopen = "0.1.8"
encoding_rs = "=0.8.31"
ecb = "=0.1.1"
-fastwebsockets = "=0.2.5"
+fastwebsockets = "=0.2.6"
flate2 = "=1.0.24"
fs3 = "0.5.0"
futures = "0.3.21"
@@ -126,6 +126,7 @@ serde_json = "1.0.85"
serde_repr = "=0.1.9"
sha2 = { version = "0.10.6", features = ["oid"] }
signature = "=1.6.4"
+slab = "0.4"
smallvec = "1.8"
socket2 = "0.4.7"
tar = "=0.4.38"
diff --git a/cli/bench/http/deno_http_serve_https.js b/cli/bench/http/deno_http_serve_https.js
new file mode 100644
index 000000000..cea659e09
--- /dev/null
+++ b/cli/bench/http/deno_http_serve_https.js
@@ -0,0 +1,18 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+const addr = Deno.args[0] ?? "127.0.0.1:4500";
+const [hostname, port] = addr.split(":");
+const { serve } = Deno;
+
+function readFileSync(file) {
+ return Deno.readTextFileSync(new URL(file, import.meta.url).pathname);
+}
+
+const CERT = readFileSync("../../tests/testdata/tls/localhost.crt");
+const KEY = readFileSync("../../tests/testdata/tls/localhost.key");
+
+function handler() {
+ return new Response("Hello World");
+}
+
+serve(handler, { hostname, port, reusePort: true, cert: CERT, key: KEY });
diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts
index 32d436d04..8344f1be5 100644
--- a/cli/tests/unit/serve_test.ts
+++ b/cli/tests/unit/serve_test.ts
@@ -2,6 +2,7 @@
// deno-lint-ignore-file
+import { assertMatch } from "https://deno.land/std@v0.42.0/testing/asserts.ts";
import { Buffer, BufReader, BufWriter } from "../../../test_util/std/io/mod.ts";
import { TextProtoReader } from "../testdata/run/textproto.ts";
import {
@@ -31,6 +32,27 @@ function onListen<T>(
};
}
+Deno.test(async function httpServerShutsDownPortBeforeResolving() {
+ const ac = new AbortController();
+ const listeningPromise = deferred();
+
+ const server = Deno.serve({
+ handler: (_req) => new Response("ok"),
+ port: 4501,
+ signal: ac.signal,
+ onListen: onListen(listeningPromise),
+ });
+
+ await listeningPromise;
+ assertThrows(() => Deno.listen({ port: 4501 }));
+
+ ac.abort();
+ await server;
+
+ const listener = Deno.listen({ port: 4501 });
+ listener!.close();
+});
+
Deno.test(async function httpServerCanResolveHostnames() {
const ac = new AbortController();
const listeningPromise = deferred();
@@ -120,6 +142,71 @@ Deno.test({ permissions: { net: true } }, async function httpServerBasic() {
await server;
});
+Deno.test({ permissions: { net: true } }, async function httpServerOnError() {
+ const ac = new AbortController();
+ const promise = deferred();
+ const listeningPromise = deferred();
+ let requestStash: Request | null;
+
+ const server = Deno.serve({
+ handler: async (request: Request) => {
+ requestStash = request;
+ await new Promise((r) => setTimeout(r, 100));
+ throw "fail";
+ },
+ port: 4501,
+ signal: ac.signal,
+ onListen: onListen(listeningPromise),
+ onError: () => {
+ return new Response("failed: " + requestStash!.url, { status: 500 });
+ },
+ });
+
+ await listeningPromise;
+ const resp = await fetch("http://127.0.0.1:4501/", {
+ headers: { "connection": "close" },
+ });
+ const text = await resp.text();
+ ac.abort();
+ await server;
+
+ assertEquals(text, "failed: http://127.0.0.1:4501/");
+});
+
+Deno.test(
+ { permissions: { net: true } },
+ async function httpServerOnErrorFails() {
+ const ac = new AbortController();
+ const promise = deferred();
+ const listeningPromise = deferred();
+ let requestStash: Request | null;
+
+ const server = Deno.serve({
+ handler: async (request: Request) => {
+ requestStash = request;
+ await new Promise((r) => setTimeout(r, 100));
+ throw "fail";
+ },
+ port: 4501,
+ signal: ac.signal,
+ onListen: onListen(listeningPromise),
+ onError: () => {
+ throw "again";
+ },
+ });
+
+ await listeningPromise;
+ const resp = await fetch("http://127.0.0.1:4501/", {
+ headers: { "connection": "close" },
+ });
+ const text = await resp.text();
+ ac.abort();
+ await server;
+
+ assertEquals(text, "Internal Server Error");
+ },
+);
+
Deno.test({ permissions: { net: true } }, async function httpServerOverload1() {
const ac = new AbortController();
const promise = deferred();
@@ -238,7 +325,7 @@ Deno.test(
console.log = (msg) => {
try {
const match = msg.match(/Listening on http:\/\/localhost:(\d+)\//);
- assert(!!match);
+ assert(!!match, `Didn't match ${msg}`);
const port = +match[1];
assert(port > 0 && port < 65536);
} finally {
@@ -301,6 +388,109 @@ Deno.test(
},
);
+function createUrlTest(
+ name: string,
+ methodAndPath: string,
+ host: string | null,
+ expected: string,
+) {
+ Deno.test(`httpServerUrl${name}`, async () => {
+ const listeningPromise: Deferred<number> = deferred();
+ const urlPromise = deferred();
+ const ac = new AbortController();
+ const server = Deno.serve({
+ handler: async (request: Request) => {
+ urlPromise.resolve(request.url);
+ return new Response("");
+ },
+ port: 0,
+ signal: ac.signal,
+ onListen: ({ port }: { port: number }) => {
+ listeningPromise.resolve(port);
+ },
+ onError: createOnErrorCb(ac),
+ });
+
+ const port = await listeningPromise;
+ const conn = await Deno.connect({ port });
+
+ const encoder = new TextEncoder();
+ const body = `${methodAndPath} HTTP/1.1\r\n${
+ host ? ("Host: " + host + "\r\n") : ""
+ }Content-Length: 5\r\n\r\n12345`;
+ const writeResult = await conn.write(encoder.encode(body));
+ assertEquals(body.length, writeResult);
+
+ try {
+ const expectedResult = expected.replace("HOST", "localhost").replace(
+ "PORT",
+ `${port}`,
+ );
+ assertEquals(await urlPromise, expectedResult);
+ } finally {
+ ac.abort();
+ await server;
+ conn.close();
+ }
+ });
+}
+
+createUrlTest("WithPath", "GET /path", null, "http://HOST:PORT/path");
+createUrlTest(
+ "WithPathAndHost",
+ "GET /path",
+ "deno.land",
+ "http://deno.land/path",
+);
+createUrlTest(
+ "WithAbsolutePath",
+ "GET http://localhost/path",
+ null,
+ "http://localhost/path",
+);
+createUrlTest(
+ "WithAbsolutePathAndHost",
+ "GET http://localhost/path",
+ "deno.land",
+ "http://localhost/path",
+);
+createUrlTest(
+ "WithPortAbsolutePath",
+ "GET http://localhost:1234/path",
+ null,
+ "http://localhost:1234/path",
+);
+createUrlTest(
+ "WithPortAbsolutePathAndHost",
+ "GET http://localhost:1234/path",
+ "deno.land",
+ "http://localhost:1234/path",
+);
+createUrlTest(
+ "WithPortAbsolutePathAndHostWithPort",
+ "GET http://localhost:1234/path",
+ "deno.land:9999",
+ "http://localhost:1234/path",
+);
+
+createUrlTest("WithAsterisk", "OPTIONS *", null, "*");
+createUrlTest(
+ "WithAuthorityForm",
+ "CONNECT deno.land:80",
+ null,
+ "deno.land:80",
+);
+
+// TODO(mmastrac): These should probably be 400 errors
+createUrlTest("WithInvalidAsterisk", "GET *", null, "*");
+createUrlTest("WithInvalidNakedPath", "GET path", null, "path");
+createUrlTest(
+ "WithInvalidNakedAuthority",
+ "GET deno.land:1234",
+ null,
+ "deno.land:1234",
+);
+
Deno.test(
{ permissions: { net: true } },
async function httpServerGetRequestBody() {
@@ -536,7 +726,10 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() {
response,
socket,
} = Deno.upgradeWebSocket(request);
- socket.onerror = () => fail();
+ socket.onerror = (e) => {
+ console.error(e);
+ fail();
+ };
socket.onmessage = (m) => {
socket.send(m.data);
socket.close(1001);
@@ -553,7 +746,10 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() {
const def = deferred();
const ws = new WebSocket("ws://localhost:4501");
ws.onmessage = (m) => assertEquals(m.data, "foo");
- ws.onerror = () => fail();
+ ws.onerror = (e) => {
+ console.error(e);
+ fail();
+ };
ws.onclose = () => def.resolve();
ws.onopen = () => ws.send("foo");
@@ -564,6 +760,50 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() {
Deno.test(
{ permissions: { net: true } },
+ async function httpServerWebSocketCanAccessRequest() {
+ const ac = new AbortController();
+ const listeningPromise = deferred();
+ const server = Deno.serve({
+ handler: async (request) => {
+ const {
+ response,
+ socket,
+ } = Deno.upgradeWebSocket(request);
+ socket.onerror = (e) => {
+ console.error(e);
+ fail();
+ };
+ socket.onmessage = (m) => {
+ socket.send(request.url.toString());
+ socket.close(1001);
+ };
+ return response;
+ },
+ port: 4501,
+ signal: ac.signal,
+ onListen: onListen(listeningPromise),
+ onError: createOnErrorCb(ac),
+ });
+
+ await listeningPromise;
+ const def = deferred();
+ const ws = new WebSocket("ws://localhost:4501");
+ ws.onmessage = (m) => assertEquals(m.data, "http://localhost:4501/");
+ ws.onerror = (e) => {
+ console.error(e);
+ fail();
+ };
+ ws.onclose = () => def.resolve();
+ ws.onopen = () => ws.send("foo");
+
+ await def;
+ ac.abort();
+ await server;
+ },
+);
+
+Deno.test(
+ { permissions: { net: true } },
async function httpVeryLargeRequest() {
const promise = deferred();
const listeningPromise = deferred();
@@ -682,47 +922,46 @@ Deno.test(
},
);
-// FIXME: auto request body reading is intefering with passing it as response.
-// Deno.test(
-// { permissions: { net: true } },
-// async function httpServerStreamDuplex() {
-// const promise = deferred();
-// const ac = new AbortController();
-
-// const server = Deno.serve(request => {
-// assert(request.body);
-
-// promise.resolve();
-// return new Response(request.body);
-// }, { port: 2333, signal: ac.signal });
-
-// const ts = new TransformStream();
-// const writable = ts.writable.getWriter();
-
-// const resp = await fetch("http://127.0.0.1:2333/", {
-// method: "POST",
-// body: ts.readable,
-// });
-
-// await promise;
-// assert(resp.body);
-// const reader = resp.body.getReader();
-// await writable.write(new Uint8Array([1]));
-// const chunk1 = await reader.read();
-// assert(!chunk1.done);
-// assertEquals(chunk1.value, new Uint8Array([1]));
-// await writable.write(new Uint8Array([2]));
-// const chunk2 = await reader.read();
-// assert(!chunk2.done);
-// assertEquals(chunk2.value, new Uint8Array([2]));
-// await writable.close();
-// const chunk3 = await reader.read();
-// assert(chunk3.done);
-
-// ac.abort();
-// await server;
-// },
-// );
+Deno.test(
+ { permissions: { net: true } },
+ async function httpServerStreamDuplex() {
+ const promise = deferred();
+ const ac = new AbortController();
+
+ const server = Deno.serve((request) => {
+ assert(request.body);
+
+ promise.resolve();
+ return new Response(request.body);
+ }, { port: 2333, signal: ac.signal });
+
+ const ts = new TransformStream();
+ const writable = ts.writable.getWriter();
+
+ const resp = await fetch("http://127.0.0.1:2333/", {
+ method: "POST",
+ body: ts.readable,
+ });
+
+ await promise;
+ assert(resp.body);
+ const reader = resp.body.getReader();
+ await writable.write(new Uint8Array([1]));
+ const chunk1 = await reader.read();
+ assert(!chunk1.done);
+ assertEquals(chunk1.value, new Uint8Array([1]));
+ await writable.write(new Uint8Array([2]));
+ const chunk2 = await reader.read();
+ assert(!chunk2.done);
+ assertEquals(chunk2.value, new Uint8Array([2]));
+ await writable.close();
+ const chunk3 = await reader.read();
+ assert(chunk3.done);
+
+ ac.abort();
+ await server;
+ },
+);
Deno.test(
{ permissions: { net: true } },
@@ -867,10 +1106,10 @@ Deno.test(
let responseText = new TextDecoder("iso-8859-1").decode(buf);
clientConn.close();
- assert(/\r\n[Xx]-[Hh]eader-[Tt]est: Æ\r\n/.test(responseText));
-
ac.abort();
await server;
+
+ assertMatch(responseText, /\r\n[Xx]-[Hh]eader-[Tt]est: Æ\r\n/);
},
);
@@ -1355,12 +1594,11 @@ createServerLengthTest("autoResponseWithKnownLengthEmpty", {
expects_con_len: true,
});
-// FIXME: https://github.com/denoland/deno/issues/15892
-// createServerLengthTest("autoResponseWithUnknownLengthEmpty", {
-// body: stream(""),
-// expects_chunked: true,
-// expects_con_len: false,
-// });
+createServerLengthTest("autoResponseWithUnknownLengthEmpty", {
+ body: stream(""),
+ expects_chunked: true,
+ expects_con_len: false,
+});
Deno.test(
{ permissions: { net: true } },
@@ -1841,6 +2079,7 @@ Deno.test(
method: "GET",
headers: { "connection": "close" },
});
+ assertEquals(resp.status, 204);
assertEquals(resp.headers.get("Content-Length"), null);
} finally {
ac.abort();
@@ -2162,11 +2401,11 @@ Deno.test(
count++;
return new Response(`hello world ${count}`);
}, {
- async onListen() {
- const res1 = await fetch("http://localhost:9000/");
+ async onListen({ port }: { port: number }) {
+ const res1 = await fetch(`http://localhost:${port}/`);
assertEquals(await res1.text(), "hello world 1");
- const res2 = await fetch("http://localhost:9000/");
+ const res2 = await fetch(`http://localhost:${port}/`);
assertEquals(await res2.text(), "hello world 2");
promise.resolve();
@@ -2199,13 +2438,13 @@ Deno.test(
return new Response("ok");
},
signal: ac.signal,
- onListen: onListen(listeningPromise),
+ onListen: ({ port }: { port: number }) => listeningPromise.resolve(port),
onError: createOnErrorCb(ac),
});
try {
- await listeningPromise;
- const resp = await fetch("http://localhost:9000/", {
+ const port = await listeningPromise;
+ const resp = await fetch(`http://localhost:${port}/`, {
headers: { connection: "close" },
method: "POST",
body: '{"sus":true}',
@@ -2238,8 +2477,8 @@ Deno.test(
},
}),
), {
- async onListen() {
- const res1 = await fetch("http://localhost:9000/");
+ async onListen({ port }) {
+ const res1 = await fetch(`http://localhost:${port}/`);
assertEquals((await res1.text()).length, 40 * 50_000);
promise.resolve();
diff --git a/cli/tests/unit/websocket_test.ts b/cli/tests/unit/websocket_test.ts
index 997d8f0df..999eede41 100644
--- a/cli/tests/unit/websocket_test.ts
+++ b/cli/tests/unit/websocket_test.ts
@@ -43,6 +43,22 @@ Deno.test(async function websocketPingPong() {
ws.close();
});
+// TODO(mmastrac): This requires us to ignore bad certs
+// Deno.test(async function websocketSecureConnect() {
+// const promise = deferred();
+// const ws = new WebSocket("wss://localhost:4243/");
+// assertEquals(ws.url, "wss://localhost:4243/");
+// ws.onerror = (error) => {
+// console.log(error);
+// fail();
+// };
+// ws.onopen = () => ws.close();
+// ws.onclose = () => {
+// promise.resolve();
+// };
+// await promise;
+// });
+
// https://github.com/denoland/deno/issues/18700
Deno.test(
{ sanitizeOps: false, sanitizeResources: false },
diff --git a/core/io.rs b/core/io.rs
index 103fe79c1..567d50bd4 100644
--- a/core/io.rs
+++ b/core/io.rs
@@ -3,6 +3,7 @@
use std::ops::Deref;
use std::ops::DerefMut;
+use bytes::Buf;
use serde_v8::ZeroCopyBuf;
/// BufView is a wrapper around an underlying contiguous chunk of bytes. It can
@@ -26,11 +27,11 @@ enum BufViewInner {
}
impl BufView {
- fn from_inner(inner: BufViewInner) -> Self {
+ const fn from_inner(inner: BufViewInner) -> Self {
Self { inner, cursor: 0 }
}
- pub fn empty() -> Self {
+ pub const fn empty() -> Self {
Self::from_inner(BufViewInner::Empty)
}
@@ -65,6 +66,20 @@ impl BufView {
}
}
+impl Buf for BufView {
+ fn remaining(&self) -> usize {
+ self.len()
+ }
+
+ fn chunk(&self) -> &[u8] {
+ self.deref()
+ }
+
+ fn advance(&mut self, cnt: usize) {
+ self.advance_cursor(cnt)
+ }
+}
+
impl Deref for BufView {
type Target = [u8];
@@ -210,6 +225,20 @@ impl BufMutView {
}
}
+impl Buf for BufMutView {
+ fn remaining(&self) -> usize {
+ self.len()
+ }
+
+ fn chunk(&self) -> &[u8] {
+ self.deref()
+ }
+
+ fn advance(&mut self, cnt: usize) {
+ self.advance_cursor(cnt)
+ }
+}
+
impl Deref for BufMutView {
type Target = [u8];
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
new file mode 100644
index 000000000..91bd36094
--- /dev/null
+++ b/ext/http/00_serve.js
@@ -0,0 +1,534 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+const core = globalThis.Deno.core;
+const primordials = globalThis.__bootstrap.primordials;
+
+const { BadResourcePrototype } = core;
+import { InnerBody } from "ext:deno_fetch/22_body.js";
+import { Event } from "ext:deno_web/02_event.js";
+import {
+ fromInnerResponse,
+ newInnerResponse,
+ toInnerResponse,
+} from "ext:deno_fetch/23_response.js";
+import { fromInnerRequest } from "ext:deno_fetch/23_request.js";
+import { AbortController } from "ext:deno_web/03_abort_signal.js";
+import {
+ _eventLoop,
+ _idleTimeoutDuration,
+ _idleTimeoutTimeout,
+ _protocol,
+ _readyState,
+ _rid,
+ _role,
+ _server,
+ _serverHandleIdleTimeout,
+ SERVER,
+ WebSocket,
+} from "ext:deno_websocket/01_websocket.js";
+import {
+ Deferred,
+ getReadableStreamResourceBacking,
+ readableStreamForRid,
+ ReadableStreamPrototype,
+} from "ext:deno_web/06_streams.js";
+const {
+ ObjectPrototypeIsPrototypeOf,
+ SafeSet,
+ SafeSetIterator,
+ SetPrototypeAdd,
+ SetPrototypeDelete,
+ Symbol,
+ TypeError,
+ Uint8ArrayPrototype,
+ Uint8Array,
+} = primordials;
+
+const _upgraded = Symbol("_upgraded");
+
+function internalServerError() {
+ // "Internal Server Error"
+ return new Response(
+ new Uint8Array([
+ 73,
+ 110,
+ 116,
+ 101,
+ 114,
+ 110,
+ 97,
+ 108,
+ 32,
+ 83,
+ 101,
+ 114,
+ 118,
+ 101,
+ 114,
+ 32,
+ 69,
+ 114,
+ 114,
+ 111,
+ 114,
+ ]),
+ { status: 500 },
+ );
+}
+
+// Used to ensure that user returns a valid response (but not a different response) from handlers that are upgraded.
+const UPGRADE_RESPONSE_SENTINEL = fromInnerResponse(
+ newInnerResponse(101),
+ "immutable",
+);
+
+class InnerRequest {
+ #slabId;
+ #context;
+ #methodAndUri;
+ #streamRid;
+ #body;
+ #upgraded;
+
+ constructor(slabId, context) {
+ this.#slabId = slabId;
+ this.#context = context;
+ this.#upgraded = false;
+ }
+
+ close() {
+ if (this.#streamRid !== undefined) {
+ core.close(this.#streamRid);
+ this.#streamRid = undefined;
+ }
+ this.#slabId = undefined;
+ }
+
+ get [_upgraded]() {
+ return this.#upgraded;
+ }
+
+ _wantsUpgrade(upgradeType, ...originalArgs) {
+ // upgradeHttp is async
+ // TODO(mmastrac)
+ if (upgradeType == "upgradeHttp") {
+ throw "upgradeHttp is unavailable in Deno.serve at this time";
+ }
+
+ // upgradeHttpRaw is async
+ // TODO(mmastrac)
+ if (upgradeType == "upgradeHttpRaw") {
+ throw "upgradeHttp is unavailable in Deno.serve at this time";
+ }
+
+ // upgradeWebSocket is sync
+ if (upgradeType == "upgradeWebSocket") {
+ const response = originalArgs[0];
+ const ws = originalArgs[1];
+
+ this.url();
+ this.headerList;
+ this.close();
+
+ const goAhead = new Deferred();
+ this.#upgraded = () => {
+ goAhead.resolve();
+ };
+
+ // Start the upgrade in the background.
+ (async () => {
+ try {
+ // Returns the connection and extra bytes, which we can pass directly to op_ws_server_create
+ const upgrade = await core.opAsync2(
+ "op_upgrade",
+ this.#slabId,
+ response.headerList,
+ );
+ const wsRid = core.ops.op_ws_server_create(upgrade[0], upgrade[1]);
+
+ // We have to wait for the go-ahead signal
+ await goAhead;
+
+ ws[_rid] = wsRid;
+ ws[_readyState] = WebSocket.OPEN;
+ ws[_role] = SERVER;
+ const event = new Event("open");
+ ws.dispatchEvent(event);
+
+ ws[_eventLoop]();
+ if (ws[_idleTimeoutDuration]) {
+ ws.addEventListener(
+ "close",
+ () => clearTimeout(ws[_idleTimeoutTimeout]),
+ );
+ }
+ ws[_serverHandleIdleTimeout]();
+ } catch (error) {
+ const event = new ErrorEvent("error", { error });
+ ws.dispatchEvent(event);
+ }
+ })();
+ return { response: UPGRADE_RESPONSE_SENTINEL, socket: ws };
+ }
+ }
+
+ url() {
+ if (this.#methodAndUri === undefined) {
+ if (this.#slabId === undefined) {
+ throw new TypeError("request closed");
+ }
+ // TODO(mmastrac): This is quite slow as we're serializing a large number of values. We may want to consider
+ // splitting this up into multiple ops.
+ this.#methodAndUri = core.ops.op_get_request_method_and_url(this.#slabId);
+ }
+
+ const path = this.#methodAndUri[2];
+
+ // * is valid for OPTIONS
+ if (path === "*") {
+ return "*";
+ }
+
+ // If the path is empty, return the authority (valid for CONNECT)
+ if (path == "") {
+ return this.#methodAndUri[1];
+ }
+
+ // CONNECT requires an authority
+ if (this.#methodAndUri[0] == "CONNECT") {
+ return this.#methodAndUri[1];
+ }
+
+ const hostname = this.#methodAndUri[1];
+ if (hostname) {
+ // Construct a URL from the scheme, the hostname, and the path
+ return this.#context.scheme + hostname + path;
+ }
+
+ // Construct a URL from the scheme, the fallback hostname, and the path
+ return this.#context.scheme + this.#context.fallbackHost + path;
+ }
+
+ get remoteAddr() {
+ if (this.#methodAndUri === undefined) {
+ if (this.#slabId === undefined) {
+ throw new TypeError("request closed");
+ }
+ this.#methodAndUri = core.ops.op_get_request_method_and_url(this.#slabId);
+ }
+ return {
+ transport: "tcp",
+ hostname: this.#methodAndUri[3],
+ port: this.#methodAndUri[4],
+ };
+ }
+
+ get method() {
+ if (this.#methodAndUri === undefined) {
+ if (this.#slabId === undefined) {
+ throw new TypeError("request closed");
+ }
+ this.#methodAndUri = core.ops.op_get_request_method_and_url(this.#slabId);
+ }
+ return this.#methodAndUri[0];
+ }
+
+ get body() {
+ if (this.#slabId === undefined) {
+ throw new TypeError("request closed");
+ }
+ if (this.#body !== undefined) {
+ return this.#body;
+ }
+ // If the method is GET or HEAD, we do not want to include a body here, even if the Rust
+ // side of the code is willing to provide it to us.
+ if (this.method == "GET" || this.method == "HEAD") {
+ this.#body = null;
+ return null;
+ }
+ this.#streamRid = core.ops.op_read_request_body(this.#slabId);
+ this.#body = new InnerBody(readableStreamForRid(this.#streamRid, false));
+ return this.#body;
+ }
+
+ get headerList() {
+ if (this.#slabId === undefined) {
+ throw new TypeError("request closed");
+ }
+ return core.ops.op_get_request_headers(this.#slabId);
+ }
+
+ get slabId() {
+ return this.#slabId;
+ }
+}
+
+class CallbackContext {
+ scheme;
+ fallbackHost;
+ serverRid;
+ closed;
+
+ initialize(args) {
+ this.serverRid = args[0];
+ this.scheme = args[1];
+ this.fallbackHost = args[2];
+ this.closed = false;
+ }
+
+ close() {
+ try {
+ this.closed = true;
+ core.tryClose(this.serverRid);
+ } catch {
+ // Pass
+ }
+ }
+}
+
+function fastSyncResponseOrStream(req, respBody) {
+ if (respBody === null || respBody === undefined) {
+ // Don't set the body
+ return null;
+ }
+
+ const stream = respBody.streamOrStatic;
+ const body = stream.body;
+
+ if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) {
+ core.ops.op_set_response_body_bytes(req, body);
+ return null;
+ }
+
+ if (typeof body === "string") {
+ core.ops.op_set_response_body_text(req, body);
+ return null;
+ }
+
+ // At this point in the response it needs to be a stream
+ if (!ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) {
+ throw TypeError("invalid response");
+ }
+ const resourceBacking = getReadableStreamResourceBacking(stream);
+ if (resourceBacking) {
+ core.ops.op_set_response_body_resource(
+ req,
+ resourceBacking.rid,
+ resourceBacking.autoClose,
+ );
+ return null;
+ }
+
+ return stream;
+}
+
+async function asyncResponse(responseBodies, req, status, stream) {
+ const responseRid = core.ops.op_set_response_body_stream(req);
+ SetPrototypeAdd(responseBodies, responseRid);
+ const reader = stream.getReader();
+ core.ops.op_set_promise_complete(req, status);
+ try {
+ while (true) {
+ const { value, done } = await reader.read();
+ if (done) {
+ break;
+ }
+ await core.writeAll(responseRid, value);
+ }
+ } catch (error) {
+ await reader.cancel(error);
+ } finally {
+ core.tryClose(responseRid);
+ SetPrototypeDelete(responseBodies, responseRid);
+ reader.releaseLock();
+ }
+}
+
+/**
+ * Maps the incoming request slab ID to a fully-fledged Request object, passes it to the user-provided
+ * callback, then extracts the response that was returned from that callback. The response is then pulled
+ * apart and handled on the Rust side.
+ *
+ * This function returns a promise that will only reject in the case of abnormal exit.
+ */
+function mapToCallback(responseBodies, context, signal, callback, onError) {
+ return async function (req) {
+ const innerRequest = new InnerRequest(req, context);
+ const request = fromInnerRequest(innerRequest, signal, "immutable");
+
+ // Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback
+ // 500 error.
+ let response;
+ try {
+ response = await callback(request, {
+ remoteAddr: innerRequest.remoteAddr,
+ });
+ } catch (error) {
+ try {
+ response = await onError(error);
+ } catch (error) {
+ console.error("Exception in onError while handling exception", error);
+ response = internalServerError();
+ }
+ }
+
+ const inner = toInnerResponse(response);
+ if (innerRequest[_upgraded]) {
+ // We're done here as the connection has been upgraded during the callback and no longer requires servicing.
+ if (response !== UPGRADE_RESPONSE_SENTINEL) {
+ console.error("Upgrade response was not returned from callback");
+ context.close();
+ }
+ innerRequest[_upgraded]();
+ return;
+ }
+
+ // Did everything shut down while we were waiting?
+ if (context.closed) {
+ innerRequest.close();
+ return;
+ }
+
+ const status = inner.status;
+ const headers = inner.headerList;
+ if (headers && headers.length > 0) {
+ if (headers.length == 1) {
+ core.ops.op_set_response_header(req, headers[0][0], headers[0][1]);
+ } else {
+ core.ops.op_set_response_headers(req, headers);
+ }
+ }
+
+ // Attempt to response quickly to this request, otherwise extract the stream
+ const stream = fastSyncResponseOrStream(req, inner.body);
+ if (stream !== null) {
+ // Handle the stream asynchronously
+ await asyncResponse(responseBodies, req, status, stream);
+ } else {
+ core.ops.op_set_promise_complete(req, status);
+ }
+
+ innerRequest.close();
+ };
+}
+
+async function serve(arg1, arg2) {
+ let options = undefined;
+ let handler = undefined;
+ if (typeof arg1 === "function") {
+ handler = arg1;
+ options = arg2;
+ } else if (typeof arg2 === "function") {
+ handler = arg2;
+ options = arg1;
+ } else {
+ options = arg1;
+ }
+ if (handler === undefined) {
+ if (options === undefined) {
+ throw new TypeError(
+ "No handler was provided, so an options bag is mandatory.",
+ );
+ }
+ handler = options.handler;
+ }
+ if (typeof handler !== "function") {
+ throw new TypeError("A handler function must be provided.");
+ }
+ if (options === undefined) {
+ options = {};
+ }
+
+ const wantsHttps = options.cert || options.key;
+ const signal = options.signal;
+ const onError = options.onError ?? function (error) {
+ console.error(error);
+ return internalServerError();
+ };
+ const listenOpts = {
+ hostname: options.hostname ?? "0.0.0.0",
+ port: options.port ?? (wantsHttps ? 9000 : 8000),
+ reusePort: options.reusePort ?? false,
+ };
+
+ const abortController = new AbortController();
+
+ const responseBodies = new SafeSet();
+ const context = new CallbackContext();
+ const callback = mapToCallback(
+ responseBodies,
+ context,
+ abortController.signal,
+ handler,
+ onError,
+ );
+
+ if (wantsHttps) {
+ if (!options.cert || !options.key) {
+ throw new TypeError(
+ "Both cert and key must be provided to enable HTTPS.",
+ );
+ }
+ listenOpts.cert = options.cert;
+ listenOpts.key = options.key;
+ listenOpts.alpnProtocols = ["h2", "http/1.1"];
+ const listener = Deno.listenTls(listenOpts);
+ listenOpts.port = listener.addr.port;
+ context.initialize(core.ops.op_serve_http(
+ listener.rid,
+ ));
+ } else {
+ const listener = Deno.listen(listenOpts);
+ listenOpts.port = listener.addr.port;
+ context.initialize(core.ops.op_serve_http(
+ listener.rid,
+ ));
+ }
+
+ signal?.addEventListener(
+ "abort",
+ () => context.close(),
+ { once: true },
+ );
+
+ const onListen = options.onListen ?? function ({ port }) {
+ // If the hostname is "0.0.0.0", we display "localhost" in console
+ // because browsers in Windows don't resolve "0.0.0.0".
+ // See the discussion in https://github.com/denoland/deno_std/issues/1165
+ const hostname = listenOpts.hostname == "0.0.0.0"
+ ? "localhost"
+ : listenOpts.hostname;
+ console.log(`Listening on ${context.scheme}${hostname}:${port}/`);
+ };
+
+ onListen({ port: listenOpts.port });
+
+ while (true) {
+ const rid = context.serverRid;
+ let req;
+ try {
+ req = await core.opAsync("op_http_wait", rid);
+ } catch (error) {
+ if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) {
+ break;
+ }
+ throw new Deno.errors.Http(error);
+ }
+ if (req === 0xffffffff) {
+ break;
+ }
+ callback(req).catch((error) => {
+ // Abnormal exit
+ console.error(
+ "Terminating Deno.serve loop due to unexpected error",
+ error,
+ );
+ context.close();
+ });
+ }
+
+ for (const streamRid of new SafeSetIterator(responseBodies)) {
+ core.tryClose(streamRid);
+ }
+}
+
+export { serve };
diff --git a/ext/http/01_http.js b/ext/http/01_http.js
index 5bfa58655..95e2cee74 100644
--- a/ext/http/01_http.js
+++ b/ext/http/01_http.js
@@ -32,8 +32,8 @@ import {
SERVER,
WebSocket,
} from "ext:deno_websocket/01_websocket.js";
-import { listen, TcpConn, UnixConn } from "ext:deno_net/01_net.js";
-import { listenTls, TlsConn } from "ext:deno_net/02_tls.js";
+import { TcpConn, UnixConn } from "ext:deno_net/01_net.js";
+import { TlsConn } from "ext:deno_net/02_tls.js";
import {
Deferred,
getReadableStreamResourceBacking,
@@ -41,18 +41,17 @@ import {
readableStreamForRid,
ReadableStreamPrototype,
} from "ext:deno_web/06_streams.js";
+import { serve } from "ext:deno_http/00_serve.js";
const {
ArrayPrototypeIncludes,
ArrayPrototypeMap,
ArrayPrototypePush,
Error,
ObjectPrototypeIsPrototypeOf,
- PromisePrototypeCatch,
SafeSet,
SafeSetIterator,
SetPrototypeAdd,
SetPrototypeDelete,
- SetPrototypeClear,
StringPrototypeCharCodeAt,
StringPrototypeIncludes,
StringPrototypeToLowerCase,
@@ -406,6 +405,7 @@ const websocketCvf = buildCaseInsensitiveCommaValueFinder("websocket");
const upgradeCvf = buildCaseInsensitiveCommaValueFinder("upgrade");
function upgradeWebSocket(request, options = {}) {
+ const inner = toInnerRequest(request);
const upgrade = request.headers.get("upgrade");
const upgradeHasWebSocketOption = upgrade !== null &&
websocketCvf(upgrade);
@@ -455,25 +455,39 @@ function upgradeWebSocket(request, options = {}) {
}
}
- const response = fromInnerResponse(r, "immutable");
-
const socket = webidl.createBranded(WebSocket);
setEventTargetData(socket);
socket[_server] = true;
- response[_ws] = socket;
socket[_idleTimeoutDuration] = options.idleTimeout ?? 120;
socket[_idleTimeoutTimeout] = null;
+ if (inner._wantsUpgrade) {
+ return inner._wantsUpgrade("upgradeWebSocket", r, socket);
+ }
+
+ const response = fromInnerResponse(r, "immutable");
+
+ response[_ws] = socket;
+
return { response, socket };
}
function upgradeHttp(req) {
+ const inner = toInnerRequest(req);
+ if (inner._wantsUpgrade) {
+ return inner._wantsUpgrade("upgradeHttp", arguments);
+ }
+
req[_deferred] = new Deferred();
return req[_deferred].promise;
}
async function upgradeHttpRaw(req, tcpConn) {
const inner = toInnerRequest(req);
+ if (inner._wantsUpgrade) {
+ return inner._wantsUpgrade("upgradeHttpRaw", arguments);
+ }
+
const res = await core.opAsync("op_http_upgrade_early", inner[streamRid]);
return new TcpConn(res, tcpConn.remoteAddr, tcpConn.localAddr);
}
@@ -552,233 +566,4 @@ function buildCaseInsensitiveCommaValueFinder(checkText) {
internals.buildCaseInsensitiveCommaValueFinder =
buildCaseInsensitiveCommaValueFinder;
-function hostnameForDisplay(hostname) {
- // If the hostname is "0.0.0.0", we display "localhost" in console
- // because browsers in Windows don't resolve "0.0.0.0".
- // See the discussion in https://github.com/denoland/deno_std/issues/1165
- return hostname === "0.0.0.0" ? "localhost" : hostname;
-}
-
-async function respond(handler, requestEvent, connInfo, onError) {
- let response;
-
- try {
- response = await handler(requestEvent.request, connInfo);
-
- if (response.bodyUsed && response.body !== null) {
- throw new TypeError("Response body already consumed.");
- }
- } catch (e) {
- // Invoke `onError` handler if the request handler throws.
- response = await onError(e);
- }
-
- try {
- // Send the response.
- await requestEvent.respondWith(response);
- } catch {
- // `respondWith()` can throw for various reasons, including downstream and
- // upstream connection errors, as well as errors thrown during streaming
- // of the response content. In order to avoid false negatives, we ignore
- // the error here and let `serveHttp` close the connection on the
- // following iteration if it is in fact a downstream connection error.
- }
-}
-
-async function serveConnection(
- server,
- activeHttpConnections,
- handler,
- httpConn,
- connInfo,
- onError,
-) {
- while (!server.closed) {
- let requestEvent = null;
-
- try {
- // Yield the new HTTP request on the connection.
- requestEvent = await httpConn.nextRequest();
- } catch {
- // Connection has been closed.
- break;
- }
-
- if (requestEvent === null) {
- break;
- }
-
- respond(handler, requestEvent, connInfo, onError);
- }
-
- SetPrototypeDelete(activeHttpConnections, httpConn);
- try {
- httpConn.close();
- } catch {
- // Connection has already been closed.
- }
-}
-
-async function serve(arg1, arg2) {
- let options = undefined;
- let handler = undefined;
- if (typeof arg1 === "function") {
- handler = arg1;
- options = arg2;
- } else if (typeof arg2 === "function") {
- handler = arg2;
- options = arg1;
- } else {
- options = arg1;
- }
- if (handler === undefined) {
- if (options === undefined) {
- throw new TypeError(
- "No handler was provided, so an options bag is mandatory.",
- );
- }
- handler = options.handler;
- }
- if (typeof handler !== "function") {
- throw new TypeError("A handler function must be provided.");
- }
- if (options === undefined) {
- options = {};
- }
-
- const signal = options.signal;
- const onError = options.onError ?? function (error) {
- console.error(error);
- return new Response("Internal Server Error", { status: 500 });
- };
- const onListen = options.onListen ?? function ({ port }) {
- console.log(
- `Listening on http://${hostnameForDisplay(listenOpts.hostname)}:${port}/`,
- );
- };
- const listenOpts = {
- hostname: options.hostname ?? "127.0.0.1",
- port: options.port ?? 9000,
- reusePort: options.reusePort ?? false,
- };
-
- if (options.cert || options.key) {
- if (!options.cert || !options.key) {
- throw new TypeError(
- "Both cert and key must be provided to enable HTTPS.",
- );
- }
- listenOpts.cert = options.cert;
- listenOpts.key = options.key;
- }
-
- let listener;
- if (listenOpts.cert && listenOpts.key) {
- listener = listenTls({
- hostname: listenOpts.hostname,
- port: listenOpts.port,
- cert: listenOpts.cert,
- key: listenOpts.key,
- reusePort: listenOpts.reusePort,
- });
- } else {
- listener = listen({
- hostname: listenOpts.hostname,
- port: listenOpts.port,
- reusePort: listenOpts.reusePort,
- });
- }
-
- const serverDeferred = new Deferred();
- const activeHttpConnections = new SafeSet();
-
- const server = {
- transport: listenOpts.cert && listenOpts.key ? "https" : "http",
- hostname: listenOpts.hostname,
- port: listenOpts.port,
- closed: false,
-
- close() {
- if (server.closed) {
- return;
- }
- server.closed = true;
- try {
- listener.close();
- } catch {
- // Might have been already closed.
- }
-
- for (const httpConn of new SafeSetIterator(activeHttpConnections)) {
- try {
- httpConn.close();
- } catch {
- // Might have been already closed.
- }
- }
-
- SetPrototypeClear(activeHttpConnections);
- serverDeferred.resolve();
- },
-
- async serve() {
- while (!server.closed) {
- let conn;
-
- try {
- conn = await listener.accept();
- } catch {
- // Listener has been closed.
- if (!server.closed) {
- console.log("Listener has closed unexpectedly");
- }
- break;
- }
-
- let httpConn;
- try {
- const rid = ops.op_http_start(conn.rid);
- httpConn = new HttpConn(rid, conn.remoteAddr, conn.localAddr);
- } catch {
- // Connection has been closed;
- continue;
- }
-
- SetPrototypeAdd(activeHttpConnections, httpConn);
-
- const connInfo = {
- localAddr: conn.localAddr,
- remoteAddr: conn.remoteAddr,
- };
- // Serve the HTTP connection
- serveConnection(
- server,
- activeHttpConnections,
- handler,
- httpConn,
- connInfo,
- onError,
- );
- }
- await serverDeferred.promise;
- },
- };
-
- signal?.addEventListener(
- "abort",
- () => {
- try {
- server.close();
- } catch {
- // Pass
- }
- },
- { once: true },
- );
-
- onListen(listener.addr);
-
- await PromisePrototypeCatch(server.serve(), console.error);
-}
-
export { _ws, HttpConn, serve, upgradeHttp, upgradeHttpRaw, upgradeWebSocket };
diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index 382fd3184..bb965d9b2 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -10,6 +10,9 @@ readme = "README.md"
repository.workspace = true
description = "HTTP server implementation for Deno"
+[features]
+"__zombie_http_tracking" = []
+
[lib]
path = "lib.rs"
@@ -24,11 +27,14 @@ brotli = "3.3.4"
bytes.workspace = true
cache_control.workspace = true
deno_core.workspace = true
+deno_net.workspace = true
deno_websocket.workspace = true
flate2.workspace = true
fly-accept-encoding = "0.2.0"
+http.workspace = true
httparse.workspace = true
hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "runtime"] }
+hyper1 = { package = "hyper", features = ["full"], version = "1.0.0-rc.3" }
memmem.workspace = true
mime = "0.3.16"
once_cell.workspace = true
@@ -37,6 +43,8 @@ phf = { version = "0.10", features = ["macros"] }
pin-project.workspace = true
ring.workspace = true
serde.workspace = true
+slab.workspace = true
+thiserror.workspace = true
tokio.workspace = true
tokio-util = { workspace = true, features = ["io"] }
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
new file mode 100644
index 000000000..25088e1ab
--- /dev/null
+++ b/ext/http/http_next.rs
@@ -0,0 +1,765 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use crate::extract_network_stream;
+use crate::request_body::HttpRequestBody;
+use crate::request_properties::DefaultHttpRequestProperties;
+use crate::request_properties::HttpConnectionProperties;
+use crate::request_properties::HttpListenProperties;
+use crate::request_properties::HttpPropertyExtractor;
+use crate::response_body::CompletionHandle;
+use crate::response_body::ResponseBytes;
+use crate::response_body::ResponseBytesInner;
+use crate::response_body::V8StreamHttpResponseBody;
+use crate::LocalExecutor;
+use deno_core::error::AnyError;
+use deno_core::futures::TryFutureExt;
+use deno_core::op;
+use deno_core::AsyncRefCell;
+use deno_core::BufView;
+use deno_core::ByteString;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use deno_core::ZeroCopyBuf;
+use deno_net::ops_tls::TlsStream;
+use deno_net::raw::put_network_stream_resource;
+use deno_net::raw::NetworkStream;
+use deno_net::raw::NetworkStreamAddress;
+use http::request::Parts;
+use hyper1::body::Incoming;
+use hyper1::header::COOKIE;
+use hyper1::http::HeaderName;
+use hyper1::http::HeaderValue;
+use hyper1::server::conn::http1;
+use hyper1::server::conn::http2;
+use hyper1::service::service_fn;
+use hyper1::upgrade::OnUpgrade;
+use hyper1::StatusCode;
+use pin_project::pin_project;
+use pin_project::pinned_drop;
+use slab::Slab;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::future::Future;
+use std::io;
+use std::net::Ipv4Addr;
+use std::net::SocketAddr;
+use std::net::SocketAddrV4;
+use std::pin::Pin;
+use std::rc::Rc;
+use tokio::task::spawn_local;
+use tokio::task::JoinHandle;
+
+type Request = hyper1::Request<Incoming>;
+type Response = hyper1::Response<ResponseBytes>;
+
+pub struct HttpSlabRecord {
+ request_info: HttpConnectionProperties,
+ request_parts: Parts,
+ request_body: Option<Incoming>,
+ // The response may get taken before we tear this down
+ response: Option<Response>,
+ body: Option<Rc<HttpRequestBody>>,
+ promise: CompletionHandle,
+ #[cfg(__zombie_http_tracking)]
+ alive: bool,
+}
+
+thread_local! {
+ pub static SLAB: RefCell<Slab<HttpSlabRecord>> = RefCell::new(Slab::with_capacity(1024));
+}
+
+/// Generates getters and setters for the [`SLAB`]. For example,
+/// `with!(with_req, with_req_mut, Parts, http, http.request_parts);` expands to:
+///
+/// ```ignore
+/// #[inline(always)]
+/// #[allow(dead_code)]
+/// pub(crate) fn with_req_mut<T>(key: usize, f: impl FnOnce(&mut Parts) -> T) -> T {
+/// SLAB.with(|slab| {
+/// let mut borrow = slab.borrow_mut();
+/// let mut http = borrow.get_mut(key).unwrap();
+/// #[cfg(__zombie_http_tracking)]
+/// if !http.alive {
+/// panic!("Attempted to access a dead HTTP object")
+/// }
+/// f(&mut http.expr)
+/// })
+/// }
+
+/// #[inline(always)]
+/// #[allow(dead_code)]
+/// pub(crate) fn with_req<T>(key: usize, f: impl FnOnce(&Parts) -> T) -> T {
+/// SLAB.with(|slab| {
+/// let mut borrow = slab.borrow();
+/// let mut http = borrow.get(key).unwrap();
+/// #[cfg(__zombie_http_tracking)]
+/// if !http.alive {
+/// panic!("Attempted to access a dead HTTP object")
+/// }
+/// f(&http.expr)
+/// })
+/// }
+/// ```
+macro_rules! with {
+ ($ref:ident, $mut:ident, $type:ty, $http:ident, $expr:expr) => {
+ #[inline(always)]
+ #[allow(dead_code)]
+ pub(crate) fn $mut<T>(key: usize, f: impl FnOnce(&mut $type) -> T) -> T {
+ SLAB.with(|slab| {
+ let mut borrow = slab.borrow_mut();
+ #[allow(unused_mut)] // TODO(mmastrac): compiler issue?
+ let mut $http = borrow.get_mut(key).unwrap();
+ #[cfg(__zombie_http_tracking)]
+ if !$http.alive {
+ panic!("Attempted to access a dead HTTP object")
+ }
+ f(&mut $expr)
+ })
+ }
+
+ #[inline(always)]
+ #[allow(dead_code)]
+ pub(crate) fn $ref<T>(key: usize, f: impl FnOnce(&$type) -> T) -> T {
+ SLAB.with(|slab| {
+ let borrow = slab.borrow();
+ let $http = borrow.get(key).unwrap();
+ #[cfg(__zombie_http_tracking)]
+ if !$http.alive {
+ panic!("Attempted to access a dead HTTP object")
+ }
+ f(&$expr)
+ })
+ }
+ };
+}
+
+with!(with_req, with_req_mut, Parts, http, http.request_parts);
+with!(
+ with_req_body,
+ with_req_body_mut,
+ Option<Incoming>,
+ http,
+ http.request_body
+);
+with!(
+ with_resp,
+ with_resp_mut,
+ Option<Response>,
+ http,
+ http.response
+);
+with!(
+ with_body,
+ with_body_mut,
+ Option<Rc<HttpRequestBody>>,
+ http,
+ http.body
+);
+with!(
+ with_promise,
+ with_promise_mut,
+ CompletionHandle,
+ http,
+ http.promise
+);
+with!(with_http, with_http_mut, HttpSlabRecord, http, http);
+
+fn slab_insert(
+ request: Request,
+ request_info: HttpConnectionProperties,
+) -> usize {
+ SLAB.with(|slab| {
+ let (request_parts, request_body) = request.into_parts();
+ slab.borrow_mut().insert(HttpSlabRecord {
+ request_info,
+ request_parts,
+ request_body: Some(request_body),
+ response: Some(Response::new(ResponseBytes::default())),
+ body: None,
+ promise: CompletionHandle::default(),
+ #[cfg(__zombie_http_tracking)]
+ alive: true,
+ })
+ })
+}
+
+#[op]
+pub fn op_upgrade_raw(_index: usize) {}
+
+#[op]
+pub async fn op_upgrade(
+ state: Rc<RefCell<OpState>>,
+ index: usize,
+ headers: Vec<(ByteString, ByteString)>,
+) -> Result<(ResourceId, ZeroCopyBuf), AnyError> {
+ // Stage 1: set the respnse to 101 Switching Protocols and send it
+ let upgrade = with_http_mut(index, |http| {
+ // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
+ let upgrade = http.request_parts.extensions.remove::<OnUpgrade>().unwrap();
+
+ let response = http.response.as_mut().unwrap();
+ *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
+ for (name, value) in headers {
+ response.headers_mut().append(
+ HeaderName::from_bytes(&name).unwrap(),
+ HeaderValue::from_bytes(&value).unwrap(),
+ );
+ }
+ http.promise.complete(true);
+ upgrade
+ });
+
+ // Stage 2: wait for the request to finish upgrading
+ let upgraded = upgrade.await?;
+
+ // Stage 3: return the extracted raw network stream
+ let (stream, bytes) = extract_network_stream(upgraded);
+
+ // We're allocating for those extra bytes, but they are probably going to be empty most of the time
+ Ok((
+ put_network_stream_resource(
+ &mut state.borrow_mut().resource_table,
+ stream,
+ )?,
+ ZeroCopyBuf::from(bytes.to_vec()),
+ ))
+}
+
+#[op]
+pub fn op_set_promise_complete(index: usize, status: u16) {
+ with_resp_mut(index, |resp| {
+ // The Javascript code will never provide a status that is invalid here (see 23_response.js)
+ *resp.as_mut().unwrap().status_mut() =
+ StatusCode::from_u16(status).unwrap();
+ });
+ with_promise_mut(index, |promise| {
+ promise.complete(true);
+ });
+}
+
+#[op]
+pub fn op_get_request_method_and_url(
+ index: usize,
+) -> (String, Option<String>, String, String, Option<u16>) {
+ // TODO(mmastrac): Passing method can be optimized
+ with_http(index, |http| {
+ let request_properties = DefaultHttpRequestProperties::request_properties(
+ &http.request_info,
+ &http.request_parts.uri,
+ &http.request_parts.headers,
+ );
+
+ // Only extract the path part - we handle authority elsewhere
+ let path = match &http.request_parts.uri.path_and_query() {
+ Some(path_and_query) => path_and_query.to_string(),
+ None => "".to_owned(),
+ };
+
+ (
+ http.request_parts.method.as_str().to_owned(),
+ request_properties.authority,
+ path,
+ String::from(http.request_info.peer_address.as_ref()),
+ http.request_info.peer_port,
+ )
+ })
+}
+
+#[op]
+pub fn op_get_request_header(index: usize, name: String) -> Option<ByteString> {
+ with_req(index, |req| {
+ let value = req.headers.get(name);
+ value.map(|value| value.as_bytes().into())
+ })
+}
+
+#[op]
+pub fn op_get_request_headers(index: usize) -> Vec<(ByteString, ByteString)> {
+ with_req(index, |req| {
+ let headers = &req.headers;
+ let mut vec = Vec::with_capacity(headers.len());
+ let mut cookies: Option<Vec<&[u8]>> = None;
+ for (name, value) in headers {
+ if name == COOKIE {
+ if let Some(ref mut cookies) = cookies {
+ cookies.push(value.as_bytes());
+ } else {
+ cookies = Some(vec![value.as_bytes()]);
+ }
+ } else {
+ let name: &[u8] = name.as_ref();
+ vec.push((name.into(), value.as_bytes().into()))
+ }
+ }
+
+ // We treat cookies specially, because we don't want them to get them
+ // mangled by the `Headers` object in JS. What we do is take all cookie
+ // headers and concat them into a single cookie header, separated by
+ // semicolons.
+ // TODO(mmastrac): This should probably happen on the JS side on-demand
+ if let Some(cookies) = cookies {
+ let cookie_sep = "; ".as_bytes();
+ vec.push((
+ ByteString::from(COOKIE.as_str()),
+ ByteString::from(cookies.join(cookie_sep)),
+ ));
+ }
+ vec
+ })
+}
+
+#[op]
+pub fn op_read_request_body(state: &mut OpState, index: usize) -> ResourceId {
+ let incoming = with_req_body_mut(index, |body| body.take().unwrap());
+ let body_resource = Rc::new(HttpRequestBody::new(incoming));
+ let res = state.resource_table.add_rc(body_resource.clone());
+ with_body_mut(index, |body| {
+ *body = Some(body_resource);
+ });
+ res
+}
+
+#[op]
+pub fn op_set_response_header(
+ index: usize,
+ name: ByteString,
+ value: ByteString,
+) {
+ with_resp_mut(index, |resp| {
+ let resp_headers = resp.as_mut().unwrap().headers_mut();
+ // These are valid latin-1 strings
+ let name = HeaderName::from_bytes(&name).unwrap();
+ let value = HeaderValue::from_bytes(&value).unwrap();
+ resp_headers.append(name, value);
+ });
+}
+
+#[op]
+pub fn op_set_response_headers(
+ index: usize,
+ headers: Vec<(ByteString, ByteString)>,
+) {
+ // TODO(mmastrac): Invalid headers should be handled?
+ with_resp_mut(index, |resp| {
+ let resp_headers = resp.as_mut().unwrap().headers_mut();
+ resp_headers.reserve(headers.len());
+ for (name, value) in headers {
+ // These are valid latin-1 strings
+ let name = HeaderName::from_bytes(&name).unwrap();
+ let value = HeaderValue::from_bytes(&value).unwrap();
+ resp_headers.append(name, value);
+ }
+ })
+}
+
+#[op]
+pub fn op_set_response_body_resource(
+ state: &mut OpState,
+ index: usize,
+ stream_rid: ResourceId,
+ auto_close: bool,
+) -> Result<(), AnyError> {
+ // If the stream is auto_close, we will hold the last ref to it until the response is complete.
+ let resource = if auto_close {
+ state.resource_table.take_any(stream_rid)?
+ } else {
+ state.resource_table.get_any(stream_rid)?
+ };
+
+ with_resp_mut(index, move |response| {
+ let future = resource.clone().read(64 * 1024);
+ response
+ .as_mut()
+ .unwrap()
+ .body_mut()
+ .initialize(ResponseBytesInner::Resource(auto_close, resource, future));
+ });
+
+ Ok(())
+}
+
+#[op]
+pub fn op_set_response_body_stream(
+ state: &mut OpState,
+ index: usize,
+) -> Result<ResourceId, AnyError> {
+ // TODO(mmastrac): what should this channel size be?
+ let (tx, rx) = tokio::sync::mpsc::channel(1);
+ let (tx, rx) = (
+ V8StreamHttpResponseBody::new(tx),
+ ResponseBytesInner::V8Stream(rx),
+ );
+
+ with_resp_mut(index, move |response| {
+ response.as_mut().unwrap().body_mut().initialize(rx);
+ });
+
+ Ok(state.resource_table.add(tx))
+}
+
+#[op]
+pub fn op_set_response_body_text(index: usize, text: String) {
+ if !text.is_empty() {
+ with_resp_mut(index, move |response| {
+ response
+ .as_mut()
+ .unwrap()
+ .body_mut()
+ .initialize(ResponseBytesInner::Bytes(BufView::from(text.into_bytes())))
+ });
+ }
+}
+
+#[op]
+pub fn op_set_response_body_bytes(index: usize, buffer: ZeroCopyBuf) {
+ if !buffer.is_empty() {
+ with_resp_mut(index, |response| {
+ response
+ .as_mut()
+ .unwrap()
+ .body_mut()
+ .initialize(ResponseBytesInner::Bytes(BufView::from(buffer)))
+ });
+ };
+}
+
+#[op]
+pub async fn op_http_track(
+ state: Rc<RefCell<OpState>>,
+ index: usize,
+ server_rid: ResourceId,
+) -> Result<(), AnyError> {
+ let handle = with_resp(index, |resp| {
+ resp.as_ref().unwrap().body().completion_handle()
+ });
+
+ let join_handle = state
+ .borrow_mut()
+ .resource_table
+ .get::<HttpJoinHandle>(server_rid)?;
+
+ match handle.or_cancel(join_handle.cancel_handle()).await {
+ Ok(true) => Ok(()),
+ Ok(false) => {
+ Err(AnyError::msg("connection closed before message completed"))
+ }
+ Err(_e) => Ok(()),
+ }
+}
+
+#[pin_project(PinnedDrop)]
+pub struct SlabFuture<F: Future<Output = ()>>(usize, #[pin] F);
+
+pub fn new_slab_future(
+ request: Request,
+ request_info: HttpConnectionProperties,
+ tx: tokio::sync::mpsc::Sender<usize>,
+) -> SlabFuture<impl Future<Output = ()>> {
+ let index = slab_insert(request, request_info);
+ let rx = with_promise(index, |promise| promise.clone());
+ SlabFuture(index, async move {
+ if tx.send(index).await.is_ok() {
+ // We only need to wait for completion if we aren't closed
+ rx.await;
+ }
+ })
+}
+
+impl<F: Future<Output = ()>> SlabFuture<F> {}
+
+#[pinned_drop]
+impl<F: Future<Output = ()>> PinnedDrop for SlabFuture<F> {
+ fn drop(self: Pin<&mut Self>) {
+ SLAB.with(|slab| {
+ #[cfg(__zombie_http_tracking)]
+ {
+ slab.borrow_mut().get_mut(self.0).unwrap().alive = false;
+ }
+ #[cfg(not(__zombie_http_tracking))]
+ {
+ slab.borrow_mut().remove(self.0);
+ }
+ });
+ }
+}
+
+impl<F: Future<Output = ()>> Future for SlabFuture<F> {
+ type Output = Result<Response, hyper::Error>;
+
+ fn poll(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ let index = self.0;
+ self
+ .project()
+ .1
+ .poll(cx)
+ .map(|_| Ok(with_resp_mut(index, |resp| resp.take().unwrap())))
+ }
+}
+
+fn serve_https(
+ mut io: TlsStream,
+ request_info: HttpConnectionProperties,
+ cancel: RcRef<CancelHandle>,
+ tx: tokio::sync::mpsc::Sender<usize>,
+) -> JoinHandle<Result<(), AnyError>> {
+ // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us
+ let svc = service_fn(move |req: Request| {
+ new_slab_future(req, request_info.clone(), tx.clone())
+ });
+ spawn_local(async {
+ io.handshake().await?;
+ let handshake = io.get_ref().1.alpn_protocol();
+ // h2
+ if handshake == Some(&[104, 50]) {
+ let conn = http2::Builder::new(LocalExecutor).serve_connection(io, svc);
+
+ conn.map_err(AnyError::from).try_or_cancel(cancel).await
+ } else {
+ let conn = http1::Builder::new()
+ .keep_alive(true)
+ .serve_connection(io, svc);
+
+ conn
+ .with_upgrades()
+ .map_err(AnyError::from)
+ .try_or_cancel(cancel)
+ .await
+ }
+ })
+}
+
+fn serve_http(
+ io: impl tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
+ request_info: HttpConnectionProperties,
+ cancel: RcRef<CancelHandle>,
+ tx: tokio::sync::mpsc::Sender<usize>,
+) -> JoinHandle<Result<(), AnyError>> {
+ // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us
+ let svc = service_fn(move |req: Request| {
+ new_slab_future(req, request_info.clone(), tx.clone())
+ });
+ spawn_local(async {
+ let conn = http1::Builder::new()
+ .keep_alive(true)
+ .serve_connection(io, svc);
+ conn
+ .with_upgrades()
+ .map_err(AnyError::from)
+ .try_or_cancel(cancel)
+ .await
+ })
+}
+
+fn serve_http_on(
+ network_stream: NetworkStream,
+ listen_properties: &HttpListenProperties,
+ cancel: RcRef<CancelHandle>,
+ tx: tokio::sync::mpsc::Sender<usize>,
+) -> JoinHandle<Result<(), AnyError>> {
+ // We always want some sort of peer address. If we can't get one, just make up one.
+ let peer_address = network_stream.peer_address().unwrap_or_else(|_| {
+ NetworkStreamAddress::Ip(SocketAddr::V4(SocketAddrV4::new(
+ Ipv4Addr::new(0, 0, 0, 0),
+ 0,
+ )))
+ });
+ let connection_properties: HttpConnectionProperties =
+ DefaultHttpRequestProperties::connection_properties(
+ listen_properties,
+ &peer_address,
+ );
+
+ match network_stream {
+ NetworkStream::Tcp(conn) => {
+ serve_http(conn, connection_properties, cancel, tx)
+ }
+ NetworkStream::Tls(conn) => {
+ serve_https(conn, connection_properties, cancel, tx)
+ }
+ #[cfg(unix)]
+ NetworkStream::Unix(conn) => {
+ serve_http(conn, connection_properties, cancel, tx)
+ }
+ }
+}
+
+struct HttpJoinHandle(
+ AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
+ CancelHandle,
+ AsyncRefCell<tokio::sync::mpsc::Receiver<usize>>,
+);
+
+impl HttpJoinHandle {
+ fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> {
+ RcRef::map(self, |this| &this.1)
+ }
+}
+
+impl Resource for HttpJoinHandle {
+ fn name(&self) -> Cow<str> {
+ "http".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.1.cancel()
+ }
+}
+
+#[op(v8)]
+pub fn op_serve_http(
+ state: Rc<RefCell<OpState>>,
+ listener_rid: ResourceId,
+) -> Result<(ResourceId, &'static str, String), AnyError> {
+ let listener =
+ DefaultHttpRequestProperties::get_network_stream_listener_for_rid(
+ &mut state.borrow_mut(),
+ listener_rid,
+ )?;
+
+ let local_address = listener.listen_address()?;
+ let listen_properties = DefaultHttpRequestProperties::listen_properties(
+ listener.stream(),
+ &local_address,
+ );
+
+ let (tx, rx) = tokio::sync::mpsc::channel(10);
+ let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
+ AsyncRefCell::new(None),
+ CancelHandle::new(),
+ AsyncRefCell::new(rx),
+ ));
+ let cancel_clone = resource.cancel_handle();
+
+ let listen_properties_clone = listen_properties.clone();
+ let handle = spawn_local(async move {
+ loop {
+ let conn = listener
+ .accept()
+ .try_or_cancel(cancel_clone.clone())
+ .await?;
+ serve_http_on(
+ conn,
+ &listen_properties_clone,
+ cancel_clone.clone(),
+ tx.clone(),
+ );
+ }
+ #[allow(unreachable_code)]
+ Ok::<_, AnyError>(())
+ });
+
+ // Set the handle after we start the future
+ *RcRef::map(&resource, |this| &this.0)
+ .try_borrow_mut()
+ .unwrap() = Some(handle);
+
+ Ok((
+ state.borrow_mut().resource_table.add_rc(resource),
+ listen_properties.scheme,
+ listen_properties.fallback_host,
+ ))
+}
+
+#[op(v8)]
+pub fn op_serve_http_on(
+ state: Rc<RefCell<OpState>>,
+ conn: ResourceId,
+) -> Result<(ResourceId, &'static str, String), AnyError> {
+ let network_stream =
+ DefaultHttpRequestProperties::get_network_stream_for_rid(
+ &mut state.borrow_mut(),
+ conn,
+ )?;
+
+ let local_address = network_stream.local_address()?;
+ let listen_properties = DefaultHttpRequestProperties::listen_properties(
+ network_stream.stream(),
+ &local_address,
+ );
+
+ let (tx, rx) = tokio::sync::mpsc::channel(10);
+ let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
+ AsyncRefCell::new(None),
+ CancelHandle::new(),
+ AsyncRefCell::new(rx),
+ ));
+
+ let handle = serve_http_on(
+ network_stream,
+ &listen_properties,
+ resource.cancel_handle(),
+ tx,
+ );
+
+ // Set the handle after we start the future
+ *RcRef::map(&resource, |this| &this.0)
+ .try_borrow_mut()
+ .unwrap() = Some(handle);
+
+ Ok((
+ state.borrow_mut().resource_table.add_rc(resource),
+ listen_properties.scheme,
+ listen_properties.fallback_host,
+ ))
+}
+
+#[op]
+pub async fn op_http_wait(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+) -> Result<u32, AnyError> {
+ // We will get the join handle initially, as we might be consuming requests still
+ let join_handle = state
+ .borrow_mut()
+ .resource_table
+ .get::<HttpJoinHandle>(rid)?;
+
+ let cancel = join_handle.clone().cancel_handle();
+ let next = async {
+ let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await;
+ recv.recv().await
+ }
+ .or_cancel(cancel)
+ .unwrap_or_else(|_| None)
+ .await;
+
+ // Do we have a request?
+ if let Some(req) = next {
+ return Ok(req as u32);
+ }
+
+ // No - we're shutting down
+ let res = RcRef::map(join_handle, |this| &this.0)
+ .borrow_mut()
+ .await
+ .take()
+ .unwrap()
+ .await?;
+
+ // Drop the cancel and join handles
+ state
+ .borrow_mut()
+ .resource_table
+ .take::<HttpJoinHandle>(rid)?;
+
+ // Filter out shutdown (ENOTCONN) errors
+ if let Err(err) = res {
+ if let Some(err) = err.source() {
+ if let Some(err) = err.downcast_ref::<io::Error>() {
+ if err.kind() == io::ErrorKind::NotConnected {
+ return Ok(u32::MAX);
+ }
+ }
+ }
+ return Err(err);
+ }
+
+ Ok(u32::MAX)
+}
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 43e3c130a..561b13885 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -34,6 +34,7 @@ use deno_core::ResourceId;
use deno_core::StringOrBuffer;
use deno_core::WriteOutcome;
use deno_core::ZeroCopyBuf;
+use deno_net::raw::NetworkStream;
use deno_websocket::ws_create_server_stream;
use flate2::write::GzEncoder;
use flate2::Compression;
@@ -76,7 +77,11 @@ use crate::reader_stream::ExternallyAbortableReaderStream;
use crate::reader_stream::ShutdownHandle;
pub mod compressible;
+mod http_next;
mod reader_stream;
+mod request_body;
+mod request_properties;
+mod response_body;
mod websocket_upgrade;
deno_core::extension!(
@@ -92,8 +97,25 @@ deno_core::extension!(
op_http_websocket_accept_header,
op_http_upgrade_early,
op_http_upgrade_websocket,
+ http_next::op_serve_http,
+ http_next::op_serve_http_on,
+ http_next::op_http_wait,
+ http_next::op_http_track,
+ http_next::op_set_response_header,
+ http_next::op_set_response_headers,
+ http_next::op_set_response_body_text,
+ http_next::op_set_promise_complete,
+ http_next::op_set_response_body_bytes,
+ http_next::op_set_response_body_resource,
+ http_next::op_set_response_body_stream,
+ http_next::op_get_request_header,
+ http_next::op_get_request_headers,
+ http_next::op_get_request_method_and_url,
+ http_next::op_read_request_body,
+ http_next::op_upgrade,
+ http_next::op_upgrade_raw,
],
- esm = ["01_http.js"],
+ esm = ["00_serve.js", "01_http.js"],
);
pub enum HttpSocketAddr {
@@ -1147,8 +1169,10 @@ async fn op_http_upgrade_websocket(
}
};
- let transport = hyper::upgrade::on(request).await?;
- let ws_rid = ws_create_server_stream(&state, transport).await?;
+ let (transport, bytes) =
+ extract_network_stream(hyper::upgrade::on(request).await?);
+ let ws_rid =
+ ws_create_server_stream(&mut state.borrow_mut(), transport, bytes)?;
Ok(ws_rid)
}
@@ -1166,6 +1190,16 @@ where
}
}
+impl<Fut> hyper1::rt::Executor<Fut> for LocalExecutor
+where
+ Fut: Future + 'static,
+ Fut::Output: 'static,
+{
+ fn execute(&self, fut: Fut) {
+ spawn_local(fut);
+ }
+}
+
fn http_error(message: &'static str) -> AnyError {
custom_error("Http", message)
}
@@ -1192,3 +1226,47 @@ fn filter_enotconn(
fn never() -> Pending<Never> {
pending()
}
+
+trait CanDowncastUpgrade: Sized {
+ fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
+ self,
+ ) -> Result<(T, Bytes), Self>;
+}
+
+impl CanDowncastUpgrade for hyper1::upgrade::Upgraded {
+ fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
+ self,
+ ) -> Result<(T, Bytes), Self> {
+ let hyper1::upgrade::Parts { io, read_buf, .. } = self.downcast()?;
+ Ok((io, read_buf))
+ }
+}
+
+impl CanDowncastUpgrade for hyper::upgrade::Upgraded {
+ fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
+ self,
+ ) -> Result<(T, Bytes), Self> {
+ let hyper::upgrade::Parts { io, read_buf, .. } = self.downcast()?;
+ Ok((io, read_buf))
+ }
+}
+
+fn extract_network_stream<U: CanDowncastUpgrade>(
+ upgraded: U,
+) -> (NetworkStream, Bytes) {
+ let upgraded = match upgraded.downcast::<tokio::net::TcpStream>() {
+ Ok((stream, bytes)) => return (NetworkStream::Tcp(stream), bytes),
+ Err(x) => x,
+ };
+ let upgraded = match upgraded.downcast::<deno_net::ops_tls::TlsStream>() {
+ Ok((stream, bytes)) => return (NetworkStream::Tls(stream), bytes),
+ Err(x) => x,
+ };
+ #[cfg(unix)]
+ let upgraded = match upgraded.downcast::<tokio::net::UnixStream>() {
+ Ok((stream, bytes)) => return (NetworkStream::Unix(stream), bytes),
+ Err(x) => x,
+ };
+ drop(upgraded);
+ unreachable!("unexpected stream type");
+}
diff --git a/ext/http/request_body.rs b/ext/http/request_body.rs
new file mode 100644
index 000000000..73908ca55
--- /dev/null
+++ b/ext/http/request_body.rs
@@ -0,0 +1,84 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use bytes::Bytes;
+use deno_core::error::AnyError;
+use deno_core::futures::stream::Peekable;
+use deno_core::futures::Stream;
+use deno_core::futures::StreamExt;
+use deno_core::AsyncRefCell;
+use deno_core::AsyncResult;
+use deno_core::BufView;
+use deno_core::RcRef;
+use deno_core::Resource;
+use hyper1::body::Body;
+use hyper1::body::Incoming;
+use hyper1::body::SizeHint;
+use std::borrow::Cow;
+use std::pin::Pin;
+use std::rc::Rc;
+
+/// Converts a hyper incoming body stream into a stream of [`Bytes`] that we can use to read in V8.
+struct ReadFuture(Incoming);
+
+impl Stream for ReadFuture {
+ type Item = Result<Bytes, AnyError>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ let res = Pin::new(&mut self.get_mut().0).poll_frame(cx);
+ match res {
+ std::task::Poll::Ready(Some(Ok(frame))) => {
+ if let Ok(data) = frame.into_data() {
+ // Ensure that we never yield an empty frame
+ if !data.is_empty() {
+ return std::task::Poll::Ready(Some(Ok(data)));
+ }
+ }
+ }
+ std::task::Poll::Ready(None) => return std::task::Poll::Ready(None),
+ _ => {}
+ }
+ std::task::Poll::Pending
+ }
+}
+
+pub struct HttpRequestBody(AsyncRefCell<Peekable<ReadFuture>>, SizeHint);
+
+impl HttpRequestBody {
+ pub fn new(body: Incoming) -> Self {
+ let size_hint = body.size_hint();
+ Self(AsyncRefCell::new(ReadFuture(body).peekable()), size_hint)
+ }
+
+ async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, AnyError> {
+ let peekable = RcRef::map(self, |this| &this.0);
+ let mut peekable = peekable.borrow_mut().await;
+ match Pin::new(&mut *peekable).peek_mut().await {
+ None => Ok(BufView::empty()),
+ Some(Err(_)) => Err(peekable.next().await.unwrap().err().unwrap()),
+ Some(Ok(bytes)) => {
+ if bytes.len() <= limit {
+ // We can safely take the next item since we peeked it
+ return Ok(BufView::from(peekable.next().await.unwrap()?));
+ }
+ let ret = bytes.split_to(limit);
+ Ok(BufView::from(ret))
+ }
+ }
+ }
+}
+
+impl Resource for HttpRequestBody {
+ fn name(&self) -> Cow<str> {
+ "requestBody".into()
+ }
+
+ fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
+ Box::pin(HttpRequestBody::read(self, limit))
+ }
+
+ fn size_hint(&self) -> (u64, Option<u64>) {
+ (self.1.lower(), self.1.upper())
+ }
+}
diff --git a/ext/http/request_properties.rs b/ext/http/request_properties.rs
new file mode 100644
index 000000000..7a7f5219c
--- /dev/null
+++ b/ext/http/request_properties.rs
@@ -0,0 +1,249 @@
+use deno_core::error::AnyError;
+use deno_core::OpState;
+use deno_core::ResourceId;
+use deno_net::raw::NetworkStream;
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use deno_net::raw::take_network_stream_listener_resource;
+use deno_net::raw::take_network_stream_resource;
+use deno_net::raw::NetworkStreamAddress;
+use deno_net::raw::NetworkStreamListener;
+use deno_net::raw::NetworkStreamType;
+use hyper::HeaderMap;
+use hyper::Uri;
+use hyper1::header::HOST;
+use std::borrow::Cow;
+use std::rc::Rc;
+
+// TODO(mmastrac): I don't like that we have to clone this, but it's one-time setup
+#[derive(Clone)]
+pub struct HttpListenProperties {
+ pub stream_type: NetworkStreamType,
+ pub scheme: &'static str,
+ pub fallback_host: String,
+ pub local_port: Option<u16>,
+}
+
+#[derive(Clone)]
+pub struct HttpConnectionProperties {
+ pub stream_type: NetworkStreamType,
+ pub peer_address: Rc<str>,
+ pub peer_port: Option<u16>,
+ pub local_port: Option<u16>,
+}
+
+pub struct HttpRequestProperties {
+ pub authority: Option<String>,
+}
+
+/// Pluggable trait to determine listen, connection and request properties
+/// for embedders that wish to provide alternative routes for incoming HTTP.
+pub trait HttpPropertyExtractor {
+ /// Given a listener [`ResourceId`], returns the [`NetworkStreamListener`].
+ fn get_network_stream_listener_for_rid(
+ state: &mut OpState,
+ listener_rid: ResourceId,
+ ) -> Result<NetworkStreamListener, AnyError>;
+
+ /// Given a connection [`ResourceId`], returns the [`NetworkStream`].
+ fn get_network_stream_for_rid(
+ state: &mut OpState,
+ rid: ResourceId,
+ ) -> Result<NetworkStream, AnyError>;
+
+ /// Determines the listener properties.
+ fn listen_properties(
+ stream_type: NetworkStreamType,
+ local_address: &NetworkStreamAddress,
+ ) -> HttpListenProperties;
+
+ /// Determines the connection properties.
+ fn connection_properties(
+ listen_properties: &HttpListenProperties,
+ peer_address: &NetworkStreamAddress,
+ ) -> HttpConnectionProperties;
+
+ /// Determines the request properties.
+ fn request_properties(
+ connection_properties: &HttpConnectionProperties,
+ uri: &Uri,
+ headers: &HeaderMap,
+ ) -> HttpRequestProperties;
+}
+
+pub struct DefaultHttpRequestProperties {}
+
+impl HttpPropertyExtractor for DefaultHttpRequestProperties {
+ fn get_network_stream_for_rid(
+ state: &mut OpState,
+ rid: ResourceId,
+ ) -> Result<NetworkStream, AnyError> {
+ take_network_stream_resource(&mut state.resource_table, rid)
+ }
+
+ fn get_network_stream_listener_for_rid(
+ state: &mut OpState,
+ listener_rid: ResourceId,
+ ) -> Result<NetworkStreamListener, AnyError> {
+ take_network_stream_listener_resource(
+ &mut state.resource_table,
+ listener_rid,
+ )
+ }
+
+ fn listen_properties(
+ stream_type: NetworkStreamType,
+ local_address: &NetworkStreamAddress,
+ ) -> HttpListenProperties {
+ let scheme = req_scheme_from_stream_type(stream_type);
+ let fallback_host = req_host_from_addr(stream_type, local_address);
+ let local_port: Option<u16> = match local_address {
+ NetworkStreamAddress::Ip(ip) => Some(ip.port()),
+ #[cfg(unix)]
+ NetworkStreamAddress::Unix(_) => None,
+ };
+
+ HttpListenProperties {
+ scheme,
+ fallback_host,
+ local_port,
+ stream_type,
+ }
+ }
+
+ fn connection_properties(
+ listen_properties: &HttpListenProperties,
+ peer_address: &NetworkStreamAddress,
+ ) -> HttpConnectionProperties {
+ let peer_port: Option<u16> = match peer_address {
+ NetworkStreamAddress::Ip(ip) => Some(ip.port()),
+ #[cfg(unix)]
+ NetworkStreamAddress::Unix(_) => None,
+ };
+ let peer_address = match peer_address {
+ NetworkStreamAddress::Ip(addr) => Rc::from(addr.ip().to_string()),
+ #[cfg(unix)]
+ NetworkStreamAddress::Unix(_) => Rc::from("unix"),
+ };
+ let local_port = listen_properties.local_port;
+ let stream_type = listen_properties.stream_type;
+
+ HttpConnectionProperties {
+ stream_type,
+ peer_address,
+ peer_port,
+ local_port,
+ }
+ }
+
+ fn request_properties(
+ connection_properties: &HttpConnectionProperties,
+ uri: &Uri,
+ headers: &HeaderMap,
+ ) -> HttpRequestProperties {
+ let authority = req_host(
+ uri,
+ headers,
+ connection_properties.stream_type,
+ connection_properties.local_port.unwrap_or_default(),
+ )
+ .map(|s| s.into_owned());
+
+ HttpRequestProperties { authority }
+ }
+}
+
+/// Compute the fallback address from the [`NetworkStreamListenAddress`]. If the request has no authority/host in
+/// its URI, and there is no [`HeaderName::HOST`] header, we fall back to this.
+fn req_host_from_addr(
+ stream_type: NetworkStreamType,
+ addr: &NetworkStreamAddress,
+) -> String {
+ match addr {
+ NetworkStreamAddress::Ip(addr) => {
+ if (stream_type == NetworkStreamType::Tls && addr.port() == 443)
+ || (stream_type == NetworkStreamType::Tcp && addr.port() == 80)
+ {
+ if addr.ip().is_loopback() || addr.ip().is_unspecified() {
+ return "localhost".to_owned();
+ }
+ addr.ip().to_string()
+ } else {
+ if addr.ip().is_loopback() || addr.ip().is_unspecified() {
+ return format!("localhost:{}", addr.port());
+ }
+ addr.to_string()
+ }
+ }
+ // There is no standard way for unix domain socket URLs
+ // nginx and nodejs request use http://unix:[socket_path]:/ but it is not a valid URL
+ // httpie uses http+unix://[percent_encoding_of_path]/ which we follow
+ #[cfg(unix)]
+ NetworkStreamAddress::Unix(unix) => percent_encoding::percent_encode(
+ unix
+ .as_pathname()
+ .and_then(|x| x.to_str())
+ .unwrap_or_default()
+ .as_bytes(),
+ percent_encoding::NON_ALPHANUMERIC,
+ )
+ .to_string(),
+ }
+}
+
+fn req_scheme_from_stream_type(stream_type: NetworkStreamType) -> &'static str {
+ match stream_type {
+ NetworkStreamType::Tcp => "http://",
+ NetworkStreamType::Tls => "https://",
+ #[cfg(unix)]
+ NetworkStreamType::Unix => "http+unix://",
+ }
+}
+
+fn req_host<'a>(
+ uri: &'a Uri,
+ headers: &'a HeaderMap,
+ addr_type: NetworkStreamType,
+ port: u16,
+) -> Option<Cow<'a, str>> {
+ // Unix sockets always use the socket address
+ #[cfg(unix)]
+ if addr_type == NetworkStreamType::Unix {
+ return None;
+ }
+
+ // It is rare that an authority will be passed, but if it does, it takes priority
+ if let Some(auth) = uri.authority() {
+ match addr_type {
+ NetworkStreamType::Tcp => {
+ if port == 80 {
+ return Some(Cow::Borrowed(auth.host()));
+ }
+ }
+ NetworkStreamType::Tls => {
+ if port == 443 {
+ return Some(Cow::Borrowed(auth.host()));
+ }
+ }
+ #[cfg(unix)]
+ NetworkStreamType::Unix => {}
+ }
+ return Some(Cow::Borrowed(auth.as_str()));
+ }
+
+ // TODO(mmastrac): Most requests will use this path and we probably will want to optimize it in the future
+ if let Some(host) = headers.get(HOST) {
+ return Some(match host.to_str() {
+ Ok(host) => Cow::Borrowed(host),
+ Err(_) => Cow::Owned(
+ host
+ .as_bytes()
+ .iter()
+ .cloned()
+ .map(char::from)
+ .collect::<String>(),
+ ),
+ });
+ }
+
+ None
+}
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
new file mode 100644
index 000000000..0086e4d78
--- /dev/null
+++ b/ext/http/response_body.rs
@@ -0,0 +1,253 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::future::Future;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::task::Waker;
+
+use deno_core::error::bad_resource;
+use deno_core::error::AnyError;
+use deno_core::futures::FutureExt;
+use deno_core::AsyncRefCell;
+use deno_core::AsyncResult;
+use deno_core::BufView;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::WriteOutcome;
+use hyper1::body::Body;
+use hyper1::body::Frame;
+use hyper1::body::SizeHint;
+
+#[derive(Clone, Debug, Default)]
+pub struct CompletionHandle {
+ inner: Rc<RefCell<CompletionHandleInner>>,
+}
+
+#[derive(Debug, Default)]
+struct CompletionHandleInner {
+ complete: bool,
+ success: bool,
+ waker: Option<Waker>,
+}
+
+impl CompletionHandle {
+ pub fn complete(&self, success: bool) {
+ let mut mut_self = self.inner.borrow_mut();
+ mut_self.complete = true;
+ mut_self.success = success;
+ if let Some(waker) = mut_self.waker.take() {
+ drop(mut_self);
+ waker.wake();
+ }
+ }
+}
+
+impl Future for CompletionHandle {
+ type Output = bool;
+
+ fn poll(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ let mut mut_self = self.inner.borrow_mut();
+ if mut_self.complete {
+ return std::task::Poll::Ready(mut_self.success);
+ }
+
+ mut_self.waker = Some(cx.waker().clone());
+ std::task::Poll::Pending
+ }
+}
+
+#[derive(Default)]
+pub enum ResponseBytesInner {
+ /// An empty stream.
+ #[default]
+ Empty,
+ /// A completed stream.
+ Done,
+ /// A static buffer of bytes, sent it one fell swoop.
+ Bytes(BufView),
+ /// A resource stream, piped in fast mode.
+ Resource(bool, Rc<dyn Resource>, AsyncResult<BufView>),
+ /// A JS-backed stream, written in JS and transported via pipe.
+ V8Stream(tokio::sync::mpsc::Receiver<BufView>),
+}
+
+impl std::fmt::Debug for ResponseBytesInner {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::Done => f.write_str("Done"),
+ Self::Empty => f.write_str("Empty"),
+ Self::Bytes(..) => f.write_str("Bytes"),
+ Self::Resource(..) => f.write_str("Resource"),
+ Self::V8Stream(..) => f.write_str("V8Stream"),
+ }
+ }
+}
+
+/// This represents the union of possible response types in Deno with the stream-style [`Body`] interface
+/// required by hyper. As the API requires information about request completion (including a success/fail
+/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on.
+#[derive(Debug, Default)]
+pub struct ResponseBytes(ResponseBytesInner, CompletionHandle);
+
+impl ResponseBytes {
+ pub fn initialize(&mut self, inner: ResponseBytesInner) {
+ debug_assert!(matches!(self.0, ResponseBytesInner::Empty));
+ self.0 = inner;
+ }
+
+ pub fn completion_handle(&self) -> CompletionHandle {
+ self.1.clone()
+ }
+
+ fn complete(&mut self, success: bool) -> ResponseBytesInner {
+ if matches!(self.0, ResponseBytesInner::Done) {
+ return ResponseBytesInner::Done;
+ }
+
+ let current = std::mem::replace(&mut self.0, ResponseBytesInner::Done);
+ self.1.complete(success);
+ current
+ }
+}
+
+impl ResponseBytesInner {
+ pub fn size_hint(&self) -> SizeHint {
+ match self {
+ Self::Done => SizeHint::with_exact(0),
+ Self::Empty => SizeHint::with_exact(0),
+ Self::Bytes(bytes) => SizeHint::with_exact(bytes.len() as u64),
+ Self::Resource(_, res, _) => {
+ let hint = res.size_hint();
+ let mut size_hint = SizeHint::new();
+ size_hint.set_lower(hint.0);
+ if let Some(upper) = hint.1 {
+ size_hint.set_upper(upper)
+ }
+ size_hint
+ }
+ Self::V8Stream(..) => SizeHint::default(),
+ }
+ }
+}
+
+impl Body for ResponseBytes {
+ type Data = BufView;
+ type Error = AnyError;
+
+ fn poll_frame(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+ match &mut self.0 {
+ ResponseBytesInner::Done | ResponseBytesInner::Empty => {
+ unreachable!()
+ }
+ ResponseBytesInner::Bytes(..) => {
+ if let ResponseBytesInner::Bytes(data) = self.complete(true) {
+ std::task::Poll::Ready(Some(Ok(Frame::data(data))))
+ } else {
+ unreachable!()
+ }
+ }
+ ResponseBytesInner::Resource(auto_close, stm, ref mut future) => {
+ match future.poll_unpin(cx) {
+ std::task::Poll::Pending => std::task::Poll::Pending,
+ std::task::Poll::Ready(Err(err)) => {
+ std::task::Poll::Ready(Some(Err(err)))
+ }
+ std::task::Poll::Ready(Ok(buf)) => {
+ if buf.is_empty() {
+ if *auto_close {
+ stm.clone().close();
+ }
+ self.complete(true);
+ return std::task::Poll::Ready(None);
+ }
+ // Re-arm the future
+ *future = stm.clone().read(64 * 1024);
+ std::task::Poll::Ready(Some(Ok(Frame::data(buf))))
+ }
+ }
+ }
+ ResponseBytesInner::V8Stream(stm) => match stm.poll_recv(cx) {
+ std::task::Poll::Pending => std::task::Poll::Pending,
+ std::task::Poll::Ready(Some(buf)) => {
+ std::task::Poll::Ready(Some(Ok(Frame::data(buf))))
+ }
+ std::task::Poll::Ready(None) => {
+ self.complete(true);
+ std::task::Poll::Ready(None)
+ }
+ },
+ }
+ }
+
+ fn is_end_stream(&self) -> bool {
+ matches!(self.0, ResponseBytesInner::Done | ResponseBytesInner::Empty)
+ }
+
+ fn size_hint(&self) -> SizeHint {
+ // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through
+ // anyways just in case hyper needs it.
+ self.0.size_hint()
+ }
+}
+
+impl Drop for ResponseBytes {
+ fn drop(&mut self) {
+ // We won't actually poll_frame for Empty responses so this is where we return success
+ self.complete(matches!(self.0, ResponseBytesInner::Empty));
+ }
+}
+
+/// A response body object that can be passed to V8. This body will feed byte buffers to a channel which
+/// feed's hyper's HTTP response.
+pub struct V8StreamHttpResponseBody(
+ AsyncRefCell<Option<tokio::sync::mpsc::Sender<BufView>>>,
+ CancelHandle,
+);
+
+impl V8StreamHttpResponseBody {
+ pub fn new(sender: tokio::sync::mpsc::Sender<BufView>) -> Self {
+ Self(AsyncRefCell::new(Some(sender)), CancelHandle::default())
+ }
+}
+
+impl Resource for V8StreamHttpResponseBody {
+ fn name(&self) -> Cow<str> {
+ "responseBody".into()
+ }
+
+ fn write(
+ self: Rc<Self>,
+ buf: BufView,
+ ) -> AsyncResult<deno_core::WriteOutcome> {
+ let cancel_handle = RcRef::map(&self, |this| &this.1);
+ Box::pin(
+ async move {
+ let nwritten = buf.len();
+
+ let res = RcRef::map(self, |this| &this.0).borrow().await;
+ if let Some(tx) = res.as_ref() {
+ tx.send(buf)
+ .await
+ .map_err(|_| bad_resource("failed to write"))?;
+ Ok(WriteOutcome::Full { nwritten })
+ } else {
+ Err(bad_resource("failed to write"))
+ }
+ }
+ .try_or_cancel(cancel_handle),
+ )
+ }
+
+ fn close(self: Rc<Self>) {
+ self.1.cancel();
+ }
+}
diff --git a/ext/net/Cargo.toml b/ext/net/Cargo.toml
index a7a1acff6..6bab80cc7 100644
--- a/ext/net/Cargo.toml
+++ b/ext/net/Cargo.toml
@@ -17,6 +17,7 @@ path = "lib.rs"
deno_core.workspace = true
deno_tls.workspace = true
log.workspace = true
+pin-project.workspace = true
serde.workspace = true
socket2.workspace = true
tokio.workspace = true
diff --git a/ext/net/lib.rs b/ext/net/lib.rs
index f812bf60b..ff67186b0 100644
--- a/ext/net/lib.rs
+++ b/ext/net/lib.rs
@@ -5,6 +5,7 @@ pub mod ops;
pub mod ops_tls;
#[cfg(unix)]
pub mod ops_unix;
+pub mod raw;
pub mod resolve_addr;
use deno_core::error::AnyError;
diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs
index c0cfb8674..8a7757066 100644
--- a/ext/net/ops_tls.rs
+++ b/ext/net/ops_tls.rs
@@ -61,6 +61,7 @@ use std::fs::File;
use std::io;
use std::io::BufReader;
use std::io::ErrorKind;
+use std::net::SocketAddr;
use std::path::Path;
use std::pin::Pin;
use std::rc::Rc;
@@ -115,6 +116,13 @@ impl TlsStream {
Self::new(tcp, Connection::Client(tls))
}
+ pub fn new_client_side_from(
+ tcp: TcpStream,
+ connection: ClientConnection,
+ ) -> Self {
+ Self::new(tcp, Connection::Client(connection))
+ }
+
pub fn new_server_side(
tcp: TcpStream,
tls_config: Arc<ServerConfig>,
@@ -123,6 +131,13 @@ impl TlsStream {
Self::new(tcp, Connection::Server(tls))
}
+ pub fn new_server_side_from(
+ tcp: TcpStream,
+ connection: ServerConnection,
+ ) -> Self {
+ Self::new(tcp, Connection::Server(connection))
+ }
+
pub fn into_split(self) -> (ReadHalf, WriteHalf) {
let shared = Shared::new(self);
let rd = ReadHalf {
@@ -132,6 +147,16 @@ impl TlsStream {
(rd, wr)
}
+ /// Convenience method to match [`TcpStream`].
+ pub fn peer_addr(&self) -> Result<SocketAddr, io::Error> {
+ self.0.as_ref().unwrap().tcp.peer_addr()
+ }
+
+ /// Convenience method to match [`TcpStream`].
+ pub fn local_addr(&self) -> Result<SocketAddr, io::Error> {
+ self.0.as_ref().unwrap().tcp.local_addr()
+ }
+
/// Tokio-rustls compatibility: returns a reference to the underlying TCP
/// stream, and a reference to the Rustls `Connection` object.
pub fn get_ref(&self) -> (&TcpStream, &Connection) {
@@ -954,8 +979,8 @@ fn load_private_keys_from_file(
}
pub struct TlsListenerResource {
- tcp_listener: AsyncRefCell<TcpListener>,
- tls_config: Arc<ServerConfig>,
+ pub(crate) tcp_listener: AsyncRefCell<TcpListener>,
+ pub(crate) tls_config: Arc<ServerConfig>,
cancel_handle: CancelHandle,
}
diff --git a/ext/net/ops_unix.rs b/ext/net/ops_unix.rs
index 1161d2759..bed923f8b 100644
--- a/ext/net/ops_unix.rs
+++ b/ext/net/ops_unix.rs
@@ -32,8 +32,8 @@ pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
})
}
-struct UnixListenerResource {
- listener: AsyncRefCell<UnixListener>,
+pub(crate) struct UnixListenerResource {
+ pub listener: AsyncRefCell<UnixListener>,
cancel: CancelHandle,
}
diff --git a/ext/net/raw.rs b/ext/net/raw.rs
new file mode 100644
index 000000000..74cc10d63
--- /dev/null
+++ b/ext/net/raw.rs
@@ -0,0 +1,304 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use crate::io::TcpStreamResource;
+#[cfg(unix)]
+use crate::io::UnixStreamResource;
+use crate::ops::TcpListenerResource;
+use crate::ops_tls::TlsListenerResource;
+use crate::ops_tls::TlsStream;
+use crate::ops_tls::TlsStreamResource;
+#[cfg(unix)]
+use crate::ops_unix::UnixListenerResource;
+use deno_core::error::bad_resource;
+use deno_core::error::bad_resource_id;
+use deno_core::error::AnyError;
+use deno_core::ResourceId;
+use deno_core::ResourceTable;
+use deno_tls::rustls::ServerConfig;
+use pin_project::pin_project;
+use std::rc::Rc;
+use std::sync::Arc;
+use tokio::net::TcpStream;
+#[cfg(unix)]
+use tokio::net::UnixStream;
+
+/// A raw stream of one of the types handled by this extension.
+#[pin_project(project = NetworkStreamProject)]
+pub enum NetworkStream {
+ Tcp(#[pin] TcpStream),
+ Tls(#[pin] TlsStream),
+ #[cfg(unix)]
+ Unix(#[pin] UnixStream),
+}
+
+/// A raw stream of one of the types handled by this extension.
+#[derive(Copy, Clone, PartialEq, Eq)]
+pub enum NetworkStreamType {
+ Tcp,
+ Tls,
+ #[cfg(unix)]
+ Unix,
+}
+
+impl NetworkStream {
+ pub fn local_address(&self) -> Result<NetworkStreamAddress, std::io::Error> {
+ match self {
+ Self::Tcp(tcp) => Ok(NetworkStreamAddress::Ip(tcp.local_addr()?)),
+ Self::Tls(tls) => Ok(NetworkStreamAddress::Ip(tls.local_addr()?)),
+ #[cfg(unix)]
+ Self::Unix(unix) => Ok(NetworkStreamAddress::Unix(unix.local_addr()?)),
+ }
+ }
+
+ pub fn peer_address(&self) -> Result<NetworkStreamAddress, std::io::Error> {
+ match self {
+ Self::Tcp(tcp) => Ok(NetworkStreamAddress::Ip(tcp.peer_addr()?)),
+ Self::Tls(tls) => Ok(NetworkStreamAddress::Ip(tls.peer_addr()?)),
+ #[cfg(unix)]
+ Self::Unix(unix) => Ok(NetworkStreamAddress::Unix(unix.peer_addr()?)),
+ }
+ }
+
+ pub fn stream(&self) -> NetworkStreamType {
+ match self {
+ Self::Tcp(_) => NetworkStreamType::Tcp,
+ Self::Tls(_) => NetworkStreamType::Tls,
+ #[cfg(unix)]
+ Self::Unix(_) => NetworkStreamType::Unix,
+ }
+ }
+}
+
+impl tokio::io::AsyncRead for NetworkStream {
+ fn poll_read(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &mut tokio::io::ReadBuf<'_>,
+ ) -> std::task::Poll<std::io::Result<()>> {
+ match self.project() {
+ NetworkStreamProject::Tcp(s) => s.poll_read(cx, buf),
+ NetworkStreamProject::Tls(s) => s.poll_read(cx, buf),
+ #[cfg(unix)]
+ NetworkStreamProject::Unix(s) => s.poll_read(cx, buf),
+ }
+ }
+}
+
+impl tokio::io::AsyncWrite for NetworkStream {
+ fn poll_write(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+ ) -> std::task::Poll<Result<usize, std::io::Error>> {
+ match self.project() {
+ NetworkStreamProject::Tcp(s) => s.poll_write(cx, buf),
+ NetworkStreamProject::Tls(s) => s.poll_write(cx, buf),
+ #[cfg(unix)]
+ NetworkStreamProject::Unix(s) => s.poll_write(cx, buf),
+ }
+ }
+
+ fn poll_flush(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ match self.project() {
+ NetworkStreamProject::Tcp(s) => s.poll_flush(cx),
+ NetworkStreamProject::Tls(s) => s.poll_flush(cx),
+ #[cfg(unix)]
+ NetworkStreamProject::Unix(s) => s.poll_flush(cx),
+ }
+ }
+
+ fn poll_shutdown(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ match self.project() {
+ NetworkStreamProject::Tcp(s) => s.poll_shutdown(cx),
+ NetworkStreamProject::Tls(s) => s.poll_shutdown(cx),
+ #[cfg(unix)]
+ NetworkStreamProject::Unix(s) => s.poll_shutdown(cx),
+ }
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ match self {
+ Self::Tcp(s) => s.is_write_vectored(),
+ Self::Tls(s) => s.is_write_vectored(),
+ #[cfg(unix)]
+ Self::Unix(s) => s.is_write_vectored(),
+ }
+ }
+
+ fn poll_write_vectored(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ bufs: &[std::io::IoSlice<'_>],
+ ) -> std::task::Poll<Result<usize, std::io::Error>> {
+ match self.project() {
+ NetworkStreamProject::Tcp(s) => s.poll_write_vectored(cx, bufs),
+ NetworkStreamProject::Tls(s) => s.poll_write_vectored(cx, bufs),
+ #[cfg(unix)]
+ NetworkStreamProject::Unix(s) => s.poll_write_vectored(cx, bufs),
+ }
+ }
+}
+
+/// A raw stream listener of one of the types handled by this extension.
+pub enum NetworkStreamListener {
+ Tcp(tokio::net::TcpListener),
+ Tls(tokio::net::TcpListener, Arc<ServerConfig>),
+ #[cfg(unix)]
+ Unix(tokio::net::UnixListener),
+}
+
+pub enum NetworkStreamAddress {
+ Ip(std::net::SocketAddr),
+ #[cfg(unix)]
+ Unix(tokio::net::unix::SocketAddr),
+}
+
+impl NetworkStreamListener {
+ /// Accepts a connection on this listener.
+ pub async fn accept(&self) -> Result<NetworkStream, AnyError> {
+ Ok(match self {
+ Self::Tcp(tcp) => {
+ let (stream, _addr) = tcp.accept().await?;
+ NetworkStream::Tcp(stream)
+ }
+ Self::Tls(tcp, config) => {
+ let (stream, _addr) = tcp.accept().await?;
+ NetworkStream::Tls(TlsStream::new_server_side(stream, config.clone()))
+ }
+ #[cfg(unix)]
+ Self::Unix(unix) => {
+ let (stream, _addr) = unix.accept().await?;
+ NetworkStream::Unix(stream)
+ }
+ })
+ }
+
+ pub fn listen_address(&self) -> Result<NetworkStreamAddress, std::io::Error> {
+ match self {
+ Self::Tcp(tcp) => Ok(NetworkStreamAddress::Ip(tcp.local_addr()?)),
+ Self::Tls(tcp, _) => Ok(NetworkStreamAddress::Ip(tcp.local_addr()?)),
+ #[cfg(unix)]
+ Self::Unix(unix) => Ok(NetworkStreamAddress::Unix(unix.local_addr()?)),
+ }
+ }
+
+ pub fn stream(&self) -> NetworkStreamType {
+ match self {
+ Self::Tcp(..) => NetworkStreamType::Tcp,
+ Self::Tls(..) => NetworkStreamType::Tls,
+ #[cfg(unix)]
+ Self::Unix(..) => NetworkStreamType::Unix,
+ }
+ }
+}
+
+/// In some cases it may be more efficient to extract the resource from the resource table and use it directly (for example, an HTTP server).
+/// This method will extract a stream from the resource table and return it, unwrapped.
+pub fn take_network_stream_resource(
+ resource_table: &mut ResourceTable,
+ stream_rid: ResourceId,
+) -> Result<NetworkStream, AnyError> {
+ // The stream we're attempting to unwrap may be in use somewhere else. If that's the case, we cannot proceed
+ // with the process of unwrapping this connection, so we just return a bad resource error.
+ // See also: https://github.com/denoland/deno/pull/16242
+
+ if let Ok(resource_rc) = resource_table.take::<TcpStreamResource>(stream_rid)
+ {
+ // This TCP connection might be used somewhere else.
+ let resource = Rc::try_unwrap(resource_rc)
+ .map_err(|_| bad_resource("TCP stream is currently in use"))?;
+ let (read_half, write_half) = resource.into_inner();
+ let tcp_stream = read_half.reunite(write_half)?;
+ return Ok(NetworkStream::Tcp(tcp_stream));
+ }
+
+ if let Ok(resource_rc) = resource_table.take::<TlsStreamResource>(stream_rid)
+ {
+ // This TLS connection might be used somewhere else.
+ let resource = Rc::try_unwrap(resource_rc)
+ .map_err(|_| bad_resource("TLS stream is currently in use"))?;
+ let (read_half, write_half) = resource.into_inner();
+ let tls_stream = read_half.reunite(write_half);
+ return Ok(NetworkStream::Tls(tls_stream));
+ }
+
+ #[cfg(unix)]
+ if let Ok(resource_rc) = resource_table.take::<UnixStreamResource>(stream_rid)
+ {
+ // This UNIX socket might be used somewhere else.
+ let resource = Rc::try_unwrap(resource_rc)
+ .map_err(|_| bad_resource("UNIX stream is currently in use"))?;
+ let (read_half, write_half) = resource.into_inner();
+ let unix_stream = read_half.reunite(write_half)?;
+ return Ok(NetworkStream::Unix(unix_stream));
+ }
+
+ Err(bad_resource_id())
+}
+
+/// Inserts a raw stream (back?) into the resource table and returns a resource ID. This can then be used to create raw connection
+/// objects on the JS side.
+pub fn put_network_stream_resource(
+ resource_table: &mut ResourceTable,
+ stream: NetworkStream,
+) -> Result<ResourceId, AnyError> {
+ let res = match stream {
+ NetworkStream::Tcp(conn) => {
+ let (r, w) = conn.into_split();
+ resource_table.add(TcpStreamResource::new((r, w)))
+ }
+ NetworkStream::Tls(conn) => {
+ let (r, w) = conn.into_split();
+ resource_table.add(TlsStreamResource::new((r, w)))
+ }
+ #[cfg(unix)]
+ NetworkStream::Unix(conn) => {
+ let (r, w) = conn.into_split();
+ resource_table.add(UnixStreamResource::new((r, w)))
+ }
+ };
+
+ Ok(res)
+}
+
+/// In some cases it may be more efficient to extract the resource from the resource table and use it directly (for example, an HTTP server).
+/// This method will extract a stream from the resource table and return it, unwrapped.
+pub fn take_network_stream_listener_resource(
+ resource_table: &mut ResourceTable,
+ listener_rid: ResourceId,
+) -> Result<NetworkStreamListener, AnyError> {
+ if let Ok(resource_rc) =
+ resource_table.take::<TcpListenerResource>(listener_rid)
+ {
+ let resource = Rc::try_unwrap(resource_rc)
+ .map_err(|_| bad_resource("TCP socket listener is currently in use"))?;
+ return Ok(NetworkStreamListener::Tcp(resource.listener.into_inner()));
+ }
+
+ if let Ok(resource_rc) =
+ resource_table.take::<TlsListenerResource>(listener_rid)
+ {
+ let resource = Rc::try_unwrap(resource_rc)
+ .map_err(|_| bad_resource("TLS socket listener is currently in use"))?;
+ return Ok(NetworkStreamListener::Tls(
+ resource.tcp_listener.into_inner(),
+ resource.tls_config,
+ ));
+ }
+
+ #[cfg(unix)]
+ if let Ok(resource_rc) =
+ resource_table.take::<UnixListenerResource>(listener_rid)
+ {
+ let resource = Rc::try_unwrap(resource_rc)
+ .map_err(|_| bad_resource("UNIX socket listener is currently in use"))?;
+ return Ok(NetworkStreamListener::Unix(resource.listener.into_inner()));
+ }
+
+ Err(bad_resource_id())
+}
diff --git a/ext/websocket/Cargo.toml b/ext/websocket/Cargo.toml
index 53e184e1e..006c73a5f 100644
--- a/ext/websocket/Cargo.toml
+++ b/ext/websocket/Cargo.toml
@@ -14,11 +14,13 @@ description = "Implementation of WebSocket API for Deno"
path = "lib.rs"
[dependencies]
+bytes.workspace = true
deno_core.workspace = true
+deno_net.workspace = true
deno_tls.workspace = true
fastwebsockets = { workspace = true, features = ["upgrade"] }
http.workspace = true
-hyper.workspace = true
+hyper = { workspace = true, features = ["backports"] }
serde.workspace = true
tokio.workspace = true
tokio-rustls.workspace = true
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index 798856bc1..71aa66ff3 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -1,11 +1,10 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
+use crate::stream::WebSocketStream;
+use bytes::Bytes;
use deno_core::error::invalid_hostname;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op;
-use deno_core::StringOrBuffer;
-
use deno_core::url;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
@@ -15,7 +14,10 @@ use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
+use deno_core::StringOrBuffer;
use deno_core::ZeroCopyBuf;
+use deno_net::raw::take_network_stream_resource;
+use deno_net::raw::NetworkStream;
use deno_tls::create_client_config;
use http::header::CONNECTION;
use http::header::UPGRADE;
@@ -24,9 +26,7 @@ use http::HeaderValue;
use http::Method;
use http::Request;
use http::Uri;
-use hyper::upgrade::Upgraded;
use hyper::Body;
-use hyper::Response;
use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
@@ -52,6 +52,7 @@ use fastwebsockets::Role;
use fastwebsockets::WebSocket;
pub use tokio_tungstenite; // Re-export tokio_tungstenite
+mod stream;
#[derive(Clone)]
pub struct WsRootStore(pub Option<RootCertStore>);
@@ -243,17 +244,21 @@ where
let client =
fastwebsockets::handshake::client(&LocalExecutor, request, socket);
- let (stream, response): (WebSocket<Upgraded>, Response<Body>) =
- if let Some(cancel_resource) = cancel_resource {
- client.or_cancel(cancel_resource.0.to_owned()).await?
- } else {
- client.await
- }
- .map_err(|err| {
- DomExceptionNetworkError::new(&format!(
- "failed to connect to WebSocket: {err}"
- ))
- })?;
+ let (upgraded, response) = if let Some(cancel_resource) = cancel_resource {
+ client.or_cancel(cancel_resource.0.to_owned()).await?
+ } else {
+ client.await
+ }
+ .map_err(|err| {
+ DomExceptionNetworkError::new(&format!(
+ "failed to connect to WebSocket: {err}"
+ ))
+ })?;
+
+ let inner = MaybeTlsStream::Plain(upgraded.into_inner());
+ let stream =
+ WebSocketStream::new(stream::WsStreamKind::Tungstenite(inner), None);
+ let stream = WebSocket::after_handshake(stream, Role::Client);
if let Some(cancel_rid) = cancel_handle {
state.borrow_mut().resource_table.close(cancel_rid).ok();
@@ -294,7 +299,7 @@ pub enum MessageKind {
}
pub struct ServerWebSocket {
- ws: AsyncRefCell<FragmentCollector<Upgraded>>,
+ ws: AsyncRefCell<FragmentCollector<WebSocketStream>>,
closed: Rc<Cell<bool>>,
}
@@ -320,11 +325,19 @@ impl Resource for ServerWebSocket {
"serverWebSocket".into()
}
}
-pub async fn ws_create_server_stream(
- state: &Rc<RefCell<OpState>>,
- transport: Upgraded,
+
+pub fn ws_create_server_stream(
+ state: &mut OpState,
+ transport: NetworkStream,
+ read_buf: Bytes,
) -> Result<ResourceId, AnyError> {
- let mut ws = WebSocket::after_handshake(transport, Role::Server);
+ let mut ws = WebSocket::after_handshake(
+ WebSocketStream::new(
+ stream::WsStreamKind::Network(transport),
+ Some(read_buf),
+ ),
+ Role::Server,
+ );
ws.set_writev(true);
ws.set_auto_close(true);
ws.set_auto_pong(true);
@@ -334,12 +347,27 @@ pub async fn ws_create_server_stream(
closed: Rc::new(Cell::new(false)),
};
- let resource_table = &mut state.borrow_mut().resource_table;
- let rid = resource_table.add(ws_resource);
+ let rid = state.resource_table.add(ws_resource);
Ok(rid)
}
#[op]
+pub fn op_ws_server_create(
+ state: &mut OpState,
+ conn: ResourceId,
+ extra_bytes: &[u8],
+) -> Result<ResourceId, AnyError> {
+ let network_stream =
+ take_network_stream_resource(&mut state.resource_table, conn)?;
+ // Copying the extra bytes, but unlikely this will account for much
+ ws_create_server_stream(
+ state,
+ network_stream,
+ Bytes::from(extra_bytes.to_vec()),
+ )
+}
+
+#[op]
pub async fn op_ws_send_binary(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
@@ -490,6 +518,7 @@ deno_core::extension!(deno_websocket,
op_ws_next_event,
op_ws_send_binary,
op_ws_send_text,
+ op_ws_server_create,
],
esm = [ "01_websocket.js", "02_websocketstream.js" ],
options = {
diff --git a/ext/websocket/stream.rs b/ext/websocket/stream.rs
new file mode 100644
index 000000000..69c06b7eb
--- /dev/null
+++ b/ext/websocket/stream.rs
@@ -0,0 +1,115 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use bytes::Buf;
+use bytes::Bytes;
+use deno_net::raw::NetworkStream;
+use hyper::upgrade::Upgraded;
+use std::pin::Pin;
+use std::task::Poll;
+use tokio::io::AsyncRead;
+use tokio::io::AsyncWrite;
+use tokio::io::ReadBuf;
+use tokio_tungstenite::MaybeTlsStream;
+
+// TODO(bartlomieju): remove this
+pub(crate) enum WsStreamKind {
+ Tungstenite(MaybeTlsStream<Upgraded>),
+ Network(NetworkStream),
+}
+
+pub(crate) struct WebSocketStream {
+ stream: WsStreamKind,
+ pre: Option<Bytes>,
+}
+
+impl WebSocketStream {
+ pub fn new(stream: WsStreamKind, buffer: Option<Bytes>) -> Self {
+ Self {
+ stream,
+ pre: buffer,
+ }
+ }
+}
+
+impl AsyncRead for WebSocketStream {
+ // From hyper's Rewind (https://github.com/hyperium/hyper), MIT License, Copyright (c) Sean McArthur
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<std::io::Result<()>> {
+ if let Some(mut prefix) = self.pre.take() {
+ // If there are no remaining bytes, let the bytes get dropped.
+ if !prefix.is_empty() {
+ let copy_len = std::cmp::min(prefix.len(), buf.remaining());
+ // TODO: There should be a way to do following two lines cleaner...
+ buf.put_slice(&prefix[..copy_len]);
+ prefix.advance(copy_len);
+ // Put back what's left
+ if !prefix.is_empty() {
+ self.pre = Some(prefix);
+ }
+
+ return Poll::Ready(Ok(()));
+ }
+ }
+ match &mut self.stream {
+ WsStreamKind::Network(stream) => Pin::new(stream).poll_read(cx, buf),
+ WsStreamKind::Tungstenite(stream) => Pin::new(stream).poll_read(cx, buf),
+ }
+ }
+}
+
+impl AsyncWrite for WebSocketStream {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+ ) -> std::task::Poll<Result<usize, std::io::Error>> {
+ match &mut self.stream {
+ WsStreamKind::Network(stream) => Pin::new(stream).poll_write(cx, buf),
+ WsStreamKind::Tungstenite(stream) => Pin::new(stream).poll_write(cx, buf),
+ }
+ }
+
+ fn poll_flush(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ match &mut self.stream {
+ WsStreamKind::Network(stream) => Pin::new(stream).poll_flush(cx),
+ WsStreamKind::Tungstenite(stream) => Pin::new(stream).poll_flush(cx),
+ }
+ }
+
+ fn poll_shutdown(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ match &mut self.stream {
+ WsStreamKind::Network(stream) => Pin::new(stream).poll_shutdown(cx),
+ WsStreamKind::Tungstenite(stream) => Pin::new(stream).poll_shutdown(cx),
+ }
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ match &self.stream {
+ WsStreamKind::Network(stream) => stream.is_write_vectored(),
+ WsStreamKind::Tungstenite(stream) => stream.is_write_vectored(),
+ }
+ }
+
+ fn poll_write_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ bufs: &[std::io::IoSlice<'_>],
+ ) -> std::task::Poll<Result<usize, std::io::Error>> {
+ match &mut self.stream {
+ WsStreamKind::Network(stream) => {
+ Pin::new(stream).poll_write_vectored(cx, bufs)
+ }
+ WsStreamKind::Tungstenite(stream) => {
+ Pin::new(stream).poll_write_vectored(cx, bufs)
+ }
+ }
+ }
+}