summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/unit/serve_test.ts86
-rw-r--r--ext/http/00_serve.js4
-rw-r--r--ext/http/http_next.rs21
-rw-r--r--ext/http/response_body.rs52
-rw-r--r--ext/http/slab.rs62
5 files changed, 172 insertions, 53 deletions
diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts
index f0a5b430b..f412a9071 100644
--- a/cli/tests/unit/serve_test.ts
+++ b/cli/tests/unit/serve_test.ts
@@ -721,7 +721,7 @@ function createStreamTest(count: number, delay: number, action: string) {
}
for (const count of [0, 1, 2, 3]) {
- for (const delay of [0, 1, 1000]) {
+ for (const delay of [0, 1, 25]) {
// Creating a stream that errors in start will throw
if (delay > 0) {
createStreamTest(count, delay, "Throw");
@@ -1288,45 +1288,91 @@ Deno.test(
},
);
+async function testDuplex(
+ reader: ReadableStreamDefaultReader<Uint8Array>,
+ writable: WritableStreamDefaultWriter<Uint8Array>,
+) {
+ await writable.write(new Uint8Array([1]));
+ const chunk1 = await reader.read();
+ assert(!chunk1.done);
+ assertEquals(chunk1.value, new Uint8Array([1]));
+ await writable.write(new Uint8Array([2]));
+ const chunk2 = await reader.read();
+ assert(!chunk2.done);
+ assertEquals(chunk2.value, new Uint8Array([2]));
+ await writable.close();
+ const chunk3 = await reader.read();
+ assert(chunk3.done);
+}
+
Deno.test(
{ permissions: { net: true } },
- async function httpServerStreamDuplex() {
+ async function httpServerStreamDuplexDirect() {
const promise = deferred();
const ac = new AbortController();
const server = Deno.serve(
{ port: servePort, signal: ac.signal },
- (request) => {
+ (request: Request) => {
assert(request.body);
-
promise.resolve();
return new Response(request.body);
},
);
- const ts = new TransformStream();
- const writable = ts.writable.getWriter();
-
+ const { readable, writable } = new TransformStream();
const resp = await fetch(`http://127.0.0.1:${servePort}/`, {
method: "POST",
- body: ts.readable,
+ body: readable,
});
await promise;
assert(resp.body);
- const reader = resp.body.getReader();
- await writable.write(new Uint8Array([1]));
- const chunk1 = await reader.read();
- assert(!chunk1.done);
- assertEquals(chunk1.value, new Uint8Array([1]));
- await writable.write(new Uint8Array([2]));
- const chunk2 = await reader.read();
- assert(!chunk2.done);
- assertEquals(chunk2.value, new Uint8Array([2]));
- await writable.close();
- const chunk3 = await reader.read();
- assert(chunk3.done);
+ await testDuplex(resp.body.getReader(), writable.getWriter());
+ ac.abort();
+ await server.finished;
+ },
+);
+
+// Test that a duplex stream passing through JavaScript also works (ie: that the request body resource
+// is still alive). https://github.com/denoland/deno/pull/20206
+Deno.test(
+ { permissions: { net: true } },
+ async function httpServerStreamDuplexJavascript() {
+ const promise = deferred();
+ const ac = new AbortController();
+
+ const server = Deno.serve(
+ { port: servePort, signal: ac.signal },
+ (request: Request) => {
+ assert(request.body);
+ promise.resolve();
+ const reader = request.body.getReader();
+ return new Response(
+ new ReadableStream({
+ async pull(controller) {
+ await new Promise((r) => setTimeout(r, 100));
+ const { done, value } = await reader.read();
+ if (done) {
+ controller.close();
+ } else {
+ controller.enqueue(value);
+ }
+ },
+ }),
+ );
+ },
+ );
+ const { readable, writable } = new TransformStream();
+ const resp = await fetch(`http://127.0.0.1:${servePort}/`, {
+ method: "POST",
+ body: readable,
+ });
+
+ await promise;
+ assert(resp.body);
+ await testDuplex(resp.body.getReader(), writable.getWriter());
ac.abort();
await server.finished;
},
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
index 265b79706..7307ab2d8 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.js
@@ -133,10 +133,6 @@ class InnerRequest {
}
close() {
- if (this.#streamRid !== undefined) {
- core.close(this.#streamRid);
- this.#streamRid = undefined;
- }
this.#slabId = undefined;
}
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 60ef83b0f..17e9befe2 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -14,6 +14,7 @@ use crate::slab::slab_drop;
use crate::slab::slab_get;
use crate::slab::slab_init;
use crate::slab::slab_insert;
+use crate::slab::HttpRequestBodyAutocloser;
use crate::slab::SlabId;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
@@ -376,13 +377,20 @@ pub fn op_http_get_request_headers<'scope>(
#[op(fast)]
pub fn op_http_read_request_body(
- state: &mut OpState,
+ state: Rc<RefCell<OpState>>,
slab_id: SlabId,
) -> ResourceId {
let mut http = slab_get(slab_id);
- let incoming = http.take_body();
- let body_resource = Rc::new(HttpRequestBody::new(incoming));
- state.resource_table.add_rc(body_resource)
+ let rid = if let Some(incoming) = http.take_body() {
+ let body_resource = Rc::new(HttpRequestBody::new(incoming));
+ state.borrow_mut().resource_table.add_rc(body_resource)
+ } else {
+ // This should not be possible, but rather than panicking we'll return an invalid
+ // resource value to JavaScript.
+ ResourceId::MAX
+ };
+ http.put_resource(HttpRequestBodyAutocloser::new(rid, state.clone()));
+ rid
}
#[op2(fast)]
@@ -577,6 +585,7 @@ fn set_response(
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) {
let mut http = slab_get(slab_id);
+ let resource = http.take_resource();
let compression = is_request_compressible(&http.request_parts().headers);
let response = http.response();
let compression = modify_compressibility_from_response(
@@ -584,7 +593,9 @@ fn set_response(
length,
response.headers_mut(),
);
- response.body_mut().initialize(response_fn(compression));
+ response
+ .body_mut()
+ .initialize(response_fn(compression), resource);
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
// will quitely ignore invalid values.
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
index bd9d6f433..f88f13f88 100644
--- a/ext/http/response_body.rs
+++ b/ext/http/response_body.rs
@@ -23,6 +23,8 @@ use hyper1::body::Frame;
use hyper1::body::SizeHint;
use pin_project::pin_project;
+use crate::slab::HttpRequestBodyAutocloser;
+
/// Simplification for nested types we use for our streams. We provide a way to convert from
/// this type into Hyper's body [`Frame`].
enum ResponseStreamResult {
@@ -156,34 +158,40 @@ impl std::fmt::Debug for ResponseBytesInner {
/// This represents the union of possible response types in Deno with the stream-style [`Body`] interface
/// required by hyper. As the API requires information about request completion (including a success/fail
/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on.
-#[derive(Debug, Default)]
-pub struct ResponseBytes(
- ResponseBytesInner,
- CompletionHandle,
- Rc<RefCell<Option<HeaderMap>>>,
-);
+#[derive(Default)]
+pub struct ResponseBytes {
+ inner: ResponseBytesInner,
+ completion_handle: CompletionHandle,
+ headers: Rc<RefCell<Option<HeaderMap>>>,
+ res: Option<HttpRequestBodyAutocloser>,
+}
impl ResponseBytes {
- pub fn initialize(&mut self, inner: ResponseBytesInner) {
- debug_assert!(matches!(self.0, ResponseBytesInner::Empty));
- self.0 = inner;
+ pub fn initialize(
+ &mut self,
+ inner: ResponseBytesInner,
+ req_body_resource: Option<HttpRequestBodyAutocloser>,
+ ) {
+ debug_assert!(matches!(self.inner, ResponseBytesInner::Empty));
+ self.inner = inner;
+ self.res = req_body_resource;
}
pub fn completion_handle(&self) -> CompletionHandle {
- self.1.clone()
+ self.completion_handle.clone()
}
pub fn trailers(&self) -> Rc<RefCell<Option<HeaderMap>>> {
- self.2.clone()
+ self.headers.clone()
}
fn complete(&mut self, success: bool) -> ResponseBytesInner {
- if matches!(self.0, ResponseBytesInner::Done) {
+ if matches!(self.inner, ResponseBytesInner::Done) {
return ResponseBytesInner::Done;
}
- let current = std::mem::replace(&mut self.0, ResponseBytesInner::Done);
- self.1.complete(success);
+ let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done);
+ self.completion_handle.complete(success);
current
}
}
@@ -274,9 +282,9 @@ impl Body for ResponseBytes {
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let res = loop {
- let res = match &mut self.0 {
+ let res = match &mut self.inner {
ResponseBytesInner::Done | ResponseBytesInner::Empty => {
- if let Some(trailers) = self.2.borrow_mut().take() {
+ if let Some(trailers) = self.headers.borrow_mut().take() {
return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
}
unreachable!()
@@ -303,7 +311,7 @@ impl Body for ResponseBytes {
};
if matches!(res, ResponseStreamResult::EndOfStream) {
- if let Some(trailers) = self.2.borrow_mut().take() {
+ if let Some(trailers) = self.headers.borrow_mut().take() {
return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
}
self.complete(true);
@@ -312,21 +320,23 @@ impl Body for ResponseBytes {
}
fn is_end_stream(&self) -> bool {
- matches!(self.0, ResponseBytesInner::Done | ResponseBytesInner::Empty)
- && self.2.borrow_mut().is_none()
+ matches!(
+ self.inner,
+ ResponseBytesInner::Done | ResponseBytesInner::Empty
+ ) && self.headers.borrow_mut().is_none()
}
fn size_hint(&self) -> SizeHint {
// The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through
// anyways just in case hyper needs it.
- self.0.size_hint()
+ self.inner.size_hint()
}
}
impl Drop for ResponseBytes {
fn drop(&mut self) {
// We won't actually poll_frame for Empty responses so this is where we return success
- self.complete(matches!(self.0, ResponseBytesInner::Empty));
+ self.complete(matches!(self.inner, ResponseBytesInner::Empty));
}
}
diff --git a/ext/http/slab.rs b/ext/http/slab.rs
index dbe1a6635..8c285c860 100644
--- a/ext/http/slab.rs
+++ b/ext/http/slab.rs
@@ -3,6 +3,8 @@ use crate::request_properties::HttpConnectionProperties;
use crate::response_body::CompletionHandle;
use crate::response_body::ResponseBytes;
use deno_core::error::AnyError;
+use deno_core::OpState;
+use deno_core::ResourceId;
use http::request::Parts;
use http::HeaderMap;
use hyper1::body::Incoming;
@@ -18,10 +20,36 @@ pub type Request = hyper1::Request<Incoming>;
pub type Response = hyper1::Response<ResponseBytes>;
pub type SlabId = u32;
+enum RequestBodyState {
+ Incoming(Incoming),
+ Resource(HttpRequestBodyAutocloser),
+}
+
+impl From<Incoming> for RequestBodyState {
+ fn from(value: Incoming) -> Self {
+ RequestBodyState::Incoming(value)
+ }
+}
+
+/// Ensures that the request body closes itself when no longer needed.
+pub struct HttpRequestBodyAutocloser(ResourceId, Rc<RefCell<OpState>>);
+
+impl HttpRequestBodyAutocloser {
+ pub fn new(res: ResourceId, op_state: Rc<RefCell<OpState>>) -> Self {
+ Self(res, op_state)
+ }
+}
+
+impl Drop for HttpRequestBodyAutocloser {
+ fn drop(&mut self) {
+ _ = self.1.borrow_mut().resource_table.close(self.0);
+ }
+}
+
pub struct HttpSlabRecord {
request_info: HttpConnectionProperties,
request_parts: Parts,
- request_body: Option<Incoming>,
+ request_body: Option<RequestBodyState>,
// The response may get taken before we tear this down
response: Option<Response>,
promise: CompletionHandle,
@@ -98,6 +126,7 @@ fn slab_insert_raw(
let mut slab = slab.borrow_mut();
let body = ResponseBytes::default();
let trailers = body.trailers();
+ let request_body = request_body.map(|r| r.into());
slab.insert(HttpSlabRecord {
request_info,
request_parts,
@@ -174,8 +203,35 @@ impl SlabEntry {
}
/// Take the Hyper body from this entry.
- pub fn take_body(&mut self) -> Incoming {
- self.self_mut().request_body.take().unwrap()
+ pub fn take_body(&mut self) -> Option<Incoming> {
+ let body_holder = &mut self.self_mut().request_body;
+ let body = body_holder.take();
+ match body {
+ Some(RequestBodyState::Incoming(body)) => Some(body),
+ x => {
+ *body_holder = x;
+ None
+ }
+ }
+ }
+
+ pub fn take_resource(&mut self) -> Option<HttpRequestBodyAutocloser> {
+ let body_holder = &mut self.self_mut().request_body;
+ let body = body_holder.take();
+ match body {
+ Some(RequestBodyState::Resource(res)) => Some(res),
+ x => {
+ *body_holder = x;
+ None
+ }
+ }
+ }
+
+ /// Replace the request body with a resource ID and the OpState we'll need to shut it down.
+ /// We cannot keep just the resource itself, as JS code might be reading from the resource ID
+ /// to generate the response data (requiring us to keep it in the resource table).
+ pub fn put_resource(&mut self, res: HttpRequestBodyAutocloser) {
+ self.self_mut().request_body = Some(RequestBodyState::Resource(res));
}
/// Complete this entry, potentially expunging it if it is complete.