summaryrefslogtreecommitdiff
path: root/runtime/ops/net.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-12-16 17:14:12 +0100
committerGitHub <noreply@github.com>2020-12-16 17:14:12 +0100
commit6984b63f2f3c8d0819fe2dced8252a81f3400ae7 (patch)
tree5201bc962f913927409ae2770aca48ffa3aaaa34 /runtime/ops/net.rs
parent9fe26f8ca189ac81d9c20c454b9dbfa5e1011c3f (diff)
refactor: rewrite ops to use ResourceTable2 (#8512)
This commit migrates all ops to use new resource table and "AsyncRefCell". Old implementation of resource table was completely removed and all code referencing it was updated to use new system.
Diffstat (limited to 'runtime/ops/net.rs')
-rw-r--r--runtime/ops/net.rs255
1 files changed, 109 insertions, 146 deletions
diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs
index 8770ef103..a4bda585b 100644
--- a/runtime/ops/net.rs
+++ b/runtime/ops/net.rs
@@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use crate::ops::io::StreamResource;
-use crate::ops::io::StreamResourceHolder;
+use crate::ops::io::FullDuplexResource;
+use crate::ops::io::TcpStreamResource;
use crate::permissions::Permissions;
use crate::resolve_addr::resolve_addr;
use crate::resolve_addr::resolve_addr_sync;
@@ -11,21 +11,24 @@ use deno_core::error::custom_error;
use deno_core::error::generic_error;
use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::futures;
-use deno_core::futures::future::poll_fn;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use serde::Deserialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::net::Shutdown;
use std::net::SocketAddr;
use std::rc::Rc;
-use std::task::Context;
-use std::task::Poll;
+use tokio::net::udp;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
@@ -33,12 +36,14 @@ use tokio::net::UdpSocket;
#[cfg(unix)]
use super::net_unix;
#[cfg(unix)]
+use crate::ops::io::StreamResource;
+#[cfg(unix)]
use std::path::Path;
pub fn init(rt: &mut deno_core::JsRuntime) {
super::reg_json_async(rt, "op_accept", op_accept);
super::reg_json_async(rt, "op_connect", op_connect);
- super::reg_json_sync(rt, "op_shutdown", op_shutdown);
+ super::reg_json_async(rt, "op_shutdown", op_shutdown);
super::reg_json_sync(rt, "op_listen", op_listen);
super::reg_json_async(rt, "op_datagram_receive", op_datagram_receive);
super::reg_json_async(rt, "op_datagram_send", op_datagram_send);
@@ -57,39 +62,31 @@ async fn accept_tcp(
) -> Result<Value, AnyError> {
let rid = args.rid as u32;
- let accept_fut = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let listener_resource = state
- .resource_table
- .get_mut::<TcpListenerResource>(rid)
- .ok_or_else(|| bad_resource("Listener has been closed"))?;
- let listener = &mut listener_resource.listener;
- match listener.poll_accept(cx).map_err(AnyError::from) {
- Poll::Ready(Ok((stream, addr))) => {
- listener_resource.untrack_task();
- Poll::Ready(Ok((stream, addr)))
- }
- Poll::Pending => {
- listener_resource.track_task(cx)?;
- Poll::Pending
- }
- Poll::Ready(Err(e)) => {
- listener_resource.untrack_task();
- Poll::Ready(Err(e))
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<TcpListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let mut listener = RcRef::map(&resource, |r| &r.listener)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (tcp_stream, _socket_addr) =
+ listener.accept().try_or_cancel(cancel).await.map_err(|e| {
+ // FIXME(bartlomieju): compatibility with current JS implementation
+ if let std::io::ErrorKind::Interrupted = e.kind() {
+ bad_resource("Listener has been closed")
+ } else {
+ e.into()
}
- }
- });
- let (tcp_stream, _socket_addr) = accept_fut.await?;
+ })?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state = state.borrow_mut();
- let rid = state.resource_table.add(
- "tcpStream",
- Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
- tcp_stream,
- )))),
- );
+ let rid = state
+ .resource_table
+ .add(TcpStreamResource::new(tcp_stream.into_split()));
Ok(json!({
"rid": rid,
"localAddr": {
@@ -138,18 +135,17 @@ async fn receive_udp(
let rid = args.rid as u32;
- let receive_fut = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let resource = state
- .resource_table
- .get_mut::<UdpSocketResource>(rid)
- .ok_or_else(|| bad_resource("Socket has been closed"))?;
- let socket = &mut resource.socket;
- socket
- .poll_recv_from(cx, &mut zero_copy)
- .map_err(AnyError::from)
- });
- let (size, remote_addr) = receive_fut.await?;
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid)
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
+ let (size, remote_addr) = resource
+ .rd_borrow_mut()
+ .await
+ .recv_from(&mut zero_copy)
+ .try_or_cancel(resource.cancel_handle())
+ .await?;
Ok(json!({
"size": size,
"remoteAddr": {
@@ -207,19 +203,18 @@ async fn op_datagram_send(
.check_net(&args.hostname, args.port)?;
}
let addr = resolve_addr(&args.hostname, args.port).await?;
- poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- let resource = state
- .resource_table
- .get_mut::<UdpSocketResource>(rid as u32)
- .ok_or_else(|| bad_resource("Socket has been closed"))?;
- resource
- .socket
- .poll_send_to(cx, &zero_copy, &addr)
- .map_ok(|byte_length| json!(byte_length))
- .map_err(AnyError::from)
- })
- .await
+
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid as u32)
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
+ let byte_length = resource
+ .wr_borrow_mut()
+ .await
+ .send_to(&zero_copy, &addr)
+ .await?;
+ Ok(json!(byte_length))
}
#[cfg(unix)]
SendArgs {
@@ -232,18 +227,17 @@ async fn op_datagram_send(
let s = state.borrow();
s.borrow::<Permissions>().check_write(&address_path)?;
}
- let mut state = state.borrow_mut();
let resource = state
+ .borrow()
.resource_table
- .get_mut::<net_unix::UnixDatagramResource>(rid as u32)
+ .get::<net_unix::UnixDatagramResource>(rid as u32)
.ok_or_else(|| {
custom_error("NotConnected", "Socket has been closed")
})?;
- let socket = &mut resource.socket;
- let byte_length = socket
- .send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap())
- .await?;
-
+ let mut socket = RcRef::map(&resource, |r| &r.socket)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
+ let byte_length = socket.send_to(&zero_copy, address_path).await?;
Ok(json!(byte_length))
}
_ => Err(type_error("Wrong argument format!")),
@@ -279,12 +273,9 @@ async fn op_connect(
let remote_addr = tcp_stream.peer_addr()?;
let mut state_ = state.borrow_mut();
- let rid = state_.resource_table.add(
- "tcpStream",
- Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
- tcp_stream,
- )))),
- );
+ let rid = state_
+ .resource_table
+ .add(TcpStreamResource::new(tcp_stream.into_split()));
Ok(json!({
"rid": rid,
"localAddr": {
@@ -317,12 +308,8 @@ async fn op_connect(
let remote_addr = unix_stream.peer_addr()?;
let mut state_ = state.borrow_mut();
- let rid = state_.resource_table.add(
- "unixStream",
- Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
- unix_stream,
- ))),
- );
+ let resource = StreamResource::unix_stream(unix_stream);
+ let rid = state_.resource_table.add(resource);
Ok(json!({
"rid": rid,
"localAddr": {
@@ -345,12 +332,12 @@ struct ShutdownArgs {
how: i32,
}
-fn op_shutdown(
- state: &mut OpState,
+async fn op_shutdown(
+ state: Rc<RefCell<OpState>>,
args: Value,
- _zero_copy: &mut [ZeroCopyBuf],
+ _zero_copy: BufVec,
) -> Result<Value, AnyError> {
- super::check_unstable(state, "Deno.shutdown");
+ super::check_unstable2(&state, "Deno.shutdown");
let args: ShutdownArgs = serde_json::from_value(args)?;
@@ -358,80 +345,61 @@ fn op_shutdown(
let how = args.how;
let shutdown_mode = match how {
- 0 => Shutdown::Read,
+ 0 => Shutdown::Read, // TODO: nonsense, remove me.
1 => Shutdown::Write,
_ => unimplemented!(),
};
- let resource_holder = state
+ let resource = state
+ .borrow()
.resource_table
- .get_mut::<StreamResourceHolder>(rid)
+ .get_any(rid)
.ok_or_else(bad_resource_id)?;
- match resource_holder.resource {
- StreamResource::TcpStream(Some(ref mut stream)) => {
- TcpStream::shutdown(stream, shutdown_mode)?;
- }
- #[cfg(unix)]
- StreamResource::UnixStream(ref mut stream) => {
- net_unix::UnixStream::shutdown(stream, shutdown_mode)?;
+ if let Some(stream) = resource.downcast_rc::<TcpStreamResource>() {
+ let wr = stream.wr_borrow_mut().await;
+ TcpStream::shutdown((*wr).as_ref(), shutdown_mode)?;
+ return Ok(json!({}));
+ }
+
+ #[cfg(unix)]
+ if let Some(stream) = resource.downcast_rc::<StreamResource>() {
+ if stream.unix_stream.is_some() {
+ let wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ net_unix::UnixStream::shutdown(&*wr, shutdown_mode)?;
+ return Ok(json!({}));
}
- _ => return Err(bad_resource_id()),
}
- Ok(json!({}))
+ Err(bad_resource_id())
}
-#[allow(dead_code)]
struct TcpListenerResource {
- listener: TcpListener,
- waker: Option<futures::task::AtomicWaker>,
- local_addr: SocketAddr,
+ listener: AsyncRefCell<TcpListener>,
+ cancel: CancelHandle,
}
-impl Drop for TcpListenerResource {
- fn drop(&mut self) {
- self.wake_task();
+impl Resource for TcpListenerResource {
+ fn name(&self) -> Cow<str> {
+ "tcpListener".into()
}
-}
-
-impl TcpListenerResource {
- /// Track the current task so future awaiting for connection
- /// can be notified when listener is closed.
- ///
- /// Throws an error if another task is already tracked.
- pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> {
- // Currently, we only allow tracking a single accept task for a listener.
- // This might be changed in the future with multiple workers.
- // Caveat: TcpListener by itself also only tracks an accept task at a time.
- // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
- if self.waker.is_some() {
- return Err(custom_error("Busy", "Another accept task is ongoing"));
- }
- let waker = futures::task::AtomicWaker::new();
- waker.register(cx.waker());
- self.waker.replace(waker);
- Ok(())
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
}
+}
- /// Notifies a task when listener is closed so accept future can resolve.
- pub fn wake_task(&mut self) {
- if let Some(waker) = self.waker.as_ref() {
- waker.wake();
- }
- }
+type UdpSocketResource = FullDuplexResource<udp::RecvHalf, udp::SendHalf>;
- /// Stop tracking a task.
- /// Happens when the task is done and thus no further tracking is needed.
- pub fn untrack_task(&mut self) {
- if self.waker.is_some() {
- self.waker.take();
- }
+impl Resource for UdpSocketResource {
+ fn name(&self) -> Cow<str> {
+ "udpSocket".into()
}
-}
-struct UdpSocketResource {
- socket: UdpSocket,
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops()
+ }
}
#[derive(Deserialize)]
@@ -463,13 +431,10 @@ fn listen_tcp(
let listener = TcpListener::from_std(std_listener)?;
let local_addr = listener.local_addr()?;
let listener_resource = TcpListenerResource {
- listener,
- waker: None,
- local_addr,
+ listener: AsyncRefCell::new(listener),
+ cancel: Default::default(),
};
- let rid = state
- .resource_table
- .add("tcpListener", Box::new(listener_resource));
+ let rid = state.resource_table.add(listener_resource);
Ok((rid, local_addr))
}
@@ -481,10 +446,8 @@ fn listen_udp(
let std_socket = std::net::UdpSocket::bind(&addr)?;
let socket = UdpSocket::from_std(std_socket)?;
let local_addr = socket.local_addr()?;
- let socket_resource = UdpSocketResource { socket };
- let rid = state
- .resource_table
- .add("udpSocket", Box::new(socket_resource));
+ let socket_resource = UdpSocketResource::new(socket.split());
+ let rid = state.resource_table.add(socket_resource);
Ok((rid, local_addr))
}