diff options
Diffstat (limited to 'ext/fetch/lib.rs')
-rw-r--r-- | ext/fetch/lib.rs | 191 |
1 files changed, 178 insertions, 13 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index a36512c77..ded69b2c4 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -23,6 +23,7 @@ use deno_core::op; use deno_core::BufView; use deno_core::WriteOutcome; +use deno_core::task::spawn; use deno_core::url::Url; use deno_core::AsyncRefCell; use deno_core::AsyncResult; @@ -58,6 +59,8 @@ use reqwest::RequestBuilder; use reqwest::Response; use serde::Deserialize; use serde::Serialize; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; use tokio::sync::mpsc; // Re-export reqwest and data_url @@ -109,6 +112,8 @@ deno_core::extension!(deno_fetch, ops = [ op_fetch<FP>, op_fetch_send, + op_fetch_response_into_byte_stream, + op_fetch_response_upgrade, op_fetch_custom_client<FP>, ], esm = [ @@ -414,12 +419,15 @@ pub struct FetchResponse { pub url: String, pub response_rid: ResourceId, pub content_length: Option<u64>, + pub remote_addr_ip: Option<String>, + pub remote_addr_port: Option<u16>, } #[op] pub async fn op_fetch_send( state: Rc<RefCell<OpState>>, rid: ResourceId, + into_byte_stream: bool, ) -> Result<FetchResponse, AnyError> { let request = state .borrow_mut() @@ -436,7 +444,6 @@ pub async fn op_fetch_send( Err(_) => return Err(type_error("request was cancelled")), }; - //debug!("Fetch response {}", url); let status = res.status(); let url = res.url().to_string(); let mut res_headers = Vec::new(); @@ -445,29 +452,175 @@ pub async fn op_fetch_send( } let content_length = res.content_length(); + let remote_addr = res.remote_addr(); + let (remote_addr_ip, remote_addr_port) = if let Some(addr) = remote_addr { + (Some(addr.ip().to_string()), Some(addr.port())) + } else { + (None, None) + }; - let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| { - r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) - })); - let rid = state - .borrow_mut() - .resource_table - .add(FetchResponseBodyResource { - reader: AsyncRefCell::new(stream.peekable()), - cancel: CancelHandle::default(), - size: content_length, - }); + let response_rid = if !into_byte_stream { + state + .borrow_mut() + .resource_table + .add(FetchResponseResource { + response: res, + size: content_length, + }) + } else { + let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + state + .borrow_mut() + .resource_table + .add(FetchResponseBodyResource { + reader: AsyncRefCell::new(stream.peekable()), + cancel: CancelHandle::default(), + size: content_length, + }) + }; Ok(FetchResponse { status: status.as_u16(), status_text: status.canonical_reason().unwrap_or("").to_string(), headers: res_headers, url, - response_rid: rid, + response_rid, content_length, + remote_addr_ip, + remote_addr_port, }) } +#[op] +pub fn op_fetch_response_into_byte_stream( + state: &mut OpState, + rid: ResourceId, +) -> Result<ResourceId, AnyError> { + let raw_response = state.resource_table.take::<FetchResponseResource>(rid)?; + let raw_response = Rc::try_unwrap(raw_response) + .expect("Someone is holding onto FetchResponseResource"); + let stream: BytesStream = + Box::pin(raw_response.response.bytes_stream().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + + let rid = state.resource_table.add(FetchResponseBodyResource { + reader: AsyncRefCell::new(stream.peekable()), + cancel: CancelHandle::default(), + size: raw_response.size, + }); + + Ok(rid) +} + +#[op] +pub async fn op_fetch_response_upgrade( + state: Rc<RefCell<OpState>>, + rid: ResourceId, +) -> Result<ResourceId, AnyError> { + let raw_response = state + .borrow_mut() + .resource_table + .take::<FetchResponseResource>(rid)?; + let raw_response = Rc::try_unwrap(raw_response) + .expect("Someone is holding onto FetchResponseResource"); + + let (read, write) = tokio::io::duplex(1024); + let (read_rx, write_tx) = tokio::io::split(read); + let (mut write_rx, mut read_tx) = tokio::io::split(write); + let upgraded = raw_response.response.upgrade().await?; + { + // Stage 3: Pump the data + let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded); + + spawn(async move { + let mut buf = [0; 1024]; + loop { + let read = upgraded_rx.read(&mut buf).await?; + if read == 0 { + break; + } + read_tx.write_all(&buf[..read]).await?; + } + Ok::<_, AnyError>(()) + }); + spawn(async move { + let mut buf = [0; 1024]; + loop { + let read = write_rx.read(&mut buf).await?; + if read == 0 { + break; + } + upgraded_tx.write_all(&buf[..read]).await?; + } + Ok::<_, AnyError>(()) + }); + } + + Ok( + state + .borrow_mut() + .resource_table + .add(UpgradeStream::new(read_rx, write_tx)), + ) +} + +struct UpgradeStream { + read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>, + write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>, + cancel_handle: CancelHandle, +} + +impl UpgradeStream { + pub fn new( + read: tokio::io::ReadHalf<tokio::io::DuplexStream>, + write: tokio::io::WriteHalf<tokio::io::DuplexStream>, + ) -> Self { + Self { + read: AsyncRefCell::new(read), + write: AsyncRefCell::new(write), + cancel_handle: CancelHandle::new(), + } + } + + async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> { + let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); + async { + let read = RcRef::map(self, |this| &this.read); + let mut read = read.borrow_mut().await; + Ok(Pin::new(&mut *read).read(buf).await?) + } + .try_or_cancel(cancel_handle) + .await + } + + async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { + let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); + async { + let write = RcRef::map(self, |this| &this.write); + let mut write = write.borrow_mut().await; + Ok(Pin::new(&mut *write).write(buf).await?) + } + .try_or_cancel(cancel_handle) + .await + } +} + +impl Resource for UpgradeStream { + fn name(&self) -> Cow<str> { + "fetchUpgradedStream".into() + } + + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + + fn close(self: Rc<Self>) { + self.cancel_handle.cancel(); + } +} + type CancelableResponseResult = Result<Result<Response, AnyError>, Canceled>; pub struct FetchRequestResource( @@ -545,6 +698,18 @@ impl Resource for FetchRequestBodyResource { type BytesStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>; +#[derive(Debug)] +pub struct FetchResponseResource { + pub response: Response, + pub size: Option<u64>, +} + +impl Resource for FetchResponseResource { + fn name(&self) -> Cow<str> { + "fetchResponse".into() + } +} + pub struct FetchResponseBodyResource { pub reader: AsyncRefCell<Peekable<BytesStream>>, pub cancel: CancelHandle, |