diff options
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r-- | ext/web/06_streams.js | 83 |
1 files changed, 80 insertions, 3 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 01f84aa2c..0849d221d 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -1,4 +1,5 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// deno-lint-ignore-file camelcase // @ts-check /// <reference path="../webidl/internal.d.ts" /> @@ -7,7 +8,17 @@ /// <reference lib="esnext" /> const core = globalThis.Deno.core; -const ops = core.ops; +const internals = globalThis.__bootstrap.internals; +const { + op_arraybuffer_was_detached, + op_transfer_arraybuffer, + op_readable_stream_resource_allocate, + op_readable_stream_resource_get_sink, + op_readable_stream_resource_write_error, + op_readable_stream_resource_write_buf, + op_readable_stream_resource_close, + op_readable_stream_resource_await_close, +} = core.ensureFastOps(); import * as webidl from "ext:deno_webidl/00_webidl.js"; import { structuredClone } from "ext:deno_web/02_structured_clone.js"; import { @@ -61,6 +72,7 @@ const { SafeWeakMap, // TODO(lucacasonato): add SharedArrayBuffer to primordials // SharedArrayBufferPrototype, + String, Symbol, SymbolAsyncIterator, SymbolIterator, @@ -218,7 +230,7 @@ function isDetachedBuffer(O) { return false; } return ArrayBufferPrototypeGetByteLength(O) === 0 && - ops.op_arraybuffer_was_detached(O); + op_arraybuffer_was_detached(O); } /** @@ -244,7 +256,7 @@ function canTransferArrayBuffer(O) { * @returns {ArrayBufferLike} */ function transferArrayBuffer(O) { - return ops.op_transfer_arraybuffer(O); + return op_transfer_arraybuffer(O); } /** @@ -695,6 +707,68 @@ function isReadableStreamDisturbed(stream) { return stream[_disturbed]; } +/** + * Create a new resource that wraps a ReadableStream. The resource will support + * read operations, and those read operations will be fed by the output of the + * ReadableStream source. + * @param {ReadableStream<Uint8Array>} stream + * @returns {number} + */ +function resourceForReadableStream(stream) { + const reader = acquireReadableStreamDefaultReader(stream); + + // Allocate the resource + const rid = op_readable_stream_resource_allocate(); + + // Close the Reader we get from the ReadableStream when the resource is closed, ignoring any errors + PromisePrototypeCatch( + PromisePrototypeThen( + op_readable_stream_resource_await_close(rid), + () => reader.cancel(), + ), + () => {}, + ); + + // The ops here look like op_write_all/op_close, but we're not actually writing to a + // real resource. + (async () => { + try { + // This allocation is freed in the finally block below, guaranteeing it won't leak + const sink = op_readable_stream_resource_get_sink(rid); + try { + while (true) { + let value; + try { + const read = await reader.read(); + value = read.value; + if (read.done) { + break; + } + } catch (err) { + const message = err.message; + if (message) { + await op_readable_stream_resource_write_error(sink, err.message); + } else { + await op_readable_stream_resource_write_error(sink, String(err)); + } + break; + } + // If the chunk has non-zero length, write it + if (value.length > 0) { + await op_readable_stream_resource_write_buf(sink, value); + } + } + } finally { + op_readable_stream_resource_close(sink); + } + } catch (err) { + // Something went terribly wrong with this stream -- log and continue + console.error("Unexpected internal error on stream", err); + } + })(); + return rid; +} + const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB // A finalization registry to clean up underlying resources that are GC'ed. @@ -6454,6 +6528,8 @@ webidl.converters.StreamPipeOptions = webidl { key: "signal", converter: webidl.converters.AbortSignal }, ]); +internals.resourceForReadableStream = resourceForReadableStream; + export { // Non-Public _state, @@ -6482,6 +6558,7 @@ export { ReadableStreamPrototype, readableStreamTee, readableStreamThrowIfErrored, + resourceForReadableStream, TransformStream, TransformStreamDefaultController, WritableStream, |