summaryrefslogtreecommitdiff
path: root/ext/http/lib.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-04-02 15:27:12 -0600
committerGitHub <noreply@github.com>2023-04-02 23:27:12 +0200
commit513dadadcff3e54a59c7d943f33acd90ff12ad89 (patch)
tree512ed6eaf6a99f8d6766d7e545abd8bf3f18dada /ext/http/lib.rs
parentd939a5e96c5ad1068a38186d01dc2ff0195478e5 (diff)
feat(ext/http): add an op to perform raw HTTP upgrade (#18511)
This commit adds new "op_http_upgrade_early", that allows to hijack existing "Deno.HttpConn" acquired from "Deno.serveHttp" API and performing a Websocket upgrade on this connection. This is not a public API and is meant to be used internally in the "ext/node" polyfills for "http" module. --------- Co-authored-by: Bartek IwaƄczuk <biwanczuk@gmail.com>
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r--ext/http/lib.rs191
1 files changed, 191 insertions, 0 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 20436a82d..289e7bf0f 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -32,6 +32,7 @@ use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::StringOrBuffer;
+use deno_core::WriteOutcome;
use deno_core::ZeroCopyBuf;
use deno_websocket::ws_create_server_stream;
use flate2::write::GzEncoder;
@@ -65,15 +66,18 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
+use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::task::spawn_local;
+use websocket_upgrade::WebSocketUpgrade;
use crate::reader_stream::ExternallyAbortableReaderStream;
use crate::reader_stream::ShutdownHandle;
pub mod compressible;
mod reader_stream;
+mod websocket_upgrade;
deno_core::extension!(
deno_http,
@@ -86,6 +90,7 @@ deno_core::extension!(
op_http_write_resource,
op_http_shutdown,
op_http_websocket_accept_header,
+ op_http_upgrade_early,
op_http_upgrade_websocket,
],
esm = ["01_http.js"],
@@ -938,6 +943,192 @@ fn op_http_websocket_accept_header(key: String) -> Result<String, AnyError> {
Ok(base64::encode(digest))
}
+struct EarlyUpgradeSocket(AsyncRefCell<EarlyUpgradeSocketInner>, CancelHandle);
+
+enum EarlyUpgradeSocketInner {
+ PreResponse(
+ Rc<HttpStreamResource>,
+ WebSocketUpgrade,
+ // Readers need to block in this state, so they can wait here for the broadcast.
+ tokio::sync::broadcast::Sender<
+ Rc<AsyncRefCell<tokio::io::ReadHalf<hyper::upgrade::Upgraded>>>,
+ >,
+ ),
+ PostResponse(
+ Rc<AsyncRefCell<tokio::io::ReadHalf<hyper::upgrade::Upgraded>>>,
+ Rc<AsyncRefCell<tokio::io::WriteHalf<hyper::upgrade::Upgraded>>>,
+ ),
+}
+
+impl EarlyUpgradeSocket {
+ /// Gets a reader without holding the lock.
+ async fn get_reader(
+ self: Rc<Self>,
+ ) -> Result<
+ Rc<AsyncRefCell<tokio::io::ReadHalf<hyper::upgrade::Upgraded>>>,
+ AnyError,
+ > {
+ let mut borrow = RcRef::map(self.clone(), |x| &x.0).borrow_mut().await;
+ let cancel = RcRef::map(self, |x| &x.1);
+ let inner = &mut *borrow;
+ match inner {
+ EarlyUpgradeSocketInner::PreResponse(_, _, tx) => {
+ let mut rx = tx.subscribe();
+ // Ensure we're not borrowing self here
+ drop(borrow);
+ Ok(
+ rx.recv()
+ .map_err(AnyError::from)
+ .try_or_cancel(&cancel)
+ .await?,
+ )
+ }
+ EarlyUpgradeSocketInner::PostResponse(rx, _) => Ok(rx.clone()),
+ }
+ }
+
+ async fn read(self: Rc<Self>, data: &mut [u8]) -> Result<usize, AnyError> {
+ let reader = self.clone().get_reader().await?;
+ let cancel = RcRef::map(self, |x| &x.1);
+ Ok(
+ reader
+ .borrow_mut()
+ .await
+ .read(data)
+ .try_or_cancel(&cancel)
+ .await?,
+ )
+ }
+
+ /// Write all the data provided, only holding the lock while we see if the connection needs to be
+ /// upgraded.
+ async fn write_all(self: Rc<Self>, buf: &[u8]) -> Result<(), AnyError> {
+ let mut borrow = RcRef::map(self.clone(), |x| &x.0).borrow_mut().await;
+ let cancel = RcRef::map(self, |x| &x.1);
+ let inner = &mut *borrow;
+ match inner {
+ EarlyUpgradeSocketInner::PreResponse(stream, upgrade, rx_tx) => {
+ if let Some((resp, extra)) = upgrade.write(buf)? {
+ let new_wr = HttpResponseWriter::Closed;
+ let mut old_wr =
+ RcRef::map(stream.clone(), |r| &r.wr).borrow_mut().await;
+ let response_tx = match replace(&mut *old_wr, new_wr) {
+ HttpResponseWriter::Headers(response_tx) => response_tx,
+ _ => return Err(http_error("response headers already sent")),
+ };
+
+ if response_tx.send(resp).is_err() {
+ stream.conn.closed().await?;
+ return Err(http_error("connection closed while sending response"));
+ };
+
+ let mut old_rd =
+ RcRef::map(stream.clone(), |r| &r.rd).borrow_mut().await;
+ let new_rd = HttpRequestReader::Closed;
+ let upgraded = match replace(&mut *old_rd, new_rd) {
+ HttpRequestReader::Headers(request) => {
+ hyper::upgrade::on(request)
+ .map_err(AnyError::from)
+ .try_or_cancel(&cancel)
+ .await?
+ }
+ _ => {
+ return Err(http_error("response already started"));
+ }
+ };
+
+ let (rx, tx) = tokio::io::split(upgraded);
+ let rx = Rc::new(AsyncRefCell::new(rx));
+ let tx = Rc::new(AsyncRefCell::new(tx));
+
+ // Take the tx and rx lock before we allow anything else to happen because we want to control
+ // the order of reads and writes.
+ let mut tx_lock = tx.clone().borrow_mut().await;
+ let rx_lock = rx.clone().borrow_mut().await;
+
+ // Allow all the pending readers to go now. We still have the lock on inner, so no more
+ // pending readers can show up. We intentionally ignore errors here, as there may be
+ // nobody waiting on a read.
+ _ = rx_tx.send(rx.clone());
+
+ // We swap out inner here, so once the lock is gone, readers will acquire rx directly.
+ // We also fully release our lock.
+ *inner = EarlyUpgradeSocketInner::PostResponse(rx, tx);
+ drop(borrow);
+
+ // We've updated inner and unlocked it, reads are free to go in-order.
+ drop(rx_lock);
+
+ // If we had extra data after the response, write that to the upgraded connection
+ if !extra.is_empty() {
+ tx_lock.write_all(&extra).try_or_cancel(&cancel).await?;
+ }
+ }
+ }
+ EarlyUpgradeSocketInner::PostResponse(_, tx) => {
+ let tx = tx.clone();
+ drop(borrow);
+ tx.borrow_mut()
+ .await
+ .write_all(buf)
+ .try_or_cancel(&cancel)
+ .await?;
+ }
+ };
+ Ok(())
+ }
+}
+
+impl Resource for EarlyUpgradeSocket {
+ fn name(&self) -> Cow<str> {
+ "upgradedHttpConnection".into()
+ }
+
+ deno_core::impl_readable_byob!();
+
+ fn write(
+ self: Rc<Self>,
+ buf: BufView,
+ ) -> AsyncResult<deno_core::WriteOutcome> {
+ Box::pin(async move {
+ let nwritten = buf.len();
+ Self::write_all(self, &buf).await?;
+ Ok(WriteOutcome::Full { nwritten })
+ })
+ }
+
+ fn write_all(self: Rc<Self>, buf: BufView) -> AsyncResult<()> {
+ Box::pin(async move { Self::write_all(self, &buf).await })
+ }
+
+ fn close(self: Rc<Self>) {
+ self.1.cancel()
+ }
+}
+
+#[op]
+async fn op_http_upgrade_early(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+) -> Result<ResourceId, AnyError> {
+ let stream = state
+ .borrow_mut()
+ .resource_table
+ .get::<HttpStreamResource>(rid)?;
+ let resources = &mut state.borrow_mut().resource_table;
+ let (tx, _rx) = tokio::sync::broadcast::channel(1);
+ let socket = EarlyUpgradeSocketInner::PreResponse(
+ stream,
+ WebSocketUpgrade::default(),
+ tx,
+ );
+ let rid = resources.add(EarlyUpgradeSocket(
+ AsyncRefCell::new(socket),
+ CancelHandle::new(),
+ ));
+ Ok(rid)
+}
+
struct UpgradedStream(hyper::upgrade::Upgraded);
impl tokio::io::AsyncRead for UpgradedStream {
fn poll_read(