summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2022-12-20 09:46:45 +0100
committerGitHub <noreply@github.com>2022-12-20 08:46:45 +0000
commit8e947bb674725195a4d2a754445ee71029108f61 (patch)
treece4e859c2f9bc5443e88781a09960a7fb1859afa /ext
parent948f85216a15e4ef489af21bb532a9b201b0364c (diff)
fix(ext/http): close stream on resp body error (#17126)
Previously, errored streaming response bodies did not cause the HTTP stream to be aborted. It instead caused the stream to be closed gracefully, which had the result that the client could not detect the difference between a successful response and an errored response. This commit fixes the issue by aborting the stream on error.
Diffstat (limited to 'ext')
-rw-r--r--ext/http/01_http.js15
-rw-r--r--ext/http/Cargo.toml1
-rw-r--r--ext/http/lib.rs100
-rw-r--r--ext/http/reader_stream.rs157
4 files changed, 245 insertions, 28 deletions
diff --git a/ext/http/01_http.js b/ext/http/01_http.js
index bd740b600..dfb0f206c 100644
--- a/ext/http/01_http.js
+++ b/ext/http/01_http.js
@@ -263,6 +263,7 @@
}
if (isStreamingResponseBody) {
+ let success = false;
if (
respBody === null ||
!ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, respBody)
@@ -284,6 +285,7 @@
);
if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid);
readableStreamClose(respBody); // Release JS lock.
+ success = true;
} catch (error) {
const connError = httpConn[connErrorSymbol];
if (
@@ -320,13 +322,16 @@
throw error;
}
}
+ success = true;
}
- try {
- await core.opAsync("op_http_shutdown", streamRid);
- } catch (error) {
- await reader.cancel(error);
- throw error;
+ if (success) {
+ try {
+ await core.opAsync("op_http_shutdown", streamRid);
+ } catch (error) {
+ await reader.cancel(error);
+ throw error;
+ }
}
}
diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index 2f4ae31e6..65cd4ccfe 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -31,6 +31,7 @@ hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "r
mime = "0.3.16"
percent-encoding.workspace = true
phf = { version = "0.10", features = ["macros"] }
+pin-project.workspace = true
ring.workspace = true
serde.workspace = true
tokio.workspace = true
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index af117d3f9..812394d94 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -70,9 +70,12 @@ use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::task::spawn_local;
-use tokio_util::io::ReaderStream;
+
+use crate::reader_stream::ExternallyAbortableReaderStream;
+use crate::reader_stream::ShutdownHandle;
pub mod compressible;
+mod reader_stream;
pub fn init() -> Extension {
Extension::builder()
@@ -414,8 +417,11 @@ impl Default for HttpRequestReader {
/// The write half of an HTTP stream.
enum HttpResponseWriter {
Headers(oneshot::Sender<Response<Body>>),
- Body(Pin<Box<dyn tokio::io::AsyncWrite>>),
- BodyUncompressed(hyper::body::Sender),
+ Body {
+ writer: Pin<Box<dyn tokio::io::AsyncWrite>>,
+ shutdown_handle: ShutdownHandle,
+ },
+ BodyUncompressed(BodyUncompressedSender),
Closed,
}
@@ -425,6 +431,36 @@ impl Default for HttpResponseWriter {
}
}
+struct BodyUncompressedSender(Option<hyper::body::Sender>);
+
+impl BodyUncompressedSender {
+ fn sender(&mut self) -> &mut hyper::body::Sender {
+ // This is safe because we only ever take the sender out of the option
+ // inside of the shutdown method.
+ self.0.as_mut().unwrap()
+ }
+
+ fn shutdown(mut self) {
+ // take the sender out of self so that when self is dropped at the end of
+ // this block, it doesn't get aborted
+ self.0.take();
+ }
+}
+
+impl From<hyper::body::Sender> for BodyUncompressedSender {
+ fn from(sender: hyper::body::Sender) -> Self {
+ BodyUncompressedSender(Some(sender))
+ }
+}
+
+impl Drop for BodyUncompressedSender {
+ fn drop(&mut self) {
+ if let Some(sender) = self.0.take() {
+ sender.abort();
+ }
+ }
+}
+
// We use a tuple instead of struct to avoid serialization overhead of the keys.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
@@ -668,14 +704,22 @@ fn http_response(
Encoding::Gzip => Box::pin(GzipEncoder::new(writer)),
_ => unreachable!(), // forbidden by accepts_compression
};
+ let (stream, shutdown_handle) =
+ ExternallyAbortableReaderStream::new(reader);
Ok((
- HttpResponseWriter::Body(writer),
- Body::wrap_stream(ReaderStream::new(reader)),
+ HttpResponseWriter::Body {
+ writer,
+ shutdown_handle,
+ },
+ Body::wrap_stream(stream),
))
}
None => {
let (body_tx, body_rx) = Body::channel();
- Ok((HttpResponseWriter::BodyUncompressed(body_tx), body_rx))
+ Ok((
+ HttpResponseWriter::BodyUncompressed(body_tx.into()),
+ body_rx,
+ ))
}
}
}
@@ -768,10 +812,10 @@ async fn op_http_write_resource(
}
match &mut *wr {
- HttpResponseWriter::Body(body) => {
- let mut result = body.write_all(&view).await;
+ HttpResponseWriter::Body { writer, .. } => {
+ let mut result = writer.write_all(&view).await;
if result.is_ok() {
- result = body.flush().await;
+ result = writer.flush().await;
}
if let Err(err) = result {
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
@@ -784,7 +828,7 @@ async fn op_http_write_resource(
}
HttpResponseWriter::BodyUncompressed(body) => {
let bytes = Bytes::from(view);
- if let Err(err) = body.send_data(bytes).await {
+ if let Err(err) = body.sender().send_data(bytes).await {
assert!(err.is_closed());
// Pull up the failure associated with the transport connection instead.
http_stream.conn.closed().await?;
@@ -813,10 +857,10 @@ async fn op_http_write(
match &mut *wr {
HttpResponseWriter::Headers(_) => Err(http_error("no response headers")),
HttpResponseWriter::Closed => Err(http_error("response already completed")),
- HttpResponseWriter::Body(body) => {
- let mut result = body.write_all(&buf).await;
+ HttpResponseWriter::Body { writer, .. } => {
+ let mut result = writer.write_all(&buf).await;
if result.is_ok() {
- result = body.flush().await;
+ result = writer.flush().await;
}
match result {
Ok(_) => Ok(()),
@@ -833,7 +877,7 @@ async fn op_http_write(
}
HttpResponseWriter::BodyUncompressed(body) => {
let bytes = Bytes::from(buf);
- match body.send_data(bytes).await {
+ match body.sender().send_data(bytes).await {
Ok(_) => Ok(()),
Err(err) => {
assert!(err.is_closed());
@@ -862,17 +906,27 @@ async fn op_http_shutdown(
.get::<HttpStreamResource>(rid)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
let wr = take(&mut *wr);
- if let HttpResponseWriter::Body(mut body_writer) = wr {
- match body_writer.shutdown().await {
- Ok(_) => {}
- Err(err) => {
- assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
- // Don't return "broken pipe", that's an implementation detail.
- // Pull up the failure associated with the transport connection instead.
- stream.conn.closed().await?;
+ match wr {
+ HttpResponseWriter::Body {
+ mut writer,
+ shutdown_handle,
+ } => {
+ shutdown_handle.shutdown();
+ match writer.shutdown().await {
+ Ok(_) => {}
+ Err(err) => {
+ assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+ // Don't return "broken pipe", that's an implementation detail.
+ // Pull up the failure associated with the transport connection instead.
+ stream.conn.closed().await?;
+ }
}
}
- }
+ HttpResponseWriter::BodyUncompressed(body) => {
+ body.shutdown();
+ }
+ _ => {}
+ };
Ok(())
}
diff --git a/ext/http/reader_stream.rs b/ext/http/reader_stream.rs
new file mode 100644
index 000000000..388b8db81
--- /dev/null
+++ b/ext/http/reader_stream.rs
@@ -0,0 +1,157 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+use std::pin::Pin;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+
+use bytes::Bytes;
+use deno_core::futures::Stream;
+use pin_project::pin_project;
+use tokio::io::AsyncRead;
+use tokio_util::io::ReaderStream;
+
+/// [ExternallyAbortableByteStream] adapts a [tokio::AsyncRead] into a [Stream].
+/// It is used to bridge between the HTTP response body resource, and
+/// `hyper::Body`. The stream has the special property that it errors if the
+/// underlying reader is closed before an explicit EOF is sent (in the form of
+/// setting the `shutdown` flag to true).
+#[pin_project]
+pub struct ExternallyAbortableReaderStream<R: AsyncRead> {
+ #[pin]
+ inner: ReaderStream<R>,
+ done: Arc<AtomicBool>,
+}
+
+pub struct ShutdownHandle(Arc<AtomicBool>);
+
+impl ShutdownHandle {
+ pub fn shutdown(&self) {
+ self.0.store(true, std::sync::atomic::Ordering::SeqCst);
+ }
+}
+
+impl<R: AsyncRead> ExternallyAbortableReaderStream<R> {
+ pub fn new(reader: R) -> (Self, ShutdownHandle) {
+ let done = Arc::new(AtomicBool::new(false));
+ let this = Self {
+ inner: ReaderStream::new(reader),
+ done: done.clone(),
+ };
+ (this, ShutdownHandle(done))
+ }
+}
+
+impl<R: AsyncRead> Stream for ExternallyAbortableReaderStream<R> {
+ type Item = std::io::Result<Bytes>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let this = self.project();
+ let val = std::task::ready!(this.inner.poll_next(cx));
+ match val {
+ None if this.done.load(Ordering::SeqCst) => Poll::Ready(None),
+ None => Poll::Ready(Some(Err(std::io::Error::new(
+ std::io::ErrorKind::UnexpectedEof,
+ "stream reader has shut down",
+ )))),
+ Some(val) => Poll::Ready(Some(val)),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use bytes::Bytes;
+ use deno_core::futures::StreamExt;
+ use tokio::io::AsyncWriteExt;
+
+ #[tokio::test]
+ async fn success() {
+ let (a, b) = tokio::io::duplex(64 * 1024);
+ let (reader, _) = tokio::io::split(a);
+ let (_, mut writer) = tokio::io::split(b);
+
+ let (mut stream, shutdown_handle) =
+ ExternallyAbortableReaderStream::new(reader);
+
+ writer.write_all(b"hello").await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+ writer.write_all(b"world").await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
+
+ shutdown_handle.shutdown();
+ writer.shutdown().await.unwrap();
+ drop(writer);
+ assert!(stream.next().await.is_none());
+ }
+
+ #[tokio::test]
+ async fn error() {
+ let (a, b) = tokio::io::duplex(64 * 1024);
+ let (reader, _) = tokio::io::split(a);
+ let (_, mut writer) = tokio::io::split(b);
+
+ let (mut stream, _shutdown_handle) =
+ ExternallyAbortableReaderStream::new(reader);
+
+ writer.write_all(b"hello").await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+ drop(writer);
+ assert_eq!(
+ stream.next().await.unwrap().unwrap_err().kind(),
+ std::io::ErrorKind::UnexpectedEof
+ );
+ }
+
+ #[tokio::test]
+ async fn error2() {
+ let (a, b) = tokio::io::duplex(64 * 1024);
+ let (reader, _) = tokio::io::split(a);
+ let (_, mut writer) = tokio::io::split(b);
+
+ let (mut stream, _shutdown_handle) =
+ ExternallyAbortableReaderStream::new(reader);
+
+ writer.write_all(b"hello").await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+ writer.shutdown().await.unwrap();
+ drop(writer);
+ assert_eq!(
+ stream.next().await.unwrap().unwrap_err().kind(),
+ std::io::ErrorKind::UnexpectedEof
+ );
+ }
+
+ #[tokio::test]
+ async fn write_after_shutdown() {
+ let (a, b) = tokio::io::duplex(64 * 1024);
+ let (reader, _) = tokio::io::split(a);
+ let (_, mut writer) = tokio::io::split(b);
+
+ let (mut stream, shutdown_handle) =
+ ExternallyAbortableReaderStream::new(reader);
+
+ writer.write_all(b"hello").await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+ writer.write_all(b"world").await.unwrap();
+ assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
+
+ shutdown_handle.shutdown();
+ writer.shutdown().await.unwrap();
+
+ assert!(writer.write_all(b"!").await.is_err());
+
+ drop(writer);
+ assert!(stream.next().await.is_none());
+ }
+}