summaryrefslogtreecommitdiff
path: root/ext/web/stream_resource.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-12-01 08:56:10 -0700
committerGitHub <noreply@github.com>2023-12-01 08:56:10 -0700
commite6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch)
tree57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 /ext/web/stream_resource.rs
parent687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff)
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify and unify implementation with `ext/serve`. This allows us to work in Rust with resources only. Two additional changes made to `resourceForReadableStream` were required: - Add an optional length to `resourceForReadableStream` which translates to `size_hint` - Fix a bug where writing to a closed stream that was full would panic
Diffstat (limited to 'ext/web/stream_resource.rs')
-rw-r--r--ext/web/stream_resource.rs33
1 files changed, 32 insertions, 1 deletions
diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs
index 93d10f806..8505be01a 100644
--- a/ext/web/stream_resource.rs
+++ b/ext/web/stream_resource.rs
@@ -197,7 +197,14 @@ impl BoundedBufferChannelInner {
pub fn write(&mut self, buffer: V8Slice<u8>) -> Result<(), V8Slice<u8>> {
let next_producer_index = (self.ring_producer + 1) % BUFFER_CHANNEL_SIZE;
if next_producer_index == self.ring_consumer {
- return Err(buffer);
+ // Note that we may have been allowed to write because of a close/error condition, but the
+ // underlying channel is actually closed. If this is the case, we return `Ok(())`` and just
+ // drop the bytes on the floor.
+ return if self.closed || self.error.is_some() {
+ Ok(())
+ } else {
+ Err(buffer)
+ };
}
self.current_size += buffer.len();
@@ -336,6 +343,7 @@ struct ReadableStreamResource {
channel: BoundedBufferChannel,
cancel_handle: CancelHandle,
data: ReadableStreamResourceData,
+ size_hint: (u64, Option<u64>),
}
impl ReadableStreamResource {
@@ -378,6 +386,10 @@ impl Resource for ReadableStreamResource {
fn close(self: Rc<Self>) {
self.close_channel();
}
+
+ fn size_hint(&self) -> (u64, Option<u64>) {
+ self.size_hint
+ }
}
impl Drop for ReadableStreamResource {
@@ -438,6 +450,25 @@ pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId {
cancel_handle: Default::default(),
channel: BoundedBufferChannel::default(),
data: ReadableStreamResourceData { completion },
+ size_hint: (0, None),
+ };
+ state.resource_table.add(resource)
+}
+
+/// Allocate a resource that wraps a ReadableStream, with a size hint.
+#[op2(fast)]
+#[smi]
+pub fn op_readable_stream_resource_allocate_sized(
+ state: &mut OpState,
+ #[number] length: u64,
+) -> ResourceId {
+ let completion = CompletionHandle::default();
+ let resource = ReadableStreamResource {
+ read_queue: Default::default(),
+ cancel_handle: Default::default(),
+ channel: BoundedBufferChannel::default(),
+ data: ReadableStreamResourceData { completion },
+ size_hint: (length, Some(length)),
};
state.resource_table.add(resource)
}