diff options
Diffstat (limited to 'runtime/ops/net.rs')
-rw-r--r-- | runtime/ops/net.rs | 255 |
1 files changed, 109 insertions, 146 deletions
diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs index 8770ef103..a4bda585b 100644 --- a/runtime/ops/net.rs +++ b/runtime/ops/net.rs @@ -1,7 +1,7 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use crate::ops::io::StreamResource; -use crate::ops::io::StreamResourceHolder; +use crate::ops::io::FullDuplexResource; +use crate::ops::io::TcpStreamResource; use crate::permissions::Permissions; use crate::resolve_addr::resolve_addr; use crate::resolve_addr::resolve_addr_sync; @@ -11,21 +11,24 @@ use deno_core::error::custom_error; use deno_core::error::generic_error; use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures; -use deno_core::futures::future::poll_fn; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; +use deno_core::AsyncRefCell; use deno_core::BufVec; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::ZeroCopyBuf; use serde::Deserialize; +use std::borrow::Cow; use std::cell::RefCell; use std::net::Shutdown; use std::net::SocketAddr; use std::rc::Rc; -use std::task::Context; -use std::task::Poll; +use tokio::net::udp; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::net::UdpSocket; @@ -33,12 +36,14 @@ use tokio::net::UdpSocket; #[cfg(unix)] use super::net_unix; #[cfg(unix)] +use crate::ops::io::StreamResource; +#[cfg(unix)] use std::path::Path; pub fn init(rt: &mut deno_core::JsRuntime) { super::reg_json_async(rt, "op_accept", op_accept); super::reg_json_async(rt, "op_connect", op_connect); - super::reg_json_sync(rt, "op_shutdown", op_shutdown); + super::reg_json_async(rt, "op_shutdown", op_shutdown); super::reg_json_sync(rt, "op_listen", op_listen); super::reg_json_async(rt, "op_datagram_receive", op_datagram_receive); super::reg_json_async(rt, "op_datagram_send", op_datagram_send); @@ -57,39 +62,31 @@ async fn accept_tcp( ) -> Result<Value, AnyError> { let rid = args.rid as u32; - let accept_fut = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let listener_resource = state - .resource_table - .get_mut::<TcpListenerResource>(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - let listener = &mut listener_resource.listener; - match listener.poll_accept(cx).map_err(AnyError::from) { - Poll::Ready(Ok((stream, addr))) => { - listener_resource.untrack_task(); - Poll::Ready(Ok((stream, addr))) - } - Poll::Pending => { - listener_resource.track_task(cx)?; - Poll::Pending - } - Poll::Ready(Err(e)) => { - listener_resource.untrack_task(); - Poll::Ready(Err(e)) + let resource = state + .borrow() + .resource_table + .get::<TcpListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let mut listener = RcRef::map(&resource, |r| &r.listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (tcp_stream, _socket_addr) = + listener.accept().try_or_cancel(cancel).await.map_err(|e| { + // FIXME(bartlomieju): compatibility with current JS implementation + if let std::io::ErrorKind::Interrupted = e.kind() { + bad_resource("Listener has been closed") + } else { + e.into() } - } - }); - let (tcp_stream, _socket_addr) = accept_fut.await?; + })?; let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; let mut state = state.borrow_mut(); - let rid = state.resource_table.add( - "tcpStream", - Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( - tcp_stream, - )))), - ); + let rid = state + .resource_table + .add(TcpStreamResource::new(tcp_stream.into_split())); Ok(json!({ "rid": rid, "localAddr": { @@ -138,18 +135,17 @@ async fn receive_udp( let rid = args.rid as u32; - let receive_fut = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let resource = state - .resource_table - .get_mut::<UdpSocketResource>(rid) - .ok_or_else(|| bad_resource("Socket has been closed"))?; - let socket = &mut resource.socket; - socket - .poll_recv_from(cx, &mut zero_copy) - .map_err(AnyError::from) - }); - let (size, remote_addr) = receive_fut.await?; + let resource = state + .borrow_mut() + .resource_table + .get::<UdpSocketResource>(rid) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + let (size, remote_addr) = resource + .rd_borrow_mut() + .await + .recv_from(&mut zero_copy) + .try_or_cancel(resource.cancel_handle()) + .await?; Ok(json!({ "size": size, "remoteAddr": { @@ -207,19 +203,18 @@ async fn op_datagram_send( .check_net(&args.hostname, args.port)?; } let addr = resolve_addr(&args.hostname, args.port).await?; - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let resource = state - .resource_table - .get_mut::<UdpSocketResource>(rid as u32) - .ok_or_else(|| bad_resource("Socket has been closed"))?; - resource - .socket - .poll_send_to(cx, &zero_copy, &addr) - .map_ok(|byte_length| json!(byte_length)) - .map_err(AnyError::from) - }) - .await + + let resource = state + .borrow_mut() + .resource_table + .get::<UdpSocketResource>(rid as u32) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + let byte_length = resource + .wr_borrow_mut() + .await + .send_to(&zero_copy, &addr) + .await?; + Ok(json!(byte_length)) } #[cfg(unix)] SendArgs { @@ -232,18 +227,17 @@ async fn op_datagram_send( let s = state.borrow(); s.borrow::<Permissions>().check_write(&address_path)?; } - let mut state = state.borrow_mut(); let resource = state + .borrow() .resource_table - .get_mut::<net_unix::UnixDatagramResource>(rid as u32) + .get::<net_unix::UnixDatagramResource>(rid as u32) .ok_or_else(|| { custom_error("NotConnected", "Socket has been closed") })?; - let socket = &mut resource.socket; - let byte_length = socket - .send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap()) - .await?; - + let mut socket = RcRef::map(&resource, |r| &r.socket) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; + let byte_length = socket.send_to(&zero_copy, address_path).await?; Ok(json!(byte_length)) } _ => Err(type_error("Wrong argument format!")), @@ -279,12 +273,9 @@ async fn op_connect( let remote_addr = tcp_stream.peer_addr()?; let mut state_ = state.borrow_mut(); - let rid = state_.resource_table.add( - "tcpStream", - Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( - tcp_stream, - )))), - ); + let rid = state_ + .resource_table + .add(TcpStreamResource::new(tcp_stream.into_split())); Ok(json!({ "rid": rid, "localAddr": { @@ -317,12 +308,8 @@ async fn op_connect( let remote_addr = unix_stream.peer_addr()?; let mut state_ = state.borrow_mut(); - let rid = state_.resource_table.add( - "unixStream", - Box::new(StreamResourceHolder::new(StreamResource::UnixStream( - unix_stream, - ))), - ); + let resource = StreamResource::unix_stream(unix_stream); + let rid = state_.resource_table.add(resource); Ok(json!({ "rid": rid, "localAddr": { @@ -345,12 +332,12 @@ struct ShutdownArgs { how: i32, } -fn op_shutdown( - state: &mut OpState, +async fn op_shutdown( + state: Rc<RefCell<OpState>>, args: Value, - _zero_copy: &mut [ZeroCopyBuf], + _zero_copy: BufVec, ) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.shutdown"); + super::check_unstable2(&state, "Deno.shutdown"); let args: ShutdownArgs = serde_json::from_value(args)?; @@ -358,80 +345,61 @@ fn op_shutdown( let how = args.how; let shutdown_mode = match how { - 0 => Shutdown::Read, + 0 => Shutdown::Read, // TODO: nonsense, remove me. 1 => Shutdown::Write, _ => unimplemented!(), }; - let resource_holder = state + let resource = state + .borrow() .resource_table - .get_mut::<StreamResourceHolder>(rid) + .get_any(rid) .ok_or_else(bad_resource_id)?; - match resource_holder.resource { - StreamResource::TcpStream(Some(ref mut stream)) => { - TcpStream::shutdown(stream, shutdown_mode)?; - } - #[cfg(unix)] - StreamResource::UnixStream(ref mut stream) => { - net_unix::UnixStream::shutdown(stream, shutdown_mode)?; + if let Some(stream) = resource.downcast_rc::<TcpStreamResource>() { + let wr = stream.wr_borrow_mut().await; + TcpStream::shutdown((*wr).as_ref(), shutdown_mode)?; + return Ok(json!({})); + } + + #[cfg(unix)] + if let Some(stream) = resource.downcast_rc::<StreamResource>() { + if stream.unix_stream.is_some() { + let wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap()) + .borrow_mut() + .await; + net_unix::UnixStream::shutdown(&*wr, shutdown_mode)?; + return Ok(json!({})); } - _ => return Err(bad_resource_id()), } - Ok(json!({})) + Err(bad_resource_id()) } -#[allow(dead_code)] struct TcpListenerResource { - listener: TcpListener, - waker: Option<futures::task::AtomicWaker>, - local_addr: SocketAddr, + listener: AsyncRefCell<TcpListener>, + cancel: CancelHandle, } -impl Drop for TcpListenerResource { - fn drop(&mut self) { - self.wake_task(); +impl Resource for TcpListenerResource { + fn name(&self) -> Cow<str> { + "tcpListener".into() } -} - -impl TcpListenerResource { - /// Track the current task so future awaiting for connection - /// can be notified when listener is closed. - /// - /// Throws an error if another task is already tracked. - pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> { - // Currently, we only allow tracking a single accept task for a listener. - // This might be changed in the future with multiple workers. - // Caveat: TcpListener by itself also only tracks an accept task at a time. - // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 - if self.waker.is_some() { - return Err(custom_error("Busy", "Another accept task is ongoing")); - } - let waker = futures::task::AtomicWaker::new(); - waker.register(cx.waker()); - self.waker.replace(waker); - Ok(()) + fn close(self: Rc<Self>) { + self.cancel.cancel(); } +} - /// Notifies a task when listener is closed so accept future can resolve. - pub fn wake_task(&mut self) { - if let Some(waker) = self.waker.as_ref() { - waker.wake(); - } - } +type UdpSocketResource = FullDuplexResource<udp::RecvHalf, udp::SendHalf>; - /// Stop tracking a task. - /// Happens when the task is done and thus no further tracking is needed. - pub fn untrack_task(&mut self) { - if self.waker.is_some() { - self.waker.take(); - } +impl Resource for UdpSocketResource { + fn name(&self) -> Cow<str> { + "udpSocket".into() } -} -struct UdpSocketResource { - socket: UdpSocket, + fn close(self: Rc<Self>) { + self.cancel_read_ops() + } } #[derive(Deserialize)] @@ -463,13 +431,10 @@ fn listen_tcp( let listener = TcpListener::from_std(std_listener)?; let local_addr = listener.local_addr()?; let listener_resource = TcpListenerResource { - listener, - waker: None, - local_addr, + listener: AsyncRefCell::new(listener), + cancel: Default::default(), }; - let rid = state - .resource_table - .add("tcpListener", Box::new(listener_resource)); + let rid = state.resource_table.add(listener_resource); Ok((rid, local_addr)) } @@ -481,10 +446,8 @@ fn listen_udp( let std_socket = std::net::UdpSocket::bind(&addr)?; let socket = UdpSocket::from_std(std_socket)?; let local_addr = socket.local_addr()?; - let socket_resource = UdpSocketResource { socket }; - let rid = state - .resource_table - .add("udpSocket", Box::new(socket_resource)); + let socket_resource = UdpSocketResource::new(socket.split()); + let rid = state.resource_table.add(socket_resource); Ok((rid, local_addr)) } |