summaryrefslogtreecommitdiff
path: root/runtime/ops/tls.rs
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/ops/tls.rs')
-rw-r--r--runtime/ops/tls.rs245
1 files changed, 99 insertions, 146 deletions
diff --git a/runtime/ops/tls.rs b/runtime/ops/tls.rs
index b59650ab0..0630747ed 100644
--- a/runtime/ops/tls.rs
+++ b/runtime/ops/tls.rs
@@ -1,6 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use super::io::{StreamResource, StreamResourceHolder};
+use super::io::StreamResource;
+use super::io::TcpStreamResource;
use crate::permissions::Permissions;
use crate::resolve_addr::resolve_addr;
use crate::resolve_addr::resolve_addr_sync;
@@ -8,25 +9,26 @@ use deno_core::error::bad_resource;
use deno_core::error::bad_resource_id;
use deno_core::error::custom_error;
use deno_core::error::AnyError;
-use deno_core::futures;
-use deno_core::futures::future::poll_fn;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use serde::Deserialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::convert::From;
use std::fs::File;
use std::io::BufReader;
-use std::net::SocketAddr;
use std::path::Path;
use std::rc::Rc;
use std::sync::Arc;
-use std::task::Context;
-use std::task::Poll;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio_rustls::{rustls::ClientConfig, TlsConnector};
@@ -85,60 +87,53 @@ async fn op_start_tls(
permissions.check_read(Path::new(&path))?;
}
}
- let mut resource_holder = {
- let mut state_ = state.borrow_mut();
- match state_.resource_table.remove::<StreamResourceHolder>(rid) {
- Some(resource) => *resource,
- None => return Err(bad_resource_id()),
- }
- };
- if let StreamResource::TcpStream(ref mut tcp_stream) =
- resource_holder.resource
- {
- let tcp_stream = tcp_stream.take().unwrap();
- let local_addr = tcp_stream.local_addr()?;
- let remote_addr = tcp_stream.peer_addr()?;
- let mut config = ClientConfig::new();
- config
- .root_store
- .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
- if let Some(path) = cert_file {
- let key_file = File::open(path)?;
- let reader = &mut BufReader::new(key_file);
- config.root_store.add_pem_file(reader).unwrap();
- }
+ let resource_rc = state
+ .borrow_mut()
+ .resource_table
+ .take::<TcpStreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let resource = Rc::try_unwrap(resource_rc)
+ .expect("Only a single use of this resource should happen");
+ let (read_half, write_half) = resource.into_inner();
+ let tcp_stream = read_half.reunite(write_half)?;
- let tls_connector = TlsConnector::from(Arc::new(config));
- let dnsname =
- DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
- let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
-
- let rid = {
- let mut state_ = state.borrow_mut();
- state_.resource_table.add(
- "clientTlsStream",
- Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
- Box::new(tls_stream),
- ))),
- )
- };
- Ok(json!({
- "rid": rid,
- "localAddr": {
- "hostname": local_addr.ip().to_string(),
- "port": local_addr.port(),
- "transport": "tcp",
- },
- "remoteAddr": {
- "hostname": remote_addr.ip().to_string(),
- "port": remote_addr.port(),
- "transport": "tcp",
- }
- }))
- } else {
- Err(bad_resource_id())
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let mut config = ClientConfig::new();
+ config
+ .root_store
+ .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
+ if let Some(path) = cert_file {
+ let key_file = File::open(path)?;
+ let reader = &mut BufReader::new(key_file);
+ config.root_store.add_pem_file(reader).unwrap();
}
+
+ let tls_connector = TlsConnector::from(Arc::new(config));
+ let dnsname =
+ DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
+ let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
+
+ let rid = {
+ let mut state_ = state.borrow_mut();
+ state_
+ .resource_table
+ .add(StreamResource::client_tls_stream(tls_stream))
+ };
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "hostname": local_addr.ip().to_string(),
+ "port": local_addr.port(),
+ "transport": "tcp",
+ },
+ "remoteAddr": {
+ "hostname": remote_addr.ip().to_string(),
+ "port": remote_addr.port(),
+ "transport": "tcp",
+ }
+ }))
}
async fn op_connect_tls(
@@ -180,12 +175,9 @@ async fn op_connect_tls(
let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
let rid = {
let mut state_ = state.borrow_mut();
- state_.resource_table.add(
- "clientTlsStream",
- Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
- Box::new(tls_stream),
- ))),
- )
+ state_
+ .resource_table
+ .add(StreamResource::client_tls_stream(tls_stream))
};
Ok(json!({
"rid": rid,
@@ -256,51 +248,19 @@ fn load_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> {
Ok(keys)
}
-#[allow(dead_code)]
pub struct TlsListenerResource {
- listener: TcpListener,
+ listener: AsyncRefCell<TcpListener>,
tls_acceptor: TlsAcceptor,
- waker: Option<futures::task::AtomicWaker>,
- local_addr: SocketAddr,
+ cancel: CancelHandle,
}
-impl Drop for TlsListenerResource {
- fn drop(&mut self) {
- self.wake_task();
+impl Resource for TlsListenerResource {
+ fn name(&self) -> Cow<str> {
+ "tlsListener".into()
}
-}
-
-impl TlsListenerResource {
- /// Track the current task so future awaiting for connection
- /// can be notified when listener is closed.
- ///
- /// Throws an error if another task is already tracked.
- pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> {
- // Currently, we only allow tracking a single accept task for a listener.
- // This might be changed in the future with multiple workers.
- // Caveat: TcpListener by itself also only tracks an accept task at a time.
- // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
- if self.waker.is_some() {
- return Err(custom_error("Busy", "Another accept task is ongoing"));
- }
- let waker = futures::task::AtomicWaker::new();
- waker.register(cx.waker());
- self.waker.replace(waker);
- Ok(())
- }
-
- /// Notifies a task when listener is closed so accept future can resolve.
- pub fn wake_task(&mut self) {
- if let Some(waker) = self.waker.as_ref() {
- waker.wake();
- }
- }
-
- /// Stop tracking a task.
- /// Happens when the task is done and thus no further tracking is needed.
- pub fn untrack_task(&mut self) {
- self.waker.take();
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
}
}
@@ -340,15 +300,12 @@ fn op_listen_tls(
let listener = TcpListener::from_std(std_listener)?;
let local_addr = listener.local_addr()?;
let tls_listener_resource = TlsListenerResource {
- listener,
+ listener: AsyncRefCell::new(listener),
tls_acceptor,
- waker: None,
- local_addr,
+ cancel: Default::default(),
};
- let rid = state
- .resource_table
- .add("tlsListener", Box::new(tls_listener_resource));
+ let rid = state.resource_table.add(tls_listener_resource);
Ok(json!({
"rid": rid,
@@ -372,50 +329,46 @@ async fn op_accept_tls(
) -> Result<Value, AnyError> {
let args: AcceptTlsArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let accept_fut = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let listener_resource = state
- .resource_table
- .get_mut::<TlsListenerResource>(rid)
- .ok_or_else(|| bad_resource("Listener has been closed"))?;
- let listener = &mut listener_resource.listener;
- match listener.poll_accept(cx).map_err(AnyError::from) {
- Poll::Ready(Ok((stream, addr))) => {
- listener_resource.untrack_task();
- Poll::Ready(Ok((stream, addr)))
- }
- Poll::Pending => {
- listener_resource.track_task(cx)?;
- Poll::Pending
- }
- Poll::Ready(Err(e)) => {
- listener_resource.untrack_task();
- Poll::Ready(Err(e))
+
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<TlsListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let mut listener = RcRef::map(&resource, |r| &r.listener)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (tcp_stream, _socket_addr) =
+ listener.accept().try_or_cancel(cancel).await.map_err(|e| {
+ // FIXME(bartlomieju): compatibility with current JS implementation
+ if let std::io::ErrorKind::Interrupted = e.kind() {
+ bad_resource("Listener has been closed")
+ } else {
+ e.into()
}
- }
- });
- let (tcp_stream, _socket_addr) = accept_fut.await?;
+ })?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
- let tls_acceptor = {
- let state_ = state.borrow();
- let resource = state_
- .resource_table
- .get::<TlsListenerResource>(rid)
- .ok_or_else(bad_resource_id)
- .expect("Can't find tls listener");
- resource.tls_acceptor.clone()
- };
- let tls_stream = tls_acceptor.accept(tcp_stream).await?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<TlsListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let cancel = RcRef::map(&resource, |r| &r.cancel);
+ let tls_acceptor = resource.tls_acceptor.clone();
+ let tls_stream = tls_acceptor
+ .accept(tcp_stream)
+ .try_or_cancel(cancel)
+ .await?;
+
let rid = {
let mut state_ = state.borrow_mut();
- state_.resource_table.add(
- "serverTlsStream",
- Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream(
- Box::new(tls_stream),
- ))),
- )
+ state_
+ .resource_table
+ .add(StreamResource::server_tls_stream(tls_stream))
};
+
Ok(json!({
"rid": rid,
"localAddr": {