diff options
author | Divy Srivastava <dj.srivastava23@gmail.com> | 2022-04-22 16:19:08 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-22 16:19:08 +0530 |
commit | 57f7e07c13a1a692602022af4fc32c6ac352bb72 (patch) | |
tree | e2b6bf271ebda2aecf158b25c9d6c466461549dd /ext/http/lib.rs | |
parent | 2724235ec798f1fbf8fb5bd291615987ac4b919e (diff) |
Reland "perf(http): optimize ReadableStreams backed by a resource" (#14346)
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r-- | ext/http/lib.rs | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 9c1b48fff..7fc90843f 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -77,6 +77,7 @@ pub fn init() -> Extension { op_http_read::decl(), op_http_write_headers::decl(), op_http_write::decl(), + op_http_write_resource::decl(), op_http_shutdown::decl(), op_http_websocket_accept_header::decl(), op_http_upgrade_websocket::decl(), @@ -684,6 +685,63 @@ async fn op_http_write_headers( } #[op] +async fn op_http_write_resource( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + stream: ResourceId, +) -> Result<(), AnyError> { + let http_stream = state + .borrow() + .resource_table + .get::<HttpStreamResource>(rid)?; + let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await; + let resource = state.borrow().resource_table.get_any(stream)?; + loop { + let body_writer = match &mut *wr { + HttpResponseWriter::Body(body_writer) => body_writer, + HttpResponseWriter::Headers(_) => { + return Err(http_error("no response headers")) + } + HttpResponseWriter::Closed => { + return Err(http_error("response already completed")) + } + }; + + let vec = vec![0u8; 64 * 1024]; // 64KB + let buf = ZeroCopyBuf::new_temp(vec); + let (nread, buf) = resource.clone().read_return(buf).await?; + if nread == 0 { + break; + } + match body_writer.write_all(&buf[..nread]).await { + Ok(_) => {} + Err(err) => { + assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); + // Don't return "broken pipe", that's an implementation detail. + // Pull up the failure associated with the transport connection instead. + http_stream.conn.closed().await?; + // If there was no connection error, drop body_tx. + *wr = HttpResponseWriter::Closed; + } + } + } + + let wr = take(&mut *wr); + if let HttpResponseWriter::Body(mut body_writer) = wr { + match body_writer.shutdown().await { + Ok(_) => {} + Err(err) => { + assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); + // Don't return "broken pipe", that's an implementation detail. + // Pull up the failure associated with the transport connection instead. + http_stream.conn.closed().await?; + } + } + } + Ok(()) +} + +#[op] async fn op_http_write( state: Rc<RefCell<OpState>>, rid: ResourceId, |