diff options
Diffstat (limited to 'ext/net/ops_tls.rs')
-rw-r--r-- | ext/net/ops_tls.rs | 1061 |
1 files changed, 1061 insertions, 0 deletions
diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs new file mode 100644 index 000000000..14a135d7d --- /dev/null +++ b/ext/net/ops_tls.rs @@ -0,0 +1,1061 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::io::TcpStreamResource; +use crate::io::TlsStreamResource; +use crate::ops::IpAddr; +use crate::ops::OpAddr; +use crate::ops::OpConn; +use crate::resolve_addr::resolve_addr; +use crate::resolve_addr::resolve_addr_sync; +use crate::DefaultTlsOptions; +use crate::NetPermissions; +use crate::UnsafelyIgnoreCertificateErrors; +use deno_core::error::bad_resource; +use deno_core::error::bad_resource_id; +use deno_core::error::custom_error; +use deno_core::error::generic_error; +use deno_core::error::invalid_hostname; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; +use deno_core::futures::ready; +use deno_core::futures::task::noop_waker_ref; +use deno_core::futures::task::AtomicWaker; +use deno_core::futures::task::Context; +use deno_core::futures::task::Poll; +use deno_core::futures::task::RawWaker; +use deno_core::futures::task::RawWakerVTable; +use deno_core::futures::task::Waker; +use deno_core::op_async; +use deno_core::op_sync; +use deno_core::parking_lot::Mutex; +use deno_core::AsyncRefCell; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpPair; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_tls::create_client_config; +use deno_tls::rustls::internal::pemfile::certs; +use deno_tls::rustls::internal::pemfile::pkcs8_private_keys; +use deno_tls::rustls::internal::pemfile::rsa_private_keys; +use deno_tls::rustls::Certificate; +use deno_tls::rustls::ClientConfig; +use deno_tls::rustls::ClientSession; +use deno_tls::rustls::NoClientAuth; +use deno_tls::rustls::PrivateKey; +use deno_tls::rustls::ServerConfig; +use deno_tls::rustls::ServerSession; +use deno_tls::rustls::Session; +use deno_tls::webpki::DNSNameRef; +use io::Error; +use io::Read; +use io::Write; +use serde::Deserialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::convert::From; +use std::fs::File; +use std::io; +use std::io::BufRead; +use std::io::BufReader; +use std::io::ErrorKind; +use std::ops::Deref; +use std::ops::DerefMut; +use std::path::Path; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::Arc; +use std::sync::Weak; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; +use tokio::io::ReadBuf; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use tokio::task::spawn_local; + +#[derive(Debug)] +enum TlsSession { + Client(ClientSession), + Server(ServerSession), +} + +impl Deref for TlsSession { + type Target = dyn Session; + + fn deref(&self) -> &Self::Target { + match self { + TlsSession::Client(client_session) => client_session, + TlsSession::Server(server_session) => server_session, + } + } +} + +impl DerefMut for TlsSession { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + TlsSession::Client(client_session) => client_session, + TlsSession::Server(server_session) => server_session, + } + } +} + +impl From<ClientSession> for TlsSession { + fn from(client_session: ClientSession) -> Self { + TlsSession::Client(client_session) + } +} + +impl From<ServerSession> for TlsSession { + fn from(server_session: ServerSession) -> Self { + TlsSession::Server(server_session) + } +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +enum Flow { + Read, + Write, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +enum State { + StreamOpen, + StreamClosed, + TlsClosing, + TlsClosed, + TcpClosed, +} + +#[derive(Debug)] +pub struct TlsStream(Option<TlsStreamInner>); + +impl TlsStream { + fn new(tcp: TcpStream, tls: TlsSession) -> Self { + let inner = TlsStreamInner { + tcp, + tls, + rd_state: State::StreamOpen, + wr_state: State::StreamOpen, + }; + Self(Some(inner)) + } + + pub fn new_client_side( + tcp: TcpStream, + tls_config: &Arc<ClientConfig>, + hostname: DNSNameRef, + ) -> Self { + let tls = TlsSession::Client(ClientSession::new(tls_config, hostname)); + Self::new(tcp, tls) + } + + pub fn new_server_side( + tcp: TcpStream, + tls_config: &Arc<ServerConfig>, + ) -> Self { + let tls = TlsSession::Server(ServerSession::new(tls_config)); + Self::new(tcp, tls) + } + + pub async fn handshake(&mut self) -> io::Result<()> { + poll_fn(|cx| self.inner_mut().poll_io(cx, Flow::Write)).await + } + + fn into_split(self) -> (ReadHalf, WriteHalf) { + let shared = Shared::new(self); + let rd = ReadHalf { + shared: shared.clone(), + }; + let wr = WriteHalf { shared }; + (rd, wr) + } + + /// Tokio-rustls compatibility: returns a reference to the underlying TCP + /// stream, and a reference to the Rustls `Session` object. + pub fn get_ref(&self) -> (&TcpStream, &dyn Session) { + let inner = self.0.as_ref().unwrap(); + (&inner.tcp, &*inner.tls) + } + + fn inner_mut(&mut self) -> &mut TlsStreamInner { + self.0.as_mut().unwrap() + } +} + +impl AsyncRead for TlsStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + self.inner_mut().poll_read(cx, buf) + } +} + +impl AsyncWrite for TlsStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.inner_mut().poll_write(cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<()>> { + self.inner_mut().poll_io(cx, Flow::Write) + // The underlying TCP stream does not need to be flushed. + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<()>> { + self.inner_mut().poll_shutdown(cx) + } +} + +impl Drop for TlsStream { + fn drop(&mut self) { + let mut inner = self.0.take().unwrap(); + + let mut cx = Context::from_waker(noop_waker_ref()); + let use_linger_task = inner.poll_close(&mut cx).is_pending(); + + if use_linger_task { + spawn_local(poll_fn(move |cx| inner.poll_close(cx))); + } else if cfg!(debug_assertions) { + spawn_local(async {}); // Spawn dummy task to detect missing LocalSet. + } + } +} + +#[derive(Debug)] +pub struct TlsStreamInner { + tls: TlsSession, + tcp: TcpStream, + rd_state: State, + wr_state: State, +} + +impl TlsStreamInner { + fn poll_io( + &mut self, + cx: &mut Context<'_>, + flow: Flow, + ) -> Poll<io::Result<()>> { + loop { + let wr_ready = loop { + match self.wr_state { + _ if self.tls.is_handshaking() && !self.tls.wants_write() => { + break true; + } + _ if self.tls.is_handshaking() => {} + State::StreamOpen if !self.tls.wants_write() => break true, + State::StreamClosed => { + // Rustls will enqueue the 'CloseNotify' alert and send it after + // flusing the data that is already in the queue. + self.tls.send_close_notify(); + self.wr_state = State::TlsClosing; + continue; + } + State::TlsClosing if !self.tls.wants_write() => { + self.wr_state = State::TlsClosed; + continue; + } + // If a 'CloseNotify' alert sent by the remote end has been received, + // shut down the underlying TCP socket. Otherwise, consider polling + // done for the moment. + State::TlsClosed if self.rd_state < State::TlsClosed => break true, + State::TlsClosed + if Pin::new(&mut self.tcp).poll_shutdown(cx)?.is_pending() => + { + break false; + } + State::TlsClosed => { + self.wr_state = State::TcpClosed; + continue; + } + State::TcpClosed => break true, + _ => {} + } + + // Poll whether there is space in the socket send buffer so we can flush + // the remaining outgoing ciphertext. + if self.tcp.poll_write_ready(cx)?.is_pending() { + break false; + } + + // Write ciphertext to the TCP socket. + let mut wrapped_tcp = ImplementWriteTrait(&mut self.tcp); + match self.tls.write_tls(&mut wrapped_tcp) { + Ok(0) => unreachable!(), + Ok(_) => {} + Err(err) if err.kind() == ErrorKind::WouldBlock => {} + Err(err) => return Poll::Ready(Err(err)), + } + }; + + let rd_ready = loop { + match self.rd_state { + State::TcpClosed if self.tls.is_handshaking() => { + let err = Error::new(ErrorKind::UnexpectedEof, "tls handshake eof"); + return Poll::Ready(Err(err)); + } + _ if self.tls.is_handshaking() && !self.tls.wants_read() => { + break true; + } + _ if self.tls.is_handshaking() => {} + State::StreamOpen if !self.tls.wants_read() => break true, + State::StreamOpen => {} + State::StreamClosed if !self.tls.wants_read() => { + // Rustls has more incoming cleartext buffered up, but the TLS + // session is closing so this data will never be processed by the + // application layer. Just like what would happen if this were a raw + // TCP stream, don't gracefully end the TLS session, but abort it. + return Poll::Ready(Err(Error::from(ErrorKind::ConnectionReset))); + } + State::StreamClosed => {} + State::TlsClosed if self.wr_state == State::TcpClosed => { + // Wait for the remote end to gracefully close the TCP connection. + // TODO(piscisaureus): this is unnecessary; remove when stable. + } + _ => break true, + } + + if self.rd_state < State::TlsClosed { + // Do a zero-length plaintext read so we can detect the arrival of + // 'CloseNotify' messages, even if only the write half is open. + // Actually reading data from the socket is done in `poll_read()`. + match self.tls.read(&mut []) { + Ok(0) => {} + Err(err) if err.kind() == ErrorKind::ConnectionAborted => { + // `Session::read()` returns `ConnectionAborted` when a + // 'CloseNotify' alert has been received, which indicates that + // the remote peer wants to gracefully end the TLS session. + self.rd_state = State::TlsClosed; + continue; + } + Err(err) => return Poll::Ready(Err(err)), + _ => unreachable!(), + } + } + + // Poll whether more ciphertext is available in the socket receive + // buffer. + if self.tcp.poll_read_ready(cx)?.is_pending() { + break false; + } + + // Receive ciphertext from the socket. + let mut wrapped_tcp = ImplementReadTrait(&mut self.tcp); + match self.tls.read_tls(&mut wrapped_tcp) { + Ok(0) => self.rd_state = State::TcpClosed, + Ok(_) => self + .tls + .process_new_packets() + .map_err(|err| Error::new(ErrorKind::InvalidData, err))?, + Err(err) if err.kind() == ErrorKind::WouldBlock => {} + Err(err) => return Poll::Ready(Err(err)), + } + }; + + if wr_ready { + if self.rd_state >= State::TlsClosed + && self.wr_state >= State::TlsClosed + && self.wr_state < State::TcpClosed + { + continue; + } + if self.tls.wants_write() { + continue; + } + } + + let io_ready = match flow { + _ if self.tls.is_handshaking() => false, + Flow::Read => rd_ready, + Flow::Write => wr_ready, + }; + return match io_ready { + false => Poll::Pending, + true => Poll::Ready(Ok(())), + }; + } + } + + fn poll_read( + &mut self, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + ready!(self.poll_io(cx, Flow::Read))?; + + if self.rd_state == State::StreamOpen { + let buf_slice = + unsafe { &mut *(buf.unfilled_mut() as *mut [_] as *mut [u8]) }; + let bytes_read = self.tls.read(buf_slice)?; + assert_ne!(bytes_read, 0); + unsafe { buf.assume_init(bytes_read) }; + buf.advance(bytes_read); + } + + Poll::Ready(Ok(())) + } + + fn poll_write( + &mut self, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + if buf.is_empty() { + // Tokio-rustls compatibility: a zero byte write always succeeds. + Poll::Ready(Ok(0)) + } else if self.wr_state == State::StreamOpen { + // Flush Rustls' ciphertext send queue. + ready!(self.poll_io(cx, Flow::Write))?; + + // Copy data from `buf` to the Rustls cleartext send queue. + let bytes_written = self.tls.write(buf)?; + assert_ne!(bytes_written, 0); + + // Try to flush as much ciphertext as possible. However, since we just + // handed off at least some bytes to rustls, so we can't return + // `Poll::Pending()` any more: this would tell the caller that it should + // try to send those bytes again. + let _ = self.poll_io(cx, Flow::Write)?; + + Poll::Ready(Ok(bytes_written)) + } else { + // Return error if stream has been shut down for writing. + Poll::Ready(Err(ErrorKind::BrokenPipe.into())) + } + } + + fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + if self.wr_state == State::StreamOpen { + self.wr_state = State::StreamClosed; + } + + ready!(self.poll_io(cx, Flow::Write))?; + + // At minimum, a TLS 'CloseNotify' alert should have been sent. + assert!(self.wr_state >= State::TlsClosed); + // If we received a TLS 'CloseNotify' alert from the remote end + // already, the TCP socket should be shut down at this point. + assert!( + self.rd_state < State::TlsClosed || self.wr_state == State::TcpClosed + ); + + Poll::Ready(Ok(())) + } + + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + if self.rd_state == State::StreamOpen { + self.rd_state = State::StreamClosed; + } + + // Send TLS 'CloseNotify' alert. + ready!(self.poll_shutdown(cx))?; + // Wait for 'CloseNotify', shut down TCP stream, wait for TCP FIN packet. + ready!(self.poll_io(cx, Flow::Read))?; + + assert_eq!(self.rd_state, State::TcpClosed); + assert_eq!(self.wr_state, State::TcpClosed); + + Poll::Ready(Ok(())) + } +} + +#[derive(Debug)] +pub struct ReadHalf { + shared: Arc<Shared>, +} + +impl ReadHalf { + pub fn reunite(self, wr: WriteHalf) -> TlsStream { + assert!(Arc::ptr_eq(&self.shared, &wr.shared)); + drop(wr); // Drop `wr`, so only one strong reference to `shared` remains. + + Arc::try_unwrap(self.shared) + .unwrap_or_else(|_| panic!("Arc::<Shared>::try_unwrap() failed")) + .tls_stream + .into_inner() + } +} + +impl AsyncRead for ReadHalf { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + self + .shared + .poll_with_shared_waker(cx, Flow::Read, move |tls, cx| { + tls.poll_read(cx, buf) + }) + } +} + +#[derive(Debug)] +pub struct WriteHalf { + shared: Arc<Shared>, +} + +impl AsyncWrite for WriteHalf { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self + .shared + .poll_with_shared_waker(cx, Flow::Write, move |tls, cx| { + tls.poll_write(cx, buf) + }) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<()>> { + self + .shared + .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_flush(cx)) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<()>> { + self + .shared + .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_shutdown(cx)) + } +} + +#[derive(Debug)] +struct Shared { + tls_stream: Mutex<TlsStream>, + rd_waker: AtomicWaker, + wr_waker: AtomicWaker, +} + +impl Shared { + fn new(tls_stream: TlsStream) -> Arc<Self> { + let self_ = Self { + tls_stream: Mutex::new(tls_stream), + rd_waker: AtomicWaker::new(), + wr_waker: AtomicWaker::new(), + }; + Arc::new(self_) + } + + fn poll_with_shared_waker<R>( + self: &Arc<Self>, + cx: &mut Context<'_>, + flow: Flow, + mut f: impl FnMut(Pin<&mut TlsStream>, &mut Context<'_>) -> R, + ) -> R { + match flow { + Flow::Read => self.rd_waker.register(cx.waker()), + Flow::Write => self.wr_waker.register(cx.waker()), + } + + let shared_waker = self.new_shared_waker(); + let mut cx = Context::from_waker(&shared_waker); + + let mut tls_stream = self.tls_stream.lock(); + f(Pin::new(&mut tls_stream), &mut cx) + } + + const SHARED_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + Self::clone_shared_waker, + Self::wake_shared_waker, + Self::wake_shared_waker_by_ref, + Self::drop_shared_waker, + ); + + fn new_shared_waker(self: &Arc<Self>) -> Waker { + let self_weak = Arc::downgrade(self); + let self_ptr = self_weak.into_raw() as *const (); + let raw_waker = RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE); + unsafe { Waker::from_raw(raw_waker) } + } + + fn clone_shared_waker(self_ptr: *const ()) -> RawWaker { + let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) }; + let ptr1 = self_weak.clone().into_raw(); + let ptr2 = self_weak.into_raw(); + assert!(ptr1 == ptr2); + RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE) + } + + fn wake_shared_waker(self_ptr: *const ()) { + Self::wake_shared_waker_by_ref(self_ptr); + Self::drop_shared_waker(self_ptr); + } + + fn wake_shared_waker_by_ref(self_ptr: *const ()) { + let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) }; + if let Some(self_arc) = Weak::upgrade(&self_weak) { + self_arc.rd_waker.wake(); + self_arc.wr_waker.wake(); + } + self_weak.into_raw(); + } + + fn drop_shared_waker(self_ptr: *const ()) { + let _ = unsafe { Weak::from_raw(self_ptr as *const Self) }; + } +} + +struct ImplementReadTrait<'a, T>(&'a mut T); + +impl Read for ImplementReadTrait<'_, TcpStream> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.0.try_read(buf) + } +} + +struct ImplementWriteTrait<'a, T>(&'a mut T); + +impl Write for ImplementWriteTrait<'_, TcpStream> { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0.try_write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +pub fn init<P: NetPermissions + 'static>() -> Vec<OpPair> { + vec![ + ("op_start_tls", op_async(op_start_tls::<P>)), + ("op_connect_tls", op_async(op_connect_tls::<P>)), + ("op_listen_tls", op_sync(op_listen_tls::<P>)), + ("op_accept_tls", op_async(op_accept_tls)), + ] +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ConnectTlsArgs { + transport: String, + hostname: String, + port: u16, + cert_file: Option<String>, + cert_chain: Option<String>, + private_key: Option<String>, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct StartTlsArgs { + rid: ResourceId, + cert_file: Option<String>, + hostname: String, +} + +async fn op_start_tls<NP>( + state: Rc<RefCell<OpState>>, + args: StartTlsArgs, + _: (), +) -> Result<OpConn, AnyError> +where + NP: NetPermissions + 'static, +{ + let rid = args.rid; + let hostname = match &*args.hostname { + "" => "localhost", + n => n, + }; + let cert_file = args.cert_file.as_deref(); + { + super::check_unstable2(&state, "Deno.startTls"); + let mut s = state.borrow_mut(); + let permissions = s.borrow_mut::<NP>(); + permissions.check_net(&(hostname, Some(0)))?; + if let Some(path) = cert_file { + permissions.check_read(Path::new(path))?; + } + } + + let ca_data = match cert_file { + Some(path) => { + let mut buf = Vec::new(); + File::open(path)?.read_to_end(&mut buf)?; + Some(buf) + } + _ => None, + }; + + let hostname_dns = DNSNameRef::try_from_ascii_str(hostname) + .map_err(|_| invalid_hostname(hostname))?; + + let unsafely_ignore_certificate_errors = state + .borrow() + .borrow::<UnsafelyIgnoreCertificateErrors>() + .0 + .clone(); + + // TODO(@justinmchase): Ideally the certificate store is created once + // and not cloned. The store should be wrapped in Arc<T> to reduce + // copying memory unnecessarily. + let root_cert_store = state + .borrow() + .borrow::<DefaultTlsOptions>() + .root_cert_store + .clone(); + let resource_rc = state + .borrow_mut() + .resource_table + .take::<TcpStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let resource = Rc::try_unwrap(resource_rc) + .expect("Only a single use of this resource should happen"); + let (read_half, write_half) = resource.into_inner(); + let tcp_stream = read_half.reunite(write_half)?; + + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let tls_config = Arc::new(create_client_config( + root_cert_store, + ca_data, + unsafely_ignore_certificate_errors, + )?); + let tls_stream = + TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns); + + let rid = { + let mut state_ = state.borrow_mut(); + state_ + .resource_table + .add(TlsStreamResource::new(tls_stream.into_split())) + }; + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) +} + +async fn op_connect_tls<NP>( + state: Rc<RefCell<OpState>>, + args: ConnectTlsArgs, + _: (), +) -> Result<OpConn, AnyError> +where + NP: NetPermissions + 'static, +{ + assert_eq!(args.transport, "tcp"); + let hostname = match &*args.hostname { + "" => "localhost", + n => n, + }; + let port = args.port; + let cert_file = args.cert_file.as_deref(); + let unsafely_ignore_certificate_errors = state + .borrow() + .borrow::<UnsafelyIgnoreCertificateErrors>() + .0 + .clone(); + + if args.cert_chain.is_some() { + super::check_unstable2(&state, "ConnectTlsOptions.certChain"); + } + if args.private_key.is_some() { + super::check_unstable2(&state, "ConnectTlsOptions.privateKey"); + } + + { + let mut s = state.borrow_mut(); + let permissions = s.borrow_mut::<NP>(); + permissions.check_net(&(hostname, Some(port)))?; + if let Some(path) = cert_file { + permissions.check_read(Path::new(path))?; + } + } + + let ca_data = match cert_file { + Some(path) => { + let mut buf = Vec::new(); + File::open(path)?.read_to_end(&mut buf)?; + Some(buf) + } + _ => None, + }; + + let root_cert_store = state + .borrow() + .borrow::<DefaultTlsOptions>() + .root_cert_store + .clone(); + let hostname_dns = DNSNameRef::try_from_ascii_str(hostname) + .map_err(|_| invalid_hostname(hostname))?; + + let connect_addr = resolve_addr(hostname, port) + .await? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let tcp_stream = TcpStream::connect(connect_addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let mut tls_config = create_client_config( + root_cert_store, + ca_data, + unsafely_ignore_certificate_errors, + )?; + + if args.cert_chain.is_some() || args.private_key.is_some() { + let cert_chain = args + .cert_chain + .ok_or_else(|| type_error("No certificate chain provided"))?; + let private_key = args + .private_key + .ok_or_else(|| type_error("No private key provided"))?; + + // The `remove` is safe because load_private_keys checks that there is at least one key. + let private_key = load_private_keys(private_key.as_bytes())?.remove(0); + + tls_config.set_single_client_cert( + load_certs(&mut cert_chain.as_bytes())?, + private_key, + )?; + } + + let tls_config = Arc::new(tls_config); + + let tls_stream = + TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns); + + let rid = { + let mut state_ = state.borrow_mut(); + state_ + .resource_table + .add(TlsStreamResource::new(tls_stream.into_split())) + }; + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) +} + +fn load_certs(reader: &mut dyn BufRead) -> Result<Vec<Certificate>, AnyError> { + let certs = certs(reader) + .map_err(|_| custom_error("InvalidData", "Unable to decode certificate"))?; + + if certs.is_empty() { + let e = custom_error("InvalidData", "No certificates found in cert file"); + return Err(e); + } + + Ok(certs) +} + +fn load_certs_from_file(path: &str) -> Result<Vec<Certificate>, AnyError> { + let cert_file = File::open(path)?; + let reader = &mut BufReader::new(cert_file); + load_certs(reader) +} + +fn key_decode_err() -> AnyError { + custom_error("InvalidData", "Unable to decode key") +} + +fn key_not_found_err() -> AnyError { + custom_error("InvalidData", "No keys found in key file") +} + +/// Starts with -----BEGIN RSA PRIVATE KEY----- +fn load_rsa_keys(mut bytes: &[u8]) -> Result<Vec<PrivateKey>, AnyError> { + let keys = rsa_private_keys(&mut bytes).map_err(|_| key_decode_err())?; + Ok(keys) +} + +/// Starts with -----BEGIN PRIVATE KEY----- +fn load_pkcs8_keys(mut bytes: &[u8]) -> Result<Vec<PrivateKey>, AnyError> { + let keys = pkcs8_private_keys(&mut bytes).map_err(|_| key_decode_err())?; + Ok(keys) +} + +fn load_private_keys(bytes: &[u8]) -> Result<Vec<PrivateKey>, AnyError> { + let mut keys = load_rsa_keys(bytes)?; + + if keys.is_empty() { + keys = load_pkcs8_keys(bytes)?; + } + + if keys.is_empty() { + return Err(key_not_found_err()); + } + + Ok(keys) +} + +fn load_private_keys_from_file( + path: &str, +) -> Result<Vec<PrivateKey>, AnyError> { + let key_bytes = std::fs::read(path)?; + load_private_keys(&key_bytes) +} + +pub struct TlsListenerResource { + tcp_listener: AsyncRefCell<TcpListener>, + tls_config: Arc<ServerConfig>, + cancel_handle: CancelHandle, +} + +impl Resource for TlsListenerResource { + fn name(&self) -> Cow<str> { + "tlsListener".into() + } + + fn close(self: Rc<Self>) { + self.cancel_handle.cancel(); + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ListenTlsArgs { + transport: String, + hostname: String, + port: u16, + cert_file: String, + key_file: String, + alpn_protocols: Option<Vec<String>>, +} + +fn op_listen_tls<NP>( + state: &mut OpState, + args: ListenTlsArgs, + _: (), +) -> Result<OpConn, AnyError> +where + NP: NetPermissions + 'static, +{ + assert_eq!(args.transport, "tcp"); + let hostname = &*args.hostname; + let port = args.port; + let cert_file = &*args.cert_file; + let key_file = &*args.key_file; + + { + let permissions = state.borrow_mut::<NP>(); + permissions.check_net(&(hostname, Some(port)))?; + permissions.check_read(Path::new(cert_file))?; + permissions.check_read(Path::new(key_file))?; + } + + let mut tls_config = ServerConfig::new(NoClientAuth::new()); + if let Some(alpn_protocols) = args.alpn_protocols { + super::check_unstable(state, "Deno.listenTls#alpn_protocols"); + tls_config.alpn_protocols = + alpn_protocols.into_iter().map(|s| s.into_bytes()).collect(); + } + tls_config + .set_single_cert( + load_certs_from_file(cert_file)?, + load_private_keys_from_file(key_file)?.remove(0), + ) + .expect("invalid key or certificate"); + + let bind_addr = resolve_addr_sync(hostname, port)? + .next() + .ok_or_else(|| generic_error("No resolved address found"))?; + let std_listener = std::net::TcpListener::bind(bind_addr)?; + std_listener.set_nonblocking(true)?; + let tcp_listener = TcpListener::from_std(std_listener)?; + let local_addr = tcp_listener.local_addr()?; + + let tls_listener_resource = TlsListenerResource { + tcp_listener: AsyncRefCell::new(tcp_listener), + tls_config: Arc::new(tls_config), + cancel_handle: Default::default(), + }; + + let rid = state.resource_table.add(tls_listener_resource); + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: None, + }) +} + +async fn op_accept_tls( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + _: (), +) -> Result<OpConn, AnyError> { + let resource = state + .borrow() + .resource_table + .get::<TlsListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + + let cancel_handle = RcRef::map(&resource, |r| &r.cancel_handle); + let tcp_listener = RcRef::map(&resource, |r| &r.tcp_listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; + + let (tcp_stream, remote_addr) = + match tcp_listener.accept().try_or_cancel(&cancel_handle).await { + Ok(tuple) => tuple, + Err(err) if err.kind() == ErrorKind::Interrupted => { + // FIXME(bartlomieju): compatibility with current JS implementation. + return Err(bad_resource("Listener has been closed")); + } + Err(err) => return Err(err.into()), + }; + + let local_addr = tcp_stream.local_addr()?; + + let tls_stream = TlsStream::new_server_side(tcp_stream, &resource.tls_config); + + let rid = { + let mut state_ = state.borrow_mut(); + state_ + .resource_table + .add(TlsStreamResource::new(tls_stream.into_split())) + }; + + Ok(OpConn { + rid, + local_addr: Some(OpAddr::Tcp(IpAddr { + hostname: local_addr.ip().to_string(), + port: local_addr.port(), + })), + remote_addr: Some(OpAddr::Tcp(IpAddr { + hostname: remote_addr.ip().to_string(), + port: remote_addr.port(), + })), + }) +} |