Diffstat (limited to 'ext/http')
-rw-r--r--   ext/http/00_serve.ts              158
-rw-r--r--   ext/http/Cargo.toml                 2
-rw-r--r--   ext/http/fly_accept_encoding.rs     2
-rw-r--r--   ext/http/http_next.rs             261
-rw-r--r--   ext/http/lib.rs                   184
-rw-r--r--   ext/http/request_body.rs           12
-rw-r--r--   ext/http/request_properties.rs     27
-rw-r--r--   ext/http/service.rs                23
-rw-r--r--   ext/http/websocket_upgrade.rs      60
9 files changed, 492 insertions, 237 deletions
diff --git a/ext/http/00_serve.ts b/ext/http/00_serve.ts
index 3b9b085a2..fcdb87d09 100644
--- a/ext/http/00_serve.ts
+++ b/ext/http/00_serve.ts
@@ -14,6 +14,7 @@ import {
op_http_get_request_headers,
op_http_get_request_method_and_url,
op_http_read_request_body,
+ op_http_request_on_cancel,
op_http_serve,
op_http_serve_on,
op_http_set_promise_complete,
@@ -41,6 +42,10 @@ const {
Uint8Array,
Promise,
} = primordials;
+const {
+ getAsyncContext,
+ setAsyncContext,
+} = core;
import { InnerBody } from "ext:deno_fetch/22_body.js";
import { Event } from "ext:deno_web/02_event.js";
@@ -76,7 +81,11 @@ import {
ReadableStreamPrototype,
resourceForReadableStream,
} from "ext:deno_web/06_streams.js";
-import { listen, listenOptionApiName, TcpConn } from "ext:deno_net/01_net.js";
+import {
+ listen,
+ listenOptionApiName,
+ UpgradedConn,
+} from "ext:deno_net/01_net.js";
import { hasTlsKeyPairOptions, listenTls } from "ext:deno_net/02_tls.js";
import { SymbolAsyncDispose } from "ext:deno_web/00_infra.js";
@@ -189,7 +198,7 @@ class InnerRequest {
const upgradeRid = op_http_upgrade_raw(external);
- const conn = new TcpConn(
+ const conn = new UpgradedConn(
upgradeRid,
underlyingConn?.remoteAddr,
underlyingConn?.localAddr,
@@ -369,6 +378,18 @@ class InnerRequest {
get external() {
return this.#external;
}
+
+ onCancel(callback) {
+ if (this.#external === null) {
+ callback();
+ return;
+ }
+
+ PromisePrototypeThen(
+ op_http_request_on_cancel(this.#external),
+ callback,
+ );
+ }
}
class CallbackContext {
@@ -380,8 +401,10 @@ class CallbackContext {
/** @type {Promise<void> | undefined} */
closing;
listener;
+ asyncContext;
constructor(signal, args, listener) {
+ this.asyncContext = getAsyncContext();
// The abort signal triggers a non-graceful shutdown
signal?.addEventListener(
"abort",
@@ -491,82 +514,89 @@ function fastSyncResponseOrStream(
*/
function mapToCallback(context, callback, onError) {
return async function (req) {
- // Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback
- // 500 error.
- let innerRequest;
- let response;
- try {
- innerRequest = new InnerRequest(req, context);
- const request = fromInnerRequest(innerRequest, "immutable");
- innerRequest.request = request;
- response = await callback(
- request,
- new ServeHandlerInfo(innerRequest),
- );
-
- // Throwing Error if the handler return value is not a Response class
- if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) {
- throw new TypeError(
- "Return value from serve handler must be a response or a promise resolving to a response",
- );
- }
+ const asyncContext = getAsyncContext();
+ setAsyncContext(context.asyncContext);
- if (response.type === "error") {
- throw new TypeError(
- "Return value from serve handler must not be an error response (like Response.error())",
+ try {
+ // Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback
+ // 500 error.
+ let innerRequest;
+ let response;
+ try {
+ innerRequest = new InnerRequest(req, context);
+ const request = fromInnerRequest(innerRequest, "immutable");
+ innerRequest.request = request;
+ response = await callback(
+ request,
+ new ServeHandlerInfo(innerRequest),
);
- }
- if (response.bodyUsed) {
- throw new TypeError(
- "The body of the Response returned from the serve handler has already been consumed",
- );
- }
- } catch (error) {
- try {
- response = await onError(error);
+ // Throwing Error if the handler return value is not a Response class
if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) {
throw new TypeError(
- "Return value from onError handler must be a response or a promise resolving to a response",
+ "Return value from serve handler must be a response or a promise resolving to a response",
+ );
+ }
+
+ if (response.type === "error") {
+ throw new TypeError(
+ "Return value from serve handler must not be an error response (like Response.error())",
+ );
+ }
+
+ if (response.bodyUsed) {
+ throw new TypeError(
+ "The body of the Response returned from the serve handler has already been consumed",
);
}
} catch (error) {
- // deno-lint-ignore no-console
- console.error("Exception in onError while handling exception", error);
- response = internalServerError();
+ try {
+ response = await onError(error);
+ if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) {
+ throw new TypeError(
+ "Return value from onError handler must be a response or a promise resolving to a response",
+ );
+ }
+ } catch (error) {
+ // deno-lint-ignore no-console
+ console.error("Exception in onError while handling exception", error);
+ response = internalServerError();
+ }
}
- }
- const inner = toInnerResponse(response);
- if (innerRequest?.[_upgraded]) {
- // We're done here as the connection has been upgraded during the callback and no longer requires servicing.
- if (response !== UPGRADE_RESPONSE_SENTINEL) {
- // deno-lint-ignore no-console
- console.error("Upgrade response was not returned from callback");
- context.close();
+ const inner = toInnerResponse(response);
+ if (innerRequest?.[_upgraded]) {
+ // We're done here as the connection has been upgraded during the callback and no longer requires servicing.
+ if (response !== UPGRADE_RESPONSE_SENTINEL) {
+ // deno-lint-ignore no-console
+ console.error("Upgrade response was not returned from callback");
+ context.close();
+ }
+ innerRequest?.[_upgraded]();
+ return;
}
- innerRequest?.[_upgraded]();
- return;
- }
- // Did everything shut down while we were waiting?
- if (context.closed) {
- // We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate
- innerRequest?.close();
- op_http_set_promise_complete(req, 503);
- return;
- }
+ // Did everything shut down while we were waiting?
+ if (context.closed) {
+ // We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate
+ innerRequest?.close();
+ op_http_set_promise_complete(req, 503);
+ return;
+ }
- const status = inner.status;
- const headers = inner.headerList;
- if (headers && headers.length > 0) {
- if (headers.length == 1) {
- op_http_set_response_header(req, headers[0][0], headers[0][1]);
- } else {
- op_http_set_response_headers(req, headers);
+ const status = inner.status;
+ const headers = inner.headerList;
+ if (headers && headers.length > 0) {
+ if (headers.length == 1) {
+ op_http_set_response_header(req, headers[0][0], headers[0][1]);
+ } else {
+ op_http_set_response_headers(req, headers);
+ }
}
- }
- fastSyncResponseOrStream(req, inner.body, status, innerRequest);
+ fastSyncResponseOrStream(req, inner.body, status, innerRequest);
+ } finally {
+ setAsyncContext(asyncContext);
+ }
};
}
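
The `00_serve.ts` changes above do two things: `CallbackContext` now captures the async context active when `serve()` is called and `mapToCallback` restores it around every handler invocation, and `InnerRequest` gains an `onCancel` hook backed by the new `op_http_request_on_cancel` op. A minimal sketch of what the async-context plumbing enables from user code, assuming `AsyncLocalStorage` from `node:async_hooks`; the snippet is illustrative and not part of this diff:

```ts
// Sketch (not from this diff): because the handler now runs in the async
// context captured at Deno.serve() call time, an AsyncLocalStorage store
// entered around the serve() call stays visible inside every request handler.
import { AsyncLocalStorage } from "node:async_hooks";

const als = new AsyncLocalStorage<{ appName: string }>();

als.run({ appName: "demo" }, () => {
  Deno.serve({ port: 8080 }, (_req) => {
    // Without the getAsyncContext/setAsyncContext wrapping above, the handler
    // could observe whatever context the internal op callback ran in.
    const store = als.getStore();
    return new Response(`hello from ${store?.appName ?? "unknown"}`);
  });
});
```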
diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index b7637bec3..ed98fe349 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -2,7 +2,7 @@
[package]
name = "deno_http"
-version = "0.169.0"
+version = "0.175.0"
authors.workspace = true
edition.workspace = true
license.workspace = true
diff --git a/ext/http/fly_accept_encoding.rs b/ext/http/fly_accept_encoding.rs
index 94e336876..4d6fd2231 100644
--- a/ext/http/fly_accept_encoding.rs
+++ b/ext/http/fly_accept_encoding.rs
@@ -119,7 +119,7 @@ fn encodings_iter_inner<'s>(
};
Some(Ok((encoding, qval)))
})
- .map(|r| r?) // flatten Result<Result<...
+ .flatten()
}
#[cfg(test)]
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index efe1b88c9..7dbac6021 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -18,8 +18,8 @@ use crate::service::HttpServerState;
use crate::service::SignallingRc;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
+use crate::Options;
use cache_control::CacheControl;
-use deno_core::error::AnyError;
use deno_core::external;
use deno_core::futures::future::poll_fn;
use deno_core::futures::TryFutureExt;
@@ -146,12 +146,32 @@ macro_rules! clone_external {
}};
}
+#[derive(Debug, thiserror::Error)]
+pub enum HttpNextError {
+ #[error(transparent)]
+ Resource(deno_core::error::AnyError),
+ #[error("{0}")]
+ Io(#[from] io::Error),
+ #[error(transparent)]
+ WebSocketUpgrade(crate::websocket_upgrade::WebSocketUpgradeError),
+ #[error("{0}")]
+ Hyper(#[from] hyper::Error),
+ #[error(transparent)]
+ JoinError(#[from] tokio::task::JoinError),
+ #[error(transparent)]
+ Canceled(#[from] deno_core::Canceled),
+ #[error(transparent)]
+ HttpPropertyExtractor(deno_core::error::AnyError),
+ #[error(transparent)]
+ UpgradeUnavailable(#[from] crate::service::UpgradeUnavailableError),
+}
+
#[op2(fast)]
#[smi]
pub fn op_http_upgrade_raw(
state: &mut OpState,
external: *const c_void,
-) -> Result<ResourceId, AnyError> {
+) -> Result<ResourceId, HttpNextError> {
// SAFETY: external is deleted before calling this op.
let http = unsafe { take_external!(external, "op_http_upgrade_raw") };
@@ -177,7 +197,7 @@ pub fn op_http_upgrade_raw(
upgraded.write_all(&bytes).await?;
break upgraded;
}
- Err(err) => return Err(err),
+ Err(err) => return Err(HttpNextError::WebSocketUpgrade(err)),
}
};
@@ -193,7 +213,7 @@ pub fn op_http_upgrade_raw(
}
read_tx.write_all(&buf[..read]).await?;
}
- Ok::<_, AnyError>(())
+ Ok::<_, HttpNextError>(())
});
spawn(async move {
let mut buf = [0; 1024];
@@ -204,7 +224,7 @@ pub fn op_http_upgrade_raw(
}
upgraded_tx.write_all(&buf[..read]).await?;
}
- Ok::<_, AnyError>(())
+ Ok::<_, HttpNextError>(())
});
Ok(())
@@ -223,7 +243,7 @@ pub async fn op_http_upgrade_websocket_next(
state: Rc<RefCell<OpState>>,
external: *const c_void,
#[serde] headers: Vec<(ByteString, ByteString)>,
-) -> Result<ResourceId, AnyError> {
+) -> Result<ResourceId, HttpNextError> {
let http =
// SAFETY: external is deleted before calling this op.
unsafe { take_external!(external, "op_http_upgrade_websocket_next") };
@@ -246,7 +266,11 @@ pub async fn op_http_upgrade_websocket_next(
// Stage 3: take the extracted raw network stream and upgrade it to a websocket, then return it
let (stream, bytes) = extract_network_stream(upgraded);
- ws_create_server_stream(&mut state.borrow_mut(), stream, bytes)
+ Ok(ws_create_server_stream(
+ &mut state.borrow_mut(),
+ stream,
+ bytes,
+ ))
}
#[op2(fast)]
@@ -296,7 +320,7 @@ where
let authority: v8::Local<v8::Value> = match request_properties.authority {
Some(authority) => v8::String::new_from_utf8(
scope,
- authority.as_ref(),
+ authority.as_bytes(),
v8::NewStringType::Normal,
)
.unwrap()
@@ -305,15 +329,25 @@ where
};
// Only extract the path part - we handle authority elsewhere
- let path = match &request_parts.uri.path_and_query() {
- Some(path_and_query) => path_and_query.to_string(),
- None => "".to_owned(),
+ let path = match request_parts.uri.path_and_query() {
+ Some(path_and_query) => {
+ let path = path_and_query.as_str();
+ if matches!(path.as_bytes().first(), Some(b'/' | b'*')) {
+ Cow::Borrowed(path)
+ } else {
+ Cow::Owned(format!("/{}", path))
+ }
+ }
+ None => Cow::Borrowed(""),
};
- let path: v8::Local<v8::Value> =
- v8::String::new_from_utf8(scope, path.as_ref(), v8::NewStringType::Normal)
- .unwrap()
- .into();
+ let path: v8::Local<v8::Value> = v8::String::new_from_utf8(
+ scope,
+ path.as_bytes(),
+ v8::NewStringType::Normal,
+ )
+ .unwrap()
+ .into();
let peer_address: v8::Local<v8::Value> = v8::String::new_from_utf8(
scope,
@@ -531,6 +565,7 @@ fn is_request_compressible(
match accept_encoding.to_str() {
// Firefox and Chrome send this -- no need to parse
Ok("gzip, deflate, br") => return Compression::Brotli,
+ Ok("gzip, deflate, br, zstd") => return Compression::Brotli,
Ok("gzip") => return Compression::GZip,
Ok("br") => return Compression::Brotli,
_ => (),
@@ -667,6 +702,27 @@ fn set_response(
http.complete();
}
+#[op2(fast)]
+pub fn op_http_get_request_cancelled(external: *const c_void) -> bool {
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_get_request_cancelled") };
+ http.cancelled()
+}
+
+#[op2(async)]
+pub async fn op_http_request_on_cancel(external: *const c_void) {
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_request_on_cancel") };
+ let (tx, rx) = tokio::sync::oneshot::channel();
+
+ http.on_cancel(tx);
+ drop(http);
+
+ rx.await.ok();
+}
+
/// Returned promise resolves when body streaming finishes.
/// Call [`op_http_close_after_finish`] when done with the external.
#[op2(async)]
@@ -676,7 +732,7 @@ pub async fn op_http_set_response_body_resource(
#[smi] stream_rid: ResourceId,
auto_close: bool,
status: u16,
-) -> Result<bool, AnyError> {
+) -> Result<bool, HttpNextError> {
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_set_response_body_resource") };
@@ -691,9 +747,15 @@ pub async fn op_http_set_response_body_resource(
let resource = {
let mut state = state.borrow_mut();
if auto_close {
- state.resource_table.take_any(stream_rid)?
+ state
+ .resource_table
+ .take_any(stream_rid)
+ .map_err(HttpNextError::Resource)?
} else {
- state.resource_table.get_any(stream_rid)?
+ state
+ .resource_table
+ .get_any(stream_rid)
+ .map_err(HttpNextError::Resource)?
}
};
@@ -760,10 +822,16 @@ fn serve_http11_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
+ http1_builder_hook: Option<fn(http1::Builder) -> http1::Builder>,
) -> impl Future<Output = Result<(), hyper::Error>> + 'static {
- let conn = http1::Builder::new()
- .keep_alive(true)
- .writev(*USE_WRITEV)
+ let mut builder = http1::Builder::new();
+ builder.keep_alive(true).writev(*USE_WRITEV);
+
+ if let Some(http1_builder_hook) = http1_builder_hook {
+ builder = http1_builder_hook(builder);
+ }
+
+ let conn = builder
.serve_connection(TokioIo::new(io), svc)
.with_upgrades();
@@ -782,9 +850,17 @@ fn serve_http2_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
+ http2_builder_hook: Option<
+ fn(http2::Builder<LocalExecutor>) -> http2::Builder<LocalExecutor>,
+ >,
) -> impl Future<Output = Result<(), hyper::Error>> + 'static {
- let conn =
- http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc);
+ let mut builder = http2::Builder::new(LocalExecutor);
+
+ if let Some(http2_builder_hook) = http2_builder_hook {
+ builder = http2_builder_hook(builder);
+ }
+
+ let conn = builder.serve_connection(TokioIo::new(io), svc);
async {
match conn.or_abort(cancel).await {
Err(mut conn) => {
@@ -800,17 +876,18 @@ async fn serve_http2_autodetect(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
-) -> Result<(), AnyError> {
+ options: Options,
+) -> Result<(), HttpNextError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
let (matches, io) = prefix.match_prefix().await?;
if matches {
- serve_http2_unconditional(io, svc, cancel)
+ serve_http2_unconditional(io, svc, cancel, options.http2_builder_hook)
.await
- .map_err(|e| e.into())
+ .map_err(HttpNextError::Hyper)
} else {
- serve_http11_unconditional(io, svc, cancel)
+ serve_http11_unconditional(io, svc, cancel, options.http1_builder_hook)
.await
- .map_err(|e| e.into())
+ .map_err(HttpNextError::Hyper)
}
}
@@ -819,7 +896,8 @@ fn serve_https(
request_info: HttpConnectionProperties,
lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
-) -> JoinHandle<Result<(), AnyError>> {
+ options: Options,
+) -> JoinHandle<Result<(), HttpNextError>> {
let HttpLifetime {
server_state,
connection_cancel_handle,
@@ -830,21 +908,31 @@ fn serve_https(
handle_request(req, request_info.clone(), server_state.clone(), tx.clone())
});
spawn(
- async {
+ async move {
let handshake = io.handshake().await?;
// If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect
// based on the prefix bytes
let handshake = handshake.alpn;
if Some(TLS_ALPN_HTTP_2) == handshake.as_deref() {
- serve_http2_unconditional(io, svc, listen_cancel_handle)
- .await
- .map_err(|e| e.into())
+ serve_http2_unconditional(
+ io,
+ svc,
+ listen_cancel_handle,
+ options.http2_builder_hook,
+ )
+ .await
+ .map_err(HttpNextError::Hyper)
} else if Some(TLS_ALPN_HTTP_11) == handshake.as_deref() {
- serve_http11_unconditional(io, svc, listen_cancel_handle)
- .await
- .map_err(|e| e.into())
+ serve_http11_unconditional(
+ io,
+ svc,
+ listen_cancel_handle,
+ options.http1_builder_hook,
+ )
+ .await
+ .map_err(HttpNextError::Hyper)
} else {
- serve_http2_autodetect(io, svc, listen_cancel_handle).await
+ serve_http2_autodetect(io, svc, listen_cancel_handle, options).await
}
}
.try_or_cancel(connection_cancel_handle),
@@ -856,7 +944,8 @@ fn serve_http(
request_info: HttpConnectionProperties,
lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
-) -> JoinHandle<Result<(), AnyError>> {
+ options: Options,
+) -> JoinHandle<Result<(), HttpNextError>> {
let HttpLifetime {
server_state,
connection_cancel_handle,
@@ -867,7 +956,7 @@ fn serve_http(
handle_request(req, request_info.clone(), server_state.clone(), tx.clone())
});
spawn(
- serve_http2_autodetect(io, svc, listen_cancel_handle)
+ serve_http2_autodetect(io, svc, listen_cancel_handle, options)
.try_or_cancel(connection_cancel_handle),
)
}
@@ -877,7 +966,8 @@ fn serve_http_on<HTTP>(
listen_properties: &HttpListenProperties,
lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
-) -> JoinHandle<Result<(), AnyError>>
+ options: Options,
+) -> JoinHandle<Result<(), HttpNextError>>
where
HTTP: HttpPropertyExtractor,
{
@@ -888,14 +978,14 @@ where
match network_stream {
NetworkStream::Tcp(conn) => {
- serve_http(conn, connection_properties, lifetime, tx)
+ serve_http(conn, connection_properties, lifetime, tx, options)
}
NetworkStream::Tls(conn) => {
- serve_https(conn, connection_properties, lifetime, tx)
+ serve_https(conn, connection_properties, lifetime, tx, options)
}
#[cfg(unix)]
NetworkStream::Unix(conn) => {
- serve_http(conn, connection_properties, lifetime, tx)
+ serve_http(conn, connection_properties, lifetime, tx, options)
}
}
}
@@ -908,7 +998,7 @@ struct HttpLifetime {
}
struct HttpJoinHandle {
- join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
+ join_handle: AsyncRefCell<Option<JoinHandle<Result<(), HttpNextError>>>>,
connection_cancel_handle: Rc<CancelHandle>,
listen_cancel_handle: Rc<CancelHandle>,
rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>,
@@ -968,12 +1058,13 @@ impl Drop for HttpJoinHandle {
pub fn op_http_serve<HTTP>(
state: Rc<RefCell<OpState>>,
#[smi] listener_rid: ResourceId,
-) -> Result<(ResourceId, &'static str, String), AnyError>
+) -> Result<(ResourceId, &'static str, String), HttpNextError>
where
HTTP: HttpPropertyExtractor,
{
let listener =
- HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?;
+ HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)
+ .map_err(HttpNextError::Resource)?;
let listen_properties = HTTP::listen_properties_from_listener(&listener)?;
@@ -983,21 +1074,28 @@ where
let lifetime = resource.lifetime();
+ let options = {
+ let state = state.borrow();
+ *state.borrow::<Options>()
+ };
+
let listen_properties_clone: HttpListenProperties = listen_properties.clone();
let handle = spawn(async move {
loop {
let conn = HTTP::accept_connection_from_listener(&listener)
.try_or_cancel(listen_cancel_clone.clone())
- .await?;
+ .await
+ .map_err(HttpNextError::HttpPropertyExtractor)?;
serve_http_on::<HTTP>(
conn,
&listen_properties_clone,
lifetime.clone(),
tx.clone(),
+ options,
);
}
#[allow(unreachable_code)]
- Ok::<_, AnyError>(())
+ Ok::<_, HttpNextError>(())
});
// Set the handle after we start the future
@@ -1017,25 +1115,31 @@ where
pub fn op_http_serve_on<HTTP>(
state: Rc<RefCell<OpState>>,
#[smi] connection_rid: ResourceId,
-) -> Result<(ResourceId, &'static str, String), AnyError>
+) -> Result<(ResourceId, &'static str, String), HttpNextError>
where
HTTP: HttpPropertyExtractor,
{
let connection =
- HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?;
+ HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)
+ .map_err(HttpNextError::Resource)?;
let listen_properties = HTTP::listen_properties_from_connection(&connection)?;
let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
- let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> =
- serve_http_on::<HTTP>(
- connection,
- &listen_properties,
- resource.lifetime(),
- tx,
- );
+ let options = {
+ let state = state.borrow();
+ *state.borrow::<Options>()
+ };
+
+ let handle = serve_http_on::<HTTP>(
+ connection,
+ &listen_properties,
+ resource.lifetime(),
+ tx,
+ options,
+ );
// Set the handle after we start the future
*RcRef::map(&resource, |this| &this.join_handle)
@@ -1081,12 +1185,13 @@ pub fn op_http_try_wait(
pub async fn op_http_wait(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<*const c_void, AnyError> {
+) -> Result<*const c_void, HttpNextError> {
// We will get the join handle initially, as we might be consuming requests still
let join_handle = state
.borrow_mut()
.resource_table
- .get::<HttpJoinHandle>(rid)?;
+ .get::<HttpJoinHandle>(rid)
+ .map_err(HttpNextError::Resource)?;
let cancel = join_handle.listen_cancel_handle();
let next = async {
@@ -1113,13 +1218,12 @@ pub async fn op_http_wait(
// Filter out shutdown (ENOTCONN) errors
if let Err(err) = res {
- if let Some(err) = err.source() {
- if let Some(err) = err.downcast_ref::<io::Error>() {
- if err.kind() == io::ErrorKind::NotConnected {
- return Ok(null());
- }
+ if let HttpNextError::Io(err) = &err {
+ if err.kind() == io::ErrorKind::NotConnected {
+ return Ok(null());
}
}
+
return Err(err);
}
@@ -1132,7 +1236,7 @@ pub fn op_http_cancel(
state: &mut OpState,
#[smi] rid: ResourceId,
graceful: bool,
-) -> Result<(), AnyError> {
+) -> Result<(), deno_core::error::AnyError> {
let join_handle = state.resource_table.get::<HttpJoinHandle>(rid)?;
if graceful {
@@ -1152,11 +1256,12 @@ pub async fn op_http_close(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
graceful: bool,
-) -> Result<(), AnyError> {
+) -> Result<(), HttpNextError> {
let join_handle = state
.borrow_mut()
.resource_table
- .take::<HttpJoinHandle>(rid)?;
+ .take::<HttpJoinHandle>(rid)
+ .map_err(HttpNextError::Resource)?;
if graceful {
http_general_trace!("graceful shutdown");
@@ -1202,23 +1307,26 @@ impl UpgradeStream {
}
}
- async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
+ async fn read(
+ self: Rc<Self>,
+ buf: &mut [u8],
+ ) -> Result<usize, std::io::Error> {
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
async {
let read = RcRef::map(self, |this| &this.read);
let mut read = read.borrow_mut().await;
- Ok(Pin::new(&mut *read).read(buf).await?)
+ Pin::new(&mut *read).read(buf).await
}
.try_or_cancel(cancel_handle)
.await
}
- async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
+ async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, std::io::Error> {
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
async {
let write = RcRef::map(self, |this| &this.write);
let mut write = write.borrow_mut().await;
- Ok(Pin::new(&mut *write).write(buf).await?)
+ Pin::new(&mut *write).write(buf).await
}
.try_or_cancel(cancel_handle)
.await
@@ -1228,7 +1336,7 @@ impl UpgradeStream {
self: Rc<Self>,
buf1: &[u8],
buf2: &[u8],
- ) -> Result<usize, AnyError> {
+ ) -> Result<usize, std::io::Error> {
let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await;
let total = buf1.len() + buf2.len();
@@ -1281,9 +1389,12 @@ pub async fn op_raw_write_vectored(
#[smi] rid: ResourceId,
#[buffer] buf1: JsBuffer,
#[buffer] buf2: JsBuffer,
-) -> Result<usize, AnyError> {
- let resource: Rc<UpgradeStream> =
- state.borrow().resource_table.get::<UpgradeStream>(rid)?;
+) -> Result<usize, HttpNextError> {
+ let resource: Rc<UpgradeStream> = state
+ .borrow()
+ .resource_table
+ .get::<UpgradeStream>(rid)
+ .map_err(HttpNextError::Resource)?;
let nwritten = resource.write_vectored(&buf1, &buf2).await?;
Ok(nwritten)
}
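
On the Rust side, the new `op_http_request_on_cancel` registers a `tokio::sync::oneshot::Sender` with the record (see `HttpRecord::on_cancel` in the `service.rs` hunks further down) and resolves once the receiving end fires. A standalone sketch of that oneshot notification pattern, with illustrative names rather than the real `HttpRecord` internals:

```rust
// Standalone sketch of the oneshot-based cancel notification used by
// op_http_request_on_cancel / HttpRecord::on_cancel (names are illustrative).
use tokio::sync::oneshot;

struct Record {
    closed_channel: Option<oneshot::Sender<()>>,
}

impl Record {
    fn on_cancel(&mut self, tx: oneshot::Sender<()>) {
        // Store the sender; completing or dropping the record fires it.
        self.closed_channel = Some(tx);
    }
    fn complete(&mut self) {
        if let Some(tx) = self.closed_channel.take() {
            let _ = tx.send(()); // ignore a receiver that already went away
        }
    }
}

#[tokio::main]
async fn main() {
    let mut record = Record { closed_channel: None };
    let (tx, rx) = oneshot::channel();
    record.on_cancel(tx);

    // Simulate the client disconnecting / the record being torn down.
    record.complete();

    // The awaiting side (the op) resolves; send/recv ordering errors are ignored.
    rx.await.ok();
    println!("request was cancelled");
}
```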
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 934f8a002..39b0bbc2a 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -6,8 +6,6 @@ use async_compression::Level;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use cache_control::CacheControl;
-use deno_core::error::custom_error;
-use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::channel::oneshot;
use deno_core::futures::future::pending;
@@ -41,6 +39,8 @@ use deno_net::raw::NetworkStream;
use deno_websocket::ws_create_server_stream;
use flate2::write::GzEncoder;
use flate2::Compression;
+use hyper::server::conn::http1;
+use hyper::server::conn::http2;
use hyper_util::rt::TokioIo;
use hyper_v014::body::Bytes;
use hyper_v014::body::HttpBody;
@@ -89,11 +89,33 @@ mod service;
mod websocket_upgrade;
use fly_accept_encoding::Encoding;
+pub use http_next::HttpNextError;
pub use request_properties::DefaultHttpPropertyExtractor;
pub use request_properties::HttpConnectionProperties;
pub use request_properties::HttpListenProperties;
pub use request_properties::HttpPropertyExtractor;
pub use request_properties::HttpRequestProperties;
+pub use service::UpgradeUnavailableError;
+pub use websocket_upgrade::WebSocketUpgradeError;
+
+#[derive(Debug, Default, Clone, Copy)]
+pub struct Options {
+ /// By passing a hook function, the caller can customize various configuration
+ /// options for the HTTP/2 server.
+ /// See [`http2::Builder`] for what parameters can be customized.
+ ///
+ /// If `None`, the default configuration provided by hyper will be used. Note
+ /// that the default configuration is subject to change in future versions.
+ pub http2_builder_hook:
+ Option<fn(http2::Builder<LocalExecutor>) -> http2::Builder<LocalExecutor>>,
+ /// By passing a hook function, the caller can customize various configuration
+ /// options for the HTTP/1 server.
+ /// See [`http1::Builder`] for what parameters can be customized.
+ ///
+ /// If `None`, the default configuration provided by hyper will be used. Note
+ /// that the default configuration is subject to change in future versions.
+ pub http1_builder_hook: Option<fn(http1::Builder) -> http1::Builder>,
+}
deno_core::extension!(
deno_http,
@@ -111,7 +133,9 @@ deno_core::extension!(
http_next::op_http_close_after_finish,
http_next::op_http_get_request_header,
http_next::op_http_get_request_headers,
+ http_next::op_http_request_on_cancel,
http_next::op_http_get_request_method_and_url<HTTP>,
+ http_next::op_http_get_request_cancelled,
http_next::op_http_read_request_body,
http_next::op_http_serve_on<HTTP>,
http_next::op_http_serve<HTTP>,
@@ -132,8 +156,46 @@ deno_core::extension!(
http_next::op_http_cancel,
],
esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"],
+ options = {
+ options: Options,
+ },
+ state = |state, options| {
+ state.put::<Options>(options.options);
+ }
);
+#[derive(Debug, thiserror::Error)]
+pub enum HttpError {
+ #[error(transparent)]
+ Resource(deno_core::error::AnyError),
+ #[error(transparent)]
+ Canceled(#[from] deno_core::Canceled),
+ #[error("{0}")]
+ HyperV014(#[source] Arc<hyper_v014::Error>),
+ #[error("{0}")]
+ InvalidHeaderName(#[from] hyper_v014::header::InvalidHeaderName),
+ #[error("{0}")]
+ InvalidHeaderValue(#[from] hyper_v014::header::InvalidHeaderValue),
+ #[error("{0}")]
+ Http(#[from] hyper_v014::http::Error),
+ #[error("response headers already sent")]
+ ResponseHeadersAlreadySent,
+ #[error("connection closed while sending response")]
+ ConnectionClosedWhileSendingResponse,
+ #[error("already in use")]
+ AlreadyInUse,
+ #[error("{0}")]
+ Io(#[from] std::io::Error),
+ #[error("no response headers")]
+ NoResponseHeaders,
+ #[error("response already completed")]
+ ResponseAlreadyCompleted,
+ #[error("cannot upgrade because request body was used")]
+ UpgradeBodyUsed,
+ #[error(transparent)]
+ Other(deno_core::error::AnyError),
+}
+
pub enum HttpSocketAddr {
IpSocket(std::net::SocketAddr),
#[cfg(unix)]
@@ -216,7 +278,7 @@ impl HttpConnResource {
String,
String,
)>,
- AnyError,
+ HttpError,
> {
let fut = async {
let (request_tx, request_rx) = oneshot::channel();
@@ -259,8 +321,8 @@ impl HttpConnResource {
}
/// A future that completes when this HTTP connection is closed or errors.
- async fn closed(&self) -> Result<(), AnyError> {
- self.closed_fut.clone().map_err(AnyError::from).await
+ async fn closed(&self) -> Result<(), HttpError> {
+ self.closed_fut.clone().map_err(HttpError::HyperV014).await
}
}
@@ -280,14 +342,13 @@ pub fn http_create_conn_resource<S, A>(
io: S,
addr: A,
scheme: &'static str,
-) -> Result<ResourceId, AnyError>
+) -> ResourceId
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
A: Into<HttpSocketAddr>,
{
let conn = HttpConnResource::new(io, scheme, addr.into());
- let rid = state.resource_table.add(conn);
- Ok(rid)
+ state.resource_table.add(conn)
}
/// An object that implements the `hyper::Service` trait, through which Hyper
@@ -423,7 +484,9 @@ impl Resource for HttpStreamReadResource {
// safely call `await` on it without creating a race condition.
Some(_) => match body.as_mut().next().await.unwrap() {
Ok(chunk) => assert!(chunk.is_empty()),
- Err(err) => break Err(AnyError::from(err)),
+ Err(err) => {
+ break Err(HttpError::HyperV014(Arc::new(err)).into())
+ }
},
None => break Ok(BufView::empty()),
}
@@ -545,8 +608,12 @@ struct NextRequestResponse(
async fn op_http_accept(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<Option<NextRequestResponse>, AnyError> {
- let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?;
+) -> Result<Option<NextRequestResponse>, HttpError> {
+ let conn = state
+ .borrow()
+ .resource_table
+ .get::<HttpConnResource>(rid)
+ .map_err(HttpError::Resource)?;
match conn.accept().await {
Ok(Some((read_stream, write_stream, method, url))) => {
@@ -657,11 +724,12 @@ async fn op_http_write_headers(
#[smi] status: u16,
#[serde] headers: Vec<(ByteString, ByteString)>,
#[serde] data: Option<StringOrBuffer>,
-) -> Result<(), AnyError> {
+) -> Result<(), HttpError> {
let stream = state
.borrow_mut()
.resource_table
- .get::<HttpStreamWriteResource>(rid)?;
+ .get::<HttpStreamWriteResource>(rid)
+ .map_err(HttpError::Resource)?;
// Track supported encoding
let encoding = stream.accept_encoding;
@@ -708,14 +776,14 @@ async fn op_http_write_headers(
let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
let response_tx = match replace(&mut *old_wr, new_wr) {
HttpResponseWriter::Headers(response_tx) => response_tx,
- _ => return Err(http_error("response headers already sent")),
+ _ => return Err(HttpError::ResponseHeadersAlreadySent),
};
match response_tx.send(body) {
Ok(_) => Ok(()),
Err(_) => {
stream.conn.closed().await?;
- Err(http_error("connection closed while sending response"))
+ Err(HttpError::ConnectionClosedWhileSendingResponse)
}
}
}
@@ -725,11 +793,14 @@ async fn op_http_write_headers(
fn op_http_headers(
state: &mut OpState,
#[smi] rid: u32,
-) -> Result<Vec<(ByteString, ByteString)>, AnyError> {
- let stream = state.resource_table.get::<HttpStreamReadResource>(rid)?;
+) -> Result<Vec<(ByteString, ByteString)>, HttpError> {
+ let stream = state
+ .resource_table
+ .get::<HttpStreamReadResource>(rid)
+ .map_err(HttpError::Resource)?;
let rd = RcRef::map(&stream, |r| &r.rd)
.try_borrow()
- .ok_or_else(|| http_error("already in use"))?;
+ .ok_or(HttpError::AlreadyInUse)?;
match &*rd {
HttpRequestReader::Headers(request) => Ok(req_headers(request.headers())),
HttpRequestReader::Body(headers, _) => Ok(req_headers(headers)),
@@ -741,7 +812,7 @@ fn http_response(
data: Option<StringOrBuffer>,
compressing: bool,
encoding: Encoding,
-) -> Result<(HttpResponseWriter, hyper_v014::Body), AnyError> {
+) -> Result<(HttpResponseWriter, hyper_v014::Body), HttpError> {
// Gzip, after level 1, doesn't produce significant size difference.
// This default matches nginx default gzip compression level (1):
// https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level
@@ -878,25 +949,34 @@ async fn op_http_write_resource(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
#[smi] stream: ResourceId,
-) -> Result<(), AnyError> {
+) -> Result<(), HttpError> {
let http_stream = state
.borrow()
.resource_table
- .get::<HttpStreamWriteResource>(rid)?;
+ .get::<HttpStreamWriteResource>(rid)
+ .map_err(HttpError::Resource)?;
let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
- let resource = state.borrow().resource_table.get_any(stream)?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get_any(stream)
+ .map_err(HttpError::Resource)?;
loop {
match *wr {
HttpResponseWriter::Headers(_) => {
- return Err(http_error("no response headers"))
+ return Err(HttpError::NoResponseHeaders)
}
HttpResponseWriter::Closed => {
- return Err(http_error("response already completed"))
+ return Err(HttpError::ResponseAlreadyCompleted)
}
_ => {}
};
- let view = resource.clone().read(64 * 1024).await?; // 64KB
+ let view = resource
+ .clone()
+ .read(64 * 1024)
+ .await
+ .map_err(HttpError::Other)?; // 64KB
if view.is_empty() {
break;
}
@@ -937,16 +1017,17 @@ async fn op_http_write(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
#[buffer] buf: JsBuffer,
-) -> Result<(), AnyError> {
+) -> Result<(), HttpError> {
let stream = state
.borrow()
.resource_table
- .get::<HttpStreamWriteResource>(rid)?;
+ .get::<HttpStreamWriteResource>(rid)
+ .map_err(HttpError::Resource)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
match &mut *wr {
- HttpResponseWriter::Headers(_) => Err(http_error("no response headers")),
- HttpResponseWriter::Closed => Err(http_error("response already completed")),
+ HttpResponseWriter::Headers(_) => Err(HttpError::NoResponseHeaders),
+ HttpResponseWriter::Closed => Err(HttpError::ResponseAlreadyCompleted),
HttpResponseWriter::Body { writer, .. } => {
let mut result = writer.write_all(&buf).await;
if result.is_ok() {
@@ -961,7 +1042,7 @@ async fn op_http_write(
stream.conn.closed().await?;
// If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed;
- Err(http_error("response already completed"))
+ Err(HttpError::ResponseAlreadyCompleted)
}
}
}
@@ -975,7 +1056,7 @@ async fn op_http_write(
stream.conn.closed().await?;
// If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed;
- Err(http_error("response already completed"))
+ Err(HttpError::ResponseAlreadyCompleted)
}
}
}
@@ -989,11 +1070,12 @@ async fn op_http_write(
async fn op_http_shutdown(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<(), AnyError> {
+) -> Result<(), HttpError> {
let stream = state
.borrow()
.resource_table
- .get::<HttpStreamWriteResource>(rid)?;
+ .get::<HttpStreamWriteResource>(rid)
+ .map_err(HttpError::Resource)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
let wr = take(&mut *wr);
match wr {
@@ -1022,14 +1104,12 @@ async fn op_http_shutdown(
#[op2]
#[string]
-fn op_http_websocket_accept_header(
- #[string] key: String,
-) -> Result<String, AnyError> {
+fn op_http_websocket_accept_header(#[string] key: String) -> String {
let digest = ring::digest::digest(
&ring::digest::SHA1_FOR_LEGACY_USE_ONLY,
format!("{key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11").as_bytes(),
);
- Ok(BASE64_STANDARD.encode(digest))
+ BASE64_STANDARD.encode(digest)
}
#[op2(async)]
@@ -1037,30 +1117,34 @@ fn op_http_websocket_accept_header(
async fn op_http_upgrade_websocket(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<ResourceId, AnyError> {
+) -> Result<ResourceId, HttpError> {
let stream = state
.borrow_mut()
.resource_table
- .get::<HttpStreamReadResource>(rid)?;
+ .get::<HttpStreamReadResource>(rid)
+ .map_err(HttpError::Resource)?;
let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
let request = match &mut *rd {
HttpRequestReader::Headers(request) => request,
- _ => {
- return Err(http_error("cannot upgrade because request body was used"))
- }
+ _ => return Err(HttpError::UpgradeBodyUsed),
};
- let (transport, bytes) =
- extract_network_stream(hyper_v014::upgrade::on(request).await?);
- let ws_rid =
- ws_create_server_stream(&mut state.borrow_mut(), transport, bytes)?;
- Ok(ws_rid)
+ let (transport, bytes) = extract_network_stream(
+ hyper_v014::upgrade::on(request)
+ .await
+ .map_err(|err| HttpError::HyperV014(Arc::new(err)))?,
+ );
+ Ok(ws_create_server_stream(
+ &mut state.borrow_mut(),
+ transport,
+ bytes,
+ ))
}
// Needed so hyper can use non Send futures
#[derive(Clone)]
-struct LocalExecutor;
+pub struct LocalExecutor;
impl<Fut> hyper_v014::rt::Executor<Fut> for LocalExecutor
where
@@ -1082,10 +1166,6 @@ where
}
}
-fn http_error(message: &'static str) -> AnyError {
- custom_error("Http", message)
-}
-
/// Filters out the ever-surprising 'shutdown ENOTCONN' errors.
fn filter_enotconn(
result: Result<(), hyper_v014::Error>,
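
`lib.rs` now exposes an `Options` struct whose `http1_builder_hook`/`http2_builder_hook` function pointers are stored in `OpState` and applied in `http_next.rs` before a connection is served. A hedged sketch of what an embedder-supplied hook could look like; the tuning values are hypothetical, and only the field names and hook signatures come from this diff:

```rust
// Sketch of an embedder-provided HTTP/1 hook (tuning values are hypothetical).
use hyper::server::conn::http1;

fn tune_http1(mut builder: http1::Builder) -> http1::Builder {
    // hyper's http1::Builder methods take &mut self, so mutate and return.
    builder.max_buf_size(64 * 1024).half_close(true);
    builder
}

fn build_options() -> deno_http::Options {
    deno_http::Options {
        http1_builder_hook: Some(tune_http1),
        ..Default::default()
    }
}
```

Because the hooks are plain `fn` pointers rather than closures, `Options` stays `Copy` and can be cheaply read back out of `OpState` per connection, as the `*state.borrow::<Options>()` calls in `op_http_serve` and `op_http_serve_on` show.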
diff --git a/ext/http/request_body.rs b/ext/http/request_body.rs
index 45df12457..f1c3f358e 100644
--- a/ext/http/request_body.rs
+++ b/ext/http/request_body.rs
@@ -1,9 +1,9 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use bytes::Bytes;
-use deno_core::error::AnyError;
use deno_core::futures::stream::Peekable;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
+use deno_core::futures::TryFutureExt;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufView;
@@ -22,7 +22,7 @@ use std::task::Poll;
struct ReadFuture(Incoming);
impl Stream for ReadFuture {
- type Item = Result<Bytes, AnyError>;
+ type Item = Result<Bytes, hyper::Error>;
fn poll_next(
self: Pin<&mut Self>,
@@ -37,13 +37,13 @@ impl Stream for ReadFuture {
if let Ok(data) = frame.into_data() {
// Ensure that we never yield an empty frame
if !data.is_empty() {
- break Poll::Ready(Some(Ok::<_, AnyError>(data)));
+ break Poll::Ready(Some(Ok(data)));
}
}
// Loop again so we don't lose the waker
continue;
}
- Some(Err(e)) => Poll::Ready(Some(Err(e.into()))),
+ Some(Err(e)) => Poll::Ready(Some(Err(e))),
None => Poll::Ready(None),
};
}
@@ -58,7 +58,7 @@ impl HttpRequestBody {
Self(AsyncRefCell::new(ReadFuture(body).peekable()), size_hint)
}
- async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, AnyError> {
+ async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, hyper::Error> {
let peekable = RcRef::map(self, |this| &this.0);
let mut peekable = peekable.borrow_mut().await;
match Pin::new(&mut *peekable).peek_mut().await {
@@ -82,7 +82,7 @@ impl Resource for HttpRequestBody {
}
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
- Box::pin(HttpRequestBody::read(self, limit))
+ Box::pin(HttpRequestBody::read(self, limit).map_err(Into::into))
}
fn size_hint(&self) -> (u64, Option<u64>) {
diff --git a/ext/http/request_properties.rs b/ext/http/request_properties.rs
index 1422c7417..39d35a79f 100644
--- a/ext/http/request_properties.rs
+++ b/ext/http/request_properties.rs
@@ -34,8 +34,8 @@ pub struct HttpConnectionProperties {
pub stream_type: NetworkStreamType,
}
-pub struct HttpRequestProperties {
- pub authority: Option<String>,
+pub struct HttpRequestProperties<'a> {
+ pub authority: Option<Cow<'a, str>>,
}
/// Pluggable trait to determine listen, connection and request properties
@@ -84,11 +84,11 @@ pub trait HttpPropertyExtractor {
) -> NetworkStream;
/// Determines the request properties.
- fn request_properties(
- connection_properties: &HttpConnectionProperties,
- uri: &Uri,
- headers: &HeaderMap,
- ) -> HttpRequestProperties;
+ fn request_properties<'a>(
+ connection_properties: &'a HttpConnectionProperties,
+ uri: &'a Uri,
+ headers: &'a HeaderMap,
+ ) -> HttpRequestProperties<'a>;
}
pub struct DefaultHttpPropertyExtractor {}
@@ -180,18 +180,17 @@ impl HttpPropertyExtractor for DefaultHttpPropertyExtractor {
}
}
- fn request_properties(
- connection_properties: &HttpConnectionProperties,
- uri: &Uri,
- headers: &HeaderMap,
- ) -> HttpRequestProperties {
+ fn request_properties<'a>(
+ connection_properties: &'a HttpConnectionProperties,
+ uri: &'a Uri,
+ headers: &'a HeaderMap,
+ ) -> HttpRequestProperties<'a> {
let authority = req_host(
uri,
headers,
connection_properties.stream_type,
connection_properties.local_port.unwrap_or_default(),
- )
- .map(|s| s.into_owned());
+ );
HttpRequestProperties { authority }
}
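
`request_properties.rs` changes `HttpRequestProperties::authority` from an owned `String` to `Cow<'a, str>`, so the common case can borrow straight from the request's headers or URI instead of allocating per request. A tiny illustrative sketch of the borrow-vs-own distinction; the helper below is hypothetical, not the real `req_host`:

```rust
use std::borrow::Cow;

// Return the authority either borrowed from a header (no allocation) or
// built from parts when a port must be appended (illustrative helper).
fn authority<'a>(host_header: Option<&'a str>, host: &'a str, port: u16) -> Cow<'a, str> {
    match host_header {
        Some(h) => Cow::Borrowed(h),
        None => Cow::Owned(format!("{host}:{port}")),
    }
}

fn main() {
    assert!(matches!(authority(Some("example.com"), "", 0), Cow::Borrowed(_)));
    assert!(matches!(authority(None, "example.com", 8080), Cow::Owned(_)));
}
```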
diff --git a/ext/http/service.rs b/ext/http/service.rs
index 787e9babf..ce24dea43 100644
--- a/ext/http/service.rs
+++ b/ext/http/service.rs
@@ -2,7 +2,6 @@
use crate::request_properties::HttpConnectionProperties;
use crate::response_body::ResponseBytesInner;
use crate::response_body::ResponseStreamResult;
-use deno_core::error::AnyError;
use deno_core::futures::ready;
use deno_core::BufView;
use deno_core::OpState;
@@ -28,6 +27,7 @@ use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
+use tokio::sync::oneshot;
pub type Request = hyper::Request<Incoming>;
pub type Response = hyper::Response<HttpRecordResponse>;
@@ -206,8 +206,13 @@ pub(crate) async fn handle_request(
Ok(response)
}
+#[derive(Debug, thiserror::Error)]
+#[error("upgrade unavailable")]
+pub struct UpgradeUnavailableError;
+
struct HttpRecordInner {
server_state: SignallingRc<HttpServerState>,
+ closed_channel: Option<oneshot::Sender<()>>,
request_info: HttpConnectionProperties,
request_parts: http::request::Parts,
request_body: Option<RequestBodyState>,
@@ -273,6 +278,7 @@ impl HttpRecord {
response_body_finished: false,
response_body_waker: None,
trailers: None,
+ closed_channel: None,
been_dropped: false,
finished: false,
needs_close_after_finish: false,
@@ -309,6 +315,10 @@ impl HttpRecord {
RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish)
}
+ pub fn on_cancel(&self, sender: oneshot::Sender<()>) {
+ self.self_mut().closed_channel = Some(sender);
+ }
+
fn recycle(self: Rc<Self>) {
assert!(
Rc::strong_count(&self) == 1,
@@ -344,14 +354,14 @@ impl HttpRecord {
}
/// Perform the Hyper upgrade on this record.
- pub fn upgrade(&self) -> Result<OnUpgrade, AnyError> {
+ pub fn upgrade(&self) -> Result<OnUpgrade, UpgradeUnavailableError> {
// Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
self
.self_mut()
.request_parts
.extensions
.remove::<OnUpgrade>()
- .ok_or_else(|| AnyError::msg("upgrade unavailable"))
+ .ok_or(UpgradeUnavailableError)
}
/// Take the Hyper body from this record.
@@ -387,6 +397,9 @@ impl HttpRecord {
inner.been_dropped = true;
// The request body might include actual resources.
inner.request_body.take();
+ if let Some(closed_channel) = inner.closed_channel.take() {
+ let _ = closed_channel.send(());
+ }
}
/// Complete this record, potentially expunging it if it is fully complete (ie: cancelled as well).
@@ -515,7 +528,7 @@ pub struct HttpRecordResponse(ManuallyDrop<Rc<HttpRecord>>);
impl Body for HttpRecordResponse {
type Data = BufView;
- type Error = AnyError;
+ type Error = deno_core::error::AnyError;
fn poll_frame(
self: Pin<&mut Self>,
@@ -640,7 +653,7 @@ mod tests {
}
#[tokio::test]
- async fn test_handle_request() -> Result<(), AnyError> {
+ async fn test_handle_request() -> Result<(), deno_core::error::AnyError> {
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let server_state = HttpServerState::new();
let server_state_check = server_state.clone();
diff --git a/ext/http/websocket_upgrade.rs b/ext/http/websocket_upgrade.rs
index 4dead767a..af9504717 100644
--- a/ext/http/websocket_upgrade.rs
+++ b/ext/http/websocket_upgrade.rs
@@ -4,7 +4,6 @@ use std::marker::PhantomData;
use bytes::Bytes;
use bytes::BytesMut;
-use deno_core::error::AnyError;
use httparse::Status;
use hyper::header::HeaderName;
use hyper::header::HeaderValue;
@@ -13,12 +12,30 @@ use memmem::Searcher;
use memmem::TwoWaySearcher;
use once_cell::sync::OnceCell;
-use crate::http_error;
+#[derive(Debug, thiserror::Error)]
+pub enum WebSocketUpgradeError {
+ #[error("invalid headers")]
+ InvalidHeaders,
+ #[error("{0}")]
+ HttpParse(#[from] httparse::Error),
+ #[error("{0}")]
+ Http(#[from] http::Error),
+ #[error("{0}")]
+ Utf8(#[from] std::str::Utf8Error),
+ #[error("{0}")]
+ InvalidHeaderName(#[from] http::header::InvalidHeaderName),
+ #[error("{0}")]
+ InvalidHeaderValue(#[from] http::header::InvalidHeaderValue),
+ #[error("invalid HTTP status line")]
+ InvalidHttpStatusLine,
+ #[error("attempted to write to completed upgrade buffer")]
+ UpgradeBufferAlreadyCompleted,
+}
/// Given a buffer that ends in `\n\n` or `\r\n\r\n`, returns a parsed [`Request<Body>`].
fn parse_response<T: Default>(
header_bytes: &[u8],
-) -> Result<(usize, Response<T>), AnyError> {
+) -> Result<(usize, Response<T>), WebSocketUpgradeError> {
let mut headers = [httparse::EMPTY_HEADER; 16];
let status = httparse::parse_headers(header_bytes, &mut headers)?;
match status {
@@ -32,7 +49,7 @@ fn parse_response<T: Default>(
}
Ok((index, resp))
}
- _ => Err(http_error("invalid headers")),
+ _ => Err(WebSocketUpgradeError::InvalidHeaders),
}
}
@@ -69,11 +86,14 @@ pub struct WebSocketUpgrade<T: Default> {
impl<T: Default> WebSocketUpgrade<T> {
/// Ensures that the status line starts with "HTTP/1.1 101 " which matches all of the node.js
/// WebSocket libraries that are known. We don't care about the trailing status text.
- fn validate_status(&self, status: &[u8]) -> Result<(), AnyError> {
+ fn validate_status(
+ &self,
+ status: &[u8],
+ ) -> Result<(), WebSocketUpgradeError> {
if status.starts_with(b"HTTP/1.1 101 ") {
Ok(())
} else {
- Err(http_error("invalid HTTP status line"))
+ Err(WebSocketUpgradeError::InvalidHttpStatusLine)
}
}
@@ -82,7 +102,7 @@ impl<T: Default> WebSocketUpgrade<T> {
pub fn write(
&mut self,
bytes: &[u8],
- ) -> Result<Option<(Response<T>, Bytes)>, AnyError> {
+ ) -> Result<Option<(Response<T>, Bytes)>, WebSocketUpgradeError> {
use WebSocketUpgradeState::*;
match self.state {
@@ -142,9 +162,7 @@ impl<T: Default> WebSocketUpgrade<T> {
Ok(None)
}
}
- Complete => {
- Err(http_error("attempted to write to completed upgrade buffer"))
- }
+ Complete => Err(WebSocketUpgradeError::UpgradeBufferAlreadyCompleted),
}
}
}
@@ -157,8 +175,8 @@ mod tests {
type ExpectedResponseAndHead = Option<(Response<Body>, &'static [u8])>;
fn assert_response(
- result: Result<Option<(Response<Body>, Bytes)>, AnyError>,
- expected: Result<ExpectedResponseAndHead, &'static str>,
+ result: Result<Option<(Response<Body>, Bytes)>, WebSocketUpgradeError>,
+ expected: Result<ExpectedResponseAndHead, WebSocketUpgradeError>,
chunk_info: Option<(usize, usize)>,
) {
let formatted = format!("{result:?}");
@@ -189,8 +207,8 @@ mod tests {
"Expected Ok(None), was {formatted}",
),
Err(e) => assert_eq!(
- e,
- result.err().map(|e| format!("{e:?}")).unwrap_or_default(),
+ format!("{e:?}"),
+ format!("{:?}", result.unwrap_err()),
"Expected error, was {formatted}",
),
}
@@ -198,7 +216,7 @@ mod tests {
fn validate_upgrade_all_at_once(
s: &str,
- expected: Result<ExpectedResponseAndHead, &'static str>,
+ expected: Result<ExpectedResponseAndHead, WebSocketUpgradeError>,
) {
let mut upgrade = WebSocketUpgrade::default();
let res = upgrade.write(s.as_bytes());
@@ -209,7 +227,7 @@ mod tests {
fn validate_upgrade_chunks(
s: &str,
size: usize,
- expected: Result<ExpectedResponseAndHead, &'static str>,
+ expected: Result<ExpectedResponseAndHead, WebSocketUpgradeError>,
) {
let chunk_info = Some((s.as_bytes().len(), size));
let mut upgrade = WebSocketUpgrade::default();
@@ -226,7 +244,7 @@ mod tests {
fn validate_upgrade(
s: &str,
- expected: fn() -> Result<ExpectedResponseAndHead, &'static str>,
+ expected: fn() -> Result<ExpectedResponseAndHead, WebSocketUpgradeError>,
) {
validate_upgrade_all_at_once(s, expected());
validate_upgrade_chunks(s, 1, expected());
@@ -315,7 +333,7 @@ mod tests {
#[test]
fn upgrade_invalid_status() {
validate_upgrade("HTTP/1.1 200 OK\nConnection: Upgrade\n\n", || {
- Err("invalid HTTP status line")
+ Err(WebSocketUpgradeError::InvalidHttpStatusLine)
});
}
@@ -327,7 +345,11 @@ mod tests {
.join("\n");
validate_upgrade(
&format!("HTTP/1.1 101 Switching Protocols\n{headers}\n\n"),
- || Err("too many headers"),
+ || {
+ Err(WebSocketUpgradeError::HttpParse(
+ httparse::Error::TooManyHeaders,
+ ))
+ },
);
}
}
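
Throughout the diff, `deno_core::error::AnyError` returns are replaced with dedicated `thiserror` enums (`HttpNextError`, `HttpError`, `WebSocketUpgradeError`, `UpgradeUnavailableError`), making the possible failure modes explicit at each op boundary. A minimal, self-contained illustration of the pattern these enums follow, assuming the standard `thiserror` crate; the names below are illustrative, not from the diff:

```rust
// Minimal illustration of the error style adopted in this diff.
use std::io;

#[derive(Debug, thiserror::Error)]
pub enum ExampleHttpError {
    // `transparent` forwards Display and source to the inner error unchanged.
    #[error(transparent)]
    Io(#[from] io::Error),
    // A fixed message for a purely structural failure.
    #[error("invalid HTTP status line")]
    InvalidHttpStatusLine,
    // A catch-all with a formatted message.
    #[error("{0}")]
    Other(String),
}

fn read_config() -> Result<String, ExampleHttpError> {
    // `?` uses the #[from] impl to convert io::Error automatically.
    Ok(std::fs::read_to_string("/nonexistent")?)
}

fn main() {
    match read_config() {
        Ok(s) => println!("read {} bytes", s.len()),
        Err(e) => eprintln!("error: {e}"),
    }
}
```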