summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-12-01 08:56:10 -0700
committerGitHub <noreply@github.com>2023-12-01 08:56:10 -0700
commite6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch)
tree57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 /ext
parent687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff)
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify and unify implementation with `ext/serve`. This allows us to work in Rust with resources only. Two additional changes made to `resourceForReadableStream` were required: - Add an optional length to `resourceForReadableStream` which translates to `size_hint` - Fix a bug where writing to a closed stream that was full would panic
Diffstat (limited to 'ext')
-rw-r--r--ext/fetch/26_fetch.js179
-rw-r--r--ext/fetch/Cargo.toml1
-rw-r--r--ext/fetch/lib.rs212
-rw-r--r--ext/node/ops/http.rs25
-rw-r--r--ext/node/polyfills/_http_outgoing.ts3
-rw-r--r--ext/node/polyfills/http.ts36
-rw-r--r--ext/web/06_streams.js8
-rw-r--r--ext/web/lib.rs1
-rw-r--r--ext/web/stream_resource.rs33
9 files changed, 201 insertions, 297 deletions
diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js
index e586d9a3a..8a71d9bcf 100644
--- a/ext/fetch/26_fetch.js
+++ b/ext/fetch/26_fetch.js
@@ -14,11 +14,12 @@ const core = globalThis.Deno.core;
const ops = core.ops;
import * as webidl from "ext:deno_webidl/00_webidl.js";
import { byteLowerCase } from "ext:deno_web/00_infra.js";
-import { BlobPrototype } from "ext:deno_web/09_file.js";
import {
errorReadableStream,
+ getReadableStreamResourceBacking,
readableStreamForRid,
ReadableStreamPrototype,
+ resourceForReadableStream,
} from "ext:deno_web/06_streams.js";
import { extractBody, InnerBody } from "ext:deno_fetch/22_body.js";
import { processUrlList, toInnerRequest } from "ext:deno_fetch/23_request.js";
@@ -37,22 +38,17 @@ const {
ArrayPrototypeSplice,
ArrayPrototypeFilter,
ArrayPrototypeIncludes,
+ Error,
ObjectPrototypeIsPrototypeOf,
Promise,
PromisePrototypeThen,
PromisePrototypeCatch,
SafeArrayIterator,
- SafeWeakMap,
String,
StringPrototypeStartsWith,
StringPrototypeToLowerCase,
TypeError,
- Uint8Array,
Uint8ArrayPrototype,
- WeakMapPrototypeDelete,
- WeakMapPrototypeGet,
- WeakMapPrototypeHas,
- WeakMapPrototypeSet,
} = primordials;
const REQUEST_BODY_HEADER_NAMES = [
@@ -62,28 +58,9 @@ const REQUEST_BODY_HEADER_NAMES = [
"content-type",
];
-const requestBodyReaders = new SafeWeakMap();
-
-/**
- * @param {{ method: string, url: string, headers: [string, string][], clientRid: number | null, hasBody: boolean }} args
- * @param {Uint8Array | null} body
- * @returns {{ requestRid: number, requestBodyRid: number | null, cancelHandleRid: number | null }}
- */
-function opFetch(method, url, headers, clientRid, hasBody, bodyLength, body) {
- return ops.op_fetch(
- method,
- url,
- headers,
- clientRid,
- hasBody,
- bodyLength,
- body,
- );
-}
-
/**
* @param {number} rid
- * @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number }>}
+ * @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number, error: string? }>}
*/
function opFetchSend(rid) {
return core.opAsync("op_fetch_send", rid);
@@ -145,154 +122,59 @@ async function mainFetch(req, recursive, terminator) {
/** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
let reqBody = null;
-
- if (req.body !== null) {
- if (
- ObjectPrototypeIsPrototypeOf(
- ReadableStreamPrototype,
- req.body.streamOrStatic,
- )
- ) {
- if (
- req.body.length === null ||
- ObjectPrototypeIsPrototypeOf(BlobPrototype, req.body.source)
- ) {
- reqBody = req.body.stream;
+ let reqRid = null;
+
+ if (req.body) {
+ const stream = req.body.streamOrStatic;
+ const body = stream.body;
+
+ if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) {
+ reqBody = body;
+ } else if (typeof body === "string") {
+ reqBody = core.encode(body);
+ } else if (ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) {
+ const resourceBacking = getReadableStreamResourceBacking(stream);
+ if (resourceBacking) {
+ reqRid = resourceBacking.rid;
} else {
- const reader = req.body.stream.getReader();
- WeakMapPrototypeSet(requestBodyReaders, req, reader);
- const r1 = await reader.read();
- if (r1.done) {
- reqBody = new Uint8Array(0);
- } else {
- reqBody = r1.value;
- const r2 = await reader.read();
- if (!r2.done) throw new TypeError("Unreachable");
- }
- WeakMapPrototypeDelete(requestBodyReaders, req);
+ reqRid = resourceForReadableStream(stream, req.body.length);
}
} else {
- req.body.streamOrStatic.consumed = true;
- reqBody = req.body.streamOrStatic.body;
- // TODO(@AaronO): plumb support for StringOrBuffer all the way
- reqBody = typeof reqBody === "string" ? core.encode(reqBody) : reqBody;
+ throw TypeError("invalid body");
}
}
- const { requestRid, requestBodyRid, cancelHandleRid } = opFetch(
+ const { requestRid, cancelHandleRid } = ops.op_fetch(
req.method,
req.currentUrl(),
req.headerList,
req.clientRid,
- reqBody !== null,
- req.body?.length,
- ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, reqBody) ? reqBody : null,
+ reqBody !== null || reqRid !== null,
+ reqBody,
+ reqRid,
);
function onAbort() {
if (cancelHandleRid !== null) {
core.tryClose(cancelHandleRid);
}
- if (requestBodyRid !== null) {
- core.tryClose(requestBodyRid);
- }
}
terminator[abortSignal.add](onAbort);
-
- let requestSendError;
- let requestSendErrorSet = false;
-
- async function propagateError(err, message) {
- // TODO(lucacasonato): propagate error into response body stream
- try {
- await core.writeTypeError(requestBodyRid, message);
- } catch (err) {
- if (!requestSendErrorSet) {
- requestSendErrorSet = true;
- requestSendError = err;
- }
- }
- if (!requestSendErrorSet) {
- requestSendErrorSet = true;
- requestSendError = err;
- }
- }
-
- if (requestBodyRid !== null) {
- if (
- reqBody === null ||
- !ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, reqBody)
- ) {
- throw new TypeError("Unreachable");
- }
- const reader = reqBody.getReader();
- WeakMapPrototypeSet(requestBodyReaders, req, reader);
- (async () => {
- let done = false;
- while (!done) {
- let val;
- try {
- const res = await reader.read();
- done = res.done;
- val = res.value;
- } catch (err) {
- if (terminator.aborted) break;
- await propagateError(err, "failed to read");
- break;
- }
- if (done) break;
- if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, val)) {
- const error = new TypeError(
- "Item in request body ReadableStream is not a Uint8Array",
- );
- await reader.cancel(error);
- await propagateError(error, error.message);
- break;
- }
- try {
- await core.writeAll(requestBodyRid, val);
- } catch (err) {
- if (terminator.aborted) break;
- await reader.cancel(err);
- await propagateError(err, "failed to write");
- break;
- }
- }
- if (done && !terminator.aborted) {
- try {
- await core.shutdown(requestBodyRid);
- } catch (err) {
- if (!terminator.aborted) {
- await propagateError(err, "failed to flush");
- }
- }
- }
- WeakMapPrototypeDelete(requestBodyReaders, req);
- reader.releaseLock();
- core.tryClose(requestBodyRid);
- })();
- }
let resp;
try {
resp = await opFetchSend(requestRid);
} catch (err) {
if (terminator.aborted) return;
- if (requestSendErrorSet) {
- // if the request body stream errored, we want to propagate that error
- // instead of the original error from opFetchSend
- throw new TypeError("Failed to fetch: request body stream errored", {
- cause: requestSendError,
- });
- }
- if (requestBodyRid !== null) {
- core.tryClose(requestBodyRid);
- }
throw err;
} finally {
if (cancelHandleRid !== null) {
core.tryClose(cancelHandleRid);
}
}
+ // Re-throw any body errors
+ if (resp.error) {
+ throw new TypeError("body failed", { cause: new Error(resp.error) });
+ }
if (terminator.aborted) return abortedNetworkError();
processUrlList(req.urlList, req.urlListProcessed);
@@ -510,9 +392,8 @@ function fetch(input, init = {}) {
function abortFetch(request, responseObject, error) {
if (request.body !== null) {
- if (WeakMapPrototypeHas(requestBodyReaders, request)) {
- WeakMapPrototypeGet(requestBodyReaders, request).cancel(error);
- } else {
+ // Cancel the body if we haven't taken it as a resource yet
+ if (!request.body.streamOrStatic.locked) {
request.body.cancel(error);
}
}
diff --git a/ext/fetch/Cargo.toml b/ext/fetch/Cargo.toml
index 901e9ce5f..ede514b97 100644
--- a/ext/fetch/Cargo.toml
+++ b/ext/fetch/Cargo.toml
@@ -20,6 +20,7 @@ deno_core.workspace = true
deno_tls.workspace = true
dyn-clone = "1"
http.workspace = true
+pin-project.workspace = true
reqwest.workspace = true
serde.workspace = true
tokio.workspace = true
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index 7cde5584f..6e1ecb5e4 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -11,6 +11,8 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
use deno_core::anyhow::Error;
use deno_core::error::type_error;
@@ -21,13 +23,11 @@ use deno_core::futures::FutureExt;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::op2;
-use deno_core::BufView;
-use deno_core::WriteOutcome;
-
use deno_core::unsync::spawn;
use deno_core::url::Url;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
+use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
@@ -62,7 +62,6 @@ use serde::Deserialize;
use serde::Serialize;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
-use tokio::sync::mpsc;
// Re-export reqwest and data_url
pub use data_url;
@@ -184,7 +183,6 @@ pub fn get_declaration() -> PathBuf {
#[serde(rename_all = "camelCase")]
pub struct FetchReturn {
pub request_rid: ResourceId,
- pub request_body_rid: Option<ResourceId>,
pub cancel_handle_rid: Option<ResourceId>,
}
@@ -216,6 +214,59 @@ pub fn get_or_create_client_from_state(
}
}
+#[allow(clippy::type_complexity)]
+pub struct ResourceToBodyAdapter(
+ Rc<dyn Resource>,
+ Option<Pin<Box<dyn Future<Output = Result<BufView, Error>>>>>,
+);
+
+impl ResourceToBodyAdapter {
+ pub fn new(resource: Rc<dyn Resource>) -> Self {
+ let future = resource.clone().read(64 * 1024);
+ Self(resource, Some(future))
+ }
+}
+
+// SAFETY: we only use this on a single-threaded executor
+unsafe impl Send for ResourceToBodyAdapter {}
+// SAFETY: we only use this on a single-threaded executor
+unsafe impl Sync for ResourceToBodyAdapter {}
+
+impl Stream for ResourceToBodyAdapter {
+ type Item = Result<BufView, Error>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let this = self.get_mut();
+ if let Some(mut fut) = this.1.take() {
+ match fut.poll_unpin(cx) {
+ Poll::Pending => {
+ this.1 = Some(fut);
+ Poll::Pending
+ }
+ Poll::Ready(res) => match res {
+ Ok(buf) if buf.is_empty() => Poll::Ready(None),
+ Ok(_) => {
+ this.1 = Some(this.0.clone().read(64 * 1024));
+ Poll::Ready(Some(res))
+ }
+ _ => Poll::Ready(Some(res)),
+ },
+ }
+ } else {
+ Poll::Ready(None)
+ }
+ }
+}
+
+impl Drop for ResourceToBodyAdapter {
+ fn drop(&mut self) {
+ self.0.clone().close()
+ }
+}
+
#[op2]
#[serde]
#[allow(clippy::too_many_arguments)]
@@ -226,8 +277,8 @@ pub fn op_fetch<FP>(
#[serde] headers: Vec<(ByteString, ByteString)>,
#[smi] client_rid: Option<u32>,
has_body: bool,
- #[number] body_length: Option<u64>,
#[buffer] data: Option<JsBuffer>,
+ #[smi] resource: Option<ResourceId>,
) -> Result<FetchReturn, AnyError>
where
FP: FetchPermissions + 'static,
@@ -244,7 +295,7 @@ where
// Check scheme before asking for net permission
let scheme = url.scheme();
- let (request_rid, request_body_rid, cancel_handle_rid) = match scheme {
+ let (request_rid, cancel_handle_rid) = match scheme {
"file" => {
let path = url.to_file_path().map_err(|_| {
type_error("NetworkError when attempting to fetch resource.")
@@ -268,7 +319,7 @@ where
let maybe_cancel_handle_rid = maybe_cancel_handle
.map(|ch| state.resource_table.add(FetchCancelHandle(ch)));
- (request_rid, None, maybe_cancel_handle_rid)
+ (request_rid, maybe_cancel_handle_rid)
}
"http" | "https" => {
let permissions = state.borrow_mut::<FP>();
@@ -282,34 +333,25 @@ where
let mut request = client.request(method.clone(), url);
- let request_body_rid = if has_body {
- match data {
- None => {
- // If no body is passed, we return a writer for streaming the body.
- let (tx, stream) = tokio::sync::mpsc::channel(1);
-
- // If the size of the body is known, we include a content-length
- // header explicitly.
- if let Some(body_size) = body_length {
- request =
- request.header(CONTENT_LENGTH, HeaderValue::from(body_size))
- }
-
- request = request.body(Body::wrap_stream(FetchBodyStream(stream)));
-
- let request_body_rid =
- state.resource_table.add(FetchRequestBodyResource {
- body: AsyncRefCell::new(Some(tx)),
- cancel: CancelHandle::default(),
- });
-
- Some(request_body_rid)
- }
- Some(data) => {
+ if has_body {
+ match (data, resource) {
+ (Some(data), _) => {
// If a body is passed, we use it, and don't return a body for streaming.
request = request.body(data.to_vec());
- None
}
+ (_, Some(resource)) => {
+ let resource = state.resource_table.take_any(resource)?;
+ match resource.size_hint() {
+ (body_size, Some(n)) if body_size == n && body_size > 0 => {
+ request =
+ request.header(CONTENT_LENGTH, HeaderValue::from(body_size));
+ }
+ _ => {}
+ }
+ request = request
+ .body(Body::wrap_stream(ResourceToBodyAdapter::new(resource)))
+ }
+ (None, None) => unreachable!(),
}
} else {
// POST and PUT requests should always have a 0 length content-length,
@@ -317,7 +359,6 @@ where
if matches!(method, Method::POST | Method::PUT) {
request = request.header(CONTENT_LENGTH, HeaderValue::from(0));
}
- None
};
let mut header_map = HeaderMap::new();
@@ -354,7 +395,7 @@ where
.send()
.or_cancel(cancel_handle_)
.await
- .map(|res| res.map_err(|err| type_error(err.to_string())))
+ .map(|res| res.map_err(|err| err.into()))
};
let request_rid = state
@@ -364,7 +405,7 @@ where
let cancel_handle_rid =
state.resource_table.add(FetchCancelHandle(cancel_handle));
- (request_rid, request_body_rid, Some(cancel_handle_rid))
+ (request_rid, Some(cancel_handle_rid))
}
"data" => {
let data_url = DataUrl::process(url.as_str())
@@ -385,7 +426,7 @@ where
.resource_table
.add(FetchRequestResource(Box::pin(fut)));
- (request_rid, None, None)
+ (request_rid, None)
}
"blob" => {
// Blob URL resolution happens in the JS side of fetch. If we got here is
@@ -397,12 +438,11 @@ where
Ok(FetchReturn {
request_rid,
- request_body_rid,
cancel_handle_rid,
})
}
-#[derive(Serialize)]
+#[derive(Default, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FetchResponse {
pub status: u16,
@@ -413,6 +453,7 @@ pub struct FetchResponse {
pub content_length: Option<u64>,
pub remote_addr_ip: Option<String>,
pub remote_addr_port: Option<u16>,
+ pub error: Option<String>,
}
#[op2(async)]
@@ -432,7 +473,29 @@ pub async fn op_fetch_send(
let res = match request.0.await {
Ok(Ok(res)) => res,
- Ok(Err(err)) => return Err(type_error(err.to_string())),
+ Ok(Err(err)) => {
+ // We're going to try and rescue the error cause from a stream and return it from this fetch.
+ // If any error in the chain is a reqwest body error, return that as a special result we can use to
+ // reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`).
+ // TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead
+ let mut err_ref: &dyn std::error::Error = err.as_ref();
+ while let Some(err) = std::error::Error::source(err_ref) {
+ if let Some(err) = err.downcast_ref::<reqwest::Error>() {
+ if err.is_body() {
+ // Extracts the next error cause and uses that for the message
+ if let Some(err) = std::error::Error::source(err) {
+ return Ok(FetchResponse {
+ error: Some(err.to_string()),
+ ..Default::default()
+ });
+ }
+ }
+ }
+ err_ref = err;
+ }
+
+ return Err(type_error(err.to_string()));
+ }
Err(_) => return Err(type_error("request was cancelled")),
};
@@ -465,6 +528,7 @@ pub async fn op_fetch_send(
content_length,
remote_addr_ip,
remote_addr_port,
+ error: None,
})
}
@@ -599,74 +663,6 @@ impl Resource for FetchCancelHandle {
}
}
-/// Wraps a [`mpsc::Receiver`] in a [`Stream`] that can be used as a Hyper [`Body`].
-pub struct FetchBodyStream(pub mpsc::Receiver<Result<bytes::Bytes, Error>>);
-
-impl Stream for FetchBodyStream {
- type Item = Result<bytes::Bytes, Error>;
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<Self::Item>> {
- self.0.poll_recv(cx)
- }
-}
-
-pub struct FetchRequestBodyResource {
- pub body: AsyncRefCell<Option<mpsc::Sender<Result<bytes::Bytes, Error>>>>,
- pub cancel: CancelHandle,
-}
-
-impl Resource for FetchRequestBodyResource {
- fn name(&self) -> Cow<str> {
- "fetchRequestBody".into()
- }
-
- fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> {
- Box::pin(async move {
- let bytes: bytes::Bytes = buf.into();
- let nwritten = bytes.len();
- let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
- let body = (*body).as_ref();
- let cancel = RcRef::map(self, |r| &r.cancel);
- let body = body.ok_or(type_error(
- "request body receiver not connected (request closed)",
- ))?;
- body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| {
- type_error("request body receiver not connected (request closed)")
- })?;
- Ok(WriteOutcome::Full { nwritten })
- })
- }
-
- fn write_error(self: Rc<Self>, error: Error) -> AsyncResult<()> {
- async move {
- let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
- let body = (*body).as_ref();
- let cancel = RcRef::map(self, |r| &r.cancel);
- let body = body.ok_or(type_error(
- "request body receiver not connected (request closed)",
- ))?;
- body.send(Err(error)).or_cancel(cancel).await??;
- Ok(())
- }
- .boxed_local()
- }
-
- fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
- async move {
- let mut body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
- body.take();
- Ok(())
- }
- .boxed_local()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel.cancel();
- }
-}
-
type BytesStream =
Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs
index 40ef6df32..fd593244c 100644
--- a/ext/node/ops/http.rs
+++ b/ext/node/ops/http.rs
@@ -4,18 +4,17 @@ use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::url::Url;
-use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::OpState;
+use deno_core::ResourceId;
use deno_fetch::get_or_create_client_from_state;
-use deno_fetch::FetchBodyStream;
use deno_fetch::FetchCancelHandle;
-use deno_fetch::FetchRequestBodyResource;
use deno_fetch::FetchRequestResource;
use deno_fetch::FetchReturn;
use deno_fetch::HttpClientResource;
+use deno_fetch::ResourceToBodyAdapter;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
@@ -31,7 +30,7 @@ pub fn op_node_http_request<P>(
#[string] url: String,
#[serde] headers: Vec<(ByteString, ByteString)>,
#[smi] client_rid: Option<u32>,
- has_body: bool,
+ #[smi] body: Option<ResourceId>,
) -> Result<FetchReturn, AnyError>
where
P: crate::NodePermissions + 'static,
@@ -63,25 +62,16 @@ where
let mut request = client.request(method.clone(), url).headers(header_map);
- let request_body_rid = if has_body {
- // If no body is passed, we return a writer for streaming the body.
- let (tx, stream) = tokio::sync::mpsc::channel(1);
-
- request = request.body(Body::wrap_stream(FetchBodyStream(stream)));
-
- let request_body_rid = state.resource_table.add(FetchRequestBodyResource {
- body: AsyncRefCell::new(Some(tx)),
- cancel: CancelHandle::default(),
- });
-
- Some(request_body_rid)
+ if let Some(body) = body {
+ request = request.body(Body::wrap_stream(ResourceToBodyAdapter::new(
+ state.resource_table.take_any(body)?,
+ )));
} else {
// POST and PUT requests should always have a 0 length content-length,
// if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
if matches!(method, Method::POST | Method::PUT) {
request = request.header(CONTENT_LENGTH, HeaderValue::from(0));
}
- None
};
let cancel_handle = CancelHandle::new_rc();
@@ -104,7 +94,6 @@ where
Ok(FetchReturn {
request_rid,
- request_body_rid,
cancel_handle_rid: Some(cancel_handle_rid),
})
}
diff --git a/ext/node/polyfills/_http_outgoing.ts b/ext/node/polyfills/_http_outgoing.ts
index 50869ad82..8882ade55 100644
--- a/ext/node/polyfills/_http_outgoing.ts
+++ b/ext/node/polyfills/_http_outgoing.ts
@@ -4,7 +4,6 @@
// TODO(petamoriken): enable prefer-primordials for node polyfills
// deno-lint-ignore-file prefer-primordials
-const core = globalThis.__bootstrap.core;
import { getDefaultHighWaterMark } from "ext:deno_node/internal/streams/state.mjs";
import assert from "ext:deno_node/internal/assert.mjs";
import EE from "node:events";
@@ -544,7 +543,7 @@ export class OutgoingMessage extends Stream {
data = new Uint8Array(data.buffer);
}
if (data.buffer.byteLength > 0) {
- core.writeAll(this._bodyWriteRid, data).then(() => {
+ this._bodyWriter.write(data).then(() => {
callback?.();
this.emit("drain");
}).catch((e) => {
diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts
index 475d691cc..a694c9e9b 100644
--- a/ext/node/polyfills/http.ts
+++ b/ext/node/polyfills/http.ts
@@ -58,6 +58,7 @@ import { createHttpClient } from "ext:deno_fetch/22_http_client.js";
import { headersEntries } from "ext:deno_fetch/20_headers.js";
import { timerId } from "ext:deno_web/03_abort_signal.js";
import { clearTimeout as webClearTimeout } from "ext:deno_web/02_timers.js";
+import { resourceForReadableStream } from "ext:deno_web/06_streams.js";
import { TcpConn } from "ext:deno_net/01_net.js";
enum STATUS_CODES {
@@ -586,15 +587,28 @@ class ClientRequest extends OutgoingMessage {
const client = this._getClient() ?? createHttpClient({ http2: false });
this._client = client;
+ if (
+ this.method === "POST" || this.method === "PATCH" || this.method === "PUT"
+ ) {
+ const { readable, writable } = new TransformStream({
+ cancel: (e) => {
+ this._requestSendError = e;
+ },
+ });
+
+ this._bodyWritable = writable;
+ this._bodyWriter = writable.getWriter();
+
+ this._bodyWriteRid = resourceForReadableStream(readable);
+ }
+
this._req = core.ops.op_node_http_request(
this.method,
url,
headers,
client.rid,
- (this.method === "POST" || this.method === "PATCH" ||
- this.method === "PUT") && this._contentLength !== 0,
+ this._bodyWriteRid,
);
- this._bodyWriteRid = this._req.requestBodyRid;
}
_implicitHeader() {
@@ -638,23 +652,11 @@ class ClientRequest extends OutgoingMessage {
this._implicitHeader();
this._send("", "latin1");
}
+ this._bodyWriter?.close();
(async () => {
try {
- const [res, _] = await Promise.all([
- core.opAsync("op_fetch_send", this._req.requestRid),
- (async () => {
- if (this._bodyWriteRid) {
- try {
- await core.shutdown(this._bodyWriteRid);
- } catch (err) {
- this._requestSendError = err;
- }
-
- core.tryClose(this._bodyWriteRid);
- }
- })(),
- ]);
+ const res = await core.opAsync("op_fetch_send", this._req.requestRid);
try {
cb?.();
} catch (_) {
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 4f472984d..9fc15d1ad 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -12,6 +12,7 @@ const {
op_arraybuffer_was_detached,
op_transfer_arraybuffer,
op_readable_stream_resource_allocate,
+ op_readable_stream_resource_allocate_sized,
op_readable_stream_resource_get_sink,
op_readable_stream_resource_write_error,
op_readable_stream_resource_write_buf,
@@ -863,13 +864,16 @@ function readableStreamReadFn(reader, sink) {
* read operations, and those read operations will be fed by the output of the
* ReadableStream source.
* @param {ReadableStream<Uint8Array>} stream
+ * @param {number | undefined} length
* @returns {number}
*/
-function resourceForReadableStream(stream) {
+function resourceForReadableStream(stream, length) {
const reader = acquireReadableStreamDefaultReader(stream);
// Allocate the resource
- const rid = op_readable_stream_resource_allocate();
+ const rid = typeof length == "number"
+ ? op_readable_stream_resource_allocate_sized(length)
+ : op_readable_stream_resource_allocate();
// Close the Reader we get from the ReadableStream when the resource is closed, ignoring any errors
PromisePrototypeCatch(
diff --git a/ext/web/lib.rs b/ext/web/lib.rs
index 4e0d97f5c..a68b6344e 100644
--- a/ext/web/lib.rs
+++ b/ext/web/lib.rs
@@ -91,6 +91,7 @@ deno_core::extension!(deno_web,
op_sleep,
op_transfer_arraybuffer,
stream_resource::op_readable_stream_resource_allocate,
+ stream_resource::op_readable_stream_resource_allocate_sized,
stream_resource::op_readable_stream_resource_get_sink,
stream_resource::op_readable_stream_resource_write_error,
stream_resource::op_readable_stream_resource_write_buf,
diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs
index 93d10f806..8505be01a 100644
--- a/ext/web/stream_resource.rs
+++ b/ext/web/stream_resource.rs
@@ -197,7 +197,14 @@ impl BoundedBufferChannelInner {
pub fn write(&mut self, buffer: V8Slice<u8>) -> Result<(), V8Slice<u8>> {
let next_producer_index = (self.ring_producer + 1) % BUFFER_CHANNEL_SIZE;
if next_producer_index == self.ring_consumer {
- return Err(buffer);
+ // Note that we may have been allowed to write because of a close/error condition, but the
+ // underlying channel is actually closed. If this is the case, we return `Ok(())`` and just
+ // drop the bytes on the floor.
+ return if self.closed || self.error.is_some() {
+ Ok(())
+ } else {
+ Err(buffer)
+ };
}
self.current_size += buffer.len();
@@ -336,6 +343,7 @@ struct ReadableStreamResource {
channel: BoundedBufferChannel,
cancel_handle: CancelHandle,
data: ReadableStreamResourceData,
+ size_hint: (u64, Option<u64>),
}
impl ReadableStreamResource {
@@ -378,6 +386,10 @@ impl Resource for ReadableStreamResource {
fn close(self: Rc<Self>) {
self.close_channel();
}
+
+ fn size_hint(&self) -> (u64, Option<u64>) {
+ self.size_hint
+ }
}
impl Drop for ReadableStreamResource {
@@ -438,6 +450,25 @@ pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId {
cancel_handle: Default::default(),
channel: BoundedBufferChannel::default(),
data: ReadableStreamResourceData { completion },
+ size_hint: (0, None),
+ };
+ state.resource_table.add(resource)
+}
+
+/// Allocate a resource that wraps a ReadableStream, with a size hint.
+#[op2(fast)]
+#[smi]
+pub fn op_readable_stream_resource_allocate_sized(
+ state: &mut OpState,
+ #[number] length: u64,
+) -> ResourceId {
+ let completion = CompletionHandle::default();
+ let resource = ReadableStreamResource {
+ read_queue: Default::default(),
+ cancel_handle: Default::default(),
+ channel: BoundedBufferChannel::default(),
+ data: ReadableStreamResourceData { completion },
+ size_hint: (length, Some(length)),
};
state.resource_table.add(resource)
}