diff options
Diffstat (limited to 'cli/js/streams')
20 files changed, 0 insertions, 4811 deletions
diff --git a/cli/js/streams/mod.ts b/cli/js/streams/mod.ts deleted file mode 100644 index 5389aaf6d..000000000 --- a/cli/js/streams/mod.ts +++ /dev/null @@ -1,20 +0,0 @@ -// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -/** - * @stardazed/streams - implementation of the web streams standard - * Part of Stardazed - * (c) 2018-Present by Arthur Langereis - @zenmumbler - * https://github.com/stardazed/sd-streams - */ - -export { SDReadableStream as ReadableStream } from "./readable-stream.ts"; -/* TODO The following are currently unused so not exported for clarity. -export { WritableStream } from "./writable-stream.ts"; - -export { TransformStream } from "./transform-stream.ts"; -export { - ByteLengthQueuingStrategy, - CountQueuingStrategy -} from "./strategies.ts"; -*/ diff --git a/cli/js/streams/pipe-to.ts b/cli/js/streams/pipe-to.ts deleted file mode 100644 index 1d5579217..000000000 --- a/cli/js/streams/pipe-to.ts +++ /dev/null @@ -1,237 +0,0 @@ -// TODO reenable this code when we enable writableStreams and transport types -// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -// /** -// * streams/pipe-to - pipeTo algorithm implementation -// * Part of Stardazed -// * (c) 2018-Present by Arthur Langereis - @zenmumbler -// * https://github.com/stardazed/sd-streams -// */ - -// /* eslint-disable @typescript-eslint/no-explicit-any */ -// // TODO reenable this lint here - -// import * as rs from "./readable-internals.ts"; -// import * as ws from "./writable-internals.ts"; -// import * as shared from "./shared-internals.ts"; - -// import { ReadableStreamDefaultReader } from "./readable-stream-default-reader.ts"; -// import { WritableStreamDefaultWriter } from "./writable-stream-default-writer.ts"; -// import { PipeOptions } from "../dom_types.ts"; -// import { Err } from "../errors.ts"; - -// // add a wrapper to handle falsy rejections -// interface ErrorWrapper { -// actualError: shared.ErrorResult; -// } - -// export function pipeTo<ChunkType>( -// source: rs.SDReadableStream<ChunkType>, -// dest: ws.WritableStream<ChunkType>, -// options: PipeOptions -// ): Promise<void> { -// const preventClose = !!options.preventClose; -// const preventAbort = !!options.preventAbort; -// const preventCancel = !!options.preventCancel; -// const signal = options.signal; - -// let shuttingDown = false; -// let latestWrite = Promise.resolve(); -// const promise = shared.createControlledPromise<void>(); - -// // If IsReadableByteStreamController(this.[[readableStreamController]]) is true, let reader be either ! AcquireReadableStreamBYOBReader(this) or ! AcquireReadableStreamDefaultReader(this), at the user agent’s discretion. -// // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(this). -// const reader = new ReadableStreamDefaultReader(source); -// const writer = new WritableStreamDefaultWriter(dest); - -// let abortAlgorithm: () => any; -// if (signal !== undefined) { -// abortAlgorithm = (): void => { -// // TODO this should be a DOMException, -// // https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/pipe-to.ts#L38 -// const error = new errors.Aborted("Aborted"); -// const actions: Array<() => Promise<void>> = []; -// if (preventAbort === false) { -// actions.push(() => { -// if (dest[shared.state_] === "writable") { -// return ws.writableStreamAbort(dest, error); -// } -// return Promise.resolve(); -// }); -// } -// if (preventCancel === false) { -// actions.push(() => { -// if (source[shared.state_] === "readable") { -// return rs.readableStreamCancel(source, error); -// } -// return Promise.resolve(); -// }); -// } -// shutDown( -// () => { -// return Promise.all(actions.map(a => a())).then(_ => undefined); -// }, -// { actualError: error } -// ); -// }; - -// if (signal.aborted === true) { -// abortAlgorithm(); -// } else { -// signal.addEventListener("abort", abortAlgorithm); -// } -// } - -// function onStreamErrored( -// stream: rs.SDReadableStream<ChunkType> | ws.WritableStream<ChunkType>, -// promise: Promise<void>, -// action: (error: shared.ErrorResult) => void -// ): void { -// if (stream[shared.state_] === "errored") { -// action(stream[shared.storedError_]); -// } else { -// promise.catch(action); -// } -// } - -// function onStreamClosed( -// stream: rs.SDReadableStream<ChunkType> | ws.WritableStream<ChunkType>, -// promise: Promise<void>, -// action: () => void -// ): void { -// if (stream[shared.state_] === "closed") { -// action(); -// } else { -// promise.then(action); -// } -// } - -// onStreamErrored(source, reader[rs.closedPromise_].promise, error => { -// if (!preventAbort) { -// shutDown(() => ws.writableStreamAbort(dest, error), { -// actualError: error -// }); -// } else { -// shutDown(undefined, { actualError: error }); -// } -// }); - -// onStreamErrored(dest, writer[ws.closedPromise_].promise, error => { -// if (!preventCancel) { -// shutDown(() => rs.readableStreamCancel(source, error), { -// actualError: error -// }); -// } else { -// shutDown(undefined, { actualError: error }); -// } -// }); - -// onStreamClosed(source, reader[rs.closedPromise_].promise, () => { -// if (!preventClose) { -// shutDown(() => -// ws.writableStreamDefaultWriterCloseWithErrorPropagation(writer) -// ); -// } else { -// shutDown(); -// } -// }); - -// if ( -// ws.writableStreamCloseQueuedOrInFlight(dest) || -// dest[shared.state_] === "closed" -// ) { -// // Assert: no chunks have been read or written. -// const destClosed = new TypeError(); -// if (!preventCancel) { -// shutDown(() => rs.readableStreamCancel(source, destClosed), { -// actualError: destClosed -// }); -// } else { -// shutDown(undefined, { actualError: destClosed }); -// } -// } - -// function awaitLatestWrite(): Promise<void> { -// const curLatestWrite = latestWrite; -// return latestWrite.then(() => -// curLatestWrite === latestWrite ? undefined : awaitLatestWrite() -// ); -// } - -// function flushRemainder(): Promise<void> | undefined { -// if ( -// dest[shared.state_] === "writable" && -// !ws.writableStreamCloseQueuedOrInFlight(dest) -// ) { -// return awaitLatestWrite(); -// } else { -// return undefined; -// } -// } - -// function shutDown(action?: () => Promise<void>, error?: ErrorWrapper): void { -// if (shuttingDown) { -// return; -// } -// shuttingDown = true; - -// if (action === undefined) { -// action = (): Promise<void> => Promise.resolve(); -// } - -// function finishShutDown(): void { -// action!().then( -// _ => finalize(error), -// newError => finalize({ actualError: newError }) -// ); -// } - -// const flushWait = flushRemainder(); -// if (flushWait) { -// flushWait.then(finishShutDown); -// } else { -// finishShutDown(); -// } -// } - -// function finalize(error?: ErrorWrapper): void { -// ws.writableStreamDefaultWriterRelease(writer); -// rs.readableStreamReaderGenericRelease(reader); -// if (signal && abortAlgorithm) { -// signal.removeEventListener("abort", abortAlgorithm); -// } -// if (error) { -// promise.reject(error.actualError); -// } else { -// promise.resolve(undefined); -// } -// } - -// function next(): Promise<void> | undefined { -// if (shuttingDown) { -// return; -// } - -// writer[ws.readyPromise_].promise.then(() => { -// rs.readableStreamDefaultReaderRead(reader).then( -// ({ value, done }) => { -// if (done) { -// return; -// } -// latestWrite = ws -// .writableStreamDefaultWriterWrite(writer, value!) -// .catch(() => {}); -// next(); -// }, -// _error => { -// latestWrite = Promise.resolve(); -// } -// ); -// }); -// } - -// next(); - -// return promise.promise; -// } diff --git a/cli/js/streams/queue-mixin.ts b/cli/js/streams/queue-mixin.ts deleted file mode 100644 index 23c57d75f..000000000 --- a/cli/js/streams/queue-mixin.ts +++ /dev/null @@ -1,84 +0,0 @@ -// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -/** - * streams/queue-mixin - internal queue operations for stream controllers - * Part of Stardazed - * (c) 2018-Present by Arthur Langereis - @zenmumbler - * https://github.com/stardazed/sd-streams - */ - -/* eslint-disable @typescript-eslint/no-explicit-any */ -// TODO reenable this lint here - -import { Queue, QueueImpl } from "./queue.ts"; -import { isFiniteNonNegativeNumber } from "./shared-internals.ts"; - -export const queue_ = Symbol("queue_"); -export const queueTotalSize_ = Symbol("queueTotalSize_"); - -export interface QueueElement<V> { - value: V; - size: number; -} - -export interface QueueContainer<V> { - [queue_]: Queue<QueueElement<V>>; - [queueTotalSize_]: number; -} - -export interface ByteQueueContainer { - [queue_]: Queue<{ - buffer: ArrayBufferLike; - byteOffset: number; - byteLength: number; - }>; - [queueTotalSize_]: number; -} - -export function dequeueValue<V>(container: QueueContainer<V>): V { - // Assert: container has[[queue]] and[[queueTotalSize]] internal slots. - // Assert: container.[[queue]] is not empty. - const pair = container[queue_].shift()!; - const newTotalSize = container[queueTotalSize_] - pair.size; - container[queueTotalSize_] = Math.max(0, newTotalSize); // < 0 can occur due to rounding errors. - return pair.value; -} - -export function enqueueValueWithSize<V>( - container: QueueContainer<V>, - value: V, - size: number -): void { - // Assert: container has[[queue]] and[[queueTotalSize]] internal slots. - if (!isFiniteNonNegativeNumber(size)) { - throw new RangeError("Chunk size must be a non-negative, finite numbers"); - } - container[queue_].push({ value, size }); - container[queueTotalSize_] += size; -} - -export function peekQueueValue<V>(container: QueueContainer<V>): V { - // Assert: container has[[queue]] and[[queueTotalSize]] internal slots. - // Assert: container.[[queue]] is not empty. - return container[queue_].front()!.value; -} - -export function resetQueue<V>( - container: ByteQueueContainer | QueueContainer<V> -): void { - // Chrome (as of v67) has a steep performance cliff with large arrays - // and shift(), around about 50k elements. While this is an unusual case - // we use a simple wrapper around shift and push that is chunked to - // avoid this pitfall. - // @see: https://github.com/stardazed/sd-streams/issues/1 - container[queue_] = new QueueImpl<any>(); - - // The code below can be used as a plain array implementation of the - // Queue interface. - // const q = [] as any; - // q.front = function() { return this[0]; }; - // container[queue_] = q; - - container[queueTotalSize_] = 0; -} diff --git a/cli/js/streams/queue.ts b/cli/js/streams/queue.ts deleted file mode 100644 index 264851baf..000000000 --- a/cli/js/streams/queue.ts +++ /dev/null @@ -1,65 +0,0 @@ -// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -/** - * streams/queue - simple queue type with chunked array backing - * Part of Stardazed - * (c) 2018-Present by Arthur Langereis - @zenmumbler - * https://github.com/stardazed/sd-streams - */ - -const CHUNK_SIZE = 16384; - -export interface Queue<T> { - push(t: T): void; - shift(): T | undefined; - front(): T | undefined; - readonly length: number; -} - -export class QueueImpl<T> implements Queue<T> { - private readonly chunks_: T[][]; - private readChunk_: T[]; - private writeChunk_: T[]; - private length_: number; - - constructor() { - this.chunks_ = [[]]; - this.readChunk_ = this.writeChunk_ = this.chunks_[0]; - this.length_ = 0; - } - - push(t: T): void { - this.writeChunk_.push(t); - this.length_ += 1; - if (this.writeChunk_.length === CHUNK_SIZE) { - this.writeChunk_ = []; - this.chunks_.push(this.writeChunk_); - } - } - - front(): T | undefined { - if (this.length_ === 0) { - return undefined; - } - return this.readChunk_[0]; - } - - shift(): T | undefined { - if (this.length_ === 0) { - return undefined; - } - const t = this.readChunk_.shift(); - - this.length_ -= 1; - if (this.readChunk_.length === 0 && this.readChunk_ !== this.writeChunk_) { - this.chunks_.shift(); - this.readChunk_ = this.chunks_[0]; - } - return t; - } - - get length(): number { - return this.length_; - } -} diff --git a/cli/js/streams/readable-byte-stream-controller.ts b/cli/js/streams/readable-byte-stream-controller.ts deleted file mode 100644 index 86efd416c..000000000 --- a/cli/js/streams/readable-byte-stream-controller.ts +++ /dev/null @@ -1,214 +0,0 @@ -// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -/** - * streams/readable-byte-stream-controller - ReadableByteStreamController class implementation - * Part of Stardazed - * (c) 2018-Present by Arthur Langereis - @zenmumbler - * https://github.com/stardazed/sd-streams - */ - -/* eslint-disable @typescript-eslint/no-explicit-any */ -// TODO reenable this lint here - -import * as rs from "./readable-internals.ts"; -import * as q from "./queue-mixin.ts"; -import * as shared from "./shared-internals.ts"; -import { ReadableStreamBYOBRequest } from "./readable-stream-byob-request.ts"; -import { Queue } from "./queue.ts"; -import { UnderlyingByteSource } from "../dom_types.ts"; - -export class ReadableByteStreamController - implements rs.SDReadableByteStreamController { - [rs.autoAllocateChunkSize_]: number | undefined; - [rs.byobRequest_]: rs.SDReadableStreamBYOBRequest | undefined; - [rs.cancelAlgorithm_]: rs.CancelAlgorithm; - [rs.closeRequested_]: boolean; - [rs.controlledReadableByteStream_]: rs.SDReadableStream<ArrayBufferView>; - [rs.pullAgain_]: boolean; - [rs.pullAlgorithm_]: rs.PullAlgorithm<ArrayBufferView>; - [rs.pulling_]: boolean; - [rs.pendingPullIntos_]: rs.PullIntoDescriptor[]; - [rs.started_]: boolean; - [rs.strategyHWM_]: number; - - [q.queue_]: Queue<{ - buffer: ArrayBufferLike; - byteOffset: number; - byteLength: number; - }>; - [q.queueTotalSize_]: number; - - constructor() { - throw new TypeError(); - } - - get byobRequest(): rs.SDReadableStreamBYOBRequest | undefined { - if (!rs.isReadableByteStreamController(this)) { - throw new TypeError(); - } - if ( - this[rs.byobRequest_] === undefined && - this[rs.pendingPullIntos_].length > 0 - ) { - const firstDescriptor = this[rs.pendingPullIntos_][0]; - const view = new Uint8Array( - firstDescriptor.buffer, - firstDescriptor.byteOffset + firstDescriptor.bytesFilled, - firstDescriptor.byteLength - firstDescriptor.bytesFilled - ); - const byobRequest = Object.create( - ReadableStreamBYOBRequest.prototype - ) as ReadableStreamBYOBRequest; - rs.setUpReadableStreamBYOBRequest(byobRequest, this, view); - this[rs.byobRequest_] = byobRequest; - } - return this[rs.byobRequest_]; - } - - get desiredSize(): number | null { - if (!rs.isReadableByteStreamController(this)) { - throw new TypeError(); - } - return rs.readableByteStreamControllerGetDesiredSize(this); - } - - close(): void { - if (!rs.isReadableByteStreamController(this)) { - throw new TypeError(); - } - if (this[rs.closeRequested_]) { - throw new TypeError("Stream is already closing"); - } - if (this[rs.controlledReadableByteStream_][shared.state_] !== "readable") { - throw new TypeError("Stream is closed or errored"); - } - rs.readableByteStreamControllerClose(this); - } - - enqueue(chunk: ArrayBufferView): void { - if (!rs.isReadableByteStreamController(this)) { - throw new TypeError(); - } - if (this[rs.closeRequested_]) { - throw new TypeError("Stream is already closing"); - } - if (this[rs.controlledReadableByteStream_][shared.state_] !== "readable") { - throw new TypeError("Stream is closed or errored"); - } - if (!ArrayBuffer.isView(chunk)) { - throw new TypeError("chunk must be a valid ArrayBufferView"); - } - // If ! IsDetachedBuffer(chunk.[[ViewedArrayBuffer]]) is true, throw a TypeError exception. - return rs.readableByteStreamControllerEnqueue(this, chunk); - } - - error(error?: shared.ErrorResult): void { - if (!rs.isReadableByteStreamController(this)) { - throw new TypeError(); - } - rs.readableByteStreamControllerError(this, error); - } - - [rs.cancelSteps_](reason: shared.ErrorResult): Promise<void> { - if (this[rs.pendingPullIntos_].length > 0) { - const firstDescriptor = this[rs.pendingPullIntos_][0]; - firstDescriptor.bytesFilled = 0; - } - q.resetQueue(this); - const result = this[rs.cancelAlgorithm_](reason); - rs.readableByteStreamControllerClearAlgorithms(this); - return result; - } - - [rs.pullSteps_]( - forAuthorCode: boolean - ): Promise<IteratorResult<ArrayBufferView, any>> { - const stream = this[rs.controlledReadableByteStream_]; - // Assert: ! ReadableStreamHasDefaultReader(stream) is true. - if (this[q.queueTotalSize_] > 0) { - // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0. - const entry = this[q.queue_].shift()!; - this[q.queueTotalSize_] -= entry.byteLength; - rs.readableByteStreamControllerHandleQueueDrain(this); - const view = new Uint8Array( - entry.buffer, - entry.byteOffset, - entry.byteLength - ); - return Promise.resolve( - rs.readableStreamCreateReadResult(view, false, forAuthorCode) - ); - } - const autoAllocateChunkSize = this[rs.autoAllocateChunkSize_]; - if (autoAllocateChunkSize !== undefined) { - let buffer: ArrayBuffer; - try { - buffer = new ArrayBuffer(autoAllocateChunkSize); - } catch (error) { - return Promise.reject(error); - } - const pullIntoDescriptor: rs.PullIntoDescriptor = { - buffer, - byteOffset: 0, - byteLength: autoAllocateChunkSize, - bytesFilled: 0, - elementSize: 1, - ctor: Uint8Array, - readerType: "default" - }; - this[rs.pendingPullIntos_].push(pullIntoDescriptor); - } - - const promise = rs.readableStreamAddReadRequest(stream, forAuthorCode); - rs.readableByteStreamControllerCallPullIfNeeded(this); - return promise; - } -} - -export function setUpReadableByteStreamControllerFromUnderlyingSource( - stream: rs.SDReadableStream<ArrayBufferView>, - underlyingByteSource: UnderlyingByteSource, - highWaterMark: number -): void { - // Assert: underlyingByteSource is not undefined. - const controller = Object.create( - ReadableByteStreamController.prototype - ) as ReadableByteStreamController; - - const startAlgorithm = (): any => { - return shared.invokeOrNoop(underlyingByteSource, "start", [controller]); - }; - const pullAlgorithm = shared.createAlgorithmFromUnderlyingMethod( - underlyingByteSource, - "pull", - [controller] - ); - const cancelAlgorithm = shared.createAlgorithmFromUnderlyingMethod( - underlyingByteSource, - "cancel", - [] - ); - - let autoAllocateChunkSize = underlyingByteSource.autoAllocateChunkSize; - if (autoAllocateChunkSize !== undefined) { - autoAllocateChunkSize = Number(autoAllocateChunkSize); - if ( - !shared.isInteger(autoAllocateChunkSize) || - autoAllocateChunkSize <= 0 - ) { - throw new RangeError( - "autoAllocateChunkSize must be a positive, finite integer" - ); - } - } - rs.setUpReadableByteStreamController( - stream, - controller, - startAlgorithm, - pullAlgorithm, - cancelAlgorithm, - highWaterMark, - autoAllocateChunkSize - ); -} diff --git a/cli/js/streams/readable-internals.ts b/cli/js/streams/readable-internals.ts deleted file mode 100644 index 67f5a69b1..000000000 --- a/cli/js/streams/readable-internals.ts +++ /dev/null @@ -1,1357 +0,0 @@ -// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -/** - * streams/readable-internals - internal types and functions for readable streams - * Part of Stardazed - * (c) 2018-Present by Arthur Langereis - @zenmumbler - * https://github.com/stardazed/sd-streams - */ - -/* eslint-disable @typescript-eslint/no-explicit-any */ -// TODO reenable this lint here - -import * as shared from "./shared-internals.ts"; -import * as q from "./queue-mixin.ts"; -import { - QueuingStrategy, - QueuingStrategySizeCallback, - UnderlyingSource, - UnderlyingByteSource -} from "../dom_types.ts"; - -// ReadableStreamDefaultController -export const controlledReadableStream_ = Symbol("controlledReadableStream_"); -export const pullAlgorithm_ = Symbol("pullAlgorithm_"); -export const cancelAlgorithm_ = Symbol("cancelAlgorithm_"); -export const strategySizeAlgorithm_ = Symbol("strategySizeAlgorithm_"); -export const strategyHWM_ = Symbol("strategyHWM_"); -export const started_ = Symbol("started_"); -export const closeRequested_ = Symbol("closeRequested_"); -export const pullAgain_ = Symbol("pullAgain_"); -export const pulling_ = Symbol("pulling_"); -export const cancelSteps_ = Symbol("cancelSteps_"); -export const pullSteps_ = Symbol("pullSteps_"); - -// ReadableByteStreamController -export const autoAllocateChunkSize_ = Symbol("autoAllocateChunkSize_"); -export const byobRequest_ = Symbol("byobRequest_"); -export const controlledReadableByteStream_ = Symbol( - "controlledReadableByteStream_" -); -export const pendingPullIntos_ = Symbol("pendingPullIntos_"); - -// ReadableStreamDefaultReader -export const closedPromise_ = Symbol("closedPromise_"); -export const ownerReadableStream_ = Symbol("ownerReadableStream_"); -export const readRequests_ = Symbol("readRequests_"); -export const readIntoRequests_ = Symbol("readIntoRequests_"); - -// ReadableStreamBYOBRequest -export const associatedReadableByteStreamController_ = Symbol( - "associatedReadableByteStreamController_" -); -export const view_ = Symbol("view_"); - -// ReadableStreamBYOBReader - -// ReadableStream -export const reader_ = Symbol("reader_"); -export const readableStreamController_ = Symbol("readableStreamController_"); - -export type StartFunction<OutputType> = ( - controller: SDReadableStreamControllerBase<OutputType> -) => void | PromiseLike<void>; -export type StartAlgorithm = () => Promise<void> | void; -export type PullFunction<OutputType> = ( - controller: SDReadableStreamControllerBase<OutputType> -) => void | PromiseLike<void>; -export type PullAlgorithm<OutputType> = ( - controller: SDReadableStreamControllerBase<OutputType> -) => PromiseLike<void>; -export type CancelAlgorithm = (reason?: shared.ErrorResult) => Promise<void>; - -// ---- - -export interface SDReadableStreamControllerBase<OutputType> { - readonly desiredSize: number | null; - close(): void; - error(e?: shared.ErrorResult): void; - - [cancelSteps_](reason: shared.ErrorResult): Promise<void>; - [pullSteps_](forAuthorCode: boolean): Promise<IteratorResult<OutputType>>; -} - -export interface SDReadableStreamBYOBRequest { - readonly view: ArrayBufferView; - respond(bytesWritten: number): void; - respondWithNewView(view: ArrayBufferView): void; - - [associatedReadableByteStreamController_]: - | SDReadableByteStreamController - | undefined; - [view_]: ArrayBufferView | undefined; -} - -interface ArrayBufferViewCtor { - new ( - buffer: ArrayBufferLike, - byteOffset?: number, - byteLength?: number - ): ArrayBufferView; -} - -export interface PullIntoDescriptor { - readerType: "default" | "byob"; - ctor: ArrayBufferViewCtor; - buffer: ArrayBufferLike; - byteOffset: number; - byteLength: number; - bytesFilled: number; - elementSize: number; -} - -export interface SDReadableByteStreamController - extends SDReadableStreamControllerBase<ArrayBufferView>, - q.ByteQueueContainer { - readonly byobRequest: SDReadableStreamBYOBRequest | undefined; - enqueue(chunk: ArrayBufferView): void; - - [autoAllocateChunkSize_]: number | undefined; // A positive integer, when the automatic buffer allocation feature is enabled. In that case, this value specifies the size of buffer to allocate. It is undefined otherwise. - [byobRequest_]: SDReadableStreamBYOBRequest | undefined; // A ReadableStreamBYOBRequest instance representing the current BYOB pull request - [cancelAlgorithm_]: CancelAlgorithm; // A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying source - [closeRequested_]: boolean; // A boolean flag indicating whether the stream has been closed by its underlying byte source, but still has chunks in its internal queue that have not yet been read - [controlledReadableByteStream_]: SDReadableStream<ArrayBufferView>; // The ReadableStream instance controlled - [pullAgain_]: boolean; // A boolean flag set to true if the stream’s mechanisms requested a call to the underlying byte source’s pull() method to pull more data, but the pull could not yet be done since a previous call is still executing - [pullAlgorithm_]: PullAlgorithm<ArrayBufferView>; // A promise-returning algorithm that pulls data from the underlying source - [pulling_]: boolean; // A boolean flag set to true while the underlying byte source’s pull() method is executing and has not yet fulfilled, used to prevent reentrant calls - [pendingPullIntos_]: PullIntoDescriptor[]; // A List of descriptors representing pending BYOB pull requests - [started_]: boolean; // A boolean flag indicating whether the underlying source has finished starting - [strategyHWM_]: number; // A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source -} - -export interface SDReadableStreamDefaultController<OutputType> - extends SDReadableStreamControllerBase<OutputType>, - q.QueueContainer<OutputType> { - enqueue(chunk?: OutputType): void; - - [controlledReadableStream_]: SDReadableStream<OutputType>; - [pullAlgorithm_]: PullAlgorithm<OutputType>; - [cancelAlgorithm_]: CancelAlgorithm; - [strategySizeAlgorithm_]: QueuingStrategySizeCallback<OutputType>; - [strategyHWM_]: number; - - [started_]: boolean; - [closeRequested_]: boolean; - [pullAgain_]: boolean; - [pulling_]: boolean; -} - -// ---- - -export interface SDReadableStreamReader<OutputType> { - readonly closed: Promise<void>; - cancel(reason: shared.ErrorResult): Promise<void>; - releaseLock(): void; - - [ownerReadableStream_]: SDReadableStream<OutputType> | undefined; - [closedPromise_]: shared.ControlledPromise<void>; -} - -export interface ReadRequest<V> extends shared.ControlledPromise<V> { - forAuthorCode: boolean; -} - -export declare class SDReadableStreamDefaultReader<OutputType> - implements SDReadableStreamReader<OutputType> { - constructor(stream: SDReadableStream<OutputType>); - - readonly closed: Promise<void>; - cancel(reason: shared.ErrorResult): Promise<void>; - releaseLock(): void; - read(): Promise<IteratorResult<OutputType | undefined>>; - - [ownerReadableStream_]: SDReadableStream<OutputType> | undefined; - [closedPromise_]: shared.ControlledPromise<void>; - [readRequests_]: Array<ReadRequest<IteratorResult<OutputType>>>; -} - -export declare class SDReadableStreamBYOBReader - implements SDReadableStreamReader<ArrayBufferView> { - constructor(stream: SDReadableStream<ArrayBufferView>); - - readonly closed: Promise<void>; - cancel(reason: shared.ErrorResult): Promise<void>; - releaseLock(): void; - read(view: ArrayBufferView): Promise<IteratorResult<ArrayBufferView>>; - - [ownerReadableStream_]: SDReadableStream<ArrayBufferView> | undefined; - [closedPromise_]: shared.ControlledPromise<void>; - [readIntoRequests_]: Array<ReadRequest<IteratorResult<ArrayBufferView>>>; -} - -/* TODO reenable this when we add WritableStreams and Transforms -export interface GenericTransformStream<InputType, OutputType> { - readable: SDReadableStream<OutputType>; - writable: ws.WritableStream<InputType>; -} -*/ - -export type ReadableStreamState = "readable" | "closed" | "errored"; - -export declare class SDReadableStream<OutputType> { - constructor( - underlyingSource: UnderlyingByteSource, - strategy?: { highWaterMark?: number; size?: undefined } - ); - constructor( - underlyingSource?: UnderlyingSource<OutputType>, - strategy?: QueuingStrategy<OutputType> - ); - - readonly locked: boolean; - cancel(reason?: shared.ErrorResult): Promise<void>; - getReader(): SDReadableStreamReader<OutputType>; - getReader(options: { mode: "byob" }): SDReadableStreamBYOBReader; - tee(): Array<SDReadableStream<OutputType>>; - - /* TODO reenable these methods when we bring in writableStreams and transport types - pipeThrough<ResultType>( - transform: GenericTransformStream<OutputType, ResultType>, - options?: PipeOptions - ): SDReadableStream<ResultType>; - pipeTo( - dest: ws.WritableStream<OutputType>, - options?: PipeOptions - ): Promise<void>; - */ - [shared.state_]: ReadableStreamState; - [shared.storedError_]: shared.ErrorResult; - [reader_]: SDReadableStreamReader<OutputType> | undefined; - [readableStreamController_]: SDReadableStreamControllerBase<OutputType>; -} - -// ---- Stream - -export function initializeReadableStream<OutputType>( - stream: SDReadableStream<OutputType> -): void { - stream[shared.state_] = "readable"; - stream[reader_] = undefined; - stream[shared.storedError_] = undefined; - stream[readableStreamController_] = undefined!; // mark slot as used for brand check -} - -export function isReadableStream( - value: unknown -): value is SDReadableStream<any> { - if (typeof value !== "object" || value === null) { - return false; - } - return readableStreamController_ in value; -} - -export function isReadableStreamLocked<OutputType>( - stream: SDReadableStream<OutputType> -): boolean { - return stream[reader_] !== undefined; -} - -export function readableStreamGetNumReadIntoRequests<OutputType>( - stream: SDReadableStream<OutputType> -): number { - // TODO remove the "as unknown" cast - // This is in to workaround a compiler error - // error TS2352: Conversion of type 'SDReadableStreamReader<OutputType>' to type 'SDReadableStreamBYOBReader' may be a mistake because neither type sufficiently overlaps with the other. If this was intentional, convert the expression to 'unknown' first. - // Type 'SDReadableStreamReader<OutputType>' is missing the following properties from type 'SDReadableStreamBYOBReader': read, [readIntoRequests_] - const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader; - if (reader === undefined) { - return 0; - } - return reader[readIntoRequests_].length; -} - -export function readableStreamGetNumReadRequests<OutputType>( - stream: SDReadableStream<OutputType> -): number { - const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>; - if (reader === undefined) { - return 0; - } - return reader[readRequests_].length; -} - -export function readableStreamCreateReadResult<T>( - value: T, - done: boolean, - forAuthorCode: boolean -): IteratorResult<T> { - const prototype = forAuthorCode ? Object.prototype : null; - const result = Object.create(prototype); - result.value = value; - result.done = done; - return result; -} - -export function readableStreamAddReadIntoRequest( - stream: SDReadableStream<ArrayBufferView>, - forAuthorCode: boolean -): Promise<IteratorResult<ArrayBufferView, any>> { - // Assert: ! IsReadableStreamBYOBReader(stream.[[reader]]) is true. - // Assert: stream.[[state]] is "readable" or "closed". - const reader = stream[reader_] as SDReadableStreamBYOBReader; - const conProm = shared.createControlledPromise< - IteratorResult<ArrayBufferView> - >() as ReadRequest<IteratorResult<ArrayBufferView>>; - conProm.forAuthorCode = forAuthorCode; - reader[readIntoRequests_].push(conProm); - return conProm.promise; -} - -export function readableStreamAddReadRequest<OutputType>( - stream: SDReadableStream<OutputType>, - forAuthorCode: boolean -): Promise<IteratorResult<OutputType, any>> { - // Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true. - // Assert: stream.[[state]] is "readable". - const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>; - const conProm = shared.createControlledPromise< - IteratorResult<OutputType> - >() as ReadRequest<IteratorResult<OutputType>>; - conProm.forAuthorCode = forAuthorCode; - reader[readRequests_].push(conProm); - return conProm.promise; -} - -export function readableStreamHasBYOBReader<OutputType>( - stream: SDReadableStream<OutputType> -): boolean { - const reader = stream[reader_]; - return isReadableStreamBYOBReader(reader); -} - -export function readableStreamHasDefaultReader<OutputType>( - stream: SDReadableStream<OutputType> -): boolean { - const reader = stream[reader_]; - return isReadableStreamDefaultReader(reader); -} - -export function readableStreamCancel<OutputType>( - stream: SDReadableStream<OutputType>, - reason: shared.ErrorResult -): Promise<undefined> { - if (stream[shared.state_] === "closed") { - return Promise.resolve(undefined); - } - if (stream[shared.state_] === "errored") { - return Promise.reject(stream[shared.storedError_]); - } - readableStreamClose(stream); - - const sourceCancelPromise = stream[readableStreamController_][cancelSteps_]( - reason - ); - return sourceCancelPromise.then(_ => undefined); -} - -export function readableStreamClose<OutputType>( - stream: SDReadableStream<OutputType> -): void { - // Assert: stream.[[state]] is "readable". - stream[shared.state_] = "closed"; - const reader = stream[reader_]; - if (reader === undefined) { - return; - } - - if (isReadableStreamDefaultReader(reader)) { - for (const readRequest of reader[readRequests_]) { - readRequest.resolve( - readableStreamCreateReadResult( - undefined, - true, - readRequest.forAuthorCode - ) - ); - } - reader[readRequests_] = []; - } - reader[closedPromise_].resolve(); - reader[closedPromise_].promise.catch(() => {}); -} - -export function readableStreamError<OutputType>( - stream: SDReadableStream<OutputType>, - error: shared.ErrorResult -): void { - if (stream[shared.state_] !== "readable") { - throw new RangeError("Stream is in an invalid state"); - } - stream[shared.state_] = "errored"; - stream[shared.storedError_] = error; - - const reader = stream[reader_]; - if (reader === undefined) { - return; - } - if (isReadableStreamDefaultReader(reader)) { - for (const readRequest of reader[readRequests_]) { - readRequest.reject(error); - } - reader[readRequests_] = []; - } else { - // Assert: IsReadableStreamBYOBReader(reader). - // TODO remove the "as unknown" cast - const readIntoRequests = ((reader as unknown) as SDReadableStreamBYOBReader)[ - readIntoRequests_ - ]; - for (const readIntoRequest of readIntoRequests) { - readIntoRequest.reject(error); - } - // TODO remove the "as unknown" cast - ((reader as unknown) as SDReadableStreamBYOBReader)[readIntoRequests_] = []; - } - - reader[closedPromise_].reject(error); -} - -// ---- Readers - -export function isReadableStreamDefaultReader( - reader: unknown -): reader is SDReadableStreamDefaultReader<any> { - if (typeof reader !== "object" || reader === null) { - return false; - } - return readRequests_ in reader; -} - -export function isReadableStreamBYOBReader( - reader: unknown -): reader is SDReadableStreamBYOBReader { - if (typeof reader !== "object" || reader === null) { - return false; - } - return readIntoRequests_ in reader; -} - -export function readableStreamReaderGenericInitialize<OutputType>( - reader: SDReadableStreamReader<OutputType>, - stream: SDReadableStream<OutputType> -): void { - reader[ownerReadableStream_] = stream; - stream[reader_] = reader; - const streamState = stream[shared.state_]; - - reader[closedPromise_] = shared.createControlledPromise<void>(); - if (streamState === "readable") { - // leave as is - } else if (streamState === "closed") { - reader[closedPromise_].resolve(undefined); - } else { - reader[closedPromise_].reject(stream[shared.storedError_]); - reader[closedPromise_].promise.catch(() => {}); - } -} - -export function readableStreamReaderGenericRelease<OutputType>( - reader: SDReadableStreamReader<OutputType> -): void { - // Assert: reader.[[ownerReadableStream]] is not undefined. - // Assert: reader.[[ownerReadableStream]].[[reader]] is reader. - const stream = reader[ownerReadableStream_]; - if (stream === undefined) { - throw new TypeError("Reader is in an inconsistent state"); - } - - if (stream[shared.state_] === "readable") { - // code moved out - } else { - reader[closedPromise_] = shared.createControlledPromise<void>(); - } - reader[closedPromise_].reject(new TypeError()); - reader[closedPromise_].promise.catch(() => {}); - - stream[reader_] = undefined; - reader[ownerReadableStream_] = undefined; -} - -export function readableStreamBYOBReaderRead( - reader: SDReadableStreamBYOBReader, - view: ArrayBufferView, - forAuthorCode = false -): Promise<IteratorResult<ArrayBufferView, any>> { - const stream = reader[ownerReadableStream_]!; - // Assert: stream is not undefined. - - if (stream[shared.state_] === "errored") { - return Promise.reject(stream[shared.storedError_]); - } - return readableByteStreamControllerPullInto( - stream[readableStreamController_] as SDReadableByteStreamController, - view, - forAuthorCode - ); -} - -export function readableStreamDefaultReaderRead<OutputType>( - reader: SDReadableStreamDefaultReader<OutputType>, - forAuthorCode = false -): Promise<IteratorResult<OutputType | undefined>> { - const stream = reader[ownerReadableStream_]!; - // Assert: stream is not undefined. - - if (stream[shared.state_] === "closed") { - return Promise.resolve( - readableStreamCreateReadResult(undefined, true, forAuthorCode) - ); - } - if (stream[shared.state_] === "errored") { - return Promise.reject(stream[shared.storedError_]); - } - // Assert: stream.[[state]] is "readable". - return stream[readableStreamController_][pullSteps_](forAuthorCode); -} - -export function readableStreamFulfillReadIntoRequest<OutputType>( - stream: SDReadableStream<OutputType>, - chunk: ArrayBufferView, - done: boolean -): void { - // TODO remove the "as unknown" cast - const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader; - const readIntoRequest = reader[readIntoRequests_].shift()!; // <-- length check done in caller - readIntoRequest.resolve( - readableStreamCreateReadResult(chunk, done, readIntoRequest.forAuthorCode) - ); -} - -export function readableStreamFulfillReadRequest<OutputType>( - stream: SDReadableStream<OutputType>, - chunk: OutputType, - done: boolean -): void { - const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>; - const readRequest = reader[readRequests_].shift()!; // <-- length check done in caller - readRequest.resolve( - readableStreamCreateReadResult(chunk, done, readRequest.forAuthorCode) - ); -} - -// ---- DefaultController - -export function setUpReadableStreamDefaultController<OutputType>( - stream: SDReadableStream<OutputType>, - controller: SDReadableStreamDefaultController<OutputType>, - startAlgorithm: StartAlgorithm, - pullAlgorithm: PullAlgorithm<OutputType>, - cancelAlgorithm: CancelAlgorithm, - highWaterMark: number, - sizeAlgorithm: QueuingStrategySizeCallback<OutputType> -): void { - // Assert: stream.[[readableStreamController]] is undefined. - controller[controlledReadableStream_] = stream; - q.resetQueue(controller); - controller[started_] = false; - controller[closeRequested_] = false; - controller[pullAgain_] = false; - controller[pulling_] = false; - controller[strategySizeAlgorithm_] = sizeAlgorithm; - controller[strategyHWM_] = highWaterMark; - controller[pullAlgorithm_] = pullAlgorithm; - controller[cancelAlgorithm_] = cancelAlgorithm; - stream[readableStreamController_] = controller; - - const startResult = startAlgorithm(); - Promise.resolve(startResult).then( - _ => { - controller[started_] = true; - // Assert: controller.[[pulling]] is false. - // Assert: controller.[[pullAgain]] is false. - readableStreamDefaultControllerCallPullIfNeeded(controller); - }, - error => { - readableStreamDefaultControllerError(controller, error); - } - ); -} - -export function isReadableStreamDefaultController( - value: unknown -): value is SDReadableStreamDefaultController<any> { - if (typeof value !== "object" || value === null) { - return false; - } - return controlledReadableStream_ in value; -} - -export function readableStreamDefaultControllerHasBackpressure<OutputType>( - controller: SDReadableStreamDefaultController<OutputType> -): boolean { - return !readableStreamDefaultControllerShouldCallPull(controller); -} - -export function readableStreamDefaultControllerCanCloseOrEnqueue<OutputType>( - controller: SDReadableStreamDefaultController<OutputType> -): boolean { - const state = controller[controlledReadableStream_][shared.state_]; - return controller[closeRequested_] === false && state === "readable"; -} - -export function readableStreamDefaultControllerGetDesiredSize<OutputType>( - controller: SDReadableStreamDefaultController<OutputType> -): number | null { - const state = controller[controlledReadableStream_][shared.state_]; - if (state === "errored") { - return null; - } - if (state === "closed") { - return 0; - } - return controller[strategyHWM_] - controller[q.queueTotalSize_]; -} - -export function readableStreamDefaultControllerClose<OutputType>( - controller: SDReadableStreamDefaultController<OutputType> -): void { - // Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true. - controller[closeRequested_] = true; - const stream = controller[controlledReadableStream_]; - if (controller[q.queue_].length === 0) { - readableStreamDefaultControllerClearAlgorithms(controller); - readableStreamClose(stream); - } -} - -export function readableStreamDefaultControllerEnqueue<OutputType>( - controller: SDReadableStreamDefaultController<OutputType>, - chunk: OutputType -): void { - const stream = controller[controlledReadableStream_]; - // Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true. - if ( - isReadableStreamLocked(stream) && - readableStreamGetNumReadRequests(stream) > 0 - ) { - readableStreamFulfillReadRequest(stream, chunk, false); - } else { - // Let result be the result of performing controller.[[strategySizeAlgorithm]], passing in chunk, - // and interpreting the result as an ECMAScript completion value. - // impl note: assuming that in JS land this just means try/catch with rethrow - let chunkSize: number; - try { - chunkSize = controller[strategySizeAlgorithm_](chunk); - } catch (error) { - readableStreamDefaultControllerError(controller, error); - throw error; - } - try { - q.enqueueValueWithSize(controller, chunk, chunkSize); - } catch (error) { - readableStreamDefaultControllerError(controller, error); - throw error; - } - } - readableStreamDefaultControllerCallPullIfNeeded(controller); -} - -export function readableStreamDefaultControllerError<OutputType>( - controller: SDReadableStreamDefaultController<OutputType>, - error: shared.ErrorResult -): void { - const stream = controller[controlledReadableStream_]; - if (stream[shared.state_] !== "readable") { - return; - } - q.resetQueue(controller); - readableStreamDefaultControllerClearAlgorithms(controller); - readableStreamError(stream, error); -} - -export function readableStreamDefaultControllerCallPullIfNeeded<OutputType>( - controller: SDReadableStreamDefaultController<OutputType> -): void { - if (!readableStreamDefaultControllerShouldCallPull(controller)) { - return; - } - if (controller[pulling_]) { - controller[pullAgain_] = true; - return; - } - if (controller[pullAgain_]) { - throw new RangeError("Stream controller is in an invalid state."); - } - - controller[pulling_] = true; - controller[pullAlgorithm_](controller).then( - _ => { - controller[pulling_] = false; - if (controller[pullAgain_]) { - controller[pullAgain_] = false; - readableStreamDefaultControllerCallPullIfNeeded(controller); - } - }, - error => { - readableStreamDefaultControllerError(controller, error); - } - ); -} - -export function readableStreamDefaultControllerShouldCallPull<OutputType>( - controller: SDReadableStreamDefaultController<OutputType> -): boolean { - const stream = controller[controlledReadableStream_]; - if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) { - return false; - } - if (controller[started_] === false) { - return false; - } - if ( - isReadableStreamLocked(stream) && - readableStreamGetNumReadRequests(stream) > 0 - ) { - return true; - } - const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller); - if (desiredSize === null) { - throw new RangeError("Stream is in an invalid state."); - } - return desiredSize > 0; -} - -export function readableStreamDefaultControllerClearAlgorithms<OutputType>( - controller: SDReadableStreamDefaultController<OutputType> -): void { - controller[pullAlgorithm_] = undefined!; - controller[cancelAlgorithm_] = undefined!; - controller[strategySizeAlgorithm_] = undefined!; -} - -// ---- BYOBController - -export function setUpReadableByteStreamController( - stream: SDReadableStream<ArrayBufferView>, - controller: SDReadableByteStreamController, - startAlgorithm: StartAlgorithm, - pullAlgorithm: PullAlgorithm<ArrayBufferView>, - cancelAlgorithm: CancelAlgorithm, - highWaterMark: number, - autoAllocateChunkSize: number | undefined -): void { - // Assert: stream.[[readableStreamController]] is undefined. - if (stream[readableStreamController_] !== undefined) { - throw new TypeError("Cannot reuse streams"); - } - if (autoAllocateChunkSize !== undefined) { - if ( - !shared.isInteger(autoAllocateChunkSize) || - autoAllocateChunkSize <= 0 - ) { - throw new RangeError( - "autoAllocateChunkSize must be a positive, finite integer" - ); - } - } - // Set controller.[[controlledReadableByteStream]] to stream. - controller[controlledReadableByteStream_] = stream; - // Set controller.[[pullAgain]] and controller.[[pulling]] to false. - controller[pullAgain_] = false; - controller[pulling_] = false; - readableByteStreamControllerClearPendingPullIntos(controller); - q.resetQueue(controller); - controller[closeRequested_] = false; - controller[started_] = false; - controller[strategyHWM_] = shared.validateAndNormalizeHighWaterMark( - highWaterMark - ); - controller[pullAlgorithm_] = pullAlgorithm; - controller[cancelAlgorithm_] = cancelAlgorithm; - controller[autoAllocateChunkSize_] = autoAllocateChunkSize; - controller[pendingPullIntos_] = []; - stream[readableStreamController_] = controller; - - // Let startResult be the result of performing startAlgorithm. - const startResult = startAlgorithm(); - Promise.resolve(startResult).then( - _ => { - controller[started_] = true; - // Assert: controller.[[pulling]] is false. - // Assert: controller.[[pullAgain]] is false. - readableByteStreamControllerCallPullIfNeeded(controller); - }, - error => { - readableByteStreamControllerError(controller, error); - } - ); -} - -export function isReadableStreamBYOBRequest( - value: unknown -): value is SDReadableStreamBYOBRequest { - if (typeof value !== "object" || value === null) { - return false; - } - return associatedReadableByteStreamController_ in value; -} - -export function isReadableByteStreamController( - value: unknown -): value is SDReadableByteStreamController { - if (typeof value !== "object" || value === null) { - return false; - } - return controlledReadableByteStream_ in value; -} - -export function readableByteStreamControllerCallPullIfNeeded( - controller: SDReadableByteStreamController -): void { - if (!readableByteStreamControllerShouldCallPull(controller)) { - return; - } - if (controller[pulling_]) { - controller[pullAgain_] = true; - return; - } - // Assert: controller.[[pullAgain]] is false. - controller[pulling_] = true; - controller[pullAlgorithm_](controller).then( - _ => { - controller[pulling_] = false; - if (controller[pullAgain_]) { - controller[pullAgain_] = false; - readableByteStreamControllerCallPullIfNeeded(controller); - } - }, - error => { - readableByteStreamControllerError(controller, error); - } - ); -} - -export function readableByteStreamControllerClearAlgorithms( - controller: SDReadableByteStreamController -): void { - controller[pullAlgorithm_] = undefined!; - controller[cancelAlgorithm_] = undefined!; -} - -export function readableByteStreamControllerClearPendingPullIntos( - controller: SDReadableByteStreamController -): void { - readableByteStreamControllerInvalidateBYOBRequest(controller); - controller[pendingPullIntos_] = []; -} - -export function readableByteStreamControllerClose( - controller: SDReadableByteStreamController -): void { - const stream = controller[controlledReadableByteStream_]; - // Assert: controller.[[closeRequested]] is false. - // Assert: stream.[[state]] is "readable". - if (controller[q.queueTotalSize_] > 0) { - controller[closeRequested_] = true; - return; - } - if (controller[pendingPullIntos_].length > 0) { - const firstPendingPullInto = controller[pendingPullIntos_][0]; - if (firstPendingPullInto.bytesFilled > 0) { - const error = new TypeError(); - readableByteStreamControllerError(controller, error); - throw error; - } - } - readableByteStreamControllerClearAlgorithms(controller); - readableStreamClose(stream); -} - -export function readableByteStreamControllerCommitPullIntoDescriptor( - stream: SDReadableStream<ArrayBufferView>, - pullIntoDescriptor: PullIntoDescriptor -): void { - // Assert: stream.[[state]] is not "errored". - let done = false; - if (stream[shared.state_] === "closed") { - // Assert: pullIntoDescriptor.[[bytesFilled]] is 0. - done = true; - } - const filledView = readableByteStreamControllerConvertPullIntoDescriptor( - pullIntoDescriptor - ); - if (pullIntoDescriptor.readerType === "default") { - readableStreamFulfillReadRequest(stream, filledView, done); - } else { - // Assert: pullIntoDescriptor.[[readerType]] is "byob". - readableStreamFulfillReadIntoRequest(stream, filledView, done); - } -} - -export function readableByteStreamControllerConvertPullIntoDescriptor( - pullIntoDescriptor: PullIntoDescriptor -): ArrayBufferView { - const { bytesFilled, elementSize } = pullIntoDescriptor; - // Assert: bytesFilled <= pullIntoDescriptor.byteLength - // Assert: bytesFilled mod elementSize is 0 - return new pullIntoDescriptor.ctor( - pullIntoDescriptor.buffer, - pullIntoDescriptor.byteOffset, - bytesFilled / elementSize - ); -} - -export function readableByteStreamControllerEnqueue( - controller: SDReadableByteStreamController, - chunk: ArrayBufferView -): void { - const stream = controller[controlledReadableByteStream_]; - // Assert: controller.[[closeRequested]] is false. - // Assert: stream.[[state]] is "readable". - const { buffer, byteOffset, byteLength } = chunk; - - const transferredBuffer = shared.transferArrayBuffer(buffer); - - if (readableStreamHasDefaultReader(stream)) { - if (readableStreamGetNumReadRequests(stream) === 0) { - readableByteStreamControllerEnqueueChunkToQueue( - controller, - transferredBuffer, - byteOffset, - byteLength - ); - } else { - // Assert: controller.[[queue]] is empty. - const transferredView = new Uint8Array( - transferredBuffer, - byteOffset, - byteLength - ); - readableStreamFulfillReadRequest(stream, transferredView, false); - } - } else if (readableStreamHasBYOBReader(stream)) { - readableByteStreamControllerEnqueueChunkToQueue( - controller, - transferredBuffer, - byteOffset, - byteLength - ); - readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( - controller - ); - } else { - // Assert: !IsReadableStreamLocked(stream) is false. - readableByteStreamControllerEnqueueChunkToQueue( - controller, - transferredBuffer, - byteOffset, - byteLength - ); - } - readableByteStreamControllerCallPullIfNeeded(controller); -} - -export function readableByteStreamControllerEnqueueChunkToQueue( - controller: SDReadableByteStreamController, - buffer: ArrayBufferLike, - byteOffset: number, - byteLength: number -): void { - controller[q.queue_].push({ buffer, byteOffset, byteLength }); - controller[q.queueTotalSize_] += byteLength; -} - -export function readableByteStreamControllerError( - controller: SDReadableByteStreamController, - error: shared.ErrorResult -): void { - const stream = controller[controlledReadableByteStream_]; - if (stream[shared.state_] !== "readable") { - return; - } - readableByteStreamControllerClearPendingPullIntos(controller); - q.resetQueue(controller); - readableByteStreamControllerClearAlgorithms(controller); - readableStreamError(stream, error); -} - -export function readableByteStreamControllerFillHeadPullIntoDescriptor( - controller: SDReadableByteStreamController, - size: number, - pullIntoDescriptor: PullIntoDescriptor -): void { - // Assert: either controller.[[pendingPullIntos]] is empty, or the first element of controller.[[pendingPullIntos]] is pullIntoDescriptor. - readableByteStreamControllerInvalidateBYOBRequest(controller); - pullIntoDescriptor.bytesFilled += size; -} - -export function readableByteStreamControllerFillPullIntoDescriptorFromQueue( - controller: SDReadableByteStreamController, - pullIntoDescriptor: PullIntoDescriptor -): boolean { - const elementSize = pullIntoDescriptor.elementSize; - const currentAlignedBytes = - pullIntoDescriptor.bytesFilled - - (pullIntoDescriptor.bytesFilled % elementSize); - const maxBytesToCopy = Math.min( - controller[q.queueTotalSize_], - pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled - ); - const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy; - const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize); - let totalBytesToCopyRemaining = maxBytesToCopy; - let ready = false; - - if (maxAlignedBytes > currentAlignedBytes) { - totalBytesToCopyRemaining = - maxAlignedBytes - pullIntoDescriptor.bytesFilled; - ready = true; - } - const queue = controller[q.queue_]; - - while (totalBytesToCopyRemaining > 0) { - const headOfQueue = queue.front()!; - const bytesToCopy = Math.min( - totalBytesToCopyRemaining, - headOfQueue.byteLength - ); - const destStart = - pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled; - shared.copyDataBlockBytes( - pullIntoDescriptor.buffer, - destStart, - headOfQueue.buffer, - headOfQueue.byteOffset, - bytesToCopy - ); - if (headOfQueue.byteLength === bytesToCopy) { - queue.shift(); - } else { - headOfQueue.byteOffset += bytesToCopy; - headOfQueue.byteLength -= bytesToCopy; - } - controller[q.queueTotalSize_] -= bytesToCopy; - readableByteStreamControllerFillHeadPullIntoDescriptor( - controller, - bytesToCopy, - pullIntoDescriptor - ); - totalBytesToCopyRemaining -= bytesToCopy; - } - if (!ready) { - // Assert: controller[queueTotalSize_] === 0 - // Assert: pullIntoDescriptor.bytesFilled > 0 - // Assert: pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize - } - return ready; -} - -export function readableByteStreamControllerGetDesiredSize( - controller: SDReadableByteStreamController -): number | null { - const stream = controller[controlledReadableByteStream_]; - const state = stream[shared.state_]; - if (state === "errored") { - return null; - } - if (state === "closed") { - return 0; - } - return controller[strategyHWM_] - controller[q.queueTotalSize_]; -} - -export function readableByteStreamControllerHandleQueueDrain( - controller: SDReadableByteStreamController -): void { - // Assert: controller.[[controlledReadableByteStream]].[[state]] is "readable". - if (controller[q.queueTotalSize_] === 0 && controller[closeRequested_]) { - readableByteStreamControllerClearAlgorithms(controller); - readableStreamClose(controller[controlledReadableByteStream_]); - } else { - readableByteStreamControllerCallPullIfNeeded(controller); - } -} - -export function readableByteStreamControllerInvalidateBYOBRequest( - controller: SDReadableByteStreamController -): void { - const byobRequest = controller[byobRequest_]; - if (byobRequest === undefined) { - return; - } - byobRequest[associatedReadableByteStreamController_] = undefined; - byobRequest[view_] = undefined; - controller[byobRequest_] = undefined; -} - -export function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( - controller: SDReadableByteStreamController -): void { - // Assert: controller.[[closeRequested]] is false. - const pendingPullIntos = controller[pendingPullIntos_]; - while (pendingPullIntos.length > 0) { - if (controller[q.queueTotalSize_] === 0) { - return; - } - const pullIntoDescriptor = pendingPullIntos[0]; - if ( - readableByteStreamControllerFillPullIntoDescriptorFromQueue( - controller, - pullIntoDescriptor - ) - ) { - readableByteStreamControllerShiftPendingPullInto(controller); - readableByteStreamControllerCommitPullIntoDescriptor( - controller[controlledReadableByteStream_], - pullIntoDescriptor - ); - } - } -} - -export function readableByteStreamControllerPullInto( - controller: SDReadableByteStreamController, - view: ArrayBufferView, - forAuthorCode: boolean -): Promise<IteratorResult<ArrayBufferView, any>> { - const stream = controller[controlledReadableByteStream_]; - - const elementSize = (view as Uint8Array).BYTES_PER_ELEMENT || 1; // DataView exposes this in Webkit as 1, is not present in FF or Blink - const ctor = view.constructor as Uint8ArrayConstructor; // the typecast here is just for TS typing, it does not influence buffer creation - - const byteOffset = view.byteOffset; - const byteLength = view.byteLength; - const buffer = shared.transferArrayBuffer(view.buffer); - const pullIntoDescriptor: PullIntoDescriptor = { - buffer, - byteOffset, - byteLength, - bytesFilled: 0, - elementSize, - ctor, - readerType: "byob" - }; - - if (controller[pendingPullIntos_].length > 0) { - controller[pendingPullIntos_].push(pullIntoDescriptor); - return readableStreamAddReadIntoRequest(stream, forAuthorCode); - } - if (stream[shared.state_] === "closed") { - const emptyView = new ctor( - pullIntoDescriptor.buffer, - pullIntoDescriptor.byteOffset, - 0 - ); - return Promise.resolve( - readableStreamCreateReadResult(emptyView, true, forAuthorCode) - ); - } - - if (controller[q.queueTotalSize_] > 0) { - if ( - readableByteStreamControllerFillPullIntoDescriptorFromQueue( - controller, - pullIntoDescriptor - ) - ) { - const filledView = readableByteStreamControllerConvertPullIntoDescriptor( - pullIntoDescriptor - ); - readableByteStreamControllerHandleQueueDrain(controller); - return Promise.resolve( - readableStreamCreateReadResult(filledView, false, forAuthorCode) - ); - } - if (controller[closeRequested_]) { - const error = new TypeError(); - readableByteStreamControllerError(controller, error); - return Promise.reject(error); - } - } - - controller[pendingPullIntos_].push(pullIntoDescriptor); - const promise = readableStreamAddReadIntoRequest(stream, forAuthorCode); - readableByteStreamControllerCallPullIfNeeded(controller); - return promise; -} - -export function readableByteStreamControllerRespond( - controller: SDReadableByteStreamController, - bytesWritten: number -): void { - bytesWritten = Number(bytesWritten); - if (!shared.isFiniteNonNegativeNumber(bytesWritten)) { - throw new RangeError("bytesWritten must be a finite, non-negative number"); - } - // Assert: controller.[[pendingPullIntos]] is not empty. - readableByteStreamControllerRespondInternal(controller, bytesWritten); -} - -export function readableByteStreamControllerRespondInClosedState( - controller: SDReadableByteStreamController, - firstDescriptor: PullIntoDescriptor -): void { - firstDescriptor.buffer = shared.transferArrayBuffer(firstDescriptor.buffer); - // Assert: firstDescriptor.[[bytesFilled]] is 0. - const stream = controller[controlledReadableByteStream_]; - if (readableStreamHasBYOBReader(stream)) { - while (readableStreamGetNumReadIntoRequests(stream) > 0) { - const pullIntoDescriptor = readableByteStreamControllerShiftPendingPullInto( - controller - )!; - readableByteStreamControllerCommitPullIntoDescriptor( - stream, - pullIntoDescriptor - ); - } - } -} - -export function readableByteStreamControllerRespondInReadableState( - controller: SDReadableByteStreamController, - bytesWritten: number, - pullIntoDescriptor: PullIntoDescriptor -): void { - if ( - pullIntoDescriptor.bytesFilled + bytesWritten > - pullIntoDescriptor.byteLength - ) { - throw new RangeError(); - } - readableByteStreamControllerFillHeadPullIntoDescriptor( - controller, - bytesWritten, - pullIntoDescriptor - ); - if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) { - return; - } - readableByteStreamControllerShiftPendingPullInto(controller); - const remainderSize = - pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize; - if (remainderSize > 0) { - const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled; - const remainder = shared.cloneArrayBuffer( - pullIntoDescriptor.buffer, - end - remainderSize, - remainderSize, - ArrayBuffer - ); - readableByteStreamControllerEnqueueChunkToQueue( - controller, - remainder, - 0, - remainder.byteLength - ); - } - pullIntoDescriptor.buffer = shared.transferArrayBuffer( - pullIntoDescriptor.buffer - ); - pullIntoDescriptor.bytesFilled = - pullIntoDescriptor.bytesFilled - remainderSize; - readableByteStreamControllerCommitPullIntoDescriptor( - controller[controlledReadableByteStream_], - pullIntoDescriptor - ); - readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller); -} - -export function readableByteStreamControllerRespondInternal( - controller: SDReadableByteStreamController, - bytesWritten: number -): void { - const firstDescriptor = controller[pendingPullIntos_][0]; - const stream = controller[controlledReadableByteStream_]; - if (stream[shared.state_] === "closed") { - if (bytesWritten !== 0) { - throw new TypeError(); - } - readableByteStreamControllerRespondInClosedState( - controller, - firstDescriptor - ); - } else { - // Assert: stream.[[state]] is "readable". - readableByteStreamControllerRespondInReadableState( - controller, - bytesWritten, - firstDescriptor - ); - } - readableByteStreamControllerCallPullIfNeeded(controller); -} - -export function readableByteStreamControllerRespondWithNewView( - controller: SDReadableByteStreamController, - view: ArrayBufferView -): void { - // Assert: controller.[[pendingPullIntos]] is not empty. - const firstDescriptor = controller[pendingPullIntos_][0]; - if ( - firstDescriptor.byteOffset + firstDescriptor.bytesFilled !== - view.byteOffset - ) { - throw new RangeError(); - } - if (firstDescriptor.byteLength !== view.byteLength) { - throw new RangeError(); - } - firstDescriptor.buffer = view.buffer; - readableByteStreamControllerRespondInternal(controller, view.byteLength); -} - -export function readableByteStreamControllerShiftPendingPullInto( - controller: SDReadableByteStreamController -): PullIntoDescriptor | undefined { - const descriptor = controller[pendingPullIntos_].shift(); - readableByteStreamControllerInvalidateBYOBRequest(controller); - return descriptor; -} - -export function readableByteStreamControllerShouldCallPull( - controller: SDReadableByteStreamController -): boolean { - // Let stream be controller.[[controlledReadableByteStream]]. - const stream = controller[controlledReadableByteStream_]; - if (stream[shared.state_] !== "readable") { - return false; - } - if (controller[closeRequested_]) { - return false; - } - if (!controller[started_]) { - return false; - } - if ( - readableStreamHasDefaultReader(stream) && - readableStreamGetNumReadRequests(stream) > 0 - ) { - return true; - } - if ( - readableStreamHasBYOBReader(stream) && - readableStreamGetNumReadIntoRequests(stream) > 0 - ) { - return true; - } - const desiredSize = readableByteStreamControllerGetDesiredSize(controller); - // Assert: desiredSize is not null. - return desiredSize! > 0; -} - -export function setUpReadableStreamBYOBRequest( - request: SDReadableStreamBYOBRequest, - controller: SDReadableByteStreamController, - view: ArrayBufferView -): void { - if (!isReadableByteStreamController(controller)) { - throw new TypeError(); - } - if (!ArrayBuffer.isView(view)) { - throw new TypeError(); - } - // Assert: !IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false. - - request[associatedReadableByteStreamController_] = controller; - request[view_] = view; -} diff --git a/cli/js/streams/readable-stream-byob-reader.ts b/cli/js/streams/readable-stream-byob-reader.ts deleted file mode 100644 index 0f9bfb037..000000000 --- a/cli/js/streams/readable-stream-byob-reader.ts +++ /dev/null @@ -1,93 +0,0 @@ -// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -/** - * streams/readable-stream-byob-reader - ReadableStreamBYOBReader class implementation - * Part of Stardazed - * (c) 2018-Present by Arthur Langereis - @zenmumbler - * https://github.com/stardazed/sd-streams - */ - -import * as rs from "./readable-internals.ts"; -import * as shared from "./shared-internals.ts"; - -export class SDReadableStreamBYOBReader - implements rs.SDReadableStreamBYOBReader { - [rs.closedPromise_]: shared.ControlledPromise<void>; - [rs.ownerReadableStream_]: rs.SDReadableStream<ArrayBufferView> | undefined; - [rs.readIntoRequests_]: Array< - rs.ReadRequest<IteratorResult<ArrayBufferView>> - >; - - constructor(stream: rs.SDReadableStream<ArrayBufferView>) { - if (!rs.isReadableStream(stream)) { - throw new TypeError(); - } - if ( - !rs.isReadableByteStreamController(stream[rs.readableStreamController_]) - ) { - throw new TypeError(); - } - if (rs.isReadableStreamLocked(stream)) { - throw new TypeError("The stream is locked."); - } - rs.readableStreamReaderGenericInitialize(this, stream); - this[rs.readIntoRequests_] = []; - } - - get closed(): Promise<void> { - if (!rs.isReadableStreamBYOBReader(this)) { - return Promise.reject(new TypeError()); - } - return this[rs.closedPromise_].promise; - } - - cancel(reason: shared.ErrorResult): Promise<void> { - if (!rs.isReadableStreamBYOBReader(this)) { - return Promise.reject(new TypeError()); - } - const stream = this[rs.ownerReadableStream_]; - if (stream === undefined) { - return Promise.reject( - new TypeError("Reader is not associated with a stream") - ); - } - return rs.readableStreamCancel(stream, reason); - } - - read(view: ArrayBufferView): Promise<IteratorResult<ArrayBufferView>> { - if (!rs.isReadableStreamBYOBReader(this)) { - return Promise.reject(new TypeError()); - } - if (this[rs.ownerReadableStream_] === undefined) { - return Promise.reject( - new TypeError("Reader is not associated with a stream") - ); - } - if (!ArrayBuffer.isView(view)) { - return Promise.reject( - new TypeError("view argument must be a valid ArrayBufferView") - ); - } - // If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true, return a promise rejected with a TypeError exception. - if (view.byteLength === 0) { - return Promise.reject( - new TypeError("supplied buffer view must be > 0 bytes") - ); - } - return rs.readableStreamBYOBReaderRead(this, view, true); - } - - releaseLock(): void { - if (!rs.isReadableStreamBYOBReader(this)) { - throw new TypeError(); - } - if (this[rs.ownerReadableStream_] === undefined) { - throw new TypeError("Reader is not associated with a stream"); - } - if (this[rs.readIntoRequests_].length > 0) { - throw new TypeError(); - } - rs.readableStreamReaderGenericRelease(this); - } -} diff --git a/cli/js/streams/readable-stream-byob-request.ts b/cli/js/streams/readable-stream-byob-request.ts deleted file mode 100644 index 25b937f10..000000000 --- a/cli/js/streams/readable-stream-byob-request.ts +++ /dev/null @@ -1,60 +0,0 @@ -// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -/** - * streams/readable-stream-byob-request - ReadableStreamBYOBRequest class implementation - * Part of Stardazed - * (c) 2018-Present by Arthur Langereis - @zenmumbler - * https://github.com/stardazed/sd-streams - */ - -import * as rs from "./readable-internals.ts"; - -export class ReadableStreamBYOBRequest { - [rs.associatedReadableByteStreamController_]: - | rs.SDReadableByteStreamController - | undefined; - [rs.view_]: ArrayBufferView | undefined; - - constructor() { - throw new TypeError(); - } - - get view(): ArrayBufferView { - if (!rs.isReadableStreamBYOBRequest(this)) { - throw new TypeError(); - } - return this[rs.view_]!; - } - - respond(bytesWritten: number): void { - if (!rs.isReadableStreamBYOBRequest(this)) { - throw new TypeError(); - } - if (this[rs.associatedReadableByteStreamController_] === undefined) { - throw new TypeError(); - } - // If! IsDetachedBuffer(this.[[view]].[[ViewedArrayBuffer]]) is true, throw a TypeError exception. - return rs.readableByteStreamControllerRespond( - this[rs.associatedReadableByteStreamController_]!, - bytesWritten - ); - } - - respondWithNewView(view: ArrayBufferView): void { - if (!rs.isReadableStreamBYOBRequest(this)) { - throw new TypeError(); - } - if (this[rs.associatedReadableByteStreamController_] === undefined) { - throw new TypeError(); - } - if (!ArrayBuffer.isView(view)) { - throw new TypeError("view parameter must be a TypedArray"); - } - // If! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true, throw a TypeError exception. - return rs.readableByteStreamControllerRespondWithNewView( - this[rs.associatedReadableByteStreamController_]!, - view - ); - } -} diff --git a/cli/js/streams/readable-stream-default-controller.ts b/cli/js/streams/readable-stream-default-controller.ts deleted file mode 100644 index e9ddce1bc..000000000 --- a/cli/js/streams/readable-stream-default-controller.ts +++ /dev/null @@ -1,139 +0,0 @@ -// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -/** - * streams/readable-stream-default-controller - ReadableStreamDefaultController class implementation - * Part of Stardazed - * (c) 2018-Present by Arthur Langereis - @zenmumbler - * https://github.com/stardazed/sd-streams - */ - -/* eslint-disable @typescript-eslint/no-explicit-any */ -// TODO reenable this lint here - -import * as rs from "./readable-internals.ts"; -import * as shared from "./shared-internals.ts"; -import * as q from "./queue-mixin.ts"; -import { Queue } from "./queue.ts"; -import { QueuingStrategySizeCallback, UnderlyingSource } from "../dom_types.ts"; - -export class ReadableStreamDefaultController<OutputType> - implements rs.SDReadableStreamDefaultController<OutputType> { - [rs.cancelAlgorithm_]: rs.CancelAlgorithm; - [rs.closeRequested_]: boolean; - [rs.controlledReadableStream_]: rs.SDReadableStream<OutputType>; - [rs.pullAgain_]: boolean; - [rs.pullAlgorithm_]: rs.PullAlgorithm<OutputType>; - [rs.pulling_]: boolean; - [rs.strategyHWM_]: number; - [rs.strategySizeAlgorithm_]: QueuingStrategySizeCallback<OutputType>; - [rs.started_]: boolean; - - [q.queue_]: Queue<q.QueueElement<OutputType>>; - [q.queueTotalSize_]: number; - - constructor() { - throw new TypeError(); - } - - get desiredSize(): number | null { - return rs.readableStreamDefaultControllerGetDesiredSize(this); - } - - close(): void { - if (!rs.isReadableStreamDefaultController(this)) { - throw new TypeError(); - } - if (!rs.readableStreamDefaultControllerCanCloseOrEnqueue(this)) { - throw new TypeError( - "Cannot close, the stream is already closing or not readable" - ); - } - rs.readableStreamDefaultControllerClose(this); - } - - enqueue(chunk?: OutputType): void { - if (!rs.isReadableStreamDefaultController(this)) { - throw new TypeError(); - } - if (!rs.readableStreamDefaultControllerCanCloseOrEnqueue(this)) { - throw new TypeError( - "Cannot enqueue, the stream is closing or not readable" - ); - } - rs.readableStreamDefaultControllerEnqueue(this, chunk!); - } - - error(e?: shared.ErrorResult): void { - if (!rs.isReadableStreamDefaultController(this)) { - throw new TypeError(); - } - rs.readableStreamDefaultControllerError(this, e); - } - - [rs.cancelSteps_](reason: shared.ErrorResult): Promise<void> { - q.resetQueue(this); - const result = this[rs.cancelAlgorithm_](reason); - rs.readableStreamDefaultControllerClearAlgorithms(this); - return result; - } - - [rs.pullSteps_]( - forAuthorCode: boolean - ): Promise<IteratorResult<OutputType, any>> { - const stream = this[rs.controlledReadableStream_]; - if (this[q.queue_].length > 0) { - const chunk = q.dequeueValue(this); - if (this[rs.closeRequested_] && this[q.queue_].length === 0) { - rs.readableStreamDefaultControllerClearAlgorithms(this); - rs.readableStreamClose(stream); - } else { - rs.readableStreamDefaultControllerCallPullIfNeeded(this); - } - return Promise.resolve( - rs.readableStreamCreateReadResult(chunk, false, forAuthorCode) - ); - } - - const pendingPromise = rs.readableStreamAddReadRequest( - stream, - forAuthorCode - ); - rs.readableStreamDefaultControllerCallPullIfNeeded(this); - return pendingPromise; - } -} - -export function setUpReadableStreamDefaultControllerFromUnderlyingSource< - OutputType ->( - stream: rs.SDReadableStream<OutputType>, - underlyingSource: UnderlyingSource<OutputType>, - highWaterMark: number, - sizeAlgorithm: QueuingStrategySizeCallback<OutputType> -): void { - // Assert: underlyingSource is not undefined. - const controller = Object.create(ReadableStreamDefaultController.prototype); - const startAlgorithm = (): any => { - return shared.invokeOrNoop(underlyingSource, "start", [controller]); - }; - const pullAlgorithm = shared.createAlgorithmFromUnderlyingMethod( - underlyingSource, - "pull", - [controller] - ); - const cancelAlgorithm = shared.createAlgorithmFromUnderlyingMethod( - underlyingSource, - "cancel", - [] - ); - rs.setUpReadableStreamDefaultController( - stream, - controller, - startAlgorithm, - pullAlgorithm, - cancelAlgorithm, - highWaterMark, - sizeAlgorithm - ); -} diff --git a/cli/js/streams/readable-stream-default-reader.ts b/cli/js/streams/readable-stream-default-reader.ts deleted file mode 100644 index eb1910a9d..000000000 --- a/cli/js/streams/readable-stream-default-reader.ts +++ /dev/null @@ -1,75 +0,0 @@ -// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -/** - * streams/readable-stream-default-reader - ReadableStreamDefaultReader class implementation - * Part of Stardazed - * (c) 2018-Present by Arthur Langereis - @zenmumbler - * https://github.com/stardazed/sd-streams - */ - -import * as rs from "./readable-internals.ts"; -import * as shared from "./shared-internals.ts"; - -export class ReadableStreamDefaultReader<OutputType> - implements rs.SDReadableStreamReader<OutputType> { - [rs.closedPromise_]: shared.ControlledPromise<void>; - [rs.ownerReadableStream_]: rs.SDReadableStream<OutputType> | undefined; - [rs.readRequests_]: Array<rs.ReadRequest<IteratorResult<OutputType>>>; - - constructor(stream: rs.SDReadableStream<OutputType>) { - if (!rs.isReadableStream(stream)) { - throw new TypeError(); - } - if (rs.isReadableStreamLocked(stream)) { - throw new TypeError("The stream is locked."); - } - rs.readableStreamReaderGenericInitialize(this, stream); - this[rs.readRequests_] = []; - } - - get closed(): Promise<void> { - if (!rs.isReadableStreamDefaultReader(this)) { - return Promise.reject(new TypeError()); - } - return this[rs.closedPromise_].promise; - } - - cancel(reason: shared.ErrorResult): Promise<void> { - if (!rs.isReadableStreamDefaultReader(this)) { - return Promise.reject(new TypeError()); - } - const stream = this[rs.ownerReadableStream_]; - if (stream === undefined) { - return Promise.reject( - new TypeError("Reader is not associated with a stream") - ); - } - return rs.readableStreamCancel(stream, reason); - } - - read(): Promise<IteratorResult<OutputType | undefined>> { - if (!rs.isReadableStreamDefaultReader(this)) { - return Promise.reject(new TypeError()); - } - if (this[rs.ownerReadableStream_] === undefined) { - return Promise.reject( - new TypeError("Reader is not associated with a stream") - ); - } - return rs.readableStreamDefaultReaderRead(this, true); - } - - releaseLock(): void { - if (!rs.isReadableStreamDefaultReader(this)) { - throw new TypeError(); - } - if (this[rs.ownerReadableStream_] === undefined) { - return; - } - if (this[rs.readRequests_].length !== 0) { - throw new TypeError("Cannot release a stream with pending read requests"); - } - rs.readableStreamReaderGenericRelease(this); - } -} diff --git a/cli/js/streams/readable-stream.ts b/cli/js/streams/readable-stream.ts deleted file mode 100644 index 4d9d85889..000000000 --- a/cli/js/streams/readable-stream.ts +++ /dev/null @@ -1,391 +0,0 @@ -// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -/** - * streams/readable-stream - ReadableStream class implementation - * Part of Stardazed - * (c) 2018-Present by Arthur Langereis - @zenmumbler - * https://github.com/stardazed/sd-streams - */ - -/* eslint prefer-const: "off" */ -// TODO remove this, surpressed because of -// 284:7 error 'branch1' is never reassigned. Use 'const' instead prefer-const - -import * as rs from "./readable-internals.ts"; -import * as shared from "./shared-internals.ts"; -import { - QueuingStrategy, - QueuingStrategySizeCallback, - UnderlyingSource, - UnderlyingByteSource -} from "../dom_types.ts"; - -import { - ReadableStreamDefaultController, - setUpReadableStreamDefaultControllerFromUnderlyingSource -} from "./readable-stream-default-controller.ts"; -import { ReadableStreamDefaultReader } from "./readable-stream-default-reader.ts"; - -import { - ReadableByteStreamController, - setUpReadableByteStreamControllerFromUnderlyingSource -} from "./readable-byte-stream-controller.ts"; -import { SDReadableStreamBYOBReader } from "./readable-stream-byob-reader.ts"; - -export class SDReadableStream<OutputType> - implements rs.SDReadableStream<OutputType> { - [shared.state_]: rs.ReadableStreamState; - [shared.storedError_]: shared.ErrorResult; - [rs.reader_]: rs.SDReadableStreamReader<OutputType> | undefined; - [rs.readableStreamController_]: rs.SDReadableStreamControllerBase<OutputType>; - - constructor( - underlyingSource: UnderlyingByteSource, - strategy?: { highWaterMark?: number; size?: undefined } - ); - constructor( - underlyingSource?: UnderlyingSource<OutputType>, - strategy?: QueuingStrategy<OutputType> - ); - constructor( - underlyingSource: UnderlyingSource<OutputType> | UnderlyingByteSource = {}, - strategy: - | QueuingStrategy<OutputType> - | { highWaterMark?: number; size?: undefined } = {} - ) { - rs.initializeReadableStream(this); - - const sizeFunc = strategy.size; - const stratHWM = strategy.highWaterMark; - const sourceType = underlyingSource.type; - - if (sourceType === undefined) { - const sizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction(sizeFunc); - const highWaterMark = shared.validateAndNormalizeHighWaterMark( - stratHWM === undefined ? 1 : stratHWM - ); - setUpReadableStreamDefaultControllerFromUnderlyingSource( - this, - underlyingSource as UnderlyingSource<OutputType>, - highWaterMark, - sizeAlgorithm - ); - } else if (String(sourceType) === "bytes") { - if (sizeFunc !== undefined) { - throw new RangeError( - "bytes streams cannot have a strategy with a `size` field" - ); - } - const highWaterMark = shared.validateAndNormalizeHighWaterMark( - stratHWM === undefined ? 0 : stratHWM - ); - setUpReadableByteStreamControllerFromUnderlyingSource( - (this as unknown) as rs.SDReadableStream<ArrayBufferView>, - underlyingSource as UnderlyingByteSource, - highWaterMark - ); - } else { - throw new RangeError( - "The underlying source's `type` field must be undefined or 'bytes'" - ); - } - } - - get locked(): boolean { - return rs.isReadableStreamLocked(this); - } - - getReader(): rs.SDReadableStreamDefaultReader<OutputType>; - getReader(options: { mode?: "byob" }): rs.SDReadableStreamBYOBReader; - getReader(options?: { - mode?: "byob"; - }): - | rs.SDReadableStreamDefaultReader<OutputType> - | rs.SDReadableStreamBYOBReader { - if (!rs.isReadableStream(this)) { - throw new TypeError(); - } - if (options === undefined) { - options = {}; - } - const { mode } = options; - if (mode === undefined) { - return new ReadableStreamDefaultReader(this); - } else if (String(mode) === "byob") { - return new SDReadableStreamBYOBReader( - (this as unknown) as rs.SDReadableStream<ArrayBufferView> - ); - } - throw RangeError("mode option must be undefined or `byob`"); - } - - cancel(reason: shared.ErrorResult): Promise<void> { - if (!rs.isReadableStream(this)) { - return Promise.reject(new TypeError()); - } - if (rs.isReadableStreamLocked(this)) { - return Promise.reject(new TypeError("Cannot cancel a locked stream")); - } - return rs.readableStreamCancel(this, reason); - } - - tee(): Array<SDReadableStream<OutputType>> { - return readableStreamTee(this, false); - } - - /* TODO reenable these methods when we bring in writableStreams and transport types - pipeThrough<ResultType>( - transform: rs.GenericTransformStream<OutputType, ResultType>, - options: PipeOptions = {} - ): rs.SDReadableStream<ResultType> { - const { readable, writable } = transform; - if (!rs.isReadableStream(this)) { - throw new TypeError(); - } - if (!ws.isWritableStream(writable)) { - throw new TypeError("writable must be a WritableStream"); - } - if (!rs.isReadableStream(readable)) { - throw new TypeError("readable must be a ReadableStream"); - } - if (options.signal !== undefined && !shared.isAbortSignal(options.signal)) { - throw new TypeError("options.signal must be an AbortSignal instance"); - } - if (rs.isReadableStreamLocked(this)) { - throw new TypeError("Cannot pipeThrough on a locked stream"); - } - if (ws.isWritableStreamLocked(writable)) { - throw new TypeError("Cannot pipeThrough to a locked stream"); - } - - const pipeResult = pipeTo(this, writable, options); - pipeResult.catch(() => {}); - - return readable; - } - - pipeTo( - dest: ws.WritableStream<OutputType>, - options: PipeOptions = {} - ): Promise<void> { - if (!rs.isReadableStream(this)) { - return Promise.reject(new TypeError()); - } - if (!ws.isWritableStream(dest)) { - return Promise.reject( - new TypeError("destination must be a WritableStream") - ); - } - if (options.signal !== undefined && !shared.isAbortSignal(options.signal)) { - return Promise.reject( - new TypeError("options.signal must be an AbortSignal instance") - ); - } - if (rs.isReadableStreamLocked(this)) { - return Promise.reject(new TypeError("Cannot pipe from a locked stream")); - } - if (ws.isWritableStreamLocked(dest)) { - return Promise.reject(new TypeError("Cannot pipe to a locked stream")); - } - - return pipeTo(this, dest, options); - } - */ -} - -export function createReadableStream<OutputType>( - startAlgorithm: rs.StartAlgorithm, - pullAlgorithm: rs.PullAlgorithm<OutputType>, - cancelAlgorithm: rs.CancelAlgorithm, - highWaterMark?: number, - sizeAlgorithm?: QueuingStrategySizeCallback<OutputType> -): SDReadableStream<OutputType> { - if (highWaterMark === undefined) { - highWaterMark = 1; - } - if (sizeAlgorithm === undefined) { - sizeAlgorithm = (): number => 1; - } - // Assert: ! IsNonNegativeNumber(highWaterMark) is true. - - const stream = Object.create(SDReadableStream.prototype) as SDReadableStream< - OutputType - >; - rs.initializeReadableStream(stream); - const controller = Object.create( - ReadableStreamDefaultController.prototype - ) as ReadableStreamDefaultController<OutputType>; - rs.setUpReadableStreamDefaultController( - stream, - controller, - startAlgorithm, - pullAlgorithm, - cancelAlgorithm, - highWaterMark, - sizeAlgorithm - ); - return stream; -} - -export function createReadableByteStream<OutputType>( - startAlgorithm: rs.StartAlgorithm, - pullAlgorithm: rs.PullAlgorithm<OutputType>, - cancelAlgorithm: rs.CancelAlgorithm, - highWaterMark?: number, - autoAllocateChunkSize?: number -): SDReadableStream<OutputType> { - if (highWaterMark === undefined) { - highWaterMark = 0; - } - // Assert: ! IsNonNegativeNumber(highWaterMark) is true. - if (autoAllocateChunkSize !== undefined) { - if ( - !shared.isInteger(autoAllocateChunkSize) || - autoAllocateChunkSize <= 0 - ) { - throw new RangeError( - "autoAllocateChunkSize must be a positive, finite integer" - ); - } - } - - const stream = Object.create(SDReadableStream.prototype) as SDReadableStream< - OutputType - >; - rs.initializeReadableStream(stream); - const controller = Object.create( - ReadableByteStreamController.prototype - ) as ReadableByteStreamController; - rs.setUpReadableByteStreamController( - (stream as unknown) as SDReadableStream<ArrayBufferView>, - controller, - startAlgorithm, - (pullAlgorithm as unknown) as rs.PullAlgorithm<ArrayBufferView>, - cancelAlgorithm, - highWaterMark, - autoAllocateChunkSize - ); - return stream; -} - -export function readableStreamTee<OutputType>( - stream: SDReadableStream<OutputType>, - cloneForBranch2: boolean -): [SDReadableStream<OutputType>, SDReadableStream<OutputType>] { - if (!rs.isReadableStream(stream)) { - throw new TypeError(); - } - - const reader = new ReadableStreamDefaultReader(stream); - let closedOrErrored = false; - let canceled1 = false; - let canceled2 = false; - let reason1: shared.ErrorResult; - let reason2: shared.ErrorResult; - let branch1: SDReadableStream<OutputType>; - let branch2: SDReadableStream<OutputType>; - - let cancelResolve: (reason: shared.ErrorResult) => void; - const cancelPromise = new Promise<void>(resolve => (cancelResolve = resolve)); - - const pullAlgorithm = (): Promise<void> => { - return rs - .readableStreamDefaultReaderRead(reader) - .then(({ value, done }) => { - if (done && !closedOrErrored) { - if (!canceled1) { - rs.readableStreamDefaultControllerClose( - branch1![ - rs.readableStreamController_ - ] as ReadableStreamDefaultController<OutputType> - ); - } - if (!canceled2) { - rs.readableStreamDefaultControllerClose( - branch2![ - rs.readableStreamController_ - ] as ReadableStreamDefaultController<OutputType> - ); - } - closedOrErrored = true; - } - if (closedOrErrored) { - return; - } - const value1 = value; - let value2 = value; - if (!canceled1) { - rs.readableStreamDefaultControllerEnqueue( - branch1![ - rs.readableStreamController_ - ] as ReadableStreamDefaultController<OutputType>, - value1! - ); - } - if (!canceled2) { - if (cloneForBranch2) { - value2 = shared.cloneValue(value2); - } - rs.readableStreamDefaultControllerEnqueue( - branch2![ - rs.readableStreamController_ - ] as ReadableStreamDefaultController<OutputType>, - value2! - ); - } - }); - }; - - const cancel1Algorithm = (reason: shared.ErrorResult): Promise<void> => { - canceled1 = true; - reason1 = reason; - if (canceled2) { - const cancelResult = rs.readableStreamCancel(stream, [reason1, reason2]); - cancelResolve(cancelResult); - } - return cancelPromise; - }; - - const cancel2Algorithm = (reason: shared.ErrorResult): Promise<void> => { - canceled2 = true; - reason2 = reason; - if (canceled1) { - const cancelResult = rs.readableStreamCancel(stream, [reason1, reason2]); - cancelResolve(cancelResult); - } - return cancelPromise; - }; - - const startAlgorithm = (): undefined => undefined; - branch1 = createReadableStream( - startAlgorithm, - pullAlgorithm, - cancel1Algorithm - ); - branch2 = createReadableStream( - startAlgorithm, - pullAlgorithm, - cancel2Algorithm - ); - - reader[rs.closedPromise_].promise.catch(error => { - if (!closedOrErrored) { - rs.readableStreamDefaultControllerError( - branch1![ - rs.readableStreamController_ - ] as ReadableStreamDefaultController<OutputType>, - error - ); - rs.readableStreamDefaultControllerError( - branch2![ - rs.readableStreamController_ - ] as ReadableStreamDefaultController<OutputType>, - error - ); - closedOrErrored = true; - } - }); - - return [branch1, branch2]; -} diff --git a/cli/js/streams/shared-internals.ts b/cli/js/streams/shared-internals.ts deleted file mode 100644 index 93155fecc..000000000 --- a/cli/js/streams/shared-internals.ts +++ /dev/null @@ -1,306 +0,0 @@ -// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -/** - * streams/shared-internals - common types and methods for streams - * Part of Stardazed - * (c) 2018-Present by Arthur Langereis - @zenmumbler - * https://github.com/stardazed/sd-streams - */ - -/* eslint-disable @typescript-eslint/no-explicit-any */ -// TODO don't disable this warning - -import { AbortSignal, QueuingStrategySizeCallback } from "../dom_types.ts"; - -// common stream fields - -export const state_ = Symbol("state_"); -export const storedError_ = Symbol("storedError_"); - -// --------- - -/** An error reason / result can be anything */ -export type ErrorResult = any; - -// --------- - -export function isInteger(value: number): boolean { - if (!isFinite(value)) { - // covers NaN, +Infinity and -Infinity - return false; - } - const absValue = Math.abs(value); - return Math.floor(absValue) === absValue; -} - -export function isFiniteNonNegativeNumber(value: unknown): boolean { - if (!(typeof value === "number" && isFinite(value))) { - // covers NaN, +Infinity and -Infinity - return false; - } - return value >= 0; -} - -export function isAbortSignal(signal: any): signal is AbortSignal { - if (typeof signal !== "object" || signal === null) { - return false; - } - try { - // TODO - // calling signal.aborted() probably isn't the right way to perform this test - // https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/shared-internals.ts#L41 - signal.aborted(); - return true; - } catch (err) { - return false; - } -} - -export function invokeOrNoop<O extends object, P extends keyof O>( - o: O, - p: P, - args: any[] -): any { - // Assert: O is not undefined. - // Assert: IsPropertyKey(P) is true. - // Assert: args is a List. - const method: Function | undefined = (o as any)[p]; // tslint:disable-line:ban-types - if (method === undefined) { - return undefined; - } - return Function.prototype.apply.call(method, o, args); -} - -export function cloneArrayBuffer( - srcBuffer: ArrayBufferLike, - srcByteOffset: number, - srcLength: number, - cloneConstructor: ArrayBufferConstructor | SharedArrayBufferConstructor -): InstanceType<typeof cloneConstructor> { - // this function fudges the return type but SharedArrayBuffer is disabled for a while anyway - return srcBuffer.slice( - srcByteOffset, - srcByteOffset + srcLength - ) as InstanceType<typeof cloneConstructor>; -} - -export function transferArrayBuffer(buffer: ArrayBufferLike): ArrayBuffer { - // This would in a JS engine context detach the buffer's backing store and return - // a new ArrayBuffer with the same backing store, invalidating `buffer`, - // i.e. a move operation in C++ parlance. - // Sadly ArrayBuffer.transfer is yet to be implemented by a single browser vendor. - return buffer.slice(0); // copies instead of moves -} - -export function copyDataBlockBytes( - toBlock: ArrayBufferLike, - toIndex: number, - fromBlock: ArrayBufferLike, - fromIndex: number, - count: number -): void { - new Uint8Array(toBlock, toIndex, count).set( - new Uint8Array(fromBlock, fromIndex, count) - ); -} - -// helper memoisation map for object values -// weak so it doesn't keep memoized versions of old objects indefinitely. -const objectCloneMemo = new WeakMap<object, object>(); - -let sharedArrayBufferSupported_: boolean | undefined; -function supportsSharedArrayBuffer(): boolean { - if (sharedArrayBufferSupported_ === undefined) { - try { - new SharedArrayBuffer(16); - sharedArrayBufferSupported_ = true; - } catch (e) { - sharedArrayBufferSupported_ = false; - } - } - return sharedArrayBufferSupported_; -} - -/** - * Implement a method of value cloning that is reasonably close to performing `StructuredSerialize(StructuredDeserialize(value))` - * from the HTML standard. Used by the internal `readableStreamTee` method to clone values for connected implementations. - * @see https://html.spec.whatwg.org/multipage/structured-data.html#structuredserializeinternal - */ -export function cloneValue(value: any): any { - const valueType = typeof value; - switch (valueType) { - case "number": - case "string": - case "boolean": - case "undefined": - // @ts-ignore - case "bigint": - return value; - case "object": { - if (objectCloneMemo.has(value)) { - return objectCloneMemo.get(value); - } - if (value === null) { - return value; - } - if (value instanceof Date) { - return new Date(value.valueOf()); - } - if (value instanceof RegExp) { - return new RegExp(value); - } - if (supportsSharedArrayBuffer() && value instanceof SharedArrayBuffer) { - return value; - } - if (value instanceof ArrayBuffer) { - const cloned = cloneArrayBuffer( - value, - 0, - value.byteLength, - ArrayBuffer - ); - objectCloneMemo.set(value, cloned); - return cloned; - } - if (ArrayBuffer.isView(value)) { - const clonedBuffer = cloneValue(value.buffer) as ArrayBufferLike; - // Use DataViewConstructor type purely for type-checking, can be a DataView or TypedArray. - // They use the same constructor signature, only DataView has a length in bytes and TypedArrays - // use a length in terms of elements, so we adjust for that. - let length: number; - if (value instanceof DataView) { - length = value.byteLength; - } else { - length = (value as Uint8Array).length; - } - return new (value.constructor as DataViewConstructor)( - clonedBuffer, - value.byteOffset, - length - ); - } - if (value instanceof Map) { - const clonedMap = new Map(); - objectCloneMemo.set(value, clonedMap); - value.forEach((v, k) => clonedMap.set(k, cloneValue(v))); - return clonedMap; - } - if (value instanceof Set) { - const clonedSet = new Map(); - objectCloneMemo.set(value, clonedSet); - value.forEach((v, k) => clonedSet.set(k, cloneValue(v))); - return clonedSet; - } - - // generic object - const clonedObj = {} as any; - objectCloneMemo.set(value, clonedObj); - const sourceKeys = Object.getOwnPropertyNames(value); - for (const key of sourceKeys) { - clonedObj[key] = cloneValue(value[key]); - } - return clonedObj; - } - case "symbol": - case "function": - default: - // TODO this should be a DOMException, - // https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/shared-internals.ts#L171 - throw new Error("Uncloneable value in stream"); - } -} - -export function promiseCall<F extends Function>( - f: F, - v: object | undefined, - args: any[] -): Promise<any> { - // tslint:disable-line:ban-types - try { - const result = Function.prototype.apply.call(f, v, args); - return Promise.resolve(result); - } catch (err) { - return Promise.reject(err); - } -} - -export function createAlgorithmFromUnderlyingMethod< - O extends object, - K extends keyof O ->(obj: O, methodName: K, extraArgs: any[]): any { - const method = obj[methodName]; - if (method === undefined) { - return (): any => Promise.resolve(undefined); - } - if (typeof method !== "function") { - throw new TypeError(`Field "${methodName}" is not a function.`); - } - return function(...fnArgs: any[]): any { - return promiseCall(method, obj, fnArgs.concat(extraArgs)); - }; -} - -/* -Deprecated for now, all usages replaced by readableStreamCreateReadResult - -function createIterResultObject<T>(value: T, done: boolean): IteratorResult<T> { - return { value, done }; -} -*/ - -export function validateAndNormalizeHighWaterMark(hwm: unknown): number { - const highWaterMark = Number(hwm); - if (isNaN(highWaterMark) || highWaterMark < 0) { - throw new RangeError( - "highWaterMark must be a valid, non-negative integer." - ); - } - return highWaterMark; -} - -export function makeSizeAlgorithmFromSizeFunction<T>( - sizeFn: undefined | ((chunk: T) => number) -): QueuingStrategySizeCallback<T> { - if (typeof sizeFn !== "function" && typeof sizeFn !== "undefined") { - throw new TypeError("size function must be undefined or a function"); - } - return function(chunk: T): number { - if (typeof sizeFn === "function") { - return sizeFn(chunk); - } - return 1; - }; -} - -// ---- - -export const enum ControlledPromiseState { - Pending, - Resolved, - Rejected -} - -export interface ControlledPromise<V> { - resolve(value?: V): void; - reject(error: ErrorResult): void; - promise: Promise<V>; - state: ControlledPromiseState; -} - -export function createControlledPromise<V>(): ControlledPromise<V> { - const conProm = { - state: ControlledPromiseState.Pending - } as ControlledPromise<V>; - conProm.promise = new Promise<V>(function(resolve, reject) { - conProm.resolve = function(v?: V): void { - conProm.state = ControlledPromiseState.Resolved; - resolve(v); - }; - conProm.reject = function(e?: ErrorResult): void { - conProm.state = ControlledPromiseState.Rejected; - reject(e); - }; - }); - return conProm; -} diff --git a/cli/js/streams/strategies.ts b/cli/js/streams/strategies.ts deleted file mode 100644 index 5f7ffc632..000000000 --- a/cli/js/streams/strategies.ts +++ /dev/null @@ -1,39 +0,0 @@ -// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -/** - * streams/strategies - implementation of the built-in stream strategies - * Part of Stardazed - * (c) 2018-Present by Arthur Langereis - @zenmumbler - * https://github.com/stardazed/sd-streams - */ - -/* eslint-disable @typescript-eslint/no-explicit-any */ -// TODO reenable this lint here - -import { QueuingStrategy } from "../dom_types.ts"; - -export class ByteLengthQueuingStrategy - implements QueuingStrategy<ArrayBufferView> { - highWaterMark: number; - - constructor(options: { highWaterMark: number }) { - this.highWaterMark = options.highWaterMark; - } - - size(chunk: ArrayBufferView): number { - return chunk.byteLength; - } -} - -export class CountQueuingStrategy implements QueuingStrategy<any> { - highWaterMark: number; - - constructor(options: { highWaterMark: number }) { - this.highWaterMark = options.highWaterMark; - } - - size(): number { - return 1; - } -} diff --git a/cli/js/streams/transform-internals.ts b/cli/js/streams/transform-internals.ts deleted file mode 100644 index 4c5e3657d..000000000 --- a/cli/js/streams/transform-internals.ts +++ /dev/null @@ -1,371 +0,0 @@ -// TODO reenable this code when we enable writableStreams and transport types -// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -// /** -// * streams/transform-internals - internal types and functions for transform streams -// * Part of Stardazed -// * (c) 2018-Present by Arthur Langereis - @zenmumbler -// * https://github.com/stardazed/sd-streams -// */ - -// /* eslint-disable @typescript-eslint/no-explicit-any */ -// // TODO reenable this lint here - -// import * as rs from "./readable-internals.ts"; -// import * as ws from "./writable-internals.ts"; -// import * as shared from "./shared-internals.ts"; - -// import { createReadableStream } from "./readable-stream.ts"; -// import { createWritableStream } from "./writable-stream.ts"; - -// import { QueuingStrategy, QueuingStrategySizeCallback } from "../dom_types.ts"; - -// export const state_ = Symbol("transformState_"); -// export const backpressure_ = Symbol("backpressure_"); -// export const backpressureChangePromise_ = Symbol("backpressureChangePromise_"); -// export const readable_ = Symbol("readable_"); -// export const transformStreamController_ = Symbol("transformStreamController_"); -// export const writable_ = Symbol("writable_"); - -// export const controlledTransformStream_ = Symbol("controlledTransformStream_"); -// export const flushAlgorithm_ = Symbol("flushAlgorithm_"); -// export const transformAlgorithm_ = Symbol("transformAlgorithm_"); - -// // ---- - -// export type TransformFunction<InputType, OutputType> = ( -// chunk: InputType, -// controller: TransformStreamDefaultController<InputType, OutputType> -// ) => void | PromiseLike<void>; -// export type TransformAlgorithm<InputType> = (chunk: InputType) => Promise<void>; -// export type FlushFunction<InputType, OutputType> = ( -// controller: TransformStreamDefaultController<InputType, OutputType> -// ) => void | PromiseLike<void>; -// export type FlushAlgorithm = () => Promise<void>; - -// // ---- - -// export interface TransformStreamDefaultController<InputType, OutputType> { -// readonly desiredSize: number | null; -// enqueue(chunk: OutputType): void; -// error(reason: shared.ErrorResult): void; -// terminate(): void; - -// [controlledTransformStream_]: TransformStream<InputType, OutputType>; // The TransformStream instance controlled; also used for the IsTransformStreamDefaultController brand check -// [flushAlgorithm_]: FlushAlgorithm; // A promise - returning algorithm which communicates a requested close to the transformer -// [transformAlgorithm_]: TransformAlgorithm<InputType>; // A promise - returning algorithm, taking one argument(the chunk to transform), which requests the transformer perform its transformation -// } - -// export interface Transformer<InputType, OutputType> { -// start?( -// controller: TransformStreamDefaultController<InputType, OutputType> -// ): void | PromiseLike<void>; -// transform?: TransformFunction<InputType, OutputType>; -// flush?: FlushFunction<InputType, OutputType>; - -// readableType?: undefined; // for future spec changes -// writableType?: undefined; // for future spec changes -// } - -// export declare class TransformStream<InputType, OutputType> { -// constructor( -// transformer: Transformer<InputType, OutputType>, -// writableStrategy: QueuingStrategy<InputType>, -// readableStrategy: QueuingStrategy<OutputType> -// ); - -// readonly readable: rs.SDReadableStream<OutputType>; -// readonly writable: ws.WritableStream<InputType>; - -// [backpressure_]: boolean | undefined; // Whether there was backpressure on [[readable]] the last time it was observed -// [backpressureChangePromise_]: shared.ControlledPromise<void> | undefined; // A promise which is fulfilled and replaced every time the value of[[backpressure]] changes -// [readable_]: rs.SDReadableStream<OutputType>; // The ReadableStream instance controlled by this object -// [transformStreamController_]: TransformStreamDefaultController< -// InputType, -// OutputType -// >; // A TransformStreamDefaultController created with the ability to control[[readable]] and[[writable]]; also used for the IsTransformStream brand check -// [writable_]: ws.WritableStream<InputType>; // The WritableStream instance controlled by this object -// } - -// // ---- TransformStream - -// export function isTransformStream( -// value: unknown -// ): value is TransformStream<any, any> { -// if (typeof value !== "object" || value === null) { -// return false; -// } -// return transformStreamController_ in value; -// } - -// export function initializeTransformStream<InputType, OutputType>( -// stream: TransformStream<InputType, OutputType>, -// startPromise: Promise<void>, -// writableHighWaterMark: number, -// writableSizeAlgorithm: QueuingStrategySizeCallback<InputType>, -// readableHighWaterMark: number, -// readableSizeAlgorithm: QueuingStrategySizeCallback<OutputType> -// ): void { -// const startAlgorithm = function(): Promise<void> { -// return startPromise; -// }; -// const writeAlgorithm = function(chunk: InputType): Promise<void> { -// return transformStreamDefaultSinkWriteAlgorithm(stream, chunk); -// }; -// const abortAlgorithm = function(reason: shared.ErrorResult): Promise<void> { -// return transformStreamDefaultSinkAbortAlgorithm(stream, reason); -// }; -// const closeAlgorithm = function(): Promise<void> { -// return transformStreamDefaultSinkCloseAlgorithm(stream); -// }; -// stream[writable_] = createWritableStream<InputType>( -// startAlgorithm, -// writeAlgorithm, -// closeAlgorithm, -// abortAlgorithm, -// writableHighWaterMark, -// writableSizeAlgorithm -// ); - -// const pullAlgorithm = function(): Promise<void> { -// return transformStreamDefaultSourcePullAlgorithm(stream); -// }; -// const cancelAlgorithm = function( -// reason: shared.ErrorResult -// ): Promise<undefined> { -// transformStreamErrorWritableAndUnblockWrite(stream, reason); -// return Promise.resolve(undefined); -// }; -// stream[readable_] = createReadableStream( -// startAlgorithm, -// pullAlgorithm, -// cancelAlgorithm, -// readableHighWaterMark, -// readableSizeAlgorithm -// ); - -// stream[backpressure_] = undefined; -// stream[backpressureChangePromise_] = undefined; -// transformStreamSetBackpressure(stream, true); -// stream[transformStreamController_] = undefined!; // initialize slot for brand-check -// } - -// export function transformStreamError<InputType, OutputType>( -// stream: TransformStream<InputType, OutputType>, -// error: shared.ErrorResult -// ): void { -// rs.readableStreamDefaultControllerError( -// stream[readable_][ -// rs.readableStreamController_ -// ] as rs.SDReadableStreamDefaultController<OutputType>, -// error -// ); -// transformStreamErrorWritableAndUnblockWrite(stream, error); -// } - -// export function transformStreamErrorWritableAndUnblockWrite< -// InputType, -// OutputType -// >( -// stream: TransformStream<InputType, OutputType>, -// error: shared.ErrorResult -// ): void { -// transformStreamDefaultControllerClearAlgorithms( -// stream[transformStreamController_] -// ); -// ws.writableStreamDefaultControllerErrorIfNeeded( -// stream[writable_][ws.writableStreamController_]!, -// error -// ); -// if (stream[backpressure_]) { -// transformStreamSetBackpressure(stream, false); -// } -// } - -// export function transformStreamSetBackpressure<InputType, OutputType>( -// stream: TransformStream<InputType, OutputType>, -// backpressure: boolean -// ): void { -// // Assert: stream.[[backpressure]] is not backpressure. -// if (stream[backpressure_] !== undefined) { -// stream[backpressureChangePromise_]!.resolve(undefined); -// } -// stream[backpressureChangePromise_] = shared.createControlledPromise<void>(); -// stream[backpressure_] = backpressure; -// } - -// // ---- TransformStreamDefaultController - -// export function isTransformStreamDefaultController( -// value: unknown -// ): value is TransformStreamDefaultController<any, any> { -// if (typeof value !== "object" || value === null) { -// return false; -// } -// return controlledTransformStream_ in value; -// } - -// export function setUpTransformStreamDefaultController<InputType, OutputType>( -// stream: TransformStream<InputType, OutputType>, -// controller: TransformStreamDefaultController<InputType, OutputType>, -// transformAlgorithm: TransformAlgorithm<InputType>, -// flushAlgorithm: FlushAlgorithm -// ): void { -// // Assert: ! IsTransformStream(stream) is true. -// // Assert: stream.[[transformStreamController]] is undefined. -// controller[controlledTransformStream_] = stream; -// stream[transformStreamController_] = controller; -// controller[transformAlgorithm_] = transformAlgorithm; -// controller[flushAlgorithm_] = flushAlgorithm; -// } - -// export function transformStreamDefaultControllerClearAlgorithms< -// InputType, -// OutputType -// >(controller: TransformStreamDefaultController<InputType, OutputType>): void { -// // Use ! assertions to override type check here, this way we don't -// // have to perform type checks/assertions everywhere else. -// controller[transformAlgorithm_] = undefined!; -// controller[flushAlgorithm_] = undefined!; -// } - -// export function transformStreamDefaultControllerEnqueue<InputType, OutputType>( -// controller: TransformStreamDefaultController<InputType, OutputType>, -// chunk: OutputType -// ): void { -// const stream = controller[controlledTransformStream_]; -// const readableController = stream[readable_][ -// rs.readableStreamController_ -// ] as rs.SDReadableStreamDefaultController<OutputType>; -// if ( -// !rs.readableStreamDefaultControllerCanCloseOrEnqueue(readableController) -// ) { -// throw new TypeError(); -// } -// try { -// rs.readableStreamDefaultControllerEnqueue(readableController, chunk); -// } catch (error) { -// transformStreamErrorWritableAndUnblockWrite(stream, error); -// throw stream[readable_][shared.storedError_]; -// } -// const backpressure = rs.readableStreamDefaultControllerHasBackpressure( -// readableController -// ); -// if (backpressure !== stream[backpressure_]) { -// // Assert: backpressure is true. -// transformStreamSetBackpressure(stream, true); -// } -// } - -// export function transformStreamDefaultControllerError<InputType, OutputType>( -// controller: TransformStreamDefaultController<InputType, OutputType>, -// error: shared.ErrorResult -// ): void { -// transformStreamError(controller[controlledTransformStream_], error); -// } - -// export function transformStreamDefaultControllerPerformTransform< -// InputType, -// OutputType -// >( -// controller: TransformStreamDefaultController<InputType, OutputType>, -// chunk: InputType -// ): Promise<void> { -// const transformPromise = controller[transformAlgorithm_](chunk); -// return transformPromise.catch(error => { -// transformStreamError(controller[controlledTransformStream_], error); -// throw error; -// }); -// } - -// export function transformStreamDefaultControllerTerminate< -// InputType, -// OutputType -// >(controller: TransformStreamDefaultController<InputType, OutputType>): void { -// const stream = controller[controlledTransformStream_]; -// const readableController = stream[readable_][ -// rs.readableStreamController_ -// ] as rs.SDReadableStreamDefaultController<OutputType>; -// if (rs.readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) { -// rs.readableStreamDefaultControllerClose(readableController); -// } -// const error = new TypeError("The transform stream has been terminated"); -// transformStreamErrorWritableAndUnblockWrite(stream, error); -// } - -// // ---- Transform Sinks - -// export function transformStreamDefaultSinkWriteAlgorithm<InputType, OutputType>( -// stream: TransformStream<InputType, OutputType>, -// chunk: InputType -// ): Promise<void> { -// // Assert: stream.[[writable]].[[state]] is "writable". -// const controller = stream[transformStreamController_]; -// if (stream[backpressure_]) { -// const backpressureChangePromise = stream[backpressureChangePromise_]!; -// // Assert: backpressureChangePromise is not undefined. -// return backpressureChangePromise.promise.then(_ => { -// const writable = stream[writable_]; -// const state = writable[shared.state_]; -// if (state === "erroring") { -// throw writable[shared.storedError_]; -// } -// // Assert: state is "writable". -// return transformStreamDefaultControllerPerformTransform( -// controller, -// chunk -// ); -// }); -// } -// return transformStreamDefaultControllerPerformTransform(controller, chunk); -// } - -// export function transformStreamDefaultSinkAbortAlgorithm<InputType, OutputType>( -// stream: TransformStream<InputType, OutputType>, -// reason: shared.ErrorResult -// ): Promise<void> { -// transformStreamError(stream, reason); -// return Promise.resolve(undefined); -// } - -// export function transformStreamDefaultSinkCloseAlgorithm<InputType, OutputType>( -// stream: TransformStream<InputType, OutputType> -// ): Promise<void> { -// const readable = stream[readable_]; -// const controller = stream[transformStreamController_]; -// const flushPromise = controller[flushAlgorithm_](); -// transformStreamDefaultControllerClearAlgorithms(controller); - -// return flushPromise.then( -// _ => { -// if (readable[shared.state_] === "errored") { -// throw readable[shared.storedError_]; -// } -// const readableController = readable[ -// rs.readableStreamController_ -// ] as rs.SDReadableStreamDefaultController<OutputType>; -// if ( -// rs.readableStreamDefaultControllerCanCloseOrEnqueue(readableController) -// ) { -// rs.readableStreamDefaultControllerClose(readableController); -// } -// }, -// error => { -// transformStreamError(stream, error); -// throw readable[shared.storedError_]; -// } -// ); -// } - -// // ---- Transform Sources - -// export function transformStreamDefaultSourcePullAlgorithm< -// InputType, -// OutputType -// >(stream: TransformStream<InputType, OutputType>): Promise<void> { -// // Assert: stream.[[backpressure]] is true. -// // Assert: stream.[[backpressureChangePromise]] is not undefined. -// transformStreamSetBackpressure(stream, false); -// return stream[backpressureChangePromise_]!.promise; -// } diff --git a/cli/js/streams/transform-stream-default-controller.ts b/cli/js/streams/transform-stream-default-controller.ts deleted file mode 100644 index 24a8d08fd..000000000 --- a/cli/js/streams/transform-stream-default-controller.ts +++ /dev/null @@ -1,58 +0,0 @@ -// TODO reenable this code when we enable writableStreams and transport types -// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -// /** -// * streams/transform-stream-default-controller - TransformStreamDefaultController class implementation -// * Part of Stardazed -// * (c) 2018-Present by Arthur Langereis - @zenmumbler -// * https://github.com/stardazed/sd-streams -// */ - -// import * as rs from "./readable-internals.ts"; -// import * as ts from "./transform-internals.ts"; -// import { ErrorResult } from "./shared-internals.ts"; - -// export class TransformStreamDefaultController<InputType, OutputType> -// implements ts.TransformStreamDefaultController<InputType, OutputType> { -// [ts.controlledTransformStream_]: ts.TransformStream<InputType, OutputType>; -// [ts.flushAlgorithm_]: ts.FlushAlgorithm; -// [ts.transformAlgorithm_]: ts.TransformAlgorithm<InputType>; - -// constructor() { -// throw new TypeError(); -// } - -// get desiredSize(): number | null { -// if (!ts.isTransformStreamDefaultController(this)) { -// throw new TypeError(); -// } -// const readableController = this[ts.controlledTransformStream_][ -// ts.readable_ -// ][rs.readableStreamController_] as rs.SDReadableStreamDefaultController< -// OutputType -// >; -// return rs.readableStreamDefaultControllerGetDesiredSize(readableController); -// } - -// enqueue(chunk: OutputType): void { -// if (!ts.isTransformStreamDefaultController(this)) { -// throw new TypeError(); -// } -// ts.transformStreamDefaultControllerEnqueue(this, chunk); -// } - -// error(reason: ErrorResult): void { -// if (!ts.isTransformStreamDefaultController(this)) { -// throw new TypeError(); -// } -// ts.transformStreamDefaultControllerError(this, reason); -// } - -// terminate(): void { -// if (!ts.isTransformStreamDefaultController(this)) { -// throw new TypeError(); -// } -// ts.transformStreamDefaultControllerTerminate(this); -// } -// } diff --git a/cli/js/streams/transform-stream.ts b/cli/js/streams/transform-stream.ts deleted file mode 100644 index 090f78135..000000000 --- a/cli/js/streams/transform-stream.ts +++ /dev/null @@ -1,147 +0,0 @@ -// TODO reenable this code when we enable writableStreams and transport types -// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -// /** -// * streams/transform-stream - TransformStream class implementation -// * Part of Stardazed -// * (c) 2018-Present by Arthur Langereis - @zenmumbler -// * https://github.com/stardazed/sd-streams -// */ - -// /* eslint-disable @typescript-eslint/no-explicit-any */ -// // TODO reenable this lint here - -// import * as rs from "./readable-internals.ts"; -// import * as ws from "./writable-internals.ts"; -// import * as ts from "./transform-internals.ts"; -// import * as shared from "./shared-internals.ts"; -// import { TransformStreamDefaultController } from "./transform-stream-default-controller.ts"; -// import { QueuingStrategy } from "../dom_types.ts"; - -// export class TransformStream<InputType, OutputType> { -// [ts.backpressure_]: boolean | undefined; // Whether there was backpressure on [[readable]] the last time it was observed -// [ts.backpressureChangePromise_]: shared.ControlledPromise<void>; // A promise which is fulfilled and replaced every time the value of[[backpressure]] changes -// [ts.readable_]: rs.SDReadableStream<OutputType>; // The ReadableStream instance controlled by this object -// [ts.transformStreamController_]: TransformStreamDefaultController< -// InputType, -// OutputType -// >; // A TransformStreamDefaultController created with the ability to control[[readable]] and[[writable]]; also used for the IsTransformStream brand check -// [ts.writable_]: ws.WritableStream<InputType>; // The WritableStream instance controlled by this object - -// constructor( -// transformer: ts.Transformer<InputType, OutputType> = {}, -// writableStrategy: QueuingStrategy<InputType> = {}, -// readableStrategy: QueuingStrategy<OutputType> = {} -// ) { -// const writableSizeFunction = writableStrategy.size; -// const writableHighWaterMark = writableStrategy.highWaterMark; -// const readableSizeFunction = readableStrategy.size; -// const readableHighWaterMark = readableStrategy.highWaterMark; - -// const writableType = transformer.writableType; -// if (writableType !== undefined) { -// throw new RangeError( -// "The transformer's `writableType` field must be undefined" -// ); -// } -// const writableSizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction( -// writableSizeFunction -// ); -// const writableHWM = shared.validateAndNormalizeHighWaterMark( -// writableHighWaterMark === undefined ? 1 : writableHighWaterMark -// ); - -// const readableType = transformer.readableType; -// if (readableType !== undefined) { -// throw new RangeError( -// "The transformer's `readableType` field must be undefined" -// ); -// } -// const readableSizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction( -// readableSizeFunction -// ); -// const readableHWM = shared.validateAndNormalizeHighWaterMark( -// readableHighWaterMark === undefined ? 0 : readableHighWaterMark -// ); - -// const startPromise = shared.createControlledPromise<void>(); -// ts.initializeTransformStream( -// this, -// startPromise.promise, -// writableHWM, -// writableSizeAlgorithm, -// readableHWM, -// readableSizeAlgorithm -// ); -// setUpTransformStreamDefaultControllerFromTransformer(this, transformer); - -// const startResult = shared.invokeOrNoop(transformer, "start", [ -// this[ts.transformStreamController_] -// ]); -// startPromise.resolve(startResult); -// } - -// get readable(): rs.SDReadableStream<OutputType> { -// if (!ts.isTransformStream(this)) { -// throw new TypeError(); -// } -// return this[ts.readable_]; -// } - -// get writable(): ws.WritableStream<InputType> { -// if (!ts.isTransformStream(this)) { -// throw new TypeError(); -// } -// return this[ts.writable_]; -// } -// } - -// function setUpTransformStreamDefaultControllerFromTransformer< -// InputType, -// OutputType -// >( -// stream: TransformStream<InputType, OutputType>, -// transformer: ts.Transformer<InputType, OutputType> -// ): void { -// const controller = Object.create( -// TransformStreamDefaultController.prototype -// ) as TransformStreamDefaultController<InputType, OutputType>; -// let transformAlgorithm: ts.TransformAlgorithm<InputType>; - -// const transformMethod = transformer.transform; -// if (transformMethod !== undefined) { -// if (typeof transformMethod !== "function") { -// throw new TypeError( -// "`transform` field of the transformer must be a function" -// ); -// } -// transformAlgorithm = (chunk: InputType): Promise<any> => -// shared.promiseCall(transformMethod, transformer, [chunk, controller]); -// } else { -// // use identity transform -// transformAlgorithm = function(chunk: InputType): Promise<void> { -// try { -// // OutputType and InputType are the same here -// ts.transformStreamDefaultControllerEnqueue( -// controller, -// (chunk as unknown) as OutputType -// ); -// } catch (error) { -// return Promise.reject(error); -// } -// return Promise.resolve(undefined); -// }; -// } -// const flushAlgorithm = shared.createAlgorithmFromUnderlyingMethod( -// transformer, -// "flush", -// [controller] -// ); -// ts.setUpTransformStreamDefaultController( -// stream, -// controller, -// transformAlgorithm, -// flushAlgorithm -// ); -// } diff --git a/cli/js/streams/writable-internals.ts b/cli/js/streams/writable-internals.ts deleted file mode 100644 index 78bb19a28..000000000 --- a/cli/js/streams/writable-internals.ts +++ /dev/null @@ -1,800 +0,0 @@ -// TODO reenable this code when we enable writableStreams and transport types -// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -// /** -// * streams/writable-internals - internal types and functions for writable streams -// * Part of Stardazed -// * (c) 2018-Present by Arthur Langereis - @zenmumbler -// * https://github.com/stardazed/sd-streams -// */ - -// /* eslint-disable @typescript-eslint/no-explicit-any */ -// // TODO reenable this lint here - -// import * as shared from "./shared-internals.ts"; -// import * as q from "./queue-mixin.ts"; - -// import { QueuingStrategy, QueuingStrategySizeCallback } from "../dom_types.ts"; - -// export const backpressure_ = Symbol("backpressure_"); -// export const closeRequest_ = Symbol("closeRequest_"); -// export const inFlightWriteRequest_ = Symbol("inFlightWriteRequest_"); -// export const inFlightCloseRequest_ = Symbol("inFlightCloseRequest_"); -// export const pendingAbortRequest_ = Symbol("pendingAbortRequest_"); -// export const writableStreamController_ = Symbol("writableStreamController_"); -// export const writer_ = Symbol("writer_"); -// export const writeRequests_ = Symbol("writeRequests_"); - -// export const abortAlgorithm_ = Symbol("abortAlgorithm_"); -// export const closeAlgorithm_ = Symbol("closeAlgorithm_"); -// export const controlledWritableStream_ = Symbol("controlledWritableStream_"); -// export const started_ = Symbol("started_"); -// export const strategyHWM_ = Symbol("strategyHWM_"); -// export const strategySizeAlgorithm_ = Symbol("strategySizeAlgorithm_"); -// export const writeAlgorithm_ = Symbol("writeAlgorithm_"); - -// export const ownerWritableStream_ = Symbol("ownerWritableStream_"); -// export const closedPromise_ = Symbol("closedPromise_"); -// export const readyPromise_ = Symbol("readyPromise_"); - -// export const errorSteps_ = Symbol("errorSteps_"); -// export const abortSteps_ = Symbol("abortSteps_"); - -// export type StartFunction = ( -// controller: WritableStreamController -// ) => void | PromiseLike<void>; -// export type StartAlgorithm = () => Promise<void> | void; -// export type WriteFunction<InputType> = ( -// chunk: InputType, -// controller: WritableStreamController -// ) => void | PromiseLike<void>; -// export type WriteAlgorithm<InputType> = (chunk: InputType) => Promise<void>; -// export type CloseAlgorithm = () => Promise<void>; -// export type AbortAlgorithm = (reason?: shared.ErrorResult) => Promise<void>; - -// // ---- - -// export interface WritableStreamController { -// error(e?: shared.ErrorResult): void; - -// [errorSteps_](): void; -// [abortSteps_](reason: shared.ErrorResult): Promise<void>; -// } - -// export interface WriteRecord<InputType> { -// chunk: InputType; -// } - -// export interface WritableStreamDefaultController<InputType> -// extends WritableStreamController, -// q.QueueContainer<WriteRecord<InputType> | "close"> { -// [abortAlgorithm_]: AbortAlgorithm; // A promise - returning algorithm, taking one argument(the abort reason), which communicates a requested abort to the underlying sink -// [closeAlgorithm_]: CloseAlgorithm; // A promise - returning algorithm which communicates a requested close to the underlying sink -// [controlledWritableStream_]: WritableStream<InputType>; // The WritableStream instance controlled -// [started_]: boolean; // A boolean flag indicating whether the underlying sink has finished starting -// [strategyHWM_]: number; // A number supplied by the creator of the stream as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying sink -// [strategySizeAlgorithm_]: QueuingStrategySizeCallback<InputType>; // An algorithm to calculate the size of enqueued chunks, as part of the stream’s queuing strategy -// [writeAlgorithm_]: WriteAlgorithm<InputType>; // A promise-returning algorithm, taking one argument (the chunk to write), which writes data to the underlying sink -// } - -// // ---- - -// export interface WritableStreamWriter<InputType> { -// readonly closed: Promise<void>; -// readonly desiredSize: number | null; -// readonly ready: Promise<void>; - -// abort(reason: shared.ErrorResult): Promise<void>; -// close(): Promise<void>; -// releaseLock(): void; -// write(chunk: InputType): Promise<void>; -// } - -// export interface WritableStreamDefaultWriter<InputType> -// extends WritableStreamWriter<InputType> { -// [ownerWritableStream_]: WritableStream<InputType> | undefined; -// [closedPromise_]: shared.ControlledPromise<void>; -// [readyPromise_]: shared.ControlledPromise<void>; -// } - -// // ---- - -// export type WritableStreamState = -// | "writable" -// | "closed" -// | "erroring" -// | "errored"; - -// export interface WritableStreamSink<InputType> { -// start?: StartFunction; -// write?: WriteFunction<InputType>; -// close?(): void | PromiseLike<void>; -// abort?(reason?: shared.ErrorResult): void; - -// type?: undefined; // unused, for future revisions -// } - -// export interface AbortRequest { -// reason: shared.ErrorResult; -// wasAlreadyErroring: boolean; -// promise: Promise<void>; -// resolve(): void; -// reject(error: shared.ErrorResult): void; -// } - -// export declare class WritableStream<InputType> { -// constructor( -// underlyingSink?: WritableStreamSink<InputType>, -// strategy?: QueuingStrategy<InputType> -// ); - -// readonly locked: boolean; -// abort(reason?: shared.ErrorResult): Promise<void>; -// getWriter(): WritableStreamWriter<InputType>; - -// [shared.state_]: WritableStreamState; -// [backpressure_]: boolean; -// [closeRequest_]: shared.ControlledPromise<void> | undefined; -// [inFlightWriteRequest_]: shared.ControlledPromise<void> | undefined; -// [inFlightCloseRequest_]: shared.ControlledPromise<void> | undefined; -// [pendingAbortRequest_]: AbortRequest | undefined; -// [shared.storedError_]: shared.ErrorResult; -// [writableStreamController_]: -// | WritableStreamDefaultController<InputType> -// | undefined; -// [writer_]: WritableStreamDefaultWriter<InputType> | undefined; -// [writeRequests_]: Array<shared.ControlledPromise<void>>; -// } - -// // ---- Stream - -// export function initializeWritableStream<InputType>( -// stream: WritableStream<InputType> -// ): void { -// stream[shared.state_] = "writable"; -// stream[shared.storedError_] = undefined; -// stream[writer_] = undefined; -// stream[writableStreamController_] = undefined; -// stream[inFlightWriteRequest_] = undefined; -// stream[closeRequest_] = undefined; -// stream[inFlightCloseRequest_] = undefined; -// stream[pendingAbortRequest_] = undefined; -// stream[writeRequests_] = []; -// stream[backpressure_] = false; -// } - -// export function isWritableStream(value: unknown): value is WritableStream<any> { -// if (typeof value !== "object" || value === null) { -// return false; -// } -// return writableStreamController_ in value; -// } - -// export function isWritableStreamLocked<InputType>( -// stream: WritableStream<InputType> -// ): boolean { -// return stream[writer_] !== undefined; -// } - -// export function writableStreamAbort<InputType>( -// stream: WritableStream<InputType>, -// reason: shared.ErrorResult -// ): Promise<void> { -// const state = stream[shared.state_]; -// if (state === "closed" || state === "errored") { -// return Promise.resolve(undefined); -// } -// let pending = stream[pendingAbortRequest_]; -// if (pending !== undefined) { -// return pending.promise; -// } -// // Assert: state is "writable" or "erroring". -// let wasAlreadyErroring = false; -// if (state === "erroring") { -// wasAlreadyErroring = true; -// reason = undefined; -// } - -// pending = { -// reason, -// wasAlreadyErroring -// } as AbortRequest; -// const promise = new Promise<void>((resolve, reject) => { -// pending!.resolve = resolve; -// pending!.reject = reject; -// }); -// pending.promise = promise; -// stream[pendingAbortRequest_] = pending; -// if (!wasAlreadyErroring) { -// writableStreamStartErroring(stream, reason); -// } -// return promise; -// } - -// export function writableStreamAddWriteRequest<InputType>( -// stream: WritableStream<InputType> -// ): Promise<void> { -// // Assert: !IsWritableStreamLocked(stream) is true. -// // Assert: stream.[[state]] is "writable". -// const writePromise = shared.createControlledPromise<void>(); -// stream[writeRequests_].push(writePromise); -// return writePromise.promise; -// } - -// export function writableStreamDealWithRejection<InputType>( -// stream: WritableStream<InputType>, -// error: shared.ErrorResult -// ): void { -// const state = stream[shared.state_]; -// if (state === "writable") { -// writableStreamStartErroring(stream, error); -// return; -// } -// // Assert: state is "erroring" -// writableStreamFinishErroring(stream); -// } - -// export function writableStreamStartErroring<InputType>( -// stream: WritableStream<InputType>, -// reason: shared.ErrorResult -// ): void { -// // Assert: stream.[[storedError]] is undefined. -// // Assert: stream.[[state]] is "writable". -// const controller = stream[writableStreamController_]!; -// // Assert: controller is not undefined. -// stream[shared.state_] = "erroring"; -// stream[shared.storedError_] = reason; -// const writer = stream[writer_]; -// if (writer !== undefined) { -// writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); -// } -// if ( -// !writableStreamHasOperationMarkedInFlight(stream) && -// controller[started_] -// ) { -// writableStreamFinishErroring(stream); -// } -// } - -// export function writableStreamFinishErroring<InputType>( -// stream: WritableStream<InputType> -// ): void { -// // Assert: stream.[[state]] is "erroring". -// // Assert: writableStreamHasOperationMarkedInFlight(stream) is false. -// stream[shared.state_] = "errored"; -// const controller = stream[writableStreamController_]!; -// controller[errorSteps_](); -// const storedError = stream[shared.storedError_]; -// for (const writeRequest of stream[writeRequests_]) { -// writeRequest.reject(storedError); -// } -// stream[writeRequests_] = []; - -// const abortRequest = stream[pendingAbortRequest_]; -// if (abortRequest === undefined) { -// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); -// return; -// } -// stream[pendingAbortRequest_] = undefined; -// if (abortRequest.wasAlreadyErroring) { -// abortRequest.reject(storedError); -// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); -// return; -// } -// const promise = controller[abortSteps_](abortRequest.reason); -// promise.then( -// _ => { -// abortRequest.resolve(); -// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); -// }, -// error => { -// abortRequest.reject(error); -// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); -// } -// ); -// } - -// export function writableStreamFinishInFlightWrite<InputType>( -// stream: WritableStream<InputType> -// ): void { -// // Assert: stream.[[inFlightWriteRequest]] is not undefined. -// stream[inFlightWriteRequest_]!.resolve(undefined); -// stream[inFlightWriteRequest_] = undefined; -// } - -// export function writableStreamFinishInFlightWriteWithError<InputType>( -// stream: WritableStream<InputType>, -// error: shared.ErrorResult -// ): void { -// // Assert: stream.[[inFlightWriteRequest]] is not undefined. -// stream[inFlightWriteRequest_]!.reject(error); -// stream[inFlightWriteRequest_] = undefined; -// // Assert: stream.[[state]] is "writable" or "erroring". -// writableStreamDealWithRejection(stream, error); -// } - -// export function writableStreamFinishInFlightClose<InputType>( -// stream: WritableStream<InputType> -// ): void { -// // Assert: stream.[[inFlightCloseRequest]] is not undefined. -// stream[inFlightCloseRequest_]!.resolve(undefined); -// stream[inFlightCloseRequest_] = undefined; -// const state = stream[shared.state_]; -// // Assert: stream.[[state]] is "writable" or "erroring". -// if (state === "erroring") { -// stream[shared.storedError_] = undefined; -// if (stream[pendingAbortRequest_] !== undefined) { -// stream[pendingAbortRequest_]!.resolve(); -// stream[pendingAbortRequest_] = undefined; -// } -// } -// stream[shared.state_] = "closed"; -// const writer = stream[writer_]; -// if (writer !== undefined) { -// writer[closedPromise_].resolve(undefined); -// } -// // Assert: stream.[[pendingAbortRequest]] is undefined. -// // Assert: stream.[[storedError]] is undefined. -// } - -// export function writableStreamFinishInFlightCloseWithError<InputType>( -// stream: WritableStream<InputType>, -// error: shared.ErrorResult -// ): void { -// // Assert: stream.[[inFlightCloseRequest]] is not undefined. -// stream[inFlightCloseRequest_]!.reject(error); -// stream[inFlightCloseRequest_] = undefined; -// // Assert: stream.[[state]] is "writable" or "erroring". -// if (stream[pendingAbortRequest_] !== undefined) { -// stream[pendingAbortRequest_]!.reject(error); -// stream[pendingAbortRequest_] = undefined; -// } -// writableStreamDealWithRejection(stream, error); -// } - -// export function writableStreamCloseQueuedOrInFlight<InputType>( -// stream: WritableStream<InputType> -// ): boolean { -// return ( -// stream[closeRequest_] !== undefined || -// stream[inFlightCloseRequest_] !== undefined -// ); -// } - -// export function writableStreamHasOperationMarkedInFlight<InputType>( -// stream: WritableStream<InputType> -// ): boolean { -// return ( -// stream[inFlightWriteRequest_] !== undefined || -// stream[inFlightCloseRequest_] !== undefined -// ); -// } - -// export function writableStreamMarkCloseRequestInFlight<InputType>( -// stream: WritableStream<InputType> -// ): void { -// // Assert: stream.[[inFlightCloseRequest]] is undefined. -// // Assert: stream.[[closeRequest]] is not undefined. -// stream[inFlightCloseRequest_] = stream[closeRequest_]; -// stream[closeRequest_] = undefined; -// } - -// export function writableStreamMarkFirstWriteRequestInFlight<InputType>( -// stream: WritableStream<InputType> -// ): void { -// // Assert: stream.[[inFlightWriteRequest]] is undefined. -// // Assert: stream.[[writeRequests]] is not empty. -// const writeRequest = stream[writeRequests_].shift()!; -// stream[inFlightWriteRequest_] = writeRequest; -// } - -// export function writableStreamRejectCloseAndClosedPromiseIfNeeded<InputType>( -// stream: WritableStream<InputType> -// ): void { -// // Assert: stream.[[state]] is "errored". -// const closeRequest = stream[closeRequest_]; -// if (closeRequest !== undefined) { -// // Assert: stream.[[inFlightCloseRequest]] is undefined. -// closeRequest.reject(stream[shared.storedError_]); -// stream[closeRequest_] = undefined; -// } -// const writer = stream[writer_]; -// if (writer !== undefined) { -// writer[closedPromise_].reject(stream[shared.storedError_]); -// writer[closedPromise_].promise.catch(() => {}); -// } -// } - -// export function writableStreamUpdateBackpressure<InputType>( -// stream: WritableStream<InputType>, -// backpressure: boolean -// ): void { -// // Assert: stream.[[state]] is "writable". -// // Assert: !WritableStreamCloseQueuedOrInFlight(stream) is false. -// const writer = stream[writer_]; -// if (writer !== undefined && backpressure !== stream[backpressure_]) { -// if (backpressure) { -// writer[readyPromise_] = shared.createControlledPromise<void>(); -// } else { -// writer[readyPromise_].resolve(undefined); -// } -// } -// stream[backpressure_] = backpressure; -// } - -// // ---- Writers - -// export function isWritableStreamDefaultWriter( -// value: unknown -// ): value is WritableStreamDefaultWriter<any> { -// if (typeof value !== "object" || value === null) { -// return false; -// } -// return ownerWritableStream_ in value; -// } - -// export function writableStreamDefaultWriterAbort<InputType>( -// writer: WritableStreamDefaultWriter<InputType>, -// reason: shared.ErrorResult -// ): Promise<void> { -// const stream = writer[ownerWritableStream_]!; -// // Assert: stream is not undefined. -// return writableStreamAbort(stream, reason); -// } - -// export function writableStreamDefaultWriterClose<InputType>( -// writer: WritableStreamDefaultWriter<InputType> -// ): Promise<void> { -// const stream = writer[ownerWritableStream_]!; -// // Assert: stream is not undefined. -// const state = stream[shared.state_]; -// if (state === "closed" || state === "errored") { -// return Promise.reject( -// new TypeError("Writer stream is already closed or errored") -// ); -// } -// // Assert: state is "writable" or "erroring". -// // Assert: writableStreamCloseQueuedOrInFlight(stream) is false. -// const closePromise = shared.createControlledPromise<void>(); -// stream[closeRequest_] = closePromise; -// if (stream[backpressure_] && state === "writable") { -// writer[readyPromise_].resolve(undefined); -// } -// writableStreamDefaultControllerClose(stream[writableStreamController_]!); -// return closePromise.promise; -// } - -// export function writableStreamDefaultWriterCloseWithErrorPropagation<InputType>( -// writer: WritableStreamDefaultWriter<InputType> -// ): Promise<void> { -// const stream = writer[ownerWritableStream_]!; -// // Assert: stream is not undefined. -// const state = stream[shared.state_]; -// if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") { -// return Promise.resolve(undefined); -// } -// if (state === "errored") { -// return Promise.reject(stream[shared.storedError_]); -// } -// // Assert: state is "writable" or "erroring". -// return writableStreamDefaultWriterClose(writer); -// } - -// export function writableStreamDefaultWriterEnsureClosedPromiseRejected< -// InputType -// >( -// writer: WritableStreamDefaultWriter<InputType>, -// error: shared.ErrorResult -// ): void { -// const closedPromise = writer[closedPromise_]; -// if (closedPromise.state === shared.ControlledPromiseState.Pending) { -// closedPromise.reject(error); -// } else { -// writer[closedPromise_] = shared.createControlledPromise<void>(); -// writer[closedPromise_].reject(error); -// } -// writer[closedPromise_].promise.catch(() => {}); -// } - -// export function writableStreamDefaultWriterEnsureReadyPromiseRejected< -// InputType -// >( -// writer: WritableStreamDefaultWriter<InputType>, -// error: shared.ErrorResult -// ): void { -// const readyPromise = writer[readyPromise_]; -// if (readyPromise.state === shared.ControlledPromiseState.Pending) { -// readyPromise.reject(error); -// } else { -// writer[readyPromise_] = shared.createControlledPromise<void>(); -// writer[readyPromise_].reject(error); -// } -// writer[readyPromise_].promise.catch(() => {}); -// } - -// export function writableStreamDefaultWriterGetDesiredSize<InputType>( -// writer: WritableStreamDefaultWriter<InputType> -// ): number | null { -// const stream = writer[ownerWritableStream_]!; -// const state = stream[shared.state_]; -// if (state === "errored" || state === "erroring") { -// return null; -// } -// if (state === "closed") { -// return 0; -// } -// return writableStreamDefaultControllerGetDesiredSize( -// stream[writableStreamController_]! -// ); -// } - -// export function writableStreamDefaultWriterRelease<InputType>( -// writer: WritableStreamDefaultWriter<InputType> -// ): void { -// const stream = writer[ownerWritableStream_]!; -// // Assert: stream is not undefined. -// // Assert: stream.[[writer]] is writer. -// const releasedError = new TypeError(); -// writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError); -// writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError); -// stream[writer_] = undefined; -// writer[ownerWritableStream_] = undefined; -// } - -// export function writableStreamDefaultWriterWrite<InputType>( -// writer: WritableStreamDefaultWriter<InputType>, -// chunk: InputType -// ): Promise<void> { -// const stream = writer[ownerWritableStream_]!; -// // Assert: stream is not undefined. -// const controller = stream[writableStreamController_]!; -// const chunkSize = writableStreamDefaultControllerGetChunkSize( -// controller, -// chunk -// ); -// if (writer[ownerWritableStream_] !== stream) { -// return Promise.reject(new TypeError()); -// } -// const state = stream[shared.state_]; -// if (state === "errored") { -// return Promise.reject(stream[shared.storedError_]); -// } -// if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") { -// return Promise.reject( -// new TypeError("Cannot write to a closing or closed stream") -// ); -// } -// if (state === "erroring") { -// return Promise.reject(stream[shared.storedError_]); -// } -// // Assert: state is "writable". -// const promise = writableStreamAddWriteRequest(stream); -// writableStreamDefaultControllerWrite(controller, chunk, chunkSize); -// return promise; -// } - -// // ---- Controller - -// export function setUpWritableStreamDefaultController<InputType>( -// stream: WritableStream<InputType>, -// controller: WritableStreamDefaultController<InputType>, -// startAlgorithm: StartAlgorithm, -// writeAlgorithm: WriteAlgorithm<InputType>, -// closeAlgorithm: CloseAlgorithm, -// abortAlgorithm: AbortAlgorithm, -// highWaterMark: number, -// sizeAlgorithm: QueuingStrategySizeCallback<InputType> -// ): void { -// if (!isWritableStream(stream)) { -// throw new TypeError(); -// } -// if (stream[writableStreamController_] !== undefined) { -// throw new TypeError(); -// } - -// controller[controlledWritableStream_] = stream; -// stream[writableStreamController_] = controller; -// q.resetQueue(controller); -// controller[started_] = false; -// controller[strategySizeAlgorithm_] = sizeAlgorithm; -// controller[strategyHWM_] = highWaterMark; -// controller[writeAlgorithm_] = writeAlgorithm; -// controller[closeAlgorithm_] = closeAlgorithm; -// controller[abortAlgorithm_] = abortAlgorithm; -// const backpressure = writableStreamDefaultControllerGetBackpressure( -// controller -// ); -// writableStreamUpdateBackpressure(stream, backpressure); - -// const startResult = startAlgorithm(); -// Promise.resolve(startResult).then( -// _ => { -// // Assert: stream.[[state]] is "writable" or "erroring". -// controller[started_] = true; -// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); -// }, -// error => { -// // Assert: stream.[[state]] is "writable" or "erroring". -// controller[started_] = true; -// writableStreamDealWithRejection(stream, error); -// } -// ); -// } - -// export function isWritableStreamDefaultController( -// value: unknown -// ): value is WritableStreamDefaultController<any> { -// if (typeof value !== "object" || value === null) { -// return false; -// } -// return controlledWritableStream_ in value; -// } - -// export function writableStreamDefaultControllerClearAlgorithms<InputType>( -// controller: WritableStreamDefaultController<InputType> -// ): void { -// // Use ! assertions to override type check here, this way we don't -// // have to perform type checks/assertions everywhere else. -// controller[writeAlgorithm_] = undefined!; -// controller[closeAlgorithm_] = undefined!; -// controller[abortAlgorithm_] = undefined!; -// controller[strategySizeAlgorithm_] = undefined!; -// } - -// export function writableStreamDefaultControllerClose<InputType>( -// controller: WritableStreamDefaultController<InputType> -// ): void { -// q.enqueueValueWithSize(controller, "close", 0); -// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); -// } - -// export function writableStreamDefaultControllerGetChunkSize<InputType>( -// controller: WritableStreamDefaultController<InputType>, -// chunk: InputType -// ): number { -// let chunkSize: number; -// try { -// chunkSize = controller[strategySizeAlgorithm_](chunk); -// } catch (error) { -// writableStreamDefaultControllerErrorIfNeeded(controller, error); -// chunkSize = 1; -// } -// return chunkSize; -// } - -// export function writableStreamDefaultControllerGetDesiredSize<InputType>( -// controller: WritableStreamDefaultController<InputType> -// ): number { -// return controller[strategyHWM_] - controller[q.queueTotalSize_]; -// } - -// export function writableStreamDefaultControllerWrite<InputType>( -// controller: WritableStreamDefaultController<InputType>, -// chunk: InputType, -// chunkSize: number -// ): void { -// try { -// q.enqueueValueWithSize(controller, { chunk }, chunkSize); -// } catch (error) { -// writableStreamDefaultControllerErrorIfNeeded(controller, error); -// return; -// } -// const stream = controller[controlledWritableStream_]; -// if ( -// !writableStreamCloseQueuedOrInFlight(stream) && -// stream[shared.state_] === "writable" -// ) { -// const backpressure = writableStreamDefaultControllerGetBackpressure( -// controller -// ); -// writableStreamUpdateBackpressure(stream, backpressure); -// } -// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); -// } - -// export function writableStreamDefaultControllerAdvanceQueueIfNeeded<InputType>( -// controller: WritableStreamDefaultController<InputType> -// ): void { -// if (!controller[started_]) { -// return; -// } -// const stream = controller[controlledWritableStream_]; -// if (stream[inFlightWriteRequest_] !== undefined) { -// return; -// } -// const state = stream[shared.state_]; -// if (state === "closed" || state === "errored") { -// return; -// } -// if (state === "erroring") { -// writableStreamFinishErroring(stream); -// return; -// } -// if (controller[q.queue_].length === 0) { -// return; -// } -// const writeRecord = q.peekQueueValue(controller); -// if (writeRecord === "close") { -// writableStreamDefaultControllerProcessClose(controller); -// } else { -// writableStreamDefaultControllerProcessWrite(controller, writeRecord.chunk); -// } -// } - -// export function writableStreamDefaultControllerErrorIfNeeded<InputType>( -// controller: WritableStreamDefaultController<InputType>, -// error: shared.ErrorResult -// ): void { -// if (controller[controlledWritableStream_][shared.state_] === "writable") { -// writableStreamDefaultControllerError(controller, error); -// } -// } - -// export function writableStreamDefaultControllerProcessClose<InputType>( -// controller: WritableStreamDefaultController<InputType> -// ): void { -// const stream = controller[controlledWritableStream_]; -// writableStreamMarkCloseRequestInFlight(stream); -// q.dequeueValue(controller); -// // Assert: controller.[[queue]] is empty. -// const sinkClosePromise = controller[closeAlgorithm_](); -// writableStreamDefaultControllerClearAlgorithms(controller); -// sinkClosePromise.then( -// _ => { -// writableStreamFinishInFlightClose(stream); -// }, -// error => { -// writableStreamFinishInFlightCloseWithError(stream, error); -// } -// ); -// } - -// export function writableStreamDefaultControllerProcessWrite<InputType>( -// controller: WritableStreamDefaultController<InputType>, -// chunk: InputType -// ): void { -// const stream = controller[controlledWritableStream_]; -// writableStreamMarkFirstWriteRequestInFlight(stream); -// controller[writeAlgorithm_](chunk).then( -// _ => { -// writableStreamFinishInFlightWrite(stream); -// const state = stream[shared.state_]; -// // Assert: state is "writable" or "erroring". -// q.dequeueValue(controller); -// if ( -// !writableStreamCloseQueuedOrInFlight(stream) && -// state === "writable" -// ) { -// const backpressure = writableStreamDefaultControllerGetBackpressure( -// controller -// ); -// writableStreamUpdateBackpressure(stream, backpressure); -// } -// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); -// }, -// error => { -// if (stream[shared.state_] === "writable") { -// writableStreamDefaultControllerClearAlgorithms(controller); -// } -// writableStreamFinishInFlightWriteWithError(stream, error); -// } -// ); -// } - -// export function writableStreamDefaultControllerGetBackpressure<InputType>( -// controller: WritableStreamDefaultController<InputType> -// ): boolean { -// const desiredSize = writableStreamDefaultControllerGetDesiredSize(controller); -// return desiredSize <= 0; -// } - -// export function writableStreamDefaultControllerError<InputType>( -// controller: WritableStreamDefaultController<InputType>, -// error: shared.ErrorResult -// ): void { -// const stream = controller[controlledWritableStream_]; -// // Assert: stream.[[state]] is "writable". -// writableStreamDefaultControllerClearAlgorithms(controller); -// writableStreamStartErroring(stream, error); -// } diff --git a/cli/js/streams/writable-stream-default-controller.ts b/cli/js/streams/writable-stream-default-controller.ts deleted file mode 100644 index 57ffe08fd..000000000 --- a/cli/js/streams/writable-stream-default-controller.ts +++ /dev/null @@ -1,101 +0,0 @@ -// TODO reenable this code when we enable writableStreams and transport types -// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -// /** -// * streams/writable-stream-default-controller - WritableStreamDefaultController class implementation -// * Part of Stardazed -// * (c) 2018-Present by Arthur Langereis - @zenmumbler -// * https://github.com/stardazed/sd-streams -// */ - -// /* eslint-disable @typescript-eslint/no-explicit-any */ -// // TODO reenable this lint here - -// import * as ws from "./writable-internals.ts"; -// import * as shared from "./shared-internals.ts"; -// import * as q from "./queue-mixin.ts"; -// import { Queue } from "./queue.ts"; -// import { QueuingStrategySizeCallback } from "../dom_types.ts"; - -// export class WritableStreamDefaultController<InputType> -// implements ws.WritableStreamDefaultController<InputType> { -// [ws.abortAlgorithm_]: ws.AbortAlgorithm; -// [ws.closeAlgorithm_]: ws.CloseAlgorithm; -// [ws.controlledWritableStream_]: ws.WritableStream<InputType>; -// [ws.started_]: boolean; -// [ws.strategyHWM_]: number; -// [ws.strategySizeAlgorithm_]: QueuingStrategySizeCallback<InputType>; -// [ws.writeAlgorithm_]: ws.WriteAlgorithm<InputType>; - -// [q.queue_]: Queue<q.QueueElement<ws.WriteRecord<InputType> | "close">>; -// [q.queueTotalSize_]: number; - -// constructor() { -// throw new TypeError(); -// } - -// error(e?: shared.ErrorResult): void { -// if (!ws.isWritableStreamDefaultController(this)) { -// throw new TypeError(); -// } -// const state = this[ws.controlledWritableStream_][shared.state_]; -// if (state !== "writable") { -// return; -// } -// ws.writableStreamDefaultControllerError(this, e); -// } - -// [ws.abortSteps_](reason: shared.ErrorResult): Promise<void> { -// const result = this[ws.abortAlgorithm_](reason); -// ws.writableStreamDefaultControllerClearAlgorithms(this); -// return result; -// } - -// [ws.errorSteps_](): void { -// q.resetQueue(this); -// } -// } - -// export function setUpWritableStreamDefaultControllerFromUnderlyingSink< -// InputType -// >( -// stream: ws.WritableStream<InputType>, -// underlyingSink: ws.WritableStreamSink<InputType>, -// highWaterMark: number, -// sizeAlgorithm: QueuingStrategySizeCallback<InputType> -// ): void { -// // Assert: underlyingSink is not undefined. -// const controller = Object.create( -// WritableStreamDefaultController.prototype -// ) as WritableStreamDefaultController<InputType>; - -// const startAlgorithm = function(): any { -// return shared.invokeOrNoop(underlyingSink, "start", [controller]); -// }; -// const writeAlgorithm = shared.createAlgorithmFromUnderlyingMethod( -// underlyingSink, -// "write", -// [controller] -// ); -// const closeAlgorithm = shared.createAlgorithmFromUnderlyingMethod( -// underlyingSink, -// "close", -// [] -// ); -// const abortAlgorithm = shared.createAlgorithmFromUnderlyingMethod( -// underlyingSink, -// "abort", -// [] -// ); -// ws.setUpWritableStreamDefaultController( -// stream, -// controller, -// startAlgorithm, -// writeAlgorithm, -// closeAlgorithm, -// abortAlgorithm, -// highWaterMark, -// sizeAlgorithm -// ); -// } diff --git a/cli/js/streams/writable-stream-default-writer.ts b/cli/js/streams/writable-stream-default-writer.ts deleted file mode 100644 index f38aa26bb..000000000 --- a/cli/js/streams/writable-stream-default-writer.ts +++ /dev/null @@ -1,136 +0,0 @@ -// TODO reenable this code when we enable writableStreams and transport types -// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -// /** -// * streams/writable-stream-default-writer - WritableStreamDefaultWriter class implementation -// * Part of Stardazed -// * (c) 2018-Present by Arthur Langereis - @zenmumbler -// * https://github.com/stardazed/sd-streams -// */ - -// import * as ws from "./writable-internals.ts"; -// import * as shared from "./shared-internals.ts"; - -// export class WritableStreamDefaultWriter<InputType> -// implements ws.WritableStreamDefaultWriter<InputType> { -// [ws.ownerWritableStream_]: ws.WritableStream<InputType> | undefined; -// [ws.readyPromise_]: shared.ControlledPromise<void>; -// [ws.closedPromise_]: shared.ControlledPromise<void>; - -// constructor(stream: ws.WritableStream<InputType>) { -// if (!ws.isWritableStream(stream)) { -// throw new TypeError(); -// } -// if (ws.isWritableStreamLocked(stream)) { -// throw new TypeError("Stream is already locked"); -// } -// this[ws.ownerWritableStream_] = stream; -// stream[ws.writer_] = this; - -// const readyPromise = shared.createControlledPromise<void>(); -// const closedPromise = shared.createControlledPromise<void>(); -// this[ws.readyPromise_] = readyPromise; -// this[ws.closedPromise_] = closedPromise; - -// const state = stream[shared.state_]; -// if (state === "writable") { -// if ( -// !ws.writableStreamCloseQueuedOrInFlight(stream) && -// stream[ws.backpressure_] -// ) { -// // OK Set this.[[readyPromise]] to a new promise. -// } else { -// readyPromise.resolve(undefined); -// } -// // OK Set this.[[closedPromise]] to a new promise. -// } else if (state === "erroring") { -// readyPromise.reject(stream[shared.storedError_]); -// readyPromise.promise.catch(() => {}); -// // OK Set this.[[closedPromise]] to a new promise. -// } else if (state === "closed") { -// readyPromise.resolve(undefined); -// closedPromise.resolve(undefined); -// } else { -// // Assert: state is "errored". -// const storedError = stream[shared.storedError_]; -// readyPromise.reject(storedError); -// readyPromise.promise.catch(() => {}); -// closedPromise.reject(storedError); -// closedPromise.promise.catch(() => {}); -// } -// } - -// abort(reason: shared.ErrorResult): Promise<void> { -// if (!ws.isWritableStreamDefaultWriter(this)) { -// return Promise.reject(new TypeError()); -// } -// if (this[ws.ownerWritableStream_] === undefined) { -// return Promise.reject( -// new TypeError("Writer is not connected to a stream") -// ); -// } -// return ws.writableStreamDefaultWriterAbort(this, reason); -// } - -// close(): Promise<void> { -// if (!ws.isWritableStreamDefaultWriter(this)) { -// return Promise.reject(new TypeError()); -// } -// const stream = this[ws.ownerWritableStream_]; -// if (stream === undefined) { -// return Promise.reject( -// new TypeError("Writer is not connected to a stream") -// ); -// } -// if (ws.writableStreamCloseQueuedOrInFlight(stream)) { -// return Promise.reject(new TypeError()); -// } -// return ws.writableStreamDefaultWriterClose(this); -// } - -// releaseLock(): void { -// const stream = this[ws.ownerWritableStream_]; -// if (stream === undefined) { -// return; -// } -// // Assert: stream.[[writer]] is not undefined. -// ws.writableStreamDefaultWriterRelease(this); -// } - -// write(chunk: InputType): Promise<void> { -// if (!ws.isWritableStreamDefaultWriter(this)) { -// return Promise.reject(new TypeError()); -// } -// if (this[ws.ownerWritableStream_] === undefined) { -// return Promise.reject( -// new TypeError("Writer is not connected to a stream") -// ); -// } -// return ws.writableStreamDefaultWriterWrite(this, chunk); -// } - -// get closed(): Promise<void> { -// if (!ws.isWritableStreamDefaultWriter(this)) { -// return Promise.reject(new TypeError()); -// } -// return this[ws.closedPromise_].promise; -// } - -// get desiredSize(): number | null { -// if (!ws.isWritableStreamDefaultWriter(this)) { -// throw new TypeError(); -// } -// if (this[ws.ownerWritableStream_] === undefined) { -// throw new TypeError("Writer is not connected to stream"); -// } -// return ws.writableStreamDefaultWriterGetDesiredSize(this); -// } - -// get ready(): Promise<void> { -// if (!ws.isWritableStreamDefaultWriter(this)) { -// return Promise.reject(new TypeError()); -// } -// return this[ws.readyPromise_].promise; -// } -// } diff --git a/cli/js/streams/writable-stream.ts b/cli/js/streams/writable-stream.ts deleted file mode 100644 index a6131c5d0..000000000 --- a/cli/js/streams/writable-stream.ts +++ /dev/null @@ -1,118 +0,0 @@ -// TODO reenable this code when we enable writableStreams and transport types -// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 -// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT - -// /** -// * streams/writable-stream - WritableStream class implementation -// * Part of Stardazed -// * (c) 2018-Present by Arthur Langereis - @zenmumbler -// * https://github.com/stardazed/sd-streams -// */ - -// import * as ws from "./writable-internals.ts"; -// import * as shared from "./shared-internals.ts"; -// import { -// WritableStreamDefaultController, -// setUpWritableStreamDefaultControllerFromUnderlyingSink -// } from "./writable-stream-default-controller.ts"; -// import { WritableStreamDefaultWriter } from "./writable-stream-default-writer.ts"; -// import { QueuingStrategy, QueuingStrategySizeCallback } from "../dom_types.ts"; - -// export class WritableStream<InputType> { -// [shared.state_]: ws.WritableStreamState; -// [shared.storedError_]: shared.ErrorResult; -// [ws.backpressure_]: boolean; -// [ws.closeRequest_]: shared.ControlledPromise<void> | undefined; -// [ws.inFlightWriteRequest_]: shared.ControlledPromise<void> | undefined; -// [ws.inFlightCloseRequest_]: shared.ControlledPromise<void> | undefined; -// [ws.pendingAbortRequest_]: ws.AbortRequest | undefined; -// [ws.writableStreamController_]: -// | ws.WritableStreamDefaultController<InputType> -// | undefined; -// [ws.writer_]: ws.WritableStreamDefaultWriter<InputType> | undefined; -// [ws.writeRequests_]: Array<shared.ControlledPromise<void>>; - -// constructor( -// sink: ws.WritableStreamSink<InputType> = {}, -// strategy: QueuingStrategy<InputType> = {} -// ) { -// ws.initializeWritableStream(this); -// const sizeFunc = strategy.size; -// const stratHWM = strategy.highWaterMark; -// if (sink.type !== undefined) { -// throw new RangeError("The type of an underlying sink must be undefined"); -// } - -// const sizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction(sizeFunc); -// const highWaterMark = shared.validateAndNormalizeHighWaterMark( -// stratHWM === undefined ? 1 : stratHWM -// ); - -// setUpWritableStreamDefaultControllerFromUnderlyingSink( -// this, -// sink, -// highWaterMark, -// sizeAlgorithm -// ); -// } - -// get locked(): boolean { -// if (!ws.isWritableStream(this)) { -// throw new TypeError(); -// } -// return ws.isWritableStreamLocked(this); -// } - -// abort(reason?: shared.ErrorResult): Promise<void> { -// if (!ws.isWritableStream(this)) { -// return Promise.reject(new TypeError()); -// } -// if (ws.isWritableStreamLocked(this)) { -// return Promise.reject(new TypeError("Cannot abort a locked stream")); -// } -// return ws.writableStreamAbort(this, reason); -// } - -// getWriter(): ws.WritableStreamWriter<InputType> { -// if (!ws.isWritableStream(this)) { -// throw new TypeError(); -// } -// return new WritableStreamDefaultWriter(this); -// } -// } - -// export function createWritableStream<InputType>( -// startAlgorithm: ws.StartAlgorithm, -// writeAlgorithm: ws.WriteAlgorithm<InputType>, -// closeAlgorithm: ws.CloseAlgorithm, -// abortAlgorithm: ws.AbortAlgorithm, -// highWaterMark?: number, -// sizeAlgorithm?: QueuingStrategySizeCallback<InputType> -// ): WritableStream<InputType> { -// if (highWaterMark === undefined) { -// highWaterMark = 1; -// } -// if (sizeAlgorithm === undefined) { -// sizeAlgorithm = (): number => 1; -// } -// // Assert: ! IsNonNegativeNumber(highWaterMark) is true. - -// const stream = Object.create(WritableStream.prototype) as WritableStream< -// InputType -// >; -// ws.initializeWritableStream(stream); -// const controller = Object.create( -// WritableStreamDefaultController.prototype -// ) as WritableStreamDefaultController<InputType>; -// ws.setUpWritableStreamDefaultController( -// stream, -// controller, -// startAlgorithm, -// writeAlgorithm, -// closeAlgorithm, -// abortAlgorithm, -// highWaterMark, -// sizeAlgorithm -// ); -// return stream; -// } |