diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2020-09-10 09:57:45 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-10 09:57:45 -0400 |
commit | 7c2e7c660804afca823d60e6496aa853f75db16c (patch) | |
tree | b7746b181c1564c6b1abd2e906662f9e6b008417 /cli/ops/net.rs | |
parent | 6f70e6e72ba2d5c1de7495adac37c1e4f4e86b24 (diff) |
Use gotham-like state for ops (#7385)
Provides a concrete state type that can be dynamically added. This is necessary for op crates.
* renames BasicState to OpState
* async ops take `Rc<RefCell<OpState>>`
* sync ops take `&mut OpState`
* removes `OpRegistry`, `OpRouter` traits
* `get_error_class_fn` moved to OpState
* ResourceTable moved to OpState
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r-- | cli/ops/net.rs | 117 |
1 files changed, 66 insertions, 51 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 91a9079d4..67a201460 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -2,14 +2,14 @@ use crate::ops::io::{StreamResource, StreamResourceHolder}; use crate::resolve_addr::resolve_addr; -use crate::state::State; use deno_core::BufVec; use deno_core::ErrBox; -use deno_core::OpRegistry; +use deno_core::OpState; use deno_core::ZeroCopyBuf; use futures::future::poll_fn; use serde_derive::Deserialize; use serde_json::Value; +use std::cell::RefCell; use std::net::Shutdown; use std::net::SocketAddr; use std::rc::Rc; @@ -22,13 +22,13 @@ use tokio::net::UdpSocket; #[cfg(unix)] use super::net_unix; -pub fn init(s: &Rc<State>) { - s.register_op_json_async("op_accept", op_accept); - s.register_op_json_async("op_connect", op_connect); - s.register_op_json_sync("op_shutdown", op_shutdown); - s.register_op_json_sync("op_listen", op_listen); - s.register_op_json_async("op_datagram_receive", op_datagram_receive); - s.register_op_json_async("op_datagram_send", op_datagram_send); +pub fn init(rt: &mut deno_core::JsRuntime) { + super::reg_json_async(rt, "op_accept", op_accept); + super::reg_json_async(rt, "op_connect", op_connect); + super::reg_json_sync(rt, "op_shutdown", op_shutdown); + super::reg_json_sync(rt, "op_listen", op_listen); + super::reg_json_async(rt, "op_datagram_receive", op_datagram_receive); + super::reg_json_async(rt, "op_datagram_send", op_datagram_send); } #[derive(Deserialize)] @@ -38,15 +38,16 @@ pub(crate) struct AcceptArgs { } async fn accept_tcp( - state: Rc<State>, + state: Rc<RefCell<OpState>>, args: AcceptArgs, _zero_copy: BufVec, ) -> Result<Value, ErrBox> { let rid = args.rid as u32; let accept_fut = poll_fn(|cx| { - let mut resource_table = state.resource_table.borrow_mut(); - let listener_resource = resource_table + let mut state = state.borrow_mut(); + let listener_resource = state + .resource_table .get_mut::<TcpListenerResource>(rid) .ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?; let listener = &mut listener_resource.listener; @@ -68,7 +69,9 @@ async fn accept_tcp( let (tcp_stream, _socket_addr) = accept_fut.await?; let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let rid = state.resource_table.borrow_mut().add( + + let mut state = state.borrow_mut(); + let rid = state.resource_table.add( "tcpStream", Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( tcp_stream, @@ -90,7 +93,7 @@ async fn accept_tcp( } async fn op_accept( - state: Rc<State>, + state: Rc<RefCell<OpState>>, args: Value, bufs: BufVec, ) -> Result<Value, ErrBox> { @@ -113,7 +116,7 @@ pub(crate) struct ReceiveArgs { } async fn receive_udp( - state: Rc<State>, + state: Rc<RefCell<OpState>>, args: ReceiveArgs, zero_copy: BufVec, ) -> Result<Value, ErrBox> { @@ -123,8 +126,9 @@ async fn receive_udp( let rid = args.rid as u32; let receive_fut = poll_fn(|cx| { - let mut resource_table = state.resource_table.borrow_mut(); - let resource = resource_table + let mut state = state.borrow_mut(); + let resource = state + .resource_table .get_mut::<UdpSocketResource>(rid) .ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?; let socket = &mut resource.socket; @@ -144,7 +148,7 @@ async fn receive_udp( } async fn op_datagram_receive( - state: Rc<State>, + state: Rc<RefCell<OpState>>, args: Value, zero_copy: BufVec, ) -> Result<Value, ErrBox> { @@ -171,12 +175,13 @@ struct SendArgs { } async fn op_datagram_send( - state: Rc<State>, + state: Rc<RefCell<OpState>>, args: Value, zero_copy: BufVec, ) -> Result<Value, ErrBox> { assert_eq!(zero_copy.len(), 1, "Invalid number of arguments"); let zero_copy = zero_copy[0].clone(); + let cli_state = super::cli_state2(&state); match serde_json::from_value(args)? { SendArgs { @@ -184,11 +189,12 @@ async fn op_datagram_send( transport, transport_args: ArgsEnum::Ip(args), } if transport == "udp" => { - state.check_net(&args.hostname, args.port)?; + cli_state.check_net(&args.hostname, args.port)?; let addr = resolve_addr(&args.hostname, args.port)?; poll_fn(move |cx| { - let mut resource_table = state.resource_table.borrow_mut(); - let resource = resource_table + let mut state = state.borrow_mut(); + let resource = state + .resource_table .get_mut::<UdpSocketResource>(rid as u32) .ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?; resource @@ -206,9 +212,10 @@ async fn op_datagram_send( transport_args: ArgsEnum::Unix(args), } if transport == "unixpacket" => { let address_path = net_unix::Path::new(&args.path); - state.check_read(&address_path)?; - let mut resource_table = state.resource_table.borrow_mut(); - let resource = resource_table + cli_state.check_read(&address_path)?; + let mut state = state.borrow_mut(); + let resource = state + .resource_table .get_mut::<net_unix::UnixDatagramResource>(rid as u32) .ok_or_else(|| ErrBox::new("NotConnected", "Socket has been closed"))?; let socket = &mut resource.socket; @@ -230,21 +237,24 @@ struct ConnectArgs { } async fn op_connect( - state: Rc<State>, + state: Rc<RefCell<OpState>>, args: Value, _zero_copy: BufVec, ) -> Result<Value, ErrBox> { + let cli_state = super::cli_state2(&state); match serde_json::from_value(args)? { ConnectArgs { transport, transport_args: ArgsEnum::Ip(args), } if transport == "tcp" => { - state.check_net(&args.hostname, args.port)?; + cli_state.check_net(&args.hostname, args.port)?; let addr = resolve_addr(&args.hostname, args.port)?; let tcp_stream = TcpStream::connect(&addr).await?; let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let rid = state.resource_table.borrow_mut().add( + + let mut state_ = state.borrow_mut(); + let rid = state_.resource_table.add( "tcpStream", Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( tcp_stream, @@ -270,14 +280,16 @@ async fn op_connect( transport_args: ArgsEnum::Unix(args), } if transport == "unix" => { let address_path = net_unix::Path::new(&args.path); - state.check_unstable("Deno.connect"); - state.check_read(&address_path)?; + cli_state.check_unstable("Deno.connect"); + cli_state.check_read(&address_path)?; let path = args.path; let unix_stream = net_unix::UnixStream::connect(net_unix::Path::new(&path)).await?; let local_addr = unix_stream.local_addr()?; let remote_addr = unix_stream.peer_addr()?; - let rid = state.resource_table.borrow_mut().add( + + let mut state_ = state.borrow_mut(); + let rid = state_.resource_table.add( "unixStream", Box::new(StreamResourceHolder::new(StreamResource::UnixStream( unix_stream, @@ -306,11 +318,11 @@ struct ShutdownArgs { } fn op_shutdown( - state: &State, + state: &mut OpState, args: Value, _zero_copy: &mut [ZeroCopyBuf], ) -> Result<Value, ErrBox> { - state.check_unstable("Deno.shutdown"); + super::cli_state(state).check_unstable("Deno.shutdown"); let args: ShutdownArgs = serde_json::from_value(args)?; @@ -323,8 +335,8 @@ fn op_shutdown( _ => unimplemented!(), }; - let mut resource_table = state.resource_table.borrow_mut(); - let resource_holder = resource_table + let resource_holder = state + .resource_table .get_mut::<StreamResourceHolder>(rid) .ok_or_else(ErrBox::bad_resource_id)?; match resource_holder.resource { @@ -416,7 +428,7 @@ struct ListenArgs { } fn listen_tcp( - state: &State, + state: &mut OpState, addr: SocketAddr, ) -> Result<(u32, SocketAddr), ErrBox> { let std_listener = std::net::TcpListener::bind(&addr)?; @@ -429,14 +441,13 @@ fn listen_tcp( }; let rid = state .resource_table - .borrow_mut() .add("tcpListener", Box::new(listener_resource)); Ok((rid, local_addr)) } fn listen_udp( - state: &State, + state: &mut OpState, addr: SocketAddr, ) -> Result<(u32, SocketAddr), ErrBox> { let std_socket = std::net::UdpSocket::bind(&addr)?; @@ -445,26 +456,28 @@ fn listen_udp( let socket_resource = UdpSocketResource { socket }; let rid = state .resource_table - .borrow_mut() .add("udpSocket", Box::new(socket_resource)); Ok((rid, local_addr)) } fn op_listen( - state: &State, + state: &mut OpState, args: Value, _zero_copy: &mut [ZeroCopyBuf], ) -> Result<Value, ErrBox> { + let cli_state = super::cli_state(state); match serde_json::from_value(args)? { ListenArgs { transport, transport_args: ArgsEnum::Ip(args), } => { - if transport == "udp" { - state.check_unstable("Deno.listenDatagram"); + { + if transport == "udp" { + cli_state.check_unstable("Deno.listenDatagram"); + } + cli_state.check_net(&args.hostname, args.port)?; } - state.check_net(&args.hostname, args.port)?; let addr = resolve_addr(&args.hostname, args.port)?; let (rid, local_addr) = if transport == "tcp" { listen_tcp(state, addr)? @@ -491,15 +504,17 @@ fn op_listen( transport, transport_args: ArgsEnum::Unix(args), } if transport == "unix" || transport == "unixpacket" => { - if transport == "unix" { - state.check_unstable("Deno.listen"); - } - if transport == "unixpacket" { - state.check_unstable("Deno.listenDatagram"); - } let address_path = net_unix::Path::new(&args.path); - state.check_read(&address_path)?; - state.check_write(&address_path)?; + { + if transport == "unix" { + cli_state.check_unstable("Deno.listen"); + } + if transport == "unixpacket" { + cli_state.check_unstable("Deno.listenDatagram"); + } + cli_state.check_read(&address_path)?; + cli_state.check_write(&address_path)?; + } let (rid, local_addr) = if transport == "unix" { net_unix::listen_unix(state, &address_path)? } else { |