summaryrefslogtreecommitdiff
path: root/ext/web/stream_resource.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/web/stream_resource.rs')
-rw-r--r--ext/web/stream_resource.rs33
1 files changed, 32 insertions, 1 deletions
diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs
index 93d10f806..8505be01a 100644
--- a/ext/web/stream_resource.rs
+++ b/ext/web/stream_resource.rs
@@ -197,7 +197,14 @@ impl BoundedBufferChannelInner {
pub fn write(&mut self, buffer: V8Slice<u8>) -> Result<(), V8Slice<u8>> {
let next_producer_index = (self.ring_producer + 1) % BUFFER_CHANNEL_SIZE;
if next_producer_index == self.ring_consumer {
- return Err(buffer);
+ // Note that we may have been allowed to write because of a close/error condition, but the
+ // underlying channel is actually closed. If this is the case, we return `Ok(())`` and just
+ // drop the bytes on the floor.
+ return if self.closed || self.error.is_some() {
+ Ok(())
+ } else {
+ Err(buffer)
+ };
}
self.current_size += buffer.len();
@@ -336,6 +343,7 @@ struct ReadableStreamResource {
channel: BoundedBufferChannel,
cancel_handle: CancelHandle,
data: ReadableStreamResourceData,
+ size_hint: (u64, Option<u64>),
}
impl ReadableStreamResource {
@@ -378,6 +386,10 @@ impl Resource for ReadableStreamResource {
fn close(self: Rc<Self>) {
self.close_channel();
}
+
+ fn size_hint(&self) -> (u64, Option<u64>) {
+ self.size_hint
+ }
}
impl Drop for ReadableStreamResource {
@@ -438,6 +450,25 @@ pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId {
cancel_handle: Default::default(),
channel: BoundedBufferChannel::default(),
data: ReadableStreamResourceData { completion },
+ size_hint: (0, None),
+ };
+ state.resource_table.add(resource)
+}
+
+/// Allocate a resource that wraps a ReadableStream, with a size hint.
+#[op2(fast)]
+#[smi]
+pub fn op_readable_stream_resource_allocate_sized(
+ state: &mut OpState,
+ #[number] length: u64,
+) -> ResourceId {
+ let completion = CompletionHandle::default();
+ let resource = ReadableStreamResource {
+ read_queue: Default::default(),
+ cancel_handle: Default::default(),
+ channel: BoundedBufferChannel::default(),
+ data: ReadableStreamResourceData { completion },
+ size_hint: (length, Some(length)),
};
state.resource_table.add(resource)
}