diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-04-22 11:48:21 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-22 11:48:21 -0600 |
commit | bdffcb409fd1e257db280ab73e07cc319711256c (patch) | |
tree | 9aca1c1e73f0249bba8b66781b79c358a7a00798 /ext/net | |
parent | d137501a639cb315772866f6775fcd9f43e28f5b (diff) |
feat(ext/http): Rework Deno.serve using hyper 1.0-rc3 (#18619)
This is a rewrite of the `Deno.serve` API to live on top of hyper
1.0-rc3. The code should be more maintainable long-term, and avoids some
of the slower mpsc patterns that made the older code less efficient than
it could have been.
Missing features:
- `upgradeHttp` and `upgradeHttpRaw` (`upgradeWebSocket` is available,
however).
- Automatic compression is unavailable on responses.
Diffstat (limited to 'ext/net')
-rw-r--r-- | ext/net/Cargo.toml | 1 | ||||
-rw-r--r-- | ext/net/lib.rs | 1 | ||||
-rw-r--r-- | ext/net/ops_tls.rs | 29 | ||||
-rw-r--r-- | ext/net/ops_unix.rs | 4 | ||||
-rw-r--r-- | ext/net/raw.rs | 304 |
5 files changed, 335 insertions, 4 deletions
diff --git a/ext/net/Cargo.toml b/ext/net/Cargo.toml index a7a1acff6..6bab80cc7 100644 --- a/ext/net/Cargo.toml +++ b/ext/net/Cargo.toml @@ -17,6 +17,7 @@ path = "lib.rs" deno_core.workspace = true deno_tls.workspace = true log.workspace = true +pin-project.workspace = true serde.workspace = true socket2.workspace = true tokio.workspace = true diff --git a/ext/net/lib.rs b/ext/net/lib.rs index f812bf60b..ff67186b0 100644 --- a/ext/net/lib.rs +++ b/ext/net/lib.rs @@ -5,6 +5,7 @@ pub mod ops; pub mod ops_tls; #[cfg(unix)] pub mod ops_unix; +pub mod raw; pub mod resolve_addr; use deno_core::error::AnyError; diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs index c0cfb8674..8a7757066 100644 --- a/ext/net/ops_tls.rs +++ b/ext/net/ops_tls.rs @@ -61,6 +61,7 @@ use std::fs::File; use std::io; use std::io::BufReader; use std::io::ErrorKind; +use std::net::SocketAddr; use std::path::Path; use std::pin::Pin; use std::rc::Rc; @@ -115,6 +116,13 @@ impl TlsStream { Self::new(tcp, Connection::Client(tls)) } + pub fn new_client_side_from( + tcp: TcpStream, + connection: ClientConnection, + ) -> Self { + Self::new(tcp, Connection::Client(connection)) + } + pub fn new_server_side( tcp: TcpStream, tls_config: Arc<ServerConfig>, @@ -123,6 +131,13 @@ impl TlsStream { Self::new(tcp, Connection::Server(tls)) } + pub fn new_server_side_from( + tcp: TcpStream, + connection: ServerConnection, + ) -> Self { + Self::new(tcp, Connection::Server(connection)) + } + pub fn into_split(self) -> (ReadHalf, WriteHalf) { let shared = Shared::new(self); let rd = ReadHalf { @@ -132,6 +147,16 @@ impl TlsStream { (rd, wr) } + /// Convenience method to match [`TcpStream`]. + pub fn peer_addr(&self) -> Result<SocketAddr, io::Error> { + self.0.as_ref().unwrap().tcp.peer_addr() + } + + /// Convenience method to match [`TcpStream`]. + pub fn local_addr(&self) -> Result<SocketAddr, io::Error> { + self.0.as_ref().unwrap().tcp.local_addr() + } + /// Tokio-rustls compatibility: returns a reference to the underlying TCP /// stream, and a reference to the Rustls `Connection` object. pub fn get_ref(&self) -> (&TcpStream, &Connection) { @@ -954,8 +979,8 @@ fn load_private_keys_from_file( } pub struct TlsListenerResource { - tcp_listener: AsyncRefCell<TcpListener>, - tls_config: Arc<ServerConfig>, + pub(crate) tcp_listener: AsyncRefCell<TcpListener>, + pub(crate) tls_config: Arc<ServerConfig>, cancel_handle: CancelHandle, } diff --git a/ext/net/ops_unix.rs b/ext/net/ops_unix.rs index 1161d2759..bed923f8b 100644 --- a/ext/net/ops_unix.rs +++ b/ext/net/ops_unix.rs @@ -32,8 +32,8 @@ pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> { }) } -struct UnixListenerResource { - listener: AsyncRefCell<UnixListener>, +pub(crate) struct UnixListenerResource { + pub listener: AsyncRefCell<UnixListener>, cancel: CancelHandle, } diff --git a/ext/net/raw.rs b/ext/net/raw.rs new file mode 100644 index 000000000..74cc10d63 --- /dev/null +++ b/ext/net/raw.rs @@ -0,0 +1,304 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use crate::io::TcpStreamResource; +#[cfg(unix)] +use crate::io::UnixStreamResource; +use crate::ops::TcpListenerResource; +use crate::ops_tls::TlsListenerResource; +use crate::ops_tls::TlsStream; +use crate::ops_tls::TlsStreamResource; +#[cfg(unix)] +use crate::ops_unix::UnixListenerResource; +use deno_core::error::bad_resource; +use deno_core::error::bad_resource_id; +use deno_core::error::AnyError; +use deno_core::ResourceId; +use deno_core::ResourceTable; +use deno_tls::rustls::ServerConfig; +use pin_project::pin_project; +use std::rc::Rc; +use std::sync::Arc; +use tokio::net::TcpStream; +#[cfg(unix)] +use tokio::net::UnixStream; + +/// A raw stream of one of the types handled by this extension. +#[pin_project(project = NetworkStreamProject)] +pub enum NetworkStream { + Tcp(#[pin] TcpStream), + Tls(#[pin] TlsStream), + #[cfg(unix)] + Unix(#[pin] UnixStream), +} + +/// A raw stream of one of the types handled by this extension. +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum NetworkStreamType { + Tcp, + Tls, + #[cfg(unix)] + Unix, +} + +impl NetworkStream { + pub fn local_address(&self) -> Result<NetworkStreamAddress, std::io::Error> { + match self { + Self::Tcp(tcp) => Ok(NetworkStreamAddress::Ip(tcp.local_addr()?)), + Self::Tls(tls) => Ok(NetworkStreamAddress::Ip(tls.local_addr()?)), + #[cfg(unix)] + Self::Unix(unix) => Ok(NetworkStreamAddress::Unix(unix.local_addr()?)), + } + } + + pub fn peer_address(&self) -> Result<NetworkStreamAddress, std::io::Error> { + match self { + Self::Tcp(tcp) => Ok(NetworkStreamAddress::Ip(tcp.peer_addr()?)), + Self::Tls(tls) => Ok(NetworkStreamAddress::Ip(tls.peer_addr()?)), + #[cfg(unix)] + Self::Unix(unix) => Ok(NetworkStreamAddress::Unix(unix.peer_addr()?)), + } + } + + pub fn stream(&self) -> NetworkStreamType { + match self { + Self::Tcp(_) => NetworkStreamType::Tcp, + Self::Tls(_) => NetworkStreamType::Tls, + #[cfg(unix)] + Self::Unix(_) => NetworkStreamType::Unix, + } + } +} + +impl tokio::io::AsyncRead for NetworkStream { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll<std::io::Result<()>> { + match self.project() { + NetworkStreamProject::Tcp(s) => s.poll_read(cx, buf), + NetworkStreamProject::Tls(s) => s.poll_read(cx, buf), + #[cfg(unix)] + NetworkStreamProject::Unix(s) => s.poll_read(cx, buf), + } + } +} + +impl tokio::io::AsyncWrite for NetworkStream { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll<Result<usize, std::io::Error>> { + match self.project() { + NetworkStreamProject::Tcp(s) => s.poll_write(cx, buf), + NetworkStreamProject::Tls(s) => s.poll_write(cx, buf), + #[cfg(unix)] + NetworkStreamProject::Unix(s) => s.poll_write(cx, buf), + } + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), std::io::Error>> { + match self.project() { + NetworkStreamProject::Tcp(s) => s.poll_flush(cx), + NetworkStreamProject::Tls(s) => s.poll_flush(cx), + #[cfg(unix)] + NetworkStreamProject::Unix(s) => s.poll_flush(cx), + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), std::io::Error>> { + match self.project() { + NetworkStreamProject::Tcp(s) => s.poll_shutdown(cx), + NetworkStreamProject::Tls(s) => s.poll_shutdown(cx), + #[cfg(unix)] + NetworkStreamProject::Unix(s) => s.poll_shutdown(cx), + } + } + + fn is_write_vectored(&self) -> bool { + match self { + Self::Tcp(s) => s.is_write_vectored(), + Self::Tls(s) => s.is_write_vectored(), + #[cfg(unix)] + Self::Unix(s) => s.is_write_vectored(), + } + } + + fn poll_write_vectored( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> std::task::Poll<Result<usize, std::io::Error>> { + match self.project() { + NetworkStreamProject::Tcp(s) => s.poll_write_vectored(cx, bufs), + NetworkStreamProject::Tls(s) => s.poll_write_vectored(cx, bufs), + #[cfg(unix)] + NetworkStreamProject::Unix(s) => s.poll_write_vectored(cx, bufs), + } + } +} + +/// A raw stream listener of one of the types handled by this extension. +pub enum NetworkStreamListener { + Tcp(tokio::net::TcpListener), + Tls(tokio::net::TcpListener, Arc<ServerConfig>), + #[cfg(unix)] + Unix(tokio::net::UnixListener), +} + +pub enum NetworkStreamAddress { + Ip(std::net::SocketAddr), + #[cfg(unix)] + Unix(tokio::net::unix::SocketAddr), +} + +impl NetworkStreamListener { + /// Accepts a connection on this listener. + pub async fn accept(&self) -> Result<NetworkStream, AnyError> { + Ok(match self { + Self::Tcp(tcp) => { + let (stream, _addr) = tcp.accept().await?; + NetworkStream::Tcp(stream) + } + Self::Tls(tcp, config) => { + let (stream, _addr) = tcp.accept().await?; + NetworkStream::Tls(TlsStream::new_server_side(stream, config.clone())) + } + #[cfg(unix)] + Self::Unix(unix) => { + let (stream, _addr) = unix.accept().await?; + NetworkStream::Unix(stream) + } + }) + } + + pub fn listen_address(&self) -> Result<NetworkStreamAddress, std::io::Error> { + match self { + Self::Tcp(tcp) => Ok(NetworkStreamAddress::Ip(tcp.local_addr()?)), + Self::Tls(tcp, _) => Ok(NetworkStreamAddress::Ip(tcp.local_addr()?)), + #[cfg(unix)] + Self::Unix(unix) => Ok(NetworkStreamAddress::Unix(unix.local_addr()?)), + } + } + + pub fn stream(&self) -> NetworkStreamType { + match self { + Self::Tcp(..) => NetworkStreamType::Tcp, + Self::Tls(..) => NetworkStreamType::Tls, + #[cfg(unix)] + Self::Unix(..) => NetworkStreamType::Unix, + } + } +} + +/// In some cases it may be more efficient to extract the resource from the resource table and use it directly (for example, an HTTP server). +/// This method will extract a stream from the resource table and return it, unwrapped. +pub fn take_network_stream_resource( + resource_table: &mut ResourceTable, + stream_rid: ResourceId, +) -> Result<NetworkStream, AnyError> { + // The stream we're attempting to unwrap may be in use somewhere else. If that's the case, we cannot proceed + // with the process of unwrapping this connection, so we just return a bad resource error. + // See also: https://github.com/denoland/deno/pull/16242 + + if let Ok(resource_rc) = resource_table.take::<TcpStreamResource>(stream_rid) + { + // This TCP connection might be used somewhere else. + let resource = Rc::try_unwrap(resource_rc) + .map_err(|_| bad_resource("TCP stream is currently in use"))?; + let (read_half, write_half) = resource.into_inner(); + let tcp_stream = read_half.reunite(write_half)?; + return Ok(NetworkStream::Tcp(tcp_stream)); + } + + if let Ok(resource_rc) = resource_table.take::<TlsStreamResource>(stream_rid) + { + // This TLS connection might be used somewhere else. + let resource = Rc::try_unwrap(resource_rc) + .map_err(|_| bad_resource("TLS stream is currently in use"))?; + let (read_half, write_half) = resource.into_inner(); + let tls_stream = read_half.reunite(write_half); + return Ok(NetworkStream::Tls(tls_stream)); + } + + #[cfg(unix)] + if let Ok(resource_rc) = resource_table.take::<UnixStreamResource>(stream_rid) + { + // This UNIX socket might be used somewhere else. + let resource = Rc::try_unwrap(resource_rc) + .map_err(|_| bad_resource("UNIX stream is currently in use"))?; + let (read_half, write_half) = resource.into_inner(); + let unix_stream = read_half.reunite(write_half)?; + return Ok(NetworkStream::Unix(unix_stream)); + } + + Err(bad_resource_id()) +} + +/// Inserts a raw stream (back?) into the resource table and returns a resource ID. This can then be used to create raw connection +/// objects on the JS side. +pub fn put_network_stream_resource( + resource_table: &mut ResourceTable, + stream: NetworkStream, +) -> Result<ResourceId, AnyError> { + let res = match stream { + NetworkStream::Tcp(conn) => { + let (r, w) = conn.into_split(); + resource_table.add(TcpStreamResource::new((r, w))) + } + NetworkStream::Tls(conn) => { + let (r, w) = conn.into_split(); + resource_table.add(TlsStreamResource::new((r, w))) + } + #[cfg(unix)] + NetworkStream::Unix(conn) => { + let (r, w) = conn.into_split(); + resource_table.add(UnixStreamResource::new((r, w))) + } + }; + + Ok(res) +} + +/// In some cases it may be more efficient to extract the resource from the resource table and use it directly (for example, an HTTP server). +/// This method will extract a stream from the resource table and return it, unwrapped. +pub fn take_network_stream_listener_resource( + resource_table: &mut ResourceTable, + listener_rid: ResourceId, +) -> Result<NetworkStreamListener, AnyError> { + if let Ok(resource_rc) = + resource_table.take::<TcpListenerResource>(listener_rid) + { + let resource = Rc::try_unwrap(resource_rc) + .map_err(|_| bad_resource("TCP socket listener is currently in use"))?; + return Ok(NetworkStreamListener::Tcp(resource.listener.into_inner())); + } + + if let Ok(resource_rc) = + resource_table.take::<TlsListenerResource>(listener_rid) + { + let resource = Rc::try_unwrap(resource_rc) + .map_err(|_| bad_resource("TLS socket listener is currently in use"))?; + return Ok(NetworkStreamListener::Tls( + resource.tcp_listener.into_inner(), + resource.tls_config, + )); + } + + #[cfg(unix)] + if let Ok(resource_rc) = + resource_table.take::<UnixListenerResource>(listener_rid) + { + let resource = Rc::try_unwrap(resource_rc) + .map_err(|_| bad_resource("UNIX socket listener is currently in use"))?; + return Ok(NetworkStreamListener::Unix(resource.listener.into_inner())); + } + + Err(bad_resource_id()) +} |