diff options
author | Bert Belder <bertbelder@gmail.com> | 2020-09-06 02:34:02 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-06 02:34:02 +0200 |
commit | c821e8f2f1fb8ad5e9eb00854277cafc8c80b2f5 (patch) | |
tree | c429a3c2707a4047fb512443a8468b7e15e5730d /cli/ops/net.rs | |
parent | 849431eb1d112d1f79f4a327830dc1a5bf22dd47 (diff) |
Move JSON ops to deno_core (#7336)
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r-- | cli/ops/net.rs | 112 |
1 files changed, 48 insertions, 64 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 9cb6eb79d..91a9079d4 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -1,15 +1,15 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use super::dispatch_json::{Deserialize, Value}; -use super::io::{StreamResource, StreamResourceHolder}; + +use crate::ops::io::{StreamResource, StreamResourceHolder}; use crate::resolve_addr::resolve_addr; use crate::state::State; use deno_core::BufVec; -use deno_core::CoreIsolate; use deno_core::ErrBox; -use deno_core::ResourceTable; +use deno_core::OpRegistry; use deno_core::ZeroCopyBuf; use futures::future::poll_fn; -use std::cell::RefCell; +use serde_derive::Deserialize; +use serde_json::Value; use std::net::Shutdown; use std::net::SocketAddr; use std::rc::Rc; @@ -22,38 +22,30 @@ use tokio::net::UdpSocket; #[cfg(unix)] use super::net_unix; -pub fn init(i: &mut CoreIsolate, s: &Rc<State>) { - let t = &CoreIsolate::state(i).borrow().resource_table.clone(); - - i.register_op("op_accept", s.stateful_json_op_async(t, op_accept)); - i.register_op("op_connect", s.stateful_json_op_async(t, op_connect)); - i.register_op("op_shutdown", s.stateful_json_op_sync(t, op_shutdown)); - i.register_op("op_listen", s.stateful_json_op_sync(t, op_listen)); - i.register_op( - "op_datagram_receive", - s.stateful_json_op_async(t, op_datagram_receive), - ); - i.register_op( - "op_datagram_send", - s.stateful_json_op_async(t, op_datagram_send), - ); +pub fn init(s: &Rc<State>) { + s.register_op_json_async("op_accept", op_accept); + s.register_op_json_async("op_connect", op_connect); + s.register_op_json_sync("op_shutdown", op_shutdown); + s.register_op_json_sync("op_listen", op_listen); + s.register_op_json_async("op_datagram_receive", op_datagram_receive); + s.register_op_json_async("op_datagram_send", op_datagram_send); } #[derive(Deserialize)] -struct AcceptArgs { - rid: i32, - transport: String, +pub(crate) struct AcceptArgs { + pub rid: i32, + pub transport: String, } async fn accept_tcp( - resource_table: Rc<RefCell<ResourceTable>>, + state: Rc<State>, args: AcceptArgs, _zero_copy: BufVec, ) -> Result<Value, ErrBox> { let rid = args.rid as u32; let accept_fut = poll_fn(|cx| { - let mut resource_table = resource_table.borrow_mut(); + let mut resource_table = state.resource_table.borrow_mut(); let listener_resource = resource_table .get_mut::<TcpListenerResource>(rid) .ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?; @@ -76,8 +68,7 @@ async fn accept_tcp( let (tcp_stream, _socket_addr) = accept_fut.await?; let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let mut resource_table = resource_table.borrow_mut(); - let rid = resource_table.add( + let rid = state.resource_table.borrow_mut().add( "tcpStream", Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( tcp_stream, @@ -99,18 +90,15 @@ async fn accept_tcp( } async fn op_accept( - _state: Rc<State>, - resource_table: Rc<RefCell<ResourceTable>>, + state: Rc<State>, args: Value, - zero_copy: BufVec, + bufs: BufVec, ) -> Result<Value, ErrBox> { let args: AcceptArgs = serde_json::from_value(args)?; match args.transport.as_str() { - "tcp" => accept_tcp(resource_table, args, zero_copy).await, + "tcp" => accept_tcp(state, args, bufs).await, #[cfg(unix)] - "unix" => { - net_unix::accept_unix(resource_table, args.rid as u32, zero_copy).await - } + "unix" => net_unix::accept_unix(state, args, bufs).await, _ => Err(ErrBox::error(format!( "Unsupported transport protocol {}", args.transport @@ -119,14 +107,13 @@ async fn op_accept( } #[derive(Deserialize)] -struct ReceiveArgs { - rid: i32, - transport: String, +pub(crate) struct ReceiveArgs { + pub rid: i32, + pub transport: String, } async fn receive_udp( - resource_table: Rc<RefCell<ResourceTable>>, - _state: &Rc<State>, + state: Rc<State>, args: ReceiveArgs, zero_copy: BufVec, ) -> Result<Value, ErrBox> { @@ -136,7 +123,7 @@ async fn receive_udp( let rid = args.rid as u32; let receive_fut = poll_fn(|cx| { - let mut resource_table = resource_table.borrow_mut(); + let mut resource_table = state.resource_table.borrow_mut(); let resource = resource_table .get_mut::<UdpSocketResource>(rid) .ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?; @@ -158,7 +145,6 @@ async fn receive_udp( async fn op_datagram_receive( state: Rc<State>, - resource_table: Rc<RefCell<ResourceTable>>, args: Value, zero_copy: BufVec, ) -> Result<Value, ErrBox> { @@ -166,12 +152,9 @@ async fn op_datagram_receive( let args: ReceiveArgs = serde_json::from_value(args)?; match args.transport.as_str() { - "udp" => receive_udp(resource_table, &state, args, zero_copy).await, + "udp" => receive_udp(state, args, zero_copy).await, #[cfg(unix)] - "unixpacket" => { - net_unix::receive_unix_packet(resource_table, args.rid as u32, zero_copy) - .await - } + "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await, _ => Err(ErrBox::error(format!( "Unsupported transport protocol {}", args.transport @@ -189,7 +172,6 @@ struct SendArgs { async fn op_datagram_send( state: Rc<State>, - resource_table: Rc<RefCell<ResourceTable>>, args: Value, zero_copy: BufVec, ) -> Result<Value, ErrBox> { @@ -205,7 +187,7 @@ async fn op_datagram_send( state.check_net(&args.hostname, args.port)?; let addr = resolve_addr(&args.hostname, args.port)?; poll_fn(move |cx| { - let mut resource_table = resource_table.borrow_mut(); + let mut resource_table = state.resource_table.borrow_mut(); let resource = resource_table .get_mut::<UdpSocketResource>(rid as u32) .ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?; @@ -225,7 +207,7 @@ async fn op_datagram_send( } if transport == "unixpacket" => { let address_path = net_unix::Path::new(&args.path); state.check_read(&address_path)?; - let mut resource_table = resource_table.borrow_mut(); + let mut resource_table = state.resource_table.borrow_mut(); let resource = resource_table .get_mut::<net_unix::UnixDatagramResource>(rid as u32) .ok_or_else(|| ErrBox::new("NotConnected", "Socket has been closed"))?; @@ -249,7 +231,6 @@ struct ConnectArgs { async fn op_connect( state: Rc<State>, - resource_table: Rc<RefCell<ResourceTable>>, args: Value, _zero_copy: BufVec, ) -> Result<Value, ErrBox> { @@ -263,8 +244,7 @@ async fn op_connect( let tcp_stream = TcpStream::connect(&addr).await?; let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let mut resource_table = resource_table.borrow_mut(); - let rid = resource_table.add( + let rid = state.resource_table.borrow_mut().add( "tcpStream", Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( tcp_stream, @@ -297,8 +277,7 @@ async fn op_connect( net_unix::UnixStream::connect(net_unix::Path::new(&path)).await?; let local_addr = unix_stream.local_addr()?; let remote_addr = unix_stream.peer_addr()?; - let mut resource_table = resource_table.borrow_mut(); - let rid = resource_table.add( + let rid = state.resource_table.borrow_mut().add( "unixStream", Box::new(StreamResourceHolder::new(StreamResource::UnixStream( unix_stream, @@ -328,7 +307,6 @@ struct ShutdownArgs { fn op_shutdown( state: &State, - resource_table: &mut ResourceTable, args: Value, _zero_copy: &mut [ZeroCopyBuf], ) -> Result<Value, ErrBox> { @@ -345,6 +323,7 @@ fn op_shutdown( _ => unimplemented!(), }; + let mut resource_table = state.resource_table.borrow_mut(); let resource_holder = resource_table .get_mut::<StreamResourceHolder>(rid) .ok_or_else(ErrBox::bad_resource_id)?; @@ -437,7 +416,7 @@ struct ListenArgs { } fn listen_tcp( - resource_table: &mut ResourceTable, + state: &State, addr: SocketAddr, ) -> Result<(u32, SocketAddr), ErrBox> { let std_listener = std::net::TcpListener::bind(&addr)?; @@ -448,27 +427,32 @@ fn listen_tcp( waker: None, local_addr, }; - let rid = resource_table.add("tcpListener", Box::new(listener_resource)); + let rid = state + .resource_table + .borrow_mut() + .add("tcpListener", Box::new(listener_resource)); Ok((rid, local_addr)) } fn listen_udp( - resource_table: &mut ResourceTable, + state: &State, addr: SocketAddr, ) -> Result<(u32, SocketAddr), ErrBox> { let std_socket = std::net::UdpSocket::bind(&addr)?; let socket = UdpSocket::from_std(std_socket)?; let local_addr = socket.local_addr()?; let socket_resource = UdpSocketResource { socket }; - let rid = resource_table.add("udpSocket", Box::new(socket_resource)); + let rid = state + .resource_table + .borrow_mut() + .add("udpSocket", Box::new(socket_resource)); Ok((rid, local_addr)) } fn op_listen( state: &State, - resource_table: &mut ResourceTable, args: Value, _zero_copy: &mut [ZeroCopyBuf], ) -> Result<Value, ErrBox> { @@ -483,9 +467,9 @@ fn op_listen( state.check_net(&args.hostname, args.port)?; let addr = resolve_addr(&args.hostname, args.port)?; let (rid, local_addr) = if transport == "tcp" { - listen_tcp(resource_table, addr)? + listen_tcp(state, addr)? } else { - listen_udp(resource_table, addr)? + listen_udp(state, addr)? }; debug!( "New listener {} {}:{}", @@ -517,9 +501,9 @@ fn op_listen( state.check_read(&address_path)?; state.check_write(&address_path)?; let (rid, local_addr) = if transport == "unix" { - net_unix::listen_unix(resource_table, &address_path)? + net_unix::listen_unix(state, &address_path)? } else { - net_unix::listen_unix_packet(resource_table, &address_path)? + net_unix::listen_unix_packet(state, &address_path)? }; debug!( "New listener {} {}", |