diff options
author | Bert Belder <bertbelder@gmail.com> | 2022-03-16 14:54:18 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-16 14:54:18 +0100 |
commit | c5270abad7c42968dcbdbc8d9f09d7675fb843e9 (patch) | |
tree | 7e7ff9b63b7cd3420295d5e546dcd987ed746d7d | |
parent | 89a41d0a67c531d937126bbdb095ab1edb5eede2 (diff) |
feat(unstable): Add Deno.upgradeHttp API (#13618)
This commit adds "Deno.upgradeHttp" API, which
allows to "hijack" connection and switch protocols, to eg.
implement WebSocket required for Node compat.
Co-authored-by: crowlkats <crowlkats@toaxl.com>
Co-authored-by: Ryan Dahl <ry@tinyclouds.org>
Co-authored-by: Bartek IwaĆczuk <biwanczuk@gmail.com>
-rw-r--r-- | cli/dts/lib.deno.unstable.d.ts | 14 | ||||
-rw-r--r-- | cli/tests/unit/http_test.ts | 44 | ||||
-rw-r--r-- | ext/http/01_http.js | 47 | ||||
-rw-r--r-- | ext/http/lib.rs | 6 | ||||
-rw-r--r-- | ext/net/ops_tls.rs | 2 | ||||
-rw-r--r-- | runtime/js/40_http.js | 2 | ||||
-rw-r--r-- | runtime/js/90_deno_ns.js | 1 | ||||
-rw-r--r-- | runtime/ops/http.rs | 80 |
8 files changed, 187 insertions, 9 deletions
diff --git a/cli/dts/lib.deno.unstable.d.ts b/cli/dts/lib.deno.unstable.d.ts index 9ab9b5761..a61d672f7 100644 --- a/cli/dts/lib.deno.unstable.d.ts +++ b/cli/dts/lib.deno.unstable.d.ts @@ -1333,6 +1333,20 @@ declare namespace Deno { * Make the timer of the given id not blocking the event loop from finishing */ export function unrefTimer(id: number): void; + + /** **UNSTABLE**: new API, yet to be vetter. + * + * Allows to "hijack" a connection that the request is associated with. + * Can be used to implement protocols that build on top of HTTP (eg. + * WebSockets). + * + * The returned promise returns underlying connection and first packet + * received. The promise shouldn't be awaited before responding to the + * `request`, otherwise event loop might deadlock. + */ + export function upgradeHttp( + request: Request, + ): Promise<[Deno.Conn, Uint8Array]>; } declare function fetch( diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index 65ee55577..fd8d49947 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -5,6 +5,7 @@ import { BufWriter, } from "../../../test_util/std/io/buffer.ts"; import { TextProtoReader } from "../../../test_util/std/textproto/mod.ts"; +import { serve } from "../../../test_util/std/http/server.ts"; import { assert, assertEquals, @@ -1738,6 +1739,49 @@ Deno.test({ }, }); +Deno.test("upgradeHttp", async () => { + async function client() { + const tcpConn = await Deno.connect({ port: 4501 }); + await tcpConn.write( + new TextEncoder().encode( + "CONNECT server.example.com:80 HTTP/1.1\r\n\r\nbla bla bla\nbla bla\nbla\n", + ), + ); + setTimeout(async () => { + await tcpConn.write( + new TextEncoder().encode( + "bla bla bla\nbla bla\nbla\n", + ), + ); + tcpConn.close(); + }, 500); + } + + const abortController = new AbortController(); + const signal = abortController.signal; + + const server = serve((req) => { + const p = Deno.upgradeHttp(req); + + (async () => { + const [conn, firstPacket] = await p; + const buf = new Uint8Array(1024); + const firstPacketText = new TextDecoder().decode(firstPacket); + assertEquals(firstPacketText, "bla bla bla\nbla bla\nbla\n"); + const n = await conn.read(buf); + assert(n != null); + const secondPacketText = new TextDecoder().decode(buf.slice(0, n)); + assertEquals(secondPacketText, "bla bla bla\nbla bla\nbla\n"); + abortController.abort(); + conn.close(); + })(); + + return new Response(null, { status: 101 }); + }, { port: 4501, signal }); + + await Promise.all([server, client()]); +}); + function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader { // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 const tp = new TextProtoReader(r); diff --git a/ext/http/01_http.js b/ext/http/01_http.js index eae742990..ad39ce257 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -30,10 +30,14 @@ _idleTimeoutTimeout, _serverHandleIdleTimeout, } = window.__bootstrap.webSocket; + const { TcpConn } = window.__bootstrap.net; + const { TlsConn } = window.__bootstrap.tls; + const { Deferred } = window.__bootstrap.streams; const { ArrayPrototypeIncludes, ArrayPrototypePush, ArrayPrototypeSome, + Error, ObjectPrototypeIsPrototypeOf, PromisePrototype, Set, @@ -53,10 +57,13 @@ } = window.__bootstrap.primordials; const connErrorSymbol = Symbol("connError"); + const _deferred = Symbol("upgradeHttpDeferred"); class HttpConn { #rid = 0; #closed = false; + #remoteAddr; + #localAddr; // This set holds resource ids of resources // that were created during lifecycle of this request. @@ -64,8 +71,10 @@ // as well. managedResources = new Set(); - constructor(rid) { + constructor(rid, remoteAddr, localAddr) { this.#rid = rid; + this.#remoteAddr = remoteAddr; + this.#localAddr = localAddr; } /** @returns {number} */ @@ -125,7 +134,13 @@ const signal = abortSignal.newSignal(); const request = fromInnerRequest(innerRequest, signal, "immutable"); - const respondWith = createRespondWith(this, streamRid); + const respondWith = createRespondWith( + this, + streamRid, + request, + this.#remoteAddr, + this.#localAddr, + ); return { request, respondWith }; } @@ -159,7 +174,13 @@ return core.opAsync("op_http_read", streamRid, buf); } - function createRespondWith(httpConn, streamRid) { + function createRespondWith( + httpConn, + streamRid, + request, + remoteAddr, + localAddr, + ) { return async function respondWith(resp) { try { if (ObjectPrototypeIsPrototypeOf(PromisePrototype, resp)) { @@ -282,6 +303,20 @@ } } + const deferred = request[_deferred]; + if (deferred) { + const res = await core.opAsync("op_http_upgrade", streamRid); + let conn; + if (res.connType === "tcp") { + conn = new TcpConn(res.connRid, remoteAddr, localAddr); + } else if (res.connType === "tls") { + conn = new TlsConn(res.connRid, remoteAddr, localAddr); + } else { + throw new Error("unreachable"); + } + + deferred.resolve([conn, res.readBuf]); + } const ws = resp[_ws]; if (ws) { const wsRid = await core.opAsync( @@ -425,8 +460,14 @@ return { response, socket }; } + function upgradeHttp(req) { + req[_deferred] = new Deferred(); + return req[_deferred].promise; + } + window.__bootstrap.http = { HttpConn, upgradeWebSocket, + upgradeHttp, }; })(this); diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 535f52a6c..48a58067e 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -289,9 +289,9 @@ impl HttpAcceptor { } /// A resource representing a single HTTP request/response stream. -struct HttpStreamResource { +pub struct HttpStreamResource { conn: Rc<HttpConnResource>, - rd: AsyncRefCell<HttpRequestReader>, + pub rd: AsyncRefCell<HttpRequestReader>, wr: AsyncRefCell<HttpResponseWriter>, accept_encoding: RefCell<Encoding>, cancel_handle: CancelHandle, @@ -324,7 +324,7 @@ impl Resource for HttpStreamResource { } /// The read half of an HTTP stream. -enum HttpRequestReader { +pub enum HttpRequestReader { Headers(Request<Body>), Body(Peekable<Body>), Closed, diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs index 05e007176..74301292b 100644 --- a/ext/net/ops_tls.rs +++ b/ext/net/ops_tls.rs @@ -127,7 +127,7 @@ impl TlsStream { Self::new(tcp, Connection::Server(tls)) } - fn into_split(self) -> (ReadHalf, WriteHalf) { + pub fn into_split(self) -> (ReadHalf, WriteHalf) { let shared = Shared::new(self); let rd = ReadHalf { shared: shared.clone(), diff --git a/runtime/js/40_http.js b/runtime/js/40_http.js index 9afca0f5b..4a8743438 100644 --- a/runtime/js/40_http.js +++ b/runtime/js/40_http.js @@ -7,7 +7,7 @@ function serveHttp(conn) { const rid = core.opSync("op_http_start", conn.rid); - return new HttpConn(rid); + return new HttpConn(rid, conn.remoteAddr, conn.localAddr); } window.__bootstrap.http.serveHttp = serveHttp; diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index 5298d0a69..7ab0108a8 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -109,6 +109,7 @@ serveHttp: __bootstrap.http.serveHttp, resolveDns: __bootstrap.net.resolveDns, upgradeWebSocket: __bootstrap.http.upgradeWebSocket, + upgradeHttp: __bootstrap.http.upgradeHttp, kill: __bootstrap.process.kill, addSignalListener: __bootstrap.signals.addSignalListener, removeSignalListener: __bootstrap.signals.removeSignalListener, diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs index 47ec31751..1e2bd66ec 100644 --- a/runtime/ops/http.rs +++ b/runtime/ops/http.rs @@ -1,18 +1,28 @@ +use std::cell::RefCell; use std::rc::Rc; use deno_core::error::bad_resource_id; +use deno_core::error::custom_error; use deno_core::error::AnyError; use deno_core::op; use deno_core::Extension; use deno_core::OpState; +use deno_core::RcRef; use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; use deno_http::http_create_conn_resource; +use deno_http::HttpRequestReader; +use deno_http::HttpStreamResource; use deno_net::io::TcpStreamResource; +use deno_net::ops_tls::TlsStream; use deno_net::ops_tls::TlsStreamResource; +use hyper::upgrade::Parts; +use serde::Serialize; +use tokio::net::TcpStream; pub fn init() -> Extension { Extension::builder() - .ops(vec![op_http_start::decl()]) + .ops(vec![op_http_start::decl(), op_http_upgrade::decl()]) .build() } @@ -62,3 +72,71 @@ fn op_http_start( Err(bad_resource_id()) } + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct HttpUpgradeResult { + conn_rid: ResourceId, + conn_type: &'static str, + read_buf: ZeroCopyBuf, +} + +#[op] +async fn op_http_upgrade( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + _: (), +) -> Result<HttpUpgradeResult, AnyError> { + let stream = state + .borrow_mut() + .resource_table + .get::<HttpStreamResource>(rid)?; + let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; + + let request = match &mut *rd { + HttpRequestReader::Headers(request) => request, + _ => { + return Err(custom_error( + "Http", + "cannot upgrade because request body was used", + )) + } + }; + + let transport = hyper::upgrade::on(request).await?; + let transport = match transport.downcast::<TcpStream>() { + Ok(Parts { + io: tcp_stream, + read_buf, + .. + }) => { + return Ok(HttpUpgradeResult { + conn_type: "tcp", + conn_rid: state + .borrow_mut() + .resource_table + .add(TcpStreamResource::new(tcp_stream.into_split())), + read_buf: read_buf.to_vec().into(), + }); + } + Err(transport) => transport, + }; + match transport.downcast::<TlsStream>() { + Ok(Parts { + io: tls_stream, + read_buf, + .. + }) => Ok(HttpUpgradeResult { + conn_type: "tls", + conn_rid: state + .borrow_mut() + .resource_table + .add(TlsStreamResource::new(tls_stream.into_split())), + read_buf: read_buf.to_vec().into(), + }), + Err(_) => Err(custom_error( + "Http", + "encountered unsupported transport while upgrading", + )), + } +} |