diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2023-12-23 16:46:09 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-23 15:46:09 +0000 |
commit | 36536c784ca981ae01d258d4239b2a362017d533 (patch) | |
tree | 7aa41e54ae7a088f5f494ceef2aab8c466cc8067 | |
parent | 140e8becd0cd3a0183c3f28f6ce3812ebcd76036 (diff) |
refactor: upgrade inspector server to Hyper 1.1 (#21599)
This commit rewrites "runtime/inspector_server.rs" to use Hyper 1.1.
Now "deno_runtime" crate depends on both Hyper 1.x and 0.y versions.
-rw-r--r-- | Cargo.lock | 32 | ||||
-rw-r--r-- | runtime/Cargo.toml | 5 | ||||
-rw-r--r-- | runtime/inspector_server.rs | 255 |
3 files changed, 183 insertions, 109 deletions
diff --git a/Cargo.lock b/Cargo.lock index a172e0c1e..42e21306f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -958,7 +958,7 @@ dependencies = [ "env_logger", "eszip", "fancy-regex", - "fastwebsockets", + "fastwebsockets 0.5.0", "flaky_test", "flate2", "fs3", @@ -1612,13 +1612,18 @@ dependencies = [ "deno_webstorage", "dlopen2", "encoding_rs", - "fastwebsockets", + "fastwebsockets 0.5.0", + "fastwebsockets 0.6.0", "filetime", "flate2", "fs3", "fwdansi", "http 0.2.11", + "http 1.0.0", + "http-body-util", "hyper 0.14.27", + "hyper 1.1.0", + "hyper-util", "libc", "log", "netif", @@ -1765,7 +1770,7 @@ dependencies = [ "deno_core", "deno_net", "deno_tls", - "fastwebsockets", + "fastwebsockets 0.5.0", "h2 0.3.22", "http 0.2.11", "hyper 0.14.27", @@ -2354,6 +2359,25 @@ dependencies = [ ] [[package]] +name = "fastwebsockets" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f63dd7b57f9b33b1741fa631c9522eb35d43e96dcca4a6a91d5e4ca7c93acdc1" +dependencies = [ + "base64 0.21.5", + "http-body-util", + "hyper 1.1.0", + "hyper-util", + "pin-project", + "rand", + "sha1", + "simdutf8", + "thiserror", + "tokio", + "utf-8", +] + +[[package]] name = "fd-lock" version = "4.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -5958,7 +5982,7 @@ dependencies = [ "bytes", "console_static_text", "denokv_proto", - "fastwebsockets", + "fastwebsockets 0.5.0", "flate2", "futures", "glob", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 2e9fc4d7e..a00a4f669 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -98,10 +98,15 @@ fastwebsockets.workspace = true console_static_text.workspace = true dlopen2.workspace = true encoding_rs.workspace = true +fastwebsockets_06 = { package = "fastwebsockets", version = "0.6", features = ["upgrade"] } filetime = "0.2.16" fs3.workspace = true http.workspace = true +http-body-util = "0.1" +http_1 = { package = "http", version = "1.0" } hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "runtime"] } +hyper-util = { version = "0.1", features = ["server", "server-auto"] } +hyper1 = { package = "hyper", version = "1.0.1", features = ["server"] } libc.workspace = true log.workspace = true netif = "0.1.6" diff --git a/runtime/inspector_server.rs b/runtime/inspector_server.rs index 313b5fd51..a1864266a 100644 --- a/runtime/inspector_server.rs +++ b/runtime/inspector_server.rs @@ -7,7 +7,6 @@ use deno_core::futures::channel::mpsc::UnboundedReceiver; use deno_core::futures::channel::mpsc::UnboundedSender; use deno_core::futures::channel::oneshot; use deno_core::futures::future; -use deno_core::futures::future::Future; use deno_core::futures::prelude::*; use deno_core::futures::select; use deno_core::futures::stream::StreamExt; @@ -20,17 +19,20 @@ use deno_core::url::Url; use deno_core::InspectorMsg; use deno_core::InspectorSessionProxy; use deno_core::JsRuntime; -use fastwebsockets::Frame; -use fastwebsockets::OpCode; -use fastwebsockets::WebSocket; +use fastwebsockets_06::Frame; +use fastwebsockets_06::OpCode; +use fastwebsockets_06::WebSocket; +use hyper::body::Bytes; +use hyper_util::rt::TokioIo; use std::cell::RefCell; use std::collections::HashMap; -use std::convert::Infallible; use std::net::SocketAddr; use std::pin::pin; use std::process; use std::rc::Rc; use std::thread; +use tokio::net::TcpListener; +use tokio::sync::broadcast; use uuid::Uuid; /// Websocket server that is used to proxy connections from @@ -38,7 +40,7 @@ use uuid::Uuid; pub struct InspectorServer { pub host: SocketAddr, register_inspector_tx: UnboundedSender<InspectorInfo>, - shutdown_server_tx: Option<oneshot::Sender<()>>, + shutdown_server_tx: Option<broadcast::Sender<()>>, thread_handle: Option<thread::JoinHandle<()>>, } @@ -47,7 +49,7 @@ impl InspectorServer { let (register_inspector_tx, register_inspector_rx) = mpsc::unbounded::<InspectorInfo>(); - let (shutdown_server_tx, shutdown_server_rx) = oneshot::channel(); + let (shutdown_server_tx, shutdown_server_rx) = broadcast::channel(1); let thread_handle = thread::spawn(move || { let rt = crate::tokio_util::create_basic_runtime(); @@ -101,26 +103,12 @@ impl Drop for InspectorServer { } } -// Needed so hyper can use non Send futures -#[derive(Clone)] -struct LocalExecutor; - -impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor -where - Fut: Future + 'static, - Fut::Output: 'static, -{ - fn execute(&self, fut: Fut) { - deno_core::unsync::spawn(fut); - } -} - fn handle_ws_request( - req: http::Request<hyper::Body>, + req: http_1::Request<hyper1::body::Incoming>, inspector_map_rc: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>, -) -> http::Result<http::Response<hyper::Body>> { +) -> http_1::Result<http_1::Response<Box<http_body_util::Full<Bytes>>>> { let (parts, body) = req.into_parts(); - let req = http::Request::from_parts(parts, ()); + let req = http_1::Request::from_parts(parts, ()); let maybe_uuid = req .uri() @@ -129,9 +117,9 @@ fn handle_ws_request( .and_then(|s| Uuid::parse_str(s).ok()); if maybe_uuid.is_none() { - return http::Response::builder() - .status(http::StatusCode::BAD_REQUEST) - .body("Malformed inspector UUID".into()); + return http_1::Response::builder() + .status(http_1::StatusCode::BAD_REQUEST) + .body(Box::new(Bytes::from("Malformed inspector UUID").into())); } // run in a block to not hold borrow to `inspector_map` for too long @@ -140,34 +128,47 @@ fn handle_ws_request( let maybe_inspector_info = inspector_map.get(&maybe_uuid.unwrap()); if maybe_inspector_info.is_none() { - return http::Response::builder() - .status(http::StatusCode::NOT_FOUND) - .body("Invalid inspector UUID".into()); + return http_1::Response::builder() + .status(http_1::StatusCode::NOT_FOUND) + .body(Box::new(Bytes::from("Invalid inspector UUID").into())); } let info = maybe_inspector_info.unwrap(); info.new_session_tx.clone() }; let (parts, _) = req.into_parts(); - let mut req = http::Request::from_parts(parts, body); - - let (resp, fut) = match fastwebsockets::upgrade::upgrade(&mut req) { - Ok(e) => e, + let mut req = http_1::Request::from_parts(parts, body); + + let (resp, fut) = match fastwebsockets_06::upgrade::upgrade(&mut req) { + Ok((resp, fut)) => { + let (parts, _body) = resp.into_parts(); + let resp = http_1::Response::from_parts( + parts, + Box::new(http_body_util::Full::new(Bytes::new())), + ); + (resp, fut) + } _ => { - return http::Response::builder() - .status(http::StatusCode::BAD_REQUEST) - .body("Not a valid Websocket Request".into()); + return http_1::Response::builder() + .status(http_1::StatusCode::BAD_REQUEST) + .body(Box::new( + Bytes::from("Not a valid Websocket Request").into(), + )); } }; // spawn a task that will wait for websocket connection and then pump messages between // the socket and inspector proxy spawn(async move { - let websocket = if let Ok(w) = fut.await { - w - } else { - eprintln!("Inspector server failed to upgrade to WS connection"); - return; + let websocket = match fut.await { + Ok(w) => w, + Err(err) => { + eprintln!( + "Inspector server failed to upgrade to WS connection: {:?}", + err + ); + return; + } }; // The 'outbound' channel carries messages sent to the websocket. @@ -191,31 +192,37 @@ fn handle_ws_request( fn handle_json_request( inspector_map: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>, host: Option<String>, -) -> http::Result<http::Response<hyper::Body>> { +) -> http_1::Result<http_1::Response<Box<http_body_util::Full<Bytes>>>> { let data = inspector_map .borrow() .values() .map(move |info| info.get_json_metadata(&host)) .collect::<Vec<_>>(); - http::Response::builder() - .status(http::StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/json") - .body(serde_json::to_string(&data).unwrap().into()) + let body: http_body_util::Full<Bytes> = + Bytes::from(serde_json::to_string(&data).unwrap()).into(); + http_1::Response::builder() + .status(http_1::StatusCode::OK) + .header(http_1::header::CONTENT_TYPE, "application/json") + .body(Box::new(body)) } fn handle_json_version_request( version_response: Value, -) -> http::Result<http::Response<hyper::Body>> { - http::Response::builder() - .status(http::StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/json") - .body(serde_json::to_string(&version_response).unwrap().into()) +) -> http_1::Result<http_1::Response<Box<http_body_util::Full<Bytes>>>> { + let body = Box::new(http_body_util::Full::from( + serde_json::to_string(&version_response).unwrap(), + )); + + http_1::Response::builder() + .status(http_1::StatusCode::OK) + .header(http_1::header::CONTENT_TYPE, "application/json") + .body(body) } async fn server( host: SocketAddr, register_inspector_rx: UnboundedReceiver<InspectorInfo>, - shutdown_server_rx: oneshot::Receiver<()>, + shutdown_server_rx: broadcast::Receiver<()>, name: &str, ) { let inspector_map_ = @@ -253,61 +260,99 @@ async fn server( "V8-Version": deno_core::v8_version(), }); - let make_svc = hyper::service::make_service_fn(|_| { - let inspector_map = Rc::clone(&inspector_map_); - let json_version_response = json_version_response.clone(); - - future::ok::<_, Infallible>(hyper::service::service_fn( - move |req: http::Request<hyper::Body>| { - future::ready({ - // If the host header can make a valid URL, use it - let host = req - .headers() - .get("host") - .and_then(|host| host.to_str().ok()) - .and_then(|host| Url::parse(&format!("http://{host}")).ok()) - .and_then(|url| match (url.host(), url.port()) { - (Some(host), Some(port)) => Some(format!("{host}:{port}")), - (Some(host), None) => Some(format!("{host}")), - _ => None, - }); - match (req.method(), req.uri().path()) { - (&http::Method::GET, path) if path.starts_with("/ws/") => { - handle_ws_request(req, Rc::clone(&inspector_map)) - } - (&http::Method::GET, "/json/version") => { - handle_json_version_request(json_version_response.clone()) + // Create the server manually so it can use the Local Executor + let listener = match TcpListener::bind(&host).await { + Ok(l) => l, + Err(err) => { + eprintln!("Cannot start inspector server: {:?}", err); + return; + } + }; + + let mut server_handler = pin!(deno_core::unsync::spawn(async move { + loop { + let mut rx = shutdown_server_rx.resubscribe(); + let mut shutdown_rx = pin!(rx.recv()); + let mut accept = pin!(listener.accept()); + + let stream = tokio::select! { + accept_result = &mut accept => { + match accept_result { + Ok((s, _)) => s, + Err(err) => { + eprintln!("Failed to accept inspector connection: {:?}", err); + continue; } - (&http::Method::GET, "/json") => { - handle_json_request(Rc::clone(&inspector_map), host) + } + }, + + _ = &mut shutdown_rx => { + break; + } + }; + let io = TokioIo::new(stream); + + let inspector_map = Rc::clone(&inspector_map_); + let json_version_response = json_version_response.clone(); + let mut shutdown_server_rx = shutdown_server_rx.resubscribe(); + + let service = hyper1::service::service_fn( + move |req: http_1::Request<hyper1::body::Incoming>| { + future::ready({ + // If the host header can make a valid URL, use it + let host = req + .headers() + .get("host") + .and_then(|host| host.to_str().ok()) + .and_then(|host| Url::parse(&format!("http://{host}")).ok()) + .and_then(|url| match (url.host(), url.port()) { + (Some(host), Some(port)) => Some(format!("{host}:{port}")), + (Some(host), None) => Some(format!("{host}")), + _ => None, + }); + match (req.method(), req.uri().path()) { + (&http_1::Method::GET, path) if path.starts_with("/ws/") => { + handle_ws_request(req, Rc::clone(&inspector_map)) + } + (&http_1::Method::GET, "/json/version") => { + handle_json_version_request(json_version_response.clone()) + } + (&http_1::Method::GET, "/json") => { + handle_json_request(Rc::clone(&inspector_map), host) + } + (&http_1::Method::GET, "/json/list") => { + handle_json_request(Rc::clone(&inspector_map), host) + } + _ => http_1::Response::builder() + .status(http_1::StatusCode::NOT_FOUND) + .body(Box::new(http_body_util::Full::new(Bytes::from( + "Not Found", + )))), } - (&http::Method::GET, "/json/list") => { - handle_json_request(Rc::clone(&inspector_map), host) + }) + }, + ); + + deno_core::unsync::spawn(async move { + let server = hyper1::server::conn::http1::Builder::new(); + + let mut conn = + pin!(server.serve_connection(io, service).with_upgrades()); + let mut shutdown_rx = pin!(shutdown_server_rx.recv()); + + tokio::select! { + result = conn.as_mut() => { + if let Err(err) = result { + eprintln!("Failed to serve connection: {:?}", err); } - _ => http::Response::builder() - .status(http::StatusCode::NOT_FOUND) - .body("Not Found".into()), + }, + _ = &mut shutdown_rx => { + conn.as_mut().graceful_shutdown(); + let _ = conn.await; } - }) - }, - )) - }); - - // Create the server manually so it can use the Local Executor - let mut server_handler = pin!(hyper::server::Builder::new( - hyper::server::conn::AddrIncoming::bind(&host).unwrap_or_else(|e| { - eprintln!("Cannot start inspector server: {e}."); - process::exit(1); - }), - hyper::server::conn::Http::new().with_executor(LocalExecutor), - ) - .serve(make_svc) - .with_graceful_shutdown(async { - shutdown_server_rx.await.ok(); - }) - .unwrap_or_else(|err| { - eprintln!("Cannot start inspector server: {err}."); - process::exit(1); + } + }); + } }) .fuse()); @@ -331,7 +376,7 @@ async fn server( /// 'futures' crate, therefore they can't participate in Tokio's cooperative /// task yielding. async fn pump_websocket_messages( - mut websocket: WebSocket<hyper::upgrade::Upgraded>, + mut websocket: WebSocket<TokioIo<hyper1::upgrade::Upgraded>>, inbound_tx: UnboundedSender<String>, mut outbound_rx: UnboundedReceiver<InspectorMsg>, ) { |