diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-12-01 08:56:10 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-01 08:56:10 -0700 |
commit | e6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch) | |
tree | 57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 /ext/web/stream_resource.rs | |
parent | 687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff) |
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify and
unify implementation with `ext/serve`. This allows us to work in Rust
with resources only.
Two additional changes made to `resourceForReadableStream` were
required:
- Add an optional length to `resourceForReadableStream` which translates
to `size_hint`
- Fix a bug where writing to a closed stream that was full would panic
Diffstat (limited to 'ext/web/stream_resource.rs')
-rw-r--r-- | ext/web/stream_resource.rs | 33 |
1 files changed, 32 insertions, 1 deletions
diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs index 93d10f806..8505be01a 100644 --- a/ext/web/stream_resource.rs +++ b/ext/web/stream_resource.rs @@ -197,7 +197,14 @@ impl BoundedBufferChannelInner { pub fn write(&mut self, buffer: V8Slice<u8>) -> Result<(), V8Slice<u8>> { let next_producer_index = (self.ring_producer + 1) % BUFFER_CHANNEL_SIZE; if next_producer_index == self.ring_consumer { - return Err(buffer); + // Note that we may have been allowed to write because of a close/error condition, but the + // underlying channel is actually closed. If this is the case, we return `Ok(())`` and just + // drop the bytes on the floor. + return if self.closed || self.error.is_some() { + Ok(()) + } else { + Err(buffer) + }; } self.current_size += buffer.len(); @@ -336,6 +343,7 @@ struct ReadableStreamResource { channel: BoundedBufferChannel, cancel_handle: CancelHandle, data: ReadableStreamResourceData, + size_hint: (u64, Option<u64>), } impl ReadableStreamResource { @@ -378,6 +386,10 @@ impl Resource for ReadableStreamResource { fn close(self: Rc<Self>) { self.close_channel(); } + + fn size_hint(&self) -> (u64, Option<u64>) { + self.size_hint + } } impl Drop for ReadableStreamResource { @@ -438,6 +450,25 @@ pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId { cancel_handle: Default::default(), channel: BoundedBufferChannel::default(), data: ReadableStreamResourceData { completion }, + size_hint: (0, None), + }; + state.resource_table.add(resource) +} + +/// Allocate a resource that wraps a ReadableStream, with a size hint. +#[op2(fast)] +#[smi] +pub fn op_readable_stream_resource_allocate_sized( + state: &mut OpState, + #[number] length: u64, +) -> ResourceId { + let completion = CompletionHandle::default(); + let resource = ReadableStreamResource { + read_queue: Default::default(), + cancel_handle: Default::default(), + channel: BoundedBufferChannel::default(), + data: ReadableStreamResourceData { completion }, + size_hint: (length, Some(length)), }; state.resource_table.add(resource) } |