diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2019-12-31 15:09:58 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-31 15:09:58 +0100 |
commit | 4258ed262f6eed9b0ee123e1ba9c91f999f0b429 (patch) | |
tree | 0136ceed29436bbbf499971c74b57ac722d3958d /cli/http_util.rs | |
parent | 05dce9016500f8cb102acb7ad68c184aa1047dae (diff) |
refactor: move HttpBody to cli/http_util.rs (#3569)
Diffstat (limited to 'cli/http_util.rs')
-rw-r--r-- | cli/http_util.rs | 84 |
1 files changed, 84 insertions, 0 deletions
diff --git a/cli/http_util.rs b/cli/http_util.rs index 83aaadd1e..4a925e3d9 100644 --- a/cli/http_util.rs +++ b/cli/http_util.rs @@ -2,6 +2,7 @@ use crate::deno_error; use crate::deno_error::DenoError; use crate::version; +use bytes::Bytes; use deno::ErrBox; use futures::future::FutureExt; use reqwest; @@ -11,7 +12,14 @@ use reqwest::header::LOCATION; use reqwest::header::USER_AGENT; use reqwest::redirect::Policy; use reqwest::Client; +use reqwest::Response; +use std::cmp::min; use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use tokio::io::AsyncRead; use url::Url; lazy_static! { @@ -119,6 +127,82 @@ pub fn fetch_string_once( fut.boxed() } +/// Wraps reqwest `Response` so that it can be exposed as an `AsyncRead` and integrated +/// into resources more easily. +pub struct HttpBody { + response: Response, + chunk: Option<Bytes>, + pos: usize, +} + +impl HttpBody { + pub fn from(body: Response) -> Self { + Self { + response: body, + chunk: None, + pos: 0, + } + } +} + +impl AsyncRead for HttpBody { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll<Result<usize, io::Error>> { + let mut inner = self.get_mut(); + if let Some(chunk) = inner.chunk.take() { + debug!( + "HttpBody Fake Read buf {} chunk {} pos {}", + buf.len(), + chunk.len(), + inner.pos + ); + let n = min(buf.len(), chunk.len() - inner.pos); + { + let rest = &chunk[inner.pos..]; + buf[..n].clone_from_slice(&rest[..n]); + } + inner.pos += n; + if inner.pos == chunk.len() { + inner.pos = 0; + } else { + inner.chunk = Some(chunk); + } + return Poll::Ready(Ok(n)); + } else { + assert_eq!(inner.pos, 0); + } + + let chunk_future = &mut inner.response.chunk(); + // Safety: `chunk_future` lives only for duration of this poll. So, it doesn't move. + let chunk_future = unsafe { Pin::new_unchecked(chunk_future) }; + match chunk_future.poll(cx) { + Poll::Ready(Err(e)) => { + Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))) + } + Poll::Ready(Ok(Some(chunk))) => { + debug!( + "HttpBody Real Read buf {} chunk {} pos {}", + buf.len(), + chunk.len(), + inner.pos + ); + let n = min(buf.len(), chunk.len()); + buf[..n].clone_from_slice(&chunk[..n]); + if buf.len() < chunk.len() { + inner.pos = n; + inner.chunk = Some(chunk); + } + Poll::Ready(Ok(n)) + } + Poll::Ready(Ok(None)) => Poll::Ready(Ok(0)), + Poll::Pending => Poll::Pending, + } + } +} + #[cfg(test)] mod tests { use super::*; |