diff options
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r-- | ext/http/lib.rs | 110 |
1 files changed, 53 insertions, 57 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs index a8c2810bc..e71d9fae3 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -23,6 +23,8 @@ use deno_core::futures::TryFutureExt; use deno_core::include_js_files; use deno_core::op; use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::BufView; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; @@ -333,61 +335,58 @@ impl HttpStreamResource { } } -impl HttpStreamResource { - async fn read( - self: Rc<Self>, - mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { - let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; - - let body = loop { - match &mut *rd { - HttpRequestReader::Headers(_) => {} - HttpRequestReader::Body(_, body) => break body, - HttpRequestReader::Closed => return Ok((0, buf)), - } - match take(&mut *rd) { - HttpRequestReader::Headers(request) => { - let (parts, body) = request.into_parts(); - *rd = HttpRequestReader::Body(parts.headers, body.peekable()); +impl Resource for HttpStreamResource { + fn name(&self) -> Cow<str> { + "httpStream".into() + } + + fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> { + Box::pin(async move { + let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; + + let body = loop { + match &mut *rd { + HttpRequestReader::Headers(_) => {} + HttpRequestReader::Body(_, body) => break body, + HttpRequestReader::Closed => return Ok(BufView::empty()), } - _ => unreachable!(), + match take(&mut *rd) { + HttpRequestReader::Headers(request) => { + let (parts, body) = request.into_parts(); + *rd = HttpRequestReader::Body(parts.headers, body.peekable()); + } + _ => unreachable!(), + }; }; - }; - let fut = async { - let mut body = Pin::new(body); - loop { - match body.as_mut().peek_mut().await { - Some(Ok(chunk)) if !chunk.is_empty() => { - let len = min(buf.len(), chunk.len()); - buf[..len].copy_from_slice(&chunk.split_to(len)); - break Ok((len, buf)); + let fut = async { + let mut body = Pin::new(body); + loop { + match body.as_mut().peek_mut().await { + Some(Ok(chunk)) if !chunk.is_empty() => { + let len = min(limit, chunk.len()); + let buf = chunk.split_to(len); + let view = BufView::from(buf); + break Ok(view); + } + // This unwrap is safe because `peek_mut()` returned `Some`, and thus + // currently has a peeked value that can be synchronously returned + // from `next()`. + // + // The future returned from `next()` is always ready, so we can + // safely call `await` on it without creating a race condition. + Some(_) => match body.as_mut().next().await.unwrap() { + Ok(chunk) => assert!(chunk.is_empty()), + Err(err) => break Err(AnyError::from(err)), + }, + None => break Ok(BufView::empty()), } - Some(_) => match body.as_mut().next().await.unwrap() { - Ok(chunk) => assert!(chunk.is_empty()), - Err(err) => break Err(AnyError::from(err)), - }, - None => break Ok((0, buf)), } - } - }; - - let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); - fut.try_or_cancel(cancel_handle).await - } -} - -impl Resource for HttpStreamResource { - fn name(&self) -> Cow<str> { - "httpStream".into() - } + }; - fn read_return( - self: Rc<Self>, - _buf: ZeroCopyBuf, - ) -> deno_core::AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(_buf)) + let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); + fut.try_or_cancel(cancel_handle).await + }) } fn close(self: Rc<Self>) { @@ -763,16 +762,14 @@ async fn op_http_write_resource( _ => {} }; - let vec = vec![0u8; 64 * 1024]; // 64KB - let buf = ZeroCopyBuf::new_temp(vec); - let (nread, buf) = resource.clone().read_return(buf).await?; - if nread == 0 { + let view = resource.clone().read(64 * 1024).await?; // 64KB + if view.is_empty() { break; } match &mut *wr { HttpResponseWriter::Body(body) => { - if let Err(err) = body.write_all(&buf[..nread]).await { + if let Err(err) = body.write_all(&view).await { assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); // Don't return "broken pipe", that's an implementation detail. // Pull up the failure associated with the transport connection instead. @@ -782,9 +779,8 @@ async fn op_http_write_resource( } } HttpResponseWriter::BodyUncompressed(body) => { - let mut buf = buf.to_temp(); - buf.truncate(nread); - if let Err(err) = body.send_data(Bytes::from(buf)).await { + let bytes = Bytes::from(view); + if let Err(err) = body.send_data(bytes).await { assert!(err.is_closed()); // Pull up the failure associated with the transport connection instead. http_stream.conn.closed().await?; |