diff options
Diffstat (limited to 'ext/http')
-rw-r--r-- | ext/http/01_http.js | 63 | ||||
-rw-r--r-- | ext/http/lib.rs | 51 |
2 files changed, 90 insertions, 24 deletions
diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 217bfc061..ff4b6f41f 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -32,7 +32,8 @@ } = window.__bootstrap.webSocket; const { TcpConn, UnixConn } = window.__bootstrap.net; const { TlsConn } = window.__bootstrap.tls; - const { Deferred } = window.__bootstrap.streams; + const { Deferred, getReadableStreamRid, readableStreamClose } = + window.__bootstrap.streams; const { ArrayPrototypeIncludes, ArrayPrototypePush, @@ -235,7 +236,6 @@ typeof respBody === "string" || ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody) ); - try { await core.opAsync( "op_http_write_headers", @@ -269,35 +269,50 @@ ) { throw new TypeError("Unreachable"); } - const reader = respBody.getReader(); - while (true) { - const { value, done } = await reader.read(); - if (done) break; - if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) { - await reader.cancel(new TypeError("Value not a Uint8Array")); - break; + const resourceRid = getReadableStreamRid(respBody); + if (resourceRid) { + if (respBody.locked) { + throw new TypeError("ReadableStream is locked."); } + const _reader = respBody.getReader(); // Aquire JS lock. + await core.opAsync( + "op_http_write_resource", + streamRid, + resourceRid, + ); + readableStreamClose(respBody); // Release JS lock. + } else { + const reader = respBody.getReader(); + while (true) { + const { value, done } = await reader.read(); + if (done) break; + if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) { + await reader.cancel(new TypeError("Value not a Uint8Array")); + break; + } + try { + await core.opAsync("op_http_write", streamRid, value); + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if ( + ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) && + connError != null + ) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } + await reader.cancel(error); + throw error; + } + } + try { - await core.opAsync("op_http_write", streamRid, value); + await core.opAsync("op_http_shutdown", streamRid); } catch (error) { - const connError = httpConn[connErrorSymbol]; - if ( - ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) && - connError != null - ) { - // deno-lint-ignore no-ex-assign - error = new connError.constructor(connError.message); - } await reader.cancel(error); throw error; } } - try { - await core.opAsync("op_http_shutdown", streamRid); - } catch (error) { - await reader.cancel(error); - throw error; - } } const deferred = request[_deferred]; diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 9c0109937..dff5c14cb 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -75,6 +75,7 @@ pub fn init() -> Extension { op_http_read::decl(), op_http_write_headers::decl(), op_http_write::decl(), + op_http_write_resource::decl(), op_http_shutdown::decl(), op_http_websocket_accept_header::decl(), op_http_upgrade_websocket::decl(), @@ -665,6 +666,56 @@ async fn op_http_write_headers( } #[op] +async fn op_http_write_resource( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + stream: ResourceId, +) -> Result<(), AnyError> { + let http_stream = state + .borrow() + .resource_table + .get::<HttpStreamResource>(rid)?; + let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await; + let resource = state.borrow().resource_table.get_any(stream)?; + loop { + let body_tx = match &mut *wr { + HttpResponseWriter::Body(body_tx) => body_tx, + HttpResponseWriter::Headers(_) => { + return Err(http_error("no response headers")) + } + HttpResponseWriter::Closed => { + return Err(http_error("response already completed")) + } + }; + + let mut vec = vec![0u8; 64 * 1024]; + let vec_ptr = vec.as_mut_ptr(); + let buf = ZeroCopyBuf::new_temp(vec); + let nread = resource.clone().read(buf).await?; + if nread == 0 { + break; + } + // SAFETY: ZeroCopyBuf keeps the Vec<u8> alive. + let bytes = + Bytes::from_static(unsafe { std::slice::from_raw_parts(vec_ptr, nread) }); + match body_tx.send_data(bytes).await { + Ok(_) => {} + Err(err) => { + // Don't return "channel closed", that's an implementation detail. + // Pull up the failure associated with the transport connection instead. + assert!(err.is_closed()); + http_stream.conn.closed().await?; + // If there was no connection error, drop body_tx. + *wr = HttpResponseWriter::Closed; + } + } + } + + take(&mut *wr); + Ok(()) +} + +#[op] async fn op_http_write( state: Rc<RefCell<OpState>>, rid: ResourceId, |