From e6e708e46c51f3154a81ed99cd35c3d5569930f9 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Fri, 1 Dec 2023 08:56:10 -0700 Subject: refactor: use resourceForReadableStream for fetch (#20217) Switch `ext/fetch` over to `resourceForReadableStream` to simplify and unify implementation with `ext/serve`. This allows us to work in Rust with resources only. Two additional changes made to `resourceForReadableStream` were required: - Add an optional length to `resourceForReadableStream` which translates to `size_hint` - Fix a bug where writing to a closed stream that was full would panic --- ext/web/stream_resource.rs | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) (limited to 'ext/web/stream_resource.rs') diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs index 93d10f806..8505be01a 100644 --- a/ext/web/stream_resource.rs +++ b/ext/web/stream_resource.rs @@ -197,7 +197,14 @@ impl BoundedBufferChannelInner { pub fn write(&mut self, buffer: V8Slice) -> Result<(), V8Slice> { let next_producer_index = (self.ring_producer + 1) % BUFFER_CHANNEL_SIZE; if next_producer_index == self.ring_consumer { - return Err(buffer); + // Note that we may have been allowed to write because of a close/error condition, but the + // underlying channel is actually closed. If this is the case, we return `Ok(())`` and just + // drop the bytes on the floor. + return if self.closed || self.error.is_some() { + Ok(()) + } else { + Err(buffer) + }; } self.current_size += buffer.len(); @@ -336,6 +343,7 @@ struct ReadableStreamResource { channel: BoundedBufferChannel, cancel_handle: CancelHandle, data: ReadableStreamResourceData, + size_hint: (u64, Option), } impl ReadableStreamResource { @@ -378,6 +386,10 @@ impl Resource for ReadableStreamResource { fn close(self: Rc) { self.close_channel(); } + + fn size_hint(&self) -> (u64, Option) { + self.size_hint + } } impl Drop for ReadableStreamResource { @@ -438,6 +450,25 @@ pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId { cancel_handle: Default::default(), channel: BoundedBufferChannel::default(), data: ReadableStreamResourceData { completion }, + size_hint: (0, None), + }; + state.resource_table.add(resource) +} + +/// Allocate a resource that wraps a ReadableStream, with a size hint. +#[op2(fast)] +#[smi] +pub fn op_readable_stream_resource_allocate_sized( + state: &mut OpState, + #[number] length: u64, +) -> ResourceId { + let completion = CompletionHandle::default(); + let resource = ReadableStreamResource { + read_queue: Default::default(), + cancel_handle: Default::default(), + channel: BoundedBufferChannel::default(), + data: ReadableStreamResourceData { completion }, + size_hint: (length, Some(length)), }; state.resource_table.add(resource) } -- cgit v1.2.3