diff options
author | haturau <135221985+haturatu@users.noreply.github.com> | 2024-11-20 01:20:47 +0900 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-20 01:20:47 +0900 |
commit | 85719a67e59c7aa45bead26e4942d7df8b1b42d4 (patch) | |
tree | face0aecaac53e93ce2f23b53c48859bcf1a36ec /ext/http | |
parent | 67697bc2e4a62a9670699fd18ad0dd8efc5bd955 (diff) | |
parent | 186b52731c6bb326c4d32905c5e732d082e83465 (diff) |
Merge branch 'denoland:main' into main
Diffstat (limited to 'ext/http')
-rw-r--r-- | ext/http/00_serve.ts | 158 | ||||
-rw-r--r-- | ext/http/Cargo.toml | 2 | ||||
-rw-r--r-- | ext/http/fly_accept_encoding.rs | 2 | ||||
-rw-r--r-- | ext/http/http_next.rs | 261 | ||||
-rw-r--r-- | ext/http/lib.rs | 184 | ||||
-rw-r--r-- | ext/http/request_body.rs | 12 | ||||
-rw-r--r-- | ext/http/request_properties.rs | 27 | ||||
-rw-r--r-- | ext/http/service.rs | 23 | ||||
-rw-r--r-- | ext/http/websocket_upgrade.rs | 60 |
9 files changed, 492 insertions, 237 deletions
diff --git a/ext/http/00_serve.ts b/ext/http/00_serve.ts index 3b9b085a2..fcdb87d09 100644 --- a/ext/http/00_serve.ts +++ b/ext/http/00_serve.ts @@ -14,6 +14,7 @@ import { op_http_get_request_headers, op_http_get_request_method_and_url, op_http_read_request_body, + op_http_request_on_cancel, op_http_serve, op_http_serve_on, op_http_set_promise_complete, @@ -41,6 +42,10 @@ const { Uint8Array, Promise, } = primordials; +const { + getAsyncContext, + setAsyncContext, +} = core; import { InnerBody } from "ext:deno_fetch/22_body.js"; import { Event } from "ext:deno_web/02_event.js"; @@ -76,7 +81,11 @@ import { ReadableStreamPrototype, resourceForReadableStream, } from "ext:deno_web/06_streams.js"; -import { listen, listenOptionApiName, TcpConn } from "ext:deno_net/01_net.js"; +import { + listen, + listenOptionApiName, + UpgradedConn, +} from "ext:deno_net/01_net.js"; import { hasTlsKeyPairOptions, listenTls } from "ext:deno_net/02_tls.js"; import { SymbolAsyncDispose } from "ext:deno_web/00_infra.js"; @@ -189,7 +198,7 @@ class InnerRequest { const upgradeRid = op_http_upgrade_raw(external); - const conn = new TcpConn( + const conn = new UpgradedConn( upgradeRid, underlyingConn?.remoteAddr, underlyingConn?.localAddr, @@ -369,6 +378,18 @@ class InnerRequest { get external() { return this.#external; } + + onCancel(callback) { + if (this.#external === null) { + callback(); + return; + } + + PromisePrototypeThen( + op_http_request_on_cancel(this.#external), + callback, + ); + } } class CallbackContext { @@ -380,8 +401,10 @@ class CallbackContext { /** @type {Promise<void> | undefined} */ closing; listener; + asyncContext; constructor(signal, args, listener) { + this.asyncContext = getAsyncContext(); // The abort signal triggers a non-graceful shutdown signal?.addEventListener( "abort", @@ -491,82 +514,89 @@ function fastSyncResponseOrStream( */ function mapToCallback(context, callback, onError) { return async function (req) { - // Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback - // 500 error. - let innerRequest; - let response; - try { - innerRequest = new InnerRequest(req, context); - const request = fromInnerRequest(innerRequest, "immutable"); - innerRequest.request = request; - response = await callback( - request, - new ServeHandlerInfo(innerRequest), - ); - - // Throwing Error if the handler return value is not a Response class - if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) { - throw new TypeError( - "Return value from serve handler must be a response or a promise resolving to a response", - ); - } + const asyncContext = getAsyncContext(); + setAsyncContext(context.asyncContext); - if (response.type === "error") { - throw new TypeError( - "Return value from serve handler must not be an error response (like Response.error())", + try { + // Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback + // 500 error. + let innerRequest; + let response; + try { + innerRequest = new InnerRequest(req, context); + const request = fromInnerRequest(innerRequest, "immutable"); + innerRequest.request = request; + response = await callback( + request, + new ServeHandlerInfo(innerRequest), ); - } - if (response.bodyUsed) { - throw new TypeError( - "The body of the Response returned from the serve handler has already been consumed", - ); - } - } catch (error) { - try { - response = await onError(error); + // Throwing Error if the handler return value is not a Response class if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) { throw new TypeError( - "Return value from onError handler must be a response or a promise resolving to a response", + "Return value from serve handler must be a response or a promise resolving to a response", + ); + } + + if (response.type === "error") { + throw new TypeError( + "Return value from serve handler must not be an error response (like Response.error())", + ); + } + + if (response.bodyUsed) { + throw new TypeError( + "The body of the Response returned from the serve handler has already been consumed", ); } } catch (error) { - // deno-lint-ignore no-console - console.error("Exception in onError while handling exception", error); - response = internalServerError(); + try { + response = await onError(error); + if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) { + throw new TypeError( + "Return value from onError handler must be a response or a promise resolving to a response", + ); + } + } catch (error) { + // deno-lint-ignore no-console + console.error("Exception in onError while handling exception", error); + response = internalServerError(); + } } - } - const inner = toInnerResponse(response); - if (innerRequest?.[_upgraded]) { - // We're done here as the connection has been upgraded during the callback and no longer requires servicing. - if (response !== UPGRADE_RESPONSE_SENTINEL) { - // deno-lint-ignore no-console - console.error("Upgrade response was not returned from callback"); - context.close(); + const inner = toInnerResponse(response); + if (innerRequest?.[_upgraded]) { + // We're done here as the connection has been upgraded during the callback and no longer requires servicing. + if (response !== UPGRADE_RESPONSE_SENTINEL) { + // deno-lint-ignore no-console + console.error("Upgrade response was not returned from callback"); + context.close(); + } + innerRequest?.[_upgraded](); + return; } - innerRequest?.[_upgraded](); - return; - } - // Did everything shut down while we were waiting? - if (context.closed) { - // We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate - innerRequest?.close(); - op_http_set_promise_complete(req, 503); - return; - } + // Did everything shut down while we were waiting? + if (context.closed) { + // We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate + innerRequest?.close(); + op_http_set_promise_complete(req, 503); + return; + } - const status = inner.status; - const headers = inner.headerList; - if (headers && headers.length > 0) { - if (headers.length == 1) { - op_http_set_response_header(req, headers[0][0], headers[0][1]); - } else { - op_http_set_response_headers(req, headers); + const status = inner.status; + const headers = inner.headerList; + if (headers && headers.length > 0) { + if (headers.length == 1) { + op_http_set_response_header(req, headers[0][0], headers[0][1]); + } else { + op_http_set_response_headers(req, headers); + } } - } - fastSyncResponseOrStream(req, inner.body, status, innerRequest); + fastSyncResponseOrStream(req, inner.body, status, innerRequest); + } finally { + setAsyncContext(asyncContext); + } }; } diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml index b7637bec3..ed98fe349 100644 --- a/ext/http/Cargo.toml +++ b/ext/http/Cargo.toml @@ -2,7 +2,7 @@ [package] name = "deno_http" -version = "0.169.0" +version = "0.175.0" authors.workspace = true edition.workspace = true license.workspace = true diff --git a/ext/http/fly_accept_encoding.rs b/ext/http/fly_accept_encoding.rs index 94e336876..4d6fd2231 100644 --- a/ext/http/fly_accept_encoding.rs +++ b/ext/http/fly_accept_encoding.rs @@ -119,7 +119,7 @@ fn encodings_iter_inner<'s>( }; Some(Ok((encoding, qval))) }) - .map(|r| r?) // flatten Result<Result<... + .flatten() } #[cfg(test)] diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index efe1b88c9..7dbac6021 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -18,8 +18,8 @@ use crate::service::HttpServerState; use crate::service::SignallingRc; use crate::websocket_upgrade::WebSocketUpgrade; use crate::LocalExecutor; +use crate::Options; use cache_control::CacheControl; -use deno_core::error::AnyError; use deno_core::external; use deno_core::futures::future::poll_fn; use deno_core::futures::TryFutureExt; @@ -146,12 +146,32 @@ macro_rules! clone_external { }}; } +#[derive(Debug, thiserror::Error)] +pub enum HttpNextError { + #[error(transparent)] + Resource(deno_core::error::AnyError), + #[error("{0}")] + Io(#[from] io::Error), + #[error(transparent)] + WebSocketUpgrade(crate::websocket_upgrade::WebSocketUpgradeError), + #[error("{0}")] + Hyper(#[from] hyper::Error), + #[error(transparent)] + JoinError(#[from] tokio::task::JoinError), + #[error(transparent)] + Canceled(#[from] deno_core::Canceled), + #[error(transparent)] + HttpPropertyExtractor(deno_core::error::AnyError), + #[error(transparent)] + UpgradeUnavailable(#[from] crate::service::UpgradeUnavailableError), +} + #[op2(fast)] #[smi] pub fn op_http_upgrade_raw( state: &mut OpState, external: *const c_void, -) -> Result<ResourceId, AnyError> { +) -> Result<ResourceId, HttpNextError> { // SAFETY: external is deleted before calling this op. let http = unsafe { take_external!(external, "op_http_upgrade_raw") }; @@ -177,7 +197,7 @@ pub fn op_http_upgrade_raw( upgraded.write_all(&bytes).await?; break upgraded; } - Err(err) => return Err(err), + Err(err) => return Err(HttpNextError::WebSocketUpgrade(err)), } }; @@ -193,7 +213,7 @@ pub fn op_http_upgrade_raw( } read_tx.write_all(&buf[..read]).await?; } - Ok::<_, AnyError>(()) + Ok::<_, HttpNextError>(()) }); spawn(async move { let mut buf = [0; 1024]; @@ -204,7 +224,7 @@ pub fn op_http_upgrade_raw( } upgraded_tx.write_all(&buf[..read]).await?; } - Ok::<_, AnyError>(()) + Ok::<_, HttpNextError>(()) }); Ok(()) @@ -223,7 +243,7 @@ pub async fn op_http_upgrade_websocket_next( state: Rc<RefCell<OpState>>, external: *const c_void, #[serde] headers: Vec<(ByteString, ByteString)>, -) -> Result<ResourceId, AnyError> { +) -> Result<ResourceId, HttpNextError> { let http = // SAFETY: external is deleted before calling this op. unsafe { take_external!(external, "op_http_upgrade_websocket_next") }; @@ -246,7 +266,11 @@ pub async fn op_http_upgrade_websocket_next( // Stage 3: take the extracted raw network stream and upgrade it to a websocket, then return it let (stream, bytes) = extract_network_stream(upgraded); - ws_create_server_stream(&mut state.borrow_mut(), stream, bytes) + Ok(ws_create_server_stream( + &mut state.borrow_mut(), + stream, + bytes, + )) } #[op2(fast)] @@ -296,7 +320,7 @@ where let authority: v8::Local<v8::Value> = match request_properties.authority { Some(authority) => v8::String::new_from_utf8( scope, - authority.as_ref(), + authority.as_bytes(), v8::NewStringType::Normal, ) .unwrap() @@ -305,15 +329,25 @@ where }; // Only extract the path part - we handle authority elsewhere - let path = match &request_parts.uri.path_and_query() { - Some(path_and_query) => path_and_query.to_string(), - None => "".to_owned(), + let path = match request_parts.uri.path_and_query() { + Some(path_and_query) => { + let path = path_and_query.as_str(); + if matches!(path.as_bytes().first(), Some(b'/' | b'*')) { + Cow::Borrowed(path) + } else { + Cow::Owned(format!("/{}", path)) + } + } + None => Cow::Borrowed(""), }; - let path: v8::Local<v8::Value> = - v8::String::new_from_utf8(scope, path.as_ref(), v8::NewStringType::Normal) - .unwrap() - .into(); + let path: v8::Local<v8::Value> = v8::String::new_from_utf8( + scope, + path.as_bytes(), + v8::NewStringType::Normal, + ) + .unwrap() + .into(); let peer_address: v8::Local<v8::Value> = v8::String::new_from_utf8( scope, @@ -531,6 +565,7 @@ fn is_request_compressible( match accept_encoding.to_str() { // Firefox and Chrome send this -- no need to parse Ok("gzip, deflate, br") => return Compression::Brotli, + Ok("gzip, deflate, br, zstd") => return Compression::Brotli, Ok("gzip") => return Compression::GZip, Ok("br") => return Compression::Brotli, _ => (), @@ -667,6 +702,27 @@ fn set_response( http.complete(); } +#[op2(fast)] +pub fn op_http_get_request_cancelled(external: *const c_void) -> bool { + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_get_request_cancelled") }; + http.cancelled() +} + +#[op2(async)] +pub async fn op_http_request_on_cancel(external: *const c_void) { + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_request_on_cancel") }; + let (tx, rx) = tokio::sync::oneshot::channel(); + + http.on_cancel(tx); + drop(http); + + rx.await.ok(); +} + /// Returned promise resolves when body streaming finishes. /// Call [`op_http_close_after_finish`] when done with the external. #[op2(async)] @@ -676,7 +732,7 @@ pub async fn op_http_set_response_body_resource( #[smi] stream_rid: ResourceId, auto_close: bool, status: u16, -) -> Result<bool, AnyError> { +) -> Result<bool, HttpNextError> { let http = // SAFETY: op is called with external. unsafe { clone_external!(external, "op_http_set_response_body_resource") }; @@ -691,9 +747,15 @@ pub async fn op_http_set_response_body_resource( let resource = { let mut state = state.borrow_mut(); if auto_close { - state.resource_table.take_any(stream_rid)? + state + .resource_table + .take_any(stream_rid) + .map_err(HttpNextError::Resource)? } else { - state.resource_table.get_any(stream_rid)? + state + .resource_table + .get_any(stream_rid) + .map_err(HttpNextError::Resource)? } }; @@ -760,10 +822,16 @@ fn serve_http11_unconditional( io: impl HttpServeStream, svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static, cancel: Rc<CancelHandle>, + http1_builder_hook: Option<fn(http1::Builder) -> http1::Builder>, ) -> impl Future<Output = Result<(), hyper::Error>> + 'static { - let conn = http1::Builder::new() - .keep_alive(true) - .writev(*USE_WRITEV) + let mut builder = http1::Builder::new(); + builder.keep_alive(true).writev(*USE_WRITEV); + + if let Some(http1_builder_hook) = http1_builder_hook { + builder = http1_builder_hook(builder); + } + + let conn = builder .serve_connection(TokioIo::new(io), svc) .with_upgrades(); @@ -782,9 +850,17 @@ fn serve_http2_unconditional( io: impl HttpServeStream, svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static, cancel: Rc<CancelHandle>, + http2_builder_hook: Option< + fn(http2::Builder<LocalExecutor>) -> http2::Builder<LocalExecutor>, + >, ) -> impl Future<Output = Result<(), hyper::Error>> + 'static { - let conn = - http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc); + let mut builder = http2::Builder::new(LocalExecutor); + + if let Some(http2_builder_hook) = http2_builder_hook { + builder = http2_builder_hook(builder); + } + + let conn = builder.serve_connection(TokioIo::new(io), svc); async { match conn.or_abort(cancel).await { Err(mut conn) => { @@ -800,17 +876,18 @@ async fn serve_http2_autodetect( io: impl HttpServeStream, svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static, cancel: Rc<CancelHandle>, -) -> Result<(), AnyError> { + options: Options, +) -> Result<(), HttpNextError> { let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX); let (matches, io) = prefix.match_prefix().await?; if matches { - serve_http2_unconditional(io, svc, cancel) + serve_http2_unconditional(io, svc, cancel, options.http2_builder_hook) .await - .map_err(|e| e.into()) + .map_err(HttpNextError::Hyper) } else { - serve_http11_unconditional(io, svc, cancel) + serve_http11_unconditional(io, svc, cancel, options.http1_builder_hook) .await - .map_err(|e| e.into()) + .map_err(HttpNextError::Hyper) } } @@ -819,7 +896,8 @@ fn serve_https( request_info: HttpConnectionProperties, lifetime: HttpLifetime, tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>, -) -> JoinHandle<Result<(), AnyError>> { + options: Options, +) -> JoinHandle<Result<(), HttpNextError>> { let HttpLifetime { server_state, connection_cancel_handle, @@ -830,21 +908,31 @@ fn serve_https( handle_request(req, request_info.clone(), server_state.clone(), tx.clone()) }); spawn( - async { + async move { let handshake = io.handshake().await?; // If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect // based on the prefix bytes let handshake = handshake.alpn; if Some(TLS_ALPN_HTTP_2) == handshake.as_deref() { - serve_http2_unconditional(io, svc, listen_cancel_handle) - .await - .map_err(|e| e.into()) + serve_http2_unconditional( + io, + svc, + listen_cancel_handle, + options.http2_builder_hook, + ) + .await + .map_err(HttpNextError::Hyper) } else if Some(TLS_ALPN_HTTP_11) == handshake.as_deref() { - serve_http11_unconditional(io, svc, listen_cancel_handle) - .await - .map_err(|e| e.into()) + serve_http11_unconditional( + io, + svc, + listen_cancel_handle, + options.http1_builder_hook, + ) + .await + .map_err(HttpNextError::Hyper) } else { - serve_http2_autodetect(io, svc, listen_cancel_handle).await + serve_http2_autodetect(io, svc, listen_cancel_handle, options).await } } .try_or_cancel(connection_cancel_handle), @@ -856,7 +944,8 @@ fn serve_http( request_info: HttpConnectionProperties, lifetime: HttpLifetime, tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>, -) -> JoinHandle<Result<(), AnyError>> { + options: Options, +) -> JoinHandle<Result<(), HttpNextError>> { let HttpLifetime { server_state, connection_cancel_handle, @@ -867,7 +956,7 @@ fn serve_http( handle_request(req, request_info.clone(), server_state.clone(), tx.clone()) }); spawn( - serve_http2_autodetect(io, svc, listen_cancel_handle) + serve_http2_autodetect(io, svc, listen_cancel_handle, options) .try_or_cancel(connection_cancel_handle), ) } @@ -877,7 +966,8 @@ fn serve_http_on<HTTP>( listen_properties: &HttpListenProperties, lifetime: HttpLifetime, tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>, -) -> JoinHandle<Result<(), AnyError>> + options: Options, +) -> JoinHandle<Result<(), HttpNextError>> where HTTP: HttpPropertyExtractor, { @@ -888,14 +978,14 @@ where match network_stream { NetworkStream::Tcp(conn) => { - serve_http(conn, connection_properties, lifetime, tx) + serve_http(conn, connection_properties, lifetime, tx, options) } NetworkStream::Tls(conn) => { - serve_https(conn, connection_properties, lifetime, tx) + serve_https(conn, connection_properties, lifetime, tx, options) } #[cfg(unix)] NetworkStream::Unix(conn) => { - serve_http(conn, connection_properties, lifetime, tx) + serve_http(conn, connection_properties, lifetime, tx, options) } } } @@ -908,7 +998,7 @@ struct HttpLifetime { } struct HttpJoinHandle { - join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>, + join_handle: AsyncRefCell<Option<JoinHandle<Result<(), HttpNextError>>>>, connection_cancel_handle: Rc<CancelHandle>, listen_cancel_handle: Rc<CancelHandle>, rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>, @@ -968,12 +1058,13 @@ impl Drop for HttpJoinHandle { pub fn op_http_serve<HTTP>( state: Rc<RefCell<OpState>>, #[smi] listener_rid: ResourceId, -) -> Result<(ResourceId, &'static str, String), AnyError> +) -> Result<(ResourceId, &'static str, String), HttpNextError> where HTTP: HttpPropertyExtractor, { let listener = - HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?; + HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid) + .map_err(HttpNextError::Resource)?; let listen_properties = HTTP::listen_properties_from_listener(&listener)?; @@ -983,21 +1074,28 @@ where let lifetime = resource.lifetime(); + let options = { + let state = state.borrow(); + *state.borrow::<Options>() + }; + let listen_properties_clone: HttpListenProperties = listen_properties.clone(); let handle = spawn(async move { loop { let conn = HTTP::accept_connection_from_listener(&listener) .try_or_cancel(listen_cancel_clone.clone()) - .await?; + .await + .map_err(HttpNextError::HttpPropertyExtractor)?; serve_http_on::<HTTP>( conn, &listen_properties_clone, lifetime.clone(), tx.clone(), + options, ); } #[allow(unreachable_code)] - Ok::<_, AnyError>(()) + Ok::<_, HttpNextError>(()) }); // Set the handle after we start the future @@ -1017,25 +1115,31 @@ where pub fn op_http_serve_on<HTTP>( state: Rc<RefCell<OpState>>, #[smi] connection_rid: ResourceId, -) -> Result<(ResourceId, &'static str, String), AnyError> +) -> Result<(ResourceId, &'static str, String), HttpNextError> where HTTP: HttpPropertyExtractor, { let connection = - HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?; + HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid) + .map_err(HttpNextError::Resource)?; let listen_properties = HTTP::listen_properties_from_connection(&connection)?; let (tx, rx) = tokio::sync::mpsc::channel(10); let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx)); - let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> = - serve_http_on::<HTTP>( - connection, - &listen_properties, - resource.lifetime(), - tx, - ); + let options = { + let state = state.borrow(); + *state.borrow::<Options>() + }; + + let handle = serve_http_on::<HTTP>( + connection, + &listen_properties, + resource.lifetime(), + tx, + options, + ); // Set the handle after we start the future *RcRef::map(&resource, |this| &this.join_handle) @@ -1081,12 +1185,13 @@ pub fn op_http_try_wait( pub async fn op_http_wait( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, -) -> Result<*const c_void, AnyError> { +) -> Result<*const c_void, HttpNextError> { // We will get the join handle initially, as we might be consuming requests still let join_handle = state .borrow_mut() .resource_table - .get::<HttpJoinHandle>(rid)?; + .get::<HttpJoinHandle>(rid) + .map_err(HttpNextError::Resource)?; let cancel = join_handle.listen_cancel_handle(); let next = async { @@ -1113,13 +1218,12 @@ pub async fn op_http_wait( // Filter out shutdown (ENOTCONN) errors if let Err(err) = res { - if let Some(err) = err.source() { - if let Some(err) = err.downcast_ref::<io::Error>() { - if err.kind() == io::ErrorKind::NotConnected { - return Ok(null()); - } + if let HttpNextError::Io(err) = &err { + if err.kind() == io::ErrorKind::NotConnected { + return Ok(null()); } } + return Err(err); } @@ -1132,7 +1236,7 @@ pub fn op_http_cancel( state: &mut OpState, #[smi] rid: ResourceId, graceful: bool, -) -> Result<(), AnyError> { +) -> Result<(), deno_core::error::AnyError> { let join_handle = state.resource_table.get::<HttpJoinHandle>(rid)?; if graceful { @@ -1152,11 +1256,12 @@ pub async fn op_http_close( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, graceful: bool, -) -> Result<(), AnyError> { +) -> Result<(), HttpNextError> { let join_handle = state .borrow_mut() .resource_table - .take::<HttpJoinHandle>(rid)?; + .take::<HttpJoinHandle>(rid) + .map_err(HttpNextError::Resource)?; if graceful { http_general_trace!("graceful shutdown"); @@ -1202,23 +1307,26 @@ impl UpgradeStream { } } - async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> { + async fn read( + self: Rc<Self>, + buf: &mut [u8], + ) -> Result<usize, std::io::Error> { let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); async { let read = RcRef::map(self, |this| &this.read); let mut read = read.borrow_mut().await; - Ok(Pin::new(&mut *read).read(buf).await?) + Pin::new(&mut *read).read(buf).await } .try_or_cancel(cancel_handle) .await } - async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { + async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, std::io::Error> { let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); async { let write = RcRef::map(self, |this| &this.write); let mut write = write.borrow_mut().await; - Ok(Pin::new(&mut *write).write(buf).await?) + Pin::new(&mut *write).write(buf).await } .try_or_cancel(cancel_handle) .await @@ -1228,7 +1336,7 @@ impl UpgradeStream { self: Rc<Self>, buf1: &[u8], buf2: &[u8], - ) -> Result<usize, AnyError> { + ) -> Result<usize, std::io::Error> { let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await; let total = buf1.len() + buf2.len(); @@ -1281,9 +1389,12 @@ pub async fn op_raw_write_vectored( #[smi] rid: ResourceId, #[buffer] buf1: JsBuffer, #[buffer] buf2: JsBuffer, -) -> Result<usize, AnyError> { - let resource: Rc<UpgradeStream> = - state.borrow().resource_table.get::<UpgradeStream>(rid)?; +) -> Result<usize, HttpNextError> { + let resource: Rc<UpgradeStream> = state + .borrow() + .resource_table + .get::<UpgradeStream>(rid) + .map_err(HttpNextError::Resource)?; let nwritten = resource.write_vectored(&buf1, &buf2).await?; Ok(nwritten) } diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 934f8a002..39b0bbc2a 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -6,8 +6,6 @@ use async_compression::Level; use base64::prelude::BASE64_STANDARD; use base64::Engine; use cache_control::CacheControl; -use deno_core::error::custom_error; -use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; use deno_core::futures::channel::oneshot; use deno_core::futures::future::pending; @@ -41,6 +39,8 @@ use deno_net::raw::NetworkStream; use deno_websocket::ws_create_server_stream; use flate2::write::GzEncoder; use flate2::Compression; +use hyper::server::conn::http1; +use hyper::server::conn::http2; use hyper_util::rt::TokioIo; use hyper_v014::body::Bytes; use hyper_v014::body::HttpBody; @@ -89,11 +89,33 @@ mod service; mod websocket_upgrade; use fly_accept_encoding::Encoding; +pub use http_next::HttpNextError; pub use request_properties::DefaultHttpPropertyExtractor; pub use request_properties::HttpConnectionProperties; pub use request_properties::HttpListenProperties; pub use request_properties::HttpPropertyExtractor; pub use request_properties::HttpRequestProperties; +pub use service::UpgradeUnavailableError; +pub use websocket_upgrade::WebSocketUpgradeError; + +#[derive(Debug, Default, Clone, Copy)] +pub struct Options { + /// By passing a hook function, the caller can customize various configuration + /// options for the HTTP/2 server. + /// See [`http2::Builder`] for what parameters can be customized. + /// + /// If `None`, the default configuration provided by hyper will be used. Note + /// that the default configuration is subject to change in future versions. + pub http2_builder_hook: + Option<fn(http2::Builder<LocalExecutor>) -> http2::Builder<LocalExecutor>>, + /// By passing a hook function, the caller can customize various configuration + /// options for the HTTP/1 server. + /// See [`http1::Builder`] for what parameters can be customized. + /// + /// If `None`, the default configuration provided by hyper will be used. Note + /// that the default configuration is subject to change in future versions. + pub http1_builder_hook: Option<fn(http1::Builder) -> http1::Builder>, +} deno_core::extension!( deno_http, @@ -111,7 +133,9 @@ deno_core::extension!( http_next::op_http_close_after_finish, http_next::op_http_get_request_header, http_next::op_http_get_request_headers, + http_next::op_http_request_on_cancel, http_next::op_http_get_request_method_and_url<HTTP>, + http_next::op_http_get_request_cancelled, http_next::op_http_read_request_body, http_next::op_http_serve_on<HTTP>, http_next::op_http_serve<HTTP>, @@ -132,8 +156,46 @@ deno_core::extension!( http_next::op_http_cancel, ], esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"], + options = { + options: Options, + }, + state = |state, options| { + state.put::<Options>(options.options); + } ); +#[derive(Debug, thiserror::Error)] +pub enum HttpError { + #[error(transparent)] + Resource(deno_core::error::AnyError), + #[error(transparent)] + Canceled(#[from] deno_core::Canceled), + #[error("{0}")] + HyperV014(#[source] Arc<hyper_v014::Error>), + #[error("{0}")] + InvalidHeaderName(#[from] hyper_v014::header::InvalidHeaderName), + #[error("{0}")] + InvalidHeaderValue(#[from] hyper_v014::header::InvalidHeaderValue), + #[error("{0}")] + Http(#[from] hyper_v014::http::Error), + #[error("response headers already sent")] + ResponseHeadersAlreadySent, + #[error("connection closed while sending response")] + ConnectionClosedWhileSendingResponse, + #[error("already in use")] + AlreadyInUse, + #[error("{0}")] + Io(#[from] std::io::Error), + #[error("no response headers")] + NoResponseHeaders, + #[error("response already completed")] + ResponseAlreadyCompleted, + #[error("cannot upgrade because request body was used")] + UpgradeBodyUsed, + #[error(transparent)] + Other(deno_core::error::AnyError), +} + pub enum HttpSocketAddr { IpSocket(std::net::SocketAddr), #[cfg(unix)] @@ -216,7 +278,7 @@ impl HttpConnResource { String, String, )>, - AnyError, + HttpError, > { let fut = async { let (request_tx, request_rx) = oneshot::channel(); @@ -259,8 +321,8 @@ impl HttpConnResource { } /// A future that completes when this HTTP connection is closed or errors. - async fn closed(&self) -> Result<(), AnyError> { - self.closed_fut.clone().map_err(AnyError::from).await + async fn closed(&self) -> Result<(), HttpError> { + self.closed_fut.clone().map_err(HttpError::HyperV014).await } } @@ -280,14 +342,13 @@ pub fn http_create_conn_resource<S, A>( io: S, addr: A, scheme: &'static str, -) -> Result<ResourceId, AnyError> +) -> ResourceId where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, A: Into<HttpSocketAddr>, { let conn = HttpConnResource::new(io, scheme, addr.into()); - let rid = state.resource_table.add(conn); - Ok(rid) + state.resource_table.add(conn) } /// An object that implements the `hyper::Service` trait, through which Hyper @@ -423,7 +484,9 @@ impl Resource for HttpStreamReadResource { // safely call `await` on it without creating a race condition. Some(_) => match body.as_mut().next().await.unwrap() { Ok(chunk) => assert!(chunk.is_empty()), - Err(err) => break Err(AnyError::from(err)), + Err(err) => { + break Err(HttpError::HyperV014(Arc::new(err)).into()) + } }, None => break Ok(BufView::empty()), } @@ -545,8 +608,12 @@ struct NextRequestResponse( async fn op_http_accept( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, -) -> Result<Option<NextRequestResponse>, AnyError> { - let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?; +) -> Result<Option<NextRequestResponse>, HttpError> { + let conn = state + .borrow() + .resource_table + .get::<HttpConnResource>(rid) + .map_err(HttpError::Resource)?; match conn.accept().await { Ok(Some((read_stream, write_stream, method, url))) => { @@ -657,11 +724,12 @@ async fn op_http_write_headers( #[smi] status: u16, #[serde] headers: Vec<(ByteString, ByteString)>, #[serde] data: Option<StringOrBuffer>, -) -> Result<(), AnyError> { +) -> Result<(), HttpError> { let stream = state .borrow_mut() .resource_table - .get::<HttpStreamWriteResource>(rid)?; + .get::<HttpStreamWriteResource>(rid) + .map_err(HttpError::Resource)?; // Track supported encoding let encoding = stream.accept_encoding; @@ -708,14 +776,14 @@ async fn op_http_write_headers( let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let response_tx = match replace(&mut *old_wr, new_wr) { HttpResponseWriter::Headers(response_tx) => response_tx, - _ => return Err(http_error("response headers already sent")), + _ => return Err(HttpError::ResponseHeadersAlreadySent), }; match response_tx.send(body) { Ok(_) => Ok(()), Err(_) => { stream.conn.closed().await?; - Err(http_error("connection closed while sending response")) + Err(HttpError::ConnectionClosedWhileSendingResponse) } } } @@ -725,11 +793,14 @@ async fn op_http_write_headers( fn op_http_headers( state: &mut OpState, #[smi] rid: u32, -) -> Result<Vec<(ByteString, ByteString)>, AnyError> { - let stream = state.resource_table.get::<HttpStreamReadResource>(rid)?; +) -> Result<Vec<(ByteString, ByteString)>, HttpError> { + let stream = state + .resource_table + .get::<HttpStreamReadResource>(rid) + .map_err(HttpError::Resource)?; let rd = RcRef::map(&stream, |r| &r.rd) .try_borrow() - .ok_or_else(|| http_error("already in use"))?; + .ok_or(HttpError::AlreadyInUse)?; match &*rd { HttpRequestReader::Headers(request) => Ok(req_headers(request.headers())), HttpRequestReader::Body(headers, _) => Ok(req_headers(headers)), @@ -741,7 +812,7 @@ fn http_response( data: Option<StringOrBuffer>, compressing: bool, encoding: Encoding, -) -> Result<(HttpResponseWriter, hyper_v014::Body), AnyError> { +) -> Result<(HttpResponseWriter, hyper_v014::Body), HttpError> { // Gzip, after level 1, doesn't produce significant size difference. // This default matches nginx default gzip compression level (1): // https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level @@ -878,25 +949,34 @@ async fn op_http_write_resource( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, #[smi] stream: ResourceId, -) -> Result<(), AnyError> { +) -> Result<(), HttpError> { let http_stream = state .borrow() .resource_table - .get::<HttpStreamWriteResource>(rid)?; + .get::<HttpStreamWriteResource>(rid) + .map_err(HttpError::Resource)?; let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await; - let resource = state.borrow().resource_table.get_any(stream)?; + let resource = state + .borrow() + .resource_table + .get_any(stream) + .map_err(HttpError::Resource)?; loop { match *wr { HttpResponseWriter::Headers(_) => { - return Err(http_error("no response headers")) + return Err(HttpError::NoResponseHeaders) } HttpResponseWriter::Closed => { - return Err(http_error("response already completed")) + return Err(HttpError::ResponseAlreadyCompleted) } _ => {} }; - let view = resource.clone().read(64 * 1024).await?; // 64KB + let view = resource + .clone() + .read(64 * 1024) + .await + .map_err(HttpError::Other)?; // 64KB if view.is_empty() { break; } @@ -937,16 +1017,17 @@ async fn op_http_write( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, #[buffer] buf: JsBuffer, -) -> Result<(), AnyError> { +) -> Result<(), HttpError> { let stream = state .borrow() .resource_table - .get::<HttpStreamWriteResource>(rid)?; + .get::<HttpStreamWriteResource>(rid) + .map_err(HttpError::Resource)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; match &mut *wr { - HttpResponseWriter::Headers(_) => Err(http_error("no response headers")), - HttpResponseWriter::Closed => Err(http_error("response already completed")), + HttpResponseWriter::Headers(_) => Err(HttpError::NoResponseHeaders), + HttpResponseWriter::Closed => Err(HttpError::ResponseAlreadyCompleted), HttpResponseWriter::Body { writer, .. } => { let mut result = writer.write_all(&buf).await; if result.is_ok() { @@ -961,7 +1042,7 @@ async fn op_http_write( stream.conn.closed().await?; // If there was no connection error, drop body_tx. *wr = HttpResponseWriter::Closed; - Err(http_error("response already completed")) + Err(HttpError::ResponseAlreadyCompleted) } } } @@ -975,7 +1056,7 @@ async fn op_http_write( stream.conn.closed().await?; // If there was no connection error, drop body_tx. *wr = HttpResponseWriter::Closed; - Err(http_error("response already completed")) + Err(HttpError::ResponseAlreadyCompleted) } } } @@ -989,11 +1070,12 @@ async fn op_http_write( async fn op_http_shutdown( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, -) -> Result<(), AnyError> { +) -> Result<(), HttpError> { let stream = state .borrow() .resource_table - .get::<HttpStreamWriteResource>(rid)?; + .get::<HttpStreamWriteResource>(rid) + .map_err(HttpError::Resource)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let wr = take(&mut *wr); match wr { @@ -1022,14 +1104,12 @@ async fn op_http_shutdown( #[op2] #[string] -fn op_http_websocket_accept_header( - #[string] key: String, -) -> Result<String, AnyError> { +fn op_http_websocket_accept_header(#[string] key: String) -> String { let digest = ring::digest::digest( &ring::digest::SHA1_FOR_LEGACY_USE_ONLY, format!("{key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11").as_bytes(), ); - Ok(BASE64_STANDARD.encode(digest)) + BASE64_STANDARD.encode(digest) } #[op2(async)] @@ -1037,30 +1117,34 @@ fn op_http_websocket_accept_header( async fn op_http_upgrade_websocket( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, -) -> Result<ResourceId, AnyError> { +) -> Result<ResourceId, HttpError> { let stream = state .borrow_mut() .resource_table - .get::<HttpStreamReadResource>(rid)?; + .get::<HttpStreamReadResource>(rid) + .map_err(HttpError::Resource)?; let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; let request = match &mut *rd { HttpRequestReader::Headers(request) => request, - _ => { - return Err(http_error("cannot upgrade because request body was used")) - } + _ => return Err(HttpError::UpgradeBodyUsed), }; - let (transport, bytes) = - extract_network_stream(hyper_v014::upgrade::on(request).await?); - let ws_rid = - ws_create_server_stream(&mut state.borrow_mut(), transport, bytes)?; - Ok(ws_rid) + let (transport, bytes) = extract_network_stream( + hyper_v014::upgrade::on(request) + .await + .map_err(|err| HttpError::HyperV014(Arc::new(err)))?, + ); + Ok(ws_create_server_stream( + &mut state.borrow_mut(), + transport, + bytes, + )) } // Needed so hyper can use non Send futures #[derive(Clone)] -struct LocalExecutor; +pub struct LocalExecutor; impl<Fut> hyper_v014::rt::Executor<Fut> for LocalExecutor where @@ -1082,10 +1166,6 @@ where } } -fn http_error(message: &'static str) -> AnyError { - custom_error("Http", message) -} - /// Filters out the ever-surprising 'shutdown ENOTCONN' errors. fn filter_enotconn( result: Result<(), hyper_v014::Error>, diff --git a/ext/http/request_body.rs b/ext/http/request_body.rs index 45df12457..f1c3f358e 100644 --- a/ext/http/request_body.rs +++ b/ext/http/request_body.rs @@ -1,9 +1,9 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. use bytes::Bytes; -use deno_core::error::AnyError; use deno_core::futures::stream::Peekable; use deno_core::futures::Stream; use deno_core::futures::StreamExt; +use deno_core::futures::TryFutureExt; use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::BufView; @@ -22,7 +22,7 @@ use std::task::Poll; struct ReadFuture(Incoming); impl Stream for ReadFuture { - type Item = Result<Bytes, AnyError>; + type Item = Result<Bytes, hyper::Error>; fn poll_next( self: Pin<&mut Self>, @@ -37,13 +37,13 @@ impl Stream for ReadFuture { if let Ok(data) = frame.into_data() { // Ensure that we never yield an empty frame if !data.is_empty() { - break Poll::Ready(Some(Ok::<_, AnyError>(data))); + break Poll::Ready(Some(Ok(data))); } } // Loop again so we don't lose the waker continue; } - Some(Err(e)) => Poll::Ready(Some(Err(e.into()))), + Some(Err(e)) => Poll::Ready(Some(Err(e))), None => Poll::Ready(None), }; } @@ -58,7 +58,7 @@ impl HttpRequestBody { Self(AsyncRefCell::new(ReadFuture(body).peekable()), size_hint) } - async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, AnyError> { + async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, hyper::Error> { let peekable = RcRef::map(self, |this| &this.0); let mut peekable = peekable.borrow_mut().await; match Pin::new(&mut *peekable).peek_mut().await { @@ -82,7 +82,7 @@ impl Resource for HttpRequestBody { } fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> { - Box::pin(HttpRequestBody::read(self, limit)) + Box::pin(HttpRequestBody::read(self, limit).map_err(Into::into)) } fn size_hint(&self) -> (u64, Option<u64>) { diff --git a/ext/http/request_properties.rs b/ext/http/request_properties.rs index 1422c7417..39d35a79f 100644 --- a/ext/http/request_properties.rs +++ b/ext/http/request_properties.rs @@ -34,8 +34,8 @@ pub struct HttpConnectionProperties { pub stream_type: NetworkStreamType, } -pub struct HttpRequestProperties { - pub authority: Option<String>, +pub struct HttpRequestProperties<'a> { + pub authority: Option<Cow<'a, str>>, } /// Pluggable trait to determine listen, connection and request properties @@ -84,11 +84,11 @@ pub trait HttpPropertyExtractor { ) -> NetworkStream; /// Determines the request properties. - fn request_properties( - connection_properties: &HttpConnectionProperties, - uri: &Uri, - headers: &HeaderMap, - ) -> HttpRequestProperties; + fn request_properties<'a>( + connection_properties: &'a HttpConnectionProperties, + uri: &'a Uri, + headers: &'a HeaderMap, + ) -> HttpRequestProperties<'a>; } pub struct DefaultHttpPropertyExtractor {} @@ -180,18 +180,17 @@ impl HttpPropertyExtractor for DefaultHttpPropertyExtractor { } } - fn request_properties( - connection_properties: &HttpConnectionProperties, - uri: &Uri, - headers: &HeaderMap, - ) -> HttpRequestProperties { + fn request_properties<'a>( + connection_properties: &'a HttpConnectionProperties, + uri: &'a Uri, + headers: &'a HeaderMap, + ) -> HttpRequestProperties<'a> { let authority = req_host( uri, headers, connection_properties.stream_type, connection_properties.local_port.unwrap_or_default(), - ) - .map(|s| s.into_owned()); + ); HttpRequestProperties { authority } } diff --git a/ext/http/service.rs b/ext/http/service.rs index 787e9babf..ce24dea43 100644 --- a/ext/http/service.rs +++ b/ext/http/service.rs @@ -2,7 +2,6 @@ use crate::request_properties::HttpConnectionProperties; use crate::response_body::ResponseBytesInner; use crate::response_body::ResponseStreamResult; -use deno_core::error::AnyError; use deno_core::futures::ready; use deno_core::BufView; use deno_core::OpState; @@ -28,6 +27,7 @@ use std::rc::Rc; use std::task::Context; use std::task::Poll; use std::task::Waker; +use tokio::sync::oneshot; pub type Request = hyper::Request<Incoming>; pub type Response = hyper::Response<HttpRecordResponse>; @@ -206,8 +206,13 @@ pub(crate) async fn handle_request( Ok(response) } +#[derive(Debug, thiserror::Error)] +#[error("upgrade unavailable")] +pub struct UpgradeUnavailableError; + struct HttpRecordInner { server_state: SignallingRc<HttpServerState>, + closed_channel: Option<oneshot::Sender<()>>, request_info: HttpConnectionProperties, request_parts: http::request::Parts, request_body: Option<RequestBodyState>, @@ -273,6 +278,7 @@ impl HttpRecord { response_body_finished: false, response_body_waker: None, trailers: None, + closed_channel: None, been_dropped: false, finished: false, needs_close_after_finish: false, @@ -309,6 +315,10 @@ impl HttpRecord { RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish) } + pub fn on_cancel(&self, sender: oneshot::Sender<()>) { + self.self_mut().closed_channel = Some(sender); + } + fn recycle(self: Rc<Self>) { assert!( Rc::strong_count(&self) == 1, @@ -344,14 +354,14 @@ impl HttpRecord { } /// Perform the Hyper upgrade on this record. - pub fn upgrade(&self) -> Result<OnUpgrade, AnyError> { + pub fn upgrade(&self) -> Result<OnUpgrade, UpgradeUnavailableError> { // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit self .self_mut() .request_parts .extensions .remove::<OnUpgrade>() - .ok_or_else(|| AnyError::msg("upgrade unavailable")) + .ok_or(UpgradeUnavailableError) } /// Take the Hyper body from this record. @@ -387,6 +397,9 @@ impl HttpRecord { inner.been_dropped = true; // The request body might include actual resources. inner.request_body.take(); + if let Some(closed_channel) = inner.closed_channel.take() { + let _ = closed_channel.send(()); + } } /// Complete this record, potentially expunging it if it is fully complete (ie: cancelled as well). @@ -515,7 +528,7 @@ pub struct HttpRecordResponse(ManuallyDrop<Rc<HttpRecord>>); impl Body for HttpRecordResponse { type Data = BufView; - type Error = AnyError; + type Error = deno_core::error::AnyError; fn poll_frame( self: Pin<&mut Self>, @@ -640,7 +653,7 @@ mod tests { } #[tokio::test] - async fn test_handle_request() -> Result<(), AnyError> { + async fn test_handle_request() -> Result<(), deno_core::error::AnyError> { let (tx, mut rx) = tokio::sync::mpsc::channel(10); let server_state = HttpServerState::new(); let server_state_check = server_state.clone(); diff --git a/ext/http/websocket_upgrade.rs b/ext/http/websocket_upgrade.rs index 4dead767a..af9504717 100644 --- a/ext/http/websocket_upgrade.rs +++ b/ext/http/websocket_upgrade.rs @@ -4,7 +4,6 @@ use std::marker::PhantomData; use bytes::Bytes; use bytes::BytesMut; -use deno_core::error::AnyError; use httparse::Status; use hyper::header::HeaderName; use hyper::header::HeaderValue; @@ -13,12 +12,30 @@ use memmem::Searcher; use memmem::TwoWaySearcher; use once_cell::sync::OnceCell; -use crate::http_error; +#[derive(Debug, thiserror::Error)] +pub enum WebSocketUpgradeError { + #[error("invalid headers")] + InvalidHeaders, + #[error("{0}")] + HttpParse(#[from] httparse::Error), + #[error("{0}")] + Http(#[from] http::Error), + #[error("{0}")] + Utf8(#[from] std::str::Utf8Error), + #[error("{0}")] + InvalidHeaderName(#[from] http::header::InvalidHeaderName), + #[error("{0}")] + InvalidHeaderValue(#[from] http::header::InvalidHeaderValue), + #[error("invalid HTTP status line")] + InvalidHttpStatusLine, + #[error("attempted to write to completed upgrade buffer")] + UpgradeBufferAlreadyCompleted, +} /// Given a buffer that ends in `\n\n` or `\r\n\r\n`, returns a parsed [`Request<Body>`]. fn parse_response<T: Default>( header_bytes: &[u8], -) -> Result<(usize, Response<T>), AnyError> { +) -> Result<(usize, Response<T>), WebSocketUpgradeError> { let mut headers = [httparse::EMPTY_HEADER; 16]; let status = httparse::parse_headers(header_bytes, &mut headers)?; match status { @@ -32,7 +49,7 @@ fn parse_response<T: Default>( } Ok((index, resp)) } - _ => Err(http_error("invalid headers")), + _ => Err(WebSocketUpgradeError::InvalidHeaders), } } @@ -69,11 +86,14 @@ pub struct WebSocketUpgrade<T: Default> { impl<T: Default> WebSocketUpgrade<T> { /// Ensures that the status line starts with "HTTP/1.1 101 " which matches all of the node.js /// WebSocket libraries that are known. We don't care about the trailing status text. - fn validate_status(&self, status: &[u8]) -> Result<(), AnyError> { + fn validate_status( + &self, + status: &[u8], + ) -> Result<(), WebSocketUpgradeError> { if status.starts_with(b"HTTP/1.1 101 ") { Ok(()) } else { - Err(http_error("invalid HTTP status line")) + Err(WebSocketUpgradeError::InvalidHttpStatusLine) } } @@ -82,7 +102,7 @@ impl<T: Default> WebSocketUpgrade<T> { pub fn write( &mut self, bytes: &[u8], - ) -> Result<Option<(Response<T>, Bytes)>, AnyError> { + ) -> Result<Option<(Response<T>, Bytes)>, WebSocketUpgradeError> { use WebSocketUpgradeState::*; match self.state { @@ -142,9 +162,7 @@ impl<T: Default> WebSocketUpgrade<T> { Ok(None) } } - Complete => { - Err(http_error("attempted to write to completed upgrade buffer")) - } + Complete => Err(WebSocketUpgradeError::UpgradeBufferAlreadyCompleted), } } } @@ -157,8 +175,8 @@ mod tests { type ExpectedResponseAndHead = Option<(Response<Body>, &'static [u8])>; fn assert_response( - result: Result<Option<(Response<Body>, Bytes)>, AnyError>, - expected: Result<ExpectedResponseAndHead, &'static str>, + result: Result<Option<(Response<Body>, Bytes)>, WebSocketUpgradeError>, + expected: Result<ExpectedResponseAndHead, WebSocketUpgradeError>, chunk_info: Option<(usize, usize)>, ) { let formatted = format!("{result:?}"); @@ -189,8 +207,8 @@ mod tests { "Expected Ok(None), was {formatted}", ), Err(e) => assert_eq!( - e, - result.err().map(|e| format!("{e:?}")).unwrap_or_default(), + format!("{e:?}"), + format!("{:?}", result.unwrap_err()), "Expected error, was {formatted}", ), } @@ -198,7 +216,7 @@ mod tests { fn validate_upgrade_all_at_once( s: &str, - expected: Result<ExpectedResponseAndHead, &'static str>, + expected: Result<ExpectedResponseAndHead, WebSocketUpgradeError>, ) { let mut upgrade = WebSocketUpgrade::default(); let res = upgrade.write(s.as_bytes()); @@ -209,7 +227,7 @@ mod tests { fn validate_upgrade_chunks( s: &str, size: usize, - expected: Result<ExpectedResponseAndHead, &'static str>, + expected: Result<ExpectedResponseAndHead, WebSocketUpgradeError>, ) { let chunk_info = Some((s.as_bytes().len(), size)); let mut upgrade = WebSocketUpgrade::default(); @@ -226,7 +244,7 @@ mod tests { fn validate_upgrade( s: &str, - expected: fn() -> Result<ExpectedResponseAndHead, &'static str>, + expected: fn() -> Result<ExpectedResponseAndHead, WebSocketUpgradeError>, ) { validate_upgrade_all_at_once(s, expected()); validate_upgrade_chunks(s, 1, expected()); @@ -315,7 +333,7 @@ mod tests { #[test] fn upgrade_invalid_status() { validate_upgrade("HTTP/1.1 200 OK\nConnection: Upgrade\n\n", || { - Err("invalid HTTP status line") + Err(WebSocketUpgradeError::InvalidHttpStatusLine) }); } @@ -327,7 +345,11 @@ mod tests { .join("\n"); validate_upgrade( &format!("HTTP/1.1 101 Switching Protocols\n{headers}\n\n"), - || Err("too many headers"), + || { + Err(WebSocketUpgradeError::HttpParse( + httparse::Error::TooManyHeaders, + )) + }, ); } } |