summaryrefslogtreecommitdiff
path: root/ext/web/06_streams.js
diff options
context:
space:
mode:
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r--ext/web/06_streams.js83
1 files changed, 80 insertions, 3 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 01f84aa2c..0849d221d 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -1,4 +1,5 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// deno-lint-ignore-file camelcase
// @ts-check
/// <reference path="../webidl/internal.d.ts" />
@@ -7,7 +8,17 @@
/// <reference lib="esnext" />
const core = globalThis.Deno.core;
-const ops = core.ops;
+const internals = globalThis.__bootstrap.internals;
+const {
+ op_arraybuffer_was_detached,
+ op_transfer_arraybuffer,
+ op_readable_stream_resource_allocate,
+ op_readable_stream_resource_get_sink,
+ op_readable_stream_resource_write_error,
+ op_readable_stream_resource_write_buf,
+ op_readable_stream_resource_close,
+ op_readable_stream_resource_await_close,
+} = core.ensureFastOps();
import * as webidl from "ext:deno_webidl/00_webidl.js";
import { structuredClone } from "ext:deno_web/02_structured_clone.js";
import {
@@ -61,6 +72,7 @@ const {
SafeWeakMap,
// TODO(lucacasonato): add SharedArrayBuffer to primordials
// SharedArrayBufferPrototype,
+ String,
Symbol,
SymbolAsyncIterator,
SymbolIterator,
@@ -218,7 +230,7 @@ function isDetachedBuffer(O) {
return false;
}
return ArrayBufferPrototypeGetByteLength(O) === 0 &&
- ops.op_arraybuffer_was_detached(O);
+ op_arraybuffer_was_detached(O);
}
/**
@@ -244,7 +256,7 @@ function canTransferArrayBuffer(O) {
* @returns {ArrayBufferLike}
*/
function transferArrayBuffer(O) {
- return ops.op_transfer_arraybuffer(O);
+ return op_transfer_arraybuffer(O);
}
/**
@@ -695,6 +707,68 @@ function isReadableStreamDisturbed(stream) {
return stream[_disturbed];
}
+/**
+ * Create a new resource that wraps a ReadableStream. The resource will support
+ * read operations, and those read operations will be fed by the output of the
+ * ReadableStream source.
+ * @param {ReadableStream<Uint8Array>} stream
+ * @returns {number}
+ */
+function resourceForReadableStream(stream) {
+ const reader = acquireReadableStreamDefaultReader(stream);
+
+ // Allocate the resource
+ const rid = op_readable_stream_resource_allocate();
+
+ // Close the Reader we get from the ReadableStream when the resource is closed, ignoring any errors
+ PromisePrototypeCatch(
+ PromisePrototypeThen(
+ op_readable_stream_resource_await_close(rid),
+ () => reader.cancel(),
+ ),
+ () => {},
+ );
+
+ // The ops here look like op_write_all/op_close, but we're not actually writing to a
+ // real resource.
+ (async () => {
+ try {
+ // This allocation is freed in the finally block below, guaranteeing it won't leak
+ const sink = op_readable_stream_resource_get_sink(rid);
+ try {
+ while (true) {
+ let value;
+ try {
+ const read = await reader.read();
+ value = read.value;
+ if (read.done) {
+ break;
+ }
+ } catch (err) {
+ const message = err.message;
+ if (message) {
+ await op_readable_stream_resource_write_error(sink, err.message);
+ } else {
+ await op_readable_stream_resource_write_error(sink, String(err));
+ }
+ break;
+ }
+ // If the chunk has non-zero length, write it
+ if (value.length > 0) {
+ await op_readable_stream_resource_write_buf(sink, value);
+ }
+ }
+ } finally {
+ op_readable_stream_resource_close(sink);
+ }
+ } catch (err) {
+ // Something went terribly wrong with this stream -- log and continue
+ console.error("Unexpected internal error on stream", err);
+ }
+ })();
+ return rid;
+}
+
const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
// A finalization registry to clean up underlying resources that are GC'ed.
@@ -6454,6 +6528,8 @@ webidl.converters.StreamPipeOptions = webidl
{ key: "signal", converter: webidl.converters.AbortSignal },
]);
+internals.resourceForReadableStream = resourceForReadableStream;
+
export {
// Non-Public
_state,
@@ -6482,6 +6558,7 @@ export {
ReadableStreamPrototype,
readableStreamTee,
readableStreamThrowIfErrored,
+ resourceForReadableStream,
TransformStream,
TransformStreamDefaultController,
WritableStream,