summaryrefslogtreecommitdiff
path: root/cli/ops/net.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r--cli/ops/net.rs117
1 files changed, 66 insertions, 51 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index 91a9079d4..67a201460 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -2,14 +2,14 @@
use crate::ops::io::{StreamResource, StreamResourceHolder};
use crate::resolve_addr::resolve_addr;
-use crate::state::State;
use deno_core::BufVec;
use deno_core::ErrBox;
-use deno_core::OpRegistry;
+use deno_core::OpState;
use deno_core::ZeroCopyBuf;
use futures::future::poll_fn;
use serde_derive::Deserialize;
use serde_json::Value;
+use std::cell::RefCell;
use std::net::Shutdown;
use std::net::SocketAddr;
use std::rc::Rc;
@@ -22,13 +22,13 @@ use tokio::net::UdpSocket;
#[cfg(unix)]
use super::net_unix;
-pub fn init(s: &Rc<State>) {
- s.register_op_json_async("op_accept", op_accept);
- s.register_op_json_async("op_connect", op_connect);
- s.register_op_json_sync("op_shutdown", op_shutdown);
- s.register_op_json_sync("op_listen", op_listen);
- s.register_op_json_async("op_datagram_receive", op_datagram_receive);
- s.register_op_json_async("op_datagram_send", op_datagram_send);
+pub fn init(rt: &mut deno_core::JsRuntime) {
+ super::reg_json_async(rt, "op_accept", op_accept);
+ super::reg_json_async(rt, "op_connect", op_connect);
+ super::reg_json_sync(rt, "op_shutdown", op_shutdown);
+ super::reg_json_sync(rt, "op_listen", op_listen);
+ super::reg_json_async(rt, "op_datagram_receive", op_datagram_receive);
+ super::reg_json_async(rt, "op_datagram_send", op_datagram_send);
}
#[derive(Deserialize)]
@@ -38,15 +38,16 @@ pub(crate) struct AcceptArgs {
}
async fn accept_tcp(
- state: Rc<State>,
+ state: Rc<RefCell<OpState>>,
args: AcceptArgs,
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
let rid = args.rid as u32;
let accept_fut = poll_fn(|cx| {
- let mut resource_table = state.resource_table.borrow_mut();
- let listener_resource = resource_table
+ let mut state = state.borrow_mut();
+ let listener_resource = state
+ .resource_table
.get_mut::<TcpListenerResource>(rid)
.ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?;
let listener = &mut listener_resource.listener;
@@ -68,7 +69,9 @@ async fn accept_tcp(
let (tcp_stream, _socket_addr) = accept_fut.await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
- let rid = state.resource_table.borrow_mut().add(
+
+ let mut state = state.borrow_mut();
+ let rid = state.resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
tcp_stream,
@@ -90,7 +93,7 @@ async fn accept_tcp(
}
async fn op_accept(
- state: Rc<State>,
+ state: Rc<RefCell<OpState>>,
args: Value,
bufs: BufVec,
) -> Result<Value, ErrBox> {
@@ -113,7 +116,7 @@ pub(crate) struct ReceiveArgs {
}
async fn receive_udp(
- state: Rc<State>,
+ state: Rc<RefCell<OpState>>,
args: ReceiveArgs,
zero_copy: BufVec,
) -> Result<Value, ErrBox> {
@@ -123,8 +126,9 @@ async fn receive_udp(
let rid = args.rid as u32;
let receive_fut = poll_fn(|cx| {
- let mut resource_table = state.resource_table.borrow_mut();
- let resource = resource_table
+ let mut state = state.borrow_mut();
+ let resource = state
+ .resource_table
.get_mut::<UdpSocketResource>(rid)
.ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?;
let socket = &mut resource.socket;
@@ -144,7 +148,7 @@ async fn receive_udp(
}
async fn op_datagram_receive(
- state: Rc<State>,
+ state: Rc<RefCell<OpState>>,
args: Value,
zero_copy: BufVec,
) -> Result<Value, ErrBox> {
@@ -171,12 +175,13 @@ struct SendArgs {
}
async fn op_datagram_send(
- state: Rc<State>,
+ state: Rc<RefCell<OpState>>,
args: Value,
zero_copy: BufVec,
) -> Result<Value, ErrBox> {
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
let zero_copy = zero_copy[0].clone();
+ let cli_state = super::cli_state2(&state);
match serde_json::from_value(args)? {
SendArgs {
@@ -184,11 +189,12 @@ async fn op_datagram_send(
transport,
transport_args: ArgsEnum::Ip(args),
} if transport == "udp" => {
- state.check_net(&args.hostname, args.port)?;
+ cli_state.check_net(&args.hostname, args.port)?;
let addr = resolve_addr(&args.hostname, args.port)?;
poll_fn(move |cx| {
- let mut resource_table = state.resource_table.borrow_mut();
- let resource = resource_table
+ let mut state = state.borrow_mut();
+ let resource = state
+ .resource_table
.get_mut::<UdpSocketResource>(rid as u32)
.ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?;
resource
@@ -206,9 +212,10 @@ async fn op_datagram_send(
transport_args: ArgsEnum::Unix(args),
} if transport == "unixpacket" => {
let address_path = net_unix::Path::new(&args.path);
- state.check_read(&address_path)?;
- let mut resource_table = state.resource_table.borrow_mut();
- let resource = resource_table
+ cli_state.check_read(&address_path)?;
+ let mut state = state.borrow_mut();
+ let resource = state
+ .resource_table
.get_mut::<net_unix::UnixDatagramResource>(rid as u32)
.ok_or_else(|| ErrBox::new("NotConnected", "Socket has been closed"))?;
let socket = &mut resource.socket;
@@ -230,21 +237,24 @@ struct ConnectArgs {
}
async fn op_connect(
- state: Rc<State>,
+ state: Rc<RefCell<OpState>>,
args: Value,
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
+ let cli_state = super::cli_state2(&state);
match serde_json::from_value(args)? {
ConnectArgs {
transport,
transport_args: ArgsEnum::Ip(args),
} if transport == "tcp" => {
- state.check_net(&args.hostname, args.port)?;
+ cli_state.check_net(&args.hostname, args.port)?;
let addr = resolve_addr(&args.hostname, args.port)?;
let tcp_stream = TcpStream::connect(&addr).await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
- let rid = state.resource_table.borrow_mut().add(
+
+ let mut state_ = state.borrow_mut();
+ let rid = state_.resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
tcp_stream,
@@ -270,14 +280,16 @@ async fn op_connect(
transport_args: ArgsEnum::Unix(args),
} if transport == "unix" => {
let address_path = net_unix::Path::new(&args.path);
- state.check_unstable("Deno.connect");
- state.check_read(&address_path)?;
+ cli_state.check_unstable("Deno.connect");
+ cli_state.check_read(&address_path)?;
let path = args.path;
let unix_stream =
net_unix::UnixStream::connect(net_unix::Path::new(&path)).await?;
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
- let rid = state.resource_table.borrow_mut().add(
+
+ let mut state_ = state.borrow_mut();
+ let rid = state_.resource_table.add(
"unixStream",
Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
unix_stream,
@@ -306,11 +318,11 @@ struct ShutdownArgs {
}
fn op_shutdown(
- state: &State,
+ state: &mut OpState,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<Value, ErrBox> {
- state.check_unstable("Deno.shutdown");
+ super::cli_state(state).check_unstable("Deno.shutdown");
let args: ShutdownArgs = serde_json::from_value(args)?;
@@ -323,8 +335,8 @@ fn op_shutdown(
_ => unimplemented!(),
};
- let mut resource_table = state.resource_table.borrow_mut();
- let resource_holder = resource_table
+ let resource_holder = state
+ .resource_table
.get_mut::<StreamResourceHolder>(rid)
.ok_or_else(ErrBox::bad_resource_id)?;
match resource_holder.resource {
@@ -416,7 +428,7 @@ struct ListenArgs {
}
fn listen_tcp(
- state: &State,
+ state: &mut OpState,
addr: SocketAddr,
) -> Result<(u32, SocketAddr), ErrBox> {
let std_listener = std::net::TcpListener::bind(&addr)?;
@@ -429,14 +441,13 @@ fn listen_tcp(
};
let rid = state
.resource_table
- .borrow_mut()
.add("tcpListener", Box::new(listener_resource));
Ok((rid, local_addr))
}
fn listen_udp(
- state: &State,
+ state: &mut OpState,
addr: SocketAddr,
) -> Result<(u32, SocketAddr), ErrBox> {
let std_socket = std::net::UdpSocket::bind(&addr)?;
@@ -445,26 +456,28 @@ fn listen_udp(
let socket_resource = UdpSocketResource { socket };
let rid = state
.resource_table
- .borrow_mut()
.add("udpSocket", Box::new(socket_resource));
Ok((rid, local_addr))
}
fn op_listen(
- state: &State,
+ state: &mut OpState,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<Value, ErrBox> {
+ let cli_state = super::cli_state(state);
match serde_json::from_value(args)? {
ListenArgs {
transport,
transport_args: ArgsEnum::Ip(args),
} => {
- if transport == "udp" {
- state.check_unstable("Deno.listenDatagram");
+ {
+ if transport == "udp" {
+ cli_state.check_unstable("Deno.listenDatagram");
+ }
+ cli_state.check_net(&args.hostname, args.port)?;
}
- state.check_net(&args.hostname, args.port)?;
let addr = resolve_addr(&args.hostname, args.port)?;
let (rid, local_addr) = if transport == "tcp" {
listen_tcp(state, addr)?
@@ -491,15 +504,17 @@ fn op_listen(
transport,
transport_args: ArgsEnum::Unix(args),
} if transport == "unix" || transport == "unixpacket" => {
- if transport == "unix" {
- state.check_unstable("Deno.listen");
- }
- if transport == "unixpacket" {
- state.check_unstable("Deno.listenDatagram");
- }
let address_path = net_unix::Path::new(&args.path);
- state.check_read(&address_path)?;
- state.check_write(&address_path)?;
+ {
+ if transport == "unix" {
+ cli_state.check_unstable("Deno.listen");
+ }
+ if transport == "unixpacket" {
+ cli_state.check_unstable("Deno.listenDatagram");
+ }
+ cli_state.check_read(&address_path)?;
+ cli_state.check_write(&address_path)?;
+ }
let (rid, local_addr) = if transport == "unix" {
net_unix::listen_unix(state, &address_path)?
} else {