summaryrefslogtreecommitdiff
path: root/cli/ops/net.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r--cli/ops/net.rs387
1 files changed, 184 insertions, 203 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index d979f44ae..9cb6eb79d 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -1,15 +1,15 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use super::dispatch_json::{Deserialize, JsonOp, Value};
+use super::dispatch_json::{Deserialize, Value};
use super::io::{StreamResource, StreamResourceHolder};
use crate::resolve_addr::resolve_addr;
use crate::state::State;
+use deno_core::BufVec;
use deno_core::CoreIsolate;
-use deno_core::CoreIsolateState;
use deno_core::ErrBox;
use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use futures::future::poll_fn;
-use futures::future::FutureExt;
+use std::cell::RefCell;
use std::net::Shutdown;
use std::net::SocketAddr;
use std::rc::Rc;
@@ -23,15 +23,20 @@ use tokio::net::UdpSocket;
use super::net_unix;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
- i.register_op("op_accept", s.stateful_json_op2(op_accept));
- i.register_op("op_connect", s.stateful_json_op2(op_connect));
- i.register_op("op_shutdown", s.stateful_json_op2(op_shutdown));
- i.register_op("op_listen", s.stateful_json_op2(op_listen));
+ let t = &CoreIsolate::state(i).borrow().resource_table.clone();
+
+ i.register_op("op_accept", s.stateful_json_op_async(t, op_accept));
+ i.register_op("op_connect", s.stateful_json_op_async(t, op_connect));
+ i.register_op("op_shutdown", s.stateful_json_op_sync(t, op_shutdown));
+ i.register_op("op_listen", s.stateful_json_op_sync(t, op_listen));
i.register_op(
"op_datagram_receive",
- s.stateful_json_op2(op_datagram_receive),
+ s.stateful_json_op_async(t, op_datagram_receive),
+ );
+ i.register_op(
+ "op_datagram_send",
+ s.stateful_json_op_async(t, op_datagram_send),
);
- i.register_op("op_datagram_send", s.stateful_json_op2(op_datagram_send));
}
#[derive(Deserialize)]
@@ -40,75 +45,72 @@ struct AcceptArgs {
transport: String,
}
-fn accept_tcp(
- isolate_state: &mut CoreIsolateState,
+async fn accept_tcp(
+ resource_table: Rc<RefCell<ResourceTable>>,
args: AcceptArgs,
- _zero_copy: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
+ _zero_copy: BufVec,
+) -> Result<Value, ErrBox> {
let rid = args.rid as u32;
- let resource_table = isolate_state.resource_table.clone();
- let op = async move {
- let accept_fut = poll_fn(|cx| {
- let mut resource_table = resource_table.borrow_mut();
- let listener_resource = resource_table
- .get_mut::<TcpListenerResource>(rid)
- .ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?;
- let listener = &mut listener_resource.listener;
- match listener.poll_accept(cx).map_err(ErrBox::from) {
- Poll::Ready(Ok((stream, addr))) => {
- listener_resource.untrack_task();
- Poll::Ready(Ok((stream, addr)))
- }
- Poll::Pending => {
- listener_resource.track_task(cx)?;
- Poll::Pending
- }
- Poll::Ready(Err(e)) => {
- listener_resource.untrack_task();
- Poll::Ready(Err(e))
- }
- }
- });
- let (tcp_stream, _socket_addr) = accept_fut.await?;
- let local_addr = tcp_stream.local_addr()?;
- let remote_addr = tcp_stream.peer_addr()?;
+ let accept_fut = poll_fn(|cx| {
let mut resource_table = resource_table.borrow_mut();
- let rid = resource_table.add(
- "tcpStream",
- Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
- tcp_stream,
- )))),
- );
- Ok(json!({
- "rid": rid,
- "localAddr": {
- "hostname": local_addr.ip().to_string(),
- "port": local_addr.port(),
- "transport": "tcp",
- },
- "remoteAddr": {
- "hostname": remote_addr.ip().to_string(),
- "port": remote_addr.port(),
- "transport": "tcp",
+ let listener_resource = resource_table
+ .get_mut::<TcpListenerResource>(rid)
+ .ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?;
+ let listener = &mut listener_resource.listener;
+ match listener.poll_accept(cx).map_err(ErrBox::from) {
+ Poll::Ready(Ok((stream, addr))) => {
+ listener_resource.untrack_task();
+ Poll::Ready(Ok((stream, addr)))
}
- }))
- };
-
- Ok(JsonOp::Async(op.boxed_local()))
+ Poll::Pending => {
+ listener_resource.track_task(cx)?;
+ Poll::Pending
+ }
+ Poll::Ready(Err(e)) => {
+ listener_resource.untrack_task();
+ Poll::Ready(Err(e))
+ }
+ }
+ });
+ let (tcp_stream, _socket_addr) = accept_fut.await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let mut resource_table = resource_table.borrow_mut();
+ let rid = resource_table.add(
+ "tcpStream",
+ Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
+ tcp_stream,
+ )))),
+ );
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "hostname": local_addr.ip().to_string(),
+ "port": local_addr.port(),
+ "transport": "tcp",
+ },
+ "remoteAddr": {
+ "hostname": remote_addr.ip().to_string(),
+ "port": remote_addr.port(),
+ "transport": "tcp",
+ }
+ }))
}
-fn op_accept(
- isolate_state: &mut CoreIsolateState,
- _state: &Rc<State>,
+async fn op_accept(
+ _state: Rc<State>,
+ resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
- zero_copy: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
+ zero_copy: BufVec,
+) -> Result<Value, ErrBox> {
let args: AcceptArgs = serde_json::from_value(args)?;
match args.transport.as_str() {
- "tcp" => accept_tcp(isolate_state, args, zero_copy),
+ "tcp" => accept_tcp(resource_table, args, zero_copy).await,
#[cfg(unix)]
- "unix" => net_unix::accept_unix(isolate_state, args.rid as u32, zero_copy),
+ "unix" => {
+ net_unix::accept_unix(resource_table, args.rid as u32, zero_copy).await
+ }
_ => Err(ErrBox::error(format!(
"Unsupported transport protocol {}",
args.transport
@@ -122,58 +124,53 @@ struct ReceiveArgs {
transport: String,
}
-fn receive_udp(
- isolate_state: &mut CoreIsolateState,
+async fn receive_udp(
+ resource_table: Rc<RefCell<ResourceTable>>,
_state: &Rc<State>,
args: ReceiveArgs,
- zero_copy: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
+ zero_copy: BufVec,
+) -> Result<Value, ErrBox> {
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
let mut zero_copy = zero_copy[0].clone();
let rid = args.rid as u32;
- let resource_table = isolate_state.resource_table.clone();
-
- let op = async move {
- let receive_fut = poll_fn(|cx| {
- let mut resource_table = resource_table.borrow_mut();
- let resource = resource_table
- .get_mut::<UdpSocketResource>(rid)
- .ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?;
- let socket = &mut resource.socket;
- socket
- .poll_recv_from(cx, &mut zero_copy)
- .map_err(ErrBox::from)
- });
- let (size, remote_addr) = receive_fut.await?;
- Ok(json!({
- "size": size,
- "remoteAddr": {
- "hostname": remote_addr.ip().to_string(),
- "port": remote_addr.port(),
- "transport": "udp",
- }
- }))
- };
-
- Ok(JsonOp::Async(op.boxed_local()))
+ let receive_fut = poll_fn(|cx| {
+ let mut resource_table = resource_table.borrow_mut();
+ let resource = resource_table
+ .get_mut::<UdpSocketResource>(rid)
+ .ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?;
+ let socket = &mut resource.socket;
+ socket
+ .poll_recv_from(cx, &mut zero_copy)
+ .map_err(ErrBox::from)
+ });
+ let (size, remote_addr) = receive_fut.await?;
+ Ok(json!({
+ "size": size,
+ "remoteAddr": {
+ "hostname": remote_addr.ip().to_string(),
+ "port": remote_addr.port(),
+ "transport": "udp",
+ }
+ }))
}
-fn op_datagram_receive(
- isolate_state: &mut CoreIsolateState,
- state: &Rc<State>,
+async fn op_datagram_receive(
+ state: Rc<State>,
+ resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
- zero_copy: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
+ zero_copy: BufVec,
+) -> Result<Value, ErrBox> {
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
let args: ReceiveArgs = serde_json::from_value(args)?;
match args.transport.as_str() {
- "udp" => receive_udp(isolate_state, state, args, zero_copy),
+ "udp" => receive_udp(resource_table, &state, args, zero_copy).await,
#[cfg(unix)]
"unixpacket" => {
- net_unix::receive_unix_packet(isolate_state, args.rid as u32, zero_copy)
+ net_unix::receive_unix_packet(resource_table, args.rid as u32, zero_copy)
+ .await
}
_ => Err(ErrBox::error(format!(
"Unsupported transport protocol {}",
@@ -190,16 +187,15 @@ struct SendArgs {
transport_args: ArgsEnum,
}
-fn op_datagram_send(
- isolate_state: &mut CoreIsolateState,
- state: &Rc<State>,
+async fn op_datagram_send(
+ state: Rc<State>,
+ resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
- zero_copy: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
+ zero_copy: BufVec,
+) -> Result<Value, ErrBox> {
assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
let zero_copy = zero_copy[0].clone();
- let resource_table = isolate_state.resource_table.clone();
match serde_json::from_value(args)? {
SendArgs {
rid,
@@ -208,7 +204,7 @@ fn op_datagram_send(
} if transport == "udp" => {
state.check_net(&args.hostname, args.port)?;
let addr = resolve_addr(&args.hostname, args.port)?;
- let f = poll_fn(move |cx| {
+ poll_fn(move |cx| {
let mut resource_table = resource_table.borrow_mut();
let resource = resource_table
.get_mut::<UdpSocketResource>(rid as u32)
@@ -218,8 +214,8 @@ fn op_datagram_send(
.poll_send_to(cx, &zero_copy, &addr)
.map_ok(|byte_length| json!(byte_length))
.map_err(ErrBox::from)
- });
- Ok(JsonOp::Async(f.boxed_local()))
+ })
+ .await
}
#[cfg(unix)]
SendArgs {
@@ -229,22 +225,16 @@ fn op_datagram_send(
} if transport == "unixpacket" => {
let address_path = net_unix::Path::new(&args.path);
state.check_read(&address_path)?;
- let op = async move {
- let mut resource_table = resource_table.borrow_mut();
- let resource = resource_table
- .get_mut::<net_unix::UnixDatagramResource>(rid as u32)
- .ok_or_else(|| {
- ErrBox::new("NotConnected", "Socket has been closed")
- })?;
- let socket = &mut resource.socket;
- let byte_length = socket
- .send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap())
- .await?;
-
- Ok(json!(byte_length))
- };
+ let mut resource_table = resource_table.borrow_mut();
+ let resource = resource_table
+ .get_mut::<net_unix::UnixDatagramResource>(rid as u32)
+ .ok_or_else(|| ErrBox::new("NotConnected", "Socket has been closed"))?;
+ let socket = &mut resource.socket;
+ let byte_length = socket
+ .send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap())
+ .await?;
- Ok(JsonOp::Async(op.boxed_local()))
+ Ok(json!(byte_length))
}
_ => Err(ErrBox::type_error("Wrong argument format!")),
}
@@ -257,46 +247,42 @@ struct ConnectArgs {
transport_args: ArgsEnum,
}
-fn op_connect(
- isolate_state: &mut CoreIsolateState,
- state: &Rc<State>,
+async fn op_connect(
+ state: Rc<State>,
+ resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
- _zero_copy: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
- let resource_table = isolate_state.resource_table.clone();
+ _zero_copy: BufVec,
+) -> Result<Value, ErrBox> {
match serde_json::from_value(args)? {
ConnectArgs {
transport,
transport_args: ArgsEnum::Ip(args),
} if transport == "tcp" => {
state.check_net(&args.hostname, args.port)?;
- let op = async move {
- let addr = resolve_addr(&args.hostname, args.port)?;
- let tcp_stream = TcpStream::connect(&addr).await?;
- let local_addr = tcp_stream.local_addr()?;
- let remote_addr = tcp_stream.peer_addr()?;
- let mut resource_table = resource_table.borrow_mut();
- let rid = resource_table.add(
- "tcpStream",
- Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
- tcp_stream,
- )))),
- );
- Ok(json!({
- "rid": rid,
- "localAddr": {
- "hostname": local_addr.ip().to_string(),
- "port": local_addr.port(),
- "transport": transport,
- },
- "remoteAddr": {
- "hostname": remote_addr.ip().to_string(),
- "port": remote_addr.port(),
- "transport": transport,
- }
- }))
- };
- Ok(JsonOp::Async(op.boxed_local()))
+ let addr = resolve_addr(&args.hostname, args.port)?;
+ let tcp_stream = TcpStream::connect(&addr).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let mut resource_table = resource_table.borrow_mut();
+ let rid = resource_table.add(
+ "tcpStream",
+ Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
+ tcp_stream,
+ )))),
+ );
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "hostname": local_addr.ip().to_string(),
+ "port": local_addr.port(),
+ "transport": transport,
+ },
+ "remoteAddr": {
+ "hostname": remote_addr.ip().to_string(),
+ "port": remote_addr.port(),
+ "transport": transport,
+ }
+ }))
}
#[cfg(unix)]
ConnectArgs {
@@ -306,32 +292,29 @@ fn op_connect(
let address_path = net_unix::Path::new(&args.path);
state.check_unstable("Deno.connect");
state.check_read(&address_path)?;
- let op = async move {
- let path = args.path;
- let unix_stream =
- net_unix::UnixStream::connect(net_unix::Path::new(&path)).await?;
- let local_addr = unix_stream.local_addr()?;
- let remote_addr = unix_stream.peer_addr()?;
- let mut resource_table = resource_table.borrow_mut();
- let rid = resource_table.add(
- "unixStream",
- Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
- unix_stream,
- ))),
- );
- Ok(json!({
- "rid": rid,
- "localAddr": {
- "path": local_addr.as_pathname(),
- "transport": transport,
- },
- "remoteAddr": {
- "path": remote_addr.as_pathname(),
- "transport": transport,
- }
- }))
- };
- Ok(JsonOp::Async(op.boxed_local()))
+ let path = args.path;
+ let unix_stream =
+ net_unix::UnixStream::connect(net_unix::Path::new(&path)).await?;
+ let local_addr = unix_stream.local_addr()?;
+ let remote_addr = unix_stream.peer_addr()?;
+ let mut resource_table = resource_table.borrow_mut();
+ let rid = resource_table.add(
+ "unixStream",
+ Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
+ unix_stream,
+ ))),
+ );
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "path": local_addr.as_pathname(),
+ "transport": transport,
+ },
+ "remoteAddr": {
+ "path": remote_addr.as_pathname(),
+ "transport": transport,
+ }
+ }))
}
_ => Err(ErrBox::type_error("Wrong argument format!")),
}
@@ -344,11 +327,11 @@ struct ShutdownArgs {
}
fn op_shutdown(
- isolate_state: &mut CoreIsolateState,
- state: &Rc<State>,
+ state: &State,
+ resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
+) -> Result<Value, ErrBox> {
state.check_unstable("Deno.shutdown");
let args: ShutdownArgs = serde_json::from_value(args)?;
@@ -362,7 +345,6 @@ fn op_shutdown(
_ => unimplemented!(),
};
- let mut resource_table = isolate_state.resource_table.borrow_mut();
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid)
.ok_or_else(ErrBox::bad_resource_id)?;
@@ -377,7 +359,7 @@ fn op_shutdown(
_ => return Err(ErrBox::bad_resource_id()),
}
- Ok(JsonOp::Sync(json!({})))
+ Ok(json!({}))
}
#[allow(dead_code)]
@@ -485,12 +467,11 @@ fn listen_udp(
}
fn op_listen(
- isolate_state: &mut CoreIsolateState,
- state: &Rc<State>,
+ state: &State,
+ resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
- let mut resource_table = isolate_state.resource_table.borrow_mut();
+) -> Result<Value, ErrBox> {
match serde_json::from_value(args)? {
ListenArgs {
transport,
@@ -502,9 +483,9 @@ fn op_listen(
state.check_net(&args.hostname, args.port)?;
let addr = resolve_addr(&args.hostname, args.port)?;
let (rid, local_addr) = if transport == "tcp" {
- listen_tcp(&mut resource_table, addr)?
+ listen_tcp(resource_table, addr)?
} else {
- listen_udp(&mut resource_table, addr)?
+ listen_udp(resource_table, addr)?
};
debug!(
"New listener {} {}:{}",
@@ -512,14 +493,14 @@ fn op_listen(
local_addr.ip().to_string(),
local_addr.port()
);
- Ok(JsonOp::Sync(json!({
+ Ok(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": transport,
},
- })))
+ }))
}
#[cfg(unix)]
ListenArgs {
@@ -536,22 +517,22 @@ fn op_listen(
state.check_read(&address_path)?;
state.check_write(&address_path)?;
let (rid, local_addr) = if transport == "unix" {
- net_unix::listen_unix(&mut resource_table, &address_path)?
+ net_unix::listen_unix(resource_table, &address_path)?
} else {
- net_unix::listen_unix_packet(&mut resource_table, &address_path)?
+ net_unix::listen_unix_packet(resource_table, &address_path)?
};
debug!(
"New listener {} {}",
rid,
local_addr.as_pathname().unwrap().display(),
);
- Ok(JsonOp::Sync(json!({
+ Ok(json!({
"rid": rid,
"localAddr": {
"path": local_addr.as_pathname(),
"transport": transport,
},
- })))
+ }))
}
#[cfg(unix)]
_ => Err(ErrBox::type_error("Wrong argument format!")),