diff options
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r-- | cli/ops/net.rs | 387 |
1 files changed, 184 insertions, 203 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs index d979f44ae..9cb6eb79d 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -1,15 +1,15 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use super::dispatch_json::{Deserialize, JsonOp, Value}; +use super::dispatch_json::{Deserialize, Value}; use super::io::{StreamResource, StreamResourceHolder}; use crate::resolve_addr::resolve_addr; use crate::state::State; +use deno_core::BufVec; use deno_core::CoreIsolate; -use deno_core::CoreIsolateState; use deno_core::ErrBox; use deno_core::ResourceTable; use deno_core::ZeroCopyBuf; use futures::future::poll_fn; -use futures::future::FutureExt; +use std::cell::RefCell; use std::net::Shutdown; use std::net::SocketAddr; use std::rc::Rc; @@ -23,15 +23,20 @@ use tokio::net::UdpSocket; use super::net_unix; pub fn init(i: &mut CoreIsolate, s: &Rc<State>) { - i.register_op("op_accept", s.stateful_json_op2(op_accept)); - i.register_op("op_connect", s.stateful_json_op2(op_connect)); - i.register_op("op_shutdown", s.stateful_json_op2(op_shutdown)); - i.register_op("op_listen", s.stateful_json_op2(op_listen)); + let t = &CoreIsolate::state(i).borrow().resource_table.clone(); + + i.register_op("op_accept", s.stateful_json_op_async(t, op_accept)); + i.register_op("op_connect", s.stateful_json_op_async(t, op_connect)); + i.register_op("op_shutdown", s.stateful_json_op_sync(t, op_shutdown)); + i.register_op("op_listen", s.stateful_json_op_sync(t, op_listen)); i.register_op( "op_datagram_receive", - s.stateful_json_op2(op_datagram_receive), + s.stateful_json_op_async(t, op_datagram_receive), + ); + i.register_op( + "op_datagram_send", + s.stateful_json_op_async(t, op_datagram_send), ); - i.register_op("op_datagram_send", s.stateful_json_op2(op_datagram_send)); } #[derive(Deserialize)] @@ -40,75 +45,72 @@ struct AcceptArgs { transport: String, } -fn accept_tcp( - isolate_state: &mut CoreIsolateState, +async fn accept_tcp( + resource_table: Rc<RefCell<ResourceTable>>, args: AcceptArgs, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { + _zero_copy: BufVec, +) -> Result<Value, ErrBox> { let rid = args.rid as u32; - let resource_table = isolate_state.resource_table.clone(); - let op = async move { - let accept_fut = poll_fn(|cx| { - let mut resource_table = resource_table.borrow_mut(); - let listener_resource = resource_table - .get_mut::<TcpListenerResource>(rid) - .ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?; - let listener = &mut listener_resource.listener; - match listener.poll_accept(cx).map_err(ErrBox::from) { - Poll::Ready(Ok((stream, addr))) => { - listener_resource.untrack_task(); - Poll::Ready(Ok((stream, addr))) - } - Poll::Pending => { - listener_resource.track_task(cx)?; - Poll::Pending - } - Poll::Ready(Err(e)) => { - listener_resource.untrack_task(); - Poll::Ready(Err(e)) - } - } - }); - let (tcp_stream, _socket_addr) = accept_fut.await?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; + let accept_fut = poll_fn(|cx| { let mut resource_table = resource_table.borrow_mut(); - let rid = resource_table.add( - "tcpStream", - Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( - tcp_stream, - )))), - ); - Ok(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": "tcp", - }, - "remoteAddr": { - "hostname": remote_addr.ip().to_string(), - "port": remote_addr.port(), - "transport": "tcp", + let listener_resource = resource_table + .get_mut::<TcpListenerResource>(rid) + .ok_or_else(|| ErrBox::bad_resource("Listener has been closed"))?; + let listener = &mut listener_resource.listener; + match listener.poll_accept(cx).map_err(ErrBox::from) { + Poll::Ready(Ok((stream, addr))) => { + listener_resource.untrack_task(); + Poll::Ready(Ok((stream, addr))) } - })) - }; - - Ok(JsonOp::Async(op.boxed_local())) + Poll::Pending => { + listener_resource.track_task(cx)?; + Poll::Pending + } + Poll::Ready(Err(e)) => { + listener_resource.untrack_task(); + Poll::Ready(Err(e)) + } + } + }); + let (tcp_stream, _socket_addr) = accept_fut.await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let mut resource_table = resource_table.borrow_mut(); + let rid = resource_table.add( + "tcpStream", + Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( + tcp_stream, + )))), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": "tcp", + }, + "remoteAddr": { + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port(), + "transport": "tcp", + } + })) } -fn op_accept( - isolate_state: &mut CoreIsolateState, - _state: &Rc<State>, +async fn op_accept( + _state: Rc<State>, + resource_table: Rc<RefCell<ResourceTable>>, args: Value, - zero_copy: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { + zero_copy: BufVec, +) -> Result<Value, ErrBox> { let args: AcceptArgs = serde_json::from_value(args)?; match args.transport.as_str() { - "tcp" => accept_tcp(isolate_state, args, zero_copy), + "tcp" => accept_tcp(resource_table, args, zero_copy).await, #[cfg(unix)] - "unix" => net_unix::accept_unix(isolate_state, args.rid as u32, zero_copy), + "unix" => { + net_unix::accept_unix(resource_table, args.rid as u32, zero_copy).await + } _ => Err(ErrBox::error(format!( "Unsupported transport protocol {}", args.transport @@ -122,58 +124,53 @@ struct ReceiveArgs { transport: String, } -fn receive_udp( - isolate_state: &mut CoreIsolateState, +async fn receive_udp( + resource_table: Rc<RefCell<ResourceTable>>, _state: &Rc<State>, args: ReceiveArgs, - zero_copy: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { + zero_copy: BufVec, +) -> Result<Value, ErrBox> { assert_eq!(zero_copy.len(), 1, "Invalid number of arguments"); let mut zero_copy = zero_copy[0].clone(); let rid = args.rid as u32; - let resource_table = isolate_state.resource_table.clone(); - - let op = async move { - let receive_fut = poll_fn(|cx| { - let mut resource_table = resource_table.borrow_mut(); - let resource = resource_table - .get_mut::<UdpSocketResource>(rid) - .ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?; - let socket = &mut resource.socket; - socket - .poll_recv_from(cx, &mut zero_copy) - .map_err(ErrBox::from) - }); - let (size, remote_addr) = receive_fut.await?; - Ok(json!({ - "size": size, - "remoteAddr": { - "hostname": remote_addr.ip().to_string(), - "port": remote_addr.port(), - "transport": "udp", - } - })) - }; - - Ok(JsonOp::Async(op.boxed_local())) + let receive_fut = poll_fn(|cx| { + let mut resource_table = resource_table.borrow_mut(); + let resource = resource_table + .get_mut::<UdpSocketResource>(rid) + .ok_or_else(|| ErrBox::bad_resource("Socket has been closed"))?; + let socket = &mut resource.socket; + socket + .poll_recv_from(cx, &mut zero_copy) + .map_err(ErrBox::from) + }); + let (size, remote_addr) = receive_fut.await?; + Ok(json!({ + "size": size, + "remoteAddr": { + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port(), + "transport": "udp", + } + })) } -fn op_datagram_receive( - isolate_state: &mut CoreIsolateState, - state: &Rc<State>, +async fn op_datagram_receive( + state: Rc<State>, + resource_table: Rc<RefCell<ResourceTable>>, args: Value, - zero_copy: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { + zero_copy: BufVec, +) -> Result<Value, ErrBox> { assert_eq!(zero_copy.len(), 1, "Invalid number of arguments"); let args: ReceiveArgs = serde_json::from_value(args)?; match args.transport.as_str() { - "udp" => receive_udp(isolate_state, state, args, zero_copy), + "udp" => receive_udp(resource_table, &state, args, zero_copy).await, #[cfg(unix)] "unixpacket" => { - net_unix::receive_unix_packet(isolate_state, args.rid as u32, zero_copy) + net_unix::receive_unix_packet(resource_table, args.rid as u32, zero_copy) + .await } _ => Err(ErrBox::error(format!( "Unsupported transport protocol {}", @@ -190,16 +187,15 @@ struct SendArgs { transport_args: ArgsEnum, } -fn op_datagram_send( - isolate_state: &mut CoreIsolateState, - state: &Rc<State>, +async fn op_datagram_send( + state: Rc<State>, + resource_table: Rc<RefCell<ResourceTable>>, args: Value, - zero_copy: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { + zero_copy: BufVec, +) -> Result<Value, ErrBox> { assert_eq!(zero_copy.len(), 1, "Invalid number of arguments"); let zero_copy = zero_copy[0].clone(); - let resource_table = isolate_state.resource_table.clone(); match serde_json::from_value(args)? { SendArgs { rid, @@ -208,7 +204,7 @@ fn op_datagram_send( } if transport == "udp" => { state.check_net(&args.hostname, args.port)?; let addr = resolve_addr(&args.hostname, args.port)?; - let f = poll_fn(move |cx| { + poll_fn(move |cx| { let mut resource_table = resource_table.borrow_mut(); let resource = resource_table .get_mut::<UdpSocketResource>(rid as u32) @@ -218,8 +214,8 @@ fn op_datagram_send( .poll_send_to(cx, &zero_copy, &addr) .map_ok(|byte_length| json!(byte_length)) .map_err(ErrBox::from) - }); - Ok(JsonOp::Async(f.boxed_local())) + }) + .await } #[cfg(unix)] SendArgs { @@ -229,22 +225,16 @@ fn op_datagram_send( } if transport == "unixpacket" => { let address_path = net_unix::Path::new(&args.path); state.check_read(&address_path)?; - let op = async move { - let mut resource_table = resource_table.borrow_mut(); - let resource = resource_table - .get_mut::<net_unix::UnixDatagramResource>(rid as u32) - .ok_or_else(|| { - ErrBox::new("NotConnected", "Socket has been closed") - })?; - let socket = &mut resource.socket; - let byte_length = socket - .send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap()) - .await?; - - Ok(json!(byte_length)) - }; + let mut resource_table = resource_table.borrow_mut(); + let resource = resource_table + .get_mut::<net_unix::UnixDatagramResource>(rid as u32) + .ok_or_else(|| ErrBox::new("NotConnected", "Socket has been closed"))?; + let socket = &mut resource.socket; + let byte_length = socket + .send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap()) + .await?; - Ok(JsonOp::Async(op.boxed_local())) + Ok(json!(byte_length)) } _ => Err(ErrBox::type_error("Wrong argument format!")), } @@ -257,46 +247,42 @@ struct ConnectArgs { transport_args: ArgsEnum, } -fn op_connect( - isolate_state: &mut CoreIsolateState, - state: &Rc<State>, +async fn op_connect( + state: Rc<State>, + resource_table: Rc<RefCell<ResourceTable>>, args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { - let resource_table = isolate_state.resource_table.clone(); + _zero_copy: BufVec, +) -> Result<Value, ErrBox> { match serde_json::from_value(args)? { ConnectArgs { transport, transport_args: ArgsEnum::Ip(args), } if transport == "tcp" => { state.check_net(&args.hostname, args.port)?; - let op = async move { - let addr = resolve_addr(&args.hostname, args.port)?; - let tcp_stream = TcpStream::connect(&addr).await?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - let mut resource_table = resource_table.borrow_mut(); - let rid = resource_table.add( - "tcpStream", - Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( - tcp_stream, - )))), - ); - Ok(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": transport, - }, - "remoteAddr": { - "hostname": remote_addr.ip().to_string(), - "port": remote_addr.port(), - "transport": transport, - } - })) - }; - Ok(JsonOp::Async(op.boxed_local())) + let addr = resolve_addr(&args.hostname, args.port)?; + let tcp_stream = TcpStream::connect(&addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let mut resource_table = resource_table.borrow_mut(); + let rid = resource_table.add( + "tcpStream", + Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( + tcp_stream, + )))), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": transport, + }, + "remoteAddr": { + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port(), + "transport": transport, + } + })) } #[cfg(unix)] ConnectArgs { @@ -306,32 +292,29 @@ fn op_connect( let address_path = net_unix::Path::new(&args.path); state.check_unstable("Deno.connect"); state.check_read(&address_path)?; - let op = async move { - let path = args.path; - let unix_stream = - net_unix::UnixStream::connect(net_unix::Path::new(&path)).await?; - let local_addr = unix_stream.local_addr()?; - let remote_addr = unix_stream.peer_addr()?; - let mut resource_table = resource_table.borrow_mut(); - let rid = resource_table.add( - "unixStream", - Box::new(StreamResourceHolder::new(StreamResource::UnixStream( - unix_stream, - ))), - ); - Ok(json!({ - "rid": rid, - "localAddr": { - "path": local_addr.as_pathname(), - "transport": transport, - }, - "remoteAddr": { - "path": remote_addr.as_pathname(), - "transport": transport, - } - })) - }; - Ok(JsonOp::Async(op.boxed_local())) + let path = args.path; + let unix_stream = + net_unix::UnixStream::connect(net_unix::Path::new(&path)).await?; + let local_addr = unix_stream.local_addr()?; + let remote_addr = unix_stream.peer_addr()?; + let mut resource_table = resource_table.borrow_mut(); + let rid = resource_table.add( + "unixStream", + Box::new(StreamResourceHolder::new(StreamResource::UnixStream( + unix_stream, + ))), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "path": local_addr.as_pathname(), + "transport": transport, + }, + "remoteAddr": { + "path": remote_addr.as_pathname(), + "transport": transport, + } + })) } _ => Err(ErrBox::type_error("Wrong argument format!")), } @@ -344,11 +327,11 @@ struct ShutdownArgs { } fn op_shutdown( - isolate_state: &mut CoreIsolateState, - state: &Rc<State>, + state: &State, + resource_table: &mut ResourceTable, args: Value, _zero_copy: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { +) -> Result<Value, ErrBox> { state.check_unstable("Deno.shutdown"); let args: ShutdownArgs = serde_json::from_value(args)?; @@ -362,7 +345,6 @@ fn op_shutdown( _ => unimplemented!(), }; - let mut resource_table = isolate_state.resource_table.borrow_mut(); let resource_holder = resource_table .get_mut::<StreamResourceHolder>(rid) .ok_or_else(ErrBox::bad_resource_id)?; @@ -377,7 +359,7 @@ fn op_shutdown( _ => return Err(ErrBox::bad_resource_id()), } - Ok(JsonOp::Sync(json!({}))) + Ok(json!({})) } #[allow(dead_code)] @@ -485,12 +467,11 @@ fn listen_udp( } fn op_listen( - isolate_state: &mut CoreIsolateState, - state: &Rc<State>, + state: &State, + resource_table: &mut ResourceTable, args: Value, _zero_copy: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { - let mut resource_table = isolate_state.resource_table.borrow_mut(); +) -> Result<Value, ErrBox> { match serde_json::from_value(args)? { ListenArgs { transport, @@ -502,9 +483,9 @@ fn op_listen( state.check_net(&args.hostname, args.port)?; let addr = resolve_addr(&args.hostname, args.port)?; let (rid, local_addr) = if transport == "tcp" { - listen_tcp(&mut resource_table, addr)? + listen_tcp(resource_table, addr)? } else { - listen_udp(&mut resource_table, addr)? + listen_udp(resource_table, addr)? }; debug!( "New listener {} {}:{}", @@ -512,14 +493,14 @@ fn op_listen( local_addr.ip().to_string(), local_addr.port() ); - Ok(JsonOp::Sync(json!({ + Ok(json!({ "rid": rid, "localAddr": { "hostname": local_addr.ip().to_string(), "port": local_addr.port(), "transport": transport, }, - }))) + })) } #[cfg(unix)] ListenArgs { @@ -536,22 +517,22 @@ fn op_listen( state.check_read(&address_path)?; state.check_write(&address_path)?; let (rid, local_addr) = if transport == "unix" { - net_unix::listen_unix(&mut resource_table, &address_path)? + net_unix::listen_unix(resource_table, &address_path)? } else { - net_unix::listen_unix_packet(&mut resource_table, &address_path)? + net_unix::listen_unix_packet(resource_table, &address_path)? }; debug!( "New listener {} {}", rid, local_addr.as_pathname().unwrap().display(), ); - Ok(JsonOp::Sync(json!({ + Ok(json!({ "rid": rid, "localAddr": { "path": local_addr.as_pathname(), "transport": transport, }, - }))) + })) } #[cfg(unix)] _ => Err(ErrBox::type_error("Wrong argument format!")), |