summaryrefslogtreecommitdiff
path: root/cli/ops/net_unix.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/net_unix.rs')
-rw-r--r--cli/ops/net_unix.rs52
1 files changed, 34 insertions, 18 deletions
diff --git a/cli/ops/net_unix.rs b/cli/ops/net_unix.rs
index a73db89b2..5cdb451dc 100644
--- a/cli/ops/net_unix.rs
+++ b/cli/ops/net_unix.rs
@@ -2,15 +2,18 @@ use crate::ops::io::StreamResource;
use crate::ops::io::StreamResourceHolder;
use crate::ops::net::AcceptArgs;
use crate::ops::net::ReceiveArgs;
-use crate::state::State;
use deno_core::BufVec;
use deno_core::ErrBox;
+use deno_core::OpState;
+use futures::future::poll_fn;
use serde_derive::Deserialize;
use serde_json::Value;
+use std::cell::RefCell;
use std::fs::remove_file;
use std::os::unix;
pub use std::path::Path;
use std::rc::Rc;
+use std::task::Poll;
use tokio::net::UnixDatagram;
use tokio::net::UnixListener;
pub use tokio::net::UnixStream;
@@ -30,25 +33,39 @@ pub struct UnixListenArgs {
}
pub(crate) async fn accept_unix(
- state: Rc<State>,
+ state: Rc<RefCell<OpState>>,
args: AcceptArgs,
_bufs: BufVec,
) -> Result<Value, ErrBox> {
let rid = args.rid as u32;
- let mut resource_table_ = state.resource_table.borrow_mut();
- let listener_resource = {
- resource_table_
+ let accept_fut = poll_fn(|cx| {
+ let mut state = state.borrow_mut();
+ let listener_resource = state
+ .resource_table
.get_mut::<UnixListenerResource>(rid)
- .ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?
- };
- let (unix_stream, _socket_addr) = listener_resource.listener.accept().await?;
- drop(resource_table_);
+ .ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?;
+ let listener = &mut listener_resource.listener;
+ use futures::StreamExt;
+ match listener.poll_next_unpin(cx) {
+ Poll::Ready(Some(stream)) => {
+ //listener_resource.untrack_task();
+ Poll::Ready(stream)
+ }
+ Poll::Ready(None) => todo!(),
+ Poll::Pending => {
+ //listener_resource.track_task(cx)?;
+ Poll::Pending
+ }
+ }
+ .map_err(ErrBox::from)
+ });
+ let unix_stream = accept_fut.await?;
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
- let mut resource_table_ = state.resource_table.borrow_mut();
- let rid = resource_table_.add(
+ let mut state = state.borrow_mut();
+ let rid = state.resource_table.add(
"unixStream",
Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
unix_stream,
@@ -68,7 +85,7 @@ pub(crate) async fn accept_unix(
}
pub(crate) async fn receive_unix_packet(
- state: Rc<State>,
+ state: Rc<RefCell<OpState>>,
args: ReceiveArgs,
bufs: BufVec,
) -> Result<Value, ErrBox> {
@@ -77,8 +94,9 @@ pub(crate) async fn receive_unix_packet(
let rid = args.rid as u32;
let mut buf = bufs.into_iter().next().unwrap();
- let mut resource_table_ = state.resource_table.borrow_mut();
- let resource = resource_table_
+ let mut state = state.borrow_mut();
+ let resource = state
+ .resource_table
.get_mut::<UnixDatagramResource>(rid)
.ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?;
let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?;
@@ -92,7 +110,7 @@ pub(crate) async fn receive_unix_packet(
}
pub fn listen_unix(
- state: &State,
+ state: &mut OpState,
addr: &Path,
) -> Result<(u32, unix::net::SocketAddr), ErrBox> {
if addr.exists() {
@@ -103,14 +121,13 @@ pub fn listen_unix(
let listener_resource = UnixListenerResource { listener };
let rid = state
.resource_table
- .borrow_mut()
.add("unixListener", Box::new(listener_resource));
Ok((rid, local_addr))
}
pub fn listen_unix_packet(
- state: &State,
+ state: &mut OpState,
addr: &Path,
) -> Result<(u32, unix::net::SocketAddr), ErrBox> {
if addr.exists() {
@@ -124,7 +141,6 @@ pub fn listen_unix_packet(
};
let rid = state
.resource_table
- .borrow_mut()
.add("unixDatagram", Box::new(datagram_resource));
Ok((rid, local_addr))