From bdffcb409fd1e257db280ab73e07cc319711256c Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Sat, 22 Apr 2023 11:48:21 -0600 Subject: feat(ext/http): Rework Deno.serve using hyper 1.0-rc3 (#18619) This is a rewrite of the `Deno.serve` API to live on top of hyper 1.0-rc3. The code should be more maintainable long-term, and avoids some of the slower mpsc patterns that made the older code less efficient than it could have been. Missing features: - `upgradeHttp` and `upgradeHttpRaw` (`upgradeWebSocket` is available, however). - Automatic compression is unavailable on responses. --- ext/http/lib.rs | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 81 insertions(+), 3 deletions(-) (limited to 'ext/http/lib.rs') diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 43e3c130a..561b13885 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -34,6 +34,7 @@ use deno_core::ResourceId; use deno_core::StringOrBuffer; use deno_core::WriteOutcome; use deno_core::ZeroCopyBuf; +use deno_net::raw::NetworkStream; use deno_websocket::ws_create_server_stream; use flate2::write::GzEncoder; use flate2::Compression; @@ -76,7 +77,11 @@ use crate::reader_stream::ExternallyAbortableReaderStream; use crate::reader_stream::ShutdownHandle; pub mod compressible; +mod http_next; mod reader_stream; +mod request_body; +mod request_properties; +mod response_body; mod websocket_upgrade; deno_core::extension!( @@ -92,8 +97,25 @@ deno_core::extension!( op_http_websocket_accept_header, op_http_upgrade_early, op_http_upgrade_websocket, + http_next::op_serve_http, + http_next::op_serve_http_on, + http_next::op_http_wait, + http_next::op_http_track, + http_next::op_set_response_header, + http_next::op_set_response_headers, + http_next::op_set_response_body_text, + http_next::op_set_promise_complete, + http_next::op_set_response_body_bytes, + http_next::op_set_response_body_resource, + http_next::op_set_response_body_stream, + http_next::op_get_request_header, + http_next::op_get_request_headers, + http_next::op_get_request_method_and_url, + http_next::op_read_request_body, + http_next::op_upgrade, + http_next::op_upgrade_raw, ], - esm = ["01_http.js"], + esm = ["00_serve.js", "01_http.js"], ); pub enum HttpSocketAddr { @@ -1147,8 +1169,10 @@ async fn op_http_upgrade_websocket( } }; - let transport = hyper::upgrade::on(request).await?; - let ws_rid = ws_create_server_stream(&state, transport).await?; + let (transport, bytes) = + extract_network_stream(hyper::upgrade::on(request).await?); + let ws_rid = + ws_create_server_stream(&mut state.borrow_mut(), transport, bytes)?; Ok(ws_rid) } @@ -1166,6 +1190,16 @@ where } } +impl hyper1::rt::Executor for LocalExecutor +where + Fut: Future + 'static, + Fut::Output: 'static, +{ + fn execute(&self, fut: Fut) { + spawn_local(fut); + } +} + fn http_error(message: &'static str) -> AnyError { custom_error("Http", message) } @@ -1192,3 +1226,47 @@ fn filter_enotconn( fn never() -> Pending { pending() } + +trait CanDowncastUpgrade: Sized { + fn downcast( + self, + ) -> Result<(T, Bytes), Self>; +} + +impl CanDowncastUpgrade for hyper1::upgrade::Upgraded { + fn downcast( + self, + ) -> Result<(T, Bytes), Self> { + let hyper1::upgrade::Parts { io, read_buf, .. } = self.downcast()?; + Ok((io, read_buf)) + } +} + +impl CanDowncastUpgrade for hyper::upgrade::Upgraded { + fn downcast( + self, + ) -> Result<(T, Bytes), Self> { + let hyper::upgrade::Parts { io, read_buf, .. } = self.downcast()?; + Ok((io, read_buf)) + } +} + +fn extract_network_stream( + upgraded: U, +) -> (NetworkStream, Bytes) { + let upgraded = match upgraded.downcast::() { + Ok((stream, bytes)) => return (NetworkStream::Tcp(stream), bytes), + Err(x) => x, + }; + let upgraded = match upgraded.downcast::() { + Ok((stream, bytes)) => return (NetworkStream::Tls(stream), bytes), + Err(x) => x, + }; + #[cfg(unix)] + let upgraded = match upgraded.downcast::() { + Ok((stream, bytes)) => return (NetworkStream::Unix(stream), bytes), + Err(x) => x, + }; + drop(upgraded); + unreachable!("unexpected stream type"); +} -- cgit v1.2.3