summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSatya Rohith <me@satyarohith.com>2024-09-04 18:10:28 +0530
committerGitHub <noreply@github.com>2024-09-04 18:10:28 +0530
commit334c842392e2587b8ca1d7cc7cc7d9231fc15286 (patch)
tree755a7cf9b78ef0a81d566c9d6ff2c9538cca0283
parent84dc375b2d28a0ba9ddf0fbc5168505c19b1adea (diff)
chore(ext/fetch): remove op_fetch_response_upgrade (#25421)
-rw-r--r--ext/fetch/README.md1
-rw-r--r--ext/fetch/lib.rs113
2 files changed, 0 insertions, 114 deletions
diff --git a/ext/fetch/README.md b/ext/fetch/README.md
index d088a6147..3af8110a6 100644
--- a/ext/fetch/README.md
+++ b/ext/fetch/README.md
@@ -78,6 +78,5 @@ Following ops are provided, which can be accessed through `Deno.ops`:
- op_fetch
- op_fetch_send
-- op_fetch_response_upgrade
- op_utf8_to_byte_string
- op_fetch_custom_client
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index a3f9eeb4f..79659771e 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -28,7 +28,6 @@ use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::futures::TryFutureExt;
use deno_core::op2;
-use deno_core::unsync::spawn;
use deno_core::url::Url;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
@@ -70,12 +69,9 @@ use hyper::body::Frame;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::connect::HttpInfo;
use hyper_util::rt::TokioExecutor;
-use hyper_util::rt::TokioIo;
use hyper_util::rt::TokioTimer;
use serde::Deserialize;
use serde::Serialize;
-use tokio::io::AsyncReadExt;
-use tokio::io::AsyncWriteExt;
use tower::ServiceExt;
use tower_http::decompression::Decompression;
@@ -127,7 +123,6 @@ deno_core::extension!(deno_fetch,
ops = [
op_fetch<FP>,
op_fetch_send,
- op_fetch_response_upgrade,
op_utf8_to_byte_string,
op_fetch_custom_client<FP>,
],
@@ -627,114 +622,6 @@ pub async fn op_fetch_send(
})
}
-#[op2(async)]
-#[smi]
-pub async fn op_fetch_response_upgrade(
- state: Rc<RefCell<OpState>>,
- #[smi] rid: ResourceId,
-) -> Result<ResourceId, AnyError> {
- let raw_response = state
- .borrow_mut()
- .resource_table
- .take::<FetchResponseResource>(rid)?;
- let raw_response = Rc::try_unwrap(raw_response)
- .expect("Someone is holding onto FetchResponseResource");
-
- let (read, write) = tokio::io::duplex(1024);
- let (read_rx, write_tx) = tokio::io::split(read);
- let (mut write_rx, mut read_tx) = tokio::io::split(write);
- let upgraded = raw_response.upgrade().await?;
- {
- // Stage 3: Pump the data
- let (mut upgraded_rx, mut upgraded_tx) =
- tokio::io::split(TokioIo::new(upgraded));
-
- spawn(async move {
- let mut buf = [0; 1024];
- loop {
- let read = upgraded_rx.read(&mut buf).await?;
- if read == 0 {
- break;
- }
- read_tx.write_all(&buf[..read]).await?;
- }
- Ok::<_, AnyError>(())
- });
- spawn(async move {
- let mut buf = [0; 1024];
- loop {
- let read = write_rx.read(&mut buf).await?;
- if read == 0 {
- break;
- }
- upgraded_tx.write_all(&buf[..read]).await?;
- }
- Ok::<_, AnyError>(())
- });
- }
-
- Ok(
- state
- .borrow_mut()
- .resource_table
- .add(UpgradeStream::new(read_rx, write_tx)),
- )
-}
-
-struct UpgradeStream {
- read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>,
- write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>,
- cancel_handle: CancelHandle,
-}
-
-impl UpgradeStream {
- pub fn new(
- read: tokio::io::ReadHalf<tokio::io::DuplexStream>,
- write: tokio::io::WriteHalf<tokio::io::DuplexStream>,
- ) -> Self {
- Self {
- read: AsyncRefCell::new(read),
- write: AsyncRefCell::new(write),
- cancel_handle: CancelHandle::new(),
- }
- }
-
- async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
- let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
- async {
- let read = RcRef::map(self, |this| &this.read);
- let mut read = read.borrow_mut().await;
- Ok(Pin::new(&mut *read).read(buf).await?)
- }
- .try_or_cancel(cancel_handle)
- .await
- }
-
- async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
- let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
- async {
- let write = RcRef::map(self, |this| &this.write);
- let mut write = write.borrow_mut().await;
- Ok(Pin::new(&mut *write).write(buf).await?)
- }
- .try_or_cancel(cancel_handle)
- .await
- }
-}
-
-impl Resource for UpgradeStream {
- fn name(&self) -> Cow<str> {
- "fetchUpgradedStream".into()
- }
-
- deno_core::impl_readable_byob!();
- deno_core::impl_writable!();
-
- fn close(self: Rc<Self>) {
- self.cancel_handle.cancel();
- }
-}
-
type CancelableResponseResult =
Result<Result<http::Response<ResBody>, AnyError>, Canceled>;