summaryrefslogtreecommitdiff
path: root/ext/net
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-04-22 11:48:21 -0600
committerGitHub <noreply@github.com>2023-04-22 11:48:21 -0600
commitbdffcb409fd1e257db280ab73e07cc319711256c (patch)
tree9aca1c1e73f0249bba8b66781b79c358a7a00798 /ext/net
parentd137501a639cb315772866f6775fcd9f43e28f5b (diff)
feat(ext/http): Rework Deno.serve using hyper 1.0-rc3 (#18619)
This is a rewrite of the `Deno.serve` API to live on top of hyper 1.0-rc3. The code should be more maintainable long-term, and avoids some of the slower mpsc patterns that made the older code less efficient than it could have been. Missing features: - `upgradeHttp` and `upgradeHttpRaw` (`upgradeWebSocket` is available, however). - Automatic compression is unavailable on responses.
Diffstat (limited to 'ext/net')
-rw-r--r--ext/net/Cargo.toml1
-rw-r--r--ext/net/lib.rs1
-rw-r--r--ext/net/ops_tls.rs29
-rw-r--r--ext/net/ops_unix.rs4
-rw-r--r--ext/net/raw.rs304
5 files changed, 335 insertions, 4 deletions
diff --git a/ext/net/Cargo.toml b/ext/net/Cargo.toml
index a7a1acff6..6bab80cc7 100644
--- a/ext/net/Cargo.toml
+++ b/ext/net/Cargo.toml
@@ -17,6 +17,7 @@ path = "lib.rs"
deno_core.workspace = true
deno_tls.workspace = true
log.workspace = true
+pin-project.workspace = true
serde.workspace = true
socket2.workspace = true
tokio.workspace = true
diff --git a/ext/net/lib.rs b/ext/net/lib.rs
index f812bf60b..ff67186b0 100644
--- a/ext/net/lib.rs
+++ b/ext/net/lib.rs
@@ -5,6 +5,7 @@ pub mod ops;
pub mod ops_tls;
#[cfg(unix)]
pub mod ops_unix;
+pub mod raw;
pub mod resolve_addr;
use deno_core::error::AnyError;
diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs
index c0cfb8674..8a7757066 100644
--- a/ext/net/ops_tls.rs
+++ b/ext/net/ops_tls.rs
@@ -61,6 +61,7 @@ use std::fs::File;
use std::io;
use std::io::BufReader;
use std::io::ErrorKind;
+use std::net::SocketAddr;
use std::path::Path;
use std::pin::Pin;
use std::rc::Rc;
@@ -115,6 +116,13 @@ impl TlsStream {
Self::new(tcp, Connection::Client(tls))
}
+ pub fn new_client_side_from(
+ tcp: TcpStream,
+ connection: ClientConnection,
+ ) -> Self {
+ Self::new(tcp, Connection::Client(connection))
+ }
+
pub fn new_server_side(
tcp: TcpStream,
tls_config: Arc<ServerConfig>,
@@ -123,6 +131,13 @@ impl TlsStream {
Self::new(tcp, Connection::Server(tls))
}
+ pub fn new_server_side_from(
+ tcp: TcpStream,
+ connection: ServerConnection,
+ ) -> Self {
+ Self::new(tcp, Connection::Server(connection))
+ }
+
pub fn into_split(self) -> (ReadHalf, WriteHalf) {
let shared = Shared::new(self);
let rd = ReadHalf {
@@ -132,6 +147,16 @@ impl TlsStream {
(rd, wr)
}
+ /// Convenience method to match [`TcpStream`].
+ pub fn peer_addr(&self) -> Result<SocketAddr, io::Error> {
+ self.0.as_ref().unwrap().tcp.peer_addr()
+ }
+
+ /// Convenience method to match [`TcpStream`].
+ pub fn local_addr(&self) -> Result<SocketAddr, io::Error> {
+ self.0.as_ref().unwrap().tcp.local_addr()
+ }
+
/// Tokio-rustls compatibility: returns a reference to the underlying TCP
/// stream, and a reference to the Rustls `Connection` object.
pub fn get_ref(&self) -> (&TcpStream, &Connection) {
@@ -954,8 +979,8 @@ fn load_private_keys_from_file(
}
pub struct TlsListenerResource {
- tcp_listener: AsyncRefCell<TcpListener>,
- tls_config: Arc<ServerConfig>,
+ pub(crate) tcp_listener: AsyncRefCell<TcpListener>,
+ pub(crate) tls_config: Arc<ServerConfig>,
cancel_handle: CancelHandle,
}
diff --git a/ext/net/ops_unix.rs b/ext/net/ops_unix.rs
index 1161d2759..bed923f8b 100644
--- a/ext/net/ops_unix.rs
+++ b/ext/net/ops_unix.rs
@@ -32,8 +32,8 @@ pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
})
}
-struct UnixListenerResource {
- listener: AsyncRefCell<UnixListener>,
+pub(crate) struct UnixListenerResource {
+ pub listener: AsyncRefCell<UnixListener>,
cancel: CancelHandle,
}
diff --git a/ext/net/raw.rs b/ext/net/raw.rs
new file mode 100644
index 000000000..74cc10d63
--- /dev/null
+++ b/ext/net/raw.rs
@@ -0,0 +1,304 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use crate::io::TcpStreamResource;
+#[cfg(unix)]
+use crate::io::UnixStreamResource;
+use crate::ops::TcpListenerResource;
+use crate::ops_tls::TlsListenerResource;
+use crate::ops_tls::TlsStream;
+use crate::ops_tls::TlsStreamResource;
+#[cfg(unix)]
+use crate::ops_unix::UnixListenerResource;
+use deno_core::error::bad_resource;
+use deno_core::error::bad_resource_id;
+use deno_core::error::AnyError;
+use deno_core::ResourceId;
+use deno_core::ResourceTable;
+use deno_tls::rustls::ServerConfig;
+use pin_project::pin_project;
+use std::rc::Rc;
+use std::sync::Arc;
+use tokio::net::TcpStream;
+#[cfg(unix)]
+use tokio::net::UnixStream;
+
+/// A raw stream of one of the types handled by this extension.
+#[pin_project(project = NetworkStreamProject)]
+pub enum NetworkStream {
+ Tcp(#[pin] TcpStream),
+ Tls(#[pin] TlsStream),
+ #[cfg(unix)]
+ Unix(#[pin] UnixStream),
+}
+
+/// A raw stream of one of the types handled by this extension.
+#[derive(Copy, Clone, PartialEq, Eq)]
+pub enum NetworkStreamType {
+ Tcp,
+ Tls,
+ #[cfg(unix)]
+ Unix,
+}
+
+impl NetworkStream {
+ pub fn local_address(&self) -> Result<NetworkStreamAddress, std::io::Error> {
+ match self {
+ Self::Tcp(tcp) => Ok(NetworkStreamAddress::Ip(tcp.local_addr()?)),
+ Self::Tls(tls) => Ok(NetworkStreamAddress::Ip(tls.local_addr()?)),
+ #[cfg(unix)]
+ Self::Unix(unix) => Ok(NetworkStreamAddress::Unix(unix.local_addr()?)),
+ }
+ }
+
+ pub fn peer_address(&self) -> Result<NetworkStreamAddress, std::io::Error> {
+ match self {
+ Self::Tcp(tcp) => Ok(NetworkStreamAddress::Ip(tcp.peer_addr()?)),
+ Self::Tls(tls) => Ok(NetworkStreamAddress::Ip(tls.peer_addr()?)),
+ #[cfg(unix)]
+ Self::Unix(unix) => Ok(NetworkStreamAddress::Unix(unix.peer_addr()?)),
+ }
+ }
+
+ pub fn stream(&self) -> NetworkStreamType {
+ match self {
+ Self::Tcp(_) => NetworkStreamType::Tcp,
+ Self::Tls(_) => NetworkStreamType::Tls,
+ #[cfg(unix)]
+ Self::Unix(_) => NetworkStreamType::Unix,
+ }
+ }
+}
+
+impl tokio::io::AsyncRead for NetworkStream {
+ fn poll_read(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &mut tokio::io::ReadBuf<'_>,
+ ) -> std::task::Poll<std::io::Result<()>> {
+ match self.project() {
+ NetworkStreamProject::Tcp(s) => s.poll_read(cx, buf),
+ NetworkStreamProject::Tls(s) => s.poll_read(cx, buf),
+ #[cfg(unix)]
+ NetworkStreamProject::Unix(s) => s.poll_read(cx, buf),
+ }
+ }
+}
+
+impl tokio::io::AsyncWrite for NetworkStream {
+ fn poll_write(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+ ) -> std::task::Poll<Result<usize, std::io::Error>> {
+ match self.project() {
+ NetworkStreamProject::Tcp(s) => s.poll_write(cx, buf),
+ NetworkStreamProject::Tls(s) => s.poll_write(cx, buf),
+ #[cfg(unix)]
+ NetworkStreamProject::Unix(s) => s.poll_write(cx, buf),
+ }
+ }
+
+ fn poll_flush(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ match self.project() {
+ NetworkStreamProject::Tcp(s) => s.poll_flush(cx),
+ NetworkStreamProject::Tls(s) => s.poll_flush(cx),
+ #[cfg(unix)]
+ NetworkStreamProject::Unix(s) => s.poll_flush(cx),
+ }
+ }
+
+ fn poll_shutdown(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ match self.project() {
+ NetworkStreamProject::Tcp(s) => s.poll_shutdown(cx),
+ NetworkStreamProject::Tls(s) => s.poll_shutdown(cx),
+ #[cfg(unix)]
+ NetworkStreamProject::Unix(s) => s.poll_shutdown(cx),
+ }
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ match self {
+ Self::Tcp(s) => s.is_write_vectored(),
+ Self::Tls(s) => s.is_write_vectored(),
+ #[cfg(unix)]
+ Self::Unix(s) => s.is_write_vectored(),
+ }
+ }
+
+ fn poll_write_vectored(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ bufs: &[std::io::IoSlice<'_>],
+ ) -> std::task::Poll<Result<usize, std::io::Error>> {
+ match self.project() {
+ NetworkStreamProject::Tcp(s) => s.poll_write_vectored(cx, bufs),
+ NetworkStreamProject::Tls(s) => s.poll_write_vectored(cx, bufs),
+ #[cfg(unix)]
+ NetworkStreamProject::Unix(s) => s.poll_write_vectored(cx, bufs),
+ }
+ }
+}
+
+/// A raw stream listener of one of the types handled by this extension.
+pub enum NetworkStreamListener {
+ Tcp(tokio::net::TcpListener),
+ Tls(tokio::net::TcpListener, Arc<ServerConfig>),
+ #[cfg(unix)]
+ Unix(tokio::net::UnixListener),
+}
+
+pub enum NetworkStreamAddress {
+ Ip(std::net::SocketAddr),
+ #[cfg(unix)]
+ Unix(tokio::net::unix::SocketAddr),
+}
+
+impl NetworkStreamListener {
+ /// Accepts a connection on this listener.
+ pub async fn accept(&self) -> Result<NetworkStream, AnyError> {
+ Ok(match self {
+ Self::Tcp(tcp) => {
+ let (stream, _addr) = tcp.accept().await?;
+ NetworkStream::Tcp(stream)
+ }
+ Self::Tls(tcp, config) => {
+ let (stream, _addr) = tcp.accept().await?;
+ NetworkStream::Tls(TlsStream::new_server_side(stream, config.clone()))
+ }
+ #[cfg(unix)]
+ Self::Unix(unix) => {
+ let (stream, _addr) = unix.accept().await?;
+ NetworkStream::Unix(stream)
+ }
+ })
+ }
+
+ pub fn listen_address(&self) -> Result<NetworkStreamAddress, std::io::Error> {
+ match self {
+ Self::Tcp(tcp) => Ok(NetworkStreamAddress::Ip(tcp.local_addr()?)),
+ Self::Tls(tcp, _) => Ok(NetworkStreamAddress::Ip(tcp.local_addr()?)),
+ #[cfg(unix)]
+ Self::Unix(unix) => Ok(NetworkStreamAddress::Unix(unix.local_addr()?)),
+ }
+ }
+
+ pub fn stream(&self) -> NetworkStreamType {
+ match self {
+ Self::Tcp(..) => NetworkStreamType::Tcp,
+ Self::Tls(..) => NetworkStreamType::Tls,
+ #[cfg(unix)]
+ Self::Unix(..) => NetworkStreamType::Unix,
+ }
+ }
+}
+
+/// In some cases it may be more efficient to extract the resource from the resource table and use it directly (for example, an HTTP server).
+/// This method will extract a stream from the resource table and return it, unwrapped.
+pub fn take_network_stream_resource(
+ resource_table: &mut ResourceTable,
+ stream_rid: ResourceId,
+) -> Result<NetworkStream, AnyError> {
+ // The stream we're attempting to unwrap may be in use somewhere else. If that's the case, we cannot proceed
+ // with the process of unwrapping this connection, so we just return a bad resource error.
+ // See also: https://github.com/denoland/deno/pull/16242
+
+ if let Ok(resource_rc) = resource_table.take::<TcpStreamResource>(stream_rid)
+ {
+ // This TCP connection might be used somewhere else.
+ let resource = Rc::try_unwrap(resource_rc)
+ .map_err(|_| bad_resource("TCP stream is currently in use"))?;
+ let (read_half, write_half) = resource.into_inner();
+ let tcp_stream = read_half.reunite(write_half)?;
+ return Ok(NetworkStream::Tcp(tcp_stream));
+ }
+
+ if let Ok(resource_rc) = resource_table.take::<TlsStreamResource>(stream_rid)
+ {
+ // This TLS connection might be used somewhere else.
+ let resource = Rc::try_unwrap(resource_rc)
+ .map_err(|_| bad_resource("TLS stream is currently in use"))?;
+ let (read_half, write_half) = resource.into_inner();
+ let tls_stream = read_half.reunite(write_half);
+ return Ok(NetworkStream::Tls(tls_stream));
+ }
+
+ #[cfg(unix)]
+ if let Ok(resource_rc) = resource_table.take::<UnixStreamResource>(stream_rid)
+ {
+ // This UNIX socket might be used somewhere else.
+ let resource = Rc::try_unwrap(resource_rc)
+ .map_err(|_| bad_resource("UNIX stream is currently in use"))?;
+ let (read_half, write_half) = resource.into_inner();
+ let unix_stream = read_half.reunite(write_half)?;
+ return Ok(NetworkStream::Unix(unix_stream));
+ }
+
+ Err(bad_resource_id())
+}
+
+/// Inserts a raw stream (back?) into the resource table and returns a resource ID. This can then be used to create raw connection
+/// objects on the JS side.
+pub fn put_network_stream_resource(
+ resource_table: &mut ResourceTable,
+ stream: NetworkStream,
+) -> Result<ResourceId, AnyError> {
+ let res = match stream {
+ NetworkStream::Tcp(conn) => {
+ let (r, w) = conn.into_split();
+ resource_table.add(TcpStreamResource::new((r, w)))
+ }
+ NetworkStream::Tls(conn) => {
+ let (r, w) = conn.into_split();
+ resource_table.add(TlsStreamResource::new((r, w)))
+ }
+ #[cfg(unix)]
+ NetworkStream::Unix(conn) => {
+ let (r, w) = conn.into_split();
+ resource_table.add(UnixStreamResource::new((r, w)))
+ }
+ };
+
+ Ok(res)
+}
+
+/// In some cases it may be more efficient to extract the resource from the resource table and use it directly (for example, an HTTP server).
+/// This method will extract a stream from the resource table and return it, unwrapped.
+pub fn take_network_stream_listener_resource(
+ resource_table: &mut ResourceTable,
+ listener_rid: ResourceId,
+) -> Result<NetworkStreamListener, AnyError> {
+ if let Ok(resource_rc) =
+ resource_table.take::<TcpListenerResource>(listener_rid)
+ {
+ let resource = Rc::try_unwrap(resource_rc)
+ .map_err(|_| bad_resource("TCP socket listener is currently in use"))?;
+ return Ok(NetworkStreamListener::Tcp(resource.listener.into_inner()));
+ }
+
+ if let Ok(resource_rc) =
+ resource_table.take::<TlsListenerResource>(listener_rid)
+ {
+ let resource = Rc::try_unwrap(resource_rc)
+ .map_err(|_| bad_resource("TLS socket listener is currently in use"))?;
+ return Ok(NetworkStreamListener::Tls(
+ resource.tcp_listener.into_inner(),
+ resource.tls_config,
+ ));
+ }
+
+ #[cfg(unix)]
+ if let Ok(resource_rc) =
+ resource_table.take::<UnixListenerResource>(listener_rid)
+ {
+ let resource = Rc::try_unwrap(resource_rc)
+ .map_err(|_| bad_resource("UNIX socket listener is currently in use"))?;
+ return Ok(NetworkStreamListener::Unix(resource.listener.into_inner()));
+ }
+
+ Err(bad_resource_id())
+}