summaryrefslogtreecommitdiff
path: root/cli/js/web/streams/internals.ts
diff options
context:
space:
mode:
Diffstat (limited to 'cli/js/web/streams/internals.ts')
-rw-r--r--cli/js/web/streams/internals.ts364
1 files changed, 355 insertions, 9 deletions
diff --git a/cli/js/web/streams/internals.ts b/cli/js/web/streams/internals.ts
index 846db096e..5ef094afc 100644
--- a/cli/js/web/streams/internals.ts
+++ b/cli/js/web/streams/internals.ts
@@ -13,6 +13,8 @@ import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_c
import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts";
import { ReadableStreamImpl } from "./readable_stream.ts";
import * as sym from "./symbols.ts";
+import { TransformStreamImpl } from "./transform_stream.ts";
+import { TransformStreamDefaultControllerImpl } from "./transform_stream_default_controller.ts";
import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts";
import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts";
import { WritableStreamImpl } from "./writable_stream.ts";
@@ -36,10 +38,12 @@ type Container<R = any> = {
[sym.queue]: Array<Pair<R> | BufferQueueItem>;
[sym.queueTotalSize]: number;
};
+export type FlushAlgorithm = () => Promise<void>;
export type Pair<R> = { value: R; size: number };
export type PullAlgorithm = () => PromiseLike<void>;
export type SizeAlgorithm<T> = (chunk: T) => number;
export type StartAlgorithm = () => void | PromiseLike<void>;
+export type TransformAlgorithm<I> = (chunk: I) => Promise<void>;
export type WriteAlgorithm<W> = (chunk: W) => Promise<void>;
export interface Deferred<T> {
promise: Promise<T>;
@@ -76,8 +80,16 @@ export function acquireWritableStreamDefaultWriter<W>(
return new WritableStreamDefaultWriterImpl(stream);
}
+export function call<F extends (...args: any[]) => any>(
+ fn: F,
+ v: ThisType<F>,
+ args: Parameters<F>
+): ReturnType<F> {
+ return Function.prototype.apply.call(fn, v, args);
+}
+
function createAlgorithmFromUnderlyingMethod<
- O extends UnderlyingByteSource | UnderlyingSource,
+ O extends UnderlyingByteSource | UnderlyingSource | Transformer,
P extends keyof O
>(
underlyingObject: O,
@@ -86,7 +98,7 @@ function createAlgorithmFromUnderlyingMethod<
...extraArgs: any[]
): () => Promise<void>;
function createAlgorithmFromUnderlyingMethod<
- O extends UnderlyingByteSource | UnderlyingSource,
+ O extends UnderlyingByteSource | UnderlyingSource | Transformer,
P extends keyof O
>(
underlyingObject: O,
@@ -95,7 +107,7 @@ function createAlgorithmFromUnderlyingMethod<
...extraArgs: any[]
): (arg: any) => Promise<void>;
function createAlgorithmFromUnderlyingMethod<
- O extends UnderlyingByteSource | UnderlyingSource,
+ O extends UnderlyingByteSource | UnderlyingSource | Transformer,
P extends keyof O
>(
underlyingObject: O,
@@ -110,11 +122,11 @@ function createAlgorithmFromUnderlyingMethod<
}
if (algoArgCount === 0) {
return async (): Promise<void> =>
- method.call(underlyingObject, ...extraArgs);
+ call(method, underlyingObject, extraArgs as any);
} else {
return async (arg: any): Promise<void> => {
const fullArgs = [arg, ...extraArgs];
- return method.call(underlyingObject, ...fullArgs);
+ return call(method, underlyingObject, fullArgs as any);
};
}
}
@@ -148,6 +160,33 @@ function createReadableStream<T>(
return stream;
}
+function createWritableStream<W>(
+ startAlgorithm: StartAlgorithm,
+ writeAlgorithm: WriteAlgorithm<W>,
+ closeAlgorithm: CloseAlgorithm,
+ abortAlgorithm: AbortAlgorithm,
+ highWaterMark = 1,
+ sizeAlgorithm: SizeAlgorithm<W> = (): number => 1
+): WritableStreamImpl<W> {
+ assert(isNonNegativeNumber(highWaterMark));
+ const stream = Object.create(WritableStreamImpl.prototype);
+ initializeWritableStream(stream);
+ const controller = Object.create(
+ WritableStreamDefaultControllerImpl.prototype
+ );
+ setUpWritableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ highWaterMark,
+ sizeAlgorithm
+ );
+ return stream;
+}
+
export function dequeueValue<R>(container: Container<R>): R {
assert(sym.queue in container && sym.queueTotalSize in container);
assert(container[sym.queue].length);
@@ -185,13 +224,61 @@ export function getDeferred<T>(): Required<Deferred<T>> {
return { promise, resolve: resolve!, reject: reject! };
}
-export function initializeReadableStream(stream: ReadableStreamImpl): void {
+export function initializeReadableStream<R>(
+ stream: ReadableStreamImpl<R>
+): void {
stream[sym.state] = "readable";
stream[sym.reader] = stream[sym.storedError] = undefined;
stream[sym.disturbed] = false;
}
-export function initializeWritableStream(stream: WritableStreamImpl): void {
+export function initializeTransformStream<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ startPromise: Promise<void>,
+ writableHighWaterMark: number,
+ writableSizeAlgorithm: SizeAlgorithm<I>,
+ readableHighWaterMark: number,
+ readableSizeAlgorithm: SizeAlgorithm<O>
+): void {
+ const startAlgorithm = (): Promise<void> => startPromise;
+ const writeAlgorithm = (chunk: any): Promise<void> =>
+ transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
+ const abortAlgorithm = (reason: any): Promise<void> =>
+ transformStreamDefaultSinkAbortAlgorithm(stream, reason);
+ const closeAlgorithm = (): Promise<void> =>
+ transformStreamDefaultSinkCloseAlgorithm(stream);
+ stream[sym.writable] = createWritableStream(
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ writableHighWaterMark,
+ writableSizeAlgorithm
+ );
+ const pullAlgorithm = (): PromiseLike<void> =>
+ transformStreamDefaultSourcePullAlgorithm(stream);
+ const cancelAlgorithm = (reason: any): Promise<void> => {
+ transformStreamErrorWritableAndUnblockWrite(stream, reason);
+ return Promise.resolve(undefined);
+ };
+ stream[sym.readable] = createReadableStream(
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ readableHighWaterMark,
+ readableSizeAlgorithm
+ );
+ stream[sym.backpressure] = stream[sym.backpressureChangePromise] = undefined;
+ transformStreamSetBackpressure(stream, true);
+ Object.defineProperty(stream, sym.transformStreamController, {
+ value: undefined,
+ configurable: true,
+ });
+}
+
+export function initializeWritableStream<W>(
+ stream: WritableStreamImpl<W>
+): void {
stream[sym.state] = "writable";
stream[sym.storedError] = stream[sym.writer] = stream[
sym.writableStreamController
@@ -202,7 +289,7 @@ export function initializeWritableStream(stream: WritableStreamImpl): void {
stream[sym.backpressure] = false;
}
-function invokeOrNoop<O extends any, P extends keyof O>(
+export function invokeOrNoop<O extends Record<string, any>, P extends keyof O>(
o: O,
p: P,
...args: Parameters<O[P]>
@@ -212,7 +299,7 @@ function invokeOrNoop<O extends any, P extends keyof O>(
if (!method) {
return undefined;
}
- return method.call(o, ...args);
+ return call(method, o, args);
}
function isCallable(value: unknown): value is (...args: any) => any {
@@ -299,6 +386,26 @@ export function isReadableStreamLocked(stream: ReadableStreamImpl): boolean {
return stream[sym.reader] ? true : false;
}
+export function isTransformStream(
+ x: unknown
+): x is TransformStreamImpl<any, any> {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.transformStreamController in x)
+ ? false
+ : true;
+}
+
+export function isTransformStreamDefaultController(
+ x: unknown
+): x is TransformStreamDefaultControllerImpl<any, any> {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.controlledTransformStream in x)
+ ? false
+ : true;
+}
+
export function isUnderlyingByteSource(
underlyingSource: UnderlyingByteSource | UnderlyingSource
): underlyingSource is UnderlyingByteSource {
@@ -717,6 +824,14 @@ export function readableStreamDefaultControllerError<T>(
readableStreamError(stream, e);
}
+function readableStreamDefaultControllerHasBackpressure<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>
+): boolean {
+ return readableStreamDefaultControllerShouldCallPull(controller)
+ ? true
+ : false;
+}
+
function readableStreamDefaultControllerShouldCallPull<T>(
controller: ReadableStreamDefaultControllerImpl<T>
): boolean {
@@ -1416,6 +1531,62 @@ export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>(
);
}
+function setUpTransformStreamDefaultController<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ controller: TransformStreamDefaultControllerImpl<I, O>,
+ transformAlgorithm: TransformAlgorithm<I>,
+ flushAlgorithm: FlushAlgorithm
+): void {
+ assert(isTransformStream(stream));
+ assert(stream[sym.transformStreamController] === undefined);
+ controller[sym.controlledTransformStream] = stream;
+ stream[sym.transformStreamController] = controller;
+ controller[sym.transformAlgorithm] = transformAlgorithm;
+ controller[sym.flushAlgorithm] = flushAlgorithm;
+}
+
+export function setUpTransformStreamDefaultControllerFromTransformer<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ transformer: Transformer<I, O>
+): void {
+ assert(transformer);
+ const controller = Object.create(
+ TransformStreamDefaultControllerImpl.prototype
+ ) as TransformStreamDefaultControllerImpl<I, O>;
+ let transformAlgorithm: TransformAlgorithm<I> = (chunk) => {
+ try {
+ transformStreamDefaultControllerEnqueue(
+ controller,
+ // it defaults to no tranformation, so I is assumed to be O
+ (chunk as unknown) as O
+ );
+ } catch (e) {
+ return Promise.reject(e);
+ }
+ return Promise.resolve();
+ };
+ const transformMethod = transformer.transform;
+ if (transformMethod) {
+ if (typeof transformMethod !== "function") {
+ throw new TypeError("tranformer.transform must be callable.");
+ }
+ transformAlgorithm = async (chunk): Promise<void> =>
+ call(transformMethod, transformer, [chunk, controller]);
+ }
+ const flushAlgorithm = createAlgorithmFromUnderlyingMethod(
+ transformer,
+ "flush",
+ 0,
+ controller
+ );
+ setUpTransformStreamDefaultController(
+ stream,
+ controller,
+ transformAlgorithm,
+ flushAlgorithm
+ );
+}
+
function setUpWritableStreamDefaultController<W>(
stream: WritableStreamImpl<W>,
controller: WritableStreamDefaultControllerImpl<W>,
@@ -1508,6 +1679,181 @@ export function setUpWritableStreamDefaultControllerFromUnderlyingSink<W>(
);
}
+function transformStreamDefaultControllerClearAlgorithms<I, O>(
+ controller: TransformStreamDefaultControllerImpl<I, O>
+): void {
+ (controller as any)[sym.transformAlgorithm] = undefined;
+ (controller as any)[sym.flushAlgorithm] = undefined;
+}
+
+export function transformStreamDefaultControllerEnqueue<I, O>(
+ controller: TransformStreamDefaultControllerImpl<I, O>,
+ chunk: O
+): void {
+ const stream = controller[sym.controlledTransformStream];
+ const readableController = stream[sym.readable][
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl<O>;
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) {
+ throw new TypeError(
+ "TransformStream's readable controller cannot be closed or enqueued."
+ );
+ }
+ try {
+ readableStreamDefaultControllerEnqueue(readableController, chunk);
+ } catch (e) {
+ transformStreamErrorWritableAndUnblockWrite(stream, e);
+ throw stream[sym.readable][sym.storedError];
+ }
+ const backpressure = readableStreamDefaultControllerHasBackpressure(
+ readableController
+ );
+ if (backpressure) {
+ transformStreamSetBackpressure(stream, true);
+ }
+}
+
+export function transformStreamDefaultControllerError<I, O>(
+ controller: TransformStreamDefaultControllerImpl<I, O>,
+ e: any
+): void {
+ transformStreamError(controller[sym.controlledTransformStream], e);
+}
+
+function transformStreamDefaultControllerPerformTransform<I, O>(
+ controller: TransformStreamDefaultControllerImpl<I, O>,
+ chunk: I
+): Promise<void> {
+ const transformPromise = controller[sym.transformAlgorithm](chunk);
+ return transformPromise.then(undefined, (r) => {
+ transformStreamError(controller[sym.controlledTransformStream], r);
+ throw r;
+ });
+}
+
+function transformStreamDefaultSinkAbortAlgorithm<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ reason: any
+): Promise<void> {
+ transformStreamError(stream, reason);
+ return Promise.resolve(undefined);
+}
+
+function transformStreamDefaultSinkCloseAlgorithm<I, O>(
+ stream: TransformStreamImpl<I, O>
+): Promise<void> {
+ const readable = stream[sym.readable];
+ const controller = stream[sym.transformStreamController];
+ const flushPromise = controller[sym.flushAlgorithm]();
+ transformStreamDefaultControllerClearAlgorithms(controller);
+ return flushPromise.then(
+ () => {
+ if (readable[sym.state] === "errored") {
+ throw readable[sym.storedError];
+ }
+ const readableController = readable[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl<O>;
+ if (
+ readableStreamDefaultControllerCanCloseOrEnqueue(readableController)
+ ) {
+ readableStreamDefaultControllerClose(readableController);
+ }
+ },
+ (r) => {
+ transformStreamError(stream, r);
+ throw readable[sym.storedError];
+ }
+ );
+}
+
+function transformStreamDefaultSinkWriteAlgorithm<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ chunk: I
+): Promise<void> {
+ assert(stream[sym.writable][sym.state] === "writable");
+ const controller = stream[sym.transformStreamController];
+ if (stream[sym.backpressure]) {
+ const backpressureChangePromise = stream[sym.backpressureChangePromise];
+ assert(backpressureChangePromise);
+ return backpressureChangePromise.promise.then(() => {
+ const writable = stream[sym.writable];
+ const state = writable[sym.state];
+ if (state === "erroring") {
+ throw writable[sym.storedError];
+ }
+ assert(state === "writable");
+ return transformStreamDefaultControllerPerformTransform(
+ controller,
+ chunk
+ );
+ });
+ }
+ return transformStreamDefaultControllerPerformTransform(controller, chunk);
+}
+
+function transformStreamDefaultSourcePullAlgorithm<I, O>(
+ stream: TransformStreamImpl<I, O>
+): Promise<void> {
+ assert(stream[sym.backpressure] === true);
+ assert(stream[sym.backpressureChangePromise] !== undefined);
+ transformStreamSetBackpressure(stream, false);
+ return stream[sym.backpressureChangePromise]!.promise;
+}
+
+function transformStreamError<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ e: any
+): void {
+ readableStreamDefaultControllerError(
+ stream[sym.readable][
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl<O>,
+ e
+ );
+ transformStreamErrorWritableAndUnblockWrite(stream, e);
+}
+
+export function transformStreamDefaultControllerTerminate<I, O>(
+ controller: TransformStreamDefaultControllerImpl<I, O>
+): void {
+ const stream = controller[sym.controlledTransformStream];
+ const readableController = stream[sym.readable][
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl<O>;
+ readableStreamDefaultControllerClose(readableController);
+ const error = new TypeError("TransformStream is closed.");
+ transformStreamErrorWritableAndUnblockWrite(stream, error);
+}
+
+function transformStreamErrorWritableAndUnblockWrite<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ e: any
+): void {
+ transformStreamDefaultControllerClearAlgorithms(
+ stream[sym.transformStreamController]
+ );
+ writableStreamDefaultControllerErrorIfNeeded(
+ stream[sym.writable][sym.writableStreamController]!,
+ e
+ );
+ if (stream[sym.backpressure]) {
+ transformStreamSetBackpressure(stream, false);
+ }
+}
+
+function transformStreamSetBackpressure<I, O>(
+ stream: TransformStreamImpl<I, O>,
+ backpressure: boolean
+): void {
+ assert(stream[sym.backpressure] !== backpressure);
+ if (stream[sym.backpressureChangePromise] !== undefined) {
+ stream[sym.backpressureChangePromise]!.resolve!(undefined);
+ }
+ stream[sym.backpressureChangePromise] = getDeferred<void>();
+ stream[sym.backpressure] = backpressure;
+}
+
function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer {
assert(!isDetachedBuffer(buffer));
const transferredIshVersion = buffer.slice(0);