summaryrefslogtreecommitdiff
path: root/cli/ops/net.rs
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2020-09-06 02:34:02 +0200
committerGitHub <noreply@github.com>2020-09-06 02:34:02 +0200
commitc821e8f2f1fb8ad5e9eb00854277cafc8c80b2f5 (patch)
treec429a3c2707a4047fb512443a8468b7e15e5730d /cli/ops/net.rs
parent849431eb1d112d1f79f4a327830dc1a5bf22dd47 (diff)
Move JSON ops to deno_core (#7336)
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r--cli/ops/net.rs112
1 files changed, 48 insertions, 64 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index 9cb6eb79d..91a9079d4 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -1,15 +1,15 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use super::dispatch_json::{Deserialize, Value};
-use super::io::{StreamResource, StreamResourceHolder};
+
+use crate::ops::io::{StreamResource, StreamResourceHolder};
use crate::resolve_addr::resolve_addr;
use crate::state::State;
use deno_core::BufVec;
-use deno_core::CoreIsolate;
use deno_core::ErrBox;
-use deno_core::ResourceTable;
+use deno_core::OpRegistry;
use deno_core::ZeroCopyBuf;
use futures::future::poll_fn;
-use std::cell::RefCell;
+use serde_derive::Deserialize;
+use serde_json::Value;
use std::net::Shutdown;
use std::net::SocketAddr;
use std::rc::Rc;
@@ -22,38 +22,30 @@ use tokio::net::UdpSocket;
#[cfg(unix)]
use super::net_unix;
-pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
- let t = &CoreIsolate::state(i).borrow().resource_table.clone();
-
- i.register_op("op_accept", s.stateful_json_op_async(t, op_accept));
- i.register_op("op_connect", s.stateful_json_op_async(t, op_connect));
- i.register_op("op_shutdown", s.stateful_json_op_sync(t, op_shutdown));
- i.register_op("op_listen", s.stateful_json_op_sync(t, op_listen));
- i.register_op(
- "op_datagram_receive",
- s.stateful_json_op_async(t, op_datagram_receive),
- );
- i.register_op(
- "op_datagram_send",
- s.stateful_json_op_async(t, op_datagram_send),
- );
+pub fn init(s: &Rc<State>) {
+ s.register_op_json_async("op_accept", op_accept);
+ s.register_op_json_async("op_connect", op_connect);
+ s.register_op_json_sync("op_shutdown", op_shutdown);
+ s.register_op_json_sync("op_listen", op_listen);
+ s.register_op_json_async("op_datagram_receive", op_datagram_receive);
+ s.register_op_json_async("op_datagram_send", op_datagram_send);
}
#[derive(Deserialize)]
-struct AcceptArgs {
- rid: i32,
- transport: String,
+pub(crate) struct AcceptArgs {
+ pub rid: i32,
+ pub transport: String,
}
async fn accept_tcp(
- resource_table: Rc<RefCell<ResourceTable>>,
+ state: Rc<State>,
args: AcceptArgs,
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
let rid = args.rid as u32;
let accept_fut = poll_fn(|cx| {
- let mut resource_table = resource_table.borrow_mut();
+ let mut resource_table = state.resource_table.borrow_mut();
let listener_resource = resource_table
.get_mut::<TcpListenerResource>(rid)
.ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?;
@@ -76,8 +68,7 @@ async fn accept_tcp(
let (tcp_stream, _socket_addr) = accept_fut.await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
- let mut resource_table = resource_table.borrow_mut();
- let rid = resource_table.add(
+ let rid = state.resource_table.borrow_mut().add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
tcp_stream,
@@ -99,18 +90,15 @@ async fn accept_tcp(
}
async fn op_accept(
- _state: Rc<State>,
- resource_table: Rc<RefCell<ResourceTable>>,
+ state: Rc<State>,
args: Value,
- zero_copy: BufVec,
+ bufs: BufVec,
) -> Result<Value, ErrBox> {
let args: AcceptArgs = serde_json::from_value(args)?;
match args.transport.as_str() {
- "tcp" => accept_tcp(resource_table, args, zero_copy).await,
+ "tcp" => accept_tcp(state, args, bufs).await,
#[cfg(unix)]
- "unix" => {
- net_unix::accept_unix(resource_table, args.rid as u32, zero_copy).await
- }
+ "unix" => net_unix::accept_unix(state, args, bufs).await,
_ => Err(ErrBox::error(format!(
"Unsupported transport protocol {}",
args.transport
@@ -119,14 +107,13 @@ async fn op_accept(
}
#[derive(Deserialize)]
-struct ReceiveArgs {
- rid: i32,
- transport: String,
+pub(crate) struct ReceiveArgs {
+ pub rid: i32,
+ pub transport: String,
}
async fn receive_udp(
- resource_table: Rc<RefCell<ResourceTable>>,
- _state: &Rc<State>,
+ state: Rc<State>,
args: ReceiveArgs,
zero_copy: BufVec,
) -> Result<Value, ErrBox> {
@@ -136,7 +123,7 @@ async fn receive_udp(
let rid = args.rid as u32;
let receive_fut = poll_fn(|cx| {
- let mut resource_table = resource_table.borrow_mut();
+ let mut resource_table = state.resource_table.borrow_mut();
let resource = resource_table
.get_mut::<UdpSocketResource>(rid)
.ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?;
@@ -158,7 +145,6 @@ async fn receive_udp(
async fn op_datagram_receive(
state: Rc<State>,
- resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
zero_copy: BufVec,
) -> Result<Value, ErrBox> {
@@ -166,12 +152,9 @@ async fn op_datagram_receive(
let args: ReceiveArgs = serde_json::from_value(args)?;
match args.transport.as_str() {
- "udp" => receive_udp(resource_table, &state, args, zero_copy).await,
+ "udp" => receive_udp(state, args, zero_copy).await,
#[cfg(unix)]
- "unixpacket" => {
- net_unix::receive_unix_packet(resource_table, args.rid as u32, zero_copy)
- .await
- }
+ "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await,
_ => Err(ErrBox::error(format!(
"Unsupported transport protocol {}",
args.transport
@@ -189,7 +172,6 @@ struct SendArgs {
async fn op_datagram_send(
state: Rc<State>,
- resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
zero_copy: BufVec,
) -> Result<Value, ErrBox> {
@@ -205,7 +187,7 @@ async fn op_datagram_send(
state.check_net(&args.hostname, args.port)?;
let addr = resolve_addr(&args.hostname, args.port)?;
poll_fn(move |cx| {
- let mut resource_table = resource_table.borrow_mut();
+ let mut resource_table = state.resource_table.borrow_mut();
let resource = resource_table
.get_mut::<UdpSocketResource>(rid as u32)
.ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?;
@@ -225,7 +207,7 @@ async fn op_datagram_send(
} if transport == "unixpacket" => {
let address_path = net_unix::Path::new(&args.path);
state.check_read(&address_path)?;
- let mut resource_table = resource_table.borrow_mut();
+ let mut resource_table = state.resource_table.borrow_mut();
let resource = resource_table
.get_mut::<net_unix::UnixDatagramResource>(rid as u32)
.ok_or_else(|| ErrBox::new("NotConnected", "Socket has been closed"))?;
@@ -249,7 +231,6 @@ struct ConnectArgs {
async fn op_connect(
state: Rc<State>,
- resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
@@ -263,8 +244,7 @@ async fn op_connect(
let tcp_stream = TcpStream::connect(&addr).await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
- let mut resource_table = resource_table.borrow_mut();
- let rid = resource_table.add(
+ let rid = state.resource_table.borrow_mut().add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
tcp_stream,
@@ -297,8 +277,7 @@ async fn op_connect(
net_unix::UnixStream::connect(net_unix::Path::new(&path)).await?;
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
- let mut resource_table = resource_table.borrow_mut();
- let rid = resource_table.add(
+ let rid = state.resource_table.borrow_mut().add(
"unixStream",
Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
unix_stream,
@@ -328,7 +307,6 @@ struct ShutdownArgs {
fn op_shutdown(
state: &State,
- resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<Value, ErrBox> {
@@ -345,6 +323,7 @@ fn op_shutdown(
_ => unimplemented!(),
};
+ let mut resource_table = state.resource_table.borrow_mut();
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid)
.ok_or_else(ErrBox::bad_resource_id)?;
@@ -437,7 +416,7 @@ struct ListenArgs {
}
fn listen_tcp(
- resource_table: &mut ResourceTable,
+ state: &State,
addr: SocketAddr,
) -> Result<(u32, SocketAddr), ErrBox> {
let std_listener = std::net::TcpListener::bind(&addr)?;
@@ -448,27 +427,32 @@ fn listen_tcp(
waker: None,
local_addr,
};
- let rid = resource_table.add("tcpListener", Box::new(listener_resource));
+ let rid = state
+ .resource_table
+ .borrow_mut()
+ .add("tcpListener", Box::new(listener_resource));
Ok((rid, local_addr))
}
fn listen_udp(
- resource_table: &mut ResourceTable,
+ state: &State,
addr: SocketAddr,
) -> Result<(u32, SocketAddr), ErrBox> {
let std_socket = std::net::UdpSocket::bind(&addr)?;
let socket = UdpSocket::from_std(std_socket)?;
let local_addr = socket.local_addr()?;
let socket_resource = UdpSocketResource { socket };
- let rid = resource_table.add("udpSocket", Box::new(socket_resource));
+ let rid = state
+ .resource_table
+ .borrow_mut()
+ .add("udpSocket", Box::new(socket_resource));
Ok((rid, local_addr))
}
fn op_listen(
state: &State,
- resource_table: &mut ResourceTable,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<Value, ErrBox> {
@@ -483,9 +467,9 @@ fn op_listen(
state.check_net(&args.hostname, args.port)?;
let addr = resolve_addr(&args.hostname, args.port)?;
let (rid, local_addr) = if transport == "tcp" {
- listen_tcp(resource_table, addr)?
+ listen_tcp(state, addr)?
} else {
- listen_udp(resource_table, addr)?
+ listen_udp(state, addr)?
};
debug!(
"New listener {} {}:{}",
@@ -517,9 +501,9 @@ fn op_listen(
state.check_read(&address_path)?;
state.check_write(&address_path)?;
let (rid, local_addr) = if transport == "unix" {
- net_unix::listen_unix(resource_table, &address_path)?
+ net_unix::listen_unix(state, &address_path)?
} else {
- net_unix::listen_unix_packet(resource_table, &address_path)?
+ net_unix::listen_unix_packet(state, &address_path)?
};
debug!(
"New listener {} {}",