summaryrefslogtreecommitdiff
path: root/ext/web/stream_resource.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/web/stream_resource.rs')
-rw-r--r--ext/web/stream_resource.rs274
1 files changed, 274 insertions, 0 deletions
diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs
new file mode 100644
index 000000000..4c2a75648
--- /dev/null
+++ b/ext/web/stream_resource.rs
@@ -0,0 +1,274 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use deno_core::anyhow::Error;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::op2;
+use deno_core::AsyncRefCell;
+use deno_core::AsyncResult;
+use deno_core::BufView;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
+use deno_core::JsBuffer;
+use deno_core::OpState;
+use deno_core::RcLike;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use futures::stream::Peekable;
+use futures::Stream;
+use futures::StreamExt;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::ffi::c_void;
+use std::future::Future;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::task::Context;
+use std::task::Poll;
+use std::task::Waker;
+use tokio::sync::mpsc::Receiver;
+use tokio::sync::mpsc::Sender;
+
+type SenderCell = RefCell<Option<Sender<Result<BufView, Error>>>>;
+
+// This indirection allows us to more easily integrate the fast streams work at a later date
+#[repr(transparent)]
+struct ChannelStreamAdapter<C>(C);
+
+impl<C> Stream for ChannelStreamAdapter<C>
+where
+ C: ChannelBytesRead,
+{
+ type Item = Result<BufView, AnyError>;
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ self.0.poll_recv(cx)
+ }
+}
+
+pub trait ChannelBytesRead: Unpin + 'static {
+ fn poll_recv(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<BufView, AnyError>>>;
+}
+
+impl ChannelBytesRead for tokio::sync::mpsc::Receiver<Result<BufView, Error>> {
+ fn poll_recv(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<BufView, AnyError>>> {
+ self.poll_recv(cx)
+ }
+}
+
+#[allow(clippy::type_complexity)]
+struct ReadableStreamResource {
+ reader: AsyncRefCell<
+ Peekable<ChannelStreamAdapter<Receiver<Result<BufView, Error>>>>,
+ >,
+ cancel_handle: CancelHandle,
+ data: ReadableStreamResourceData,
+}
+
+impl ReadableStreamResource {
+ pub fn cancel_handle(self: &Rc<Self>) -> impl RcLike<CancelHandle> {
+ RcRef::map(self, |s| &s.cancel_handle).clone()
+ }
+
+ async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, AnyError> {
+ let cancel_handle = self.cancel_handle();
+ let peekable = RcRef::map(self, |this| &this.reader);
+ let mut peekable = peekable.borrow_mut().await;
+ match Pin::new(&mut *peekable)
+ .peek_mut()
+ .or_cancel(cancel_handle)
+ .await?
+ {
+ None => Ok(BufView::empty()),
+ // Take the actual error since we only have a reference to it
+ Some(Err(_)) => Err(peekable.next().await.unwrap().err().unwrap()),
+ Some(Ok(bytes)) => {
+ if bytes.len() <= limit {
+ // We can safely take the next item since we peeked it
+ return peekable.next().await.unwrap();
+ }
+ // The remainder of the bytes after we split it is still left in the peek buffer
+ let ret = bytes.split_to(limit);
+ Ok(ret)
+ }
+ }
+ }
+}
+
+impl Resource for ReadableStreamResource {
+ fn name(&self) -> Cow<str> {
+ Cow::Borrowed("readableStream")
+ }
+
+ fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
+ Box::pin(ReadableStreamResource::read(self, limit))
+ }
+}
+
+// TODO(mmastrac): Move this to deno_core
+#[derive(Clone, Debug, Default)]
+pub struct CompletionHandle {
+ inner: Rc<RefCell<CompletionHandleInner>>,
+}
+
+#[derive(Debug, Default)]
+struct CompletionHandleInner {
+ complete: bool,
+ success: bool,
+ waker: Option<Waker>,
+}
+
+impl CompletionHandle {
+ pub fn complete(&self, success: bool) {
+ let mut mut_self = self.inner.borrow_mut();
+ mut_self.complete = true;
+ mut_self.success = success;
+ if let Some(waker) = mut_self.waker.take() {
+ drop(mut_self);
+ waker.wake();
+ }
+ }
+}
+
+impl Future for CompletionHandle {
+ type Output = bool;
+
+ fn poll(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ let mut mut_self = self.inner.borrow_mut();
+ if mut_self.complete {
+ return std::task::Poll::Ready(mut_self.success);
+ }
+
+ mut_self.waker = Some(cx.waker().clone());
+ std::task::Poll::Pending
+ }
+}
+
+fn sender_closed() -> Error {
+ type_error("sender closed")
+}
+
+/// Allocate a resource that wraps a ReadableStream.
+#[op2(fast)]
+#[smi]
+pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId {
+ let (tx, rx) = tokio::sync::mpsc::channel(1);
+ let tx = RefCell::new(Some(tx));
+ let completion = CompletionHandle::default();
+ let tx = Box::new(tx);
+ let resource = ReadableStreamResource {
+ cancel_handle: Default::default(),
+ reader: AsyncRefCell::new(ChannelStreamAdapter(rx).peekable()),
+ data: ReadableStreamResourceData {
+ tx: Box::into_raw(tx),
+ completion,
+ },
+ };
+ state.resource_table.add(resource)
+}
+
+#[op2(fast)]
+pub fn op_readable_stream_resource_get_sink(
+ state: &mut OpState,
+ #[smi] rid: ResourceId,
+) -> *const c_void {
+ let Ok(resource) = state.resource_table.get::<ReadableStreamResource>(rid) else {
+ return std::ptr::null();
+ };
+ resource.data.tx as _
+}
+
+fn get_sender(sender: *const c_void) -> Option<Sender<Result<BufView, Error>>> {
+ // SAFETY: We know this is a valid v8::External
+ unsafe {
+ (sender as *const SenderCell)
+ .as_ref()
+ .and_then(|r| r.borrow_mut().as_ref().cloned())
+ }
+}
+
+fn drop_sender(sender: *const c_void) {
+ // SAFETY: We know this is a valid v8::External
+ unsafe {
+ assert!(!sender.is_null());
+ _ = Box::from_raw(sender as *mut SenderCell);
+ }
+}
+
+#[op2(async)]
+pub fn op_readable_stream_resource_write_buf(
+ sender: *const c_void,
+ #[buffer] buffer: JsBuffer,
+) -> impl Future<Output = Result<(), Error>> {
+ let sender = get_sender(sender);
+ async move {
+ let sender = sender.ok_or_else(sender_closed)?;
+ sender
+ .send(Ok(buffer.into()))
+ .await
+ .map_err(|_| sender_closed())?;
+ Ok(())
+ }
+}
+
+#[op2(async)]
+pub fn op_readable_stream_resource_write_error(
+ sender: *const c_void,
+ #[string] error: String,
+) -> impl Future<Output = Result<(), Error>> {
+ let sender = get_sender(sender);
+ async move {
+ let sender = sender.ok_or_else(sender_closed)?;
+ sender
+ .send(Err(type_error(Cow::Owned(error))))
+ .await
+ .map_err(|_| sender_closed())?;
+ Ok(())
+ }
+}
+
+#[op2(fast)]
+#[smi]
+pub fn op_readable_stream_resource_close(sender: *const c_void) {
+ drop_sender(sender);
+}
+
+#[op2(async)]
+pub fn op_readable_stream_resource_await_close(
+ state: &mut OpState,
+ #[smi] rid: ResourceId,
+) -> impl Future<Output = ()> {
+ let completion = state
+ .resource_table
+ .get::<ReadableStreamResource>(rid)
+ .ok()
+ .map(|r| r.data.completion.clone());
+
+ async move {
+ if let Some(completion) = completion {
+ completion.await;
+ }
+ }
+}
+
+struct ReadableStreamResourceData {
+ tx: *const SenderCell,
+ completion: CompletionHandle,
+}
+
+impl Drop for ReadableStreamResourceData {
+ fn drop(&mut self) {
+ self.completion.complete(true);
+ }
+}