summaryrefslogtreecommitdiff
path: root/ext/http/http_next.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http/http_next.rs')
-rw-r--r--ext/http/http_next.rs261
1 files changed, 186 insertions, 75 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index efe1b88c9..7dbac6021 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -18,8 +18,8 @@ use crate::service::HttpServerState;
use crate::service::SignallingRc;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
+use crate::Options;
use cache_control::CacheControl;
-use deno_core::error::AnyError;
use deno_core::external;
use deno_core::futures::future::poll_fn;
use deno_core::futures::TryFutureExt;
@@ -146,12 +146,32 @@ macro_rules! clone_external {
}};
}
+#[derive(Debug, thiserror::Error)]
+pub enum HttpNextError {
+ #[error(transparent)]
+ Resource(deno_core::error::AnyError),
+ #[error("{0}")]
+ Io(#[from] io::Error),
+ #[error(transparent)]
+ WebSocketUpgrade(crate::websocket_upgrade::WebSocketUpgradeError),
+ #[error("{0}")]
+ Hyper(#[from] hyper::Error),
+ #[error(transparent)]
+ JoinError(#[from] tokio::task::JoinError),
+ #[error(transparent)]
+ Canceled(#[from] deno_core::Canceled),
+ #[error(transparent)]
+ HttpPropertyExtractor(deno_core::error::AnyError),
+ #[error(transparent)]
+ UpgradeUnavailable(#[from] crate::service::UpgradeUnavailableError),
+}
+
#[op2(fast)]
#[smi]
pub fn op_http_upgrade_raw(
state: &mut OpState,
external: *const c_void,
-) -> Result<ResourceId, AnyError> {
+) -> Result<ResourceId, HttpNextError> {
// SAFETY: external is deleted before calling this op.
let http = unsafe { take_external!(external, "op_http_upgrade_raw") };
@@ -177,7 +197,7 @@ pub fn op_http_upgrade_raw(
upgraded.write_all(&bytes).await?;
break upgraded;
}
- Err(err) => return Err(err),
+ Err(err) => return Err(HttpNextError::WebSocketUpgrade(err)),
}
};
@@ -193,7 +213,7 @@ pub fn op_http_upgrade_raw(
}
read_tx.write_all(&buf[..read]).await?;
}
- Ok::<_, AnyError>(())
+ Ok::<_, HttpNextError>(())
});
spawn(async move {
let mut buf = [0; 1024];
@@ -204,7 +224,7 @@ pub fn op_http_upgrade_raw(
}
upgraded_tx.write_all(&buf[..read]).await?;
}
- Ok::<_, AnyError>(())
+ Ok::<_, HttpNextError>(())
});
Ok(())
@@ -223,7 +243,7 @@ pub async fn op_http_upgrade_websocket_next(
state: Rc<RefCell<OpState>>,
external: *const c_void,
#[serde] headers: Vec<(ByteString, ByteString)>,
-) -> Result<ResourceId, AnyError> {
+) -> Result<ResourceId, HttpNextError> {
let http =
// SAFETY: external is deleted before calling this op.
unsafe { take_external!(external, "op_http_upgrade_websocket_next") };
@@ -246,7 +266,11 @@ pub async fn op_http_upgrade_websocket_next(
// Stage 3: take the extracted raw network stream and upgrade it to a websocket, then return it
let (stream, bytes) = extract_network_stream(upgraded);
- ws_create_server_stream(&mut state.borrow_mut(), stream, bytes)
+ Ok(ws_create_server_stream(
+ &mut state.borrow_mut(),
+ stream,
+ bytes,
+ ))
}
#[op2(fast)]
@@ -296,7 +320,7 @@ where
let authority: v8::Local<v8::Value> = match request_properties.authority {
Some(authority) => v8::String::new_from_utf8(
scope,
- authority.as_ref(),
+ authority.as_bytes(),
v8::NewStringType::Normal,
)
.unwrap()
@@ -305,15 +329,25 @@ where
};
// Only extract the path part - we handle authority elsewhere
- let path = match &request_parts.uri.path_and_query() {
- Some(path_and_query) => path_and_query.to_string(),
- None => "".to_owned(),
+ let path = match request_parts.uri.path_and_query() {
+ Some(path_and_query) => {
+ let path = path_and_query.as_str();
+ if matches!(path.as_bytes().first(), Some(b'/' | b'*')) {
+ Cow::Borrowed(path)
+ } else {
+ Cow::Owned(format!("/{}", path))
+ }
+ }
+ None => Cow::Borrowed(""),
};
- let path: v8::Local<v8::Value> =
- v8::String::new_from_utf8(scope, path.as_ref(), v8::NewStringType::Normal)
- .unwrap()
- .into();
+ let path: v8::Local<v8::Value> = v8::String::new_from_utf8(
+ scope,
+ path.as_bytes(),
+ v8::NewStringType::Normal,
+ )
+ .unwrap()
+ .into();
let peer_address: v8::Local<v8::Value> = v8::String::new_from_utf8(
scope,
@@ -531,6 +565,7 @@ fn is_request_compressible(
match accept_encoding.to_str() {
// Firefox and Chrome send this -- no need to parse
Ok("gzip, deflate, br") => return Compression::Brotli,
+ Ok("gzip, deflate, br, zstd") => return Compression::Brotli,
Ok("gzip") => return Compression::GZip,
Ok("br") => return Compression::Brotli,
_ => (),
@@ -667,6 +702,27 @@ fn set_response(
http.complete();
}
+#[op2(fast)]
+pub fn op_http_get_request_cancelled(external: *const c_void) -> bool {
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_get_request_cancelled") };
+ http.cancelled()
+}
+
+#[op2(async)]
+pub async fn op_http_request_on_cancel(external: *const c_void) {
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_request_on_cancel") };
+ let (tx, rx) = tokio::sync::oneshot::channel();
+
+ http.on_cancel(tx);
+ drop(http);
+
+ rx.await.ok();
+}
+
/// Returned promise resolves when body streaming finishes.
/// Call [`op_http_close_after_finish`] when done with the external.
#[op2(async)]
@@ -676,7 +732,7 @@ pub async fn op_http_set_response_body_resource(
#[smi] stream_rid: ResourceId,
auto_close: bool,
status: u16,
-) -> Result<bool, AnyError> {
+) -> Result<bool, HttpNextError> {
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_set_response_body_resource") };
@@ -691,9 +747,15 @@ pub async fn op_http_set_response_body_resource(
let resource = {
let mut state = state.borrow_mut();
if auto_close {
- state.resource_table.take_any(stream_rid)?
+ state
+ .resource_table
+ .take_any(stream_rid)
+ .map_err(HttpNextError::Resource)?
} else {
- state.resource_table.get_any(stream_rid)?
+ state
+ .resource_table
+ .get_any(stream_rid)
+ .map_err(HttpNextError::Resource)?
}
};
@@ -760,10 +822,16 @@ fn serve_http11_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
+ http1_builder_hook: Option<fn(http1::Builder) -> http1::Builder>,
) -> impl Future<Output = Result<(), hyper::Error>> + 'static {
- let conn = http1::Builder::new()
- .keep_alive(true)
- .writev(*USE_WRITEV)
+ let mut builder = http1::Builder::new();
+ builder.keep_alive(true).writev(*USE_WRITEV);
+
+ if let Some(http1_builder_hook) = http1_builder_hook {
+ builder = http1_builder_hook(builder);
+ }
+
+ let conn = builder
.serve_connection(TokioIo::new(io), svc)
.with_upgrades();
@@ -782,9 +850,17 @@ fn serve_http2_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
+ http2_builder_hook: Option<
+ fn(http2::Builder<LocalExecutor>) -> http2::Builder<LocalExecutor>,
+ >,
) -> impl Future<Output = Result<(), hyper::Error>> + 'static {
- let conn =
- http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc);
+ let mut builder = http2::Builder::new(LocalExecutor);
+
+ if let Some(http2_builder_hook) = http2_builder_hook {
+ builder = http2_builder_hook(builder);
+ }
+
+ let conn = builder.serve_connection(TokioIo::new(io), svc);
async {
match conn.or_abort(cancel).await {
Err(mut conn) => {
@@ -800,17 +876,18 @@ async fn serve_http2_autodetect(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
-) -> Result<(), AnyError> {
+ options: Options,
+) -> Result<(), HttpNextError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
let (matches, io) = prefix.match_prefix().await?;
if matches {
- serve_http2_unconditional(io, svc, cancel)
+ serve_http2_unconditional(io, svc, cancel, options.http2_builder_hook)
.await
- .map_err(|e| e.into())
+ .map_err(HttpNextError::Hyper)
} else {
- serve_http11_unconditional(io, svc, cancel)
+ serve_http11_unconditional(io, svc, cancel, options.http1_builder_hook)
.await
- .map_err(|e| e.into())
+ .map_err(HttpNextError::Hyper)
}
}
@@ -819,7 +896,8 @@ fn serve_https(
request_info: HttpConnectionProperties,
lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
-) -> JoinHandle<Result<(), AnyError>> {
+ options: Options,
+) -> JoinHandle<Result<(), HttpNextError>> {
let HttpLifetime {
server_state,
connection_cancel_handle,
@@ -830,21 +908,31 @@ fn serve_https(
handle_request(req, request_info.clone(), server_state.clone(), tx.clone())
});
spawn(
- async {
+ async move {
let handshake = io.handshake().await?;
// If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect
// based on the prefix bytes
let handshake = handshake.alpn;
if Some(TLS_ALPN_HTTP_2) == handshake.as_deref() {
- serve_http2_unconditional(io, svc, listen_cancel_handle)
- .await
- .map_err(|e| e.into())
+ serve_http2_unconditional(
+ io,
+ svc,
+ listen_cancel_handle,
+ options.http2_builder_hook,
+ )
+ .await
+ .map_err(HttpNextError::Hyper)
} else if Some(TLS_ALPN_HTTP_11) == handshake.as_deref() {
- serve_http11_unconditional(io, svc, listen_cancel_handle)
- .await
- .map_err(|e| e.into())
+ serve_http11_unconditional(
+ io,
+ svc,
+ listen_cancel_handle,
+ options.http1_builder_hook,
+ )
+ .await
+ .map_err(HttpNextError::Hyper)
} else {
- serve_http2_autodetect(io, svc, listen_cancel_handle).await
+ serve_http2_autodetect(io, svc, listen_cancel_handle, options).await
}
}
.try_or_cancel(connection_cancel_handle),
@@ -856,7 +944,8 @@ fn serve_http(
request_info: HttpConnectionProperties,
lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
-) -> JoinHandle<Result<(), AnyError>> {
+ options: Options,
+) -> JoinHandle<Result<(), HttpNextError>> {
let HttpLifetime {
server_state,
connection_cancel_handle,
@@ -867,7 +956,7 @@ fn serve_http(
handle_request(req, request_info.clone(), server_state.clone(), tx.clone())
});
spawn(
- serve_http2_autodetect(io, svc, listen_cancel_handle)
+ serve_http2_autodetect(io, svc, listen_cancel_handle, options)
.try_or_cancel(connection_cancel_handle),
)
}
@@ -877,7 +966,8 @@ fn serve_http_on<HTTP>(
listen_properties: &HttpListenProperties,
lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
-) -> JoinHandle<Result<(), AnyError>>
+ options: Options,
+) -> JoinHandle<Result<(), HttpNextError>>
where
HTTP: HttpPropertyExtractor,
{
@@ -888,14 +978,14 @@ where
match network_stream {
NetworkStream::Tcp(conn) => {
- serve_http(conn, connection_properties, lifetime, tx)
+ serve_http(conn, connection_properties, lifetime, tx, options)
}
NetworkStream::Tls(conn) => {
- serve_https(conn, connection_properties, lifetime, tx)
+ serve_https(conn, connection_properties, lifetime, tx, options)
}
#[cfg(unix)]
NetworkStream::Unix(conn) => {
- serve_http(conn, connection_properties, lifetime, tx)
+ serve_http(conn, connection_properties, lifetime, tx, options)
}
}
}
@@ -908,7 +998,7 @@ struct HttpLifetime {
}
struct HttpJoinHandle {
- join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
+ join_handle: AsyncRefCell<Option<JoinHandle<Result<(), HttpNextError>>>>,
connection_cancel_handle: Rc<CancelHandle>,
listen_cancel_handle: Rc<CancelHandle>,
rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>,
@@ -968,12 +1058,13 @@ impl Drop for HttpJoinHandle {
pub fn op_http_serve<HTTP>(
state: Rc<RefCell<OpState>>,
#[smi] listener_rid: ResourceId,
-) -> Result<(ResourceId, &'static str, String), AnyError>
+) -> Result<(ResourceId, &'static str, String), HttpNextError>
where
HTTP: HttpPropertyExtractor,
{
let listener =
- HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?;
+ HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)
+ .map_err(HttpNextError::Resource)?;
let listen_properties = HTTP::listen_properties_from_listener(&listener)?;
@@ -983,21 +1074,28 @@ where
let lifetime = resource.lifetime();
+ let options = {
+ let state = state.borrow();
+ *state.borrow::<Options>()
+ };
+
let listen_properties_clone: HttpListenProperties = listen_properties.clone();
let handle = spawn(async move {
loop {
let conn = HTTP::accept_connection_from_listener(&listener)
.try_or_cancel(listen_cancel_clone.clone())
- .await?;
+ .await
+ .map_err(HttpNextError::HttpPropertyExtractor)?;
serve_http_on::<HTTP>(
conn,
&listen_properties_clone,
lifetime.clone(),
tx.clone(),
+ options,
);
}
#[allow(unreachable_code)]
- Ok::<_, AnyError>(())
+ Ok::<_, HttpNextError>(())
});
// Set the handle after we start the future
@@ -1017,25 +1115,31 @@ where
pub fn op_http_serve_on<HTTP>(
state: Rc<RefCell<OpState>>,
#[smi] connection_rid: ResourceId,
-) -> Result<(ResourceId, &'static str, String), AnyError>
+) -> Result<(ResourceId, &'static str, String), HttpNextError>
where
HTTP: HttpPropertyExtractor,
{
let connection =
- HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?;
+ HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)
+ .map_err(HttpNextError::Resource)?;
let listen_properties = HTTP::listen_properties_from_connection(&connection)?;
let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
- let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> =
- serve_http_on::<HTTP>(
- connection,
- &listen_properties,
- resource.lifetime(),
- tx,
- );
+ let options = {
+ let state = state.borrow();
+ *state.borrow::<Options>()
+ };
+
+ let handle = serve_http_on::<HTTP>(
+ connection,
+ &listen_properties,
+ resource.lifetime(),
+ tx,
+ options,
+ );
// Set the handle after we start the future
*RcRef::map(&resource, |this| &this.join_handle)
@@ -1081,12 +1185,13 @@ pub fn op_http_try_wait(
pub async fn op_http_wait(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<*const c_void, AnyError> {
+) -> Result<*const c_void, HttpNextError> {
// We will get the join handle initially, as we might be consuming requests still
let join_handle = state
.borrow_mut()
.resource_table
- .get::<HttpJoinHandle>(rid)?;
+ .get::<HttpJoinHandle>(rid)
+ .map_err(HttpNextError::Resource)?;
let cancel = join_handle.listen_cancel_handle();
let next = async {
@@ -1113,13 +1218,12 @@ pub async fn op_http_wait(
// Filter out shutdown (ENOTCONN) errors
if let Err(err) = res {
- if let Some(err) = err.source() {
- if let Some(err) = err.downcast_ref::<io::Error>() {
- if err.kind() == io::ErrorKind::NotConnected {
- return Ok(null());
- }
+ if let HttpNextError::Io(err) = &err {
+ if err.kind() == io::ErrorKind::NotConnected {
+ return Ok(null());
}
}
+
return Err(err);
}
@@ -1132,7 +1236,7 @@ pub fn op_http_cancel(
state: &mut OpState,
#[smi] rid: ResourceId,
graceful: bool,
-) -> Result<(), AnyError> {
+) -> Result<(), deno_core::error::AnyError> {
let join_handle = state.resource_table.get::<HttpJoinHandle>(rid)?;
if graceful {
@@ -1152,11 +1256,12 @@ pub async fn op_http_close(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
graceful: bool,
-) -> Result<(), AnyError> {
+) -> Result<(), HttpNextError> {
let join_handle = state
.borrow_mut()
.resource_table
- .take::<HttpJoinHandle>(rid)?;
+ .take::<HttpJoinHandle>(rid)
+ .map_err(HttpNextError::Resource)?;
if graceful {
http_general_trace!("graceful shutdown");
@@ -1202,23 +1307,26 @@ impl UpgradeStream {
}
}
- async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
+ async fn read(
+ self: Rc<Self>,
+ buf: &mut [u8],
+ ) -> Result<usize, std::io::Error> {
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
async {
let read = RcRef::map(self, |this| &this.read);
let mut read = read.borrow_mut().await;
- Ok(Pin::new(&mut *read).read(buf).await?)
+ Pin::new(&mut *read).read(buf).await
}
.try_or_cancel(cancel_handle)
.await
}
- async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
+ async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, std::io::Error> {
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
async {
let write = RcRef::map(self, |this| &this.write);
let mut write = write.borrow_mut().await;
- Ok(Pin::new(&mut *write).write(buf).await?)
+ Pin::new(&mut *write).write(buf).await
}
.try_or_cancel(cancel_handle)
.await
@@ -1228,7 +1336,7 @@ impl UpgradeStream {
self: Rc<Self>,
buf1: &[u8],
buf2: &[u8],
- ) -> Result<usize, AnyError> {
+ ) -> Result<usize, std::io::Error> {
let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await;
let total = buf1.len() + buf2.len();
@@ -1281,9 +1389,12 @@ pub async fn op_raw_write_vectored(
#[smi] rid: ResourceId,
#[buffer] buf1: JsBuffer,
#[buffer] buf2: JsBuffer,
-) -> Result<usize, AnyError> {
- let resource: Rc<UpgradeStream> =
- state.borrow().resource_table.get::<UpgradeStream>(rid)?;
+) -> Result<usize, HttpNextError> {
+ let resource: Rc<UpgradeStream> = state
+ .borrow()
+ .resource_table
+ .get::<UpgradeStream>(rid)
+ .map_err(HttpNextError::Resource)?;
let nwritten = resource.write_vectored(&buf1, &buf2).await?;
Ok(nwritten)
}