summaryrefslogtreecommitdiff
path: root/ext/http/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r--ext/http/lib.rs184
1 files changed, 132 insertions, 52 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 934f8a002..39b0bbc2a 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -6,8 +6,6 @@ use async_compression::Level;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use cache_control::CacheControl;
-use deno_core::error::custom_error;
-use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::channel::oneshot;
use deno_core::futures::future::pending;
@@ -41,6 +39,8 @@ use deno_net::raw::NetworkStream;
use deno_websocket::ws_create_server_stream;
use flate2::write::GzEncoder;
use flate2::Compression;
+use hyper::server::conn::http1;
+use hyper::server::conn::http2;
use hyper_util::rt::TokioIo;
use hyper_v014::body::Bytes;
use hyper_v014::body::HttpBody;
@@ -89,11 +89,33 @@ mod service;
mod websocket_upgrade;
use fly_accept_encoding::Encoding;
+pub use http_next::HttpNextError;
pub use request_properties::DefaultHttpPropertyExtractor;
pub use request_properties::HttpConnectionProperties;
pub use request_properties::HttpListenProperties;
pub use request_properties::HttpPropertyExtractor;
pub use request_properties::HttpRequestProperties;
+pub use service::UpgradeUnavailableError;
+pub use websocket_upgrade::WebSocketUpgradeError;
+
+#[derive(Debug, Default, Clone, Copy)]
+pub struct Options {
+ /// By passing a hook function, the caller can customize various configuration
+ /// options for the HTTP/2 server.
+ /// See [`http2::Builder`] for what parameters can be customized.
+ ///
+ /// If `None`, the default configuration provided by hyper will be used. Note
+ /// that the default configuration is subject to change in future versions.
+ pub http2_builder_hook:
+ Option<fn(http2::Builder<LocalExecutor>) -> http2::Builder<LocalExecutor>>,
+ /// By passing a hook function, the caller can customize various configuration
+ /// options for the HTTP/1 server.
+ /// See [`http1::Builder`] for what parameters can be customized.
+ ///
+ /// If `None`, the default configuration provided by hyper will be used. Note
+ /// that the default configuration is subject to change in future versions.
+ pub http1_builder_hook: Option<fn(http1::Builder) -> http1::Builder>,
+}
deno_core::extension!(
deno_http,
@@ -111,7 +133,9 @@ deno_core::extension!(
http_next::op_http_close_after_finish,
http_next::op_http_get_request_header,
http_next::op_http_get_request_headers,
+ http_next::op_http_request_on_cancel,
http_next::op_http_get_request_method_and_url<HTTP>,
+ http_next::op_http_get_request_cancelled,
http_next::op_http_read_request_body,
http_next::op_http_serve_on<HTTP>,
http_next::op_http_serve<HTTP>,
@@ -132,8 +156,46 @@ deno_core::extension!(
http_next::op_http_cancel,
],
esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"],
+ options = {
+ options: Options,
+ },
+ state = |state, options| {
+ state.put::<Options>(options.options);
+ }
);
+#[derive(Debug, thiserror::Error)]
+pub enum HttpError {
+ #[error(transparent)]
+ Resource(deno_core::error::AnyError),
+ #[error(transparent)]
+ Canceled(#[from] deno_core::Canceled),
+ #[error("{0}")]
+ HyperV014(#[source] Arc<hyper_v014::Error>),
+ #[error("{0}")]
+ InvalidHeaderName(#[from] hyper_v014::header::InvalidHeaderName),
+ #[error("{0}")]
+ InvalidHeaderValue(#[from] hyper_v014::header::InvalidHeaderValue),
+ #[error("{0}")]
+ Http(#[from] hyper_v014::http::Error),
+ #[error("response headers already sent")]
+ ResponseHeadersAlreadySent,
+ #[error("connection closed while sending response")]
+ ConnectionClosedWhileSendingResponse,
+ #[error("already in use")]
+ AlreadyInUse,
+ #[error("{0}")]
+ Io(#[from] std::io::Error),
+ #[error("no response headers")]
+ NoResponseHeaders,
+ #[error("response already completed")]
+ ResponseAlreadyCompleted,
+ #[error("cannot upgrade because request body was used")]
+ UpgradeBodyUsed,
+ #[error(transparent)]
+ Other(deno_core::error::AnyError),
+}
+
pub enum HttpSocketAddr {
IpSocket(std::net::SocketAddr),
#[cfg(unix)]
@@ -216,7 +278,7 @@ impl HttpConnResource {
String,
String,
)>,
- AnyError,
+ HttpError,
> {
let fut = async {
let (request_tx, request_rx) = oneshot::channel();
@@ -259,8 +321,8 @@ impl HttpConnResource {
}
/// A future that completes when this HTTP connection is closed or errors.
- async fn closed(&self) -> Result<(), AnyError> {
- self.closed_fut.clone().map_err(AnyError::from).await
+ async fn closed(&self) -> Result<(), HttpError> {
+ self.closed_fut.clone().map_err(HttpError::HyperV014).await
}
}
@@ -280,14 +342,13 @@ pub fn http_create_conn_resource<S, A>(
io: S,
addr: A,
scheme: &'static str,
-) -> Result<ResourceId, AnyError>
+) -> ResourceId
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
A: Into<HttpSocketAddr>,
{
let conn = HttpConnResource::new(io, scheme, addr.into());
- let rid = state.resource_table.add(conn);
- Ok(rid)
+ state.resource_table.add(conn)
}
/// An object that implements the `hyper::Service` trait, through which Hyper
@@ -423,7 +484,9 @@ impl Resource for HttpStreamReadResource {
// safely call `await` on it without creating a race condition.
Some(_) => match body.as_mut().next().await.unwrap() {
Ok(chunk) => assert!(chunk.is_empty()),
- Err(err) => break Err(AnyError::from(err)),
+ Err(err) => {
+ break Err(HttpError::HyperV014(Arc::new(err)).into())
+ }
},
None => break Ok(BufView::empty()),
}
@@ -545,8 +608,12 @@ struct NextRequestResponse(
async fn op_http_accept(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<Option<NextRequestResponse>, AnyError> {
- let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?;
+) -> Result<Option<NextRequestResponse>, HttpError> {
+ let conn = state
+ .borrow()
+ .resource_table
+ .get::<HttpConnResource>(rid)
+ .map_err(HttpError::Resource)?;
match conn.accept().await {
Ok(Some((read_stream, write_stream, method, url))) => {
@@ -657,11 +724,12 @@ async fn op_http_write_headers(
#[smi] status: u16,
#[serde] headers: Vec<(ByteString, ByteString)>,
#[serde] data: Option<StringOrBuffer>,
-) -> Result<(), AnyError> {
+) -> Result<(), HttpError> {
let stream = state
.borrow_mut()
.resource_table
- .get::<HttpStreamWriteResource>(rid)?;
+ .get::<HttpStreamWriteResource>(rid)
+ .map_err(HttpError::Resource)?;
// Track supported encoding
let encoding = stream.accept_encoding;
@@ -708,14 +776,14 @@ async fn op_http_write_headers(
let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
let response_tx = match replace(&mut *old_wr, new_wr) {
HttpResponseWriter::Headers(response_tx) => response_tx,
- _ => return Err(http_error("response headers already sent")),
+ _ => return Err(HttpError::ResponseHeadersAlreadySent),
};
match response_tx.send(body) {
Ok(_) => Ok(()),
Err(_) => {
stream.conn.closed().await?;
- Err(http_error("connection closed while sending response"))
+ Err(HttpError::ConnectionClosedWhileSendingResponse)
}
}
}
@@ -725,11 +793,14 @@ async fn op_http_write_headers(
fn op_http_headers(
state: &mut OpState,
#[smi] rid: u32,
-) -> Result<Vec<(ByteString, ByteString)>, AnyError> {
- let stream = state.resource_table.get::<HttpStreamReadResource>(rid)?;
+) -> Result<Vec<(ByteString, ByteString)>, HttpError> {
+ let stream = state
+ .resource_table
+ .get::<HttpStreamReadResource>(rid)
+ .map_err(HttpError::Resource)?;
let rd = RcRef::map(&stream, |r| &r.rd)
.try_borrow()
- .ok_or_else(|| http_error("already in use"))?;
+ .ok_or(HttpError::AlreadyInUse)?;
match &*rd {
HttpRequestReader::Headers(request) => Ok(req_headers(request.headers())),
HttpRequestReader::Body(headers, _) => Ok(req_headers(headers)),
@@ -741,7 +812,7 @@ fn http_response(
data: Option<StringOrBuffer>,
compressing: bool,
encoding: Encoding,
-) -> Result<(HttpResponseWriter, hyper_v014::Body), AnyError> {
+) -> Result<(HttpResponseWriter, hyper_v014::Body), HttpError> {
// Gzip, after level 1, doesn't produce significant size difference.
// This default matches nginx default gzip compression level (1):
// https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level
@@ -878,25 +949,34 @@ async fn op_http_write_resource(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
#[smi] stream: ResourceId,
-) -> Result<(), AnyError> {
+) -> Result<(), HttpError> {
let http_stream = state
.borrow()
.resource_table
- .get::<HttpStreamWriteResource>(rid)?;
+ .get::<HttpStreamWriteResource>(rid)
+ .map_err(HttpError::Resource)?;
let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
- let resource = state.borrow().resource_table.get_any(stream)?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get_any(stream)
+ .map_err(HttpError::Resource)?;
loop {
match *wr {
HttpResponseWriter::Headers(_) => {
- return Err(http_error("no response headers"))
+ return Err(HttpError::NoResponseHeaders)
}
HttpResponseWriter::Closed => {
- return Err(http_error("response already completed"))
+ return Err(HttpError::ResponseAlreadyCompleted)
}
_ => {}
};
- let view = resource.clone().read(64 * 1024).await?; // 64KB
+ let view = resource
+ .clone()
+ .read(64 * 1024)
+ .await
+ .map_err(HttpError::Other)?; // 64KB
if view.is_empty() {
break;
}
@@ -937,16 +1017,17 @@ async fn op_http_write(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
#[buffer] buf: JsBuffer,
-) -> Result<(), AnyError> {
+) -> Result<(), HttpError> {
let stream = state
.borrow()
.resource_table
- .get::<HttpStreamWriteResource>(rid)?;
+ .get::<HttpStreamWriteResource>(rid)
+ .map_err(HttpError::Resource)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
match &mut *wr {
- HttpResponseWriter::Headers(_) => Err(http_error("no response headers")),
- HttpResponseWriter::Closed => Err(http_error("response already completed")),
+ HttpResponseWriter::Headers(_) => Err(HttpError::NoResponseHeaders),
+ HttpResponseWriter::Closed => Err(HttpError::ResponseAlreadyCompleted),
HttpResponseWriter::Body { writer, .. } => {
let mut result = writer.write_all(&buf).await;
if result.is_ok() {
@@ -961,7 +1042,7 @@ async fn op_http_write(
stream.conn.closed().await?;
// If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed;
- Err(http_error("response already completed"))
+ Err(HttpError::ResponseAlreadyCompleted)
}
}
}
@@ -975,7 +1056,7 @@ async fn op_http_write(
stream.conn.closed().await?;
// If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed;
- Err(http_error("response already completed"))
+ Err(HttpError::ResponseAlreadyCompleted)
}
}
}
@@ -989,11 +1070,12 @@ async fn op_http_write(
async fn op_http_shutdown(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<(), AnyError> {
+) -> Result<(), HttpError> {
let stream = state
.borrow()
.resource_table
- .get::<HttpStreamWriteResource>(rid)?;
+ .get::<HttpStreamWriteResource>(rid)
+ .map_err(HttpError::Resource)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
let wr = take(&mut *wr);
match wr {
@@ -1022,14 +1104,12 @@ async fn op_http_shutdown(
#[op2]
#[string]
-fn op_http_websocket_accept_header(
- #[string] key: String,
-) -> Result<String, AnyError> {
+fn op_http_websocket_accept_header(#[string] key: String) -> String {
let digest = ring::digest::digest(
&ring::digest::SHA1_FOR_LEGACY_USE_ONLY,
format!("{key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11").as_bytes(),
);
- Ok(BASE64_STANDARD.encode(digest))
+ BASE64_STANDARD.encode(digest)
}
#[op2(async)]
@@ -1037,30 +1117,34 @@ fn op_http_websocket_accept_header(
async fn op_http_upgrade_websocket(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<ResourceId, AnyError> {
+) -> Result<ResourceId, HttpError> {
let stream = state
.borrow_mut()
.resource_table
- .get::<HttpStreamReadResource>(rid)?;
+ .get::<HttpStreamReadResource>(rid)
+ .map_err(HttpError::Resource)?;
let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
let request = match &mut *rd {
HttpRequestReader::Headers(request) => request,
- _ => {
- return Err(http_error("cannot upgrade because request body was used"))
- }
+ _ => return Err(HttpError::UpgradeBodyUsed),
};
- let (transport, bytes) =
- extract_network_stream(hyper_v014::upgrade::on(request).await?);
- let ws_rid =
- ws_create_server_stream(&mut state.borrow_mut(), transport, bytes)?;
- Ok(ws_rid)
+ let (transport, bytes) = extract_network_stream(
+ hyper_v014::upgrade::on(request)
+ .await
+ .map_err(|err| HttpError::HyperV014(Arc::new(err)))?,
+ );
+ Ok(ws_create_server_stream(
+ &mut state.borrow_mut(),
+ transport,
+ bytes,
+ ))
}
// Needed so hyper can use non Send futures
#[derive(Clone)]
-struct LocalExecutor;
+pub struct LocalExecutor;
impl<Fut> hyper_v014::rt::Executor<Fut> for LocalExecutor
where
@@ -1082,10 +1166,6 @@ where
}
}
-fn http_error(message: &'static str) -> AnyError {
- custom_error("Http", message)
-}
-
/// Filters out the ever-surprising 'shutdown ENOTCONN' errors.
fn filter_enotconn(
result: Result<(), hyper_v014::Error>,