summaryrefslogtreecommitdiff
path: root/cli/js/streams/transform-internals.ts
diff options
context:
space:
mode:
Diffstat (limited to 'cli/js/streams/transform-internals.ts')
-rw-r--r--cli/js/streams/transform-internals.ts371
1 files changed, 371 insertions, 0 deletions
diff --git a/cli/js/streams/transform-internals.ts b/cli/js/streams/transform-internals.ts
new file mode 100644
index 000000000..4c5e3657d
--- /dev/null
+++ b/cli/js/streams/transform-internals.ts
@@ -0,0 +1,371 @@
+// TODO reenable this code when we enable writableStreams and transport types
+// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+// /**
+// * streams/transform-internals - internal types and functions for transform streams
+// * Part of Stardazed
+// * (c) 2018-Present by Arthur Langereis - @zenmumbler
+// * https://github.com/stardazed/sd-streams
+// */
+
+// /* eslint-disable @typescript-eslint/no-explicit-any */
+// // TODO reenable this lint here
+
+// import * as rs from "./readable-internals.ts";
+// import * as ws from "./writable-internals.ts";
+// import * as shared from "./shared-internals.ts";
+
+// import { createReadableStream } from "./readable-stream.ts";
+// import { createWritableStream } from "./writable-stream.ts";
+
+// import { QueuingStrategy, QueuingStrategySizeCallback } from "../dom_types.ts";
+
+// export const state_ = Symbol("transformState_");
+// export const backpressure_ = Symbol("backpressure_");
+// export const backpressureChangePromise_ = Symbol("backpressureChangePromise_");
+// export const readable_ = Symbol("readable_");
+// export const transformStreamController_ = Symbol("transformStreamController_");
+// export const writable_ = Symbol("writable_");
+
+// export const controlledTransformStream_ = Symbol("controlledTransformStream_");
+// export const flushAlgorithm_ = Symbol("flushAlgorithm_");
+// export const transformAlgorithm_ = Symbol("transformAlgorithm_");
+
+// // ----
+
+// export type TransformFunction<InputType, OutputType> = (
+// chunk: InputType,
+// controller: TransformStreamDefaultController<InputType, OutputType>
+// ) => void | PromiseLike<void>;
+// export type TransformAlgorithm<InputType> = (chunk: InputType) => Promise<void>;
+// export type FlushFunction<InputType, OutputType> = (
+// controller: TransformStreamDefaultController<InputType, OutputType>
+// ) => void | PromiseLike<void>;
+// export type FlushAlgorithm = () => Promise<void>;
+
+// // ----
+
+// export interface TransformStreamDefaultController<InputType, OutputType> {
+// readonly desiredSize: number | null;
+// enqueue(chunk: OutputType): void;
+// error(reason: shared.ErrorResult): void;
+// terminate(): void;
+
+// [controlledTransformStream_]: TransformStream<InputType, OutputType>; // The TransformStream instance controlled; also used for the IsTransformStreamDefaultController brand check
+// [flushAlgorithm_]: FlushAlgorithm; // A promise - returning algorithm which communicates a requested close to the transformer
+// [transformAlgorithm_]: TransformAlgorithm<InputType>; // A promise - returning algorithm, taking one argument(the chunk to transform), which requests the transformer perform its transformation
+// }
+
+// export interface Transformer<InputType, OutputType> {
+// start?(
+// controller: TransformStreamDefaultController<InputType, OutputType>
+// ): void | PromiseLike<void>;
+// transform?: TransformFunction<InputType, OutputType>;
+// flush?: FlushFunction<InputType, OutputType>;
+
+// readableType?: undefined; // for future spec changes
+// writableType?: undefined; // for future spec changes
+// }
+
+// export declare class TransformStream<InputType, OutputType> {
+// constructor(
+// transformer: Transformer<InputType, OutputType>,
+// writableStrategy: QueuingStrategy<InputType>,
+// readableStrategy: QueuingStrategy<OutputType>
+// );
+
+// readonly readable: rs.SDReadableStream<OutputType>;
+// readonly writable: ws.WritableStream<InputType>;
+
+// [backpressure_]: boolean | undefined; // Whether there was backpressure on [[readable]] the last time it was observed
+// [backpressureChangePromise_]: shared.ControlledPromise<void> | undefined; // A promise which is fulfilled and replaced every time the value of[[backpressure]] changes
+// [readable_]: rs.SDReadableStream<OutputType>; // The ReadableStream instance controlled by this object
+// [transformStreamController_]: TransformStreamDefaultController<
+// InputType,
+// OutputType
+// >; // A TransformStreamDefaultController created with the ability to control[[readable]] and[[writable]]; also used for the IsTransformStream brand check
+// [writable_]: ws.WritableStream<InputType>; // The WritableStream instance controlled by this object
+// }
+
+// // ---- TransformStream
+
+// export function isTransformStream(
+// value: unknown
+// ): value is TransformStream<any, any> {
+// if (typeof value !== "object" || value === null) {
+// return false;
+// }
+// return transformStreamController_ in value;
+// }
+
+// export function initializeTransformStream<InputType, OutputType>(
+// stream: TransformStream<InputType, OutputType>,
+// startPromise: Promise<void>,
+// writableHighWaterMark: number,
+// writableSizeAlgorithm: QueuingStrategySizeCallback<InputType>,
+// readableHighWaterMark: number,
+// readableSizeAlgorithm: QueuingStrategySizeCallback<OutputType>
+// ): void {
+// const startAlgorithm = function(): Promise<void> {
+// return startPromise;
+// };
+// const writeAlgorithm = function(chunk: InputType): Promise<void> {
+// return transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
+// };
+// const abortAlgorithm = function(reason: shared.ErrorResult): Promise<void> {
+// return transformStreamDefaultSinkAbortAlgorithm(stream, reason);
+// };
+// const closeAlgorithm = function(): Promise<void> {
+// return transformStreamDefaultSinkCloseAlgorithm(stream);
+// };
+// stream[writable_] = createWritableStream<InputType>(
+// startAlgorithm,
+// writeAlgorithm,
+// closeAlgorithm,
+// abortAlgorithm,
+// writableHighWaterMark,
+// writableSizeAlgorithm
+// );
+
+// const pullAlgorithm = function(): Promise<void> {
+// return transformStreamDefaultSourcePullAlgorithm(stream);
+// };
+// const cancelAlgorithm = function(
+// reason: shared.ErrorResult
+// ): Promise<undefined> {
+// transformStreamErrorWritableAndUnblockWrite(stream, reason);
+// return Promise.resolve(undefined);
+// };
+// stream[readable_] = createReadableStream(
+// startAlgorithm,
+// pullAlgorithm,
+// cancelAlgorithm,
+// readableHighWaterMark,
+// readableSizeAlgorithm
+// );
+
+// stream[backpressure_] = undefined;
+// stream[backpressureChangePromise_] = undefined;
+// transformStreamSetBackpressure(stream, true);
+// stream[transformStreamController_] = undefined!; // initialize slot for brand-check
+// }
+
+// export function transformStreamError<InputType, OutputType>(
+// stream: TransformStream<InputType, OutputType>,
+// error: shared.ErrorResult
+// ): void {
+// rs.readableStreamDefaultControllerError(
+// stream[readable_][
+// rs.readableStreamController_
+// ] as rs.SDReadableStreamDefaultController<OutputType>,
+// error
+// );
+// transformStreamErrorWritableAndUnblockWrite(stream, error);
+// }
+
+// export function transformStreamErrorWritableAndUnblockWrite<
+// InputType,
+// OutputType
+// >(
+// stream: TransformStream<InputType, OutputType>,
+// error: shared.ErrorResult
+// ): void {
+// transformStreamDefaultControllerClearAlgorithms(
+// stream[transformStreamController_]
+// );
+// ws.writableStreamDefaultControllerErrorIfNeeded(
+// stream[writable_][ws.writableStreamController_]!,
+// error
+// );
+// if (stream[backpressure_]) {
+// transformStreamSetBackpressure(stream, false);
+// }
+// }
+
+// export function transformStreamSetBackpressure<InputType, OutputType>(
+// stream: TransformStream<InputType, OutputType>,
+// backpressure: boolean
+// ): void {
+// // Assert: stream.[[backpressure]] is not backpressure.
+// if (stream[backpressure_] !== undefined) {
+// stream[backpressureChangePromise_]!.resolve(undefined);
+// }
+// stream[backpressureChangePromise_] = shared.createControlledPromise<void>();
+// stream[backpressure_] = backpressure;
+// }
+
+// // ---- TransformStreamDefaultController
+
+// export function isTransformStreamDefaultController(
+// value: unknown
+// ): value is TransformStreamDefaultController<any, any> {
+// if (typeof value !== "object" || value === null) {
+// return false;
+// }
+// return controlledTransformStream_ in value;
+// }
+
+// export function setUpTransformStreamDefaultController<InputType, OutputType>(
+// stream: TransformStream<InputType, OutputType>,
+// controller: TransformStreamDefaultController<InputType, OutputType>,
+// transformAlgorithm: TransformAlgorithm<InputType>,
+// flushAlgorithm: FlushAlgorithm
+// ): void {
+// // Assert: ! IsTransformStream(stream) is true.
+// // Assert: stream.[[transformStreamController]] is undefined.
+// controller[controlledTransformStream_] = stream;
+// stream[transformStreamController_] = controller;
+// controller[transformAlgorithm_] = transformAlgorithm;
+// controller[flushAlgorithm_] = flushAlgorithm;
+// }
+
+// export function transformStreamDefaultControllerClearAlgorithms<
+// InputType,
+// OutputType
+// >(controller: TransformStreamDefaultController<InputType, OutputType>): void {
+// // Use ! assertions to override type check here, this way we don't
+// // have to perform type checks/assertions everywhere else.
+// controller[transformAlgorithm_] = undefined!;
+// controller[flushAlgorithm_] = undefined!;
+// }
+
+// export function transformStreamDefaultControllerEnqueue<InputType, OutputType>(
+// controller: TransformStreamDefaultController<InputType, OutputType>,
+// chunk: OutputType
+// ): void {
+// const stream = controller[controlledTransformStream_];
+// const readableController = stream[readable_][
+// rs.readableStreamController_
+// ] as rs.SDReadableStreamDefaultController<OutputType>;
+// if (
+// !rs.readableStreamDefaultControllerCanCloseOrEnqueue(readableController)
+// ) {
+// throw new TypeError();
+// }
+// try {
+// rs.readableStreamDefaultControllerEnqueue(readableController, chunk);
+// } catch (error) {
+// transformStreamErrorWritableAndUnblockWrite(stream, error);
+// throw stream[readable_][shared.storedError_];
+// }
+// const backpressure = rs.readableStreamDefaultControllerHasBackpressure(
+// readableController
+// );
+// if (backpressure !== stream[backpressure_]) {
+// // Assert: backpressure is true.
+// transformStreamSetBackpressure(stream, true);
+// }
+// }
+
+// export function transformStreamDefaultControllerError<InputType, OutputType>(
+// controller: TransformStreamDefaultController<InputType, OutputType>,
+// error: shared.ErrorResult
+// ): void {
+// transformStreamError(controller[controlledTransformStream_], error);
+// }
+
+// export function transformStreamDefaultControllerPerformTransform<
+// InputType,
+// OutputType
+// >(
+// controller: TransformStreamDefaultController<InputType, OutputType>,
+// chunk: InputType
+// ): Promise<void> {
+// const transformPromise = controller[transformAlgorithm_](chunk);
+// return transformPromise.catch(error => {
+// transformStreamError(controller[controlledTransformStream_], error);
+// throw error;
+// });
+// }
+
+// export function transformStreamDefaultControllerTerminate<
+// InputType,
+// OutputType
+// >(controller: TransformStreamDefaultController<InputType, OutputType>): void {
+// const stream = controller[controlledTransformStream_];
+// const readableController = stream[readable_][
+// rs.readableStreamController_
+// ] as rs.SDReadableStreamDefaultController<OutputType>;
+// if (rs.readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) {
+// rs.readableStreamDefaultControllerClose(readableController);
+// }
+// const error = new TypeError("The transform stream has been terminated");
+// transformStreamErrorWritableAndUnblockWrite(stream, error);
+// }
+
+// // ---- Transform Sinks
+
+// export function transformStreamDefaultSinkWriteAlgorithm<InputType, OutputType>(
+// stream: TransformStream<InputType, OutputType>,
+// chunk: InputType
+// ): Promise<void> {
+// // Assert: stream.[[writable]].[[state]] is "writable".
+// const controller = stream[transformStreamController_];
+// if (stream[backpressure_]) {
+// const backpressureChangePromise = stream[backpressureChangePromise_]!;
+// // Assert: backpressureChangePromise is not undefined.
+// return backpressureChangePromise.promise.then(_ => {
+// const writable = stream[writable_];
+// const state = writable[shared.state_];
+// if (state === "erroring") {
+// throw writable[shared.storedError_];
+// }
+// // Assert: state is "writable".
+// return transformStreamDefaultControllerPerformTransform(
+// controller,
+// chunk
+// );
+// });
+// }
+// return transformStreamDefaultControllerPerformTransform(controller, chunk);
+// }
+
+// export function transformStreamDefaultSinkAbortAlgorithm<InputType, OutputType>(
+// stream: TransformStream<InputType, OutputType>,
+// reason: shared.ErrorResult
+// ): Promise<void> {
+// transformStreamError(stream, reason);
+// return Promise.resolve(undefined);
+// }
+
+// export function transformStreamDefaultSinkCloseAlgorithm<InputType, OutputType>(
+// stream: TransformStream<InputType, OutputType>
+// ): Promise<void> {
+// const readable = stream[readable_];
+// const controller = stream[transformStreamController_];
+// const flushPromise = controller[flushAlgorithm_]();
+// transformStreamDefaultControllerClearAlgorithms(controller);
+
+// return flushPromise.then(
+// _ => {
+// if (readable[shared.state_] === "errored") {
+// throw readable[shared.storedError_];
+// }
+// const readableController = readable[
+// rs.readableStreamController_
+// ] as rs.SDReadableStreamDefaultController<OutputType>;
+// if (
+// rs.readableStreamDefaultControllerCanCloseOrEnqueue(readableController)
+// ) {
+// rs.readableStreamDefaultControllerClose(readableController);
+// }
+// },
+// error => {
+// transformStreamError(stream, error);
+// throw readable[shared.storedError_];
+// }
+// );
+// }
+
+// // ---- Transform Sources
+
+// export function transformStreamDefaultSourcePullAlgorithm<
+// InputType,
+// OutputType
+// >(stream: TransformStream<InputType, OutputType>): Promise<void> {
+// // Assert: stream.[[backpressure]] is true.
+// // Assert: stream.[[backpressureChangePromise]] is not undefined.
+// transformStreamSetBackpressure(stream, false);
+// return stream[backpressureChangePromise_]!.promise;
+// }