diff options
author | Satya Rohith <me@satyarohith.com> | 2024-09-04 18:10:28 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-04 18:10:28 +0530 |
commit | 334c842392e2587b8ca1d7cc7cc7d9231fc15286 (patch) | |
tree | 755a7cf9b78ef0a81d566c9d6ff2c9538cca0283 /ext/fetch | |
parent | 84dc375b2d28a0ba9ddf0fbc5168505c19b1adea (diff) |
chore(ext/fetch): remove op_fetch_response_upgrade (#25421)
Diffstat (limited to 'ext/fetch')
-rw-r--r-- | ext/fetch/README.md | 1 | ||||
-rw-r--r-- | ext/fetch/lib.rs | 113 |
2 files changed, 0 insertions, 114 deletions
diff --git a/ext/fetch/README.md b/ext/fetch/README.md index d088a6147..3af8110a6 100644 --- a/ext/fetch/README.md +++ b/ext/fetch/README.md @@ -78,6 +78,5 @@ Following ops are provided, which can be accessed through `Deno.ops`: - op_fetch - op_fetch_send -- op_fetch_response_upgrade - op_utf8_to_byte_string - op_fetch_custom_client diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index a3f9eeb4f..79659771e 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -28,7 +28,6 @@ use deno_core::futures::Stream; use deno_core::futures::StreamExt; use deno_core::futures::TryFutureExt; use deno_core::op2; -use deno_core::unsync::spawn; use deno_core::url::Url; use deno_core::AsyncRefCell; use deno_core::AsyncResult; @@ -70,12 +69,9 @@ use hyper::body::Frame; use hyper_util::client::legacy::connect::HttpConnector; use hyper_util::client::legacy::connect::HttpInfo; use hyper_util::rt::TokioExecutor; -use hyper_util::rt::TokioIo; use hyper_util::rt::TokioTimer; use serde::Deserialize; use serde::Serialize; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncWriteExt; use tower::ServiceExt; use tower_http::decompression::Decompression; @@ -127,7 +123,6 @@ deno_core::extension!(deno_fetch, ops = [ op_fetch<FP>, op_fetch_send, - op_fetch_response_upgrade, op_utf8_to_byte_string, op_fetch_custom_client<FP>, ], @@ -627,114 +622,6 @@ pub async fn op_fetch_send( }) } -#[op2(async)] -#[smi] -pub async fn op_fetch_response_upgrade( - state: Rc<RefCell<OpState>>, - #[smi] rid: ResourceId, -) -> Result<ResourceId, AnyError> { - let raw_response = state - .borrow_mut() - .resource_table - .take::<FetchResponseResource>(rid)?; - let raw_response = Rc::try_unwrap(raw_response) - .expect("Someone is holding onto FetchResponseResource"); - - let (read, write) = tokio::io::duplex(1024); - let (read_rx, write_tx) = tokio::io::split(read); - let (mut write_rx, mut read_tx) = tokio::io::split(write); - let upgraded = raw_response.upgrade().await?; - { - // Stage 3: Pump the data - let (mut upgraded_rx, mut upgraded_tx) = - tokio::io::split(TokioIo::new(upgraded)); - - spawn(async move { - let mut buf = [0; 1024]; - loop { - let read = upgraded_rx.read(&mut buf).await?; - if read == 0 { - break; - } - read_tx.write_all(&buf[..read]).await?; - } - Ok::<_, AnyError>(()) - }); - spawn(async move { - let mut buf = [0; 1024]; - loop { - let read = write_rx.read(&mut buf).await?; - if read == 0 { - break; - } - upgraded_tx.write_all(&buf[..read]).await?; - } - Ok::<_, AnyError>(()) - }); - } - - Ok( - state - .borrow_mut() - .resource_table - .add(UpgradeStream::new(read_rx, write_tx)), - ) -} - -struct UpgradeStream { - read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>, - write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>, - cancel_handle: CancelHandle, -} - -impl UpgradeStream { - pub fn new( - read: tokio::io::ReadHalf<tokio::io::DuplexStream>, - write: tokio::io::WriteHalf<tokio::io::DuplexStream>, - ) -> Self { - Self { - read: AsyncRefCell::new(read), - write: AsyncRefCell::new(write), - cancel_handle: CancelHandle::new(), - } - } - - async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> { - let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); - async { - let read = RcRef::map(self, |this| &this.read); - let mut read = read.borrow_mut().await; - Ok(Pin::new(&mut *read).read(buf).await?) - } - .try_or_cancel(cancel_handle) - .await - } - - async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { - let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); - async { - let write = RcRef::map(self, |this| &this.write); - let mut write = write.borrow_mut().await; - Ok(Pin::new(&mut *write).write(buf).await?) - } - .try_or_cancel(cancel_handle) - .await - } -} - -impl Resource for UpgradeStream { - fn name(&self) -> Cow<str> { - "fetchUpgradedStream".into() - } - - deno_core::impl_readable_byob!(); - deno_core::impl_writable!(); - - fn close(self: Rc<Self>) { - self.cancel_handle.cancel(); - } -} - type CancelableResponseResult = Result<Result<http::Response<ResBody>, AnyError>, Canceled>; |