summaryrefslogtreecommitdiff
path: root/runtime/ops/net_unix.rs
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/ops/net_unix.rs')
-rw-r--r--runtime/ops/net_unix.rs106
1 files changed, 60 insertions, 46 deletions
diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs
index 4c416a5a4..23981a7f1 100644
--- a/runtime/ops/net_unix.rs
+++ b/runtime/ops/net_unix.rs
@@ -1,34 +1,59 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::ops::io::StreamResource;
-use crate::ops::io::StreamResourceHolder;
use crate::ops::net::AcceptArgs;
use crate::ops::net::ReceiveArgs;
use deno_core::error::bad_resource;
+use deno_core::error::custom_error;
use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use serde::Deserialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::fs::remove_file;
use std::os::unix;
use std::path::Path;
use std::rc::Rc;
-use std::task::Poll;
use tokio::net::UnixDatagram;
use tokio::net::UnixListener;
pub use tokio::net::UnixStream;
struct UnixListenerResource {
- listener: UnixListener,
+ listener: AsyncRefCell<UnixListener>,
+ cancel: CancelHandle,
+}
+
+impl Resource for UnixListenerResource {
+ fn name(&self) -> Cow<str> {
+ "unixListener".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
}
pub struct UnixDatagramResource {
- pub socket: UnixDatagram,
- pub local_addr: unix::net::SocketAddr,
+ pub socket: AsyncRefCell<UnixDatagram>,
+ pub cancel: CancelHandle,
+}
+
+impl Resource for UnixDatagramResource {
+ fn name(&self) -> Cow<str> {
+ "unixDatagram".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
}
#[derive(Deserialize)]
@@ -43,38 +68,23 @@ pub(crate) async fn accept_unix(
) -> Result<Value, AnyError> {
let rid = args.rid as u32;
- let accept_fut = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let listener_resource = state
- .resource_table
- .get_mut::<UnixListenerResource>(rid)
- .ok_or_else(|| bad_resource("Listener has been closed"))?;
- let listener = &mut listener_resource.listener;
- use deno_core::futures::StreamExt;
- match listener.poll_next_unpin(cx) {
- Poll::Ready(Some(stream)) => {
- //listener_resource.untrack_task();
- Poll::Ready(stream)
- }
- Poll::Ready(None) => todo!(),
- Poll::Pending => {
- //listener_resource.track_task(cx)?;
- Poll::Pending
- }
- }
- .map_err(AnyError::from)
- });
- let unix_stream = accept_fut.await?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<UnixListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let mut listener = RcRef::map(&resource, |r| &r.listener)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Listener already in use"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (unix_stream, _socket_addr) =
+ listener.accept().try_or_cancel(cancel).await?;
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
+ let resource = StreamResource::unix_stream(unix_stream);
let mut state = state.borrow_mut();
- let rid = state.resource_table.add(
- "unixStream",
- Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
- unix_stream,
- ))),
- );
+ let rid = state.resource_table.add(resource);
Ok(json!({
"rid": rid,
"localAddr": {
@@ -98,12 +108,17 @@ pub(crate) async fn receive_unix_packet(
let rid = args.rid as u32;
let mut buf = bufs.into_iter().next().unwrap();
- let mut state = state.borrow_mut();
let resource = state
+ .borrow()
.resource_table
- .get_mut::<UnixDatagramResource>(rid)
+ .get::<UnixDatagramResource>(rid)
.ok_or_else(|| bad_resource("Socket has been closed"))?;
- let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?;
+ let mut socket = RcRef::map(&resource, |r| &r.socket)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (size, remote_addr) =
+ socket.recv_from(&mut buf).try_or_cancel(cancel).await?;
Ok(json!({
"size": size,
"remoteAddr": {
@@ -122,10 +137,11 @@ pub fn listen_unix(
}
let listener = UnixListener::bind(&addr)?;
let local_addr = listener.local_addr()?;
- let listener_resource = UnixListenerResource { listener };
- let rid = state
- .resource_table
- .add("unixListener", Box::new(listener_resource));
+ let listener_resource = UnixListenerResource {
+ listener: AsyncRefCell::new(listener),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(listener_resource);
Ok((rid, local_addr))
}
@@ -140,12 +156,10 @@ pub fn listen_unix_packet(
let socket = UnixDatagram::bind(&addr)?;
let local_addr = socket.local_addr()?;
let datagram_resource = UnixDatagramResource {
- socket,
- local_addr: local_addr.clone(),
+ socket: AsyncRefCell::new(socket),
+ cancel: Default::default(),
};
- let rid = state
- .resource_table
- .add("unixDatagram", Box::new(datagram_resource));
+ let rid = state.resource_table.add(datagram_resource);
Ok((rid, local_addr))
}