diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2020-09-10 09:57:45 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-10 09:57:45 -0400 |
commit | 7c2e7c660804afca823d60e6496aa853f75db16c (patch) | |
tree | b7746b181c1564c6b1abd2e906662f9e6b008417 /cli/ops/net_unix.rs | |
parent | 6f70e6e72ba2d5c1de7495adac37c1e4f4e86b24 (diff) |
Use gotham-like state for ops (#7385)
Provides a concrete state type that can be dynamically added. This is necessary for op crates.
* renames BasicState to OpState
* async ops take `Rc<RefCell<OpState>>`
* sync ops take `&mut OpState`
* removes `OpRegistry`, `OpRouter` traits
* `get_error_class_fn` moved to OpState
* ResourceTable moved to OpState
Diffstat (limited to 'cli/ops/net_unix.rs')
-rw-r--r-- | cli/ops/net_unix.rs | 52 |
1 files changed, 34 insertions, 18 deletions
diff --git a/cli/ops/net_unix.rs b/cli/ops/net_unix.rs index a73db89b2..5cdb451dc 100644 --- a/cli/ops/net_unix.rs +++ b/cli/ops/net_unix.rs @@ -2,15 +2,18 @@ use crate::ops::io::StreamResource; use crate::ops::io::StreamResourceHolder; use crate::ops::net::AcceptArgs; use crate::ops::net::ReceiveArgs; -use crate::state::State; use deno_core::BufVec; use deno_core::ErrBox; +use deno_core::OpState; +use futures::future::poll_fn; use serde_derive::Deserialize; use serde_json::Value; +use std::cell::RefCell; use std::fs::remove_file; use std::os::unix; pub use std::path::Path; use std::rc::Rc; +use std::task::Poll; use tokio::net::UnixDatagram; use tokio::net::UnixListener; pub use tokio::net::UnixStream; @@ -30,25 +33,39 @@ pub struct UnixListenArgs { } pub(crate) async fn accept_unix( - state: Rc<State>, + state: Rc<RefCell<OpState>>, args: AcceptArgs, _bufs: BufVec, ) -> Result<Value, ErrBox> { let rid = args.rid as u32; - let mut resource_table_ = state.resource_table.borrow_mut(); - let listener_resource = { - resource_table_ + let accept_fut = poll_fn(|cx| { + let mut state = state.borrow_mut(); + let listener_resource = state + .resource_table .get_mut::<UnixListenerResource>(rid) - .ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))? - }; - let (unix_stream, _socket_addr) = listener_resource.listener.accept().await?; - drop(resource_table_); + .ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?; + let listener = &mut listener_resource.listener; + use futures::StreamExt; + match listener.poll_next_unpin(cx) { + Poll::Ready(Some(stream)) => { + //listener_resource.untrack_task(); + Poll::Ready(stream) + } + Poll::Ready(None) => todo!(), + Poll::Pending => { + //listener_resource.track_task(cx)?; + Poll::Pending + } + } + .map_err(ErrBox::from) + }); + let unix_stream = accept_fut.await?; let local_addr = unix_stream.local_addr()?; let remote_addr = unix_stream.peer_addr()?; - let mut resource_table_ = state.resource_table.borrow_mut(); - let rid = resource_table_.add( + let mut state = state.borrow_mut(); + let rid = state.resource_table.add( "unixStream", Box::new(StreamResourceHolder::new(StreamResource::UnixStream( unix_stream, @@ -68,7 +85,7 @@ pub(crate) async fn accept_unix( } pub(crate) async fn receive_unix_packet( - state: Rc<State>, + state: Rc<RefCell<OpState>>, args: ReceiveArgs, bufs: BufVec, ) -> Result<Value, ErrBox> { @@ -77,8 +94,9 @@ pub(crate) async fn receive_unix_packet( let rid = args.rid as u32; let mut buf = bufs.into_iter().next().unwrap(); - let mut resource_table_ = state.resource_table.borrow_mut(); - let resource = resource_table_ + let mut state = state.borrow_mut(); + let resource = state + .resource_table .get_mut::<UnixDatagramResource>(rid) .ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?; let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?; @@ -92,7 +110,7 @@ pub(crate) async fn receive_unix_packet( } pub fn listen_unix( - state: &State, + state: &mut OpState, addr: &Path, ) -> Result<(u32, unix::net::SocketAddr), ErrBox> { if addr.exists() { @@ -103,14 +121,13 @@ pub fn listen_unix( let listener_resource = UnixListenerResource { listener }; let rid = state .resource_table - .borrow_mut() .add("unixListener", Box::new(listener_resource)); Ok((rid, local_addr)) } pub fn listen_unix_packet( - state: &State, + state: &mut OpState, addr: &Path, ) -> Result<(u32, unix::net::SocketAddr), ErrBox> { if addr.exists() { @@ -124,7 +141,6 @@ pub fn listen_unix_packet( }; let rid = state .resource_table - .borrow_mut() .add("unixDatagram", Box::new(datagram_resource)); Ok((rid, local_addr)) |