summaryrefslogtreecommitdiff
path: root/cli/http_util.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2019-12-31 15:09:58 +0100
committerGitHub <noreply@github.com>2019-12-31 15:09:58 +0100
commit4258ed262f6eed9b0ee123e1ba9c91f999f0b429 (patch)
tree0136ceed29436bbbf499971c74b57ac722d3958d /cli/http_util.rs
parent05dce9016500f8cb102acb7ad68c184aa1047dae (diff)
refactor: move HttpBody to cli/http_util.rs (#3569)
Diffstat (limited to 'cli/http_util.rs')
-rw-r--r--cli/http_util.rs84
1 files changed, 84 insertions, 0 deletions
diff --git a/cli/http_util.rs b/cli/http_util.rs
index 83aaadd1e..4a925e3d9 100644
--- a/cli/http_util.rs
+++ b/cli/http_util.rs
@@ -2,6 +2,7 @@
use crate::deno_error;
use crate::deno_error::DenoError;
use crate::version;
+use bytes::Bytes;
use deno::ErrBox;
use futures::future::FutureExt;
use reqwest;
@@ -11,7 +12,14 @@ use reqwest::header::LOCATION;
use reqwest::header::USER_AGENT;
use reqwest::redirect::Policy;
use reqwest::Client;
+use reqwest::Response;
+use std::cmp::min;
use std::future::Future;
+use std::io;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+use tokio::io::AsyncRead;
use url::Url;
lazy_static! {
@@ -119,6 +127,82 @@ pub fn fetch_string_once(
fut.boxed()
}
+/// Wraps reqwest `Response` so that it can be exposed as an `AsyncRead` and integrated
+/// into resources more easily.
+pub struct HttpBody {
+ response: Response,
+ chunk: Option<Bytes>,
+ pos: usize,
+}
+
+impl HttpBody {
+ pub fn from(body: Response) -> Self {
+ Self {
+ response: body,
+ chunk: None,
+ pos: 0,
+ }
+ }
+}
+
+impl AsyncRead for HttpBody {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut [u8],
+ ) -> Poll<Result<usize, io::Error>> {
+ let mut inner = self.get_mut();
+ if let Some(chunk) = inner.chunk.take() {
+ debug!(
+ "HttpBody Fake Read buf {} chunk {} pos {}",
+ buf.len(),
+ chunk.len(),
+ inner.pos
+ );
+ let n = min(buf.len(), chunk.len() - inner.pos);
+ {
+ let rest = &chunk[inner.pos..];
+ buf[..n].clone_from_slice(&rest[..n]);
+ }
+ inner.pos += n;
+ if inner.pos == chunk.len() {
+ inner.pos = 0;
+ } else {
+ inner.chunk = Some(chunk);
+ }
+ return Poll::Ready(Ok(n));
+ } else {
+ assert_eq!(inner.pos, 0);
+ }
+
+ let chunk_future = &mut inner.response.chunk();
+ // Safety: `chunk_future` lives only for duration of this poll. So, it doesn't move.
+ let chunk_future = unsafe { Pin::new_unchecked(chunk_future) };
+ match chunk_future.poll(cx) {
+ Poll::Ready(Err(e)) => {
+ Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e)))
+ }
+ Poll::Ready(Ok(Some(chunk))) => {
+ debug!(
+ "HttpBody Real Read buf {} chunk {} pos {}",
+ buf.len(),
+ chunk.len(),
+ inner.pos
+ );
+ let n = min(buf.len(), chunk.len());
+ buf[..n].clone_from_slice(&chunk[..n]);
+ if buf.len() < chunk.len() {
+ inner.pos = n;
+ inner.chunk = Some(chunk);
+ }
+ Poll::Ready(Ok(n))
+ }
+ Poll::Ready(Ok(None)) => Poll::Ready(Ok(0)),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;