summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-08-17 07:52:37 -0600
committerGitHub <noreply@github.com>2023-08-17 07:52:37 -0600
commit23ff0e722e3c4b0827940853c53c5ee2ede5ec9f (patch)
tree1521ffd2ac5e803224546cb349b3905925b9b5ff /ext
parent0960e895da1275792c1f38999f6a185c864edb3f (diff)
feat(ext/web): resourceForReadableStream (#20180)
Extracted from fast streams work. This is a resource wrapper for `ReadableStream`, allowing us to treat all `ReadableStream` instances as resources, and remove special paths in both `fetch` and `serve`. Performance with a ReadableStream response yields ~18% improvement: ``` return new Response(new ReadableStream({ start(controller) { controller.enqueue(new Uint8Array([104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100])); controller.close(); } }) ``` This patch: ``` 12:36 $ third_party/prebuilt/mac/wrk http://localhost:8080 Running 10s test @ http://localhost:8080 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 99.96us 100.03us 6.65ms 98.84% Req/Sec 47.73k 2.43k 51.02k 89.11% 959308 requests in 10.10s, 117.10MB read Requests/sec: 94978.71 Transfer/sec: 11.59MB ``` main: ``` Running 10s test @ http://localhost:8080 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 163.03us 685.51us 19.73ms 99.27% Req/Sec 39.50k 3.98k 66.11k 95.52% 789582 requests in 10.10s, 82.83MB read Requests/sec: 78182.65 Transfer/sec: 8.20MB ```
Diffstat (limited to 'ext')
-rw-r--r--ext/http/00_serve.js154
-rw-r--r--ext/http/http_next.rs48
-rw-r--r--ext/http/lib.rs1
-rw-r--r--ext/http/response_body.rs83
-rw-r--r--ext/web/06_streams.js83
-rw-r--r--ext/web/Cargo.toml2
-rw-r--r--ext/web/lib.rs7
-rw-r--r--ext/web/stream_resource.rs274
8 files changed, 419 insertions, 233 deletions
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
index 3447f48e2..265b79706 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.js
@@ -30,9 +30,9 @@ import {
import {
Deferred,
getReadableStreamResourceBacking,
- readableStreamClose,
readableStreamForRid,
ReadableStreamPrototype,
+ resourceForReadableStream,
} from "ext:deno_web/06_streams.js";
import { listen, TcpConn } from "ext:deno_net/01_net.js";
import { listenTls } from "ext:deno_net/02_tls.js";
@@ -41,10 +41,6 @@ const {
Error,
ObjectPrototypeIsPrototypeOf,
PromisePrototypeCatch,
- SafeSet,
- SafeSetIterator,
- SetPrototypeAdd,
- SetPrototypeDelete,
Symbol,
SymbolFor,
TypeError,
@@ -61,7 +57,6 @@ const {
op_http_set_promise_complete,
op_http_set_response_body_bytes,
op_http_set_response_body_resource,
- op_http_set_response_body_stream,
op_http_set_response_body_text,
op_http_set_response_header,
op_http_set_response_headers,
@@ -339,7 +334,6 @@ class InnerRequest {
class CallbackContext {
abortController;
- responseBodies;
scheme;
fallbackHost;
serverRid;
@@ -352,7 +346,6 @@ class CallbackContext {
{ once: true },
);
this.abortController = new AbortController();
- this.responseBodies = new SafeSet();
this.serverRid = args[0];
this.scheme = args[1];
this.fallbackHost = args[2];
@@ -379,23 +372,24 @@ class ServeHandlerInfo {
}
}
-function fastSyncResponseOrStream(req, respBody) {
+function fastSyncResponseOrStream(req, respBody, status) {
if (respBody === null || respBody === undefined) {
// Don't set the body
- return null;
+ op_http_set_promise_complete(req, status);
+ return;
}
const stream = respBody.streamOrStatic;
const body = stream.body;
if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) {
- op_http_set_response_body_bytes(req, body);
- return null;
+ op_http_set_response_body_bytes(req, body, status);
+ return;
}
if (typeof body === "string") {
- op_http_set_response_body_text(req, body);
- return null;
+ op_http_set_response_body_text(req, body, status);
+ return;
}
// At this point in the response it needs to be a stream
@@ -408,115 +402,16 @@ function fastSyncResponseOrStream(req, respBody) {
req,
resourceBacking.rid,
resourceBacking.autoClose,
+ status,
+ );
+ } else {
+ const rid = resourceForReadableStream(stream);
+ op_http_set_response_body_resource(
+ req,
+ rid,
+ true,
+ status,
);
- return null;
- }
-
- return stream;
-}
-
-async function asyncResponse(responseBodies, req, status, stream) {
- const reader = stream.getReader();
- let responseRid;
- let closed = false;
- let timeout;
-
- try {
- // IMPORTANT: We get a performance boost from this optimization, but V8 is very
- // sensitive to the order and structure. Benchmark any changes to this code.
-
- // Optimize for streams that are done in zero or one packets. We will not
- // have to allocate a resource in this case.
- const { value: value1, done: done1 } = await reader.read();
- if (done1) {
- closed = true;
- // Exit 1: no response body at all, extreme fast path
- // Reader will be closed by finally block
- return;
- }
-
- // The second value cannot block indefinitely, as someone may be waiting on a response
- // of the first packet that may influence this packet. We set this timeout arbitrarily to 250ms
- // and we race it.
- let timeoutPromise;
- timeout = setTimeout(() => {
- responseRid = op_http_set_response_body_stream(req);
- SetPrototypeAdd(responseBodies, responseRid);
- op_http_set_promise_complete(req, status);
- // TODO(mmastrac): if this promise fails before we get to the await below, it crashes
- // the process with an error:
- //
- // 'Uncaught (in promise) BadResource: failed to write'.
- //
- // To avoid this, we're going to swallow errors here and allow the code later in the
- // file to re-throw them in a way that doesn't appear to be an uncaught promise rejection.
- timeoutPromise = PromisePrototypeCatch(
- core.writeAll(responseRid, value1),
- () => null,
- );
- }, 250);
- const { value: value2, done: done2 } = await reader.read();
-
- if (timeoutPromise) {
- await timeoutPromise;
- if (done2) {
- closed = true;
- // Exit 2(a): read 2 is EOS, and timeout resolved.
- // Reader will be closed by finally block
- // Response stream will be closed by finally block.
- return;
- }
-
- // Timeout resolved, value1 written but read2 is not EOS. Carry value2 forward.
- } else {
- clearTimeout(timeout);
- timeout = undefined;
-
- if (done2) {
- // Exit 2(b): read 2 is EOS, and timeout did not resolve as we read fast enough.
- // Reader will be closed by finally block
- // No response stream
- closed = true;
- op_http_set_response_body_bytes(req, value1);
- return;
- }
-
- responseRid = op_http_set_response_body_stream(req);
- SetPrototypeAdd(responseBodies, responseRid);
- op_http_set_promise_complete(req, status);
- // Write our first packet
- await core.writeAll(responseRid, value1);
- }
-
- await core.writeAll(responseRid, value2);
- while (true) {
- const { value, done } = await reader.read();
- if (done) {
- closed = true;
- break;
- }
- await core.writeAll(responseRid, value);
- }
- } catch (error) {
- closed = true;
- try {
- await reader.cancel(error);
- } catch {
- // Pass
- }
- } finally {
- if (!closed) {
- readableStreamClose(reader);
- }
- if (timeout !== undefined) {
- clearTimeout(timeout);
- }
- if (responseRid) {
- core.tryClose(responseRid);
- SetPrototypeDelete(responseBodies, responseRid);
- } else {
- op_http_set_promise_complete(req, status);
- }
}
}
@@ -528,7 +423,6 @@ async function asyncResponse(responseBodies, req, status, stream) {
* This function returns a promise that will only reject in the case of abnormal exit.
*/
function mapToCallback(context, callback, onError) {
- const responseBodies = context.responseBodies;
const signal = context.abortController.signal;
const hasCallback = callback.length > 0;
const hasOneCallback = callback.length === 1;
@@ -591,15 +485,7 @@ function mapToCallback(context, callback, onError) {
}
}
- // Attempt to respond quickly to this request, otherwise extract the stream
- const stream = fastSyncResponseOrStream(req, inner.body);
- if (stream !== null) {
- // Handle the stream asynchronously
- await asyncResponse(responseBodies, req, status, stream);
- } else {
- op_http_set_promise_complete(req, status);
- }
-
+ fastSyncResponseOrStream(req, inner.body, status);
innerRequest?.close();
};
}
@@ -755,10 +641,6 @@ function serveHttpOn(context, callback) {
}
PromisePrototypeCatch(callback(req), promiseErrorHandler);
}
-
- for (const streamRid of new SafeSetIterator(context.responseBodies)) {
- core.tryClose(streamRid);
- }
})();
return {
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 2e9b315ca..60ef83b0f 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -10,7 +10,6 @@ use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
-use crate::response_body::V8StreamHttpResponseBody;
use crate::slab::slab_drop;
use crate::slab::slab_get;
use crate::slab::slab_init;
@@ -30,6 +29,7 @@ use deno_core::task::JoinHandle;
use deno_core::v8;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
+use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
@@ -573,6 +573,7 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
fn set_response(
slab_id: SlabId,
length: Option<usize>,
+ status: u16,
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) {
let mut http = slab_get(slab_id);
@@ -583,7 +584,14 @@ fn set_response(
length,
response.headers_mut(),
);
- response.body_mut().initialize(response_fn(compression))
+ response.body_mut().initialize(response_fn(compression));
+
+ // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
+ // will quitely ignore invalid values.
+ if let Ok(code) = StatusCode::from_u16(status) {
+ *response.status_mut() = code;
+ }
+ http.complete();
}
#[op2(fast)]
@@ -592,6 +600,7 @@ pub fn op_http_set_response_body_resource(
#[smi] slab_id: SlabId,
#[smi] stream_rid: ResourceId,
auto_close: bool,
+ status: u16,
) -> Result<(), AnyError> {
// If the stream is auto_close, we will hold the last ref to it until the response is complete.
let resource = if auto_close {
@@ -603,6 +612,7 @@ pub fn op_http_set_response_body_resource(
set_response(
slab_id,
resource.size_hint().1.map(|s| s as usize),
+ status,
move |compression| {
ResponseBytesInner::from_resource(compression, resource, auto_close)
},
@@ -612,42 +622,34 @@ pub fn op_http_set_response_body_resource(
}
#[op2(fast)]
-#[smi]
-pub fn op_http_set_response_body_stream(
- state: &mut OpState,
- #[smi] slab_id: SlabId,
-) -> Result<ResourceId, AnyError> {
- // TODO(mmastrac): what should this channel size be?
- let (tx, rx) = tokio::sync::mpsc::channel(1);
- set_response(slab_id, None, |compression| {
- ResponseBytesInner::from_v8(compression, rx)
- });
-
- Ok(state.resource_table.add(V8StreamHttpResponseBody::new(tx)))
-}
-
-#[op2(fast)]
pub fn op_http_set_response_body_text(
#[smi] slab_id: SlabId,
#[string] text: String,
+ status: u16,
) {
if !text.is_empty() {
- set_response(slab_id, Some(text.len()), |compression| {
+ set_response(slab_id, Some(text.len()), status, |compression| {
ResponseBytesInner::from_vec(compression, text.into_bytes())
});
+ } else {
+ op_http_set_promise_complete::call(slab_id, status);
}
}
-#[op2(fast)]
+// Skipping `fast` because we prefer an owned buffer here.
+#[op2]
pub fn op_http_set_response_body_bytes(
#[smi] slab_id: SlabId,
- #[buffer] buffer: &[u8],
+ #[buffer] buffer: JsBuffer,
+ status: u16,
) {
if !buffer.is_empty() {
- set_response(slab_id, Some(buffer.len()), |compression| {
- ResponseBytesInner::from_slice(compression, buffer)
+ set_response(slab_id, Some(buffer.len()), status, |compression| {
+ ResponseBytesInner::from_bufview(compression, BufView::from(buffer))
});
- };
+ } else {
+ op_http_set_promise_complete::call(slab_id, status);
+ }
}
#[op2(async)]
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 8060b5a1e..e0c5c89d0 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -115,7 +115,6 @@ deno_core::extension!(
http_next::op_http_set_promise_complete,
http_next::op_http_set_response_body_bytes,
http_next::op_http_set_response_body_resource,
- http_next::op_http_set_response_body_stream,
http_next::op_http_set_response_body_text,
http_next::op_http_set_response_header,
http_next::op_http_set_response_headers,
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
index 3697b2732..bd9d6f433 100644
--- a/ext/http/response_body.rs
+++ b/ext/http/response_body.rs
@@ -1,5 +1,4 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-use std::borrow::Cow;
use std::cell::RefCell;
use std::future::Future;
use std::io::Write;
@@ -11,18 +10,12 @@ use brotli::enc::encode::BrotliEncoderParameter;
use brotli::ffi::compressor::BrotliEncoderState;
use bytes::Bytes;
use bytes::BytesMut;
-use deno_core::error::bad_resource;
use deno_core::error::AnyError;
use deno_core::futures::ready;
use deno_core::futures::FutureExt;
-use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufView;
-use deno_core::CancelHandle;
-use deno_core::CancelTryFuture;
-use deno_core::RcRef;
use deno_core::Resource;
-use deno_core::WriteOutcome;
use flate2::write::GzEncoder;
use http::HeaderMap;
use hyper1::body::Body;
@@ -126,8 +119,8 @@ pub enum Compression {
pub enum ResponseStream {
/// A resource stream, piped in fast mode.
Resource(ResourceBodyAdapter),
- /// A JS-backed stream, written in JS and transported via pipe.
- V8Stream(tokio::sync::mpsc::Receiver<BufView>),
+ #[cfg(test)]
+ TestChannel(tokio::sync::mpsc::Receiver<BufView>),
}
#[derive(Default)]
@@ -217,13 +210,6 @@ impl ResponseBytesInner {
}
}
- pub fn from_v8(
- compression: Compression,
- rx: tokio::sync::mpsc::Receiver<BufView>,
- ) -> Self {
- Self::from_stream(compression, ResponseStream::V8Stream(rx))
- }
-
pub fn from_resource(
compression: Compression,
stm: Rc<dyn Resource>,
@@ -235,12 +221,12 @@ impl ResponseBytesInner {
)
}
- pub fn from_slice(compression: Compression, bytes: &[u8]) -> Self {
+ pub fn from_bufview(compression: Compression, buf: BufView) -> Self {
match compression {
Compression::GZip => {
let mut writer =
GzEncoder::new(Vec::new(), flate2::Compression::fast());
- writer.write_all(bytes).unwrap();
+ writer.write_all(&buf).unwrap();
Self::Bytes(BufView::from(writer.finish().unwrap()))
}
Compression::Brotli => {
@@ -251,11 +237,11 @@ impl ResponseBytesInner {
// (~4MB)
let mut writer =
brotli::CompressorWriter::new(Vec::new(), 65 * 1024, 6, 22);
- writer.write_all(bytes).unwrap();
+ writer.write_all(&buf).unwrap();
writer.flush().unwrap();
Self::Bytes(BufView::from(writer.into_inner()))
}
- _ => Self::Bytes(BufView::from(bytes.to_vec())),
+ _ => Self::Bytes(buf),
}
}
@@ -368,14 +354,16 @@ impl PollFrame for ResponseStream {
) -> std::task::Poll<ResponseStreamResult> {
match &mut *self {
ResponseStream::Resource(res) => Pin::new(res).poll_frame(cx),
- ResponseStream::V8Stream(res) => Pin::new(res).poll_frame(cx),
+ #[cfg(test)]
+ ResponseStream::TestChannel(rx) => Pin::new(rx).poll_frame(cx),
}
}
fn size_hint(&self) -> SizeHint {
match self {
ResponseStream::Resource(res) => res.size_hint(),
- ResponseStream::V8Stream(res) => res.size_hint(),
+ #[cfg(test)]
+ ResponseStream::TestChannel(_) => SizeHint::default(),
}
}
}
@@ -414,6 +402,7 @@ impl PollFrame for ResourceBodyAdapter {
}
}
+#[cfg(test)]
impl PollFrame for tokio::sync::mpsc::Receiver<BufView> {
fn poll_frame(
mut self: Pin<&mut Self>,
@@ -761,52 +750,6 @@ impl PollFrame for BrotliResponseStream {
}
}
-/// A response body object that can be passed to V8. This body will feed byte buffers to a channel which
-/// feed's hyper's HTTP response.
-pub struct V8StreamHttpResponseBody(
- AsyncRefCell<Option<tokio::sync::mpsc::Sender<BufView>>>,
- CancelHandle,
-);
-
-impl V8StreamHttpResponseBody {
- pub fn new(sender: tokio::sync::mpsc::Sender<BufView>) -> Self {
- Self(AsyncRefCell::new(Some(sender)), CancelHandle::default())
- }
-}
-
-impl Resource for V8StreamHttpResponseBody {
- fn name(&self) -> Cow<str> {
- "responseBody".into()
- }
-
- fn write(
- self: Rc<Self>,
- buf: BufView,
- ) -> AsyncResult<deno_core::WriteOutcome> {
- let cancel_handle = RcRef::map(&self, |this| &this.1);
- Box::pin(
- async move {
- let nwritten = buf.len();
-
- let res = RcRef::map(self, |this| &this.0).borrow().await;
- if let Some(tx) = res.as_ref() {
- tx.send(buf)
- .await
- .map_err(|_| bad_resource("failed to write"))?;
- Ok(WriteOutcome::Full { nwritten })
- } else {
- Err(bad_resource("failed to write"))
- }
- }
- .try_or_cancel(cancel_handle),
- )
- }
-
- fn close(self: Rc<Self>) {
- self.1.cancel();
- }
-}
-
#[cfg(test)]
mod tests {
use super::*;
@@ -892,7 +835,7 @@ mod tests {
expected.extend(v);
}
let (tx, rx) = tokio::sync::mpsc::channel(1);
- let underlying = ResponseStream::V8Stream(rx);
+ let underlying = ResponseStream::TestChannel(rx);
let mut resp = GZipResponseStream::new(underlying);
let handle = tokio::task::spawn(async move {
for chunk in v {
@@ -934,7 +877,7 @@ mod tests {
expected.extend(v);
}
let (tx, rx) = tokio::sync::mpsc::channel(1);
- let underlying = ResponseStream::V8Stream(rx);
+ let underlying = ResponseStream::TestChannel(rx);
let mut resp = BrotliResponseStream::new(underlying);
let handle = tokio::task::spawn(async move {
for chunk in v {
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 01f84aa2c..0849d221d 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -1,4 +1,5 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// deno-lint-ignore-file camelcase
// @ts-check
/// <reference path="../webidl/internal.d.ts" />
@@ -7,7 +8,17 @@
/// <reference lib="esnext" />
const core = globalThis.Deno.core;
-const ops = core.ops;
+const internals = globalThis.__bootstrap.internals;
+const {
+ op_arraybuffer_was_detached,
+ op_transfer_arraybuffer,
+ op_readable_stream_resource_allocate,
+ op_readable_stream_resource_get_sink,
+ op_readable_stream_resource_write_error,
+ op_readable_stream_resource_write_buf,
+ op_readable_stream_resource_close,
+ op_readable_stream_resource_await_close,
+} = core.ensureFastOps();
import * as webidl from "ext:deno_webidl/00_webidl.js";
import { structuredClone } from "ext:deno_web/02_structured_clone.js";
import {
@@ -61,6 +72,7 @@ const {
SafeWeakMap,
// TODO(lucacasonato): add SharedArrayBuffer to primordials
// SharedArrayBufferPrototype,
+ String,
Symbol,
SymbolAsyncIterator,
SymbolIterator,
@@ -218,7 +230,7 @@ function isDetachedBuffer(O) {
return false;
}
return ArrayBufferPrototypeGetByteLength(O) === 0 &&
- ops.op_arraybuffer_was_detached(O);
+ op_arraybuffer_was_detached(O);
}
/**
@@ -244,7 +256,7 @@ function canTransferArrayBuffer(O) {
* @returns {ArrayBufferLike}
*/
function transferArrayBuffer(O) {
- return ops.op_transfer_arraybuffer(O);
+ return op_transfer_arraybuffer(O);
}
/**
@@ -695,6 +707,68 @@ function isReadableStreamDisturbed(stream) {
return stream[_disturbed];
}
+/**
+ * Create a new resource that wraps a ReadableStream. The resource will support
+ * read operations, and those read operations will be fed by the output of the
+ * ReadableStream source.
+ * @param {ReadableStream<Uint8Array>} stream
+ * @returns {number}
+ */
+function resourceForReadableStream(stream) {
+ const reader = acquireReadableStreamDefaultReader(stream);
+
+ // Allocate the resource
+ const rid = op_readable_stream_resource_allocate();
+
+ // Close the Reader we get from the ReadableStream when the resource is closed, ignoring any errors
+ PromisePrototypeCatch(
+ PromisePrototypeThen(
+ op_readable_stream_resource_await_close(rid),
+ () => reader.cancel(),
+ ),
+ () => {},
+ );
+
+ // The ops here look like op_write_all/op_close, but we're not actually writing to a
+ // real resource.
+ (async () => {
+ try {
+ // This allocation is freed in the finally block below, guaranteeing it won't leak
+ const sink = op_readable_stream_resource_get_sink(rid);
+ try {
+ while (true) {
+ let value;
+ try {
+ const read = await reader.read();
+ value = read.value;
+ if (read.done) {
+ break;
+ }
+ } catch (err) {
+ const message = err.message;
+ if (message) {
+ await op_readable_stream_resource_write_error(sink, err.message);
+ } else {
+ await op_readable_stream_resource_write_error(sink, String(err));
+ }
+ break;
+ }
+ // If the chunk has non-zero length, write it
+ if (value.length > 0) {
+ await op_readable_stream_resource_write_buf(sink, value);
+ }
+ }
+ } finally {
+ op_readable_stream_resource_close(sink);
+ }
+ } catch (err) {
+ // Something went terribly wrong with this stream -- log and continue
+ console.error("Unexpected internal error on stream", err);
+ }
+ })();
+ return rid;
+}
+
const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
// A finalization registry to clean up underlying resources that are GC'ed.
@@ -6454,6 +6528,8 @@ webidl.converters.StreamPipeOptions = webidl
{ key: "signal", converter: webidl.converters.AbortSignal },
]);
+internals.resourceForReadableStream = resourceForReadableStream;
+
export {
// Non-Public
_state,
@@ -6482,6 +6558,7 @@ export {
ReadableStreamPrototype,
readableStreamTee,
readableStreamThrowIfErrored,
+ resourceForReadableStream,
TransformStream,
TransformStreamDefaultController,
WritableStream,
diff --git a/ext/web/Cargo.toml b/ext/web/Cargo.toml
index dbc2df8c0..b923bc95e 100644
--- a/ext/web/Cargo.toml
+++ b/ext/web/Cargo.toml
@@ -16,9 +16,11 @@ path = "lib.rs"
[dependencies]
async-trait.workspace = true
base64-simd = "0.8"
+bytes.workspace = true
deno_core.workspace = true
encoding_rs.workspace = true
flate2.workspace = true
+futures.workspace = true
serde = "1.0.149"
tokio.workspace = true
uuid = { workspace = true, features = ["serde"] }
diff --git a/ext/web/lib.rs b/ext/web/lib.rs
index 374815804..88937efb2 100644
--- a/ext/web/lib.rs
+++ b/ext/web/lib.rs
@@ -4,6 +4,7 @@ mod blob;
mod compression;
mod hr_timer_lock;
mod message_port;
+mod stream_resource;
mod timers;
use deno_core::error::range_error;
@@ -90,6 +91,12 @@ deno_core::extension!(deno_web,
op_cancel_handle,
op_sleep,
op_transfer_arraybuffer,
+ stream_resource::op_readable_stream_resource_allocate,
+ stream_resource::op_readable_stream_resource_get_sink,
+ stream_resource::op_readable_stream_resource_write_error,
+ stream_resource::op_readable_stream_resource_write_buf,
+ stream_resource::op_readable_stream_resource_close,
+ stream_resource::op_readable_stream_resource_await_close,
],
esm = [
"00_infra.js",
diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs
new file mode 100644
index 000000000..4c2a75648
--- /dev/null
+++ b/ext/web/stream_resource.rs
@@ -0,0 +1,274 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use deno_core::anyhow::Error;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::op2;
+use deno_core::AsyncRefCell;
+use deno_core::AsyncResult;
+use deno_core::BufView;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
+use deno_core::JsBuffer;
+use deno_core::OpState;
+use deno_core::RcLike;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use futures::stream::Peekable;
+use futures::Stream;
+use futures::StreamExt;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::ffi::c_void;
+use std::future::Future;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::task::Context;
+use std::task::Poll;
+use std::task::Waker;
+use tokio::sync::mpsc::Receiver;
+use tokio::sync::mpsc::Sender;
+
+type SenderCell = RefCell<Option<Sender<Result<BufView, Error>>>>;
+
+// This indirection allows us to more easily integrate the fast streams work at a later date
+#[repr(transparent)]
+struct ChannelStreamAdapter<C>(C);
+
+impl<C> Stream for ChannelStreamAdapter<C>
+where
+ C: ChannelBytesRead,
+{
+ type Item = Result<BufView, AnyError>;
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ self.0.poll_recv(cx)
+ }
+}
+
+pub trait ChannelBytesRead: Unpin + 'static {
+ fn poll_recv(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<BufView, AnyError>>>;
+}
+
+impl ChannelBytesRead for tokio::sync::mpsc::Receiver<Result<BufView, Error>> {
+ fn poll_recv(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<BufView, AnyError>>> {
+ self.poll_recv(cx)
+ }
+}
+
+#[allow(clippy::type_complexity)]
+struct ReadableStreamResource {
+ reader: AsyncRefCell<
+ Peekable<ChannelStreamAdapter<Receiver<Result<BufView, Error>>>>,
+ >,
+ cancel_handle: CancelHandle,
+ data: ReadableStreamResourceData,
+}
+
+impl ReadableStreamResource {
+ pub fn cancel_handle(self: &Rc<Self>) -> impl RcLike<CancelHandle> {
+ RcRef::map(self, |s| &s.cancel_handle).clone()
+ }
+
+ async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, AnyError> {
+ let cancel_handle = self.cancel_handle();
+ let peekable = RcRef::map(self, |this| &this.reader);
+ let mut peekable = peekable.borrow_mut().await;
+ match Pin::new(&mut *peekable)
+ .peek_mut()
+ .or_cancel(cancel_handle)
+ .await?
+ {
+ None => Ok(BufView::empty()),
+ // Take the actual error since we only have a reference to it
+ Some(Err(_)) => Err(peekable.next().await.unwrap().err().unwrap()),
+ Some(Ok(bytes)) => {
+ if bytes.len() <= limit {
+ // We can safely take the next item since we peeked it
+ return peekable.next().await.unwrap();
+ }
+ // The remainder of the bytes after we split it is still left in the peek buffer
+ let ret = bytes.split_to(limit);
+ Ok(ret)
+ }
+ }
+ }
+}
+
+impl Resource for ReadableStreamResource {
+ fn name(&self) -> Cow<str> {
+ Cow::Borrowed("readableStream")
+ }
+
+ fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
+ Box::pin(ReadableStreamResource::read(self, limit))
+ }
+}
+
+// TODO(mmastrac): Move this to deno_core
+#[derive(Clone, Debug, Default)]
+pub struct CompletionHandle {
+ inner: Rc<RefCell<CompletionHandleInner>>,
+}
+
+#[derive(Debug, Default)]
+struct CompletionHandleInner {
+ complete: bool,
+ success: bool,
+ waker: Option<Waker>,
+}
+
+impl CompletionHandle {
+ pub fn complete(&self, success: bool) {
+ let mut mut_self = self.inner.borrow_mut();
+ mut_self.complete = true;
+ mut_self.success = success;
+ if let Some(waker) = mut_self.waker.take() {
+ drop(mut_self);
+ waker.wake();
+ }
+ }
+}
+
+impl Future for CompletionHandle {
+ type Output = bool;
+
+ fn poll(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ let mut mut_self = self.inner.borrow_mut();
+ if mut_self.complete {
+ return std::task::Poll::Ready(mut_self.success);
+ }
+
+ mut_self.waker = Some(cx.waker().clone());
+ std::task::Poll::Pending
+ }
+}
+
+fn sender_closed() -> Error {
+ type_error("sender closed")
+}
+
+/// Allocate a resource that wraps a ReadableStream.
+#[op2(fast)]
+#[smi]
+pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId {
+ let (tx, rx) = tokio::sync::mpsc::channel(1);
+ let tx = RefCell::new(Some(tx));
+ let completion = CompletionHandle::default();
+ let tx = Box::new(tx);
+ let resource = ReadableStreamResource {
+ cancel_handle: Default::default(),
+ reader: AsyncRefCell::new(ChannelStreamAdapter(rx).peekable()),
+ data: ReadableStreamResourceData {
+ tx: Box::into_raw(tx),
+ completion,
+ },
+ };
+ state.resource_table.add(resource)
+}
+
+#[op2(fast)]
+pub fn op_readable_stream_resource_get_sink(
+ state: &mut OpState,
+ #[smi] rid: ResourceId,
+) -> *const c_void {
+ let Ok(resource) = state.resource_table.get::<ReadableStreamResource>(rid) else {
+ return std::ptr::null();
+ };
+ resource.data.tx as _
+}
+
+fn get_sender(sender: *const c_void) -> Option<Sender<Result<BufView, Error>>> {
+ // SAFETY: We know this is a valid v8::External
+ unsafe {
+ (sender as *const SenderCell)
+ .as_ref()
+ .and_then(|r| r.borrow_mut().as_ref().cloned())
+ }
+}
+
+fn drop_sender(sender: *const c_void) {
+ // SAFETY: We know this is a valid v8::External
+ unsafe {
+ assert!(!sender.is_null());
+ _ = Box::from_raw(sender as *mut SenderCell);
+ }
+}
+
+#[op2(async)]
+pub fn op_readable_stream_resource_write_buf(
+ sender: *const c_void,
+ #[buffer] buffer: JsBuffer,
+) -> impl Future<Output = Result<(), Error>> {
+ let sender = get_sender(sender);
+ async move {
+ let sender = sender.ok_or_else(sender_closed)?;
+ sender
+ .send(Ok(buffer.into()))
+ .await
+ .map_err(|_| sender_closed())?;
+ Ok(())
+ }
+}
+
+#[op2(async)]
+pub fn op_readable_stream_resource_write_error(
+ sender: *const c_void,
+ #[string] error: String,
+) -> impl Future<Output = Result<(), Error>> {
+ let sender = get_sender(sender);
+ async move {
+ let sender = sender.ok_or_else(sender_closed)?;
+ sender
+ .send(Err(type_error(Cow::Owned(error))))
+ .await
+ .map_err(|_| sender_closed())?;
+ Ok(())
+ }
+}
+
+#[op2(fast)]
+#[smi]
+pub fn op_readable_stream_resource_close(sender: *const c_void) {
+ drop_sender(sender);
+}
+
+#[op2(async)]
+pub fn op_readable_stream_resource_await_close(
+ state: &mut OpState,
+ #[smi] rid: ResourceId,
+) -> impl Future<Output = ()> {
+ let completion = state
+ .resource_table
+ .get::<ReadableStreamResource>(rid)
+ .ok()
+ .map(|r| r.data.completion.clone());
+
+ async move {
+ if let Some(completion) = completion {
+ completion.await;
+ }
+ }
+}
+
+struct ReadableStreamResourceData {
+ tx: *const SenderCell,
+ completion: CompletionHandle,
+}
+
+impl Drop for ReadableStreamResourceData {
+ fn drop(&mut self) {
+ self.completion.complete(true);
+ }
+}