diff options
Diffstat (limited to 'ext/web')
-rw-r--r-- | ext/web/06_streams.js | 8 | ||||
-rw-r--r-- | ext/web/lib.rs | 1 | ||||
-rw-r--r-- | ext/web/stream_resource.rs | 33 |
3 files changed, 39 insertions, 3 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 4f472984d..9fc15d1ad 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -12,6 +12,7 @@ const { op_arraybuffer_was_detached, op_transfer_arraybuffer, op_readable_stream_resource_allocate, + op_readable_stream_resource_allocate_sized, op_readable_stream_resource_get_sink, op_readable_stream_resource_write_error, op_readable_stream_resource_write_buf, @@ -863,13 +864,16 @@ function readableStreamReadFn(reader, sink) { * read operations, and those read operations will be fed by the output of the * ReadableStream source. * @param {ReadableStream<Uint8Array>} stream + * @param {number | undefined} length * @returns {number} */ -function resourceForReadableStream(stream) { +function resourceForReadableStream(stream, length) { const reader = acquireReadableStreamDefaultReader(stream); // Allocate the resource - const rid = op_readable_stream_resource_allocate(); + const rid = typeof length == "number" + ? op_readable_stream_resource_allocate_sized(length) + : op_readable_stream_resource_allocate(); // Close the Reader we get from the ReadableStream when the resource is closed, ignoring any errors PromisePrototypeCatch( diff --git a/ext/web/lib.rs b/ext/web/lib.rs index 4e0d97f5c..a68b6344e 100644 --- a/ext/web/lib.rs +++ b/ext/web/lib.rs @@ -91,6 +91,7 @@ deno_core::extension!(deno_web, op_sleep, op_transfer_arraybuffer, stream_resource::op_readable_stream_resource_allocate, + stream_resource::op_readable_stream_resource_allocate_sized, stream_resource::op_readable_stream_resource_get_sink, stream_resource::op_readable_stream_resource_write_error, stream_resource::op_readable_stream_resource_write_buf, diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs index 93d10f806..8505be01a 100644 --- a/ext/web/stream_resource.rs +++ b/ext/web/stream_resource.rs @@ -197,7 +197,14 @@ impl BoundedBufferChannelInner { pub fn write(&mut self, buffer: V8Slice<u8>) -> Result<(), V8Slice<u8>> { let next_producer_index = (self.ring_producer + 1) % BUFFER_CHANNEL_SIZE; if next_producer_index == self.ring_consumer { - return Err(buffer); + // Note that we may have been allowed to write because of a close/error condition, but the + // underlying channel is actually closed. If this is the case, we return `Ok(())`` and just + // drop the bytes on the floor. + return if self.closed || self.error.is_some() { + Ok(()) + } else { + Err(buffer) + }; } self.current_size += buffer.len(); @@ -336,6 +343,7 @@ struct ReadableStreamResource { channel: BoundedBufferChannel, cancel_handle: CancelHandle, data: ReadableStreamResourceData, + size_hint: (u64, Option<u64>), } impl ReadableStreamResource { @@ -378,6 +386,10 @@ impl Resource for ReadableStreamResource { fn close(self: Rc<Self>) { self.close_channel(); } + + fn size_hint(&self) -> (u64, Option<u64>) { + self.size_hint + } } impl Drop for ReadableStreamResource { @@ -438,6 +450,25 @@ pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId { cancel_handle: Default::default(), channel: BoundedBufferChannel::default(), data: ReadableStreamResourceData { completion }, + size_hint: (0, None), + }; + state.resource_table.add(resource) +} + +/// Allocate a resource that wraps a ReadableStream, with a size hint. +#[op2(fast)] +#[smi] +pub fn op_readable_stream_resource_allocate_sized( + state: &mut OpState, + #[number] length: u64, +) -> ResourceId { + let completion = CompletionHandle::default(); + let resource = ReadableStreamResource { + read_queue: Default::default(), + cancel_handle: Default::default(), + channel: BoundedBufferChannel::default(), + data: ReadableStreamResourceData { completion }, + size_hint: (length, Some(length)), }; state.resource_table.add(resource) } |