diff options
Diffstat (limited to 'runtime/ops/net_unix.rs')
-rw-r--r-- | runtime/ops/net_unix.rs | 106 |
1 files changed, 60 insertions, 46 deletions
diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs index 4c416a5a4..23981a7f1 100644 --- a/runtime/ops/net_unix.rs +++ b/runtime/ops/net_unix.rs @@ -1,34 +1,59 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::ops::io::StreamResource; -use crate::ops::io::StreamResourceHolder; use crate::ops::net::AcceptArgs; use crate::ops::net::ReceiveArgs; use deno_core::error::bad_resource; +use deno_core::error::custom_error; use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; use deno_core::serde_json::json; use deno_core::serde_json::Value; +use deno_core::AsyncRefCell; use deno_core::BufVec; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use serde::Deserialize; +use std::borrow::Cow; use std::cell::RefCell; use std::fs::remove_file; use std::os::unix; use std::path::Path; use std::rc::Rc; -use std::task::Poll; use tokio::net::UnixDatagram; use tokio::net::UnixListener; pub use tokio::net::UnixStream; struct UnixListenerResource { - listener: UnixListener, + listener: AsyncRefCell<UnixListener>, + cancel: CancelHandle, +} + +impl Resource for UnixListenerResource { + fn name(&self) -> Cow<str> { + "unixListener".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } } pub struct UnixDatagramResource { - pub socket: UnixDatagram, - pub local_addr: unix::net::SocketAddr, + pub socket: AsyncRefCell<UnixDatagram>, + pub cancel: CancelHandle, +} + +impl Resource for UnixDatagramResource { + fn name(&self) -> Cow<str> { + "unixDatagram".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } } #[derive(Deserialize)] @@ -43,38 +68,23 @@ pub(crate) async fn accept_unix( ) -> Result<Value, AnyError> { let rid = args.rid as u32; - let accept_fut = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let listener_resource = state - .resource_table - .get_mut::<UnixListenerResource>(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - let listener = &mut listener_resource.listener; - use deno_core::futures::StreamExt; - match listener.poll_next_unpin(cx) { - Poll::Ready(Some(stream)) => { - //listener_resource.untrack_task(); - Poll::Ready(stream) - } - Poll::Ready(None) => todo!(), - Poll::Pending => { - //listener_resource.track_task(cx)?; - Poll::Pending - } - } - .map_err(AnyError::from) - }); - let unix_stream = accept_fut.await?; + let resource = state + .borrow() + .resource_table + .get::<UnixListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let mut listener = RcRef::map(&resource, |r| &r.listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Listener already in use"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (unix_stream, _socket_addr) = + listener.accept().try_or_cancel(cancel).await?; let local_addr = unix_stream.local_addr()?; let remote_addr = unix_stream.peer_addr()?; + let resource = StreamResource::unix_stream(unix_stream); let mut state = state.borrow_mut(); - let rid = state.resource_table.add( - "unixStream", - Box::new(StreamResourceHolder::new(StreamResource::UnixStream( - unix_stream, - ))), - ); + let rid = state.resource_table.add(resource); Ok(json!({ "rid": rid, "localAddr": { @@ -98,12 +108,17 @@ pub(crate) async fn receive_unix_packet( let rid = args.rid as u32; let mut buf = bufs.into_iter().next().unwrap(); - let mut state = state.borrow_mut(); let resource = state + .borrow() .resource_table - .get_mut::<UnixDatagramResource>(rid) + .get::<UnixDatagramResource>(rid) .ok_or_else(|| bad_resource("Socket has been closed"))?; - let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?; + let mut socket = RcRef::map(&resource, |r| &r.socket) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (size, remote_addr) = + socket.recv_from(&mut buf).try_or_cancel(cancel).await?; Ok(json!({ "size": size, "remoteAddr": { @@ -122,10 +137,11 @@ pub fn listen_unix( } let listener = UnixListener::bind(&addr)?; let local_addr = listener.local_addr()?; - let listener_resource = UnixListenerResource { listener }; - let rid = state - .resource_table - .add("unixListener", Box::new(listener_resource)); + let listener_resource = UnixListenerResource { + listener: AsyncRefCell::new(listener), + cancel: Default::default(), + }; + let rid = state.resource_table.add(listener_resource); Ok((rid, local_addr)) } @@ -140,12 +156,10 @@ pub fn listen_unix_packet( let socket = UnixDatagram::bind(&addr)?; let local_addr = socket.local_addr()?; let datagram_resource = UnixDatagramResource { - socket, - local_addr: local_addr.clone(), + socket: AsyncRefCell::new(socket), + cancel: Default::default(), }; - let rid = state - .resource_table - .add("unixDatagram", Box::new(datagram_resource)); + let rid = state.resource_table.add(datagram_resource); Ok((rid, local_addr)) } |