diff options
author | Divy Srivastava <dj.srivastava23@gmail.com> | 2022-04-20 18:16:44 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-20 18:16:44 +0530 |
commit | 57a8fc37fc99491fa2559694f78af52a597bc501 (patch) | |
tree | b6d3bd41faa72dfeec4643e5fbb68a669ca03e79 /ext/http/lib.rs | |
parent | 3833d37b15e1e8380efd1a9eea956a8b33745555 (diff) |
perf(http): optimize `ReadableStream`s backed by a resource (#14284)
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r-- | ext/http/lib.rs | 51 |
1 files changed, 51 insertions, 0 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 9c0109937..dff5c14cb 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -75,6 +75,7 @@ pub fn init() -> Extension { op_http_read::decl(), op_http_write_headers::decl(), op_http_write::decl(), + op_http_write_resource::decl(), op_http_shutdown::decl(), op_http_websocket_accept_header::decl(), op_http_upgrade_websocket::decl(), @@ -665,6 +666,56 @@ async fn op_http_write_headers( } #[op] +async fn op_http_write_resource( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + stream: ResourceId, +) -> Result<(), AnyError> { + let http_stream = state + .borrow() + .resource_table + .get::<HttpStreamResource>(rid)?; + let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await; + let resource = state.borrow().resource_table.get_any(stream)?; + loop { + let body_tx = match &mut *wr { + HttpResponseWriter::Body(body_tx) => body_tx, + HttpResponseWriter::Headers(_) => { + return Err(http_error("no response headers")) + } + HttpResponseWriter::Closed => { + return Err(http_error("response already completed")) + } + }; + + let mut vec = vec![0u8; 64 * 1024]; + let vec_ptr = vec.as_mut_ptr(); + let buf = ZeroCopyBuf::new_temp(vec); + let nread = resource.clone().read(buf).await?; + if nread == 0 { + break; + } + // SAFETY: ZeroCopyBuf keeps the Vec<u8> alive. + let bytes = + Bytes::from_static(unsafe { std::slice::from_raw_parts(vec_ptr, nread) }); + match body_tx.send_data(bytes).await { + Ok(_) => {} + Err(err) => { + // Don't return "channel closed", that's an implementation detail. + // Pull up the failure associated with the transport connection instead. + assert!(err.is_closed()); + http_stream.conn.closed().await?; + // If there was no connection error, drop body_tx. + *wr = HttpResponseWriter::Closed; + } + } + } + + take(&mut *wr); + Ok(()) +} + +#[op] async fn op_http_write( state: Rc<RefCell<OpState>>, rid: ResourceId, |