From d047cab14b754d20a43c7119e327b451440aaed9 Mon Sep 17 00:00:00 2001 From: Leo Kettmeir Date: Fri, 18 Oct 2024 12:30:46 -0700 Subject: refactor(ext/websocket): use concrete error type (#26226) --- ext/http/lib.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'ext/http/lib.rs') diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 934f8a002..5461713aa 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -1053,9 +1053,11 @@ async fn op_http_upgrade_websocket( let (transport, bytes) = extract_network_stream(hyper_v014::upgrade::on(request).await?); - let ws_rid = - ws_create_server_stream(&mut state.borrow_mut(), transport, bytes)?; - Ok(ws_rid) + Ok(ws_create_server_stream( + &mut state.borrow_mut(), + transport, + bytes, + )) } // Needed so hyper can use non Send futures -- cgit v1.2.3 From 2c3900370ac3e0b62f1e0dfb86a883c75952146d Mon Sep 17 00:00:00 2001 From: Leo Kettmeir Date: Fri, 18 Oct 2024 15:57:12 -0700 Subject: refactor(ext/http): use concrete error types (#26377) --- ext/http/lib.rs | 145 +++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 97 insertions(+), 48 deletions(-) (limited to 'ext/http/lib.rs') diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 5461713aa..6243804a1 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -6,8 +6,6 @@ use async_compression::Level; use base64::prelude::BASE64_STANDARD; use base64::Engine; use cache_control::CacheControl; -use deno_core::error::custom_error; -use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; use deno_core::futures::channel::oneshot; use deno_core::futures::future::pending; @@ -89,11 +87,14 @@ mod service; mod websocket_upgrade; use fly_accept_encoding::Encoding; +pub use http_next::HttpNextError; pub use request_properties::DefaultHttpPropertyExtractor; pub use request_properties::HttpConnectionProperties; pub use request_properties::HttpListenProperties; pub use request_properties::HttpPropertyExtractor; pub use request_properties::HttpRequestProperties; +pub use service::UpgradeUnavailableError; +pub use websocket_upgrade::WebSocketUpgradeError; deno_core::extension!( deno_http, @@ -134,6 +135,38 @@ deno_core::extension!( esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"], ); +#[derive(Debug, thiserror::Error)] +pub enum HttpError { + #[error(transparent)] + Resource(deno_core::error::AnyError), + #[error(transparent)] + Canceled(#[from] deno_core::Canceled), + #[error("{0}")] + HyperV014(#[source] Arc), + #[error("{0}")] + InvalidHeaderName(#[from] hyper_v014::header::InvalidHeaderName), + #[error("{0}")] + InvalidHeaderValue(#[from] hyper_v014::header::InvalidHeaderValue), + #[error("{0}")] + Http(#[from] hyper_v014::http::Error), + #[error("response headers already sent")] + ResponseHeadersAlreadySent, + #[error("connection closed while sending response")] + ConnectionClosedWhileSendingResponse, + #[error("already in use")] + AlreadyInUse, + #[error("{0}")] + Io(#[from] std::io::Error), + #[error("no response headers")] + NoResponseHeaders, + #[error("response already completed")] + ResponseAlreadyCompleted, + #[error("cannot upgrade because request body was used")] + UpgradeBodyUsed, + #[error(transparent)] + Other(deno_core::error::AnyError), +} + pub enum HttpSocketAddr { IpSocket(std::net::SocketAddr), #[cfg(unix)] @@ -216,7 +249,7 @@ impl HttpConnResource { String, String, )>, - AnyError, + HttpError, > { let fut = async { let (request_tx, request_rx) = oneshot::channel(); @@ -259,8 +292,8 @@ impl HttpConnResource { } /// A future that completes when this HTTP connection is closed or errors. - async fn closed(&self) -> Result<(), AnyError> { - self.closed_fut.clone().map_err(AnyError::from).await + async fn closed(&self) -> Result<(), HttpError> { + self.closed_fut.clone().map_err(HttpError::HyperV014).await } } @@ -280,14 +313,13 @@ pub fn http_create_conn_resource( io: S, addr: A, scheme: &'static str, -) -> Result +) -> ResourceId where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, A: Into, { let conn = HttpConnResource::new(io, scheme, addr.into()); - let rid = state.resource_table.add(conn); - Ok(rid) + state.resource_table.add(conn) } /// An object that implements the `hyper::Service` trait, through which Hyper @@ -423,7 +455,9 @@ impl Resource for HttpStreamReadResource { // safely call `await` on it without creating a race condition. Some(_) => match body.as_mut().next().await.unwrap() { Ok(chunk) => assert!(chunk.is_empty()), - Err(err) => break Err(AnyError::from(err)), + Err(err) => { + break Err(HttpError::HyperV014(Arc::new(err)).into()) + } }, None => break Ok(BufView::empty()), } @@ -545,8 +579,12 @@ struct NextRequestResponse( async fn op_http_accept( state: Rc>, #[smi] rid: ResourceId, -) -> Result, AnyError> { - let conn = state.borrow().resource_table.get::(rid)?; +) -> Result, HttpError> { + let conn = state + .borrow() + .resource_table + .get::(rid) + .map_err(HttpError::Resource)?; match conn.accept().await { Ok(Some((read_stream, write_stream, method, url))) => { @@ -657,11 +695,12 @@ async fn op_http_write_headers( #[smi] status: u16, #[serde] headers: Vec<(ByteString, ByteString)>, #[serde] data: Option, -) -> Result<(), AnyError> { +) -> Result<(), HttpError> { let stream = state .borrow_mut() .resource_table - .get::(rid)?; + .get::(rid) + .map_err(HttpError::Resource)?; // Track supported encoding let encoding = stream.accept_encoding; @@ -708,14 +747,14 @@ async fn op_http_write_headers( let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let response_tx = match replace(&mut *old_wr, new_wr) { HttpResponseWriter::Headers(response_tx) => response_tx, - _ => return Err(http_error("response headers already sent")), + _ => return Err(HttpError::ResponseHeadersAlreadySent), }; match response_tx.send(body) { Ok(_) => Ok(()), Err(_) => { stream.conn.closed().await?; - Err(http_error("connection closed while sending response")) + Err(HttpError::ConnectionClosedWhileSendingResponse) } } } @@ -725,11 +764,14 @@ async fn op_http_write_headers( fn op_http_headers( state: &mut OpState, #[smi] rid: u32, -) -> Result, AnyError> { - let stream = state.resource_table.get::(rid)?; +) -> Result, HttpError> { + let stream = state + .resource_table + .get::(rid) + .map_err(HttpError::Resource)?; let rd = RcRef::map(&stream, |r| &r.rd) .try_borrow() - .ok_or_else(|| http_error("already in use"))?; + .ok_or(HttpError::AlreadyInUse)?; match &*rd { HttpRequestReader::Headers(request) => Ok(req_headers(request.headers())), HttpRequestReader::Body(headers, _) => Ok(req_headers(headers)), @@ -741,7 +783,7 @@ fn http_response( data: Option, compressing: bool, encoding: Encoding, -) -> Result<(HttpResponseWriter, hyper_v014::Body), AnyError> { +) -> Result<(HttpResponseWriter, hyper_v014::Body), HttpError> { // Gzip, after level 1, doesn't produce significant size difference. // This default matches nginx default gzip compression level (1): // https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level @@ -878,25 +920,34 @@ async fn op_http_write_resource( state: Rc>, #[smi] rid: ResourceId, #[smi] stream: ResourceId, -) -> Result<(), AnyError> { +) -> Result<(), HttpError> { let http_stream = state .borrow() .resource_table - .get::(rid)?; + .get::(rid) + .map_err(HttpError::Resource)?; let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await; - let resource = state.borrow().resource_table.get_any(stream)?; + let resource = state + .borrow() + .resource_table + .get_any(stream) + .map_err(HttpError::Resource)?; loop { match *wr { HttpResponseWriter::Headers(_) => { - return Err(http_error("no response headers")) + return Err(HttpError::NoResponseHeaders) } HttpResponseWriter::Closed => { - return Err(http_error("response already completed")) + return Err(HttpError::ResponseAlreadyCompleted) } _ => {} }; - let view = resource.clone().read(64 * 1024).await?; // 64KB + let view = resource + .clone() + .read(64 * 1024) + .await + .map_err(HttpError::Other)?; // 64KB if view.is_empty() { break; } @@ -937,16 +988,17 @@ async fn op_http_write( state: Rc>, #[smi] rid: ResourceId, #[buffer] buf: JsBuffer, -) -> Result<(), AnyError> { +) -> Result<(), HttpError> { let stream = state .borrow() .resource_table - .get::(rid)?; + .get::(rid) + .map_err(HttpError::Resource)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; match &mut *wr { - HttpResponseWriter::Headers(_) => Err(http_error("no response headers")), - HttpResponseWriter::Closed => Err(http_error("response already completed")), + HttpResponseWriter::Headers(_) => Err(HttpError::NoResponseHeaders), + HttpResponseWriter::Closed => Err(HttpError::ResponseAlreadyCompleted), HttpResponseWriter::Body { writer, .. } => { let mut result = writer.write_all(&buf).await; if result.is_ok() { @@ -961,7 +1013,7 @@ async fn op_http_write( stream.conn.closed().await?; // If there was no connection error, drop body_tx. *wr = HttpResponseWriter::Closed; - Err(http_error("response already completed")) + Err(HttpError::ResponseAlreadyCompleted) } } } @@ -975,7 +1027,7 @@ async fn op_http_write( stream.conn.closed().await?; // If there was no connection error, drop body_tx. *wr = HttpResponseWriter::Closed; - Err(http_error("response already completed")) + Err(HttpError::ResponseAlreadyCompleted) } } } @@ -989,11 +1041,12 @@ async fn op_http_write( async fn op_http_shutdown( state: Rc>, #[smi] rid: ResourceId, -) -> Result<(), AnyError> { +) -> Result<(), HttpError> { let stream = state .borrow() .resource_table - .get::(rid)?; + .get::(rid) + .map_err(HttpError::Resource)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let wr = take(&mut *wr); match wr { @@ -1022,14 +1075,12 @@ async fn op_http_shutdown( #[op2] #[string] -fn op_http_websocket_accept_header( - #[string] key: String, -) -> Result { +fn op_http_websocket_accept_header(#[string] key: String) -> String { let digest = ring::digest::digest( &ring::digest::SHA1_FOR_LEGACY_USE_ONLY, format!("{key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11").as_bytes(), ); - Ok(BASE64_STANDARD.encode(digest)) + BASE64_STANDARD.encode(digest) } #[op2(async)] @@ -1037,22 +1088,24 @@ fn op_http_websocket_accept_header( async fn op_http_upgrade_websocket( state: Rc>, #[smi] rid: ResourceId, -) -> Result { +) -> Result { let stream = state .borrow_mut() .resource_table - .get::(rid)?; + .get::(rid) + .map_err(HttpError::Resource)?; let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; let request = match &mut *rd { HttpRequestReader::Headers(request) => request, - _ => { - return Err(http_error("cannot upgrade because request body was used")) - } + _ => return Err(HttpError::UpgradeBodyUsed), }; - let (transport, bytes) = - extract_network_stream(hyper_v014::upgrade::on(request).await?); + let (transport, bytes) = extract_network_stream( + hyper_v014::upgrade::on(request) + .await + .map_err(|err| HttpError::HyperV014(Arc::new(err)))?, + ); Ok(ws_create_server_stream( &mut state.borrow_mut(), transport, @@ -1084,10 +1137,6 @@ where } } -fn http_error(message: &'static str) -> AnyError { - custom_error("Http", message) -} - /// Filters out the ever-surprising 'shutdown ENOTCONN' errors. fn filter_enotconn( result: Result<(), hyper_v014::Error>, -- cgit v1.2.3 From b9262130fec34137e38c922015c6b671c0fa9396 Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Thu, 7 Nov 2024 17:12:13 +0530 Subject: feat(ext/http): abort signal when request is cancelled (#26761) Closes https://github.com/denoland/deno/issues/21653 --- ext/http/lib.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'ext/http/lib.rs') diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 6243804a1..9d71e3ad3 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -113,6 +113,7 @@ deno_core::extension!( http_next::op_http_get_request_header, http_next::op_http_get_request_headers, http_next::op_http_get_request_method_and_url, + http_next::op_http_get_request_cancelled, http_next::op_http_read_request_body, http_next::op_http_serve_on, http_next::op_http_serve, -- cgit v1.2.3 From b482a50299ae4f636a186038460e54af65e2b627 Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Fri, 8 Nov 2024 18:46:11 +0530 Subject: feat(ext/http): abort event when request is cancelled (#26781) ```js Deno.serve(async (req) => { const { promise, resolve } = Promise.withResolvers(); req.signal.addEventListener("abort", () => { resolve(); }); await promise; return new Response("Ok"); }); ``` --- ext/http/lib.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'ext/http/lib.rs') diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 9d71e3ad3..49893b1b9 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -112,6 +112,7 @@ deno_core::extension!( http_next::op_http_close_after_finish, http_next::op_http_get_request_header, http_next::op_http_get_request_headers, + http_next::op_http_request_on_cancel, http_next::op_http_get_request_method_and_url, http_next::op_http_get_request_cancelled, http_next::op_http_read_request_body, -- cgit v1.2.3 From 9f26ca450936da66e57dad2206d0d11363e7b35c Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Tue, 19 Nov 2024 10:46:24 +0900 Subject: feat(ext/http): Make http server parameters configurable (#26785) This commit makes http server parameters configurable on the extension initialization via two callbacks users can provide. The main motivation behind this change is to allow `deno_http` users to tune the HTTP/2 server to suit their needs, although Deno CLI users will not benefit from it as no JavaScript interface is exposed to set these parameters currently. It is up to users whether to provide hook functions. If not provided, the default configuration from hyper crate will be used. --- ext/http/lib.rs | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) (limited to 'ext/http/lib.rs') diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 49893b1b9..39b0bbc2a 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -39,6 +39,8 @@ use deno_net::raw::NetworkStream; use deno_websocket::ws_create_server_stream; use flate2::write::GzEncoder; use flate2::Compression; +use hyper::server::conn::http1; +use hyper::server::conn::http2; use hyper_util::rt::TokioIo; use hyper_v014::body::Bytes; use hyper_v014::body::HttpBody; @@ -96,6 +98,25 @@ pub use request_properties::HttpRequestProperties; pub use service::UpgradeUnavailableError; pub use websocket_upgrade::WebSocketUpgradeError; +#[derive(Debug, Default, Clone, Copy)] +pub struct Options { + /// By passing a hook function, the caller can customize various configuration + /// options for the HTTP/2 server. + /// See [`http2::Builder`] for what parameters can be customized. + /// + /// If `None`, the default configuration provided by hyper will be used. Note + /// that the default configuration is subject to change in future versions. + pub http2_builder_hook: + Option) -> http2::Builder>, + /// By passing a hook function, the caller can customize various configuration + /// options for the HTTP/1 server. + /// See [`http1::Builder`] for what parameters can be customized. + /// + /// If `None`, the default configuration provided by hyper will be used. Note + /// that the default configuration is subject to change in future versions. + pub http1_builder_hook: Option http1::Builder>, +} + deno_core::extension!( deno_http, deps = [deno_web, deno_net, deno_fetch, deno_websocket], @@ -135,6 +156,12 @@ deno_core::extension!( http_next::op_http_cancel, ], esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"], + options = { + options: Options, + }, + state = |state, options| { + state.put::(options.options); + } ); #[derive(Debug, thiserror::Error)] @@ -1117,7 +1144,7 @@ async fn op_http_upgrade_websocket( // Needed so hyper can use non Send futures #[derive(Clone)] -struct LocalExecutor; +pub struct LocalExecutor; impl hyper_v014::rt::Executor for LocalExecutor where -- cgit v1.2.3