summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ext/http/lib.rs113
1 files changed, 69 insertions, 44 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 1888409ba..b4a2f0c45 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -343,6 +343,7 @@ impl Default for HttpRequestReader {
enum HttpResponseWriter {
Headers(oneshot::Sender<Response<Body>>),
Body(Pin<Box<dyn tokio::io::AsyncWrite>>),
+ BodyUncompressed(hyper::body::Sender),
Closed,
}
@@ -638,17 +639,15 @@ async fn op_http_write_headers(
}
None => {
// If no buffer was passed, the caller will stream the response body.
-
- // Create a one way pipe that implements tokio's async io traits. To do
- // this we create a [tokio::io::DuplexStream], but then throw away one
- // of the directions to create a one way pipe.
- let (a, b) = tokio::io::duplex(64 * 1024);
- let (reader, _) = tokio::io::split(a);
- let (_, writer) = tokio::io::split(b);
-
- let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>;
-
if should_compress {
+ // Create a one way pipe that implements tokio's async io traits. To do
+ // this we create a [tokio::io::DuplexStream], but then throw away one
+ // of the directions to create a one way pipe.
+ let (a, b) = tokio::io::duplex(64 * 1024);
+ let (reader, _) = tokio::io::split(a);
+ let (_, writer) = tokio::io::split(b);
+
+ let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>;
match *stream.accept_encoding.borrow() {
Encoding::Brotli => {
let writer = BrotliEncoder::new(writer);
@@ -662,12 +661,14 @@ async fn op_http_write_headers(
builder = builder.header("content-encoding", "gzip");
}
}
+
+ body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?;
+ new_wr = HttpResponseWriter::Body(writer_body);
} else {
- writer_body = Box::pin(writer);
+ let (body_tx, body_rx) = Body::channel();
+ body = builder.body(body_rx)?;
+ new_wr = HttpResponseWriter::BodyUncompressed(body_tx);
}
-
- body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?;
- new_wr = HttpResponseWriter::Body(writer_body);
}
}
@@ -699,14 +700,14 @@ async fn op_http_write_resource(
let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
let resource = state.borrow().resource_table.get_any(stream)?;
loop {
- let body_writer = match &mut *wr {
- HttpResponseWriter::Body(body_writer) => body_writer,
+ match *wr {
HttpResponseWriter::Headers(_) => {
return Err(http_error("no response headers"))
}
HttpResponseWriter::Closed => {
return Err(http_error("response already completed"))
}
+ _ => {}
};
let vec = vec![0u8; 64 * 1024]; // 64KB
@@ -715,17 +716,29 @@ async fn op_http_write_resource(
if nread == 0 {
break;
}
- match body_writer.write_all(&buf[..nread]).await {
- Ok(_) => {}
- Err(err) => {
- assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
- // Don't return "broken pipe", that's an implementation detail.
- // Pull up the failure associated with the transport connection instead.
- http_stream.conn.closed().await?;
- // If there was no connection error, drop body_tx.
- *wr = HttpResponseWriter::Closed;
+
+ match &mut *wr {
+ HttpResponseWriter::Body(body) => {
+ if let Err(err) = body.write_all(&buf[..nread]).await {
+ assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+ // Don't return "broken pipe", that's an implementation detail.
+ // Pull up the failure associated with the transport connection instead.
+ http_stream.conn.closed().await?;
+ // If there was no connection error, drop body_tx.
+ *wr = HttpResponseWriter::Closed;
+ }
}
- }
+ HttpResponseWriter::BodyUncompressed(body) => {
+ if let Err(err) = body.send_data(Bytes::from(buf.to_temp())).await {
+ assert!(err.is_closed());
+ // Pull up the failure associated with the transport connection instead.
+ http_stream.conn.closed().await?;
+ // If there was no connection error, drop body_tx.
+ *wr = HttpResponseWriter::Closed;
+ }
+ }
+ _ => unreachable!(),
+ };
}
let wr = take(&mut *wr);
@@ -756,30 +769,42 @@ async fn op_http_write(
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
loop {
- let body_writer = match &mut *wr {
- HttpResponseWriter::Body(body_tx) => body_tx,
+ match &mut *wr {
HttpResponseWriter::Headers(_) => {
break Err(http_error("no response headers"))
}
HttpResponseWriter::Closed => {
break Err(http_error("response already completed"))
}
- };
-
- let mut res = body_writer.write_all(&buf).await;
- if res.is_ok() {
- res = body_writer.flush().await;
- }
-
- match res {
- Ok(_) => break Ok(()),
- Err(err) => {
- assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
- // Don't return "broken pipe", that's an implementation detail.
- // Pull up the failure associated with the transport connection instead.
- stream.conn.closed().await?;
- // If there was no connection error, drop body_tx.
- *wr = HttpResponseWriter::Closed;
+ HttpResponseWriter::Body(body) => {
+ let mut result = body.write_all(&buf).await;
+ if result.is_ok() {
+ result = body.flush().await;
+ }
+ match result {
+ Ok(_) => break Ok(()),
+ Err(err) => {
+ assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+ // Don't return "broken pipe", that's an implementation detail.
+ // Pull up the failure associated with the transport connection instead.
+ stream.conn.closed().await?;
+ // If there was no connection error, drop body_tx.
+ *wr = HttpResponseWriter::Closed;
+ }
+ }
+ }
+ HttpResponseWriter::BodyUncompressed(body) => {
+ let bytes = Bytes::copy_from_slice(&buf[..]);
+ match body.send_data(bytes).await {
+ Ok(_) => break Ok(()),
+ Err(err) => {
+ assert!(err.is_closed());
+ // Pull up the failure associated with the transport connection instead.
+ stream.conn.closed().await?;
+ // If there was no connection error, drop body_tx.
+ *wr = HttpResponseWriter::Closed;
+ }
+ }
}
}
}