diff options
Diffstat (limited to 'ext/http/http_next.rs')
-rw-r--r-- | ext/http/http_next.rs | 135 |
1 files changed, 134 insertions, 1 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 1c2a232e2..593a9c816 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -10,11 +10,13 @@ use crate::response_body::CompletionHandle; use crate::response_body::ResponseBytes; use crate::response_body::ResponseBytesInner; use crate::response_body::V8StreamHttpResponseBody; +use crate::websocket_upgrade::WebSocketUpgrade; use crate::LocalExecutor; use deno_core::error::AnyError; use deno_core::futures::TryFutureExt; use deno_core::op; use deno_core::AsyncRefCell; +use deno_core::AsyncResult; use deno_core::BufView; use deno_core::ByteString; use deno_core::CancelFuture; @@ -39,6 +41,7 @@ use hyper1::server::conn::http2; use hyper1::service::service_fn; use hyper1::service::HttpService; use hyper1::upgrade::OnUpgrade; + use hyper1::StatusCode; use pin_project::pin_project; use pin_project::pinned_drop; @@ -52,6 +55,10 @@ use std::net::SocketAddr; use std::net::SocketAddrV4; use std::pin::Pin; use std::rc::Rc; + +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; + use tokio::task::spawn_local; use tokio::task::JoinHandle; @@ -228,7 +235,79 @@ fn slab_insert( } #[op] -pub fn op_upgrade_raw(_index: usize) {} +pub fn op_upgrade_raw( + state: &mut OpState, + index: u32, +) -> Result<ResourceId, AnyError> { + // Stage 1: extract the upgrade future + let upgrade = with_http_mut(index, |http| { + // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit + http + .request_parts + .extensions + .remove::<OnUpgrade>() + .ok_or_else(|| AnyError::msg("upgrade unavailable")) + })?; + + let (read, write) = tokio::io::duplex(1024); + let (read_rx, write_tx) = tokio::io::split(read); + let (mut write_rx, mut read_tx) = tokio::io::split(write); + + spawn_local(async move { + let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default(); + + // Stage 2: Extract the Upgraded connection + let mut buf = [0; 1024]; + let upgraded = loop { + let read = Pin::new(&mut write_rx).read(&mut buf).await?; + match upgrade_stream.write(&buf[..read]) { + Ok(None) => continue, + Ok(Some((response, bytes))) => { + with_resp_mut(index, |resp| *resp = Some(response)); + with_promise_mut(index, |promise| promise.complete(true)); + let mut upgraded = upgrade.await?; + upgraded.write_all(&bytes).await?; + break upgraded; + } + Err(err) => return Err(err), + } + }; + + // Stage 3: Pump the data + let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded); + + spawn_local(async move { + let mut buf = [0; 1024]; + loop { + let read = upgraded_rx.read(&mut buf).await?; + if read == 0 { + break; + } + read_tx.write_all(&buf[..read]).await?; + } + Ok::<_, AnyError>(()) + }); + spawn_local(async move { + let mut buf = [0; 1024]; + loop { + let read = write_rx.read(&mut buf).await?; + if read == 0 { + break; + } + upgraded_tx.write_all(&buf[..read]).await?; + } + Ok::<_, AnyError>(()) + }); + + Ok(()) + }); + + Ok( + state + .resource_table + .add(UpgradeStream::new(read_rx, write_tx)), + ) +} #[op] pub async fn op_upgrade( @@ -825,3 +904,57 @@ pub async fn op_http_wait( Ok(u32::MAX) } + +struct UpgradeStream { + read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>, + write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>, + cancel_handle: CancelHandle, +} + +impl UpgradeStream { + pub fn new( + read: tokio::io::ReadHalf<tokio::io::DuplexStream>, + write: tokio::io::WriteHalf<tokio::io::DuplexStream>, + ) -> Self { + Self { + read: AsyncRefCell::new(read), + write: AsyncRefCell::new(write), + cancel_handle: CancelHandle::new(), + } + } + + async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> { + let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); + async { + let read = RcRef::map(self, |this| &this.read); + let mut read = read.borrow_mut().await; + Ok(Pin::new(&mut *read).read(buf).await?) + } + .try_or_cancel(cancel_handle) + .await + } + + async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { + let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); + async { + let write = RcRef::map(self, |this| &this.write); + let mut write = write.borrow_mut().await; + Ok(Pin::new(&mut *write).write(buf).await?) + } + .try_or_cancel(cancel_handle) + .await + } +} + +impl Resource for UpgradeStream { + fn name(&self) -> Cow<str> { + "httpRawUpgradeStream".into() + } + + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + + fn close(self: Rc<Self>) { + self.cancel_handle.cancel(); + } +} |