diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-11-15 16:12:46 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-15 16:12:46 -0700 |
commit | 6b42cecc064d01d87aae978ecd7eb372bfe9a34e (patch) | |
tree | 3fb7f0e4be0c1e8184dde61f96324c7a8419d6b9 /ext/net/ops_tls.rs | |
parent | 40726721e287b2d6c44839d9081dccb408886cab (diff) |
feat(ext/net): use rustls_tokio_stream (#21205)
Fixes #21121 and #19498
Migrates fully to rustls_tokio_stream. We no longer need to maintain our
own TlsStream implementation to properly support duplex.
This should fix a number of errors with TLS and websockets, HTTP and
"other" places where it's failing.
Diffstat (limited to 'ext/net/ops_tls.rs')
-rw-r--r-- | ext/net/ops_tls.rs | 671 |
1 files changed, 30 insertions, 641 deletions
diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs index 26ec48fba..8c6474432 100644 --- a/ext/net/ops_tls.rs +++ b/ext/net/ops_tls.rs @@ -14,22 +14,9 @@ use deno_core::error::generic_error; use deno_core::error::invalid_hostname; use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; -use deno_core::futures::ready; -use deno_core::futures::task::noop_waker_ref; -use deno_core::futures::task::AtomicWaker; -use deno_core::futures::task::Context; -use deno_core::futures::task::Poll; -use deno_core::futures::task::RawWaker; -use deno_core::futures::task::RawWakerVTable; -use deno_core::futures::task::Waker; use deno_core::op2; - -use deno_core::parking_lot::Mutex; -use deno_core::unsync::spawn; use deno_core::AsyncRefCell; use deno_core::AsyncResult; -use deno_core::ByteString; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::OpState; @@ -40,17 +27,13 @@ use deno_tls::create_client_config; use deno_tls::load_certs; use deno_tls::load_private_keys; use deno_tls::rustls::Certificate; -use deno_tls::rustls::ClientConfig; -use deno_tls::rustls::ClientConnection; -use deno_tls::rustls::Connection; use deno_tls::rustls::PrivateKey; use deno_tls::rustls::ServerConfig; -use deno_tls::rustls::ServerConnection; use deno_tls::rustls::ServerName; use deno_tls::SocketUse; -use io::Error; use io::Read; -use io::Write; +use rustls_tokio_stream::TlsStreamRead; +use rustls_tokio_stream::TlsStreamWrite; use serde::Deserialize; use socket2::Domain; use socket2::Socket; @@ -63,632 +46,31 @@ use std::fs::File; use std::io; use std::io::BufReader; use std::io::ErrorKind; -use std::net::SocketAddr; +use std::num::NonZeroUsize; use std::path::Path; -use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; -use std::sync::Weak; -use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; -use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; -use tokio::io::ReadBuf; use tokio::net::TcpListener; use tokio::net::TcpStream; -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -enum Flow { - Handshake, - Read, - Write, -} - -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -enum State { - StreamOpen, - StreamClosed, - TlsClosing, - TlsClosed, - TcpClosed, -} - -pub struct TlsStream(Option<TlsStreamInner>); - -impl TlsStream { - fn new(tcp: TcpStream, mut tls: Connection) -> Self { - tls.set_buffer_limit(None); - - let inner = TlsStreamInner { - tcp, - tls, - rd_state: State::StreamOpen, - wr_state: State::StreamOpen, - }; - Self(Some(inner)) - } - - pub fn new_client_side( - tcp: TcpStream, - tls_config: Arc<ClientConfig>, - server_name: ServerName, - ) -> Self { - let tls = ClientConnection::new(tls_config, server_name).unwrap(); - Self::new(tcp, Connection::Client(tls)) - } - - pub fn new_client_side_from( - tcp: TcpStream, - connection: ClientConnection, - ) -> Self { - Self::new(tcp, Connection::Client(connection)) - } - - pub fn new_server_side( - tcp: TcpStream, - tls_config: Arc<ServerConfig>, - ) -> Self { - let tls = ServerConnection::new(tls_config).unwrap(); - Self::new(tcp, Connection::Server(tls)) - } - - pub fn new_server_side_from( - tcp: TcpStream, - connection: ServerConnection, - ) -> Self { - Self::new(tcp, Connection::Server(connection)) - } - - pub fn into_split(self) -> (ReadHalf, WriteHalf) { - let shared = Shared::new(self); - let rd = ReadHalf { - shared: shared.clone(), - }; - let wr = WriteHalf { shared }; - (rd, wr) - } - - /// Convenience method to match [`TcpStream`]. - pub fn peer_addr(&self) -> Result<SocketAddr, io::Error> { - self.0.as_ref().unwrap().tcp.peer_addr() - } - - /// Convenience method to match [`TcpStream`]. - pub fn local_addr(&self) -> Result<SocketAddr, io::Error> { - self.0.as_ref().unwrap().tcp.local_addr() - } - - /// Tokio-rustls compatibility: returns a reference to the underlying TCP - /// stream, and a reference to the Rustls `Connection` object. - pub fn get_ref(&self) -> (&TcpStream, &Connection) { - let inner = self.0.as_ref().unwrap(); - (&inner.tcp, &inner.tls) - } - - fn inner_mut(&mut self) -> &mut TlsStreamInner { - self.0.as_mut().unwrap() - } - - pub async fn handshake(&mut self) -> io::Result<()> { - poll_fn(|cx| self.inner_mut().poll_handshake(cx)).await - } - - fn poll_handshake(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { - self.inner_mut().poll_handshake(cx) - } - - fn get_alpn_protocol(&mut self) -> Option<ByteString> { - self.inner_mut().tls.alpn_protocol().map(|s| s.into()) - } -} - -impl AsyncRead for TlsStream { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll<io::Result<()>> { - self.inner_mut().poll_read(cx, buf) - } -} - -impl AsyncWrite for TlsStream { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - self.inner_mut().poll_write(cx, buf) - } - - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<io::Result<()>> { - self.inner_mut().poll_io(cx, Flow::Write) - // The underlying TCP stream does not need to be flushed. - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<io::Result<()>> { - self.inner_mut().poll_shutdown(cx) - } -} - -impl Drop for TlsStream { - fn drop(&mut self) { - let mut inner = self.0.take().unwrap(); - - let mut cx = Context::from_waker(noop_waker_ref()); - let use_linger_task = inner.poll_close(&mut cx).is_pending(); +pub use rustls_tokio_stream::TlsStream; - if use_linger_task { - spawn(poll_fn(move |cx| inner.poll_close(cx))); - } else if cfg!(debug_assertions) { - spawn(async {}); // Spawn dummy task to detect missing runtime. - } - } -} - -pub struct TlsStreamInner { - tls: Connection, - tcp: TcpStream, - rd_state: State, - wr_state: State, -} - -impl TlsStreamInner { - fn poll_io( - &mut self, - cx: &mut Context<'_>, - flow: Flow, - ) -> Poll<io::Result<()>> { - loop { - let wr_ready = loop { - match self.wr_state { - _ if self.tls.is_handshaking() && !self.tls.wants_write() => { - break true; - } - _ if self.tls.is_handshaking() => {} - State::StreamOpen if !self.tls.wants_write() => break true, - State::StreamClosed => { - // Rustls will enqueue the 'CloseNotify' alert and send it after - // flushing the data that is already in the queue. - self.tls.send_close_notify(); - self.wr_state = State::TlsClosing; - continue; - } - State::TlsClosing if !self.tls.wants_write() => { - self.wr_state = State::TlsClosed; - continue; - } - // If a 'CloseNotify' alert sent by the remote end has been received, - // shut down the underlying TCP socket. Otherwise, consider polling - // done for the moment. - State::TlsClosed if self.rd_state < State::TlsClosed => break true, - State::TlsClosed - if Pin::new(&mut self.tcp).poll_shutdown(cx)?.is_pending() => - { - break false; - } - State::TlsClosed => { - self.wr_state = State::TcpClosed; - continue; - } - State::TcpClosed => break true, - _ => {} - } - - // Write ciphertext to the TCP socket. - let mut wrapped_tcp = ImplementWriteTrait(&mut self.tcp); - match self.tls.write_tls(&mut wrapped_tcp) { - Ok(0) => {} // Wait until the socket has enough buffer space. - Ok(_) => continue, // Try to send more more data immediately. - Err(err) if err.kind() == ErrorKind::WouldBlock => unreachable!(), - Err(err) => return Poll::Ready(Err(err)), - } - - // Poll whether there is space in the socket send buffer so we can flush - // the remaining outgoing ciphertext. - if self.tcp.poll_write_ready(cx)?.is_pending() { - break false; - } - }; - - let rd_ready = loop { - // Interpret and decrypt unprocessed TLS protocol data. - let tls_state = self - .tls - .process_new_packets() - .map_err(|e| Error::new(ErrorKind::InvalidData, e))?; - - match self.rd_state { - State::TcpClosed if self.tls.is_handshaking() => { - let err = Error::new(ErrorKind::UnexpectedEof, "tls handshake eof"); - return Poll::Ready(Err(err)); - } - _ if self.tls.is_handshaking() && !self.tls.wants_read() => { - break true; - } - _ if self.tls.is_handshaking() => {} - State::StreamOpen if tls_state.plaintext_bytes_to_read() > 0 => { - break true; - } - State::StreamOpen if tls_state.peer_has_closed() => { - self.rd_state = State::TlsClosed; - continue; - } - State::StreamOpen => {} - State::StreamClosed if tls_state.plaintext_bytes_to_read() > 0 => { - // Rustls has more incoming cleartext buffered up, but the TLS - // session is closing so this data will never be processed by the - // application layer. Just like what would happen if this were a raw - // TCP stream, don't gracefully end the TLS session, but abort it. - return Poll::Ready(Err(Error::from(ErrorKind::ConnectionReset))); - } - State::StreamClosed => {} - State::TlsClosed if self.wr_state == State::TcpClosed => { - // Keep trying to read from the TCP connection until the remote end - // closes it gracefully. - } - State::TlsClosed => break true, - State::TcpClosed => break true, - _ => unreachable!(), - } - - // Try to read more TLS protocol data from the TCP socket. - let mut wrapped_tcp = ImplementReadTrait(&mut self.tcp); - match self.tls.read_tls(&mut wrapped_tcp) { - Ok(0) => { - self.rd_state = State::TcpClosed; - continue; - } - Ok(_) => continue, - Err(err) if err.kind() == ErrorKind::WouldBlock => {} - Err(err) => return Poll::Ready(Err(err)), - } - - // Get notified when more ciphertext becomes available to read from the - // TCP socket. - if self.tcp.poll_read_ready(cx)?.is_pending() { - break false; - } - }; - - if wr_ready { - if self.rd_state >= State::TlsClosed - && self.wr_state >= State::TlsClosed - && self.wr_state < State::TcpClosed - { - continue; - } - if self.tls.wants_write() { - continue; - } - } - - let io_ready = match flow { - _ if self.tls.is_handshaking() => false, - Flow::Handshake => true, - Flow::Read => rd_ready, - Flow::Write => wr_ready, - }; - return match io_ready { - false => Poll::Pending, - true => Poll::Ready(Ok(())), - }; - } - } - - fn poll_handshake(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { - if self.tls.is_handshaking() { - ready!(self.poll_io(cx, Flow::Handshake))?; - } - Poll::Ready(Ok(())) - } - - fn poll_read( - &mut self, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll<io::Result<()>> { - ready!(self.poll_io(cx, Flow::Read))?; - - if self.rd_state == State::StreamOpen { - // TODO(bartlomieju): - #[allow(clippy::undocumented_unsafe_blocks)] - let buf_slice = - unsafe { &mut *(buf.unfilled_mut() as *mut [_] as *mut [u8]) }; - let bytes_read = self.tls.reader().read(buf_slice)?; - assert_ne!(bytes_read, 0); - // TODO(bartlomieju): - #[allow(clippy::undocumented_unsafe_blocks)] - unsafe { - buf.assume_init(bytes_read) - }; - buf.advance(bytes_read); - } - - Poll::Ready(Ok(())) - } - - fn poll_write( - &mut self, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - if buf.is_empty() { - // Tokio-rustls compatibility: a zero byte write always succeeds. - Poll::Ready(Ok(0)) - } else if self.wr_state == State::StreamOpen { - // Flush Rustls' ciphertext send queue. - ready!(self.poll_io(cx, Flow::Write))?; - - // Copy data from `buf` to the Rustls cleartext send queue. - let bytes_written = self.tls.writer().write(buf)?; - assert_ne!(bytes_written, 0); - - // Try to flush as much ciphertext as possible. However, since we just - // handed off at least some bytes to rustls, so we can't return - // `Poll::Pending()` any more: this would tell the caller that it should - // try to send those bytes again. - let _ = self.poll_io(cx, Flow::Write)?; - - Poll::Ready(Ok(bytes_written)) - } else { - // Return error if stream has been shut down for writing. - Poll::Ready(Err(ErrorKind::BrokenPipe.into())) - } - } - - fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { - if self.wr_state == State::StreamOpen { - self.wr_state = State::StreamClosed; - } - - ready!(self.poll_io(cx, Flow::Write))?; - - // At minimum, a TLS 'CloseNotify' alert should have been sent. - assert!(self.wr_state >= State::TlsClosed); - // If we received a TLS 'CloseNotify' alert from the remote end - // already, the TCP socket should be shut down at this point. - assert!( - self.rd_state < State::TlsClosed || self.wr_state == State::TcpClosed - ); - - Poll::Ready(Ok(())) - } - - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { - if self.rd_state == State::StreamOpen { - self.rd_state = State::StreamClosed; - } - - // Wait for the handshake to complete. - ready!(self.poll_io(cx, Flow::Handshake))?; - // Send TLS 'CloseNotify' alert. - ready!(self.poll_shutdown(cx))?; - // Wait for 'CloseNotify', shut down TCP stream, wait for TCP FIN packet. - ready!(self.poll_io(cx, Flow::Read))?; - - assert_eq!(self.rd_state, State::TcpClosed); - assert_eq!(self.wr_state, State::TcpClosed); - - Poll::Ready(Ok(())) - } -} - -pub struct ReadHalf { - shared: Arc<Shared>, -} - -impl ReadHalf { - pub fn reunite(self, wr: WriteHalf) -> TlsStream { - assert!(Arc::ptr_eq(&self.shared, &wr.shared)); - drop(wr); // Drop `wr`, so only one strong reference to `shared` remains. - - Arc::try_unwrap(self.shared) - .unwrap_or_else(|_| panic!("Arc::<Shared>::try_unwrap() failed")) - .tls_stream - .into_inner() - } -} - -impl AsyncRead for ReadHalf { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll<io::Result<()>> { - self - .shared - .poll_with_shared_waker(cx, Flow::Read, move |tls, cx| { - tls.poll_read(cx, buf) - }) - } -} - -pub struct WriteHalf { - shared: Arc<Shared>, -} - -impl WriteHalf { - pub async fn handshake(&mut self) -> io::Result<()> { - poll_fn(|cx| { - self - .shared - .poll_with_shared_waker(cx, Flow::Write, |mut tls, cx| { - tls.poll_handshake(cx) - }) - }) - .await - } - - fn get_alpn_protocol(&mut self) -> Option<ByteString> { - self.shared.get_alpn_protocol() - } -} - -impl AsyncWrite for WriteHalf { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - self - .shared - .poll_with_shared_waker(cx, Flow::Write, move |tls, cx| { - tls.poll_write(cx, buf) - }) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<io::Result<()>> { - self - .shared - .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_flush(cx)) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<io::Result<()>> { - self - .shared - .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_shutdown(cx)) - } -} - -struct Shared { - tls_stream: Mutex<TlsStream>, - rd_waker: AtomicWaker, - wr_waker: AtomicWaker, -} - -impl Shared { - fn new(tls_stream: TlsStream) -> Arc<Self> { - let self_ = Self { - tls_stream: Mutex::new(tls_stream), - rd_waker: AtomicWaker::new(), - wr_waker: AtomicWaker::new(), - }; - Arc::new(self_) - } - - fn poll_with_shared_waker<R>( - self: &Arc<Self>, - cx: &mut Context<'_>, - flow: Flow, - mut f: impl FnMut(Pin<&mut TlsStream>, &mut Context<'_>) -> R, - ) -> R { - match flow { - Flow::Handshake => unreachable!(), - Flow::Read => self.rd_waker.register(cx.waker()), - Flow::Write => self.wr_waker.register(cx.waker()), - } - - let shared_waker = self.new_shared_waker(); - let mut cx = Context::from_waker(&shared_waker); - - let mut tls_stream = self.tls_stream.lock(); - f(Pin::new(&mut tls_stream), &mut cx) - } - - const SHARED_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( - Self::clone_shared_waker, - Self::wake_shared_waker, - Self::wake_shared_waker_by_ref, - Self::drop_shared_waker, - ); - - fn new_shared_waker(self: &Arc<Self>) -> Waker { - let self_weak = Arc::downgrade(self); - let self_ptr = self_weak.into_raw() as *const (); - let raw_waker = RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE); - // TODO(bartlomieju): - #[allow(clippy::undocumented_unsafe_blocks)] - unsafe { - Waker::from_raw(raw_waker) - } - } - - fn clone_shared_waker(self_ptr: *const ()) -> RawWaker { - // TODO(bartlomieju): - #[allow(clippy::undocumented_unsafe_blocks)] - let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) }; - let ptr1 = self_weak.clone().into_raw(); - let ptr2 = self_weak.into_raw(); - assert!(ptr1 == ptr2); - RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE) - } - - fn wake_shared_waker(self_ptr: *const ()) { - Self::wake_shared_waker_by_ref(self_ptr); - Self::drop_shared_waker(self_ptr); - } - - fn wake_shared_waker_by_ref(self_ptr: *const ()) { - // TODO(bartlomieju): - #[allow(clippy::undocumented_unsafe_blocks)] - let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) }; - if let Some(self_arc) = Weak::upgrade(&self_weak) { - self_arc.rd_waker.wake(); - self_arc.wr_waker.wake(); - } - let _ = self_weak.into_raw(); - } - - fn drop_shared_waker(self_ptr: *const ()) { - // TODO(bartlomieju): - #[allow(clippy::undocumented_unsafe_blocks)] - let _ = unsafe { Weak::from_raw(self_ptr as *const Self) }; - } - - fn get_alpn_protocol(self: &Arc<Self>) -> Option<ByteString> { - let mut tls_stream = self.tls_stream.lock(); - tls_stream.get_alpn_protocol() - } -} - -struct ImplementReadTrait<'a, T>(&'a mut T); - -impl Read for ImplementReadTrait<'_, TcpStream> { - fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - self.0.try_read(buf) - } -} - -struct ImplementWriteTrait<'a, T>(&'a mut T); - -impl Write for ImplementWriteTrait<'_, TcpStream> { - fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - match self.0.try_write(buf) { - Ok(n) => Ok(n), - Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(0), - Err(err) => Err(err), - } - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} +pub(crate) const TLS_BUFFER_SIZE: Option<NonZeroUsize> = + NonZeroUsize::new(65536); #[derive(Debug)] pub struct TlsStreamResource { - rd: AsyncRefCell<ReadHalf>, - wr: AsyncRefCell<WriteHalf>, + rd: AsyncRefCell<TlsStreamRead>, + wr: AsyncRefCell<TlsStreamWrite>, // `None` when a TLS handshake hasn't been done. handshake_info: RefCell<Option<TlsHandshakeInfo>>, cancel_handle: CancelHandle, // Only read and handshake ops get canceled. } impl TlsStreamResource { - pub fn new((rd, wr): (ReadHalf, WriteHalf)) -> Self { + pub fn new((rd, wr): (TlsStreamRead, TlsStreamWrite)) -> Self { Self { rd: rd.into(), wr: wr.into(), @@ -697,7 +79,7 @@ impl TlsStreamResource { } } - pub fn into_inner(self) -> (ReadHalf, WriteHalf) { + pub fn into_inner(self) -> (TlsStreamRead, TlsStreamWrite) { (self.rd.into_inner(), self.wr.into_inner()) } @@ -707,12 +89,10 @@ impl TlsStreamResource { ) -> Result<usize, AnyError> { let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); - let nread = rd.read(data).try_or_cancel(cancel_handle).await?; - Ok(nread) + Ok(rd.read(data).try_or_cancel(cancel_handle).await?) } pub async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> { - self.handshake().await?; let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await; let nwritten = wr.write(data).await?; wr.flush().await?; @@ -720,7 +100,6 @@ impl TlsStreamResource { } pub async fn shutdown(self: Rc<Self>) -> Result<(), AnyError> { - self.handshake().await?; let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await; wr.shutdown().await?; Ok(()) @@ -735,9 +114,9 @@ impl TlsStreamResource { let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await; let cancel_handle = RcRef::map(self, |r| &r.cancel_handle); - wr.handshake().try_or_cancel(cancel_handle).await?; + let handshake = wr.handshake().try_or_cancel(cancel_handle).await?; - let alpn_protocol = wr.get_alpn_protocol(); + let alpn_protocol = handshake.alpn.map(|alpn| alpn.into()); let tls_info = TlsHandshakeInfo { alpn_protocol }; self.handshake_info.replace(Some(tls_info.clone())); Ok(tls_info) @@ -849,9 +228,12 @@ where } let tls_config = Arc::new(tls_config); - - let tls_stream = - TlsStream::new_client_side(tcp_stream, tls_config, hostname_dns); + let tls_stream = TlsStream::new_client_side( + tcp_stream, + tls_config, + hostname_dns, + TLS_BUFFER_SIZE, + ); let rid = { let mut state_ = state.borrow_mut(); @@ -950,8 +332,12 @@ where let tls_config = Arc::new(tls_config); - let tls_stream = - TlsStream::new_client_side(tcp_stream, tls_config, hostname_dns); + let tls_stream = TlsStream::new_client_side( + tcp_stream, + tls_config, + hostname_dns, + TLS_BUFFER_SIZE, + ); let rid = { let mut state_ = state.borrow_mut(); @@ -1136,8 +522,11 @@ pub async fn op_net_accept_tls( let local_addr = tcp_stream.local_addr()?; - let tls_stream = - TlsStream::new_server_side(tcp_stream, resource.tls_config.clone()); + let tls_stream = TlsStream::new_server_side( + tcp_stream, + resource.tls_config.clone(), + TLS_BUFFER_SIZE, + ); let rid = { let mut state_ = state.borrow_mut(); |