diff options
Diffstat (limited to 'ext/http')
-rw-r--r-- | ext/http/01_http.js | 15 | ||||
-rw-r--r-- | ext/http/Cargo.toml | 1 | ||||
-rw-r--r-- | ext/http/lib.rs | 100 | ||||
-rw-r--r-- | ext/http/reader_stream.rs | 157 |
4 files changed, 245 insertions, 28 deletions
diff --git a/ext/http/01_http.js b/ext/http/01_http.js index bd740b600..dfb0f206c 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -263,6 +263,7 @@ } if (isStreamingResponseBody) { + let success = false; if ( respBody === null || !ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, respBody) @@ -284,6 +285,7 @@ ); if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid); readableStreamClose(respBody); // Release JS lock. + success = true; } catch (error) { const connError = httpConn[connErrorSymbol]; if ( @@ -320,13 +322,16 @@ throw error; } } + success = true; } - try { - await core.opAsync("op_http_shutdown", streamRid); - } catch (error) { - await reader.cancel(error); - throw error; + if (success) { + try { + await core.opAsync("op_http_shutdown", streamRid); + } catch (error) { + await reader.cancel(error); + throw error; + } } } diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml index 2f4ae31e6..65cd4ccfe 100644 --- a/ext/http/Cargo.toml +++ b/ext/http/Cargo.toml @@ -31,6 +31,7 @@ hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "r mime = "0.3.16" percent-encoding.workspace = true phf = { version = "0.10", features = ["macros"] } +pin-project.workspace = true ring.workspace = true serde.workspace = true tokio.workspace = true diff --git a/ext/http/lib.rs b/ext/http/lib.rs index af117d3f9..812394d94 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -70,9 +70,12 @@ use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tokio::task::spawn_local; -use tokio_util::io::ReaderStream; + +use crate::reader_stream::ExternallyAbortableReaderStream; +use crate::reader_stream::ShutdownHandle; pub mod compressible; +mod reader_stream; pub fn init() -> Extension { Extension::builder() @@ -414,8 +417,11 @@ impl Default for HttpRequestReader { /// The write half of an HTTP stream. enum HttpResponseWriter { Headers(oneshot::Sender<Response<Body>>), - Body(Pin<Box<dyn tokio::io::AsyncWrite>>), - BodyUncompressed(hyper::body::Sender), + Body { + writer: Pin<Box<dyn tokio::io::AsyncWrite>>, + shutdown_handle: ShutdownHandle, + }, + BodyUncompressed(BodyUncompressedSender), Closed, } @@ -425,6 +431,36 @@ impl Default for HttpResponseWriter { } } +struct BodyUncompressedSender(Option<hyper::body::Sender>); + +impl BodyUncompressedSender { + fn sender(&mut self) -> &mut hyper::body::Sender { + // This is safe because we only ever take the sender out of the option + // inside of the shutdown method. + self.0.as_mut().unwrap() + } + + fn shutdown(mut self) { + // take the sender out of self so that when self is dropped at the end of + // this block, it doesn't get aborted + self.0.take(); + } +} + +impl From<hyper::body::Sender> for BodyUncompressedSender { + fn from(sender: hyper::body::Sender) -> Self { + BodyUncompressedSender(Some(sender)) + } +} + +impl Drop for BodyUncompressedSender { + fn drop(&mut self) { + if let Some(sender) = self.0.take() { + sender.abort(); + } + } +} + // We use a tuple instead of struct to avoid serialization overhead of the keys. #[derive(Serialize)] #[serde(rename_all = "camelCase")] @@ -668,14 +704,22 @@ fn http_response( Encoding::Gzip => Box::pin(GzipEncoder::new(writer)), _ => unreachable!(), // forbidden by accepts_compression }; + let (stream, shutdown_handle) = + ExternallyAbortableReaderStream::new(reader); Ok(( - HttpResponseWriter::Body(writer), - Body::wrap_stream(ReaderStream::new(reader)), + HttpResponseWriter::Body { + writer, + shutdown_handle, + }, + Body::wrap_stream(stream), )) } None => { let (body_tx, body_rx) = Body::channel(); - Ok((HttpResponseWriter::BodyUncompressed(body_tx), body_rx)) + Ok(( + HttpResponseWriter::BodyUncompressed(body_tx.into()), + body_rx, + )) } } } @@ -768,10 +812,10 @@ async fn op_http_write_resource( } match &mut *wr { - HttpResponseWriter::Body(body) => { - let mut result = body.write_all(&view).await; + HttpResponseWriter::Body { writer, .. } => { + let mut result = writer.write_all(&view).await; if result.is_ok() { - result = body.flush().await; + result = writer.flush().await; } if let Err(err) = result { assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); @@ -784,7 +828,7 @@ async fn op_http_write_resource( } HttpResponseWriter::BodyUncompressed(body) => { let bytes = Bytes::from(view); - if let Err(err) = body.send_data(bytes).await { + if let Err(err) = body.sender().send_data(bytes).await { assert!(err.is_closed()); // Pull up the failure associated with the transport connection instead. http_stream.conn.closed().await?; @@ -813,10 +857,10 @@ async fn op_http_write( match &mut *wr { HttpResponseWriter::Headers(_) => Err(http_error("no response headers")), HttpResponseWriter::Closed => Err(http_error("response already completed")), - HttpResponseWriter::Body(body) => { - let mut result = body.write_all(&buf).await; + HttpResponseWriter::Body { writer, .. } => { + let mut result = writer.write_all(&buf).await; if result.is_ok() { - result = body.flush().await; + result = writer.flush().await; } match result { Ok(_) => Ok(()), @@ -833,7 +877,7 @@ async fn op_http_write( } HttpResponseWriter::BodyUncompressed(body) => { let bytes = Bytes::from(buf); - match body.send_data(bytes).await { + match body.sender().send_data(bytes).await { Ok(_) => Ok(()), Err(err) => { assert!(err.is_closed()); @@ -862,17 +906,27 @@ async fn op_http_shutdown( .get::<HttpStreamResource>(rid)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let wr = take(&mut *wr); - if let HttpResponseWriter::Body(mut body_writer) = wr { - match body_writer.shutdown().await { - Ok(_) => {} - Err(err) => { - assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); - // Don't return "broken pipe", that's an implementation detail. - // Pull up the failure associated with the transport connection instead. - stream.conn.closed().await?; + match wr { + HttpResponseWriter::Body { + mut writer, + shutdown_handle, + } => { + shutdown_handle.shutdown(); + match writer.shutdown().await { + Ok(_) => {} + Err(err) => { + assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); + // Don't return "broken pipe", that's an implementation detail. + // Pull up the failure associated with the transport connection instead. + stream.conn.closed().await?; + } } } - } + HttpResponseWriter::BodyUncompressed(body) => { + body.shutdown(); + } + _ => {} + }; Ok(()) } diff --git a/ext/http/reader_stream.rs b/ext/http/reader_stream.rs new file mode 100644 index 000000000..388b8db81 --- /dev/null +++ b/ext/http/reader_stream.rs @@ -0,0 +1,157 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +use std::pin::Pin; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; + +use bytes::Bytes; +use deno_core::futures::Stream; +use pin_project::pin_project; +use tokio::io::AsyncRead; +use tokio_util::io::ReaderStream; + +/// [ExternallyAbortableByteStream] adapts a [tokio::AsyncRead] into a [Stream]. +/// It is used to bridge between the HTTP response body resource, and +/// `hyper::Body`. The stream has the special property that it errors if the +/// underlying reader is closed before an explicit EOF is sent (in the form of +/// setting the `shutdown` flag to true). +#[pin_project] +pub struct ExternallyAbortableReaderStream<R: AsyncRead> { + #[pin] + inner: ReaderStream<R>, + done: Arc<AtomicBool>, +} + +pub struct ShutdownHandle(Arc<AtomicBool>); + +impl ShutdownHandle { + pub fn shutdown(&self) { + self.0.store(true, std::sync::atomic::Ordering::SeqCst); + } +} + +impl<R: AsyncRead> ExternallyAbortableReaderStream<R> { + pub fn new(reader: R) -> (Self, ShutdownHandle) { + let done = Arc::new(AtomicBool::new(false)); + let this = Self { + inner: ReaderStream::new(reader), + done: done.clone(), + }; + (this, ShutdownHandle(done)) + } +} + +impl<R: AsyncRead> Stream for ExternallyAbortableReaderStream<R> { + type Item = std::io::Result<Bytes>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + let this = self.project(); + let val = std::task::ready!(this.inner.poll_next(cx)); + match val { + None if this.done.load(Ordering::SeqCst) => Poll::Ready(None), + None => Poll::Ready(Some(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "stream reader has shut down", + )))), + Some(val) => Poll::Ready(Some(val)), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + use deno_core::futures::StreamExt; + use tokio::io::AsyncWriteExt; + + #[tokio::test] + async fn success() { + let (a, b) = tokio::io::duplex(64 * 1024); + let (reader, _) = tokio::io::split(a); + let (_, mut writer) = tokio::io::split(b); + + let (mut stream, shutdown_handle) = + ExternallyAbortableReaderStream::new(reader); + + writer.write_all(b"hello").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + writer.write_all(b"world").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world")); + + shutdown_handle.shutdown(); + writer.shutdown().await.unwrap(); + drop(writer); + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn error() { + let (a, b) = tokio::io::duplex(64 * 1024); + let (reader, _) = tokio::io::split(a); + let (_, mut writer) = tokio::io::split(b); + + let (mut stream, _shutdown_handle) = + ExternallyAbortableReaderStream::new(reader); + + writer.write_all(b"hello").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + drop(writer); + assert_eq!( + stream.next().await.unwrap().unwrap_err().kind(), + std::io::ErrorKind::UnexpectedEof + ); + } + + #[tokio::test] + async fn error2() { + let (a, b) = tokio::io::duplex(64 * 1024); + let (reader, _) = tokio::io::split(a); + let (_, mut writer) = tokio::io::split(b); + + let (mut stream, _shutdown_handle) = + ExternallyAbortableReaderStream::new(reader); + + writer.write_all(b"hello").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + writer.shutdown().await.unwrap(); + drop(writer); + assert_eq!( + stream.next().await.unwrap().unwrap_err().kind(), + std::io::ErrorKind::UnexpectedEof + ); + } + + #[tokio::test] + async fn write_after_shutdown() { + let (a, b) = tokio::io::duplex(64 * 1024); + let (reader, _) = tokio::io::split(a); + let (_, mut writer) = tokio::io::split(b); + + let (mut stream, shutdown_handle) = + ExternallyAbortableReaderStream::new(reader); + + writer.write_all(b"hello").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + writer.write_all(b"world").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world")); + + shutdown_handle.shutdown(); + writer.shutdown().await.unwrap(); + + assert!(writer.write_all(b"!").await.is_err()); + + drop(writer); + assert!(stream.next().await.is_none()); + } +} |