summaryrefslogtreecommitdiff
path: root/runtime/ops
diff options
context:
space:
mode:
authorBartek Iwańczuk <biwanczuk@gmail.com>2021-06-29 01:43:03 +0200
committerGitHub <noreply@github.com>2021-06-29 01:43:03 +0200
commit38a7128cdd6f3308ba3c13cfb0b0d4ef925a44c3 (patch)
tree8f0c86028d9ba0266f1846e7f3611f7049cb43a8 /runtime/ops
parent30cba2484815f712502ae8937a25afa13aba0818 (diff)
feat: Add "deno_net" extension (#11150)
This commits moves implementation of net related APIs available on "Deno" namespace to "deno_net" extension. Following APIs were moved: - Deno.listen() - Deno.connect() - Deno.listenTls() - Deno.serveHttp() - Deno.shutdown() - Deno.resolveDns() - Deno.listenDatagram() - Deno.startTls() - Deno.Conn - Deno.Listener - Deno.DatagramConn
Diffstat (limited to 'runtime/ops')
-rw-r--r--runtime/ops/http.rs579
-rw-r--r--runtime/ops/io.rs129
-rw-r--r--runtime/ops/mod.rs5
-rw-r--r--runtime/ops/net.rs793
-rw-r--r--runtime/ops/net_unix.rs173
-rw-r--r--runtime/ops/tls.rs1017
6 files changed, 3 insertions, 2693 deletions
diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs
deleted file mode 100644
index 01658c802..000000000
--- a/runtime/ops/http.rs
+++ /dev/null
@@ -1,579 +0,0 @@
-// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-
-use crate::ops::io::TcpStreamResource;
-use crate::ops::io::TlsStreamResource;
-use crate::ops::tls::TlsStream;
-use deno_core::error::bad_resource_id;
-use deno_core::error::null_opbuf;
-use deno_core::error::type_error;
-use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
-use deno_core::futures::FutureExt;
-use deno_core::futures::Stream;
-use deno_core::futures::StreamExt;
-use deno_core::op_async;
-use deno_core::op_sync;
-use deno_core::AsyncRefCell;
-use deno_core::ByteString;
-use deno_core::CancelHandle;
-use deno_core::CancelTryFuture;
-use deno_core::Extension;
-use deno_core::OpState;
-use deno_core::RcRef;
-use deno_core::Resource;
-use deno_core::ResourceId;
-use deno_core::ZeroCopyBuf;
-use hyper::body::HttpBody;
-use hyper::http;
-use hyper::server::conn::Connection;
-use hyper::server::conn::Http;
-use hyper::service::Service as HyperService;
-use hyper::Body;
-use hyper::Request;
-use hyper::Response;
-use serde::Deserialize;
-use serde::Serialize;
-use std::borrow::Cow;
-use std::cell::RefCell;
-use std::future::Future;
-use std::net::SocketAddr;
-use std::pin::Pin;
-use std::rc::Rc;
-use std::task::Context;
-use std::task::Poll;
-use tokio::io::AsyncReadExt;
-use tokio::net::TcpStream;
-use tokio::sync::oneshot;
-use tokio_util::io::StreamReader;
-
-pub fn init() -> Extension {
- Extension::builder()
- .ops(vec![
- ("op_http_start", op_sync(op_http_start)),
- ("op_http_request_next", op_async(op_http_request_next)),
- ("op_http_request_read", op_async(op_http_request_read)),
- ("op_http_response", op_async(op_http_response)),
- ("op_http_response_write", op_async(op_http_response_write)),
- ("op_http_response_close", op_async(op_http_response_close)),
- ])
- .build()
-}
-
-struct ServiceInner {
- request: Request<Body>,
- response_tx: oneshot::Sender<Response<Body>>,
-}
-
-#[derive(Clone, Default)]
-struct Service {
- inner: Rc<RefCell<Option<ServiceInner>>>,
- waker: Rc<deno_core::futures::task::AtomicWaker>,
-}
-
-impl HyperService<Request<Body>> for Service {
- type Response = Response<Body>;
- type Error = http::Error;
- #[allow(clippy::type_complexity)]
- type Future =
- Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
-
- fn poll_ready(
- &mut self,
- _cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
- if self.inner.borrow().is_some() {
- Poll::Pending
- } else {
- Poll::Ready(Ok(()))
- }
- }
-
- fn call(&mut self, req: Request<Body>) -> Self::Future {
- let (resp_tx, resp_rx) = oneshot::channel();
- self.inner.borrow_mut().replace(ServiceInner {
- request: req,
- response_tx: resp_tx,
- });
-
- async move { Ok(resp_rx.await.unwrap()) }.boxed_local()
- }
-}
-
-enum ConnType {
- Tcp(Rc<RefCell<Connection<TcpStream, Service, LocalExecutor>>>),
- Tls(Rc<RefCell<Connection<TlsStream, Service, LocalExecutor>>>),
-}
-
-struct ConnResource {
- hyper_connection: ConnType,
- deno_service: Service,
- addr: SocketAddr,
- cancel: CancelHandle,
-}
-
-impl ConnResource {
- // TODO(ry) impl Future for ConnResource?
- fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> {
- match &self.hyper_connection {
- ConnType::Tcp(c) => c.borrow_mut().poll_unpin(cx),
- ConnType::Tls(c) => c.borrow_mut().poll_unpin(cx),
- }
- .map_err(AnyError::from)
- }
-}
-
-impl Resource for ConnResource {
- fn name(&self) -> Cow<str> {
- "httpConnection".into()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel.cancel()
- }
-}
-
-// We use a tuple instead of struct to avoid serialization overhead of the keys.
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct NextRequestResponse(
- // request_body_rid:
- Option<ResourceId>,
- // response_sender_rid:
- ResourceId,
- // method:
- // This is a String rather than a ByteString because reqwest will only return
- // the method as a str which is guaranteed to be ASCII-only.
- String,
- // headers:
- Vec<(ByteString, ByteString)>,
- // url:
- String,
-);
-
-async fn op_http_request_next(
- state: Rc<RefCell<OpState>>,
- conn_rid: ResourceId,
- _: (),
-) -> Result<Option<NextRequestResponse>, AnyError> {
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(conn_rid)
- .ok_or_else(bad_resource_id)?;
-
- let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel);
-
- poll_fn(|cx| {
- conn_resource.deno_service.waker.register(cx.waker());
- let connection_closed = match conn_resource.poll(cx) {
- Poll::Pending => false,
- Poll::Ready(Ok(())) => {
- // try to close ConnResource, but don't unwrap as it might
- // already be closed
- let _ = state
- .borrow_mut()
- .resource_table
- .take::<ConnResource>(conn_rid);
- true
- }
- Poll::Ready(Err(e)) => {
- // TODO(ry) close RequestResource associated with connection
- // TODO(ry) close ResponseBodyResource associated with connection
- // close ConnResource
- state
- .borrow_mut()
- .resource_table
- .take::<ConnResource>(conn_rid)
- .unwrap();
-
- if should_ignore_error(&e) {
- true
- } else {
- return Poll::Ready(Err(e));
- }
- }
- };
- if let Some(request_resource) =
- conn_resource.deno_service.inner.borrow_mut().take()
- {
- let tx = request_resource.response_tx;
- let req = request_resource.request;
- let method = req.method().to_string();
-
- let mut headers = Vec::with_capacity(req.headers().len());
- for (name, value) in req.headers().iter() {
- let name: &[u8] = name.as_ref();
- let value = value.as_bytes();
- headers
- .push((ByteString(name.to_owned()), ByteString(value.to_owned())));
- }
-
- let url = {
- let scheme = {
- match conn_resource.hyper_connection {
- ConnType::Tcp(_) => "http",
- ConnType::Tls(_) => "https",
- }
- };
- let host: Cow<str> = if let Some(host) = req.uri().host() {
- Cow::Borrowed(host)
- } else if let Some(host) = req.headers().get("HOST") {
- Cow::Borrowed(host.to_str()?)
- } else {
- Cow::Owned(conn_resource.addr.to_string())
- };
- let path = req.uri().path_and_query().map_or("/", |p| p.as_str());
- format!("{}://{}{}", scheme, host, path)
- };
-
- let has_body = if let Some(exact_size) = req.size_hint().exact() {
- exact_size > 0
- } else {
- true
- };
-
- let maybe_request_body_rid = if has_body {
- let stream: BytesStream = Box::pin(req.into_body().map(|r| {
- r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
- }));
- let stream_reader = StreamReader::new(stream);
- let mut state = state.borrow_mut();
- let request_body_rid = state.resource_table.add(RequestBodyResource {
- conn_rid,
- reader: AsyncRefCell::new(stream_reader),
- cancel: CancelHandle::default(),
- });
- Some(request_body_rid)
- } else {
- None
- };
-
- let mut state = state.borrow_mut();
- let response_sender_rid =
- state.resource_table.add(ResponseSenderResource {
- sender: tx,
- conn_rid,
- });
-
- Poll::Ready(Ok(Some(NextRequestResponse(
- maybe_request_body_rid,
- response_sender_rid,
- method,
- headers,
- url,
- ))))
- } else if connection_closed {
- Poll::Ready(Ok(None))
- } else {
- Poll::Pending
- }
- })
- .try_or_cancel(cancel)
- .await
- .map_err(AnyError::from)
-}
-
-fn should_ignore_error(e: &AnyError) -> bool {
- if let Some(e) = e.downcast_ref::<hyper::Error>() {
- use std::error::Error;
- if let Some(std_err) = e.source() {
- if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() {
- if io_err.kind() == std::io::ErrorKind::NotConnected {
- return true;
- }
- }
- }
- }
- false
-}
-
-fn op_http_start(
- state: &mut OpState,
- tcp_stream_rid: ResourceId,
- _: (),
-) -> Result<ResourceId, AnyError> {
- let deno_service = Service::default();
-
- if let Some(resource_rc) = state
- .resource_table
- .take::<TcpStreamResource>(tcp_stream_rid)
- {
- let resource = Rc::try_unwrap(resource_rc)
- .expect("Only a single use of this resource should happen");
- let (read_half, write_half) = resource.into_inner();
- let tcp_stream = read_half.reunite(write_half)?;
- let addr = tcp_stream.local_addr()?;
- let hyper_connection = Http::new()
- .with_executor(LocalExecutor)
- .serve_connection(tcp_stream, deno_service.clone());
- let conn_resource = ConnResource {
- hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))),
- deno_service,
- addr,
- cancel: CancelHandle::default(),
- };
- let rid = state.resource_table.add(conn_resource);
- return Ok(rid);
- }
-
- if let Some(resource_rc) = state
- .resource_table
- .take::<TlsStreamResource>(tcp_stream_rid)
- {
- let resource = Rc::try_unwrap(resource_rc)
- .expect("Only a single use of this resource should happen");
- let (read_half, write_half) = resource.into_inner();
- let tls_stream = read_half.reunite(write_half);
- let addr = tls_stream.get_ref().0.local_addr()?;
-
- let hyper_connection = Http::new()
- .with_executor(LocalExecutor)
- .serve_connection(tls_stream, deno_service.clone());
- let conn_resource = ConnResource {
- hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))),
- deno_service,
- addr,
- cancel: CancelHandle::default(),
- };
- let rid = state.resource_table.add(conn_resource);
- return Ok(rid);
- }
-
- Err(bad_resource_id())
-}
-
-// We use a tuple instead of struct to avoid serialization overhead of the keys.
-#[derive(Deserialize)]
-struct RespondArgs(
- // rid:
- u32,
- // status:
- u16,
- // headers:
- Vec<(ByteString, ByteString)>,
-);
-
-async fn op_http_response(
- state: Rc<RefCell<OpState>>,
- args: RespondArgs,
- data: Option<ZeroCopyBuf>,
-) -> Result<Option<ResourceId>, AnyError> {
- let RespondArgs(rid, status, headers) = args;
-
- let response_sender = state
- .borrow_mut()
- .resource_table
- .take::<ResponseSenderResource>(rid)
- .ok_or_else(bad_resource_id)?;
- let response_sender = Rc::try_unwrap(response_sender)
- .ok()
- .expect("multiple op_http_respond ongoing");
-
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(response_sender.conn_rid)
- .ok_or_else(bad_resource_id)?;
-
- let mut builder = Response::builder().status(status);
-
- builder.headers_mut().unwrap().reserve(headers.len());
- for (key, value) in &headers {
- builder = builder.header(key.as_ref(), value.as_ref());
- }
-
- let res;
- let maybe_response_body_rid = if let Some(d) = data {
- // If a body is passed, we use it, and don't return a body for streaming.
- res = builder.body(Vec::from(&*d).into())?;
- None
- } else {
- // If no body is passed, we return a writer for streaming the body.
- let (sender, body) = Body::channel();
- res = builder.body(body)?;
-
- let response_body_rid =
- state.borrow_mut().resource_table.add(ResponseBodyResource {
- body: AsyncRefCell::new(sender),
- conn_rid: response_sender.conn_rid,
- });
-
- Some(response_body_rid)
- };
-
- // oneshot::Sender::send(v) returns |v| on error, not an error object.
- // The only failure mode is the receiver already having dropped its end
- // of the channel.
- if response_sender.sender.send(res).is_err() {
- return Err(type_error("internal communication error"));
- }
-
- poll_fn(|cx| match conn_resource.poll(cx) {
- Poll::Ready(x) => Poll::Ready(x),
- Poll::Pending => Poll::Ready(Ok(())),
- })
- .await?;
-
- if maybe_response_body_rid.is_none() {
- conn_resource.deno_service.waker.wake();
- }
- Ok(maybe_response_body_rid)
-}
-
-async fn op_http_response_close(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- _: (),
-) -> Result<(), AnyError> {
- let resource = state
- .borrow_mut()
- .resource_table
- .take::<ResponseBodyResource>(rid)
- .ok_or_else(bad_resource_id)?;
-
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(resource.conn_rid)
- .ok_or_else(bad_resource_id)?;
- drop(resource);
-
- let r = poll_fn(|cx| match conn_resource.poll(cx) {
- Poll::Ready(x) => Poll::Ready(x),
- Poll::Pending => Poll::Ready(Ok(())),
- })
- .await;
- conn_resource.deno_service.waker.wake();
- r
-}
-
-async fn op_http_request_read(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- data: Option<ZeroCopyBuf>,
-) -> Result<usize, AnyError> {
- let mut data = data.ok_or_else(null_opbuf)?;
-
- let resource = state
- .borrow()
- .resource_table
- .get::<RequestBodyResource>(rid as u32)
- .ok_or_else(bad_resource_id)?;
-
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(resource.conn_rid)
- .ok_or_else(bad_resource_id)?;
-
- let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await;
- let cancel = RcRef::map(resource, |r| &r.cancel);
- let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local();
-
- poll_fn(|cx| {
- if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
- // close ConnResource
- // close RequestResource associated with connection
- // close ResponseBodyResource associated with connection
- return Poll::Ready(Err(e));
- }
-
- read_fut.poll_unpin(cx).map_err(AnyError::from)
- })
- .await
-}
-
-async fn op_http_response_write(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- data: Option<ZeroCopyBuf>,
-) -> Result<(), AnyError> {
- let buf = data.ok_or_else(null_opbuf)?;
- let resource = state
- .borrow()
- .resource_table
- .get::<ResponseBodyResource>(rid as u32)
- .ok_or_else(bad_resource_id)?;
-
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(resource.conn_rid)
- .ok_or_else(bad_resource_id)?;
-
- let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
-
- let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local();
-
- poll_fn(|cx| {
- let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from);
-
- // Poll connection so the data is flushed
- if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
- // close ConnResource
- // close RequestResource associated with connection
- // close ResponseBodyResource associated with connection
- return Poll::Ready(Err(e));
- }
-
- r
- })
- .await?;
-
- Ok(())
-}
-
-type BytesStream =
- Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>;
-
-struct RequestBodyResource {
- conn_rid: ResourceId,
- reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>,
- cancel: CancelHandle,
-}
-
-impl Resource for RequestBodyResource {
- fn name(&self) -> Cow<str> {
- "requestBody".into()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel.cancel()
- }
-}
-
-struct ResponseSenderResource {
- sender: oneshot::Sender<Response<Body>>,
- conn_rid: ResourceId,
-}
-
-impl Resource for ResponseSenderResource {
- fn name(&self) -> Cow<str> {
- "responseSender".into()
- }
-}
-
-struct ResponseBodyResource {
- body: AsyncRefCell<hyper::body::Sender>,
- conn_rid: ResourceId,
-}
-
-impl Resource for ResponseBodyResource {
- fn name(&self) -> Cow<str> {
- "responseBody".into()
- }
-}
-
-// Needed so hyper can use non Send futures
-#[derive(Clone)]
-struct LocalExecutor;
-
-impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
-where
- Fut: Future + 'static,
- Fut::Output: 'static,
-{
- fn execute(&self, fut: Fut) {
- tokio::task::spawn_local(fut);
- }
-}
diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs
index 18279c0eb..e18846466 100644
--- a/runtime/ops/io.rs
+++ b/runtime/ops/io.rs
@@ -1,6 +1,5 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-use crate::ops::tls;
use deno_core::error::null_opbuf;
use deno_core::error::resource_unavailable;
use deno_core::error::AnyError;
@@ -17,6 +16,9 @@ use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
+use deno_net::io::TcpStreamResource;
+use deno_net::io::TlsStreamResource;
+use deno_net::io::UnixStreamResource;
use std::borrow::Cow;
use std::cell::RefCell;
use std::io::Read;
@@ -26,13 +28,10 @@ use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
-use tokio::net::tcp;
use tokio::process;
#[cfg(unix)]
use std::os::unix::io::FromRawFd;
-#[cfg(unix)]
-use tokio::net::unix;
#[cfg(windows)]
use std::os::windows::io::FromRawHandle;
@@ -238,70 +237,6 @@ where
}
}
-/// A full duplex resource has a read and write ends that are completely
-/// independent, like TCP/Unix sockets and TLS streams.
-#[derive(Debug)]
-pub struct FullDuplexResource<R, W> {
- rd: AsyncRefCell<R>,
- wr: AsyncRefCell<W>,
- // When a full-duplex resource is closed, all pending 'read' ops are
- // canceled, while 'write' ops are allowed to complete. Therefore only
- // 'read' futures should be attached to this cancel handle.
- cancel_handle: CancelHandle,
-}
-
-impl<R, W> FullDuplexResource<R, W>
-where
- R: AsyncRead + Unpin + 'static,
- W: AsyncWrite + Unpin + 'static,
-{
- pub fn new((rd, wr): (R, W)) -> Self {
- Self {
- rd: rd.into(),
- wr: wr.into(),
- cancel_handle: Default::default(),
- }
- }
-
- pub fn into_inner(self) -> (R, W) {
- (self.rd.into_inner(), self.wr.into_inner())
- }
-
- pub fn rd_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<R> {
- RcRef::map(self, |r| &r.rd).borrow_mut()
- }
-
- pub fn wr_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<W> {
- RcRef::map(self, |r| &r.wr).borrow_mut()
- }
-
- pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> {
- RcRef::map(self, |r| &r.cancel_handle)
- }
-
- pub fn cancel_read_ops(&self) {
- self.cancel_handle.cancel()
- }
-
- async fn read(self: &Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
- let mut rd = self.rd_borrow_mut().await;
- let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?;
- Ok(nread)
- }
-
- async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
- let mut wr = self.wr_borrow_mut().await;
- let nwritten = wr.write(buf).await?;
- Ok(nwritten)
- }
-
- async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> {
- let mut wr = self.wr_borrow_mut().await;
- wr.shutdown().await?;
- Ok(())
- }
-}
-
pub type ChildStdinResource = WriteOnlyResource<process::ChildStdin>;
impl Resource for ChildStdinResource {
@@ -334,64 +269,6 @@ impl Resource for ChildStderrResource {
}
}
-pub type TcpStreamResource =
- FullDuplexResource<tcp::OwnedReadHalf, tcp::OwnedWriteHalf>;
-
-impl Resource for TcpStreamResource {
- fn name(&self) -> Cow<str> {
- "tcpStream".into()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel_read_ops();
- }
-}
-
-pub type TlsStreamResource = FullDuplexResource<tls::ReadHalf, tls::WriteHalf>;
-
-impl Resource for TlsStreamResource {
- fn name(&self) -> Cow<str> {
- "tlsStream".into()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel_read_ops();
- }
-}
-
-#[cfg(unix)]
-pub type UnixStreamResource =
- FullDuplexResource<unix::OwnedReadHalf, unix::OwnedWriteHalf>;
-
-#[cfg(not(unix))]
-struct UnixStreamResource;
-
-#[cfg(not(unix))]
-impl UnixStreamResource {
- async fn read(self: &Rc<Self>, _buf: &mut [u8]) -> Result<usize, AnyError> {
- unreachable!()
- }
- async fn write(self: &Rc<Self>, _buf: &[u8]) -> Result<usize, AnyError> {
- unreachable!()
- }
- async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> {
- unreachable!()
- }
- fn cancel_read_ops(&self) {
- unreachable!()
- }
-}
-
-impl Resource for UnixStreamResource {
- fn name(&self) -> Cow<str> {
- "unixStream".into()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel_read_ops();
- }
-}
-
#[derive(Debug, Default)]
pub struct StdFileResource {
pub fs_file:
diff --git a/runtime/ops/mod.rs b/runtime/ops/mod.rs
index b05a91180..c94020780 100644
--- a/runtime/ops/mod.rs
+++ b/runtime/ops/mod.rs
@@ -2,18 +2,13 @@
pub mod fs;
pub mod fs_events;
-pub mod http;
pub mod io;
-pub mod net;
-#[cfg(unix)]
-mod net_unix;
pub mod os;
pub mod permissions;
pub mod plugin;
pub mod process;
pub mod runtime;
pub mod signal;
-pub mod tls;
pub mod tty;
mod utils;
pub mod web_worker;
diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs
deleted file mode 100644
index c9195aab7..000000000
--- a/runtime/ops/net.rs
+++ /dev/null
@@ -1,793 +0,0 @@
-// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-use crate::ops::io::TcpStreamResource;
-use crate::permissions::Permissions;
-use crate::resolve_addr::resolve_addr;
-use crate::resolve_addr::resolve_addr_sync;
-use deno_core::error::bad_resource;
-use deno_core::error::custom_error;
-use deno_core::error::generic_error;
-use deno_core::error::null_opbuf;
-use deno_core::error::type_error;
-use deno_core::error::AnyError;
-use deno_core::op_async;
-use deno_core::op_sync;
-use deno_core::AsyncRefCell;
-use deno_core::CancelHandle;
-use deno_core::CancelTryFuture;
-use deno_core::Extension;
-use deno_core::OpState;
-use deno_core::RcRef;
-use deno_core::Resource;
-use deno_core::ResourceId;
-use deno_core::ZeroCopyBuf;
-use log::debug;
-use serde::Deserialize;
-use serde::Serialize;
-use std::borrow::Cow;
-use std::cell::RefCell;
-use std::net::SocketAddr;
-use std::rc::Rc;
-use tokio::net::TcpListener;
-use tokio::net::TcpStream;
-use tokio::net::UdpSocket;
-use trust_dns_proto::rr::record_data::RData;
-use trust_dns_proto::rr::record_type::RecordType;
-use trust_dns_resolver::config::NameServerConfigGroup;
-use trust_dns_resolver::config::ResolverConfig;
-use trust_dns_resolver::config::ResolverOpts;
-use trust_dns_resolver::system_conf;
-use trust_dns_resolver::AsyncResolver;
-
-#[cfg(unix)]
-use super::net_unix;
-#[cfg(unix)]
-use crate::ops::io::UnixStreamResource;
-#[cfg(unix)]
-use std::path::Path;
-
-pub fn init() -> Extension {
- Extension::builder()
- .ops(vec![
- ("op_accept", op_async(op_accept)),
- ("op_connect", op_async(op_connect)),
- ("op_listen", op_sync(op_listen)),
- ("op_datagram_receive", op_async(op_datagram_receive)),
- ("op_datagram_send", op_async(op_datagram_send)),
- ("op_dns_resolve", op_async(op_dns_resolve)),
- ])
- .build()
-}
-
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct OpConn {
- pub rid: ResourceId,
- pub remote_addr: Option<OpAddr>,
- pub local_addr: Option<OpAddr>,
-}
-
-#[derive(Serialize)]
-#[serde(tag = "transport", rename_all = "lowercase")]
-pub enum OpAddr {
- Tcp(IpAddr),
- Udp(IpAddr),
- #[cfg(unix)]
- Unix(net_unix::UnixAddr),
- #[cfg(unix)]
- UnixPacket(net_unix::UnixAddr),
-}
-
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-/// A received datagram packet (from udp or unixpacket)
-pub struct OpPacket {
- pub size: usize,
- pub remote_addr: OpAddr,
-}
-
-#[derive(Serialize)]
-pub struct IpAddr {
- pub hostname: String,
- pub port: u16,
-}
-
-#[derive(Deserialize)]
-pub(crate) struct AcceptArgs {
- pub rid: ResourceId,
- pub transport: String,
-}
-
-async fn accept_tcp(
- state: Rc<RefCell<OpState>>,
- args: AcceptArgs,
- _: (),
-) -> Result<OpConn, AnyError> {
- let rid = args.rid;
-
- let resource = state
- .borrow()
- .resource_table
- .get::<TcpListenerResource>(rid)
- .ok_or_else(|| bad_resource("Listener has been closed"))?;
- let listener = RcRef::map(&resource, |r| &r.listener)
- .try_borrow_mut()
- .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
- let cancel = RcRef::map(resource, |r| &r.cancel);
- let (tcp_stream, _socket_addr) =
- listener.accept().try_or_cancel(cancel).await.map_err(|e| {
- // FIXME(bartlomieju): compatibility with current JS implementation
- if let std::io::ErrorKind::Interrupted = e.kind() {
- bad_resource("Listener has been closed")
- } else {
- e.into()
- }
- })?;
- let local_addr = tcp_stream.local_addr()?;
- let remote_addr = tcp_stream.peer_addr()?;
-
- let mut state = state.borrow_mut();
- let rid = state
- .resource_table
- .add(TcpStreamResource::new(tcp_stream.into_split()));
- Ok(OpConn {
- rid,
- local_addr: Some(OpAddr::Tcp(IpAddr {
- hostname: local_addr.ip().to_string(),
- port: local_addr.port(),
- })),
- remote_addr: Some(OpAddr::Tcp(IpAddr {
- hostname: remote_addr.ip().to_string(),
- port: remote_addr.port(),
- })),
- })
-}
-
-async fn op_accept(
- state: Rc<RefCell<OpState>>,
- args: AcceptArgs,
- _: (),
-) -> Result<OpConn, AnyError> {
- match args.transport.as_str() {
- "tcp" => accept_tcp(state, args, ()).await,
- #[cfg(unix)]
- "unix" => net_unix::accept_unix(state, args, ()).await,
- other => Err(bad_transport(other)),
- }
-}
-
-fn bad_transport(transport: &str) -> AnyError {
- generic_error(format!("Unsupported transport protocol {}", transport))
-}
-
-#[derive(Deserialize)]
-pub(crate) struct ReceiveArgs {
- pub rid: ResourceId,
- pub transport: String,
-}
-
-async fn receive_udp(
- state: Rc<RefCell<OpState>>,
- args: ReceiveArgs,
- zero_copy: Option<ZeroCopyBuf>,
-) -> Result<OpPacket, AnyError> {
- let zero_copy = zero_copy.ok_or_else(null_opbuf)?;
- let mut zero_copy = zero_copy.clone();
-
- let rid = args.rid;
-
- let resource = state
- .borrow_mut()
- .resource_table
- .get::<UdpSocketResource>(rid)
- .ok_or_else(|| bad_resource("Socket has been closed"))?;
- let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
- let cancel_handle = RcRef::map(&resource, |r| &r.cancel);
- let (size, remote_addr) = socket
- .recv_from(&mut zero_copy)
- .try_or_cancel(cancel_handle)
- .await?;
- Ok(OpPacket {
- size,
- remote_addr: OpAddr::Udp(IpAddr {
- hostname: remote_addr.ip().to_string(),
- port: remote_addr.port(),
- }),
- })
-}
-
-async fn op_datagram_receive(
- state: Rc<RefCell<OpState>>,
- args: ReceiveArgs,
- zero_copy: Option<ZeroCopyBuf>,
-) -> Result<OpPacket, AnyError> {
- match args.transport.as_str() {
- "udp" => receive_udp(state, args, zero_copy).await,
- #[cfg(unix)]
- "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await,
- other => Err(bad_transport(other)),
- }
-}
-
-#[derive(Deserialize)]
-struct SendArgs {
- rid: ResourceId,
- transport: String,
- #[serde(flatten)]
- transport_args: ArgsEnum,
-}
-
-async fn op_datagram_send(
- state: Rc<RefCell<OpState>>,
- args: SendArgs,
- zero_copy: Option<ZeroCopyBuf>,
-) -> Result<usize, AnyError> {
- let zero_copy = zero_copy.ok_or_else(null_opbuf)?;
- let zero_copy = zero_copy.clone();
-
- match args {
- SendArgs {
- rid,
- transport,
- transport_args: ArgsEnum::Ip(args),
- } if transport == "udp" => {
- {
- let mut s = state.borrow_mut();
- s.borrow_mut::<Permissions>()
- .net
- .check(&(&args.hostname, Some(args.port)))?;
- }
- let addr = resolve_addr(&args.hostname, args.port)
- .await?
- .next()
- .ok_or_else(|| generic_error("No resolved address found"))?;
-
- let resource = state
- .borrow_mut()
- .resource_table
- .get::<UdpSocketResource>(rid)
- .ok_or_else(|| bad_resource("Socket has been closed"))?;
- let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
- let byte_length = socket.send_to(&zero_copy, &addr).await?;
- Ok(byte_length)
- }
- #[cfg(unix)]
- SendArgs {
- rid,
- transport,
- transport_args: ArgsEnum::Unix(args),
- } if transport == "unixpacket" => {
- let address_path = Path::new(&args.path);
- {
- let mut s = state.borrow_mut();
- s.borrow_mut::<Permissions>().write.check(&address_path)?;
- }
- let resource = state
- .borrow()
- .resource_table
- .get::<net_unix::UnixDatagramResource>(rid)
- .ok_or_else(|| {
- custom_error("NotConnected", "Socket has been closed")
- })?;
- let socket = RcRef::map(&resource, |r| &r.socket)
- .try_borrow_mut()
- .ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
- let byte_length = socket.send_to(&zero_copy, address_path).await?;
- Ok(byte_length)
- }
- _ => Err(type_error("Wrong argument format!")),
- }
-}
-
-#[derive(Deserialize)]
-struct ConnectArgs {
- transport: String,
- #[serde(flatten)]
- transport_args: ArgsEnum,
-}
-
-async fn op_connect(
- state: Rc<RefCell<OpState>>,
- args: ConnectArgs,
- _: (),
-) -> Result<OpConn, AnyError> {
- match args {
- ConnectArgs {
- transport,
- transport_args: ArgsEnum::Ip(args),
- } if transport == "tcp" => {
- {
- let mut state_ = state.borrow_mut();
- state_
- .borrow_mut::<Permissions>()
- .net
- .check(&(&args.hostname, Some(args.port)))?;
- }
- let addr = resolve_addr(&args.hostname, args.port)
- .await?
- .next()
- .ok_or_else(|| generic_error("No resolved address found"))?;
- let tcp_stream = TcpStream::connect(&addr).await?;
- let local_addr = tcp_stream.local_addr()?;
- let remote_addr = tcp_stream.peer_addr()?;
-
- let mut state_ = state.borrow_mut();
- let rid = state_
- .resource_table
- .add(TcpStreamResource::new(tcp_stream.into_split()));
- Ok(OpConn {
- rid,
- local_addr: Some(OpAddr::Tcp(IpAddr {
- hostname: local_addr.ip().to_string(),
- port: local_addr.port(),
- })),
- remote_addr: Some(OpAddr::Tcp(IpAddr {
- hostname: remote_addr.ip().to_string(),
- port: remote_addr.port(),
- })),
- })
- }
- #[cfg(unix)]
- ConnectArgs {
- transport,
- transport_args: ArgsEnum::Unix(args),
- } if transport == "unix" => {
- let address_path = Path::new(&args.path);
- super::check_unstable2(&state, "Deno.connect");
- {
- let mut state_ = state.borrow_mut();
- state_
- .borrow_mut::<Permissions>()
- .read
- .check(&address_path)?;
- state_
- .borrow_mut::<Permissions>()
- .write
- .check(&address_path)?;
- }
- let path = args.path;
- let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?;
- let local_addr = unix_stream.local_addr()?;
- let remote_addr = unix_stream.peer_addr()?;
-
- let mut state_ = state.borrow_mut();
- let resource = UnixStreamResource::new(unix_stream.into_split());
- let rid = state_.resource_table.add(resource);
- Ok(OpConn {
- rid,
- local_addr: Some(OpAddr::Unix(net_unix::UnixAddr {
- path: local_addr.as_pathname().and_then(net_unix::pathstring),
- })),
- remote_addr: Some(OpAddr::Unix(net_unix::UnixAddr {
- path: remote_addr.as_pathname().and_then(net_unix::pathstring),
- })),
- })
- }
- _ => Err(type_error("Wrong argument format!")),
- }
-}
-
-pub struct TcpListenerResource {
- pub listener: AsyncRefCell<TcpListener>,
- pub cancel: CancelHandle,
-}
-
-impl Resource for TcpListenerResource {
- fn name(&self) -> Cow<str> {
- "tcpListener".into()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel.cancel();
- }
-}
-
-struct UdpSocketResource {
- socket: AsyncRefCell<UdpSocket>,
- cancel: CancelHandle,
-}
-
-impl Resource for UdpSocketResource {
- fn name(&self) -> Cow<str> {
- "udpSocket".into()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel.cancel()
- }
-}
-
-#[derive(Deserialize)]
-struct IpListenArgs {
- hostname: String,
- port: u16,
-}
-
-#[derive(Deserialize)]
-#[serde(untagged)]
-enum ArgsEnum {
- Ip(IpListenArgs),
- #[cfg(unix)]
- Unix(net_unix::UnixListenArgs),
-}
-
-#[derive(Deserialize)]
-struct ListenArgs {
- transport: String,
- #[serde(flatten)]
- transport_args: ArgsEnum,
-}
-
-fn listen_tcp(
- state: &mut OpState,
- addr: SocketAddr,
-) -> Result<(u32, SocketAddr), AnyError> {
- let std_listener = std::net::TcpListener::bind(&addr)?;
- std_listener.set_nonblocking(true)?;
- let listener = TcpListener::from_std(std_listener)?;
- let local_addr = listener.local_addr()?;
- let listener_resource = TcpListenerResource {
- listener: AsyncRefCell::new(listener),
- cancel: Default::default(),
- };
- let rid = state.resource_table.add(listener_resource);
-
- Ok((rid, local_addr))
-}
-
-fn listen_udp(
- state: &mut OpState,
- addr: SocketAddr,
-) -> Result<(u32, SocketAddr), AnyError> {
- let std_socket = std::net::UdpSocket::bind(&addr)?;
- std_socket.set_nonblocking(true)?;
- let socket = UdpSocket::from_std(std_socket)?;
- let local_addr = socket.local_addr()?;
- let socket_resource = UdpSocketResource {
- socket: AsyncRefCell::new(socket),
- cancel: Default::default(),
- };
- let rid = state.resource_table.add(socket_resource);
-
- Ok((rid, local_addr))
-}
-
-fn op_listen(
- state: &mut OpState,
- args: ListenArgs,
- _: (),
-) -> Result<OpConn, AnyError> {
- match args {
- ListenArgs {
- transport,
- transport_args: ArgsEnum::Ip(args),
- } => {
- {
- if transport == "udp" {
- super::check_unstable(state, "Deno.listenDatagram");
- }
- state
- .borrow_mut::<Permissions>()
- .net
- .check(&(&args.hostname, Some(args.port)))?;
- }
- let addr = resolve_addr_sync(&args.hostname, args.port)?
- .next()
- .ok_or_else(|| generic_error("No resolved address found"))?;
- let (rid, local_addr) = if transport == "tcp" {
- listen_tcp(state, addr)?
- } else {
- listen_udp(state, addr)?
- };
- debug!(
- "New listener {} {}:{}",
- rid,
- local_addr.ip().to_string(),
- local_addr.port()
- );
- let ip_addr = IpAddr {
- hostname: local_addr.ip().to_string(),
- port: local_addr.port(),
- };
- Ok(OpConn {
- rid,
- local_addr: Some(match transport.as_str() {
- "udp" => OpAddr::Udp(ip_addr),
- "tcp" => OpAddr::Tcp(ip_addr),
- // NOTE: This could be unreachable!()
- other => return Err(bad_transport(other)),
- }),
- remote_addr: None,
- })
- }
- #[cfg(unix)]
- ListenArgs {
- transport,
- transport_args: ArgsEnum::Unix(args),
- } if transport == "unix" || transport == "unixpacket" => {
- let address_path = Path::new(&args.path);
- {
- if transport == "unix" {
- super::check_unstable(state, "Deno.listen");
- }
- if transport == "unixpacket" {
- super::check_unstable(state, "Deno.listenDatagram");
- }
- let permissions = state.borrow_mut::<Permissions>();
- permissions.read.check(&address_path)?;
- permissions.write.check(&address_path)?;
- }
- let (rid, local_addr) = if transport == "unix" {
- net_unix::listen_unix(state, &address_path)?
- } else {
- net_unix::listen_unix_packet(state, &address_path)?
- };
- debug!(
- "New listener {} {}",
- rid,
- local_addr.as_pathname().unwrap().display(),
- );
- let unix_addr = net_unix::UnixAddr {
- path: local_addr.as_pathname().and_then(net_unix::pathstring),
- };
-
- Ok(OpConn {
- rid,
- local_addr: Some(match transport.as_str() {
- "unix" => OpAddr::Unix(unix_addr),
- "unixpacket" => OpAddr::UnixPacket(unix_addr),
- other => return Err(bad_transport(other)),
- }),
- remote_addr: None,
- })
- }
- #[cfg(unix)]
- _ => Err(type_error("Wrong argument format!")),
- }
-}
-
-#[derive(Serialize, PartialEq, Debug)]
-#[serde(untagged)]
-enum DnsReturnRecord {
- A(String),
- Aaaa(String),
- Aname(String),
- Cname(String),
- Mx {
- preference: u16,
- exchange: String,
- },
- Ptr(String),
- Srv {
- priority: u16,
- weight: u16,
- port: u16,
- target: String,
- },
- Txt(Vec<String>),
-}
-
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ResolveAddrArgs {
- query: String,
- record_type: RecordType,
- options: Option<ResolveDnsOption>,
-}
-
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ResolveDnsOption {
- name_server: Option<NameServer>,
-}
-
-fn default_port() -> u16 {
- 53
-}
-
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct NameServer {
- ip_addr: String,
- #[serde(default = "default_port")]
- port: u16,
-}
-
-async fn op_dns_resolve(
- state: Rc<RefCell<OpState>>,
- args: ResolveAddrArgs,
- _: (),
-) -> Result<Vec<DnsReturnRecord>, AnyError> {
- let ResolveAddrArgs {
- query,
- record_type,
- options,
- } = args;
-
- let (config, opts) = if let Some(name_server) =
- options.as_ref().and_then(|o| o.name_server.as_ref())
- {
- let group = NameServerConfigGroup::from_ips_clear(
- &[name_server.ip_addr.parse()?],
- name_server.port,
- true,
- );
- (
- ResolverConfig::from_parts(None, vec![], group),
- ResolverOpts::default(),
- )
- } else {
- system_conf::read_system_conf()?
- };
-
- {
- let mut s = state.borrow_mut();
- let perm = s.borrow_mut::<Permissions>();
-
- // Checks permission against the name servers which will be actually queried.
- for ns in config.name_servers() {
- let socker_addr = &ns.socket_addr;
- let ip = socker_addr.ip().to_string();
- let port = socker_addr.port();
- perm.net.check(&(ip, Some(port)))?;
- }
- }
-
- let resolver = AsyncResolver::tokio(config, opts)?;
-
- let results = resolver
- .lookup(query, record_type, Default::default())
- .await
- .map_err(|e| generic_error(format!("{}", e)))?
- .iter()
- .filter_map(rdata_to_return_record(record_type))
- .collect();
-
- Ok(results)
-}
-
-fn rdata_to_return_record(
- ty: RecordType,
-) -> impl Fn(&RData) -> Option<DnsReturnRecord> {
- use RecordType::*;
- move |r: &RData| -> Option<DnsReturnRecord> {
- match ty {
- A => r.as_a().map(ToString::to_string).map(DnsReturnRecord::A),
- AAAA => r
- .as_aaaa()
- .map(ToString::to_string)
- .map(DnsReturnRecord::Aaaa),
- ANAME => r
- .as_aname()
- .map(ToString::to_string)
- .map(DnsReturnRecord::Aname),
- CNAME => r
- .as_cname()
- .map(ToString::to_string)
- .map(DnsReturnRecord::Cname),
- MX => r.as_mx().map(|mx| DnsReturnRecord::Mx {
- preference: mx.preference(),
- exchange: mx.exchange().to_string(),
- }),
- PTR => r
- .as_ptr()
- .map(ToString::to_string)
- .map(DnsReturnRecord::Ptr),
- SRV => r.as_srv().map(|srv| DnsReturnRecord::Srv {
- priority: srv.priority(),
- weight: srv.weight(),
- port: srv.port(),
- target: srv.target().to_string(),
- }),
- TXT => r.as_txt().map(|txt| {
- let texts: Vec<String> = txt
- .iter()
- .map(|bytes| {
- // Tries to parse these bytes as Latin-1
- bytes.iter().map(|&b| b as char).collect::<String>()
- })
- .collect();
- DnsReturnRecord::Txt(texts)
- }),
- // TODO(magurotuna): Other record types are not supported
- _ => todo!(),
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use std::net::Ipv4Addr;
- use std::net::Ipv6Addr;
- use trust_dns_proto::rr::rdata::mx::MX;
- use trust_dns_proto::rr::rdata::srv::SRV;
- use trust_dns_proto::rr::rdata::txt::TXT;
- use trust_dns_proto::rr::record_data::RData;
- use trust_dns_proto::rr::Name;
-
- #[test]
- fn rdata_to_return_record_a() {
- let func = rdata_to_return_record(RecordType::A);
- let rdata = RData::A(Ipv4Addr::new(127, 0, 0, 1));
- assert_eq!(
- func(&rdata),
- Some(DnsReturnRecord::A("127.0.0.1".to_string()))
- );
- }
-
- #[test]
- fn rdata_to_return_record_aaaa() {
- let func = rdata_to_return_record(RecordType::AAAA);
- let rdata = RData::AAAA(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1));
- assert_eq!(func(&rdata), Some(DnsReturnRecord::Aaaa("::1".to_string())));
- }
-
- #[test]
- fn rdata_to_return_record_aname() {
- let func = rdata_to_return_record(RecordType::ANAME);
- let rdata = RData::ANAME(Name::new());
- assert_eq!(func(&rdata), Some(DnsReturnRecord::Aname("".to_string())));
- }
-
- #[test]
- fn rdata_to_return_record_cname() {
- let func = rdata_to_return_record(RecordType::CNAME);
- let rdata = RData::CNAME(Name::new());
- assert_eq!(func(&rdata), Some(DnsReturnRecord::Cname("".to_string())));
- }
-
- #[test]
- fn rdata_to_return_record_mx() {
- let func = rdata_to_return_record(RecordType::MX);
- let rdata = RData::MX(MX::new(10, Name::new()));
- assert_eq!(
- func(&rdata),
- Some(DnsReturnRecord::Mx {
- preference: 10,
- exchange: "".to_string()
- })
- );
- }
-
- #[test]
- fn rdata_to_return_record_ptr() {
- let func = rdata_to_return_record(RecordType::PTR);
- let rdata = RData::PTR(Name::new());
- assert_eq!(func(&rdata), Some(DnsReturnRecord::Ptr("".to_string())));
- }
-
- #[test]
- fn rdata_to_return_record_srv() {
- let func = rdata_to_return_record(RecordType::SRV);
- let rdata = RData::SRV(SRV::new(1, 2, 3, Name::new()));
- assert_eq!(
- func(&rdata),
- Some(DnsReturnRecord::Srv {
- priority: 1,
- weight: 2,
- port: 3,
- target: "".to_string()
- })
- );
- }
-
- #[test]
- fn rdata_to_return_record_txt() {
- let func = rdata_to_return_record(RecordType::TXT);
- let rdata = RData::TXT(TXT::from_bytes(vec![
- "foo".as_bytes(),
- "bar".as_bytes(),
- &[0xa3], // "£" in Latin-1
- &[0xe3, 0x81, 0x82], // "あ" in UTF-8
- ]));
- assert_eq!(
- func(&rdata),
- Some(DnsReturnRecord::Txt(vec![
- "foo".to_string(),
- "bar".to_string(),
- "£".to_string(),
- "ã\u{81}\u{82}".to_string(),
- ]))
- );
- }
-}
diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs
deleted file mode 100644
index d56dc76d9..000000000
--- a/runtime/ops/net_unix.rs
+++ /dev/null
@@ -1,173 +0,0 @@
-// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-
-use super::utils::into_string;
-use crate::ops::io::UnixStreamResource;
-use crate::ops::net::AcceptArgs;
-use crate::ops::net::OpAddr;
-use crate::ops::net::OpConn;
-use crate::ops::net::OpPacket;
-use crate::ops::net::ReceiveArgs;
-use deno_core::error::bad_resource;
-use deno_core::error::custom_error;
-use deno_core::error::null_opbuf;
-use deno_core::error::AnyError;
-use deno_core::AsyncRefCell;
-use deno_core::CancelHandle;
-use deno_core::CancelTryFuture;
-use deno_core::OpState;
-use deno_core::RcRef;
-use deno_core::Resource;
-use deno_core::ZeroCopyBuf;
-use serde::Deserialize;
-use serde::Serialize;
-use std::borrow::Cow;
-use std::cell::RefCell;
-use std::fs::remove_file;
-use std::path::Path;
-use std::rc::Rc;
-use tokio::net::UnixDatagram;
-use tokio::net::UnixListener;
-pub use tokio::net::UnixStream;
-
-struct UnixListenerResource {
- listener: AsyncRefCell<UnixListener>,
- cancel: CancelHandle,
-}
-
-impl Resource for UnixListenerResource {
- fn name(&self) -> Cow<str> {
- "unixListener".into()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel.cancel();
- }
-}
-
-pub struct UnixDatagramResource {
- pub socket: AsyncRefCell<UnixDatagram>,
- pub cancel: CancelHandle,
-}
-
-impl Resource for UnixDatagramResource {
- fn name(&self) -> Cow<str> {
- "unixDatagram".into()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel.cancel();
- }
-}
-
-#[derive(Serialize)]
-pub struct UnixAddr {
- pub path: Option<String>,
-}
-
-#[derive(Deserialize)]
-pub struct UnixListenArgs {
- pub path: String,
-}
-
-pub(crate) async fn accept_unix(
- state: Rc<RefCell<OpState>>,
- args: AcceptArgs,
- _: (),
-) -> Result<OpConn, AnyError> {
- let rid = args.rid;
-
- let resource = state
- .borrow()
- .resource_table
- .get::<UnixListenerResource>(rid)
- .ok_or_else(|| bad_resource("Listener has been closed"))?;
- let listener = RcRef::map(&resource, |r| &r.listener)
- .try_borrow_mut()
- .ok_or_else(|| custom_error("Busy", "Listener already in use"))?;
- let cancel = RcRef::map(resource, |r| &r.cancel);
- let (unix_stream, _socket_addr) =
- listener.accept().try_or_cancel(cancel).await?;
-
- let local_addr = unix_stream.local_addr()?;
- let remote_addr = unix_stream.peer_addr()?;
- let resource = UnixStreamResource::new(unix_stream.into_split());
- let mut state = state.borrow_mut();
- let rid = state.resource_table.add(resource);
- Ok(OpConn {
- rid,
- local_addr: Some(OpAddr::Unix(UnixAddr {
- path: local_addr.as_pathname().and_then(pathstring),
- })),
- remote_addr: Some(OpAddr::Unix(UnixAddr {
- path: remote_addr.as_pathname().and_then(pathstring),
- })),
- })
-}
-
-pub(crate) async fn receive_unix_packet(
- state: Rc<RefCell<OpState>>,
- args: ReceiveArgs,
- buf: Option<ZeroCopyBuf>,
-) -> Result<OpPacket, AnyError> {
- let mut buf = buf.ok_or_else(null_opbuf)?;
-
- let rid = args.rid;
-
- let resource = state
- .borrow()
- .resource_table
- .get::<UnixDatagramResource>(rid)
- .ok_or_else(|| bad_resource("Socket has been closed"))?;
- let socket = RcRef::map(&resource, |r| &r.socket)
- .try_borrow_mut()
- .ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
- let cancel = RcRef::map(resource, |r| &r.cancel);
- let (size, remote_addr) =
- socket.recv_from(&mut buf).try_or_cancel(cancel).await?;
- Ok(OpPacket {
- size,
- remote_addr: OpAddr::UnixPacket(UnixAddr {
- path: remote_addr.as_pathname().and_then(pathstring),
- }),
- })
-}
-
-pub fn listen_unix(
- state: &mut OpState,
- addr: &Path,
-) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> {
- if addr.exists() {
- remove_file(&addr).unwrap();
- }
- let listener = UnixListener::bind(&addr)?;
- let local_addr = listener.local_addr()?;
- let listener_resource = UnixListenerResource {
- listener: AsyncRefCell::new(listener),
- cancel: Default::default(),
- };
- let rid = state.resource_table.add(listener_resource);
-
- Ok((rid, local_addr))
-}
-
-pub fn listen_unix_packet(
- state: &mut OpState,
- addr: &Path,
-) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> {
- if addr.exists() {
- remove_file(&addr).unwrap();
- }
- let socket = UnixDatagram::bind(&addr)?;
- let local_addr = socket.local_addr()?;
- let datagram_resource = UnixDatagramResource {
- socket: AsyncRefCell::new(socket),
- cancel: Default::default(),
- };
- let rid = state.resource_table.add(datagram_resource);
-
- Ok((rid, local_addr))
-}
-
-pub fn pathstring(pathname: &Path) -> Option<String> {
- into_string(pathname.into()).ok()
-}
diff --git a/runtime/ops/tls.rs b/runtime/ops/tls.rs
deleted file mode 100644
index c3f554856..000000000
--- a/runtime/ops/tls.rs
+++ /dev/null
@@ -1,1017 +0,0 @@
-// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-
-pub use rustls;
-pub use webpki;
-
-use crate::ops::io::TcpStreamResource;
-use crate::ops::io::TlsStreamResource;
-use crate::ops::net::IpAddr;
-use crate::ops::net::OpAddr;
-use crate::ops::net::OpConn;
-use crate::permissions::Permissions;
-use crate::resolve_addr::resolve_addr;
-use crate::resolve_addr::resolve_addr_sync;
-use deno_core::error::bad_resource;
-use deno_core::error::bad_resource_id;
-use deno_core::error::custom_error;
-use deno_core::error::generic_error;
-use deno_core::error::invalid_hostname;
-use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
-use deno_core::futures::ready;
-use deno_core::futures::task::noop_waker_ref;
-use deno_core::futures::task::AtomicWaker;
-use deno_core::futures::task::Context;
-use deno_core::futures::task::Poll;
-use deno_core::futures::task::RawWaker;
-use deno_core::futures::task::RawWakerVTable;
-use deno_core::futures::task::Waker;
-use deno_core::op_async;
-use deno_core::op_sync;
-use deno_core::AsyncRefCell;
-use deno_core::CancelHandle;
-use deno_core::CancelTryFuture;
-use deno_core::Extension;
-use deno_core::OpState;
-use deno_core::RcRef;
-use deno_core::Resource;
-use deno_core::ResourceId;
-use io::Error;
-use io::Read;
-use io::Write;
-use rustls::internal::pemfile::certs;
-use rustls::internal::pemfile::pkcs8_private_keys;
-use rustls::internal::pemfile::rsa_private_keys;
-use rustls::Certificate;
-use rustls::ClientConfig;
-use rustls::ClientSession;
-use rustls::NoClientAuth;
-use rustls::PrivateKey;
-use rustls::ServerConfig;
-use rustls::ServerSession;
-use rustls::Session;
-use rustls::StoresClientSessions;
-use serde::Deserialize;
-use std::borrow::Cow;
-use std::cell::RefCell;
-use std::collections::HashMap;
-use std::convert::From;
-use std::fs::File;
-use std::io;
-use std::io::BufReader;
-use std::io::ErrorKind;
-use std::ops::Deref;
-use std::ops::DerefMut;
-use std::path::Path;
-use std::pin::Pin;
-use std::rc::Rc;
-use std::sync::Arc;
-use std::sync::Mutex;
-use std::sync::Weak;
-use tokio::io::AsyncRead;
-use tokio::io::AsyncWrite;
-use tokio::io::ReadBuf;
-use tokio::net::TcpListener;
-use tokio::net::TcpStream;
-use tokio::task::spawn_local;
-use webpki::DNSNameRef;
-
-lazy_static::lazy_static! {
- static ref CLIENT_SESSION_MEMORY_CACHE: Arc<ClientSessionMemoryCache> =
- Arc::new(ClientSessionMemoryCache::default());
-}
-
-#[derive(Default)]
-struct ClientSessionMemoryCache(Mutex<HashMap<Vec<u8>, Vec<u8>>>);
-
-impl StoresClientSessions for ClientSessionMemoryCache {
- fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
- self.0.lock().unwrap().get(key).cloned()
- }
-
- fn put(&self, key: Vec<u8>, value: Vec<u8>) -> bool {
- let mut sessions = self.0.lock().unwrap();
- // TODO(bnoordhuis) Evict sessions LRU-style instead of arbitrarily.
- while sessions.len() >= 1024 {
- let key = sessions.keys().next().unwrap().clone();
- sessions.remove(&key);
- }
- sessions.insert(key, value);
- true
- }
-}
-
-#[derive(Debug)]
-enum TlsSession {
- Client(ClientSession),
- Server(ServerSession),
-}
-
-impl Deref for TlsSession {
- type Target = dyn Session;
-
- fn deref(&self) -> &Self::Target {
- match self {
- TlsSession::Client(client_session) => client_session,
- TlsSession::Server(server_session) => server_session,
- }
- }
-}
-
-impl DerefMut for TlsSession {
- fn deref_mut(&mut self) -> &mut Self::Target {
- match self {
- TlsSession::Client(client_session) => client_session,
- TlsSession::Server(server_session) => server_session,
- }
- }
-}
-
-impl From<ClientSession> for TlsSession {
- fn from(client_session: ClientSession) -> Self {
- TlsSession::Client(client_session)
- }
-}
-
-impl From<ServerSession> for TlsSession {
- fn from(server_session: ServerSession) -> Self {
- TlsSession::Server(server_session)
- }
-}
-
-#[derive(Copy, Clone, Debug, Eq, PartialEq)]
-enum Flow {
- Read,
- Write,
-}
-
-#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
-enum State {
- StreamOpen,
- StreamClosed,
- TlsClosing,
- TlsClosed,
- TcpClosed,
-}
-
-#[derive(Debug)]
-pub struct TlsStream(Option<TlsStreamInner>);
-
-impl TlsStream {
- fn new(tcp: TcpStream, tls: TlsSession) -> Self {
- let inner = TlsStreamInner {
- tcp,
- tls,
- rd_state: State::StreamOpen,
- wr_state: State::StreamOpen,
- };
- Self(Some(inner))
- }
-
- pub fn new_client_side(
- tcp: TcpStream,
- tls_config: &Arc<ClientConfig>,
- hostname: DNSNameRef,
- ) -> Self {
- let tls = TlsSession::Client(ClientSession::new(tls_config, hostname));
- Self::new(tcp, tls)
- }
-
- pub fn new_server_side(
- tcp: TcpStream,
- tls_config: &Arc<ServerConfig>,
- ) -> Self {
- let tls = TlsSession::Server(ServerSession::new(tls_config));
- Self::new(tcp, tls)
- }
-
- pub async fn handshake(&mut self) -> io::Result<()> {
- poll_fn(|cx| self.inner_mut().poll_io(cx, Flow::Write)).await
- }
-
- fn into_split(self) -> (ReadHalf, WriteHalf) {
- let shared = Shared::new(self);
- let rd = ReadHalf {
- shared: shared.clone(),
- };
- let wr = WriteHalf { shared };
- (rd, wr)
- }
-
- /// Tokio-rustls compatibility: returns a reference to the underlying TCP
- /// stream, and a reference to the Rustls `Session` object.
- pub fn get_ref(&self) -> (&TcpStream, &dyn Session) {
- let inner = self.0.as_ref().unwrap();
- (&inner.tcp, &*inner.tls)
- }
-
- fn inner_mut(&mut self) -> &mut TlsStreamInner {
- self.0.as_mut().unwrap()
- }
-}
-
-impl AsyncRead for TlsStream {
- fn poll_read(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut ReadBuf<'_>,
- ) -> Poll<io::Result<()>> {
- self.inner_mut().poll_read(cx, buf)
- }
-}
-
-impl AsyncWrite for TlsStream {
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- self.inner_mut().poll_write(cx, buf)
- }
-
- fn poll_flush(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<()>> {
- self.inner_mut().poll_io(cx, Flow::Write)
- // The underlying TCP stream does not need to be flushed.
- }
-
- fn poll_shutdown(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<()>> {
- self.inner_mut().poll_shutdown(cx)
- }
-}
-
-impl Drop for TlsStream {
- fn drop(&mut self) {
- let mut inner = self.0.take().unwrap();
-
- let mut cx = Context::from_waker(noop_waker_ref());
- let use_linger_task = inner.poll_close(&mut cx).is_pending();
-
- if use_linger_task {
- spawn_local(poll_fn(move |cx| inner.poll_close(cx)));
- } else if cfg!(debug_assertions) {
- spawn_local(async {}); // Spawn dummy task to detect missing LocalSet.
- }
- }
-}
-
-#[derive(Debug)]
-pub struct TlsStreamInner {
- tls: TlsSession,
- tcp: TcpStream,
- rd_state: State,
- wr_state: State,
-}
-
-impl TlsStreamInner {
- fn poll_io(
- &mut self,
- cx: &mut Context<'_>,
- flow: Flow,
- ) -> Poll<io::Result<()>> {
- loop {
- let wr_ready = loop {
- match self.wr_state {
- _ if self.tls.is_handshaking() && !self.tls.wants_write() => {
- break true;
- }
- _ if self.tls.is_handshaking() => {}
- State::StreamOpen if !self.tls.wants_write() => break true,
- State::StreamClosed => {
- // Rustls will enqueue the 'CloseNotify' alert and send it after
- // flusing the data that is already in the queue.
- self.tls.send_close_notify();
- self.wr_state = State::TlsClosing;
- continue;
- }
- State::TlsClosing if !self.tls.wants_write() => {
- self.wr_state = State::TlsClosed;
- continue;
- }
- // If a 'CloseNotify' alert sent by the remote end has been received,
- // shut down the underlying TCP socket. Otherwise, consider polling
- // done for the moment.
- State::TlsClosed if self.rd_state < State::TlsClosed => break true,
- State::TlsClosed
- if Pin::new(&mut self.tcp).poll_shutdown(cx)?.is_pending() =>
- {
- break false;
- }
- State::TlsClosed => {
- self.wr_state = State::TcpClosed;
- continue;
- }
- State::TcpClosed => break true,
- _ => {}
- }
-
- // Poll whether there is space in the socket send buffer so we can flush
- // the remaining outgoing ciphertext.
- if self.tcp.poll_write_ready(cx)?.is_pending() {
- break false;
- }
-
- // Write ciphertext to the TCP socket.
- let mut wrapped_tcp = ImplementWriteTrait(&mut self.tcp);
- match self.tls.write_tls(&mut wrapped_tcp) {
- Ok(0) => unreachable!(),
- Ok(_) => {}
- Err(err) if err.kind() == ErrorKind::WouldBlock => {}
- Err(err) => return Poll::Ready(Err(err)),
- }
- };
-
- let rd_ready = loop {
- match self.rd_state {
- State::TcpClosed if self.tls.is_handshaking() => {
- let err = Error::new(ErrorKind::UnexpectedEof, "tls handshake eof");
- return Poll::Ready(Err(err));
- }
- _ if self.tls.is_handshaking() && !self.tls.wants_read() => {
- break true;
- }
- _ if self.tls.is_handshaking() => {}
- State::StreamOpen if !self.tls.wants_read() => break true,
- State::StreamOpen => {}
- State::StreamClosed if !self.tls.wants_read() => {
- // Rustls has more incoming cleartext buffered up, but the TLS
- // session is closing so this data will never be processed by the
- // application layer. Just like what would happen if this were a raw
- // TCP stream, don't gracefully end the TLS session, but abort it.
- return Poll::Ready(Err(Error::from(ErrorKind::ConnectionReset)));
- }
- State::StreamClosed => {}
- State::TlsClosed if self.wr_state == State::TcpClosed => {
- // Wait for the remote end to gracefully close the TCP connection.
- // TODO(piscisaureus): this is unnecessary; remove when stable.
- }
- _ => break true,
- }
-
- if self.rd_state < State::TlsClosed {
- // Do a zero-length plaintext read so we can detect the arrival of
- // 'CloseNotify' messages, even if only the write half is open.
- // Actually reading data from the socket is done in `poll_read()`.
- match self.tls.read(&mut []) {
- Ok(0) => {}
- Err(err) if err.kind() == ErrorKind::ConnectionAborted => {
- // `Session::read()` returns `ConnectionAborted` when a
- // 'CloseNotify' alert has been received, which indicates that
- // the remote peer wants to gracefully end the TLS session.
- self.rd_state = State::TlsClosed;
- continue;
- }
- Err(err) => return Poll::Ready(Err(err)),
- _ => unreachable!(),
- }
- }
-
- // Poll whether more ciphertext is available in the socket receive
- // buffer.
- if self.tcp.poll_read_ready(cx)?.is_pending() {
- break false;
- }
-
- // Receive ciphertext from the socket.
- let mut wrapped_tcp = ImplementReadTrait(&mut self.tcp);
- match self.tls.read_tls(&mut wrapped_tcp) {
- Ok(0) => self.rd_state = State::TcpClosed,
- Ok(_) => self
- .tls
- .process_new_packets()
- .map_err(|err| Error::new(ErrorKind::InvalidData, err))?,
- Err(err) if err.kind() == ErrorKind::WouldBlock => {}
- Err(err) => return Poll::Ready(Err(err)),
- }
- };
-
- if wr_ready {
- if self.rd_state >= State::TlsClosed
- && self.wr_state >= State::TlsClosed
- && self.wr_state < State::TcpClosed
- {
- continue;
- }
- if self.tls.wants_write() {
- continue;
- }
- }
-
- let io_ready = match flow {
- _ if self.tls.is_handshaking() => false,
- Flow::Read => rd_ready,
- Flow::Write => wr_ready,
- };
- return match io_ready {
- false => Poll::Pending,
- true => Poll::Ready(Ok(())),
- };
- }
- }
-
- fn poll_read(
- &mut self,
- cx: &mut Context<'_>,
- buf: &mut ReadBuf<'_>,
- ) -> Poll<io::Result<()>> {
- ready!(self.poll_io(cx, Flow::Read))?;
-
- if self.rd_state == State::StreamOpen {
- let buf_slice =
- unsafe { &mut *(buf.unfilled_mut() as *mut [_] as *mut [u8]) };
- let bytes_read = self.tls.read(buf_slice)?;
- assert_ne!(bytes_read, 0);
- unsafe { buf.assume_init(bytes_read) };
- buf.advance(bytes_read);
- }
-
- Poll::Ready(Ok(()))
- }
-
- fn poll_write(
- &mut self,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- if buf.is_empty() {
- // Tokio-rustls compatibility: a zero byte write always succeeds.
- Poll::Ready(Ok(0))
- } else if self.wr_state == State::StreamOpen {
- // Flush Rustls' ciphertext send queue.
- ready!(self.poll_io(cx, Flow::Write))?;
-
- // Copy data from `buf` to the Rustls cleartext send queue.
- let bytes_written = self.tls.write(buf)?;
- assert_ne!(bytes_written, 0);
-
- // Try to flush as much ciphertext as possible. However, since we just
- // handed off at least some bytes to rustls, so we can't return
- // `Poll::Pending()` any more: this would tell the caller that it should
- // try to send those bytes again.
- let _ = self.poll_io(cx, Flow::Write)?;
-
- Poll::Ready(Ok(bytes_written))
- } else {
- // Return error if stream has been shut down for writing.
- Poll::Ready(Err(ErrorKind::BrokenPipe.into()))
- }
- }
-
- fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- if self.wr_state == State::StreamOpen {
- self.wr_state = State::StreamClosed;
- }
-
- ready!(self.poll_io(cx, Flow::Write))?;
-
- // At minimum, a TLS 'CloseNotify' alert should have been sent.
- assert!(self.wr_state >= State::TlsClosed);
- // If we received a TLS 'CloseNotify' alert from the remote end
- // already, the TCP socket should be shut down at this point.
- assert!(
- self.rd_state < State::TlsClosed || self.wr_state == State::TcpClosed
- );
-
- Poll::Ready(Ok(()))
- }
-
- fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- if self.rd_state == State::StreamOpen {
- self.rd_state = State::StreamClosed;
- }
-
- // Send TLS 'CloseNotify' alert.
- ready!(self.poll_shutdown(cx))?;
- // Wait for 'CloseNotify', shut down TCP stream, wait for TCP FIN packet.
- ready!(self.poll_io(cx, Flow::Read))?;
-
- assert_eq!(self.rd_state, State::TcpClosed);
- assert_eq!(self.wr_state, State::TcpClosed);
-
- Poll::Ready(Ok(()))
- }
-}
-
-#[derive(Debug)]
-pub struct ReadHalf {
- shared: Arc<Shared>,
-}
-
-impl ReadHalf {
- pub fn reunite(self, wr: WriteHalf) -> TlsStream {
- assert!(Arc::ptr_eq(&self.shared, &wr.shared));
- drop(wr); // Drop `wr`, so only one strong reference to `shared` remains.
-
- Arc::try_unwrap(self.shared)
- .unwrap_or_else(|_| panic!("Arc::<Shared>::try_unwrap() failed"))
- .tls_stream
- .into_inner()
- .unwrap()
- }
-}
-
-impl AsyncRead for ReadHalf {
- fn poll_read(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut ReadBuf<'_>,
- ) -> Poll<io::Result<()>> {
- self
- .shared
- .poll_with_shared_waker(cx, Flow::Read, move |tls, cx| {
- tls.poll_read(cx, buf)
- })
- }
-}
-
-#[derive(Debug)]
-pub struct WriteHalf {
- shared: Arc<Shared>,
-}
-
-impl AsyncWrite for WriteHalf {
- fn poll_write(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- self
- .shared
- .poll_with_shared_waker(cx, Flow::Write, move |tls, cx| {
- tls.poll_write(cx, buf)
- })
- }
-
- fn poll_flush(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<()>> {
- self
- .shared
- .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_flush(cx))
- }
-
- fn poll_shutdown(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<()>> {
- self
- .shared
- .poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_shutdown(cx))
- }
-}
-
-#[derive(Debug)]
-struct Shared {
- tls_stream: Mutex<TlsStream>,
- rd_waker: AtomicWaker,
- wr_waker: AtomicWaker,
-}
-
-impl Shared {
- fn new(tls_stream: TlsStream) -> Arc<Self> {
- let self_ = Self {
- tls_stream: Mutex::new(tls_stream),
- rd_waker: AtomicWaker::new(),
- wr_waker: AtomicWaker::new(),
- };
- Arc::new(self_)
- }
-
- fn poll_with_shared_waker<R>(
- self: &Arc<Self>,
- cx: &mut Context<'_>,
- flow: Flow,
- mut f: impl FnMut(Pin<&mut TlsStream>, &mut Context<'_>) -> R,
- ) -> R {
- match flow {
- Flow::Read => self.rd_waker.register(cx.waker()),
- Flow::Write => self.wr_waker.register(cx.waker()),
- }
-
- let shared_waker = self.new_shared_waker();
- let mut cx = Context::from_waker(&shared_waker);
-
- let mut tls_stream = self.tls_stream.lock().unwrap();
- f(Pin::new(&mut tls_stream), &mut cx)
- }
-
- const SHARED_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
- Self::clone_shared_waker,
- Self::wake_shared_waker,
- Self::wake_shared_waker_by_ref,
- Self::drop_shared_waker,
- );
-
- fn new_shared_waker(self: &Arc<Self>) -> Waker {
- let self_weak = Arc::downgrade(self);
- let self_ptr = self_weak.into_raw() as *const ();
- let raw_waker = RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE);
- unsafe { Waker::from_raw(raw_waker) }
- }
-
- fn clone_shared_waker(self_ptr: *const ()) -> RawWaker {
- let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) };
- let ptr1 = self_weak.clone().into_raw();
- let ptr2 = self_weak.into_raw();
- assert!(ptr1 == ptr2);
- RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE)
- }
-
- fn wake_shared_waker(self_ptr: *const ()) {
- Self::wake_shared_waker_by_ref(self_ptr);
- Self::drop_shared_waker(self_ptr);
- }
-
- fn wake_shared_waker_by_ref(self_ptr: *const ()) {
- let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) };
- if let Some(self_arc) = Weak::upgrade(&self_weak) {
- self_arc.rd_waker.wake();
- self_arc.wr_waker.wake();
- }
- self_weak.into_raw();
- }
-
- fn drop_shared_waker(self_ptr: *const ()) {
- let _ = unsafe { Weak::from_raw(self_ptr as *const Self) };
- }
-}
-
-struct ImplementReadTrait<'a, T>(&'a mut T);
-
-impl Read for ImplementReadTrait<'_, TcpStream> {
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.0.try_read(buf)
- }
-}
-
-struct ImplementWriteTrait<'a, T>(&'a mut T);
-
-impl Write for ImplementWriteTrait<'_, TcpStream> {
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.0.try_write(buf)
- }
-
- fn flush(&mut self) -> io::Result<()> {
- Ok(())
- }
-}
-
-pub fn init() -> Extension {
- Extension::builder()
- .ops(vec![
- ("op_start_tls", op_async(op_start_tls)),
- ("op_connect_tls", op_async(op_connect_tls)),
- ("op_listen_tls", op_sync(op_listen_tls)),
- ("op_accept_tls", op_async(op_accept_tls)),
- ])
- .build()
-}
-
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ConnectTlsArgs {
- transport: String,
- hostname: String,
- port: u16,
- cert_file: Option<String>,
-}
-
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct StartTlsArgs {
- rid: ResourceId,
- cert_file: Option<String>,
- hostname: String,
-}
-
-async fn op_start_tls(
- state: Rc<RefCell<OpState>>,
- args: StartTlsArgs,
- _: (),
-) -> Result<OpConn, AnyError> {
- let rid = args.rid;
- let hostname = match &*args.hostname {
- "" => "localhost",
- n => n,
- };
- let cert_file = args.cert_file.as_deref();
-
- {
- super::check_unstable2(&state, "Deno.startTls");
- let mut s = state.borrow_mut();
- let permissions = s.borrow_mut::<Permissions>();
- permissions.net.check(&(hostname, Some(0)))?;
- if let Some(path) = cert_file {
- permissions.read.check(Path::new(path))?;
- }
- }
-
- let hostname_dns = DNSNameRef::try_from_ascii_str(hostname)
- .map_err(|_| invalid_hostname(hostname))?;
-
- let resource_rc = state
- .borrow_mut()
- .resource_table
- .take::<TcpStreamResource>(rid)
- .ok_or_else(bad_resource_id)?;
- let resource = Rc::try_unwrap(resource_rc)
- .expect("Only a single use of this resource should happen");
- let (read_half, write_half) = resource.into_inner();
- let tcp_stream = read_half.reunite(write_half)?;
-
- let local_addr = tcp_stream.local_addr()?;
- let remote_addr = tcp_stream.peer_addr()?;
-
- let mut tls_config = ClientConfig::new();
- tls_config.set_persistence(CLIENT_SESSION_MEMORY_CACHE.clone());
- tls_config
- .root_store
- .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
- if let Some(path) = cert_file {
- let key_file = File::open(path)?;
- let reader = &mut BufReader::new(key_file);
- tls_config.root_store.add_pem_file(reader).unwrap();
- }
- let tls_config = Arc::new(tls_config);
-
- let tls_stream =
- TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns);
-
- let rid = {
- let mut state_ = state.borrow_mut();
- state_
- .resource_table
- .add(TlsStreamResource::new(tls_stream.into_split()))
- };
-
- Ok(OpConn {
- rid,
- local_addr: Some(OpAddr::Tcp(IpAddr {
- hostname: local_addr.ip().to_string(),
- port: local_addr.port(),
- })),
- remote_addr: Some(OpAddr::Tcp(IpAddr {
- hostname: remote_addr.ip().to_string(),
- port: remote_addr.port(),
- })),
- })
-}
-
-async fn op_connect_tls(
- state: Rc<RefCell<OpState>>,
- args: ConnectTlsArgs,
- _: (),
-) -> Result<OpConn, AnyError> {
- assert_eq!(args.transport, "tcp");
- let hostname = match &*args.hostname {
- "" => "localhost",
- n => n,
- };
- let port = args.port;
- let cert_file = args.cert_file.as_deref();
-
- {
- let mut s = state.borrow_mut();
- let permissions = s.borrow_mut::<Permissions>();
- permissions.net.check(&(hostname, Some(port)))?;
- if let Some(path) = cert_file {
- permissions.read.check(Path::new(path))?;
- }
- }
-
- let hostname_dns = DNSNameRef::try_from_ascii_str(hostname)
- .map_err(|_| invalid_hostname(hostname))?;
-
- let connect_addr = resolve_addr(hostname, port)
- .await?
- .next()
- .ok_or_else(|| generic_error("No resolved address found"))?;
- let tcp_stream = TcpStream::connect(connect_addr).await?;
- let local_addr = tcp_stream.local_addr()?;
- let remote_addr = tcp_stream.peer_addr()?;
-
- let mut tls_config = ClientConfig::new();
- tls_config.set_persistence(CLIENT_SESSION_MEMORY_CACHE.clone());
- tls_config
- .root_store
- .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
- if let Some(path) = cert_file {
- let key_file = File::open(path)?;
- let reader = &mut BufReader::new(key_file);
- tls_config.root_store.add_pem_file(reader).unwrap();
- }
- let tls_config = Arc::new(tls_config);
-
- let tls_stream =
- TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns);
-
- let rid = {
- let mut state_ = state.borrow_mut();
- state_
- .resource_table
- .add(TlsStreamResource::new(tls_stream.into_split()))
- };
-
- Ok(OpConn {
- rid,
- local_addr: Some(OpAddr::Tcp(IpAddr {
- hostname: local_addr.ip().to_string(),
- port: local_addr.port(),
- })),
- remote_addr: Some(OpAddr::Tcp(IpAddr {
- hostname: remote_addr.ip().to_string(),
- port: remote_addr.port(),
- })),
- })
-}
-
-fn load_certs(path: &str) -> Result<Vec<Certificate>, AnyError> {
- let cert_file = File::open(path)?;
- let reader = &mut BufReader::new(cert_file);
-
- let certs = certs(reader)
- .map_err(|_| custom_error("InvalidData", "Unable to decode certificate"))?;
-
- if certs.is_empty() {
- let e = custom_error("InvalidData", "No certificates found in cert file");
- return Err(e);
- }
-
- Ok(certs)
-}
-
-fn key_decode_err() -> AnyError {
- custom_error("InvalidData", "Unable to decode key")
-}
-
-fn key_not_found_err() -> AnyError {
- custom_error("InvalidData", "No keys found in key file")
-}
-
-/// Starts with -----BEGIN RSA PRIVATE KEY-----
-fn load_rsa_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> {
- let key_file = File::open(path)?;
- let reader = &mut BufReader::new(key_file);
- let keys = rsa_private_keys(reader).map_err(|_| key_decode_err())?;
- Ok(keys)
-}
-
-/// Starts with -----BEGIN PRIVATE KEY-----
-fn load_pkcs8_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> {
- let key_file = File::open(path)?;
- let reader = &mut BufReader::new(key_file);
- let keys = pkcs8_private_keys(reader).map_err(|_| key_decode_err())?;
- Ok(keys)
-}
-
-fn load_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> {
- let path = path.to_string();
- let mut keys = load_rsa_keys(&path)?;
-
- if keys.is_empty() {
- keys = load_pkcs8_keys(&path)?;
- }
-
- if keys.is_empty() {
- return Err(key_not_found_err());
- }
-
- Ok(keys)
-}
-
-pub struct TlsListenerResource {
- tcp_listener: AsyncRefCell<TcpListener>,
- tls_config: Arc<ServerConfig>,
- cancel_handle: CancelHandle,
-}
-
-impl Resource for TlsListenerResource {
- fn name(&self) -> Cow<str> {
- "tlsListener".into()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel_handle.cancel();
- }
-}
-
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ListenTlsArgs {
- transport: String,
- hostname: String,
- port: u16,
- cert_file: String,
- key_file: String,
- alpn_protocols: Option<Vec<String>>,
-}
-
-fn op_listen_tls(
- state: &mut OpState,
- args: ListenTlsArgs,
- _: (),
-) -> Result<OpConn, AnyError> {
- assert_eq!(args.transport, "tcp");
- let hostname = &*args.hostname;
- let port = args.port;
- let cert_file = &*args.cert_file;
- let key_file = &*args.key_file;
-
- {
- let permissions = state.borrow_mut::<Permissions>();
- permissions.net.check(&(hostname, Some(port)))?;
- permissions.read.check(Path::new(cert_file))?;
- permissions.read.check(Path::new(key_file))?;
- }
-
- let mut tls_config = ServerConfig::new(NoClientAuth::new());
- if let Some(alpn_protocols) = args.alpn_protocols {
- super::check_unstable(state, "Deno.listenTls#alpn_protocols");
- tls_config.alpn_protocols =
- alpn_protocols.into_iter().map(|s| s.into_bytes()).collect();
- }
- tls_config
- .set_single_cert(load_certs(cert_file)?, load_keys(key_file)?.remove(0))
- .expect("invalid key or certificate");
-
- let bind_addr = resolve_addr_sync(hostname, port)?
- .next()
- .ok_or_else(|| generic_error("No resolved address found"))?;
- let std_listener = std::net::TcpListener::bind(bind_addr)?;
- std_listener.set_nonblocking(true)?;
- let tcp_listener = TcpListener::from_std(std_listener)?;
- let local_addr = tcp_listener.local_addr()?;
-
- let tls_listener_resource = TlsListenerResource {
- tcp_listener: AsyncRefCell::new(tcp_listener),
- tls_config: Arc::new(tls_config),
- cancel_handle: Default::default(),
- };
-
- let rid = state.resource_table.add(tls_listener_resource);
-
- Ok(OpConn {
- rid,
- local_addr: Some(OpAddr::Tcp(IpAddr {
- hostname: local_addr.ip().to_string(),
- port: local_addr.port(),
- })),
- remote_addr: None,
- })
-}
-
-async fn op_accept_tls(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- _: (),
-) -> Result<OpConn, AnyError> {
- let resource = state
- .borrow()
- .resource_table
- .get::<TlsListenerResource>(rid)
- .ok_or_else(|| bad_resource("Listener has been closed"))?;
-
- let cancel_handle = RcRef::map(&resource, |r| &r.cancel_handle);
- let tcp_listener = RcRef::map(&resource, |r| &r.tcp_listener)
- .try_borrow_mut()
- .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
-
- let (tcp_stream, remote_addr) =
- match tcp_listener.accept().try_or_cancel(&cancel_handle).await {
- Ok(tuple) => tuple,
- Err(err) if err.kind() == ErrorKind::Interrupted => {
- // FIXME(bartlomieju): compatibility with current JS implementation.
- return Err(bad_resource("Listener has been closed"));
- }
- Err(err) => return Err(err.into()),
- };
-
- let local_addr = tcp_stream.local_addr()?;
-
- let tls_stream = TlsStream::new_server_side(tcp_stream, &resource.tls_config);
-
- let rid = {
- let mut state_ = state.borrow_mut();
- state_
- .resource_table
- .add(TlsStreamResource::new(tls_stream.into_split()))
- };
-
- Ok(OpConn {
- rid,
- local_addr: Some(OpAddr::Tcp(IpAddr {
- hostname: local_addr.ip().to_string(),
- port: local_addr.port(),
- })),
- remote_addr: Some(OpAddr::Tcp(IpAddr {
- hostname: remote_addr.ip().to_string(),
- port: remote_addr.port(),
- })),
- })
-}