diff options
Diffstat (limited to 'ext/web')
-rw-r--r-- | ext/web/06_streams.js | 83 | ||||
-rw-r--r-- | ext/web/Cargo.toml | 2 | ||||
-rw-r--r-- | ext/web/lib.rs | 7 | ||||
-rw-r--r-- | ext/web/stream_resource.rs | 274 |
4 files changed, 363 insertions, 3 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 01f84aa2c..0849d221d 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -1,4 +1,5 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// deno-lint-ignore-file camelcase // @ts-check /// <reference path="../webidl/internal.d.ts" /> @@ -7,7 +8,17 @@ /// <reference lib="esnext" /> const core = globalThis.Deno.core; -const ops = core.ops; +const internals = globalThis.__bootstrap.internals; +const { + op_arraybuffer_was_detached, + op_transfer_arraybuffer, + op_readable_stream_resource_allocate, + op_readable_stream_resource_get_sink, + op_readable_stream_resource_write_error, + op_readable_stream_resource_write_buf, + op_readable_stream_resource_close, + op_readable_stream_resource_await_close, +} = core.ensureFastOps(); import * as webidl from "ext:deno_webidl/00_webidl.js"; import { structuredClone } from "ext:deno_web/02_structured_clone.js"; import { @@ -61,6 +72,7 @@ const { SafeWeakMap, // TODO(lucacasonato): add SharedArrayBuffer to primordials // SharedArrayBufferPrototype, + String, Symbol, SymbolAsyncIterator, SymbolIterator, @@ -218,7 +230,7 @@ function isDetachedBuffer(O) { return false; } return ArrayBufferPrototypeGetByteLength(O) === 0 && - ops.op_arraybuffer_was_detached(O); + op_arraybuffer_was_detached(O); } /** @@ -244,7 +256,7 @@ function canTransferArrayBuffer(O) { * @returns {ArrayBufferLike} */ function transferArrayBuffer(O) { - return ops.op_transfer_arraybuffer(O); + return op_transfer_arraybuffer(O); } /** @@ -695,6 +707,68 @@ function isReadableStreamDisturbed(stream) { return stream[_disturbed]; } +/** + * Create a new resource that wraps a ReadableStream. The resource will support + * read operations, and those read operations will be fed by the output of the + * ReadableStream source. + * @param {ReadableStream<Uint8Array>} stream + * @returns {number} + */ +function resourceForReadableStream(stream) { + const reader = acquireReadableStreamDefaultReader(stream); + + // Allocate the resource + const rid = op_readable_stream_resource_allocate(); + + // Close the Reader we get from the ReadableStream when the resource is closed, ignoring any errors + PromisePrototypeCatch( + PromisePrototypeThen( + op_readable_stream_resource_await_close(rid), + () => reader.cancel(), + ), + () => {}, + ); + + // The ops here look like op_write_all/op_close, but we're not actually writing to a + // real resource. + (async () => { + try { + // This allocation is freed in the finally block below, guaranteeing it won't leak + const sink = op_readable_stream_resource_get_sink(rid); + try { + while (true) { + let value; + try { + const read = await reader.read(); + value = read.value; + if (read.done) { + break; + } + } catch (err) { + const message = err.message; + if (message) { + await op_readable_stream_resource_write_error(sink, err.message); + } else { + await op_readable_stream_resource_write_error(sink, String(err)); + } + break; + } + // If the chunk has non-zero length, write it + if (value.length > 0) { + await op_readable_stream_resource_write_buf(sink, value); + } + } + } finally { + op_readable_stream_resource_close(sink); + } + } catch (err) { + // Something went terribly wrong with this stream -- log and continue + console.error("Unexpected internal error on stream", err); + } + })(); + return rid; +} + const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB // A finalization registry to clean up underlying resources that are GC'ed. @@ -6454,6 +6528,8 @@ webidl.converters.StreamPipeOptions = webidl { key: "signal", converter: webidl.converters.AbortSignal }, ]); +internals.resourceForReadableStream = resourceForReadableStream; + export { // Non-Public _state, @@ -6482,6 +6558,7 @@ export { ReadableStreamPrototype, readableStreamTee, readableStreamThrowIfErrored, + resourceForReadableStream, TransformStream, TransformStreamDefaultController, WritableStream, diff --git a/ext/web/Cargo.toml b/ext/web/Cargo.toml index dbc2df8c0..b923bc95e 100644 --- a/ext/web/Cargo.toml +++ b/ext/web/Cargo.toml @@ -16,9 +16,11 @@ path = "lib.rs" [dependencies] async-trait.workspace = true base64-simd = "0.8" +bytes.workspace = true deno_core.workspace = true encoding_rs.workspace = true flate2.workspace = true +futures.workspace = true serde = "1.0.149" tokio.workspace = true uuid = { workspace = true, features = ["serde"] } diff --git a/ext/web/lib.rs b/ext/web/lib.rs index 374815804..88937efb2 100644 --- a/ext/web/lib.rs +++ b/ext/web/lib.rs @@ -4,6 +4,7 @@ mod blob; mod compression; mod hr_timer_lock; mod message_port; +mod stream_resource; mod timers; use deno_core::error::range_error; @@ -90,6 +91,12 @@ deno_core::extension!(deno_web, op_cancel_handle, op_sleep, op_transfer_arraybuffer, + stream_resource::op_readable_stream_resource_allocate, + stream_resource::op_readable_stream_resource_get_sink, + stream_resource::op_readable_stream_resource_write_error, + stream_resource::op_readable_stream_resource_write_buf, + stream_resource::op_readable_stream_resource_close, + stream_resource::op_readable_stream_resource_await_close, ], esm = [ "00_infra.js", diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs new file mode 100644 index 000000000..4c2a75648 --- /dev/null +++ b/ext/web/stream_resource.rs @@ -0,0 +1,274 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use deno_core::anyhow::Error; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::op2; +use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::BufView; +use deno_core::CancelFuture; +use deno_core::CancelHandle; +use deno_core::JsBuffer; +use deno_core::OpState; +use deno_core::RcLike; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use futures::stream::Peekable; +use futures::Stream; +use futures::StreamExt; +use std::borrow::Cow; +use std::cell::RefCell; +use std::ffi::c_void; +use std::future::Future; +use std::pin::Pin; +use std::rc::Rc; +use std::task::Context; +use std::task::Poll; +use std::task::Waker; +use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::Sender; + +type SenderCell = RefCell<Option<Sender<Result<BufView, Error>>>>; + +// This indirection allows us to more easily integrate the fast streams work at a later date +#[repr(transparent)] +struct ChannelStreamAdapter<C>(C); + +impl<C> Stream for ChannelStreamAdapter<C> +where + C: ChannelBytesRead, +{ + type Item = Result<BufView, AnyError>; + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + self.0.poll_recv(cx) + } +} + +pub trait ChannelBytesRead: Unpin + 'static { + fn poll_recv( + &mut self, + cx: &mut Context<'_>, + ) -> Poll<Option<Result<BufView, AnyError>>>; +} + +impl ChannelBytesRead for tokio::sync::mpsc::Receiver<Result<BufView, Error>> { + fn poll_recv( + &mut self, + cx: &mut Context<'_>, + ) -> Poll<Option<Result<BufView, AnyError>>> { + self.poll_recv(cx) + } +} + +#[allow(clippy::type_complexity)] +struct ReadableStreamResource { + reader: AsyncRefCell< + Peekable<ChannelStreamAdapter<Receiver<Result<BufView, Error>>>>, + >, + cancel_handle: CancelHandle, + data: ReadableStreamResourceData, +} + +impl ReadableStreamResource { + pub fn cancel_handle(self: &Rc<Self>) -> impl RcLike<CancelHandle> { + RcRef::map(self, |s| &s.cancel_handle).clone() + } + + async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, AnyError> { + let cancel_handle = self.cancel_handle(); + let peekable = RcRef::map(self, |this| &this.reader); + let mut peekable = peekable.borrow_mut().await; + match Pin::new(&mut *peekable) + .peek_mut() + .or_cancel(cancel_handle) + .await? + { + None => Ok(BufView::empty()), + // Take the actual error since we only have a reference to it + Some(Err(_)) => Err(peekable.next().await.unwrap().err().unwrap()), + Some(Ok(bytes)) => { + if bytes.len() <= limit { + // We can safely take the next item since we peeked it + return peekable.next().await.unwrap(); + } + // The remainder of the bytes after we split it is still left in the peek buffer + let ret = bytes.split_to(limit); + Ok(ret) + } + } + } +} + +impl Resource for ReadableStreamResource { + fn name(&self) -> Cow<str> { + Cow::Borrowed("readableStream") + } + + fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> { + Box::pin(ReadableStreamResource::read(self, limit)) + } +} + +// TODO(mmastrac): Move this to deno_core +#[derive(Clone, Debug, Default)] +pub struct CompletionHandle { + inner: Rc<RefCell<CompletionHandleInner>>, +} + +#[derive(Debug, Default)] +struct CompletionHandleInner { + complete: bool, + success: bool, + waker: Option<Waker>, +} + +impl CompletionHandle { + pub fn complete(&self, success: bool) { + let mut mut_self = self.inner.borrow_mut(); + mut_self.complete = true; + mut_self.success = success; + if let Some(waker) = mut_self.waker.take() { + drop(mut_self); + waker.wake(); + } + } +} + +impl Future for CompletionHandle { + type Output = bool; + + fn poll( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Self::Output> { + let mut mut_self = self.inner.borrow_mut(); + if mut_self.complete { + return std::task::Poll::Ready(mut_self.success); + } + + mut_self.waker = Some(cx.waker().clone()); + std::task::Poll::Pending + } +} + +fn sender_closed() -> Error { + type_error("sender closed") +} + +/// Allocate a resource that wraps a ReadableStream. +#[op2(fast)] +#[smi] +pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId { + let (tx, rx) = tokio::sync::mpsc::channel(1); + let tx = RefCell::new(Some(tx)); + let completion = CompletionHandle::default(); + let tx = Box::new(tx); + let resource = ReadableStreamResource { + cancel_handle: Default::default(), + reader: AsyncRefCell::new(ChannelStreamAdapter(rx).peekable()), + data: ReadableStreamResourceData { + tx: Box::into_raw(tx), + completion, + }, + }; + state.resource_table.add(resource) +} + +#[op2(fast)] +pub fn op_readable_stream_resource_get_sink( + state: &mut OpState, + #[smi] rid: ResourceId, +) -> *const c_void { + let Ok(resource) = state.resource_table.get::<ReadableStreamResource>(rid) else { + return std::ptr::null(); + }; + resource.data.tx as _ +} + +fn get_sender(sender: *const c_void) -> Option<Sender<Result<BufView, Error>>> { + // SAFETY: We know this is a valid v8::External + unsafe { + (sender as *const SenderCell) + .as_ref() + .and_then(|r| r.borrow_mut().as_ref().cloned()) + } +} + +fn drop_sender(sender: *const c_void) { + // SAFETY: We know this is a valid v8::External + unsafe { + assert!(!sender.is_null()); + _ = Box::from_raw(sender as *mut SenderCell); + } +} + +#[op2(async)] +pub fn op_readable_stream_resource_write_buf( + sender: *const c_void, + #[buffer] buffer: JsBuffer, +) -> impl Future<Output = Result<(), Error>> { + let sender = get_sender(sender); + async move { + let sender = sender.ok_or_else(sender_closed)?; + sender + .send(Ok(buffer.into())) + .await + .map_err(|_| sender_closed())?; + Ok(()) + } +} + +#[op2(async)] +pub fn op_readable_stream_resource_write_error( + sender: *const c_void, + #[string] error: String, +) -> impl Future<Output = Result<(), Error>> { + let sender = get_sender(sender); + async move { + let sender = sender.ok_or_else(sender_closed)?; + sender + .send(Err(type_error(Cow::Owned(error)))) + .await + .map_err(|_| sender_closed())?; + Ok(()) + } +} + +#[op2(fast)] +#[smi] +pub fn op_readable_stream_resource_close(sender: *const c_void) { + drop_sender(sender); +} + +#[op2(async)] +pub fn op_readable_stream_resource_await_close( + state: &mut OpState, + #[smi] rid: ResourceId, +) -> impl Future<Output = ()> { + let completion = state + .resource_table + .get::<ReadableStreamResource>(rid) + .ok() + .map(|r| r.data.completion.clone()); + + async move { + if let Some(completion) = completion { + completion.await; + } + } +} + +struct ReadableStreamResourceData { + tx: *const SenderCell, + completion: CompletionHandle, +} + +impl Drop for ReadableStreamResourceData { + fn drop(&mut self) { + self.completion.complete(true); + } +} |