diff options
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r-- | cli/ops/net.rs | 361 |
1 files changed, 257 insertions, 104 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 3987e94c1..f074ef9ee 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -18,6 +18,9 @@ use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::net::UdpSocket; +#[cfg(unix)] +use super::net_unix; + pub fn init(i: &mut Isolate, s: &State) { i.register_op("op_accept", s.stateful_json_op(op_accept)); i.register_op("op_connect", s.stateful_json_op(op_connect)); @@ -30,14 +33,14 @@ pub fn init(i: &mut Isolate, s: &State) { #[derive(Deserialize)] struct AcceptArgs { rid: i32, + transport: String, } -fn op_accept( +fn accept_tcp( state: &State, - args: Value, + args: AcceptArgs, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { - let args: AcceptArgs = serde_json::from_value(args)?; let rid = args.rid as u32; let state_ = state.clone(); { @@ -102,20 +105,36 @@ fn op_accept( Ok(JsonOp::Async(op.boxed_local())) } +fn op_accept( + state: &State, + args: Value, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<JsonOp, OpError> { + let args: AcceptArgs = serde_json::from_value(args)?; + match args.transport.as_str() { + "tcp" => accept_tcp(state, args, zero_copy), + #[cfg(unix)] + "unix" => net_unix::accept_unix(state, args.rid as u32, zero_copy), + _ => Err(OpError::other(format!( + "Unsupported transport protocol {}", + args.transport + ))), + } +} + #[derive(Deserialize)] struct ReceiveArgs { rid: i32, + transport: String, } -fn op_receive( +fn receive_udp( state: &State, - args: Value, + args: ReceiveArgs, zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { - assert!(zero_copy.is_some()); let mut buf = zero_copy.unwrap(); - let args: ReceiveArgs = serde_json::from_value(args)?; let rid = args.rid as u32; let state_ = state.clone(); @@ -145,12 +164,32 @@ fn op_receive( Ok(JsonOp::Async(op.boxed_local())) } +fn op_receive( + state: &State, + args: Value, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<JsonOp, OpError> { + assert!(zero_copy.is_some()); + let args: ReceiveArgs = serde_json::from_value(args)?; + match args.transport.as_str() { + "udp" => receive_udp(state, args, zero_copy), + #[cfg(unix)] + "unixpacket" => { + net_unix::receive_unix_packet(state, args.rid as u32, zero_copy) + } + _ => Err(OpError::other(format!( + "Unsupported transport protocol {}", + args.transport + ))), + } +} + #[derive(Deserialize)] struct SendArgs { rid: i32, - hostname: String, - port: u16, transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, } fn op_send( @@ -160,38 +199,67 @@ fn op_send( ) -> Result<JsonOp, OpError> { assert!(zero_copy.is_some()); let buf = zero_copy.unwrap(); - - let args: SendArgs = serde_json::from_value(args)?; - assert_eq!(args.transport, "udp"); - let rid = args.rid as u32; - let state_ = state.clone(); - state.check_net(&args.hostname, args.port)?; - - let op = async move { - let mut state = state_.borrow_mut(); - let resource = state - .resource_table - .get_mut::<UdpSocketResource>(rid) - .ok_or_else(|| { - OpError::bad_resource("Socket has been closed".to_string()) - })?; - - let socket = &mut resource.socket; - let addr = resolve_addr(&args.hostname, args.port).await?; - socket.send_to(&buf, addr).await?; - - Ok(json!({})) - }; - - Ok(JsonOp::Async(op.boxed_local())) + match serde_json::from_value(args)? { + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "udp" => { + state.check_net(&args.hostname, args.port)?; + + let op = async move { + let mut state = state_.borrow_mut(); + let resource = state + .resource_table + .get_mut::<UdpSocketResource>(rid as u32) + .ok_or_else(|| { + OpError::bad_resource("Socket has been closed".to_string()) + })?; + let socket = &mut resource.socket; + let addr = resolve_addr(&args.hostname, args.port).await?; + socket.send_to(&buf, addr).await?; + Ok(json!({})) + }; + + Ok(JsonOp::Async(op.boxed_local())) + } + #[cfg(unix)] + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unixpacket" => { + let address_path = net_unix::Path::new(&args.address); + state.check_read(&address_path)?; + let op = async move { + let mut state = state_.borrow_mut(); + let resource = state + .resource_table + .get_mut::<net_unix::UnixDatagramResource>(rid as u32) + .ok_or_else(|| { + OpError::other("Socket has been closed".to_string()) + })?; + + let socket = &mut resource.socket; + socket + .send_to(&buf, &resource.local_addr.as_pathname().unwrap()) + .await?; + + Ok(json!({})) + }; + + Ok(JsonOp::Async(op.boxed_local())) + } + _ => Err(OpError::other("Wrong argument format!".to_owned())), + } } #[derive(Deserialize)] struct ConnectArgs { transport: String, - hostname: String, - port: u16, + #[serde(flatten)] + transport_args: ArgsEnum, } fn op_connect( @@ -199,39 +267,78 @@ fn op_connect( args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { - let args: ConnectArgs = serde_json::from_value(args)?; - assert_eq!(args.transport, "tcp"); // TODO Support others. - let state_ = state.clone(); - state.check_net(&args.hostname, args.port)?; - - let op = async move { - let addr = resolve_addr(&args.hostname, args.port).await?; - let tcp_stream = TcpStream::connect(&addr).await?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - let mut state = state_.borrow_mut(); - let rid = state.resource_table.add( - "tcpStream", - Box::new(StreamResourceHolder::new(StreamResource::TcpStream( - tcp_stream, - ))), - ); - Ok(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": args.transport, - }, - "remoteAddr": { - "hostname": remote_addr.ip().to_string(), - "port": remote_addr.port(), - "transport": args.transport, - } - })) - }; - - Ok(JsonOp::Async(op.boxed_local())) + match serde_json::from_value(args)? { + ConnectArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "tcp" => { + let state_ = state.clone(); + state.check_net(&args.hostname, args.port)?; + let op = async move { + let addr = resolve_addr(&args.hostname, args.port).await?; + let tcp_stream = TcpStream::connect(&addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let mut state = state_.borrow_mut(); + let rid = state.resource_table.add( + "tcpStream", + Box::new(StreamResourceHolder::new(StreamResource::TcpStream( + tcp_stream, + ))), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": transport, + }, + "remoteAddr": { + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port(), + "transport": transport, + } + })) + }; + Ok(JsonOp::Async(op.boxed_local())) + } + #[cfg(unix)] + ConnectArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" => { + let address_path = net_unix::Path::new(&args.address); + let state_ = state.clone(); + state.check_read(&address_path)?; + let op = async move { + let address = args.address; + let unix_stream = + net_unix::UnixStream::connect(net_unix::Path::new(&address)).await?; + let local_addr = unix_stream.local_addr()?; + let remote_addr = unix_stream.peer_addr()?; + let mut state = state_.borrow_mut(); + let rid = state.resource_table.add( + "unixStream", + Box::new(StreamResourceHolder::new(StreamResource::UnixStream( + unix_stream, + ))), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "address": local_addr.as_pathname(), + "transport": transport, + }, + "remoteAddr": { + "address": remote_addr.as_pathname(), + "transport": transport, + } + })) + }; + Ok(JsonOp::Async(op.boxed_local())) + } + _ => Err(OpError::other("Wrong argument format!".to_owned())), + } } #[derive(Deserialize)] @@ -265,19 +372,17 @@ fn op_shutdown( StreamResource::TcpStream(ref mut stream) => { TcpStream::shutdown(stream, shutdown_mode).map_err(OpError::from)?; } + #[cfg(unix)] + StreamResource::UnixStream(ref mut stream) => { + net_unix::UnixStream::shutdown(stream, shutdown_mode) + .map_err(OpError::from)?; + } _ => return Err(OpError::bad_resource_id()), } Ok(JsonOp::Sync(json!({}))) } -#[derive(Deserialize)] -struct ListenArgs { - transport: String, - hostname: String, - port: u16, -} - #[allow(dead_code)] struct TcpListenerResource { listener: TcpListener, @@ -331,6 +436,27 @@ struct UdpSocketResource { socket: UdpSocket, } +#[derive(Deserialize)] +struct IpListenArgs { + hostname: String, + port: u16, +} + +#[derive(Deserialize)] +#[serde(untagged)] +enum ArgsEnum { + Ip(IpListenArgs), + #[cfg(unix)] + Unix(net_unix::UnixListenArgs), +} + +#[derive(Deserialize)] +struct ListenArgs { + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + fn listen_tcp( state: &State, addr: SocketAddr, @@ -370,33 +496,60 @@ fn op_listen( args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { - let args: ListenArgs = serde_json::from_value(args)?; - assert!(args.transport == "tcp" || args.transport == "udp"); - - state.check_net(&args.hostname, args.port)?; - - let addr = - futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; - - let (rid, local_addr) = if args.transport == "tcp" { - listen_tcp(state, addr)? - } else { - listen_udp(state, addr)? - }; - - debug!( - "New listener {} {}:{}", - rid, - local_addr.ip().to_string(), - local_addr.port() - ); - - Ok(JsonOp::Sync(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": args.transport, - }, - }))) + match serde_json::from_value(args)? { + ListenArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } => { + state.check_net(&args.hostname, args.port)?; + let addr = + futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; + let (rid, local_addr) = if transport == "tcp" { + listen_tcp(state, addr)? + } else { + listen_udp(state, addr)? + }; + debug!( + "New listener {} {}:{}", + rid, + local_addr.ip().to_string(), + local_addr.port() + ); + Ok(JsonOp::Sync(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": transport, + }, + }))) + } + #[cfg(unix)] + ListenArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" || transport == "unixpacket" => { + let address_path = net_unix::Path::new(&args.address); + state.check_read(&address_path)?; + let (rid, local_addr) = if transport == "unix" { + net_unix::listen_unix(state, &address_path)? + } else { + net_unix::listen_unix_packet(state, &address_path)? + }; + debug!( + "New listener {} {}", + rid, + local_addr.as_pathname().unwrap().display(), + ); + Ok(JsonOp::Sync(json!({ + "rid": rid, + "localAddr": { + "address": local_addr.as_pathname(), + "transport": transport, + }, + }))) + } + #[cfg(unix)] + _ => Err(OpError::other("Wrong argument format!".to_owned())), + } } |