diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2019-05-23 21:22:52 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-05-23 21:22:52 +0300 |
commit | 583a646be7764a686b757418f31ab4d8e0e6a17a (patch) | |
tree | 4e1433516acd550b721e4f9f91f6cdc089c85c69 | |
parent | 2952fb5405eabbb9e603e90fe2e972e9bddfcadd (diff) |
Fix concurrent accepts (#2403)
-rw-r--r-- | cli/resources.rs | 41 | ||||
-rw-r--r-- | cli/tokio_util.rs | 21 | ||||
-rw-r--r-- | js/net_test.ts | 2 |
3 files changed, 58 insertions, 6 deletions
diff --git a/cli/resources.rs b/cli/resources.rs index 8473fc6f1..a6cee811f 100644 --- a/cli/resources.rs +++ b/cli/resources.rs @@ -171,12 +171,49 @@ impl Resource { } } + /// Track the current task (for TcpListener resource). + /// Throws an error if another task is already tracked. + pub fn track_task(&mut self) -> Result<(), std::io::Error> { + let mut table = RESOURCE_TABLE.lock().unwrap(); + // Only track if is TcpListener. + if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) { + // Currently, we only allow tracking a single accept task for a listener. + // This might be changed in the future with multiple workers. + // Caveat: TcpListener by itself also only tracks an accept task at a time. + // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 + if t.is_some() { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Another accept task is ongoing", + )); + } + t.replace(futures::task::current()); + } + Ok(()) + } + + /// Stop tracking a task (for TcpListener resource). + /// Happens when the task is done and thus no further tracking is needed. + pub fn untrack_task(&mut self) { + let mut table = RESOURCE_TABLE.lock().unwrap(); + // Only untrack if is TcpListener. + if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) { + if t.is_some() { + t.take(); + } + } + } + // close(2) is done by dropping the value. Therefore we just need to remove // the resource from the RESOURCE_TABLE. pub fn close(&self) { let mut table = RESOURCE_TABLE.lock().unwrap(); - let r = table.remove(&self.rid); - assert!(r.is_some()); + let r = table.remove(&self.rid).unwrap(); + // If TcpListener, we must kill all pending accepts! + if let Repr::TcpListener(_, Some(t)) = r { + // Call notify on the tracked task, so that they would error out. + t.notify(); + } } pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> { diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs index e1f8587c3..fa42846cb 100644 --- a/cli/tokio_util.rs +++ b/cli/tokio_util.rs @@ -78,14 +78,31 @@ pub fn accept(r: Resource) -> Accept { pub struct Accept { state: AcceptState, } - impl Future for Accept { type Item = (TcpStream, SocketAddr); type Error = io::Error; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { let (stream, addr) = match self.state { - AcceptState::Pending(ref mut r) => try_ready!(r.poll_accept()), + // Similar to try_ready!, but also track/untrack accept task + // in TcpListener resource. + // In this way, when the listener is closed, the task can be + // notified to error out (instead of stuck forever). + AcceptState::Pending(ref mut r) => match r.poll_accept() { + Ok(futures::prelude::Async::Ready(t)) => { + r.untrack_task(); + t + } + Ok(futures::prelude::Async::NotReady) => { + // Would error out if another accept task is being tracked. + r.track_task()?; + return Ok(futures::prelude::Async::NotReady); + } + Err(e) => { + r.untrack_task(); + return Err(e); + } + }, AcceptState::Empty => panic!("poll Accept after it's done"), }; diff --git a/js/net_test.ts b/js/net_test.ts index 036f136b8..9f7e621bc 100644 --- a/js/net_test.ts +++ b/js/net_test.ts @@ -21,7 +21,6 @@ testPerm({ net: true }, async function netCloseWhileAccept(): Promise<void> { assertEquals(err.message, "Listener has been closed"); }); -/* TODO(ry) Re-enable this test. testPerm({ net: true }, async function netConcurrentAccept(): Promise<void> { const listener = Deno.listen("tcp", ":4502"); let acceptErrCount = 0; @@ -42,7 +41,6 @@ testPerm({ net: true }, async function netConcurrentAccept(): Promise<void> { await [p, p1]; assertEquals(acceptErrCount, 1); }); -*/ testPerm({ net: true }, async function netDialListen(): Promise<void> { const listener = Deno.listen("tcp", ":4500"); |