diff options
Diffstat (limited to 'cli/js/web/streams/internals.ts')
-rw-r--r-- | cli/js/web/streams/internals.ts | 364 |
1 files changed, 355 insertions, 9 deletions
diff --git a/cli/js/web/streams/internals.ts b/cli/js/web/streams/internals.ts index 846db096e..5ef094afc 100644 --- a/cli/js/web/streams/internals.ts +++ b/cli/js/web/streams/internals.ts @@ -13,6 +13,8 @@ import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_c import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts"; import { ReadableStreamImpl } from "./readable_stream.ts"; import * as sym from "./symbols.ts"; +import { TransformStreamImpl } from "./transform_stream.ts"; +import { TransformStreamDefaultControllerImpl } from "./transform_stream_default_controller.ts"; import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts"; import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts"; import { WritableStreamImpl } from "./writable_stream.ts"; @@ -36,10 +38,12 @@ type Container<R = any> = { [sym.queue]: Array<Pair<R> | BufferQueueItem>; [sym.queueTotalSize]: number; }; +export type FlushAlgorithm = () => Promise<void>; export type Pair<R> = { value: R; size: number }; export type PullAlgorithm = () => PromiseLike<void>; export type SizeAlgorithm<T> = (chunk: T) => number; export type StartAlgorithm = () => void | PromiseLike<void>; +export type TransformAlgorithm<I> = (chunk: I) => Promise<void>; export type WriteAlgorithm<W> = (chunk: W) => Promise<void>; export interface Deferred<T> { promise: Promise<T>; @@ -76,8 +80,16 @@ export function acquireWritableStreamDefaultWriter<W>( return new WritableStreamDefaultWriterImpl(stream); } +export function call<F extends (...args: any[]) => any>( + fn: F, + v: ThisType<F>, + args: Parameters<F> +): ReturnType<F> { + return Function.prototype.apply.call(fn, v, args); +} + function createAlgorithmFromUnderlyingMethod< - O extends UnderlyingByteSource | UnderlyingSource, + O extends UnderlyingByteSource | UnderlyingSource | Transformer, P extends keyof O >( underlyingObject: O, @@ -86,7 +98,7 @@ function createAlgorithmFromUnderlyingMethod< ...extraArgs: any[] ): () => Promise<void>; function createAlgorithmFromUnderlyingMethod< - O extends UnderlyingByteSource | UnderlyingSource, + O extends UnderlyingByteSource | UnderlyingSource | Transformer, P extends keyof O >( underlyingObject: O, @@ -95,7 +107,7 @@ function createAlgorithmFromUnderlyingMethod< ...extraArgs: any[] ): (arg: any) => Promise<void>; function createAlgorithmFromUnderlyingMethod< - O extends UnderlyingByteSource | UnderlyingSource, + O extends UnderlyingByteSource | UnderlyingSource | Transformer, P extends keyof O >( underlyingObject: O, @@ -110,11 +122,11 @@ function createAlgorithmFromUnderlyingMethod< } if (algoArgCount === 0) { return async (): Promise<void> => - method.call(underlyingObject, ...extraArgs); + call(method, underlyingObject, extraArgs as any); } else { return async (arg: any): Promise<void> => { const fullArgs = [arg, ...extraArgs]; - return method.call(underlyingObject, ...fullArgs); + return call(method, underlyingObject, fullArgs as any); }; } } @@ -148,6 +160,33 @@ function createReadableStream<T>( return stream; } +function createWritableStream<W>( + startAlgorithm: StartAlgorithm, + writeAlgorithm: WriteAlgorithm<W>, + closeAlgorithm: CloseAlgorithm, + abortAlgorithm: AbortAlgorithm, + highWaterMark = 1, + sizeAlgorithm: SizeAlgorithm<W> = (): number => 1 +): WritableStreamImpl<W> { + assert(isNonNegativeNumber(highWaterMark)); + const stream = Object.create(WritableStreamImpl.prototype); + initializeWritableStream(stream); + const controller = Object.create( + WritableStreamDefaultControllerImpl.prototype + ); + setUpWritableStreamDefaultController( + stream, + controller, + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + highWaterMark, + sizeAlgorithm + ); + return stream; +} + export function dequeueValue<R>(container: Container<R>): R { assert(sym.queue in container && sym.queueTotalSize in container); assert(container[sym.queue].length); @@ -185,13 +224,61 @@ export function getDeferred<T>(): Required<Deferred<T>> { return { promise, resolve: resolve!, reject: reject! }; } -export function initializeReadableStream(stream: ReadableStreamImpl): void { +export function initializeReadableStream<R>( + stream: ReadableStreamImpl<R> +): void { stream[sym.state] = "readable"; stream[sym.reader] = stream[sym.storedError] = undefined; stream[sym.disturbed] = false; } -export function initializeWritableStream(stream: WritableStreamImpl): void { +export function initializeTransformStream<I, O>( + stream: TransformStreamImpl<I, O>, + startPromise: Promise<void>, + writableHighWaterMark: number, + writableSizeAlgorithm: SizeAlgorithm<I>, + readableHighWaterMark: number, + readableSizeAlgorithm: SizeAlgorithm<O> +): void { + const startAlgorithm = (): Promise<void> => startPromise; + const writeAlgorithm = (chunk: any): Promise<void> => + transformStreamDefaultSinkWriteAlgorithm(stream, chunk); + const abortAlgorithm = (reason: any): Promise<void> => + transformStreamDefaultSinkAbortAlgorithm(stream, reason); + const closeAlgorithm = (): Promise<void> => + transformStreamDefaultSinkCloseAlgorithm(stream); + stream[sym.writable] = createWritableStream( + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + writableHighWaterMark, + writableSizeAlgorithm + ); + const pullAlgorithm = (): PromiseLike<void> => + transformStreamDefaultSourcePullAlgorithm(stream); + const cancelAlgorithm = (reason: any): Promise<void> => { + transformStreamErrorWritableAndUnblockWrite(stream, reason); + return Promise.resolve(undefined); + }; + stream[sym.readable] = createReadableStream( + startAlgorithm, + pullAlgorithm, + cancelAlgorithm, + readableHighWaterMark, + readableSizeAlgorithm + ); + stream[sym.backpressure] = stream[sym.backpressureChangePromise] = undefined; + transformStreamSetBackpressure(stream, true); + Object.defineProperty(stream, sym.transformStreamController, { + value: undefined, + configurable: true, + }); +} + +export function initializeWritableStream<W>( + stream: WritableStreamImpl<W> +): void { stream[sym.state] = "writable"; stream[sym.storedError] = stream[sym.writer] = stream[ sym.writableStreamController @@ -202,7 +289,7 @@ export function initializeWritableStream(stream: WritableStreamImpl): void { stream[sym.backpressure] = false; } -function invokeOrNoop<O extends any, P extends keyof O>( +export function invokeOrNoop<O extends Record<string, any>, P extends keyof O>( o: O, p: P, ...args: Parameters<O[P]> @@ -212,7 +299,7 @@ function invokeOrNoop<O extends any, P extends keyof O>( if (!method) { return undefined; } - return method.call(o, ...args); + return call(method, o, args); } function isCallable(value: unknown): value is (...args: any) => any { @@ -299,6 +386,26 @@ export function isReadableStreamLocked(stream: ReadableStreamImpl): boolean { return stream[sym.reader] ? true : false; } +export function isTransformStream( + x: unknown +): x is TransformStreamImpl<any, any> { + return typeof x !== "object" || + x === null || + !(sym.transformStreamController in x) + ? false + : true; +} + +export function isTransformStreamDefaultController( + x: unknown +): x is TransformStreamDefaultControllerImpl<any, any> { + return typeof x !== "object" || + x === null || + !(sym.controlledTransformStream in x) + ? false + : true; +} + export function isUnderlyingByteSource( underlyingSource: UnderlyingByteSource | UnderlyingSource ): underlyingSource is UnderlyingByteSource { @@ -717,6 +824,14 @@ export function readableStreamDefaultControllerError<T>( readableStreamError(stream, e); } +function readableStreamDefaultControllerHasBackpressure<T>( + controller: ReadableStreamDefaultControllerImpl<T> +): boolean { + return readableStreamDefaultControllerShouldCallPull(controller) + ? true + : false; +} + function readableStreamDefaultControllerShouldCallPull<T>( controller: ReadableStreamDefaultControllerImpl<T> ): boolean { @@ -1416,6 +1531,62 @@ export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>( ); } +function setUpTransformStreamDefaultController<I, O>( + stream: TransformStreamImpl<I, O>, + controller: TransformStreamDefaultControllerImpl<I, O>, + transformAlgorithm: TransformAlgorithm<I>, + flushAlgorithm: FlushAlgorithm +): void { + assert(isTransformStream(stream)); + assert(stream[sym.transformStreamController] === undefined); + controller[sym.controlledTransformStream] = stream; + stream[sym.transformStreamController] = controller; + controller[sym.transformAlgorithm] = transformAlgorithm; + controller[sym.flushAlgorithm] = flushAlgorithm; +} + +export function setUpTransformStreamDefaultControllerFromTransformer<I, O>( + stream: TransformStreamImpl<I, O>, + transformer: Transformer<I, O> +): void { + assert(transformer); + const controller = Object.create( + TransformStreamDefaultControllerImpl.prototype + ) as TransformStreamDefaultControllerImpl<I, O>; + let transformAlgorithm: TransformAlgorithm<I> = (chunk) => { + try { + transformStreamDefaultControllerEnqueue( + controller, + // it defaults to no tranformation, so I is assumed to be O + (chunk as unknown) as O + ); + } catch (e) { + return Promise.reject(e); + } + return Promise.resolve(); + }; + const transformMethod = transformer.transform; + if (transformMethod) { + if (typeof transformMethod !== "function") { + throw new TypeError("tranformer.transform must be callable."); + } + transformAlgorithm = async (chunk): Promise<void> => + call(transformMethod, transformer, [chunk, controller]); + } + const flushAlgorithm = createAlgorithmFromUnderlyingMethod( + transformer, + "flush", + 0, + controller + ); + setUpTransformStreamDefaultController( + stream, + controller, + transformAlgorithm, + flushAlgorithm + ); +} + function setUpWritableStreamDefaultController<W>( stream: WritableStreamImpl<W>, controller: WritableStreamDefaultControllerImpl<W>, @@ -1508,6 +1679,181 @@ export function setUpWritableStreamDefaultControllerFromUnderlyingSink<W>( ); } +function transformStreamDefaultControllerClearAlgorithms<I, O>( + controller: TransformStreamDefaultControllerImpl<I, O> +): void { + (controller as any)[sym.transformAlgorithm] = undefined; + (controller as any)[sym.flushAlgorithm] = undefined; +} + +export function transformStreamDefaultControllerEnqueue<I, O>( + controller: TransformStreamDefaultControllerImpl<I, O>, + chunk: O +): void { + const stream = controller[sym.controlledTransformStream]; + const readableController = stream[sym.readable][ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl<O>; + if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) { + throw new TypeError( + "TransformStream's readable controller cannot be closed or enqueued." + ); + } + try { + readableStreamDefaultControllerEnqueue(readableController, chunk); + } catch (e) { + transformStreamErrorWritableAndUnblockWrite(stream, e); + throw stream[sym.readable][sym.storedError]; + } + const backpressure = readableStreamDefaultControllerHasBackpressure( + readableController + ); + if (backpressure) { + transformStreamSetBackpressure(stream, true); + } +} + +export function transformStreamDefaultControllerError<I, O>( + controller: TransformStreamDefaultControllerImpl<I, O>, + e: any +): void { + transformStreamError(controller[sym.controlledTransformStream], e); +} + +function transformStreamDefaultControllerPerformTransform<I, O>( + controller: TransformStreamDefaultControllerImpl<I, O>, + chunk: I +): Promise<void> { + const transformPromise = controller[sym.transformAlgorithm](chunk); + return transformPromise.then(undefined, (r) => { + transformStreamError(controller[sym.controlledTransformStream], r); + throw r; + }); +} + +function transformStreamDefaultSinkAbortAlgorithm<I, O>( + stream: TransformStreamImpl<I, O>, + reason: any +): Promise<void> { + transformStreamError(stream, reason); + return Promise.resolve(undefined); +} + +function transformStreamDefaultSinkCloseAlgorithm<I, O>( + stream: TransformStreamImpl<I, O> +): Promise<void> { + const readable = stream[sym.readable]; + const controller = stream[sym.transformStreamController]; + const flushPromise = controller[sym.flushAlgorithm](); + transformStreamDefaultControllerClearAlgorithms(controller); + return flushPromise.then( + () => { + if (readable[sym.state] === "errored") { + throw readable[sym.storedError]; + } + const readableController = readable[ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl<O>; + if ( + readableStreamDefaultControllerCanCloseOrEnqueue(readableController) + ) { + readableStreamDefaultControllerClose(readableController); + } + }, + (r) => { + transformStreamError(stream, r); + throw readable[sym.storedError]; + } + ); +} + +function transformStreamDefaultSinkWriteAlgorithm<I, O>( + stream: TransformStreamImpl<I, O>, + chunk: I +): Promise<void> { + assert(stream[sym.writable][sym.state] === "writable"); + const controller = stream[sym.transformStreamController]; + if (stream[sym.backpressure]) { + const backpressureChangePromise = stream[sym.backpressureChangePromise]; + assert(backpressureChangePromise); + return backpressureChangePromise.promise.then(() => { + const writable = stream[sym.writable]; + const state = writable[sym.state]; + if (state === "erroring") { + throw writable[sym.storedError]; + } + assert(state === "writable"); + return transformStreamDefaultControllerPerformTransform( + controller, + chunk + ); + }); + } + return transformStreamDefaultControllerPerformTransform(controller, chunk); +} + +function transformStreamDefaultSourcePullAlgorithm<I, O>( + stream: TransformStreamImpl<I, O> +): Promise<void> { + assert(stream[sym.backpressure] === true); + assert(stream[sym.backpressureChangePromise] !== undefined); + transformStreamSetBackpressure(stream, false); + return stream[sym.backpressureChangePromise]!.promise; +} + +function transformStreamError<I, O>( + stream: TransformStreamImpl<I, O>, + e: any +): void { + readableStreamDefaultControllerError( + stream[sym.readable][ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl<O>, + e + ); + transformStreamErrorWritableAndUnblockWrite(stream, e); +} + +export function transformStreamDefaultControllerTerminate<I, O>( + controller: TransformStreamDefaultControllerImpl<I, O> +): void { + const stream = controller[sym.controlledTransformStream]; + const readableController = stream[sym.readable][ + sym.readableStreamController + ] as ReadableStreamDefaultControllerImpl<O>; + readableStreamDefaultControllerClose(readableController); + const error = new TypeError("TransformStream is closed."); + transformStreamErrorWritableAndUnblockWrite(stream, error); +} + +function transformStreamErrorWritableAndUnblockWrite<I, O>( + stream: TransformStreamImpl<I, O>, + e: any +): void { + transformStreamDefaultControllerClearAlgorithms( + stream[sym.transformStreamController] + ); + writableStreamDefaultControllerErrorIfNeeded( + stream[sym.writable][sym.writableStreamController]!, + e + ); + if (stream[sym.backpressure]) { + transformStreamSetBackpressure(stream, false); + } +} + +function transformStreamSetBackpressure<I, O>( + stream: TransformStreamImpl<I, O>, + backpressure: boolean +): void { + assert(stream[sym.backpressure] !== backpressure); + if (stream[sym.backpressureChangePromise] !== undefined) { + stream[sym.backpressureChangePromise]!.resolve!(undefined); + } + stream[sym.backpressureChangePromise] = getDeferred<void>(); + stream[sym.backpressure] = backpressure; +} + function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer { assert(!isDetachedBuffer(buffer)); const transferredIshVersion = buffer.slice(0); |