summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--ext/http/Cargo.toml1
-rw-r--r--ext/http/http_next.rs42
-rw-r--r--ext/http/request_properties.rs144
-rw-r--r--ext/net/raw.rs2
5 files changed, 118 insertions, 72 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 4b46e4b65..6dcecdd03 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1026,6 +1026,7 @@ name = "deno_http"
version = "0.99.0"
dependencies = [
"async-compression",
+ "async-trait",
"base64 0.13.1",
"bencher",
"brotli",
diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index 8bf1d42e2..e555d742e 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -22,6 +22,7 @@ harness = false
[dependencies]
async-compression = { version = "0.3.12", features = ["tokio", "brotli", "gzip"] }
+async-trait.workspace = true
base64.workspace = true
brotli = "3.3.4"
bytes.workspace = true
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 8b2f91be0..eaa19a89d 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -33,7 +33,6 @@ use deno_core::ZeroCopyBuf;
use deno_net::ops_tls::TlsStream;
use deno_net::raw::put_network_stream_resource;
use deno_net::raw::NetworkStream;
-use deno_net::raw::NetworkStreamAddress;
use fly_accept_encoding::Encoding;
use http::header::ACCEPT_ENCODING;
use http::header::CACHE_CONTROL;
@@ -61,9 +60,6 @@ use std::borrow::Cow;
use std::cell::RefCell;
use std::future::Future;
use std::io;
-use std::net::Ipv4Addr;
-use std::net::SocketAddr;
-use std::net::SocketAddrV4;
use std::pin::Pin;
use std::rc::Rc;
@@ -825,7 +821,7 @@ fn serve_http(
}
fn serve_http_on<HTTP>(
- network_stream: NetworkStream,
+ connection: HTTP::Connection,
listen_properties: &HttpListenProperties,
cancel: Rc<CancelHandle>,
tx: tokio::sync::mpsc::Sender<u32>,
@@ -833,15 +829,10 @@ fn serve_http_on<HTTP>(
where
HTTP: HttpPropertyExtractor,
{
- // We always want some sort of peer address. If we can't get one, just make up one.
- let peer_address = network_stream.peer_address().unwrap_or_else(|_| {
- NetworkStreamAddress::Ip(SocketAddr::V4(SocketAddrV4::new(
- Ipv4Addr::new(0, 0, 0, 0),
- 0,
- )))
- });
let connection_properties: HttpConnectionProperties =
- HTTP::connection_properties(listen_properties, &peer_address);
+ HTTP::connection_properties(listen_properties, &connection);
+
+ let network_stream = HTTP::to_network_stream_from_connection(connection);
match network_stream {
NetworkStream::Tcp(conn) => {
@@ -895,14 +886,10 @@ pub fn op_http_serve<HTTP>(
where
HTTP: HttpPropertyExtractor,
{
- let listener = HTTP::get_network_stream_listener_for_rid(
- &mut state.borrow_mut(),
- listener_rid,
- )?;
+ let listener =
+ HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?;
- let local_address = listener.listen_address()?;
- let listen_properties =
- HTTP::listen_properties(listener.stream(), &local_address);
+ let listen_properties = HTTP::listen_properties_from_listener(&listener)?;
let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
@@ -915,8 +902,7 @@ where
let listen_properties_clone: HttpListenProperties = listen_properties.clone();
let handle = spawn(async move {
loop {
- let conn = listener
- .accept()
+ let conn = HTTP::accept_connection_from_listener(&listener)
.try_or_cancel(cancel_clone.clone())
.await?;
serve_http_on::<HTTP>(
@@ -945,17 +931,15 @@ where
#[op(v8)]
pub fn op_http_serve_on<HTTP>(
state: Rc<RefCell<OpState>>,
- conn: ResourceId,
+ connection_rid: ResourceId,
) -> Result<(ResourceId, &'static str, String), AnyError>
where
HTTP: HttpPropertyExtractor,
{
- let network_stream: NetworkStream =
- HTTP::get_network_stream_for_rid(&mut state.borrow_mut(), conn)?;
+ let connection =
+ HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?;
- let local_address = network_stream.local_address()?;
- let listen_properties =
- HTTP::listen_properties(network_stream.stream(), &local_address);
+ let listen_properties = HTTP::listen_properties_from_connection(&connection)?;
let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
@@ -966,7 +950,7 @@ where
let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> =
serve_http_on::<HTTP>(
- network_stream,
+ connection,
&listen_properties,
resource.cancel_handle(),
tx,
diff --git a/ext/http/request_properties.rs b/ext/http/request_properties.rs
index 9c0c0e815..905139673 100644
--- a/ext/http/request_properties.rs
+++ b/ext/http/request_properties.rs
@@ -1,10 +1,10 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use deno_core::error::AnyError;
use deno_core::OpState;
use deno_core::ResourceId;
-use deno_net::raw::NetworkStream;
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use deno_net::raw::take_network_stream_listener_resource;
use deno_net::raw::take_network_stream_resource;
+use deno_net::raw::NetworkStream;
use deno_net::raw::NetworkStreamAddress;
use deno_net::raw::NetworkStreamListener;
use deno_net::raw::NetworkStreamType;
@@ -12,23 +12,26 @@ use hyper::HeaderMap;
use hyper::Uri;
use hyper1::header::HOST;
use std::borrow::Cow;
+use std::net::Ipv4Addr;
+use std::net::SocketAddr;
+use std::net::SocketAddrV4;
use std::rc::Rc;
// TODO(mmastrac): I don't like that we have to clone this, but it's one-time setup
#[derive(Clone)]
pub struct HttpListenProperties {
- pub stream_type: NetworkStreamType,
pub scheme: &'static str,
pub fallback_host: String,
pub local_port: Option<u16>,
+ pub stream_type: NetworkStreamType,
}
#[derive(Clone)]
pub struct HttpConnectionProperties {
- pub stream_type: NetworkStreamType,
pub peer_address: Rc<str>,
pub peer_port: Option<u16>,
pub local_port: Option<u16>,
+ pub stream_type: NetworkStreamType,
}
pub struct HttpRequestProperties {
@@ -37,31 +40,49 @@ pub struct HttpRequestProperties {
/// Pluggable trait to determine listen, connection and request properties
/// for embedders that wish to provide alternative routes for incoming HTTP.
+#[async_trait::async_trait(?Send)]
pub trait HttpPropertyExtractor {
- /// Given a listener [`ResourceId`], returns the [`NetworkStreamListener`].
- fn get_network_stream_listener_for_rid(
+ type Listener: 'static;
+ type Connection;
+
+ /// Given a listener [`ResourceId`], returns the [`HttpPropertyExtractor::Listener`].
+ fn get_listener_for_rid(
state: &mut OpState,
listener_rid: ResourceId,
- ) -> Result<NetworkStreamListener, AnyError>;
+ ) -> Result<Self::Listener, AnyError>;
- /// Given a connection [`ResourceId`], returns the [`NetworkStream`].
- fn get_network_stream_for_rid(
+ /// Given a connection [`ResourceId`], returns the [`HttpPropertyExtractor::Connection`].
+ fn get_connection_for_rid(
state: &mut OpState,
- rid: ResourceId,
- ) -> Result<NetworkStream, AnyError>;
+ connection_rid: ResourceId,
+ ) -> Result<Self::Connection, AnyError>;
/// Determines the listener properties.
- fn listen_properties(
- stream_type: NetworkStreamType,
- local_address: &NetworkStreamAddress,
- ) -> HttpListenProperties;
+ fn listen_properties_from_listener(
+ listener: &Self::Listener,
+ ) -> Result<HttpListenProperties, std::io::Error>;
+
+ /// Determines the listener properties given a [`HttpPropertyExtractor::Connection`].
+ fn listen_properties_from_connection(
+ connection: &Self::Connection,
+ ) -> Result<HttpListenProperties, std::io::Error>;
+
+ /// Accept a new [`HttpPropertyExtractor::Connection`] from the given listener [`HttpPropertyExtractor::Listener`].
+ async fn accept_connection_from_listener(
+ listener: &Self::Listener,
+ ) -> Result<Self::Connection, AnyError>;
/// Determines the connection properties.
fn connection_properties(
listen_properties: &HttpListenProperties,
- peer_address: &NetworkStreamAddress,
+ connection: &Self::Connection,
) -> HttpConnectionProperties;
+ /// Turn a given [`HttpPropertyExtractor::Connection`] into a [`NetworkStream`].
+ fn to_network_stream_from_connection(
+ connection: Self::Connection,
+ ) -> NetworkStream;
+
/// Determines the request properties.
fn request_properties(
connection_properties: &HttpConnectionProperties,
@@ -72,15 +93,13 @@ pub trait HttpPropertyExtractor {
pub struct DefaultHttpPropertyExtractor {}
+#[async_trait::async_trait(?Send)]
impl HttpPropertyExtractor for DefaultHttpPropertyExtractor {
- fn get_network_stream_for_rid(
- state: &mut OpState,
- rid: ResourceId,
- ) -> Result<NetworkStream, AnyError> {
- take_network_stream_resource(&mut state.resource_table, rid)
- }
+ type Listener = NetworkStreamListener;
- fn get_network_stream_listener_for_rid(
+ type Connection = NetworkStream;
+
+ fn get_listener_for_rid(
state: &mut OpState,
listener_rid: ResourceId,
) -> Result<NetworkStreamListener, AnyError> {
@@ -90,30 +109,52 @@ impl HttpPropertyExtractor for DefaultHttpPropertyExtractor {
)
}
- fn listen_properties(
- stream_type: NetworkStreamType,
- local_address: &NetworkStreamAddress,
- ) -> HttpListenProperties {
- let scheme = req_scheme_from_stream_type(stream_type);
- let fallback_host = req_host_from_addr(stream_type, local_address);
- let local_port: Option<u16> = match local_address {
- NetworkStreamAddress::Ip(ip) => Some(ip.port()),
- #[cfg(unix)]
- NetworkStreamAddress::Unix(_) => None,
- };
+ fn get_connection_for_rid(
+ state: &mut OpState,
+ stream_rid: ResourceId,
+ ) -> Result<NetworkStream, AnyError> {
+ take_network_stream_resource(&mut state.resource_table, stream_rid)
+ }
- HttpListenProperties {
- scheme,
- fallback_host,
- local_port,
- stream_type,
- }
+ async fn accept_connection_from_listener(
+ listener: &NetworkStreamListener,
+ ) -> Result<NetworkStream, AnyError> {
+ listener.accept().await.map_err(Into::into)
+ }
+
+ fn listen_properties_from_listener(
+ listener: &NetworkStreamListener,
+ ) -> Result<HttpListenProperties, std::io::Error> {
+ let stream_type = listener.stream();
+ let local_address = listener.listen_address()?;
+ listener_properties(stream_type, local_address)
+ }
+
+ fn listen_properties_from_connection(
+ connection: &Self::Connection,
+ ) -> Result<HttpListenProperties, std::io::Error> {
+ let stream_type = connection.stream();
+ let local_address = connection.local_address()?;
+ listener_properties(stream_type, local_address)
+ }
+
+ fn to_network_stream_from_connection(
+ connection: Self::Connection,
+ ) -> NetworkStream {
+ connection
}
fn connection_properties(
listen_properties: &HttpListenProperties,
- peer_address: &NetworkStreamAddress,
+ connection: &NetworkStream,
) -> HttpConnectionProperties {
+ // We always want some sort of peer address. If we can't get one, just make up one.
+ let peer_address = connection.peer_address().unwrap_or_else(|_| {
+ NetworkStreamAddress::Ip(SocketAddr::V4(SocketAddrV4::new(
+ Ipv4Addr::new(0, 0, 0, 0),
+ 0,
+ )))
+ });
let peer_port: Option<u16> = match peer_address {
NetworkStreamAddress::Ip(ip) => Some(ip.port()),
#[cfg(unix)]
@@ -128,10 +169,10 @@ impl HttpPropertyExtractor for DefaultHttpPropertyExtractor {
let stream_type = listen_properties.stream_type;
HttpConnectionProperties {
- stream_type,
peer_address,
peer_port,
local_port,
+ stream_type,
}
}
@@ -152,6 +193,25 @@ impl HttpPropertyExtractor for DefaultHttpPropertyExtractor {
}
}
+fn listener_properties(
+ stream_type: NetworkStreamType,
+ local_address: NetworkStreamAddress,
+) -> Result<HttpListenProperties, std::io::Error> {
+ let scheme = req_scheme_from_stream_type(stream_type);
+ let fallback_host = req_host_from_addr(stream_type, &local_address);
+ let local_port: Option<u16> = match local_address {
+ NetworkStreamAddress::Ip(ip) => Some(ip.port()),
+ #[cfg(unix)]
+ NetworkStreamAddress::Unix(_) => None,
+ };
+ Ok(HttpListenProperties {
+ scheme,
+ fallback_host,
+ local_port,
+ stream_type,
+ })
+}
+
/// Compute the fallback address from the [`NetworkStreamListenAddress`]. If the request has no authority/host in
/// its URI, and there is no [`HeaderName::HOST`] header, we fall back to this.
fn req_host_from_addr(
diff --git a/ext/net/raw.rs b/ext/net/raw.rs
index 3b50af41e..3f230a08b 100644
--- a/ext/net/raw.rs
+++ b/ext/net/raw.rs
@@ -179,7 +179,7 @@ pub enum NetworkStreamAddress {
impl NetworkStreamListener {
/// Accepts a connection on this listener.
- pub async fn accept(&self) -> Result<NetworkStream, AnyError> {
+ pub async fn accept(&self) -> Result<NetworkStream, std::io::Error> {
Ok(match self {
Self::Tcp(tcp) => {
let (stream, _addr) = tcp.accept().await?;