diff options
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r-- | ext/http/lib.rs | 84 |
1 files changed, 81 insertions, 3 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 43e3c130a..561b13885 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -34,6 +34,7 @@ use deno_core::ResourceId; use deno_core::StringOrBuffer; use deno_core::WriteOutcome; use deno_core::ZeroCopyBuf; +use deno_net::raw::NetworkStream; use deno_websocket::ws_create_server_stream; use flate2::write::GzEncoder; use flate2::Compression; @@ -76,7 +77,11 @@ use crate::reader_stream::ExternallyAbortableReaderStream; use crate::reader_stream::ShutdownHandle; pub mod compressible; +mod http_next; mod reader_stream; +mod request_body; +mod request_properties; +mod response_body; mod websocket_upgrade; deno_core::extension!( @@ -92,8 +97,25 @@ deno_core::extension!( op_http_websocket_accept_header, op_http_upgrade_early, op_http_upgrade_websocket, + http_next::op_serve_http, + http_next::op_serve_http_on, + http_next::op_http_wait, + http_next::op_http_track, + http_next::op_set_response_header, + http_next::op_set_response_headers, + http_next::op_set_response_body_text, + http_next::op_set_promise_complete, + http_next::op_set_response_body_bytes, + http_next::op_set_response_body_resource, + http_next::op_set_response_body_stream, + http_next::op_get_request_header, + http_next::op_get_request_headers, + http_next::op_get_request_method_and_url, + http_next::op_read_request_body, + http_next::op_upgrade, + http_next::op_upgrade_raw, ], - esm = ["01_http.js"], + esm = ["00_serve.js", "01_http.js"], ); pub enum HttpSocketAddr { @@ -1147,8 +1169,10 @@ async fn op_http_upgrade_websocket( } }; - let transport = hyper::upgrade::on(request).await?; - let ws_rid = ws_create_server_stream(&state, transport).await?; + let (transport, bytes) = + extract_network_stream(hyper::upgrade::on(request).await?); + let ws_rid = + ws_create_server_stream(&mut state.borrow_mut(), transport, bytes)?; Ok(ws_rid) } @@ -1166,6 +1190,16 @@ where } } +impl<Fut> hyper1::rt::Executor<Fut> for LocalExecutor +where + Fut: Future + 'static, + Fut::Output: 'static, +{ + fn execute(&self, fut: Fut) { + spawn_local(fut); + } +} + fn http_error(message: &'static str) -> AnyError { custom_error("Http", message) } @@ -1192,3 +1226,47 @@ fn filter_enotconn( fn never() -> Pending<Never> { pending() } + +trait CanDowncastUpgrade: Sized { + fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>( + self, + ) -> Result<(T, Bytes), Self>; +} + +impl CanDowncastUpgrade for hyper1::upgrade::Upgraded { + fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>( + self, + ) -> Result<(T, Bytes), Self> { + let hyper1::upgrade::Parts { io, read_buf, .. } = self.downcast()?; + Ok((io, read_buf)) + } +} + +impl CanDowncastUpgrade for hyper::upgrade::Upgraded { + fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>( + self, + ) -> Result<(T, Bytes), Self> { + let hyper::upgrade::Parts { io, read_buf, .. } = self.downcast()?; + Ok((io, read_buf)) + } +} + +fn extract_network_stream<U: CanDowncastUpgrade>( + upgraded: U, +) -> (NetworkStream, Bytes) { + let upgraded = match upgraded.downcast::<tokio::net::TcpStream>() { + Ok((stream, bytes)) => return (NetworkStream::Tcp(stream), bytes), + Err(x) => x, + }; + let upgraded = match upgraded.downcast::<deno_net::ops_tls::TlsStream>() { + Ok((stream, bytes)) => return (NetworkStream::Tls(stream), bytes), + Err(x) => x, + }; + #[cfg(unix)] + let upgraded = match upgraded.downcast::<tokio::net::UnixStream>() { + Ok((stream, bytes)) => return (NetworkStream::Unix(stream), bytes), + Err(x) => x, + }; + drop(upgraded); + unreachable!("unexpected stream type"); +} |