summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--js/net_test.ts36
-rw-r--r--src/resources.rs57
-rw-r--r--src/tokio_util.rs20
3 files changed, 106 insertions, 7 deletions
diff --git a/js/net_test.ts b/js/net_test.ts
index 6ba36547f..be415f19c 100644
--- a/js/net_test.ts
+++ b/js/net_test.ts
@@ -8,6 +8,42 @@ testPerm({ net: true }, function netListenClose() {
listener.close();
});
+testPerm({ net: true }, async function netCloseWhileAccept() {
+ const listener = deno.listen("tcp", ":4501");
+ const p = listener.accept();
+ listener.close();
+ let err;
+ try {
+ await p;
+ } catch (e) {
+ err = e;
+ }
+ assert(!!err);
+ assertEqual(err.kind, deno.ErrorKind.Other);
+ assertEqual(err.message, "Listener has been closed");
+});
+
+testPerm({ net: true }, async function netConcurrentAccept() {
+ const listener = deno.listen("tcp", ":4502");
+ let err;
+ // Consume this accept error
+ // (since it would still be waiting when listener.close is called)
+ listener.accept().catch(e => {
+ assertEqual(e.kind, deno.ErrorKind.Other);
+ assertEqual(e.message, "Listener has been closed");
+ });
+ const p1 = listener.accept();
+ try {
+ await p1;
+ } catch (e) {
+ err = e;
+ }
+ assert(!!err);
+ assertEqual(err.kind, deno.ErrorKind.Other);
+ assertEqual(err.message, "Another accept task is ongoing");
+ listener.close();
+});
+
testPerm({ net: true }, async function netDialListen() {
const listener = deno.listen("tcp", ":4500");
listener.accept().then(async conn => {
diff --git a/src/resources.rs b/src/resources.rs
index 55e1a9f64..1f5a121de 100644
--- a/src/resources.rs
+++ b/src/resources.rs
@@ -87,7 +87,12 @@ enum Repr {
Stdout(tokio::fs::File),
Stderr(tokio::io::Stderr),
FsFile(tokio::fs::File),
- TcpListener(tokio::net::TcpListener),
+ // Since TcpListener might be closed while there is a pending accept task,
+ // we need to track the task so that when the listener is closed,
+ // this pending task could be notified and die.
+ // Currently TcpListener itself does not take care of this issue.
+ // See: https://github.com/tokio-rs/tokio/issues/846
+ TcpListener(tokio::net::TcpListener, Option<futures::task::Task>),
TcpStream(tokio::net::TcpStream),
HttpBody(HttpBody),
Repl(Repl),
@@ -132,7 +137,7 @@ fn inspect_repr(repr: &Repr) -> String {
Repr::Stdout(_) => "stdout",
Repr::Stderr(_) => "stderr",
Repr::FsFile(_) => "fsFile",
- Repr::TcpListener(_) => "tcpListener",
+ Repr::TcpListener(_, _) => "tcpListener",
Repr::TcpStream(_) => "tcpStream",
Repr::HttpBody(_) => "httpBody",
Repr::Repl(_) => "repl",
@@ -159,20 +164,60 @@ impl Resource {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&self.rid);
match maybe_repr {
- None => panic!("bad rid"),
+ None => Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "Listener has been closed",
+ )),
Some(repr) => match repr {
- Repr::TcpListener(ref mut s) => s.poll_accept(),
+ Repr::TcpListener(ref mut s, _) => s.poll_accept(),
_ => panic!("Cannot accept"),
},
}
}
+ /// Track the current task (for TcpListener resource).
+ /// Throws an error if another task is already tracked.
+ pub fn track_task(&mut self) -> Result<(), std::io::Error> {
+ let mut table = RESOURCE_TABLE.lock().unwrap();
+ // Only track if is TcpListener.
+ if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) {
+ // Currently, we only allow tracking a single accept task for a listener.
+ // This might be changed in the future with multiple workers.
+ // Caveat: TcpListener by itself also only tracks an accept task at a time.
+ // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
+ if t.is_some() {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "Another accept task is ongoing",
+ ));
+ }
+ t.replace(futures::task::current());
+ }
+ Ok(())
+ }
+
+ /// Stop tracking a task (for TcpListener resource).
+ /// Happens when the task is done and thus no further tracking is needed.
+ pub fn untrack_task(&mut self) {
+ let mut table = RESOURCE_TABLE.lock().unwrap();
+ // Only untrack if is TcpListener.
+ if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) {
+ assert!(t.is_some());
+ t.take();
+ }
+ }
+
// close(2) is done by dropping the value. Therefore we just need to remove
// the resource from the RESOURCE_TABLE.
pub fn close(&self) {
let mut table = RESOURCE_TABLE.lock().unwrap();
let r = table.remove(&self.rid);
assert!(r.is_some());
+ // If TcpListener, we must kill all pending accepts!
+ if let Repr::TcpListener(_, Some(t)) = r.unwrap() {
+ // Call notify on the tracked task, so that they would error out.
+ t.notify();
+ }
}
pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> {
@@ -264,7 +309,7 @@ pub fn add_fs_file(fs_file: tokio::fs::File) -> Resource {
pub fn add_tcp_listener(listener: tokio::net::TcpListener) -> Resource {
let rid = new_rid();
let mut tg = RESOURCE_TABLE.lock().unwrap();
- let r = tg.insert(rid, Repr::TcpListener(listener));
+ let r = tg.insert(rid, Repr::TcpListener(listener, None));
assert!(r.is_none());
Resource { rid }
}
@@ -515,7 +560,7 @@ pub fn eager_accept(resource: Resource) -> EagerAccept {
match maybe_repr {
None => panic!("bad rid"),
Some(repr) => match repr {
- Repr::TcpListener(ref mut tcp_listener) => {
+ Repr::TcpListener(ref mut tcp_listener, _) => {
eager::tcp_accept(tcp_listener, resource)
}
_ => Either::A(tokio_util::accept(resource)),
diff --git a/src/tokio_util.rs b/src/tokio_util.rs
index 32542aa43..2eb0211db 100644
--- a/src/tokio_util.rs
+++ b/src/tokio_util.rs
@@ -63,7 +63,25 @@ impl Future for Accept {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (stream, addr) = match self.state {
- AcceptState::Pending(ref mut r) => try_ready!(r.poll_accept()),
+ // Similar to try_ready!, but also track/untrack accept task
+ // in TcpListener resource.
+ // In this way, when the listener is closed, the task can be
+ // notified to error out (instead of stuck forever).
+ AcceptState::Pending(ref mut r) => match r.poll_accept() {
+ Ok(futures::prelude::Async::Ready(t)) => {
+ r.untrack_task();
+ t
+ }
+ Ok(futures::prelude::Async::NotReady) => {
+ // Would error out if another accept task is being tracked.
+ r.track_task()?;
+ return Ok(futures::prelude::Async::NotReady);
+ }
+ Err(e) => {
+ r.untrack_task();
+ return Err(From::from(e));
+ }
+ },
AcceptState::Empty => panic!("poll Accept after it's done"),
};