summaryrefslogtreecommitdiff
path: root/runtime/inspector_server.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2023-12-23 16:46:09 +0100
committerGitHub <noreply@github.com>2023-12-23 15:46:09 +0000
commit36536c784ca981ae01d258d4239b2a362017d533 (patch)
tree7aa41e54ae7a088f5f494ceef2aab8c466cc8067 /runtime/inspector_server.rs
parent140e8becd0cd3a0183c3f28f6ce3812ebcd76036 (diff)
refactor: upgrade inspector server to Hyper 1.1 (#21599)
This commit rewrites "runtime/inspector_server.rs" to use Hyper 1.1. Now "deno_runtime" crate depends on both Hyper 1.x and 0.y versions.
Diffstat (limited to 'runtime/inspector_server.rs')
-rw-r--r--runtime/inspector_server.rs255
1 files changed, 150 insertions, 105 deletions
diff --git a/runtime/inspector_server.rs b/runtime/inspector_server.rs
index 313b5fd51..a1864266a 100644
--- a/runtime/inspector_server.rs
+++ b/runtime/inspector_server.rs
@@ -7,7 +7,6 @@ use deno_core::futures::channel::mpsc::UnboundedReceiver;
use deno_core::futures::channel::mpsc::UnboundedSender;
use deno_core::futures::channel::oneshot;
use deno_core::futures::future;
-use deno_core::futures::future::Future;
use deno_core::futures::prelude::*;
use deno_core::futures::select;
use deno_core::futures::stream::StreamExt;
@@ -20,17 +19,20 @@ use deno_core::url::Url;
use deno_core::InspectorMsg;
use deno_core::InspectorSessionProxy;
use deno_core::JsRuntime;
-use fastwebsockets::Frame;
-use fastwebsockets::OpCode;
-use fastwebsockets::WebSocket;
+use fastwebsockets_06::Frame;
+use fastwebsockets_06::OpCode;
+use fastwebsockets_06::WebSocket;
+use hyper::body::Bytes;
+use hyper_util::rt::TokioIo;
use std::cell::RefCell;
use std::collections::HashMap;
-use std::convert::Infallible;
use std::net::SocketAddr;
use std::pin::pin;
use std::process;
use std::rc::Rc;
use std::thread;
+use tokio::net::TcpListener;
+use tokio::sync::broadcast;
use uuid::Uuid;
/// Websocket server that is used to proxy connections from
@@ -38,7 +40,7 @@ use uuid::Uuid;
pub struct InspectorServer {
pub host: SocketAddr,
register_inspector_tx: UnboundedSender<InspectorInfo>,
- shutdown_server_tx: Option<oneshot::Sender<()>>,
+ shutdown_server_tx: Option<broadcast::Sender<()>>,
thread_handle: Option<thread::JoinHandle<()>>,
}
@@ -47,7 +49,7 @@ impl InspectorServer {
let (register_inspector_tx, register_inspector_rx) =
mpsc::unbounded::<InspectorInfo>();
- let (shutdown_server_tx, shutdown_server_rx) = oneshot::channel();
+ let (shutdown_server_tx, shutdown_server_rx) = broadcast::channel(1);
let thread_handle = thread::spawn(move || {
let rt = crate::tokio_util::create_basic_runtime();
@@ -101,26 +103,12 @@ impl Drop for InspectorServer {
}
}
-// Needed so hyper can use non Send futures
-#[derive(Clone)]
-struct LocalExecutor;
-
-impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
-where
- Fut: Future + 'static,
- Fut::Output: 'static,
-{
- fn execute(&self, fut: Fut) {
- deno_core::unsync::spawn(fut);
- }
-}
-
fn handle_ws_request(
- req: http::Request<hyper::Body>,
+ req: http_1::Request<hyper1::body::Incoming>,
inspector_map_rc: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>,
-) -> http::Result<http::Response<hyper::Body>> {
+) -> http_1::Result<http_1::Response<Box<http_body_util::Full<Bytes>>>> {
let (parts, body) = req.into_parts();
- let req = http::Request::from_parts(parts, ());
+ let req = http_1::Request::from_parts(parts, ());
let maybe_uuid = req
.uri()
@@ -129,9 +117,9 @@ fn handle_ws_request(
.and_then(|s| Uuid::parse_str(s).ok());
if maybe_uuid.is_none() {
- return http::Response::builder()
- .status(http::StatusCode::BAD_REQUEST)
- .body("Malformed inspector UUID".into());
+ return http_1::Response::builder()
+ .status(http_1::StatusCode::BAD_REQUEST)
+ .body(Box::new(Bytes::from("Malformed inspector UUID").into()));
}
// run in a block to not hold borrow to `inspector_map` for too long
@@ -140,34 +128,47 @@ fn handle_ws_request(
let maybe_inspector_info = inspector_map.get(&maybe_uuid.unwrap());
if maybe_inspector_info.is_none() {
- return http::Response::builder()
- .status(http::StatusCode::NOT_FOUND)
- .body("Invalid inspector UUID".into());
+ return http_1::Response::builder()
+ .status(http_1::StatusCode::NOT_FOUND)
+ .body(Box::new(Bytes::from("Invalid inspector UUID").into()));
}
let info = maybe_inspector_info.unwrap();
info.new_session_tx.clone()
};
let (parts, _) = req.into_parts();
- let mut req = http::Request::from_parts(parts, body);
-
- let (resp, fut) = match fastwebsockets::upgrade::upgrade(&mut req) {
- Ok(e) => e,
+ let mut req = http_1::Request::from_parts(parts, body);
+
+ let (resp, fut) = match fastwebsockets_06::upgrade::upgrade(&mut req) {
+ Ok((resp, fut)) => {
+ let (parts, _body) = resp.into_parts();
+ let resp = http_1::Response::from_parts(
+ parts,
+ Box::new(http_body_util::Full::new(Bytes::new())),
+ );
+ (resp, fut)
+ }
_ => {
- return http::Response::builder()
- .status(http::StatusCode::BAD_REQUEST)
- .body("Not a valid Websocket Request".into());
+ return http_1::Response::builder()
+ .status(http_1::StatusCode::BAD_REQUEST)
+ .body(Box::new(
+ Bytes::from("Not a valid Websocket Request").into(),
+ ));
}
};
// spawn a task that will wait for websocket connection and then pump messages between
// the socket and inspector proxy
spawn(async move {
- let websocket = if let Ok(w) = fut.await {
- w
- } else {
- eprintln!("Inspector server failed to upgrade to WS connection");
- return;
+ let websocket = match fut.await {
+ Ok(w) => w,
+ Err(err) => {
+ eprintln!(
+ "Inspector server failed to upgrade to WS connection: {:?}",
+ err
+ );
+ return;
+ }
};
// The 'outbound' channel carries messages sent to the websocket.
@@ -191,31 +192,37 @@ fn handle_ws_request(
fn handle_json_request(
inspector_map: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>,
host: Option<String>,
-) -> http::Result<http::Response<hyper::Body>> {
+) -> http_1::Result<http_1::Response<Box<http_body_util::Full<Bytes>>>> {
let data = inspector_map
.borrow()
.values()
.map(move |info| info.get_json_metadata(&host))
.collect::<Vec<_>>();
- http::Response::builder()
- .status(http::StatusCode::OK)
- .header(http::header::CONTENT_TYPE, "application/json")
- .body(serde_json::to_string(&data).unwrap().into())
+ let body: http_body_util::Full<Bytes> =
+ Bytes::from(serde_json::to_string(&data).unwrap()).into();
+ http_1::Response::builder()
+ .status(http_1::StatusCode::OK)
+ .header(http_1::header::CONTENT_TYPE, "application/json")
+ .body(Box::new(body))
}
fn handle_json_version_request(
version_response: Value,
-) -> http::Result<http::Response<hyper::Body>> {
- http::Response::builder()
- .status(http::StatusCode::OK)
- .header(http::header::CONTENT_TYPE, "application/json")
- .body(serde_json::to_string(&version_response).unwrap().into())
+) -> http_1::Result<http_1::Response<Box<http_body_util::Full<Bytes>>>> {
+ let body = Box::new(http_body_util::Full::from(
+ serde_json::to_string(&version_response).unwrap(),
+ ));
+
+ http_1::Response::builder()
+ .status(http_1::StatusCode::OK)
+ .header(http_1::header::CONTENT_TYPE, "application/json")
+ .body(body)
}
async fn server(
host: SocketAddr,
register_inspector_rx: UnboundedReceiver<InspectorInfo>,
- shutdown_server_rx: oneshot::Receiver<()>,
+ shutdown_server_rx: broadcast::Receiver<()>,
name: &str,
) {
let inspector_map_ =
@@ -253,61 +260,99 @@ async fn server(
"V8-Version": deno_core::v8_version(),
});
- let make_svc = hyper::service::make_service_fn(|_| {
- let inspector_map = Rc::clone(&inspector_map_);
- let json_version_response = json_version_response.clone();
-
- future::ok::<_, Infallible>(hyper::service::service_fn(
- move |req: http::Request<hyper::Body>| {
- future::ready({
- // If the host header can make a valid URL, use it
- let host = req
- .headers()
- .get("host")
- .and_then(|host| host.to_str().ok())
- .and_then(|host| Url::parse(&format!("http://{host}")).ok())
- .and_then(|url| match (url.host(), url.port()) {
- (Some(host), Some(port)) => Some(format!("{host}:{port}")),
- (Some(host), None) => Some(format!("{host}")),
- _ => None,
- });
- match (req.method(), req.uri().path()) {
- (&http::Method::GET, path) if path.starts_with("/ws/") => {
- handle_ws_request(req, Rc::clone(&inspector_map))
- }
- (&http::Method::GET, "/json/version") => {
- handle_json_version_request(json_version_response.clone())
+ // Create the server manually so it can use the Local Executor
+ let listener = match TcpListener::bind(&host).await {
+ Ok(l) => l,
+ Err(err) => {
+ eprintln!("Cannot start inspector server: {:?}", err);
+ return;
+ }
+ };
+
+ let mut server_handler = pin!(deno_core::unsync::spawn(async move {
+ loop {
+ let mut rx = shutdown_server_rx.resubscribe();
+ let mut shutdown_rx = pin!(rx.recv());
+ let mut accept = pin!(listener.accept());
+
+ let stream = tokio::select! {
+ accept_result = &mut accept => {
+ match accept_result {
+ Ok((s, _)) => s,
+ Err(err) => {
+ eprintln!("Failed to accept inspector connection: {:?}", err);
+ continue;
}
- (&http::Method::GET, "/json") => {
- handle_json_request(Rc::clone(&inspector_map), host)
+ }
+ },
+
+ _ = &mut shutdown_rx => {
+ break;
+ }
+ };
+ let io = TokioIo::new(stream);
+
+ let inspector_map = Rc::clone(&inspector_map_);
+ let json_version_response = json_version_response.clone();
+ let mut shutdown_server_rx = shutdown_server_rx.resubscribe();
+
+ let service = hyper1::service::service_fn(
+ move |req: http_1::Request<hyper1::body::Incoming>| {
+ future::ready({
+ // If the host header can make a valid URL, use it
+ let host = req
+ .headers()
+ .get("host")
+ .and_then(|host| host.to_str().ok())
+ .and_then(|host| Url::parse(&format!("http://{host}")).ok())
+ .and_then(|url| match (url.host(), url.port()) {
+ (Some(host), Some(port)) => Some(format!("{host}:{port}")),
+ (Some(host), None) => Some(format!("{host}")),
+ _ => None,
+ });
+ match (req.method(), req.uri().path()) {
+ (&http_1::Method::GET, path) if path.starts_with("/ws/") => {
+ handle_ws_request(req, Rc::clone(&inspector_map))
+ }
+ (&http_1::Method::GET, "/json/version") => {
+ handle_json_version_request(json_version_response.clone())
+ }
+ (&http_1::Method::GET, "/json") => {
+ handle_json_request(Rc::clone(&inspector_map), host)
+ }
+ (&http_1::Method::GET, "/json/list") => {
+ handle_json_request(Rc::clone(&inspector_map), host)
+ }
+ _ => http_1::Response::builder()
+ .status(http_1::StatusCode::NOT_FOUND)
+ .body(Box::new(http_body_util::Full::new(Bytes::from(
+ "Not Found",
+ )))),
}
- (&http::Method::GET, "/json/list") => {
- handle_json_request(Rc::clone(&inspector_map), host)
+ })
+ },
+ );
+
+ deno_core::unsync::spawn(async move {
+ let server = hyper1::server::conn::http1::Builder::new();
+
+ let mut conn =
+ pin!(server.serve_connection(io, service).with_upgrades());
+ let mut shutdown_rx = pin!(shutdown_server_rx.recv());
+
+ tokio::select! {
+ result = conn.as_mut() => {
+ if let Err(err) = result {
+ eprintln!("Failed to serve connection: {:?}", err);
}
- _ => http::Response::builder()
- .status(http::StatusCode::NOT_FOUND)
- .body("Not Found".into()),
+ },
+ _ = &mut shutdown_rx => {
+ conn.as_mut().graceful_shutdown();
+ let _ = conn.await;
}
- })
- },
- ))
- });
-
- // Create the server manually so it can use the Local Executor
- let mut server_handler = pin!(hyper::server::Builder::new(
- hyper::server::conn::AddrIncoming::bind(&host).unwrap_or_else(|e| {
- eprintln!("Cannot start inspector server: {e}.");
- process::exit(1);
- }),
- hyper::server::conn::Http::new().with_executor(LocalExecutor),
- )
- .serve(make_svc)
- .with_graceful_shutdown(async {
- shutdown_server_rx.await.ok();
- })
- .unwrap_or_else(|err| {
- eprintln!("Cannot start inspector server: {err}.");
- process::exit(1);
+ }
+ });
+ }
})
.fuse());
@@ -331,7 +376,7 @@ async fn server(
/// 'futures' crate, therefore they can't participate in Tokio's cooperative
/// task yielding.
async fn pump_websocket_messages(
- mut websocket: WebSocket<hyper::upgrade::Upgraded>,
+ mut websocket: WebSocket<TokioIo<hyper1::upgrade::Upgraded>>,
inbound_tx: UnboundedSender<String>,
mut outbound_rx: UnboundedReceiver<InspectorMsg>,
) {