summaryrefslogtreecommitdiff
path: root/ext/http/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r--ext/http/lib.rs110
1 files changed, 53 insertions, 57 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index a8c2810bc..e71d9fae3 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -23,6 +23,8 @@ use deno_core::futures::TryFutureExt;
use deno_core::include_js_files;
use deno_core::op;
use deno_core::AsyncRefCell;
+use deno_core::AsyncResult;
+use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
@@ -333,61 +335,58 @@ impl HttpStreamResource {
}
}
-impl HttpStreamResource {
- async fn read(
- self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
- let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
-
- let body = loop {
- match &mut *rd {
- HttpRequestReader::Headers(_) => {}
- HttpRequestReader::Body(_, body) => break body,
- HttpRequestReader::Closed => return Ok((0, buf)),
- }
- match take(&mut *rd) {
- HttpRequestReader::Headers(request) => {
- let (parts, body) = request.into_parts();
- *rd = HttpRequestReader::Body(parts.headers, body.peekable());
+impl Resource for HttpStreamResource {
+ fn name(&self) -> Cow<str> {
+ "httpStream".into()
+ }
+
+ fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
+ Box::pin(async move {
+ let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
+
+ let body = loop {
+ match &mut *rd {
+ HttpRequestReader::Headers(_) => {}
+ HttpRequestReader::Body(_, body) => break body,
+ HttpRequestReader::Closed => return Ok(BufView::empty()),
}
- _ => unreachable!(),
+ match take(&mut *rd) {
+ HttpRequestReader::Headers(request) => {
+ let (parts, body) = request.into_parts();
+ *rd = HttpRequestReader::Body(parts.headers, body.peekable());
+ }
+ _ => unreachable!(),
+ };
};
- };
- let fut = async {
- let mut body = Pin::new(body);
- loop {
- match body.as_mut().peek_mut().await {
- Some(Ok(chunk)) if !chunk.is_empty() => {
- let len = min(buf.len(), chunk.len());
- buf[..len].copy_from_slice(&chunk.split_to(len));
- break Ok((len, buf));
+ let fut = async {
+ let mut body = Pin::new(body);
+ loop {
+ match body.as_mut().peek_mut().await {
+ Some(Ok(chunk)) if !chunk.is_empty() => {
+ let len = min(limit, chunk.len());
+ let buf = chunk.split_to(len);
+ let view = BufView::from(buf);
+ break Ok(view);
+ }
+ // This unwrap is safe because `peek_mut()` returned `Some`, and thus
+ // currently has a peeked value that can be synchronously returned
+ // from `next()`.
+ //
+ // The future returned from `next()` is always ready, so we can
+ // safely call `await` on it without creating a race condition.
+ Some(_) => match body.as_mut().next().await.unwrap() {
+ Ok(chunk) => assert!(chunk.is_empty()),
+ Err(err) => break Err(AnyError::from(err)),
+ },
+ None => break Ok(BufView::empty()),
}
- Some(_) => match body.as_mut().next().await.unwrap() {
- Ok(chunk) => assert!(chunk.is_empty()),
- Err(err) => break Err(AnyError::from(err)),
- },
- None => break Ok((0, buf)),
}
- }
- };
-
- let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
- fut.try_or_cancel(cancel_handle).await
- }
-}
-
-impl Resource for HttpStreamResource {
- fn name(&self) -> Cow<str> {
- "httpStream".into()
- }
+ };
- fn read_return(
- self: Rc<Self>,
- _buf: ZeroCopyBuf,
- ) -> deno_core::AsyncResult<(usize, ZeroCopyBuf)> {
- Box::pin(self.read(_buf))
+ let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
+ fut.try_or_cancel(cancel_handle).await
+ })
}
fn close(self: Rc<Self>) {
@@ -763,16 +762,14 @@ async fn op_http_write_resource(
_ => {}
};
- let vec = vec![0u8; 64 * 1024]; // 64KB
- let buf = ZeroCopyBuf::new_temp(vec);
- let (nread, buf) = resource.clone().read_return(buf).await?;
- if nread == 0 {
+ let view = resource.clone().read(64 * 1024).await?; // 64KB
+ if view.is_empty() {
break;
}
match &mut *wr {
HttpResponseWriter::Body(body) => {
- if let Err(err) = body.write_all(&buf[..nread]).await {
+ if let Err(err) = body.write_all(&view).await {
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
// Don't return "broken pipe", that's an implementation detail.
// Pull up the failure associated with the transport connection instead.
@@ -782,9 +779,8 @@ async fn op_http_write_resource(
}
}
HttpResponseWriter::BodyUncompressed(body) => {
- let mut buf = buf.to_temp();
- buf.truncate(nread);
- if let Err(err) = body.send_data(Bytes::from(buf)).await {
+ let bytes = Bytes::from(view);
+ if let Err(err) = body.send_data(bytes).await {
assert!(err.is_closed());
// Pull up the failure associated with the transport connection instead.
http_stream.conn.closed().await?;