diff options
Diffstat (limited to 'runtime/ops/tls.rs')
-rw-r--r-- | runtime/ops/tls.rs | 245 |
1 files changed, 99 insertions, 146 deletions
diff --git a/runtime/ops/tls.rs b/runtime/ops/tls.rs index b59650ab0..0630747ed 100644 --- a/runtime/ops/tls.rs +++ b/runtime/ops/tls.rs @@ -1,6 +1,7 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use super::io::{StreamResource, StreamResourceHolder}; +use super::io::StreamResource; +use super::io::TcpStreamResource; use crate::permissions::Permissions; use crate::resolve_addr::resolve_addr; use crate::resolve_addr::resolve_addr_sync; @@ -8,25 +9,26 @@ use deno_core::error::bad_resource; use deno_core::error::bad_resource_id; use deno_core::error::custom_error; use deno_core::error::AnyError; -use deno_core::futures; -use deno_core::futures::future::poll_fn; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; +use deno_core::AsyncRefCell; use deno_core::BufVec; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::ZeroCopyBuf; use serde::Deserialize; +use std::borrow::Cow; use std::cell::RefCell; use std::convert::From; use std::fs::File; use std::io::BufReader; -use std::net::SocketAddr; use std::path::Path; use std::rc::Rc; use std::sync::Arc; -use std::task::Context; -use std::task::Poll; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio_rustls::{rustls::ClientConfig, TlsConnector}; @@ -85,60 +87,53 @@ async fn op_start_tls( permissions.check_read(Path::new(&path))?; } } - let mut resource_holder = { - let mut state_ = state.borrow_mut(); - match state_.resource_table.remove::<StreamResourceHolder>(rid) { - Some(resource) => *resource, - None => return Err(bad_resource_id()), - } - }; - if let StreamResource::TcpStream(ref mut tcp_stream) = - resource_holder.resource - { - let tcp_stream = tcp_stream.take().unwrap(); - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - let mut config = ClientConfig::new(); - config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - if let Some(path) = cert_file { - let key_file = File::open(path)?; - let reader = &mut BufReader::new(key_file); - config.root_store.add_pem_file(reader).unwrap(); - } + let resource_rc = state + .borrow_mut() + .resource_table + .take::<TcpStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let resource = Rc::try_unwrap(resource_rc) + .expect("Only a single use of this resource should happen"); + let (read_half, write_half) = resource.into_inner(); + let tcp_stream = read_half.reunite(write_half)?; - let tls_connector = TlsConnector::from(Arc::new(config)); - let dnsname = - DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); - let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?; - - let rid = { - let mut state_ = state.borrow_mut(); - state_.resource_table.add( - "clientTlsStream", - Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream( - Box::new(tls_stream), - ))), - ) - }; - Ok(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": "tcp", - }, - "remoteAddr": { - "hostname": remote_addr.ip().to_string(), - "port": remote_addr.port(), - "transport": "tcp", - } - })) - } else { - Err(bad_resource_id()) + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let mut config = ClientConfig::new(); + config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + if let Some(path) = cert_file { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + config.root_store.add_pem_file(reader).unwrap(); } + + let tls_connector = TlsConnector::from(Arc::new(config)); + let dnsname = + DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); + let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?; + + let rid = { + let mut state_ = state.borrow_mut(); + state_ + .resource_table + .add(StreamResource::client_tls_stream(tls_stream)) + }; + Ok(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": "tcp", + }, + "remoteAddr": { + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port(), + "transport": "tcp", + } + })) } async fn op_connect_tls( @@ -180,12 +175,9 @@ async fn op_connect_tls( let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?; let rid = { let mut state_ = state.borrow_mut(); - state_.resource_table.add( - "clientTlsStream", - Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream( - Box::new(tls_stream), - ))), - ) + state_ + .resource_table + .add(StreamResource::client_tls_stream(tls_stream)) }; Ok(json!({ "rid": rid, @@ -256,51 +248,19 @@ fn load_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> { Ok(keys) } -#[allow(dead_code)] pub struct TlsListenerResource { - listener: TcpListener, + listener: AsyncRefCell<TcpListener>, tls_acceptor: TlsAcceptor, - waker: Option<futures::task::AtomicWaker>, - local_addr: SocketAddr, + cancel: CancelHandle, } -impl Drop for TlsListenerResource { - fn drop(&mut self) { - self.wake_task(); +impl Resource for TlsListenerResource { + fn name(&self) -> Cow<str> { + "tlsListener".into() } -} - -impl TlsListenerResource { - /// Track the current task so future awaiting for connection - /// can be notified when listener is closed. - /// - /// Throws an error if another task is already tracked. - pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> { - // Currently, we only allow tracking a single accept task for a listener. - // This might be changed in the future with multiple workers. - // Caveat: TcpListener by itself also only tracks an accept task at a time. - // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 - if self.waker.is_some() { - return Err(custom_error("Busy", "Another accept task is ongoing")); - } - let waker = futures::task::AtomicWaker::new(); - waker.register(cx.waker()); - self.waker.replace(waker); - Ok(()) - } - - /// Notifies a task when listener is closed so accept future can resolve. - pub fn wake_task(&mut self) { - if let Some(waker) = self.waker.as_ref() { - waker.wake(); - } - } - - /// Stop tracking a task. - /// Happens when the task is done and thus no further tracking is needed. - pub fn untrack_task(&mut self) { - self.waker.take(); + fn close(self: Rc<Self>) { + self.cancel.cancel(); } } @@ -340,15 +300,12 @@ fn op_listen_tls( let listener = TcpListener::from_std(std_listener)?; let local_addr = listener.local_addr()?; let tls_listener_resource = TlsListenerResource { - listener, + listener: AsyncRefCell::new(listener), tls_acceptor, - waker: None, - local_addr, + cancel: Default::default(), }; - let rid = state - .resource_table - .add("tlsListener", Box::new(tls_listener_resource)); + let rid = state.resource_table.add(tls_listener_resource); Ok(json!({ "rid": rid, @@ -372,50 +329,46 @@ async fn op_accept_tls( ) -> Result<Value, AnyError> { let args: AcceptTlsArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - let accept_fut = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let listener_resource = state - .resource_table - .get_mut::<TlsListenerResource>(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - let listener = &mut listener_resource.listener; - match listener.poll_accept(cx).map_err(AnyError::from) { - Poll::Ready(Ok((stream, addr))) => { - listener_resource.untrack_task(); - Poll::Ready(Ok((stream, addr))) - } - Poll::Pending => { - listener_resource.track_task(cx)?; - Poll::Pending - } - Poll::Ready(Err(e)) => { - listener_resource.untrack_task(); - Poll::Ready(Err(e)) + + let resource = state + .borrow() + .resource_table + .get::<TlsListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let mut listener = RcRef::map(&resource, |r| &r.listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (tcp_stream, _socket_addr) = + listener.accept().try_or_cancel(cancel).await.map_err(|e| { + // FIXME(bartlomieju): compatibility with current JS implementation + if let std::io::ErrorKind::Interrupted = e.kind() { + bad_resource("Listener has been closed") + } else { + e.into() } - } - }); - let (tcp_stream, _socket_addr) = accept_fut.await?; + })?; let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let tls_acceptor = { - let state_ = state.borrow(); - let resource = state_ - .resource_table - .get::<TlsListenerResource>(rid) - .ok_or_else(bad_resource_id) - .expect("Can't find tls listener"); - resource.tls_acceptor.clone() - }; - let tls_stream = tls_acceptor.accept(tcp_stream).await?; + let resource = state + .borrow() + .resource_table + .get::<TlsListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let cancel = RcRef::map(&resource, |r| &r.cancel); + let tls_acceptor = resource.tls_acceptor.clone(); + let tls_stream = tls_acceptor + .accept(tcp_stream) + .try_or_cancel(cancel) + .await?; + let rid = { let mut state_ = state.borrow_mut(); - state_.resource_table.add( - "serverTlsStream", - Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream( - Box::new(tls_stream), - ))), - ) + state_ + .resource_table + .add(StreamResource::server_tls_stream(tls_stream)) }; + Ok(json!({ "rid": rid, "localAddr": { |