diff options
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r-- | cli/ops/net.rs | 174 |
1 files changed, 163 insertions, 11 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 569aebca0..c8fd5d398 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -18,12 +18,15 @@ use std::task::Poll; use tokio; use tokio::net::TcpListener; use tokio::net::TcpStream; +use tokio::net::UdpSocket; pub fn init(i: &mut Isolate, s: &State) { i.register_op("accept", s.core_op(json_op(s.stateful_op(op_accept)))); i.register_op("connect", s.core_op(json_op(s.stateful_op(op_connect)))); i.register_op("shutdown", s.core_op(json_op(s.stateful_op(op_shutdown)))); i.register_op("listen", s.core_op(json_op(s.stateful_op(op_listen)))); + i.register_op("receive", s.core_op(json_op(s.stateful_op(op_receive)))); + i.register_op("send", s.core_op(json_op(s.stateful_op(op_send)))); } #[derive(Debug, PartialEq)] @@ -137,6 +140,121 @@ fn op_accept( Ok(JsonOp::Async(op.boxed_local())) } +pub struct Receive<'a> { + state: &'a State, + rid: ResourceId, + buf: ZeroCopyBuf, +} + +impl Future for Receive<'_> { + type Output = Result<(usize, SocketAddr), ErrBox>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let mut state = inner.state.borrow_mut(); + let resource = state + .resource_table + .get_mut::<UdpSocketResource>(inner.rid) + .ok_or_else(|| { + let e = std::io::Error::new( + std::io::ErrorKind::Other, + "Socket has been closed", + ); + ErrBox::from(e) + })?; + + let socket = &mut resource.socket; + + socket + .poll_recv_from(cx, &mut inner.buf) + .map_err(ErrBox::from) + } +} + +#[derive(Deserialize)] +struct ReceiveArgs { + rid: i32, +} + +fn receive(state: &State, rid: ResourceId, buf: ZeroCopyBuf) -> Receive { + Receive { state, rid, buf } +} + +fn op_receive( + state: &State, + args: Value, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<JsonOp, ErrBox> { + assert!(zero_copy.is_some()); + let buf = zero_copy.unwrap(); + + let args: ReceiveArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + + let state_ = state.clone(); + + let op = async move { + let (size, remote_addr) = receive(&state_, rid, buf).await?; + + Ok(json!({ + "size": size, + "remoteAddr": { + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port(), + "transport": "udp", + } + })) + }; + + Ok(JsonOp::Async(op.boxed_local())) +} + +#[derive(Deserialize)] +struct SendArgs { + rid: i32, + hostname: String, + port: u16, + transport: String, +} + +fn op_send( + state: &State, + args: Value, + zero_copy: Option<ZeroCopyBuf>, +) -> Result<JsonOp, ErrBox> { + assert!(zero_copy.is_some()); + let buf = zero_copy.unwrap(); + + let args: SendArgs = serde_json::from_value(args)?; + assert_eq!(args.transport, "udp"); + let rid = args.rid as u32; + + let state_ = state.clone(); + state.check_net(&args.hostname, args.port)?; + + let op = async move { + let mut state = state_.borrow_mut(); + let resource = state + .resource_table + .get_mut::<UdpSocketResource>(rid) + .ok_or_else(|| { + let e = std::io::Error::new( + std::io::ErrorKind::Other, + "Socket has been closed", + ); + ErrBox::from(e) + })?; + + let socket = &mut resource.socket; + let addr = resolve_addr(&args.hostname, args.port).await?; + socket.send_to(&buf, addr).await?; + + Ok(json!({})) + }; + + Ok(JsonOp::Async(op.boxed_local())) +} + #[derive(Deserialize)] struct ConnectArgs { transport: String, @@ -278,29 +396,63 @@ impl TcpListenerResource { } } +struct UdpSocketResource { + socket: UdpSocket, +} + +fn listen_tcp( + state: &State, + addr: SocketAddr, +) -> Result<(u32, SocketAddr), ErrBox> { + let mut state = state.borrow_mut(); + let listener = futures::executor::block_on(TcpListener::bind(&addr))?; + let local_addr = listener.local_addr()?; + let listener_resource = TcpListenerResource { + listener, + waker: None, + local_addr, + }; + let rid = state + .resource_table + .add("tcpListener", Box::new(listener_resource)); + + Ok((rid, local_addr)) +} + +fn listen_udp( + state: &State, + addr: SocketAddr, +) -> Result<(u32, SocketAddr), ErrBox> { + let mut state = state.borrow_mut(); + let socket = futures::executor::block_on(UdpSocket::bind(&addr))?; + let local_addr = socket.local_addr()?; + let socket_resource = UdpSocketResource { socket }; + let rid = state + .resource_table + .add("udpSocket", Box::new(socket_resource)); + + Ok((rid, local_addr)) +} + fn op_listen( state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: ListenArgs = serde_json::from_value(args)?; - assert_eq!(args.transport, "tcp"); + assert!(args.transport == "tcp" || args.transport == "udp"); state.check_net(&args.hostname, args.port)?; let addr = futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; - let listener = futures::executor::block_on(TcpListener::bind(&addr))?; - let local_addr = listener.local_addr()?; - let listener_resource = TcpListenerResource { - listener, - waker: None, - local_addr, + + let (rid, local_addr) = if args.transport == "tcp" { + listen_tcp(state, addr)? + } else { + listen_udp(state, addr)? }; - let mut state = state.borrow_mut(); - let rid = state - .resource_table - .add("tcpListener", Box::new(listener_resource)); + debug!( "New listener {} {}:{}", rid, |