summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCasper Beyer <caspervonb@pm.me>2020-09-28 02:19:36 +0800
committerGitHub <noreply@github.com>2020-09-27 14:19:36 -0400
commit5db72dcaf364d0a9d9a5c5c9c10e80cdf8a199ad (patch)
tree9c5beceaf252821cb45b2be1d8f3c630367aaad6
parentebcb032c6bc60ae94039bc047509acffd9713379 (diff)
fix(cli/inspector): shutdown server gracefully on drop (#7716)
-rw-r--r--cli/inspector.rs64
1 files changed, 39 insertions, 25 deletions
diff --git a/cli/inspector.rs b/cli/inspector.rs
index e4c0085ef..d0601c522 100644
--- a/cli/inspector.rs
+++ b/cli/inspector.rs
@@ -10,6 +10,7 @@ use deno_core::futures::channel::mpsc::UnboundedReceiver;
use deno_core::futures::channel::mpsc::UnboundedSender;
use deno_core::futures::channel::oneshot;
use deno_core::futures::future::Future;
+use deno_core::futures::pin_mut;
use deno_core::futures::prelude::*;
use deno_core::futures::select;
use deno_core::futures::stream::FuturesUnordered;
@@ -43,21 +44,30 @@ use warp::Filter;
pub struct InspectorServer {
pub host: SocketAddr,
register_inspector_tx: UnboundedSender<InspectorInfo>,
- _thread_handle: thread::JoinHandle<()>,
+ shutdown_server_tx: Option<oneshot::Sender<()>>,
+ thread_handle: Option<thread::JoinHandle<()>>,
}
impl InspectorServer {
pub fn new(host: SocketAddr) -> Self {
let (register_inspector_tx, register_inspector_rx) =
mpsc::unbounded::<InspectorInfo>();
+
+ let (shutdown_server_tx, shutdown_server_rx) = oneshot::channel();
+
let thread_handle = thread::spawn(move || {
- crate::tokio_util::run_basic(server(host, register_inspector_rx))
+ crate::tokio_util::run_basic(server(
+ host,
+ register_inspector_rx,
+ shutdown_server_rx,
+ ))
});
Self {
host,
register_inspector_tx,
- _thread_handle: thread_handle,
+ shutdown_server_tx: Some(shutdown_server_tx),
+ thread_handle: Some(thread_handle),
}
}
@@ -66,6 +76,20 @@ impl InspectorServer {
}
}
+impl Drop for InspectorServer {
+ fn drop(&mut self) {
+ if let Some(shutdown_server_tx) = self.shutdown_server_tx.take() {
+ shutdown_server_tx
+ .send(())
+ .expect("unable to send shutdown signal");
+ }
+
+ if let Some(thread_handle) = self.thread_handle.take() {
+ thread_handle.join().expect("unable to join thread");
+ }
+ }
+}
+
/// Inspector information that is sent from the isolate thread to the server
/// thread when a new inspector is created.
struct InspectorInfo {
@@ -117,24 +141,8 @@ impl InspectorInfo {
async fn server(
host: SocketAddr,
register_inspector_rx: UnboundedReceiver<InspectorInfo>,
+ shutdown_server_rx: oneshot::Receiver<()>,
) {
- // When the main thread shuts down, The Rust stdlib will call `WSACleanup()`,
- // which shuts down the network stack. This thread will still be
- // running at that time (because it never exits), but all attempts at network
- // I/O will fail with a `WSANOTINITIALIZED` error, which causes a panic.
- // To prevent this from happening, Winsock is initialized another time here;
- // this increases Winsock's internal reference count, so it won't shut
- // itself down when the main thread calls `WSACleanup()` upon exit.
- // TODO: When the last `Inspector` instance is dropped, make it signal the
- // server thread so it exits cleanly, then join it with the main thread.
- #[cfg(windows)]
- unsafe {
- use winapi::um::winsock2::{WSAStartup, WSADATA};
- let mut wsa_data = MaybeUninit::<WSADATA>::zeroed();
- let r = WSAStartup(0x202 /* Winsock 2.2 */, wsa_data.as_mut_ptr());
- assert_eq!(r, 0);
- }
-
// TODO: put the `inspector_map` in an `Rc<RefCell<_>>` instead. This is
// currently not possible because warp requires all filters to implement
// `Send`, which should not be necessary because we are using the
@@ -143,7 +151,7 @@ async fn server(
let inspector_map = Arc::new(Mutex::new(inspector_map));
let inspector_map_ = inspector_map.clone();
- let mut register_inspector_handler = register_inspector_rx
+ let register_inspector_handler = register_inspector_rx
.map(|info| {
eprintln!(
"Debugger listening on {}",
@@ -157,7 +165,7 @@ async fn server(
.collect::<()>();
let inspector_map_ = inspector_map_.clone();
- let mut deregister_inspector_handler = future::poll_fn(|cx| {
+ let deregister_inspector_handler = future::poll_fn(|cx| {
let mut g = inspector_map_.lock().unwrap();
g.retain(|_, info| info.canary_rx.poll_unpin(cx) == Poll::Pending);
Poll::<Never>::Pending
@@ -208,8 +216,10 @@ async fn server(
let server_routes =
websocket_route.or(json_version_route).or(json_list_route);
- let mut server_handler = warp::serve(server_routes)
- .try_bind_ephemeral(host)
+ let server_handler = warp::serve(server_routes)
+ .try_bind_with_graceful_shutdown(host, async {
+ shutdown_server_rx.await.ok();
+ })
.map(|(_, fut)| fut)
.unwrap_or_else(|err| {
eprintln!("Cannot start inspector server: {}.", err);
@@ -217,10 +227,14 @@ async fn server(
})
.fuse();
+ pin_mut!(register_inspector_handler);
+ pin_mut!(deregister_inspector_handler);
+ pin_mut!(server_handler);
+
select! {
_ = register_inspector_handler => (),
_ = deregister_inspector_handler => unreachable!(),
- _ = server_handler => unreachable!(),
+ _ = server_handler => (),
}
}