summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/js/globals.ts2
-rw-r--r--cli/js/lib.deno.shared_globals.d.ts36
-rw-r--r--cli/js/tests/streams_transform_test.ts562
-rw-r--r--cli/js/tests/unit_tests.ts1
-rw-r--r--cli/js/web/streams/internals.ts364
-rw-r--r--cli/js/web/streams/symbols.ts7
-rw-r--r--cli/js/web/streams/transform_stream.ts118
-rw-r--r--cli/js/web/streams/transform_stream_default_controller.ts75
8 files changed, 1156 insertions, 9 deletions
diff --git a/cli/js/globals.ts b/cli/js/globals.ts
index caf069ffd..89f6075a3 100644
--- a/cli/js/globals.ts
+++ b/cli/js/globals.ts
@@ -23,6 +23,7 @@ import * as workers from "./web/workers.ts";
import * as performanceUtil from "./web/performance.ts";
import * as request from "./web/request.ts";
import * as readableStream from "./web/streams/readable_stream.ts";
+import * as transformStream from "./web/streams/transform_stream.ts";
import * as queuingStrategy from "./web/streams/queuing_strategy.ts";
import * as writableStream from "./web/streams/writable_stream.ts";
@@ -234,6 +235,7 @@ export const windowOrWorkerGlobalScopeProperties = {
TextEncoder: nonEnumerable(textEncoding.TextEncoder),
TextDecoder: nonEnumerable(textEncoding.TextDecoder),
ReadableStream: nonEnumerable(readableStream.ReadableStreamImpl),
+ TransformStream: nonEnumerable(transformStream.TransformStreamImpl),
Request: nonEnumerable(request.Request),
Response: nonEnumerable(fetchTypes.Response),
performance: writable(new performanceUtil.Performance()),
diff --git a/cli/js/lib.deno.shared_globals.d.ts b/cli/js/lib.deno.shared_globals.d.ts
index 8f73f0585..d96230447 100644
--- a/cli/js/lib.deno.shared_globals.d.ts
+++ b/cli/js/lib.deno.shared_globals.d.ts
@@ -423,6 +423,42 @@ interface WritableStreamDefaultWriter<W = any> {
write(chunk: W): Promise<void>;
}
+declare class TransformStream<I = any, O = any> {
+ constructor(
+ transformer?: Transformer<I, O>,
+ writableStrategy?: QueuingStrategy<I>,
+ readableStrategy?: QueuingStrategy<O>
+ );
+ readonly readable: ReadableStream<O>;
+ readonly writable: WritableStream<I>;
+}
+
+interface TransformStreamDefaultController<O = any> {
+ readonly desiredSize: number | null;
+ enqueue(chunk: O): void;
+ error(reason?: any): void;
+ terminate(): void;
+}
+
+interface Transformer<I = any, O = any> {
+ flush?: TransformStreamDefaultControllerCallback<O>;
+ readableType?: undefined;
+ start?: TransformStreamDefaultControllerCallback<O>;
+ transform?: TransformStreamDefaultControllerTransformCallback<I, O>;
+ writableType?: undefined;
+}
+
+interface TransformStreamDefaultControllerCallback<O> {
+ (controller: TransformStreamDefaultController<O>): void | PromiseLike<void>;
+}
+
+interface TransformStreamDefaultControllerTransformCallback<I, O> {
+ (
+ chunk: I,
+ controller: TransformStreamDefaultController<O>
+ ): void | PromiseLike<void>;
+}
+
interface DOMStringList {
/** Returns the number of strings in strings. */
readonly length: number;
diff --git a/cli/js/tests/streams_transform_test.ts b/cli/js/tests/streams_transform_test.ts
new file mode 100644
index 000000000..f3ec148ae
--- /dev/null
+++ b/cli/js/tests/streams_transform_test.ts
@@ -0,0 +1,562 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+import {
+ unitTest,
+ assert,
+ assertEquals,
+ assertNotEquals,
+ assertThrows,
+} from "./test_util.ts";
+
+function delay(seconds: number): Promise<void> {
+ return new Promise<void>((resolve) => {
+ setTimeout(() => {
+ resolve();
+ }, seconds);
+ });
+}
+
+function readableStreamToArray<R>(
+ readable: { getReader(): ReadableStreamDefaultReader<R> },
+ reader?: ReadableStreamDefaultReader<R>
+): Promise<R[]> {
+ if (reader === undefined) {
+ reader = readable.getReader();
+ }
+
+ const chunks: R[] = [];
+
+ return pump();
+
+ function pump(): Promise<R[]> {
+ return reader!.read().then((result) => {
+ if (result.done) {
+ return chunks;
+ }
+
+ chunks.push(result.value);
+ return pump();
+ });
+ }
+}
+
+unitTest(function transformStreamConstructedWithTransformFunction() {
+ new TransformStream({ transform(): void {} });
+});
+
+unitTest(function transformStreamConstructedNoTransform() {
+ new TransformStream();
+ new TransformStream({});
+});
+
+unitTest(function transformStreamIntstancesHaveProperProperties() {
+ const ts = new TransformStream({ transform(): void {} });
+ const proto = Object.getPrototypeOf(ts);
+
+ const writableStream = Object.getOwnPropertyDescriptor(proto, "writable");
+ assert(writableStream !== undefined, "it has a writable property");
+ assert(!writableStream.enumerable, "writable should be non-enumerable");
+ assertEquals(
+ typeof writableStream.get,
+ "function",
+ "writable should have a getter"
+ );
+ assertEquals(
+ writableStream.set,
+ undefined,
+ "writable should not have a setter"
+ );
+ assert(writableStream.configurable, "writable should be configurable");
+ assert(
+ ts.writable instanceof WritableStream,
+ "writable is an instance of WritableStream"
+ );
+ assert(
+ WritableStream.prototype.getWriter.call(ts.writable),
+ "writable should pass WritableStream brand check"
+ );
+
+ const readableStream = Object.getOwnPropertyDescriptor(proto, "readable");
+ assert(readableStream !== undefined, "it has a readable property");
+ assert(!readableStream.enumerable, "readable should be non-enumerable");
+ assertEquals(
+ typeof readableStream.get,
+ "function",
+ "readable should have a getter"
+ );
+ assertEquals(
+ readableStream.set,
+ undefined,
+ "readable should not have a setter"
+ );
+ assert(readableStream.configurable, "readable should be configurable");
+ assert(
+ ts.readable instanceof ReadableStream,
+ "readable is an instance of ReadableStream"
+ );
+ assertNotEquals(
+ ReadableStream.prototype.getReader.call(ts.readable),
+ undefined,
+ "readable should pass ReadableStream brand check"
+ );
+});
+
+unitTest(function transformStreamWritableStartsAsWritable() {
+ const ts = new TransformStream({ transform(): void {} });
+
+ const writer = ts.writable.getWriter();
+ assertEquals(writer.desiredSize, 1, "writer.desiredSize should be 1");
+});
+
+unitTest(async function transformStreamReadableCanReadOutOfWritable() {
+ const ts = new TransformStream();
+
+ const writer = ts.writable.getWriter();
+ writer.write("a");
+ assertEquals(
+ writer.desiredSize,
+ 0,
+ "writer.desiredSize should be 0 after write()"
+ );
+
+ const result = await ts.readable.getReader().read();
+ assertEquals(
+ result.value,
+ "a",
+ "result from reading the readable is the same as was written to writable"
+ );
+ assert(!result.done, "stream should not be done");
+
+ await delay(0);
+ assert(writer.desiredSize === 1, "desiredSize should be 1 again");
+});
+
+unitTest(async function transformStreamCanReadWhatIsWritten() {
+ let c: TransformStreamDefaultController;
+ const ts = new TransformStream({
+ start(controller: TransformStreamDefaultController): void {
+ c = controller;
+ },
+ transform(chunk: string): void {
+ c.enqueue(chunk.toUpperCase());
+ },
+ });
+
+ const writer = ts.writable.getWriter();
+ writer.write("a");
+
+ const result = await ts.readable.getReader().read();
+ assertEquals(
+ result.value,
+ "A",
+ "result from reading the readable is the transformation of what was written to writable"
+ );
+ assert(!result.done, "stream should not be done");
+});
+
+unitTest(async function transformStreamCanReadBothChunks() {
+ let c: TransformStreamDefaultController;
+ const ts = new TransformStream({
+ start(controller: TransformStreamDefaultController): void {
+ c = controller;
+ },
+ transform(chunk: string): void {
+ c.enqueue(chunk.toUpperCase());
+ c.enqueue(chunk.toUpperCase());
+ },
+ });
+
+ const writer = ts.writable.getWriter();
+ writer.write("a");
+
+ const reader = ts.readable.getReader();
+
+ const result1 = await reader.read();
+ assertEquals(
+ result1.value,
+ "A",
+ "the first chunk read is the transformation of the single chunk written"
+ );
+ assert(!result1.done, "stream should not be done");
+
+ const result2 = await reader.read();
+ assertEquals(
+ result2.value,
+ "A",
+ "the second chunk read is also the transformation of the single chunk written"
+ );
+ assert(!result2.done, "stream should not be done");
+});
+
+unitTest(async function transformStreamCanReadWhatIsWritten() {
+ let c: TransformStreamDefaultController;
+ const ts = new TransformStream({
+ start(controller: TransformStreamDefaultController): void {
+ c = controller;
+ },
+ transform(chunk: string): Promise<void> {
+ return delay(0).then(() => c.enqueue(chunk.toUpperCase()));
+ },
+ });
+
+ const writer = ts.writable.getWriter();
+ writer.write("a");
+
+ const result = await ts.readable.getReader().read();
+ assertEquals(
+ result.value,
+ "A",
+ "result from reading the readable is the transformation of what was written to writable"
+ );
+ assert(!result.done, "stream should not be done");
+});
+
+unitTest(async function transformStreamAsyncReadMultipleChunks() {
+ let doSecondEnqueue: () => void;
+ let returnFromTransform: () => void;
+ const ts = new TransformStream({
+ transform(
+ chunk: string,
+ controller: TransformStreamDefaultController
+ ): Promise<void> {
+ delay(0).then(() => controller.enqueue(chunk.toUpperCase()));
+ doSecondEnqueue = (): void => controller.enqueue(chunk.toUpperCase());
+ return new Promise((resolve) => {
+ returnFromTransform = resolve;
+ });
+ },
+ });
+
+ const reader = ts.readable.getReader();
+
+ const writer = ts.writable.getWriter();
+ writer.write("a");
+
+ const result1 = await reader.read();
+ assertEquals(
+ result1.value,
+ "A",
+ "the first chunk read is the transformation of the single chunk written"
+ );
+ assert(!result1.done, "stream should not be done");
+ doSecondEnqueue!();
+
+ const result2 = await reader.read();
+ assertEquals(
+ result2.value,
+ "A",
+ "the second chunk read is also the transformation of the single chunk written"
+ );
+ assert(!result2.done, "stream should not be done");
+ returnFromTransform!();
+});
+
+unitTest(function transformStreamClosingWriteClosesRead() {
+ const ts = new TransformStream({ transform(): void {} });
+
+ const writer = ts.writable.getWriter();
+ writer.close();
+
+ return Promise.all([writer.closed, ts.readable.getReader().closed]).then(
+ undefined
+ );
+});
+
+unitTest(async function transformStreamCloseWaitAwaitsTransforms() {
+ let transformResolve: () => void;
+ const transformPromise = new Promise<void>((resolve) => {
+ transformResolve = resolve;
+ });
+ const ts = new TransformStream(
+ {
+ transform(): Promise<void> {
+ return transformPromise;
+ },
+ },
+ undefined,
+ { highWaterMark: 1 }
+ );
+
+ const writer = ts.writable.getWriter();
+ writer.write("a");
+ writer.close();
+
+ let rsClosed = false;
+ ts.readable.getReader().closed.then(() => {
+ rsClosed = true;
+ });
+
+ await delay(0);
+ assertEquals(rsClosed, false, "readable is not closed after a tick");
+ transformResolve!();
+
+ await writer.closed;
+ // TODO: Is this expectation correct?
+ assertEquals(rsClosed, true, "readable is closed at that point");
+});
+
+unitTest(async function transformStreamCloseWriteAfterSyncEnqueues() {
+ let c: TransformStreamDefaultController<string>;
+ const ts = new TransformStream<string, string>({
+ start(controller: TransformStreamDefaultController): void {
+ c = controller;
+ },
+ transform(): Promise<void> {
+ c.enqueue("x");
+ c.enqueue("y");
+ return delay(0);
+ },
+ });
+
+ const writer = ts.writable.getWriter();
+ writer.write("a");
+ writer.close();
+
+ const readableChunks = readableStreamToArray(ts.readable);
+
+ await writer.closed;
+ const chunks = await readableChunks;
+ assertEquals(
+ chunks,
+ ["x", "y"],
+ "both enqueued chunks can be read from the readable"
+ );
+});
+
+unitTest(async function transformStreamWritableCloseAsyncAfterAsyncEnqueues() {
+ let c: TransformStreamDefaultController<string>;
+ const ts = new TransformStream<string, string>({
+ start(controller: TransformStreamDefaultController<string>): void {
+ c = controller;
+ },
+ transform(): Promise<void> {
+ return delay(0)
+ .then(() => c.enqueue("x"))
+ .then(() => c.enqueue("y"))
+ .then(() => delay(0));
+ },
+ });
+
+ const writer = ts.writable.getWriter();
+ writer.write("a");
+ writer.close();
+
+ const readableChunks = readableStreamToArray(ts.readable);
+
+ await writer.closed;
+ const chunks = await readableChunks;
+ assertEquals(
+ chunks,
+ ["x", "y"],
+ "both enqueued chunks can be read from the readable"
+ );
+});
+
+unitTest(async function transformStreamTransformerMethodsCalledAsMethods() {
+ let c: TransformStreamDefaultController<string>;
+ const transformer = {
+ suffix: "-suffix",
+
+ start(controller: TransformStreamDefaultController<string>): void {
+ c = controller;
+ c.enqueue("start" + this.suffix);
+ },
+
+ transform(chunk: string): void {
+ c.enqueue(chunk + this.suffix);
+ },
+
+ flush(): void {
+ c.enqueue("flushed" + this.suffix);
+ },
+ };
+ const ts = new TransformStream(transformer);
+
+ const writer = ts.writable.getWriter();
+ writer.write("a");
+ writer.close();
+
+ const readableChunks = readableStreamToArray(ts.readable);
+
+ await writer.closed;
+ const chunks = await readableChunks;
+ assertEquals(
+ chunks,
+ ["start-suffix", "a-suffix", "flushed-suffix"],
+ "all enqueued chunks have suffixes"
+ );
+});
+
+unitTest(async function transformStreamMethodsShouldNotBeAppliedOrCalled() {
+ function functionWithOverloads(): void {}
+ functionWithOverloads.apply = (): void => {
+ throw new Error("apply() should not be called");
+ };
+ functionWithOverloads.call = (): void => {
+ throw new Error("call() should not be called");
+ };
+ const ts = new TransformStream({
+ start: functionWithOverloads,
+ transform: functionWithOverloads,
+ flush: functionWithOverloads,
+ });
+ const writer = ts.writable.getWriter();
+ writer.write("a");
+ writer.close();
+
+ await readableStreamToArray(ts.readable);
+});
+
+unitTest(async function transformStreamCallTransformSync() {
+ let transformCalled = false;
+ const ts = new TransformStream(
+ {
+ transform(): void {
+ transformCalled = true;
+ },
+ },
+ undefined,
+ { highWaterMark: Infinity }
+ );
+ // transform() is only called synchronously when there is no backpressure and
+ // all microtasks have run.
+ await delay(0);
+ const writePromise = ts.writable.getWriter().write(undefined);
+ assert(transformCalled, "transform() should have been called");
+ await writePromise;
+});
+
+unitTest(function transformStreamCloseWriteCloesesReadWithNoChunks() {
+ const ts = new TransformStream({}, undefined, { highWaterMark: 0 });
+
+ const writer = ts.writable.getWriter();
+ writer.close();
+
+ return Promise.all([writer.closed, ts.readable.getReader().closed]).then(
+ undefined
+ );
+});
+
+unitTest(function transformStreamEnqueueThrowsAfterTerminate() {
+ new TransformStream({
+ start(controller: TransformStreamDefaultController): void {
+ controller.terminate();
+ assertThrows(() => {
+ controller.enqueue(undefined);
+ }, TypeError);
+ },
+ });
+});
+
+unitTest(function transformStreamEnqueueThrowsAfterReadableCancel() {
+ let controller: TransformStreamDefaultController;
+ const ts = new TransformStream({
+ start(c: TransformStreamDefaultController): void {
+ controller = c;
+ },
+ });
+ const cancelPromise = ts.readable.cancel();
+ assertThrows(
+ () => controller.enqueue(undefined),
+ TypeError,
+ undefined,
+ "enqueue should throw"
+ );
+ return cancelPromise;
+});
+
+unitTest(function transformStreamSecondTerminateNoOp() {
+ new TransformStream({
+ start(controller: TransformStreamDefaultController): void {
+ controller.terminate();
+ controller.terminate();
+ },
+ });
+});
+
+unitTest(async function transformStreamTerminateAfterReadableCancelIsNoop() {
+ let controller: TransformStreamDefaultController;
+ const ts = new TransformStream({
+ start(c: TransformStreamDefaultController): void {
+ controller = c;
+ },
+ });
+ const cancelReason = { name: "cancelReason" };
+ const cancelPromise = ts.readable.cancel(cancelReason);
+ controller!.terminate();
+ await cancelPromise;
+ try {
+ await ts.writable.getWriter().closed;
+ } catch (e) {
+ assert(e === cancelReason);
+ return;
+ }
+ throw new Error("closed should have rejected");
+});
+
+unitTest(async function transformStreamStartCalledOnce() {
+ let calls = 0;
+ new TransformStream({
+ start(): void {
+ ++calls;
+ },
+ });
+ await delay(0);
+ assertEquals(calls, 1, "start() should have been called exactly once");
+});
+
+unitTest(function transformStreamReadableTypeThrows() {
+ assertThrows(
+ // eslint-disable-next-line
+ () => new TransformStream({ readableType: "bytes" as any }),
+ RangeError,
+ undefined,
+ "constructor should throw"
+ );
+});
+
+unitTest(function transformStreamWirtableTypeThrows() {
+ assertThrows(
+ // eslint-disable-next-line
+ () => new TransformStream({ writableType: "bytes" as any }),
+ RangeError,
+ undefined,
+ "constructor should throw"
+ );
+});
+
+unitTest(function transformStreamSubclassable() {
+ class Subclass extends TransformStream {
+ extraFunction(): boolean {
+ return true;
+ }
+ }
+ assert(
+ Object.getPrototypeOf(Subclass.prototype) === TransformStream.prototype,
+ "Subclass.prototype's prototype should be TransformStream.prototype"
+ );
+ assert(
+ Object.getPrototypeOf(Subclass) === TransformStream,
+ "Subclass's prototype should be TransformStream"
+ );
+ const sub = new Subclass();
+ assert(
+ sub instanceof TransformStream,
+ "Subclass object should be an instance of TransformStream"
+ );
+ assert(
+ sub instanceof Subclass,
+ "Subclass object should be an instance of Subclass"
+ );
+ const readableGetter = Object.getOwnPropertyDescriptor(
+ TransformStream.prototype,
+ "readable"
+ )!.get;
+ assert(
+ readableGetter!.call(sub) === sub.readable,
+ "Subclass object should pass brand check"
+ );
+ assert(
+ sub.extraFunction(),
+ "extraFunction() should be present on Subclass object"
+ );
+});
diff --git a/cli/js/tests/unit_tests.ts b/cli/js/tests/unit_tests.ts
index 40d0124d3..7327bcc05 100644
--- a/cli/js/tests/unit_tests.ts
+++ b/cli/js/tests/unit_tests.ts
@@ -53,6 +53,7 @@ import "./resources_test.ts";
import "./signal_test.ts";
import "./stat_test.ts";
import "./streams_piping_test.ts";
+import "./streams_transform_test.ts";
import "./streams_writable_test.ts";
import "./symlink_test.ts";
import "./text_encoding_test.ts";
diff --git a/cli/js/web/streams/internals.ts b/cli/js/web/streams/internals.ts
index 846db096e..5ef094afc 100644
--- a/cli/js/web/streams/internals.ts
+++ b/cli/js/web/streams/internals.ts
@@ -13,6 +13,8 @@ import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_c
import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts";
import { ReadableStreamImpl } from "./readable_stream.ts";
import * as sym from "./symbols.ts";
+import { TransformStreamImpl } from "./transform_stream.ts";
+import { TransformStreamDefaultControllerImpl } from "./transform_stream_default_controller.ts";
import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts";
import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts";
import { WritableStreamImpl } from "./writable_stream.ts";
@@ -36,10 +38,12 @@ type Container<R = any> = {
[sym.queue]: Array<Pair<R> | BufferQueueItem>;
[sym.queueTotalSize]: number;
};
+export type FlushAlgorithm = () => Promise<void>;
export type Pair<R> = { value: R; size: number };
export type PullAlgorithm = () => PromiseLike<void>;
export type SizeAlgorithm<T> = (chunk: T) => number;
export type StartAlgorithm = () => void | PromiseLike<void>;
+export type TransformAlgorithm<I> = (chunk: I) => Promise<void>;
export type WriteAlgorithm<W> = (chunk: W) => Promise<void>;
export interface Deferred<T> {
promise: Promise<T>;
@@ -76,8 +80,16 @@ export function acquireWritableStreamDefaultWriter<W>(
return new WritableStreamDefaultWriterImpl(stream);
}
+export function call<F extends (...args: any[]) => any>(
+ fn: F,
+ v: ThisType<F>,
+ args: Parameters<F>
+): ReturnType<F> {
+ return Function.prototype.apply.call(fn, v, args);
+}
+
function createAlgorithmFromUnderlyingMethod<
- O extends UnderlyingByteSource | UnderlyingSource,
+ O extends UnderlyingByteSource | UnderlyingSource | Transformer,
P extends keyof O
>(
underlyingObject: O,
@@ -86,7 +98,7 @@ function createAlgorithmFromUnderlyingMethod<
...extraArgs: any[]
): () => Promise<void>;
function createAlgorithmFromUnderlyingMethod<
- O extends UnderlyingByteSource | UnderlyingSource,
+ O extends UnderlyingByteSource | UnderlyingSource | Transformer,
P extends keyof O
>(
underlyingObject: O,
@@ -95,7 +107,7 @@ function createAlgorithmFromUnderlyingMethod<
...extraArgs: any[]
): (arg: any) => Promise<void>;
function createAlgorithmFromUnderlyingMethod<
- O extends UnderlyingByteSource | UnderlyingSource,
+ O extends UnderlyingByteSource | UnderlyingSource | Transformer,
P extends keyof O
>(
underlyingObject: O,
@@ -110,11 +122,11 @@ function createAlgorithmFromUnderlyingMethod<
}
if (algoArgCount === 0) {
return async (): Promise<void> =>
- method.call(underlyingObject, ...extraArgs);
+ call(method, underlyingObject, extraArgs as any);
} else {
return async (arg: any): Promise<void> => {
const fullArgs = [arg, ...extraArgs];
- return method.call(underlyingObject, ...fullArgs);
+ return call(method, underlyingObject, fullArgs as any);
};
}
}
@@ -148,6 +160,33 @@ function createReadableStream<T>(
return stream;
}
+function createWritableStream<W>(
+ startAlgorithm: StartAlgorithm,
+ writeAlgorithm: WriteAlgorithm<W>,
+ closeAlgorithm: CloseAlgorithm,
+ abortAlgorithm: AbortAlgorithm,
+ highWaterMark = 1,
+ sizeAlgorithm: SizeAlgorithm<W> = (): number => 1
+): WritableStreamImpl<W> {
+ assert(isNonNegativeNumber(highWaterMark));
+ const stream = Object.create(WritableStreamImpl.prototype);
+ initializeWritableStream(stream);
+ const controller = Object.create(
+ WritableStreamDefaultControllerImpl.prototype
+ );
+ setUpWritableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ highWaterMark,
+ sizeAlgorithm
+ );
+ return stream;
+}
+
export function dequeueValue<R>(container: Container<R>): R {
assert(sym.queue in container && sym.queueTotalSize in container);
assert(container[sym.queue].length);
@@ -185,13 +224,61 @@ export function getDeferred<T>(): Required<Deferred<T>> {
return { promise, resolve: resolve!, reject: reject! };
}
-export function initializeReadableStream(stream: ReadableStreamImpl): void {
+export function initializeReadableStream<R>(
+ stream: ReadableStreamImpl<R>
+): void {
stream[sym.state] = "readable";
stream[sym.reader] = stream[sym.storedError] = undefined;
stream[sym.disturbed] = false;
}
-export function initializeWritableStream(stream: WritableStreamImpl): void {
+export function initializeTransformStream<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ startPromise: Promise<void>,
+ writableHighWaterMark: number,
+ writableSizeAlgorithm: SizeAlgorithm<I>,
+ readableHighWaterMark: number,
+ readableSizeAlgorithm: SizeAlgorithm<O>
+): void {
+ const startAlgorithm = (): Promise<void> => startPromise;
+ const writeAlgorithm = (chunk: any): Promise<void> =>
+ transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
+ const abortAlgorithm = (reason: any): Promise<void> =>
+ transformStreamDefaultSinkAbortAlgorithm(stream, reason);
+ const closeAlgorithm = (): Promise<void> =>
+ transformStreamDefaultSinkCloseAlgorithm(stream);
+ stream[sym.writable] = createWritableStream(
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ writableHighWaterMark,
+ writableSizeAlgorithm
+ );
+ const pullAlgorithm = (): PromiseLike<void> =>
+ transformStreamDefaultSourcePullAlgorithm(stream);
+ const cancelAlgorithm = (reason: any): Promise<void> => {
+ transformStreamErrorWritableAndUnblockWrite(stream, reason);
+ return Promise.resolve(undefined);
+ };
+ stream[sym.readable] = createReadableStream(
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ readableHighWaterMark,
+ readableSizeAlgorithm
+ );
+ stream[sym.backpressure] = stream[sym.backpressureChangePromise] = undefined;
+ transformStreamSetBackpressure(stream, true);
+ Object.defineProperty(stream, sym.transformStreamController, {
+ value: undefined,
+ configurable: true,
+ });
+}
+
+export function initializeWritableStream<W>(
+ stream: WritableStreamImpl<W>
+): void {
stream[sym.state] = "writable";
stream[sym.storedError] = stream[sym.writer] = stream[
sym.writableStreamController
@@ -202,7 +289,7 @@ export function initializeWritableStream(stream: WritableStreamImpl): void {
stream[sym.backpressure] = false;
}
-function invokeOrNoop<O extends any, P extends keyof O>(
+export function invokeOrNoop<O extends Record<string, any>, P extends keyof O>(
o: O,
p: P,
...args: Parameters<O[P]>
@@ -212,7 +299,7 @@ function invokeOrNoop<O extends any, P extends keyof O>(
if (!method) {
return undefined;
}
- return method.call(o, ...args);
+ return call(method, o, args);
}
function isCallable(value: unknown): value is (...args: any) => any {
@@ -299,6 +386,26 @@ export function isReadableStreamLocked(stream: ReadableStreamImpl): boolean {
return stream[sym.reader] ? true : false;
}
+export function isTransformStream(
+ x: unknown
+): x is TransformStreamImpl<any, any> {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.transformStreamController in x)
+ ? false
+ : true;
+}
+
+export function isTransformStreamDefaultController(
+ x: unknown
+): x is TransformStreamDefaultControllerImpl<any, any> {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.controlledTransformStream in x)
+ ? false
+ : true;
+}
+
export function isUnderlyingByteSource(
underlyingSource: UnderlyingByteSource | UnderlyingSource
): underlyingSource is UnderlyingByteSource {
@@ -717,6 +824,14 @@ export function readableStreamDefaultControllerError<T>(
readableStreamError(stream, e);
}
+function readableStreamDefaultControllerHasBackpressure<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>
+): boolean {
+ return readableStreamDefaultControllerShouldCallPull(controller)
+ ? true
+ : false;
+}
+
function readableStreamDefaultControllerShouldCallPull<T>(
controller: ReadableStreamDefaultControllerImpl<T>
): boolean {
@@ -1416,6 +1531,62 @@ export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>(
);
}
+function setUpTransformStreamDefaultController<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ controller: TransformStreamDefaultControllerImpl<I, O>,
+ transformAlgorithm: TransformAlgorithm<I>,
+ flushAlgorithm: FlushAlgorithm
+): void {
+ assert(isTransformStream(stream));
+ assert(stream[sym.transformStreamController] === undefined);
+ controller[sym.controlledTransformStream] = stream;
+ stream[sym.transformStreamController] = controller;
+ controller[sym.transformAlgorithm] = transformAlgorithm;
+ controller[sym.flushAlgorithm] = flushAlgorithm;
+}
+
+export function setUpTransformStreamDefaultControllerFromTransformer<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ transformer: Transformer<I, O>
+): void {
+ assert(transformer);
+ const controller = Object.create(
+ TransformStreamDefaultControllerImpl.prototype
+ ) as TransformStreamDefaultControllerImpl<I, O>;
+ let transformAlgorithm: TransformAlgorithm<I> = (chunk) => {
+ try {
+ transformStreamDefaultControllerEnqueue(
+ controller,
+ // it defaults to no tranformation, so I is assumed to be O
+ (chunk as unknown) as O
+ );
+ } catch (e) {
+ return Promise.reject(e);
+ }
+ return Promise.resolve();
+ };
+ const transformMethod = transformer.transform;
+ if (transformMethod) {
+ if (typeof transformMethod !== "function") {
+ throw new TypeError("tranformer.transform must be callable.");
+ }
+ transformAlgorithm = async (chunk): Promise<void> =>
+ call(transformMethod, transformer, [chunk, controller]);
+ }
+ const flushAlgorithm = createAlgorithmFromUnderlyingMethod(
+ transformer,
+ "flush",
+ 0,
+ controller
+ );
+ setUpTransformStreamDefaultController(
+ stream,
+ controller,
+ transformAlgorithm,
+ flushAlgorithm
+ );
+}
+
function setUpWritableStreamDefaultController<W>(
stream: WritableStreamImpl<W>,
controller: WritableStreamDefaultControllerImpl<W>,
@@ -1508,6 +1679,181 @@ export function setUpWritableStreamDefaultControllerFromUnderlyingSink<W>(
);
}
+function transformStreamDefaultControllerClearAlgorithms<I, O>(
+ controller: TransformStreamDefaultControllerImpl<I, O>
+): void {
+ (controller as any)[sym.transformAlgorithm] = undefined;
+ (controller as any)[sym.flushAlgorithm] = undefined;
+}
+
+export function transformStreamDefaultControllerEnqueue<I, O>(
+ controller: TransformStreamDefaultControllerImpl<I, O>,
+ chunk: O
+): void {
+ const stream = controller[sym.controlledTransformStream];
+ const readableController = stream[sym.readable][
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl<O>;
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) {
+ throw new TypeError(
+ "TransformStream's readable controller cannot be closed or enqueued."
+ );
+ }
+ try {
+ readableStreamDefaultControllerEnqueue(readableController, chunk);
+ } catch (e) {
+ transformStreamErrorWritableAndUnblockWrite(stream, e);
+ throw stream[sym.readable][sym.storedError];
+ }
+ const backpressure = readableStreamDefaultControllerHasBackpressure(
+ readableController
+ );
+ if (backpressure) {
+ transformStreamSetBackpressure(stream, true);
+ }
+}
+
+export function transformStreamDefaultControllerError<I, O>(
+ controller: TransformStreamDefaultControllerImpl<I, O>,
+ e: any
+): void {
+ transformStreamError(controller[sym.controlledTransformStream], e);
+}
+
+function transformStreamDefaultControllerPerformTransform<I, O>(
+ controller: TransformStreamDefaultControllerImpl<I, O>,
+ chunk: I
+): Promise<void> {
+ const transformPromise = controller[sym.transformAlgorithm](chunk);
+ return transformPromise.then(undefined, (r) => {
+ transformStreamError(controller[sym.controlledTransformStream], r);
+ throw r;
+ });
+}
+
+function transformStreamDefaultSinkAbortAlgorithm<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ reason: any
+): Promise<void> {
+ transformStreamError(stream, reason);
+ return Promise.resolve(undefined);
+}
+
+function transformStreamDefaultSinkCloseAlgorithm<I, O>(
+ stream: TransformStreamImpl<I, O>
+): Promise<void> {
+ const readable = stream[sym.readable];
+ const controller = stream[sym.transformStreamController];
+ const flushPromise = controller[sym.flushAlgorithm]();
+ transformStreamDefaultControllerClearAlgorithms(controller);
+ return flushPromise.then(
+ () => {
+ if (readable[sym.state] === "errored") {
+ throw readable[sym.storedError];
+ }
+ const readableController = readable[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl<O>;
+ if (
+ readableStreamDefaultControllerCanCloseOrEnqueue(readableController)
+ ) {
+ readableStreamDefaultControllerClose(readableController);
+ }
+ },
+ (r) => {
+ transformStreamError(stream, r);
+ throw readable[sym.storedError];
+ }
+ );
+}
+
+function transformStreamDefaultSinkWriteAlgorithm<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ chunk: I
+): Promise<void> {
+ assert(stream[sym.writable][sym.state] === "writable");
+ const controller = stream[sym.transformStreamController];
+ if (stream[sym.backpressure]) {
+ const backpressureChangePromise = stream[sym.backpressureChangePromise];
+ assert(backpressureChangePromise);
+ return backpressureChangePromise.promise.then(() => {
+ const writable = stream[sym.writable];
+ const state = writable[sym.state];
+ if (state === "erroring") {
+ throw writable[sym.storedError];
+ }
+ assert(state === "writable");
+ return transformStreamDefaultControllerPerformTransform(
+ controller,
+ chunk
+ );
+ });
+ }
+ return transformStreamDefaultControllerPerformTransform(controller, chunk);
+}
+
+function transformStreamDefaultSourcePullAlgorithm<I, O>(
+ stream: TransformStreamImpl<I, O>
+): Promise<void> {
+ assert(stream[sym.backpressure] === true);
+ assert(stream[sym.backpressureChangePromise] !== undefined);
+ transformStreamSetBackpressure(stream, false);
+ return stream[sym.backpressureChangePromise]!.promise;
+}
+
+function transformStreamError<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ e: any
+): void {
+ readableStreamDefaultControllerError(
+ stream[sym.readable][
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl<O>,
+ e
+ );
+ transformStreamErrorWritableAndUnblockWrite(stream, e);
+}
+
+export function transformStreamDefaultControllerTerminate<I, O>(
+ controller: TransformStreamDefaultControllerImpl<I, O>
+): void {
+ const stream = controller[sym.controlledTransformStream];
+ const readableController = stream[sym.readable][
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl<O>;
+ readableStreamDefaultControllerClose(readableController);
+ const error = new TypeError("TransformStream is closed.");
+ transformStreamErrorWritableAndUnblockWrite(stream, error);
+}
+
+function transformStreamErrorWritableAndUnblockWrite<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ e: any
+): void {
+ transformStreamDefaultControllerClearAlgorithms(
+ stream[sym.transformStreamController]
+ );
+ writableStreamDefaultControllerErrorIfNeeded(
+ stream[sym.writable][sym.writableStreamController]!,
+ e
+ );
+ if (stream[sym.backpressure]) {
+ transformStreamSetBackpressure(stream, false);
+ }
+}
+
+function transformStreamSetBackpressure<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ backpressure: boolean
+): void {
+ assert(stream[sym.backpressure] !== backpressure);
+ if (stream[sym.backpressureChangePromise] !== undefined) {
+ stream[sym.backpressureChangePromise]!.resolve!(undefined);
+ }
+ stream[sym.backpressureChangePromise] = getDeferred<void>();
+ stream[sym.backpressure] = backpressure;
+}
+
function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer {
assert(!isDetachedBuffer(buffer));
const transferredIshVersion = buffer.slice(0);
diff --git a/cli/js/web/streams/symbols.ts b/cli/js/web/streams/symbols.ts
index 9e5cb5715..9c0a336e5 100644
--- a/cli/js/web/streams/symbols.ts
+++ b/cli/js/web/streams/symbols.ts
@@ -11,6 +11,7 @@ export const abortSteps = Symbol("abortSteps");
export const asyncIteratorReader = Symbol("asyncIteratorReader");
export const autoAllocateChunkSize = Symbol("autoAllocateChunkSize");
export const backpressure = Symbol("backpressure");
+export const backpressureChangePromise = Symbol("backpressureChangePromise");
export const byobRequest = Symbol("byobRequest");
export const cancelAlgorithm = Symbol("cancelAlgorithm");
export const cancelSteps = Symbol("cancelSteps");
@@ -22,9 +23,11 @@ export const controlledReadableByteStream = Symbol(
"controlledReadableByteStream"
);
export const controlledReadableStream = Symbol("controlledReadableStream");
+export const controlledTransformStream = Symbol("controlledTransformStream");
export const controlledWritableStream = Symbol("controlledWritableStream");
export const disturbed = Symbol("disturbed");
export const errorSteps = Symbol("errorSteps");
+export const flushAlgorithm = Symbol("flushAlgorithm");
export const forAuthorCode = Symbol("forAuthorCode");
export const inFlightWriteRequest = Symbol("inFlightWriteRequest");
export const inFlightCloseRequest = Symbol("inFlightCloseRequest");
@@ -39,6 +42,7 @@ export const pulling = Symbol("pulling");
export const pullSteps = Symbol("pullSteps");
export const queue = Symbol("queue");
export const queueTotalSize = Symbol("queueTotalSize");
+export const readable = Symbol("readable");
export const readableStreamController = Symbol("readableStreamController");
export const reader = Symbol("reader");
export const readRequests = Symbol("readRequests");
@@ -48,7 +52,10 @@ export const state = Symbol("state");
export const storedError = Symbol("storedError");
export const strategyHWM = Symbol("strategyHWM");
export const strategySizeAlgorithm = Symbol("strategySizeAlgorithm");
+export const transformAlgorithm = Symbol("transformAlgorithm");
+export const transformStreamController = Symbol("transformStreamController");
export const writableStreamController = Symbol("writableStreamController");
export const writeAlgorithm = Symbol("writeAlgorithm");
+export const writable = Symbol("writable");
export const writer = Symbol("writer");
export const writeRequests = Symbol("writeRequests");
diff --git a/cli/js/web/streams/transform_stream.ts b/cli/js/web/streams/transform_stream.ts
new file mode 100644
index 000000000..ac08fea3f
--- /dev/null
+++ b/cli/js/web/streams/transform_stream.ts
@@ -0,0 +1,118 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import {
+ Deferred,
+ getDeferred,
+ initializeTransformStream,
+ invokeOrNoop,
+ isTransformStream,
+ makeSizeAlgorithmFromSizeFunction,
+ setFunctionName,
+ setUpTransformStreamDefaultControllerFromTransformer,
+ validateAndNormalizeHighWaterMark,
+} from "./internals.ts";
+import { ReadableStreamImpl } from "./readable_stream.ts";
+import * as sym from "./symbols.ts";
+import { TransformStreamDefaultControllerImpl } from "./transform_stream_default_controller.ts";
+import { WritableStreamImpl } from "./writable_stream.ts";
+import { customInspect, inspect } from "../console.ts";
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export class TransformStreamImpl<I = any, O = any>
+ implements TransformStream<I, O> {
+ [sym.backpressure]?: boolean;
+ [sym.backpressureChangePromise]?: Deferred<void>;
+ [sym.readable]: ReadableStreamImpl<O>;
+ [sym.transformStreamController]: TransformStreamDefaultControllerImpl<I, O>;
+ [sym.writable]: WritableStreamImpl<I>;
+
+ constructor(
+ transformer: Transformer<I, O> = {},
+ writableStrategy: QueuingStrategy<I> = {},
+ readableStrategy: QueuingStrategy<O> = {}
+ ) {
+ const writableSizeFunction = writableStrategy.size;
+ let writableHighWaterMark = writableStrategy.highWaterMark;
+ const readableSizeFunction = readableStrategy.size;
+ let readableHighWaterMark = readableStrategy.highWaterMark;
+ const writableType = transformer.writableType;
+ if (writableType !== undefined) {
+ throw new RangeError(
+ `Expected transformer writableType to be undefined, received "${String(
+ writableType
+ )}"`
+ );
+ }
+ const writableSizeAlgorithm = makeSizeAlgorithmFromSizeFunction(
+ writableSizeFunction
+ );
+ if (writableHighWaterMark === undefined) {
+ writableHighWaterMark = 1;
+ }
+ writableHighWaterMark = validateAndNormalizeHighWaterMark(
+ writableHighWaterMark
+ );
+ const readableType = transformer.readableType;
+ if (readableType !== undefined) {
+ throw new RangeError(
+ `Expected transformer readableType to be undefined, received "${String(
+ readableType
+ )}"`
+ );
+ }
+ const readableSizeAlgorithm = makeSizeAlgorithmFromSizeFunction(
+ readableSizeFunction
+ );
+ if (readableHighWaterMark === undefined) {
+ readableHighWaterMark = 1;
+ }
+ readableHighWaterMark = validateAndNormalizeHighWaterMark(
+ readableHighWaterMark
+ );
+ const startPromise = getDeferred<void>();
+ initializeTransformStream(
+ this,
+ startPromise.promise,
+ writableHighWaterMark,
+ writableSizeAlgorithm,
+ readableHighWaterMark,
+ readableSizeAlgorithm
+ );
+ // the brand check expects this, and the brand check occurs in the following
+ // but the property hasn't been defined.
+ Object.defineProperty(this, sym.transformStreamController, {
+ value: undefined,
+ writable: true,
+ configurable: true,
+ });
+ setUpTransformStreamDefaultControllerFromTransformer(this, transformer);
+ const startResult: void | PromiseLike<void> = invokeOrNoop(
+ transformer,
+ "start",
+ this[sym.transformStreamController]
+ );
+ startPromise.resolve(startResult);
+ }
+
+ get readable(): ReadableStream<O> {
+ if (!isTransformStream(this)) {
+ throw new TypeError("Invalid TransformStream.");
+ }
+ return this[sym.readable];
+ }
+
+ get writable(): WritableStream<I> {
+ if (!isTransformStream(this)) {
+ throw new TypeError("Invalid TransformStream.");
+ }
+ return this[sym.writable];
+ }
+
+ [customInspect](): string {
+ return `${this.constructor.name} {\n readable: ${inspect(
+ this.readable
+ )}\n writable: ${inspect(this.writable)}\n}`;
+ }
+}
+
+setFunctionName(TransformStreamImpl, "TransformStream");
diff --git a/cli/js/web/streams/transform_stream_default_controller.ts b/cli/js/web/streams/transform_stream_default_controller.ts
new file mode 100644
index 000000000..2fc8d2160
--- /dev/null
+++ b/cli/js/web/streams/transform_stream_default_controller.ts
@@ -0,0 +1,75 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import {
+ FlushAlgorithm,
+ isTransformStreamDefaultController,
+ readableStreamDefaultControllerGetDesiredSize,
+ setFunctionName,
+ TransformAlgorithm,
+ transformStreamDefaultControllerEnqueue,
+ transformStreamDefaultControllerError,
+ transformStreamDefaultControllerTerminate,
+} from "./internals.ts";
+import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts";
+import * as sym from "./symbols.ts";
+import { TransformStreamImpl } from "./transform_stream.ts";
+import { customInspect } from "../console.ts";
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export class TransformStreamDefaultControllerImpl<I = any, O = any>
+ implements TransformStreamDefaultController<O> {
+ [sym.controlledTransformStream]: TransformStreamImpl<I, O>;
+ [sym.flushAlgorithm]: FlushAlgorithm;
+ [sym.transformAlgorithm]: TransformAlgorithm<I>;
+
+ private constructor() {
+ throw new TypeError(
+ "TransformStreamDefaultController's constructor cannot be called."
+ );
+ }
+
+ get desiredSize(): number | null {
+ if (!isTransformStreamDefaultController(this)) {
+ throw new TypeError("Invalid TransformStreamDefaultController.");
+ }
+ const readableController = this[sym.controlledTransformStream][
+ sym.readable
+ ][sym.readableStreamController];
+ return readableStreamDefaultControllerGetDesiredSize(
+ readableController as ReadableStreamDefaultControllerImpl<O>
+ );
+ }
+
+ enqueue(chunk: O): void {
+ if (!isTransformStreamDefaultController(this)) {
+ throw new TypeError("Invalid TransformStreamDefaultController.");
+ }
+ transformStreamDefaultControllerEnqueue(this, chunk);
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ error(reason?: any): void {
+ if (!isTransformStreamDefaultController(this)) {
+ throw new TypeError("Invalid TransformStreamDefaultController.");
+ }
+ transformStreamDefaultControllerError(this, reason);
+ }
+
+ terminate(): void {
+ if (!isTransformStreamDefaultController(this)) {
+ throw new TypeError("Invalid TransformStreamDefaultController.");
+ }
+ transformStreamDefaultControllerTerminate(this);
+ }
+
+ [customInspect](): string {
+ return `${this.constructor.name} { desiredSize: ${String(
+ this.desiredSize
+ )} }`;
+ }
+}
+
+setFunctionName(
+ TransformStreamDefaultControllerImpl,
+ "TransformStreamDefaultController"
+);