summaryrefslogtreecommitdiff
path: root/ext/web
diff options
context:
space:
mode:
Diffstat (limited to 'ext/web')
-rw-r--r--ext/web/06_streams.js8
-rw-r--r--ext/web/lib.rs1
-rw-r--r--ext/web/stream_resource.rs33
3 files changed, 39 insertions, 3 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 4f472984d..9fc15d1ad 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -12,6 +12,7 @@ const {
op_arraybuffer_was_detached,
op_transfer_arraybuffer,
op_readable_stream_resource_allocate,
+ op_readable_stream_resource_allocate_sized,
op_readable_stream_resource_get_sink,
op_readable_stream_resource_write_error,
op_readable_stream_resource_write_buf,
@@ -863,13 +864,16 @@ function readableStreamReadFn(reader, sink) {
* read operations, and those read operations will be fed by the output of the
* ReadableStream source.
* @param {ReadableStream<Uint8Array>} stream
+ * @param {number | undefined} length
* @returns {number}
*/
-function resourceForReadableStream(stream) {
+function resourceForReadableStream(stream, length) {
const reader = acquireReadableStreamDefaultReader(stream);
// Allocate the resource
- const rid = op_readable_stream_resource_allocate();
+ const rid = typeof length == "number"
+ ? op_readable_stream_resource_allocate_sized(length)
+ : op_readable_stream_resource_allocate();
// Close the Reader we get from the ReadableStream when the resource is closed, ignoring any errors
PromisePrototypeCatch(
diff --git a/ext/web/lib.rs b/ext/web/lib.rs
index 4e0d97f5c..a68b6344e 100644
--- a/ext/web/lib.rs
+++ b/ext/web/lib.rs
@@ -91,6 +91,7 @@ deno_core::extension!(deno_web,
op_sleep,
op_transfer_arraybuffer,
stream_resource::op_readable_stream_resource_allocate,
+ stream_resource::op_readable_stream_resource_allocate_sized,
stream_resource::op_readable_stream_resource_get_sink,
stream_resource::op_readable_stream_resource_write_error,
stream_resource::op_readable_stream_resource_write_buf,
diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs
index 93d10f806..8505be01a 100644
--- a/ext/web/stream_resource.rs
+++ b/ext/web/stream_resource.rs
@@ -197,7 +197,14 @@ impl BoundedBufferChannelInner {
pub fn write(&mut self, buffer: V8Slice<u8>) -> Result<(), V8Slice<u8>> {
let next_producer_index = (self.ring_producer + 1) % BUFFER_CHANNEL_SIZE;
if next_producer_index == self.ring_consumer {
- return Err(buffer);
+ // Note that we may have been allowed to write because of a close/error condition, but the
+ // underlying channel is actually closed. If this is the case, we return `Ok(())`` and just
+ // drop the bytes on the floor.
+ return if self.closed || self.error.is_some() {
+ Ok(())
+ } else {
+ Err(buffer)
+ };
}
self.current_size += buffer.len();
@@ -336,6 +343,7 @@ struct ReadableStreamResource {
channel: BoundedBufferChannel,
cancel_handle: CancelHandle,
data: ReadableStreamResourceData,
+ size_hint: (u64, Option<u64>),
}
impl ReadableStreamResource {
@@ -378,6 +386,10 @@ impl Resource for ReadableStreamResource {
fn close(self: Rc<Self>) {
self.close_channel();
}
+
+ fn size_hint(&self) -> (u64, Option<u64>) {
+ self.size_hint
+ }
}
impl Drop for ReadableStreamResource {
@@ -438,6 +450,25 @@ pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId {
cancel_handle: Default::default(),
channel: BoundedBufferChannel::default(),
data: ReadableStreamResourceData { completion },
+ size_hint: (0, None),
+ };
+ state.resource_table.add(resource)
+}
+
+/// Allocate a resource that wraps a ReadableStream, with a size hint.
+#[op2(fast)]
+#[smi]
+pub fn op_readable_stream_resource_allocate_sized(
+ state: &mut OpState,
+ #[number] length: u64,
+) -> ResourceId {
+ let completion = CompletionHandle::default();
+ let resource = ReadableStreamResource {
+ read_queue: Default::default(),
+ cancel_handle: Default::default(),
+ channel: BoundedBufferChannel::default(),
+ data: ReadableStreamResourceData { completion },
+ size_hint: (length, Some(length)),
};
state.resource_table.add(resource)
}