diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-12-16 17:14:12 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-16 17:14:12 +0100 |
commit | 6984b63f2f3c8d0819fe2dced8252a81f3400ae7 (patch) | |
tree | 5201bc962f913927409ae2770aca48ffa3aaaa34 /runtime/ops/net_unix.rs | |
parent | 9fe26f8ca189ac81d9c20c454b9dbfa5e1011c3f (diff) |
refactor: rewrite ops to use ResourceTable2 (#8512)
This commit migrates all ops to use new resource table
and "AsyncRefCell".
Old implementation of resource table was completely
removed and all code referencing it was updated to use
new system.
Diffstat (limited to 'runtime/ops/net_unix.rs')
-rw-r--r-- | runtime/ops/net_unix.rs | 106 |
1 files changed, 60 insertions, 46 deletions
diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs index 4c416a5a4..23981a7f1 100644 --- a/runtime/ops/net_unix.rs +++ b/runtime/ops/net_unix.rs @@ -1,34 +1,59 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::ops::io::StreamResource; -use crate::ops::io::StreamResourceHolder; use crate::ops::net::AcceptArgs; use crate::ops::net::ReceiveArgs; use deno_core::error::bad_resource; +use deno_core::error::custom_error; use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; use deno_core::serde_json::json; use deno_core::serde_json::Value; +use deno_core::AsyncRefCell; use deno_core::BufVec; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use serde::Deserialize; +use std::borrow::Cow; use std::cell::RefCell; use std::fs::remove_file; use std::os::unix; use std::path::Path; use std::rc::Rc; -use std::task::Poll; use tokio::net::UnixDatagram; use tokio::net::UnixListener; pub use tokio::net::UnixStream; struct UnixListenerResource { - listener: UnixListener, + listener: AsyncRefCell<UnixListener>, + cancel: CancelHandle, +} + +impl Resource for UnixListenerResource { + fn name(&self) -> Cow<str> { + "unixListener".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } } pub struct UnixDatagramResource { - pub socket: UnixDatagram, - pub local_addr: unix::net::SocketAddr, + pub socket: AsyncRefCell<UnixDatagram>, + pub cancel: CancelHandle, +} + +impl Resource for UnixDatagramResource { + fn name(&self) -> Cow<str> { + "unixDatagram".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } } #[derive(Deserialize)] @@ -43,38 +68,23 @@ pub(crate) async fn accept_unix( ) -> Result<Value, AnyError> { let rid = args.rid as u32; - let accept_fut = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let listener_resource = state - .resource_table - .get_mut::<UnixListenerResource>(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - let listener = &mut listener_resource.listener; - use deno_core::futures::StreamExt; - match listener.poll_next_unpin(cx) { - Poll::Ready(Some(stream)) => { - //listener_resource.untrack_task(); - Poll::Ready(stream) - } - Poll::Ready(None) => todo!(), - Poll::Pending => { - //listener_resource.track_task(cx)?; - Poll::Pending - } - } - .map_err(AnyError::from) - }); - let unix_stream = accept_fut.await?; + let resource = state + .borrow() + .resource_table + .get::<UnixListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let mut listener = RcRef::map(&resource, |r| &r.listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Listener already in use"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (unix_stream, _socket_addr) = + listener.accept().try_or_cancel(cancel).await?; let local_addr = unix_stream.local_addr()?; let remote_addr = unix_stream.peer_addr()?; + let resource = StreamResource::unix_stream(unix_stream); let mut state = state.borrow_mut(); - let rid = state.resource_table.add( - "unixStream", - Box::new(StreamResourceHolder::new(StreamResource::UnixStream( - unix_stream, - ))), - ); + let rid = state.resource_table.add(resource); Ok(json!({ "rid": rid, "localAddr": { @@ -98,12 +108,17 @@ pub(crate) async fn receive_unix_packet( let rid = args.rid as u32; let mut buf = bufs.into_iter().next().unwrap(); - let mut state = state.borrow_mut(); let resource = state + .borrow() .resource_table - .get_mut::<UnixDatagramResource>(rid) + .get::<UnixDatagramResource>(rid) .ok_or_else(|| bad_resource("Socket has been closed"))?; - let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?; + let mut socket = RcRef::map(&resource, |r| &r.socket) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (size, remote_addr) = + socket.recv_from(&mut buf).try_or_cancel(cancel).await?; Ok(json!({ "size": size, "remoteAddr": { @@ -122,10 +137,11 @@ pub fn listen_unix( } let listener = UnixListener::bind(&addr)?; let local_addr = listener.local_addr()?; - let listener_resource = UnixListenerResource { listener }; - let rid = state - .resource_table - .add("unixListener", Box::new(listener_resource)); + let listener_resource = UnixListenerResource { + listener: AsyncRefCell::new(listener), + cancel: Default::default(), + }; + let rid = state.resource_table.add(listener_resource); Ok((rid, local_addr)) } @@ -140,12 +156,10 @@ pub fn listen_unix_packet( let socket = UnixDatagram::bind(&addr)?; let local_addr = socket.local_addr()?; let datagram_resource = UnixDatagramResource { - socket, - local_addr: local_addr.clone(), + socket: AsyncRefCell::new(socket), + cancel: Default::default(), }; - let rid = state - .resource_table - .add("unixDatagram", Box::new(datagram_resource)); + let rid = state.resource_table.add(datagram_resource); Ok((rid, local_addr)) } |