summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYusuke Sakurai <kerokerokerop@gmail.com>2019-04-28 03:33:24 +0900
committerRyan Dahl <ry@tinyclouds.org>2019-04-27 11:33:24 -0700
commit972ac03858cc11e8b6bb759ee69640d17235580d (patch)
tree6631a1520c9385c357984096d930ad369a7ae531
parent02596c08bdedebb7ffb53ead9cae2bd6bf98c238 (diff)
recover: #1517 Kill all pending accepts when TCP listener is closed (#2224)
-rw-r--r--cli/resources.rs37
-rw-r--r--cli/tokio_util.rs21
-rw-r--r--js/net_test.ts92
3 files changed, 110 insertions, 40 deletions
diff --git a/cli/resources.rs b/cli/resources.rs
index 3a7121d4c..66a2ebdb3 100644
--- a/cli/resources.rs
+++ b/cli/resources.rs
@@ -171,12 +171,49 @@ impl Resource {
}
}
+ /// Track the current task (for TcpListener resource).
+ /// Throws an error if another task is already tracked.
+ pub fn track_task(&mut self) -> Result<(), std::io::Error> {
+ let mut table = RESOURCE_TABLE.lock().unwrap();
+ // Only track if is TcpListener.
+ if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) {
+ // Currently, we only allow tracking a single accept task for a listener.
+ // This might be changed in the future with multiple workers.
+ // Caveat: TcpListener by itself also only tracks an accept task at a time.
+ // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
+ if t.is_some() {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "Another accept task is ongoing",
+ ));
+ }
+ t.replace(futures::task::current());
+ }
+ Ok(())
+ }
+
+ /// Stop tracking a task (for TcpListener resource).
+ /// Happens when the task is done and thus no further tracking is needed.
+ pub fn untrack_task(&mut self) {
+ let mut table = RESOURCE_TABLE.lock().unwrap();
+ // Only untrack if is TcpListener.
+ if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) {
+ assert!(t.is_some());
+ t.take();
+ }
+ }
+
// close(2) is done by dropping the value. Therefore we just need to remove
// the resource from the RESOURCE_TABLE.
pub fn close(&self) {
let mut table = RESOURCE_TABLE.lock().unwrap();
let r = table.remove(&self.rid);
assert!(r.is_some());
+ // If TcpListener, we must kill all pending accepts!
+ if let Repr::TcpListener(_, Some(t)) = r.unwrap() {
+ // Call notify on the tracked task, so that they would error out.
+ t.notify();
+ }
}
pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> {
diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs
index e1f8587c3..a57cbbd2e 100644
--- a/cli/tokio_util.rs
+++ b/cli/tokio_util.rs
@@ -78,14 +78,31 @@ pub fn accept(r: Resource) -> Accept {
pub struct Accept {
state: AcceptState,
}
-
impl Future for Accept {
type Item = (TcpStream, SocketAddr);
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (stream, addr) = match self.state {
- AcceptState::Pending(ref mut r) => try_ready!(r.poll_accept()),
+ // Similar to try_ready!, but also track/untrack accept task
+ // in TcpListener resource.
+ // In this way, when the listener is closed, the task can be
+ // notified to error out (instead of stuck forever).
+ AcceptState::Pending(ref mut r) => match r.poll_accept() {
+ Ok(futures::prelude::Async::Ready(t)) => {
+ r.untrack_task();
+ t
+ }
+ Ok(futures::prelude::Async::NotReady) => {
+ // Would error out if another accept task is being tracked.
+ r.track_task()?;
+ return Ok(futures::prelude::Async::NotReady);
+ }
+ Err(e) => {
+ r.untrack_task();
+ return Err(From::from(e));
+ }
+ },
AcceptState::Empty => panic!("poll Accept after it's done"),
};
diff --git a/js/net_test.ts b/js/net_test.ts
index f02fa9611..cfbdac0cb 100644
--- a/js/net_test.ts
+++ b/js/net_test.ts
@@ -1,6 +1,22 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import { testPerm, assert, assertEquals } from "./test_util.ts";
+function deferred(): {
+ resolve: () => void;
+ reject: () => void;
+ promise: Promise<void>;
+} {
+ let resolve: () => void;
+ let reject: () => void;
+ const promise = new Promise<void>(
+ (a, b): void => {
+ resolve = a;
+ reject = b;
+ }
+ );
+ return { resolve, reject, promise };
+}
+
testPerm({ net: true }, function netListenClose(): void {
const listener = Deno.listen("tcp", "127.0.0.1:4500");
listener.close();
@@ -72,24 +88,25 @@ testPerm({ net: true }, async function netDialListen(): Promise<void> {
conn.close();
});
-/* TODO Fix broken test.
-testPerm({ net: true }, async function netCloseReadSuccess() {
+testPerm({ net: true }, async function netCloseReadSuccess(): Promise<void> {
const addr = "127.0.0.1:4500";
const listener = Deno.listen("tcp", addr);
const closeDeferred = deferred();
const closeReadDeferred = deferred();
- listener.accept().then(async conn => {
- await closeReadDeferred.promise;
- await conn.write(new Uint8Array([1, 2, 3]));
- const buf = new Uint8Array(1024);
- const readResult = await conn.read(buf);
- assertEquals(3, readResult.nread);
- assertEquals(4, buf[0]);
- assertEquals(5, buf[1]);
- assertEquals(6, buf[2]);
- conn.close();
- closeDeferred.resolve();
- });
+ listener.accept().then(
+ async (conn): Promise<void> => {
+ await closeReadDeferred.promise;
+ await conn.write(new Uint8Array([1, 2, 3]));
+ const buf = new Uint8Array(1024);
+ const readResult = await conn.read(buf);
+ assertEquals(3, readResult.nread);
+ assertEquals(4, buf[0]);
+ assertEquals(5, buf[1]);
+ assertEquals(6, buf[2]);
+ conn.close();
+ closeDeferred.resolve();
+ }
+ );
const conn = await Deno.dial("tcp", addr);
conn.closeRead(); // closing read
closeReadDeferred.resolve();
@@ -103,18 +120,18 @@ testPerm({ net: true }, async function netCloseReadSuccess() {
listener.close();
conn.close();
});
-*/
-/* TODO Fix broken test.
-testPerm({ net: true }, async function netDoubleCloseRead() {
+testPerm({ net: true }, async function netDoubleCloseRead(): Promise<void> {
const addr = "127.0.0.1:4500";
const listener = Deno.listen("tcp", addr);
const closeDeferred = deferred();
- listener.accept().then(async conn => {
- await conn.write(new Uint8Array([1, 2, 3]));
- await closeDeferred.promise;
- conn.close();
- });
+ listener.accept().then(
+ async (conn): Promise<void> => {
+ await conn.write(new Uint8Array([1, 2, 3]));
+ await closeDeferred.promise;
+ conn.close();
+ }
+ );
const conn = await Deno.dial("tcp", addr);
conn.closeRead(); // closing read
let err;
@@ -131,18 +148,18 @@ testPerm({ net: true }, async function netDoubleCloseRead() {
listener.close();
conn.close();
});
-*/
-/* TODO Fix broken test.
-testPerm({ net: true }, async function netCloseWriteSuccess() {
+testPerm({ net: true }, async function netCloseWriteSuccess(): Promise<void> {
const addr = "127.0.0.1:4500";
const listener = Deno.listen("tcp", addr);
const closeDeferred = deferred();
- listener.accept().then(async conn => {
- await conn.write(new Uint8Array([1, 2, 3]));
- await closeDeferred.promise;
- conn.close();
- });
+ listener.accept().then(
+ async (conn): Promise<void> => {
+ await conn.write(new Uint8Array([1, 2, 3]));
+ await closeDeferred.promise;
+ conn.close();
+ }
+ );
const conn = await Deno.dial("tcp", addr);
conn.closeWrite(); // closing write
const buf = new Uint8Array(1024);
@@ -166,17 +183,17 @@ testPerm({ net: true }, async function netCloseWriteSuccess() {
listener.close();
conn.close();
});
-*/
-/* TODO Fix broken test.
-testPerm({ net: true }, async function netDoubleCloseWrite() {
+testPerm({ net: true }, async function netDoubleCloseWrite(): Promise<void> {
const addr = "127.0.0.1:4500";
const listener = Deno.listen("tcp", addr);
const closeDeferred = deferred();
- listener.accept().then(async conn => {
- await closeDeferred.promise;
- conn.close();
- });
+ listener.accept().then(
+ async (conn): Promise<void> => {
+ await closeDeferred.promise;
+ conn.close();
+ }
+ );
const conn = await Deno.dial("tcp", addr);
conn.closeWrite(); // closing write
let err;
@@ -193,4 +210,3 @@ testPerm({ net: true }, async function netDoubleCloseWrite() {
listener.close();
conn.close();
});
-*/