summaryrefslogtreecommitdiff
path: root/ext/http/response_body.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-08-20 19:35:26 -0600
committerGitHub <noreply@github.com>2023-08-21 01:35:26 +0000
commit576d0db372c3f4c9b01caecdbe2360a73de6d36d (patch)
tree7b083558f91a156181f870ae4c039cbcf8cd5ff8 /ext/http/response_body.rs
parentefdf0bbd9b76b8b1b5d0374a63358192534f22ad (diff)
fix(ext/http): ensure request body resource lives as long as response is alive (#20206)
Deno.serve's fast streaming implementation was not keeping the request body resource ID alive. We were taking the `Rc<Resource>` from the resource table during the response, so a hairpin duplex response that fed back the request body would work. However, if any JS code attempted to read from the request body (which requires the resource ID to be valid), the response would fail with a difficult-to-diagnose "EOF" error. This was affecting more complex duplex uses of `Deno.fetch` (though as far as I can tell was unreported). Simple test: ```ts const reader = request.body.getReader(); return new Response( new ReadableStream({ async pull(controller) { const { done, value } = await reader.read(); if (done) { controller.close(); } else { controller.enqueue(value); } }, }), ``` And then attempt to use the stream in duplex mode: ```ts async function testDuplex( reader: ReadableStreamDefaultReader<Uint8Array>, writable: WritableStreamDefaultWriter<Uint8Array>, ) { await writable.write(new Uint8Array([1])); const chunk1 = await reader.read(); assert(!chunk1.done); assertEquals(chunk1.value, new Uint8Array([1])); await writable.write(new Uint8Array([2])); const chunk2 = await reader.read(); assert(!chunk2.done); assertEquals(chunk2.value, new Uint8Array([2])); await writable.close(); const chunk3 = await reader.read(); assert(chunk3.done); } ``` In older versions of Deno, this would just lock up. I believe after 23ff0e722e3c4b0827940853c53c5ee2ede5ec9f, it started throwing a more explicit error: ``` httpServerStreamDuplexJavascript => ./cli/tests/unit/serve_test.ts:1339:6 error: TypeError: request or response body error: error reading a body from connection: Connection reset by peer (os error 54) at async Object.pull (ext:deno_web/06_streams.js:810:27) ```
Diffstat (limited to 'ext/http/response_body.rs')
-rw-r--r--ext/http/response_body.rs52
1 files changed, 31 insertions, 21 deletions
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
index bd9d6f433..f88f13f88 100644
--- a/ext/http/response_body.rs
+++ b/ext/http/response_body.rs
@@ -23,6 +23,8 @@ use hyper1::body::Frame;
use hyper1::body::SizeHint;
use pin_project::pin_project;
+use crate::slab::HttpRequestBodyAutocloser;
+
/// Simplification for nested types we use for our streams. We provide a way to convert from
/// this type into Hyper's body [`Frame`].
enum ResponseStreamResult {
@@ -156,34 +158,40 @@ impl std::fmt::Debug for ResponseBytesInner {
/// This represents the union of possible response types in Deno with the stream-style [`Body`] interface
/// required by hyper. As the API requires information about request completion (including a success/fail
/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on.
-#[derive(Debug, Default)]
-pub struct ResponseBytes(
- ResponseBytesInner,
- CompletionHandle,
- Rc<RefCell<Option<HeaderMap>>>,
-);
+#[derive(Default)]
+pub struct ResponseBytes {
+ inner: ResponseBytesInner,
+ completion_handle: CompletionHandle,
+ headers: Rc<RefCell<Option<HeaderMap>>>,
+ res: Option<HttpRequestBodyAutocloser>,
+}
impl ResponseBytes {
- pub fn initialize(&mut self, inner: ResponseBytesInner) {
- debug_assert!(matches!(self.0, ResponseBytesInner::Empty));
- self.0 = inner;
+ pub fn initialize(
+ &mut self,
+ inner: ResponseBytesInner,
+ req_body_resource: Option<HttpRequestBodyAutocloser>,
+ ) {
+ debug_assert!(matches!(self.inner, ResponseBytesInner::Empty));
+ self.inner = inner;
+ self.res = req_body_resource;
}
pub fn completion_handle(&self) -> CompletionHandle {
- self.1.clone()
+ self.completion_handle.clone()
}
pub fn trailers(&self) -> Rc<RefCell<Option<HeaderMap>>> {
- self.2.clone()
+ self.headers.clone()
}
fn complete(&mut self, success: bool) -> ResponseBytesInner {
- if matches!(self.0, ResponseBytesInner::Done) {
+ if matches!(self.inner, ResponseBytesInner::Done) {
return ResponseBytesInner::Done;
}
- let current = std::mem::replace(&mut self.0, ResponseBytesInner::Done);
- self.1.complete(success);
+ let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done);
+ self.completion_handle.complete(success);
current
}
}
@@ -274,9 +282,9 @@ impl Body for ResponseBytes {
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let res = loop {
- let res = match &mut self.0 {
+ let res = match &mut self.inner {
ResponseBytesInner::Done | ResponseBytesInner::Empty => {
- if let Some(trailers) = self.2.borrow_mut().take() {
+ if let Some(trailers) = self.headers.borrow_mut().take() {
return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
}
unreachable!()
@@ -303,7 +311,7 @@ impl Body for ResponseBytes {
};
if matches!(res, ResponseStreamResult::EndOfStream) {
- if let Some(trailers) = self.2.borrow_mut().take() {
+ if let Some(trailers) = self.headers.borrow_mut().take() {
return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
}
self.complete(true);
@@ -312,21 +320,23 @@ impl Body for ResponseBytes {
}
fn is_end_stream(&self) -> bool {
- matches!(self.0, ResponseBytesInner::Done | ResponseBytesInner::Empty)
- && self.2.borrow_mut().is_none()
+ matches!(
+ self.inner,
+ ResponseBytesInner::Done | ResponseBytesInner::Empty
+ ) && self.headers.borrow_mut().is_none()
}
fn size_hint(&self) -> SizeHint {
// The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through
// anyways just in case hyper needs it.
- self.0.size_hint()
+ self.inner.size_hint()
}
}
impl Drop for ResponseBytes {
fn drop(&mut self) {
// We won't actually poll_frame for Empty responses so this is where we return success
- self.complete(matches!(self.0, ResponseBytesInner::Empty));
+ self.complete(matches!(self.inner, ResponseBytesInner::Empty));
}
}