summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/unit/serve_test.ts42
-rw-r--r--ext/http/request_body.rs31
2 files changed, 60 insertions, 13 deletions
diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts
index 3f58903a8..76433f1e3 100644
--- a/cli/tests/unit/serve_test.ts
+++ b/cli/tests/unit/serve_test.ts
@@ -1,6 +1,9 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-import { assertMatch } from "../../../test_util/std/testing/asserts.ts";
+import {
+ assertMatch,
+ assertRejects,
+} from "../../../test_util/std/testing/asserts.ts";
import { Buffer, BufReader, BufWriter } from "../../../test_util/std/io/mod.ts";
import { TextProtoReader } from "../testdata/run/textproto.ts";
import {
@@ -879,6 +882,43 @@ Deno.test(
},
);
+Deno.test(
+ { permissions: { net: true } },
+ async function httpServerAbortedRequestBody() {
+ const promise = deferred();
+ const ac = new AbortController();
+ const listeningPromise = deferred();
+
+ const server = Deno.serve({
+ handler: async (request) => {
+ await assertRejects(async () => {
+ await request.text();
+ });
+ promise.resolve();
+ // Not actually used
+ return new Response();
+ },
+ port: servePort,
+ signal: ac.signal,
+ onListen: onListen(listeningPromise),
+ onError: createOnErrorCb(ac),
+ });
+
+ await listeningPromise;
+ const conn = await Deno.connect({ port: servePort });
+ // Send POST request with a body + content-length, but don't send it all
+ const encoder = new TextEncoder();
+ const body =
+ `POST / HTTP/1.1\r\nHost: 127.0.0.1:${servePort}\r\nContent-Length: 10\r\n\r\n12345`;
+ const writeResult = await conn.write(encoder.encode(body));
+ assertEquals(body.length, writeResult);
+ conn.close();
+ await promise;
+ ac.abort();
+ await server.finished;
+ },
+);
+
function createStreamTest(count: number, delay: number, action: string) {
function doAction(controller: ReadableStreamDefaultController, i: number) {
if (i == count) {
diff --git a/ext/http/request_body.rs b/ext/http/request_body.rs
index 73908ca55..0c3f29320 100644
--- a/ext/http/request_body.rs
+++ b/ext/http/request_body.rs
@@ -15,6 +15,8 @@ use hyper1::body::SizeHint;
use std::borrow::Cow;
use std::pin::Pin;
use std::rc::Rc;
+use std::task::ready;
+use std::task::Poll;
/// Converts a hyper incoming body stream into a stream of [`Bytes`] that we can use to read in V8.
struct ReadFuture(Incoming);
@@ -25,21 +27,26 @@ impl Stream for ReadFuture {
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<Self::Item>> {
- let res = Pin::new(&mut self.get_mut().0).poll_frame(cx);
- match res {
- std::task::Poll::Ready(Some(Ok(frame))) => {
- if let Ok(data) = frame.into_data() {
- // Ensure that we never yield an empty frame
- if !data.is_empty() {
- return std::task::Poll::Ready(Some(Ok(data)));
+ ) -> Poll<Option<Self::Item>> {
+ // Loop until we receive a non-empty frame from Hyper
+ let this = self.get_mut();
+ loop {
+ let res = ready!(Pin::new(&mut this.0).poll_frame(cx));
+ break match res {
+ Some(Ok(frame)) => {
+ if let Ok(data) = frame.into_data() {
+ // Ensure that we never yield an empty frame
+ if !data.is_empty() {
+ break Poll::Ready(Some(Ok::<_, AnyError>(data)));
+ }
}
+ // Loop again so we don't lose the waker
+ continue;
}
- }
- std::task::Poll::Ready(None) => return std::task::Poll::Ready(None),
- _ => {}
+ Some(Err(e)) => Poll::Ready(Some(Err(e.into()))),
+ None => Poll::Ready(None),
+ };
}
- std::task::Poll::Pending
}
}