summaryrefslogtreecommitdiff
path: root/cli/ops/net.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r--cli/ops/net.rs174
1 files changed, 163 insertions, 11 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index 569aebca0..c8fd5d398 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -18,12 +18,15 @@ use std::task::Poll;
use tokio;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
+use tokio::net::UdpSocket;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("accept", s.core_op(json_op(s.stateful_op(op_accept))));
i.register_op("connect", s.core_op(json_op(s.stateful_op(op_connect))));
i.register_op("shutdown", s.core_op(json_op(s.stateful_op(op_shutdown))));
i.register_op("listen", s.core_op(json_op(s.stateful_op(op_listen))));
+ i.register_op("receive", s.core_op(json_op(s.stateful_op(op_receive))));
+ i.register_op("send", s.core_op(json_op(s.stateful_op(op_send))));
}
#[derive(Debug, PartialEq)]
@@ -137,6 +140,121 @@ fn op_accept(
Ok(JsonOp::Async(op.boxed_local()))
}
+pub struct Receive<'a> {
+ state: &'a State,
+ rid: ResourceId,
+ buf: ZeroCopyBuf,
+}
+
+impl Future for Receive<'_> {
+ type Output = Result<(usize, SocketAddr), ErrBox>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let mut state = inner.state.borrow_mut();
+ let resource = state
+ .resource_table
+ .get_mut::<UdpSocketResource>(inner.rid)
+ .ok_or_else(|| {
+ let e = std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "Socket has been closed",
+ );
+ ErrBox::from(e)
+ })?;
+
+ let socket = &mut resource.socket;
+
+ socket
+ .poll_recv_from(cx, &mut inner.buf)
+ .map_err(ErrBox::from)
+ }
+}
+
+#[derive(Deserialize)]
+struct ReceiveArgs {
+ rid: i32,
+}
+
+fn receive(state: &State, rid: ResourceId, buf: ZeroCopyBuf) -> Receive {
+ Receive { state, rid, buf }
+}
+
+fn op_receive(
+ state: &State,
+ args: Value,
+ zero_copy: Option<ZeroCopyBuf>,
+) -> Result<JsonOp, ErrBox> {
+ assert!(zero_copy.is_some());
+ let buf = zero_copy.unwrap();
+
+ let args: ReceiveArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+
+ let state_ = state.clone();
+
+ let op = async move {
+ let (size, remote_addr) = receive(&state_, rid, buf).await?;
+
+ Ok(json!({
+ "size": size,
+ "remoteAddr": {
+ "hostname": remote_addr.ip().to_string(),
+ "port": remote_addr.port(),
+ "transport": "udp",
+ }
+ }))
+ };
+
+ Ok(JsonOp::Async(op.boxed_local()))
+}
+
+#[derive(Deserialize)]
+struct SendArgs {
+ rid: i32,
+ hostname: String,
+ port: u16,
+ transport: String,
+}
+
+fn op_send(
+ state: &State,
+ args: Value,
+ zero_copy: Option<ZeroCopyBuf>,
+) -> Result<JsonOp, ErrBox> {
+ assert!(zero_copy.is_some());
+ let buf = zero_copy.unwrap();
+
+ let args: SendArgs = serde_json::from_value(args)?;
+ assert_eq!(args.transport, "udp");
+ let rid = args.rid as u32;
+
+ let state_ = state.clone();
+ state.check_net(&args.hostname, args.port)?;
+
+ let op = async move {
+ let mut state = state_.borrow_mut();
+ let resource = state
+ .resource_table
+ .get_mut::<UdpSocketResource>(rid)
+ .ok_or_else(|| {
+ let e = std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "Socket has been closed",
+ );
+ ErrBox::from(e)
+ })?;
+
+ let socket = &mut resource.socket;
+ let addr = resolve_addr(&args.hostname, args.port).await?;
+ socket.send_to(&buf, addr).await?;
+
+ Ok(json!({}))
+ };
+
+ Ok(JsonOp::Async(op.boxed_local()))
+}
+
#[derive(Deserialize)]
struct ConnectArgs {
transport: String,
@@ -278,29 +396,63 @@ impl TcpListenerResource {
}
}
+struct UdpSocketResource {
+ socket: UdpSocket,
+}
+
+fn listen_tcp(
+ state: &State,
+ addr: SocketAddr,
+) -> Result<(u32, SocketAddr), ErrBox> {
+ let mut state = state.borrow_mut();
+ let listener = futures::executor::block_on(TcpListener::bind(&addr))?;
+ let local_addr = listener.local_addr()?;
+ let listener_resource = TcpListenerResource {
+ listener,
+ waker: None,
+ local_addr,
+ };
+ let rid = state
+ .resource_table
+ .add("tcpListener", Box::new(listener_resource));
+
+ Ok((rid, local_addr))
+}
+
+fn listen_udp(
+ state: &State,
+ addr: SocketAddr,
+) -> Result<(u32, SocketAddr), ErrBox> {
+ let mut state = state.borrow_mut();
+ let socket = futures::executor::block_on(UdpSocket::bind(&addr))?;
+ let local_addr = socket.local_addr()?;
+ let socket_resource = UdpSocketResource { socket };
+ let rid = state
+ .resource_table
+ .add("udpSocket", Box::new(socket_resource));
+
+ Ok((rid, local_addr))
+}
+
fn op_listen(
state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let args: ListenArgs = serde_json::from_value(args)?;
- assert_eq!(args.transport, "tcp");
+ assert!(args.transport == "tcp" || args.transport == "udp");
state.check_net(&args.hostname, args.port)?;
let addr =
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
- let listener = futures::executor::block_on(TcpListener::bind(&addr))?;
- let local_addr = listener.local_addr()?;
- let listener_resource = TcpListenerResource {
- listener,
- waker: None,
- local_addr,
+
+ let (rid, local_addr) = if args.transport == "tcp" {
+ listen_tcp(state, addr)?
+ } else {
+ listen_udp(state, addr)?
};
- let mut state = state.borrow_mut();
- let rid = state
- .resource_table
- .add("tcpListener", Box::new(listener_resource));
+
debug!(
"New listener {} {}:{}",
rid,