summaryrefslogtreecommitdiff
path: root/ext/http
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http')
-rw-r--r--ext/http/01_http.js63
-rw-r--r--ext/http/lib.rs51
2 files changed, 90 insertions, 24 deletions
diff --git a/ext/http/01_http.js b/ext/http/01_http.js
index 217bfc061..ff4b6f41f 100644
--- a/ext/http/01_http.js
+++ b/ext/http/01_http.js
@@ -32,7 +32,8 @@
} = window.__bootstrap.webSocket;
const { TcpConn, UnixConn } = window.__bootstrap.net;
const { TlsConn } = window.__bootstrap.tls;
- const { Deferred } = window.__bootstrap.streams;
+ const { Deferred, getReadableStreamRid, readableStreamClose } =
+ window.__bootstrap.streams;
const {
ArrayPrototypeIncludes,
ArrayPrototypePush,
@@ -235,7 +236,6 @@
typeof respBody === "string" ||
ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
);
-
try {
await core.opAsync(
"op_http_write_headers",
@@ -269,35 +269,50 @@
) {
throw new TypeError("Unreachable");
}
- const reader = respBody.getReader();
- while (true) {
- const { value, done } = await reader.read();
- if (done) break;
- if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
- await reader.cancel(new TypeError("Value not a Uint8Array"));
- break;
+ const resourceRid = getReadableStreamRid(respBody);
+ if (resourceRid) {
+ if (respBody.locked) {
+ throw new TypeError("ReadableStream is locked.");
}
+ const _reader = respBody.getReader(); // Aquire JS lock.
+ await core.opAsync(
+ "op_http_write_resource",
+ streamRid,
+ resourceRid,
+ );
+ readableStreamClose(respBody); // Release JS lock.
+ } else {
+ const reader = respBody.getReader();
+ while (true) {
+ const { value, done } = await reader.read();
+ if (done) break;
+ if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
+ await reader.cancel(new TypeError("Value not a Uint8Array"));
+ break;
+ }
+ try {
+ await core.opAsync("op_http_write", streamRid, value);
+ } catch (error) {
+ const connError = httpConn[connErrorSymbol];
+ if (
+ ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) &&
+ connError != null
+ ) {
+ // deno-lint-ignore no-ex-assign
+ error = new connError.constructor(connError.message);
+ }
+ await reader.cancel(error);
+ throw error;
+ }
+ }
+
try {
- await core.opAsync("op_http_write", streamRid, value);
+ await core.opAsync("op_http_shutdown", streamRid);
} catch (error) {
- const connError = httpConn[connErrorSymbol];
- if (
- ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) &&
- connError != null
- ) {
- // deno-lint-ignore no-ex-assign
- error = new connError.constructor(connError.message);
- }
await reader.cancel(error);
throw error;
}
}
- try {
- await core.opAsync("op_http_shutdown", streamRid);
- } catch (error) {
- await reader.cancel(error);
- throw error;
- }
}
const deferred = request[_deferred];
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 9c0109937..dff5c14cb 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -75,6 +75,7 @@ pub fn init() -> Extension {
op_http_read::decl(),
op_http_write_headers::decl(),
op_http_write::decl(),
+ op_http_write_resource::decl(),
op_http_shutdown::decl(),
op_http_websocket_accept_header::decl(),
op_http_upgrade_websocket::decl(),
@@ -665,6 +666,56 @@ async fn op_http_write_headers(
}
#[op]
+async fn op_http_write_resource(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ stream: ResourceId,
+) -> Result<(), AnyError> {
+ let http_stream = state
+ .borrow()
+ .resource_table
+ .get::<HttpStreamResource>(rid)?;
+ let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
+ let resource = state.borrow().resource_table.get_any(stream)?;
+ loop {
+ let body_tx = match &mut *wr {
+ HttpResponseWriter::Body(body_tx) => body_tx,
+ HttpResponseWriter::Headers(_) => {
+ return Err(http_error("no response headers"))
+ }
+ HttpResponseWriter::Closed => {
+ return Err(http_error("response already completed"))
+ }
+ };
+
+ let mut vec = vec![0u8; 64 * 1024];
+ let vec_ptr = vec.as_mut_ptr();
+ let buf = ZeroCopyBuf::new_temp(vec);
+ let nread = resource.clone().read(buf).await?;
+ if nread == 0 {
+ break;
+ }
+ // SAFETY: ZeroCopyBuf keeps the Vec<u8> alive.
+ let bytes =
+ Bytes::from_static(unsafe { std::slice::from_raw_parts(vec_ptr, nread) });
+ match body_tx.send_data(bytes).await {
+ Ok(_) => {}
+ Err(err) => {
+ // Don't return "channel closed", that's an implementation detail.
+ // Pull up the failure associated with the transport connection instead.
+ assert!(err.is_closed());
+ http_stream.conn.closed().await?;
+ // If there was no connection error, drop body_tx.
+ *wr = HttpResponseWriter::Closed;
+ }
+ }
+ }
+
+ take(&mut *wr);
+ Ok(())
+}
+
+#[op]
async fn op_http_write(
state: Rc<RefCell<OpState>>,
rid: ResourceId,