diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2019-11-07 15:59:02 +0100 |
---|---|---|
committer | Ry Dahl <ry@tinyclouds.org> | 2019-11-07 09:59:02 -0500 |
commit | 415d4c2e5236f6d8dfef8865b1665f144c39a019 (patch) | |
tree | e7c936d97d077c6ba1ce79b495894d182299c96f /cli/ops/tls.rs | |
parent | f466ef97061ffed5baf1612a646accb2cda4b772 (diff) |
refactor: rewrite accept resources (#3271)
Diffstat (limited to 'cli/ops/tls.rs')
-rw-r--r-- | cli/ops/tls.rs | 179 |
1 files changed, 167 insertions, 12 deletions
diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index 569b5a1f6..ee08f357a 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -1,18 +1,22 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; +use crate::deno_error::bad_resource; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; use crate::resources; +use crate::resources::CoreResource; use crate::state::ThreadSafeState; -use crate::tokio_util; use deno::*; +use futures::Async; use futures::Future; +use futures::Poll; use std; use std::convert::From; use std::fs::File; use std::io::BufReader; +use std::net::SocketAddr; use std::sync::Arc; use tokio; use tokio::net::TcpListener; @@ -167,6 +171,60 @@ fn load_keys(path: &str) -> Result<Vec<PrivateKey>, ErrBox> { Ok(keys) } +#[allow(dead_code)] +pub struct TlsListenerResource { + listener: tokio::net::TcpListener, + tls_acceptor: TlsAcceptor, + task: Option<futures::task::Task>, + local_addr: SocketAddr, +} + +impl CoreResource for TlsListenerResource {} + +impl Drop for TlsListenerResource { + fn drop(&mut self) { + self.notify_task(); + } +} + +impl TlsListenerResource { + /// Track the current task so future awaiting for connection + /// can be notified when listener is closed. + /// + /// Throws an error if another task is already tracked. + pub fn track_task(&mut self) -> Result<(), ErrBox> { + // Currently, we only allow tracking a single accept task for a listener. + // This might be changed in the future with multiple workers. + // Caveat: TcpListener by itself also only tracks an accept task at a time. + // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 + if self.task.is_some() { + let e = std::io::Error::new( + std::io::ErrorKind::Other, + "Another accept task is ongoing", + ); + return Err(ErrBox::from(e)); + } + + self.task.replace(futures::task::current()); + Ok(()) + } + + /// Notifies a task when listener is closed so accept future can resolve. + pub fn notify_task(&mut self) { + if let Some(task) = self.task.take() { + task.notify(); + } + } + + /// Stop tracking a task. + /// Happens when the task is done and thus no further tracking is needed. + pub fn untrack_task(&mut self) { + if self.task.is_some() { + self.task.take(); + } + } +} + #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct ListenTlsArgs { @@ -196,18 +254,110 @@ fn op_listen_tls( config .set_single_cert(load_certs(&cert_file)?, load_keys(&key_file)?.remove(0)) .expect("invalid key or certificate"); - let acceptor = TlsAcceptor::from(Arc::new(config)); + let tls_acceptor = TlsAcceptor::from(Arc::new(config)); let addr = resolve_addr(&args.hostname, args.port).wait()?; let listener = TcpListener::bind(&addr)?; let local_addr = listener.local_addr()?; - let resource = resources::add_tls_listener(listener, acceptor); + let local_addr_str = local_addr.to_string(); + let tls_listener_resource = TlsListenerResource { + listener, + tls_acceptor, + task: None, + local_addr, + }; + let mut table = resources::lock_resource_table(); + let rid = table.add("tlsListener", Box::new(tls_listener_resource)); Ok(JsonOp::Sync(json!({ - "rid": resource.rid, - "localAddr": local_addr.to_string() + "rid": rid, + "localAddr": local_addr_str }))) } +#[derive(Debug, PartialEq)] +enum AcceptTlsState { + Eager, + Pending, + Done, +} + +/// Simply accepts a TLS connection. +pub fn accept_tls(rid: ResourceId) -> AcceptTls { + AcceptTls { + state: AcceptTlsState::Eager, + rid, + } +} + +/// A future representing state of accepting a TLS connection. +#[derive(Debug)] +pub struct AcceptTls { + state: AcceptTlsState, + rid: ResourceId, +} + +impl Future for AcceptTls { + type Item = (TcpStream, SocketAddr); + type Error = ErrBox; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + if self.state == AcceptTlsState::Done { + panic!("poll AcceptTls after it's done"); + } + + let mut table = resources::lock_resource_table(); + let listener_resource = table + .get_mut::<TlsListenerResource>(self.rid) + .ok_or_else(|| { + let e = std::io::Error::new( + std::io::ErrorKind::Other, + "Listener has been closed", + ); + ErrBox::from(e) + })?; + + let listener = &mut listener_resource.listener; + + if self.state == AcceptTlsState::Eager { + // Similar to try_ready!, but also track/untrack accept task + // in TcpListener resource. + // In this way, when the listener is closed, the task can be + // notified to error out (instead of stuck forever). + match listener.poll_accept().map_err(ErrBox::from) { + Ok(Async::Ready((stream, addr))) => { + self.state = AcceptTlsState::Done; + return Ok((stream, addr).into()); + } + Ok(Async::NotReady) => { + self.state = AcceptTlsState::Pending; + return Ok(Async::NotReady); + } + Err(e) => { + self.state = AcceptTlsState::Done; + return Err(e); + } + } + } + + match listener.poll_accept().map_err(ErrBox::from) { + Ok(Async::Ready((stream, addr))) => { + listener_resource.untrack_task(); + self.state = AcceptTlsState::Done; + Ok((stream, addr).into()) + } + Ok(Async::NotReady) => { + listener_resource.track_task()?; + Ok(Async::NotReady) + } + Err(e) => { + listener_resource.untrack_task(); + self.state = AcceptTlsState::Done; + Err(e) + } + } + } +} + #[derive(Deserialize)] struct AcceptTlsArgs { rid: i32, @@ -219,26 +369,31 @@ fn op_accept_tls( _zero_copy: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { let args: AcceptTlsArgs = serde_json::from_value(args)?; - let server_rid = args.rid as u32; + let rid = args.rid as u32; - let server_resource = resources::lookup(server_rid)?; - let op = tokio_util::accept(server_resource) + let op = accept_tls(rid) .and_then(move |(tcp_stream, _socket_addr)| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; Ok((tcp_stream, local_addr, remote_addr)) }) .and_then(move |(tcp_stream, local_addr, remote_addr)| { - let mut server_resource = resources::lookup(server_rid).unwrap(); - server_resource - .poll_accept_tls(tcp_stream) + let table = resources::lock_resource_table(); + let resource = table + .get::<TlsListenerResource>(rid) + .ok_or_else(bad_resource) + .expect("Can't find tls listener"); + + resource + .tls_acceptor + .accept(tcp_stream) + .map_err(ErrBox::from) .and_then(move |tls_stream| { let tls_stream_resource = resources::add_server_tls_stream(tls_stream); Ok((tls_stream_resource, local_addr, remote_addr)) }) }) - .map_err(ErrBox::from) .and_then(move |(tls_stream_resource, local_addr, remote_addr)| { futures::future::ok(json!({ "rid": tls_stream_resource.rid, |