summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--cli/tests/unit/http_test.ts123
-rw-r--r--ext/http/01_http.js15
-rw-r--r--ext/http/Cargo.toml1
-rw-r--r--ext/http/lib.rs100
-rw-r--r--ext/http/reader_stream.rs157
6 files changed, 369 insertions, 28 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 6ae7e6b81..665a0901e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1095,6 +1095,7 @@ dependencies = [
"mime",
"percent-encoding",
"phf",
+ "pin-project",
"ring",
"serde",
"tokio",
diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts
index 347551362..73bf07b68 100644
--- a/cli/tests/unit/http_test.ts
+++ b/cli/tests/unit/http_test.ts
@@ -2614,6 +2614,129 @@ Deno.test({
},
});
+async function httpServerWithErrorBody(
+ listener: Deno.Listener,
+ compression: boolean,
+): Promise<Deno.HttpConn> {
+ const conn = await listener.accept();
+ listener.close();
+ const httpConn = Deno.serveHttp(conn);
+ const e = await httpConn.nextRequest();
+ assert(e);
+ const { respondWith } = e;
+ const originalErr = new Error("boom");
+ const rs = new ReadableStream({
+ async start(controller) {
+ controller.enqueue(new Uint8Array([65]));
+ await delay(1000);
+ controller.error(originalErr);
+ },
+ });
+ const init = compression ? { headers: { "content-type": "text/plain" } } : {};
+ const response = new Response(rs, init);
+ const err = await assertRejects(() => respondWith(response));
+ assert(err === originalErr);
+ return httpConn;
+}
+
+for (const compression of [true, false]) {
+ Deno.test({
+ name: `http server errors stream if response body errors (http/1.1${
+ compression ? " + compression" : ""
+ })`,
+ permissions: { net: true },
+ async fn() {
+ const hostname = "localhost";
+ const port = 4501;
+
+ const listener = Deno.listen({ hostname, port });
+ const server = httpServerWithErrorBody(listener, compression);
+
+ const conn = await Deno.connect({ hostname, port });
+ const msg = new TextEncoder().encode(
+ `GET / HTTP/1.1\r\nHost: ${hostname}:${port}\r\n\r\n`,
+ );
+ const nwritten = await conn.write(msg);
+ assertEquals(nwritten, msg.byteLength);
+
+ const buf = new Uint8Array(1024);
+ const nread = await conn.read(buf);
+ assert(nread);
+ const data = new TextDecoder().decode(buf.subarray(0, nread));
+ assert(data.endsWith("1\r\nA\r\n"));
+ const nread2 = await conn.read(buf); // connection should be closed now because the stream errored
+ assertEquals(nread2, null);
+ conn.close();
+
+ const httpConn = await server;
+ httpConn.close();
+ },
+ });
+
+ Deno.test({
+ name: `http server errors stream if response body errors (http/1.1 + fetch${
+ compression ? " + compression" : ""
+ })`,
+ permissions: { net: true },
+ async fn() {
+ const hostname = "localhost";
+ const port = 4501;
+
+ const listener = Deno.listen({ hostname, port });
+ const server = httpServerWithErrorBody(listener, compression);
+
+ const resp = await fetch(`http://${hostname}:${port}/`);
+ assert(resp.body);
+ const reader = resp.body.getReader();
+ const result = await reader.read();
+ assert(!result.done);
+ assertEquals(result.value, new Uint8Array([65]));
+ const err = await assertRejects(() => reader.read());
+ assert(err instanceof TypeError);
+ assert(err.message.includes("unexpected EOF"));
+
+ const httpConn = await server;
+ httpConn.close();
+ },
+ });
+
+ Deno.test({
+ name: `http server errors stream if response body errors (http/2 + fetch${
+ compression ? " + compression" : ""
+ }))`,
+ permissions: { net: true, read: true },
+ async fn() {
+ const hostname = "localhost";
+ const port = 4501;
+
+ const listener = Deno.listenTls({
+ hostname,
+ port,
+ certFile: "cli/tests/testdata/tls/localhost.crt",
+ keyFile: "cli/tests/testdata/tls/localhost.key",
+ alpnProtocols: ["h2"],
+ });
+ const server = httpServerWithErrorBody(listener, compression);
+
+ const caCert = Deno.readTextFileSync("cli/tests/testdata/tls/RootCA.pem");
+ const client = Deno.createHttpClient({ caCerts: [caCert] });
+ const resp = await fetch(`https://${hostname}:${port}/`, { client });
+ client.close();
+ assert(resp.body);
+ const reader = resp.body.getReader();
+ const result = await reader.read();
+ assert(!result.done);
+ assertEquals(result.value, new Uint8Array([65]));
+ const err = await assertRejects(() => reader.read());
+ assert(err instanceof TypeError);
+ assert(err.message.includes("unexpected internal error encountered"));
+
+ const httpConn = await server;
+ httpConn.close();
+ },
+ });
+}
+
function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(r);
diff --git a/ext/http/01_http.js b/ext/http/01_http.js
index bd740b600..dfb0f206c 100644
--- a/ext/http/01_http.js
+++ b/ext/http/01_http.js
@@ -263,6 +263,7 @@
}
if (isStreamingResponseBody) {
+ let success = false;
if (
respBody === null ||
!ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, respBody)
@@ -284,6 +285,7 @@
);
if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid);
readableStreamClose(respBody); // Release JS lock.
+ success = true;
} catch (error) {
const connError = httpConn[connErrorSymbol];
if (
@@ -320,13 +322,16 @@
throw error;
}
}
+ success = true;
}
- try {
- await core.opAsync("op_http_shutdown", streamRid);
- } catch (error) {
- await reader.cancel(error);
- throw error;
+ if (success) {
+ try {
+ await core.opAsync("op_http_shutdown", streamRid);
+ } catch (error) {
+ await reader.cancel(error);
+ throw error;
+ }
}
}
diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index 2f4ae31e6..65cd4ccfe 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -31,6 +31,7 @@ hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "r
mime = "0.3.16"
percent-encoding.workspace = true
phf = { version = "0.10", features = ["macros"] }
+pin-project.workspace = true
ring.workspace = true
serde.workspace = true
tokio.workspace = true
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index af117d3f9..812394d94 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -70,9 +70,12 @@ use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::task::spawn_local;
-use tokio_util::io::ReaderStream;
+
+use crate::reader_stream::ExternallyAbortableReaderStream;
+use crate::reader_stream::ShutdownHandle;
pub mod compressible;
+mod reader_stream;
pub fn init() -> Extension {
Extension::builder()
@@ -414,8 +417,11 @@ impl Default for HttpRequestReader {
/// The write half of an HTTP stream.
enum HttpResponseWriter {
Headers(oneshot::Sender<Response<Body>>),
- Body(Pin<Box<dyn tokio::io::AsyncWrite>>),
- BodyUncompressed(hyper::body::Sender),
+ Body {
+ writer: Pin<Box<dyn tokio::io::AsyncWrite>>,
+ shutdown_handle: ShutdownHandle,
+ },
+ BodyUncompressed(BodyUncompressedSender),
Closed,
}
@@ -425,6 +431,36 @@ impl Default for HttpResponseWriter {
}
}
+struct BodyUncompressedSender(Option<hyper::body::Sender>);
+
+impl BodyUncompressedSender {
+ fn sender(&mut self) -> &mut hyper::body::Sender {
+ // This is safe because we only ever take the sender out of the option
+ // inside of the shutdown method.
+ self.0.as_mut().unwrap()
+ }
+
+ fn shutdown(mut self) {
+ // take the sender out of self so that when self is dropped at the end of
+ // this block, it doesn't get aborted
+ self.0.take();
+ }
+}
+
+impl From<hyper::body::Sender> for BodyUncompressedSender {
+ fn from(sender: hyper::body::Sender) -> Self {
+ BodyUncompressedSender(Some(sender))
+ }
+}
+
+impl Drop for BodyUncompressedSender {
+ fn drop(&mut self) {
+ if let Some(sender) = self.0.take() {
+ sender.abort();
+ }
+ }
+}
+
// We use a tuple instead of struct to avoid serialization overhead of the keys.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
@@ -668,14 +704,22 @@ fn http_response(
Encoding::Gzip => Box::pin(GzipEncoder::new(writer)),
_ => unreachable!(), // forbidden by accepts_compression
};
+ let (stream, shutdown_handle) =
+ ExternallyAbortableReaderStream::new(reader);
Ok((
- HttpResponseWriter::Body(writer),
- Body::wrap_stream(ReaderStream::new(reader)),
+ HttpResponseWriter::Body {
+ writer,
+ shutdown_handle,
+ },
+ Body::wrap_stream(stream),
))
}
None => {
let (body_tx, body_rx) = Body::channel();
- Ok((HttpResponseWriter::BodyUncompressed(body_tx), body_rx))
+ Ok((
+ HttpResponseWriter::BodyUncompressed(body_tx.into()),
+ body_rx,
+ ))
}
}
}
@@ -768,10 +812,10 @@ async fn op_http_write_resource(
}
match &mut *wr {
- HttpResponseWriter::Body(body) => {
- let mut result = body.write_all(&view).await;
+ HttpResponseWriter::Body { writer, .. } => {
+ let mut result = writer.write_all(&view).await;
if result.is_ok() {
- result = body.flush().await;
+ result = writer.flush().await;
}
if let Err(err) = result {
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
@@ -784,7 +828,7 @@ async fn op_http_write_resource(
}
HttpResponseWriter::BodyUncompressed(body) => {
let bytes = Bytes::from(view);
- if let Err(err) = body.send_data(bytes).await {
+ if let Err(err) = body.sender().send_data(bytes).await {
assert!(err.is_closed());
// Pull up the failure associated with the transport connection instead.
http_stream.conn.closed().await?;
@@ -813,10 +857,10 @@ async fn op_http_write(
match &mut *wr {
HttpResponseWriter::Headers(_) => Err(http_error("no response headers")),
HttpResponseWriter::Closed => Err(http_error("response already completed")),
- HttpResponseWriter::Body(body) => {
- let mut result = body.write_all(&buf).await;
+ HttpResponseWriter::Body { writer, .. } => {
+ let mut result = writer.write_all(&buf).await;
if result.is_ok() {
- result = body.flush().await;
+ result = writer.flush().await;
}
match result {
Ok(_) => Ok(()),
@@ -833,7 +877,7 @@ async fn op_http_write(
}
HttpResponseWriter::BodyUncompressed(body) => {
let bytes = Bytes::from(buf);
- match body.send_data(bytes).await {
+ match body.sender().send_data(bytes).await {
Ok(_) => Ok(()),
Err(err) => {
assert!(err.is_closed());
@@ -862,17 +906,27 @@ async fn op_http_shutdown(
.get::<HttpStreamResource>(rid)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
let wr = take(&mut *wr);
- if let HttpResponseWriter::Body(mut body_writer) = wr {
- match body_writer.shutdown().await {
- Ok(_) => {}
- Err(err) => {
- assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
- // Don't return "broken pipe", that's an implementation detail.
- // Pull up the failure associated with the transport connection instead.
- stream.conn.closed().await?;
+ match wr {
+ HttpResponseWriter::Body {
+ mut writer,
+ shutdown_handle,
+ } => {
+ shutdown_handle.shutdown();
+ match writer.shutdown().await {
+ Ok(_) => {}
+ Err(err) => {
+ assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+ // Don't return "broken pipe", that's an implementation detail.
+ // Pull up the failure associated with the transport connection instead.
+ stream.conn.closed().await?;
+ }
}
}
- }
+ HttpResponseWriter::BodyUncompressed(body) => {
+ body.shutdown();
+ }
+ _ => {}
+ };
Ok(())
}
diff --git a/ext/http/reader_stream.rs b/ext/http/reader_stream.rs
new file mode 100644
index 000000000..388b8db81
--- /dev/null
+++ b/ext/http/reader_stream.rs
@@ -0,0 +1,157 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+use std::pin::Pin;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+
+use bytes::Bytes;
+use deno_core::futures::Stream;
+use pin_project::pin_project;
+use tokio::io::AsyncRead;
+use tokio_util::io::ReaderStream;
+
+/// [ExternallyAbortableByteStream] adapts a [tokio::AsyncRead] into a [Stream].
+/// It is used to bridge between the HTTP response body resource, and
+/// `hyper::Body`. The stream has the special property that it errors if the
+/// underlying reader is closed before an explicit EOF is sent (in the form of
+/// setting the `shutdown` flag to true).
+#[pin_project]
+pub struct ExternallyAbortableReaderStream<R: AsyncRead> {
+ #[pin]
+ inner: ReaderStream<R>,
+ done: Arc<AtomicBool>,
+}
+
+pub struct ShutdownHandle(Arc<AtomicBool>);
+
+impl ShutdownHandle {
+ pub fn shutdown(&self) {
+ self.0.store(true, std::sync::atomic::Ordering::SeqCst);
+ }
+}
+
+impl<R: AsyncRead> ExternallyAbortableReaderStream<R> {
+ pub fn new(reader: R) -> (Self, ShutdownHandle) {
+ let done = Arc::new(AtomicBool::new(false));
+ let this = Self {
+ inner: ReaderStream::new(reader),
+ done: done.clone(),
+ };
+ (this, ShutdownHandle(done))
+ }
+}
+
+impl<R: AsyncRead> Stream for ExternallyAbortableReaderStream<R> {
+ type Item = std::io::Result<Bytes>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let this = self.project();
+ let val = std::task::ready!(this.inner.poll_next(cx));
+ match val {
+ None if this.done.load(Ordering::SeqCst) => Poll::Ready(None),
+ None => Poll::Ready(Some(Err(std::io::Error::new(
+ std::io::ErrorKind::UnexpectedEof,
+ "stream reader has shut down",
+ )))),
+ Some(val) => Poll::Ready(Some(val)),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use bytes::Bytes;
+ use deno_core::futures::StreamExt;
+ use tokio::io::AsyncWriteExt;
+
+ #[tokio::test]
+ async fn success() {
+ let (a, b) = tokio::io::duplex(64 * 1024);
+ let (reader, _) = tokio::io::split(a);
+ let (_, mut writer) = tokio::io::split(b);
+
+ let (mut stream, shutdown_handle) =
+ ExternallyAbortableReaderStream::new(reader);
+
+ writer.write_all(b"hello").await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+ writer.write_all(b"world").await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
+
+ shutdown_handle.shutdown();
+ writer.shutdown().await.unwrap();
+ drop(writer);
+ assert!(stream.next().await.is_none());
+ }
+
+ #[tokio::test]
+ async fn error() {
+ let (a, b) = tokio::io::duplex(64 * 1024);
+ let (reader, _) = tokio::io::split(a);
+ let (_, mut writer) = tokio::io::split(b);
+
+ let (mut stream, _shutdown_handle) =
+ ExternallyAbortableReaderStream::new(reader);
+
+ writer.write_all(b"hello").await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+ drop(writer);
+ assert_eq!(
+ stream.next().await.unwrap().unwrap_err().kind(),
+ std::io::ErrorKind::UnexpectedEof
+ );
+ }
+
+ #[tokio::test]
+ async fn error2() {
+ let (a, b) = tokio::io::duplex(64 * 1024);
+ let (reader, _) = tokio::io::split(a);
+ let (_, mut writer) = tokio::io::split(b);
+
+ let (mut stream, _shutdown_handle) =
+ ExternallyAbortableReaderStream::new(reader);
+
+ writer.write_all(b"hello").await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+ writer.shutdown().await.unwrap();
+ drop(writer);
+ assert_eq!(
+ stream.next().await.unwrap().unwrap_err().kind(),
+ std::io::ErrorKind::UnexpectedEof
+ );
+ }
+
+ #[tokio::test]
+ async fn write_after_shutdown() {
+ let (a, b) = tokio::io::duplex(64 * 1024);
+ let (reader, _) = tokio::io::split(a);
+ let (_, mut writer) = tokio::io::split(b);
+
+ let (mut stream, shutdown_handle) =
+ ExternallyAbortableReaderStream::new(reader);
+
+ writer.write_all(b"hello").await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+ writer.write_all(b"world").await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
+
+ shutdown_handle.shutdown();
+ writer.shutdown().await.unwrap();
+
+ assert!(writer.write_all(b"!").await.is_err());
+
+ drop(writer);
+ assert!(stream.next().await.is_none());
+ }
+}