diff options
Diffstat (limited to 'cli/ops/tls.rs')
-rw-r--r-- | cli/ops/tls.rs | 201 |
1 files changed, 118 insertions, 83 deletions
diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index 48419f76f..484b7057c 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -9,16 +9,22 @@ use crate::resolve_addr::resolve_addr; use crate::state::ThreadSafeState; use deno::Resource; use deno::*; -use futures::Async; -use futures::Future; -use futures::Poll; +use futures::future::FutureExt; +use futures::future::TryFutureExt; +use futures::stream::StreamExt; +use futures::stream::TryStreamExt; use std; use std::convert::From; use std::fs::File; +use std::future::Future; use std::io::BufReader; use std::net::SocketAddr; +use std::pin::Pin; use std::sync::Arc; +use std::task::Context; +use std::task::Poll; use tokio; +use tokio::net::tcp::Incoming; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio_rustls::{rustls::ClientConfig, TlsConnector}; @@ -72,49 +78,63 @@ pub fn op_dial_tls( } let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| { - TcpStream::connect(&addr) + futures::compat::Compat01As03::new(TcpStream::connect(&addr)) .and_then(move |tcp_stream| { - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; + let local_addr = match tcp_stream.local_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(e), + }; + let remote_addr = match tcp_stream.peer_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(e), + }; let mut config = ClientConfig::new(); config .root_store .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); if let Some(path) = cert_file { - let key_file = File::open(path)?; + let key_file = match File::open(path) { + Ok(v) => v, + Err(e) => return futures::future::err(e), + }; let reader = &mut BufReader::new(key_file); config.root_store.add_pem_file(reader).unwrap(); } - let tls_connector = TlsConnector::from(Arc::new(config)); - Ok((tls_connector, tcp_stream, local_addr, remote_addr)) + futures::future::ok(( + tls_connector, + tcp_stream, + local_addr, + remote_addr, + )) }) .map_err(ErrBox::from) .and_then( move |(tls_connector, tcp_stream, local_addr, remote_addr)| { let dnsname = DNSNameRef::try_from_ascii_str(&domain) .expect("Invalid DNS lookup"); - tls_connector - .connect(dnsname, tcp_stream) - .map_err(ErrBox::from) - .and_then(move |tls_stream| { - let mut table = state_.lock_resource_table(); - let rid = table.add( - "clientTlsStream", - Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))), - ); - futures::future::ok(json!({ - "rid": rid, - "localAddr": local_addr.to_string(), - "remoteAddr": remote_addr.to_string(), - })) - }) + futures::compat::Compat01As03::new( + tls_connector.connect(dnsname, tcp_stream), + ) + .map_err(ErrBox::from) + .and_then(move |tls_stream| { + let mut table = state_.lock_resource_table(); + let rid = table.add( + "clientTlsStream", + Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))), + ); + futures::future::ok(json!({ + "rid": rid, + "localAddr": local_addr.to_string(), + "remoteAddr": remote_addr.to_string(), + })) + }) }, ) }); - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } fn load_certs(path: &str) -> Result<Vec<Certificate>, ErrBox> { @@ -177,9 +197,9 @@ fn load_keys(path: &str) -> Result<Vec<PrivateKey>, ErrBox> { #[allow(dead_code)] pub struct TlsListenerResource { - listener: tokio::net::TcpListener, + listener: Incoming, tls_acceptor: TlsAcceptor, - task: Option<futures::task::Task>, + waker: Option<futures::task::AtomicWaker>, local_addr: SocketAddr, } @@ -187,7 +207,7 @@ impl Resource for TlsListenerResource {} impl Drop for TlsListenerResource { fn drop(&mut self) { - self.notify_task(); + self.wake_task(); } } @@ -196,12 +216,12 @@ impl TlsListenerResource { /// can be notified when listener is closed. /// /// Throws an error if another task is already tracked. - pub fn track_task(&mut self) -> Result<(), ErrBox> { + pub fn track_task(&mut self, cx: &Context) -> Result<(), ErrBox> { // Currently, we only allow tracking a single accept task for a listener. // This might be changed in the future with multiple workers. // Caveat: TcpListener by itself also only tracks an accept task at a time. // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 - if self.task.is_some() { + if self.waker.is_some() { let e = std::io::Error::new( std::io::ErrorKind::Other, "Another accept task is ongoing", @@ -209,22 +229,24 @@ impl TlsListenerResource { return Err(ErrBox::from(e)); } - self.task.replace(futures::task::current()); + let waker = futures::task::AtomicWaker::new(); + waker.register(cx.waker()); + self.waker.replace(waker); Ok(()) } /// Notifies a task when listener is closed so accept future can resolve. - pub fn notify_task(&mut self) { - if let Some(task) = self.task.take() { - task.notify(); + pub fn wake_task(&mut self) { + if let Some(waker) = self.waker.as_ref() { + waker.wake(); } } /// Stop tracking a task. /// Happens when the task is done and thus no further tracking is needed. pub fn untrack_task(&mut self) { - if self.task.is_some() { - self.task.take(); + if self.waker.is_some() { + self.waker.take(); } } } @@ -259,14 +281,15 @@ fn op_listen_tls( .set_single_cert(load_certs(&cert_file)?, load_keys(&key_file)?.remove(0)) .expect("invalid key or certificate"); let tls_acceptor = TlsAcceptor::from(Arc::new(config)); - let addr = resolve_addr(&args.hostname, args.port).wait()?; + let addr = + futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; let listener = TcpListener::bind(&addr)?; let local_addr = listener.local_addr()?; let local_addr_str = local_addr.to_string(); let tls_listener_resource = TlsListenerResource { - listener, + listener: listener.incoming(), tls_acceptor, - task: None, + waker: None, local_addr, }; let mut table = state.lock_resource_table(); @@ -302,17 +325,17 @@ pub struct AcceptTls { } impl Future for AcceptTls { - type Item = (TcpStream, SocketAddr); - type Error = ErrBox; + type Output = Result<(TcpStream, SocketAddr), ErrBox>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.accept_state == AcceptTlsState::Done { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + if inner.accept_state == AcceptTlsState::Done { panic!("poll AcceptTls after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = inner.state.lock_resource_table(); let listener_resource = table - .get_mut::<TlsListenerResource>(self.rid) + .get_mut::<TlsListenerResource>(inner.rid) .ok_or_else(|| { let e = std::io::Error::new( std::io::ErrorKind::Other, @@ -321,44 +344,50 @@ impl Future for AcceptTls { ErrBox::from(e) })?; - let listener = &mut listener_resource.listener; + let mut listener = + futures::compat::Compat01As03::new(&mut listener_resource.listener) + .map_err(ErrBox::from); - if self.accept_state == AcceptTlsState::Eager { + if inner.accept_state == AcceptTlsState::Eager { // Similar to try_ready!, but also track/untrack accept task // in TcpListener resource. // In this way, when the listener is closed, the task can be // notified to error out (instead of stuck forever). - match listener.poll_accept().map_err(ErrBox::from) { - Ok(Async::Ready((stream, addr))) => { - self.accept_state = AcceptTlsState::Done; - return Ok((stream, addr).into()); + match listener.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(stream))) => { + inner.accept_state = AcceptTlsState::Done; + let addr = stream.peer_addr().unwrap(); + return Poll::Ready(Ok((stream, addr))); } - Ok(Async::NotReady) => { - self.accept_state = AcceptTlsState::Pending; - return Ok(Async::NotReady); + Poll::Pending => { + inner.accept_state = AcceptTlsState::Pending; + return Poll::Pending; } - Err(e) => { - self.accept_state = AcceptTlsState::Done; - return Err(e); + Poll::Ready(Some(Err(e))) => { + inner.accept_state = AcceptTlsState::Done; + return Poll::Ready(Err(e)); } + _ => unreachable!(), } } - match listener.poll_accept().map_err(ErrBox::from) { - Ok(Async::Ready((stream, addr))) => { + match listener.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(stream))) => { listener_resource.untrack_task(); - self.accept_state = AcceptTlsState::Done; - Ok((stream, addr).into()) + inner.accept_state = AcceptTlsState::Done; + let addr = stream.peer_addr().unwrap(); + Poll::Ready(Ok((stream, addr))) } - Ok(Async::NotReady) => { - listener_resource.track_task()?; - Ok(Async::NotReady) + Poll::Pending => { + listener_resource.track_task(cx)?; + Poll::Pending } - Err(e) => { + Poll::Ready(Some(Err(e))) => { listener_resource.untrack_task(); - self.accept_state = AcceptTlsState::Done; - Err(e) + inner.accept_state = AcceptTlsState::Done; + Poll::Ready(Err(e)) } + _ => unreachable!(), } } } @@ -379,9 +408,15 @@ fn op_accept_tls( let state2 = state.clone(); let op = accept_tls(state, rid) .and_then(move |(tcp_stream, _socket_addr)| { - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - Ok((tcp_stream, local_addr, remote_addr)) + let local_addr = match tcp_stream.local_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(ErrBox::from(e)), + }; + let remote_addr = match tcp_stream.peer_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(ErrBox::from(e)), + }; + futures::future::ok((tcp_stream, local_addr, remote_addr)) }) .and_then(move |(tcp_stream, local_addr, remote_addr)| { let table = state1.lock_resource_table(); @@ -390,18 +425,18 @@ fn op_accept_tls( .ok_or_else(bad_resource) .expect("Can't find tls listener"); - resource - .tls_acceptor - .accept(tcp_stream) - .map_err(ErrBox::from) - .and_then(move |tls_stream| { - let mut table = state2.lock_resource_table(); - let rid = table.add( - "serverTlsStream", - Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))), - ); - Ok((rid, local_addr, remote_addr)) - }) + futures::compat::Compat01As03::new( + resource.tls_acceptor.accept(tcp_stream), + ) + .map_err(ErrBox::from) + .and_then(move |tls_stream| { + let mut table = state2.lock_resource_table(); + let rid = table.add( + "serverTlsStream", + Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))), + ); + futures::future::ok((rid, local_addr, remote_addr)) + }) }) .and_then(move |(rid, local_addr, remote_addr)| { futures::future::ok(json!({ @@ -411,5 +446,5 @@ fn op_accept_tls( })) }); - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } |