diff options
author | Bartek Iwańczuk <biwanczuk@gmail.com> | 2021-06-29 01:43:03 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-29 01:43:03 +0200 |
commit | 38a7128cdd6f3308ba3c13cfb0b0d4ef925a44c3 (patch) | |
tree | 8f0c86028d9ba0266f1846e7f3611f7049cb43a8 /runtime/ops | |
parent | 30cba2484815f712502ae8937a25afa13aba0818 (diff) |
feat: Add "deno_net" extension (#11150)
This commits moves implementation of net related APIs available on "Deno"
namespace to "deno_net" extension.
Following APIs were moved:
- Deno.listen()
- Deno.connect()
- Deno.listenTls()
- Deno.serveHttp()
- Deno.shutdown()
- Deno.resolveDns()
- Deno.listenDatagram()
- Deno.startTls()
- Deno.Conn
- Deno.Listener
- Deno.DatagramConn
Diffstat (limited to 'runtime/ops')
-rw-r--r-- | runtime/ops/http.rs | 579 | ||||
-rw-r--r-- | runtime/ops/io.rs | 129 | ||||
-rw-r--r-- | runtime/ops/mod.rs | 5 | ||||
-rw-r--r-- | runtime/ops/net.rs | 793 | ||||
-rw-r--r-- | runtime/ops/net_unix.rs | 173 | ||||
-rw-r--r-- | runtime/ops/tls.rs | 1017 |
6 files changed, 3 insertions, 2693 deletions
diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs deleted file mode 100644 index 01658c802..000000000 --- a/runtime/ops/http.rs +++ /dev/null @@ -1,579 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -use crate::ops::io::TcpStreamResource; -use crate::ops::io::TlsStreamResource; -use crate::ops::tls::TlsStream; -use deno_core::error::bad_resource_id; -use deno_core::error::null_opbuf; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; -use deno_core::futures::FutureExt; -use deno_core::futures::Stream; -use deno_core::futures::StreamExt; -use deno_core::op_async; -use deno_core::op_sync; -use deno_core::AsyncRefCell; -use deno_core::ByteString; -use deno_core::CancelHandle; -use deno_core::CancelTryFuture; -use deno_core::Extension; -use deno_core::OpState; -use deno_core::RcRef; -use deno_core::Resource; -use deno_core::ResourceId; -use deno_core::ZeroCopyBuf; -use hyper::body::HttpBody; -use hyper::http; -use hyper::server::conn::Connection; -use hyper::server::conn::Http; -use hyper::service::Service as HyperService; -use hyper::Body; -use hyper::Request; -use hyper::Response; -use serde::Deserialize; -use serde::Serialize; -use std::borrow::Cow; -use std::cell::RefCell; -use std::future::Future; -use std::net::SocketAddr; -use std::pin::Pin; -use std::rc::Rc; -use std::task::Context; -use std::task::Poll; -use tokio::io::AsyncReadExt; -use tokio::net::TcpStream; -use tokio::sync::oneshot; -use tokio_util::io::StreamReader; - -pub fn init() -> Extension { - Extension::builder() - .ops(vec![ - ("op_http_start", op_sync(op_http_start)), - ("op_http_request_next", op_async(op_http_request_next)), - ("op_http_request_read", op_async(op_http_request_read)), - ("op_http_response", op_async(op_http_response)), - ("op_http_response_write", op_async(op_http_response_write)), - ("op_http_response_close", op_async(op_http_response_close)), - ]) - .build() -} - -struct ServiceInner { - request: Request<Body>, - response_tx: oneshot::Sender<Response<Body>>, -} - -#[derive(Clone, Default)] -struct Service { - inner: Rc<RefCell<Option<ServiceInner>>>, - waker: Rc<deno_core::futures::task::AtomicWaker>, -} - -impl HyperService<Request<Body>> for Service { - type Response = Response<Body>; - type Error = http::Error; - #[allow(clippy::type_complexity)] - type Future = - Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>; - - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { - if self.inner.borrow().is_some() { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } - } - - fn call(&mut self, req: Request<Body>) -> Self::Future { - let (resp_tx, resp_rx) = oneshot::channel(); - self.inner.borrow_mut().replace(ServiceInner { - request: req, - response_tx: resp_tx, - }); - - async move { Ok(resp_rx.await.unwrap()) }.boxed_local() - } -} - -enum ConnType { - Tcp(Rc<RefCell<Connection<TcpStream, Service, LocalExecutor>>>), - Tls(Rc<RefCell<Connection<TlsStream, Service, LocalExecutor>>>), -} - -struct ConnResource { - hyper_connection: ConnType, - deno_service: Service, - addr: SocketAddr, - cancel: CancelHandle, -} - -impl ConnResource { - // TODO(ry) impl Future for ConnResource? - fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> { - match &self.hyper_connection { - ConnType::Tcp(c) => c.borrow_mut().poll_unpin(cx), - ConnType::Tls(c) => c.borrow_mut().poll_unpin(cx), - } - .map_err(AnyError::from) - } -} - -impl Resource for ConnResource { - fn name(&self) -> Cow<str> { - "httpConnection".into() - } - - fn close(self: Rc<Self>) { - self.cancel.cancel() - } -} - -// We use a tuple instead of struct to avoid serialization overhead of the keys. -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct NextRequestResponse( - // request_body_rid: - Option<ResourceId>, - // response_sender_rid: - ResourceId, - // method: - // This is a String rather than a ByteString because reqwest will only return - // the method as a str which is guaranteed to be ASCII-only. - String, - // headers: - Vec<(ByteString, ByteString)>, - // url: - String, -); - -async fn op_http_request_next( - state: Rc<RefCell<OpState>>, - conn_rid: ResourceId, - _: (), -) -> Result<Option<NextRequestResponse>, AnyError> { - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(conn_rid) - .ok_or_else(bad_resource_id)?; - - let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel); - - poll_fn(|cx| { - conn_resource.deno_service.waker.register(cx.waker()); - let connection_closed = match conn_resource.poll(cx) { - Poll::Pending => false, - Poll::Ready(Ok(())) => { - // try to close ConnResource, but don't unwrap as it might - // already be closed - let _ = state - .borrow_mut() - .resource_table - .take::<ConnResource>(conn_rid); - true - } - Poll::Ready(Err(e)) => { - // TODO(ry) close RequestResource associated with connection - // TODO(ry) close ResponseBodyResource associated with connection - // close ConnResource - state - .borrow_mut() - .resource_table - .take::<ConnResource>(conn_rid) - .unwrap(); - - if should_ignore_error(&e) { - true - } else { - return Poll::Ready(Err(e)); - } - } - }; - if let Some(request_resource) = - conn_resource.deno_service.inner.borrow_mut().take() - { - let tx = request_resource.response_tx; - let req = request_resource.request; - let method = req.method().to_string(); - - let mut headers = Vec::with_capacity(req.headers().len()); - for (name, value) in req.headers().iter() { - let name: &[u8] = name.as_ref(); - let value = value.as_bytes(); - headers - .push((ByteString(name.to_owned()), ByteString(value.to_owned()))); - } - - let url = { - let scheme = { - match conn_resource.hyper_connection { - ConnType::Tcp(_) => "http", - ConnType::Tls(_) => "https", - } - }; - let host: Cow<str> = if let Some(host) = req.uri().host() { - Cow::Borrowed(host) - } else if let Some(host) = req.headers().get("HOST") { - Cow::Borrowed(host.to_str()?) - } else { - Cow::Owned(conn_resource.addr.to_string()) - }; - let path = req.uri().path_and_query().map_or("/", |p| p.as_str()); - format!("{}://{}{}", scheme, host, path) - }; - - let has_body = if let Some(exact_size) = req.size_hint().exact() { - exact_size > 0 - } else { - true - }; - - let maybe_request_body_rid = if has_body { - let stream: BytesStream = Box::pin(req.into_body().map(|r| { - r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) - })); - let stream_reader = StreamReader::new(stream); - let mut state = state.borrow_mut(); - let request_body_rid = state.resource_table.add(RequestBodyResource { - conn_rid, - reader: AsyncRefCell::new(stream_reader), - cancel: CancelHandle::default(), - }); - Some(request_body_rid) - } else { - None - }; - - let mut state = state.borrow_mut(); - let response_sender_rid = - state.resource_table.add(ResponseSenderResource { - sender: tx, - conn_rid, - }); - - Poll::Ready(Ok(Some(NextRequestResponse( - maybe_request_body_rid, - response_sender_rid, - method, - headers, - url, - )))) - } else if connection_closed { - Poll::Ready(Ok(None)) - } else { - Poll::Pending - } - }) - .try_or_cancel(cancel) - .await - .map_err(AnyError::from) -} - -fn should_ignore_error(e: &AnyError) -> bool { - if let Some(e) = e.downcast_ref::<hyper::Error>() { - use std::error::Error; - if let Some(std_err) = e.source() { - if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() { - if io_err.kind() == std::io::ErrorKind::NotConnected { - return true; - } - } - } - } - false -} - -fn op_http_start( - state: &mut OpState, - tcp_stream_rid: ResourceId, - _: (), -) -> Result<ResourceId, AnyError> { - let deno_service = Service::default(); - - if let Some(resource_rc) = state - .resource_table - .take::<TcpStreamResource>(tcp_stream_rid) - { - let resource = Rc::try_unwrap(resource_rc) - .expect("Only a single use of this resource should happen"); - let (read_half, write_half) = resource.into_inner(); - let tcp_stream = read_half.reunite(write_half)?; - let addr = tcp_stream.local_addr()?; - let hyper_connection = Http::new() - .with_executor(LocalExecutor) - .serve_connection(tcp_stream, deno_service.clone()); - let conn_resource = ConnResource { - hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))), - deno_service, - addr, - cancel: CancelHandle::default(), - }; - let rid = state.resource_table.add(conn_resource); - return Ok(rid); - } - - if let Some(resource_rc) = state - .resource_table - .take::<TlsStreamResource>(tcp_stream_rid) - { - let resource = Rc::try_unwrap(resource_rc) - .expect("Only a single use of this resource should happen"); - let (read_half, write_half) = resource.into_inner(); - let tls_stream = read_half.reunite(write_half); - let addr = tls_stream.get_ref().0.local_addr()?; - - let hyper_connection = Http::new() - .with_executor(LocalExecutor) - .serve_connection(tls_stream, deno_service.clone()); - let conn_resource = ConnResource { - hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))), - deno_service, - addr, - cancel: CancelHandle::default(), - }; - let rid = state.resource_table.add(conn_resource); - return Ok(rid); - } - - Err(bad_resource_id()) -} - -// We use a tuple instead of struct to avoid serialization overhead of the keys. -#[derive(Deserialize)] -struct RespondArgs( - // rid: - u32, - // status: - u16, - // headers: - Vec<(ByteString, ByteString)>, -); - -async fn op_http_response( - state: Rc<RefCell<OpState>>, - args: RespondArgs, - data: Option<ZeroCopyBuf>, -) -> Result<Option<ResourceId>, AnyError> { - let RespondArgs(rid, status, headers) = args; - - let response_sender = state - .borrow_mut() - .resource_table - .take::<ResponseSenderResource>(rid) - .ok_or_else(bad_resource_id)?; - let response_sender = Rc::try_unwrap(response_sender) - .ok() - .expect("multiple op_http_respond ongoing"); - - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(response_sender.conn_rid) - .ok_or_else(bad_resource_id)?; - - let mut builder = Response::builder().status(status); - - builder.headers_mut().unwrap().reserve(headers.len()); - for (key, value) in &headers { - builder = builder.header(key.as_ref(), value.as_ref()); - } - - let res; - let maybe_response_body_rid = if let Some(d) = data { - // If a body is passed, we use it, and don't return a body for streaming. - res = builder.body(Vec::from(&*d).into())?; - None - } else { - // If no body is passed, we return a writer for streaming the body. - let (sender, body) = Body::channel(); - res = builder.body(body)?; - - let response_body_rid = - state.borrow_mut().resource_table.add(ResponseBodyResource { - body: AsyncRefCell::new(sender), - conn_rid: response_sender.conn_rid, - }); - - Some(response_body_rid) - }; - - // oneshot::Sender::send(v) returns |v| on error, not an error object. - // The only failure mode is the receiver already having dropped its end - // of the channel. - if response_sender.sender.send(res).is_err() { - return Err(type_error("internal communication error")); - } - - poll_fn(|cx| match conn_resource.poll(cx) { - Poll::Ready(x) => Poll::Ready(x), - Poll::Pending => Poll::Ready(Ok(())), - }) - .await?; - - if maybe_response_body_rid.is_none() { - conn_resource.deno_service.waker.wake(); - } - Ok(maybe_response_body_rid) -} - -async fn op_http_response_close( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - _: (), -) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .take::<ResponseBodyResource>(rid) - .ok_or_else(bad_resource_id)?; - - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(resource.conn_rid) - .ok_or_else(bad_resource_id)?; - drop(resource); - - let r = poll_fn(|cx| match conn_resource.poll(cx) { - Poll::Ready(x) => Poll::Ready(x), - Poll::Pending => Poll::Ready(Ok(())), - }) - .await; - conn_resource.deno_service.waker.wake(); - r -} - -async fn op_http_request_read( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - data: Option<ZeroCopyBuf>, -) -> Result<usize, AnyError> { - let mut data = data.ok_or_else(null_opbuf)?; - - let resource = state - .borrow() - .resource_table - .get::<RequestBodyResource>(rid as u32) - .ok_or_else(bad_resource_id)?; - - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(resource.conn_rid) - .ok_or_else(bad_resource_id)?; - - let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await; - let cancel = RcRef::map(resource, |r| &r.cancel); - let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local(); - - poll_fn(|cx| { - if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { - // close ConnResource - // close RequestResource associated with connection - // close ResponseBodyResource associated with connection - return Poll::Ready(Err(e)); - } - - read_fut.poll_unpin(cx).map_err(AnyError::from) - }) - .await -} - -async fn op_http_response_write( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - data: Option<ZeroCopyBuf>, -) -> Result<(), AnyError> { - let buf = data.ok_or_else(null_opbuf)?; - let resource = state - .borrow() - .resource_table - .get::<ResponseBodyResource>(rid as u32) - .ok_or_else(bad_resource_id)?; - - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(resource.conn_rid) - .ok_or_else(bad_resource_id)?; - - let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; - - let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local(); - - poll_fn(|cx| { - let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from); - - // Poll connection so the data is flushed - if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { - // close ConnResource - // close RequestResource associated with connection - // close ResponseBodyResource associated with connection - return Poll::Ready(Err(e)); - } - - r - }) - .await?; - - Ok(()) -} - -type BytesStream = - Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>; - -struct RequestBodyResource { - conn_rid: ResourceId, - reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>, - cancel: CancelHandle, -} - -impl Resource for RequestBodyResource { - fn name(&self) -> Cow<str> { - "requestBody".into() - } - - fn close(self: Rc<Self>) { - self.cancel.cancel() - } -} - -struct ResponseSenderResource { - sender: oneshot::Sender<Response<Body>>, - conn_rid: ResourceId, -} - -impl Resource for ResponseSenderResource { - fn name(&self) -> Cow<str> { - "responseSender".into() - } -} - -struct ResponseBodyResource { - body: AsyncRefCell<hyper::body::Sender>, - conn_rid: ResourceId, -} - -impl Resource for ResponseBodyResource { - fn name(&self) -> Cow<str> { - "responseBody".into() - } -} - -// Needed so hyper can use non Send futures -#[derive(Clone)] -struct LocalExecutor; - -impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor -where - Fut: Future + 'static, - Fut::Output: 'static, -{ - fn execute(&self, fut: Fut) { - tokio::task::spawn_local(fut); - } -} diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 18279c0eb..e18846466 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -1,6 +1,5 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use crate::ops::tls; use deno_core::error::null_opbuf; use deno_core::error::resource_unavailable; use deno_core::error::AnyError; @@ -17,6 +16,9 @@ use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ZeroCopyBuf; +use deno_net::io::TcpStreamResource; +use deno_net::io::TlsStreamResource; +use deno_net::io::UnixStreamResource; use std::borrow::Cow; use std::cell::RefCell; use std::io::Read; @@ -26,13 +28,10 @@ use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; -use tokio::net::tcp; use tokio::process; #[cfg(unix)] use std::os::unix::io::FromRawFd; -#[cfg(unix)] -use tokio::net::unix; #[cfg(windows)] use std::os::windows::io::FromRawHandle; @@ -238,70 +237,6 @@ where } } -/// A full duplex resource has a read and write ends that are completely -/// independent, like TCP/Unix sockets and TLS streams. -#[derive(Debug)] -pub struct FullDuplexResource<R, W> { - rd: AsyncRefCell<R>, - wr: AsyncRefCell<W>, - // When a full-duplex resource is closed, all pending 'read' ops are - // canceled, while 'write' ops are allowed to complete. Therefore only - // 'read' futures should be attached to this cancel handle. - cancel_handle: CancelHandle, -} - -impl<R, W> FullDuplexResource<R, W> -where - R: AsyncRead + Unpin + 'static, - W: AsyncWrite + Unpin + 'static, -{ - pub fn new((rd, wr): (R, W)) -> Self { - Self { - rd: rd.into(), - wr: wr.into(), - cancel_handle: Default::default(), - } - } - - pub fn into_inner(self) -> (R, W) { - (self.rd.into_inner(), self.wr.into_inner()) - } - - pub fn rd_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<R> { - RcRef::map(self, |r| &r.rd).borrow_mut() - } - - pub fn wr_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<W> { - RcRef::map(self, |r| &r.wr).borrow_mut() - } - - pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> { - RcRef::map(self, |r| &r.cancel_handle) - } - - pub fn cancel_read_ops(&self) { - self.cancel_handle.cancel() - } - - async fn read(self: &Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> { - let mut rd = self.rd_borrow_mut().await; - let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?; - Ok(nread) - } - - async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { - let mut wr = self.wr_borrow_mut().await; - let nwritten = wr.write(buf).await?; - Ok(nwritten) - } - - async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> { - let mut wr = self.wr_borrow_mut().await; - wr.shutdown().await?; - Ok(()) - } -} - pub type ChildStdinResource = WriteOnlyResource<process::ChildStdin>; impl Resource for ChildStdinResource { @@ -334,64 +269,6 @@ impl Resource for ChildStderrResource { } } -pub type TcpStreamResource = - FullDuplexResource<tcp::OwnedReadHalf, tcp::OwnedWriteHalf>; - -impl Resource for TcpStreamResource { - fn name(&self) -> Cow<str> { - "tcpStream".into() - } - - fn close(self: Rc<Self>) { - self.cancel_read_ops(); - } -} - -pub type TlsStreamResource = FullDuplexResource<tls::ReadHalf, tls::WriteHalf>; - -impl Resource for TlsStreamResource { - fn name(&self) -> Cow<str> { - "tlsStream".into() - } - - fn close(self: Rc<Self>) { - self.cancel_read_ops(); - } -} - -#[cfg(unix)] -pub type UnixStreamResource = - FullDuplexResource<unix::OwnedReadHalf, unix::OwnedWriteHalf>; - -#[cfg(not(unix))] -struct UnixStreamResource; - -#[cfg(not(unix))] -impl UnixStreamResource { - async fn read(self: &Rc<Self>, _buf: &mut [u8]) -> Result<usize, AnyError> { - unreachable!() - } - async fn write(self: &Rc<Self>, _buf: &[u8]) -> Result<usize, AnyError> { - unreachable!() - } - async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> { - unreachable!() - } - fn cancel_read_ops(&self) { - unreachable!() - } -} - -impl Resource for UnixStreamResource { - fn name(&self) -> Cow<str> { - "unixStream".into() - } - - fn close(self: Rc<Self>) { - self.cancel_read_ops(); - } -} - #[derive(Debug, Default)] pub struct StdFileResource { pub fs_file: diff --git a/runtime/ops/mod.rs b/runtime/ops/mod.rs index b05a91180..c94020780 100644 --- a/runtime/ops/mod.rs +++ b/runtime/ops/mod.rs @@ -2,18 +2,13 @@ pub mod fs; pub mod fs_events; -pub mod http; pub mod io; -pub mod net; -#[cfg(unix)] -mod net_unix; pub mod os; pub mod permissions; pub mod plugin; pub mod process; pub mod runtime; pub mod signal; -pub mod tls; pub mod tty; mod utils; pub mod web_worker; diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs deleted file mode 100644 index c9195aab7..000000000 --- a/runtime/ops/net.rs +++ /dev/null @@ -1,793 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use crate::ops::io::TcpStreamResource; -use crate::permissions::Permissions; -use crate::resolve_addr::resolve_addr; -use crate::resolve_addr::resolve_addr_sync; -use deno_core::error::bad_resource; -use deno_core::error::custom_error; -use deno_core::error::generic_error; -use deno_core::error::null_opbuf; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::op_async; -use deno_core::op_sync; -use deno_core::AsyncRefCell; -use deno_core::CancelHandle; -use deno_core::CancelTryFuture; -use deno_core::Extension; -use deno_core::OpState; -use deno_core::RcRef; -use deno_core::Resource; -use deno_core::ResourceId; -use deno_core::ZeroCopyBuf; -use log::debug; -use serde::Deserialize; -use serde::Serialize; -use std::borrow::Cow; -use std::cell::RefCell; -use std::net::SocketAddr; -use std::rc::Rc; -use tokio::net::TcpListener; -use tokio::net::TcpStream; -use tokio::net::UdpSocket; -use trust_dns_proto::rr::record_data::RData; -use trust_dns_proto::rr::record_type::RecordType; -use trust_dns_resolver::config::NameServerConfigGroup; -use trust_dns_resolver::config::ResolverConfig; -use trust_dns_resolver::config::ResolverOpts; -use trust_dns_resolver::system_conf; -use trust_dns_resolver::AsyncResolver; - -#[cfg(unix)] -use super::net_unix; -#[cfg(unix)] -use crate::ops::io::UnixStreamResource; -#[cfg(unix)] -use std::path::Path; - -pub fn init() -> Extension { - Extension::builder() - .ops(vec![ - ("op_accept", op_async(op_accept)), - ("op_connect", op_async(op_connect)), - ("op_listen", op_sync(op_listen)), - ("op_datagram_receive", op_async(op_datagram_receive)), - ("op_datagram_send", op_async(op_datagram_send)), - ("op_dns_resolve", op_async(op_dns_resolve)), - ]) - .build() -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct OpConn { - pub rid: ResourceId, - pub remote_addr: Option<OpAddr>, - pub local_addr: Option<OpAddr>, -} - -#[derive(Serialize)] -#[serde(tag = "transport", rename_all = "lowercase")] -pub enum OpAddr { - Tcp(IpAddr), - Udp(IpAddr), - #[cfg(unix)] - Unix(net_unix::UnixAddr), - #[cfg(unix)] - UnixPacket(net_unix::UnixAddr), -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -/// A received datagram packet (from udp or unixpacket) -pub struct OpPacket { - pub size: usize, - pub remote_addr: OpAddr, -} - -#[derive(Serialize)] -pub struct IpAddr { - pub hostname: String, - pub port: u16, -} - -#[derive(Deserialize)] -pub(crate) struct AcceptArgs { - pub rid: ResourceId, - pub transport: String, -} - -async fn accept_tcp( - state: Rc<RefCell<OpState>>, - args: AcceptArgs, - _: (), -) -> Result<OpConn, AnyError> { - let rid = args.rid; - - let resource = state - .borrow() - .resource_table - .get::<TcpListenerResource>(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - let listener = RcRef::map(&resource, |r| &r.listener) - .try_borrow_mut() - .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; - let cancel = RcRef::map(resource, |r| &r.cancel); - let (tcp_stream, _socket_addr) = - listener.accept().try_or_cancel(cancel).await.map_err(|e| { - // FIXME(bartlomieju): compatibility with current JS implementation - if let std::io::ErrorKind::Interrupted = e.kind() { - bad_resource("Listener has been closed") - } else { - e.into() - } - })?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - - let mut state = state.borrow_mut(); - let rid = state - .resource_table - .add(TcpStreamResource::new(tcp_stream.into_split())); - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Tcp(IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - })), - remote_addr: Some(OpAddr::Tcp(IpAddr { - hostname: remote_addr.ip().to_string(), - port: remote_addr.port(), - })), - }) -} - -async fn op_accept( - state: Rc<RefCell<OpState>>, - args: AcceptArgs, - _: (), -) -> Result<OpConn, AnyError> { - match args.transport.as_str() { - "tcp" => accept_tcp(state, args, ()).await, - #[cfg(unix)] - "unix" => net_unix::accept_unix(state, args, ()).await, - other => Err(bad_transport(other)), - } -} - -fn bad_transport(transport: &str) -> AnyError { - generic_error(format!("Unsupported transport protocol {}", transport)) -} - -#[derive(Deserialize)] -pub(crate) struct ReceiveArgs { - pub rid: ResourceId, - pub transport: String, -} - -async fn receive_udp( - state: Rc<RefCell<OpState>>, - args: ReceiveArgs, - zero_copy: Option<ZeroCopyBuf>, -) -> Result<OpPacket, AnyError> { - let zero_copy = zero_copy.ok_or_else(null_opbuf)?; - let mut zero_copy = zero_copy.clone(); - - let rid = args.rid; - - let resource = state - .borrow_mut() - .resource_table - .get::<UdpSocketResource>(rid) - .ok_or_else(|| bad_resource("Socket has been closed"))?; - let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; - let cancel_handle = RcRef::map(&resource, |r| &r.cancel); - let (size, remote_addr) = socket - .recv_from(&mut zero_copy) - .try_or_cancel(cancel_handle) - .await?; - Ok(OpPacket { - size, - remote_addr: OpAddr::Udp(IpAddr { - hostname: remote_addr.ip().to_string(), - port: remote_addr.port(), - }), - }) -} - -async fn op_datagram_receive( - state: Rc<RefCell<OpState>>, - args: ReceiveArgs, - zero_copy: Option<ZeroCopyBuf>, -) -> Result<OpPacket, AnyError> { - match args.transport.as_str() { - "udp" => receive_udp(state, args, zero_copy).await, - #[cfg(unix)] - "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await, - other => Err(bad_transport(other)), - } -} - -#[derive(Deserialize)] -struct SendArgs { - rid: ResourceId, - transport: String, - #[serde(flatten)] - transport_args: ArgsEnum, -} - -async fn op_datagram_send( - state: Rc<RefCell<OpState>>, - args: SendArgs, - zero_copy: Option<ZeroCopyBuf>, -) -> Result<usize, AnyError> { - let zero_copy = zero_copy.ok_or_else(null_opbuf)?; - let zero_copy = zero_copy.clone(); - - match args { - SendArgs { - rid, - transport, - transport_args: ArgsEnum::Ip(args), - } if transport == "udp" => { - { - let mut s = state.borrow_mut(); - s.borrow_mut::<Permissions>() - .net - .check(&(&args.hostname, Some(args.port)))?; - } - let addr = resolve_addr(&args.hostname, args.port) - .await? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - - let resource = state - .borrow_mut() - .resource_table - .get::<UdpSocketResource>(rid) - .ok_or_else(|| bad_resource("Socket has been closed"))?; - let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; - let byte_length = socket.send_to(&zero_copy, &addr).await?; - Ok(byte_length) - } - #[cfg(unix)] - SendArgs { - rid, - transport, - transport_args: ArgsEnum::Unix(args), - } if transport == "unixpacket" => { - let address_path = Path::new(&args.path); - { - let mut s = state.borrow_mut(); - s.borrow_mut::<Permissions>().write.check(&address_path)?; - } - let resource = state - .borrow() - .resource_table - .get::<net_unix::UnixDatagramResource>(rid) - .ok_or_else(|| { - custom_error("NotConnected", "Socket has been closed") - })?; - let socket = RcRef::map(&resource, |r| &r.socket) - .try_borrow_mut() - .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; - let byte_length = socket.send_to(&zero_copy, address_path).await?; - Ok(byte_length) - } - _ => Err(type_error("Wrong argument format!")), - } -} - -#[derive(Deserialize)] -struct ConnectArgs { - transport: String, - #[serde(flatten)] - transport_args: ArgsEnum, -} - -async fn op_connect( - state: Rc<RefCell<OpState>>, - args: ConnectArgs, - _: (), -) -> Result<OpConn, AnyError> { - match args { - ConnectArgs { - transport, - transport_args: ArgsEnum::Ip(args), - } if transport == "tcp" => { - { - let mut state_ = state.borrow_mut(); - state_ - .borrow_mut::<Permissions>() - .net - .check(&(&args.hostname, Some(args.port)))?; - } - let addr = resolve_addr(&args.hostname, args.port) - .await? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - let tcp_stream = TcpStream::connect(&addr).await?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - - let mut state_ = state.borrow_mut(); - let rid = state_ - .resource_table - .add(TcpStreamResource::new(tcp_stream.into_split())); - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Tcp(IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - })), - remote_addr: Some(OpAddr::Tcp(IpAddr { - hostname: remote_addr.ip().to_string(), - port: remote_addr.port(), - })), - }) - } - #[cfg(unix)] - ConnectArgs { - transport, - transport_args: ArgsEnum::Unix(args), - } if transport == "unix" => { - let address_path = Path::new(&args.path); - super::check_unstable2(&state, "Deno.connect"); - { - let mut state_ = state.borrow_mut(); - state_ - .borrow_mut::<Permissions>() - .read - .check(&address_path)?; - state_ - .borrow_mut::<Permissions>() - .write - .check(&address_path)?; - } - let path = args.path; - let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?; - let local_addr = unix_stream.local_addr()?; - let remote_addr = unix_stream.peer_addr()?; - - let mut state_ = state.borrow_mut(); - let resource = UnixStreamResource::new(unix_stream.into_split()); - let rid = state_.resource_table.add(resource); - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Unix(net_unix::UnixAddr { - path: local_addr.as_pathname().and_then(net_unix::pathstring), - })), - remote_addr: Some(OpAddr::Unix(net_unix::UnixAddr { - path: remote_addr.as_pathname().and_then(net_unix::pathstring), - })), - }) - } - _ => Err(type_error("Wrong argument format!")), - } -} - -pub struct TcpListenerResource { - pub listener: AsyncRefCell<TcpListener>, - pub cancel: CancelHandle, -} - -impl Resource for TcpListenerResource { - fn name(&self) -> Cow<str> { - "tcpListener".into() - } - - fn close(self: Rc<Self>) { - self.cancel.cancel(); - } -} - -struct UdpSocketResource { - socket: AsyncRefCell<UdpSocket>, - cancel: CancelHandle, -} - -impl Resource for UdpSocketResource { - fn name(&self) -> Cow<str> { - "udpSocket".into() - } - - fn close(self: Rc<Self>) { - self.cancel.cancel() - } -} - -#[derive(Deserialize)] -struct IpListenArgs { - hostname: String, - port: u16, -} - -#[derive(Deserialize)] -#[serde(untagged)] -enum ArgsEnum { - Ip(IpListenArgs), - #[cfg(unix)] - Unix(net_unix::UnixListenArgs), -} - -#[derive(Deserialize)] -struct ListenArgs { - transport: String, - #[serde(flatten)] - transport_args: ArgsEnum, -} - -fn listen_tcp( - state: &mut OpState, - addr: SocketAddr, -) -> Result<(u32, SocketAddr), AnyError> { - let std_listener = std::net::TcpListener::bind(&addr)?; - std_listener.set_nonblocking(true)?; - let listener = TcpListener::from_std(std_listener)?; - let local_addr = listener.local_addr()?; - let listener_resource = TcpListenerResource { - listener: AsyncRefCell::new(listener), - cancel: Default::default(), - }; - let rid = state.resource_table.add(listener_resource); - - Ok((rid, local_addr)) -} - -fn listen_udp( - state: &mut OpState, - addr: SocketAddr, -) -> Result<(u32, SocketAddr), AnyError> { - let std_socket = std::net::UdpSocket::bind(&addr)?; - std_socket.set_nonblocking(true)?; - let socket = UdpSocket::from_std(std_socket)?; - let local_addr = socket.local_addr()?; - let socket_resource = UdpSocketResource { - socket: AsyncRefCell::new(socket), - cancel: Default::default(), - }; - let rid = state.resource_table.add(socket_resource); - - Ok((rid, local_addr)) -} - -fn op_listen( - state: &mut OpState, - args: ListenArgs, - _: (), -) -> Result<OpConn, AnyError> { - match args { - ListenArgs { - transport, - transport_args: ArgsEnum::Ip(args), - } => { - { - if transport == "udp" { - super::check_unstable(state, "Deno.listenDatagram"); - } - state - .borrow_mut::<Permissions>() - .net - .check(&(&args.hostname, Some(args.port)))?; - } - let addr = resolve_addr_sync(&args.hostname, args.port)? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - let (rid, local_addr) = if transport == "tcp" { - listen_tcp(state, addr)? - } else { - listen_udp(state, addr)? - }; - debug!( - "New listener {} {}:{}", - rid, - local_addr.ip().to_string(), - local_addr.port() - ); - let ip_addr = IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - }; - Ok(OpConn { - rid, - local_addr: Some(match transport.as_str() { - "udp" => OpAddr::Udp(ip_addr), - "tcp" => OpAddr::Tcp(ip_addr), - // NOTE: This could be unreachable!() - other => return Err(bad_transport(other)), - }), - remote_addr: None, - }) - } - #[cfg(unix)] - ListenArgs { - transport, - transport_args: ArgsEnum::Unix(args), - } if transport == "unix" || transport == "unixpacket" => { - let address_path = Path::new(&args.path); - { - if transport == "unix" { - super::check_unstable(state, "Deno.listen"); - } - if transport == "unixpacket" { - super::check_unstable(state, "Deno.listenDatagram"); - } - let permissions = state.borrow_mut::<Permissions>(); - permissions.read.check(&address_path)?; - permissions.write.check(&address_path)?; - } - let (rid, local_addr) = if transport == "unix" { - net_unix::listen_unix(state, &address_path)? - } else { - net_unix::listen_unix_packet(state, &address_path)? - }; - debug!( - "New listener {} {}", - rid, - local_addr.as_pathname().unwrap().display(), - ); - let unix_addr = net_unix::UnixAddr { - path: local_addr.as_pathname().and_then(net_unix::pathstring), - }; - - Ok(OpConn { - rid, - local_addr: Some(match transport.as_str() { - "unix" => OpAddr::Unix(unix_addr), - "unixpacket" => OpAddr::UnixPacket(unix_addr), - other => return Err(bad_transport(other)), - }), - remote_addr: None, - }) - } - #[cfg(unix)] - _ => Err(type_error("Wrong argument format!")), - } -} - -#[derive(Serialize, PartialEq, Debug)] -#[serde(untagged)] -enum DnsReturnRecord { - A(String), - Aaaa(String), - Aname(String), - Cname(String), - Mx { - preference: u16, - exchange: String, - }, - Ptr(String), - Srv { - priority: u16, - weight: u16, - port: u16, - target: String, - }, - Txt(Vec<String>), -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ResolveAddrArgs { - query: String, - record_type: RecordType, - options: Option<ResolveDnsOption>, -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ResolveDnsOption { - name_server: Option<NameServer>, -} - -fn default_port() -> u16 { - 53 -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct NameServer { - ip_addr: String, - #[serde(default = "default_port")] - port: u16, -} - -async fn op_dns_resolve( - state: Rc<RefCell<OpState>>, - args: ResolveAddrArgs, - _: (), -) -> Result<Vec<DnsReturnRecord>, AnyError> { - let ResolveAddrArgs { - query, - record_type, - options, - } = args; - - let (config, opts) = if let Some(name_server) = - options.as_ref().and_then(|o| o.name_server.as_ref()) - { - let group = NameServerConfigGroup::from_ips_clear( - &[name_server.ip_addr.parse()?], - name_server.port, - true, - ); - ( - ResolverConfig::from_parts(None, vec![], group), - ResolverOpts::default(), - ) - } else { - system_conf::read_system_conf()? - }; - - { - let mut s = state.borrow_mut(); - let perm = s.borrow_mut::<Permissions>(); - - // Checks permission against the name servers which will be actually queried. - for ns in config.name_servers() { - let socker_addr = &ns.socket_addr; - let ip = socker_addr.ip().to_string(); - let port = socker_addr.port(); - perm.net.check(&(ip, Some(port)))?; - } - } - - let resolver = AsyncResolver::tokio(config, opts)?; - - let results = resolver - .lookup(query, record_type, Default::default()) - .await - .map_err(|e| generic_error(format!("{}", e)))? - .iter() - .filter_map(rdata_to_return_record(record_type)) - .collect(); - - Ok(results) -} - -fn rdata_to_return_record( - ty: RecordType, -) -> impl Fn(&RData) -> Option<DnsReturnRecord> { - use RecordType::*; - move |r: &RData| -> Option<DnsReturnRecord> { - match ty { - A => r.as_a().map(ToString::to_string).map(DnsReturnRecord::A), - AAAA => r - .as_aaaa() - .map(ToString::to_string) - .map(DnsReturnRecord::Aaaa), - ANAME => r - .as_aname() - .map(ToString::to_string) - .map(DnsReturnRecord::Aname), - CNAME => r - .as_cname() - .map(ToString::to_string) - .map(DnsReturnRecord::Cname), - MX => r.as_mx().map(|mx| DnsReturnRecord::Mx { - preference: mx.preference(), - exchange: mx.exchange().to_string(), - }), - PTR => r - .as_ptr() - .map(ToString::to_string) - .map(DnsReturnRecord::Ptr), - SRV => r.as_srv().map(|srv| DnsReturnRecord::Srv { - priority: srv.priority(), - weight: srv.weight(), - port: srv.port(), - target: srv.target().to_string(), - }), - TXT => r.as_txt().map(|txt| { - let texts: Vec<String> = txt - .iter() - .map(|bytes| { - // Tries to parse these bytes as Latin-1 - bytes.iter().map(|&b| b as char).collect::<String>() - }) - .collect(); - DnsReturnRecord::Txt(texts) - }), - // TODO(magurotuna): Other record types are not supported - _ => todo!(), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::net::Ipv4Addr; - use std::net::Ipv6Addr; - use trust_dns_proto::rr::rdata::mx::MX; - use trust_dns_proto::rr::rdata::srv::SRV; - use trust_dns_proto::rr::rdata::txt::TXT; - use trust_dns_proto::rr::record_data::RData; - use trust_dns_proto::rr::Name; - - #[test] - fn rdata_to_return_record_a() { - let func = rdata_to_return_record(RecordType::A); - let rdata = RData::A(Ipv4Addr::new(127, 0, 0, 1)); - assert_eq!( - func(&rdata), - Some(DnsReturnRecord::A("127.0.0.1".to_string())) - ); - } - - #[test] - fn rdata_to_return_record_aaaa() { - let func = rdata_to_return_record(RecordType::AAAA); - let rdata = RData::AAAA(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)); - assert_eq!(func(&rdata), Some(DnsReturnRecord::Aaaa("::1".to_string()))); - } - - #[test] - fn rdata_to_return_record_aname() { - let func = rdata_to_return_record(RecordType::ANAME); - let rdata = RData::ANAME(Name::new()); - assert_eq!(func(&rdata), Some(DnsReturnRecord::Aname("".to_string()))); - } - - #[test] - fn rdata_to_return_record_cname() { - let func = rdata_to_return_record(RecordType::CNAME); - let rdata = RData::CNAME(Name::new()); - assert_eq!(func(&rdata), Some(DnsReturnRecord::Cname("".to_string()))); - } - - #[test] - fn rdata_to_return_record_mx() { - let func = rdata_to_return_record(RecordType::MX); - let rdata = RData::MX(MX::new(10, Name::new())); - assert_eq!( - func(&rdata), - Some(DnsReturnRecord::Mx { - preference: 10, - exchange: "".to_string() - }) - ); - } - - #[test] - fn rdata_to_return_record_ptr() { - let func = rdata_to_return_record(RecordType::PTR); - let rdata = RData::PTR(Name::new()); - assert_eq!(func(&rdata), Some(DnsReturnRecord::Ptr("".to_string()))); - } - - #[test] - fn rdata_to_return_record_srv() { - let func = rdata_to_return_record(RecordType::SRV); - let rdata = RData::SRV(SRV::new(1, 2, 3, Name::new())); - assert_eq!( - func(&rdata), - Some(DnsReturnRecord::Srv { - priority: 1, - weight: 2, - port: 3, - target: "".to_string() - }) - ); - } - - #[test] - fn rdata_to_return_record_txt() { - let func = rdata_to_return_record(RecordType::TXT); - let rdata = RData::TXT(TXT::from_bytes(vec![ - "foo".as_bytes(), - "bar".as_bytes(), - &[0xa3], // "£" in Latin-1 - &[0xe3, 0x81, 0x82], // "あ" in UTF-8 - ])); - assert_eq!( - func(&rdata), - Some(DnsReturnRecord::Txt(vec![ - "foo".to_string(), - "bar".to_string(), - "£".to_string(), - "ã\u{81}\u{82}".to_string(), - ])) - ); - } -} diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs deleted file mode 100644 index d56dc76d9..000000000 --- a/runtime/ops/net_unix.rs +++ /dev/null @@ -1,173 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -use super::utils::into_string; -use crate::ops::io::UnixStreamResource; -use crate::ops::net::AcceptArgs; -use crate::ops::net::OpAddr; -use crate::ops::net::OpConn; -use crate::ops::net::OpPacket; -use crate::ops::net::ReceiveArgs; -use deno_core::error::bad_resource; -use deno_core::error::custom_error; -use deno_core::error::null_opbuf; -use deno_core::error::AnyError; -use deno_core::AsyncRefCell; -use deno_core::CancelHandle; -use deno_core::CancelTryFuture; -use deno_core::OpState; -use deno_core::RcRef; -use deno_core::Resource; -use deno_core::ZeroCopyBuf; -use serde::Deserialize; -use serde::Serialize; -use std::borrow::Cow; -use std::cell::RefCell; -use std::fs::remove_file; -use std::path::Path; -use std::rc::Rc; -use tokio::net::UnixDatagram; -use tokio::net::UnixListener; -pub use tokio::net::UnixStream; - -struct UnixListenerResource { - listener: AsyncRefCell<UnixListener>, - cancel: CancelHandle, -} - -impl Resource for UnixListenerResource { - fn name(&self) -> Cow<str> { - "unixListener".into() - } - - fn close(self: Rc<Self>) { - self.cancel.cancel(); - } -} - -pub struct UnixDatagramResource { - pub socket: AsyncRefCell<UnixDatagram>, - pub cancel: CancelHandle, -} - -impl Resource for UnixDatagramResource { - fn name(&self) -> Cow<str> { - "unixDatagram".into() - } - - fn close(self: Rc<Self>) { - self.cancel.cancel(); - } -} - -#[derive(Serialize)] -pub struct UnixAddr { - pub path: Option<String>, -} - -#[derive(Deserialize)] -pub struct UnixListenArgs { - pub path: String, -} - -pub(crate) async fn accept_unix( - state: Rc<RefCell<OpState>>, - args: AcceptArgs, - _: (), -) -> Result<OpConn, AnyError> { - let rid = args.rid; - - let resource = state - .borrow() - .resource_table - .get::<UnixListenerResource>(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - let listener = RcRef::map(&resource, |r| &r.listener) - .try_borrow_mut() - .ok_or_else(|| custom_error("Busy", "Listener already in use"))?; - let cancel = RcRef::map(resource, |r| &r.cancel); - let (unix_stream, _socket_addr) = - listener.accept().try_or_cancel(cancel).await?; - - let local_addr = unix_stream.local_addr()?; - let remote_addr = unix_stream.peer_addr()?; - let resource = UnixStreamResource::new(unix_stream.into_split()); - let mut state = state.borrow_mut(); - let rid = state.resource_table.add(resource); - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Unix(UnixAddr { - path: local_addr.as_pathname().and_then(pathstring), - })), - remote_addr: Some(OpAddr::Unix(UnixAddr { - path: remote_addr.as_pathname().and_then(pathstring), - })), - }) -} - -pub(crate) async fn receive_unix_packet( - state: Rc<RefCell<OpState>>, - args: ReceiveArgs, - buf: Option<ZeroCopyBuf>, -) -> Result<OpPacket, AnyError> { - let mut buf = buf.ok_or_else(null_opbuf)?; - - let rid = args.rid; - - let resource = state - .borrow() - .resource_table - .get::<UnixDatagramResource>(rid) - .ok_or_else(|| bad_resource("Socket has been closed"))?; - let socket = RcRef::map(&resource, |r| &r.socket) - .try_borrow_mut() - .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; - let cancel = RcRef::map(resource, |r| &r.cancel); - let (size, remote_addr) = - socket.recv_from(&mut buf).try_or_cancel(cancel).await?; - Ok(OpPacket { - size, - remote_addr: OpAddr::UnixPacket(UnixAddr { - path: remote_addr.as_pathname().and_then(pathstring), - }), - }) -} - -pub fn listen_unix( - state: &mut OpState, - addr: &Path, -) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> { - if addr.exists() { - remove_file(&addr).unwrap(); - } - let listener = UnixListener::bind(&addr)?; - let local_addr = listener.local_addr()?; - let listener_resource = UnixListenerResource { - listener: AsyncRefCell::new(listener), - cancel: Default::default(), - }; - let rid = state.resource_table.add(listener_resource); - - Ok((rid, local_addr)) -} - -pub fn listen_unix_packet( - state: &mut OpState, - addr: &Path, -) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> { - if addr.exists() { - remove_file(&addr).unwrap(); - } - let socket = UnixDatagram::bind(&addr)?; - let local_addr = socket.local_addr()?; - let datagram_resource = UnixDatagramResource { - socket: AsyncRefCell::new(socket), - cancel: Default::default(), - }; - let rid = state.resource_table.add(datagram_resource); - - Ok((rid, local_addr)) -} - -pub fn pathstring(pathname: &Path) -> Option<String> { - into_string(pathname.into()).ok() -} diff --git a/runtime/ops/tls.rs b/runtime/ops/tls.rs deleted file mode 100644 index c3f554856..000000000 --- a/runtime/ops/tls.rs +++ /dev/null @@ -1,1017 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -pub use rustls; -pub use webpki; - -use crate::ops::io::TcpStreamResource; -use crate::ops::io::TlsStreamResource; -use crate::ops::net::IpAddr; -use crate::ops::net::OpAddr; -use crate::ops::net::OpConn; -use crate::permissions::Permissions; -use crate::resolve_addr::resolve_addr; -use crate::resolve_addr::resolve_addr_sync; -use deno_core::error::bad_resource; -use deno_core::error::bad_resource_id; -use deno_core::error::custom_error; -use deno_core::error::generic_error; -use deno_core::error::invalid_hostname; -use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; -use deno_core::futures::ready; -use deno_core::futures::task::noop_waker_ref; -use deno_core::futures::task::AtomicWaker; -use deno_core::futures::task::Context; -use deno_core::futures::task::Poll; -use deno_core::futures::task::RawWaker; -use deno_core::futures::task::RawWakerVTable; -use deno_core::futures::task::Waker; -use deno_core::op_async; -use deno_core::op_sync; -use deno_core::AsyncRefCell; -use deno_core::CancelHandle; -use deno_core::CancelTryFuture; -use deno_core::Extension; -use deno_core::OpState; -use deno_core::RcRef; -use deno_core::Resource; -use deno_core::ResourceId; -use io::Error; -use io::Read; -use io::Write; -use rustls::internal::pemfile::certs; -use rustls::internal::pemfile::pkcs8_private_keys; -use rustls::internal::pemfile::rsa_private_keys; -use rustls::Certificate; -use rustls::ClientConfig; -use rustls::ClientSession; -use rustls::NoClientAuth; -use rustls::PrivateKey; -use rustls::ServerConfig; -use rustls::ServerSession; -use rustls::Session; -use rustls::StoresClientSessions; -use serde::Deserialize; -use std::borrow::Cow; -use std::cell::RefCell; -use std::collections::HashMap; -use std::convert::From; -use std::fs::File; -use std::io; -use std::io::BufReader; -use std::io::ErrorKind; -use std::ops::Deref; -use std::ops::DerefMut; -use std::path::Path; -use std::pin::Pin; -use std::rc::Rc; -use std::sync::Arc; -use std::sync::Mutex; -use std::sync::Weak; -use tokio::io::AsyncRead; -use tokio::io::AsyncWrite; -use tokio::io::ReadBuf; -use tokio::net::TcpListener; -use tokio::net::TcpStream; -use tokio::task::spawn_local; -use webpki::DNSNameRef; - -lazy_static::lazy_static! { - static ref CLIENT_SESSION_MEMORY_CACHE: Arc<ClientSessionMemoryCache> = - Arc::new(ClientSessionMemoryCache::default()); -} - -#[derive(Default)] -struct ClientSessionMemoryCache(Mutex<HashMap<Vec<u8>, Vec<u8>>>); - -impl StoresClientSessions for ClientSessionMemoryCache { - fn get(&self, key: &[u8]) -> Option<Vec<u8>> { - self.0.lock().unwrap().get(key).cloned() - } - - fn put(&self, key: Vec<u8>, value: Vec<u8>) -> bool { - let mut sessions = self.0.lock().unwrap(); - // TODO(bnoordhuis) Evict sessions LRU-style instead of arbitrarily. - while sessions.len() >= 1024 { - let key = sessions.keys().next().unwrap().clone(); - sessions.remove(&key); - } - sessions.insert(key, value); - true - } -} - -#[derive(Debug)] -enum TlsSession { - Client(ClientSession), - Server(ServerSession), -} - -impl Deref for TlsSession { - type Target = dyn Session; - - fn deref(&self) -> &Self::Target { - match self { - TlsSession::Client(client_session) => client_session, - TlsSession::Server(server_session) => server_session, - } - } -} - -impl DerefMut for TlsSession { - fn deref_mut(&mut self) -> &mut Self::Target { - match self { - TlsSession::Client(client_session) => client_session, - TlsSession::Server(server_session) => server_session, - } - } -} - -impl From<ClientSession> for TlsSession { - fn from(client_session: ClientSession) -> Self { - TlsSession::Client(client_session) - } -} - -impl From<ServerSession> for TlsSession { - fn from(server_session: ServerSession) -> Self { - TlsSession::Server(server_session) - } -} - -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -enum Flow { - Read, - Write, -} - -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -enum State { - StreamOpen, - StreamClosed, - TlsClosing, - TlsClosed, - TcpClosed, -} - -#[derive(Debug)] -pub struct TlsStream(Option<TlsStreamInner>); - -impl TlsStream { - fn new(tcp: TcpStream, tls: TlsSession) -> Self { - let inner = TlsStreamInner { - tcp, - tls, - rd_state: State::StreamOpen, - wr_state: State::StreamOpen, - }; - Self(Some(inner)) - } - - pub fn new_client_side( - tcp: TcpStream, - tls_config: &Arc<ClientConfig>, - hostname: DNSNameRef, - ) -> Self { - let tls = TlsSession::Client(ClientSession::new(tls_config, hostname)); - Self::new(tcp, tls) - } - - pub fn new_server_side( - tcp: TcpStream, - tls_config: &Arc<ServerConfig>, - ) -> Self { - let tls = TlsSession::Server(ServerSession::new(tls_config)); - Self::new(tcp, tls) - } - - pub async fn handshake(&mut self) -> io::Result<()> { - poll_fn(|cx| self.inner_mut().poll_io(cx, Flow::Write)).await - } - - fn into_split(self) -> (ReadHalf, WriteHalf) { - let shared = Shared::new(self); - let rd = ReadHalf { - shared: shared.clone(), - }; - let wr = WriteHalf { shared }; - (rd, wr) - } - - /// Tokio-rustls compatibility: returns a reference to the underlying TCP - /// stream, and a reference to the Rustls `Session` object. - pub fn get_ref(&self) -> (&TcpStream, &dyn Session) { - let inner = self.0.as_ref().unwrap(); - (&inner.tcp, &*inner.tls) - } - - fn inner_mut(&mut self) -> &mut TlsStreamInner { - self.0.as_mut().unwrap() - } -} - -impl AsyncRead for TlsStream { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll<io::Result<()>> { - self.inner_mut().poll_read(cx, buf) - } -} - -impl AsyncWrite for TlsStream { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - self.inner_mut().poll_write(cx, buf) - } - - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<io::Result<()>> { - self.inner_mut().poll_io(cx, Flow::Write) - // The underlying TCP stream does not need to be flushed. - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<io::Result<()>> { - self.inner_mut().poll_shutdown(cx) - } -} - -impl Drop for TlsStream { - fn drop(&mut self) { - let mut inner = self.0.take().unwrap(); - - let mut cx = Context::from_waker(noop_waker_ref()); - let use_linger_task = inner.poll_close(&mut cx).is_pending(); - - if use_linger_task { - spawn_local(poll_fn(move |cx| inner.poll_close(cx))); - } else if cfg!(debug_assertions) { - spawn_local(async {}); // Spawn dummy task to detect missing LocalSet. - } - } -} - -#[derive(Debug)] -pub struct TlsStreamInner { - tls: TlsSession, - tcp: TcpStream, - rd_state: State, - wr_state: State, -} - -impl TlsStreamInner { - fn poll_io( - &mut self, - cx: &mut Context<'_>, - flow: Flow, - ) -> Poll<io::Result<()>> { - loop { - let wr_ready = loop { - match self.wr_state { - _ if self.tls.is_handshaking() && !self.tls.wants_write() => { - break true; - } - _ if self.tls.is_handshaking() => {} - State::StreamOpen if !self.tls.wants_write() => break true, - State::StreamClosed => { - // Rustls will enqueue the 'CloseNotify' alert and send it after - // flusing the data that is already in the queue. - self.tls.send_close_notify(); - self.wr_state = State::TlsClosing; - continue; - } - State::TlsClosing if !self.tls.wants_write() => { - self.wr_state = State::TlsClosed; - continue; - } - // If a 'CloseNotify' alert sent by the remote end has been received, - // shut down the underlying TCP socket. Otherwise, consider polling - // done for the moment. - State::TlsClosed if self.rd_state < State::TlsClosed => break true, - State::TlsClosed - if Pin::new(&mut self.tcp).poll_shutdown(cx)?.is_pending() => - { - break false; - } - State::TlsClosed => { - self.wr_state = State::TcpClosed; - continue; - } - State::TcpClosed => break true, - _ => {} - } - - // Poll whether there is space in the socket send buffer so we can flush - // the remaining outgoing ciphertext. - if self.tcp.poll_write_ready(cx)?.is_pending() { - break false; - } - - // Write ciphertext to the TCP socket. - let mut wrapped_tcp = ImplementWriteTrait(&mut self.tcp); - match self.tls.write_tls(&mut wrapped_tcp) { - Ok(0) => unreachable!(), - Ok(_) => {} - Err(err) if err.kind() == ErrorKind::WouldBlock => {} - Err(err) => return Poll::Ready(Err(err)), - } - }; - - let rd_ready = loop { - match self.rd_state { - State::TcpClosed if self.tls.is_handshaking() => { - let err = Error::new(ErrorKind::UnexpectedEof, "tls handshake eof"); - return Poll::Ready(Err(err)); - } - _ if self.tls.is_handshaking() && !self.tls.wants_read() => { - break true; - } - _ if self.tls.is_handshaking() => {} - State::StreamOpen if !self.tls.wants_read() => break true, - State::StreamOpen => {} - State::StreamClosed if !self.tls.wants_read() => { - // Rustls has more incoming cleartext buffered up, but the TLS - // session is closing so this data will never be processed by the - // application layer. Just like what would happen if this were a raw - // TCP stream, don't gracefully end the TLS session, but abort it. - return Poll::Ready(Err(Error::from(ErrorKind::ConnectionReset))); - } - State::StreamClosed => {} - State::TlsClosed if self.wr_state == State::TcpClosed => { - // Wait for the remote end to gracefully close the TCP connection. - // TODO(piscisaureus): this is unnecessary; remove when stable. - } - _ => break true, - } - - if self.rd_state < State::TlsClosed { - // Do a zero-length plaintext read so we can detect the arrival of - // 'CloseNotify' messages, even if only the write half is open. - // Actually reading data from the socket is done in `poll_read()`. - match self.tls.read(&mut []) { - Ok(0) => {} - Err(err) if err.kind() == ErrorKind::ConnectionAborted => { - // `Session::read()` returns `ConnectionAborted` when a - // 'CloseNotify' alert has been received, which indicates that - // the remote peer wants to gracefully end the TLS session. - self.rd_state = State::TlsClosed; - continue; - } - Err(err) => return Poll::Ready(Err(err)), - _ => unreachable!(), - } - } - - // Poll whether more ciphertext is available in the socket receive - // buffer. - if self.tcp.poll_read_ready(cx)?.is_pending() { - break false; - } - - // Receive ciphertext from the socket. - let mut wrapped_tcp = ImplementReadTrait(&mut self.tcp); - match self.tls.read_tls(&mut wrapped_tcp) { - Ok(0) => self.rd_state = State::TcpClosed, - Ok(_) => self - .tls - .process_new_packets() - .map_err(|err| Error::new(ErrorKind::InvalidData, err))?, - Err(err) if err.kind() == ErrorKind::WouldBlock => {} - Err(err) => return Poll::Ready(Err(err)), - } - }; - - if wr_ready { - if self.rd_state >= State::TlsClosed - && self.wr_state >= State::TlsClosed - && self.wr_state < State::TcpClosed - { - continue; - } - if self.tls.wants_write() { - continue; - } - } - - let io_ready = match flow { - _ if self.tls.is_handshaking() => false, - Flow::Read => rd_ready, - Flow::Write => wr_ready, - }; - return match io_ready { - false => Poll::Pending, - true => Poll::Ready(Ok(())), - }; - } - } - - fn poll_read( - &mut self, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll<io::Result<()>> { - ready!(self.poll_io(cx, Flow::Read))?; - - if self.rd_state == State::StreamOpen { - let buf_slice = - unsafe { &mut *(buf.unfilled_mut() as *mut [_] as *mut [u8]) }; - let bytes_read = self.tls.read(buf_slice)?; - assert_ne!(bytes_read, 0); - unsafe { buf.assume_init(bytes_read) }; - buf.advance(bytes_read); - } - - Poll::Ready(Ok(())) - } - - fn poll_write( - &mut self, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - if buf.is_empty() { - // Tokio-rustls compatibility: a zero byte write always succeeds. - Poll::Ready(Ok(0)) - } else if self.wr_state == State::StreamOpen { - // Flush Rustls' ciphertext send queue. - ready!(self.poll_io(cx, Flow::Write))?; - - // Copy data from `buf` to the Rustls cleartext send queue. - let bytes_written = self.tls.write(buf)?; - assert_ne!(bytes_written, 0); - - // Try to flush as much ciphertext as possible. However, since we just - // handed off at least some bytes to rustls, so we can't return - // `Poll::Pending()` any more: this would tell the caller that it should - // try to send those bytes again. - let _ = self.poll_io(cx, Flow::Write)?; - - Poll::Ready(Ok(bytes_written)) - } else { - // Return error if stream has been shut down for writing. - Poll::Ready(Err(ErrorKind::BrokenPipe.into())) - } - } - - fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { - if self.wr_state == State::StreamOpen { - self.wr_state = State::StreamClosed; - } - - ready!(self.poll_io(cx, Flow::Write))?; - - // At minimum, a TLS 'CloseNotify' alert should have been sent. - assert!(self.wr_state >= State::TlsClosed); - // If we received a TLS 'CloseNotify' alert from the remote end - // already, the TCP socket should be shut down at this point. - assert!( - self.rd_state < State::TlsClosed || self.wr_state == State::TcpClosed - ); - - Poll::Ready(Ok(())) - } - - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { - if self.rd_state == State::StreamOpen { - self.rd_state = State::StreamClosed; - } - - // Send TLS 'CloseNotify' alert. - ready!(self.poll_shutdown(cx))?; - // Wait for 'CloseNotify', shut down TCP stream, wait for TCP FIN packet. - ready!(self.poll_io(cx, Flow::Read))?; - - assert_eq!(self.rd_state, State::TcpClosed); - assert_eq!(self.wr_state, State::TcpClosed); - - Poll::Ready(Ok(())) - } -} - -#[derive(Debug)] -pub struct ReadHalf { - shared: Arc<Shared>, -} - -impl ReadHalf { - pub fn reunite(self, wr: WriteHalf) -> TlsStream { - assert!(Arc::ptr_eq(&self.shared, &wr.shared)); - drop(wr); // Drop `wr`, so only one strong reference to `shared` remains. - - Arc::try_unwrap(self.shared) - .unwrap_or_else(|_| panic!("Arc::<Shared>::try_unwrap() failed")) - .tls_stream - .into_inner() - .unwrap() - } -} - -impl AsyncRead for ReadHalf { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll<io::Result<()>> { - self - .shared - .poll_with_shared_waker(cx, Flow::Read, move |tls, cx| { - tls.poll_read(cx, buf) - }) - } -} - -#[derive(Debug)] -pub struct WriteHalf { - shared: Arc<Shared>, -} - -impl AsyncWrite for WriteHalf { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - self - .shared - .poll_with_shared_waker(cx, Flow::Write, move |tls, cx| { - tls.poll_write(cx, buf) - }) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<io::Result<()>> { - self - .shared - .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_flush(cx)) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<io::Result<()>> { - self - .shared - .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_shutdown(cx)) - } -} - -#[derive(Debug)] -struct Shared { - tls_stream: Mutex<TlsStream>, - rd_waker: AtomicWaker, - wr_waker: AtomicWaker, -} - -impl Shared { - fn new(tls_stream: TlsStream) -> Arc<Self> { - let self_ = Self { - tls_stream: Mutex::new(tls_stream), - rd_waker: AtomicWaker::new(), - wr_waker: AtomicWaker::new(), - }; - Arc::new(self_) - } - - fn poll_with_shared_waker<R>( - self: &Arc<Self>, - cx: &mut Context<'_>, - flow: Flow, - mut f: impl FnMut(Pin<&mut TlsStream>, &mut Context<'_>) -> R, - ) -> R { - match flow { - Flow::Read => self.rd_waker.register(cx.waker()), - Flow::Write => self.wr_waker.register(cx.waker()), - } - - let shared_waker = self.new_shared_waker(); - let mut cx = Context::from_waker(&shared_waker); - - let mut tls_stream = self.tls_stream.lock().unwrap(); - f(Pin::new(&mut tls_stream), &mut cx) - } - - const SHARED_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( - Self::clone_shared_waker, - Self::wake_shared_waker, - Self::wake_shared_waker_by_ref, - Self::drop_shared_waker, - ); - - fn new_shared_waker(self: &Arc<Self>) -> Waker { - let self_weak = Arc::downgrade(self); - let self_ptr = self_weak.into_raw() as *const (); - let raw_waker = RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE); - unsafe { Waker::from_raw(raw_waker) } - } - - fn clone_shared_waker(self_ptr: *const ()) -> RawWaker { - let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) }; - let ptr1 = self_weak.clone().into_raw(); - let ptr2 = self_weak.into_raw(); - assert!(ptr1 == ptr2); - RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE) - } - - fn wake_shared_waker(self_ptr: *const ()) { - Self::wake_shared_waker_by_ref(self_ptr); - Self::drop_shared_waker(self_ptr); - } - - fn wake_shared_waker_by_ref(self_ptr: *const ()) { - let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) }; - if let Some(self_arc) = Weak::upgrade(&self_weak) { - self_arc.rd_waker.wake(); - self_arc.wr_waker.wake(); - } - self_weak.into_raw(); - } - - fn drop_shared_waker(self_ptr: *const ()) { - let _ = unsafe { Weak::from_raw(self_ptr as *const Self) }; - } -} - -struct ImplementReadTrait<'a, T>(&'a mut T); - -impl Read for ImplementReadTrait<'_, TcpStream> { - fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - self.0.try_read(buf) - } -} - -struct ImplementWriteTrait<'a, T>(&'a mut T); - -impl Write for ImplementWriteTrait<'_, TcpStream> { - fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - self.0.try_write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -pub fn init() -> Extension { - Extension::builder() - .ops(vec![ - ("op_start_tls", op_async(op_start_tls)), - ("op_connect_tls", op_async(op_connect_tls)), - ("op_listen_tls", op_sync(op_listen_tls)), - ("op_accept_tls", op_async(op_accept_tls)), - ]) - .build() -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ConnectTlsArgs { - transport: String, - hostname: String, - port: u16, - cert_file: Option<String>, -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct StartTlsArgs { - rid: ResourceId, - cert_file: Option<String>, - hostname: String, -} - -async fn op_start_tls( - state: Rc<RefCell<OpState>>, - args: StartTlsArgs, - _: (), -) -> Result<OpConn, AnyError> { - let rid = args.rid; - let hostname = match &*args.hostname { - "" => "localhost", - n => n, - }; - let cert_file = args.cert_file.as_deref(); - - { - super::check_unstable2(&state, "Deno.startTls"); - let mut s = state.borrow_mut(); - let permissions = s.borrow_mut::<Permissions>(); - permissions.net.check(&(hostname, Some(0)))?; - if let Some(path) = cert_file { - permissions.read.check(Path::new(path))?; - } - } - - let hostname_dns = DNSNameRef::try_from_ascii_str(hostname) - .map_err(|_| invalid_hostname(hostname))?; - - let resource_rc = state - .borrow_mut() - .resource_table - .take::<TcpStreamResource>(rid) - .ok_or_else(bad_resource_id)?; - let resource = Rc::try_unwrap(resource_rc) - .expect("Only a single use of this resource should happen"); - let (read_half, write_half) = resource.into_inner(); - let tcp_stream = read_half.reunite(write_half)?; - - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - - let mut tls_config = ClientConfig::new(); - tls_config.set_persistence(CLIENT_SESSION_MEMORY_CACHE.clone()); - tls_config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - if let Some(path) = cert_file { - let key_file = File::open(path)?; - let reader = &mut BufReader::new(key_file); - tls_config.root_store.add_pem_file(reader).unwrap(); - } - let tls_config = Arc::new(tls_config); - - let tls_stream = - TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns); - - let rid = { - let mut state_ = state.borrow_mut(); - state_ - .resource_table - .add(TlsStreamResource::new(tls_stream.into_split())) - }; - - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Tcp(IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - })), - remote_addr: Some(OpAddr::Tcp(IpAddr { - hostname: remote_addr.ip().to_string(), - port: remote_addr.port(), - })), - }) -} - -async fn op_connect_tls( - state: Rc<RefCell<OpState>>, - args: ConnectTlsArgs, - _: (), -) -> Result<OpConn, AnyError> { - assert_eq!(args.transport, "tcp"); - let hostname = match &*args.hostname { - "" => "localhost", - n => n, - }; - let port = args.port; - let cert_file = args.cert_file.as_deref(); - - { - let mut s = state.borrow_mut(); - let permissions = s.borrow_mut::<Permissions>(); - permissions.net.check(&(hostname, Some(port)))?; - if let Some(path) = cert_file { - permissions.read.check(Path::new(path))?; - } - } - - let hostname_dns = DNSNameRef::try_from_ascii_str(hostname) - .map_err(|_| invalid_hostname(hostname))?; - - let connect_addr = resolve_addr(hostname, port) - .await? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - let tcp_stream = TcpStream::connect(connect_addr).await?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - - let mut tls_config = ClientConfig::new(); - tls_config.set_persistence(CLIENT_SESSION_MEMORY_CACHE.clone()); - tls_config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - if let Some(path) = cert_file { - let key_file = File::open(path)?; - let reader = &mut BufReader::new(key_file); - tls_config.root_store.add_pem_file(reader).unwrap(); - } - let tls_config = Arc::new(tls_config); - - let tls_stream = - TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns); - - let rid = { - let mut state_ = state.borrow_mut(); - state_ - .resource_table - .add(TlsStreamResource::new(tls_stream.into_split())) - }; - - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Tcp(IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - })), - remote_addr: Some(OpAddr::Tcp(IpAddr { - hostname: remote_addr.ip().to_string(), - port: remote_addr.port(), - })), - }) -} - -fn load_certs(path: &str) -> Result<Vec<Certificate>, AnyError> { - let cert_file = File::open(path)?; - let reader = &mut BufReader::new(cert_file); - - let certs = certs(reader) - .map_err(|_| custom_error("InvalidData", "Unable to decode certificate"))?; - - if certs.is_empty() { - let e = custom_error("InvalidData", "No certificates found in cert file"); - return Err(e); - } - - Ok(certs) -} - -fn key_decode_err() -> AnyError { - custom_error("InvalidData", "Unable to decode key") -} - -fn key_not_found_err() -> AnyError { - custom_error("InvalidData", "No keys found in key file") -} - -/// Starts with -----BEGIN RSA PRIVATE KEY----- -fn load_rsa_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> { - let key_file = File::open(path)?; - let reader = &mut BufReader::new(key_file); - let keys = rsa_private_keys(reader).map_err(|_| key_decode_err())?; - Ok(keys) -} - -/// Starts with -----BEGIN PRIVATE KEY----- -fn load_pkcs8_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> { - let key_file = File::open(path)?; - let reader = &mut BufReader::new(key_file); - let keys = pkcs8_private_keys(reader).map_err(|_| key_decode_err())?; - Ok(keys) -} - -fn load_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> { - let path = path.to_string(); - let mut keys = load_rsa_keys(&path)?; - - if keys.is_empty() { - keys = load_pkcs8_keys(&path)?; - } - - if keys.is_empty() { - return Err(key_not_found_err()); - } - - Ok(keys) -} - -pub struct TlsListenerResource { - tcp_listener: AsyncRefCell<TcpListener>, - tls_config: Arc<ServerConfig>, - cancel_handle: CancelHandle, -} - -impl Resource for TlsListenerResource { - fn name(&self) -> Cow<str> { - "tlsListener".into() - } - - fn close(self: Rc<Self>) { - self.cancel_handle.cancel(); - } -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ListenTlsArgs { - transport: String, - hostname: String, - port: u16, - cert_file: String, - key_file: String, - alpn_protocols: Option<Vec<String>>, -} - -fn op_listen_tls( - state: &mut OpState, - args: ListenTlsArgs, - _: (), -) -> Result<OpConn, AnyError> { - assert_eq!(args.transport, "tcp"); - let hostname = &*args.hostname; - let port = args.port; - let cert_file = &*args.cert_file; - let key_file = &*args.key_file; - - { - let permissions = state.borrow_mut::<Permissions>(); - permissions.net.check(&(hostname, Some(port)))?; - permissions.read.check(Path::new(cert_file))?; - permissions.read.check(Path::new(key_file))?; - } - - let mut tls_config = ServerConfig::new(NoClientAuth::new()); - if let Some(alpn_protocols) = args.alpn_protocols { - super::check_unstable(state, "Deno.listenTls#alpn_protocols"); - tls_config.alpn_protocols = - alpn_protocols.into_iter().map(|s| s.into_bytes()).collect(); - } - tls_config - .set_single_cert(load_certs(cert_file)?, load_keys(key_file)?.remove(0)) - .expect("invalid key or certificate"); - - let bind_addr = resolve_addr_sync(hostname, port)? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - let std_listener = std::net::TcpListener::bind(bind_addr)?; - std_listener.set_nonblocking(true)?; - let tcp_listener = TcpListener::from_std(std_listener)?; - let local_addr = tcp_listener.local_addr()?; - - let tls_listener_resource = TlsListenerResource { - tcp_listener: AsyncRefCell::new(tcp_listener), - tls_config: Arc::new(tls_config), - cancel_handle: Default::default(), - }; - - let rid = state.resource_table.add(tls_listener_resource); - - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Tcp(IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - })), - remote_addr: None, - }) -} - -async fn op_accept_tls( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - _: (), -) -> Result<OpConn, AnyError> { - let resource = state - .borrow() - .resource_table - .get::<TlsListenerResource>(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - - let cancel_handle = RcRef::map(&resource, |r| &r.cancel_handle); - let tcp_listener = RcRef::map(&resource, |r| &r.tcp_listener) - .try_borrow_mut() - .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; - - let (tcp_stream, remote_addr) = - match tcp_listener.accept().try_or_cancel(&cancel_handle).await { - Ok(tuple) => tuple, - Err(err) if err.kind() == ErrorKind::Interrupted => { - // FIXME(bartlomieju): compatibility with current JS implementation. - return Err(bad_resource("Listener has been closed")); - } - Err(err) => return Err(err.into()), - }; - - let local_addr = tcp_stream.local_addr()?; - - let tls_stream = TlsStream::new_server_side(tcp_stream, &resource.tls_config); - - let rid = { - let mut state_ = state.borrow_mut(); - state_ - .resource_table - .add(TlsStreamResource::new(tls_stream.into_split())) - }; - - Ok(OpConn { - rid, - local_addr: Some(OpAddr::Tcp(IpAddr { - hostname: local_addr.ip().to_string(), - port: local_addr.port(), - })), - remote_addr: Some(OpAddr::Tcp(IpAddr { - hostname: remote_addr.ip().to_string(), - port: remote_addr.port(), - })), - }) -} |