diff options
Diffstat (limited to 'ext/http/http_next.rs')
-rw-r--r-- | ext/http/http_next.rs | 261 |
1 files changed, 186 insertions, 75 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index efe1b88c9..7dbac6021 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -18,8 +18,8 @@ use crate::service::HttpServerState; use crate::service::SignallingRc; use crate::websocket_upgrade::WebSocketUpgrade; use crate::LocalExecutor; +use crate::Options; use cache_control::CacheControl; -use deno_core::error::AnyError; use deno_core::external; use deno_core::futures::future::poll_fn; use deno_core::futures::TryFutureExt; @@ -146,12 +146,32 @@ macro_rules! clone_external { }}; } +#[derive(Debug, thiserror::Error)] +pub enum HttpNextError { + #[error(transparent)] + Resource(deno_core::error::AnyError), + #[error("{0}")] + Io(#[from] io::Error), + #[error(transparent)] + WebSocketUpgrade(crate::websocket_upgrade::WebSocketUpgradeError), + #[error("{0}")] + Hyper(#[from] hyper::Error), + #[error(transparent)] + JoinError(#[from] tokio::task::JoinError), + #[error(transparent)] + Canceled(#[from] deno_core::Canceled), + #[error(transparent)] + HttpPropertyExtractor(deno_core::error::AnyError), + #[error(transparent)] + UpgradeUnavailable(#[from] crate::service::UpgradeUnavailableError), +} + #[op2(fast)] #[smi] pub fn op_http_upgrade_raw( state: &mut OpState, external: *const c_void, -) -> Result<ResourceId, AnyError> { +) -> Result<ResourceId, HttpNextError> { // SAFETY: external is deleted before calling this op. let http = unsafe { take_external!(external, "op_http_upgrade_raw") }; @@ -177,7 +197,7 @@ pub fn op_http_upgrade_raw( upgraded.write_all(&bytes).await?; break upgraded; } - Err(err) => return Err(err), + Err(err) => return Err(HttpNextError::WebSocketUpgrade(err)), } }; @@ -193,7 +213,7 @@ pub fn op_http_upgrade_raw( } read_tx.write_all(&buf[..read]).await?; } - Ok::<_, AnyError>(()) + Ok::<_, HttpNextError>(()) }); spawn(async move { let mut buf = [0; 1024]; @@ -204,7 +224,7 @@ pub fn op_http_upgrade_raw( } upgraded_tx.write_all(&buf[..read]).await?; } - Ok::<_, AnyError>(()) + Ok::<_, HttpNextError>(()) }); Ok(()) @@ -223,7 +243,7 @@ pub async fn op_http_upgrade_websocket_next( state: Rc<RefCell<OpState>>, external: *const c_void, #[serde] headers: Vec<(ByteString, ByteString)>, -) -> Result<ResourceId, AnyError> { +) -> Result<ResourceId, HttpNextError> { let http = // SAFETY: external is deleted before calling this op. unsafe { take_external!(external, "op_http_upgrade_websocket_next") }; @@ -246,7 +266,11 @@ pub async fn op_http_upgrade_websocket_next( // Stage 3: take the extracted raw network stream and upgrade it to a websocket, then return it let (stream, bytes) = extract_network_stream(upgraded); - ws_create_server_stream(&mut state.borrow_mut(), stream, bytes) + Ok(ws_create_server_stream( + &mut state.borrow_mut(), + stream, + bytes, + )) } #[op2(fast)] @@ -296,7 +320,7 @@ where let authority: v8::Local<v8::Value> = match request_properties.authority { Some(authority) => v8::String::new_from_utf8( scope, - authority.as_ref(), + authority.as_bytes(), v8::NewStringType::Normal, ) .unwrap() @@ -305,15 +329,25 @@ where }; // Only extract the path part - we handle authority elsewhere - let path = match &request_parts.uri.path_and_query() { - Some(path_and_query) => path_and_query.to_string(), - None => "".to_owned(), + let path = match request_parts.uri.path_and_query() { + Some(path_and_query) => { + let path = path_and_query.as_str(); + if matches!(path.as_bytes().first(), Some(b'/' | b'*')) { + Cow::Borrowed(path) + } else { + Cow::Owned(format!("/{}", path)) + } + } + None => Cow::Borrowed(""), }; - let path: v8::Local<v8::Value> = - v8::String::new_from_utf8(scope, path.as_ref(), v8::NewStringType::Normal) - .unwrap() - .into(); + let path: v8::Local<v8::Value> = v8::String::new_from_utf8( + scope, + path.as_bytes(), + v8::NewStringType::Normal, + ) + .unwrap() + .into(); let peer_address: v8::Local<v8::Value> = v8::String::new_from_utf8( scope, @@ -531,6 +565,7 @@ fn is_request_compressible( match accept_encoding.to_str() { // Firefox and Chrome send this -- no need to parse Ok("gzip, deflate, br") => return Compression::Brotli, + Ok("gzip, deflate, br, zstd") => return Compression::Brotli, Ok("gzip") => return Compression::GZip, Ok("br") => return Compression::Brotli, _ => (), @@ -667,6 +702,27 @@ fn set_response( http.complete(); } +#[op2(fast)] +pub fn op_http_get_request_cancelled(external: *const c_void) -> bool { + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_get_request_cancelled") }; + http.cancelled() +} + +#[op2(async)] +pub async fn op_http_request_on_cancel(external: *const c_void) { + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_request_on_cancel") }; + let (tx, rx) = tokio::sync::oneshot::channel(); + + http.on_cancel(tx); + drop(http); + + rx.await.ok(); +} + /// Returned promise resolves when body streaming finishes. /// Call [`op_http_close_after_finish`] when done with the external. #[op2(async)] @@ -676,7 +732,7 @@ pub async fn op_http_set_response_body_resource( #[smi] stream_rid: ResourceId, auto_close: bool, status: u16, -) -> Result<bool, AnyError> { +) -> Result<bool, HttpNextError> { let http = // SAFETY: op is called with external. unsafe { clone_external!(external, "op_http_set_response_body_resource") }; @@ -691,9 +747,15 @@ pub async fn op_http_set_response_body_resource( let resource = { let mut state = state.borrow_mut(); if auto_close { - state.resource_table.take_any(stream_rid)? + state + .resource_table + .take_any(stream_rid) + .map_err(HttpNextError::Resource)? } else { - state.resource_table.get_any(stream_rid)? + state + .resource_table + .get_any(stream_rid) + .map_err(HttpNextError::Resource)? } }; @@ -760,10 +822,16 @@ fn serve_http11_unconditional( io: impl HttpServeStream, svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static, cancel: Rc<CancelHandle>, + http1_builder_hook: Option<fn(http1::Builder) -> http1::Builder>, ) -> impl Future<Output = Result<(), hyper::Error>> + 'static { - let conn = http1::Builder::new() - .keep_alive(true) - .writev(*USE_WRITEV) + let mut builder = http1::Builder::new(); + builder.keep_alive(true).writev(*USE_WRITEV); + + if let Some(http1_builder_hook) = http1_builder_hook { + builder = http1_builder_hook(builder); + } + + let conn = builder .serve_connection(TokioIo::new(io), svc) .with_upgrades(); @@ -782,9 +850,17 @@ fn serve_http2_unconditional( io: impl HttpServeStream, svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static, cancel: Rc<CancelHandle>, + http2_builder_hook: Option< + fn(http2::Builder<LocalExecutor>) -> http2::Builder<LocalExecutor>, + >, ) -> impl Future<Output = Result<(), hyper::Error>> + 'static { - let conn = - http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc); + let mut builder = http2::Builder::new(LocalExecutor); + + if let Some(http2_builder_hook) = http2_builder_hook { + builder = http2_builder_hook(builder); + } + + let conn = builder.serve_connection(TokioIo::new(io), svc); async { match conn.or_abort(cancel).await { Err(mut conn) => { @@ -800,17 +876,18 @@ async fn serve_http2_autodetect( io: impl HttpServeStream, svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static, cancel: Rc<CancelHandle>, -) -> Result<(), AnyError> { + options: Options, +) -> Result<(), HttpNextError> { let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX); let (matches, io) = prefix.match_prefix().await?; if matches { - serve_http2_unconditional(io, svc, cancel) + serve_http2_unconditional(io, svc, cancel, options.http2_builder_hook) .await - .map_err(|e| e.into()) + .map_err(HttpNextError::Hyper) } else { - serve_http11_unconditional(io, svc, cancel) + serve_http11_unconditional(io, svc, cancel, options.http1_builder_hook) .await - .map_err(|e| e.into()) + .map_err(HttpNextError::Hyper) } } @@ -819,7 +896,8 @@ fn serve_https( request_info: HttpConnectionProperties, lifetime: HttpLifetime, tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>, -) -> JoinHandle<Result<(), AnyError>> { + options: Options, +) -> JoinHandle<Result<(), HttpNextError>> { let HttpLifetime { server_state, connection_cancel_handle, @@ -830,21 +908,31 @@ fn serve_https( handle_request(req, request_info.clone(), server_state.clone(), tx.clone()) }); spawn( - async { + async move { let handshake = io.handshake().await?; // If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect // based on the prefix bytes let handshake = handshake.alpn; if Some(TLS_ALPN_HTTP_2) == handshake.as_deref() { - serve_http2_unconditional(io, svc, listen_cancel_handle) - .await - .map_err(|e| e.into()) + serve_http2_unconditional( + io, + svc, + listen_cancel_handle, + options.http2_builder_hook, + ) + .await + .map_err(HttpNextError::Hyper) } else if Some(TLS_ALPN_HTTP_11) == handshake.as_deref() { - serve_http11_unconditional(io, svc, listen_cancel_handle) - .await - .map_err(|e| e.into()) + serve_http11_unconditional( + io, + svc, + listen_cancel_handle, + options.http1_builder_hook, + ) + .await + .map_err(HttpNextError::Hyper) } else { - serve_http2_autodetect(io, svc, listen_cancel_handle).await + serve_http2_autodetect(io, svc, listen_cancel_handle, options).await } } .try_or_cancel(connection_cancel_handle), @@ -856,7 +944,8 @@ fn serve_http( request_info: HttpConnectionProperties, lifetime: HttpLifetime, tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>, -) -> JoinHandle<Result<(), AnyError>> { + options: Options, +) -> JoinHandle<Result<(), HttpNextError>> { let HttpLifetime { server_state, connection_cancel_handle, @@ -867,7 +956,7 @@ fn serve_http( handle_request(req, request_info.clone(), server_state.clone(), tx.clone()) }); spawn( - serve_http2_autodetect(io, svc, listen_cancel_handle) + serve_http2_autodetect(io, svc, listen_cancel_handle, options) .try_or_cancel(connection_cancel_handle), ) } @@ -877,7 +966,8 @@ fn serve_http_on<HTTP>( listen_properties: &HttpListenProperties, lifetime: HttpLifetime, tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>, -) -> JoinHandle<Result<(), AnyError>> + options: Options, +) -> JoinHandle<Result<(), HttpNextError>> where HTTP: HttpPropertyExtractor, { @@ -888,14 +978,14 @@ where match network_stream { NetworkStream::Tcp(conn) => { - serve_http(conn, connection_properties, lifetime, tx) + serve_http(conn, connection_properties, lifetime, tx, options) } NetworkStream::Tls(conn) => { - serve_https(conn, connection_properties, lifetime, tx) + serve_https(conn, connection_properties, lifetime, tx, options) } #[cfg(unix)] NetworkStream::Unix(conn) => { - serve_http(conn, connection_properties, lifetime, tx) + serve_http(conn, connection_properties, lifetime, tx, options) } } } @@ -908,7 +998,7 @@ struct HttpLifetime { } struct HttpJoinHandle { - join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>, + join_handle: AsyncRefCell<Option<JoinHandle<Result<(), HttpNextError>>>>, connection_cancel_handle: Rc<CancelHandle>, listen_cancel_handle: Rc<CancelHandle>, rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>, @@ -968,12 +1058,13 @@ impl Drop for HttpJoinHandle { pub fn op_http_serve<HTTP>( state: Rc<RefCell<OpState>>, #[smi] listener_rid: ResourceId, -) -> Result<(ResourceId, &'static str, String), AnyError> +) -> Result<(ResourceId, &'static str, String), HttpNextError> where HTTP: HttpPropertyExtractor, { let listener = - HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?; + HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid) + .map_err(HttpNextError::Resource)?; let listen_properties = HTTP::listen_properties_from_listener(&listener)?; @@ -983,21 +1074,28 @@ where let lifetime = resource.lifetime(); + let options = { + let state = state.borrow(); + *state.borrow::<Options>() + }; + let listen_properties_clone: HttpListenProperties = listen_properties.clone(); let handle = spawn(async move { loop { let conn = HTTP::accept_connection_from_listener(&listener) .try_or_cancel(listen_cancel_clone.clone()) - .await?; + .await + .map_err(HttpNextError::HttpPropertyExtractor)?; serve_http_on::<HTTP>( conn, &listen_properties_clone, lifetime.clone(), tx.clone(), + options, ); } #[allow(unreachable_code)] - Ok::<_, AnyError>(()) + Ok::<_, HttpNextError>(()) }); // Set the handle after we start the future @@ -1017,25 +1115,31 @@ where pub fn op_http_serve_on<HTTP>( state: Rc<RefCell<OpState>>, #[smi] connection_rid: ResourceId, -) -> Result<(ResourceId, &'static str, String), AnyError> +) -> Result<(ResourceId, &'static str, String), HttpNextError> where HTTP: HttpPropertyExtractor, { let connection = - HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?; + HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid) + .map_err(HttpNextError::Resource)?; let listen_properties = HTTP::listen_properties_from_connection(&connection)?; let (tx, rx) = tokio::sync::mpsc::channel(10); let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx)); - let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> = - serve_http_on::<HTTP>( - connection, - &listen_properties, - resource.lifetime(), - tx, - ); + let options = { + let state = state.borrow(); + *state.borrow::<Options>() + }; + + let handle = serve_http_on::<HTTP>( + connection, + &listen_properties, + resource.lifetime(), + tx, + options, + ); // Set the handle after we start the future *RcRef::map(&resource, |this| &this.join_handle) @@ -1081,12 +1185,13 @@ pub fn op_http_try_wait( pub async fn op_http_wait( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, -) -> Result<*const c_void, AnyError> { +) -> Result<*const c_void, HttpNextError> { // We will get the join handle initially, as we might be consuming requests still let join_handle = state .borrow_mut() .resource_table - .get::<HttpJoinHandle>(rid)?; + .get::<HttpJoinHandle>(rid) + .map_err(HttpNextError::Resource)?; let cancel = join_handle.listen_cancel_handle(); let next = async { @@ -1113,13 +1218,12 @@ pub async fn op_http_wait( // Filter out shutdown (ENOTCONN) errors if let Err(err) = res { - if let Some(err) = err.source() { - if let Some(err) = err.downcast_ref::<io::Error>() { - if err.kind() == io::ErrorKind::NotConnected { - return Ok(null()); - } + if let HttpNextError::Io(err) = &err { + if err.kind() == io::ErrorKind::NotConnected { + return Ok(null()); } } + return Err(err); } @@ -1132,7 +1236,7 @@ pub fn op_http_cancel( state: &mut OpState, #[smi] rid: ResourceId, graceful: bool, -) -> Result<(), AnyError> { +) -> Result<(), deno_core::error::AnyError> { let join_handle = state.resource_table.get::<HttpJoinHandle>(rid)?; if graceful { @@ -1152,11 +1256,12 @@ pub async fn op_http_close( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, graceful: bool, -) -> Result<(), AnyError> { +) -> Result<(), HttpNextError> { let join_handle = state .borrow_mut() .resource_table - .take::<HttpJoinHandle>(rid)?; + .take::<HttpJoinHandle>(rid) + .map_err(HttpNextError::Resource)?; if graceful { http_general_trace!("graceful shutdown"); @@ -1202,23 +1307,26 @@ impl UpgradeStream { } } - async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> { + async fn read( + self: Rc<Self>, + buf: &mut [u8], + ) -> Result<usize, std::io::Error> { let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); async { let read = RcRef::map(self, |this| &this.read); let mut read = read.borrow_mut().await; - Ok(Pin::new(&mut *read).read(buf).await?) + Pin::new(&mut *read).read(buf).await } .try_or_cancel(cancel_handle) .await } - async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { + async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, std::io::Error> { let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); async { let write = RcRef::map(self, |this| &this.write); let mut write = write.borrow_mut().await; - Ok(Pin::new(&mut *write).write(buf).await?) + Pin::new(&mut *write).write(buf).await } .try_or_cancel(cancel_handle) .await @@ -1228,7 +1336,7 @@ impl UpgradeStream { self: Rc<Self>, buf1: &[u8], buf2: &[u8], - ) -> Result<usize, AnyError> { + ) -> Result<usize, std::io::Error> { let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await; let total = buf1.len() + buf2.len(); @@ -1281,9 +1389,12 @@ pub async fn op_raw_write_vectored( #[smi] rid: ResourceId, #[buffer] buf1: JsBuffer, #[buffer] buf2: JsBuffer, -) -> Result<usize, AnyError> { - let resource: Rc<UpgradeStream> = - state.borrow().resource_table.get::<UpgradeStream>(rid)?; +) -> Result<usize, HttpNextError> { + let resource: Rc<UpgradeStream> = state + .borrow() + .resource_table + .get::<UpgradeStream>(rid) + .map_err(HttpNextError::Resource)?; let nwritten = resource.write_vectored(&buf1, &buf2).await?; Ok(nwritten) } |