diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-04-02 15:27:12 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-02 23:27:12 +0200 |
commit | 513dadadcff3e54a59c7d943f33acd90ff12ad89 (patch) | |
tree | 512ed6eaf6a99f8d6766d7e545abd8bf3f18dada /ext/http/lib.rs | |
parent | d939a5e96c5ad1068a38186d01dc2ff0195478e5 (diff) |
feat(ext/http): add an op to perform raw HTTP upgrade (#18511)
This commit adds new "op_http_upgrade_early", that allows to hijack
existing "Deno.HttpConn" acquired from "Deno.serveHttp" API
and performing a Websocket upgrade on this connection.
This is not a public API and is meant to be used internally in the
"ext/node" polyfills for "http" module.
---------
Co-authored-by: Bartek IwaĆczuk <biwanczuk@gmail.com>
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r-- | ext/http/lib.rs | 191 |
1 files changed, 191 insertions, 0 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 20436a82d..289e7bf0f 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -32,6 +32,7 @@ use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::StringOrBuffer; +use deno_core::WriteOutcome; use deno_core::ZeroCopyBuf; use deno_websocket::ws_create_server_stream; use flate2::write::GzEncoder; @@ -65,15 +66,18 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tokio::task::spawn_local; +use websocket_upgrade::WebSocketUpgrade; use crate::reader_stream::ExternallyAbortableReaderStream; use crate::reader_stream::ShutdownHandle; pub mod compressible; mod reader_stream; +mod websocket_upgrade; deno_core::extension!( deno_http, @@ -86,6 +90,7 @@ deno_core::extension!( op_http_write_resource, op_http_shutdown, op_http_websocket_accept_header, + op_http_upgrade_early, op_http_upgrade_websocket, ], esm = ["01_http.js"], @@ -938,6 +943,192 @@ fn op_http_websocket_accept_header(key: String) -> Result<String, AnyError> { Ok(base64::encode(digest)) } +struct EarlyUpgradeSocket(AsyncRefCell<EarlyUpgradeSocketInner>, CancelHandle); + +enum EarlyUpgradeSocketInner { + PreResponse( + Rc<HttpStreamResource>, + WebSocketUpgrade, + // Readers need to block in this state, so they can wait here for the broadcast. + tokio::sync::broadcast::Sender< + Rc<AsyncRefCell<tokio::io::ReadHalf<hyper::upgrade::Upgraded>>>, + >, + ), + PostResponse( + Rc<AsyncRefCell<tokio::io::ReadHalf<hyper::upgrade::Upgraded>>>, + Rc<AsyncRefCell<tokio::io::WriteHalf<hyper::upgrade::Upgraded>>>, + ), +} + +impl EarlyUpgradeSocket { + /// Gets a reader without holding the lock. + async fn get_reader( + self: Rc<Self>, + ) -> Result< + Rc<AsyncRefCell<tokio::io::ReadHalf<hyper::upgrade::Upgraded>>>, + AnyError, + > { + let mut borrow = RcRef::map(self.clone(), |x| &x.0).borrow_mut().await; + let cancel = RcRef::map(self, |x| &x.1); + let inner = &mut *borrow; + match inner { + EarlyUpgradeSocketInner::PreResponse(_, _, tx) => { + let mut rx = tx.subscribe(); + // Ensure we're not borrowing self here + drop(borrow); + Ok( + rx.recv() + .map_err(AnyError::from) + .try_or_cancel(&cancel) + .await?, + ) + } + EarlyUpgradeSocketInner::PostResponse(rx, _) => Ok(rx.clone()), + } + } + + async fn read(self: Rc<Self>, data: &mut [u8]) -> Result<usize, AnyError> { + let reader = self.clone().get_reader().await?; + let cancel = RcRef::map(self, |x| &x.1); + Ok( + reader + .borrow_mut() + .await + .read(data) + .try_or_cancel(&cancel) + .await?, + ) + } + + /// Write all the data provided, only holding the lock while we see if the connection needs to be + /// upgraded. + async fn write_all(self: Rc<Self>, buf: &[u8]) -> Result<(), AnyError> { + let mut borrow = RcRef::map(self.clone(), |x| &x.0).borrow_mut().await; + let cancel = RcRef::map(self, |x| &x.1); + let inner = &mut *borrow; + match inner { + EarlyUpgradeSocketInner::PreResponse(stream, upgrade, rx_tx) => { + if let Some((resp, extra)) = upgrade.write(buf)? { + let new_wr = HttpResponseWriter::Closed; + let mut old_wr = + RcRef::map(stream.clone(), |r| &r.wr).borrow_mut().await; + let response_tx = match replace(&mut *old_wr, new_wr) { + HttpResponseWriter::Headers(response_tx) => response_tx, + _ => return Err(http_error("response headers already sent")), + }; + + if response_tx.send(resp).is_err() { + stream.conn.closed().await?; + return Err(http_error("connection closed while sending response")); + }; + + let mut old_rd = + RcRef::map(stream.clone(), |r| &r.rd).borrow_mut().await; + let new_rd = HttpRequestReader::Closed; + let upgraded = match replace(&mut *old_rd, new_rd) { + HttpRequestReader::Headers(request) => { + hyper::upgrade::on(request) + .map_err(AnyError::from) + .try_or_cancel(&cancel) + .await? + } + _ => { + return Err(http_error("response already started")); + } + }; + + let (rx, tx) = tokio::io::split(upgraded); + let rx = Rc::new(AsyncRefCell::new(rx)); + let tx = Rc::new(AsyncRefCell::new(tx)); + + // Take the tx and rx lock before we allow anything else to happen because we want to control + // the order of reads and writes. + let mut tx_lock = tx.clone().borrow_mut().await; + let rx_lock = rx.clone().borrow_mut().await; + + // Allow all the pending readers to go now. We still have the lock on inner, so no more + // pending readers can show up. We intentionally ignore errors here, as there may be + // nobody waiting on a read. + _ = rx_tx.send(rx.clone()); + + // We swap out inner here, so once the lock is gone, readers will acquire rx directly. + // We also fully release our lock. + *inner = EarlyUpgradeSocketInner::PostResponse(rx, tx); + drop(borrow); + + // We've updated inner and unlocked it, reads are free to go in-order. + drop(rx_lock); + + // If we had extra data after the response, write that to the upgraded connection + if !extra.is_empty() { + tx_lock.write_all(&extra).try_or_cancel(&cancel).await?; + } + } + } + EarlyUpgradeSocketInner::PostResponse(_, tx) => { + let tx = tx.clone(); + drop(borrow); + tx.borrow_mut() + .await + .write_all(buf) + .try_or_cancel(&cancel) + .await?; + } + }; + Ok(()) + } +} + +impl Resource for EarlyUpgradeSocket { + fn name(&self) -> Cow<str> { + "upgradedHttpConnection".into() + } + + deno_core::impl_readable_byob!(); + + fn write( + self: Rc<Self>, + buf: BufView, + ) -> AsyncResult<deno_core::WriteOutcome> { + Box::pin(async move { + let nwritten = buf.len(); + Self::write_all(self, &buf).await?; + Ok(WriteOutcome::Full { nwritten }) + }) + } + + fn write_all(self: Rc<Self>, buf: BufView) -> AsyncResult<()> { + Box::pin(async move { Self::write_all(self, &buf).await }) + } + + fn close(self: Rc<Self>) { + self.1.cancel() + } +} + +#[op] +async fn op_http_upgrade_early( + state: Rc<RefCell<OpState>>, + rid: ResourceId, +) -> Result<ResourceId, AnyError> { + let stream = state + .borrow_mut() + .resource_table + .get::<HttpStreamResource>(rid)?; + let resources = &mut state.borrow_mut().resource_table; + let (tx, _rx) = tokio::sync::broadcast::channel(1); + let socket = EarlyUpgradeSocketInner::PreResponse( + stream, + WebSocketUpgrade::default(), + tx, + ); + let rid = resources.add(EarlyUpgradeSocket( + AsyncRefCell::new(socket), + CancelHandle::new(), + )); + Ok(rid) +} + struct UpgradedStream(hyper::upgrade::Upgraded); impl tokio::io::AsyncRead for UpgradedStream { fn poll_read( |