diff options
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r-- | ext/http/lib.rs | 184 |
1 files changed, 132 insertions, 52 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 934f8a002..39b0bbc2a 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -6,8 +6,6 @@ use async_compression::Level; use base64::prelude::BASE64_STANDARD; use base64::Engine; use cache_control::CacheControl; -use deno_core::error::custom_error; -use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; use deno_core::futures::channel::oneshot; use deno_core::futures::future::pending; @@ -41,6 +39,8 @@ use deno_net::raw::NetworkStream; use deno_websocket::ws_create_server_stream; use flate2::write::GzEncoder; use flate2::Compression; +use hyper::server::conn::http1; +use hyper::server::conn::http2; use hyper_util::rt::TokioIo; use hyper_v014::body::Bytes; use hyper_v014::body::HttpBody; @@ -89,11 +89,33 @@ mod service; mod websocket_upgrade; use fly_accept_encoding::Encoding; +pub use http_next::HttpNextError; pub use request_properties::DefaultHttpPropertyExtractor; pub use request_properties::HttpConnectionProperties; pub use request_properties::HttpListenProperties; pub use request_properties::HttpPropertyExtractor; pub use request_properties::HttpRequestProperties; +pub use service::UpgradeUnavailableError; +pub use websocket_upgrade::WebSocketUpgradeError; + +#[derive(Debug, Default, Clone, Copy)] +pub struct Options { + /// By passing a hook function, the caller can customize various configuration + /// options for the HTTP/2 server. + /// See [`http2::Builder`] for what parameters can be customized. + /// + /// If `None`, the default configuration provided by hyper will be used. Note + /// that the default configuration is subject to change in future versions. + pub http2_builder_hook: + Option<fn(http2::Builder<LocalExecutor>) -> http2::Builder<LocalExecutor>>, + /// By passing a hook function, the caller can customize various configuration + /// options for the HTTP/1 server. + /// See [`http1::Builder`] for what parameters can be customized. + /// + /// If `None`, the default configuration provided by hyper will be used. Note + /// that the default configuration is subject to change in future versions. + pub http1_builder_hook: Option<fn(http1::Builder) -> http1::Builder>, +} deno_core::extension!( deno_http, @@ -111,7 +133,9 @@ deno_core::extension!( http_next::op_http_close_after_finish, http_next::op_http_get_request_header, http_next::op_http_get_request_headers, + http_next::op_http_request_on_cancel, http_next::op_http_get_request_method_and_url<HTTP>, + http_next::op_http_get_request_cancelled, http_next::op_http_read_request_body, http_next::op_http_serve_on<HTTP>, http_next::op_http_serve<HTTP>, @@ -132,8 +156,46 @@ deno_core::extension!( http_next::op_http_cancel, ], esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"], + options = { + options: Options, + }, + state = |state, options| { + state.put::<Options>(options.options); + } ); +#[derive(Debug, thiserror::Error)] +pub enum HttpError { + #[error(transparent)] + Resource(deno_core::error::AnyError), + #[error(transparent)] + Canceled(#[from] deno_core::Canceled), + #[error("{0}")] + HyperV014(#[source] Arc<hyper_v014::Error>), + #[error("{0}")] + InvalidHeaderName(#[from] hyper_v014::header::InvalidHeaderName), + #[error("{0}")] + InvalidHeaderValue(#[from] hyper_v014::header::InvalidHeaderValue), + #[error("{0}")] + Http(#[from] hyper_v014::http::Error), + #[error("response headers already sent")] + ResponseHeadersAlreadySent, + #[error("connection closed while sending response")] + ConnectionClosedWhileSendingResponse, + #[error("already in use")] + AlreadyInUse, + #[error("{0}")] + Io(#[from] std::io::Error), + #[error("no response headers")] + NoResponseHeaders, + #[error("response already completed")] + ResponseAlreadyCompleted, + #[error("cannot upgrade because request body was used")] + UpgradeBodyUsed, + #[error(transparent)] + Other(deno_core::error::AnyError), +} + pub enum HttpSocketAddr { IpSocket(std::net::SocketAddr), #[cfg(unix)] @@ -216,7 +278,7 @@ impl HttpConnResource { String, String, )>, - AnyError, + HttpError, > { let fut = async { let (request_tx, request_rx) = oneshot::channel(); @@ -259,8 +321,8 @@ impl HttpConnResource { } /// A future that completes when this HTTP connection is closed or errors. - async fn closed(&self) -> Result<(), AnyError> { - self.closed_fut.clone().map_err(AnyError::from).await + async fn closed(&self) -> Result<(), HttpError> { + self.closed_fut.clone().map_err(HttpError::HyperV014).await } } @@ -280,14 +342,13 @@ pub fn http_create_conn_resource<S, A>( io: S, addr: A, scheme: &'static str, -) -> Result<ResourceId, AnyError> +) -> ResourceId where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, A: Into<HttpSocketAddr>, { let conn = HttpConnResource::new(io, scheme, addr.into()); - let rid = state.resource_table.add(conn); - Ok(rid) + state.resource_table.add(conn) } /// An object that implements the `hyper::Service` trait, through which Hyper @@ -423,7 +484,9 @@ impl Resource for HttpStreamReadResource { // safely call `await` on it without creating a race condition. Some(_) => match body.as_mut().next().await.unwrap() { Ok(chunk) => assert!(chunk.is_empty()), - Err(err) => break Err(AnyError::from(err)), + Err(err) => { + break Err(HttpError::HyperV014(Arc::new(err)).into()) + } }, None => break Ok(BufView::empty()), } @@ -545,8 +608,12 @@ struct NextRequestResponse( async fn op_http_accept( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, -) -> Result<Option<NextRequestResponse>, AnyError> { - let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?; +) -> Result<Option<NextRequestResponse>, HttpError> { + let conn = state + .borrow() + .resource_table + .get::<HttpConnResource>(rid) + .map_err(HttpError::Resource)?; match conn.accept().await { Ok(Some((read_stream, write_stream, method, url))) => { @@ -657,11 +724,12 @@ async fn op_http_write_headers( #[smi] status: u16, #[serde] headers: Vec<(ByteString, ByteString)>, #[serde] data: Option<StringOrBuffer>, -) -> Result<(), AnyError> { +) -> Result<(), HttpError> { let stream = state .borrow_mut() .resource_table - .get::<HttpStreamWriteResource>(rid)?; + .get::<HttpStreamWriteResource>(rid) + .map_err(HttpError::Resource)?; // Track supported encoding let encoding = stream.accept_encoding; @@ -708,14 +776,14 @@ async fn op_http_write_headers( let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let response_tx = match replace(&mut *old_wr, new_wr) { HttpResponseWriter::Headers(response_tx) => response_tx, - _ => return Err(http_error("response headers already sent")), + _ => return Err(HttpError::ResponseHeadersAlreadySent), }; match response_tx.send(body) { Ok(_) => Ok(()), Err(_) => { stream.conn.closed().await?; - Err(http_error("connection closed while sending response")) + Err(HttpError::ConnectionClosedWhileSendingResponse) } } } @@ -725,11 +793,14 @@ async fn op_http_write_headers( fn op_http_headers( state: &mut OpState, #[smi] rid: u32, -) -> Result<Vec<(ByteString, ByteString)>, AnyError> { - let stream = state.resource_table.get::<HttpStreamReadResource>(rid)?; +) -> Result<Vec<(ByteString, ByteString)>, HttpError> { + let stream = state + .resource_table + .get::<HttpStreamReadResource>(rid) + .map_err(HttpError::Resource)?; let rd = RcRef::map(&stream, |r| &r.rd) .try_borrow() - .ok_or_else(|| http_error("already in use"))?; + .ok_or(HttpError::AlreadyInUse)?; match &*rd { HttpRequestReader::Headers(request) => Ok(req_headers(request.headers())), HttpRequestReader::Body(headers, _) => Ok(req_headers(headers)), @@ -741,7 +812,7 @@ fn http_response( data: Option<StringOrBuffer>, compressing: bool, encoding: Encoding, -) -> Result<(HttpResponseWriter, hyper_v014::Body), AnyError> { +) -> Result<(HttpResponseWriter, hyper_v014::Body), HttpError> { // Gzip, after level 1, doesn't produce significant size difference. // This default matches nginx default gzip compression level (1): // https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level @@ -878,25 +949,34 @@ async fn op_http_write_resource( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, #[smi] stream: ResourceId, -) -> Result<(), AnyError> { +) -> Result<(), HttpError> { let http_stream = state .borrow() .resource_table - .get::<HttpStreamWriteResource>(rid)?; + .get::<HttpStreamWriteResource>(rid) + .map_err(HttpError::Resource)?; let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await; - let resource = state.borrow().resource_table.get_any(stream)?; + let resource = state + .borrow() + .resource_table + .get_any(stream) + .map_err(HttpError::Resource)?; loop { match *wr { HttpResponseWriter::Headers(_) => { - return Err(http_error("no response headers")) + return Err(HttpError::NoResponseHeaders) } HttpResponseWriter::Closed => { - return Err(http_error("response already completed")) + return Err(HttpError::ResponseAlreadyCompleted) } _ => {} }; - let view = resource.clone().read(64 * 1024).await?; // 64KB + let view = resource + .clone() + .read(64 * 1024) + .await + .map_err(HttpError::Other)?; // 64KB if view.is_empty() { break; } @@ -937,16 +1017,17 @@ async fn op_http_write( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, #[buffer] buf: JsBuffer, -) -> Result<(), AnyError> { +) -> Result<(), HttpError> { let stream = state .borrow() .resource_table - .get::<HttpStreamWriteResource>(rid)?; + .get::<HttpStreamWriteResource>(rid) + .map_err(HttpError::Resource)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; match &mut *wr { - HttpResponseWriter::Headers(_) => Err(http_error("no response headers")), - HttpResponseWriter::Closed => Err(http_error("response already completed")), + HttpResponseWriter::Headers(_) => Err(HttpError::NoResponseHeaders), + HttpResponseWriter::Closed => Err(HttpError::ResponseAlreadyCompleted), HttpResponseWriter::Body { writer, .. } => { let mut result = writer.write_all(&buf).await; if result.is_ok() { @@ -961,7 +1042,7 @@ async fn op_http_write( stream.conn.closed().await?; // If there was no connection error, drop body_tx. *wr = HttpResponseWriter::Closed; - Err(http_error("response already completed")) + Err(HttpError::ResponseAlreadyCompleted) } } } @@ -975,7 +1056,7 @@ async fn op_http_write( stream.conn.closed().await?; // If there was no connection error, drop body_tx. *wr = HttpResponseWriter::Closed; - Err(http_error("response already completed")) + Err(HttpError::ResponseAlreadyCompleted) } } } @@ -989,11 +1070,12 @@ async fn op_http_write( async fn op_http_shutdown( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, -) -> Result<(), AnyError> { +) -> Result<(), HttpError> { let stream = state .borrow() .resource_table - .get::<HttpStreamWriteResource>(rid)?; + .get::<HttpStreamWriteResource>(rid) + .map_err(HttpError::Resource)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let wr = take(&mut *wr); match wr { @@ -1022,14 +1104,12 @@ async fn op_http_shutdown( #[op2] #[string] -fn op_http_websocket_accept_header( - #[string] key: String, -) -> Result<String, AnyError> { +fn op_http_websocket_accept_header(#[string] key: String) -> String { let digest = ring::digest::digest( &ring::digest::SHA1_FOR_LEGACY_USE_ONLY, format!("{key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11").as_bytes(), ); - Ok(BASE64_STANDARD.encode(digest)) + BASE64_STANDARD.encode(digest) } #[op2(async)] @@ -1037,30 +1117,34 @@ fn op_http_websocket_accept_header( async fn op_http_upgrade_websocket( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, -) -> Result<ResourceId, AnyError> { +) -> Result<ResourceId, HttpError> { let stream = state .borrow_mut() .resource_table - .get::<HttpStreamReadResource>(rid)?; + .get::<HttpStreamReadResource>(rid) + .map_err(HttpError::Resource)?; let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; let request = match &mut *rd { HttpRequestReader::Headers(request) => request, - _ => { - return Err(http_error("cannot upgrade because request body was used")) - } + _ => return Err(HttpError::UpgradeBodyUsed), }; - let (transport, bytes) = - extract_network_stream(hyper_v014::upgrade::on(request).await?); - let ws_rid = - ws_create_server_stream(&mut state.borrow_mut(), transport, bytes)?; - Ok(ws_rid) + let (transport, bytes) = extract_network_stream( + hyper_v014::upgrade::on(request) + .await + .map_err(|err| HttpError::HyperV014(Arc::new(err)))?, + ); + Ok(ws_create_server_stream( + &mut state.borrow_mut(), + transport, + bytes, + )) } // Needed so hyper can use non Send futures #[derive(Clone)] -struct LocalExecutor; +pub struct LocalExecutor; impl<Fut> hyper_v014::rt::Executor<Fut> for LocalExecutor where @@ -1082,10 +1166,6 @@ where } } -fn http_error(message: &'static str) -> AnyError { - custom_error("Http", message) -} - /// Filters out the ever-surprising 'shutdown ENOTCONN' errors. fn filter_enotconn( result: Result<(), hyper_v014::Error>, |