diff options
Diffstat (limited to 'cli/http_body.rs')
-rw-r--r-- | cli/http_body.rs | 80 |
1 files changed, 42 insertions, 38 deletions
diff --git a/cli/http_body.rs b/cli/http_body.rs index c03dfd637..72ec8017e 100644 --- a/cli/http_body.rs +++ b/cli/http_body.rs @@ -1,19 +1,20 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use futures::stream::Stream; -use futures::Async; -use futures::Poll; +use futures::io::AsyncRead; +use futures::stream::StreamExt; use reqwest::r#async::Chunk; use reqwest::r#async::Decoder; use std::cmp::min; use std::io; use std::io::Read; -use tokio::io::AsyncRead; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; /// Wraps `reqwest::Decoder` so that it can be exposed as an `AsyncRead` and integrated /// into resources more easily. pub struct HttpBody { - decoder: Decoder, + decoder: futures::compat::Compat01As03<Decoder>, chunk: Option<Chunk>, pos: usize, } @@ -21,7 +22,7 @@ pub struct HttpBody { impl HttpBody { pub fn from(body: Decoder) -> Self { Self { - decoder: body, + decoder: futures::compat::Compat01As03::new(body), chunk: None, pos: 0, } @@ -35,55 +36,58 @@ impl Read for HttpBody { } impl AsyncRead for HttpBody { - fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> { - if let Some(chunk) = self.chunk.take() { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll<Result<usize, io::Error>> { + let mut inner = self.get_mut(); + if let Some(chunk) = inner.chunk.take() { debug!( "HttpBody Fake Read buf {} chunk {} pos {}", buf.len(), chunk.len(), - self.pos + inner.pos ); - let n = min(buf.len(), chunk.len() - self.pos); + let n = min(buf.len(), chunk.len() - inner.pos); { - let rest = &chunk[self.pos..]; + let rest = &chunk[inner.pos..]; buf[..n].clone_from_slice(&rest[..n]); } - self.pos += n; - if self.pos == chunk.len() { - self.pos = 0; + inner.pos += n; + if inner.pos == chunk.len() { + inner.pos = 0; } else { - self.chunk = Some(chunk); + inner.chunk = Some(chunk); } - return Ok(Async::Ready(n)); + return Poll::Ready(Ok(n)); } else { - assert_eq!(self.pos, 0); + assert_eq!(inner.pos, 0); } - let p = self.decoder.poll(); + let p = inner.decoder.poll_next_unpin(cx); match p { - Err(e) => Err( + Poll::Ready(Some(Err(e))) => Poll::Ready(Err( // TODO Need to map hyper::Error into std::io::Error. io::Error::new(io::ErrorKind::Other, e), - ), - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(maybe_chunk)) => match maybe_chunk { - None => Ok(Async::Ready(0)), - Some(chunk) => { - debug!( - "HttpBody Real Read buf {} chunk {} pos {}", - buf.len(), - chunk.len(), - self.pos - ); - let n = min(buf.len(), chunk.len()); - buf[..n].clone_from_slice(&chunk[..n]); - if buf.len() < chunk.len() { - self.pos = n; - self.chunk = Some(chunk); - } - Ok(Async::Ready(n)) + )), + Poll::Ready(Some(Ok(chunk))) => { + debug!( + "HttpBody Real Read buf {} chunk {} pos {}", + buf.len(), + chunk.len(), + inner.pos + ); + let n = min(buf.len(), chunk.len()); + buf[..n].clone_from_slice(&chunk[..n]); + if buf.len() < chunk.len() { + inner.pos = n; + inner.chunk = Some(chunk); } - }, + Poll::Ready(Ok(n)) + } + Poll::Ready(None) => Poll::Ready(Ok(0)), + Poll::Pending => Poll::Pending, } } } |