diff options
Diffstat (limited to 'runtime/inspector.rs')
-rw-r--r-- | runtime/inspector.rs | 228 |
1 files changed, 152 insertions, 76 deletions
diff --git a/runtime/inspector.rs b/runtime/inspector.rs index fc0e793d9..58df05c54 100644 --- a/runtime/inspector.rs +++ b/runtime/inspector.rs @@ -11,7 +11,7 @@ use deno_core::futures::channel::mpsc; use deno_core::futures::channel::mpsc::UnboundedReceiver; use deno_core::futures::channel::mpsc::UnboundedSender; use deno_core::futures::channel::oneshot; -use deno_core::futures::future::Future; +use deno_core::futures::future::{self, Future}; use deno_core::futures::pin_mut; use deno_core::futures::prelude::*; use deno_core::futures::select; @@ -23,8 +23,6 @@ use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; use deno_core::v8; -use std::cell::BorrowMutError; -use std::cell::RefCell; use std::collections::HashMap; use std::ffi::c_void; use std::mem::replace; @@ -40,9 +38,10 @@ use std::ptr::NonNull; use std::sync::Arc; use std::sync::Mutex; use std::thread; +use std::{cell::BorrowMutError, convert::Infallible}; +use std::{cell::RefCell, rc::Rc}; +use tokio_tungstenite::tungstenite; use uuid::Uuid; -use warp::filters::ws; -use warp::Filter; pub struct InspectorServer { pub host: SocketAddr, @@ -59,12 +58,12 @@ impl InspectorServer { let (shutdown_server_tx, shutdown_server_rx) = oneshot::channel(); let thread_handle = thread::spawn(move || { - crate::tokio_util::run_basic(server( - host, - register_inspector_rx, - shutdown_server_rx, - name, - )) + let mut rt = crate::tokio_util::create_basic_runtime(); + let local = tokio::task::LocalSet::new(); + local.block_on( + &mut rt, + server(host, register_inspector_rx, shutdown_server_rx, name), + ) }); Self { @@ -142,95 +141,172 @@ impl InspectorInfo { } } +// Needed so hyper can use non Send futures +#[derive(Clone)] +struct LocalExecutor; + +impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor +where + Fut: Future + 'static, + Fut::Output: 'static, +{ + fn execute(&self, fut: Fut) { + tokio::task::spawn_local(fut); + } +} + +fn handle_ws_request( + req: http::Request<hyper::Body>, + inspector_map: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>, +) -> http::Result<http::Response<hyper::Body>> { + let (parts, body) = req.into_parts(); + let req = http::Request::from_parts(parts, ()); + + if let Some(new_websocket_tx) = req + .uri() + .path() + .strip_prefix("/ws/") + .and_then(|s| Uuid::parse_str(s).ok()) + .and_then(|uuid| { + inspector_map + .borrow() + .get(&uuid) + .map(|info| info.new_websocket_tx.clone()) + }) + { + let resp = tungstenite::handshake::server::create_response(&req) + .map(|resp| resp.map(|_| hyper::Body::empty())) + .or_else(|e| match e { + tungstenite::error::Error::HttpFormat(http_error) => Err(http_error), + _ => http::Response::builder() + .status(http::StatusCode::BAD_REQUEST) + .body("Not a valid Websocket Request".into()), + }); + tokio::task::spawn_local(async move { + let upgraded = body.on_upgrade().await.unwrap(); + let websocket = tokio_tungstenite::WebSocketStream::from_raw_socket( + upgraded, + tungstenite::protocol::Role::Server, + None, + ) + .await; + let (proxy, pump) = create_websocket_proxy(websocket); + + let _ = new_websocket_tx.unbounded_send(proxy); + pump.await; + }); + + resp + } else { + http::Response::builder() + .status(http::StatusCode::NOT_FOUND) + .body("No Valid inspector".into()) + } +} + +fn handle_json_request( + inspector_map: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>, +) -> http::Result<http::Response<hyper::Body>> { + let data = inspector_map + .borrow() + .values() + .map(|info| info.get_json_metadata()) + .collect::<Vec<_>>(); + http::Response::builder() + .status(http::StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/json") + .body(serde_json::to_string(&data).unwrap().into()) +} + +fn handle_json_version_request( + version_response: Value, +) -> http::Result<http::Response<hyper::Body>> { + http::Response::builder() + .status(http::StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/json") + .body(serde_json::to_string(&version_response).unwrap().into()) +} + async fn server( host: SocketAddr, register_inspector_rx: UnboundedReceiver<InspectorInfo>, shutdown_server_rx: oneshot::Receiver<()>, name: String, ) { - // TODO: put the `inspector_map` in an `Rc<RefCell<_>>` instead. This is - // currently not possible because warp requires all filters to implement - // `Send`, which should not be necessary because we are using the - // single-threaded Tokio runtime. - let inspector_map = HashMap::<Uuid, InspectorInfo>::new(); - let inspector_map = Arc::new(Mutex::new(inspector_map)); - - let inspector_map_ = inspector_map.clone(); + let inspector_map_ = + Rc::new(RefCell::new(HashMap::<Uuid, InspectorInfo>::new())); + + let inspector_map = Rc::clone(&inspector_map_); let register_inspector_handler = register_inspector_rx .map(|info| { eprintln!( "Debugger listening on {}", info.get_websocket_debugger_url() ); - let mut g = inspector_map_.lock().unwrap(); - if g.insert(info.uuid, info).is_some() { + if inspector_map.borrow_mut().insert(info.uuid, info).is_some() { panic!("Inspector UUID already in map"); } }) .collect::<()>(); - let inspector_map_ = inspector_map_.clone(); + let inspector_map = Rc::clone(&inspector_map_); let deregister_inspector_handler = future::poll_fn(|cx| { - let mut g = inspector_map_.lock().unwrap(); - g.retain(|_, info| info.canary_rx.poll_unpin(cx) == Poll::Pending); + inspector_map + .borrow_mut() + .retain(|_, info| info.canary_rx.poll_unpin(cx) == Poll::Pending); Poll::<Never>::Pending }) .fuse(); - let inspector_map_ = inspector_map.clone(); - let websocket_route = warp::path("ws") - .and(warp::path::param()) - .and(warp::ws()) - .and_then(move |uuid: String, ws: warp::ws::Ws| { - future::ready( - Uuid::parse_str(&uuid) - .ok() - .and_then(|uuid| { - let g = inspector_map_.lock().unwrap(); - g.get(&uuid).map(|info| info.new_websocket_tx.clone()).map( - |new_websocket_tx| { - ws.on_upgrade(move |websocket| async move { - let (proxy, pump) = create_websocket_proxy(websocket); - let _ = new_websocket_tx.unbounded_send(proxy); - pump.await; - }) - }, - ) - }) - .ok_or_else(warp::reject::not_found), - ) - }); - let json_version_response = json!({ "Browser": name, "Protocol-Version": "1.3", "V8-Version": deno_core::v8_version(), }); - let json_version_route = warp::path!("json" / "version") - .map(move || warp::reply::json(&json_version_response)); - - let inspector_map_ = inspector_map.clone(); - let json_list_route = warp::path("json").map(move || { - let g = inspector_map_.lock().unwrap(); - let json_values = g - .values() - .map(|info| info.get_json_metadata()) - .collect::<Vec<_>>(); - warp::reply::json(&json!(json_values)) + + let make_svc = hyper::service::make_service_fn(|_| { + let inspector_map = Rc::clone(&inspector_map_); + let json_version_response = json_version_response.clone(); + + future::ok::<_, Infallible>(hyper::service::service_fn( + move |req: http::Request<hyper::Body>| { + future::ready({ + match (req.method(), req.uri().path()) { + (&http::Method::GET, path) if path.starts_with("/ws/") => { + handle_ws_request(req, inspector_map.clone()) + } + (&http::Method::GET, "/json") => { + handle_json_request(inspector_map.clone()) + } + (&http::Method::GET, "/json/version") => { + handle_json_version_request(json_version_response.clone()) + } + _ => http::Response::builder() + .status(http::StatusCode::NOT_FOUND) + .body("Not Found".into()), + } + }) + }, + )) }); - let server_routes = - websocket_route.or(json_version_route).or(json_list_route); - let server_handler = warp::serve(server_routes) - .try_bind_with_graceful_shutdown(host, async { - shutdown_server_rx.await.ok(); - }) - .map(|(_, fut)| fut) - .unwrap_or_else(|err| { - eprintln!("Cannot start inspector server: {}.", err); + // Create the server manually so it can use the Local Executor + let server_handler = hyper::server::Builder::new( + hyper::server::conn::AddrIncoming::bind(&host).unwrap_or_else(|e| { + eprintln!("Cannot start inspector server: {}.", e); process::exit(1); - }) - .fuse(); + }), + hyper::server::conn::Http::new().with_executor(LocalExecutor), + ) + .serve(make_svc) + .with_graceful_shutdown(async { + shutdown_server_rx.await.ok(); + }) + .unwrap_or_else(|err| { + eprintln!("Cannot start inspector server: {}.", err); + process::exit(1); + }) + .fuse(); pin_mut!(register_inspector_handler); pin_mut!(deregister_inspector_handler); @@ -243,9 +319,9 @@ async fn server( } } -type WebSocketProxySender = UnboundedSender<ws::Message>; +type WebSocketProxySender = UnboundedSender<tungstenite::Message>; type WebSocketProxyReceiver = - UnboundedReceiver<Result<ws::Message, warp::Error>>; + UnboundedReceiver<Result<tungstenite::Message, tungstenite::Error>>; /// Encapsulates an UnboundedSender/UnboundedReceiver pair that together form /// a duplex channel for sending/receiving websocket messages. @@ -273,7 +349,7 @@ impl WebSocketProxy { /// be used to send/receive messages on the websocket, and the second element /// is a future that does the forwarding. fn create_websocket_proxy( - websocket: ws::WebSocket, + websocket: tokio_tungstenite::WebSocketStream<hyper::upgrade::Upgraded>, ) -> (WebSocketProxy, impl Future<Output = ()> + Send) { // The 'outbound' channel carries messages sent to the websocket. let (outbound_tx, outbound_rx) = mpsc::unbounded(); @@ -759,8 +835,8 @@ impl DenoInspectorSession { let result = websocket_rx .map_ok(move |msg| { - let msg = msg.as_bytes(); - let msg = v8::inspector::StringView::from(msg); + let msg = msg.into_data(); + let msg = v8::inspector::StringView::from(msg.as_slice()); unsafe { &mut *self_ptr }.dispatch_protocol_message(msg); }) .try_collect::<()>() @@ -776,7 +852,7 @@ impl DenoInspectorSession { fn send_to_websocket(&self, msg: v8::UniquePtr<v8::inspector::StringBuffer>) { let msg = msg.unwrap().string().to_string(); - let msg = ws::Message::text(msg); + let msg = tungstenite::Message::text(msg); let _ = self.websocket_tx.unbounded_send(msg); } |