diff options
Diffstat (limited to 'cli/ops')
-rw-r--r-- | cli/ops/io.rs | 8 | ||||
-rw-r--r-- | cli/ops/mod.rs | 2 | ||||
-rw-r--r-- | cli/ops/net.rs | 361 | ||||
-rw-r--r-- | cli/ops/net_unix.rs | 142 |
4 files changed, 409 insertions, 104 deletions
diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 7969184ef..0c9a83883 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -148,6 +148,8 @@ pub enum StreamResource { Stderr(tokio::io::Stderr), FsFile(tokio::fs::File, FileMetadata), TcpStream(tokio::net::TcpStream), + #[cfg(not(windows))] + UnixStream(tokio::net::UnixStream), ServerTlsStream(Box<ServerTlsStream<TcpStream>>), ClientTlsStream(Box<ClientTlsStream<TcpStream>>), HttpBody(Box<HttpBody>), @@ -183,6 +185,8 @@ impl DenoAsyncRead for StreamResource { FsFile(f, _) => f, Stdin(f, _) => f, TcpStream(f) => f, + #[cfg(not(windows))] + UnixStream(f) => f, ClientTlsStream(f) => f, ServerTlsStream(f) => f, ChildStdout(f) => f, @@ -262,6 +266,8 @@ impl DenoAsyncWrite for StreamResource { Stdout(f) => f, Stderr(f) => f, TcpStream(f) => f, + #[cfg(not(windows))] + UnixStream(f) => f, ClientTlsStream(f) => f, ServerTlsStream(f) => f, ChildStdin(f) => f, @@ -279,6 +285,8 @@ impl DenoAsyncWrite for StreamResource { Stdout(f) => f, Stderr(f) => f, TcpStream(f) => f, + #[cfg(not(windows))] + UnixStream(f) => f, ClientTlsStream(f) => f, ServerTlsStream(f) => f, ChildStdin(f) => f, diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index c011facfc..b91a61c3a 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -15,6 +15,8 @@ pub mod fs; pub mod fs_events; pub mod io; pub mod net; +#[cfg(unix)] +mod net_unix; pub mod os; pub mod permissions; pub mod plugins; diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 3987e94c1..f074ef9ee 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -18,6 +18,9 @@ use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::net::UdpSocket; +#[cfg(unix)] +use super::net_unix; + pub fn init(i: &mut Isolate, s: &State) { i.register_op("op_accept", s.stateful_json_op(op_accept)); i.register_op("op_connect", s.stateful_json_op(op_connect)); @@ -30,14 +33,14 @@ pub fn init(i: &mut Isolate, s: &State) { #[derive(Deserialize)] struct AcceptArgs { rid: i32, + transport: String, } -fn op_accept( +fn accept_tcp( state: &State, - args: Value, + args: AcceptArgs, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { - let args: AcceptArgs = serde_json::from_value(args)?; let rid = args.rid as u32; let state_ = state.clone(); { @@ -102,20 +105,36 @@ fn op_accept( Ok(JsonOp::Async(op.boxed_local())) } +fn op_accept( + state: &State, + args: Value, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<JsonOp, OpError> { + let args: AcceptArgs = serde_json::from_value(args)?; + match args.transport.as_str() { + "tcp" => accept_tcp(state, args, zero_copy), + #[cfg(unix)] + "unix" => net_unix::accept_unix(state, args.rid as u32, zero_copy), + _ => Err(OpError::other(format!( + "Unsupported transport protocol {}", + args.transport + ))), + } +} + #[derive(Deserialize)] struct ReceiveArgs { rid: i32, + transport: String, } -fn op_receive( +fn receive_udp( state: &State, - args: Value, + args: ReceiveArgs, zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { - assert!(zero_copy.is_some()); let mut buf = zero_copy.unwrap(); - let args: ReceiveArgs = serde_json::from_value(args)?; let rid = args.rid as u32; let state_ = state.clone(); @@ -145,12 +164,32 @@ fn op_receive( Ok(JsonOp::Async(op.boxed_local())) } +fn op_receive( + state: &State, + args: Value, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<JsonOp, OpError> { + assert!(zero_copy.is_some()); + let args: ReceiveArgs = serde_json::from_value(args)?; + match args.transport.as_str() { + "udp" => receive_udp(state, args, zero_copy), + #[cfg(unix)] + "unixpacket" => { + net_unix::receive_unix_packet(state, args.rid as u32, zero_copy) + } + _ => Err(OpError::other(format!( + "Unsupported transport protocol {}", + args.transport + ))), + } +} + #[derive(Deserialize)] struct SendArgs { rid: i32, - hostname: String, - port: u16, transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, } fn op_send( @@ -160,38 +199,67 @@ fn op_send( ) -> Result<JsonOp, OpError> { assert!(zero_copy.is_some()); let buf = zero_copy.unwrap(); - - let args: SendArgs = serde_json::from_value(args)?; - assert_eq!(args.transport, "udp"); - let rid = args.rid as u32; - let state_ = state.clone(); - state.check_net(&args.hostname, args.port)?; - - let op = async move { - let mut state = state_.borrow_mut(); - let resource = state - .resource_table - .get_mut::<UdpSocketResource>(rid) - .ok_or_else(|| { - OpError::bad_resource("Socket has been closed".to_string()) - })?; - - let socket = &mut resource.socket; - let addr = resolve_addr(&args.hostname, args.port).await?; - socket.send_to(&buf, addr).await?; - - Ok(json!({})) - }; - - Ok(JsonOp::Async(op.boxed_local())) + match serde_json::from_value(args)? { + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "udp" => { + state.check_net(&args.hostname, args.port)?; + + let op = async move { + let mut state = state_.borrow_mut(); + let resource = state + .resource_table + .get_mut::<UdpSocketResource>(rid as u32) + .ok_or_else(|| { + OpError::bad_resource("Socket has been closed".to_string()) + })?; + let socket = &mut resource.socket; + let addr = resolve_addr(&args.hostname, args.port).await?; + socket.send_to(&buf, addr).await?; + Ok(json!({})) + }; + + Ok(JsonOp::Async(op.boxed_local())) + } + #[cfg(unix)] + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unixpacket" => { + let address_path = net_unix::Path::new(&args.address); + state.check_read(&address_path)?; + let op = async move { + let mut state = state_.borrow_mut(); + let resource = state + .resource_table + .get_mut::<net_unix::UnixDatagramResource>(rid as u32) + .ok_or_else(|| { + OpError::other("Socket has been closed".to_string()) + })?; + + let socket = &mut resource.socket; + socket + .send_to(&buf, &resource.local_addr.as_pathname().unwrap()) + .await?; + + Ok(json!({})) + }; + + Ok(JsonOp::Async(op.boxed_local())) + } + _ => Err(OpError::other("Wrong argument format!".to_owned())), + } } #[derive(Deserialize)] struct ConnectArgs { transport: String, - hostname: String, - port: u16, + #[serde(flatten)] + transport_args: ArgsEnum, } fn op_connect( @@ -199,39 +267,78 @@ fn op_connect( args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { - let args: ConnectArgs = serde_json::from_value(args)?; - assert_eq!(args.transport, "tcp"); // TODO Support others. - let state_ = state.clone(); - state.check_net(&args.hostname, args.port)?; - - let op = async move { - let addr = resolve_addr(&args.hostname, args.port).await?; - let tcp_stream = TcpStream::connect(&addr).await?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - let mut state = state_.borrow_mut(); - let rid = state.resource_table.add( - "tcpStream", - Box::new(StreamResourceHolder::new(StreamResource::TcpStream( - tcp_stream, - ))), - ); - Ok(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": args.transport, - }, - "remoteAddr": { - "hostname": remote_addr.ip().to_string(), - "port": remote_addr.port(), - "transport": args.transport, - } - })) - }; - - Ok(JsonOp::Async(op.boxed_local())) + match serde_json::from_value(args)? { + ConnectArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "tcp" => { + let state_ = state.clone(); + state.check_net(&args.hostname, args.port)?; + let op = async move { + let addr = resolve_addr(&args.hostname, args.port).await?; + let tcp_stream = TcpStream::connect(&addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let mut state = state_.borrow_mut(); + let rid = state.resource_table.add( + "tcpStream", + Box::new(StreamResourceHolder::new(StreamResource::TcpStream( + tcp_stream, + ))), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": transport, + }, + "remoteAddr": { + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port(), + "transport": transport, + } + })) + }; + Ok(JsonOp::Async(op.boxed_local())) + } + #[cfg(unix)] + ConnectArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" => { + let address_path = net_unix::Path::new(&args.address); + let state_ = state.clone(); + state.check_read(&address_path)?; + let op = async move { + let address = args.address; + let unix_stream = + net_unix::UnixStream::connect(net_unix::Path::new(&address)).await?; + let local_addr = unix_stream.local_addr()?; + let remote_addr = unix_stream.peer_addr()?; + let mut state = state_.borrow_mut(); + let rid = state.resource_table.add( + "unixStream", + Box::new(StreamResourceHolder::new(StreamResource::UnixStream( + unix_stream, + ))), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "address": local_addr.as_pathname(), + "transport": transport, + }, + "remoteAddr": { + "address": remote_addr.as_pathname(), + "transport": transport, + } + })) + }; + Ok(JsonOp::Async(op.boxed_local())) + } + _ => Err(OpError::other("Wrong argument format!".to_owned())), + } } #[derive(Deserialize)] @@ -265,19 +372,17 @@ fn op_shutdown( StreamResource::TcpStream(ref mut stream) => { TcpStream::shutdown(stream, shutdown_mode).map_err(OpError::from)?; } + #[cfg(unix)] + StreamResource::UnixStream(ref mut stream) => { + net_unix::UnixStream::shutdown(stream, shutdown_mode) + .map_err(OpError::from)?; + } _ => return Err(OpError::bad_resource_id()), } Ok(JsonOp::Sync(json!({}))) } -#[derive(Deserialize)] -struct ListenArgs { - transport: String, - hostname: String, - port: u16, -} - #[allow(dead_code)] struct TcpListenerResource { listener: TcpListener, @@ -331,6 +436,27 @@ struct UdpSocketResource { socket: UdpSocket, } +#[derive(Deserialize)] +struct IpListenArgs { + hostname: String, + port: u16, +} + +#[derive(Deserialize)] +#[serde(untagged)] +enum ArgsEnum { + Ip(IpListenArgs), + #[cfg(unix)] + Unix(net_unix::UnixListenArgs), +} + +#[derive(Deserialize)] +struct ListenArgs { + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + fn listen_tcp( state: &State, addr: SocketAddr, @@ -370,33 +496,60 @@ fn op_listen( args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { - let args: ListenArgs = serde_json::from_value(args)?; - assert!(args.transport == "tcp" || args.transport == "udp"); - - state.check_net(&args.hostname, args.port)?; - - let addr = - futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; - - let (rid, local_addr) = if args.transport == "tcp" { - listen_tcp(state, addr)? - } else { - listen_udp(state, addr)? - }; - - debug!( - "New listener {} {}:{}", - rid, - local_addr.ip().to_string(), - local_addr.port() - ); - - Ok(JsonOp::Sync(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": args.transport, - }, - }))) + match serde_json::from_value(args)? { + ListenArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } => { + state.check_net(&args.hostname, args.port)?; + let addr = + futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; + let (rid, local_addr) = if transport == "tcp" { + listen_tcp(state, addr)? + } else { + listen_udp(state, addr)? + }; + debug!( + "New listener {} {}:{}", + rid, + local_addr.ip().to_string(), + local_addr.port() + ); + Ok(JsonOp::Sync(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": transport, + }, + }))) + } + #[cfg(unix)] + ListenArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" || transport == "unixpacket" => { + let address_path = net_unix::Path::new(&args.address); + state.check_read(&address_path)?; + let (rid, local_addr) = if transport == "unix" { + net_unix::listen_unix(state, &address_path)? + } else { + net_unix::listen_unix_packet(state, &address_path)? + }; + debug!( + "New listener {} {}", + rid, + local_addr.as_pathname().unwrap().display(), + ); + Ok(JsonOp::Sync(json!({ + "rid": rid, + "localAddr": { + "address": local_addr.as_pathname(), + "transport": transport, + }, + }))) + } + #[cfg(unix)] + _ => Err(OpError::other("Wrong argument format!".to_owned())), + } } diff --git a/cli/ops/net_unix.rs b/cli/ops/net_unix.rs new file mode 100644 index 000000000..43778e7c6 --- /dev/null +++ b/cli/ops/net_unix.rs @@ -0,0 +1,142 @@ +use super::dispatch_json::{Deserialize, JsonOp}; +use super::io::{StreamResource, StreamResourceHolder}; +use crate::op_error::OpError; +use crate::state::State; +use futures::future::FutureExt; + +use deno_core::*; +use std::fs::remove_file; +use std::os::unix; +pub use std::path::Path; +use tokio::net::UnixDatagram; +use tokio::net::UnixListener; +pub use tokio::net::UnixStream; + +struct UnixListenerResource { + listener: UnixListener, +} + +pub struct UnixDatagramResource { + pub socket: UnixDatagram, + pub local_addr: unix::net::SocketAddr, +} + +#[derive(Deserialize)] +pub struct UnixListenArgs { + pub address: String, +} + +pub fn accept_unix( + state: &State, + rid: u32, + _zero_copy: Option<ZeroCopyBuf>, +) -> Result<JsonOp, OpError> { + let state_ = state.clone(); + { + let state = state.borrow(); + state + .resource_table + .get::<UnixListenerResource>(rid) + .ok_or_else(OpError::bad_resource_id)?; + } + let op = async move { + let mut state = state_.borrow_mut(); + let listener_resource = state + .resource_table + .get_mut::<UnixListenerResource>(rid) + .ok_or_else(|| { + OpError::bad_resource("Listener has been closed".to_string()) + })?; + let (unix_stream, _socket_addr) = + listener_resource.listener.accept().await?; + let local_addr = unix_stream.local_addr()?; + let remote_addr = unix_stream.peer_addr()?; + let rid = state.resource_table.add( + "unixStream", + Box::new(StreamResourceHolder::new(StreamResource::UnixStream( + unix_stream, + ))), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "address": local_addr.as_pathname(), + "transport": "unix", + }, + "remoteAddr": { + "address": remote_addr.as_pathname(), + "transport": "unix", + } + })) + }; + + Ok(JsonOp::Async(op.boxed_local())) +} + +pub fn receive_unix_packet( + state: &State, + rid: u32, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<JsonOp, OpError> { + let mut buf = zero_copy.unwrap(); + let state_ = state.clone(); + + let op = async move { + let mut state = state_.borrow_mut(); + let resource = state + .resource_table + .get_mut::<UnixDatagramResource>(rid) + .ok_or_else(|| { + OpError::bad_resource("Socket has been closed".to_string()) + })?; + let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?; + Ok(json!({ + "size": size, + "remoteAddr": { + "address": remote_addr.as_pathname(), + "transport": "unixpacket", + } + })) + }; + + Ok(JsonOp::Async(op.boxed_local())) +} + +pub fn listen_unix( + state: &State, + addr: &Path, +) -> Result<(u32, unix::net::SocketAddr), OpError> { + let mut state = state.borrow_mut(); + if addr.exists() { + remove_file(&addr).unwrap(); + } + let listener = UnixListener::bind(&addr)?; + let local_addr = listener.local_addr()?; + let listener_resource = UnixListenerResource { listener }; + let rid = state + .resource_table + .add("unixListener", Box::new(listener_resource)); + + Ok((rid, local_addr)) +} + +pub fn listen_unix_packet( + state: &State, + addr: &Path, +) -> Result<(u32, unix::net::SocketAddr), OpError> { + let mut state = state.borrow_mut(); + if addr.exists() { + remove_file(&addr).unwrap(); + } + let socket = UnixDatagram::bind(&addr)?; + let local_addr = socket.local_addr()?; + let datagram_resource = UnixDatagramResource { + socket, + local_addr: local_addr.clone(), + }; + let rid = state + .resource_table + .add("unixDatagram", Box::new(datagram_resource)); + + Ok((rid, local_addr)) +} |