summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2022-03-16 14:54:18 +0100
committerGitHub <noreply@github.com>2022-03-16 14:54:18 +0100
commitc5270abad7c42968dcbdbc8d9f09d7675fb843e9 (patch)
tree7e7ff9b63b7cd3420295d5e546dcd987ed746d7d
parent89a41d0a67c531d937126bbdb095ab1edb5eede2 (diff)
feat(unstable): Add Deno.upgradeHttp API (#13618)
This commit adds "Deno.upgradeHttp" API, which allows to "hijack" connection and switch protocols, to eg. implement WebSocket required for Node compat. Co-authored-by: crowlkats <crowlkats@toaxl.com> Co-authored-by: Ryan Dahl <ry@tinyclouds.org> Co-authored-by: Bartek IwaƄczuk <biwanczuk@gmail.com>
-rw-r--r--cli/dts/lib.deno.unstable.d.ts14
-rw-r--r--cli/tests/unit/http_test.ts44
-rw-r--r--ext/http/01_http.js47
-rw-r--r--ext/http/lib.rs6
-rw-r--r--ext/net/ops_tls.rs2
-rw-r--r--runtime/js/40_http.js2
-rw-r--r--runtime/js/90_deno_ns.js1
-rw-r--r--runtime/ops/http.rs80
8 files changed, 187 insertions, 9 deletions
diff --git a/cli/dts/lib.deno.unstable.d.ts b/cli/dts/lib.deno.unstable.d.ts
index 9ab9b5761..a61d672f7 100644
--- a/cli/dts/lib.deno.unstable.d.ts
+++ b/cli/dts/lib.deno.unstable.d.ts
@@ -1333,6 +1333,20 @@ declare namespace Deno {
* Make the timer of the given id not blocking the event loop from finishing
*/
export function unrefTimer(id: number): void;
+
+ /** **UNSTABLE**: new API, yet to be vetter.
+ *
+ * Allows to "hijack" a connection that the request is associated with.
+ * Can be used to implement protocols that build on top of HTTP (eg.
+ * WebSockets).
+ *
+ * The returned promise returns underlying connection and first packet
+ * received. The promise shouldn't be awaited before responding to the
+ * `request`, otherwise event loop might deadlock.
+ */
+ export function upgradeHttp(
+ request: Request,
+ ): Promise<[Deno.Conn, Uint8Array]>;
}
declare function fetch(
diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts
index 65ee55577..fd8d49947 100644
--- a/cli/tests/unit/http_test.ts
+++ b/cli/tests/unit/http_test.ts
@@ -5,6 +5,7 @@ import {
BufWriter,
} from "../../../test_util/std/io/buffer.ts";
import { TextProtoReader } from "../../../test_util/std/textproto/mod.ts";
+import { serve } from "../../../test_util/std/http/server.ts";
import {
assert,
assertEquals,
@@ -1738,6 +1739,49 @@ Deno.test({
},
});
+Deno.test("upgradeHttp", async () => {
+ async function client() {
+ const tcpConn = await Deno.connect({ port: 4501 });
+ await tcpConn.write(
+ new TextEncoder().encode(
+ "CONNECT server.example.com:80 HTTP/1.1\r\n\r\nbla bla bla\nbla bla\nbla\n",
+ ),
+ );
+ setTimeout(async () => {
+ await tcpConn.write(
+ new TextEncoder().encode(
+ "bla bla bla\nbla bla\nbla\n",
+ ),
+ );
+ tcpConn.close();
+ }, 500);
+ }
+
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+
+ const server = serve((req) => {
+ const p = Deno.upgradeHttp(req);
+
+ (async () => {
+ const [conn, firstPacket] = await p;
+ const buf = new Uint8Array(1024);
+ const firstPacketText = new TextDecoder().decode(firstPacket);
+ assertEquals(firstPacketText, "bla bla bla\nbla bla\nbla\n");
+ const n = await conn.read(buf);
+ assert(n != null);
+ const secondPacketText = new TextDecoder().decode(buf.slice(0, n));
+ assertEquals(secondPacketText, "bla bla bla\nbla bla\nbla\n");
+ abortController.abort();
+ conn.close();
+ })();
+
+ return new Response(null, { status: 101 });
+ }, { port: 4501, signal });
+
+ await Promise.all([server, client()]);
+});
+
function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(r);
diff --git a/ext/http/01_http.js b/ext/http/01_http.js
index eae742990..ad39ce257 100644
--- a/ext/http/01_http.js
+++ b/ext/http/01_http.js
@@ -30,10 +30,14 @@
_idleTimeoutTimeout,
_serverHandleIdleTimeout,
} = window.__bootstrap.webSocket;
+ const { TcpConn } = window.__bootstrap.net;
+ const { TlsConn } = window.__bootstrap.tls;
+ const { Deferred } = window.__bootstrap.streams;
const {
ArrayPrototypeIncludes,
ArrayPrototypePush,
ArrayPrototypeSome,
+ Error,
ObjectPrototypeIsPrototypeOf,
PromisePrototype,
Set,
@@ -53,10 +57,13 @@
} = window.__bootstrap.primordials;
const connErrorSymbol = Symbol("connError");
+ const _deferred = Symbol("upgradeHttpDeferred");
class HttpConn {
#rid = 0;
#closed = false;
+ #remoteAddr;
+ #localAddr;
// This set holds resource ids of resources
// that were created during lifecycle of this request.
@@ -64,8 +71,10 @@
// as well.
managedResources = new Set();
- constructor(rid) {
+ constructor(rid, remoteAddr, localAddr) {
this.#rid = rid;
+ this.#remoteAddr = remoteAddr;
+ this.#localAddr = localAddr;
}
/** @returns {number} */
@@ -125,7 +134,13 @@
const signal = abortSignal.newSignal();
const request = fromInnerRequest(innerRequest, signal, "immutable");
- const respondWith = createRespondWith(this, streamRid);
+ const respondWith = createRespondWith(
+ this,
+ streamRid,
+ request,
+ this.#remoteAddr,
+ this.#localAddr,
+ );
return { request, respondWith };
}
@@ -159,7 +174,13 @@
return core.opAsync("op_http_read", streamRid, buf);
}
- function createRespondWith(httpConn, streamRid) {
+ function createRespondWith(
+ httpConn,
+ streamRid,
+ request,
+ remoteAddr,
+ localAddr,
+ ) {
return async function respondWith(resp) {
try {
if (ObjectPrototypeIsPrototypeOf(PromisePrototype, resp)) {
@@ -282,6 +303,20 @@
}
}
+ const deferred = request[_deferred];
+ if (deferred) {
+ const res = await core.opAsync("op_http_upgrade", streamRid);
+ let conn;
+ if (res.connType === "tcp") {
+ conn = new TcpConn(res.connRid, remoteAddr, localAddr);
+ } else if (res.connType === "tls") {
+ conn = new TlsConn(res.connRid, remoteAddr, localAddr);
+ } else {
+ throw new Error("unreachable");
+ }
+
+ deferred.resolve([conn, res.readBuf]);
+ }
const ws = resp[_ws];
if (ws) {
const wsRid = await core.opAsync(
@@ -425,8 +460,14 @@
return { response, socket };
}
+ function upgradeHttp(req) {
+ req[_deferred] = new Deferred();
+ return req[_deferred].promise;
+ }
+
window.__bootstrap.http = {
HttpConn,
upgradeWebSocket,
+ upgradeHttp,
};
})(this);
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 535f52a6c..48a58067e 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -289,9 +289,9 @@ impl HttpAcceptor {
}
/// A resource representing a single HTTP request/response stream.
-struct HttpStreamResource {
+pub struct HttpStreamResource {
conn: Rc<HttpConnResource>,
- rd: AsyncRefCell<HttpRequestReader>,
+ pub rd: AsyncRefCell<HttpRequestReader>,
wr: AsyncRefCell<HttpResponseWriter>,
accept_encoding: RefCell<Encoding>,
cancel_handle: CancelHandle,
@@ -324,7 +324,7 @@ impl Resource for HttpStreamResource {
}
/// The read half of an HTTP stream.
-enum HttpRequestReader {
+pub enum HttpRequestReader {
Headers(Request<Body>),
Body(Peekable<Body>),
Closed,
diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs
index 05e007176..74301292b 100644
--- a/ext/net/ops_tls.rs
+++ b/ext/net/ops_tls.rs
@@ -127,7 +127,7 @@ impl TlsStream {
Self::new(tcp, Connection::Server(tls))
}
- fn into_split(self) -> (ReadHalf, WriteHalf) {
+ pub fn into_split(self) -> (ReadHalf, WriteHalf) {
let shared = Shared::new(self);
let rd = ReadHalf {
shared: shared.clone(),
diff --git a/runtime/js/40_http.js b/runtime/js/40_http.js
index 9afca0f5b..4a8743438 100644
--- a/runtime/js/40_http.js
+++ b/runtime/js/40_http.js
@@ -7,7 +7,7 @@
function serveHttp(conn) {
const rid = core.opSync("op_http_start", conn.rid);
- return new HttpConn(rid);
+ return new HttpConn(rid, conn.remoteAddr, conn.localAddr);
}
window.__bootstrap.http.serveHttp = serveHttp;
diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js
index 5298d0a69..7ab0108a8 100644
--- a/runtime/js/90_deno_ns.js
+++ b/runtime/js/90_deno_ns.js
@@ -109,6 +109,7 @@
serveHttp: __bootstrap.http.serveHttp,
resolveDns: __bootstrap.net.resolveDns,
upgradeWebSocket: __bootstrap.http.upgradeWebSocket,
+ upgradeHttp: __bootstrap.http.upgradeHttp,
kill: __bootstrap.process.kill,
addSignalListener: __bootstrap.signals.addSignalListener,
removeSignalListener: __bootstrap.signals.removeSignalListener,
diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs
index 47ec31751..1e2bd66ec 100644
--- a/runtime/ops/http.rs
+++ b/runtime/ops/http.rs
@@ -1,18 +1,28 @@
+use std::cell::RefCell;
use std::rc::Rc;
use deno_core::error::bad_resource_id;
+use deno_core::error::custom_error;
use deno_core::error::AnyError;
use deno_core::op;
use deno_core::Extension;
use deno_core::OpState;
+use deno_core::RcRef;
use deno_core::ResourceId;
+use deno_core::ZeroCopyBuf;
use deno_http::http_create_conn_resource;
+use deno_http::HttpRequestReader;
+use deno_http::HttpStreamResource;
use deno_net::io::TcpStreamResource;
+use deno_net::ops_tls::TlsStream;
use deno_net::ops_tls::TlsStreamResource;
+use hyper::upgrade::Parts;
+use serde::Serialize;
+use tokio::net::TcpStream;
pub fn init() -> Extension {
Extension::builder()
- .ops(vec![op_http_start::decl()])
+ .ops(vec![op_http_start::decl(), op_http_upgrade::decl()])
.build()
}
@@ -62,3 +72,71 @@ fn op_http_start(
Err(bad_resource_id())
}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct HttpUpgradeResult {
+ conn_rid: ResourceId,
+ conn_type: &'static str,
+ read_buf: ZeroCopyBuf,
+}
+
+#[op]
+async fn op_http_upgrade(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ _: (),
+) -> Result<HttpUpgradeResult, AnyError> {
+ let stream = state
+ .borrow_mut()
+ .resource_table
+ .get::<HttpStreamResource>(rid)?;
+ let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
+
+ let request = match &mut *rd {
+ HttpRequestReader::Headers(request) => request,
+ _ => {
+ return Err(custom_error(
+ "Http",
+ "cannot upgrade because request body was used",
+ ))
+ }
+ };
+
+ let transport = hyper::upgrade::on(request).await?;
+ let transport = match transport.downcast::<TcpStream>() {
+ Ok(Parts {
+ io: tcp_stream,
+ read_buf,
+ ..
+ }) => {
+ return Ok(HttpUpgradeResult {
+ conn_type: "tcp",
+ conn_rid: state
+ .borrow_mut()
+ .resource_table
+ .add(TcpStreamResource::new(tcp_stream.into_split())),
+ read_buf: read_buf.to_vec().into(),
+ });
+ }
+ Err(transport) => transport,
+ };
+ match transport.downcast::<TlsStream>() {
+ Ok(Parts {
+ io: tls_stream,
+ read_buf,
+ ..
+ }) => Ok(HttpUpgradeResult {
+ conn_type: "tls",
+ conn_rid: state
+ .borrow_mut()
+ .resource_table
+ .add(TlsStreamResource::new(tls_stream.into_split())),
+ read_buf: read_buf.to_vec().into(),
+ }),
+ Err(_) => Err(custom_error(
+ "Http",
+ "encountered unsupported transport while upgrading",
+ )),
+ }
+}