summaryrefslogtreecommitdiff
path: root/ext/fetch/lib.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2023-06-15 15:34:21 +0200
committerGitHub <noreply@github.com>2023-06-15 15:34:21 +0200
commitf145cbfaccd9f3b251b2f80690ad7c68b26d924b (patch)
treef546388ed6b1d3ce28400a5e32d85a75efcfd3a5 /ext/fetch/lib.rs
parent3d71c36888018d5154b42997bb64adc2892c034d (diff)
refactor(ext/fetch): simplify fetch ops (#19494)
Addresses feedback from https://github.com/denoland/deno/pull/19412#discussion_r1227912676
Diffstat (limited to 'ext/fetch/lib.rs')
-rw-r--r--ext/fetch/lib.rs114
1 files changed, 55 insertions, 59 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index ded69b2c4..538b741a6 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -112,7 +112,6 @@ deno_core::extension!(deno_fetch,
ops = [
op_fetch<FP>,
op_fetch_send,
- op_fetch_response_into_byte_stream,
op_fetch_response_upgrade,
op_fetch_custom_client<FP>,
],
@@ -427,7 +426,6 @@ pub struct FetchResponse {
pub async fn op_fetch_send(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
- into_byte_stream: bool,
) -> Result<FetchResponse, AnyError> {
let request = state
.borrow_mut()
@@ -459,27 +457,10 @@ pub async fn op_fetch_send(
(None, None)
};
- let response_rid = if !into_byte_stream {
- state
- .borrow_mut()
- .resource_table
- .add(FetchResponseResource {
- response: res,
- size: content_length,
- })
- } else {
- let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| {
- r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
- }));
- state
- .borrow_mut()
- .resource_table
- .add(FetchResponseBodyResource {
- reader: AsyncRefCell::new(stream.peekable()),
- cancel: CancelHandle::default(),
- size: content_length,
- })
- };
+ let response_rid = state
+ .borrow_mut()
+ .resource_table
+ .add(FetchResponseResource::new(res, content_length));
Ok(FetchResponse {
status: status.as_u16(),
@@ -494,28 +475,6 @@ pub async fn op_fetch_send(
}
#[op]
-pub fn op_fetch_response_into_byte_stream(
- state: &mut OpState,
- rid: ResourceId,
-) -> Result<ResourceId, AnyError> {
- let raw_response = state.resource_table.take::<FetchResponseResource>(rid)?;
- let raw_response = Rc::try_unwrap(raw_response)
- .expect("Someone is holding onto FetchResponseResource");
- let stream: BytesStream =
- Box::pin(raw_response.response.bytes_stream().map(|r| {
- r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
- }));
-
- let rid = state.resource_table.add(FetchResponseBodyResource {
- reader: AsyncRefCell::new(stream.peekable()),
- cancel: CancelHandle::default(),
- size: raw_response.size,
- });
-
- Ok(rid)
-}
-
-#[op]
pub async fn op_fetch_response_upgrade(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
@@ -530,7 +489,7 @@ pub async fn op_fetch_response_upgrade(
let (read, write) = tokio::io::duplex(1024);
let (read_rx, write_tx) = tokio::io::split(read);
let (mut write_rx, mut read_tx) = tokio::io::split(write);
- let upgraded = raw_response.response.upgrade().await?;
+ let upgraded = raw_response.upgrade().await?;
{
// Stage 3: Pump the data
let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded);
@@ -698,35 +657,72 @@ impl Resource for FetchRequestBodyResource {
type BytesStream =
Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
+pub enum FetchResponseReader {
+ Start(Response),
+ BodyReader(Peekable<BytesStream>),
+}
+
+impl Default for FetchResponseReader {
+ fn default() -> Self {
+ let stream: BytesStream = Box::pin(deno_core::futures::stream::empty());
+ Self::BodyReader(stream.peekable())
+ }
+}
#[derive(Debug)]
pub struct FetchResponseResource {
- pub response: Response,
+ pub response_reader: AsyncRefCell<FetchResponseReader>,
+ pub cancel: CancelHandle,
pub size: Option<u64>,
}
-impl Resource for FetchResponseResource {
- fn name(&self) -> Cow<str> {
- "fetchResponse".into()
+impl FetchResponseResource {
+ pub fn new(response: Response, size: Option<u64>) -> Self {
+ Self {
+ response_reader: AsyncRefCell::new(FetchResponseReader::Start(response)),
+ cancel: CancelHandle::default(),
+ size,
+ }
}
-}
-pub struct FetchResponseBodyResource {
- pub reader: AsyncRefCell<Peekable<BytesStream>>,
- pub cancel: CancelHandle,
- pub size: Option<u64>,
+ pub async fn upgrade(self) -> Result<reqwest::Upgraded, AnyError> {
+ let reader = self.response_reader.into_inner();
+ match reader {
+ FetchResponseReader::Start(resp) => Ok(resp.upgrade().await?),
+ _ => unreachable!(),
+ }
+ }
}
-impl Resource for FetchResponseBodyResource {
+impl Resource for FetchResponseResource {
fn name(&self) -> Cow<str> {
- "fetchResponseBody".into()
+ "fetchResponse".into()
}
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
Box::pin(async move {
- let reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
+ let mut reader =
+ RcRef::map(&self, |r| &r.response_reader).borrow_mut().await;
+ let body = loop {
+ match &mut *reader {
+ FetchResponseReader::BodyReader(reader) => break reader,
+ FetchResponseReader::Start(_) => {}
+ }
+
+ match std::mem::take(&mut *reader) {
+ FetchResponseReader::Start(resp) => {
+ let stream: BytesStream = Box::pin(resp.bytes_stream().map(|r| {
+ r.map_err(|err| {
+ std::io::Error::new(std::io::ErrorKind::Other, err)
+ })
+ }));
+ *reader = FetchResponseReader::BodyReader(stream.peekable());
+ }
+ FetchResponseReader::BodyReader(_) => unreachable!(),
+ }
+ };
let fut = async move {
- let mut reader = Pin::new(reader);
+ let mut reader = Pin::new(body);
loop {
match reader.as_mut().peek_mut().await {
Some(Ok(chunk)) if !chunk.is_empty() => {