author | Matt Mastracci <matthew@mastracci.com> | 2023-12-01 08:56:10 -0700
---|---|---
committer | GitHub <noreply@github.com> | 2023-12-01 08:56:10 -0700
commit | e6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch) |
tree | 57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 |
parent | 687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff) |
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify and
unify the implementation with `ext/serve`. This allows us to work in Rust
with resources only.
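
For a sense of what "work in Rust with resources only" means from the JS side, here is a minimal sketch modeled on the `cli/tests/unit/streams_test.ts` changes in this diff: a `ReadableStream` is wrapped in a core resource, and the returned rid is then driven with ordinary resource reads. The `Deno[Deno.internal]` destructuring is an assumption about how the test-only bindings for `core` and `resourceForReadableStream` are exposed; it is not public API.

```ts
// Sketch only -- mirrors the pattern in cli/tests/unit/streams_test.ts.
// Assumption: `core` and `resourceForReadableStream` are reachable via the
// test-only Deno internals; this is not public API.
// deno-lint-ignore no-explicit-any
const { core, resourceForReadableStream } = (Deno as any)[(Deno as any).internal];

Deno.test(async function readableStreamAsResource() {
  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      controller.enqueue(new TextEncoder().encode("hello world!")); // 12 bytes
      controller.close();
    },
  });

  // Wrap the stream in a core resource. The optional second argument is the
  // new length parameter, which becomes the resource's `size_hint` in Rust.
  const rid = resourceForReadableStream(stream, 12);

  // From here on, Rust (or a test) drives the stream with plain resource ops.
  const buffer = new Uint8Array(1024);
  const nread = await core.ops.op_read(rid, buffer);
  if (nread !== 12) throw new Error(`expected 12 bytes, got ${nread}`);
  core.ops.op_close(rid);
});
```

In `ext/fetch/26_fetch.js` this is exactly the plumbing that replaces the old `FetchRequestBodyResource` path: a streaming request body is turned into a rid via `resourceForReadableStream(stream, req.body.length)` and handed to `op_fetch`, where the Rust side wraps it in `ResourceToBodyAdapter`.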
Two additional changes to `resourceForReadableStream` were required:
- Add an optional length parameter that translates to the resource's
`size_hint`
- Fix a bug where writing to a closed stream that was already full would panic (see the sketch after this list)
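
The first change is the optional second argument to `resourceForReadableStream` shown in the sketch above; on the Rust side it surfaces through the resource's `size_hint`, which `op_fetch` uses to set an explicit `Content-Length`. Below is a sketch of the second fix in action, adapted from the `readableStreamWithAggressiveResourceClose` test added in this diff (same assumed internal bindings as above): 1024 one-byte packets fill the internal buffer channel, the resource is closed out from under the stream, and one more enqueue, which previously hit the Rust panic, is now tolerated.

```ts
Deno.test(async function enqueueAfterCloseOnFullChannel() {
  // Assumption: same test-only internals as in the previous sketch.
  // deno-lint-ignore no-explicit-any
  const { core, resourceForReadableStream } = (Deno as any)[(Deno as any).internal];

  let first = true;
  const rid = resourceForReadableStream(
    new ReadableStream<Uint8Array>({
      pull(controller) {
        if (first) {
          first = false;
          // 1024 is the size of the internal packet buffer; fill it completely.
          for (let i = 0; i < 1024; i++) {
            controller.enqueue(new Uint8Array(1));
          }
          // Close the resource (not the reader) while the channel is full...
          core.close(rid);
          // ...then enqueue again: this used to panic in Rust and is now dropped.
          controller.enqueue(new Uint8Array(1));
        }
      },
    }),
  );

  // Reads fail with "operation canceled" once the resource is closed.
  try {
    for (let i = 0; i < 1024; i++) {
      await core.ops.op_read(rid, new Uint8Array(1));
    }
    throw new Error("expected read to fail");
  } catch (e) {
    if ((e as Error).message !== "operation canceled") throw e;
  }
});
```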
-rw-r--r-- | Cargo.lock | 1
-rw-r--r-- | cli/tests/unit/fetch_test.ts | 74
-rw-r--r-- | cli/tests/unit/streams_test.ts | 98
-rw-r--r-- | ext/fetch/26_fetch.js | 179
-rw-r--r-- | ext/fetch/Cargo.toml | 1
-rw-r--r-- | ext/fetch/lib.rs | 212
-rw-r--r-- | ext/node/ops/http.rs | 25
-rw-r--r-- | ext/node/polyfills/_http_outgoing.ts | 3
-rw-r--r-- | ext/node/polyfills/http.ts | 36
-rw-r--r-- | ext/web/06_streams.js | 8
-rw-r--r-- | ext/web/lib.rs | 1
-rw-r--r-- | ext/web/stream_resource.rs | 33
12 files changed, 312 insertions, 359 deletions
diff --git a/Cargo.lock b/Cargo.lock index 81de0affd..df6308eee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1187,6 +1187,7 @@ dependencies = [ "deno_tls", "dyn-clone", "http", + "pin-project", "reqwest", "serde", "tokio", diff --git a/cli/tests/unit/fetch_test.ts b/cli/tests/unit/fetch_test.ts index 05c04f4ef..d12a93867 100644 --- a/cli/tests/unit/fetch_test.ts +++ b/cli/tests/unit/fetch_test.ts @@ -3,6 +3,7 @@ import { assert, assertEquals, assertRejects, + assertThrows, delay, fail, unimplemented, @@ -523,7 +524,7 @@ Deno.test( ); Deno.test({ permissions: { net: true } }, async function fetchInitBlobBody() { - const data = "const a = 1"; + const data = "const a = 1 🦕"; const blob = new Blob([data], { type: "text/javascript", }); @@ -555,7 +556,32 @@ Deno.test( async function fetchInitFormDataBlobFilenameBody() { const form = new FormData(); form.append("field", "value"); - form.append("file", new Blob([new TextEncoder().encode("deno")])); + form.append( + "file", + new Blob([new TextEncoder().encode("deno")]), + "file name", + ); + const response = await fetch("http://localhost:4545/echo_server", { + method: "POST", + body: form, + }); + const resultForm = await response.formData(); + assertEquals(form.get("field"), resultForm.get("field")); + const file = resultForm.get("file"); + assert(file instanceof File); + assertEquals(file.name, "file name"); + }, +); + +Deno.test( + { permissions: { net: true } }, + async function fetchInitFormDataFileFilenameBody() { + const form = new FormData(); + form.append("field", "value"); + form.append( + "file", + new File([new Blob([new TextEncoder().encode("deno")])], "file name"), + ); const response = await fetch("http://localhost:4545/echo_server", { method: "POST", body: form, @@ -564,7 +590,7 @@ Deno.test( assertEquals(form.get("field"), resultForm.get("field")); const file = resultForm.get("file"); assert(file instanceof File); - assertEquals(file.name, "blob"); + assertEquals(file.name, "file name"); }, ); @@ -1193,10 +1219,8 @@ Deno.test( "accept-encoding: gzip, br\r\n", `host: ${addr}\r\n`, `transfer-encoding: chunked\r\n\r\n`, - "6\r\n", - "hello \r\n", - "5\r\n", - "world\r\n", + "B\r\n", + "hello world\r\n", "0\r\n\r\n", ].join(""); assertEquals(actual, expected); @@ -1259,13 +1283,19 @@ Deno.test( Deno.test( { permissions: { net: true } }, async function fetchNoServerReadableStreamBody() { - const { promise, resolve } = Promise.withResolvers<void>(); + const completed = Promise.withResolvers<void>(); + const failed = Promise.withResolvers<void>(); const body = new ReadableStream({ start(controller) { controller.enqueue(new Uint8Array([1])); - setTimeout(() => { - controller.enqueue(new Uint8Array([2])); - resolve(); + setTimeout(async () => { + // This is technically a race. If the fetch has failed by this point, the enqueue will + // throw. If not, it will succeed. Windows appears to take a while to time out the fetch, + // so we will just wait for that here before we attempt to enqueue so it's consistent + // across platforms. 
+ await failed.promise; + assertThrows(() => controller.enqueue(new Uint8Array([2]))); + completed.resolve(); }, 1000); }, }); @@ -1273,7 +1303,8 @@ Deno.test( await assertRejects(async () => { await fetch(nonExistentHostname, { body, method: "POST" }); }, TypeError); - await promise; + failed.resolve(); + await completed.promise; }, ); @@ -1853,8 +1884,9 @@ Deno.test( async function fetchBlobUrl(): Promise<void> { const blob = new Blob(["ok"], { type: "text/plain" }); const url = URL.createObjectURL(blob); + assert(url.startsWith("blob:"), `URL was ${url}`); const res = await fetch(url); - assert(res.url.startsWith("blob:http://js-unit-tests/")); + assertEquals(res.url, url); assertEquals(res.status, 200); assertEquals(res.headers.get("content-length"), "2"); assertEquals(res.headers.get("content-type"), "text/plain"); @@ -1941,9 +1973,12 @@ Deno.test( }) ); - assert(err instanceof TypeError); - assert(err.cause); - assert(err.cause instanceof Error); + assert(err instanceof TypeError, `err was not a TypeError ${err}`); + assert(err.cause, `err.cause was null ${err}`); + assert( + err.cause instanceof Error, + `err.cause was not an Error ${err.cause}`, + ); assertEquals(err.cause.message, "foo"); await server; @@ -1968,7 +2003,12 @@ Deno.test( method: "POST", signal: controller.signal, }); - controller.abort(); + try { + controller.abort(); + } catch (e) { + console.log(e); + fail("abort should not throw"); + } await promise; }, DOMException, diff --git a/cli/tests/unit/streams_test.ts b/cli/tests/unit/streams_test.ts index bb8099efb..c488f214a 100644 --- a/cli/tests/unit/streams_test.ts +++ b/cli/tests/unit/streams_test.ts @@ -190,44 +190,46 @@ Deno.test(async function readableStream() { // Close the stream after reading everything Deno.test(async function readableStreamClose() { - const { promise: cancelPromise, resolve: cancelResolve } = Promise - .withResolvers(); - const rid = resourceForReadableStream(helloWorldStream(false, cancelResolve)); + const cancel = Promise.withResolvers(); + const rid = resourceForReadableStream( + helloWorldStream(false, cancel.resolve), + ); const buffer = new Uint8Array(1024); const nread = await core.ops.op_read(rid, buffer); assertEquals(nread, 12); core.ops.op_close(rid); - assertEquals(await cancelPromise, "resource closed"); + assertEquals(await cancel.promise, "resource closed"); }); // Close the stream without reading everything Deno.test(async function readableStreamClosePartialRead() { - const { promise: cancelPromise, resolve: cancelResolve } = Promise - .withResolvers(); - const rid = resourceForReadableStream(helloWorldStream(false, cancelResolve)); + const cancel = Promise.withResolvers(); + const rid = resourceForReadableStream( + helloWorldStream(false, cancel.resolve), + ); const buffer = new Uint8Array(5); const nread = await core.ops.op_read(rid, buffer); assertEquals(nread, 5); core.ops.op_close(rid); - assertEquals(await cancelPromise, "resource closed"); + assertEquals(await cancel.promise, "resource closed"); }); // Close the stream without reading anything Deno.test(async function readableStreamCloseWithoutRead() { - const { promise: cancelPromise, resolve: cancelResolve } = Promise - .withResolvers(); - const rid = resourceForReadableStream(helloWorldStream(false, cancelResolve)); + const cancel = Promise.withResolvers(); + const rid = resourceForReadableStream( + helloWorldStream(false, cancel.resolve), + ); core.ops.op_close(rid); - assertEquals(await cancelPromise, "resource closed"); + assertEquals(await cancel.promise, 
"resource closed"); }); // Close the stream without reading anything Deno.test(async function readableStreamCloseWithoutRead2() { - const { promise: cancelPromise, resolve: cancelResolve } = Promise - .withResolvers(); - const rid = resourceForReadableStream(longAsyncStream(cancelResolve)); + const cancel = Promise.withResolvers(); + const rid = resourceForReadableStream(longAsyncStream(cancel.resolve)); core.ops.op_close(rid); - assertEquals(await cancelPromise, "resource closed"); + assertEquals(await cancel.promise, "resource closed"); }); Deno.test(async function readableStreamPartial() { @@ -439,32 +441,38 @@ function createStreamTest( }); } -Deno.test(async function readableStreamWithAggressiveResourceClose() { - let first = true; - const { promise: reasonPromise, resolve: reasonResolve } = Promise - .withResolvers(); - const rid = resourceForReadableStream( - new ReadableStream({ - pull(controller) { - if (first) { - // We queue this up and then immediately close the resource (not the reader) - controller.enqueue(new Uint8Array(1)); - core.close(rid); - // This doesn't throw, even though the resource is closed - controller.enqueue(new Uint8Array(1)); - first = false; - } - }, - cancel(reason) { - reasonResolve(reason); - }, - }), - ); - try { - await core.ops.op_read(rid, new Uint8Array(1)); - fail(); - } catch (e) { - assertEquals(e.message, "operation canceled"); - } - assertEquals(await reasonPromise, "resource closed"); -}); +// 1024 is the size of the internal packet buffer -- we want to make sure we fill the internal pipe fully. +for (const packetCount of [1, 1024]) { + Deno.test(`readableStreamWithAggressiveResourceClose_${packetCount}`, async function () { + let first = true; + const { promise, resolve } = Promise.withResolvers(); + const rid = resourceForReadableStream( + new ReadableStream({ + pull(controller) { + if (first) { + // We queue this up and then immediately close the resource (not the reader) + for (let i = 0; i < packetCount; i++) { + controller.enqueue(new Uint8Array(1)); + } + core.close(rid); + // This doesn't throw, even though the resource is closed + controller.enqueue(new Uint8Array(1)); + first = false; + } + }, + cancel(reason) { + resolve(reason); + }, + }), + ); + try { + for (let i = 0; i < packetCount; i++) { + await core.ops.op_read(rid, new Uint8Array(1)); + } + fail(); + } catch (e) { + assertEquals(e.message, "operation canceled"); + } + assertEquals(await promise, "resource closed"); + }); +} diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index e586d9a3a..8a71d9bcf 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -14,11 +14,12 @@ const core = globalThis.Deno.core; const ops = core.ops; import * as webidl from "ext:deno_webidl/00_webidl.js"; import { byteLowerCase } from "ext:deno_web/00_infra.js"; -import { BlobPrototype } from "ext:deno_web/09_file.js"; import { errorReadableStream, + getReadableStreamResourceBacking, readableStreamForRid, ReadableStreamPrototype, + resourceForReadableStream, } from "ext:deno_web/06_streams.js"; import { extractBody, InnerBody } from "ext:deno_fetch/22_body.js"; import { processUrlList, toInnerRequest } from "ext:deno_fetch/23_request.js"; @@ -37,22 +38,17 @@ const { ArrayPrototypeSplice, ArrayPrototypeFilter, ArrayPrototypeIncludes, + Error, ObjectPrototypeIsPrototypeOf, Promise, PromisePrototypeThen, PromisePrototypeCatch, SafeArrayIterator, - SafeWeakMap, String, StringPrototypeStartsWith, StringPrototypeToLowerCase, TypeError, - Uint8Array, Uint8ArrayPrototype, - 
WeakMapPrototypeDelete, - WeakMapPrototypeGet, - WeakMapPrototypeHas, - WeakMapPrototypeSet, } = primordials; const REQUEST_BODY_HEADER_NAMES = [ @@ -62,28 +58,9 @@ const REQUEST_BODY_HEADER_NAMES = [ "content-type", ]; -const requestBodyReaders = new SafeWeakMap(); - -/** - * @param {{ method: string, url: string, headers: [string, string][], clientRid: number | null, hasBody: boolean }} args - * @param {Uint8Array | null} body - * @returns {{ requestRid: number, requestBodyRid: number | null, cancelHandleRid: number | null }} - */ -function opFetch(method, url, headers, clientRid, hasBody, bodyLength, body) { - return ops.op_fetch( - method, - url, - headers, - clientRid, - hasBody, - bodyLength, - body, - ); -} - /** * @param {number} rid - * @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number }>} + * @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number, error: string? }>} */ function opFetchSend(rid) { return core.opAsync("op_fetch_send", rid); @@ -145,154 +122,59 @@ async function mainFetch(req, recursive, terminator) { /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ let reqBody = null; - - if (req.body !== null) { - if ( - ObjectPrototypeIsPrototypeOf( - ReadableStreamPrototype, - req.body.streamOrStatic, - ) - ) { - if ( - req.body.length === null || - ObjectPrototypeIsPrototypeOf(BlobPrototype, req.body.source) - ) { - reqBody = req.body.stream; + let reqRid = null; + + if (req.body) { + const stream = req.body.streamOrStatic; + const body = stream.body; + + if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) { + reqBody = body; + } else if (typeof body === "string") { + reqBody = core.encode(body); + } else if (ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) { + const resourceBacking = getReadableStreamResourceBacking(stream); + if (resourceBacking) { + reqRid = resourceBacking.rid; } else { - const reader = req.body.stream.getReader(); - WeakMapPrototypeSet(requestBodyReaders, req, reader); - const r1 = await reader.read(); - if (r1.done) { - reqBody = new Uint8Array(0); - } else { - reqBody = r1.value; - const r2 = await reader.read(); - if (!r2.done) throw new TypeError("Unreachable"); - } - WeakMapPrototypeDelete(requestBodyReaders, req); + reqRid = resourceForReadableStream(stream, req.body.length); } } else { - req.body.streamOrStatic.consumed = true; - reqBody = req.body.streamOrStatic.body; - // TODO(@AaronO): plumb support for StringOrBuffer all the way - reqBody = typeof reqBody === "string" ? core.encode(reqBody) : reqBody; + throw TypeError("invalid body"); } } - const { requestRid, requestBodyRid, cancelHandleRid } = opFetch( + const { requestRid, cancelHandleRid } = ops.op_fetch( req.method, req.currentUrl(), req.headerList, req.clientRid, - reqBody !== null, - req.body?.length, - ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, reqBody) ? 
reqBody : null, + reqBody !== null || reqRid !== null, + reqBody, + reqRid, ); function onAbort() { if (cancelHandleRid !== null) { core.tryClose(cancelHandleRid); } - if (requestBodyRid !== null) { - core.tryClose(requestBodyRid); - } } terminator[abortSignal.add](onAbort); - - let requestSendError; - let requestSendErrorSet = false; - - async function propagateError(err, message) { - // TODO(lucacasonato): propagate error into response body stream - try { - await core.writeTypeError(requestBodyRid, message); - } catch (err) { - if (!requestSendErrorSet) { - requestSendErrorSet = true; - requestSendError = err; - } - } - if (!requestSendErrorSet) { - requestSendErrorSet = true; - requestSendError = err; - } - } - - if (requestBodyRid !== null) { - if ( - reqBody === null || - !ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, reqBody) - ) { - throw new TypeError("Unreachable"); - } - const reader = reqBody.getReader(); - WeakMapPrototypeSet(requestBodyReaders, req, reader); - (async () => { - let done = false; - while (!done) { - let val; - try { - const res = await reader.read(); - done = res.done; - val = res.value; - } catch (err) { - if (terminator.aborted) break; - await propagateError(err, "failed to read"); - break; - } - if (done) break; - if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, val)) { - const error = new TypeError( - "Item in request body ReadableStream is not a Uint8Array", - ); - await reader.cancel(error); - await propagateError(error, error.message); - break; - } - try { - await core.writeAll(requestBodyRid, val); - } catch (err) { - if (terminator.aborted) break; - await reader.cancel(err); - await propagateError(err, "failed to write"); - break; - } - } - if (done && !terminator.aborted) { - try { - await core.shutdown(requestBodyRid); - } catch (err) { - if (!terminator.aborted) { - await propagateError(err, "failed to flush"); - } - } - } - WeakMapPrototypeDelete(requestBodyReaders, req); - reader.releaseLock(); - core.tryClose(requestBodyRid); - })(); - } let resp; try { resp = await opFetchSend(requestRid); } catch (err) { if (terminator.aborted) return; - if (requestSendErrorSet) { - // if the request body stream errored, we want to propagate that error - // instead of the original error from opFetchSend - throw new TypeError("Failed to fetch: request body stream errored", { - cause: requestSendError, - }); - } - if (requestBodyRid !== null) { - core.tryClose(requestBodyRid); - } throw err; } finally { if (cancelHandleRid !== null) { core.tryClose(cancelHandleRid); } } + // Re-throw any body errors + if (resp.error) { + throw new TypeError("body failed", { cause: new Error(resp.error) }); + } if (terminator.aborted) return abortedNetworkError(); processUrlList(req.urlList, req.urlListProcessed); @@ -510,9 +392,8 @@ function fetch(input, init = {}) { function abortFetch(request, responseObject, error) { if (request.body !== null) { - if (WeakMapPrototypeHas(requestBodyReaders, request)) { - WeakMapPrototypeGet(requestBodyReaders, request).cancel(error); - } else { + // Cancel the body if we haven't taken it as a resource yet + if (!request.body.streamOrStatic.locked) { request.body.cancel(error); } } diff --git a/ext/fetch/Cargo.toml b/ext/fetch/Cargo.toml index 901e9ce5f..ede514b97 100644 --- a/ext/fetch/Cargo.toml +++ b/ext/fetch/Cargo.toml @@ -20,6 +20,7 @@ deno_core.workspace = true deno_tls.workspace = true dyn-clone = "1" http.workspace = true +pin-project.workspace = true reqwest.workspace = true serde.workspace = true tokio.workspace = true 
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 7cde5584f..6e1ecb5e4 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -11,6 +11,8 @@ use std::path::PathBuf; use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; +use std::task::Context; +use std::task::Poll; use deno_core::anyhow::Error; use deno_core::error::type_error; @@ -21,13 +23,11 @@ use deno_core::futures::FutureExt; use deno_core::futures::Stream; use deno_core::futures::StreamExt; use deno_core::op2; -use deno_core::BufView; -use deno_core::WriteOutcome; - use deno_core::unsync::spawn; use deno_core::url::Url; use deno_core::AsyncRefCell; use deno_core::AsyncResult; +use deno_core::BufView; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; @@ -62,7 +62,6 @@ use serde::Deserialize; use serde::Serialize; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; -use tokio::sync::mpsc; // Re-export reqwest and data_url pub use data_url; @@ -184,7 +183,6 @@ pub fn get_declaration() -> PathBuf { #[serde(rename_all = "camelCase")] pub struct FetchReturn { pub request_rid: ResourceId, - pub request_body_rid: Option<ResourceId>, pub cancel_handle_rid: Option<ResourceId>, } @@ -216,6 +214,59 @@ pub fn get_or_create_client_from_state( } } +#[allow(clippy::type_complexity)] +pub struct ResourceToBodyAdapter( + Rc<dyn Resource>, + Option<Pin<Box<dyn Future<Output = Result<BufView, Error>>>>>, +); + +impl ResourceToBodyAdapter { + pub fn new(resource: Rc<dyn Resource>) -> Self { + let future = resource.clone().read(64 * 1024); + Self(resource, Some(future)) + } +} + +// SAFETY: we only use this on a single-threaded executor +unsafe impl Send for ResourceToBodyAdapter {} +// SAFETY: we only use this on a single-threaded executor +unsafe impl Sync for ResourceToBodyAdapter {} + +impl Stream for ResourceToBodyAdapter { + type Item = Result<BufView, Error>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + let this = self.get_mut(); + if let Some(mut fut) = this.1.take() { + match fut.poll_unpin(cx) { + Poll::Pending => { + this.1 = Some(fut); + Poll::Pending + } + Poll::Ready(res) => match res { + Ok(buf) if buf.is_empty() => Poll::Ready(None), + Ok(_) => { + this.1 = Some(this.0.clone().read(64 * 1024)); + Poll::Ready(Some(res)) + } + _ => Poll::Ready(Some(res)), + }, + } + } else { + Poll::Ready(None) + } + } +} + +impl Drop for ResourceToBodyAdapter { + fn drop(&mut self) { + self.0.clone().close() + } +} + #[op2] #[serde] #[allow(clippy::too_many_arguments)] @@ -226,8 +277,8 @@ pub fn op_fetch<FP>( #[serde] headers: Vec<(ByteString, ByteString)>, #[smi] client_rid: Option<u32>, has_body: bool, - #[number] body_length: Option<u64>, #[buffer] data: Option<JsBuffer>, + #[smi] resource: Option<ResourceId>, ) -> Result<FetchReturn, AnyError> where FP: FetchPermissions + 'static, @@ -244,7 +295,7 @@ where // Check scheme before asking for net permission let scheme = url.scheme(); - let (request_rid, request_body_rid, cancel_handle_rid) = match scheme { + let (request_rid, cancel_handle_rid) = match scheme { "file" => { let path = url.to_file_path().map_err(|_| { type_error("NetworkError when attempting to fetch resource.") @@ -268,7 +319,7 @@ where let maybe_cancel_handle_rid = maybe_cancel_handle .map(|ch| state.resource_table.add(FetchCancelHandle(ch))); - (request_rid, None, maybe_cancel_handle_rid) + (request_rid, maybe_cancel_handle_rid) } "http" | "https" => { let permissions = state.borrow_mut::<FP>(); @@ -282,34 +333,25 @@ where let mut 
request = client.request(method.clone(), url); - let request_body_rid = if has_body { - match data { - None => { - // If no body is passed, we return a writer for streaming the body. - let (tx, stream) = tokio::sync::mpsc::channel(1); - - // If the size of the body is known, we include a content-length - // header explicitly. - if let Some(body_size) = body_length { - request = - request.header(CONTENT_LENGTH, HeaderValue::from(body_size)) - } - - request = request.body(Body::wrap_stream(FetchBodyStream(stream))); - - let request_body_rid = - state.resource_table.add(FetchRequestBodyResource { - body: AsyncRefCell::new(Some(tx)), - cancel: CancelHandle::default(), - }); - - Some(request_body_rid) - } - Some(data) => { + if has_body { + match (data, resource) { + (Some(data), _) => { // If a body is passed, we use it, and don't return a body for streaming. request = request.body(data.to_vec()); - None } + (_, Some(resource)) => { + let resource = state.resource_table.take_any(resource)?; + match resource.size_hint() { + (body_size, Some(n)) if body_size == n && body_size > 0 => { + request = + request.header(CONTENT_LENGTH, HeaderValue::from(body_size)); + } + _ => {} + } + request = request + .body(Body::wrap_stream(ResourceToBodyAdapter::new(resource))) + } + (None, None) => unreachable!(), } } else { // POST and PUT requests should always have a 0 length content-length, @@ -317,7 +359,6 @@ where if matches!(method, Method::POST | Method::PUT) { request = request.header(CONTENT_LENGTH, HeaderValue::from(0)); } - None }; let mut header_map = HeaderMap::new(); @@ -354,7 +395,7 @@ where .send() .or_cancel(cancel_handle_) .await - .map(|res| res.map_err(|err| type_error(err.to_string()))) + .map(|res| res.map_err(|err| err.into())) }; let request_rid = state @@ -364,7 +405,7 @@ where let cancel_handle_rid = state.resource_table.add(FetchCancelHandle(cancel_handle)); - (request_rid, request_body_rid, Some(cancel_handle_rid)) + (request_rid, Some(cancel_handle_rid)) } "data" => { let data_url = DataUrl::process(url.as_str()) @@ -385,7 +426,7 @@ where .resource_table .add(FetchRequestResource(Box::pin(fut))); - (request_rid, None, None) + (request_rid, None) } "blob" => { // Blob URL resolution happens in the JS side of fetch. If we got here is @@ -397,12 +438,11 @@ where Ok(FetchReturn { request_rid, - request_body_rid, cancel_handle_rid, }) } -#[derive(Serialize)] +#[derive(Default, Serialize)] #[serde(rename_all = "camelCase")] pub struct FetchResponse { pub status: u16, @@ -413,6 +453,7 @@ pub struct FetchResponse { pub content_length: Option<u64>, pub remote_addr_ip: Option<String>, pub remote_addr_port: Option<u16>, + pub error: Option<String>, } #[op2(async)] @@ -432,7 +473,29 @@ pub async fn op_fetch_send( let res = match request.0.await { Ok(Ok(res)) => res, - Ok(Err(err)) => return Err(type_error(err.to_string())), + Ok(Err(err)) => { + // We're going to try and rescue the error cause from a stream and return it from this fetch. + // If any error in the chain is a reqwest body error, return that as a special result we can use to + // reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`). 
+ // TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead + let mut err_ref: &dyn std::error::Error = err.as_ref(); + while let Some(err) = std::error::Error::source(err_ref) { + if let Some(err) = err.downcast_ref::<reqwest::Error>() { + if err.is_body() { + // Extracts the next error cause and uses that for the message + if let Some(err) = std::error::Error::source(err) { + return Ok(FetchResponse { + error: Some(err.to_string()), + ..Default::default() + }); + } + } + } + err_ref = err; + } + + return Err(type_error(err.to_string())); + } Err(_) => return Err(type_error("request was cancelled")), }; @@ -465,6 +528,7 @@ pub async fn op_fetch_send( content_length, remote_addr_ip, remote_addr_port, + error: None, }) } @@ -599,74 +663,6 @@ impl Resource for FetchCancelHandle { } } -/// Wraps a [`mpsc::Receiver`] in a [`Stream`] that can be used as a Hyper [`Body`]. -pub struct FetchBodyStream(pub mpsc::Receiver<Result<bytes::Bytes, Error>>); - -impl Stream for FetchBodyStream { - type Item = Result<bytes::Bytes, Error>; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Option<Self::Item>> { - self.0.poll_recv(cx) - } -} - -pub struct FetchRequestBodyResource { - pub body: AsyncRefCell<Option<mpsc::Sender<Result<bytes::Bytes, Error>>>>, - pub cancel: CancelHandle, -} - -impl Resource for FetchRequestBodyResource { - fn name(&self) -> Cow<str> { - "fetchRequestBody".into() - } - - fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> { - Box::pin(async move { - let bytes: bytes::Bytes = buf.into(); - let nwritten = bytes.len(); - let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; - let body = (*body).as_ref(); - let cancel = RcRef::map(self, |r| &r.cancel); - let body = body.ok_or(type_error( - "request body receiver not connected (request closed)", - ))?; - body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| { - type_error("request body receiver not connected (request closed)") - })?; - Ok(WriteOutcome::Full { nwritten }) - }) - } - - fn write_error(self: Rc<Self>, error: Error) -> AsyncResult<()> { - async move { - let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; - let body = (*body).as_ref(); - let cancel = RcRef::map(self, |r| &r.cancel); - let body = body.ok_or(type_error( - "request body receiver not connected (request closed)", - ))?; - body.send(Err(error)).or_cancel(cancel).await??; - Ok(()) - } - .boxed_local() - } - - fn shutdown(self: Rc<Self>) -> AsyncResult<()> { - async move { - let mut body = RcRef::map(&self, |r| &r.body).borrow_mut().await; - body.take(); - Ok(()) - } - .boxed_local() - } - - fn close(self: Rc<Self>) { - self.cancel.cancel(); - } -} - type BytesStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>; diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs index 40ef6df32..fd593244c 100644 --- a/ext/node/ops/http.rs +++ b/ext/node/ops/http.rs @@ -4,18 +4,17 @@ use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op2; use deno_core::url::Url; -use deno_core::AsyncRefCell; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::OpState; +use deno_core::ResourceId; use deno_fetch::get_or_create_client_from_state; -use deno_fetch::FetchBodyStream; use deno_fetch::FetchCancelHandle; -use deno_fetch::FetchRequestBodyResource; use deno_fetch::FetchRequestResource; use deno_fetch::FetchReturn; use deno_fetch::HttpClientResource; +use 
deno_fetch::ResourceToBodyAdapter; use reqwest::header::HeaderMap; use reqwest::header::HeaderName; use reqwest::header::HeaderValue; @@ -31,7 +30,7 @@ pub fn op_node_http_request<P>( #[string] url: String, #[serde] headers: Vec<(ByteString, ByteString)>, #[smi] client_rid: Option<u32>, - has_body: bool, + #[smi] body: Option<ResourceId>, ) -> Result<FetchReturn, AnyError> where P: crate::NodePermissions + 'static, @@ -63,25 +62,16 @@ where let mut request = client.request(method.clone(), url).headers(header_map); - let request_body_rid = if has_body { - // If no body is passed, we return a writer for streaming the body. - let (tx, stream) = tokio::sync::mpsc::channel(1); - - request = request.body(Body::wrap_stream(FetchBodyStream(stream))); - - let request_body_rid = state.resource_table.add(FetchRequestBodyResource { - body: AsyncRefCell::new(Some(tx)), - cancel: CancelHandle::default(), - }); - - Some(request_body_rid) + if let Some(body) = body { + request = request.body(Body::wrap_stream(ResourceToBodyAdapter::new( + state.resource_table.take_any(body)?, + ))); } else { // POST and PUT requests should always have a 0 length content-length, // if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch if matches!(method, Method::POST | Method::PUT) { request = request.header(CONTENT_LENGTH, HeaderValue::from(0)); } - None }; let cancel_handle = CancelHandle::new_rc(); @@ -104,7 +94,6 @@ where Ok(FetchReturn { request_rid, - request_body_rid, cancel_handle_rid: Some(cancel_handle_rid), }) } diff --git a/ext/node/polyfills/_http_outgoing.ts b/ext/node/polyfills/_http_outgoing.ts index 50869ad82..8882ade55 100644 --- a/ext/node/polyfills/_http_outgoing.ts +++ b/ext/node/polyfills/_http_outgoing.ts @@ -4,7 +4,6 @@ // TODO(petamoriken): enable prefer-primordials for node polyfills // deno-lint-ignore-file prefer-primordials -const core = globalThis.__bootstrap.core; import { getDefaultHighWaterMark } from "ext:deno_node/internal/streams/state.mjs"; import assert from "ext:deno_node/internal/assert.mjs"; import EE from "node:events"; @@ -544,7 +543,7 @@ export class OutgoingMessage extends Stream { data = new Uint8Array(data.buffer); } if (data.buffer.byteLength > 0) { - core.writeAll(this._bodyWriteRid, data).then(() => { + this._bodyWriter.write(data).then(() => { callback?.(); this.emit("drain"); }).catch((e) => { diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 475d691cc..a694c9e9b 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -58,6 +58,7 @@ import { createHttpClient } from "ext:deno_fetch/22_http_client.js"; import { headersEntries } from "ext:deno_fetch/20_headers.js"; import { timerId } from "ext:deno_web/03_abort_signal.js"; import { clearTimeout as webClearTimeout } from "ext:deno_web/02_timers.js"; +import { resourceForReadableStream } from "ext:deno_web/06_streams.js"; import { TcpConn } from "ext:deno_net/01_net.js"; enum STATUS_CODES { @@ -586,15 +587,28 @@ class ClientRequest extends OutgoingMessage { const client = this._getClient() ?? 
createHttpClient({ http2: false }); this._client = client; + if ( + this.method === "POST" || this.method === "PATCH" || this.method === "PUT" + ) { + const { readable, writable } = new TransformStream({ + cancel: (e) => { + this._requestSendError = e; + }, + }); + + this._bodyWritable = writable; + this._bodyWriter = writable.getWriter(); + + this._bodyWriteRid = resourceForReadableStream(readable); + } + this._req = core.ops.op_node_http_request( this.method, url, headers, client.rid, - (this.method === "POST" || this.method === "PATCH" || - this.method === "PUT") && this._contentLength !== 0, + this._bodyWriteRid, ); - this._bodyWriteRid = this._req.requestBodyRid; } _implicitHeader() { @@ -638,23 +652,11 @@ class ClientRequest extends OutgoingMessage { this._implicitHeader(); this._send("", "latin1"); } + this._bodyWriter?.close(); (async () => { try { - const [res, _] = await Promise.all([ - core.opAsync("op_fetch_send", this._req.requestRid), - (async () => { - if (this._bodyWriteRid) { - try { - await core.shutdown(this._bodyWriteRid); - } catch (err) { - this._requestSendError = err; - } - - core.tryClose(this._bodyWriteRid); - } - })(), - ]); + const res = await core.opAsync("op_fetch_send", this._req.requestRid); try { cb?.(); } catch (_) { diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 4f472984d..9fc15d1ad 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -12,6 +12,7 @@ const { op_arraybuffer_was_detached, op_transfer_arraybuffer, op_readable_stream_resource_allocate, + op_readable_stream_resource_allocate_sized, op_readable_stream_resource_get_sink, op_readable_stream_resource_write_error, op_readable_stream_resource_write_buf, @@ -863,13 +864,16 @@ function readableStreamReadFn(reader, sink) { * read operations, and those read operations will be fed by the output of the * ReadableStream source. * @param {ReadableStream<Uint8Array>} stream + * @param {number | undefined} length * @returns {number} */ -function resourceForReadableStream(stream) { +function resourceForReadableStream(stream, length) { const reader = acquireReadableStreamDefaultReader(stream); // Allocate the resource - const rid = op_readable_stream_resource_allocate(); + const rid = typeof length == "number" + ? op_readable_stream_resource_allocate_sized(length) + : op_readable_stream_resource_allocate(); // Close the Reader we get from the ReadableStream when the resource is closed, ignoring any errors PromisePrototypeCatch( diff --git a/ext/web/lib.rs b/ext/web/lib.rs index 4e0d97f5c..a68b6344e 100644 --- a/ext/web/lib.rs +++ b/ext/web/lib.rs @@ -91,6 +91,7 @@ deno_core::extension!(deno_web, op_sleep, op_transfer_arraybuffer, stream_resource::op_readable_stream_resource_allocate, + stream_resource::op_readable_stream_resource_allocate_sized, stream_resource::op_readable_stream_resource_get_sink, stream_resource::op_readable_stream_resource_write_error, stream_resource::op_readable_stream_resource_write_buf, diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs index 93d10f806..8505be01a 100644 --- a/ext/web/stream_resource.rs +++ b/ext/web/stream_resource.rs @@ -197,7 +197,14 @@ impl BoundedBufferChannelInner { pub fn write(&mut self, buffer: V8Slice<u8>) -> Result<(), V8Slice<u8>> { let next_producer_index = (self.ring_producer + 1) % BUFFER_CHANNEL_SIZE; if next_producer_index == self.ring_consumer { - return Err(buffer); + // Note that we may have been allowed to write because of a close/error condition, but the + // underlying channel is actually closed. 
If this is the case, we return `Ok(())`` and just + // drop the bytes on the floor. + return if self.closed || self.error.is_some() { + Ok(()) + } else { + Err(buffer) + }; } self.current_size += buffer.len(); @@ -336,6 +343,7 @@ struct ReadableStreamResource { channel: BoundedBufferChannel, cancel_handle: CancelHandle, data: ReadableStreamResourceData, + size_hint: (u64, Option<u64>), } impl ReadableStreamResource { @@ -378,6 +386,10 @@ impl Resource for ReadableStreamResource { fn close(self: Rc<Self>) { self.close_channel(); } + + fn size_hint(&self) -> (u64, Option<u64>) { + self.size_hint + } } impl Drop for ReadableStreamResource { @@ -438,6 +450,25 @@ pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId { cancel_handle: Default::default(), channel: BoundedBufferChannel::default(), data: ReadableStreamResourceData { completion }, + size_hint: (0, None), + }; + state.resource_table.add(resource) +} + +/// Allocate a resource that wraps a ReadableStream, with a size hint. +#[op2(fast)] +#[smi] +pub fn op_readable_stream_resource_allocate_sized( + state: &mut OpState, + #[number] length: u64, +) -> ResourceId { + let completion = CompletionHandle::default(); + let resource = ReadableStreamResource { + read_queue: Default::default(), + cancel_handle: Default::default(), + channel: BoundedBufferChannel::default(), + data: ReadableStreamResourceData { completion }, + size_hint: (length, Some(length)), }; state.resource_table.add(resource) } |