summaryrefslogtreecommitdiff
path: root/ext/http/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r--ext/http/lib.rs51
1 files changed, 51 insertions, 0 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 9c0109937..dff5c14cb 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -75,6 +75,7 @@ pub fn init() -> Extension {
op_http_read::decl(),
op_http_write_headers::decl(),
op_http_write::decl(),
+ op_http_write_resource::decl(),
op_http_shutdown::decl(),
op_http_websocket_accept_header::decl(),
op_http_upgrade_websocket::decl(),
@@ -665,6 +666,56 @@ async fn op_http_write_headers(
}
#[op]
+async fn op_http_write_resource(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ stream: ResourceId,
+) -> Result<(), AnyError> {
+ let http_stream = state
+ .borrow()
+ .resource_table
+ .get::<HttpStreamResource>(rid)?;
+ let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
+ let resource = state.borrow().resource_table.get_any(stream)?;
+ loop {
+ let body_tx = match &mut *wr {
+ HttpResponseWriter::Body(body_tx) => body_tx,
+ HttpResponseWriter::Headers(_) => {
+ return Err(http_error("no response headers"))
+ }
+ HttpResponseWriter::Closed => {
+ return Err(http_error("response already completed"))
+ }
+ };
+
+ let mut vec = vec![0u8; 64 * 1024];
+ let vec_ptr = vec.as_mut_ptr();
+ let buf = ZeroCopyBuf::new_temp(vec);
+ let nread = resource.clone().read(buf).await?;
+ if nread == 0 {
+ break;
+ }
+ // SAFETY: ZeroCopyBuf keeps the Vec<u8> alive.
+ let bytes =
+ Bytes::from_static(unsafe { std::slice::from_raw_parts(vec_ptr, nread) });
+ match body_tx.send_data(bytes).await {
+ Ok(_) => {}
+ Err(err) => {
+ // Don't return "channel closed", that's an implementation detail.
+ // Pull up the failure associated with the transport connection instead.
+ assert!(err.is_closed());
+ http_stream.conn.closed().await?;
+ // If there was no connection error, drop body_tx.
+ *wr = HttpResponseWriter::Closed;
+ }
+ }
+ }
+
+ take(&mut *wr);
+ Ok(())
+}
+
+#[op]
async fn op_http_write(
state: Rc<RefCell<OpState>>,
rid: ResourceId,