diff options
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r-- | cli/ops/net.rs | 107 |
1 files changed, 48 insertions, 59 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 101fc5130..8a6afe756 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -20,12 +20,12 @@ use tokio::net::UdpSocket; use super::net_unix; pub fn init(i: &mut Isolate, s: &State) { - i.register_op("op_accept", s.stateful_json_op(op_accept)); - i.register_op("op_connect", s.stateful_json_op(op_connect)); - i.register_op("op_shutdown", s.stateful_json_op(op_shutdown)); - i.register_op("op_listen", s.stateful_json_op(op_listen)); - i.register_op("op_receive", s.stateful_json_op(op_receive)); - i.register_op("op_send", s.stateful_json_op(op_send)); + i.register_op("op_accept", s.stateful_json_op2(op_accept)); + i.register_op("op_connect", s.stateful_json_op2(op_connect)); + i.register_op("op_shutdown", s.stateful_json_op2(op_shutdown)); + i.register_op("op_listen", s.stateful_json_op2(op_listen)); + i.register_op("op_receive", s.stateful_json_op2(op_receive)); + i.register_op("op_send", s.stateful_json_op2(op_send)); } #[derive(Deserialize)] @@ -35,25 +35,16 @@ struct AcceptArgs { } fn accept_tcp( - state: &State, + isolate: &mut deno_core::Isolate, args: AcceptArgs, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { let rid = args.rid as u32; - let state_ = state.clone(); - { - let state = state.borrow(); - state - .resource_table - .get::<TcpListenerResource>(rid) - .ok_or_else(OpError::bad_resource_id)?; - } - - let state = state.clone(); + let resource_table = isolate.resource_table.clone(); let op = async move { let accept_fut = poll_fn(|cx| { - let resource_table = &mut state.borrow_mut().resource_table; + let mut resource_table = resource_table.borrow_mut(); let listener_resource = resource_table .get_mut::<TcpListenerResource>(rid) .ok_or_else(|| { @@ -78,8 +69,8 @@ fn accept_tcp( let (tcp_stream, _socket_addr) = accept_fut.await?; let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let mut state = state_.borrow_mut(); - let rid = state.resource_table.add( + let mut resource_table = resource_table.borrow_mut(); + let rid = resource_table.add( "tcpStream", Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( tcp_stream, @@ -104,15 +95,16 @@ fn accept_tcp( } fn op_accept( - state: &State, + isolate: &mut deno_core::Isolate, + _state: &State, args: Value, zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { let args: AcceptArgs = serde_json::from_value(args)?; match args.transport.as_str() { - "tcp" => accept_tcp(state, args, zero_copy), + "tcp" => accept_tcp(isolate, args, zero_copy), #[cfg(unix)] - "unix" => net_unix::accept_unix(state, args.rid as u32, zero_copy), + "unix" => net_unix::accept_unix(isolate, args.rid as u32, zero_copy), _ => Err(OpError::other(format!( "Unsupported transport protocol {}", args.transport @@ -127,7 +119,8 @@ struct ReceiveArgs { } fn receive_udp( - state: &State, + isolate: &mut deno_core::Isolate, + _state: &State, args: ReceiveArgs, zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { @@ -135,11 +128,11 @@ fn receive_udp( let rid = args.rid as u32; - let state_ = state.clone(); + let resource_table = isolate.resource_table.clone(); let op = async move { let receive_fut = poll_fn(|cx| { - let resource_table = &mut state_.borrow_mut().resource_table; + let mut resource_table = resource_table.borrow_mut(); let resource = resource_table .get_mut::<UdpSocketResource>(rid) .ok_or_else(|| { @@ -163,6 +156,7 @@ fn receive_udp( } fn op_receive( + isolate: &mut deno_core::Isolate, state: &State, args: Value, zero_copy: Option<ZeroCopyBuf>, @@ -170,10 +164,10 @@ fn op_receive( assert!(zero_copy.is_some()); let args: ReceiveArgs = serde_json::from_value(args)?; match args.transport.as_str() { - "udp" => receive_udp(state, args, zero_copy), + "udp" => receive_udp(isolate, state, args, zero_copy), #[cfg(unix)] "unixpacket" => { - net_unix::receive_unix_packet(state, args.rid as u32, zero_copy) + net_unix::receive_unix_packet(isolate, args.rid as u32, zero_copy) } _ => Err(OpError::other(format!( "Unsupported transport protocol {}", @@ -191,13 +185,14 @@ struct SendArgs { } fn op_send( + isolate: &mut deno_core::Isolate, state: &State, args: Value, zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { assert!(zero_copy.is_some()); let buf = zero_copy.unwrap(); - let state_ = state.clone(); + let resource_table = isolate.resource_table.clone(); match serde_json::from_value(args)? { SendArgs { rid, @@ -207,9 +202,8 @@ fn op_send( state.check_net(&args.hostname, args.port)?; let op = async move { - let mut state = state_.borrow_mut(); - let resource = state - .resource_table + let mut resource_table = resource_table.borrow_mut(); + let resource = resource_table .get_mut::<UdpSocketResource>(rid as u32) .ok_or_else(|| { OpError::bad_resource("Socket has been closed".to_string()) @@ -231,9 +225,8 @@ fn op_send( let address_path = net_unix::Path::new(&args.address); state.check_read(&address_path)?; let op = async move { - let mut state = state_.borrow_mut(); - let resource = state - .resource_table + let mut resource_table = resource_table.borrow_mut(); + let resource = resource_table .get_mut::<net_unix::UnixDatagramResource>(rid as u32) .ok_or_else(|| { OpError::other("Socket has been closed".to_string()) @@ -261,24 +254,25 @@ struct ConnectArgs { } fn op_connect( + isolate: &mut deno_core::Isolate, state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { + let resource_table = isolate.resource_table.clone(); match serde_json::from_value(args)? { ConnectArgs { transport, transport_args: ArgsEnum::Ip(args), } if transport == "tcp" => { - let state_ = state.clone(); state.check_net(&args.hostname, args.port)?; let op = async move { let addr = resolve_addr(&args.hostname, args.port)?; let tcp_stream = TcpStream::connect(&addr).await?; let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let mut state = state_.borrow_mut(); - let rid = state.resource_table.add( + let mut resource_table = resource_table.borrow_mut(); + let rid = resource_table.add( "tcpStream", Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( tcp_stream, @@ -306,7 +300,6 @@ fn op_connect( transport_args: ArgsEnum::Unix(args), } if transport == "unix" => { let address_path = net_unix::Path::new(&args.address); - let state_ = state.clone(); state.check_read(&address_path)?; let op = async move { let address = args.address; @@ -314,8 +307,8 @@ fn op_connect( net_unix::UnixStream::connect(net_unix::Path::new(&address)).await?; let local_addr = unix_stream.local_addr()?; let remote_addr = unix_stream.peer_addr()?; - let mut state = state_.borrow_mut(); - let rid = state.resource_table.add( + let mut resource_table = resource_table.borrow_mut(); + let rid = resource_table.add( "unixStream", Box::new(StreamResourceHolder::new(StreamResource::UnixStream( unix_stream, @@ -346,7 +339,8 @@ struct ShutdownArgs { } fn op_shutdown( - state: &State, + isolate: &mut deno_core::Isolate, + _state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { @@ -361,9 +355,8 @@ fn op_shutdown( _ => unimplemented!(), }; - let mut state = state.borrow_mut(); - let resource_holder = state - .resource_table + let mut resource_table = isolate.resource_table.borrow_mut(); + let resource_holder = resource_table .get_mut::<StreamResourceHolder>(rid) .ok_or_else(OpError::bad_resource_id)?; match resource_holder.resource { @@ -456,10 +449,9 @@ struct ListenArgs { } fn listen_tcp( - state: &State, + resource_table: &mut ResourceTable, addr: SocketAddr, ) -> Result<(u32, SocketAddr), OpError> { - let mut state = state.borrow_mut(); let std_listener = std::net::TcpListener::bind(&addr)?; let listener = TcpListener::from_std(std_listener)?; let local_addr = listener.local_addr()?; @@ -468,34 +460,31 @@ fn listen_tcp( waker: None, local_addr, }; - let rid = state - .resource_table - .add("tcpListener", Box::new(listener_resource)); + let rid = resource_table.add("tcpListener", Box::new(listener_resource)); Ok((rid, local_addr)) } fn listen_udp( - state: &State, + resource_table: &mut ResourceTable, addr: SocketAddr, ) -> Result<(u32, SocketAddr), OpError> { - let mut state = state.borrow_mut(); let std_socket = std::net::UdpSocket::bind(&addr)?; let socket = UdpSocket::from_std(std_socket)?; let local_addr = socket.local_addr()?; let socket_resource = UdpSocketResource { socket }; - let rid = state - .resource_table - .add("udpSocket", Box::new(socket_resource)); + let rid = resource_table.add("udpSocket", Box::new(socket_resource)); Ok((rid, local_addr)) } fn op_listen( + isolate: &mut deno_core::Isolate, state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { + let mut resource_table = isolate.resource_table.borrow_mut(); match serde_json::from_value(args)? { ListenArgs { transport, @@ -504,9 +493,9 @@ fn op_listen( state.check_net(&args.hostname, args.port)?; let addr = resolve_addr(&args.hostname, args.port)?; let (rid, local_addr) = if transport == "tcp" { - listen_tcp(state, addr)? + listen_tcp(&mut resource_table, addr)? } else { - listen_udp(state, addr)? + listen_udp(&mut resource_table, addr)? }; debug!( "New listener {} {}:{}", @@ -531,9 +520,9 @@ fn op_listen( let address_path = net_unix::Path::new(&args.address); state.check_read(&address_path)?; let (rid, local_addr) = if transport == "unix" { - net_unix::listen_unix(state, &address_path)? + net_unix::listen_unix(&mut resource_table, &address_path)? } else { - net_unix::listen_unix_packet(state, &address_path)? + net_unix::listen_unix_packet(&mut resource_table, &address_path)? }; debug!( "New listener {} {}", |