summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYosi Pramajaya <yosi.pramajaya@gmail.com>2022-01-31 22:36:54 +0700
committerGitHub <noreply@github.com>2022-01-31 16:36:54 +0100
commit3e566bb457663cec57602e564f73ded817e426a8 (patch)
tree7644ed068287f35060cf1a0c7c1827c629778df9
parentb7b6b9c9e5b84dea67ff6a2b691245f640644b2c (diff)
feat(ext/net): Add Conn.setNoDelay and Conn.setKeepAlive (#13103)
-rw-r--r--Cargo.lock1
-rw-r--r--cli/tests/unit/net_test.ts72
-rw-r--r--ext/net/01_net.js8
-rw-r--r--ext/net/Cargo.toml1
-rw-r--r--ext/net/io.rs30
-rw-r--r--ext/net/lib.deno_net.d.ts4
-rw-r--r--ext/net/ops.rs124
7 files changed, 240 insertions, 0 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 0d89411c7..c1ad8fcbf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -987,6 +987,7 @@ dependencies = [
"deno_tls",
"log",
"serde",
+ "socket2 0.4.2",
"tokio",
"trust-dns-proto",
"trust-dns-resolver",
diff --git a/cli/tests/unit/net_test.ts b/cli/tests/unit/net_test.ts
index 79b83f854..052202676 100644
--- a/cli/tests/unit/net_test.ts
+++ b/cli/tests/unit/net_test.ts
@@ -238,6 +238,78 @@ Deno.test({ permissions: { net: true } }, async function netTcpDialListen() {
conn.close();
});
+Deno.test({ permissions: { net: true } }, async function netTcpSetNoDelay() {
+ const listener = Deno.listen({ port: 3500 });
+ listener.accept().then(
+ async (conn) => {
+ assert(conn.remoteAddr != null);
+ assert(conn.localAddr.transport === "tcp");
+ assertEquals(conn.localAddr.hostname, "127.0.0.1");
+ assertEquals(conn.localAddr.port, 3500);
+ await conn.write(new Uint8Array([1, 2, 3]));
+ conn.close();
+ },
+ );
+
+ const conn = await Deno.connect({ hostname: "127.0.0.1", port: 3500 });
+ conn.setNoDelay(true);
+ assert(conn.remoteAddr.transport === "tcp");
+ assertEquals(conn.remoteAddr.hostname, "127.0.0.1");
+ assertEquals(conn.remoteAddr.port, 3500);
+ assert(conn.localAddr != null);
+ const buf = new Uint8Array(1024);
+ const readResult = await conn.read(buf);
+ assertEquals(3, readResult);
+ assertEquals(1, buf[0]);
+ assertEquals(2, buf[1]);
+ assertEquals(3, buf[2]);
+ assert(conn.rid > 0);
+
+ assert(readResult !== null);
+
+ const readResult2 = await conn.read(buf);
+ assertEquals(readResult2, null);
+
+ listener.close();
+ conn.close();
+});
+
+Deno.test({ permissions: { net: true } }, async function netTcpSetKeepAlive() {
+ const listener = Deno.listen({ port: 3500 });
+ listener.accept().then(
+ async (conn) => {
+ assert(conn.remoteAddr != null);
+ assert(conn.localAddr.transport === "tcp");
+ assertEquals(conn.localAddr.hostname, "127.0.0.1");
+ assertEquals(conn.localAddr.port, 3500);
+ await conn.write(new Uint8Array([1, 2, 3]));
+ conn.close();
+ },
+ );
+
+ const conn = await Deno.connect({ hostname: "127.0.0.1", port: 3500 });
+ conn.setKeepAlive(true);
+ assert(conn.remoteAddr.transport === "tcp");
+ assertEquals(conn.remoteAddr.hostname, "127.0.0.1");
+ assertEquals(conn.remoteAddr.port, 3500);
+ assert(conn.localAddr != null);
+ const buf = new Uint8Array(1024);
+ const readResult = await conn.read(buf);
+ assertEquals(3, readResult);
+ assertEquals(1, buf[0]);
+ assertEquals(2, buf[1]);
+ assertEquals(3, buf[2]);
+ assert(conn.rid > 0);
+
+ assert(readResult !== null);
+
+ const readResult2 = await conn.read(buf);
+ assertEquals(readResult2, null);
+
+ listener.close();
+ conn.close();
+});
+
Deno.test(
{
ignore: Deno.build.os === "windows",
diff --git a/ext/net/01_net.js b/ext/net/01_net.js
index 4a4005954..6f54ec999 100644
--- a/ext/net/01_net.js
+++ b/ext/net/01_net.js
@@ -95,6 +95,14 @@
closeWrite() {
return shutdown(this.rid);
}
+
+ setNoDelay(nodelay = true) {
+ return core.opSync("op_set_nodelay", this.rid, nodelay);
+ }
+
+ setKeepAlive(keepalive = true) {
+ return core.opSync("op_set_keepalive", this.rid, keepalive);
+ }
}
class Listener {
diff --git a/ext/net/Cargo.toml b/ext/net/Cargo.toml
index fe4efa386..b4f643457 100644
--- a/ext/net/Cargo.toml
+++ b/ext/net/Cargo.toml
@@ -18,6 +18,7 @@ deno_core = { version = "0.117.0", path = "../../core" }
deno_tls = { version = "0.22.0", path = "../tls" }
log = "0.4.14"
serde = { version = "1.0.129", features = ["derive"] }
+socket2 = "0.4.2"
tokio = { version = "1.10.1", features = ["full"] }
trust-dns-proto = "0.20.3"
trust-dns-resolver = { version = "0.20.3", features = ["tokio-runtime", "serde-config"] }
diff --git a/ext/net/io.rs b/ext/net/io.rs
index 9673a4a89..17b86af17 100644
--- a/ext/net/io.rs
+++ b/ext/net/io.rs
@@ -1,5 +1,6 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+use deno_core::error::generic_error;
use deno_core::error::AnyError;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
@@ -9,6 +10,7 @@ use deno_core::CancelTryFuture;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ZeroCopyBuf;
+use socket2::SockRef;
use std::borrow::Cow;
use std::rc::Rc;
use tokio::io::AsyncRead;
@@ -118,6 +120,34 @@ impl Resource for TcpStreamResource {
}
}
+impl TcpStreamResource {
+ pub fn set_nodelay(self: Rc<Self>, nodelay: bool) -> Result<(), AnyError> {
+ self.map_socket(Box::new(move |socket| Ok(socket.set_nodelay(nodelay)?)))
+ }
+
+ pub fn set_keepalive(
+ self: Rc<Self>,
+ keepalive: bool,
+ ) -> Result<(), AnyError> {
+ self
+ .map_socket(Box::new(move |socket| Ok(socket.set_keepalive(keepalive)?)))
+ }
+
+ fn map_socket(
+ self: Rc<Self>,
+ map: Box<dyn FnOnce(SockRef) -> Result<(), AnyError>>,
+ ) -> Result<(), AnyError> {
+ if let Some(wr) = RcRef::map(self, |r| &r.wr).try_borrow() {
+ let stream = wr.as_ref().as_ref();
+ let socket = socket2::SockRef::from(stream);
+
+ return map(socket);
+ }
+
+ Err(generic_error("Unable to get resources"))
+ }
+}
+
#[cfg(unix)]
pub type UnixStreamResource =
FullDuplexResource<unix::OwnedReadHalf, unix::OwnedWriteHalf>;
diff --git a/ext/net/lib.deno_net.d.ts b/ext/net/lib.deno_net.d.ts
index c00cc1e44..ebed8ac87 100644
--- a/ext/net/lib.deno_net.d.ts
+++ b/ext/net/lib.deno_net.d.ts
@@ -50,6 +50,10 @@ declare namespace Deno {
/** Shuts down (`shutdown(2)`) the write side of the connection. Most
* callers should just use `close()`. */
closeWrite(): Promise<void>;
+ /** Enable/disable the use of Nagle's algorithm. Defaults to true */
+ setNoDelay(nodelay?: boolean): void;
+ /** Enable/disable keep-alive functionality */
+ setKeepAlive(keepalive?: boolean): void;
}
// deno-lint-ignore no-empty-interface
diff --git a/ext/net/ops.rs b/ext/net/ops.rs
index 05085b401..f64b79ba7 100644
--- a/ext/net/ops.rs
+++ b/ext/net/ops.rs
@@ -55,6 +55,8 @@ pub fn init<P: NetPermissions + 'static>() -> Vec<OpPair> {
("op_dgram_recv", op_async(op_dgram_recv)),
("op_dgram_send", op_async(op_dgram_send::<P>)),
("op_dns_resolve", op_async(op_dns_resolve::<P>)),
+ ("op_set_nodelay", op_sync(op_set_nodelay::<P>)),
+ ("op_set_keepalive", op_sync(op_set_keepalive::<P>)),
]
}
@@ -665,6 +667,26 @@ where
Ok(results)
}
+pub fn op_set_nodelay<NP>(
+ state: &mut OpState,
+ rid: ResourceId,
+ nodelay: bool,
+) -> Result<(), AnyError> {
+ let resource: Rc<TcpStreamResource> =
+ state.resource_table.get::<TcpStreamResource>(rid)?;
+ resource.set_nodelay(nodelay)
+}
+
+pub fn op_set_keepalive<NP>(
+ state: &mut OpState,
+ rid: ResourceId,
+ keepalive: bool,
+) -> Result<(), AnyError> {
+ let resource: Rc<TcpStreamResource> =
+ state.resource_table.get::<TcpStreamResource>(rid)?;
+ resource.set_keepalive(keepalive)
+}
+
fn rdata_to_return_record(
ty: RecordType,
) -> impl Fn(&RData) -> Option<DnsReturnRecord> {
@@ -717,8 +739,13 @@ fn rdata_to_return_record(
#[cfg(test)]
mod tests {
use super::*;
+ use deno_core::Extension;
+ use deno_core::JsRuntime;
+ use deno_core::RuntimeOptions;
+ use socket2::SockRef;
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
+ use std::path::Path;
use trust_dns_proto::rr::rdata::mx::MX;
use trust_dns_proto::rr::rdata::srv::SRV;
use trust_dns_proto::rr::rdata::txt::TXT;
@@ -810,4 +837,101 @@ mod tests {
]))
);
}
+
+ struct TestPermission {}
+
+ impl NetPermissions for TestPermission {
+ fn check_net<T: AsRef<str>>(
+ &mut self,
+ _host: &(T, Option<u16>),
+ ) -> Result<(), AnyError> {
+ Ok(())
+ }
+
+ fn check_read(&mut self, _p: &Path) -> Result<(), AnyError> {
+ Ok(())
+ }
+
+ fn check_write(&mut self, _p: &Path) -> Result<(), AnyError> {
+ Ok(())
+ }
+ }
+
+ #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+ async fn tcp_set_no_delay() {
+ let set_nodelay = Box::new(|state: &mut OpState, rid| {
+ op_set_nodelay::<TestPermission>(state, rid, true).unwrap();
+ });
+ let test_fn = Box::new(|socket: SockRef| {
+ assert!(socket.nodelay().unwrap());
+ assert!(!socket.keepalive().unwrap());
+ });
+ check_sockopt(String::from("127.0.0.1:4245"), set_nodelay, test_fn).await;
+ }
+
+ #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+ async fn tcp_set_keepalive() {
+ let set_keepalive = Box::new(|state: &mut OpState, rid| {
+ op_set_keepalive::<TestPermission>(state, rid, true).unwrap();
+ });
+ let test_fn = Box::new(|socket: SockRef| {
+ assert!(!socket.nodelay().unwrap());
+ assert!(socket.keepalive().unwrap());
+ });
+ check_sockopt(String::from("127.0.0.1:4246"), set_keepalive, test_fn).await;
+ }
+
+ async fn check_sockopt(
+ addr: String,
+ set_sockopt_fn: Box<dyn Fn(&mut OpState, u32)>,
+ test_fn: Box<dyn FnOnce(SockRef)>,
+ ) {
+ let clone_addr = addr.clone();
+ tokio::spawn(async move {
+ let listener = TcpListener::bind(addr).await.unwrap();
+ let _ = listener.accept().await;
+ });
+ let my_ext = Extension::builder()
+ .state(move |state| {
+ state.put(TestPermission {});
+ Ok(())
+ })
+ .build();
+
+ let mut runtime = JsRuntime::new(RuntimeOptions {
+ extensions: vec![my_ext],
+ ..Default::default()
+ });
+
+ let conn_state = runtime.op_state();
+
+ let server_addr: Vec<&str> = clone_addr.split(':').collect();
+ let ip_args = IpListenArgs {
+ hostname: String::from(server_addr[0]),
+ port: server_addr[1].parse().unwrap(),
+ };
+ let connect_args = ConnectArgs {
+ transport: String::from("tcp"),
+ transport_args: ArgsEnum::Ip(ip_args),
+ };
+
+ let connect_fut =
+ op_net_connect::<TestPermission>(conn_state, connect_args, ());
+ let conn = connect_fut.await.unwrap();
+
+ let rid = conn.rid;
+ let state = runtime.op_state();
+ set_sockopt_fn(&mut state.borrow_mut(), rid);
+
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<TcpStreamResource>(rid)
+ .unwrap();
+
+ let wr = resource.wr_borrow_mut().await;
+ let stream = wr.as_ref().as_ref();
+ let socket = socket2::SockRef::from(stream);
+ test_fn(socket);
+ }
}