diff options
Diffstat (limited to 'ext/web/stream_resource.rs')
-rw-r--r-- | ext/web/stream_resource.rs | 33 |
1 files changed, 32 insertions, 1 deletions
diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs index 93d10f806..8505be01a 100644 --- a/ext/web/stream_resource.rs +++ b/ext/web/stream_resource.rs @@ -197,7 +197,14 @@ impl BoundedBufferChannelInner { pub fn write(&mut self, buffer: V8Slice<u8>) -> Result<(), V8Slice<u8>> { let next_producer_index = (self.ring_producer + 1) % BUFFER_CHANNEL_SIZE; if next_producer_index == self.ring_consumer { - return Err(buffer); + // Note that we may have been allowed to write because of a close/error condition, but the + // underlying channel is actually closed. If this is the case, we return `Ok(())`` and just + // drop the bytes on the floor. + return if self.closed || self.error.is_some() { + Ok(()) + } else { + Err(buffer) + }; } self.current_size += buffer.len(); @@ -336,6 +343,7 @@ struct ReadableStreamResource { channel: BoundedBufferChannel, cancel_handle: CancelHandle, data: ReadableStreamResourceData, + size_hint: (u64, Option<u64>), } impl ReadableStreamResource { @@ -378,6 +386,10 @@ impl Resource for ReadableStreamResource { fn close(self: Rc<Self>) { self.close_channel(); } + + fn size_hint(&self) -> (u64, Option<u64>) { + self.size_hint + } } impl Drop for ReadableStreamResource { @@ -438,6 +450,25 @@ pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId { cancel_handle: Default::default(), channel: BoundedBufferChannel::default(), data: ReadableStreamResourceData { completion }, + size_hint: (0, None), + }; + state.resource_table.add(resource) +} + +/// Allocate a resource that wraps a ReadableStream, with a size hint. +#[op2(fast)] +#[smi] +pub fn op_readable_stream_resource_allocate_sized( + state: &mut OpState, + #[number] length: u64, +) -> ResourceId { + let completion = CompletionHandle::default(); + let resource = ReadableStreamResource { + read_queue: Default::default(), + cancel_handle: Default::default(), + channel: BoundedBufferChannel::default(), + data: ReadableStreamResourceData { completion }, + size_hint: (length, Some(length)), }; state.resource_table.add(resource) } |