summaryrefslogtreecommitdiff
path: root/ext/http/lib.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-04-22 11:48:21 -0600
committerGitHub <noreply@github.com>2023-04-22 11:48:21 -0600
commitbdffcb409fd1e257db280ab73e07cc319711256c (patch)
tree9aca1c1e73f0249bba8b66781b79c358a7a00798 /ext/http/lib.rs
parentd137501a639cb315772866f6775fcd9f43e28f5b (diff)
feat(ext/http): Rework Deno.serve using hyper 1.0-rc3 (#18619)
This is a rewrite of the `Deno.serve` API to live on top of hyper 1.0-rc3. The code should be more maintainable long-term, and avoids some of the slower mpsc patterns that made the older code less efficient than it could have been. Missing features: - `upgradeHttp` and `upgradeHttpRaw` (`upgradeWebSocket` is available, however). - Automatic compression is unavailable on responses.
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r--ext/http/lib.rs84
1 files changed, 81 insertions, 3 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 43e3c130a..561b13885 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -34,6 +34,7 @@ use deno_core::ResourceId;
use deno_core::StringOrBuffer;
use deno_core::WriteOutcome;
use deno_core::ZeroCopyBuf;
+use deno_net::raw::NetworkStream;
use deno_websocket::ws_create_server_stream;
use flate2::write::GzEncoder;
use flate2::Compression;
@@ -76,7 +77,11 @@ use crate::reader_stream::ExternallyAbortableReaderStream;
use crate::reader_stream::ShutdownHandle;
pub mod compressible;
+mod http_next;
mod reader_stream;
+mod request_body;
+mod request_properties;
+mod response_body;
mod websocket_upgrade;
deno_core::extension!(
@@ -92,8 +97,25 @@ deno_core::extension!(
op_http_websocket_accept_header,
op_http_upgrade_early,
op_http_upgrade_websocket,
+ http_next::op_serve_http,
+ http_next::op_serve_http_on,
+ http_next::op_http_wait,
+ http_next::op_http_track,
+ http_next::op_set_response_header,
+ http_next::op_set_response_headers,
+ http_next::op_set_response_body_text,
+ http_next::op_set_promise_complete,
+ http_next::op_set_response_body_bytes,
+ http_next::op_set_response_body_resource,
+ http_next::op_set_response_body_stream,
+ http_next::op_get_request_header,
+ http_next::op_get_request_headers,
+ http_next::op_get_request_method_and_url,
+ http_next::op_read_request_body,
+ http_next::op_upgrade,
+ http_next::op_upgrade_raw,
],
- esm = ["01_http.js"],
+ esm = ["00_serve.js", "01_http.js"],
);
pub enum HttpSocketAddr {
@@ -1147,8 +1169,10 @@ async fn op_http_upgrade_websocket(
}
};
- let transport = hyper::upgrade::on(request).await?;
- let ws_rid = ws_create_server_stream(&state, transport).await?;
+ let (transport, bytes) =
+ extract_network_stream(hyper::upgrade::on(request).await?);
+ let ws_rid =
+ ws_create_server_stream(&mut state.borrow_mut(), transport, bytes)?;
Ok(ws_rid)
}
@@ -1166,6 +1190,16 @@ where
}
}
+impl<Fut> hyper1::rt::Executor<Fut> for LocalExecutor
+where
+ Fut: Future + 'static,
+ Fut::Output: 'static,
+{
+ fn execute(&self, fut: Fut) {
+ spawn_local(fut);
+ }
+}
+
fn http_error(message: &'static str) -> AnyError {
custom_error("Http", message)
}
@@ -1192,3 +1226,47 @@ fn filter_enotconn(
fn never() -> Pending<Never> {
pending()
}
+
+trait CanDowncastUpgrade: Sized {
+ fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
+ self,
+ ) -> Result<(T, Bytes), Self>;
+}
+
+impl CanDowncastUpgrade for hyper1::upgrade::Upgraded {
+ fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
+ self,
+ ) -> Result<(T, Bytes), Self> {
+ let hyper1::upgrade::Parts { io, read_buf, .. } = self.downcast()?;
+ Ok((io, read_buf))
+ }
+}
+
+impl CanDowncastUpgrade for hyper::upgrade::Upgraded {
+ fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
+ self,
+ ) -> Result<(T, Bytes), Self> {
+ let hyper::upgrade::Parts { io, read_buf, .. } = self.downcast()?;
+ Ok((io, read_buf))
+ }
+}
+
+fn extract_network_stream<U: CanDowncastUpgrade>(
+ upgraded: U,
+) -> (NetworkStream, Bytes) {
+ let upgraded = match upgraded.downcast::<tokio::net::TcpStream>() {
+ Ok((stream, bytes)) => return (NetworkStream::Tcp(stream), bytes),
+ Err(x) => x,
+ };
+ let upgraded = match upgraded.downcast::<deno_net::ops_tls::TlsStream>() {
+ Ok((stream, bytes)) => return (NetworkStream::Tls(stream), bytes),
+ Err(x) => x,
+ };
+ #[cfg(unix)]
+ let upgraded = match upgraded.downcast::<tokio::net::UnixStream>() {
+ Ok((stream, bytes)) => return (NetworkStream::Unix(stream), bytes),
+ Err(x) => x,
+ };
+ drop(upgraded);
+ unreachable!("unexpected stream type");
+}