summaryrefslogtreecommitdiff
path: root/cli/js/web/streams
diff options
context:
space:
mode:
Diffstat (limited to 'cli/js/web/streams')
-rw-r--r--cli/js/web/streams/internals.ts364
-rw-r--r--cli/js/web/streams/symbols.ts7
-rw-r--r--cli/js/web/streams/transform_stream.ts118
-rw-r--r--cli/js/web/streams/transform_stream_default_controller.ts75
4 files changed, 555 insertions, 9 deletions
diff --git a/cli/js/web/streams/internals.ts b/cli/js/web/streams/internals.ts
index 846db096e..5ef094afc 100644
--- a/cli/js/web/streams/internals.ts
+++ b/cli/js/web/streams/internals.ts
@@ -13,6 +13,8 @@ import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_c
import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts";
import { ReadableStreamImpl } from "./readable_stream.ts";
import * as sym from "./symbols.ts";
+import { TransformStreamImpl } from "./transform_stream.ts";
+import { TransformStreamDefaultControllerImpl } from "./transform_stream_default_controller.ts";
import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts";
import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts";
import { WritableStreamImpl } from "./writable_stream.ts";
@@ -36,10 +38,12 @@ type Container<R = any> = {
[sym.queue]: Array<Pair<R> | BufferQueueItem>;
[sym.queueTotalSize]: number;
};
+export type FlushAlgorithm = () => Promise<void>;
export type Pair<R> = { value: R; size: number };
export type PullAlgorithm = () => PromiseLike<void>;
export type SizeAlgorithm<T> = (chunk: T) => number;
export type StartAlgorithm = () => void | PromiseLike<void>;
+export type TransformAlgorithm<I> = (chunk: I) => Promise<void>;
export type WriteAlgorithm<W> = (chunk: W) => Promise<void>;
export interface Deferred<T> {
promise: Promise<T>;
@@ -76,8 +80,16 @@ export function acquireWritableStreamDefaultWriter<W>(
return new WritableStreamDefaultWriterImpl(stream);
}
+export function call<F extends (...args: any[]) => any>(
+ fn: F,
+ v: ThisType<F>,
+ args: Parameters<F>
+): ReturnType<F> {
+ return Function.prototype.apply.call(fn, v, args);
+}
+
function createAlgorithmFromUnderlyingMethod<
- O extends UnderlyingByteSource | UnderlyingSource,
+ O extends UnderlyingByteSource | UnderlyingSource | Transformer,
P extends keyof O
>(
underlyingObject: O,
@@ -86,7 +98,7 @@ function createAlgorithmFromUnderlyingMethod<
...extraArgs: any[]
): () => Promise<void>;
function createAlgorithmFromUnderlyingMethod<
- O extends UnderlyingByteSource | UnderlyingSource,
+ O extends UnderlyingByteSource | UnderlyingSource | Transformer,
P extends keyof O
>(
underlyingObject: O,
@@ -95,7 +107,7 @@ function createAlgorithmFromUnderlyingMethod<
...extraArgs: any[]
): (arg: any) => Promise<void>;
function createAlgorithmFromUnderlyingMethod<
- O extends UnderlyingByteSource | UnderlyingSource,
+ O extends UnderlyingByteSource | UnderlyingSource | Transformer,
P extends keyof O
>(
underlyingObject: O,
@@ -110,11 +122,11 @@ function createAlgorithmFromUnderlyingMethod<
}
if (algoArgCount === 0) {
return async (): Promise<void> =>
- method.call(underlyingObject, ...extraArgs);
+ call(method, underlyingObject, extraArgs as any);
} else {
return async (arg: any): Promise<void> => {
const fullArgs = [arg, ...extraArgs];
- return method.call(underlyingObject, ...fullArgs);
+ return call(method, underlyingObject, fullArgs as any);
};
}
}
@@ -148,6 +160,33 @@ function createReadableStream<T>(
return stream;
}
+function createWritableStream<W>(
+ startAlgorithm: StartAlgorithm,
+ writeAlgorithm: WriteAlgorithm<W>,
+ closeAlgorithm: CloseAlgorithm,
+ abortAlgorithm: AbortAlgorithm,
+ highWaterMark = 1,
+ sizeAlgorithm: SizeAlgorithm<W> = (): number => 1
+): WritableStreamImpl<W> {
+ assert(isNonNegativeNumber(highWaterMark));
+ const stream = Object.create(WritableStreamImpl.prototype);
+ initializeWritableStream(stream);
+ const controller = Object.create(
+ WritableStreamDefaultControllerImpl.prototype
+ );
+ setUpWritableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ highWaterMark,
+ sizeAlgorithm
+ );
+ return stream;
+}
+
export function dequeueValue<R>(container: Container<R>): R {
assert(sym.queue in container && sym.queueTotalSize in container);
assert(container[sym.queue].length);
@@ -185,13 +224,61 @@ export function getDeferred<T>(): Required<Deferred<T>> {
return { promise, resolve: resolve!, reject: reject! };
}
-export function initializeReadableStream(stream: ReadableStreamImpl): void {
+export function initializeReadableStream<R>(
+ stream: ReadableStreamImpl<R>
+): void {
stream[sym.state] = "readable";
stream[sym.reader] = stream[sym.storedError] = undefined;
stream[sym.disturbed] = false;
}
-export function initializeWritableStream(stream: WritableStreamImpl): void {
+export function initializeTransformStream<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ startPromise: Promise<void>,
+ writableHighWaterMark: number,
+ writableSizeAlgorithm: SizeAlgorithm<I>,
+ readableHighWaterMark: number,
+ readableSizeAlgorithm: SizeAlgorithm<O>
+): void {
+ const startAlgorithm = (): Promise<void> => startPromise;
+ const writeAlgorithm = (chunk: any): Promise<void> =>
+ transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
+ const abortAlgorithm = (reason: any): Promise<void> =>
+ transformStreamDefaultSinkAbortAlgorithm(stream, reason);
+ const closeAlgorithm = (): Promise<void> =>
+ transformStreamDefaultSinkCloseAlgorithm(stream);
+ stream[sym.writable] = createWritableStream(
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ writableHighWaterMark,
+ writableSizeAlgorithm
+ );
+ const pullAlgorithm = (): PromiseLike<void> =>
+ transformStreamDefaultSourcePullAlgorithm(stream);
+ const cancelAlgorithm = (reason: any): Promise<void> => {
+ transformStreamErrorWritableAndUnblockWrite(stream, reason);
+ return Promise.resolve(undefined);
+ };
+ stream[sym.readable] = createReadableStream(
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ readableHighWaterMark,
+ readableSizeAlgorithm
+ );
+ stream[sym.backpressure] = stream[sym.backpressureChangePromise] = undefined;
+ transformStreamSetBackpressure(stream, true);
+ Object.defineProperty(stream, sym.transformStreamController, {
+ value: undefined,
+ configurable: true,
+ });
+}
+
+export function initializeWritableStream<W>(
+ stream: WritableStreamImpl<W>
+): void {
stream[sym.state] = "writable";
stream[sym.storedError] = stream[sym.writer] = stream[
sym.writableStreamController
@@ -202,7 +289,7 @@ export function initializeWritableStream(stream: WritableStreamImpl): void {
stream[sym.backpressure] = false;
}
-function invokeOrNoop<O extends any, P extends keyof O>(
+export function invokeOrNoop<O extends Record<string, any>, P extends keyof O>(
o: O,
p: P,
...args: Parameters<O[P]>
@@ -212,7 +299,7 @@ function invokeOrNoop<O extends any, P extends keyof O>(
if (!method) {
return undefined;
}
- return method.call(o, ...args);
+ return call(method, o, args);
}
function isCallable(value: unknown): value is (...args: any) => any {
@@ -299,6 +386,26 @@ export function isReadableStreamLocked(stream: ReadableStreamImpl): boolean {
return stream[sym.reader] ? true : false;
}
+export function isTransformStream(
+ x: unknown
+): x is TransformStreamImpl<any, any> {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.transformStreamController in x)
+ ? false
+ : true;
+}
+
+export function isTransformStreamDefaultController(
+ x: unknown
+): x is TransformStreamDefaultControllerImpl<any, any> {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.controlledTransformStream in x)
+ ? false
+ : true;
+}
+
export function isUnderlyingByteSource(
underlyingSource: UnderlyingByteSource | UnderlyingSource
): underlyingSource is UnderlyingByteSource {
@@ -717,6 +824,14 @@ export function readableStreamDefaultControllerError<T>(
readableStreamError(stream, e);
}
+function readableStreamDefaultControllerHasBackpressure<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>
+): boolean {
+ return readableStreamDefaultControllerShouldCallPull(controller)
+ ? true
+ : false;
+}
+
function readableStreamDefaultControllerShouldCallPull<T>(
controller: ReadableStreamDefaultControllerImpl<T>
): boolean {
@@ -1416,6 +1531,62 @@ export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>(
);
}
+function setUpTransformStreamDefaultController<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ controller: TransformStreamDefaultControllerImpl<I, O>,
+ transformAlgorithm: TransformAlgorithm<I>,
+ flushAlgorithm: FlushAlgorithm
+): void {
+ assert(isTransformStream(stream));
+ assert(stream[sym.transformStreamController] === undefined);
+ controller[sym.controlledTransformStream] = stream;
+ stream[sym.transformStreamController] = controller;
+ controller[sym.transformAlgorithm] = transformAlgorithm;
+ controller[sym.flushAlgorithm] = flushAlgorithm;
+}
+
+export function setUpTransformStreamDefaultControllerFromTransformer<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ transformer: Transformer<I, O>
+): void {
+ assert(transformer);
+ const controller = Object.create(
+ TransformStreamDefaultControllerImpl.prototype
+ ) as TransformStreamDefaultControllerImpl<I, O>;
+ let transformAlgorithm: TransformAlgorithm<I> = (chunk) => {
+ try {
+ transformStreamDefaultControllerEnqueue(
+ controller,
+ // it defaults to no tranformation, so I is assumed to be O
+ (chunk as unknown) as O
+ );
+ } catch (e) {
+ return Promise.reject(e);
+ }
+ return Promise.resolve();
+ };
+ const transformMethod = transformer.transform;
+ if (transformMethod) {
+ if (typeof transformMethod !== "function") {
+ throw new TypeError("tranformer.transform must be callable.");
+ }
+ transformAlgorithm = async (chunk): Promise<void> =>
+ call(transformMethod, transformer, [chunk, controller]);
+ }
+ const flushAlgorithm = createAlgorithmFromUnderlyingMethod(
+ transformer,
+ "flush",
+ 0,
+ controller
+ );
+ setUpTransformStreamDefaultController(
+ stream,
+ controller,
+ transformAlgorithm,
+ flushAlgorithm
+ );
+}
+
function setUpWritableStreamDefaultController<W>(
stream: WritableStreamImpl<W>,
controller: WritableStreamDefaultControllerImpl<W>,
@@ -1508,6 +1679,181 @@ export function setUpWritableStreamDefaultControllerFromUnderlyingSink<W>(
);
}
+function transformStreamDefaultControllerClearAlgorithms<I, O>(
+ controller: TransformStreamDefaultControllerImpl<I, O>
+): void {
+ (controller as any)[sym.transformAlgorithm] = undefined;
+ (controller as any)[sym.flushAlgorithm] = undefined;
+}
+
+export function transformStreamDefaultControllerEnqueue<I, O>(
+ controller: TransformStreamDefaultControllerImpl<I, O>,
+ chunk: O
+): void {
+ const stream = controller[sym.controlledTransformStream];
+ const readableController = stream[sym.readable][
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl<O>;
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) {
+ throw new TypeError(
+ "TransformStream's readable controller cannot be closed or enqueued."
+ );
+ }
+ try {
+ readableStreamDefaultControllerEnqueue(readableController, chunk);
+ } catch (e) {
+ transformStreamErrorWritableAndUnblockWrite(stream, e);
+ throw stream[sym.readable][sym.storedError];
+ }
+ const backpressure = readableStreamDefaultControllerHasBackpressure(
+ readableController
+ );
+ if (backpressure) {
+ transformStreamSetBackpressure(stream, true);
+ }
+}
+
+export function transformStreamDefaultControllerError<I, O>(
+ controller: TransformStreamDefaultControllerImpl<I, O>,
+ e: any
+): void {
+ transformStreamError(controller[sym.controlledTransformStream], e);
+}
+
+function transformStreamDefaultControllerPerformTransform<I, O>(
+ controller: TransformStreamDefaultControllerImpl<I, O>,
+ chunk: I
+): Promise<void> {
+ const transformPromise = controller[sym.transformAlgorithm](chunk);
+ return transformPromise.then(undefined, (r) => {
+ transformStreamError(controller[sym.controlledTransformStream], r);
+ throw r;
+ });
+}
+
+function transformStreamDefaultSinkAbortAlgorithm<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ reason: any
+): Promise<void> {
+ transformStreamError(stream, reason);
+ return Promise.resolve(undefined);
+}
+
+function transformStreamDefaultSinkCloseAlgorithm<I, O>(
+ stream: TransformStreamImpl<I, O>
+): Promise<void> {
+ const readable = stream[sym.readable];
+ const controller = stream[sym.transformStreamController];
+ const flushPromise = controller[sym.flushAlgorithm]();
+ transformStreamDefaultControllerClearAlgorithms(controller);
+ return flushPromise.then(
+ () => {
+ if (readable[sym.state] === "errored") {
+ throw readable[sym.storedError];
+ }
+ const readableController = readable[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl<O>;
+ if (
+ readableStreamDefaultControllerCanCloseOrEnqueue(readableController)
+ ) {
+ readableStreamDefaultControllerClose(readableController);
+ }
+ },
+ (r) => {
+ transformStreamError(stream, r);
+ throw readable[sym.storedError];
+ }
+ );
+}
+
+function transformStreamDefaultSinkWriteAlgorithm<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ chunk: I
+): Promise<void> {
+ assert(stream[sym.writable][sym.state] === "writable");
+ const controller = stream[sym.transformStreamController];
+ if (stream[sym.backpressure]) {
+ const backpressureChangePromise = stream[sym.backpressureChangePromise];
+ assert(backpressureChangePromise);
+ return backpressureChangePromise.promise.then(() => {
+ const writable = stream[sym.writable];
+ const state = writable[sym.state];
+ if (state === "erroring") {
+ throw writable[sym.storedError];
+ }
+ assert(state === "writable");
+ return transformStreamDefaultControllerPerformTransform(
+ controller,
+ chunk
+ );
+ });
+ }
+ return transformStreamDefaultControllerPerformTransform(controller, chunk);
+}
+
+function transformStreamDefaultSourcePullAlgorithm<I, O>(
+ stream: TransformStreamImpl<I, O>
+): Promise<void> {
+ assert(stream[sym.backpressure] === true);
+ assert(stream[sym.backpressureChangePromise] !== undefined);
+ transformStreamSetBackpressure(stream, false);
+ return stream[sym.backpressureChangePromise]!.promise;
+}
+
+function transformStreamError<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ e: any
+): void {
+ readableStreamDefaultControllerError(
+ stream[sym.readable][
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl<O>,
+ e
+ );
+ transformStreamErrorWritableAndUnblockWrite(stream, e);
+}
+
+export function transformStreamDefaultControllerTerminate<I, O>(
+ controller: TransformStreamDefaultControllerImpl<I, O>
+): void {
+ const stream = controller[sym.controlledTransformStream];
+ const readableController = stream[sym.readable][
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl<O>;
+ readableStreamDefaultControllerClose(readableController);
+ const error = new TypeError("TransformStream is closed.");
+ transformStreamErrorWritableAndUnblockWrite(stream, error);
+}
+
+function transformStreamErrorWritableAndUnblockWrite<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ e: any
+): void {
+ transformStreamDefaultControllerClearAlgorithms(
+ stream[sym.transformStreamController]
+ );
+ writableStreamDefaultControllerErrorIfNeeded(
+ stream[sym.writable][sym.writableStreamController]!,
+ e
+ );
+ if (stream[sym.backpressure]) {
+ transformStreamSetBackpressure(stream, false);
+ }
+}
+
+function transformStreamSetBackpressure<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ backpressure: boolean
+): void {
+ assert(stream[sym.backpressure] !== backpressure);
+ if (stream[sym.backpressureChangePromise] !== undefined) {
+ stream[sym.backpressureChangePromise]!.resolve!(undefined);
+ }
+ stream[sym.backpressureChangePromise] = getDeferred<void>();
+ stream[sym.backpressure] = backpressure;
+}
+
function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer {
assert(!isDetachedBuffer(buffer));
const transferredIshVersion = buffer.slice(0);
diff --git a/cli/js/web/streams/symbols.ts b/cli/js/web/streams/symbols.ts
index 9e5cb5715..9c0a336e5 100644
--- a/cli/js/web/streams/symbols.ts
+++ b/cli/js/web/streams/symbols.ts
@@ -11,6 +11,7 @@ export const abortSteps = Symbol("abortSteps");
export const asyncIteratorReader = Symbol("asyncIteratorReader");
export const autoAllocateChunkSize = Symbol("autoAllocateChunkSize");
export const backpressure = Symbol("backpressure");
+export const backpressureChangePromise = Symbol("backpressureChangePromise");
export const byobRequest = Symbol("byobRequest");
export const cancelAlgorithm = Symbol("cancelAlgorithm");
export const cancelSteps = Symbol("cancelSteps");
@@ -22,9 +23,11 @@ export const controlledReadableByteStream = Symbol(
"controlledReadableByteStream"
);
export const controlledReadableStream = Symbol("controlledReadableStream");
+export const controlledTransformStream = Symbol("controlledTransformStream");
export const controlledWritableStream = Symbol("controlledWritableStream");
export const disturbed = Symbol("disturbed");
export const errorSteps = Symbol("errorSteps");
+export const flushAlgorithm = Symbol("flushAlgorithm");
export const forAuthorCode = Symbol("forAuthorCode");
export const inFlightWriteRequest = Symbol("inFlightWriteRequest");
export const inFlightCloseRequest = Symbol("inFlightCloseRequest");
@@ -39,6 +42,7 @@ export const pulling = Symbol("pulling");
export const pullSteps = Symbol("pullSteps");
export const queue = Symbol("queue");
export const queueTotalSize = Symbol("queueTotalSize");
+export const readable = Symbol("readable");
export const readableStreamController = Symbol("readableStreamController");
export const reader = Symbol("reader");
export const readRequests = Symbol("readRequests");
@@ -48,7 +52,10 @@ export const state = Symbol("state");
export const storedError = Symbol("storedError");
export const strategyHWM = Symbol("strategyHWM");
export const strategySizeAlgorithm = Symbol("strategySizeAlgorithm");
+export const transformAlgorithm = Symbol("transformAlgorithm");
+export const transformStreamController = Symbol("transformStreamController");
export const writableStreamController = Symbol("writableStreamController");
export const writeAlgorithm = Symbol("writeAlgorithm");
+export const writable = Symbol("writable");
export const writer = Symbol("writer");
export const writeRequests = Symbol("writeRequests");
diff --git a/cli/js/web/streams/transform_stream.ts b/cli/js/web/streams/transform_stream.ts
new file mode 100644
index 000000000..ac08fea3f
--- /dev/null
+++ b/cli/js/web/streams/transform_stream.ts
@@ -0,0 +1,118 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import {
+ Deferred,
+ getDeferred,
+ initializeTransformStream,
+ invokeOrNoop,
+ isTransformStream,
+ makeSizeAlgorithmFromSizeFunction,
+ setFunctionName,
+ setUpTransformStreamDefaultControllerFromTransformer,
+ validateAndNormalizeHighWaterMark,
+} from "./internals.ts";
+import { ReadableStreamImpl } from "./readable_stream.ts";
+import * as sym from "./symbols.ts";
+import { TransformStreamDefaultControllerImpl } from "./transform_stream_default_controller.ts";
+import { WritableStreamImpl } from "./writable_stream.ts";
+import { customInspect, inspect } from "../console.ts";
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export class TransformStreamImpl<I = any, O = any>
+ implements TransformStream<I, O> {
+ [sym.backpressure]?: boolean;
+ [sym.backpressureChangePromise]?: Deferred<void>;
+ [sym.readable]: ReadableStreamImpl<O>;
+ [sym.transformStreamController]: TransformStreamDefaultControllerImpl<I, O>;
+ [sym.writable]: WritableStreamImpl<I>;
+
+ constructor(
+ transformer: Transformer<I, O> = {},
+ writableStrategy: QueuingStrategy<I> = {},
+ readableStrategy: QueuingStrategy<O> = {}
+ ) {
+ const writableSizeFunction = writableStrategy.size;
+ let writableHighWaterMark = writableStrategy.highWaterMark;
+ const readableSizeFunction = readableStrategy.size;
+ let readableHighWaterMark = readableStrategy.highWaterMark;
+ const writableType = transformer.writableType;
+ if (writableType !== undefined) {
+ throw new RangeError(
+ `Expected transformer writableType to be undefined, received "${String(
+ writableType
+ )}"`
+ );
+ }
+ const writableSizeAlgorithm = makeSizeAlgorithmFromSizeFunction(
+ writableSizeFunction
+ );
+ if (writableHighWaterMark === undefined) {
+ writableHighWaterMark = 1;
+ }
+ writableHighWaterMark = validateAndNormalizeHighWaterMark(
+ writableHighWaterMark
+ );
+ const readableType = transformer.readableType;
+ if (readableType !== undefined) {
+ throw new RangeError(
+ `Expected transformer readableType to be undefined, received "${String(
+ readableType
+ )}"`
+ );
+ }
+ const readableSizeAlgorithm = makeSizeAlgorithmFromSizeFunction(
+ readableSizeFunction
+ );
+ if (readableHighWaterMark === undefined) {
+ readableHighWaterMark = 1;
+ }
+ readableHighWaterMark = validateAndNormalizeHighWaterMark(
+ readableHighWaterMark
+ );
+ const startPromise = getDeferred<void>();
+ initializeTransformStream(
+ this,
+ startPromise.promise,
+ writableHighWaterMark,
+ writableSizeAlgorithm,
+ readableHighWaterMark,
+ readableSizeAlgorithm
+ );
+ // the brand check expects this, and the brand check occurs in the following
+ // but the property hasn't been defined.
+ Object.defineProperty(this, sym.transformStreamController, {
+ value: undefined,
+ writable: true,
+ configurable: true,
+ });
+ setUpTransformStreamDefaultControllerFromTransformer(this, transformer);
+ const startResult: void | PromiseLike<void> = invokeOrNoop(
+ transformer,
+ "start",
+ this[sym.transformStreamController]
+ );
+ startPromise.resolve(startResult);
+ }
+
+ get readable(): ReadableStream<O> {
+ if (!isTransformStream(this)) {
+ throw new TypeError("Invalid TransformStream.");
+ }
+ return this[sym.readable];
+ }
+
+ get writable(): WritableStream<I> {
+ if (!isTransformStream(this)) {
+ throw new TypeError("Invalid TransformStream.");
+ }
+ return this[sym.writable];
+ }
+
+ [customInspect](): string {
+ return `${this.constructor.name} {\n readable: ${inspect(
+ this.readable
+ )}\n writable: ${inspect(this.writable)}\n}`;
+ }
+}
+
+setFunctionName(TransformStreamImpl, "TransformStream");
diff --git a/cli/js/web/streams/transform_stream_default_controller.ts b/cli/js/web/streams/transform_stream_default_controller.ts
new file mode 100644
index 000000000..2fc8d2160
--- /dev/null
+++ b/cli/js/web/streams/transform_stream_default_controller.ts
@@ -0,0 +1,75 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import {
+ FlushAlgorithm,
+ isTransformStreamDefaultController,
+ readableStreamDefaultControllerGetDesiredSize,
+ setFunctionName,
+ TransformAlgorithm,
+ transformStreamDefaultControllerEnqueue,
+ transformStreamDefaultControllerError,
+ transformStreamDefaultControllerTerminate,
+} from "./internals.ts";
+import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts";
+import * as sym from "./symbols.ts";
+import { TransformStreamImpl } from "./transform_stream.ts";
+import { customInspect } from "../console.ts";
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export class TransformStreamDefaultControllerImpl<I = any, O = any>
+ implements TransformStreamDefaultController<O> {
+ [sym.controlledTransformStream]: TransformStreamImpl<I, O>;
+ [sym.flushAlgorithm]: FlushAlgorithm;
+ [sym.transformAlgorithm]: TransformAlgorithm<I>;
+
+ private constructor() {
+ throw new TypeError(
+ "TransformStreamDefaultController's constructor cannot be called."
+ );
+ }
+
+ get desiredSize(): number | null {
+ if (!isTransformStreamDefaultController(this)) {
+ throw new TypeError("Invalid TransformStreamDefaultController.");
+ }
+ const readableController = this[sym.controlledTransformStream][
+ sym.readable
+ ][sym.readableStreamController];
+ return readableStreamDefaultControllerGetDesiredSize(
+ readableController as ReadableStreamDefaultControllerImpl<O>
+ );
+ }
+
+ enqueue(chunk: O): void {
+ if (!isTransformStreamDefaultController(this)) {
+ throw new TypeError("Invalid TransformStreamDefaultController.");
+ }
+ transformStreamDefaultControllerEnqueue(this, chunk);
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ error(reason?: any): void {
+ if (!isTransformStreamDefaultController(this)) {
+ throw new TypeError("Invalid TransformStreamDefaultController.");
+ }
+ transformStreamDefaultControllerError(this, reason);
+ }
+
+ terminate(): void {
+ if (!isTransformStreamDefaultController(this)) {
+ throw new TypeError("Invalid TransformStreamDefaultController.");
+ }
+ transformStreamDefaultControllerTerminate(this);
+ }
+
+ [customInspect](): string {
+ return `${this.constructor.name} { desiredSize: ${String(
+ this.desiredSize
+ )} }`;
+ }
+}
+
+setFunctionName(
+ TransformStreamDefaultControllerImpl,
+ "TransformStreamDefaultController"
+);