summaryrefslogtreecommitdiff
path: root/cli/js
diff options
context:
space:
mode:
Diffstat (limited to 'cli/js')
-rw-r--r--cli/js/globals.ts7
-rw-r--r--cli/js/lib.deno.shared_globals.d.ts64
-rw-r--r--cli/js/tests/streams_piping_test.ts131
-rw-r--r--cli/js/tests/streams_writable_test.ts253
-rw-r--r--cli/js/tests/unit_tests.ts2
-rw-r--r--cli/js/util.ts12
-rw-r--r--cli/js/web/dom_types.d.ts16
-rw-r--r--cli/js/web/fetch.ts4
-rw-r--r--cli/js/web/streams/internals.ts1224
-rw-r--r--cli/js/web/streams/queuing_strategy.ts53
-rw-r--r--cli/js/web/streams/readable_byte_stream_controller.ts10
-rw-r--r--cli/js/web/streams/readable_stream.ts158
-rw-r--r--cli/js/web/streams/readable_stream_default_controller.ts10
-rw-r--r--cli/js/web/streams/readable_stream_default_reader.ts7
-rw-r--r--cli/js/web/streams/symbols.ts16
-rw-r--r--cli/js/web/streams/writable_stream.ts107
-rw-r--r--cli/js/web/streams/writable_stream_default_controller.ts68
-rw-r--r--cli/js/web/streams/writable_stream_default_writer.ts164
18 files changed, 2094 insertions, 212 deletions
diff --git a/cli/js/globals.ts b/cli/js/globals.ts
index 87309a158..caf069ffd 100644
--- a/cli/js/globals.ts
+++ b/cli/js/globals.ts
@@ -23,6 +23,8 @@ import * as workers from "./web/workers.ts";
import * as performanceUtil from "./web/performance.ts";
import * as request from "./web/request.ts";
import * as readableStream from "./web/streams/readable_stream.ts";
+import * as queuingStrategy from "./web/streams/queuing_strategy.ts";
+import * as writableStream from "./web/streams/writable_stream.ts";
// These imports are not exposed and therefore are fine to just import the
// symbols required.
@@ -216,6 +218,10 @@ export const windowOrWorkerGlobalScopeProperties = {
AbortController: nonEnumerable(abortController.AbortControllerImpl),
AbortSignal: nonEnumerable(abortSignal.AbortSignalImpl),
Blob: nonEnumerable(blob.DenoBlob),
+ ByteLengthQueuingStrategy: nonEnumerable(
+ queuingStrategy.ByteLengthQueuingStrategyImpl
+ ),
+ CountQueuingStrategy: nonEnumerable(queuingStrategy.CountQueuingStrategyImpl),
File: nonEnumerable(domFile.DomFileImpl),
CustomEvent: nonEnumerable(customEvent.CustomEventImpl),
DOMException: nonEnumerable(domException.DOMExceptionImpl),
@@ -232,6 +238,7 @@ export const windowOrWorkerGlobalScopeProperties = {
Response: nonEnumerable(fetchTypes.Response),
performance: writable(new performanceUtil.Performance()),
Worker: nonEnumerable(workers.WorkerImpl),
+ WritableStream: nonEnumerable(writableStream.WritableStreamImpl),
};
// eslint-disable-next-line @typescript-eslint/no-explicit-any
diff --git a/cli/js/lib.deno.shared_globals.d.ts b/cli/js/lib.deno.shared_globals.d.ts
index 635fa391e..f86279c27 100644
--- a/cli/js/lib.deno.shared_globals.d.ts
+++ b/cli/js/lib.deno.shared_globals.d.ts
@@ -309,6 +309,21 @@ interface QueuingStrategy<T = any> {
size?: QueuingStrategySizeCallback<T>;
}
+/** This Streams API interface provides a built-in byte length queuing strategy
+ * that can be used when constructing streams. */
+declare class CountQueuingStrategy implements QueuingStrategy {
+ constructor(options: { highWaterMark: number });
+ highWaterMark: number;
+ size(chunk: any): 1;
+}
+
+declare class ByteLengthQueuingStrategy
+ implements QueuingStrategy<ArrayBufferView> {
+ constructor(options: { highWaterMark: number });
+ highWaterMark: number;
+ size(chunk: ArrayBufferView): number;
+}
+
/** This Streams API interface represents a readable stream of byte data. The
* Fetch API offers a concrete instance of a ReadableStream through the body
* property of a Response object. */
@@ -347,13 +362,58 @@ declare var ReadableStream: {
): ReadableStream<R>;
};
-/** This Streams API interface provides a standard abstraction for writing streaming data to a destination, known as a sink. This object comes with built-in backpressure and queuing. */
-interface WritableStream<W = any> {
+interface WritableStreamDefaultControllerCloseCallback {
+ (): void | PromiseLike<void>;
+}
+
+interface WritableStreamDefaultControllerStartCallback {
+ (controller: WritableStreamDefaultController): void | PromiseLike<void>;
+}
+
+interface WritableStreamDefaultControllerWriteCallback<W> {
+ (chunk: W, controller: WritableStreamDefaultController): void | PromiseLike<
+ void
+ >;
+}
+
+interface WritableStreamErrorCallback {
+ (reason: any): void | PromiseLike<void>;
+}
+
+interface UnderlyingSink<W = any> {
+ abort?: WritableStreamErrorCallback;
+ close?: WritableStreamDefaultControllerCloseCallback;
+ start?: WritableStreamDefaultControllerStartCallback;
+ type?: undefined;
+ write?: WritableStreamDefaultControllerWriteCallback<W>;
+}
+
+/** This Streams API interface provides a standard abstraction for writing
+ * streaming data to a destination, known as a sink. This object comes with
+ * built-in backpressure and queuing. */
+declare class WritableStream<W = any> {
+ constructor(
+ underlyingSink?: UnderlyingSink<W>,
+ strategy?: QueuingStrategy<W>
+ );
readonly locked: boolean;
abort(reason?: any): Promise<void>;
+ close(): Promise<void>;
getWriter(): WritableStreamDefaultWriter<W>;
}
+/** This Streams API interface represents a controller allowing control of a
+ * WritableStream's state. When constructing a WritableStream, the underlying
+ * sink is given a corresponding WritableStreamDefaultController instance to
+ * manipulate. */
+interface WritableStreamDefaultController {
+ error(error?: any): void;
+}
+
+/** This Streams API interface is the object returned by
+ * WritableStream.getWriter() and once created locks the < writer to the
+ * WritableStream ensuring that no other streams can write to the underlying
+ * sink. */
interface WritableStreamDefaultWriter<W = any> {
readonly closed: Promise<void>;
readonly desiredSize: number | null;
diff --git a/cli/js/tests/streams_piping_test.ts b/cli/js/tests/streams_piping_test.ts
new file mode 100644
index 000000000..a947b3821
--- /dev/null
+++ b/cli/js/tests/streams_piping_test.ts
@@ -0,0 +1,131 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+import { unitTest, assert, assertEquals } from "./test_util.ts";
+import { assertThrowsAsync } from "../../../std/testing/asserts.ts";
+
+unitTest(function streamPipeLocks() {
+ const rs = new ReadableStream();
+ const ws = new WritableStream();
+
+ assertEquals(rs.locked, false);
+ assertEquals(ws.locked, false);
+
+ rs.pipeTo(ws);
+
+ assert(rs.locked);
+ assert(ws.locked);
+});
+
+unitTest(async function streamPipeFinishUnlocks() {
+ const rs = new ReadableStream({
+ start(controller: ReadableStreamDefaultController): void {
+ controller.close();
+ },
+ });
+ const ws = new WritableStream();
+
+ await rs.pipeTo(ws);
+ assertEquals(rs.locked, false);
+ assertEquals(ws.locked, false);
+});
+
+unitTest(async function streamPipeReadableStreamLocked() {
+ const rs = new ReadableStream();
+ const ws = new WritableStream();
+
+ rs.getReader();
+
+ await assertThrowsAsync(async () => {
+ await rs.pipeTo(ws);
+ }, TypeError);
+});
+
+unitTest(async function streamPipeReadableStreamLocked() {
+ const rs = new ReadableStream();
+ const ws = new WritableStream();
+
+ ws.getWriter();
+
+ await assertThrowsAsync(async () => {
+ await rs.pipeTo(ws);
+ }, TypeError);
+});
+
+unitTest(async function streamPipeLotsOfChunks() {
+ const CHUNKS = 10;
+
+ const rs = new ReadableStream<number>({
+ start(c: ReadableStreamDefaultController): void {
+ for (let i = 0; i < CHUNKS; ++i) {
+ c.enqueue(i);
+ }
+ c.close();
+ },
+ });
+
+ const written: Array<string | number> = [];
+ const ws = new WritableStream(
+ {
+ write(chunk: number): void {
+ written.push(chunk);
+ },
+ close(): void {
+ written.push("closed");
+ },
+ },
+ new CountQueuingStrategy({ highWaterMark: CHUNKS })
+ );
+
+ await rs.pipeTo(ws);
+ const targetValues = [];
+ for (let i = 0; i < CHUNKS; ++i) {
+ targetValues.push(i);
+ }
+ targetValues.push("closed");
+
+ assertEquals(written, targetValues, "the correct values must be written");
+
+ // Ensure both readable and writable are closed by the time the pipe finishes.
+ await Promise.all([rs.getReader().closed, ws.getWriter().closed]);
+});
+
+for (const preventAbort of [true, false]) {
+ unitTest(function undefinedRejectionFromPull() {
+ const rs = new ReadableStream({
+ pull(): Promise<void> {
+ return Promise.reject(undefined);
+ },
+ });
+
+ return rs.pipeTo(new WritableStream(), { preventAbort }).then(
+ () => {
+ throw new Error("pipeTo promise should be rejected");
+ },
+ (value) =>
+ assertEquals(value, undefined, "rejection value should be undefined")
+ );
+ });
+}
+
+for (const preventCancel of [true, false]) {
+ unitTest(function undefinedRejectionWithPreventCancel() {
+ const rs = new ReadableStream({
+ pull(controller: ReadableStreamDefaultController<number>): void {
+ controller.enqueue(0);
+ },
+ });
+
+ const ws = new WritableStream({
+ write(): Promise<void> {
+ return Promise.reject(undefined);
+ },
+ });
+
+ return rs.pipeTo(ws, { preventCancel }).then(
+ () => {
+ throw new Error("pipeTo promise should be rejected");
+ },
+ (value) =>
+ assertEquals(value, undefined, "rejection value should be undefined")
+ );
+ });
+}
diff --git a/cli/js/tests/streams_writable_test.ts b/cli/js/tests/streams_writable_test.ts
new file mode 100644
index 000000000..54c1624af
--- /dev/null
+++ b/cli/js/tests/streams_writable_test.ts
@@ -0,0 +1,253 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+import { unitTest, assert, assertEquals, assertThrows } from "./test_util.ts";
+
+unitTest(function writableStreamDesiredSizeOnReleasedWriter() {
+ const ws = new WritableStream();
+ const writer = ws.getWriter();
+ writer.releaseLock();
+ assertThrows(() => {
+ writer.desiredSize;
+ }, TypeError);
+});
+
+unitTest(function writableStreamDesiredSizeInitialValue() {
+ const ws = new WritableStream();
+ const writer = ws.getWriter();
+ assertEquals(writer.desiredSize, 1);
+});
+
+unitTest(async function writableStreamDesiredSizeClosed() {
+ const ws = new WritableStream();
+ const writer = ws.getWriter();
+ await writer.close();
+ assertEquals(writer.desiredSize, 0);
+});
+
+unitTest(function writableStreamStartThrowsDesiredSizeNull() {
+ const ws = new WritableStream({
+ start(c): void {
+ c.error();
+ },
+ });
+
+ const writer = ws.getWriter();
+ assertEquals(writer.desiredSize, null, "desiredSize should be null");
+});
+
+unitTest(function getWriterOnClosingStream() {
+ const ws = new WritableStream({});
+
+ const writer = ws.getWriter();
+ writer.close();
+ writer.releaseLock();
+
+ ws.getWriter();
+});
+
+unitTest(async function getWriterOnClosedStream() {
+ const ws = new WritableStream({});
+
+ const writer = ws.getWriter();
+ await writer.close();
+ writer.releaseLock();
+
+ ws.getWriter();
+});
+
+unitTest(function getWriterOnAbortedStream() {
+ const ws = new WritableStream({});
+
+ const writer = ws.getWriter();
+ writer.abort();
+ writer.releaseLock();
+
+ ws.getWriter();
+});
+
+unitTest(function getWriterOnErroredStream() {
+ const ws = new WritableStream({
+ start(c): void {
+ c.error();
+ },
+ });
+
+ const writer = ws.getWriter();
+ return writer.closed.then(
+ (v) => {
+ throw new Error(`writer.closed fulfilled unexpectedly with: ${v}`);
+ },
+ () => {
+ writer.releaseLock();
+ ws.getWriter();
+ }
+ );
+});
+
+unitTest(function closedAndReadyOnReleasedWriter() {
+ const ws = new WritableStream({});
+
+ const writer = ws.getWriter();
+ writer.releaseLock();
+
+ return writer.closed.then(
+ (v) => {
+ throw new Error("writer.closed fulfilled unexpectedly with: " + v);
+ },
+ (closedRejection) => {
+ assertEquals(
+ closedRejection.name,
+ "TypeError",
+ "closed promise should reject with a TypeError"
+ );
+ return writer.ready.then(
+ (v) => {
+ throw new Error("writer.ready fulfilled unexpectedly with: " + v);
+ },
+ (readyRejection) =>
+ assertEquals(
+ readyRejection,
+ closedRejection,
+ "ready promise should reject with the same error"
+ )
+ );
+ }
+ );
+});
+
+unitTest(function sinkMethodsCalledAsMethods() {
+ let thisObject: Sink | null = null;
+ // Calls to Sink methods after the first are implicitly ignored. Only the
+ // first value that is passed to the resolver is used.
+ class Sink {
+ start(): void {
+ assertEquals(this, thisObject, "start should be called as a method");
+ }
+
+ write(): void {
+ assertEquals(this, thisObject, "write should be called as a method");
+ }
+
+ close(): void {
+ assertEquals(this, thisObject, "close should be called as a method");
+ }
+
+ abort(): void {
+ assertEquals(this, thisObject, "abort should be called as a method");
+ }
+ }
+
+ const theSink = new Sink();
+ thisObject = theSink;
+ const ws = new WritableStream(theSink);
+
+ const writer = ws.getWriter();
+
+ writer.write("a");
+ const closePromise = writer.close();
+
+ const ws2 = new WritableStream(theSink);
+ const writer2 = ws2.getWriter();
+ const abortPromise = writer2.abort();
+
+ return Promise.all([closePromise, abortPromise]).then(undefined);
+});
+
+unitTest(function sizeShouldNotBeCalledAsMethod() {
+ const strategy = {
+ size(): number {
+ if (this !== undefined) {
+ throw new Error("size called as a method");
+ }
+ return 1;
+ },
+ };
+
+ const ws = new WritableStream({}, strategy);
+ const writer = ws.getWriter();
+ return writer.write("a");
+});
+
+unitTest(function redundantReleaseLockIsNoOp() {
+ const ws = new WritableStream();
+ const writer1 = ws.getWriter();
+ assertEquals(
+ undefined,
+ writer1.releaseLock(),
+ "releaseLock() should return undefined"
+ );
+ const writer2 = ws.getWriter();
+ assertEquals(
+ undefined,
+ writer1.releaseLock(),
+ "no-op releaseLock() should return undefined"
+ );
+ // Calling releaseLock() on writer1 should not interfere with writer2. If it did, then the ready promise would be
+ // rejected.
+ return writer2.ready;
+});
+
+unitTest(function readyPromiseShouldFireBeforeReleaseLock() {
+ const events: string[] = [];
+ const ws = new WritableStream();
+ const writer = ws.getWriter();
+ return writer.ready.then(() => {
+ // Force the ready promise back to a pending state.
+ const writerPromise = writer.write("dummy");
+ const readyPromise = writer.ready.catch(() => events.push("ready"));
+ const closedPromise = writer.closed.catch(() => events.push("closed"));
+ writer.releaseLock();
+ return Promise.all([readyPromise, closedPromise]).then(() => {
+ assertEquals(
+ events,
+ ["ready", "closed"],
+ "ready promise should fire before closed promise"
+ );
+ // Stop the writer promise hanging around after the test has finished.
+ return Promise.all([writerPromise, ws.abort()]).then(undefined);
+ });
+ });
+});
+
+unitTest(function subclassingWritableStream() {
+ class Subclass extends WritableStream {
+ extraFunction(): boolean {
+ return true;
+ }
+ }
+ assert(
+ Object.getPrototypeOf(Subclass.prototype) === WritableStream.prototype,
+ "Subclass.prototype's prototype should be WritableStream.prototype"
+ );
+ assert(
+ Object.getPrototypeOf(Subclass) === WritableStream,
+ "Subclass's prototype should be WritableStream"
+ );
+ const sub = new Subclass();
+ assert(
+ sub instanceof WritableStream,
+ "Subclass object should be an instance of WritableStream"
+ );
+ assert(
+ sub instanceof Subclass,
+ "Subclass object should be an instance of Subclass"
+ );
+ const lockedGetter = Object.getOwnPropertyDescriptor(
+ WritableStream.prototype,
+ "locked"
+ )!.get!;
+ assert(
+ lockedGetter.call(sub) === sub.locked,
+ "Subclass object should pass brand check"
+ );
+ assert(
+ sub.extraFunction(),
+ "extraFunction() should be present on Subclass object"
+ );
+});
+
+unitTest(function lockedGetterShouldReturnTrue() {
+ const ws = new WritableStream();
+ assert(!ws.locked, "stream should not be locked");
+ ws.getWriter();
+ assert(ws.locked, "stream should be locked");
+});
diff --git a/cli/js/tests/unit_tests.ts b/cli/js/tests/unit_tests.ts
index fa168e6dd..db2df2216 100644
--- a/cli/js/tests/unit_tests.ts
+++ b/cli/js/tests/unit_tests.ts
@@ -53,6 +53,8 @@ import "./request_test.ts";
import "./resources_test.ts";
import "./signal_test.ts";
import "./stat_test.ts";
+import "./streams_piping_test.ts";
+import "./streams_writable_test.ts";
import "./symlink_test.ts";
import "./text_encoding_test.ts";
import "./testing_test.ts";
diff --git a/cli/js/util.ts b/cli/js/util.ts
index 6db8ade7b..309bfcd0c 100644
--- a/cli/js/util.ts
+++ b/cli/js/util.ts
@@ -20,9 +20,17 @@ export function log(...args: unknown[]): void {
}
// @internal
-export function assert(cond: unknown, msg = "assert"): asserts cond {
+export class AssertionError extends Error {
+ constructor(msg?: string) {
+ super(msg);
+ this.name = "AssertionError";
+ }
+}
+
+// @internal
+export function assert(cond: unknown, msg = "Assertion failed."): asserts cond {
if (!cond) {
- throw Error(msg);
+ throw new AssertionError(msg);
}
}
diff --git a/cli/js/web/dom_types.d.ts b/cli/js/web/dom_types.d.ts
index a78fd7d9e..b5b172ccd 100644
--- a/cli/js/web/dom_types.d.ts
+++ b/cli/js/web/dom_types.d.ts
@@ -253,22 +253,6 @@ export interface Body {
text(): Promise<string>;
}
-export interface WritableStream<W = any> {
- readonly locked: boolean;
- abort(reason?: any): Promise<void>;
- getWriter(): WritableStreamDefaultWriter<W>;
-}
-
-export interface WritableStreamDefaultWriter<W = any> {
- readonly closed: Promise<void>;
- readonly desiredSize: number | null;
- readonly ready: Promise<void>;
- abort(reason?: any): Promise<void>;
- close(): Promise<void>;
- releaseLock(): void;
- write(chunk: W): Promise<void>;
-}
-
export interface RequestInit {
body?: BodyInit | null;
cache?: RequestCache;
diff --git a/cli/js/web/fetch.ts b/cli/js/web/fetch.ts
index cbedcabfd..38ca03aca 100644
--- a/cli/js/web/fetch.ts
+++ b/cli/js/web/fetch.ts
@@ -258,7 +258,7 @@ class Body
pipeThrough<T>(
_: {
- writable: domTypes.WritableStream<Uint8Array>;
+ writable: WritableStream<Uint8Array>;
readable: ReadableStream<T>;
},
_options?: PipeOptions
@@ -267,7 +267,7 @@ class Body
}
pipeTo(
- _dest: domTypes.WritableStream<Uint8Array>,
+ _dest: WritableStream<Uint8Array>,
_options?: PipeOptions
): Promise<void> {
return notImplemented();
diff --git a/cli/js/web/streams/internals.ts b/cli/js/web/streams/internals.ts
index 2559d9e5c..846db096e 100644
--- a/cli/js/web/streams/internals.ts
+++ b/cli/js/web/streams/internals.ts
@@ -13,13 +13,25 @@ import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_c
import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts";
import { ReadableStreamImpl } from "./readable_stream.ts";
import * as sym from "./symbols.ts";
+import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts";
+import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts";
+import { WritableStreamImpl } from "./writable_stream.ts";
+import { AbortSignalImpl } from "../abort_signal.ts";
+import { DOMExceptionImpl as DOMException } from "../dom_exception.ts";
import { cloneValue } from "../util.ts";
-import { assert } from "../../util.ts";
+import { assert, AssertionError } from "../../util.ts";
+export type AbortAlgorithm = (reason?: any) => PromiseLike<void>;
+export interface AbortRequest {
+ promise: Deferred<void>;
+ reason?: any;
+ wasAlreadyErroring: boolean;
+}
export interface BufferQueueItem extends Pair<ArrayBuffer | SharedArrayBuffer> {
offset: number;
}
export type CancelAlgorithm = (reason?: any) => PromiseLike<void>;
+export type CloseAlgorithm = () => PromiseLike<void>;
type Container<R = any> = {
[sym.queue]: Array<Pair<R> | BufferQueueItem>;
[sym.queueTotalSize]: number;
@@ -28,11 +40,11 @@ export type Pair<R> = { value: R; size: number };
export type PullAlgorithm = () => PromiseLike<void>;
export type SizeAlgorithm<T> = (chunk: T) => number;
export type StartAlgorithm = () => void | PromiseLike<void>;
+export type WriteAlgorithm<W> = (chunk: W) => Promise<void>;
export interface Deferred<T> {
promise: Promise<T>;
resolve?: (value?: T | PromiseLike<T>) => void;
reject?: (reason?: any) => void;
- settled: boolean;
}
export interface ReadableStreamGenericReader<R = any>
@@ -58,6 +70,12 @@ export function acquireReadableStreamDefaultReader<T>(
return reader;
}
+export function acquireWritableStreamDefaultWriter<W>(
+ stream: WritableStreamImpl<W>
+): WritableStreamDefaultWriterImpl<W> {
+ return new WritableStreamDefaultWriterImpl(stream);
+}
+
function createAlgorithmFromUnderlyingMethod<
O extends UnderlyingByteSource | UnderlyingSource,
P extends keyof O
@@ -157,14 +175,14 @@ function enqueueValueWithSize<R>(
/** Non-spec mechanism to "unwrap" a promise and store it to be resolved
* later. */
-function getDeferred<T>(): Deferred<T> {
- let resolve = undefined;
- let reject = undefined;
+export function getDeferred<T>(): Required<Deferred<T>> {
+ let resolve: (value?: T | PromiseLike<T>) => void;
+ let reject: (reason?: any) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
- return { promise, resolve, reject, settled: false };
+ return { promise, resolve: resolve!, reject: reject! };
}
export function initializeReadableStream(stream: ReadableStreamImpl): void {
@@ -173,6 +191,17 @@ export function initializeReadableStream(stream: ReadableStreamImpl): void {
stream[sym.disturbed] = false;
}
+export function initializeWritableStream(stream: WritableStreamImpl): void {
+ stream[sym.state] = "writable";
+ stream[sym.storedError] = stream[sym.writer] = stream[
+ sym.writableStreamController
+ ] = stream[sym.inFlightWriteRequest] = stream[sym.closeRequest] = stream[
+ sym.inFlightCloseRequest
+ ] = stream[sym.pendingAbortRequest] = undefined;
+ stream[sym.writeRequests] = [];
+ stream[sym.backpressure] = false;
+}
+
function invokeOrNoop<O extends any, P extends keyof O>(
o: O,
p: P,
@@ -278,6 +307,40 @@ export function isUnderlyingByteSource(
return typeString === "bytes";
}
+export function isWritableStream(x: unknown): x is WritableStreamImpl {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.writableStreamController in x)
+ ? false
+ : true;
+}
+
+export function isWritableStreamDefaultController(
+ x: unknown
+): x is WritableStreamDefaultControllerImpl<any> {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.controlledWritableStream in x)
+ ? false
+ : true;
+}
+
+export function isWritableStreamDefaultWriter(
+ x: unknown
+): x is WritableStreamDefaultWriterImpl<any> {
+ return typeof x !== "object" || x === null || !(sym.ownerWritableStream in x)
+ ? false
+ : true;
+}
+
+export function isWritableStreamLocked(stream: WritableStreamImpl): boolean {
+ assert(isWritableStream(stream));
+ if (stream[sym.writer] === undefined) {
+ return false;
+ }
+ return true;
+}
+
export function makeSizeAlgorithmFromSizeFunction<T>(
size: QueuingStrategySizeCallback<T> | undefined
): SizeAlgorithm<T> {
@@ -292,6 +355,13 @@ export function makeSizeAlgorithmFromSizeFunction<T>(
};
}
+function peekQueueValue<T>(container: Container<T>): T | "close" {
+ assert(sym.queue in container && sym.queueTotalSize in container);
+ assert(container[sym.queue].length);
+ const [pair] = container[sym.queue];
+ return pair.value as T;
+}
+
function readableByteStreamControllerShouldCallPull(
controller: ReadableByteStreamControllerImpl
): boolean {
@@ -333,25 +403,27 @@ export function readableByteStreamControllerCallPullIfNeeded(
assert(controller[sym.pullAgain] === false);
controller[sym.pulling] = true;
const pullPromise = controller[sym.pullAlgorithm]();
- pullPromise.then(
- () => {
- controller[sym.pulling] = false;
- if (controller[sym.pullAgain]) {
- controller[sym.pullAgain];
- readableByteStreamControllerCallPullIfNeeded(controller);
+ setPromiseIsHandledToTrue(
+ pullPromise.then(
+ () => {
+ controller[sym.pulling] = false;
+ if (controller[sym.pullAgain]) {
+ controller[sym.pullAgain] = false;
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+ },
+ (e) => {
+ readableByteStreamControllerError(controller, e);
}
- },
- (e) => {
- readableByteStreamControllerError(controller, e);
- }
+ )
);
}
export function readableByteStreamControllerClearAlgorithms(
controller: ReadableByteStreamControllerImpl
): void {
- delete controller[sym.pullAlgorithm];
- delete controller[sym.cancelAlgorithm];
+ (controller as any)[sym.pullAlgorithm] = undefined;
+ (controller as any)[sym.cancelAlgorithm] = undefined;
}
export function readableByteStreamControllerClose(
@@ -476,7 +548,7 @@ export function readableStreamAddReadRequest<R>(
return promise.promise;
}
-export async function readableStreamCancel<T>(
+export function readableStreamCancel<T>(
stream: ReadableStreamImpl<T>,
reason: any
): Promise<void> {
@@ -488,7 +560,9 @@ export async function readableStreamCancel<T>(
return Promise.reject(stream[sym.storedError]);
}
readableStreamClose(stream);
- await stream[sym.readableStreamController]![sym.cancelSteps](reason);
+ return stream[sym.readableStreamController]![sym.cancelSteps](reason).then(
+ () => undefined
+ ) as Promise<void>;
}
export function readableStreamClose<T>(stream: ReadableStreamImpl<T>): void {
@@ -514,7 +588,6 @@ export function readableStreamClose<T>(stream: ReadableStreamImpl<T>): void {
const resolve = reader[sym.closedPromise].resolve;
assert(resolve);
resolve();
- reader[sym.closedPromise].settled = true;
}
export function readableStreamCreateReadResult<T>(
@@ -573,9 +646,9 @@ export function readableStreamDefaultControllerCanCloseOrEnqueue<T>(
export function readableStreamDefaultControllerClearAlgorithms<T>(
controller: ReadableStreamDefaultControllerImpl<T>
): void {
- delete controller[sym.pullAlgorithm];
- delete controller[sym.cancelAlgorithm];
- delete controller[sym.strategySizeAlgorithm];
+ (controller as any)[sym.pullAlgorithm] = undefined;
+ (controller as any)[sym.cancelAlgorithm] = undefined;
+ (controller as any)[sym.strategySizeAlgorithm] = undefined;
}
export function readableStreamDefaultControllerClose<T>(
@@ -703,17 +776,18 @@ export function readableStreamError(stream: ReadableStreamImpl, e: any): void {
}
if (isReadableStreamDefaultReader(reader)) {
for (const readRequest of reader[sym.readRequests]) {
- const { reject } = readRequest;
- assert(reject);
- reject(e);
+ assert(readRequest.reject);
+ readRequest.reject(e);
+ readRequest.reject = undefined;
+ readRequest.resolve = undefined;
}
reader[sym.readRequests] = [];
}
// 3.5.6.8 Otherwise, support BYOB Reader
- const { reject } = reader[sym.closedPromise];
- assert(reject);
- reject(e);
- reader[sym.closedPromise].settled = true;
+ reader[sym.closedPromise].reject!(e);
+ reader[sym.closedPromise].reject = undefined;
+ reader[sym.closedPromise].resolve = undefined;
+ setPromiseIsHandledToTrue(reader[sym.closedPromise].promise);
}
export function readableStreamFulfillReadRequest<R>(
@@ -744,6 +818,252 @@ export function readableStreamHasDefaultReader(
: true;
}
+export function readableStreamPipeTo<T>(
+ source: ReadableStreamImpl<T>,
+ dest: WritableStreamImpl<T>,
+ preventClose: boolean,
+ preventAbort: boolean,
+ preventCancel: boolean,
+ signal: AbortSignalImpl | undefined
+): Promise<void> {
+ assert(isReadableStream(source));
+ assert(isWritableStream(dest));
+ assert(
+ typeof preventClose === "boolean" &&
+ typeof preventAbort === "boolean" &&
+ typeof preventCancel === "boolean"
+ );
+ assert(signal === undefined || signal instanceof AbortSignalImpl);
+ assert(!isReadableStreamLocked(source));
+ assert(!isWritableStreamLocked(dest));
+ const reader = acquireReadableStreamDefaultReader(source);
+ const writer = acquireWritableStreamDefaultWriter(dest);
+ source[sym.disturbed] = true;
+ let shuttingDown = false;
+ const promise = getDeferred<void>();
+ let abortAlgorithm: () => void;
+ if (signal) {
+ abortAlgorithm = (): void => {
+ const error = new DOMException("Abort signal received.", "AbortSignal");
+ const actions: Array<() => Promise<void>> = [];
+ if (!preventAbort) {
+ actions.push(() => {
+ if (dest[sym.state] === "writable") {
+ return writableStreamAbort(dest, error);
+ } else {
+ return Promise.resolve(undefined);
+ }
+ });
+ }
+ if (!preventCancel) {
+ actions.push(() => {
+ if (source[sym.state] === "readable") {
+ return readableStreamCancel(source, error);
+ } else {
+ return Promise.resolve(undefined);
+ }
+ });
+ }
+ shutdownWithAction(
+ () => Promise.all(actions.map((action) => action())),
+ true,
+ error
+ );
+ };
+ if (signal.aborted) {
+ abortAlgorithm();
+ return promise.promise;
+ }
+ signal.addEventListener("abort", abortAlgorithm);
+ }
+
+ let currentWrite = Promise.resolve();
+
+ // At this point, the spec becomes non-specific and vague. Most of the rest
+ // of this code is based on the reference implementation that is part of the
+ // specification. This is why the functions are only scoped to this function
+ // to ensure they don't leak into the spec compliant parts.
+
+ function isOrBecomesClosed(
+ stream: ReadableStreamImpl | WritableStreamImpl,
+ promise: Promise<void>,
+ action: () => void
+ ): void {
+ if (stream[sym.state] === "closed") {
+ action();
+ } else {
+ setPromiseIsHandledToTrue(promise.then(action));
+ }
+ }
+
+ function isOrBecomesErrored(
+ stream: ReadableStreamImpl | WritableStreamImpl,
+ promise: Promise<void>,
+ action: (error: any) => void
+ ): void {
+ if (stream[sym.state] === "errored") {
+ action(stream[sym.storedError]);
+ } else {
+ setPromiseIsHandledToTrue(promise.catch((error) => action(error)));
+ }
+ }
+
+ function finalize(isError?: boolean, error?: any): void {
+ writableStreamDefaultWriterRelease(writer);
+ readableStreamReaderGenericRelease(reader);
+
+ if (signal) {
+ signal.removeEventListener("abort", abortAlgorithm);
+ }
+ if (isError) {
+ promise.reject(error);
+ } else {
+ promise.resolve();
+ }
+ }
+
+ function waitForWritesToFinish(): Promise<void> {
+ const oldCurrentWrite = currentWrite;
+ return currentWrite.then(() =>
+ oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined
+ );
+ }
+
+ function shutdownWithAction(
+ action: () => Promise<any>,
+ originalIsError?: boolean,
+ originalError?: any
+ ): void {
+ function doTheRest(): void {
+ setPromiseIsHandledToTrue(
+ action().then(
+ () => finalize(originalIsError, originalError),
+ (newError) => finalize(true, newError)
+ )
+ );
+ }
+
+ if (shuttingDown) {
+ return;
+ }
+ shuttingDown = true;
+
+ if (
+ dest[sym.state] === "writable" &&
+ writableStreamCloseQueuedOrInFlight(dest) === false
+ ) {
+ setPromiseIsHandledToTrue(waitForWritesToFinish().then(doTheRest));
+ } else {
+ doTheRest();
+ }
+ }
+
+ function shutdown(isError: boolean, error?: any): void {
+ if (shuttingDown) {
+ return;
+ }
+ shuttingDown = true;
+
+ if (
+ dest[sym.state] === "writable" &&
+ !writableStreamCloseQueuedOrInFlight(dest)
+ ) {
+ setPromiseIsHandledToTrue(
+ waitForWritesToFinish().then(() => finalize(isError, error))
+ );
+ }
+ finalize(isError, error);
+ }
+
+ function pipeStep(): Promise<boolean> {
+ if (shuttingDown) {
+ return Promise.resolve(true);
+ }
+ return writer[sym.readyPromise].promise.then(() => {
+ return readableStreamDefaultReaderRead(reader).then(({ value, done }) => {
+ if (done === true) {
+ return true;
+ }
+ currentWrite = writableStreamDefaultWriterWrite(
+ writer,
+ value!
+ ).then(undefined, () => {});
+ return false;
+ });
+ });
+ }
+
+ function pipeLoop(): Promise<void> {
+ return new Promise((resolveLoop, rejectLoop) => {
+ function next(done: boolean): void {
+ if (done) {
+ resolveLoop(undefined);
+ } else {
+ setPromiseIsHandledToTrue(pipeStep().then(next, rejectLoop));
+ }
+ }
+ next(false);
+ });
+ }
+
+ isOrBecomesErrored(
+ source,
+ reader[sym.closedPromise].promise,
+ (storedError) => {
+ if (!preventAbort) {
+ shutdownWithAction(
+ () => writableStreamAbort(dest, storedError),
+ true,
+ storedError
+ );
+ } else {
+ shutdown(true, storedError);
+ }
+ }
+ );
+
+ isOrBecomesErrored(dest, writer[sym.closedPromise].promise, (storedError) => {
+ if (!preventCancel) {
+ shutdownWithAction(
+ () => readableStreamCancel(source, storedError),
+ true,
+ storedError
+ );
+ } else {
+ shutdown(true, storedError);
+ }
+ });
+
+ isOrBecomesClosed(source, reader[sym.closedPromise].promise, () => {
+ if (!preventClose) {
+ shutdownWithAction(() =>
+ writableStreamDefaultWriterCloseWithErrorPropagation(writer)
+ );
+ }
+ });
+
+ if (
+ writableStreamCloseQueuedOrInFlight(dest) ||
+ dest[sym.state] === "closed"
+ ) {
+ const destClosed = new TypeError(
+ "The destination writable stream closed before all data could be piped to it."
+ );
+ if (!preventCancel) {
+ shutdownWithAction(
+ () => readableStreamCancel(source, destClosed),
+ true,
+ destClosed
+ );
+ } else {
+ shutdown(true, destClosed);
+ }
+ }
+
+ setPromiseIsHandledToTrue(pipeLoop());
+ return promise.promise;
+}
+
export function readableStreamReaderGenericCancel<R = any>(
reader: ReadableStreamGenericReader<R>,
reason: any
@@ -763,16 +1083,13 @@ export function readableStreamReaderGenericInitialize<R = any>(
if (stream[sym.state] === "readable") {
reader[sym.closedPromise] = getDeferred();
} else if (stream[sym.state] === "closed") {
- reader[sym.closedPromise] = {
- promise: Promise.resolve(),
- settled: true,
- };
+ reader[sym.closedPromise] = { promise: Promise.resolve() };
} else {
assert(stream[sym.state] === "errored");
reader[sym.closedPromise] = {
promise: Promise.reject(stream[sym.storedError]),
- settled: true,
};
+ setPromiseIsHandledToTrue(reader[sym.closedPromise].promise);
}
}
@@ -790,9 +1107,9 @@ export function readableStreamReaderGenericRelease<R = any>(
delete closedPromise.reject;
delete closedPromise.resolve;
}
- closedPromise.settled = true;
- delete reader[sym.ownerReadableStream][sym.reader];
- delete reader[sym.ownerReadableStream];
+ setPromiseIsHandledToTrue(closedPromise.promise);
+ reader[sym.ownerReadableStream][sym.reader] = undefined;
+ (reader as any)[sym.ownerReadableStream] = undefined;
}
export function readableStreamTee<T>(
@@ -817,51 +1134,54 @@ export function readableStreamTee<T>(
return Promise.resolve();
}
reading = true;
- readableStreamDefaultReaderRead(reader).then((result) => {
- reading = false;
- assert(typeof result === "object");
- const { done } = result;
- assert(typeof done === "boolean");
- if (done) {
+ const readPromise = readableStreamDefaultReaderRead(reader).then(
+ (result) => {
+ reading = false;
+ assert(typeof result === "object");
+ const { done } = result;
+ assert(typeof done === "boolean");
+ if (done) {
+ if (!canceled1) {
+ readableStreamDefaultControllerClose(
+ branch1[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl
+ );
+ }
+ if (!canceled2) {
+ readableStreamDefaultControllerClose(
+ branch2[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl
+ );
+ }
+ return;
+ }
+ const { value } = result;
+ const value1 = value!;
+ let value2 = value!;
+ if (!canceled2 && cloneForBranch2) {
+ value2 = cloneValue(value2);
+ }
if (!canceled1) {
- readableStreamDefaultControllerClose(
+ readableStreamDefaultControllerEnqueue(
branch1[
sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl
+ ] as ReadableStreamDefaultControllerImpl,
+ value1
);
}
if (!canceled2) {
- readableStreamDefaultControllerClose(
+ readableStreamDefaultControllerEnqueue(
branch2[
sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl
+ ] as ReadableStreamDefaultControllerImpl,
+ value2
);
}
- return;
- }
- const { value } = result;
- const value1 = value!;
- let value2 = value!;
- if (!canceled2 && cloneForBranch2) {
- value2 = cloneValue(value2);
}
- if (!canceled1) {
- readableStreamDefaultControllerEnqueue(
- branch1[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl,
- value1
- );
- }
- if (!canceled2) {
- readableStreamDefaultControllerEnqueue(
- branch2[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl,
- value2
- );
- }
- });
+ );
+ setPromiseIsHandledToTrue(readPromise);
return Promise.resolve();
};
const cancel1Algorithm = (reason?: any): PromiseLike<void> => {
@@ -870,7 +1190,6 @@ export function readableStreamTee<T>(
if (canceled2) {
const compositeReason = [reason1, reason2];
const cancelResult = readableStreamCancel(stream, compositeReason);
- assert(cancelPromise.resolve);
cancelPromise.resolve(cancelResult);
}
return cancelPromise.promise;
@@ -881,7 +1200,6 @@ export function readableStreamTee<T>(
if (canceled1) {
const compositeReason = [reason1, reason2];
const cancelResult = readableStreamCancel(stream, compositeReason);
- assert(cancelPromise.resolve);
cancelPromise.resolve(cancelResult);
}
return cancelPromise.promise;
@@ -897,20 +1215,22 @@ export function readableStreamTee<T>(
pullAlgorithm,
cancel2Algorithm
);
- reader[sym.closedPromise].promise.catch((r) => {
- readableStreamDefaultControllerError(
- branch1[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl,
- r
- );
- readableStreamDefaultControllerError(
- branch2[
- sym.readableStreamController
- ] as ReadableStreamDefaultControllerImpl,
- r
- );
- });
+ setPromiseIsHandledToTrue(
+ reader[sym.closedPromise].promise.catch((r) => {
+ readableStreamDefaultControllerError(
+ branch1[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl,
+ r
+ );
+ readableStreamDefaultControllerError(
+ branch2[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl,
+ r
+ );
+ })
+ );
return [branch1, branch2];
}
@@ -920,6 +1240,25 @@ export function resetQueue<R>(container: Container<R>): void {
container[sym.queueTotalSize] = 0;
}
+/** An internal function which provides a function name for some generated
+ * functions, so stack traces are a bit more readable. */
+export function setFunctionName(fn: Function, value: string): void {
+ Object.defineProperty(fn, "name", { value, configurable: true });
+}
+
+/** An internal function which mimics the behavior of setting the promise to
+ * handled in JavaScript. In this situation, an assertion failure, which
+ * shouldn't happen will get thrown, instead of swallowed. */
+export function setPromiseIsHandledToTrue(promise: PromiseLike<unknown>): void {
+ promise.then(undefined, (e) => {
+ if (e && e instanceof AssertionError) {
+ queueMicrotask(() => {
+ throw e;
+ });
+ }
+ });
+}
+
function setUpReadableByteStreamController(
stream: ReadableStreamImpl,
controller: ReadableByteStreamControllerImpl,
@@ -950,16 +1289,18 @@ function setUpReadableByteStreamController(
stream[sym.readableStreamController] = controller;
const startResult = startAlgorithm();
const startPromise = Promise.resolve(startResult);
- startPromise.then(
- () => {
- controller[sym.started] = true;
- assert(!controller[sym.pulling]);
- assert(!controller[sym.pullAgain]);
- readableByteStreamControllerCallPullIfNeeded(controller);
- },
- (r) => {
- readableByteStreamControllerError(controller, r);
- }
+ setPromiseIsHandledToTrue(
+ startPromise.then(
+ () => {
+ controller[sym.started] = true;
+ assert(!controller[sym.pulling]);
+ assert(!controller[sym.pullAgain]);
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ },
+ (r) => {
+ readableByteStreamControllerError(controller, r);
+ }
+ )
);
}
@@ -981,11 +1322,13 @@ export function setUpReadableByteStreamControllerFromUnderlyingSource(
0,
controller
);
+ setFunctionName(pullAlgorithm, "[[pullAlgorithm]]");
const cancelAlgorithm = createAlgorithmFromUnderlyingMethod(
underlyingByteSource,
"cancel",
1
);
+ setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]");
// 3.13.27.6 Let autoAllocateChunkSize be ? GetV(underlyingByteSource, "autoAllocateChunkSize").
const autoAllocateChunkSize = undefined;
setUpReadableByteStreamController(
@@ -1022,16 +1365,18 @@ function setUpReadableStreamDefaultController<T>(
stream[sym.readableStreamController] = controller;
const startResult = startAlgorithm();
const startPromise = Promise.resolve(startResult);
- startPromise.then(
- () => {
- controller[sym.started] = true;
- assert(controller[sym.pulling] === false);
- assert(controller[sym.pullAgain] === false);
- readableStreamDefaultControllerCallPullIfNeeded(controller);
- },
- (r) => {
- readableStreamDefaultControllerError(controller, r);
- }
+ setPromiseIsHandledToTrue(
+ startPromise.then(
+ () => {
+ controller[sym.started] = true;
+ assert(controller[sym.pulling] === false);
+ assert(controller[sym.pullAgain] === false);
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+ },
+ (r) => {
+ readableStreamDefaultControllerError(controller, r);
+ }
+ )
);
}
@@ -1053,11 +1398,13 @@ export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>(
0,
controller
);
+ setFunctionName(pullAlgorithm, "[[pullAlgorithm]]");
const cancelAlgorithm: CancelAlgorithm = createAlgorithmFromUnderlyingMethod(
underlyingSource,
"cancel",
1
);
+ setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]");
setUpReadableStreamDefaultController(
stream,
controller,
@@ -1069,6 +1416,98 @@ export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>(
);
}
+function setUpWritableStreamDefaultController<W>(
+ stream: WritableStreamImpl<W>,
+ controller: WritableStreamDefaultControllerImpl<W>,
+ startAlgorithm: StartAlgorithm,
+ writeAlgorithm: WriteAlgorithm<W>,
+ closeAlgorithm: CloseAlgorithm,
+ abortAlgorithm: AbortAlgorithm,
+ highWaterMark: number,
+ sizeAlgorithm: SizeAlgorithm<W>
+): void {
+ assert(isWritableStream(stream));
+ assert(stream[sym.writableStreamController] === undefined);
+ controller[sym.controlledWritableStream] = stream;
+ stream[sym.writableStreamController] = controller;
+ controller[sym.queue] = [];
+ controller[sym.queueTotalSize] = 0;
+ controller[sym.started] = false;
+ controller[sym.strategySizeAlgorithm] = sizeAlgorithm;
+ controller[sym.strategyHWM] = highWaterMark;
+ controller[sym.writeAlgorithm] = writeAlgorithm;
+ controller[sym.closeAlgorithm] = closeAlgorithm;
+ controller[sym.abortAlgorithm] = abortAlgorithm;
+ const backpressure = writableStreamDefaultControllerGetBackpressure(
+ controller
+ );
+ writableStreamUpdateBackpressure(stream, backpressure);
+ const startResult = startAlgorithm();
+ const startPromise = Promise.resolve(startResult);
+ setPromiseIsHandledToTrue(
+ startPromise.then(
+ () => {
+ assert(
+ stream[sym.state] === "writable" || stream[sym.state] === "erroring"
+ );
+ controller[sym.started] = true;
+ writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ },
+ (r) => {
+ assert(
+ stream[sym.state] === "writable" || stream[sym.state] === "erroring"
+ );
+ controller[sym.started] = true;
+ writableStreamDealWithRejection(stream, r);
+ }
+ )
+ );
+}
+
+export function setUpWritableStreamDefaultControllerFromUnderlyingSink<W>(
+ stream: WritableStreamImpl<W>,
+ underlyingSink: UnderlyingSink<W>,
+ highWaterMark: number,
+ sizeAlgorithm: SizeAlgorithm<W>
+): void {
+ assert(underlyingSink);
+ const controller = Object.create(
+ WritableStreamDefaultControllerImpl.prototype
+ );
+ const startAlgorithm = (): void | PromiseLike<void> => {
+ return invokeOrNoop(underlyingSink, "start", controller);
+ };
+ const writeAlgorithm = createAlgorithmFromUnderlyingMethod(
+ underlyingSink,
+ "write",
+ 1,
+ controller
+ );
+ setFunctionName(writeAlgorithm, "[[writeAlgorithm]]");
+ const closeAlgorithm = createAlgorithmFromUnderlyingMethod(
+ underlyingSink,
+ "close",
+ 0
+ );
+ setFunctionName(closeAlgorithm, "[[closeAlgorithm]]");
+ const abortAlgorithm = createAlgorithmFromUnderlyingMethod(
+ underlyingSink,
+ "abort",
+ 1
+ );
+ setFunctionName(abortAlgorithm, "[[abortAlgorithm]]");
+ setUpWritableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ highWaterMark,
+ sizeAlgorithm
+ );
+}
+
function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer {
assert(!isDetachedBuffer(buffer));
const transferredIshVersion = buffer.slice(0);
@@ -1095,4 +1534,571 @@ export function validateAndNormalizeHighWaterMark(
return highWaterMark;
}
+export function writableStreamAbort<W>(
+ stream: WritableStreamImpl<W>,
+ reason: any
+): Promise<void> {
+ const state = stream[sym.state];
+ if (state === "closed" || state === "errored") {
+ return Promise.resolve(undefined);
+ }
+ if (stream[sym.pendingAbortRequest]) {
+ return stream[sym.pendingAbortRequest]!.promise.promise;
+ }
+ assert(state === "writable" || state === "erroring");
+ let wasAlreadyErroring = false;
+ if (state === "erroring") {
+ wasAlreadyErroring = true;
+ reason = undefined;
+ }
+ const promise = getDeferred<void>();
+ stream[sym.pendingAbortRequest] = { promise, reason, wasAlreadyErroring };
+
+ if (wasAlreadyErroring === false) {
+ writableStreamStartErroring(stream, reason);
+ }
+ return promise.promise;
+}
+
+function writableStreamAddWriteRequest<W>(
+ stream: WritableStreamImpl<W>
+): Promise<void> {
+ assert(isWritableStream(stream));
+ assert(stream[sym.state] === "writable");
+ const promise = getDeferred<void>();
+ stream[sym.writeRequests].push(promise);
+ return promise.promise;
+}
+
+export function writableStreamClose<W>(
+ stream: WritableStreamImpl<W>
+): Promise<void> {
+ const state = stream[sym.state];
+ if (state === "closed" || state === "errored") {
+ return Promise.reject(
+ new TypeError("Cannot close an already closed or errored WritableStream.")
+ );
+ }
+ assert(!writableStreamCloseQueuedOrInFlight(stream));
+ const promise = getDeferred<void>();
+ stream[sym.closeRequest] = promise;
+ const writer = stream[sym.writer];
+ if (writer && stream[sym.backpressure] && state === "writable") {
+ writer[sym.readyPromise].resolve!();
+ writer[sym.readyPromise].resolve = undefined;
+ writer[sym.readyPromise].reject = undefined;
+ }
+ writableStreamDefaultControllerClose(stream[sym.writableStreamController]!);
+ return promise.promise;
+}
+
+export function writableStreamCloseQueuedOrInFlight<W>(
+ stream: WritableStreamImpl<W>
+): boolean {
+ if (
+ stream[sym.closeRequest] === undefined &&
+ stream[sym.inFlightCloseRequest] === undefined
+ ) {
+ return false;
+ }
+ return true;
+}
+
+function writableStreamDealWithRejection<W>(
+ stream: WritableStreamImpl<W>,
+ error: any
+): void {
+ const state = stream[sym.state];
+ if (state === "writable") {
+ writableStreamStartErroring(stream, error);
+ return;
+ }
+ assert(state === "erroring");
+ writableStreamFinishErroring(stream);
+}
+
+function writableStreamDefaultControllerAdvanceQueueIfNeeded<W>(
+ controller: WritableStreamDefaultControllerImpl<W>
+): void {
+ const stream = controller[sym.controlledWritableStream];
+ if (!controller[sym.started]) {
+ return;
+ }
+ if (stream[sym.inFlightWriteRequest]) {
+ return;
+ }
+ const state = stream[sym.state];
+ assert(state !== "closed" && state !== "errored");
+ if (state === "erroring") {
+ writableStreamFinishErroring(stream);
+ return;
+ }
+ if (!controller[sym.queue].length) {
+ return;
+ }
+ const writeRecord = peekQueueValue(controller);
+ if (writeRecord === "close") {
+ writableStreamDefaultControllerProcessClose(controller);
+ } else {
+ writableStreamDefaultControllerProcessWrite(controller, writeRecord.chunk);
+ }
+}
+
+export function writableStreamDefaultControllerClearAlgorithms<W>(
+ controller: WritableStreamDefaultControllerImpl<W>
+): void {
+ (controller as any)[sym.writeAlgorithm] = undefined;
+ (controller as any)[sym.closeAlgorithm] = undefined;
+ (controller as any)[sym.abortAlgorithm] = undefined;
+ (controller as any)[sym.strategySizeAlgorithm] = undefined;
+}
+
+function writableStreamDefaultControllerClose<W>(
+ controller: WritableStreamDefaultControllerImpl<W>
+): void {
+ enqueueValueWithSize(controller, "close", 0);
+ writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+}
+
+export function writableStreamDefaultControllerError<W>(
+ controller: WritableStreamDefaultControllerImpl<W>,
+ error: any
+): void {
+ const stream = controller[sym.controlledWritableStream];
+ assert(stream[sym.state] === "writable");
+ writableStreamDefaultControllerClearAlgorithms(controller);
+ writableStreamStartErroring(stream, error);
+}
+
+function writableStreamDefaultControllerErrorIfNeeded<W>(
+ controller: WritableStreamDefaultControllerImpl<W>,
+ error: any
+): void {
+ if (controller[sym.controlledWritableStream][sym.state] === "writable") {
+ writableStreamDefaultControllerError(controller, error);
+ }
+}
+
+function writableStreamDefaultControllerGetBackpressure<W>(
+ controller: WritableStreamDefaultControllerImpl<W>
+): boolean {
+ const desiredSize = writableStreamDefaultControllerGetDesiredSize(controller);
+ return desiredSize <= 0;
+}
+
+function writableStreamDefaultControllerGetChunkSize<W>(
+ controller: WritableStreamDefaultControllerImpl<W>,
+ chunk: W
+): number {
+ let returnValue: number;
+ try {
+ returnValue = controller[sym.strategySizeAlgorithm](chunk);
+ } catch (e) {
+ writableStreamDefaultControllerErrorIfNeeded(controller, e);
+ return 1;
+ }
+ return returnValue;
+}
+
+function writableStreamDefaultControllerGetDesiredSize<W>(
+ controller: WritableStreamDefaultControllerImpl<W>
+): number {
+ return controller[sym.strategyHWM] - controller[sym.queueTotalSize];
+}
+
+function writableStreamDefaultControllerProcessClose<W>(
+ controller: WritableStreamDefaultControllerImpl<W>
+): void {
+ const stream = controller[sym.controlledWritableStream];
+ writableStreamMarkCloseRequestInFlight(stream);
+ dequeueValue(controller);
+ assert(controller[sym.queue].length === 0);
+ const sinkClosePromise = controller[sym.closeAlgorithm]();
+ writableStreamDefaultControllerClearAlgorithms(controller);
+ setPromiseIsHandledToTrue(
+ sinkClosePromise.then(
+ () => {
+ writableStreamFinishInFlightClose(stream);
+ },
+ (reason) => {
+ writableStreamFinishInFlightCloseWithError(stream, reason);
+ }
+ )
+ );
+}
+
+function writableStreamDefaultControllerProcessWrite<W>(
+ controller: WritableStreamDefaultControllerImpl<W>,
+ chunk: W
+): void {
+ const stream = controller[sym.controlledWritableStream];
+ writableStreamMarkFirstWriteRequestInFlight(stream);
+ const sinkWritePromise = controller[sym.writeAlgorithm](chunk);
+ setPromiseIsHandledToTrue(
+ sinkWritePromise.then(
+ () => {
+ writableStreamFinishInFlightWrite(stream);
+ const state = stream[sym.state];
+ assert(state === "writable" || state === "erroring");
+ dequeueValue(controller);
+ if (
+ !writableStreamCloseQueuedOrInFlight(stream) &&
+ state === "writable"
+ ) {
+ const backpressure = writableStreamDefaultControllerGetBackpressure(
+ controller
+ );
+ writableStreamUpdateBackpressure(stream, backpressure);
+ }
+ writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ },
+ (reason) => {
+ if (stream[sym.state] === "writable") {
+ writableStreamDefaultControllerClearAlgorithms(controller);
+ }
+ writableStreamFinishInFlightWriteWithError(stream, reason);
+ }
+ )
+ );
+}
+
+function writableStreamDefaultControllerWrite<W>(
+ controller: WritableStreamDefaultControllerImpl<W>,
+ chunk: W,
+ chunkSize: number
+): void {
+ const writeRecord = { chunk };
+ try {
+ enqueueValueWithSize(controller, writeRecord, chunkSize);
+ } catch (e) {
+ writableStreamDefaultControllerErrorIfNeeded(controller, e);
+ return;
+ }
+ const stream = controller[sym.controlledWritableStream];
+ if (
+ !writableStreamCloseQueuedOrInFlight(stream) &&
+ stream[sym.state] === "writable"
+ ) {
+ const backpressure = writableStreamDefaultControllerGetBackpressure(
+ controller
+ );
+ writableStreamUpdateBackpressure(stream, backpressure);
+ }
+ writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+}
+
+export function writableStreamDefaultWriterAbort<W>(
+ writer: WritableStreamDefaultWriterImpl<W>,
+ reason: any
+): Promise<void> {
+ const stream = writer[sym.ownerWritableStream];
+ assert(stream);
+ return writableStreamAbort(stream, reason);
+}
+
+export function writableStreamDefaultWriterClose<W>(
+ writer: WritableStreamDefaultWriterImpl<W>
+): Promise<void> {
+ const stream = writer[sym.ownerWritableStream];
+ assert(stream);
+ return writableStreamClose(stream);
+}
+
+function writableStreamDefaultWriterCloseWithErrorPropagation<W>(
+ writer: WritableStreamDefaultWriterImpl<W>
+): Promise<void> {
+ const stream = writer[sym.ownerWritableStream];
+ assert(stream);
+ const state = stream[sym.state];
+ if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
+ return Promise.resolve();
+ }
+ if (state === "errored") {
+ return Promise.reject(stream[sym.storedError]);
+ }
+ assert(state === "writable" || state === "erroring");
+ return writableStreamDefaultWriterClose(writer);
+}
+
+function writableStreamDefaultWriterEnsureClosePromiseRejected<W>(
+ writer: WritableStreamDefaultWriterImpl<W>,
+ error: any
+): void {
+ if (writer[sym.closedPromise].reject) {
+ writer[sym.closedPromise].reject!(error);
+ } else {
+ writer[sym.closedPromise] = {
+ promise: Promise.reject(error),
+ };
+ }
+ setPromiseIsHandledToTrue(writer[sym.closedPromise].promise);
+}
+
+function writableStreamDefaultWriterEnsureReadyPromiseRejected<W>(
+ writer: WritableStreamDefaultWriterImpl<W>,
+ error: any
+): void {
+ if (writer[sym.readyPromise].reject) {
+ writer[sym.readyPromise].reject!(error);
+ writer[sym.readyPromise].reject = undefined;
+ writer[sym.readyPromise].resolve = undefined;
+ } else {
+ writer[sym.readyPromise] = {
+ promise: Promise.reject(error),
+ };
+ }
+ setPromiseIsHandledToTrue(writer[sym.readyPromise].promise);
+}
+
+export function writableStreamDefaultWriterWrite<W>(
+ writer: WritableStreamDefaultWriterImpl<W>,
+ chunk: W
+): Promise<void> {
+ const stream = writer[sym.ownerWritableStream];
+ assert(stream);
+ const controller = stream[sym.writableStreamController];
+ assert(controller);
+ const chunkSize = writableStreamDefaultControllerGetChunkSize(
+ controller,
+ chunk
+ );
+ if (stream !== writer[sym.ownerWritableStream]) {
+ return Promise.reject("Writer has incorrect WritableStream.");
+ }
+ const state = stream[sym.state];
+ if (state === "errored") {
+ return Promise.reject(stream[sym.storedError]);
+ }
+ if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
+ return Promise.reject(new TypeError("The stream is closed or closing."));
+ }
+ if (state === "erroring") {
+ return Promise.reject(stream[sym.storedError]);
+ }
+ assert(state === "writable");
+ const promise = writableStreamAddWriteRequest(stream);
+ writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
+ return promise;
+}
+
+export function writableStreamDefaultWriterGetDesiredSize<W>(
+ writer: WritableStreamDefaultWriterImpl<W>
+): number | null {
+ const stream = writer[sym.ownerWritableStream];
+ const state = stream[sym.state];
+ if (state === "errored" || state === "erroring") {
+ return null;
+ }
+ if (state === "closed") {
+ return 0;
+ }
+ return writableStreamDefaultControllerGetDesiredSize(
+ stream[sym.writableStreamController]!
+ );
+}
+
+export function writableStreamDefaultWriterRelease<W>(
+ writer: WritableStreamDefaultWriterImpl<W>
+): void {
+ const stream = writer[sym.ownerWritableStream];
+ assert(stream);
+ assert(stream[sym.writer] === writer);
+ const releasedError = new TypeError(
+ "Writer was released and can no longer be used to monitor the stream's closedness."
+ );
+ writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError);
+ writableStreamDefaultWriterEnsureClosePromiseRejected(writer, releasedError);
+ stream[sym.writer] = undefined;
+ (writer as any)[sym.ownerWritableStream] = undefined;
+}
+
+function writableStreamFinishErroring<W>(stream: WritableStreamImpl<W>): void {
+ assert(stream[sym.state] === "erroring");
+ assert(!writableStreamHasOperationMarkedInFlight(stream));
+ stream[sym.state] = "errored";
+ stream[sym.writableStreamController]![sym.errorSteps]();
+ const storedError = stream[sym.storedError];
+ for (const writeRequest of stream[sym.writeRequests]) {
+ assert(writeRequest.reject);
+ writeRequest.reject(storedError);
+ }
+ stream[sym.writeRequests] = [];
+ if (!stream[sym.pendingAbortRequest]) {
+ writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ return;
+ }
+ const abortRequest = stream[sym.pendingAbortRequest];
+ assert(abortRequest);
+ stream[sym.pendingAbortRequest] = undefined;
+ if (abortRequest.wasAlreadyErroring) {
+ assert(abortRequest.promise.reject);
+ abortRequest.promise.reject(storedError);
+ writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ return;
+ }
+ const promise = stream[sym.writableStreamController]![sym.abortSteps](
+ abortRequest.reason
+ );
+ setPromiseIsHandledToTrue(
+ promise.then(
+ () => {
+ assert(abortRequest.promise.resolve);
+ abortRequest.promise.resolve();
+ writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ },
+ (reason) => {
+ assert(abortRequest.promise.reject);
+ abortRequest.promise.reject(reason);
+ writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ }
+ )
+ );
+}
+
+function writableStreamFinishInFlightClose<W>(
+ stream: WritableStreamImpl<W>
+): void {
+ assert(stream[sym.inFlightCloseRequest]);
+ stream[sym.inFlightCloseRequest]?.resolve!();
+ stream[sym.inFlightCloseRequest] = undefined;
+ const state = stream[sym.state];
+ assert(state === "writable" || state === "erroring");
+ if (state === "erroring") {
+ stream[sym.storedError] = undefined;
+ if (stream[sym.pendingAbortRequest]) {
+ stream[sym.pendingAbortRequest]!.promise.resolve!();
+ stream[sym.pendingAbortRequest] = undefined;
+ }
+ }
+ stream[sym.state] = "closed";
+ const writer = stream[sym.writer];
+ if (writer) {
+ writer[sym.closedPromise].resolve!();
+ }
+ assert(stream[sym.pendingAbortRequest] === undefined);
+ assert(stream[sym.storedError] === undefined);
+}
+
+function writableStreamFinishInFlightCloseWithError<W>(
+ stream: WritableStreamImpl<W>,
+ error: any
+): void {
+ assert(stream[sym.inFlightCloseRequest]);
+ stream[sym.inFlightCloseRequest]?.reject!(error);
+ stream[sym.inFlightCloseRequest] = undefined;
+ assert(stream[sym.state] === "writable" || stream[sym.state] === "erroring");
+ if (stream[sym.pendingAbortRequest]) {
+ stream[sym.pendingAbortRequest]?.promise.reject!(error);
+ stream[sym.pendingAbortRequest] = undefined;
+ }
+ writableStreamDealWithRejection(stream, error);
+}
+
+function writableStreamFinishInFlightWrite<W>(
+ stream: WritableStreamImpl<W>
+): void {
+ assert(stream[sym.inFlightWriteRequest]);
+ stream[sym.inFlightWriteRequest]!.resolve();
+ stream[sym.inFlightWriteRequest] = undefined;
+}
+
+function writableStreamFinishInFlightWriteWithError<W>(
+ stream: WritableStreamImpl<W>,
+ error: any
+): void {
+ assert(stream[sym.inFlightWriteRequest]);
+ stream[sym.inFlightWriteRequest]!.reject!(error);
+ stream[sym.inFlightWriteRequest] = undefined;
+ assert(stream[sym.state] === "writable" || stream[sym.state] === "erroring");
+ writableStreamDealWithRejection(stream, error);
+}
+
+function writableStreamHasOperationMarkedInFlight<W>(
+ stream: WritableStreamImpl<W>
+): boolean {
+ if (
+ stream[sym.inFlightWriteRequest] === undefined &&
+ stream[sym.inFlightCloseRequest] === undefined
+ ) {
+ return false;
+ }
+ return true;
+}
+
+function writableStreamMarkCloseRequestInFlight<W>(
+ stream: WritableStreamImpl<W>
+): void {
+ assert(stream[sym.inFlightCloseRequest] === undefined);
+ assert(stream[sym.closeRequest] !== undefined);
+ stream[sym.inFlightCloseRequest] = stream[sym.closeRequest];
+ stream[sym.closeRequest] = undefined;
+}
+
+function writableStreamMarkFirstWriteRequestInFlight<W>(
+ stream: WritableStreamImpl<W>
+): void {
+ assert(stream[sym.inFlightWriteRequest] === undefined);
+ assert(stream[sym.writeRequests].length);
+ const writeRequest = stream[sym.writeRequests].shift();
+ stream[sym.inFlightWriteRequest] = writeRequest;
+}
+
+function writableStreamRejectCloseAndClosedPromiseIfNeeded<W>(
+ stream: WritableStreamImpl<W>
+): void {
+ assert(stream[sym.state] === "errored");
+ if (stream[sym.closeRequest]) {
+ assert(stream[sym.inFlightCloseRequest] === undefined);
+ stream[sym.closeRequest]!.reject!(stream[sym.storedError]);
+ stream[sym.closeRequest] = undefined;
+ }
+ const writer = stream[sym.writer];
+ if (writer) {
+ writer[sym.closedPromise].reject!(stream[sym.storedError]);
+ setPromiseIsHandledToTrue(writer[sym.closedPromise].promise);
+ }
+}
+
+function writableStreamStartErroring<W>(
+ stream: WritableStreamImpl<W>,
+ reason: any
+): void {
+ assert(stream[sym.storedError] === undefined);
+ assert(stream[sym.state] === "writable");
+ const controller = stream[sym.writableStreamController];
+ assert(controller);
+ stream[sym.state] = "erroring";
+ stream[sym.storedError] = reason;
+ const writer = stream[sym.writer];
+ if (writer) {
+ writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
+ }
+ if (
+ !writableStreamHasOperationMarkedInFlight(stream) &&
+ controller[sym.started]
+ ) {
+ writableStreamFinishErroring(stream);
+ }
+}
+
+function writableStreamUpdateBackpressure<W>(
+ stream: WritableStreamImpl<W>,
+ backpressure: boolean
+): void {
+ assert(stream[sym.state] === "writable");
+ assert(!writableStreamCloseQueuedOrInFlight(stream));
+ const writer = stream[sym.writer];
+ if (writer && backpressure !== stream[sym.backpressure]) {
+ if (backpressure) {
+ writer[sym.readyPromise] = getDeferred();
+ } else {
+ assert(backpressure === false);
+ writer[sym.readyPromise].resolve!();
+ writer[sym.readyPromise].resolve = undefined;
+ writer[sym.readyPromise].reject = undefined;
+ }
+ }
+ stream[sym.backpressure] = backpressure;
+}
+
/* eslint-enable */
diff --git a/cli/js/web/streams/queuing_strategy.ts b/cli/js/web/streams/queuing_strategy.ts
new file mode 100644
index 000000000..d2717874e
--- /dev/null
+++ b/cli/js/web/streams/queuing_strategy.ts
@@ -0,0 +1,53 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import { setFunctionName } from "./internals.ts";
+import { customInspect } from "../console.ts";
+
+export class CountQueuingStrategyImpl implements CountQueuingStrategy {
+ highWaterMark: number;
+
+ constructor({ highWaterMark }: { highWaterMark: number }) {
+ this.highWaterMark = highWaterMark;
+ }
+
+ size(): 1 {
+ return 1;
+ }
+
+ [customInspect](): string {
+ return `${this.constructor.name} { highWaterMark: ${String(
+ this.highWaterMark
+ )}, size: f }`;
+ }
+}
+
+Object.defineProperty(CountQueuingStrategyImpl.prototype, "size", {
+ enumerable: true,
+});
+
+setFunctionName(CountQueuingStrategyImpl, "CountQueuingStrategy");
+
+export class ByteLengthQueuingStrategyImpl
+ implements ByteLengthQueuingStrategy {
+ highWaterMark: number;
+
+ constructor({ highWaterMark }: { highWaterMark: number }) {
+ this.highWaterMark = highWaterMark;
+ }
+
+ size(chunk: ArrayBufferView): number {
+ return chunk.byteLength;
+ }
+
+ [customInspect](): string {
+ return `${this.constructor.name} { highWaterMark: ${String(
+ this.highWaterMark
+ )}, size: f }`;
+ }
+}
+
+Object.defineProperty(ByteLengthQueuingStrategyImpl.prototype, "size", {
+ enumerable: true,
+});
+
+setFunctionName(CountQueuingStrategyImpl, "CountQueuingStrategy");
diff --git a/cli/js/web/streams/readable_byte_stream_controller.ts b/cli/js/web/streams/readable_byte_stream_controller.ts
index 80ba9ad85..cbef64357 100644
--- a/cli/js/web/streams/readable_byte_stream_controller.ts
+++ b/cli/js/web/streams/readable_byte_stream_controller.ts
@@ -18,11 +18,12 @@ import {
readableStreamHasDefaultReader,
readableStreamGetNumReadRequests,
readableStreamCreateReadResult,
+ setFunctionName,
} from "./internals.ts";
import { ReadableStreamImpl } from "./readable_stream.ts";
import * as sym from "./symbols.ts";
import { assert } from "../../util.ts";
-import { customInspect } from "../../web/console.ts";
+import { customInspect } from "../console.ts";
export class ReadableByteStreamControllerImpl
implements ReadableByteStreamController {
@@ -136,8 +137,13 @@ export class ReadableByteStreamControllerImpl
}
[customInspect](): string {
- return `ReadableByteStreamController { byobRequest: ${String(
+ return `${this.constructor.name} { byobRequest: ${String(
this.byobRequest
)}, desiredSize: ${String(this.desiredSize)} }`;
}
}
+
+setFunctionName(
+ ReadableByteStreamControllerImpl,
+ "ReadableByteStreamController"
+);
diff --git a/cli/js/web/streams/readable_stream.ts b/cli/js/web/streams/readable_stream.ts
index e234c909d..27a733d9d 100644
--- a/cli/js/web/streams/readable_stream.ts
+++ b/cli/js/web/streams/readable_stream.ts
@@ -6,9 +6,14 @@ import {
isReadableStream,
isReadableStreamLocked,
isUnderlyingByteSource,
+ isWritableStream,
+ isWritableStreamLocked,
makeSizeAlgorithmFromSizeFunction,
+ setFunctionName,
+ setPromiseIsHandledToTrue,
readableStreamCancel,
ReadableStreamGenericReader,
+ readableStreamPipeTo,
readableStreamTee,
setUpReadableByteStreamControllerFromUnderlyingSource,
setUpReadableStreamDefaultControllerFromUnderlyingSource,
@@ -18,8 +23,8 @@ import { ReadableByteStreamControllerImpl } from "./readable_byte_stream_control
import { ReadableStreamAsyncIteratorPrototype } from "./readable_stream_async_iterator.ts";
import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts";
import * as sym from "./symbols.ts";
-import { notImplemented } from "../../util.ts";
-import { customInspect } from "../../web/console.ts";
+import { customInspect } from "../console.ts";
+import { AbortSignalImpl } from "../abort_signal.ts";
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class ReadableStreamImpl<R = any> implements ReadableStream<R> {
@@ -119,80 +124,81 @@ export class ReadableStreamImpl<R = any> implements ReadableStream<R> {
throw new RangeError(`Unsupported mode "${mode}"`);
}
- pipeThrough<T>(): // {
- // writable,
- // readable,
- // }: {
- // writable: WritableStream<R>;
- // readable: ReadableStream<T>;
- // },
- // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {},
- ReadableStream<T> {
- return notImplemented();
- // if (!isReadableStream(this)) {
- // throw new TypeError("Invalid ReadableStream.");
- // }
- // if (!isWritableStream(writable)) {
- // throw new TypeError("writable is not a valid WritableStream.");
- // }
- // if (!isReadableStream(readable)) {
- // throw new TypeError("readable is not a valid ReadableStream.");
- // }
- // preventClose = Boolean(preventClose);
- // preventAbort = Boolean(preventAbort);
- // preventCancel = Boolean(preventCancel);
- // if (signal && !(signal instanceof AbortSignalImpl)) {
- // throw new TypeError("Invalid signal.");
- // }
- // if (isReadableStreamLocked(this)) {
- // throw new TypeError("ReadableStream is locked.");
- // }
- // if (isWritableStreamLocked(writable)) {
- // throw new TypeError("writable is locked.");
- // }
- // readableStreamPipeTo(
- // this,
- // writable,
- // preventClose,
- // preventAbort,
- // preventCancel,
- // signal,
- // );
- // return readable;
+ pipeThrough<T>(
+ {
+ writable,
+ readable,
+ }: {
+ writable: WritableStream<R>;
+ readable: ReadableStream<T>;
+ },
+ { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {}
+ ): ReadableStream<T> {
+ if (!isReadableStream(this)) {
+ throw new TypeError("Invalid ReadableStream.");
+ }
+ if (!isWritableStream(writable)) {
+ throw new TypeError("writable is not a valid WritableStream.");
+ }
+ if (!isReadableStream(readable)) {
+ throw new TypeError("readable is not a valid ReadableStream.");
+ }
+ preventClose = Boolean(preventClose);
+ preventAbort = Boolean(preventAbort);
+ preventCancel = Boolean(preventCancel);
+ if (signal && !(signal instanceof AbortSignalImpl)) {
+ throw new TypeError("Invalid signal.");
+ }
+ if (isReadableStreamLocked(this)) {
+ throw new TypeError("ReadableStream is locked.");
+ }
+ if (isWritableStreamLocked(writable)) {
+ throw new TypeError("writable is locked.");
+ }
+ const promise = readableStreamPipeTo(
+ this,
+ writable,
+ preventClose,
+ preventAbort,
+ preventCancel,
+ signal
+ );
+ setPromiseIsHandledToTrue(promise);
+ return readable;
}
- pipeTo(): // dest: WritableStream<R>,
- // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {},
- Promise<void> {
- return notImplemented();
- // if (!isReadableStream(this)) {
- // return Promise.reject(new TypeError("Invalid ReadableStream."));
- // }
- // if (!isWritableStream(dest)) {
- // return Promise.reject(
- // new TypeError("dest is not a valid WritableStream."),
- // );
- // }
- // preventClose = Boolean(preventClose);
- // preventAbort = Boolean(preventAbort);
- // preventCancel = Boolean(preventCancel);
- // if (signal && !(signal instanceof AbortSignalImpl)) {
- // return Promise.reject(new TypeError("Invalid signal."));
- // }
- // if (isReadableStreamLocked(this)) {
- // return Promise.reject(new TypeError("ReadableStream is locked."));
- // }
- // if (isWritableStreamLocked(this)) {
- // return Promise.reject(new TypeError("dest is locked."));
- // }
- // return readableStreamPipeTo(
- // this,
- // dest,
- // preventClose,
- // preventAbort,
- // preventCancel,
- // signal,
- // );
+ pipeTo(
+ dest: WritableStream<R>,
+ { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {}
+ ): Promise<void> {
+ if (!isReadableStream(this)) {
+ return Promise.reject(new TypeError("Invalid ReadableStream."));
+ }
+ if (!isWritableStream(dest)) {
+ return Promise.reject(
+ new TypeError("dest is not a valid WritableStream.")
+ );
+ }
+ preventClose = Boolean(preventClose);
+ preventAbort = Boolean(preventAbort);
+ preventCancel = Boolean(preventCancel);
+ if (signal && !(signal instanceof AbortSignalImpl)) {
+ return Promise.reject(new TypeError("Invalid signal."));
+ }
+ if (isReadableStreamLocked(this)) {
+ return Promise.reject(new TypeError("ReadableStream is locked."));
+ }
+ if (isWritableStreamLocked(dest)) {
+ return Promise.reject(new TypeError("dest is locked."));
+ }
+ return readableStreamPipeTo(
+ this,
+ dest,
+ preventClose,
+ preventAbort,
+ preventCancel,
+ signal
+ );
}
tee(): [ReadableStreamImpl<R>, ReadableStreamImpl<R>] {
@@ -203,7 +209,7 @@ export class ReadableStreamImpl<R = any> implements ReadableStream<R> {
}
[customInspect](): string {
- return `ReadableStream { locked: ${String(this.locked)} }`;
+ return `${this.constructor.name} { locked: ${String(this.locked)} }`;
}
[Symbol.asyncIterator](
@@ -214,3 +220,5 @@ export class ReadableStreamImpl<R = any> implements ReadableStream<R> {
return this.getIterator(options);
}
}
+
+setFunctionName(ReadableStreamImpl, "ReadableStream");
diff --git a/cli/js/web/streams/readable_stream_default_controller.ts b/cli/js/web/streams/readable_stream_default_controller.ts
index bbdfa1f0d..0755e7765 100644
--- a/cli/js/web/streams/readable_stream_default_controller.ts
+++ b/cli/js/web/streams/readable_stream_default_controller.ts
@@ -18,10 +18,11 @@ import {
readableStreamDefaultControllerGetDesiredSize,
resetQueue,
SizeAlgorithm,
+ setFunctionName,
} from "./internals.ts";
import { ReadableStreamImpl } from "./readable_stream.ts";
import * as sym from "./symbols.ts";
-import { customInspect } from "../../web/console.ts";
+import { customInspect } from "../console.ts";
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class ReadableStreamDefaultControllerImpl<R = any>
@@ -113,8 +114,13 @@ export class ReadableStreamDefaultControllerImpl<R = any>
}
[customInspect](): string {
- return `ReadableStreamDefaultController { desiredSize: ${String(
+ return `${this.constructor.name} { desiredSize: ${String(
this.desiredSize
)} }`;
}
}
+
+setFunctionName(
+ ReadableStreamDefaultControllerImpl,
+ "ReadableStreamDefaultController"
+);
diff --git a/cli/js/web/streams/readable_stream_default_reader.ts b/cli/js/web/streams/readable_stream_default_reader.ts
index 20fb21208..a0d5901dc 100644
--- a/cli/js/web/streams/readable_stream_default_reader.ts
+++ b/cli/js/web/streams/readable_stream_default_reader.ts
@@ -9,10 +9,11 @@ import {
readableStreamReaderGenericCancel,
readableStreamReaderGenericInitialize,
readableStreamReaderGenericRelease,
+ setFunctionName,
} from "./internals.ts";
import { ReadableStreamImpl } from "./readable_stream.ts";
import * as sym from "./symbols.ts";
-import { customInspect } from "../../web/console.ts";
+import { customInspect } from "../console.ts";
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class ReadableStreamDefaultReaderImpl<R = any>
@@ -84,6 +85,8 @@ export class ReadableStreamDefaultReaderImpl<R = any>
}
[customInspect](): string {
- return `ReadableStreamDefaultReader { closed: Promise }`;
+ return `${this.constructor.name} { closed: Promise }`;
}
}
+
+setFunctionName(ReadableStreamDefaultReaderImpl, "ReadableStreamDefaultReader");
diff --git a/cli/js/web/streams/symbols.ts b/cli/js/web/streams/symbols.ts
index 9d6335ef0..9e5cb5715 100644
--- a/cli/js/web/streams/symbols.ts
+++ b/cli/js/web/streams/symbols.ts
@@ -6,21 +6,32 @@
// this data from the public, therefore we will use unique symbols which are
// not available in the runtime.
+export const abortAlgorithm = Symbol("abortAlgorithm");
+export const abortSteps = Symbol("abortSteps");
export const asyncIteratorReader = Symbol("asyncIteratorReader");
export const autoAllocateChunkSize = Symbol("autoAllocateChunkSize");
+export const backpressure = Symbol("backpressure");
export const byobRequest = Symbol("byobRequest");
export const cancelAlgorithm = Symbol("cancelAlgorithm");
export const cancelSteps = Symbol("cancelSteps");
+export const closeAlgorithm = Symbol("closeAlgorithm");
export const closedPromise = Symbol("closedPromise");
+export const closeRequest = Symbol("closeRequest");
export const closeRequested = Symbol("closeRequested");
export const controlledReadableByteStream = Symbol(
"controlledReadableByteStream"
);
export const controlledReadableStream = Symbol("controlledReadableStream");
+export const controlledWritableStream = Symbol("controlledWritableStream");
export const disturbed = Symbol("disturbed");
+export const errorSteps = Symbol("errorSteps");
export const forAuthorCode = Symbol("forAuthorCode");
+export const inFlightWriteRequest = Symbol("inFlightWriteRequest");
+export const inFlightCloseRequest = Symbol("inFlightCloseRequest");
export const isFakeDetached = Symbol("isFakeDetached");
export const ownerReadableStream = Symbol("ownerReadableStream");
+export const ownerWritableStream = Symbol("ownerWritableStream");
+export const pendingAbortRequest = Symbol("pendingAbortRequest");
export const preventCancel = Symbol("preventCancel");
export const pullAgain = Symbol("pullAgain");
export const pullAlgorithm = Symbol("pullAlgorithm");
@@ -31,8 +42,13 @@ export const queueTotalSize = Symbol("queueTotalSize");
export const readableStreamController = Symbol("readableStreamController");
export const reader = Symbol("reader");
export const readRequests = Symbol("readRequests");
+export const readyPromise = Symbol("readyPromise");
export const started = Symbol("started");
export const state = Symbol("state");
export const storedError = Symbol("storedError");
export const strategyHWM = Symbol("strategyHWM");
export const strategySizeAlgorithm = Symbol("strategySizeAlgorithm");
+export const writableStreamController = Symbol("writableStreamController");
+export const writeAlgorithm = Symbol("writeAlgorithm");
+export const writer = Symbol("writer");
+export const writeRequests = Symbol("writeRequests");
diff --git a/cli/js/web/streams/writable_stream.ts b/cli/js/web/streams/writable_stream.ts
new file mode 100644
index 000000000..4bbf339da
--- /dev/null
+++ b/cli/js/web/streams/writable_stream.ts
@@ -0,0 +1,107 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import {
+ AbortRequest,
+ acquireWritableStreamDefaultWriter,
+ Deferred,
+ initializeWritableStream,
+ isWritableStream,
+ isWritableStreamLocked,
+ makeSizeAlgorithmFromSizeFunction,
+ setFunctionName,
+ setUpWritableStreamDefaultControllerFromUnderlyingSink,
+ writableStreamAbort,
+ writableStreamClose,
+ writableStreamCloseQueuedOrInFlight,
+ validateAndNormalizeHighWaterMark,
+} from "./internals.ts";
+import * as sym from "./symbols.ts";
+import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts";
+import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts";
+import { customInspect } from "../console.ts";
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export class WritableStreamImpl<W = any> implements WritableStream<W> {
+ [sym.backpressure]: boolean;
+ [sym.closeRequest]?: Deferred<void>;
+ [sym.inFlightWriteRequest]?: Required<Deferred<void>>;
+ [sym.inFlightCloseRequest]?: Deferred<void>;
+ [sym.pendingAbortRequest]?: AbortRequest;
+ [sym.state]: "writable" | "closed" | "erroring" | "errored";
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ [sym.storedError]?: any;
+ [sym.writableStreamController]?: WritableStreamDefaultControllerImpl<W>;
+ [sym.writer]?: WritableStreamDefaultWriterImpl<W>;
+ [sym.writeRequests]: Array<Required<Deferred<void>>>;
+
+ constructor(
+ underlyingSink: UnderlyingSink = {},
+ strategy: QueuingStrategy = {}
+ ) {
+ initializeWritableStream(this);
+ const size = strategy.size;
+ let highWaterMark = strategy.highWaterMark ?? 1;
+ const { type } = underlyingSink;
+ if (type !== undefined) {
+ throw new RangeError(`Sink type of "${String(type)}" not supported.`);
+ }
+ const sizeAlgorithm = makeSizeAlgorithmFromSizeFunction(size);
+ highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark);
+ setUpWritableStreamDefaultControllerFromUnderlyingSink(
+ this,
+ underlyingSink,
+ highWaterMark,
+ sizeAlgorithm
+ );
+ }
+
+ get locked(): boolean {
+ if (!isWritableStream(this)) {
+ throw new TypeError("Invalid WritableStream.");
+ }
+ return isWritableStreamLocked(this);
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ abort(reason: any): Promise<void> {
+ if (!isWritableStream(this)) {
+ return Promise.reject(new TypeError("Invalid WritableStream."));
+ }
+ if (isWritableStreamLocked(this)) {
+ return Promise.reject(
+ new TypeError("Cannot abort a locked WritableStream.")
+ );
+ }
+ return writableStreamAbort(this, reason);
+ }
+
+ close(): Promise<void> {
+ if (!isWritableStream(this)) {
+ return Promise.reject(new TypeError("Invalid WritableStream."));
+ }
+ if (isWritableStreamLocked(this)) {
+ return Promise.reject(
+ new TypeError("Cannot abort a locked WritableStream.")
+ );
+ }
+ if (writableStreamCloseQueuedOrInFlight(this)) {
+ return Promise.reject(
+ new TypeError("Cannot close an already closing WritableStream.")
+ );
+ }
+ return writableStreamClose(this);
+ }
+
+ getWriter(): WritableStreamDefaultWriter<W> {
+ if (!isWritableStream(this)) {
+ throw new TypeError("Invalid WritableStream.");
+ }
+ return acquireWritableStreamDefaultWriter(this);
+ }
+
+ [customInspect](): string {
+ return `${this.constructor.name} { locked: ${String(this.locked)} }`;
+ }
+}
+
+setFunctionName(WritableStreamImpl, "WritableStream");
diff --git a/cli/js/web/streams/writable_stream_default_controller.ts b/cli/js/web/streams/writable_stream_default_controller.ts
new file mode 100644
index 000000000..040d0eefc
--- /dev/null
+++ b/cli/js/web/streams/writable_stream_default_controller.ts
@@ -0,0 +1,68 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import {
+ AbortAlgorithm,
+ CloseAlgorithm,
+ isWritableStreamDefaultController,
+ Pair,
+ resetQueue,
+ setFunctionName,
+ SizeAlgorithm,
+ WriteAlgorithm,
+ writableStreamDefaultControllerClearAlgorithms,
+ writableStreamDefaultControllerError,
+} from "./internals.ts";
+import * as sym from "./symbols.ts";
+import { WritableStreamImpl } from "./writable_stream.ts";
+import { customInspect } from "../console.ts";
+
+export class WritableStreamDefaultControllerImpl<W>
+ implements WritableStreamDefaultController {
+ [sym.abortAlgorithm]: AbortAlgorithm;
+ [sym.closeAlgorithm]: CloseAlgorithm;
+ [sym.controlledWritableStream]: WritableStreamImpl;
+ [sym.queue]: Array<Pair<{ chunk: W } | "close">>;
+ [sym.queueTotalSize]: number;
+ [sym.started]: boolean;
+ [sym.strategyHWM]: number;
+ [sym.strategySizeAlgorithm]: SizeAlgorithm<W>;
+ [sym.writeAlgorithm]: WriteAlgorithm<W>;
+
+ private constructor() {
+ throw new TypeError(
+ "WritableStreamDefaultController's constructor cannot be called."
+ );
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ error(e: any): void {
+ if (!isWritableStreamDefaultController(this)) {
+ throw new TypeError("Invalid WritableStreamDefaultController.");
+ }
+ const state = this[sym.controlledWritableStream][sym.state];
+ if (state !== "writable") {
+ return;
+ }
+ writableStreamDefaultControllerError(this, e);
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ [sym.abortSteps](reason: any): PromiseLike<void> {
+ const result = this[sym.abortAlgorithm](reason);
+ writableStreamDefaultControllerClearAlgorithms(this);
+ return result;
+ }
+
+ [sym.errorSteps](): void {
+ resetQueue(this);
+ }
+
+ [customInspect](): string {
+ return `${this.constructor.name} { }`;
+ }
+}
+
+setFunctionName(
+ WritableStreamDefaultControllerImpl,
+ "WritableStreamDefaultController"
+);
diff --git a/cli/js/web/streams/writable_stream_default_writer.ts b/cli/js/web/streams/writable_stream_default_writer.ts
new file mode 100644
index 000000000..cd6b71044
--- /dev/null
+++ b/cli/js/web/streams/writable_stream_default_writer.ts
@@ -0,0 +1,164 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import {
+ Deferred,
+ getDeferred,
+ isWritableStream,
+ isWritableStreamDefaultWriter,
+ isWritableStreamLocked,
+ setFunctionName,
+ setPromiseIsHandledToTrue,
+ writableStreamCloseQueuedOrInFlight,
+ writableStreamDefaultWriterAbort,
+ writableStreamDefaultWriterClose,
+ writableStreamDefaultWriterGetDesiredSize,
+ writableStreamDefaultWriterRelease,
+ writableStreamDefaultWriterWrite,
+} from "./internals.ts";
+import * as sym from "./symbols.ts";
+import { WritableStreamImpl } from "./writable_stream.ts";
+import { customInspect } from "../console.ts";
+import { assert } from "../../util.ts";
+
+export class WritableStreamDefaultWriterImpl<W>
+ implements WritableStreamDefaultWriter<W> {
+ [sym.closedPromise]: Deferred<void>;
+ [sym.ownerWritableStream]: WritableStreamImpl<W>;
+ [sym.readyPromise]: Deferred<void>;
+
+ constructor(stream: WritableStreamImpl<W>) {
+ if (!isWritableStream(stream)) {
+ throw new TypeError("Invalid stream.");
+ }
+ if (isWritableStreamLocked(stream)) {
+ throw new TypeError("Cannot create a reader for a locked stream.");
+ }
+ this[sym.ownerWritableStream] = stream;
+ stream[sym.writer] = this;
+ const state = stream[sym.state];
+ if (state === "writable") {
+ if (
+ !writableStreamCloseQueuedOrInFlight(stream) &&
+ stream[sym.backpressure]
+ ) {
+ this[sym.readyPromise] = getDeferred();
+ } else {
+ this[sym.readyPromise] = { promise: Promise.resolve() };
+ }
+ this[sym.closedPromise] = getDeferred();
+ } else if (state === "erroring") {
+ this[sym.readyPromise] = {
+ promise: Promise.reject(stream[sym.storedError]),
+ };
+ setPromiseIsHandledToTrue(this[sym.readyPromise].promise);
+ this[sym.closedPromise] = getDeferred();
+ } else if (state === "closed") {
+ this[sym.readyPromise] = { promise: Promise.resolve() };
+ this[sym.closedPromise] = { promise: Promise.resolve() };
+ } else {
+ assert(state === "errored");
+ const storedError = stream[sym.storedError];
+ this[sym.readyPromise] = { promise: Promise.reject(storedError) };
+ setPromiseIsHandledToTrue(this[sym.readyPromise].promise);
+ this[sym.closedPromise] = { promise: Promise.reject(storedError) };
+ setPromiseIsHandledToTrue(this[sym.closedPromise].promise);
+ }
+ }
+
+ get closed(): Promise<void> {
+ if (!isWritableStreamDefaultWriter(this)) {
+ return Promise.reject(
+ new TypeError("Invalid WritableStreamDefaultWriter.")
+ );
+ }
+ return this[sym.closedPromise].promise;
+ }
+
+ get desiredSize(): number | null {
+ if (!isWritableStreamDefaultWriter(this)) {
+ throw new TypeError("Invalid WritableStreamDefaultWriter.");
+ }
+ if (!this[sym.ownerWritableStream]) {
+ throw new TypeError("WritableStreamDefaultWriter has no owner.");
+ }
+ return writableStreamDefaultWriterGetDesiredSize(this);
+ }
+
+ get ready(): Promise<void> {
+ if (!isWritableStreamDefaultWriter(this)) {
+ return Promise.reject(
+ new TypeError("Invalid WritableStreamDefaultWriter.")
+ );
+ }
+ return this[sym.readyPromise].promise;
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ abort(reason: any): Promise<void> {
+ if (!isWritableStreamDefaultWriter(this)) {
+ return Promise.reject(
+ new TypeError("Invalid WritableStreamDefaultWriter.")
+ );
+ }
+ if (!this[sym.ownerWritableStream]) {
+ Promise.reject(
+ new TypeError("WritableStreamDefaultWriter has no owner.")
+ );
+ }
+ return writableStreamDefaultWriterAbort(this, reason);
+ }
+
+ close(): Promise<void> {
+ if (!isWritableStreamDefaultWriter(this)) {
+ return Promise.reject(
+ new TypeError("Invalid WritableStreamDefaultWriter.")
+ );
+ }
+ const stream = this[sym.ownerWritableStream];
+ if (!stream) {
+ Promise.reject(
+ new TypeError("WritableStreamDefaultWriter has no owner.")
+ );
+ }
+ if (writableStreamCloseQueuedOrInFlight(stream)) {
+ Promise.reject(
+ new TypeError("Stream is in an invalid state to be closed.")
+ );
+ }
+ return writableStreamDefaultWriterClose(this);
+ }
+
+ releaseLock(): void {
+ if (!isWritableStreamDefaultWriter(this)) {
+ throw new TypeError("Invalid WritableStreamDefaultWriter.");
+ }
+ const stream = this[sym.ownerWritableStream];
+ if (!stream) {
+ return;
+ }
+ assert(stream[sym.writer]);
+ writableStreamDefaultWriterRelease(this);
+ }
+
+ write(chunk: W): Promise<void> {
+ if (!isWritableStreamDefaultWriter(this)) {
+ return Promise.reject(
+ new TypeError("Invalid WritableStreamDefaultWriter.")
+ );
+ }
+ if (!this[sym.ownerWritableStream]) {
+ Promise.reject(
+ new TypeError("WritableStreamDefaultWriter has no owner.")
+ );
+ }
+ return writableStreamDefaultWriterWrite(this, chunk);
+ }
+
+ [customInspect](): string {
+ return `${this.constructor.name} { closed: Promise, desiredSize: ${String(
+ this.desiredSize
+ )}, ready: Promise }`;
+ }
+}
+
+setFunctionName(WritableStreamDefaultWriterImpl, "WritableStreamDefaultWriter");