| author    | Matt Mastracci <matthew@mastracci.com>        | 2023-08-17 07:52:37 -0600 |
|-----------|------------------------------------------------|---------------------------|
| committer | GitHub <noreply@github.com>                    | 2023-08-17 07:52:37 -0600 |
| commit    | 23ff0e722e3c4b0827940853c53c5ee2ede5ec9f       |                           |
| tree      | 1521ffd2ac5e803224546cb349b3905925b9b5ff /ext  |                           |
| parent    | 0960e895da1275792c1f38999f6a185c864edb3f       |                           |
feat(ext/web): resourceForReadableStream (#20180)
Extracted from fast streams work.
This adds a resource wrapper for `ReadableStream`, allowing us to treat
all `ReadableStream` instances as resources and to remove the special-cased
stream paths in both `fetch` and `serve`.
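For context, the consumer side in `ext/http` becomes uniform: once the fast byte/string paths don't apply, a plain JS `ReadableStream` is wrapped in a resource and sent down the same op as resource-backed streams. A simplified sketch of that path (the wrapper function name here is hypothetical; the ops and helpers are the ones touched in the diff below, so this is internal glue and not runnable outside the runtime):

```
// Sketch of the new stream path in ext/http/00_serve.js: everything that is
// not raw bytes or a string ends up as a resource-backed body.
function sendStreamResponse(req, stream, status) {
  const resourceBacking = getReadableStreamResourceBacking(stream);
  if (resourceBacking) {
    // Already backed by a resource (e.g. a file or socket): pass it through.
    op_http_set_response_body_resource(
      req,
      resourceBacking.rid,
      resourceBacking.autoClose,
      status,
    );
  } else {
    // Plain JS ReadableStream: wrap it in a resource and treat it identically.
    const rid = resourceForReadableStream(stream);
    op_http_set_response_body_resource(req, rid, /* autoClose */ true, status);
  }
}
```

This single path is what lets the old `asyncResponse` write loop and its `responseBodies` bookkeeping be deleted.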
Responding with a `ReadableStream` body yields a ~18% performance improvement in this benchmark:
```
return new Response(new ReadableStream({
  start(controller) {
    // "hello world"
    controller.enqueue(new Uint8Array([104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100]));
    controller.close();
  },
}));
```
This patch:
```
12:36 $ third_party/prebuilt/mac/wrk http://localhost:8080
Running 10s test @ http://localhost:8080
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    99.96us  100.03us   6.65ms   98.84%
    Req/Sec    47.73k     2.43k   51.02k    89.11%
  959308 requests in 10.10s, 117.10MB read
Requests/sec:  94978.71
Transfer/sec:     11.59MB
```
main:
```
Running 10s test @ http://localhost:8080
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   163.03us  685.51us  19.73ms   99.27%
    Req/Sec    39.50k     3.98k   66.11k    95.52%
  789582 requests in 10.10s, 82.83MB read
Requests/sec:  78182.65
Transfer/sec:      8.20MB
```
Diffstat (limited to 'ext')
-rw-r--r-- | ext/http/00_serve.js       | 154 |
-rw-r--r-- | ext/http/http_next.rs      |  48 |
-rw-r--r-- | ext/http/lib.rs            |   1 |
-rw-r--r-- | ext/http/response_body.rs  |  83 |
-rw-r--r-- | ext/web/06_streams.js      |  83 |
-rw-r--r-- | ext/web/Cargo.toml         |   2 |
-rw-r--r-- | ext/web/lib.rs             |   7 |
-rw-r--r-- | ext/web/stream_resource.rs | 274 |
8 files changed, 419 insertions, 233 deletions
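The core of the patch is the new `resourceForReadableStream` helper in `ext/web/06_streams.js`: allocate a Rust-side resource, tie resource close to reader cancellation, then pump chunks from the stream into the resource's sink. A condensed sketch follows (op names as in the patch; the error-forwarding branch through `op_readable_stream_resource_write_error` is trimmed, so this is illustrative rather than a drop-in copy):

```
function resourceForReadableStream(stream) {
  const reader = acquireReadableStreamDefaultReader(stream);

  // Allocate the Rust-side resource that will expose the stream as reads.
  const rid = op_readable_stream_resource_allocate();

  // If the resource is closed from the Rust side, cancel the JS reader.
  PromisePrototypeCatch(
    PromisePrototypeThen(
      op_readable_stream_resource_await_close(rid),
      () => reader.cancel(),
    ),
    () => {},
  );

  // Pump: read chunks from the ReadableStream and feed them to the sink.
  (async () => {
    const sink = op_readable_stream_resource_get_sink(rid);
    try {
      while (true) {
        const { value, done } = await reader.read();
        if (done) break;
        // Skip empty chunks; everything else is handed to the Rust channel.
        if (value.length > 0) {
          await op_readable_stream_resource_write_buf(sink, value);
        }
      }
    } finally {
      op_readable_stream_resource_close(sink);
    }
  })();

  return rid;
}
```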
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index 3447f48e2..265b79706 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -30,9 +30,9 @@ import { import { Deferred, getReadableStreamResourceBacking, - readableStreamClose, readableStreamForRid, ReadableStreamPrototype, + resourceForReadableStream, } from "ext:deno_web/06_streams.js"; import { listen, TcpConn } from "ext:deno_net/01_net.js"; import { listenTls } from "ext:deno_net/02_tls.js"; @@ -41,10 +41,6 @@ const { Error, ObjectPrototypeIsPrototypeOf, PromisePrototypeCatch, - SafeSet, - SafeSetIterator, - SetPrototypeAdd, - SetPrototypeDelete, Symbol, SymbolFor, TypeError, @@ -61,7 +57,6 @@ const { op_http_set_promise_complete, op_http_set_response_body_bytes, op_http_set_response_body_resource, - op_http_set_response_body_stream, op_http_set_response_body_text, op_http_set_response_header, op_http_set_response_headers, @@ -339,7 +334,6 @@ class InnerRequest { class CallbackContext { abortController; - responseBodies; scheme; fallbackHost; serverRid; @@ -352,7 +346,6 @@ class CallbackContext { { once: true }, ); this.abortController = new AbortController(); - this.responseBodies = new SafeSet(); this.serverRid = args[0]; this.scheme = args[1]; this.fallbackHost = args[2]; @@ -379,23 +372,24 @@ class ServeHandlerInfo { } } -function fastSyncResponseOrStream(req, respBody) { +function fastSyncResponseOrStream(req, respBody, status) { if (respBody === null || respBody === undefined) { // Don't set the body - return null; + op_http_set_promise_complete(req, status); + return; } const stream = respBody.streamOrStatic; const body = stream.body; if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) { - op_http_set_response_body_bytes(req, body); - return null; + op_http_set_response_body_bytes(req, body, status); + return; } if (typeof body === "string") { - op_http_set_response_body_text(req, body); - return null; + op_http_set_response_body_text(req, body, status); + return; } // At this point in the response it needs to be a stream @@ -408,115 +402,16 @@ function fastSyncResponseOrStream(req, respBody) { req, resourceBacking.rid, resourceBacking.autoClose, + status, + ); + } else { + const rid = resourceForReadableStream(stream); + op_http_set_response_body_resource( + req, + rid, + true, + status, ); - return null; - } - - return stream; -} - -async function asyncResponse(responseBodies, req, status, stream) { - const reader = stream.getReader(); - let responseRid; - let closed = false; - let timeout; - - try { - // IMPORTANT: We get a performance boost from this optimization, but V8 is very - // sensitive to the order and structure. Benchmark any changes to this code. - - // Optimize for streams that are done in zero or one packets. We will not - // have to allocate a resource in this case. - const { value: value1, done: done1 } = await reader.read(); - if (done1) { - closed = true; - // Exit 1: no response body at all, extreme fast path - // Reader will be closed by finally block - return; - } - - // The second value cannot block indefinitely, as someone may be waiting on a response - // of the first packet that may influence this packet. We set this timeout arbitrarily to 250ms - // and we race it. 
- let timeoutPromise; - timeout = setTimeout(() => { - responseRid = op_http_set_response_body_stream(req); - SetPrototypeAdd(responseBodies, responseRid); - op_http_set_promise_complete(req, status); - // TODO(mmastrac): if this promise fails before we get to the await below, it crashes - // the process with an error: - // - // 'Uncaught (in promise) BadResource: failed to write'. - // - // To avoid this, we're going to swallow errors here and allow the code later in the - // file to re-throw them in a way that doesn't appear to be an uncaught promise rejection. - timeoutPromise = PromisePrototypeCatch( - core.writeAll(responseRid, value1), - () => null, - ); - }, 250); - const { value: value2, done: done2 } = await reader.read(); - - if (timeoutPromise) { - await timeoutPromise; - if (done2) { - closed = true; - // Exit 2(a): read 2 is EOS, and timeout resolved. - // Reader will be closed by finally block - // Response stream will be closed by finally block. - return; - } - - // Timeout resolved, value1 written but read2 is not EOS. Carry value2 forward. - } else { - clearTimeout(timeout); - timeout = undefined; - - if (done2) { - // Exit 2(b): read 2 is EOS, and timeout did not resolve as we read fast enough. - // Reader will be closed by finally block - // No response stream - closed = true; - op_http_set_response_body_bytes(req, value1); - return; - } - - responseRid = op_http_set_response_body_stream(req); - SetPrototypeAdd(responseBodies, responseRid); - op_http_set_promise_complete(req, status); - // Write our first packet - await core.writeAll(responseRid, value1); - } - - await core.writeAll(responseRid, value2); - while (true) { - const { value, done } = await reader.read(); - if (done) { - closed = true; - break; - } - await core.writeAll(responseRid, value); - } - } catch (error) { - closed = true; - try { - await reader.cancel(error); - } catch { - // Pass - } - } finally { - if (!closed) { - readableStreamClose(reader); - } - if (timeout !== undefined) { - clearTimeout(timeout); - } - if (responseRid) { - core.tryClose(responseRid); - SetPrototypeDelete(responseBodies, responseRid); - } else { - op_http_set_promise_complete(req, status); - } } } @@ -528,7 +423,6 @@ async function asyncResponse(responseBodies, req, status, stream) { * This function returns a promise that will only reject in the case of abnormal exit. 
*/ function mapToCallback(context, callback, onError) { - const responseBodies = context.responseBodies; const signal = context.abortController.signal; const hasCallback = callback.length > 0; const hasOneCallback = callback.length === 1; @@ -591,15 +485,7 @@ function mapToCallback(context, callback, onError) { } } - // Attempt to respond quickly to this request, otherwise extract the stream - const stream = fastSyncResponseOrStream(req, inner.body); - if (stream !== null) { - // Handle the stream asynchronously - await asyncResponse(responseBodies, req, status, stream); - } else { - op_http_set_promise_complete(req, status); - } - + fastSyncResponseOrStream(req, inner.body, status); innerRequest?.close(); }; } @@ -755,10 +641,6 @@ function serveHttpOn(context, callback) { } PromisePrototypeCatch(callback(req), promiseErrorHandler); } - - for (const streamRid of new SafeSetIterator(context.responseBodies)) { - core.tryClose(streamRid); - } })(); return { diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 2e9b315ca..60ef83b0f 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -10,7 +10,6 @@ use crate::request_properties::HttpPropertyExtractor; use crate::response_body::Compression; use crate::response_body::ResponseBytes; use crate::response_body::ResponseBytesInner; -use crate::response_body::V8StreamHttpResponseBody; use crate::slab::slab_drop; use crate::slab::slab_get; use crate::slab::slab_init; @@ -30,6 +29,7 @@ use deno_core::task::JoinHandle; use deno_core::v8; use deno_core::AsyncRefCell; use deno_core::AsyncResult; +use deno_core::BufView; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; @@ -573,6 +573,7 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) { fn set_response( slab_id: SlabId, length: Option<usize>, + status: u16, response_fn: impl FnOnce(Compression) -> ResponseBytesInner, ) { let mut http = slab_get(slab_id); @@ -583,7 +584,14 @@ fn set_response( length, response.headers_mut(), ); - response.body_mut().initialize(response_fn(compression)) + response.body_mut().initialize(response_fn(compression)); + + // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we + // will quitely ignore invalid values. + if let Ok(code) = StatusCode::from_u16(status) { + *response.status_mut() = code; + } + http.complete(); } #[op2(fast)] @@ -592,6 +600,7 @@ pub fn op_http_set_response_body_resource( #[smi] slab_id: SlabId, #[smi] stream_rid: ResourceId, auto_close: bool, + status: u16, ) -> Result<(), AnyError> { // If the stream is auto_close, we will hold the last ref to it until the response is complete. let resource = if auto_close { @@ -603,6 +612,7 @@ pub fn op_http_set_response_body_resource( set_response( slab_id, resource.size_hint().1.map(|s| s as usize), + status, move |compression| { ResponseBytesInner::from_resource(compression, resource, auto_close) }, @@ -612,42 +622,34 @@ pub fn op_http_set_response_body_resource( } #[op2(fast)] -#[smi] -pub fn op_http_set_response_body_stream( - state: &mut OpState, - #[smi] slab_id: SlabId, -) -> Result<ResourceId, AnyError> { - // TODO(mmastrac): what should this channel size be? 
- let (tx, rx) = tokio::sync::mpsc::channel(1); - set_response(slab_id, None, |compression| { - ResponseBytesInner::from_v8(compression, rx) - }); - - Ok(state.resource_table.add(V8StreamHttpResponseBody::new(tx))) -} - -#[op2(fast)] pub fn op_http_set_response_body_text( #[smi] slab_id: SlabId, #[string] text: String, + status: u16, ) { if !text.is_empty() { - set_response(slab_id, Some(text.len()), |compression| { + set_response(slab_id, Some(text.len()), status, |compression| { ResponseBytesInner::from_vec(compression, text.into_bytes()) }); + } else { + op_http_set_promise_complete::call(slab_id, status); } } -#[op2(fast)] +// Skipping `fast` because we prefer an owned buffer here. +#[op2] pub fn op_http_set_response_body_bytes( #[smi] slab_id: SlabId, - #[buffer] buffer: &[u8], + #[buffer] buffer: JsBuffer, + status: u16, ) { if !buffer.is_empty() { - set_response(slab_id, Some(buffer.len()), |compression| { - ResponseBytesInner::from_slice(compression, buffer) + set_response(slab_id, Some(buffer.len()), status, |compression| { + ResponseBytesInner::from_bufview(compression, BufView::from(buffer)) }); - }; + } else { + op_http_set_promise_complete::call(slab_id, status); + } } #[op2(async)] diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 8060b5a1e..e0c5c89d0 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -115,7 +115,6 @@ deno_core::extension!( http_next::op_http_set_promise_complete, http_next::op_http_set_response_body_bytes, http_next::op_http_set_response_body_resource, - http_next::op_http_set_response_body_stream, http_next::op_http_set_response_body_text, http_next::op_http_set_response_header, http_next::op_http_set_response_headers, diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs index 3697b2732..bd9d6f433 100644 --- a/ext/http/response_body.rs +++ b/ext/http/response_body.rs @@ -1,5 +1,4 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use std::borrow::Cow; use std::cell::RefCell; use std::future::Future; use std::io::Write; @@ -11,18 +10,12 @@ use brotli::enc::encode::BrotliEncoderParameter; use brotli::ffi::compressor::BrotliEncoderState; use bytes::Bytes; use bytes::BytesMut; -use deno_core::error::bad_resource; use deno_core::error::AnyError; use deno_core::futures::ready; use deno_core::futures::FutureExt; -use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::BufView; -use deno_core::CancelHandle; -use deno_core::CancelTryFuture; -use deno_core::RcRef; use deno_core::Resource; -use deno_core::WriteOutcome; use flate2::write::GzEncoder; use http::HeaderMap; use hyper1::body::Body; @@ -126,8 +119,8 @@ pub enum Compression { pub enum ResponseStream { /// A resource stream, piped in fast mode. Resource(ResourceBodyAdapter), - /// A JS-backed stream, written in JS and transported via pipe. 
- V8Stream(tokio::sync::mpsc::Receiver<BufView>), + #[cfg(test)] + TestChannel(tokio::sync::mpsc::Receiver<BufView>), } #[derive(Default)] @@ -217,13 +210,6 @@ impl ResponseBytesInner { } } - pub fn from_v8( - compression: Compression, - rx: tokio::sync::mpsc::Receiver<BufView>, - ) -> Self { - Self::from_stream(compression, ResponseStream::V8Stream(rx)) - } - pub fn from_resource( compression: Compression, stm: Rc<dyn Resource>, @@ -235,12 +221,12 @@ impl ResponseBytesInner { ) } - pub fn from_slice(compression: Compression, bytes: &[u8]) -> Self { + pub fn from_bufview(compression: Compression, buf: BufView) -> Self { match compression { Compression::GZip => { let mut writer = GzEncoder::new(Vec::new(), flate2::Compression::fast()); - writer.write_all(bytes).unwrap(); + writer.write_all(&buf).unwrap(); Self::Bytes(BufView::from(writer.finish().unwrap())) } Compression::Brotli => { @@ -251,11 +237,11 @@ impl ResponseBytesInner { // (~4MB) let mut writer = brotli::CompressorWriter::new(Vec::new(), 65 * 1024, 6, 22); - writer.write_all(bytes).unwrap(); + writer.write_all(&buf).unwrap(); writer.flush().unwrap(); Self::Bytes(BufView::from(writer.into_inner())) } - _ => Self::Bytes(BufView::from(bytes.to_vec())), + _ => Self::Bytes(buf), } } @@ -368,14 +354,16 @@ impl PollFrame for ResponseStream { ) -> std::task::Poll<ResponseStreamResult> { match &mut *self { ResponseStream::Resource(res) => Pin::new(res).poll_frame(cx), - ResponseStream::V8Stream(res) => Pin::new(res).poll_frame(cx), + #[cfg(test)] + ResponseStream::TestChannel(rx) => Pin::new(rx).poll_frame(cx), } } fn size_hint(&self) -> SizeHint { match self { ResponseStream::Resource(res) => res.size_hint(), - ResponseStream::V8Stream(res) => res.size_hint(), + #[cfg(test)] + ResponseStream::TestChannel(_) => SizeHint::default(), } } } @@ -414,6 +402,7 @@ impl PollFrame for ResourceBodyAdapter { } } +#[cfg(test)] impl PollFrame for tokio::sync::mpsc::Receiver<BufView> { fn poll_frame( mut self: Pin<&mut Self>, @@ -761,52 +750,6 @@ impl PollFrame for BrotliResponseStream { } } -/// A response body object that can be passed to V8. This body will feed byte buffers to a channel which -/// feed's hyper's HTTP response. 
-pub struct V8StreamHttpResponseBody( - AsyncRefCell<Option<tokio::sync::mpsc::Sender<BufView>>>, - CancelHandle, -); - -impl V8StreamHttpResponseBody { - pub fn new(sender: tokio::sync::mpsc::Sender<BufView>) -> Self { - Self(AsyncRefCell::new(Some(sender)), CancelHandle::default()) - } -} - -impl Resource for V8StreamHttpResponseBody { - fn name(&self) -> Cow<str> { - "responseBody".into() - } - - fn write( - self: Rc<Self>, - buf: BufView, - ) -> AsyncResult<deno_core::WriteOutcome> { - let cancel_handle = RcRef::map(&self, |this| &this.1); - Box::pin( - async move { - let nwritten = buf.len(); - - let res = RcRef::map(self, |this| &this.0).borrow().await; - if let Some(tx) = res.as_ref() { - tx.send(buf) - .await - .map_err(|_| bad_resource("failed to write"))?; - Ok(WriteOutcome::Full { nwritten }) - } else { - Err(bad_resource("failed to write")) - } - } - .try_or_cancel(cancel_handle), - ) - } - - fn close(self: Rc<Self>) { - self.1.cancel(); - } -} - #[cfg(test)] mod tests { use super::*; @@ -892,7 +835,7 @@ mod tests { expected.extend(v); } let (tx, rx) = tokio::sync::mpsc::channel(1); - let underlying = ResponseStream::V8Stream(rx); + let underlying = ResponseStream::TestChannel(rx); let mut resp = GZipResponseStream::new(underlying); let handle = tokio::task::spawn(async move { for chunk in v { @@ -934,7 +877,7 @@ mod tests { expected.extend(v); } let (tx, rx) = tokio::sync::mpsc::channel(1); - let underlying = ResponseStream::V8Stream(rx); + let underlying = ResponseStream::TestChannel(rx); let mut resp = BrotliResponseStream::new(underlying); let handle = tokio::task::spawn(async move { for chunk in v { diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 01f84aa2c..0849d221d 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -1,4 +1,5 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// deno-lint-ignore-file camelcase // @ts-check /// <reference path="../webidl/internal.d.ts" /> @@ -7,7 +8,17 @@ /// <reference lib="esnext" /> const core = globalThis.Deno.core; -const ops = core.ops; +const internals = globalThis.__bootstrap.internals; +const { + op_arraybuffer_was_detached, + op_transfer_arraybuffer, + op_readable_stream_resource_allocate, + op_readable_stream_resource_get_sink, + op_readable_stream_resource_write_error, + op_readable_stream_resource_write_buf, + op_readable_stream_resource_close, + op_readable_stream_resource_await_close, +} = core.ensureFastOps(); import * as webidl from "ext:deno_webidl/00_webidl.js"; import { structuredClone } from "ext:deno_web/02_structured_clone.js"; import { @@ -61,6 +72,7 @@ const { SafeWeakMap, // TODO(lucacasonato): add SharedArrayBuffer to primordials // SharedArrayBufferPrototype, + String, Symbol, SymbolAsyncIterator, SymbolIterator, @@ -218,7 +230,7 @@ function isDetachedBuffer(O) { return false; } return ArrayBufferPrototypeGetByteLength(O) === 0 && - ops.op_arraybuffer_was_detached(O); + op_arraybuffer_was_detached(O); } /** @@ -244,7 +256,7 @@ function canTransferArrayBuffer(O) { * @returns {ArrayBufferLike} */ function transferArrayBuffer(O) { - return ops.op_transfer_arraybuffer(O); + return op_transfer_arraybuffer(O); } /** @@ -695,6 +707,68 @@ function isReadableStreamDisturbed(stream) { return stream[_disturbed]; } +/** + * Create a new resource that wraps a ReadableStream. The resource will support + * read operations, and those read operations will be fed by the output of the + * ReadableStream source. 
+ * @param {ReadableStream<Uint8Array>} stream + * @returns {number} + */ +function resourceForReadableStream(stream) { + const reader = acquireReadableStreamDefaultReader(stream); + + // Allocate the resource + const rid = op_readable_stream_resource_allocate(); + + // Close the Reader we get from the ReadableStream when the resource is closed, ignoring any errors + PromisePrototypeCatch( + PromisePrototypeThen( + op_readable_stream_resource_await_close(rid), + () => reader.cancel(), + ), + () => {}, + ); + + // The ops here look like op_write_all/op_close, but we're not actually writing to a + // real resource. + (async () => { + try { + // This allocation is freed in the finally block below, guaranteeing it won't leak + const sink = op_readable_stream_resource_get_sink(rid); + try { + while (true) { + let value; + try { + const read = await reader.read(); + value = read.value; + if (read.done) { + break; + } + } catch (err) { + const message = err.message; + if (message) { + await op_readable_stream_resource_write_error(sink, err.message); + } else { + await op_readable_stream_resource_write_error(sink, String(err)); + } + break; + } + // If the chunk has non-zero length, write it + if (value.length > 0) { + await op_readable_stream_resource_write_buf(sink, value); + } + } + } finally { + op_readable_stream_resource_close(sink); + } + } catch (err) { + // Something went terribly wrong with this stream -- log and continue + console.error("Unexpected internal error on stream", err); + } + })(); + return rid; +} + const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB // A finalization registry to clean up underlying resources that are GC'ed. @@ -6454,6 +6528,8 @@ webidl.converters.StreamPipeOptions = webidl { key: "signal", converter: webidl.converters.AbortSignal }, ]); +internals.resourceForReadableStream = resourceForReadableStream; + export { // Non-Public _state, @@ -6482,6 +6558,7 @@ export { ReadableStreamPrototype, readableStreamTee, readableStreamThrowIfErrored, + resourceForReadableStream, TransformStream, TransformStreamDefaultController, WritableStream, diff --git a/ext/web/Cargo.toml b/ext/web/Cargo.toml index dbc2df8c0..b923bc95e 100644 --- a/ext/web/Cargo.toml +++ b/ext/web/Cargo.toml @@ -16,9 +16,11 @@ path = "lib.rs" [dependencies] async-trait.workspace = true base64-simd = "0.8" +bytes.workspace = true deno_core.workspace = true encoding_rs.workspace = true flate2.workspace = true +futures.workspace = true serde = "1.0.149" tokio.workspace = true uuid = { workspace = true, features = ["serde"] } diff --git a/ext/web/lib.rs b/ext/web/lib.rs index 374815804..88937efb2 100644 --- a/ext/web/lib.rs +++ b/ext/web/lib.rs @@ -4,6 +4,7 @@ mod blob; mod compression; mod hr_timer_lock; mod message_port; +mod stream_resource; mod timers; use deno_core::error::range_error; @@ -90,6 +91,12 @@ deno_core::extension!(deno_web, op_cancel_handle, op_sleep, op_transfer_arraybuffer, + stream_resource::op_readable_stream_resource_allocate, + stream_resource::op_readable_stream_resource_get_sink, + stream_resource::op_readable_stream_resource_write_error, + stream_resource::op_readable_stream_resource_write_buf, + stream_resource::op_readable_stream_resource_close, + stream_resource::op_readable_stream_resource_await_close, ], esm = [ "00_infra.js", diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs new file mode 100644 index 000000000..4c2a75648 --- /dev/null +++ b/ext/web/stream_resource.rs @@ -0,0 +1,274 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. 
MIT license. +use deno_core::anyhow::Error; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::op2; +use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::BufView; +use deno_core::CancelFuture; +use deno_core::CancelHandle; +use deno_core::JsBuffer; +use deno_core::OpState; +use deno_core::RcLike; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use futures::stream::Peekable; +use futures::Stream; +use futures::StreamExt; +use std::borrow::Cow; +use std::cell::RefCell; +use std::ffi::c_void; +use std::future::Future; +use std::pin::Pin; +use std::rc::Rc; +use std::task::Context; +use std::task::Poll; +use std::task::Waker; +use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::Sender; + +type SenderCell = RefCell<Option<Sender<Result<BufView, Error>>>>; + +// This indirection allows us to more easily integrate the fast streams work at a later date +#[repr(transparent)] +struct ChannelStreamAdapter<C>(C); + +impl<C> Stream for ChannelStreamAdapter<C> +where + C: ChannelBytesRead, +{ + type Item = Result<BufView, AnyError>; + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + self.0.poll_recv(cx) + } +} + +pub trait ChannelBytesRead: Unpin + 'static { + fn poll_recv( + &mut self, + cx: &mut Context<'_>, + ) -> Poll<Option<Result<BufView, AnyError>>>; +} + +impl ChannelBytesRead for tokio::sync::mpsc::Receiver<Result<BufView, Error>> { + fn poll_recv( + &mut self, + cx: &mut Context<'_>, + ) -> Poll<Option<Result<BufView, AnyError>>> { + self.poll_recv(cx) + } +} + +#[allow(clippy::type_complexity)] +struct ReadableStreamResource { + reader: AsyncRefCell< + Peekable<ChannelStreamAdapter<Receiver<Result<BufView, Error>>>>, + >, + cancel_handle: CancelHandle, + data: ReadableStreamResourceData, +} + +impl ReadableStreamResource { + pub fn cancel_handle(self: &Rc<Self>) -> impl RcLike<CancelHandle> { + RcRef::map(self, |s| &s.cancel_handle).clone() + } + + async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, AnyError> { + let cancel_handle = self.cancel_handle(); + let peekable = RcRef::map(self, |this| &this.reader); + let mut peekable = peekable.borrow_mut().await; + match Pin::new(&mut *peekable) + .peek_mut() + .or_cancel(cancel_handle) + .await? 
+ { + None => Ok(BufView::empty()), + // Take the actual error since we only have a reference to it + Some(Err(_)) => Err(peekable.next().await.unwrap().err().unwrap()), + Some(Ok(bytes)) => { + if bytes.len() <= limit { + // We can safely take the next item since we peeked it + return peekable.next().await.unwrap(); + } + // The remainder of the bytes after we split it is still left in the peek buffer + let ret = bytes.split_to(limit); + Ok(ret) + } + } + } +} + +impl Resource for ReadableStreamResource { + fn name(&self) -> Cow<str> { + Cow::Borrowed("readableStream") + } + + fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> { + Box::pin(ReadableStreamResource::read(self, limit)) + } +} + +// TODO(mmastrac): Move this to deno_core +#[derive(Clone, Debug, Default)] +pub struct CompletionHandle { + inner: Rc<RefCell<CompletionHandleInner>>, +} + +#[derive(Debug, Default)] +struct CompletionHandleInner { + complete: bool, + success: bool, + waker: Option<Waker>, +} + +impl CompletionHandle { + pub fn complete(&self, success: bool) { + let mut mut_self = self.inner.borrow_mut(); + mut_self.complete = true; + mut_self.success = success; + if let Some(waker) = mut_self.waker.take() { + drop(mut_self); + waker.wake(); + } + } +} + +impl Future for CompletionHandle { + type Output = bool; + + fn poll( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Self::Output> { + let mut mut_self = self.inner.borrow_mut(); + if mut_self.complete { + return std::task::Poll::Ready(mut_self.success); + } + + mut_self.waker = Some(cx.waker().clone()); + std::task::Poll::Pending + } +} + +fn sender_closed() -> Error { + type_error("sender closed") +} + +/// Allocate a resource that wraps a ReadableStream. +#[op2(fast)] +#[smi] +pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId { + let (tx, rx) = tokio::sync::mpsc::channel(1); + let tx = RefCell::new(Some(tx)); + let completion = CompletionHandle::default(); + let tx = Box::new(tx); + let resource = ReadableStreamResource { + cancel_handle: Default::default(), + reader: AsyncRefCell::new(ChannelStreamAdapter(rx).peekable()), + data: ReadableStreamResourceData { + tx: Box::into_raw(tx), + completion, + }, + }; + state.resource_table.add(resource) +} + +#[op2(fast)] +pub fn op_readable_stream_resource_get_sink( + state: &mut OpState, + #[smi] rid: ResourceId, +) -> *const c_void { + let Ok(resource) = state.resource_table.get::<ReadableStreamResource>(rid) else { + return std::ptr::null(); + }; + resource.data.tx as _ +} + +fn get_sender(sender: *const c_void) -> Option<Sender<Result<BufView, Error>>> { + // SAFETY: We know this is a valid v8::External + unsafe { + (sender as *const SenderCell) + .as_ref() + .and_then(|r| r.borrow_mut().as_ref().cloned()) + } +} + +fn drop_sender(sender: *const c_void) { + // SAFETY: We know this is a valid v8::External + unsafe { + assert!(!sender.is_null()); + _ = Box::from_raw(sender as *mut SenderCell); + } +} + +#[op2(async)] +pub fn op_readable_stream_resource_write_buf( + sender: *const c_void, + #[buffer] buffer: JsBuffer, +) -> impl Future<Output = Result<(), Error>> { + let sender = get_sender(sender); + async move { + let sender = sender.ok_or_else(sender_closed)?; + sender + .send(Ok(buffer.into())) + .await + .map_err(|_| sender_closed())?; + Ok(()) + } +} + +#[op2(async)] +pub fn op_readable_stream_resource_write_error( + sender: *const c_void, + #[string] error: String, +) -> impl Future<Output = Result<(), Error>> { + let sender = 
get_sender(sender); + async move { + let sender = sender.ok_or_else(sender_closed)?; + sender + .send(Err(type_error(Cow::Owned(error)))) + .await + .map_err(|_| sender_closed())?; + Ok(()) + } +} + +#[op2(fast)] +#[smi] +pub fn op_readable_stream_resource_close(sender: *const c_void) { + drop_sender(sender); +} + +#[op2(async)] +pub fn op_readable_stream_resource_await_close( + state: &mut OpState, + #[smi] rid: ResourceId, +) -> impl Future<Output = ()> { + let completion = state + .resource_table + .get::<ReadableStreamResource>(rid) + .ok() + .map(|r| r.data.completion.clone()); + + async move { + if let Some(completion) = completion { + completion.await; + } + } +} + +struct ReadableStreamResourceData { + tx: *const SenderCell, + completion: CompletionHandle, +} + +impl Drop for ReadableStreamResourceData { + fn drop(&mut self) { + self.completion.complete(true); + } +} |
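The helper is also attached to the bootstrap internals object, so the round trip can be exercised directly from the runtime's internal context. A hedged sketch of such a check, assuming access to `Deno.core`'s generic `read`/`close` helpers as used elsewhere in the extension JS (the byte count in the comment is illustrative):

```
// Hypothetical internal sketch (not a user-facing API): wrap a stream in a
// resource, then read it back through the resource table.
const core = globalThis.Deno.core;
const internals = globalThis.__bootstrap.internals;

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode("hello world"));
    controller.close();
  },
});

const rid = internals.resourceForReadableStream(stream);
const buf = new Uint8Array(32);
const nread = await core.read(rid, buf); // expected: 11 bytes, "hello world"
core.close(rid);
```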