summaryrefslogtreecommitdiff
path: root/ext/web
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-12-01 08:56:10 -0700
committerGitHub <noreply@github.com>2023-12-01 08:56:10 -0700
commite6e708e46c51f3154a81ed99cd35c3d5569930f9 (patch)
tree57a0fdd4c0a5911b62b5af02d09d5a2d9de77ed9 /ext/web
parent687ae870d1e4e856b7ceee0a5511138459c68cb1 (diff)
refactor: use resourceForReadableStream for fetch (#20217)
Switch `ext/fetch` over to `resourceForReadableStream` to simplify and unify implementation with `ext/serve`. This allows us to work in Rust with resources only. Two additional changes made to `resourceForReadableStream` were required: - Add an optional length to `resourceForReadableStream` which translates to `size_hint` - Fix a bug where writing to a closed stream that was full would panic
Diffstat (limited to 'ext/web')
-rw-r--r--ext/web/06_streams.js8
-rw-r--r--ext/web/lib.rs1
-rw-r--r--ext/web/stream_resource.rs33
3 files changed, 39 insertions, 3 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 4f472984d..9fc15d1ad 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -12,6 +12,7 @@ const {
op_arraybuffer_was_detached,
op_transfer_arraybuffer,
op_readable_stream_resource_allocate,
+ op_readable_stream_resource_allocate_sized,
op_readable_stream_resource_get_sink,
op_readable_stream_resource_write_error,
op_readable_stream_resource_write_buf,
@@ -863,13 +864,16 @@ function readableStreamReadFn(reader, sink) {
* read operations, and those read operations will be fed by the output of the
* ReadableStream source.
* @param {ReadableStream<Uint8Array>} stream
+ * @param {number | undefined} length
* @returns {number}
*/
-function resourceForReadableStream(stream) {
+function resourceForReadableStream(stream, length) {
const reader = acquireReadableStreamDefaultReader(stream);
// Allocate the resource
- const rid = op_readable_stream_resource_allocate();
+ const rid = typeof length == "number"
+ ? op_readable_stream_resource_allocate_sized(length)
+ : op_readable_stream_resource_allocate();
// Close the Reader we get from the ReadableStream when the resource is closed, ignoring any errors
PromisePrototypeCatch(
diff --git a/ext/web/lib.rs b/ext/web/lib.rs
index 4e0d97f5c..a68b6344e 100644
--- a/ext/web/lib.rs
+++ b/ext/web/lib.rs
@@ -91,6 +91,7 @@ deno_core::extension!(deno_web,
op_sleep,
op_transfer_arraybuffer,
stream_resource::op_readable_stream_resource_allocate,
+ stream_resource::op_readable_stream_resource_allocate_sized,
stream_resource::op_readable_stream_resource_get_sink,
stream_resource::op_readable_stream_resource_write_error,
stream_resource::op_readable_stream_resource_write_buf,
diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs
index 93d10f806..8505be01a 100644
--- a/ext/web/stream_resource.rs
+++ b/ext/web/stream_resource.rs
@@ -197,7 +197,14 @@ impl BoundedBufferChannelInner {
pub fn write(&mut self, buffer: V8Slice<u8>) -> Result<(), V8Slice<u8>> {
let next_producer_index = (self.ring_producer + 1) % BUFFER_CHANNEL_SIZE;
if next_producer_index == self.ring_consumer {
- return Err(buffer);
+ // Note that we may have been allowed to write because of a close/error condition, but the
+ // underlying channel is actually closed. If this is the case, we return `Ok(())`` and just
+ // drop the bytes on the floor.
+ return if self.closed || self.error.is_some() {
+ Ok(())
+ } else {
+ Err(buffer)
+ };
}
self.current_size += buffer.len();
@@ -336,6 +343,7 @@ struct ReadableStreamResource {
channel: BoundedBufferChannel,
cancel_handle: CancelHandle,
data: ReadableStreamResourceData,
+ size_hint: (u64, Option<u64>),
}
impl ReadableStreamResource {
@@ -378,6 +386,10 @@ impl Resource for ReadableStreamResource {
fn close(self: Rc<Self>) {
self.close_channel();
}
+
+ fn size_hint(&self) -> (u64, Option<u64>) {
+ self.size_hint
+ }
}
impl Drop for ReadableStreamResource {
@@ -438,6 +450,25 @@ pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId {
cancel_handle: Default::default(),
channel: BoundedBufferChannel::default(),
data: ReadableStreamResourceData { completion },
+ size_hint: (0, None),
+ };
+ state.resource_table.add(resource)
+}
+
+/// Allocate a resource that wraps a ReadableStream, with a size hint.
+#[op2(fast)]
+#[smi]
+pub fn op_readable_stream_resource_allocate_sized(
+ state: &mut OpState,
+ #[number] length: u64,
+) -> ResourceId {
+ let completion = CompletionHandle::default();
+ let resource = ReadableStreamResource {
+ read_queue: Default::default(),
+ cancel_handle: Default::default(),
+ channel: BoundedBufferChannel::default(),
+ data: ReadableStreamResourceData { completion },
+ size_hint: (length, Some(length)),
};
state.resource_table.add(resource)
}