summaryrefslogtreecommitdiff
path: root/cli/ops/net.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r--cli/ops/net.rs75
1 files changed, 42 insertions, 33 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index 67a201460..7dd9082b6 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -1,9 +1,15 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use crate::ops::io::{StreamResource, StreamResourceHolder};
+use crate::ops::io::StreamResource;
+use crate::ops::io::StreamResourceHolder;
use crate::resolve_addr::resolve_addr;
+use deno_core::error::bad_resource;
+use deno_core::error::bad_resource_id;
+use deno_core::error::custom_error;
+use deno_core::error::generic_error;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
use deno_core::BufVec;
-use deno_core::ErrBox;
use deno_core::OpState;
use deno_core::ZeroCopyBuf;
use futures::future::poll_fn;
@@ -21,6 +27,8 @@ use tokio::net::UdpSocket;
#[cfg(unix)]
use super::net_unix;
+#[cfg(unix)]
+use std::path::Path;
pub fn init(rt: &mut deno_core::JsRuntime) {
super::reg_json_async(rt, "op_accept", op_accept);
@@ -41,7 +49,7 @@ async fn accept_tcp(
state: Rc<RefCell<OpState>>,
args: AcceptArgs,
_zero_copy: BufVec,
-) -> Result<Value, ErrBox> {
+) -> Result<Value, AnyError> {
let rid = args.rid as u32;
let accept_fut = poll_fn(|cx| {
@@ -49,9 +57,9 @@ async fn accept_tcp(
let listener_resource = state
.resource_table
.get_mut::<TcpListenerResource>(rid)
- .ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?;
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
let listener = &mut listener_resource.listener;
- match listener.poll_accept(cx).map_err(ErrBox::from) {
+ match listener.poll_accept(cx).map_err(AnyError::from) {
Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task();
Poll::Ready(Ok((stream, addr)))
@@ -96,13 +104,13 @@ async fn op_accept(
state: Rc<RefCell<OpState>>,
args: Value,
bufs: BufVec,
-) -> Result<Value, ErrBox> {
+) -> Result<Value, AnyError> {
let args: AcceptArgs = serde_json::from_value(args)?;
match args.transport.as_str() {
"tcp" => accept_tcp(state, args, bufs).await,
#[cfg(unix)]
"unix" => net_unix::accept_unix(state, args, bufs).await,
- _ => Err(ErrBox::error(format!(
+ _ => Err(generic_error(format!(
"Unsupported transport protocol {}",
args.transport
))),
@@ -119,7 +127,7 @@ async fn receive_udp(
state: Rc<RefCell<OpState>>,
args: ReceiveArgs,
zero_copy: BufVec,
-) -> Result<Value, ErrBox> {
+) -> Result<Value, AnyError> {
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
let mut zero_copy = zero_copy[0].clone();
@@ -130,11 +138,11 @@ async fn receive_udp(
let resource = state
.resource_table
.get_mut::<UdpSocketResource>(rid)
- .ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?;
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
let socket = &mut resource.socket;
socket
.poll_recv_from(cx, &mut zero_copy)
- .map_err(ErrBox::from)
+ .map_err(AnyError::from)
});
let (size, remote_addr) = receive_fut.await?;
Ok(json!({
@@ -151,7 +159,7 @@ async fn op_datagram_receive(
state: Rc<RefCell<OpState>>,
args: Value,
zero_copy: BufVec,
-) -> Result<Value, ErrBox> {
+) -> Result<Value, AnyError> {
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
let args: ReceiveArgs = serde_json::from_value(args)?;
@@ -159,7 +167,7 @@ async fn op_datagram_receive(
"udp" => receive_udp(state, args, zero_copy).await,
#[cfg(unix)]
"unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await,
- _ => Err(ErrBox::error(format!(
+ _ => Err(generic_error(format!(
"Unsupported transport protocol {}",
args.transport
))),
@@ -178,7 +186,7 @@ async fn op_datagram_send(
state: Rc<RefCell<OpState>>,
args: Value,
zero_copy: BufVec,
-) -> Result<Value, ErrBox> {
+) -> Result<Value, AnyError> {
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
let zero_copy = zero_copy[0].clone();
let cli_state = super::cli_state2(&state);
@@ -196,12 +204,12 @@ async fn op_datagram_send(
let resource = state
.resource_table
.get_mut::<UdpSocketResource>(rid as u32)
- .ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?;
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
resource
.socket
.poll_send_to(cx, &zero_copy, &addr)
.map_ok(|byte_length| json!(byte_length))
- .map_err(ErrBox::from)
+ .map_err(AnyError::from)
})
.await
}
@@ -211,13 +219,15 @@ async fn op_datagram_send(
transport,
transport_args: ArgsEnum::Unix(args),
} if transport == "unixpacket" => {
- let address_path = net_unix::Path::new(&args.path);
+ let address_path = Path::new(&args.path);
cli_state.check_read(&address_path)?;
let mut state = state.borrow_mut();
let resource = state
.resource_table
.get_mut::<net_unix::UnixDatagramResource>(rid as u32)
- .ok_or_else(|| ErrBox::new("NotConnected", "Socket has been closed"))?;
+ .ok_or_else(|| {
+ custom_error("NotConnected", "Socket has been closed")
+ })?;
let socket = &mut resource.socket;
let byte_length = socket
.send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap())
@@ -225,7 +235,7 @@ async fn op_datagram_send(
Ok(json!(byte_length))
}
- _ => Err(ErrBox::type_error("Wrong argument format!")),
+ _ => Err(type_error("Wrong argument format!")),
}
}
@@ -240,7 +250,7 @@ async fn op_connect(
state: Rc<RefCell<OpState>>,
args: Value,
_zero_copy: BufVec,
-) -> Result<Value, ErrBox> {
+) -> Result<Value, AnyError> {
let cli_state = super::cli_state2(&state);
match serde_json::from_value(args)? {
ConnectArgs {
@@ -279,12 +289,11 @@ async fn op_connect(
transport,
transport_args: ArgsEnum::Unix(args),
} if transport == "unix" => {
- let address_path = net_unix::Path::new(&args.path);
+ let address_path = Path::new(&args.path);
cli_state.check_unstable("Deno.connect");
cli_state.check_read(&address_path)?;
let path = args.path;
- let unix_stream =
- net_unix::UnixStream::connect(net_unix::Path::new(&path)).await?;
+ let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?;
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
@@ -307,7 +316,7 @@ async fn op_connect(
}
}))
}
- _ => Err(ErrBox::type_error("Wrong argument format!")),
+ _ => Err(type_error("Wrong argument format!")),
}
}
@@ -321,7 +330,7 @@ fn op_shutdown(
state: &mut OpState,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
-) -> Result<Value, ErrBox> {
+) -> Result<Value, AnyError> {
super::cli_state(state).check_unstable("Deno.shutdown");
let args: ShutdownArgs = serde_json::from_value(args)?;
@@ -338,7 +347,7 @@ fn op_shutdown(
let resource_holder = state
.resource_table
.get_mut::<StreamResourceHolder>(rid)
- .ok_or_else(ErrBox::bad_resource_id)?;
+ .ok_or_else(bad_resource_id)?;
match resource_holder.resource {
StreamResource::TcpStream(Some(ref mut stream)) => {
TcpStream::shutdown(stream, shutdown_mode)?;
@@ -347,7 +356,7 @@ fn op_shutdown(
StreamResource::UnixStream(ref mut stream) => {
net_unix::UnixStream::shutdown(stream, shutdown_mode)?;
}
- _ => return Err(ErrBox::bad_resource_id()),
+ _ => return Err(bad_resource_id()),
}
Ok(json!({}))
@@ -371,13 +380,13 @@ impl TcpListenerResource {
/// can be notified when listener is closed.
///
/// Throws an error if another task is already tracked.
- pub fn track_task(&mut self, cx: &Context) -> Result<(), ErrBox> {
+ pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> {
// Currently, we only allow tracking a single accept task for a listener.
// This might be changed in the future with multiple workers.
// Caveat: TcpListener by itself also only tracks an accept task at a time.
// See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
if self.waker.is_some() {
- return Err(ErrBox::new("Busy", "Another accept task is ongoing"));
+ return Err(custom_error("Busy", "Another accept task is ongoing"));
}
let waker = futures::task::AtomicWaker::new();
@@ -430,7 +439,7 @@ struct ListenArgs {
fn listen_tcp(
state: &mut OpState,
addr: SocketAddr,
-) -> Result<(u32, SocketAddr), ErrBox> {
+) -> Result<(u32, SocketAddr), AnyError> {
let std_listener = std::net::TcpListener::bind(&addr)?;
let listener = TcpListener::from_std(std_listener)?;
let local_addr = listener.local_addr()?;
@@ -449,7 +458,7 @@ fn listen_tcp(
fn listen_udp(
state: &mut OpState,
addr: SocketAddr,
-) -> Result<(u32, SocketAddr), ErrBox> {
+) -> Result<(u32, SocketAddr), AnyError> {
let std_socket = std::net::UdpSocket::bind(&addr)?;
let socket = UdpSocket::from_std(std_socket)?;
let local_addr = socket.local_addr()?;
@@ -465,7 +474,7 @@ fn op_listen(
state: &mut OpState,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
-) -> Result<Value, ErrBox> {
+) -> Result<Value, AnyError> {
let cli_state = super::cli_state(state);
match serde_json::from_value(args)? {
ListenArgs {
@@ -504,7 +513,7 @@ fn op_listen(
transport,
transport_args: ArgsEnum::Unix(args),
} if transport == "unix" || transport == "unixpacket" => {
- let address_path = net_unix::Path::new(&args.path);
+ let address_path = Path::new(&args.path);
{
if transport == "unix" {
cli_state.check_unstable("Deno.listen");
@@ -534,6 +543,6 @@ fn op_listen(
}))
}
#[cfg(unix)]
- _ => Err(ErrBox::type_error("Wrong argument format!")),
+ _ => Err(type_error("Wrong argument format!")),
}
}