summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/js/globals.ts4
-rw-r--r--cli/js/lib.deno.shared_globals.d.ts78
-rw-r--r--cli/js/web/blob.ts49
-rw-r--r--cli/js/web/body.ts27
-rw-r--r--cli/js/web/dom_types.d.ts147
-rw-r--r--cli/js/web/fetch.ts27
-rw-r--r--cli/js/web/request.ts8
-rw-r--r--cli/js/web/streams/internals.ts1098
-rw-r--r--cli/js/web/streams/mod.ts13
-rw-r--r--cli/js/web/streams/pipe-to.ts237
-rw-r--r--cli/js/web/streams/queue-mixin.ts77
-rw-r--r--cli/js/web/streams/queue.ts58
-rw-r--r--cli/js/web/streams/readable-byte-stream-controller.ts207
-rw-r--r--cli/js/web/streams/readable-internals.ts1350
-rw-r--r--cli/js/web/streams/readable-stream-byob-reader.ts86
-rw-r--r--cli/js/web/streams/readable-stream-byob-request.ts53
-rw-r--r--cli/js/web/streams/readable-stream-default-controller.ts135
-rw-r--r--cli/js/web/streams/readable-stream-default-reader.ts68
-rw-r--r--cli/js/web/streams/readable-stream.ts386
-rw-r--r--cli/js/web/streams/readable_byte_stream_controller.ts143
-rw-r--r--cli/js/web/streams/readable_stream.ts216
-rw-r--r--cli/js/web/streams/readable_stream_async_iterator.ts81
-rw-r--r--cli/js/web/streams/readable_stream_default_controller.ts120
-rw-r--r--cli/js/web/streams/readable_stream_default_reader.ts89
-rw-r--r--cli/js/web/streams/shared-internals.ts289
-rw-r--r--cli/js/web/streams/strategies.ts32
-rw-r--r--cli/js/web/streams/symbols.ts38
-rw-r--r--cli/js/web/streams/transform-internals.ts371
-rw-r--r--cli/js/web/streams/transform-stream-default-controller.ts58
-rw-r--r--cli/js/web/streams/transform-stream.ts147
-rw-r--r--cli/js/web/streams/writable-internals.ts800
-rw-r--r--cli/js/web/streams/writable-stream-default-controller.ts101
-rw-r--r--cli/js/web/streams/writable-stream-default-writer.ts136
-rw-r--r--cli/js/web/streams/writable-stream.ts118
-rw-r--r--cli/js/web/util.ts113
35 files changed, 2008 insertions, 4952 deletions
diff --git a/cli/js/globals.ts b/cli/js/globals.ts
index ec314bf68..897e9859f 100644
--- a/cli/js/globals.ts
+++ b/cli/js/globals.ts
@@ -22,7 +22,7 @@ import * as urlSearchParams from "./web/url_search_params.ts";
import * as workers from "./web/workers.ts";
import * as performanceUtil from "./web/performance.ts";
import * as request from "./web/request.ts";
-import * as streams from "./web/streams/mod.ts";
+import * as readableStream from "./web/streams/readable_stream.ts";
// These imports are not exposed and therefore are fine to just import the
// symbols required.
@@ -223,7 +223,7 @@ export const windowOrWorkerGlobalScopeProperties = {
FormData: nonEnumerable(formData.FormDataImpl),
TextEncoder: nonEnumerable(textEncoding.TextEncoder),
TextDecoder: nonEnumerable(textEncoding.TextDecoder),
- ReadableStream: nonEnumerable(streams.ReadableStream),
+ ReadableStream: nonEnumerable(readableStream.ReadableStreamImpl),
Request: nonEnumerable(request.Request),
Response: nonEnumerable(fetchTypes.Response),
performance: writable(new performanceUtil.Performance()),
diff --git a/cli/js/lib.deno.shared_globals.d.ts b/cli/js/lib.deno.shared_globals.d.ts
index ef450c201..bcc3ce890 100644
--- a/cli/js/lib.deno.shared_globals.d.ts
+++ b/cli/js/lib.deno.shared_globals.d.ts
@@ -245,6 +245,24 @@ interface ReadableStreamDefaultReader<R = any> {
releaseLock(): void;
}
+interface ReadableStreamReader<R = any> {
+ cancel(): Promise<void>;
+ read(): Promise<ReadableStreamReadResult<R>>;
+ releaseLock(): void;
+}
+
+interface ReadableByteStreamControllerCallback {
+ (controller: ReadableByteStreamController): void | PromiseLike<void>;
+}
+
+interface UnderlyingByteSource {
+ autoAllocateChunkSize?: number;
+ cancel?: ReadableStreamErrorCallback;
+ pull?: ReadableByteStreamControllerCallback;
+ start?: ReadableByteStreamControllerCallback;
+ type: "bytes";
+}
+
interface UnderlyingSource<R = any> {
cancel?: ReadableStreamErrorCallback;
pull?: ReadableStreamDefaultControllerCallback<R>;
@@ -260,11 +278,35 @@ interface ReadableStreamDefaultControllerCallback<R> {
(controller: ReadableStreamDefaultController<R>): void | PromiseLike<void>;
}
-interface ReadableStreamDefaultController<R> {
- readonly desiredSize: number;
- enqueue(chunk?: R): void;
+interface ReadableStreamDefaultController<R = any> {
+ readonly desiredSize: number | null;
close(): void;
- error(e?: any): void;
+ enqueue(chunk: R): void;
+ error(error?: any): void;
+}
+
+interface ReadableByteStreamController {
+ readonly byobRequest: undefined;
+ readonly desiredSize: number | null;
+ close(): void;
+ enqueue(chunk: ArrayBufferView): void;
+ error(error?: any): void;
+}
+
+interface PipeOptions {
+ preventAbort?: boolean;
+ preventCancel?: boolean;
+ preventClose?: boolean;
+ signal?: AbortSignal;
+}
+
+interface QueuingStrategySizeCallback<T = any> {
+ (chunk: T): number;
+}
+
+interface QueuingStrategy<T = any> {
+ highWaterMark?: number;
+ size?: QueuingStrategySizeCallback<T>;
}
/** This Streams API interface represents a readable stream of byte data. The
@@ -273,16 +315,36 @@ interface ReadableStreamDefaultController<R> {
interface ReadableStream<R = any> {
readonly locked: boolean;
cancel(reason?: any): Promise<void>;
- // TODO(ry) It doesn't seem like Chrome supports this.
+ getIterator(options?: { preventCancel?: boolean }): AsyncIterableIterator<R>;
// getReader(options: { mode: "byob" }): ReadableStreamBYOBReader;
getReader(): ReadableStreamDefaultReader<R>;
+ pipeThrough<T>(
+ {
+ writable,
+ readable,
+ }: {
+ writable: WritableStream<R>;
+ readable: ReadableStream<T>;
+ },
+ options?: PipeOptions
+ ): ReadableStream<T>;
+ pipeTo(dest: WritableStream<R>, options?: PipeOptions): Promise<void>;
tee(): [ReadableStream<R>, ReadableStream<R>];
+ [Symbol.asyncIterator](options?: {
+ preventCancel?: boolean;
+ }): AsyncIterableIterator<R>;
}
-declare const ReadableStream: {
+declare var ReadableStream: {
prototype: ReadableStream;
- // TODO(ry) This doesn't match lib.dom.d.ts
- new <R = any>(src?: UnderlyingSource<R>): ReadableStream<R>;
+ new (
+ underlyingSource: UnderlyingByteSource,
+ strategy?: { highWaterMark?: number; size?: undefined }
+ ): ReadableStream<Uint8Array>;
+ new <R = any>(
+ underlyingSource?: UnderlyingSource<R>,
+ strategy?: QueuingStrategy<R>
+ ): ReadableStream<R>;
};
/** This Streams API interface providesĀ a standard abstraction for writing streaming data to a destination, known as a sink. This object comes with built-in backpressure and queuing. */
diff --git a/cli/js/web/blob.ts b/cli/js/web/blob.ts
index 8f9615933..d30bb7e38 100644
--- a/cli/js/web/blob.ts
+++ b/cli/js/web/blob.ts
@@ -1,8 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-import * as domTypes from "./dom_types.d.ts";
import { TextDecoder, TextEncoder } from "./text_encoding.ts";
import { build } from "../build.ts";
-import { ReadableStream } from "./streams/mod.ts";
+import { ReadableStreamImpl } from "./streams/readable_stream.ts";
export const bytesSymbol = Symbol("bytes");
@@ -124,40 +123,36 @@ function processBlobParts(
return bytes;
}
-function getStream(blobBytes: Uint8Array): domTypes.ReadableStream<Uint8Array> {
- return new ReadableStream<Uint8Array>({
- start: (
- controller: domTypes.ReadableStreamDefaultController<Uint8Array>
- ): void => {
+function getStream(blobBytes: Uint8Array): ReadableStream<ArrayBufferView> {
+ // TODO: Align to spec https://fetch.spec.whatwg.org/#concept-construct-readablestream
+ return new ReadableStreamImpl({
+ type: "bytes",
+ start: (controller: ReadableByteStreamController): void => {
controller.enqueue(blobBytes);
controller.close();
},
- }) as domTypes.ReadableStream<Uint8Array>;
+ });
}
async function readBytes(
- reader: domTypes.ReadableStreamReader<Uint8Array>
+ reader: ReadableStreamReader<ArrayBufferView>
): Promise<ArrayBuffer> {
const chunks: Uint8Array[] = [];
while (true) {
- try {
- const { done, value } = await reader.read();
- if (!done && value instanceof Uint8Array) {
- chunks.push(value);
- } else if (done) {
- const size = chunks.reduce((p, i) => p + i.byteLength, 0);
- const bytes = new Uint8Array(size);
- let offs = 0;
- for (const chunk of chunks) {
- bytes.set(chunk, offs);
- offs += chunk.byteLength;
- }
- return Promise.resolve(bytes);
- } else {
- return Promise.reject(new TypeError());
+ const { done, value } = await reader.read();
+ if (!done && value instanceof Uint8Array) {
+ chunks.push(value);
+ } else if (done) {
+ const size = chunks.reduce((p, i) => p + i.byteLength, 0);
+ const bytes = new Uint8Array(size);
+ let offs = 0;
+ for (const chunk of chunks) {
+ bytes.set(chunk, offs);
+ offs += chunk.byteLength;
}
- } catch (e) {
- return Promise.reject(e);
+ return bytes;
+ } else {
+ throw new TypeError("Invalid reader result.");
}
}
}
@@ -207,7 +202,7 @@ export class DenoBlob implements Blob {
});
}
- stream(): domTypes.ReadableStream<Uint8Array> {
+ stream(): ReadableStream<ArrayBufferView> {
return getStream(this[bytesSymbol]);
}
diff --git a/cli/js/web/body.ts b/cli/js/web/body.ts
index 717b02e29..a1cf2038a 100644
--- a/cli/js/web/body.ts
+++ b/cli/js/web/body.ts
@@ -1,25 +1,18 @@
import * as blob from "./blob.ts";
import * as encoding from "./text_encoding.ts";
import * as domTypes from "./dom_types.d.ts";
-import { ReadableStream } from "./streams/mod.ts";
+import { ReadableStreamImpl } from "./streams/readable_stream.ts";
// only namespace imports work for now, plucking out what we need
const { TextEncoder, TextDecoder } = encoding;
const DenoBlob = blob.DenoBlob;
-type ReadableStreamReader = domTypes.ReadableStreamReader;
-
-interface ReadableStreamController {
- enqueue(chunk: string | ArrayBuffer): void;
- close(): void;
-}
-
export type BodySource =
| Blob
| BufferSource
| FormData
| URLSearchParams
- | domTypes.ReadableStream
+ | ReadableStream
| string;
function validateBodyType(owner: Body, bodySource: BodySource): boolean {
@@ -39,7 +32,7 @@ function validateBodyType(owner: Body, bodySource: BodySource): boolean {
return true;
} else if (typeof bodySource === "string") {
return true;
- } else if (bodySource instanceof ReadableStream) {
+ } else if (bodySource instanceof ReadableStreamImpl) {
return true;
} else if (bodySource instanceof FormData) {
return true;
@@ -118,7 +111,7 @@ export const BodyUsedError =
"Failed to execute 'clone' on 'Body': body is already used";
export class Body implements domTypes.Body {
- protected _stream: domTypes.ReadableStream<string | ArrayBuffer> | null;
+ protected _stream: ReadableStreamImpl<string | ArrayBuffer> | null;
constructor(protected _bodySource: BodySource, readonly contentType: string) {
validateBodyType(this, _bodySource);
@@ -127,23 +120,23 @@ export class Body implements domTypes.Body {
this._stream = null;
}
- get body(): domTypes.ReadableStream | null {
+ get body(): ReadableStream | null {
if (this._stream) {
return this._stream;
}
- if (this._bodySource instanceof ReadableStream) {
+ if (this._bodySource instanceof ReadableStreamImpl) {
// @ts-ignore
this._stream = this._bodySource;
}
if (typeof this._bodySource === "string") {
const bodySource = this._bodySource;
- this._stream = new ReadableStream({
- start(controller: ReadableStreamController): void {
+ this._stream = new ReadableStreamImpl<string | ArrayBuffer>({
+ start(controller: ReadableStreamDefaultController): void {
controller.enqueue(bodySource);
controller.close();
},
- }) as domTypes.ReadableStream<ArrayBuffer | string>;
+ });
}
return this._stream;
}
@@ -320,7 +313,7 @@ export class Body implements domTypes.Body {
return Promise.resolve(
enc.encode(this._bodySource).buffer as ArrayBuffer
);
- } else if (this._bodySource instanceof ReadableStream) {
+ } else if (this._bodySource instanceof ReadableStreamImpl) {
// @ts-ignore
return bufferFromStream(this._bodySource.getReader());
} else if (this._bodySource instanceof FormData) {
diff --git a/cli/js/web/dom_types.d.ts b/cli/js/web/dom_types.d.ts
index e675b888f..a78fd7d9e 100644
--- a/cli/js/web/dom_types.d.ts
+++ b/cli/js/web/dom_types.d.ts
@@ -17,14 +17,6 @@ and limitations under the License.
/* eslint-disable @typescript-eslint/no-explicit-any */
-type BodyInit =
- | Blob
- | BufferSource
- | FormData
- | URLSearchParams
- | ReadableStream
- | string;
-
export type RequestInfo = Request | string;
export interface ProgressEventInit extends EventInit {
@@ -261,82 +253,6 @@ export interface Body {
text(): Promise<string>;
}
-export interface ReadableStreamReadDoneResult<T> {
- done: true;
- value?: T;
-}
-
-export interface ReadableStreamReadValueResult<T> {
- done: false;
- value: T;
-}
-
-export type ReadableStreamReadResult<T> =
- | ReadableStreamReadValueResult<T>
- | ReadableStreamReadDoneResult<T>;
-
-export interface ReadableStreamDefaultReader<R = any> {
- readonly closed: Promise<void>;
- cancel(reason?: any): Promise<void>;
- read(): Promise<ReadableStreamReadResult<R>>;
- releaseLock(): void;
-}
-
-export interface PipeOptions {
- preventAbort?: boolean;
- preventCancel?: boolean;
- preventClose?: boolean;
- signal?: AbortSignal;
-}
-
-export interface UnderlyingSource<R = any> {
- cancel?: ReadableStreamErrorCallback;
- pull?: ReadableStreamDefaultControllerCallback<R>;
- start?: ReadableStreamDefaultControllerCallback<R>;
- type?: undefined;
-}
-export interface ReadableStreamErrorCallback {
- (reason: any): void | PromiseLike<void>;
-}
-
-export interface ReadableStreamDefaultControllerCallback<R> {
- (controller: ReadableStreamDefaultController<R>): void | PromiseLike<void>;
-}
-
-export interface ReadableStreamConstructor {
- new <R = any>(source?: UnderlyingSource<R>): ReadableStream<R>;
-}
-
-export interface ReadableStream<R = any> {
- readonly locked: boolean;
- cancel(reason?: any): Promise<void>;
- getReader(options: { mode: "byob" }): ReadableStreamBYOBReader;
- getReader(): ReadableStreamDefaultReader<R>;
- /* disabled for now
- pipeThrough<T>(
- {
- writable,
- readable
- }: {
- writable: WritableStream<R>;
- readable: ReadableStream<T>;
- },
- options?: PipeOptions
- ): ReadableStream<T>;
- pipeTo(dest: WritableStream<R>, options?: PipeOptions): Promise<void>;
- */
- tee(): [ReadableStream<R>, ReadableStream<R>];
-}
-
-export interface ReadableStreamBYOBReader {
- readonly closed: Promise<void>;
- cancel(reason?: any): Promise<void>;
- read<T extends ArrayBufferView>(
- view: T
- ): Promise<ReadableStreamReadResult<T>>;
- releaseLock(): void;
-}
-
export interface WritableStream<W = any> {
readonly locked: boolean;
abort(reason?: any): Promise<void>;
@@ -353,69 +269,6 @@ export interface WritableStreamDefaultWriter<W = any> {
write(chunk: W): Promise<void>;
}
-export interface UnderlyingSource<R = any> {
- cancel?: ReadableStreamErrorCallback;
- pull?: ReadableStreamDefaultControllerCallback<R>;
- start?: ReadableStreamDefaultControllerCallback<R>;
- type?: undefined;
-}
-
-export interface UnderlyingByteSource {
- autoAllocateChunkSize?: number;
- cancel?: ReadableStreamErrorCallback;
- pull?: ReadableByteStreamControllerCallback;
- start?: ReadableByteStreamControllerCallback;
- type: "bytes";
-}
-
-export interface ReadableStreamReader<R = any> {
- cancel(reason: any): Promise<void>;
- read(): Promise<ReadableStreamReadResult<R>>;
- releaseLock(): void;
-}
-
-export interface ReadableStreamErrorCallback {
- (reason: any): void | PromiseLike<void>;
-}
-
-export interface ReadableByteStreamControllerCallback {
- (controller: ReadableByteStreamController): void | PromiseLike<void>;
-}
-
-export interface ReadableStreamDefaultControllerCallback<R> {
- (controller: ReadableStreamDefaultController<R>): void | PromiseLike<void>;
-}
-
-export interface ReadableStreamDefaultController<R = any> {
- readonly desiredSize: number | null;
- close(): void;
- enqueue(chunk: R): void;
- error(error?: any): void;
-}
-
-export interface ReadableByteStreamController {
- readonly byobRequest: ReadableStreamBYOBRequest | undefined;
- readonly desiredSize: number | null;
- close(): void;
- enqueue(chunk: ArrayBufferView): void;
- error(error?: any): void;
-}
-
-export interface ReadableStreamBYOBRequest {
- readonly view: ArrayBufferView;
- respond(bytesWritten: number): void;
- respondWithNewView(view: ArrayBufferView): void;
-}
-
-export interface QueuingStrategy<T = any> {
- highWaterMark?: number;
- size?: QueuingStrategySizeCallback<T>;
-}
-
-export interface QueuingStrategySizeCallback<T = any> {
- (chunk: T): number;
-}
-
export interface RequestInit {
body?: BodyInit | null;
cache?: RequestCache;
diff --git a/cli/js/web/fetch.ts b/cli/js/web/fetch.ts
index 364c05a6b..2c16d5fb0 100644
--- a/cli/js/web/fetch.ts
+++ b/cli/js/web/fetch.ts
@@ -28,14 +28,13 @@ function hasHeaderValueOf(s: string, value: string): boolean {
return new RegExp(`^${value}[\t\s]*;?`).test(s);
}
-class Body
- implements domTypes.Body, domTypes.ReadableStream<Uint8Array>, io.ReadCloser {
+class Body implements domTypes.Body, ReadableStream<Uint8Array>, io.ReadCloser {
#bodyUsed = false;
#bodyPromise: Promise<ArrayBuffer> | null = null;
#data: ArrayBuffer | null = null;
#rid: number;
readonly locked: boolean = false; // TODO
- readonly body: domTypes.ReadableStream<Uint8Array>;
+ readonly body: ReadableStream<Uint8Array>;
constructor(rid: number, readonly contentType: string) {
this.#rid = rid;
@@ -234,15 +233,17 @@ class Body
return notImplemented();
}
- getReader(options: { mode: "byob" }): domTypes.ReadableStreamBYOBReader;
- getReader(): domTypes.ReadableStreamDefaultReader<Uint8Array>;
- getReader():
- | domTypes.ReadableStreamBYOBReader
- | domTypes.ReadableStreamDefaultReader<Uint8Array> {
+ getIterator(_options?: {
+ preventCancel?: boolean;
+ }): AsyncIterableIterator<Uint8Array> {
return notImplemented();
}
- tee(): [domTypes.ReadableStream, domTypes.ReadableStream] {
+ getReader(): ReadableStreamDefaultReader<Uint8Array> {
+ return notImplemented();
+ }
+
+ tee(): [ReadableStream, ReadableStream] {
return notImplemented();
}
@@ -257,16 +258,16 @@ class Body
pipeThrough<T>(
_: {
writable: domTypes.WritableStream<Uint8Array>;
- readable: domTypes.ReadableStream<T>;
+ readable: ReadableStream<T>;
},
- _options?: domTypes.PipeOptions
- ): domTypes.ReadableStream<T> {
+ _options?: PipeOptions
+ ): ReadableStream<T> {
return notImplemented();
}
pipeTo(
_dest: domTypes.WritableStream<Uint8Array>,
- _options?: domTypes.PipeOptions
+ _options?: PipeOptions
): Promise<void> {
return notImplemented();
}
diff --git a/cli/js/web/request.ts b/cli/js/web/request.ts
index 75f66b7cc..8fe93babe 100644
--- a/cli/js/web/request.ts
+++ b/cli/js/web/request.ts
@@ -1,9 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import * as body from "./body.ts";
import * as domTypes from "./dom_types.d.ts";
-import * as streams from "./streams/mod.ts";
-
-const { ReadableStream } = streams;
+import { ReadableStreamImpl } from "./streams/readable_stream.ts";
function byteUpperCase(s: string): string {
return String(s).replace(/[a-z]/g, function byteUpperCaseReplace(c): string {
@@ -124,8 +122,8 @@ export class Request extends body.Body implements domTypes.Request {
let body2 = this._bodySource;
- if (this._bodySource instanceof ReadableStream) {
- const tees = (this._bodySource as domTypes.ReadableStream).tee();
+ if (this._bodySource instanceof ReadableStreamImpl) {
+ const tees = this._bodySource.tee();
this._stream = this._bodySource = tees[0];
body2 = tees[1];
}
diff --git a/cli/js/web/streams/internals.ts b/cli/js/web/streams/internals.ts
new file mode 100644
index 000000000..2559d9e5c
--- /dev/null
+++ b/cli/js/web/streams/internals.ts
@@ -0,0 +1,1098 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+// This code closely follows the WHATWG Stream Specification
+// See: https://streams.spec.whatwg.org/
+//
+// There are some parts that are not fully implemented, and there are some
+// comments which point to steps of the specification that are not implemented.
+//
+
+/* eslint-disable @typescript-eslint/no-explicit-any,require-await */
+import { ReadableByteStreamControllerImpl } from "./readable_byte_stream_controller.ts";
+import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts";
+import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts";
+import { ReadableStreamImpl } from "./readable_stream.ts";
+import * as sym from "./symbols.ts";
+import { cloneValue } from "../util.ts";
+import { assert } from "../../util.ts";
+
+export interface BufferQueueItem extends Pair<ArrayBuffer | SharedArrayBuffer> {
+ offset: number;
+}
+export type CancelAlgorithm = (reason?: any) => PromiseLike<void>;
+type Container<R = any> = {
+ [sym.queue]: Array<Pair<R> | BufferQueueItem>;
+ [sym.queueTotalSize]: number;
+};
+export type Pair<R> = { value: R; size: number };
+export type PullAlgorithm = () => PromiseLike<void>;
+export type SizeAlgorithm<T> = (chunk: T) => number;
+export type StartAlgorithm = () => void | PromiseLike<void>;
+export interface Deferred<T> {
+ promise: Promise<T>;
+ resolve?: (value?: T | PromiseLike<T>) => void;
+ reject?: (reason?: any) => void;
+ settled: boolean;
+}
+
+export interface ReadableStreamGenericReader<R = any>
+ extends ReadableStreamReader<R> {
+ [sym.closedPromise]: Deferred<void>;
+ [sym.forAuthorCode]: boolean;
+ [sym.ownerReadableStream]: ReadableStreamImpl<R>;
+ [sym.readRequests]: Array<Deferred<ReadableStreamReadResult<R>>>;
+}
+
+export interface ReadableStreamAsyncIterator<T = any> extends AsyncIterator<T> {
+ [sym.asyncIteratorReader]: ReadableStreamDefaultReaderImpl<T>;
+ [sym.preventCancel]: boolean;
+ return(value?: any | PromiseLike<any>): Promise<IteratorResult<T, any>>;
+}
+
+export function acquireReadableStreamDefaultReader<T>(
+ stream: ReadableStreamImpl<T>,
+ forAuthorCode = false
+): ReadableStreamDefaultReaderImpl<T> {
+ const reader = new ReadableStreamDefaultReaderImpl(stream);
+ reader[sym.forAuthorCode] = forAuthorCode;
+ return reader;
+}
+
+function createAlgorithmFromUnderlyingMethod<
+ O extends UnderlyingByteSource | UnderlyingSource,
+ P extends keyof O
+>(
+ underlyingObject: O,
+ methodName: P,
+ algoArgCount: 0,
+ ...extraArgs: any[]
+): () => Promise<void>;
+function createAlgorithmFromUnderlyingMethod<
+ O extends UnderlyingByteSource | UnderlyingSource,
+ P extends keyof O
+>(
+ underlyingObject: O,
+ methodName: P,
+ algoArgCount: 1,
+ ...extraArgs: any[]
+): (arg: any) => Promise<void>;
+function createAlgorithmFromUnderlyingMethod<
+ O extends UnderlyingByteSource | UnderlyingSource,
+ P extends keyof O
+>(
+ underlyingObject: O,
+ methodName: P,
+ algoArgCount: 0 | 1,
+ ...extraArgs: any[]
+): (() => Promise<void>) | ((arg: any) => Promise<void>) {
+ const method = underlyingObject[methodName];
+ if (method) {
+ if (!isCallable(method)) {
+ throw new TypeError("method is not callable");
+ }
+ if (algoArgCount === 0) {
+ return async (): Promise<void> =>
+ method.call(underlyingObject, ...extraArgs);
+ } else {
+ return async (arg: any): Promise<void> => {
+ const fullArgs = [arg, ...extraArgs];
+ return method.call(underlyingObject, ...fullArgs);
+ };
+ }
+ }
+ return async (): Promise<void> => undefined;
+}
+
+function createReadableStream<T>(
+ startAlgorithm: StartAlgorithm,
+ pullAlgorithm: PullAlgorithm,
+ cancelAlgorithm: CancelAlgorithm,
+ highWaterMark = 1,
+ sizeAlgorithm: SizeAlgorithm<T> = (): number => 1
+): ReadableStreamImpl<T> {
+ assert(isNonNegativeNumber(highWaterMark));
+ const stream: ReadableStreamImpl<T> = Object.create(
+ ReadableStreamImpl.prototype
+ );
+ initializeReadableStream(stream);
+ const controller: ReadableStreamDefaultControllerImpl<T> = Object.create(
+ ReadableStreamDefaultControllerImpl.prototype
+ );
+ setUpReadableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ highWaterMark,
+ sizeAlgorithm
+ );
+ return stream;
+}
+
+export function dequeueValue<R>(container: Container<R>): R {
+ assert(sym.queue in container && sym.queueTotalSize in container);
+ assert(container[sym.queue].length);
+ const pair = container[sym.queue].shift()!;
+ container[sym.queueTotalSize] -= pair.size;
+ if (container[sym.queueTotalSize] <= 0) {
+ container[sym.queueTotalSize] = 0;
+ }
+ return pair.value as R;
+}
+
+function enqueueValueWithSize<R>(
+ container: Container<R>,
+ value: R,
+ size: number
+): void {
+ assert(sym.queue in container && sym.queueTotalSize in container);
+ size = Number(size);
+ if (!isFiniteNonNegativeNumber(size)) {
+ throw new RangeError("size must be a finite non-negative number.");
+ }
+ container[sym.queue].push({ value, size });
+ container[sym.queueTotalSize] += size;
+}
+
+/** Non-spec mechanism to "unwrap" a promise and store it to be resolved
+ * later. */
+function getDeferred<T>(): Deferred<T> {
+ let resolve = undefined;
+ let reject = undefined;
+ const promise = new Promise<T>((res, rej) => {
+ resolve = res;
+ reject = rej;
+ });
+ return { promise, resolve, reject, settled: false };
+}
+
+export function initializeReadableStream(stream: ReadableStreamImpl): void {
+ stream[sym.state] = "readable";
+ stream[sym.reader] = stream[sym.storedError] = undefined;
+ stream[sym.disturbed] = false;
+}
+
+function invokeOrNoop<O extends any, P extends keyof O>(
+ o: O,
+ p: P,
+ ...args: Parameters<O[P]>
+): ReturnType<O[P]> | undefined {
+ assert(o);
+ const method = o[p];
+ if (!method) {
+ return undefined;
+ }
+ return method.call(o, ...args);
+}
+
+function isCallable(value: unknown): value is (...args: any) => any {
+ return typeof value === "function";
+}
+
+export function isDetachedBuffer(value: object): boolean {
+ return sym.isFakeDetached in value;
+}
+
+function isFiniteNonNegativeNumber(v: unknown): v is number {
+ if (!isNonNegativeNumber(v)) {
+ return false;
+ }
+ if (v === Infinity) {
+ return false;
+ }
+ return true;
+}
+
+function isNonNegativeNumber(v: unknown): v is number {
+ if (typeof v !== "number") {
+ return false;
+ }
+ if (v === NaN) {
+ return false;
+ }
+ if (v < 0) {
+ return false;
+ }
+ return true;
+}
+
+export function isReadableByteStreamController(
+ x: unknown
+): x is ReadableByteStreamControllerImpl {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.controlledReadableByteStream in x)
+ ? false
+ : true;
+}
+
+export function isReadableStream(x: unknown): x is ReadableStreamImpl {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.readableStreamController in x)
+ ? false
+ : true;
+}
+
+export function isReadableStreamAsyncIterator(
+ x: unknown
+): x is ReadableStreamAsyncIterator<any> {
+ if (typeof x !== "object" || x === null) {
+ return false;
+ }
+ if (!(sym.asyncIteratorReader in x)) {
+ return false;
+ }
+ return true;
+}
+
+export function isReadableStreamDefaultController(
+ x: unknown
+): x is ReadableStreamDefaultControllerImpl {
+ return typeof x !== "object" ||
+ x === null ||
+ !(sym.controlledReadableStream in x)
+ ? false
+ : true;
+}
+
+export function isReadableStreamDefaultReader<T>(
+ x: unknown
+): x is ReadableStreamDefaultReaderImpl<T> {
+ return typeof x !== "object" || x === null || !(sym.readRequests in x)
+ ? false
+ : true;
+}
+
+export function isReadableStreamLocked(stream: ReadableStreamImpl): boolean {
+ assert(isReadableStream(stream));
+ return stream[sym.reader] ? true : false;
+}
+
+export function isUnderlyingByteSource(
+ underlyingSource: UnderlyingByteSource | UnderlyingSource
+): underlyingSource is UnderlyingByteSource {
+ const { type } = underlyingSource;
+ const typeString = String(type);
+ return typeString === "bytes";
+}
+
+export function makeSizeAlgorithmFromSizeFunction<T>(
+ size: QueuingStrategySizeCallback<T> | undefined
+): SizeAlgorithm<T> {
+ if (size === undefined) {
+ return (): number => 1;
+ }
+ if (typeof size !== "function") {
+ throw new TypeError("size must be callable.");
+ }
+ return (chunk: T): number => {
+ return size.call(undefined, chunk);
+ };
+}
+
+function readableByteStreamControllerShouldCallPull(
+ controller: ReadableByteStreamControllerImpl
+): boolean {
+ const stream = controller[sym.controlledReadableByteStream];
+ if (
+ stream[sym.state] !== "readable" ||
+ controller[sym.closeRequested] ||
+ !controller[sym.started]
+ ) {
+ return false;
+ }
+ if (
+ readableStreamHasDefaultReader(stream) &&
+ readableStreamGetNumReadRequests(stream) > 0
+ ) {
+ return true;
+ }
+ // 3.13.25.6 If ! ReadableStreamHasBYOBReader(stream) is true and !
+ // ReadableStreamGetNumReadIntoRequests(stream) > 0, return true.
+ const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
+ assert(desiredSize !== null);
+ if (desiredSize > 0) {
+ return true;
+ }
+ return false;
+}
+
+export function readableByteStreamControllerCallPullIfNeeded(
+ controller: ReadableByteStreamControllerImpl
+): void {
+ const shouldPull = readableByteStreamControllerShouldCallPull(controller);
+ if (!shouldPull) {
+ return;
+ }
+ if (controller[sym.pulling]) {
+ controller[sym.pullAgain] = true;
+ return;
+ }
+ assert(controller[sym.pullAgain] === false);
+ controller[sym.pulling] = true;
+ const pullPromise = controller[sym.pullAlgorithm]();
+ pullPromise.then(
+ () => {
+ controller[sym.pulling] = false;
+ if (controller[sym.pullAgain]) {
+ controller[sym.pullAgain];
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+ },
+ (e) => {
+ readableByteStreamControllerError(controller, e);
+ }
+ );
+}
+
+export function readableByteStreamControllerClearAlgorithms(
+ controller: ReadableByteStreamControllerImpl
+): void {
+ delete controller[sym.pullAlgorithm];
+ delete controller[sym.cancelAlgorithm];
+}
+
+export function readableByteStreamControllerClose(
+ controller: ReadableByteStreamControllerImpl
+): void {
+ const stream = controller[sym.controlledReadableByteStream];
+ if (controller[sym.closeRequested] || stream[sym.state] !== "readable") {
+ return;
+ }
+ if (controller[sym.queueTotalSize] > 0) {
+ controller[sym.closeRequested] = true;
+ return;
+ }
+ // 3.13.6.4 If controller.[[pendingPullIntos]] is not empty, (BYOB Support)
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamClose(stream);
+}
+
+export function readableByteStreamControllerEnqueue(
+ controller: ReadableByteStreamControllerImpl,
+ chunk: ArrayBufferView
+): void {
+ const stream = controller[sym.controlledReadableByteStream];
+ if (controller[sym.closeRequested] || stream[sym.state] !== "readable") {
+ return;
+ }
+ const { buffer, byteOffset, byteLength } = chunk;
+ const transferredBuffer = transferArrayBuffer(buffer);
+ if (readableStreamHasDefaultReader(stream)) {
+ if (readableStreamGetNumReadRequests(stream) === 0) {
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ transferredBuffer,
+ byteOffset,
+ byteLength
+ );
+ } else {
+ assert(controller[sym.queue].length === 0);
+ const transferredView = new Uint8Array(
+ transferredBuffer,
+ byteOffset,
+ byteLength
+ );
+ readableStreamFulfillReadRequest(stream, transferredView, false);
+ }
+ // 3.13.9.8 Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true
+ } else {
+ assert(!isReadableStreamLocked(stream));
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ transferredBuffer,
+ byteOffset,
+ byteLength
+ );
+ }
+ readableByteStreamControllerCallPullIfNeeded(controller);
+}
+
+function readableByteStreamControllerEnqueueChunkToQueue(
+ controller: ReadableByteStreamControllerImpl,
+ buffer: ArrayBuffer | SharedArrayBuffer,
+ byteOffset: number,
+ byteLength: number
+): void {
+ controller[sym.queue].push({
+ value: buffer,
+ offset: byteOffset,
+ size: byteLength,
+ });
+ controller[sym.queueTotalSize] += byteLength;
+}
+
+export function readableByteStreamControllerError(
+ controller: ReadableByteStreamControllerImpl,
+ e: any
+): void {
+ const stream = controller[sym.controlledReadableByteStream];
+ if (stream[sym.state] !== "readable") {
+ return;
+ }
+ // 3.13.11.3 Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
+ resetQueue(controller);
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamError(stream, e);
+}
+
+export function readableByteStreamControllerGetDesiredSize(
+ controller: ReadableByteStreamControllerImpl
+): number | null {
+ const stream = controller[sym.controlledReadableByteStream];
+ const state = stream[sym.state];
+ if (state === "errored") {
+ return null;
+ }
+ if (state === "closed") {
+ return 0;
+ }
+ return controller[sym.strategyHWM] - controller[sym.queueTotalSize];
+}
+
+export function readableByteStreamControllerHandleQueueDrain(
+ controller: ReadableByteStreamControllerImpl
+): void {
+ assert(
+ controller[sym.controlledReadableByteStream][sym.state] === "readable"
+ );
+ if (controller[sym.queueTotalSize] === 0 && controller[sym.closeRequested]) {
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamClose(controller[sym.controlledReadableByteStream]);
+ } else {
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+}
+
+export function readableStreamAddReadRequest<R>(
+ stream: ReadableStreamImpl<R>
+): Promise<ReadableStreamReadResult<R>> {
+ assert(isReadableStreamDefaultReader(stream[sym.reader]));
+ assert(stream[sym.state] === "readable");
+ const promise = getDeferred<ReadableStreamReadResult<R>>();
+ stream[sym.reader]![sym.readRequests].push(promise);
+ return promise.promise;
+}
+
+export async function readableStreamCancel<T>(
+ stream: ReadableStreamImpl<T>,
+ reason: any
+): Promise<void> {
+ stream[sym.disturbed] = true;
+ if (stream[sym.state] === "closed") {
+ return Promise.resolve();
+ }
+ if (stream[sym.state] === "errored") {
+ return Promise.reject(stream[sym.storedError]);
+ }
+ readableStreamClose(stream);
+ await stream[sym.readableStreamController]![sym.cancelSteps](reason);
+}
+
+export function readableStreamClose<T>(stream: ReadableStreamImpl<T>): void {
+ assert(stream[sym.state] === "readable");
+ stream[sym.state] = "closed";
+ const reader = stream[sym.reader];
+ if (!reader) {
+ return;
+ }
+ if (isReadableStreamDefaultReader<T>(reader)) {
+ for (const readRequest of reader[sym.readRequests]) {
+ assert(readRequest.resolve);
+ readRequest.resolve(
+ readableStreamCreateReadResult<T>(
+ undefined,
+ true,
+ reader[sym.forAuthorCode]
+ )
+ );
+ }
+ reader[sym.readRequests] = [];
+ }
+ const resolve = reader[sym.closedPromise].resolve;
+ assert(resolve);
+ resolve();
+ reader[sym.closedPromise].settled = true;
+}
+
+export function readableStreamCreateReadResult<T>(
+ value: T | undefined,
+ done: boolean,
+ forAuthorCode: boolean
+): ReadableStreamReadResult<T> {
+ const prototype = forAuthorCode ? Object.prototype : null;
+ assert(typeof done === "boolean");
+ const obj: ReadableStreamReadResult<T> = Object.create(prototype);
+ Object.defineProperties(obj, {
+ value: { value, writable: true, enumerable: true, configurable: true },
+ done: { value: done, writable: true, enumerable: true, configurable: true },
+ });
+ return obj;
+}
+
+export function readableStreamDefaultControllerCallPullIfNeeded<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>
+): void {
+ const shouldPull = readableStreamDefaultControllerShouldCallPull(controller);
+ if (!shouldPull) {
+ return;
+ }
+ if (controller[sym.pulling]) {
+ controller[sym.pullAgain] = true;
+ return;
+ }
+ assert(controller[sym.pullAgain] === false);
+ controller[sym.pulling] = true;
+ const pullPromise = controller[sym.pullAlgorithm]();
+ pullPromise.then(
+ () => {
+ controller[sym.pulling] = false;
+ if (controller[sym.pullAgain]) {
+ controller[sym.pullAgain] = false;
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+ }
+ },
+ (e) => {
+ readableStreamDefaultControllerError(controller, e);
+ }
+ );
+}
+
+export function readableStreamDefaultControllerCanCloseOrEnqueue<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>
+): boolean {
+ const state = controller[sym.controlledReadableStream][sym.state];
+ if (!controller[sym.closeRequested] && state === "readable") {
+ return true;
+ }
+ return false;
+}
+
+export function readableStreamDefaultControllerClearAlgorithms<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>
+): void {
+ delete controller[sym.pullAlgorithm];
+ delete controller[sym.cancelAlgorithm];
+ delete controller[sym.strategySizeAlgorithm];
+}
+
+export function readableStreamDefaultControllerClose<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>
+): void {
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
+ return;
+ }
+ const stream = controller[sym.controlledReadableStream];
+ controller[sym.closeRequested] = true;
+ if (controller[sym.queue].length === 0) {
+ readableStreamDefaultControllerClearAlgorithms(controller);
+ readableStreamClose(stream);
+ }
+}
+
+export function readableStreamDefaultControllerEnqueue<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>,
+ chunk: T
+): void {
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
+ return;
+ }
+ const stream = controller[sym.controlledReadableStream];
+ if (
+ isReadableStreamLocked(stream) &&
+ readableStreamGetNumReadRequests(stream) > 0
+ ) {
+ readableStreamFulfillReadRequest(stream, chunk, false);
+ } else {
+ try {
+ const chunkSize = controller[sym.strategySizeAlgorithm](chunk);
+ enqueueValueWithSize(controller, chunk, chunkSize);
+ } catch (err) {
+ readableStreamDefaultControllerError(controller, err);
+ throw err;
+ }
+ }
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+}
+
+export function readableStreamDefaultControllerGetDesiredSize<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>
+): number | null {
+ const stream = controller[sym.controlledReadableStream];
+ const state = stream[sym.state];
+ if (state === "errored") {
+ return null;
+ }
+ if (state === "closed") {
+ return 0;
+ }
+ return controller[sym.strategyHWM] - controller[sym.queueTotalSize];
+}
+
+export function readableStreamDefaultControllerError<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>,
+ e: any
+): void {
+ const stream = controller[sym.controlledReadableStream];
+ if (stream[sym.state] !== "readable") {
+ return;
+ }
+ resetQueue(controller);
+ readableStreamDefaultControllerClearAlgorithms(controller);
+ readableStreamError(stream, e);
+}
+
+function readableStreamDefaultControllerShouldCallPull<T>(
+ controller: ReadableStreamDefaultControllerImpl<T>
+): boolean {
+ const stream = controller[sym.controlledReadableStream];
+ if (
+ !readableStreamDefaultControllerCanCloseOrEnqueue(controller) ||
+ controller[sym.started] === false
+ ) {
+ return false;
+ }
+ if (
+ isReadableStreamLocked(stream) &&
+ readableStreamGetNumReadRequests(stream) > 0
+ ) {
+ return true;
+ }
+ const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller);
+ assert(desiredSize !== null);
+ if (desiredSize > 0) {
+ return true;
+ }
+ return false;
+}
+
+export function readableStreamDefaultReaderRead<R>(
+ reader: ReadableStreamDefaultReaderImpl<R>
+): Promise<ReadableStreamReadResult<R>> {
+ const stream = reader[sym.ownerReadableStream];
+ assert(stream);
+ stream[sym.disturbed] = true;
+ if (stream[sym.state] === "closed") {
+ return Promise.resolve(
+ readableStreamCreateReadResult<R>(
+ undefined,
+ true,
+ reader[sym.forAuthorCode]
+ )
+ );
+ }
+ if (stream[sym.state] === "errored") {
+ return Promise.reject(stream[sym.storedError]);
+ }
+ assert(stream[sym.state] === "readable");
+ return (stream[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl)[sym.pullSteps]();
+}
+
+export function readableStreamError(stream: ReadableStreamImpl, e: any): void {
+ assert(isReadableStream(stream));
+ assert(stream[sym.state] === "readable");
+ stream[sym.state] = "errored";
+ stream[sym.storedError] = e;
+ const reader = stream[sym.reader];
+ if (reader === undefined) {
+ return;
+ }
+ if (isReadableStreamDefaultReader(reader)) {
+ for (const readRequest of reader[sym.readRequests]) {
+ const { reject } = readRequest;
+ assert(reject);
+ reject(e);
+ }
+ reader[sym.readRequests] = [];
+ }
+ // 3.5.6.8 Otherwise, support BYOB Reader
+ const { reject } = reader[sym.closedPromise];
+ assert(reject);
+ reject(e);
+ reader[sym.closedPromise].settled = true;
+}
+
+export function readableStreamFulfillReadRequest<R>(
+ stream: ReadableStreamImpl<R>,
+ chunk: R,
+ done: boolean
+): void {
+ const reader = stream[sym.reader]!;
+ const readRequest = reader[sym.readRequests].shift()!;
+ assert(readRequest.resolve);
+ readRequest.resolve(
+ readableStreamCreateReadResult(chunk, done, reader[sym.forAuthorCode])
+ );
+}
+
+export function readableStreamGetNumReadRequests(
+ stream: ReadableStreamImpl
+): number {
+ return stream[sym.reader]?.[sym.readRequests].length ?? 0;
+}
+
+export function readableStreamHasDefaultReader(
+ stream: ReadableStreamImpl
+): boolean {
+ const reader = stream[sym.reader];
+ return reader === undefined || !isReadableStreamDefaultReader(reader)
+ ? false
+ : true;
+}
+
+export function readableStreamReaderGenericCancel<R = any>(
+ reader: ReadableStreamGenericReader<R>,
+ reason: any
+): Promise<void> {
+ const stream = reader[sym.ownerReadableStream];
+ assert(stream);
+ return readableStreamCancel(stream, reason);
+}
+
+export function readableStreamReaderGenericInitialize<R = any>(
+ reader: ReadableStreamGenericReader<R>,
+ stream: ReadableStreamImpl<R>
+): void {
+ reader[sym.forAuthorCode] = true;
+ reader[sym.ownerReadableStream] = stream;
+ stream[sym.reader] = reader;
+ if (stream[sym.state] === "readable") {
+ reader[sym.closedPromise] = getDeferred();
+ } else if (stream[sym.state] === "closed") {
+ reader[sym.closedPromise] = {
+ promise: Promise.resolve(),
+ settled: true,
+ };
+ } else {
+ assert(stream[sym.state] === "errored");
+ reader[sym.closedPromise] = {
+ promise: Promise.reject(stream[sym.storedError]),
+ settled: true,
+ };
+ }
+}
+
+export function readableStreamReaderGenericRelease<R = any>(
+ reader: ReadableStreamGenericReader<R>
+): void {
+ assert(reader[sym.ownerReadableStream]);
+ assert(reader[sym.ownerReadableStream][sym.reader] === reader);
+ const closedPromise = reader[sym.closedPromise];
+ if (reader[sym.ownerReadableStream][sym.state] === "readable") {
+ assert(closedPromise.reject);
+ closedPromise.reject(new TypeError("ReadableStream state is readable."));
+ } else {
+ closedPromise.promise = Promise.reject(new TypeError("Reading is closed."));
+ delete closedPromise.reject;
+ delete closedPromise.resolve;
+ }
+ closedPromise.settled = true;
+ delete reader[sym.ownerReadableStream][sym.reader];
+ delete reader[sym.ownerReadableStream];
+}
+
+export function readableStreamTee<T>(
+ stream: ReadableStreamImpl<T>,
+ cloneForBranch2: boolean
+): [ReadableStreamImpl<T>, ReadableStreamImpl<T>] {
+ assert(isReadableStream(stream));
+ assert(typeof cloneForBranch2 === "boolean");
+ const reader = acquireReadableStreamDefaultReader(stream);
+ let reading = false;
+ let canceled1 = false;
+ let canceled2 = false;
+ let reason1: any = undefined;
+ let reason2: any = undefined;
+ /* eslint-disable prefer-const */
+ let branch1: ReadableStreamImpl<T>;
+ let branch2: ReadableStreamImpl<T>;
+ /* eslint-enable prefer-const */
+ const cancelPromise = getDeferred<void>();
+ const pullAlgorithm = (): PromiseLike<void> => {
+ if (reading) {
+ return Promise.resolve();
+ }
+ reading = true;
+ readableStreamDefaultReaderRead(reader).then((result) => {
+ reading = false;
+ assert(typeof result === "object");
+ const { done } = result;
+ assert(typeof done === "boolean");
+ if (done) {
+ if (!canceled1) {
+ readableStreamDefaultControllerClose(
+ branch1[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl
+ );
+ }
+ if (!canceled2) {
+ readableStreamDefaultControllerClose(
+ branch2[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl
+ );
+ }
+ return;
+ }
+ const { value } = result;
+ const value1 = value!;
+ let value2 = value!;
+ if (!canceled2 && cloneForBranch2) {
+ value2 = cloneValue(value2);
+ }
+ if (!canceled1) {
+ readableStreamDefaultControllerEnqueue(
+ branch1[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl,
+ value1
+ );
+ }
+ if (!canceled2) {
+ readableStreamDefaultControllerEnqueue(
+ branch2[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl,
+ value2
+ );
+ }
+ });
+ return Promise.resolve();
+ };
+ const cancel1Algorithm = (reason?: any): PromiseLike<void> => {
+ canceled1 = true;
+ reason1 = reason;
+ if (canceled2) {
+ const compositeReason = [reason1, reason2];
+ const cancelResult = readableStreamCancel(stream, compositeReason);
+ assert(cancelPromise.resolve);
+ cancelPromise.resolve(cancelResult);
+ }
+ return cancelPromise.promise;
+ };
+ const cancel2Algorithm = (reason?: any): PromiseLike<void> => {
+ canceled2 = true;
+ reason2 = reason;
+ if (canceled1) {
+ const compositeReason = [reason1, reason2];
+ const cancelResult = readableStreamCancel(stream, compositeReason);
+ assert(cancelPromise.resolve);
+ cancelPromise.resolve(cancelResult);
+ }
+ return cancelPromise.promise;
+ };
+ const startAlgorithm = (): void => undefined;
+ branch1 = createReadableStream(
+ startAlgorithm,
+ pullAlgorithm,
+ cancel1Algorithm
+ );
+ branch2 = createReadableStream(
+ startAlgorithm,
+ pullAlgorithm,
+ cancel2Algorithm
+ );
+ reader[sym.closedPromise].promise.catch((r) => {
+ readableStreamDefaultControllerError(
+ branch1[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl,
+ r
+ );
+ readableStreamDefaultControllerError(
+ branch2[
+ sym.readableStreamController
+ ] as ReadableStreamDefaultControllerImpl,
+ r
+ );
+ });
+ return [branch1, branch2];
+}
+
+export function resetQueue<R>(container: Container<R>): void {
+ assert(sym.queue in container && sym.queueTotalSize in container);
+ container[sym.queue] = [];
+ container[sym.queueTotalSize] = 0;
+}
+
+function setUpReadableByteStreamController(
+ stream: ReadableStreamImpl,
+ controller: ReadableByteStreamControllerImpl,
+ startAlgorithm: StartAlgorithm,
+ pullAlgorithm: PullAlgorithm,
+ cancelAlgorithm: CancelAlgorithm,
+ highWaterMark: number,
+ autoAllocateChunkSize: number | undefined
+): void {
+ assert(stream[sym.readableStreamController] === undefined);
+ if (autoAllocateChunkSize !== undefined) {
+ assert(Number.isInteger(autoAllocateChunkSize));
+ assert(autoAllocateChunkSize >= 0);
+ }
+ controller[sym.controlledReadableByteStream] = stream;
+ controller[sym.pulling] = controller[sym.pullAgain] = false;
+ controller[sym.byobRequest] = undefined;
+ controller[sym.queue] = [];
+ controller[sym.queueTotalSize] = 0;
+ controller[sym.closeRequested] = controller[sym.started] = false;
+ controller[sym.strategyHWM] = validateAndNormalizeHighWaterMark(
+ highWaterMark
+ );
+ controller[sym.pullAlgorithm] = pullAlgorithm;
+ controller[sym.cancelAlgorithm] = cancelAlgorithm;
+ controller[sym.autoAllocateChunkSize] = autoAllocateChunkSize;
+ // 3.13.26.12 Set controller.[[pendingPullIntos]] to a new empty List.
+ stream[sym.readableStreamController] = controller;
+ const startResult = startAlgorithm();
+ const startPromise = Promise.resolve(startResult);
+ startPromise.then(
+ () => {
+ controller[sym.started] = true;
+ assert(!controller[sym.pulling]);
+ assert(!controller[sym.pullAgain]);
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ },
+ (r) => {
+ readableByteStreamControllerError(controller, r);
+ }
+ );
+}
+
+export function setUpReadableByteStreamControllerFromUnderlyingSource(
+ stream: ReadableStreamImpl,
+ underlyingByteSource: UnderlyingByteSource,
+ highWaterMark: number
+): void {
+ assert(underlyingByteSource);
+ const controller: ReadableByteStreamControllerImpl = Object.create(
+ ReadableByteStreamControllerImpl.prototype
+ );
+ const startAlgorithm: StartAlgorithm = () => {
+ return invokeOrNoop(underlyingByteSource, "start", controller);
+ };
+ const pullAlgorithm = createAlgorithmFromUnderlyingMethod(
+ underlyingByteSource,
+ "pull",
+ 0,
+ controller
+ );
+ const cancelAlgorithm = createAlgorithmFromUnderlyingMethod(
+ underlyingByteSource,
+ "cancel",
+ 1
+ );
+ // 3.13.27.6 Let autoAllocateChunkSize be ? GetV(underlyingByteSource, "autoAllocateChunkSize").
+ const autoAllocateChunkSize = undefined;
+ setUpReadableByteStreamController(
+ stream,
+ controller,
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ highWaterMark,
+ autoAllocateChunkSize
+ );
+}
+
+function setUpReadableStreamDefaultController<T>(
+ stream: ReadableStreamImpl<T>,
+ controller: ReadableStreamDefaultControllerImpl<T>,
+ startAlgorithm: StartAlgorithm,
+ pullAlgorithm: PullAlgorithm,
+ cancelAlgorithm: CancelAlgorithm,
+ highWaterMark: number,
+ sizeAlgorithm: SizeAlgorithm<T>
+): void {
+ assert(stream[sym.readableStreamController] === undefined);
+ controller[sym.controlledReadableStream] = stream;
+ controller[sym.queue] = [];
+ controller[sym.queueTotalSize] = 0;
+ controller[sym.started] = controller[sym.closeRequested] = controller[
+ sym.pullAgain
+ ] = controller[sym.pulling] = false;
+ controller[sym.strategySizeAlgorithm] = sizeAlgorithm;
+ controller[sym.strategyHWM] = highWaterMark;
+ controller[sym.pullAlgorithm] = pullAlgorithm;
+ controller[sym.cancelAlgorithm] = cancelAlgorithm;
+ stream[sym.readableStreamController] = controller;
+ const startResult = startAlgorithm();
+ const startPromise = Promise.resolve(startResult);
+ startPromise.then(
+ () => {
+ controller[sym.started] = true;
+ assert(controller[sym.pulling] === false);
+ assert(controller[sym.pullAgain] === false);
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+ },
+ (r) => {
+ readableStreamDefaultControllerError(controller, r);
+ }
+ );
+}
+
+export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>(
+ stream: ReadableStreamImpl<T>,
+ underlyingSource: UnderlyingSource<T>,
+ highWaterMark: number,
+ sizeAlgorithm: SizeAlgorithm<T>
+): void {
+ assert(underlyingSource);
+ const controller: ReadableStreamDefaultControllerImpl<T> = Object.create(
+ ReadableStreamDefaultControllerImpl.prototype
+ );
+ const startAlgorithm: StartAlgorithm = (): void | PromiseLike<void> =>
+ invokeOrNoop(underlyingSource, "start", controller);
+ const pullAlgorithm: PullAlgorithm = createAlgorithmFromUnderlyingMethod(
+ underlyingSource,
+ "pull",
+ 0,
+ controller
+ );
+ const cancelAlgorithm: CancelAlgorithm = createAlgorithmFromUnderlyingMethod(
+ underlyingSource,
+ "cancel",
+ 1
+ );
+ setUpReadableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ highWaterMark,
+ sizeAlgorithm
+ );
+}
+
+function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer {
+ assert(!isDetachedBuffer(buffer));
+ const transferredIshVersion = buffer.slice(0);
+
+ Object.defineProperty(buffer, "byteLength", {
+ get(): number {
+ return 0;
+ },
+ });
+ (buffer as any)[sym.isFakeDetached] = true;
+
+ return transferredIshVersion;
+}
+
+export function validateAndNormalizeHighWaterMark(
+ highWaterMark: number
+): number {
+ highWaterMark = Number(highWaterMark);
+ if (highWaterMark === NaN || highWaterMark < 0) {
+ throw new RangeError(
+ `highWaterMark must be a positive number or Infinity. Received: ${highWaterMark}.`
+ );
+ }
+ return highWaterMark;
+}
+
+/* eslint-enable */
diff --git a/cli/js/web/streams/mod.ts b/cli/js/web/streams/mod.ts
deleted file mode 100644
index a6b55ed5a..000000000
--- a/cli/js/web/streams/mod.ts
+++ /dev/null
@@ -1,13 +0,0 @@
-// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-export { SDReadableStream as ReadableStream } from "./readable-stream.ts";
-/* TODO The following are currently unused so not exported for clarity.
-export { WritableStream } from "./writable-stream.ts";
-
-export { TransformStream } from "./transform-stream.ts";
-export {
- ByteLengthQueuingStrategy,
- CountQueuingStrategy
-} from "./strategies.ts";
-*/
diff --git a/cli/js/web/streams/pipe-to.ts b/cli/js/web/streams/pipe-to.ts
deleted file mode 100644
index 17fb8e5bd..000000000
--- a/cli/js/web/streams/pipe-to.ts
+++ /dev/null
@@ -1,237 +0,0 @@
-// TODO reenable this code when we enable writableStreams and transport types
-// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-// /**
-// * streams/pipe-to - pipeTo algorithm implementation
-// * Part of Stardazed
-// * (c) 2018-Present by Arthur Langereis - @zenmumbler
-// * https://github.com/stardazed/sd-streams
-// */
-
-// /* eslint-disable @typescript-eslint/no-explicit-any */
-// // TODO reenable this lint here
-
-// import * as rs from "./readable-internals.ts";
-// import * as ws from "./writable-internals.ts";
-// import * as shared from "./shared-internals.ts";
-
-// import { ReadableStreamDefaultReader } from "./readable-stream-default-reader.ts";
-// import { WritableStreamDefaultWriter } from "./writable-stream-default-writer.ts";
-// import { PipeOptions } from "../dom_types.d.ts";
-// import { Err } from "../errors.ts";
-
-// // add a wrapper to handle falsy rejections
-// interface ErrorWrapper {
-// actualError: shared.ErrorResult;
-// }
-
-// export function pipeTo<ChunkType>(
-// source: rs.SDReadableStream<ChunkType>,
-// dest: ws.WritableStream<ChunkType>,
-// options: PipeOptions
-// ): Promise<void> {
-// const preventClose = !!options.preventClose;
-// const preventAbort = !!options.preventAbort;
-// const preventCancel = !!options.preventCancel;
-// const signal = options.signal;
-
-// let shuttingDown = false;
-// let latestWrite = Promise.resolve();
-// const promise = shared.createControlledPromise<void>();
-
-// // If IsReadableByteStreamController(this.[[readableStreamController]]) is true, let reader be either ! AcquireReadableStreamBYOBReader(this) or ! AcquireReadableStreamDefaultReader(this), at the user agent’s discretion.
-// // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(this).
-// const reader = new ReadableStreamDefaultReader(source);
-// const writer = new WritableStreamDefaultWriter(dest);
-
-// let abortAlgorithm: () => any;
-// if (signal !== undefined) {
-// abortAlgorithm = (): void => {
-// // TODO this should be a DOMException,
-// // https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/pipe-to.ts#L38
-// const error = new errors.Aborted("Aborted");
-// const actions: Array<() => Promise<void>> = [];
-// if (preventAbort === false) {
-// actions.push(() => {
-// if (dest[shared.state_] === "writable") {
-// return ws.writableStreamAbort(dest, error);
-// }
-// return Promise.resolve();
-// });
-// }
-// if (preventCancel === false) {
-// actions.push(() => {
-// if (source[shared.state_] === "readable") {
-// return rs.readableStreamCancel(source, error);
-// }
-// return Promise.resolve();
-// });
-// }
-// shutDown(
-// () => {
-// return Promise.all(actions.map(a => a())).then(_ => undefined);
-// },
-// { actualError: error }
-// );
-// };
-
-// if (signal.aborted === true) {
-// abortAlgorithm();
-// } else {
-// signal.addEventListener("abort", abortAlgorithm);
-// }
-// }
-
-// function onStreamErrored(
-// stream: rs.SDReadableStream<ChunkType> | ws.WritableStream<ChunkType>,
-// promise: Promise<void>,
-// action: (error: shared.ErrorResult) => void
-// ): void {
-// if (stream[shared.state_] === "errored") {
-// action(stream[shared.storedError_]);
-// } else {
-// promise.catch(action);
-// }
-// }
-
-// function onStreamClosed(
-// stream: rs.SDReadableStream<ChunkType> | ws.WritableStream<ChunkType>,
-// promise: Promise<void>,
-// action: () => void
-// ): void {
-// if (stream[shared.state_] === "closed") {
-// action();
-// } else {
-// promise.then(action);
-// }
-// }
-
-// onStreamErrored(source, reader[rs.closedPromise_].promise, error => {
-// if (!preventAbort) {
-// shutDown(() => ws.writableStreamAbort(dest, error), {
-// actualError: error
-// });
-// } else {
-// shutDown(undefined, { actualError: error });
-// }
-// });
-
-// onStreamErrored(dest, writer[ws.closedPromise_].promise, error => {
-// if (!preventCancel) {
-// shutDown(() => rs.readableStreamCancel(source, error), {
-// actualError: error
-// });
-// } else {
-// shutDown(undefined, { actualError: error });
-// }
-// });
-
-// onStreamClosed(source, reader[rs.closedPromise_].promise, () => {
-// if (!preventClose) {
-// shutDown(() =>
-// ws.writableStreamDefaultWriterCloseWithErrorPropagation(writer)
-// );
-// } else {
-// shutDown();
-// }
-// });
-
-// if (
-// ws.writableStreamCloseQueuedOrInFlight(dest) ||
-// dest[shared.state_] === "closed"
-// ) {
-// // Assert: no chunks have been read or written.
-// const destClosed = new TypeError();
-// if (!preventCancel) {
-// shutDown(() => rs.readableStreamCancel(source, destClosed), {
-// actualError: destClosed
-// });
-// } else {
-// shutDown(undefined, { actualError: destClosed });
-// }
-// }
-
-// function awaitLatestWrite(): Promise<void> {
-// const curLatestWrite = latestWrite;
-// return latestWrite.then(() =>
-// curLatestWrite === latestWrite ? undefined : awaitLatestWrite()
-// );
-// }
-
-// function flushRemainder(): Promise<void> | undefined {
-// if (
-// dest[shared.state_] === "writable" &&
-// !ws.writableStreamCloseQueuedOrInFlight(dest)
-// ) {
-// return awaitLatestWrite();
-// } else {
-// return undefined;
-// }
-// }
-
-// function shutDown(action?: () => Promise<void>, error?: ErrorWrapper): void {
-// if (shuttingDown) {
-// return;
-// }
-// shuttingDown = true;
-
-// if (action === undefined) {
-// action = (): Promise<void> => Promise.resolve();
-// }
-
-// function finishShutDown(): void {
-// action!().then(
-// _ => finalize(error),
-// newError => finalize({ actualError: newError })
-// );
-// }
-
-// const flushWait = flushRemainder();
-// if (flushWait) {
-// flushWait.then(finishShutDown);
-// } else {
-// finishShutDown();
-// }
-// }
-
-// function finalize(error?: ErrorWrapper): void {
-// ws.writableStreamDefaultWriterRelease(writer);
-// rs.readableStreamReaderGenericRelease(reader);
-// if (signal && abortAlgorithm) {
-// signal.removeEventListener("abort", abortAlgorithm);
-// }
-// if (error) {
-// promise.reject(error.actualError);
-// } else {
-// promise.resolve(undefined);
-// }
-// }
-
-// function next(): Promise<void> | undefined {
-// if (shuttingDown) {
-// return;
-// }
-
-// writer[ws.readyPromise_].promise.then(() => {
-// rs.readableStreamDefaultReaderRead(reader).then(
-// ({ value, done }) => {
-// if (done) {
-// return;
-// }
-// latestWrite = ws
-// .writableStreamDefaultWriterWrite(writer, value!)
-// .catch(() => {});
-// next();
-// },
-// _error => {
-// latestWrite = Promise.resolve();
-// }
-// );
-// });
-// }
-
-// next();
-
-// return promise.promise;
-// }
diff --git a/cli/js/web/streams/queue-mixin.ts b/cli/js/web/streams/queue-mixin.ts
deleted file mode 100644
index a7ed14974..000000000
--- a/cli/js/web/streams/queue-mixin.ts
+++ /dev/null
@@ -1,77 +0,0 @@
-// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-/* eslint-disable @typescript-eslint/no-explicit-any */
-// TODO reenable this lint here
-
-import { Queue, QueueImpl } from "./queue.ts";
-import { isFiniteNonNegativeNumber } from "./shared-internals.ts";
-
-export const queue_ = Symbol("queue_");
-export const queueTotalSize_ = Symbol("queueTotalSize_");
-
-export interface QueueElement<V> {
- value: V;
- size: number;
-}
-
-export interface QueueContainer<V> {
- [queue_]: Queue<QueueElement<V>>;
- [queueTotalSize_]: number;
-}
-
-export interface ByteQueueContainer {
- [queue_]: Queue<{
- buffer: ArrayBufferLike;
- byteOffset: number;
- byteLength: number;
- }>;
- [queueTotalSize_]: number;
-}
-
-export function dequeueValue<V>(container: QueueContainer<V>): V {
- // Assert: container has[[queue]] and[[queueTotalSize]] internal slots.
- // Assert: container.[[queue]] is not empty.
- const pair = container[queue_].shift()!;
- const newTotalSize = container[queueTotalSize_] - pair.size;
- container[queueTotalSize_] = Math.max(0, newTotalSize); // < 0 can occur due to rounding errors.
- return pair.value;
-}
-
-export function enqueueValueWithSize<V>(
- container: QueueContainer<V>,
- value: V,
- size: number
-): void {
- // Assert: container has[[queue]] and[[queueTotalSize]] internal slots.
- if (!isFiniteNonNegativeNumber(size)) {
- throw new RangeError("Chunk size must be a non-negative, finite numbers");
- }
- container[queue_].push({ value, size });
- container[queueTotalSize_] += size;
-}
-
-export function peekQueueValue<V>(container: QueueContainer<V>): V {
- // Assert: container has[[queue]] and[[queueTotalSize]] internal slots.
- // Assert: container.[[queue]] is not empty.
- return container[queue_].front()!.value;
-}
-
-export function resetQueue<V>(
- container: ByteQueueContainer | QueueContainer<V>
-): void {
- // Chrome (as of v67) has a steep performance cliff with large arrays
- // and shift(), around about 50k elements. While this is an unusual case
- // we use a simple wrapper around shift and push that is chunked to
- // avoid this pitfall.
- // @see: https://github.com/stardazed/sd-streams/issues/1
- container[queue_] = new QueueImpl<any>();
-
- // The code below can be used as a plain array implementation of the
- // Queue interface.
- // const q = [] as any;
- // q.front = function() { return this[0]; };
- // container[queue_] = q;
-
- container[queueTotalSize_] = 0;
-}
diff --git a/cli/js/web/streams/queue.ts b/cli/js/web/streams/queue.ts
deleted file mode 100644
index 16e3eafe4..000000000
--- a/cli/js/web/streams/queue.ts
+++ /dev/null
@@ -1,58 +0,0 @@
-// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-const CHUNK_SIZE = 16384;
-
-export interface Queue<T> {
- push(t: T): void;
- shift(): T | undefined;
- front(): T | undefined;
- readonly length: number;
-}
-
-export class QueueImpl<T> implements Queue<T> {
- private readonly chunks_: T[][];
- private readChunk_: T[];
- private writeChunk_: T[];
- private length_: number;
-
- constructor() {
- this.chunks_ = [[]];
- this.readChunk_ = this.writeChunk_ = this.chunks_[0];
- this.length_ = 0;
- }
-
- push(t: T): void {
- this.writeChunk_.push(t);
- this.length_ += 1;
- if (this.writeChunk_.length === CHUNK_SIZE) {
- this.writeChunk_ = [];
- this.chunks_.push(this.writeChunk_);
- }
- }
-
- front(): T | undefined {
- if (this.length_ === 0) {
- return undefined;
- }
- return this.readChunk_[0];
- }
-
- shift(): T | undefined {
- if (this.length_ === 0) {
- return undefined;
- }
- const t = this.readChunk_.shift();
-
- this.length_ -= 1;
- if (this.readChunk_.length === 0 && this.readChunk_ !== this.writeChunk_) {
- this.chunks_.shift();
- this.readChunk_ = this.chunks_[0];
- }
- return t;
- }
-
- get length(): number {
- return this.length_;
- }
-}
diff --git a/cli/js/web/streams/readable-byte-stream-controller.ts b/cli/js/web/streams/readable-byte-stream-controller.ts
deleted file mode 100644
index 19f259484..000000000
--- a/cli/js/web/streams/readable-byte-stream-controller.ts
+++ /dev/null
@@ -1,207 +0,0 @@
-// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-/* eslint-disable @typescript-eslint/no-explicit-any */
-// TODO reenable this lint here
-
-import * as rs from "./readable-internals.ts";
-import * as q from "./queue-mixin.ts";
-import * as shared from "./shared-internals.ts";
-import { ReadableStreamBYOBRequest } from "./readable-stream-byob-request.ts";
-import { Queue } from "./queue.ts";
-import { UnderlyingByteSource } from "../dom_types.d.ts";
-
-export class ReadableByteStreamController
- implements rs.SDReadableByteStreamController {
- [rs.autoAllocateChunkSize_]: number | undefined;
- [rs.byobRequest_]: rs.SDReadableStreamBYOBRequest | undefined;
- [rs.cancelAlgorithm_]: rs.CancelAlgorithm;
- [rs.closeRequested_]: boolean;
- [rs.controlledReadableByteStream_]: rs.SDReadableStream<ArrayBufferView>;
- [rs.pullAgain_]: boolean;
- [rs.pullAlgorithm_]: rs.PullAlgorithm<ArrayBufferView>;
- [rs.pulling_]: boolean;
- [rs.pendingPullIntos_]: rs.PullIntoDescriptor[];
- [rs.started_]: boolean;
- [rs.strategyHWM_]: number;
-
- [q.queue_]: Queue<{
- buffer: ArrayBufferLike;
- byteOffset: number;
- byteLength: number;
- }>;
- [q.queueTotalSize_]: number;
-
- constructor() {
- throw new TypeError();
- }
-
- get byobRequest(): rs.SDReadableStreamBYOBRequest | undefined {
- if (!rs.isReadableByteStreamController(this)) {
- throw new TypeError();
- }
- if (
- this[rs.byobRequest_] === undefined &&
- this[rs.pendingPullIntos_].length > 0
- ) {
- const firstDescriptor = this[rs.pendingPullIntos_][0];
- const view = new Uint8Array(
- firstDescriptor.buffer,
- firstDescriptor.byteOffset + firstDescriptor.bytesFilled,
- firstDescriptor.byteLength - firstDescriptor.bytesFilled
- );
- const byobRequest = Object.create(
- ReadableStreamBYOBRequest.prototype
- ) as ReadableStreamBYOBRequest;
- rs.setUpReadableStreamBYOBRequest(byobRequest, this, view);
- this[rs.byobRequest_] = byobRequest;
- }
- return this[rs.byobRequest_];
- }
-
- get desiredSize(): number | null {
- if (!rs.isReadableByteStreamController(this)) {
- throw new TypeError();
- }
- return rs.readableByteStreamControllerGetDesiredSize(this);
- }
-
- close(): void {
- if (!rs.isReadableByteStreamController(this)) {
- throw new TypeError();
- }
- if (this[rs.closeRequested_]) {
- throw new TypeError("Stream is already closing");
- }
- if (this[rs.controlledReadableByteStream_][shared.state_] !== "readable") {
- throw new TypeError("Stream is closed or errored");
- }
- rs.readableByteStreamControllerClose(this);
- }
-
- enqueue(chunk: ArrayBufferView): void {
- if (!rs.isReadableByteStreamController(this)) {
- throw new TypeError();
- }
- if (this[rs.closeRequested_]) {
- throw new TypeError("Stream is already closing");
- }
- if (this[rs.controlledReadableByteStream_][shared.state_] !== "readable") {
- throw new TypeError("Stream is closed or errored");
- }
- if (!ArrayBuffer.isView(chunk)) {
- throw new TypeError("chunk must be a valid ArrayBufferView");
- }
- // If ! IsDetachedBuffer(chunk.[[ViewedArrayBuffer]]) is true, throw a TypeError exception.
- return rs.readableByteStreamControllerEnqueue(this, chunk);
- }
-
- error(error?: shared.ErrorResult): void {
- if (!rs.isReadableByteStreamController(this)) {
- throw new TypeError();
- }
- rs.readableByteStreamControllerError(this, error);
- }
-
- [rs.cancelSteps_](reason: shared.ErrorResult): Promise<void> {
- if (this[rs.pendingPullIntos_].length > 0) {
- const firstDescriptor = this[rs.pendingPullIntos_][0];
- firstDescriptor.bytesFilled = 0;
- }
- q.resetQueue(this);
- const result = this[rs.cancelAlgorithm_](reason);
- rs.readableByteStreamControllerClearAlgorithms(this);
- return result;
- }
-
- [rs.pullSteps_](
- forAuthorCode: boolean
- ): Promise<IteratorResult<ArrayBufferView, any>> {
- const stream = this[rs.controlledReadableByteStream_];
- // Assert: ! ReadableStreamHasDefaultReader(stream) is true.
- if (this[q.queueTotalSize_] > 0) {
- // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
- const entry = this[q.queue_].shift()!;
- this[q.queueTotalSize_] -= entry.byteLength;
- rs.readableByteStreamControllerHandleQueueDrain(this);
- const view = new Uint8Array(
- entry.buffer,
- entry.byteOffset,
- entry.byteLength
- );
- return Promise.resolve(
- rs.readableStreamCreateReadResult(view, false, forAuthorCode)
- );
- }
- const autoAllocateChunkSize = this[rs.autoAllocateChunkSize_];
- if (autoAllocateChunkSize !== undefined) {
- let buffer: ArrayBuffer;
- try {
- buffer = new ArrayBuffer(autoAllocateChunkSize);
- } catch (error) {
- return Promise.reject(error);
- }
- const pullIntoDescriptor: rs.PullIntoDescriptor = {
- buffer,
- byteOffset: 0,
- byteLength: autoAllocateChunkSize,
- bytesFilled: 0,
- elementSize: 1,
- ctor: Uint8Array,
- readerType: "default",
- };
- this[rs.pendingPullIntos_].push(pullIntoDescriptor);
- }
-
- const promise = rs.readableStreamAddReadRequest(stream, forAuthorCode);
- rs.readableByteStreamControllerCallPullIfNeeded(this);
- return promise;
- }
-}
-
-export function setUpReadableByteStreamControllerFromUnderlyingSource(
- stream: rs.SDReadableStream<ArrayBufferView>,
- underlyingByteSource: UnderlyingByteSource,
- highWaterMark: number
-): void {
- // Assert: underlyingByteSource is not undefined.
- const controller = Object.create(
- ReadableByteStreamController.prototype
- ) as ReadableByteStreamController;
-
- const startAlgorithm = (): any => {
- return shared.invokeOrNoop(underlyingByteSource, "start", [controller]);
- };
- const pullAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
- underlyingByteSource,
- "pull",
- [controller]
- );
- const cancelAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
- underlyingByteSource,
- "cancel",
- []
- );
-
- let autoAllocateChunkSize = underlyingByteSource.autoAllocateChunkSize;
- if (autoAllocateChunkSize !== undefined) {
- autoAllocateChunkSize = Number(autoAllocateChunkSize);
- if (
- !shared.isInteger(autoAllocateChunkSize) ||
- autoAllocateChunkSize <= 0
- ) {
- throw new RangeError(
- "autoAllocateChunkSize must be a positive, finite integer"
- );
- }
- }
- rs.setUpReadableByteStreamController(
- stream,
- controller,
- startAlgorithm,
- pullAlgorithm,
- cancelAlgorithm,
- highWaterMark,
- autoAllocateChunkSize
- );
-}
diff --git a/cli/js/web/streams/readable-internals.ts b/cli/js/web/streams/readable-internals.ts
deleted file mode 100644
index 571ce50ed..000000000
--- a/cli/js/web/streams/readable-internals.ts
+++ /dev/null
@@ -1,1350 +0,0 @@
-// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-/* eslint-disable @typescript-eslint/no-explicit-any */
-// TODO reenable this lint here
-
-import * as shared from "./shared-internals.ts";
-import * as q from "./queue-mixin.ts";
-import {
- QueuingStrategy,
- QueuingStrategySizeCallback,
- UnderlyingSource,
- UnderlyingByteSource,
-} from "../dom_types.d.ts";
-
-// ReadableStreamDefaultController
-export const controlledReadableStream_ = Symbol("controlledReadableStream_");
-export const pullAlgorithm_ = Symbol("pullAlgorithm_");
-export const cancelAlgorithm_ = Symbol("cancelAlgorithm_");
-export const strategySizeAlgorithm_ = Symbol("strategySizeAlgorithm_");
-export const strategyHWM_ = Symbol("strategyHWM_");
-export const started_ = Symbol("started_");
-export const closeRequested_ = Symbol("closeRequested_");
-export const pullAgain_ = Symbol("pullAgain_");
-export const pulling_ = Symbol("pulling_");
-export const cancelSteps_ = Symbol("cancelSteps_");
-export const pullSteps_ = Symbol("pullSteps_");
-
-// ReadableByteStreamController
-export const autoAllocateChunkSize_ = Symbol("autoAllocateChunkSize_");
-export const byobRequest_ = Symbol("byobRequest_");
-export const controlledReadableByteStream_ = Symbol(
- "controlledReadableByteStream_"
-);
-export const pendingPullIntos_ = Symbol("pendingPullIntos_");
-
-// ReadableStreamDefaultReader
-export const closedPromise_ = Symbol("closedPromise_");
-export const ownerReadableStream_ = Symbol("ownerReadableStream_");
-export const readRequests_ = Symbol("readRequests_");
-export const readIntoRequests_ = Symbol("readIntoRequests_");
-
-// ReadableStreamBYOBRequest
-export const associatedReadableByteStreamController_ = Symbol(
- "associatedReadableByteStreamController_"
-);
-export const view_ = Symbol("view_");
-
-// ReadableStreamBYOBReader
-
-// ReadableStream
-export const reader_ = Symbol("reader_");
-export const readableStreamController_ = Symbol("readableStreamController_");
-
-export type StartFunction<OutputType> = (
- controller: SDReadableStreamControllerBase<OutputType>
-) => void | PromiseLike<void>;
-export type StartAlgorithm = () => Promise<void> | void;
-export type PullFunction<OutputType> = (
- controller: SDReadableStreamControllerBase<OutputType>
-) => void | PromiseLike<void>;
-export type PullAlgorithm<OutputType> = (
- controller: SDReadableStreamControllerBase<OutputType>
-) => PromiseLike<void>;
-export type CancelAlgorithm = (reason?: shared.ErrorResult) => Promise<void>;
-
-// ----
-
-export interface SDReadableStreamControllerBase<OutputType> {
- readonly desiredSize: number | null;
- close(): void;
- error(e?: shared.ErrorResult): void;
-
- [cancelSteps_](reason: shared.ErrorResult): Promise<void>;
- [pullSteps_](forAuthorCode: boolean): Promise<IteratorResult<OutputType>>;
-}
-
-export interface SDReadableStreamBYOBRequest {
- readonly view: ArrayBufferView;
- respond(bytesWritten: number): void;
- respondWithNewView(view: ArrayBufferView): void;
-
- [associatedReadableByteStreamController_]:
- | SDReadableByteStreamController
- | undefined;
- [view_]: ArrayBufferView | undefined;
-}
-
-interface ArrayBufferViewCtor {
- new (
- buffer: ArrayBufferLike,
- byteOffset?: number,
- byteLength?: number
- ): ArrayBufferView;
-}
-
-export interface PullIntoDescriptor {
- readerType: "default" | "byob";
- ctor: ArrayBufferViewCtor;
- buffer: ArrayBufferLike;
- byteOffset: number;
- byteLength: number;
- bytesFilled: number;
- elementSize: number;
-}
-
-export interface SDReadableByteStreamController
- extends SDReadableStreamControllerBase<ArrayBufferView>,
- q.ByteQueueContainer {
- readonly byobRequest: SDReadableStreamBYOBRequest | undefined;
- enqueue(chunk: ArrayBufferView): void;
-
- [autoAllocateChunkSize_]: number | undefined; // A positive integer, when the automatic buffer allocation feature is enabled. In that case, this value specifies the size of buffer to allocate. It is undefined otherwise.
- [byobRequest_]: SDReadableStreamBYOBRequest | undefined; // A ReadableStreamBYOBRequest instance representing the current BYOB pull request
- [cancelAlgorithm_]: CancelAlgorithm; // A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying source
- [closeRequested_]: boolean; // A boolean flag indicating whether the stream has been closed by its underlying byte source, but still has chunks in its internal queue that have not yet been read
- [controlledReadableByteStream_]: SDReadableStream<ArrayBufferView>; // The ReadableStream instance controlled
- [pullAgain_]: boolean; // A boolean flag set to true if the stream’s mechanisms requested a call to the underlying byte source’s pull() method to pull more data, but the pull could not yet be done since a previous call is still executing
- [pullAlgorithm_]: PullAlgorithm<ArrayBufferView>; // A promise-returning algorithm that pulls data from the underlying source
- [pulling_]: boolean; // A boolean flag set to true while the underlying byte source’s pull() method is executing and has not yet fulfilled, used to prevent reentrant calls
- [pendingPullIntos_]: PullIntoDescriptor[]; // A List of descriptors representing pending BYOB pull requests
- [started_]: boolean; // A boolean flag indicating whether the underlying source has finished starting
- [strategyHWM_]: number; // A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source
-}
-
-export interface SDReadableStreamDefaultController<OutputType>
- extends SDReadableStreamControllerBase<OutputType>,
- q.QueueContainer<OutputType> {
- enqueue(chunk?: OutputType): void;
-
- [controlledReadableStream_]: SDReadableStream<OutputType>;
- [pullAlgorithm_]: PullAlgorithm<OutputType>;
- [cancelAlgorithm_]: CancelAlgorithm;
- [strategySizeAlgorithm_]: QueuingStrategySizeCallback<OutputType>;
- [strategyHWM_]: number;
-
- [started_]: boolean;
- [closeRequested_]: boolean;
- [pullAgain_]: boolean;
- [pulling_]: boolean;
-}
-
-// ----
-
-export interface SDReadableStreamReader<OutputType> {
- readonly closed: Promise<void>;
- cancel(reason: shared.ErrorResult): Promise<void>;
- releaseLock(): void;
-
- [ownerReadableStream_]: SDReadableStream<OutputType> | undefined;
- [closedPromise_]: shared.ControlledPromise<void>;
-}
-
-export interface ReadRequest<V> extends shared.ControlledPromise<V> {
- forAuthorCode: boolean;
-}
-
-export declare class SDReadableStreamDefaultReader<OutputType>
- implements SDReadableStreamReader<OutputType> {
- constructor(stream: SDReadableStream<OutputType>);
-
- readonly closed: Promise<void>;
- cancel(reason: shared.ErrorResult): Promise<void>;
- releaseLock(): void;
- read(): Promise<IteratorResult<OutputType | undefined>>;
-
- [ownerReadableStream_]: SDReadableStream<OutputType> | undefined;
- [closedPromise_]: shared.ControlledPromise<void>;
- [readRequests_]: Array<ReadRequest<IteratorResult<OutputType>>>;
-}
-
-export declare class SDReadableStreamBYOBReader
- implements SDReadableStreamReader<ArrayBufferView> {
- constructor(stream: SDReadableStream<ArrayBufferView>);
-
- readonly closed: Promise<void>;
- cancel(reason: shared.ErrorResult): Promise<void>;
- releaseLock(): void;
- read(view: ArrayBufferView): Promise<IteratorResult<ArrayBufferView>>;
-
- [ownerReadableStream_]: SDReadableStream<ArrayBufferView> | undefined;
- [closedPromise_]: shared.ControlledPromise<void>;
- [readIntoRequests_]: Array<ReadRequest<IteratorResult<ArrayBufferView>>>;
-}
-
-/* TODO reenable this when we add WritableStreams and Transforms
-export interface GenericTransformStream<InputType, OutputType> {
- readable: SDReadableStream<OutputType>;
- writable: ws.WritableStream<InputType>;
-}
-*/
-
-export type ReadableStreamState = "readable" | "closed" | "errored";
-
-export declare class SDReadableStream<OutputType> {
- constructor(
- underlyingSource: UnderlyingByteSource,
- strategy?: { highWaterMark?: number; size?: undefined }
- );
- constructor(
- underlyingSource?: UnderlyingSource<OutputType>,
- strategy?: QueuingStrategy<OutputType>
- );
-
- readonly locked: boolean;
- cancel(reason?: shared.ErrorResult): Promise<void>;
- getReader(): SDReadableStreamReader<OutputType>;
- getReader(options: { mode: "byob" }): SDReadableStreamBYOBReader;
- tee(): Array<SDReadableStream<OutputType>>;
-
- /* TODO reenable these methods when we bring in writableStreams and transport types
- pipeThrough<ResultType>(
- transform: GenericTransformStream<OutputType, ResultType>,
- options?: PipeOptions
- ): SDReadableStream<ResultType>;
- pipeTo(
- dest: ws.WritableStream<OutputType>,
- options?: PipeOptions
- ): Promise<void>;
- */
- [shared.state_]: ReadableStreamState;
- [shared.storedError_]: shared.ErrorResult;
- [reader_]: SDReadableStreamReader<OutputType> | undefined;
- [readableStreamController_]: SDReadableStreamControllerBase<OutputType>;
-}
-
-// ---- Stream
-
-export function initializeReadableStream<OutputType>(
- stream: SDReadableStream<OutputType>
-): void {
- stream[shared.state_] = "readable";
- stream[reader_] = undefined;
- stream[shared.storedError_] = undefined;
- stream[readableStreamController_] = undefined!; // mark slot as used for brand check
-}
-
-export function isReadableStream(
- value: unknown
-): value is SDReadableStream<any> {
- if (typeof value !== "object" || value === null) {
- return false;
- }
- return readableStreamController_ in value;
-}
-
-export function isReadableStreamLocked<OutputType>(
- stream: SDReadableStream<OutputType>
-): boolean {
- return stream[reader_] !== undefined;
-}
-
-export function readableStreamGetNumReadIntoRequests<OutputType>(
- stream: SDReadableStream<OutputType>
-): number {
- // TODO remove the "as unknown" cast
- // This is in to workaround a compiler error
- // error TS2352: Conversion of type 'SDReadableStreamReader<OutputType>' to type 'SDReadableStreamBYOBReader' may be a mistake because neither type sufficiently overlaps with the other. If this was intentional, convert the expression to 'unknown' first.
- // Type 'SDReadableStreamReader<OutputType>' is missing the following properties from type 'SDReadableStreamBYOBReader': read, [readIntoRequests_]
- const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader;
- if (reader === undefined) {
- return 0;
- }
- return reader[readIntoRequests_].length;
-}
-
-export function readableStreamGetNumReadRequests<OutputType>(
- stream: SDReadableStream<OutputType>
-): number {
- const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
- if (reader === undefined) {
- return 0;
- }
- return reader[readRequests_].length;
-}
-
-export function readableStreamCreateReadResult<T>(
- value: T,
- done: boolean,
- forAuthorCode: boolean
-): IteratorResult<T> {
- const prototype = forAuthorCode ? Object.prototype : null;
- const result = Object.create(prototype);
- result.value = value;
- result.done = done;
- return result;
-}
-
-export function readableStreamAddReadIntoRequest(
- stream: SDReadableStream<ArrayBufferView>,
- forAuthorCode: boolean
-): Promise<IteratorResult<ArrayBufferView, any>> {
- // Assert: ! IsReadableStreamBYOBReader(stream.[[reader]]) is true.
- // Assert: stream.[[state]] is "readable" or "closed".
- const reader = stream[reader_] as SDReadableStreamBYOBReader;
- const conProm = shared.createControlledPromise<
- IteratorResult<ArrayBufferView>
- >() as ReadRequest<IteratorResult<ArrayBufferView>>;
- conProm.forAuthorCode = forAuthorCode;
- reader[readIntoRequests_].push(conProm);
- return conProm.promise;
-}
-
-export function readableStreamAddReadRequest<OutputType>(
- stream: SDReadableStream<OutputType>,
- forAuthorCode: boolean
-): Promise<IteratorResult<OutputType, any>> {
- // Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true.
- // Assert: stream.[[state]] is "readable".
- const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
- const conProm = shared.createControlledPromise<
- IteratorResult<OutputType>
- >() as ReadRequest<IteratorResult<OutputType>>;
- conProm.forAuthorCode = forAuthorCode;
- reader[readRequests_].push(conProm);
- return conProm.promise;
-}
-
-export function readableStreamHasBYOBReader<OutputType>(
- stream: SDReadableStream<OutputType>
-): boolean {
- const reader = stream[reader_];
- return isReadableStreamBYOBReader(reader);
-}
-
-export function readableStreamHasDefaultReader<OutputType>(
- stream: SDReadableStream<OutputType>
-): boolean {
- const reader = stream[reader_];
- return isReadableStreamDefaultReader(reader);
-}
-
-export function readableStreamCancel<OutputType>(
- stream: SDReadableStream<OutputType>,
- reason: shared.ErrorResult
-): Promise<undefined> {
- if (stream[shared.state_] === "closed") {
- return Promise.resolve(undefined);
- }
- if (stream[shared.state_] === "errored") {
- return Promise.reject(stream[shared.storedError_]);
- }
- readableStreamClose(stream);
-
- const sourceCancelPromise = stream[readableStreamController_][cancelSteps_](
- reason
- );
- return sourceCancelPromise.then((_) => undefined);
-}
-
-export function readableStreamClose<OutputType>(
- stream: SDReadableStream<OutputType>
-): void {
- // Assert: stream.[[state]] is "readable".
- stream[shared.state_] = "closed";
- const reader = stream[reader_];
- if (reader === undefined) {
- return;
- }
-
- if (isReadableStreamDefaultReader(reader)) {
- for (const readRequest of reader[readRequests_]) {
- readRequest.resolve(
- readableStreamCreateReadResult(
- undefined,
- true,
- readRequest.forAuthorCode
- )
- );
- }
- reader[readRequests_] = [];
- }
- reader[closedPromise_].resolve();
- reader[closedPromise_].promise.catch(() => {});
-}
-
-export function readableStreamError<OutputType>(
- stream: SDReadableStream<OutputType>,
- error: shared.ErrorResult
-): void {
- if (stream[shared.state_] !== "readable") {
- throw new RangeError("Stream is in an invalid state");
- }
- stream[shared.state_] = "errored";
- stream[shared.storedError_] = error;
-
- const reader = stream[reader_];
- if (reader === undefined) {
- return;
- }
- if (isReadableStreamDefaultReader(reader)) {
- for (const readRequest of reader[readRequests_]) {
- readRequest.reject(error);
- }
- reader[readRequests_] = [];
- } else {
- // Assert: IsReadableStreamBYOBReader(reader).
- // TODO remove the "as unknown" cast
- const readIntoRequests = ((reader as unknown) as SDReadableStreamBYOBReader)[
- readIntoRequests_
- ];
- for (const readIntoRequest of readIntoRequests) {
- readIntoRequest.reject(error);
- }
- // TODO remove the "as unknown" cast
- ((reader as unknown) as SDReadableStreamBYOBReader)[readIntoRequests_] = [];
- }
-
- reader[closedPromise_].reject(error);
-}
-
-// ---- Readers
-
-export function isReadableStreamDefaultReader(
- reader: unknown
-): reader is SDReadableStreamDefaultReader<any> {
- if (typeof reader !== "object" || reader === null) {
- return false;
- }
- return readRequests_ in reader;
-}
-
-export function isReadableStreamBYOBReader(
- reader: unknown
-): reader is SDReadableStreamBYOBReader {
- if (typeof reader !== "object" || reader === null) {
- return false;
- }
- return readIntoRequests_ in reader;
-}
-
-export function readableStreamReaderGenericInitialize<OutputType>(
- reader: SDReadableStreamReader<OutputType>,
- stream: SDReadableStream<OutputType>
-): void {
- reader[ownerReadableStream_] = stream;
- stream[reader_] = reader;
- const streamState = stream[shared.state_];
-
- reader[closedPromise_] = shared.createControlledPromise<void>();
- if (streamState === "readable") {
- // leave as is
- } else if (streamState === "closed") {
- reader[closedPromise_].resolve(undefined);
- } else {
- reader[closedPromise_].reject(stream[shared.storedError_]);
- reader[closedPromise_].promise.catch(() => {});
- }
-}
-
-export function readableStreamReaderGenericRelease<OutputType>(
- reader: SDReadableStreamReader<OutputType>
-): void {
- // Assert: reader.[[ownerReadableStream]] is not undefined.
- // Assert: reader.[[ownerReadableStream]].[[reader]] is reader.
- const stream = reader[ownerReadableStream_];
- if (stream === undefined) {
- throw new TypeError("Reader is in an inconsistent state");
- }
-
- if (stream[shared.state_] === "readable") {
- // code moved out
- } else {
- reader[closedPromise_] = shared.createControlledPromise<void>();
- }
- reader[closedPromise_].reject(new TypeError());
- reader[closedPromise_].promise.catch(() => {});
-
- stream[reader_] = undefined;
- reader[ownerReadableStream_] = undefined;
-}
-
-export function readableStreamBYOBReaderRead(
- reader: SDReadableStreamBYOBReader,
- view: ArrayBufferView,
- forAuthorCode = false
-): Promise<IteratorResult<ArrayBufferView, any>> {
- const stream = reader[ownerReadableStream_]!;
- // Assert: stream is not undefined.
-
- if (stream[shared.state_] === "errored") {
- return Promise.reject(stream[shared.storedError_]);
- }
- return readableByteStreamControllerPullInto(
- stream[readableStreamController_] as SDReadableByteStreamController,
- view,
- forAuthorCode
- );
-}
-
-export function readableStreamDefaultReaderRead<OutputType>(
- reader: SDReadableStreamDefaultReader<OutputType>,
- forAuthorCode = false
-): Promise<IteratorResult<OutputType | undefined>> {
- const stream = reader[ownerReadableStream_]!;
- // Assert: stream is not undefined.
-
- if (stream[shared.state_] === "closed") {
- return Promise.resolve(
- readableStreamCreateReadResult(undefined, true, forAuthorCode)
- );
- }
- if (stream[shared.state_] === "errored") {
- return Promise.reject(stream[shared.storedError_]);
- }
- // Assert: stream.[[state]] is "readable".
- return stream[readableStreamController_][pullSteps_](forAuthorCode);
-}
-
-export function readableStreamFulfillReadIntoRequest<OutputType>(
- stream: SDReadableStream<OutputType>,
- chunk: ArrayBufferView,
- done: boolean
-): void {
- // TODO remove the "as unknown" cast
- const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader;
- const readIntoRequest = reader[readIntoRequests_].shift()!; // <-- length check done in caller
- readIntoRequest.resolve(
- readableStreamCreateReadResult(chunk, done, readIntoRequest.forAuthorCode)
- );
-}
-
-export function readableStreamFulfillReadRequest<OutputType>(
- stream: SDReadableStream<OutputType>,
- chunk: OutputType,
- done: boolean
-): void {
- const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
- const readRequest = reader[readRequests_].shift()!; // <-- length check done in caller
- readRequest.resolve(
- readableStreamCreateReadResult(chunk, done, readRequest.forAuthorCode)
- );
-}
-
-// ---- DefaultController
-
-export function setUpReadableStreamDefaultController<OutputType>(
- stream: SDReadableStream<OutputType>,
- controller: SDReadableStreamDefaultController<OutputType>,
- startAlgorithm: StartAlgorithm,
- pullAlgorithm: PullAlgorithm<OutputType>,
- cancelAlgorithm: CancelAlgorithm,
- highWaterMark: number,
- sizeAlgorithm: QueuingStrategySizeCallback<OutputType>
-): void {
- // Assert: stream.[[readableStreamController]] is undefined.
- controller[controlledReadableStream_] = stream;
- q.resetQueue(controller);
- controller[started_] = false;
- controller[closeRequested_] = false;
- controller[pullAgain_] = false;
- controller[pulling_] = false;
- controller[strategySizeAlgorithm_] = sizeAlgorithm;
- controller[strategyHWM_] = highWaterMark;
- controller[pullAlgorithm_] = pullAlgorithm;
- controller[cancelAlgorithm_] = cancelAlgorithm;
- stream[readableStreamController_] = controller;
-
- const startResult = startAlgorithm();
- Promise.resolve(startResult).then(
- (_) => {
- controller[started_] = true;
- // Assert: controller.[[pulling]] is false.
- // Assert: controller.[[pullAgain]] is false.
- readableStreamDefaultControllerCallPullIfNeeded(controller);
- },
- (error) => {
- readableStreamDefaultControllerError(controller, error);
- }
- );
-}
-
-export function isReadableStreamDefaultController(
- value: unknown
-): value is SDReadableStreamDefaultController<any> {
- if (typeof value !== "object" || value === null) {
- return false;
- }
- return controlledReadableStream_ in value;
-}
-
-export function readableStreamDefaultControllerHasBackpressure<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>
-): boolean {
- return !readableStreamDefaultControllerShouldCallPull(controller);
-}
-
-export function readableStreamDefaultControllerCanCloseOrEnqueue<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>
-): boolean {
- const state = controller[controlledReadableStream_][shared.state_];
- return controller[closeRequested_] === false && state === "readable";
-}
-
-export function readableStreamDefaultControllerGetDesiredSize<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>
-): number | null {
- const state = controller[controlledReadableStream_][shared.state_];
- if (state === "errored") {
- return null;
- }
- if (state === "closed") {
- return 0;
- }
- return controller[strategyHWM_] - controller[q.queueTotalSize_];
-}
-
-export function readableStreamDefaultControllerClose<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>
-): void {
- // Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.
- controller[closeRequested_] = true;
- const stream = controller[controlledReadableStream_];
- if (controller[q.queue_].length === 0) {
- readableStreamDefaultControllerClearAlgorithms(controller);
- readableStreamClose(stream);
- }
-}
-
-export function readableStreamDefaultControllerEnqueue<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>,
- chunk: OutputType
-): void {
- const stream = controller[controlledReadableStream_];
- // Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.
- if (
- isReadableStreamLocked(stream) &&
- readableStreamGetNumReadRequests(stream) > 0
- ) {
- readableStreamFulfillReadRequest(stream, chunk, false);
- } else {
- // Let result be the result of performing controller.[[strategySizeAlgorithm]], passing in chunk,
- // and interpreting the result as an ECMAScript completion value.
- // impl note: assuming that in JS land this just means try/catch with rethrow
- let chunkSize: number;
- try {
- chunkSize = controller[strategySizeAlgorithm_](chunk);
- } catch (error) {
- readableStreamDefaultControllerError(controller, error);
- throw error;
- }
- try {
- q.enqueueValueWithSize(controller, chunk, chunkSize);
- } catch (error) {
- readableStreamDefaultControllerError(controller, error);
- throw error;
- }
- }
- readableStreamDefaultControllerCallPullIfNeeded(controller);
-}
-
-export function readableStreamDefaultControllerError<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>,
- error: shared.ErrorResult
-): void {
- const stream = controller[controlledReadableStream_];
- if (stream[shared.state_] !== "readable") {
- return;
- }
- q.resetQueue(controller);
- readableStreamDefaultControllerClearAlgorithms(controller);
- readableStreamError(stream, error);
-}
-
-export function readableStreamDefaultControllerCallPullIfNeeded<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>
-): void {
- if (!readableStreamDefaultControllerShouldCallPull(controller)) {
- return;
- }
- if (controller[pulling_]) {
- controller[pullAgain_] = true;
- return;
- }
- if (controller[pullAgain_]) {
- throw new RangeError("Stream controller is in an invalid state.");
- }
-
- controller[pulling_] = true;
- controller[pullAlgorithm_](controller).then(
- (_) => {
- controller[pulling_] = false;
- if (controller[pullAgain_]) {
- controller[pullAgain_] = false;
- readableStreamDefaultControllerCallPullIfNeeded(controller);
- }
- },
- (error) => {
- readableStreamDefaultControllerError(controller, error);
- }
- );
-}
-
-export function readableStreamDefaultControllerShouldCallPull<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>
-): boolean {
- const stream = controller[controlledReadableStream_];
- if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
- return false;
- }
- if (controller[started_] === false) {
- return false;
- }
- if (
- isReadableStreamLocked(stream) &&
- readableStreamGetNumReadRequests(stream) > 0
- ) {
- return true;
- }
- const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller);
- if (desiredSize === null) {
- throw new RangeError("Stream is in an invalid state.");
- }
- return desiredSize > 0;
-}
-
-export function readableStreamDefaultControllerClearAlgorithms<OutputType>(
- controller: SDReadableStreamDefaultController<OutputType>
-): void {
- controller[pullAlgorithm_] = undefined!;
- controller[cancelAlgorithm_] = undefined!;
- controller[strategySizeAlgorithm_] = undefined!;
-}
-
-// ---- BYOBController
-
-export function setUpReadableByteStreamController(
- stream: SDReadableStream<ArrayBufferView>,
- controller: SDReadableByteStreamController,
- startAlgorithm: StartAlgorithm,
- pullAlgorithm: PullAlgorithm<ArrayBufferView>,
- cancelAlgorithm: CancelAlgorithm,
- highWaterMark: number,
- autoAllocateChunkSize: number | undefined
-): void {
- // Assert: stream.[[readableStreamController]] is undefined.
- if (stream[readableStreamController_] !== undefined) {
- throw new TypeError("Cannot reuse streams");
- }
- if (autoAllocateChunkSize !== undefined) {
- if (
- !shared.isInteger(autoAllocateChunkSize) ||
- autoAllocateChunkSize <= 0
- ) {
- throw new RangeError(
- "autoAllocateChunkSize must be a positive, finite integer"
- );
- }
- }
- // Set controller.[[controlledReadableByteStream]] to stream.
- controller[controlledReadableByteStream_] = stream;
- // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
- controller[pullAgain_] = false;
- controller[pulling_] = false;
- readableByteStreamControllerClearPendingPullIntos(controller);
- q.resetQueue(controller);
- controller[closeRequested_] = false;
- controller[started_] = false;
- controller[strategyHWM_] = shared.validateAndNormalizeHighWaterMark(
- highWaterMark
- );
- controller[pullAlgorithm_] = pullAlgorithm;
- controller[cancelAlgorithm_] = cancelAlgorithm;
- controller[autoAllocateChunkSize_] = autoAllocateChunkSize;
- controller[pendingPullIntos_] = [];
- stream[readableStreamController_] = controller;
-
- // Let startResult be the result of performing startAlgorithm.
- const startResult = startAlgorithm();
- Promise.resolve(startResult).then(
- (_) => {
- controller[started_] = true;
- // Assert: controller.[[pulling]] is false.
- // Assert: controller.[[pullAgain]] is false.
- readableByteStreamControllerCallPullIfNeeded(controller);
- },
- (error) => {
- readableByteStreamControllerError(controller, error);
- }
- );
-}
-
-export function isReadableStreamBYOBRequest(
- value: unknown
-): value is SDReadableStreamBYOBRequest {
- if (typeof value !== "object" || value === null) {
- return false;
- }
- return associatedReadableByteStreamController_ in value;
-}
-
-export function isReadableByteStreamController(
- value: unknown
-): value is SDReadableByteStreamController {
- if (typeof value !== "object" || value === null) {
- return false;
- }
- return controlledReadableByteStream_ in value;
-}
-
-export function readableByteStreamControllerCallPullIfNeeded(
- controller: SDReadableByteStreamController
-): void {
- if (!readableByteStreamControllerShouldCallPull(controller)) {
- return;
- }
- if (controller[pulling_]) {
- controller[pullAgain_] = true;
- return;
- }
- // Assert: controller.[[pullAgain]] is false.
- controller[pulling_] = true;
- controller[pullAlgorithm_](controller).then(
- (_) => {
- controller[pulling_] = false;
- if (controller[pullAgain_]) {
- controller[pullAgain_] = false;
- readableByteStreamControllerCallPullIfNeeded(controller);
- }
- },
- (error) => {
- readableByteStreamControllerError(controller, error);
- }
- );
-}
-
-export function readableByteStreamControllerClearAlgorithms(
- controller: SDReadableByteStreamController
-): void {
- controller[pullAlgorithm_] = undefined!;
- controller[cancelAlgorithm_] = undefined!;
-}
-
-export function readableByteStreamControllerClearPendingPullIntos(
- controller: SDReadableByteStreamController
-): void {
- readableByteStreamControllerInvalidateBYOBRequest(controller);
- controller[pendingPullIntos_] = [];
-}
-
-export function readableByteStreamControllerClose(
- controller: SDReadableByteStreamController
-): void {
- const stream = controller[controlledReadableByteStream_];
- // Assert: controller.[[closeRequested]] is false.
- // Assert: stream.[[state]] is "readable".
- if (controller[q.queueTotalSize_] > 0) {
- controller[closeRequested_] = true;
- return;
- }
- if (controller[pendingPullIntos_].length > 0) {
- const firstPendingPullInto = controller[pendingPullIntos_][0];
- if (firstPendingPullInto.bytesFilled > 0) {
- const error = new TypeError();
- readableByteStreamControllerError(controller, error);
- throw error;
- }
- }
- readableByteStreamControllerClearAlgorithms(controller);
- readableStreamClose(stream);
-}
-
-export function readableByteStreamControllerCommitPullIntoDescriptor(
- stream: SDReadableStream<ArrayBufferView>,
- pullIntoDescriptor: PullIntoDescriptor
-): void {
- // Assert: stream.[[state]] is not "errored".
- let done = false;
- if (stream[shared.state_] === "closed") {
- // Assert: pullIntoDescriptor.[[bytesFilled]] is 0.
- done = true;
- }
- const filledView = readableByteStreamControllerConvertPullIntoDescriptor(
- pullIntoDescriptor
- );
- if (pullIntoDescriptor.readerType === "default") {
- readableStreamFulfillReadRequest(stream, filledView, done);
- } else {
- // Assert: pullIntoDescriptor.[[readerType]] is "byob".
- readableStreamFulfillReadIntoRequest(stream, filledView, done);
- }
-}
-
-export function readableByteStreamControllerConvertPullIntoDescriptor(
- pullIntoDescriptor: PullIntoDescriptor
-): ArrayBufferView {
- const { bytesFilled, elementSize } = pullIntoDescriptor;
- // Assert: bytesFilled <= pullIntoDescriptor.byteLength
- // Assert: bytesFilled mod elementSize is 0
- return new pullIntoDescriptor.ctor(
- pullIntoDescriptor.buffer,
- pullIntoDescriptor.byteOffset,
- bytesFilled / elementSize
- );
-}
-
-export function readableByteStreamControllerEnqueue(
- controller: SDReadableByteStreamController,
- chunk: ArrayBufferView
-): void {
- const stream = controller[controlledReadableByteStream_];
- // Assert: controller.[[closeRequested]] is false.
- // Assert: stream.[[state]] is "readable".
- const { buffer, byteOffset, byteLength } = chunk;
-
- const transferredBuffer = shared.transferArrayBuffer(buffer);
-
- if (readableStreamHasDefaultReader(stream)) {
- if (readableStreamGetNumReadRequests(stream) === 0) {
- readableByteStreamControllerEnqueueChunkToQueue(
- controller,
- transferredBuffer,
- byteOffset,
- byteLength
- );
- } else {
- // Assert: controller.[[queue]] is empty.
- const transferredView = new Uint8Array(
- transferredBuffer,
- byteOffset,
- byteLength
- );
- readableStreamFulfillReadRequest(stream, transferredView, false);
- }
- } else if (readableStreamHasBYOBReader(stream)) {
- readableByteStreamControllerEnqueueChunkToQueue(
- controller,
- transferredBuffer,
- byteOffset,
- byteLength
- );
- readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
- controller
- );
- } else {
- // Assert: !IsReadableStreamLocked(stream) is false.
- readableByteStreamControllerEnqueueChunkToQueue(
- controller,
- transferredBuffer,
- byteOffset,
- byteLength
- );
- }
- readableByteStreamControllerCallPullIfNeeded(controller);
-}
-
-export function readableByteStreamControllerEnqueueChunkToQueue(
- controller: SDReadableByteStreamController,
- buffer: ArrayBufferLike,
- byteOffset: number,
- byteLength: number
-): void {
- controller[q.queue_].push({ buffer, byteOffset, byteLength });
- controller[q.queueTotalSize_] += byteLength;
-}
-
-export function readableByteStreamControllerError(
- controller: SDReadableByteStreamController,
- error: shared.ErrorResult
-): void {
- const stream = controller[controlledReadableByteStream_];
- if (stream[shared.state_] !== "readable") {
- return;
- }
- readableByteStreamControllerClearPendingPullIntos(controller);
- q.resetQueue(controller);
- readableByteStreamControllerClearAlgorithms(controller);
- readableStreamError(stream, error);
-}
-
-export function readableByteStreamControllerFillHeadPullIntoDescriptor(
- controller: SDReadableByteStreamController,
- size: number,
- pullIntoDescriptor: PullIntoDescriptor
-): void {
- // Assert: either controller.[[pendingPullIntos]] is empty, or the first element of controller.[[pendingPullIntos]] is pullIntoDescriptor.
- readableByteStreamControllerInvalidateBYOBRequest(controller);
- pullIntoDescriptor.bytesFilled += size;
-}
-
-export function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
- controller: SDReadableByteStreamController,
- pullIntoDescriptor: PullIntoDescriptor
-): boolean {
- const elementSize = pullIntoDescriptor.elementSize;
- const currentAlignedBytes =
- pullIntoDescriptor.bytesFilled -
- (pullIntoDescriptor.bytesFilled % elementSize);
- const maxBytesToCopy = Math.min(
- controller[q.queueTotalSize_],
- pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled
- );
- const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
- const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
- let totalBytesToCopyRemaining = maxBytesToCopy;
- let ready = false;
-
- if (maxAlignedBytes > currentAlignedBytes) {
- totalBytesToCopyRemaining =
- maxAlignedBytes - pullIntoDescriptor.bytesFilled;
- ready = true;
- }
- const queue = controller[q.queue_];
-
- while (totalBytesToCopyRemaining > 0) {
- const headOfQueue = queue.front()!;
- const bytesToCopy = Math.min(
- totalBytesToCopyRemaining,
- headOfQueue.byteLength
- );
- const destStart =
- pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
- shared.copyDataBlockBytes(
- pullIntoDescriptor.buffer,
- destStart,
- headOfQueue.buffer,
- headOfQueue.byteOffset,
- bytesToCopy
- );
- if (headOfQueue.byteLength === bytesToCopy) {
- queue.shift();
- } else {
- headOfQueue.byteOffset += bytesToCopy;
- headOfQueue.byteLength -= bytesToCopy;
- }
- controller[q.queueTotalSize_] -= bytesToCopy;
- readableByteStreamControllerFillHeadPullIntoDescriptor(
- controller,
- bytesToCopy,
- pullIntoDescriptor
- );
- totalBytesToCopyRemaining -= bytesToCopy;
- }
- if (!ready) {
- // Assert: controller[queueTotalSize_] === 0
- // Assert: pullIntoDescriptor.bytesFilled > 0
- // Assert: pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize
- }
- return ready;
-}
-
-export function readableByteStreamControllerGetDesiredSize(
- controller: SDReadableByteStreamController
-): number | null {
- const stream = controller[controlledReadableByteStream_];
- const state = stream[shared.state_];
- if (state === "errored") {
- return null;
- }
- if (state === "closed") {
- return 0;
- }
- return controller[strategyHWM_] - controller[q.queueTotalSize_];
-}
-
-export function readableByteStreamControllerHandleQueueDrain(
- controller: SDReadableByteStreamController
-): void {
- // Assert: controller.[[controlledReadableByteStream]].[[state]] is "readable".
- if (controller[q.queueTotalSize_] === 0 && controller[closeRequested_]) {
- readableByteStreamControllerClearAlgorithms(controller);
- readableStreamClose(controller[controlledReadableByteStream_]);
- } else {
- readableByteStreamControllerCallPullIfNeeded(controller);
- }
-}
-
-export function readableByteStreamControllerInvalidateBYOBRequest(
- controller: SDReadableByteStreamController
-): void {
- const byobRequest = controller[byobRequest_];
- if (byobRequest === undefined) {
- return;
- }
- byobRequest[associatedReadableByteStreamController_] = undefined;
- byobRequest[view_] = undefined;
- controller[byobRequest_] = undefined;
-}
-
-export function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
- controller: SDReadableByteStreamController
-): void {
- // Assert: controller.[[closeRequested]] is false.
- const pendingPullIntos = controller[pendingPullIntos_];
- while (pendingPullIntos.length > 0) {
- if (controller[q.queueTotalSize_] === 0) {
- return;
- }
- const pullIntoDescriptor = pendingPullIntos[0];
- if (
- readableByteStreamControllerFillPullIntoDescriptorFromQueue(
- controller,
- pullIntoDescriptor
- )
- ) {
- readableByteStreamControllerShiftPendingPullInto(controller);
- readableByteStreamControllerCommitPullIntoDescriptor(
- controller[controlledReadableByteStream_],
- pullIntoDescriptor
- );
- }
- }
-}
-
-export function readableByteStreamControllerPullInto(
- controller: SDReadableByteStreamController,
- view: ArrayBufferView,
- forAuthorCode: boolean
-): Promise<IteratorResult<ArrayBufferView, any>> {
- const stream = controller[controlledReadableByteStream_];
-
- const elementSize = (view as Uint8Array).BYTES_PER_ELEMENT || 1; // DataView exposes this in Webkit as 1, is not present in FF or Blink
- const ctor = view.constructor as Uint8ArrayConstructor; // the typecast here is just for TS typing, it does not influence buffer creation
-
- const byteOffset = view.byteOffset;
- const byteLength = view.byteLength;
- const buffer = shared.transferArrayBuffer(view.buffer);
- const pullIntoDescriptor: PullIntoDescriptor = {
- buffer,
- byteOffset,
- byteLength,
- bytesFilled: 0,
- elementSize,
- ctor,
- readerType: "byob",
- };
-
- if (controller[pendingPullIntos_].length > 0) {
- controller[pendingPullIntos_].push(pullIntoDescriptor);
- return readableStreamAddReadIntoRequest(stream, forAuthorCode);
- }
- if (stream[shared.state_] === "closed") {
- const emptyView = new ctor(
- pullIntoDescriptor.buffer,
- pullIntoDescriptor.byteOffset,
- 0
- );
- return Promise.resolve(
- readableStreamCreateReadResult(emptyView, true, forAuthorCode)
- );
- }
-
- if (controller[q.queueTotalSize_] > 0) {
- if (
- readableByteStreamControllerFillPullIntoDescriptorFromQueue(
- controller,
- pullIntoDescriptor
- )
- ) {
- const filledView = readableByteStreamControllerConvertPullIntoDescriptor(
- pullIntoDescriptor
- );
- readableByteStreamControllerHandleQueueDrain(controller);
- return Promise.resolve(
- readableStreamCreateReadResult(filledView, false, forAuthorCode)
- );
- }
- if (controller[closeRequested_]) {
- const error = new TypeError();
- readableByteStreamControllerError(controller, error);
- return Promise.reject(error);
- }
- }
-
- controller[pendingPullIntos_].push(pullIntoDescriptor);
- const promise = readableStreamAddReadIntoRequest(stream, forAuthorCode);
- readableByteStreamControllerCallPullIfNeeded(controller);
- return promise;
-}
-
-export function readableByteStreamControllerRespond(
- controller: SDReadableByteStreamController,
- bytesWritten: number
-): void {
- bytesWritten = Number(bytesWritten);
- if (!shared.isFiniteNonNegativeNumber(bytesWritten)) {
- throw new RangeError("bytesWritten must be a finite, non-negative number");
- }
- // Assert: controller.[[pendingPullIntos]] is not empty.
- readableByteStreamControllerRespondInternal(controller, bytesWritten);
-}
-
-export function readableByteStreamControllerRespondInClosedState(
- controller: SDReadableByteStreamController,
- firstDescriptor: PullIntoDescriptor
-): void {
- firstDescriptor.buffer = shared.transferArrayBuffer(firstDescriptor.buffer);
- // Assert: firstDescriptor.[[bytesFilled]] is 0.
- const stream = controller[controlledReadableByteStream_];
- if (readableStreamHasBYOBReader(stream)) {
- while (readableStreamGetNumReadIntoRequests(stream) > 0) {
- const pullIntoDescriptor = readableByteStreamControllerShiftPendingPullInto(
- controller
- )!;
- readableByteStreamControllerCommitPullIntoDescriptor(
- stream,
- pullIntoDescriptor
- );
- }
- }
-}
-
-export function readableByteStreamControllerRespondInReadableState(
- controller: SDReadableByteStreamController,
- bytesWritten: number,
- pullIntoDescriptor: PullIntoDescriptor
-): void {
- if (
- pullIntoDescriptor.bytesFilled + bytesWritten >
- pullIntoDescriptor.byteLength
- ) {
- throw new RangeError();
- }
- readableByteStreamControllerFillHeadPullIntoDescriptor(
- controller,
- bytesWritten,
- pullIntoDescriptor
- );
- if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {
- return;
- }
- readableByteStreamControllerShiftPendingPullInto(controller);
- const remainderSize =
- pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;
- if (remainderSize > 0) {
- const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
- const remainder = shared.cloneArrayBuffer(
- pullIntoDescriptor.buffer,
- end - remainderSize,
- remainderSize,
- ArrayBuffer
- );
- readableByteStreamControllerEnqueueChunkToQueue(
- controller,
- remainder,
- 0,
- remainder.byteLength
- );
- }
- pullIntoDescriptor.buffer = shared.transferArrayBuffer(
- pullIntoDescriptor.buffer
- );
- pullIntoDescriptor.bytesFilled =
- pullIntoDescriptor.bytesFilled - remainderSize;
- readableByteStreamControllerCommitPullIntoDescriptor(
- controller[controlledReadableByteStream_],
- pullIntoDescriptor
- );
- readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);
-}
-
-export function readableByteStreamControllerRespondInternal(
- controller: SDReadableByteStreamController,
- bytesWritten: number
-): void {
- const firstDescriptor = controller[pendingPullIntos_][0];
- const stream = controller[controlledReadableByteStream_];
- if (stream[shared.state_] === "closed") {
- if (bytesWritten !== 0) {
- throw new TypeError();
- }
- readableByteStreamControllerRespondInClosedState(
- controller,
- firstDescriptor
- );
- } else {
- // Assert: stream.[[state]] is "readable".
- readableByteStreamControllerRespondInReadableState(
- controller,
- bytesWritten,
- firstDescriptor
- );
- }
- readableByteStreamControllerCallPullIfNeeded(controller);
-}
-
-export function readableByteStreamControllerRespondWithNewView(
- controller: SDReadableByteStreamController,
- view: ArrayBufferView
-): void {
- // Assert: controller.[[pendingPullIntos]] is not empty.
- const firstDescriptor = controller[pendingPullIntos_][0];
- if (
- firstDescriptor.byteOffset + firstDescriptor.bytesFilled !==
- view.byteOffset
- ) {
- throw new RangeError();
- }
- if (firstDescriptor.byteLength !== view.byteLength) {
- throw new RangeError();
- }
- firstDescriptor.buffer = view.buffer;
- readableByteStreamControllerRespondInternal(controller, view.byteLength);
-}
-
-export function readableByteStreamControllerShiftPendingPullInto(
- controller: SDReadableByteStreamController
-): PullIntoDescriptor | undefined {
- const descriptor = controller[pendingPullIntos_].shift();
- readableByteStreamControllerInvalidateBYOBRequest(controller);
- return descriptor;
-}
-
-export function readableByteStreamControllerShouldCallPull(
- controller: SDReadableByteStreamController
-): boolean {
- // Let stream be controller.[[controlledReadableByteStream]].
- const stream = controller[controlledReadableByteStream_];
- if (stream[shared.state_] !== "readable") {
- return false;
- }
- if (controller[closeRequested_]) {
- return false;
- }
- if (!controller[started_]) {
- return false;
- }
- if (
- readableStreamHasDefaultReader(stream) &&
- readableStreamGetNumReadRequests(stream) > 0
- ) {
- return true;
- }
- if (
- readableStreamHasBYOBReader(stream) &&
- readableStreamGetNumReadIntoRequests(stream) > 0
- ) {
- return true;
- }
- const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
- // Assert: desiredSize is not null.
- return desiredSize! > 0;
-}
-
-export function setUpReadableStreamBYOBRequest(
- request: SDReadableStreamBYOBRequest,
- controller: SDReadableByteStreamController,
- view: ArrayBufferView
-): void {
- if (!isReadableByteStreamController(controller)) {
- throw new TypeError();
- }
- if (!ArrayBuffer.isView(view)) {
- throw new TypeError();
- }
- // Assert: !IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
-
- request[associatedReadableByteStreamController_] = controller;
- request[view_] = view;
-}
diff --git a/cli/js/web/streams/readable-stream-byob-reader.ts b/cli/js/web/streams/readable-stream-byob-reader.ts
deleted file mode 100644
index 8527f8db9..000000000
--- a/cli/js/web/streams/readable-stream-byob-reader.ts
+++ /dev/null
@@ -1,86 +0,0 @@
-// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-import * as rs from "./readable-internals.ts";
-import * as shared from "./shared-internals.ts";
-
-export class SDReadableStreamBYOBReader
- implements rs.SDReadableStreamBYOBReader {
- [rs.closedPromise_]: shared.ControlledPromise<void>;
- [rs.ownerReadableStream_]: rs.SDReadableStream<ArrayBufferView> | undefined;
- [rs.readIntoRequests_]: Array<
- rs.ReadRequest<IteratorResult<ArrayBufferView>>
- >;
-
- constructor(stream: rs.SDReadableStream<ArrayBufferView>) {
- if (!rs.isReadableStream(stream)) {
- throw new TypeError();
- }
- if (
- !rs.isReadableByteStreamController(stream[rs.readableStreamController_])
- ) {
- throw new TypeError();
- }
- if (rs.isReadableStreamLocked(stream)) {
- throw new TypeError("The stream is locked.");
- }
- rs.readableStreamReaderGenericInitialize(this, stream);
- this[rs.readIntoRequests_] = [];
- }
-
- get closed(): Promise<void> {
- if (!rs.isReadableStreamBYOBReader(this)) {
- return Promise.reject(new TypeError());
- }
- return this[rs.closedPromise_].promise;
- }
-
- cancel(reason: shared.ErrorResult): Promise<void> {
- if (!rs.isReadableStreamBYOBReader(this)) {
- return Promise.reject(new TypeError());
- }
- const stream = this[rs.ownerReadableStream_];
- if (stream === undefined) {
- return Promise.reject(
- new TypeError("Reader is not associated with a stream")
- );
- }
- return rs.readableStreamCancel(stream, reason);
- }
-
- read(view: ArrayBufferView): Promise<IteratorResult<ArrayBufferView>> {
- if (!rs.isReadableStreamBYOBReader(this)) {
- return Promise.reject(new TypeError());
- }
- if (this[rs.ownerReadableStream_] === undefined) {
- return Promise.reject(
- new TypeError("Reader is not associated with a stream")
- );
- }
- if (!ArrayBuffer.isView(view)) {
- return Promise.reject(
- new TypeError("view argument must be a valid ArrayBufferView")
- );
- }
- // If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true, return a promise rejected with a TypeError exception.
- if (view.byteLength === 0) {
- return Promise.reject(
- new TypeError("supplied buffer view must be > 0 bytes")
- );
- }
- return rs.readableStreamBYOBReaderRead(this, view, true);
- }
-
- releaseLock(): void {
- if (!rs.isReadableStreamBYOBReader(this)) {
- throw new TypeError();
- }
- if (this[rs.ownerReadableStream_] === undefined) {
- throw new TypeError("Reader is not associated with a stream");
- }
- if (this[rs.readIntoRequests_].length > 0) {
- throw new TypeError();
- }
- rs.readableStreamReaderGenericRelease(this);
- }
-}
diff --git a/cli/js/web/streams/readable-stream-byob-request.ts b/cli/js/web/streams/readable-stream-byob-request.ts
deleted file mode 100644
index 75ca1ddfe..000000000
--- a/cli/js/web/streams/readable-stream-byob-request.ts
+++ /dev/null
@@ -1,53 +0,0 @@
-// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-import * as rs from "./readable-internals.ts";
-
-export class ReadableStreamBYOBRequest {
- [rs.associatedReadableByteStreamController_]:
- | rs.SDReadableByteStreamController
- | undefined;
- [rs.view_]: ArrayBufferView | undefined;
-
- constructor() {
- throw new TypeError();
- }
-
- get view(): ArrayBufferView {
- if (!rs.isReadableStreamBYOBRequest(this)) {
- throw new TypeError();
- }
- return this[rs.view_]!;
- }
-
- respond(bytesWritten: number): void {
- if (!rs.isReadableStreamBYOBRequest(this)) {
- throw new TypeError();
- }
- if (this[rs.associatedReadableByteStreamController_] === undefined) {
- throw new TypeError();
- }
- // If! IsDetachedBuffer(this.[[view]].[[ViewedArrayBuffer]]) is true, throw a TypeError exception.
- return rs.readableByteStreamControllerRespond(
- this[rs.associatedReadableByteStreamController_]!,
- bytesWritten
- );
- }
-
- respondWithNewView(view: ArrayBufferView): void {
- if (!rs.isReadableStreamBYOBRequest(this)) {
- throw new TypeError();
- }
- if (this[rs.associatedReadableByteStreamController_] === undefined) {
- throw new TypeError();
- }
- if (!ArrayBuffer.isView(view)) {
- throw new TypeError("view parameter must be a TypedArray");
- }
- // If! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true, throw a TypeError exception.
- return rs.readableByteStreamControllerRespondWithNewView(
- this[rs.associatedReadableByteStreamController_]!,
- view
- );
- }
-}
diff --git a/cli/js/web/streams/readable-stream-default-controller.ts b/cli/js/web/streams/readable-stream-default-controller.ts
deleted file mode 100644
index 5d07dba53..000000000
--- a/cli/js/web/streams/readable-stream-default-controller.ts
+++ /dev/null
@@ -1,135 +0,0 @@
-// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-/* eslint-disable @typescript-eslint/no-explicit-any */
-// TODO reenable this lint here
-
-import * as rs from "./readable-internals.ts";
-import * as shared from "./shared-internals.ts";
-import * as q from "./queue-mixin.ts";
-import { Queue } from "./queue.ts";
-import {
- QueuingStrategySizeCallback,
- UnderlyingSource,
-} from "../dom_types.d.ts";
-
-export class ReadableStreamDefaultController<OutputType>
- implements rs.SDReadableStreamDefaultController<OutputType> {
- [rs.cancelAlgorithm_]: rs.CancelAlgorithm;
- [rs.closeRequested_]: boolean;
- [rs.controlledReadableStream_]: rs.SDReadableStream<OutputType>;
- [rs.pullAgain_]: boolean;
- [rs.pullAlgorithm_]: rs.PullAlgorithm<OutputType>;
- [rs.pulling_]: boolean;
- [rs.strategyHWM_]: number;
- [rs.strategySizeAlgorithm_]: QueuingStrategySizeCallback<OutputType>;
- [rs.started_]: boolean;
-
- [q.queue_]: Queue<q.QueueElement<OutputType>>;
- [q.queueTotalSize_]: number;
-
- constructor() {
- throw new TypeError();
- }
-
- get desiredSize(): number | null {
- return rs.readableStreamDefaultControllerGetDesiredSize(this);
- }
-
- close(): void {
- if (!rs.isReadableStreamDefaultController(this)) {
- throw new TypeError();
- }
- if (!rs.readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
- throw new TypeError(
- "Cannot close, the stream is already closing or not readable"
- );
- }
- rs.readableStreamDefaultControllerClose(this);
- }
-
- enqueue(chunk?: OutputType): void {
- if (!rs.isReadableStreamDefaultController(this)) {
- throw new TypeError();
- }
- if (!rs.readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
- throw new TypeError(
- "Cannot enqueue, the stream is closing or not readable"
- );
- }
- rs.readableStreamDefaultControllerEnqueue(this, chunk!);
- }
-
- error(e?: shared.ErrorResult): void {
- if (!rs.isReadableStreamDefaultController(this)) {
- throw new TypeError();
- }
- rs.readableStreamDefaultControllerError(this, e);
- }
-
- [rs.cancelSteps_](reason: shared.ErrorResult): Promise<void> {
- q.resetQueue(this);
- const result = this[rs.cancelAlgorithm_](reason);
- rs.readableStreamDefaultControllerClearAlgorithms(this);
- return result;
- }
-
- [rs.pullSteps_](
- forAuthorCode: boolean
- ): Promise<IteratorResult<OutputType, any>> {
- const stream = this[rs.controlledReadableStream_];
- if (this[q.queue_].length > 0) {
- const chunk = q.dequeueValue(this);
- if (this[rs.closeRequested_] && this[q.queue_].length === 0) {
- rs.readableStreamDefaultControllerClearAlgorithms(this);
- rs.readableStreamClose(stream);
- } else {
- rs.readableStreamDefaultControllerCallPullIfNeeded(this);
- }
- return Promise.resolve(
- rs.readableStreamCreateReadResult(chunk, false, forAuthorCode)
- );
- }
-
- const pendingPromise = rs.readableStreamAddReadRequest(
- stream,
- forAuthorCode
- );
- rs.readableStreamDefaultControllerCallPullIfNeeded(this);
- return pendingPromise;
- }
-}
-
-export function setUpReadableStreamDefaultControllerFromUnderlyingSource<
- OutputType
->(
- stream: rs.SDReadableStream<OutputType>,
- underlyingSource: UnderlyingSource<OutputType>,
- highWaterMark: number,
- sizeAlgorithm: QueuingStrategySizeCallback<OutputType>
-): void {
- // Assert: underlyingSource is not undefined.
- const controller = Object.create(ReadableStreamDefaultController.prototype);
- const startAlgorithm = (): any => {
- return shared.invokeOrNoop(underlyingSource, "start", [controller]);
- };
- const pullAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
- underlyingSource,
- "pull",
- [controller]
- );
- const cancelAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
- underlyingSource,
- "cancel",
- []
- );
- rs.setUpReadableStreamDefaultController(
- stream,
- controller,
- startAlgorithm,
- pullAlgorithm,
- cancelAlgorithm,
- highWaterMark,
- sizeAlgorithm
- );
-}
diff --git a/cli/js/web/streams/readable-stream-default-reader.ts b/cli/js/web/streams/readable-stream-default-reader.ts
deleted file mode 100644
index 3fbf22c8c..000000000
--- a/cli/js/web/streams/readable-stream-default-reader.ts
+++ /dev/null
@@ -1,68 +0,0 @@
-// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-import * as rs from "./readable-internals.ts";
-import * as shared from "./shared-internals.ts";
-
-export class ReadableStreamDefaultReader<OutputType>
- implements rs.SDReadableStreamReader<OutputType> {
- [rs.closedPromise_]: shared.ControlledPromise<void>;
- [rs.ownerReadableStream_]: rs.SDReadableStream<OutputType> | undefined;
- [rs.readRequests_]: Array<rs.ReadRequest<IteratorResult<OutputType>>>;
-
- constructor(stream: rs.SDReadableStream<OutputType>) {
- if (!rs.isReadableStream(stream)) {
- throw new TypeError();
- }
- if (rs.isReadableStreamLocked(stream)) {
- throw new TypeError("The stream is locked.");
- }
- rs.readableStreamReaderGenericInitialize(this, stream);
- this[rs.readRequests_] = [];
- }
-
- get closed(): Promise<void> {
- if (!rs.isReadableStreamDefaultReader(this)) {
- return Promise.reject(new TypeError());
- }
- return this[rs.closedPromise_].promise;
- }
-
- cancel(reason: shared.ErrorResult): Promise<void> {
- if (!rs.isReadableStreamDefaultReader(this)) {
- return Promise.reject(new TypeError());
- }
- const stream = this[rs.ownerReadableStream_];
- if (stream === undefined) {
- return Promise.reject(
- new TypeError("Reader is not associated with a stream")
- );
- }
- return rs.readableStreamCancel(stream, reason);
- }
-
- read(): Promise<IteratorResult<OutputType | undefined>> {
- if (!rs.isReadableStreamDefaultReader(this)) {
- return Promise.reject(new TypeError());
- }
- if (this[rs.ownerReadableStream_] === undefined) {
- return Promise.reject(
- new TypeError("Reader is not associated with a stream")
- );
- }
- return rs.readableStreamDefaultReaderRead(this, true);
- }
-
- releaseLock(): void {
- if (!rs.isReadableStreamDefaultReader(this)) {
- throw new TypeError();
- }
- if (this[rs.ownerReadableStream_] === undefined) {
- return;
- }
- if (this[rs.readRequests_].length !== 0) {
- throw new TypeError("Cannot release a stream with pending read requests");
- }
- rs.readableStreamReaderGenericRelease(this);
- }
-}
diff --git a/cli/js/web/streams/readable-stream.ts b/cli/js/web/streams/readable-stream.ts
deleted file mode 100644
index a003f0a17..000000000
--- a/cli/js/web/streams/readable-stream.ts
+++ /dev/null
@@ -1,386 +0,0 @@
-// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-/* eslint prefer-const: "off" */
-// TODO remove this, surpressed because of
-// 284:7 error 'branch1' is never reassigned. Use 'const' instead prefer-const
-
-import * as rs from "./readable-internals.ts";
-import * as shared from "./shared-internals.ts";
-import {
- QueuingStrategy,
- QueuingStrategySizeCallback,
- UnderlyingSource,
- UnderlyingByteSource,
-} from "../dom_types.d.ts";
-
-import {
- ReadableStreamDefaultController,
- setUpReadableStreamDefaultControllerFromUnderlyingSource,
-} from "./readable-stream-default-controller.ts";
-import { ReadableStreamDefaultReader } from "./readable-stream-default-reader.ts";
-
-import {
- ReadableByteStreamController,
- setUpReadableByteStreamControllerFromUnderlyingSource,
-} from "./readable-byte-stream-controller.ts";
-import { SDReadableStreamBYOBReader } from "./readable-stream-byob-reader.ts";
-
-export class SDReadableStream<OutputType>
- implements rs.SDReadableStream<OutputType> {
- [shared.state_]: rs.ReadableStreamState;
- [shared.storedError_]: shared.ErrorResult;
- [rs.reader_]: rs.SDReadableStreamReader<OutputType> | undefined;
- [rs.readableStreamController_]: rs.SDReadableStreamControllerBase<OutputType>;
-
- constructor(
- underlyingSource: UnderlyingByteSource,
- strategy?: { highWaterMark?: number; size?: undefined }
- );
- constructor(
- underlyingSource?: UnderlyingSource<OutputType>,
- strategy?: QueuingStrategy<OutputType>
- );
- constructor(
- underlyingSource: UnderlyingSource<OutputType> | UnderlyingByteSource = {},
- strategy:
- | QueuingStrategy<OutputType>
- | { highWaterMark?: number; size?: undefined } = {}
- ) {
- rs.initializeReadableStream(this);
-
- const sizeFunc = strategy.size;
- const stratHWM = strategy.highWaterMark;
- const sourceType = underlyingSource.type;
-
- if (sourceType === undefined) {
- const sizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction(sizeFunc);
- const highWaterMark = shared.validateAndNormalizeHighWaterMark(
- stratHWM === undefined ? 1 : stratHWM
- );
- setUpReadableStreamDefaultControllerFromUnderlyingSource(
- this,
- underlyingSource as UnderlyingSource<OutputType>,
- highWaterMark,
- sizeAlgorithm
- );
- } else if (String(sourceType) === "bytes") {
- if (sizeFunc !== undefined) {
- throw new RangeError(
- "bytes streams cannot have a strategy with a `size` field"
- );
- }
- const highWaterMark = shared.validateAndNormalizeHighWaterMark(
- stratHWM === undefined ? 0 : stratHWM
- );
- setUpReadableByteStreamControllerFromUnderlyingSource(
- (this as unknown) as rs.SDReadableStream<ArrayBufferView>,
- underlyingSource as UnderlyingByteSource,
- highWaterMark
- );
- } else {
- throw new RangeError(
- "The underlying source's `type` field must be undefined or 'bytes'"
- );
- }
- }
-
- get locked(): boolean {
- return rs.isReadableStreamLocked(this);
- }
-
- getReader(): rs.SDReadableStreamDefaultReader<OutputType>;
- getReader(options: { mode?: "byob" }): rs.SDReadableStreamBYOBReader;
- getReader(options?: {
- mode?: "byob";
- }):
- | rs.SDReadableStreamDefaultReader<OutputType>
- | rs.SDReadableStreamBYOBReader {
- if (!rs.isReadableStream(this)) {
- throw new TypeError();
- }
- if (options === undefined) {
- options = {};
- }
- const { mode } = options;
- if (mode === undefined) {
- return new ReadableStreamDefaultReader(this);
- } else if (String(mode) === "byob") {
- return new SDReadableStreamBYOBReader(
- (this as unknown) as rs.SDReadableStream<ArrayBufferView>
- );
- }
- throw RangeError("mode option must be undefined or `byob`");
- }
-
- cancel(reason: shared.ErrorResult): Promise<void> {
- if (!rs.isReadableStream(this)) {
- return Promise.reject(new TypeError());
- }
- if (rs.isReadableStreamLocked(this)) {
- return Promise.reject(new TypeError("Cannot cancel a locked stream"));
- }
- return rs.readableStreamCancel(this, reason);
- }
-
- tee(): [SDReadableStream<OutputType>, SDReadableStream<OutputType>] {
- return readableStreamTee(this, false);
- }
-
- /* TODO reenable these methods when we bring in writableStreams and transport types
- pipeThrough<ResultType>(
- transform: rs.GenericTransformStream<OutputType, ResultType>,
- options: PipeOptions = {}
- ): rs.SDReadableStream<ResultType> {
- const { readable, writable } = transform;
- if (!rs.isReadableStream(this)) {
- throw new TypeError();
- }
- if (!ws.isWritableStream(writable)) {
- throw new TypeError("writable must be a WritableStream");
- }
- if (!rs.isReadableStream(readable)) {
- throw new TypeError("readable must be a ReadableStream");
- }
- if (options.signal !== undefined && !shared.isAbortSignal(options.signal)) {
- throw new TypeError("options.signal must be an AbortSignal instance");
- }
- if (rs.isReadableStreamLocked(this)) {
- throw new TypeError("Cannot pipeThrough on a locked stream");
- }
- if (ws.isWritableStreamLocked(writable)) {
- throw new TypeError("Cannot pipeThrough to a locked stream");
- }
-
- const pipeResult = pipeTo(this, writable, options);
- pipeResult.catch(() => {});
-
- return readable;
- }
-
- pipeTo(
- dest: ws.WritableStream<OutputType>,
- options: PipeOptions = {}
- ): Promise<void> {
- if (!rs.isReadableStream(this)) {
- return Promise.reject(new TypeError());
- }
- if (!ws.isWritableStream(dest)) {
- return Promise.reject(
- new TypeError("destination must be a WritableStream")
- );
- }
- if (options.signal !== undefined && !shared.isAbortSignal(options.signal)) {
- return Promise.reject(
- new TypeError("options.signal must be an AbortSignal instance")
- );
- }
- if (rs.isReadableStreamLocked(this)) {
- return Promise.reject(new TypeError("Cannot pipe from a locked stream"));
- }
- if (ws.isWritableStreamLocked(dest)) {
- return Promise.reject(new TypeError("Cannot pipe to a locked stream"));
- }
-
- return pipeTo(this, dest, options);
- }
- */
-}
-
-export function createReadableStream<OutputType>(
- startAlgorithm: rs.StartAlgorithm,
- pullAlgorithm: rs.PullAlgorithm<OutputType>,
- cancelAlgorithm: rs.CancelAlgorithm,
- highWaterMark?: number,
- sizeAlgorithm?: QueuingStrategySizeCallback<OutputType>
-): SDReadableStream<OutputType> {
- if (highWaterMark === undefined) {
- highWaterMark = 1;
- }
- if (sizeAlgorithm === undefined) {
- sizeAlgorithm = (): number => 1;
- }
- // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
-
- const stream = Object.create(SDReadableStream.prototype) as SDReadableStream<
- OutputType
- >;
- rs.initializeReadableStream(stream);
- const controller = Object.create(
- ReadableStreamDefaultController.prototype
- ) as ReadableStreamDefaultController<OutputType>;
- rs.setUpReadableStreamDefaultController(
- stream,
- controller,
- startAlgorithm,
- pullAlgorithm,
- cancelAlgorithm,
- highWaterMark,
- sizeAlgorithm
- );
- return stream;
-}
-
-export function createReadableByteStream<OutputType>(
- startAlgorithm: rs.StartAlgorithm,
- pullAlgorithm: rs.PullAlgorithm<OutputType>,
- cancelAlgorithm: rs.CancelAlgorithm,
- highWaterMark?: number,
- autoAllocateChunkSize?: number
-): SDReadableStream<OutputType> {
- if (highWaterMark === undefined) {
- highWaterMark = 0;
- }
- // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
- if (autoAllocateChunkSize !== undefined) {
- if (
- !shared.isInteger(autoAllocateChunkSize) ||
- autoAllocateChunkSize <= 0
- ) {
- throw new RangeError(
- "autoAllocateChunkSize must be a positive, finite integer"
- );
- }
- }
-
- const stream = Object.create(SDReadableStream.prototype) as SDReadableStream<
- OutputType
- >;
- rs.initializeReadableStream(stream);
- const controller = Object.create(
- ReadableByteStreamController.prototype
- ) as ReadableByteStreamController;
- rs.setUpReadableByteStreamController(
- (stream as unknown) as SDReadableStream<ArrayBufferView>,
- controller,
- startAlgorithm,
- (pullAlgorithm as unknown) as rs.PullAlgorithm<ArrayBufferView>,
- cancelAlgorithm,
- highWaterMark,
- autoAllocateChunkSize
- );
- return stream;
-}
-
-export function readableStreamTee<OutputType>(
- stream: SDReadableStream<OutputType>,
- cloneForBranch2: boolean
-): [SDReadableStream<OutputType>, SDReadableStream<OutputType>] {
- if (!rs.isReadableStream(stream)) {
- throw new TypeError();
- }
-
- const reader = new ReadableStreamDefaultReader(stream);
- let closedOrErrored = false;
- let canceled1 = false;
- let canceled2 = false;
- let reason1: shared.ErrorResult;
- let reason2: shared.ErrorResult;
- let branch1: SDReadableStream<OutputType>;
- let branch2: SDReadableStream<OutputType>;
-
- let cancelResolve: (reason: shared.ErrorResult) => void;
- const cancelPromise = new Promise<void>(
- (resolve) => (cancelResolve = resolve)
- );
-
- const pullAlgorithm = (): Promise<void> => {
- return rs
- .readableStreamDefaultReaderRead(reader)
- .then(({ value, done }) => {
- if (done && !closedOrErrored) {
- if (!canceled1) {
- rs.readableStreamDefaultControllerClose(
- branch1![
- rs.readableStreamController_
- ] as ReadableStreamDefaultController<OutputType>
- );
- }
- if (!canceled2) {
- rs.readableStreamDefaultControllerClose(
- branch2![
- rs.readableStreamController_
- ] as ReadableStreamDefaultController<OutputType>
- );
- }
- closedOrErrored = true;
- }
- if (closedOrErrored) {
- return;
- }
- const value1 = value;
- let value2 = value;
- if (!canceled1) {
- rs.readableStreamDefaultControllerEnqueue(
- branch1![
- rs.readableStreamController_
- ] as ReadableStreamDefaultController<OutputType>,
- value1!
- );
- }
- if (!canceled2) {
- if (cloneForBranch2) {
- value2 = shared.cloneValue(value2);
- }
- rs.readableStreamDefaultControllerEnqueue(
- branch2![
- rs.readableStreamController_
- ] as ReadableStreamDefaultController<OutputType>,
- value2!
- );
- }
- });
- };
-
- const cancel1Algorithm = (reason: shared.ErrorResult): Promise<void> => {
- canceled1 = true;
- reason1 = reason;
- if (canceled2) {
- const cancelResult = rs.readableStreamCancel(stream, [reason1, reason2]);
- cancelResolve(cancelResult);
- }
- return cancelPromise;
- };
-
- const cancel2Algorithm = (reason: shared.ErrorResult): Promise<void> => {
- canceled2 = true;
- reason2 = reason;
- if (canceled1) {
- const cancelResult = rs.readableStreamCancel(stream, [reason1, reason2]);
- cancelResolve(cancelResult);
- }
- return cancelPromise;
- };
-
- const startAlgorithm = (): undefined => undefined;
- branch1 = createReadableStream(
- startAlgorithm,
- pullAlgorithm,
- cancel1Algorithm
- );
- branch2 = createReadableStream(
- startAlgorithm,
- pullAlgorithm,
- cancel2Algorithm
- );
-
- reader[rs.closedPromise_].promise.catch((error) => {
- if (!closedOrErrored) {
- rs.readableStreamDefaultControllerError(
- branch1![
- rs.readableStreamController_
- ] as ReadableStreamDefaultController<OutputType>,
- error
- );
- rs.readableStreamDefaultControllerError(
- branch2![
- rs.readableStreamController_
- ] as ReadableStreamDefaultController<OutputType>,
- error
- );
- closedOrErrored = true;
- }
- });
-
- return [branch1, branch2];
-}
diff --git a/cli/js/web/streams/readable_byte_stream_controller.ts b/cli/js/web/streams/readable_byte_stream_controller.ts
new file mode 100644
index 000000000..615eff968
--- /dev/null
+++ b/cli/js/web/streams/readable_byte_stream_controller.ts
@@ -0,0 +1,143 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import {
+ BufferQueueItem,
+ CancelAlgorithm,
+ isDetachedBuffer,
+ isReadableByteStreamController,
+ PullAlgorithm,
+ resetQueue,
+ readableByteStreamControllerCallPullIfNeeded,
+ readableByteStreamControllerClearAlgorithms,
+ readableByteStreamControllerClose,
+ readableByteStreamControllerEnqueue,
+ readableByteStreamControllerError,
+ readableByteStreamControllerGetDesiredSize,
+ readableByteStreamControllerHandleQueueDrain,
+ readableStreamAddReadRequest,
+ readableStreamHasDefaultReader,
+ readableStreamGetNumReadRequests,
+ readableStreamCreateReadResult,
+} from "./internals.ts";
+import { ReadableStreamImpl } from "./readable_stream.ts";
+import * as sym from "./symbols.ts";
+import { assert } from "../../util.ts";
+import { symbols } from "../../symbols.ts";
+
+export class ReadableByteStreamControllerImpl
+ implements ReadableByteStreamController {
+ [sym.autoAllocateChunkSize]: number | undefined;
+ [sym.byobRequest]: undefined;
+ [sym.cancelAlgorithm]: CancelAlgorithm;
+ [sym.closeRequested]: boolean;
+ [sym.controlledReadableByteStream]: ReadableStreamImpl<Uint8Array>;
+ [sym.pullAgain]: boolean;
+ [sym.pullAlgorithm]: PullAlgorithm;
+ [sym.pulling]: boolean;
+ [sym.queue]: BufferQueueItem[];
+ [sym.queueTotalSize]: number;
+ [sym.started]: boolean;
+ [sym.strategyHWM]: number;
+
+ private constructor() {
+ throw new TypeError(
+ "ReadableByteStreamController's constructor cannot be called."
+ );
+ }
+
+ get byobRequest(): undefined {
+ return undefined;
+ }
+
+ get desiredSize(): number | null {
+ if (!isReadableByteStreamController(this)) {
+ throw new TypeError("Invalid ReadableByteStreamController.");
+ }
+ return readableByteStreamControllerGetDesiredSize(this);
+ }
+
+ close(): void {
+ if (!isReadableByteStreamController(this)) {
+ throw new TypeError("Invalid ReadableByteStreamController.");
+ }
+ if (this[sym.closeRequested]) {
+ throw new TypeError("Closed already requested.");
+ }
+ if (this[sym.controlledReadableByteStream][sym.state] !== "readable") {
+ throw new TypeError(
+ "ReadableByteStreamController's stream is not in a readable state."
+ );
+ }
+ readableByteStreamControllerClose(this);
+ }
+
+ enqueue(chunk: ArrayBufferView): void {
+ if (!isReadableByteStreamController(this)) {
+ throw new TypeError("Invalid ReadableByteStreamController.");
+ }
+ if (this[sym.closeRequested]) {
+ throw new TypeError("Closed already requested.");
+ }
+ if (this[sym.controlledReadableByteStream][sym.state] !== "readable") {
+ throw new TypeError(
+ "ReadableByteStreamController's stream is not in a readable state."
+ );
+ }
+ if (!ArrayBuffer.isView(chunk)) {
+ throw new TypeError(
+ "You can only enqueue array buffer views when using a ReadableByteStreamController"
+ );
+ }
+ if (isDetachedBuffer(chunk.buffer)) {
+ throw new TypeError("Cannot enqueue a view onto a detached ArrayBuffer");
+ }
+ readableByteStreamControllerEnqueue(this, chunk);
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ error(error?: any): void {
+ if (!isReadableByteStreamController(this)) {
+ throw new TypeError("Invalid ReadableByteStreamController.");
+ }
+ readableByteStreamControllerError(this, error);
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ [sym.cancelSteps](reason: any): PromiseLike<void> {
+ // 3.11.5.1.1 If this.[[pendingPullIntos]] is not empty,
+ resetQueue(this);
+ const result = this[sym.cancelAlgorithm](reason);
+ readableByteStreamControllerClearAlgorithms(this);
+ return result;
+ }
+
+ [sym.pullSteps](): Promise<ReadableStreamReadResult<Uint8Array>> {
+ const stream = this[sym.controlledReadableByteStream];
+ assert(readableStreamHasDefaultReader(stream));
+ if (this[sym.queueTotalSize] > 0) {
+ assert(readableStreamGetNumReadRequests(stream) === 0);
+ const entry = this[sym.queue].shift();
+ assert(entry);
+ this[sym.queueTotalSize] -= entry.size;
+ readableByteStreamControllerHandleQueueDrain(this);
+ const view = new Uint8Array(entry.value, entry.offset, entry.size);
+ return Promise.resolve(
+ readableStreamCreateReadResult(
+ view,
+ false,
+ stream[sym.reader]![sym.forAuthorCode]
+ )
+ );
+ }
+ // 3.11.5.2.5 If autoAllocateChunkSize is not undefined,
+ const promise = readableStreamAddReadRequest(stream);
+ readableByteStreamControllerCallPullIfNeeded(this);
+ return promise;
+ }
+
+ [symbols.customInspect](): string {
+ return `ReadableByteStreamController { byobRequest: ${String(
+ this.byobRequest
+ )}, desiredSize: ${String(this.desiredSize)} }`;
+ }
+}
diff --git a/cli/js/web/streams/readable_stream.ts b/cli/js/web/streams/readable_stream.ts
new file mode 100644
index 000000000..f606319b1
--- /dev/null
+++ b/cli/js/web/streams/readable_stream.ts
@@ -0,0 +1,216 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import {
+ acquireReadableStreamDefaultReader,
+ initializeReadableStream,
+ isReadableStream,
+ isReadableStreamLocked,
+ isUnderlyingByteSource,
+ makeSizeAlgorithmFromSizeFunction,
+ readableStreamCancel,
+ ReadableStreamGenericReader,
+ readableStreamTee,
+ setUpReadableByteStreamControllerFromUnderlyingSource,
+ setUpReadableStreamDefaultControllerFromUnderlyingSource,
+ validateAndNormalizeHighWaterMark,
+} from "./internals.ts";
+import { ReadableByteStreamControllerImpl } from "./readable_byte_stream_controller.ts";
+import { ReadableStreamAsyncIteratorPrototype } from "./readable_stream_async_iterator.ts";
+import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts";
+import * as sym from "./symbols.ts";
+import { symbols } from "../../symbols.ts";
+import { notImplemented } from "../../util.ts";
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export class ReadableStreamImpl<R = any> implements ReadableStream<R> {
+ [sym.disturbed]: boolean;
+ [sym.readableStreamController]:
+ | ReadableStreamDefaultControllerImpl<R>
+ | ReadableByteStreamControllerImpl;
+ [sym.reader]: ReadableStreamGenericReader<R> | undefined;
+ [sym.state]: "readable" | "closed" | "errored";
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ [sym.storedError]: any;
+
+ constructor(
+ underlyingSource: UnderlyingByteSource | UnderlyingSource<R> = {},
+ strategy:
+ | {
+ highWaterMark?: number;
+ size?: undefined;
+ }
+ | QueuingStrategy<R> = {}
+ ) {
+ initializeReadableStream(this);
+ const { size } = strategy;
+ let { highWaterMark } = strategy;
+ const { type } = underlyingSource;
+
+ if (isUnderlyingByteSource(underlyingSource)) {
+ if (size !== undefined) {
+ throw new RangeError(
+ `When underlying source is "bytes", strategy.size must be undefined.`
+ );
+ }
+ highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark ?? 0);
+ setUpReadableByteStreamControllerFromUnderlyingSource(
+ this,
+ underlyingSource,
+ highWaterMark
+ );
+ } else if (type === undefined) {
+ const sizeAlgorithm = makeSizeAlgorithmFromSizeFunction(size);
+ highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark ?? 1);
+ setUpReadableStreamDefaultControllerFromUnderlyingSource(
+ this,
+ underlyingSource,
+ highWaterMark,
+ sizeAlgorithm
+ );
+ } else {
+ throw new RangeError(
+ `Valid values for underlyingSource are "bytes" or undefined. Received: "${type}".`
+ );
+ }
+ }
+
+ get locked(): boolean {
+ if (!isReadableStream(this)) {
+ throw new TypeError("Invalid ReadableStream.");
+ }
+ return isReadableStreamLocked(this);
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ cancel(reason?: any): Promise<void> {
+ if (!isReadableStream(this)) {
+ return Promise.reject(new TypeError("Invalid ReadableStream."));
+ }
+ if (isReadableStreamLocked(this)) {
+ return Promise.reject(
+ new TypeError("Cannot cancel a locked ReadableStream.")
+ );
+ }
+ return readableStreamCancel(this, reason);
+ }
+
+ getIterator({
+ preventCancel,
+ }: { preventCancel?: boolean } = {}): AsyncIterableIterator<R> {
+ if (!isReadableStream(this)) {
+ throw new TypeError("Invalid ReadableStream.");
+ }
+ const reader = acquireReadableStreamDefaultReader(this);
+ const iterator = Object.create(ReadableStreamAsyncIteratorPrototype);
+ iterator[sym.asyncIteratorReader] = reader;
+ iterator[sym.preventCancel] = Boolean(preventCancel);
+ return iterator;
+ }
+
+ getReader({ mode }: { mode?: string } = {}): ReadableStreamDefaultReader<R> {
+ if (!isReadableStream(this)) {
+ throw new TypeError("Invalid ReadableStream.");
+ }
+ if (mode === undefined) {
+ return acquireReadableStreamDefaultReader(this, true);
+ }
+ mode = String(mode);
+ // 3.2.5.4.4 If mode is "byob", return ? AcquireReadableStreamBYOBReader(this, true).
+ throw new RangeError(`Unsupported mode "${mode}"`);
+ }
+
+ pipeThrough<T>(): // {
+ // writable,
+ // readable,
+ // }: {
+ // writable: WritableStream<R>;
+ // readable: ReadableStream<T>;
+ // },
+ // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {},
+ ReadableStream<T> {
+ return notImplemented();
+ // if (!isReadableStream(this)) {
+ // throw new TypeError("Invalid ReadableStream.");
+ // }
+ // if (!isWritableStream(writable)) {
+ // throw new TypeError("writable is not a valid WritableStream.");
+ // }
+ // if (!isReadableStream(readable)) {
+ // throw new TypeError("readable is not a valid ReadableStream.");
+ // }
+ // preventClose = Boolean(preventClose);
+ // preventAbort = Boolean(preventAbort);
+ // preventCancel = Boolean(preventCancel);
+ // if (signal && !(signal instanceof AbortSignalImpl)) {
+ // throw new TypeError("Invalid signal.");
+ // }
+ // if (isReadableStreamLocked(this)) {
+ // throw new TypeError("ReadableStream is locked.");
+ // }
+ // if (isWritableStreamLocked(writable)) {
+ // throw new TypeError("writable is locked.");
+ // }
+ // readableStreamPipeTo(
+ // this,
+ // writable,
+ // preventClose,
+ // preventAbort,
+ // preventCancel,
+ // signal,
+ // );
+ // return readable;
+ }
+
+ pipeTo(): // dest: WritableStream<R>,
+ // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {},
+ Promise<void> {
+ return notImplemented();
+ // if (!isReadableStream(this)) {
+ // return Promise.reject(new TypeError("Invalid ReadableStream."));
+ // }
+ // if (!isWritableStream(dest)) {
+ // return Promise.reject(
+ // new TypeError("dest is not a valid WritableStream."),
+ // );
+ // }
+ // preventClose = Boolean(preventClose);
+ // preventAbort = Boolean(preventAbort);
+ // preventCancel = Boolean(preventCancel);
+ // if (signal && !(signal instanceof AbortSignalImpl)) {
+ // return Promise.reject(new TypeError("Invalid signal."));
+ // }
+ // if (isReadableStreamLocked(this)) {
+ // return Promise.reject(new TypeError("ReadableStream is locked."));
+ // }
+ // if (isWritableStreamLocked(this)) {
+ // return Promise.reject(new TypeError("dest is locked."));
+ // }
+ // return readableStreamPipeTo(
+ // this,
+ // dest,
+ // preventClose,
+ // preventAbort,
+ // preventCancel,
+ // signal,
+ // );
+ }
+
+ tee(): [ReadableStreamImpl<R>, ReadableStreamImpl<R>] {
+ if (!isReadableStream(this)) {
+ throw new TypeError("Invalid ReadableStream.");
+ }
+ return readableStreamTee(this, false);
+ }
+
+ [symbols.customInspect](): string {
+ return `ReadableStream { locked: ${String(this.locked)} }`;
+ }
+
+ [Symbol.asyncIterator](
+ options: {
+ preventCancel?: boolean;
+ } = {}
+ ): AsyncIterableIterator<R> {
+ return this.getIterator(options);
+ }
+}
diff --git a/cli/js/web/streams/readable_stream_async_iterator.ts b/cli/js/web/streams/readable_stream_async_iterator.ts
new file mode 100644
index 000000000..cd656e73d
--- /dev/null
+++ b/cli/js/web/streams/readable_stream_async_iterator.ts
@@ -0,0 +1,81 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import * as sym from "./symbols.ts";
+import {
+ isReadableStreamAsyncIterator,
+ ReadableStreamAsyncIterator,
+ readableStreamCreateReadResult,
+ readableStreamReaderGenericCancel,
+ readableStreamReaderGenericRelease,
+ readableStreamDefaultReaderRead,
+} from "./internals.ts";
+import { assert } from "../../util.ts";
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+const AsyncIteratorPrototype: AsyncIterableIterator<any> = Object.getPrototypeOf(
+ Object.getPrototypeOf(async function* () {}).prototype
+);
+
+export const ReadableStreamAsyncIteratorPrototype: ReadableStreamAsyncIterator = Object.setPrototypeOf(
+ {
+ next(
+ this: ReadableStreamAsyncIterator
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ ): Promise<ReadableStreamReadResult<any>> {
+ if (!isReadableStreamAsyncIterator(this)) {
+ return Promise.reject(
+ new TypeError("invalid ReadableStreamAsyncIterator.")
+ );
+ }
+ const reader = this[sym.asyncIteratorReader];
+ if (!reader[sym.ownerReadableStream]) {
+ return Promise.reject(
+ new TypeError("reader owner ReadableStream is undefined.")
+ );
+ }
+ return readableStreamDefaultReaderRead(reader).then((result) => {
+ assert(typeof result === "object");
+ const { done } = result;
+ assert(typeof done === "boolean");
+ if (done) {
+ readableStreamReaderGenericRelease(reader);
+ }
+ const { value } = result;
+ return readableStreamCreateReadResult(value, done, true);
+ });
+ },
+ return(
+ this: ReadableStreamAsyncIterator,
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ value?: any | PromiseLike<any>
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ ): Promise<ReadableStreamReadResult<any>> {
+ if (!isReadableStreamAsyncIterator(this)) {
+ return Promise.reject(
+ new TypeError("invalid ReadableStreamAsyncIterator.")
+ );
+ }
+ const reader = this[sym.asyncIteratorReader];
+ if (!reader[sym.ownerReadableStream]) {
+ return Promise.reject(
+ new TypeError("reader owner ReadableStream is undefined.")
+ );
+ }
+ if (reader[sym.readRequests].length) {
+ return Promise.reject(
+ new TypeError("reader has outstanding read requests.")
+ );
+ }
+ if (!this[sym.preventCancel]) {
+ const result = readableStreamReaderGenericCancel(reader, value);
+ readableStreamReaderGenericRelease(reader);
+ return result.then(() =>
+ readableStreamCreateReadResult(value, true, true)
+ );
+ }
+ readableStreamReaderGenericRelease(reader);
+ return Promise.resolve(readableStreamCreateReadResult(value, true, true));
+ },
+ },
+ AsyncIteratorPrototype
+);
diff --git a/cli/js/web/streams/readable_stream_default_controller.ts b/cli/js/web/streams/readable_stream_default_controller.ts
new file mode 100644
index 000000000..866c6d79e
--- /dev/null
+++ b/cli/js/web/streams/readable_stream_default_controller.ts
@@ -0,0 +1,120 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import {
+ CancelAlgorithm,
+ dequeueValue,
+ isReadableStreamDefaultController,
+ Pair,
+ PullAlgorithm,
+ readableStreamAddReadRequest,
+ readableStreamClose,
+ readableStreamCreateReadResult,
+ readableStreamDefaultControllerCallPullIfNeeded,
+ readableStreamDefaultControllerCanCloseOrEnqueue,
+ readableStreamDefaultControllerClearAlgorithms,
+ readableStreamDefaultControllerClose,
+ readableStreamDefaultControllerEnqueue,
+ readableStreamDefaultControllerError,
+ readableStreamDefaultControllerGetDesiredSize,
+ resetQueue,
+ SizeAlgorithm,
+} from "./internals.ts";
+import { ReadableStreamImpl } from "./readable_stream.ts";
+import * as sym from "./symbols.ts";
+import { symbols } from "../../symbols.ts";
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export class ReadableStreamDefaultControllerImpl<R = any>
+ implements ReadableStreamDefaultController<R> {
+ [sym.cancelAlgorithm]: CancelAlgorithm;
+ [sym.closeRequested]: boolean;
+ [sym.controlledReadableStream]: ReadableStreamImpl<R>;
+ [sym.pullAgain]: boolean;
+ [sym.pullAlgorithm]: PullAlgorithm;
+ [sym.pulling]: boolean;
+ [sym.queue]: Array<Pair<R>>;
+ [sym.queueTotalSize]: number;
+ [sym.started]: boolean;
+ [sym.strategyHWM]: number;
+ [sym.strategySizeAlgorithm]: SizeAlgorithm<R>;
+
+ private constructor() {
+ throw new TypeError(
+ "ReadableStreamDefaultController's constructor cannot be called."
+ );
+ }
+
+ get desiredSize(): number | null {
+ if (!isReadableStreamDefaultController(this)) {
+ throw new TypeError("Invalid ReadableStreamDefaultController.");
+ }
+ return readableStreamDefaultControllerGetDesiredSize(this);
+ }
+
+ close(): void {
+ if (!isReadableStreamDefaultController(this)) {
+ throw new TypeError("Invalid ReadableStreamDefaultController.");
+ }
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
+ throw new TypeError(
+ "ReadableStreamDefaultController cannot close or enqueue."
+ );
+ }
+ readableStreamDefaultControllerClose(this);
+ }
+
+ enqueue(chunk: R): void {
+ if (!isReadableStreamDefaultController(this)) {
+ throw new TypeError("Invalid ReadableStreamDefaultController.");
+ }
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
+ throw new TypeError("ReadableSteamController cannot enqueue.");
+ }
+ return readableStreamDefaultControllerEnqueue(this, chunk);
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ error(error?: any): void {
+ if (!isReadableStreamDefaultController(this)) {
+ throw new TypeError("Invalid ReadableStreamDefaultController.");
+ }
+ readableStreamDefaultControllerError(this, error);
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ [sym.cancelSteps](reason?: any): PromiseLike<void> {
+ resetQueue(this);
+ const result = this[sym.cancelAlgorithm](reason);
+ readableStreamDefaultControllerClearAlgorithms(this);
+ return result;
+ }
+
+ [sym.pullSteps](): Promise<ReadableStreamReadResult<R>> {
+ const stream = this[sym.controlledReadableStream];
+ if (this[sym.queue].length) {
+ const chunk = dequeueValue<R>(this);
+ if (this[sym.closeRequested] && this[sym.queue].length === 0) {
+ readableStreamDefaultControllerClearAlgorithms(this);
+ readableStreamClose(stream);
+ } else {
+ readableStreamDefaultControllerCallPullIfNeeded(this);
+ }
+ return Promise.resolve(
+ readableStreamCreateReadResult(
+ chunk,
+ false,
+ stream[sym.reader]![sym.forAuthorCode]
+ )
+ );
+ }
+ const pendingPromise = readableStreamAddReadRequest(stream);
+ readableStreamDefaultControllerCallPullIfNeeded(this);
+ return pendingPromise;
+ }
+
+ [symbols.customInspect](): string {
+ return `ReadableStreamDefaultController { desiredSize: ${String(
+ this.desiredSize
+ )} }`;
+ }
+}
diff --git a/cli/js/web/streams/readable_stream_default_reader.ts b/cli/js/web/streams/readable_stream_default_reader.ts
new file mode 100644
index 000000000..9bdce3e9c
--- /dev/null
+++ b/cli/js/web/streams/readable_stream_default_reader.ts
@@ -0,0 +1,89 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import {
+ Deferred,
+ isReadableStream,
+ isReadableStreamDefaultReader,
+ isReadableStreamLocked,
+ readableStreamDefaultReaderRead,
+ readableStreamReaderGenericCancel,
+ readableStreamReaderGenericInitialize,
+ readableStreamReaderGenericRelease,
+} from "./internals.ts";
+import { ReadableStreamImpl } from "./readable_stream.ts";
+import * as sym from "./symbols.ts";
+import { symbols } from "../../symbols.ts";
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export class ReadableStreamDefaultReaderImpl<R = any>
+ implements ReadableStreamDefaultReader<R> {
+ [sym.closedPromise]: Deferred<void>;
+ [sym.forAuthorCode]: boolean;
+ [sym.ownerReadableStream]: ReadableStreamImpl<R>;
+ [sym.readRequests]: Array<Deferred<ReadableStreamReadResult<R>>>;
+
+ constructor(stream: ReadableStream<R>) {
+ if (!isReadableStream(stream)) {
+ throw new TypeError("stream is not a ReadableStream.");
+ }
+ if (isReadableStreamLocked(stream)) {
+ throw new TypeError("stream is locked.");
+ }
+ readableStreamReaderGenericInitialize(this, stream);
+ this[sym.readRequests] = [];
+ }
+
+ get closed(): Promise<void> {
+ if (!isReadableStreamDefaultReader(this)) {
+ return Promise.reject(
+ new TypeError("Invalid ReadableStreamDefaultReader.")
+ );
+ }
+ return (
+ this[sym.closedPromise].promise ??
+ Promise.reject(new TypeError("Invalid reader."))
+ );
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ cancel(reason?: any): Promise<void> {
+ if (!isReadableStreamDefaultReader(this)) {
+ return Promise.reject(
+ new TypeError("Invalid ReadableStreamDefaultReader.")
+ );
+ }
+ if (!this[sym.ownerReadableStream]) {
+ return Promise.reject(new TypeError("Invalid reader."));
+ }
+ return readableStreamReaderGenericCancel(this, reason);
+ }
+
+ read(): Promise<ReadableStreamReadResult<R>> {
+ if (!isReadableStreamDefaultReader(this)) {
+ return Promise.reject(
+ new TypeError("Invalid ReadableStreamDefaultReader.")
+ );
+ }
+ if (!this[sym.ownerReadableStream]) {
+ return Promise.reject(new TypeError("Invalid reader."));
+ }
+ return readableStreamDefaultReaderRead(this);
+ }
+
+ releaseLock(): void {
+ if (!isReadableStreamDefaultReader(this)) {
+ throw new TypeError("Invalid ReadableStreamDefaultReader.");
+ }
+ if (this[sym.ownerReadableStream] === undefined) {
+ return;
+ }
+ if (this[sym.readRequests].length) {
+ throw new TypeError("Cannot release lock with pending read requests.");
+ }
+ readableStreamReaderGenericRelease(this);
+ }
+
+ [symbols.customInspect](): string {
+ return `ReadableStreamDefaultReader { closed: Promise }`;
+ }
+}
diff --git a/cli/js/web/streams/shared-internals.ts b/cli/js/web/streams/shared-internals.ts
deleted file mode 100644
index 642b61371..000000000
--- a/cli/js/web/streams/shared-internals.ts
+++ /dev/null
@@ -1,289 +0,0 @@
-// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-/* eslint-disable @typescript-eslint/no-explicit-any */
-// TODO don't disable this warning
-
-import { QueuingStrategySizeCallback } from "../dom_types.d.ts";
-
-// common stream fields
-
-export const state_ = Symbol("state_");
-export const storedError_ = Symbol("storedError_");
-
-export type ErrorResult = any;
-
-export function isInteger(value: number): boolean {
- if (!isFinite(value)) {
- // covers NaN, +Infinity and -Infinity
- return false;
- }
- const absValue = Math.abs(value);
- return Math.floor(absValue) === absValue;
-}
-
-export function isFiniteNonNegativeNumber(value: unknown): boolean {
- if (!(typeof value === "number" && isFinite(value))) {
- // covers NaN, +Infinity and -Infinity
- return false;
- }
- return value >= 0;
-}
-
-export function isAbortSignal(signal: any): signal is AbortSignal {
- if (typeof signal !== "object" || signal === null) {
- return false;
- }
- try {
- // TODO
- // calling signal.aborted() probably isn't the right way to perform this test
- // https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/shared-internals.ts#L41
- signal.aborted();
- return true;
- } catch (err) {
- return false;
- }
-}
-
-export function invokeOrNoop<O extends object, P extends keyof O>(
- o: O,
- p: P,
- args: any[]
-): any {
- // Assert: O is not undefined.
- // Assert: IsPropertyKey(P) is true.
- // Assert: args is a List.
- const method: Function | undefined = (o as any)[p]; // tslint:disable-line:ban-types
- if (method === undefined) {
- return undefined;
- }
- return Function.prototype.apply.call(method, o, args);
-}
-
-export function cloneArrayBuffer(
- srcBuffer: ArrayBufferLike,
- srcByteOffset: number,
- srcLength: number,
- cloneConstructor: ArrayBufferConstructor | SharedArrayBufferConstructor
-): InstanceType<typeof cloneConstructor> {
- // this function fudges the return type but SharedArrayBuffer is disabled for a while anyway
- return srcBuffer.slice(
- srcByteOffset,
- srcByteOffset + srcLength
- ) as InstanceType<typeof cloneConstructor>;
-}
-
-export function transferArrayBuffer(buffer: ArrayBufferLike): ArrayBuffer {
- // This would in a JS engine context detach the buffer's backing store and return
- // a new ArrayBuffer with the same backing store, invalidating `buffer`,
- // i.e. a move operation in C++ parlance.
- // Sadly ArrayBuffer.transfer is yet to be implemented by a single browser vendor.
- return buffer.slice(0); // copies instead of moves
-}
-
-export function copyDataBlockBytes(
- toBlock: ArrayBufferLike,
- toIndex: number,
- fromBlock: ArrayBufferLike,
- fromIndex: number,
- count: number
-): void {
- new Uint8Array(toBlock, toIndex, count).set(
- new Uint8Array(fromBlock, fromIndex, count)
- );
-}
-
-// helper memoisation map for object values
-// weak so it doesn't keep memoized versions of old objects indefinitely.
-const objectCloneMemo = new WeakMap<object, object>();
-
-let sharedArrayBufferSupported_: boolean | undefined;
-function supportsSharedArrayBuffer(): boolean {
- if (sharedArrayBufferSupported_ === undefined) {
- try {
- new SharedArrayBuffer(16);
- sharedArrayBufferSupported_ = true;
- } catch (e) {
- sharedArrayBufferSupported_ = false;
- }
- }
- return sharedArrayBufferSupported_;
-}
-
-export function cloneValue(value: any): any {
- const valueType = typeof value;
- switch (valueType) {
- case "number":
- case "string":
- case "boolean":
- case "undefined":
- // @ts-ignore
- case "bigint":
- return value;
- case "object": {
- if (objectCloneMemo.has(value)) {
- return objectCloneMemo.get(value);
- }
- if (value === null) {
- return value;
- }
- if (value instanceof Date) {
- return new Date(value.valueOf());
- }
- if (value instanceof RegExp) {
- return new RegExp(value);
- }
- if (supportsSharedArrayBuffer() && value instanceof SharedArrayBuffer) {
- return value;
- }
- if (value instanceof ArrayBuffer) {
- const cloned = cloneArrayBuffer(
- value,
- 0,
- value.byteLength,
- ArrayBuffer
- );
- objectCloneMemo.set(value, cloned);
- return cloned;
- }
- if (ArrayBuffer.isView(value)) {
- const clonedBuffer = cloneValue(value.buffer) as ArrayBufferLike;
- // Use DataViewConstructor type purely for type-checking, can be a DataView or TypedArray.
- // They use the same constructor signature, only DataView has a length in bytes and TypedArrays
- // use a length in terms of elements, so we adjust for that.
- let length: number;
- if (value instanceof DataView) {
- length = value.byteLength;
- } else {
- length = (value as Uint8Array).length;
- }
- return new (value.constructor as DataViewConstructor)(
- clonedBuffer,
- value.byteOffset,
- length
- );
- }
- if (value instanceof Map) {
- const clonedMap = new Map();
- objectCloneMemo.set(value, clonedMap);
- value.forEach((v, k) => clonedMap.set(k, cloneValue(v)));
- return clonedMap;
- }
- if (value instanceof Set) {
- const clonedSet = new Map();
- objectCloneMemo.set(value, clonedSet);
- value.forEach((v, k) => clonedSet.set(k, cloneValue(v)));
- return clonedSet;
- }
-
- // generic object
- const clonedObj = {} as any;
- objectCloneMemo.set(value, clonedObj);
- const sourceKeys = Object.getOwnPropertyNames(value);
- for (const key of sourceKeys) {
- clonedObj[key] = cloneValue(value[key]);
- }
- return clonedObj;
- }
- case "symbol":
- case "function":
- default:
- // TODO this should be a DOMException,
- // https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/shared-internals.ts#L171
- throw new Error("Uncloneable value in stream");
- }
-}
-
-export function promiseCall<F extends Function>(
- f: F,
- v: object | undefined,
- args: any[]
-): Promise<any> {
- // tslint:disable-line:ban-types
- try {
- const result = Function.prototype.apply.call(f, v, args);
- return Promise.resolve(result);
- } catch (err) {
- return Promise.reject(err);
- }
-}
-
-export function createAlgorithmFromUnderlyingMethod<
- O extends object,
- K extends keyof O
->(obj: O, methodName: K, extraArgs: any[]): any {
- const method = obj[methodName];
- if (method === undefined) {
- return (): any => Promise.resolve(undefined);
- }
- if (typeof method !== "function") {
- throw new TypeError(`Field "${methodName}" is not a function.`);
- }
- return function (...fnArgs: any[]): any {
- return promiseCall(method, obj, fnArgs.concat(extraArgs));
- };
-}
-
-/*
-Deprecated for now, all usages replaced by readableStreamCreateReadResult
-
-function createIterResultObject<T>(value: T, done: boolean): IteratorResult<T> {
- return { value, done };
-}
-*/
-
-export function validateAndNormalizeHighWaterMark(hwm: unknown): number {
- const highWaterMark = Number(hwm);
- if (isNaN(highWaterMark) || highWaterMark < 0) {
- throw new RangeError(
- "highWaterMark must be a valid, non-negative integer."
- );
- }
- return highWaterMark;
-}
-
-export function makeSizeAlgorithmFromSizeFunction<T>(
- sizeFn: undefined | ((chunk: T) => number)
-): QueuingStrategySizeCallback<T> {
- if (typeof sizeFn !== "function" && typeof sizeFn !== "undefined") {
- throw new TypeError("size function must be undefined or a function");
- }
- return function (chunk: T): number {
- if (typeof sizeFn === "function") {
- return sizeFn(chunk);
- }
- return 1;
- };
-}
-
-// ----
-
-export const enum ControlledPromiseState {
- Pending,
- Resolved,
- Rejected,
-}
-
-export interface ControlledPromise<V> {
- resolve(value?: V): void;
- reject(error: ErrorResult): void;
- promise: Promise<V>;
- state: ControlledPromiseState;
-}
-
-export function createControlledPromise<V>(): ControlledPromise<V> {
- const conProm = {
- state: ControlledPromiseState.Pending,
- } as ControlledPromise<V>;
- conProm.promise = new Promise<V>(function (resolve, reject) {
- conProm.resolve = function (v?: V): void {
- conProm.state = ControlledPromiseState.Resolved;
- resolve(v);
- };
- conProm.reject = function (e?: ErrorResult): void {
- conProm.state = ControlledPromiseState.Rejected;
- reject(e);
- };
- });
- return conProm;
-}
diff --git a/cli/js/web/streams/strategies.ts b/cli/js/web/streams/strategies.ts
deleted file mode 100644
index 4c5b402c5..000000000
--- a/cli/js/web/streams/strategies.ts
+++ /dev/null
@@ -1,32 +0,0 @@
-// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-/* eslint-disable @typescript-eslint/no-explicit-any */
-// TODO reenable this lint here
-
-import { QueuingStrategy } from "../dom_types.d.ts";
-
-export class ByteLengthQueuingStrategy
- implements QueuingStrategy<ArrayBufferView> {
- highWaterMark: number;
-
- constructor(options: { highWaterMark: number }) {
- this.highWaterMark = options.highWaterMark;
- }
-
- size(chunk: ArrayBufferView): number {
- return chunk.byteLength;
- }
-}
-
-export class CountQueuingStrategy implements QueuingStrategy<any> {
- highWaterMark: number;
-
- constructor(options: { highWaterMark: number }) {
- this.highWaterMark = options.highWaterMark;
- }
-
- size(): number {
- return 1;
- }
-}
diff --git a/cli/js/web/streams/symbols.ts b/cli/js/web/streams/symbols.ts
new file mode 100644
index 000000000..9d6335ef0
--- /dev/null
+++ b/cli/js/web/streams/symbols.ts
@@ -0,0 +1,38 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+// The specification refers to internal slots. In most cases, ECMAScript
+// Private Fields are not sufficient for these, as they are often accessed
+// outside of the class itself and using a WeakMap gets really complex to hide
+// this data from the public, therefore we will use unique symbols which are
+// not available in the runtime.
+
+export const asyncIteratorReader = Symbol("asyncIteratorReader");
+export const autoAllocateChunkSize = Symbol("autoAllocateChunkSize");
+export const byobRequest = Symbol("byobRequest");
+export const cancelAlgorithm = Symbol("cancelAlgorithm");
+export const cancelSteps = Symbol("cancelSteps");
+export const closedPromise = Symbol("closedPromise");
+export const closeRequested = Symbol("closeRequested");
+export const controlledReadableByteStream = Symbol(
+ "controlledReadableByteStream"
+);
+export const controlledReadableStream = Symbol("controlledReadableStream");
+export const disturbed = Symbol("disturbed");
+export const forAuthorCode = Symbol("forAuthorCode");
+export const isFakeDetached = Symbol("isFakeDetached");
+export const ownerReadableStream = Symbol("ownerReadableStream");
+export const preventCancel = Symbol("preventCancel");
+export const pullAgain = Symbol("pullAgain");
+export const pullAlgorithm = Symbol("pullAlgorithm");
+export const pulling = Symbol("pulling");
+export const pullSteps = Symbol("pullSteps");
+export const queue = Symbol("queue");
+export const queueTotalSize = Symbol("queueTotalSize");
+export const readableStreamController = Symbol("readableStreamController");
+export const reader = Symbol("reader");
+export const readRequests = Symbol("readRequests");
+export const started = Symbol("started");
+export const state = Symbol("state");
+export const storedError = Symbol("storedError");
+export const strategyHWM = Symbol("strategyHWM");
+export const strategySizeAlgorithm = Symbol("strategySizeAlgorithm");
diff --git a/cli/js/web/streams/transform-internals.ts b/cli/js/web/streams/transform-internals.ts
deleted file mode 100644
index 9c17db8f6..000000000
--- a/cli/js/web/streams/transform-internals.ts
+++ /dev/null
@@ -1,371 +0,0 @@
-// TODO reenable this code when we enable writableStreams and transport types
-// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-// /**
-// * streams/transform-internals - internal types and functions for transform streams
-// * Part of Stardazed
-// * (c) 2018-Present by Arthur Langereis - @zenmumbler
-// * https://github.com/stardazed/sd-streams
-// */
-
-// /* eslint-disable @typescript-eslint/no-explicit-any */
-// // TODO reenable this lint here
-
-// import * as rs from "./readable-internals.ts";
-// import * as ws from "./writable-internals.ts";
-// import * as shared from "./shared-internals.ts";
-
-// import { createReadableStream } from "./readable-stream.ts";
-// import { createWritableStream } from "./writable-stream.ts";
-
-// import { QueuingStrategy, QueuingStrategySizeCallback } from "../dom_types.d.ts";
-
-// export const state_ = Symbol("transformState_");
-// export const backpressure_ = Symbol("backpressure_");
-// export const backpressureChangePromise_ = Symbol("backpressureChangePromise_");
-// export const readable_ = Symbol("readable_");
-// export const transformStreamController_ = Symbol("transformStreamController_");
-// export const writable_ = Symbol("writable_");
-
-// export const controlledTransformStream_ = Symbol("controlledTransformStream_");
-// export const flushAlgorithm_ = Symbol("flushAlgorithm_");
-// export const transformAlgorithm_ = Symbol("transformAlgorithm_");
-
-// // ----
-
-// export type TransformFunction<InputType, OutputType> = (
-// chunk: InputType,
-// controller: TransformStreamDefaultController<InputType, OutputType>
-// ) => void | PromiseLike<void>;
-// export type TransformAlgorithm<InputType> = (chunk: InputType) => Promise<void>;
-// export type FlushFunction<InputType, OutputType> = (
-// controller: TransformStreamDefaultController<InputType, OutputType>
-// ) => void | PromiseLike<void>;
-// export type FlushAlgorithm = () => Promise<void>;
-
-// // ----
-
-// export interface TransformStreamDefaultController<InputType, OutputType> {
-// readonly desiredSize: number | null;
-// enqueue(chunk: OutputType): void;
-// error(reason: shared.ErrorResult): void;
-// terminate(): void;
-
-// [controlledTransformStream_]: TransformStream<InputType, OutputType>; // The TransformStream instance controlled; also used for the IsTransformStreamDefaultController brand check
-// [flushAlgorithm_]: FlushAlgorithm; // A promise - returning algorithm which communicates a requested close to the transformer
-// [transformAlgorithm_]: TransformAlgorithm<InputType>; // A promise - returning algorithm, taking one argument(the chunk to transform), which requests the transformer perform its transformation
-// }
-
-// export interface Transformer<InputType, OutputType> {
-// start?(
-// controller: TransformStreamDefaultController<InputType, OutputType>
-// ): void | PromiseLike<void>;
-// transform?: TransformFunction<InputType, OutputType>;
-// flush?: FlushFunction<InputType, OutputType>;
-
-// readableType?: undefined; // for future spec changes
-// writableType?: undefined; // for future spec changes
-// }
-
-// export declare class TransformStream<InputType, OutputType> {
-// constructor(
-// transformer: Transformer<InputType, OutputType>,
-// writableStrategy: QueuingStrategy<InputType>,
-// readableStrategy: QueuingStrategy<OutputType>
-// );
-
-// readonly readable: rs.SDReadableStream<OutputType>;
-// readonly writable: ws.WritableStream<InputType>;
-
-// [backpressure_]: boolean | undefined; // Whether there was backpressure on [[readable]] the last time it was observed
-// [backpressureChangePromise_]: shared.ControlledPromise<void> | undefined; // A promise which is fulfilled and replaced every time the value of[[backpressure]] changes
-// [readable_]: rs.SDReadableStream<OutputType>; // The ReadableStream instance controlled by this object
-// [transformStreamController_]: TransformStreamDefaultController<
-// InputType,
-// OutputType
-// >; // A TransformStreamDefaultController created with the ability to control[[readable]] and[[writable]]; also used for the IsTransformStream brand check
-// [writable_]: ws.WritableStream<InputType>; // The WritableStream instance controlled by this object
-// }
-
-// // ---- TransformStream
-
-// export function isTransformStream(
-// value: unknown
-// ): value is TransformStream<any, any> {
-// if (typeof value !== "object" || value === null) {
-// return false;
-// }
-// return transformStreamController_ in value;
-// }
-
-// export function initializeTransformStream<InputType, OutputType>(
-// stream: TransformStream<InputType, OutputType>,
-// startPromise: Promise<void>,
-// writableHighWaterMark: number,
-// writableSizeAlgorithm: QueuingStrategySizeCallback<InputType>,
-// readableHighWaterMark: number,
-// readableSizeAlgorithm: QueuingStrategySizeCallback<OutputType>
-// ): void {
-// const startAlgorithm = function(): Promise<void> {
-// return startPromise;
-// };
-// const writeAlgorithm = function(chunk: InputType): Promise<void> {
-// return transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
-// };
-// const abortAlgorithm = function(reason: shared.ErrorResult): Promise<void> {
-// return transformStreamDefaultSinkAbortAlgorithm(stream, reason);
-// };
-// const closeAlgorithm = function(): Promise<void> {
-// return transformStreamDefaultSinkCloseAlgorithm(stream);
-// };
-// stream[writable_] = createWritableStream<InputType>(
-// startAlgorithm,
-// writeAlgorithm,
-// closeAlgorithm,
-// abortAlgorithm,
-// writableHighWaterMark,
-// writableSizeAlgorithm
-// );
-
-// const pullAlgorithm = function(): Promise<void> {
-// return transformStreamDefaultSourcePullAlgorithm(stream);
-// };
-// const cancelAlgorithm = function(
-// reason: shared.ErrorResult
-// ): Promise<undefined> {
-// transformStreamErrorWritableAndUnblockWrite(stream, reason);
-// return Promise.resolve(undefined);
-// };
-// stream[readable_] = createReadableStream(
-// startAlgorithm,
-// pullAlgorithm,
-// cancelAlgorithm,
-// readableHighWaterMark,
-// readableSizeAlgorithm
-// );
-
-// stream[backpressure_] = undefined;
-// stream[backpressureChangePromise_] = undefined;
-// transformStreamSetBackpressure(stream, true);
-// stream[transformStreamController_] = undefined!; // initialize slot for brand-check
-// }
-
-// export function transformStreamError<InputType, OutputType>(
-// stream: TransformStream<InputType, OutputType>,
-// error: shared.ErrorResult
-// ): void {
-// rs.readableStreamDefaultControllerError(
-// stream[readable_][
-// rs.readableStreamController_
-// ] as rs.SDReadableStreamDefaultController<OutputType>,
-// error
-// );
-// transformStreamErrorWritableAndUnblockWrite(stream, error);
-// }
-
-// export function transformStreamErrorWritableAndUnblockWrite<
-// InputType,
-// OutputType
-// >(
-// stream: TransformStream<InputType, OutputType>,
-// error: shared.ErrorResult
-// ): void {
-// transformStreamDefaultControllerClearAlgorithms(
-// stream[transformStreamController_]
-// );
-// ws.writableStreamDefaultControllerErrorIfNeeded(
-// stream[writable_][ws.writableStreamController_]!,
-// error
-// );
-// if (stream[backpressure_]) {
-// transformStreamSetBackpressure(stream, false);
-// }
-// }
-
-// export function transformStreamSetBackpressure<InputType, OutputType>(
-// stream: TransformStream<InputType, OutputType>,
-// backpressure: boolean
-// ): void {
-// // Assert: stream.[[backpressure]] is not backpressure.
-// if (stream[backpressure_] !== undefined) {
-// stream[backpressureChangePromise_]!.resolve(undefined);
-// }
-// stream[backpressureChangePromise_] = shared.createControlledPromise<void>();
-// stream[backpressure_] = backpressure;
-// }
-
-// // ---- TransformStreamDefaultController
-
-// export function isTransformStreamDefaultController(
-// value: unknown
-// ): value is TransformStreamDefaultController<any, any> {
-// if (typeof value !== "object" || value === null) {
-// return false;
-// }
-// return controlledTransformStream_ in value;
-// }
-
-// export function setUpTransformStreamDefaultController<InputType, OutputType>(
-// stream: TransformStream<InputType, OutputType>,
-// controller: TransformStreamDefaultController<InputType, OutputType>,
-// transformAlgorithm: TransformAlgorithm<InputType>,
-// flushAlgorithm: FlushAlgorithm
-// ): void {
-// // Assert: ! IsTransformStream(stream) is true.
-// // Assert: stream.[[transformStreamController]] is undefined.
-// controller[controlledTransformStream_] = stream;
-// stream[transformStreamController_] = controller;
-// controller[transformAlgorithm_] = transformAlgorithm;
-// controller[flushAlgorithm_] = flushAlgorithm;
-// }
-
-// export function transformStreamDefaultControllerClearAlgorithms<
-// InputType,
-// OutputType
-// >(controller: TransformStreamDefaultController<InputType, OutputType>): void {
-// // Use ! assertions to override type check here, this way we don't
-// // have to perform type checks/assertions everywhere else.
-// controller[transformAlgorithm_] = undefined!;
-// controller[flushAlgorithm_] = undefined!;
-// }
-
-// export function transformStreamDefaultControllerEnqueue<InputType, OutputType>(
-// controller: TransformStreamDefaultController<InputType, OutputType>,
-// chunk: OutputType
-// ): void {
-// const stream = controller[controlledTransformStream_];
-// const readableController = stream[readable_][
-// rs.readableStreamController_
-// ] as rs.SDReadableStreamDefaultController<OutputType>;
-// if (
-// !rs.readableStreamDefaultControllerCanCloseOrEnqueue(readableController)
-// ) {
-// throw new TypeError();
-// }
-// try {
-// rs.readableStreamDefaultControllerEnqueue(readableController, chunk);
-// } catch (error) {
-// transformStreamErrorWritableAndUnblockWrite(stream, error);
-// throw stream[readable_][shared.storedError_];
-// }
-// const backpressure = rs.readableStreamDefaultControllerHasBackpressure(
-// readableController
-// );
-// if (backpressure !== stream[backpressure_]) {
-// // Assert: backpressure is true.
-// transformStreamSetBackpressure(stream, true);
-// }
-// }
-
-// export function transformStreamDefaultControllerError<InputType, OutputType>(
-// controller: TransformStreamDefaultController<InputType, OutputType>,
-// error: shared.ErrorResult
-// ): void {
-// transformStreamError(controller[controlledTransformStream_], error);
-// }
-
-// export function transformStreamDefaultControllerPerformTransform<
-// InputType,
-// OutputType
-// >(
-// controller: TransformStreamDefaultController<InputType, OutputType>,
-// chunk: InputType
-// ): Promise<void> {
-// const transformPromise = controller[transformAlgorithm_](chunk);
-// return transformPromise.catch(error => {
-// transformStreamError(controller[controlledTransformStream_], error);
-// throw error;
-// });
-// }
-
-// export function transformStreamDefaultControllerTerminate<
-// InputType,
-// OutputType
-// >(controller: TransformStreamDefaultController<InputType, OutputType>): void {
-// const stream = controller[controlledTransformStream_];
-// const readableController = stream[readable_][
-// rs.readableStreamController_
-// ] as rs.SDReadableStreamDefaultController<OutputType>;
-// if (rs.readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) {
-// rs.readableStreamDefaultControllerClose(readableController);
-// }
-// const error = new TypeError("The transform stream has been terminated");
-// transformStreamErrorWritableAndUnblockWrite(stream, error);
-// }
-
-// // ---- Transform Sinks
-
-// export function transformStreamDefaultSinkWriteAlgorithm<InputType, OutputType>(
-// stream: TransformStream<InputType, OutputType>,
-// chunk: InputType
-// ): Promise<void> {
-// // Assert: stream.[[writable]].[[state]] is "writable".
-// const controller = stream[transformStreamController_];
-// if (stream[backpressure_]) {
-// const backpressureChangePromise = stream[backpressureChangePromise_]!;
-// // Assert: backpressureChangePromise is not undefined.
-// return backpressureChangePromise.promise.then(_ => {
-// const writable = stream[writable_];
-// const state = writable[shared.state_];
-// if (state === "erroring") {
-// throw writable[shared.storedError_];
-// }
-// // Assert: state is "writable".
-// return transformStreamDefaultControllerPerformTransform(
-// controller,
-// chunk
-// );
-// });
-// }
-// return transformStreamDefaultControllerPerformTransform(controller, chunk);
-// }
-
-// export function transformStreamDefaultSinkAbortAlgorithm<InputType, OutputType>(
-// stream: TransformStream<InputType, OutputType>,
-// reason: shared.ErrorResult
-// ): Promise<void> {
-// transformStreamError(stream, reason);
-// return Promise.resolve(undefined);
-// }
-
-// export function transformStreamDefaultSinkCloseAlgorithm<InputType, OutputType>(
-// stream: TransformStream<InputType, OutputType>
-// ): Promise<void> {
-// const readable = stream[readable_];
-// const controller = stream[transformStreamController_];
-// const flushPromise = controller[flushAlgorithm_]();
-// transformStreamDefaultControllerClearAlgorithms(controller);
-
-// return flushPromise.then(
-// _ => {
-// if (readable[shared.state_] === "errored") {
-// throw readable[shared.storedError_];
-// }
-// const readableController = readable[
-// rs.readableStreamController_
-// ] as rs.SDReadableStreamDefaultController<OutputType>;
-// if (
-// rs.readableStreamDefaultControllerCanCloseOrEnqueue(readableController)
-// ) {
-// rs.readableStreamDefaultControllerClose(readableController);
-// }
-// },
-// error => {
-// transformStreamError(stream, error);
-// throw readable[shared.storedError_];
-// }
-// );
-// }
-
-// // ---- Transform Sources
-
-// export function transformStreamDefaultSourcePullAlgorithm<
-// InputType,
-// OutputType
-// >(stream: TransformStream<InputType, OutputType>): Promise<void> {
-// // Assert: stream.[[backpressure]] is true.
-// // Assert: stream.[[backpressureChangePromise]] is not undefined.
-// transformStreamSetBackpressure(stream, false);
-// return stream[backpressureChangePromise_]!.promise;
-// }
diff --git a/cli/js/web/streams/transform-stream-default-controller.ts b/cli/js/web/streams/transform-stream-default-controller.ts
deleted file mode 100644
index 24a8d08fd..000000000
--- a/cli/js/web/streams/transform-stream-default-controller.ts
+++ /dev/null
@@ -1,58 +0,0 @@
-// TODO reenable this code when we enable writableStreams and transport types
-// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-// /**
-// * streams/transform-stream-default-controller - TransformStreamDefaultController class implementation
-// * Part of Stardazed
-// * (c) 2018-Present by Arthur Langereis - @zenmumbler
-// * https://github.com/stardazed/sd-streams
-// */
-
-// import * as rs from "./readable-internals.ts";
-// import * as ts from "./transform-internals.ts";
-// import { ErrorResult } from "./shared-internals.ts";
-
-// export class TransformStreamDefaultController<InputType, OutputType>
-// implements ts.TransformStreamDefaultController<InputType, OutputType> {
-// [ts.controlledTransformStream_]: ts.TransformStream<InputType, OutputType>;
-// [ts.flushAlgorithm_]: ts.FlushAlgorithm;
-// [ts.transformAlgorithm_]: ts.TransformAlgorithm<InputType>;
-
-// constructor() {
-// throw new TypeError();
-// }
-
-// get desiredSize(): number | null {
-// if (!ts.isTransformStreamDefaultController(this)) {
-// throw new TypeError();
-// }
-// const readableController = this[ts.controlledTransformStream_][
-// ts.readable_
-// ][rs.readableStreamController_] as rs.SDReadableStreamDefaultController<
-// OutputType
-// >;
-// return rs.readableStreamDefaultControllerGetDesiredSize(readableController);
-// }
-
-// enqueue(chunk: OutputType): void {
-// if (!ts.isTransformStreamDefaultController(this)) {
-// throw new TypeError();
-// }
-// ts.transformStreamDefaultControllerEnqueue(this, chunk);
-// }
-
-// error(reason: ErrorResult): void {
-// if (!ts.isTransformStreamDefaultController(this)) {
-// throw new TypeError();
-// }
-// ts.transformStreamDefaultControllerError(this, reason);
-// }
-
-// terminate(): void {
-// if (!ts.isTransformStreamDefaultController(this)) {
-// throw new TypeError();
-// }
-// ts.transformStreamDefaultControllerTerminate(this);
-// }
-// }
diff --git a/cli/js/web/streams/transform-stream.ts b/cli/js/web/streams/transform-stream.ts
deleted file mode 100644
index c27430db1..000000000
--- a/cli/js/web/streams/transform-stream.ts
+++ /dev/null
@@ -1,147 +0,0 @@
-// TODO reenable this code when we enable writableStreams and transport types
-// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-// /**
-// * streams/transform-stream - TransformStream class implementation
-// * Part of Stardazed
-// * (c) 2018-Present by Arthur Langereis - @zenmumbler
-// * https://github.com/stardazed/sd-streams
-// */
-
-// /* eslint-disable @typescript-eslint/no-explicit-any */
-// // TODO reenable this lint here
-
-// import * as rs from "./readable-internals.ts";
-// import * as ws from "./writable-internals.ts";
-// import * as ts from "./transform-internals.ts";
-// import * as shared from "./shared-internals.ts";
-// import { TransformStreamDefaultController } from "./transform-stream-default-controller.ts";
-// import { QueuingStrategy } from "../dom_types.d.ts";
-
-// export class TransformStream<InputType, OutputType> {
-// [ts.backpressure_]: boolean | undefined; // Whether there was backpressure on [[readable]] the last time it was observed
-// [ts.backpressureChangePromise_]: shared.ControlledPromise<void>; // A promise which is fulfilled and replaced every time the value of[[backpressure]] changes
-// [ts.readable_]: rs.SDReadableStream<OutputType>; // The ReadableStream instance controlled by this object
-// [ts.transformStreamController_]: TransformStreamDefaultController<
-// InputType,
-// OutputType
-// >; // A TransformStreamDefaultController created with the ability to control[[readable]] and[[writable]]; also used for the IsTransformStream brand check
-// [ts.writable_]: ws.WritableStream<InputType>; // The WritableStream instance controlled by this object
-
-// constructor(
-// transformer: ts.Transformer<InputType, OutputType> = {},
-// writableStrategy: QueuingStrategy<InputType> = {},
-// readableStrategy: QueuingStrategy<OutputType> = {}
-// ) {
-// const writableSizeFunction = writableStrategy.size;
-// const writableHighWaterMark = writableStrategy.highWaterMark;
-// const readableSizeFunction = readableStrategy.size;
-// const readableHighWaterMark = readableStrategy.highWaterMark;
-
-// const writableType = transformer.writableType;
-// if (writableType !== undefined) {
-// throw new RangeError(
-// "The transformer's `writableType` field must be undefined"
-// );
-// }
-// const writableSizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction(
-// writableSizeFunction
-// );
-// const writableHWM = shared.validateAndNormalizeHighWaterMark(
-// writableHighWaterMark === undefined ? 1 : writableHighWaterMark
-// );
-
-// const readableType = transformer.readableType;
-// if (readableType !== undefined) {
-// throw new RangeError(
-// "The transformer's `readableType` field must be undefined"
-// );
-// }
-// const readableSizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction(
-// readableSizeFunction
-// );
-// const readableHWM = shared.validateAndNormalizeHighWaterMark(
-// readableHighWaterMark === undefined ? 0 : readableHighWaterMark
-// );
-
-// const startPromise = shared.createControlledPromise<void>();
-// ts.initializeTransformStream(
-// this,
-// startPromise.promise,
-// writableHWM,
-// writableSizeAlgorithm,
-// readableHWM,
-// readableSizeAlgorithm
-// );
-// setUpTransformStreamDefaultControllerFromTransformer(this, transformer);
-
-// const startResult = shared.invokeOrNoop(transformer, "start", [
-// this[ts.transformStreamController_]
-// ]);
-// startPromise.resolve(startResult);
-// }
-
-// get readable(): rs.SDReadableStream<OutputType> {
-// if (!ts.isTransformStream(this)) {
-// throw new TypeError();
-// }
-// return this[ts.readable_];
-// }
-
-// get writable(): ws.WritableStream<InputType> {
-// if (!ts.isTransformStream(this)) {
-// throw new TypeError();
-// }
-// return this[ts.writable_];
-// }
-// }
-
-// function setUpTransformStreamDefaultControllerFromTransformer<
-// InputType,
-// OutputType
-// >(
-// stream: TransformStream<InputType, OutputType>,
-// transformer: ts.Transformer<InputType, OutputType>
-// ): void {
-// const controller = Object.create(
-// TransformStreamDefaultController.prototype
-// ) as TransformStreamDefaultController<InputType, OutputType>;
-// let transformAlgorithm: ts.TransformAlgorithm<InputType>;
-
-// const transformMethod = transformer.transform;
-// if (transformMethod !== undefined) {
-// if (typeof transformMethod !== "function") {
-// throw new TypeError(
-// "`transform` field of the transformer must be a function"
-// );
-// }
-// transformAlgorithm = (chunk: InputType): Promise<any> =>
-// shared.promiseCall(transformMethod, transformer, [chunk, controller]);
-// } else {
-// // use identity transform
-// transformAlgorithm = function(chunk: InputType): Promise<void> {
-// try {
-// // OutputType and InputType are the same here
-// ts.transformStreamDefaultControllerEnqueue(
-// controller,
-// (chunk as unknown) as OutputType
-// );
-// } catch (error) {
-// return Promise.reject(error);
-// }
-// return Promise.resolve(undefined);
-// };
-// }
-// const flushAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
-// transformer,
-// "flush",
-// [controller]
-// );
-// ts.setUpTransformStreamDefaultController(
-// stream,
-// controller,
-// transformAlgorithm,
-// flushAlgorithm
-// );
-// }
diff --git a/cli/js/web/streams/writable-internals.ts b/cli/js/web/streams/writable-internals.ts
deleted file mode 100644
index 4d442d0f5..000000000
--- a/cli/js/web/streams/writable-internals.ts
+++ /dev/null
@@ -1,800 +0,0 @@
-// TODO reenable this code when we enable writableStreams and transport types
-// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-// /**
-// * streams/writable-internals - internal types and functions for writable streams
-// * Part of Stardazed
-// * (c) 2018-Present by Arthur Langereis - @zenmumbler
-// * https://github.com/stardazed/sd-streams
-// */
-
-// /* eslint-disable @typescript-eslint/no-explicit-any */
-// // TODO reenable this lint here
-
-// import * as shared from "./shared-internals.ts";
-// import * as q from "./queue-mixin.ts";
-
-// import { QueuingStrategy, QueuingStrategySizeCallback } from "../dom_types.d.ts";
-
-// export const backpressure_ = Symbol("backpressure_");
-// export const closeRequest_ = Symbol("closeRequest_");
-// export const inFlightWriteRequest_ = Symbol("inFlightWriteRequest_");
-// export const inFlightCloseRequest_ = Symbol("inFlightCloseRequest_");
-// export const pendingAbortRequest_ = Symbol("pendingAbortRequest_");
-// export const writableStreamController_ = Symbol("writableStreamController_");
-// export const writer_ = Symbol("writer_");
-// export const writeRequests_ = Symbol("writeRequests_");
-
-// export const abortAlgorithm_ = Symbol("abortAlgorithm_");
-// export const closeAlgorithm_ = Symbol("closeAlgorithm_");
-// export const controlledWritableStream_ = Symbol("controlledWritableStream_");
-// export const started_ = Symbol("started_");
-// export const strategyHWM_ = Symbol("strategyHWM_");
-// export const strategySizeAlgorithm_ = Symbol("strategySizeAlgorithm_");
-// export const writeAlgorithm_ = Symbol("writeAlgorithm_");
-
-// export const ownerWritableStream_ = Symbol("ownerWritableStream_");
-// export const closedPromise_ = Symbol("closedPromise_");
-// export const readyPromise_ = Symbol("readyPromise_");
-
-// export const errorSteps_ = Symbol("errorSteps_");
-// export const abortSteps_ = Symbol("abortSteps_");
-
-// export type StartFunction = (
-// controller: WritableStreamController
-// ) => void | PromiseLike<void>;
-// export type StartAlgorithm = () => Promise<void> | void;
-// export type WriteFunction<InputType> = (
-// chunk: InputType,
-// controller: WritableStreamController
-// ) => void | PromiseLike<void>;
-// export type WriteAlgorithm<InputType> = (chunk: InputType) => Promise<void>;
-// export type CloseAlgorithm = () => Promise<void>;
-// export type AbortAlgorithm = (reason?: shared.ErrorResult) => Promise<void>;
-
-// // ----
-
-// export interface WritableStreamController {
-// error(e?: shared.ErrorResult): void;
-
-// [errorSteps_](): void;
-// [abortSteps_](reason: shared.ErrorResult): Promise<void>;
-// }
-
-// export interface WriteRecord<InputType> {
-// chunk: InputType;
-// }
-
-// export interface WritableStreamDefaultController<InputType>
-// extends WritableStreamController,
-// q.QueueContainer<WriteRecord<InputType> | "close"> {
-// [abortAlgorithm_]: AbortAlgorithm; // A promise - returning algorithm, taking one argument(the abort reason), which communicates a requested abort to the underlying sink
-// [closeAlgorithm_]: CloseAlgorithm; // A promise - returning algorithm which communicates a requested close to the underlying sink
-// [controlledWritableStream_]: WritableStream<InputType>; // The WritableStream instance controlled
-// [started_]: boolean; // A boolean flag indicating whether the underlying sink has finished starting
-// [strategyHWM_]: number; // A number supplied by the creator of the stream as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying sink
-// [strategySizeAlgorithm_]: QueuingStrategySizeCallback<InputType>; // An algorithm to calculate the size of enqueued chunks, as part of the stream’s queuing strategy
-// [writeAlgorithm_]: WriteAlgorithm<InputType>; // A promise-returning algorithm, taking one argument (the chunk to write), which writes data to the underlying sink
-// }
-
-// // ----
-
-// export interface WritableStreamWriter<InputType> {
-// readonly closed: Promise<void>;
-// readonly desiredSize: number | null;
-// readonly ready: Promise<void>;
-
-// abort(reason: shared.ErrorResult): Promise<void>;
-// close(): Promise<void>;
-// releaseLock(): void;
-// write(chunk: InputType): Promise<void>;
-// }
-
-// export interface WritableStreamDefaultWriter<InputType>
-// extends WritableStreamWriter<InputType> {
-// [ownerWritableStream_]: WritableStream<InputType> | undefined;
-// [closedPromise_]: shared.ControlledPromise<void>;
-// [readyPromise_]: shared.ControlledPromise<void>;
-// }
-
-// // ----
-
-// export type WritableStreamState =
-// | "writable"
-// | "closed"
-// | "erroring"
-// | "errored";
-
-// export interface WritableStreamSink<InputType> {
-// start?: StartFunction;
-// write?: WriteFunction<InputType>;
-// close?(): void | PromiseLike<void>;
-// abort?(reason?: shared.ErrorResult): void;
-
-// type?: undefined; // unused, for future revisions
-// }
-
-// export interface AbortRequest {
-// reason: shared.ErrorResult;
-// wasAlreadyErroring: boolean;
-// promise: Promise<void>;
-// resolve(): void;
-// reject(error: shared.ErrorResult): void;
-// }
-
-// export declare class WritableStream<InputType> {
-// constructor(
-// underlyingSink?: WritableStreamSink<InputType>,
-// strategy?: QueuingStrategy<InputType>
-// );
-
-// readonly locked: boolean;
-// abort(reason?: shared.ErrorResult): Promise<void>;
-// getWriter(): WritableStreamWriter<InputType>;
-
-// [shared.state_]: WritableStreamState;
-// [backpressure_]: boolean;
-// [closeRequest_]: shared.ControlledPromise<void> | undefined;
-// [inFlightWriteRequest_]: shared.ControlledPromise<void> | undefined;
-// [inFlightCloseRequest_]: shared.ControlledPromise<void> | undefined;
-// [pendingAbortRequest_]: AbortRequest | undefined;
-// [shared.storedError_]: shared.ErrorResult;
-// [writableStreamController_]:
-// | WritableStreamDefaultController<InputType>
-// | undefined;
-// [writer_]: WritableStreamDefaultWriter<InputType> | undefined;
-// [writeRequests_]: Array<shared.ControlledPromise<void>>;
-// }
-
-// // ---- Stream
-
-// export function initializeWritableStream<InputType>(
-// stream: WritableStream<InputType>
-// ): void {
-// stream[shared.state_] = "writable";
-// stream[shared.storedError_] = undefined;
-// stream[writer_] = undefined;
-// stream[writableStreamController_] = undefined;
-// stream[inFlightWriteRequest_] = undefined;
-// stream[closeRequest_] = undefined;
-// stream[inFlightCloseRequest_] = undefined;
-// stream[pendingAbortRequest_] = undefined;
-// stream[writeRequests_] = [];
-// stream[backpressure_] = false;
-// }
-
-// export function isWritableStream(value: unknown): value is WritableStream<any> {
-// if (typeof value !== "object" || value === null) {
-// return false;
-// }
-// return writableStreamController_ in value;
-// }
-
-// export function isWritableStreamLocked<InputType>(
-// stream: WritableStream<InputType>
-// ): boolean {
-// return stream[writer_] !== undefined;
-// }
-
-// export function writableStreamAbort<InputType>(
-// stream: WritableStream<InputType>,
-// reason: shared.ErrorResult
-// ): Promise<void> {
-// const state = stream[shared.state_];
-// if (state === "closed" || state === "errored") {
-// return Promise.resolve(undefined);
-// }
-// let pending = stream[pendingAbortRequest_];
-// if (pending !== undefined) {
-// return pending.promise;
-// }
-// // Assert: state is "writable" or "erroring".
-// let wasAlreadyErroring = false;
-// if (state === "erroring") {
-// wasAlreadyErroring = true;
-// reason = undefined;
-// }
-
-// pending = {
-// reason,
-// wasAlreadyErroring
-// } as AbortRequest;
-// const promise = new Promise<void>((resolve, reject) => {
-// pending!.resolve = resolve;
-// pending!.reject = reject;
-// });
-// pending.promise = promise;
-// stream[pendingAbortRequest_] = pending;
-// if (!wasAlreadyErroring) {
-// writableStreamStartErroring(stream, reason);
-// }
-// return promise;
-// }
-
-// export function writableStreamAddWriteRequest<InputType>(
-// stream: WritableStream<InputType>
-// ): Promise<void> {
-// // Assert: !IsWritableStreamLocked(stream) is true.
-// // Assert: stream.[[state]] is "writable".
-// const writePromise = shared.createControlledPromise<void>();
-// stream[writeRequests_].push(writePromise);
-// return writePromise.promise;
-// }
-
-// export function writableStreamDealWithRejection<InputType>(
-// stream: WritableStream<InputType>,
-// error: shared.ErrorResult
-// ): void {
-// const state = stream[shared.state_];
-// if (state === "writable") {
-// writableStreamStartErroring(stream, error);
-// return;
-// }
-// // Assert: state is "erroring"
-// writableStreamFinishErroring(stream);
-// }
-
-// export function writableStreamStartErroring<InputType>(
-// stream: WritableStream<InputType>,
-// reason: shared.ErrorResult
-// ): void {
-// // Assert: stream.[[storedError]] is undefined.
-// // Assert: stream.[[state]] is "writable".
-// const controller = stream[writableStreamController_]!;
-// // Assert: controller is not undefined.
-// stream[shared.state_] = "erroring";
-// stream[shared.storedError_] = reason;
-// const writer = stream[writer_];
-// if (writer !== undefined) {
-// writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
-// }
-// if (
-// !writableStreamHasOperationMarkedInFlight(stream) &&
-// controller[started_]
-// ) {
-// writableStreamFinishErroring(stream);
-// }
-// }
-
-// export function writableStreamFinishErroring<InputType>(
-// stream: WritableStream<InputType>
-// ): void {
-// // Assert: stream.[[state]] is "erroring".
-// // Assert: writableStreamHasOperationMarkedInFlight(stream) is false.
-// stream[shared.state_] = "errored";
-// const controller = stream[writableStreamController_]!;
-// controller[errorSteps_]();
-// const storedError = stream[shared.storedError_];
-// for (const writeRequest of stream[writeRequests_]) {
-// writeRequest.reject(storedError);
-// }
-// stream[writeRequests_] = [];
-
-// const abortRequest = stream[pendingAbortRequest_];
-// if (abortRequest === undefined) {
-// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
-// return;
-// }
-// stream[pendingAbortRequest_] = undefined;
-// if (abortRequest.wasAlreadyErroring) {
-// abortRequest.reject(storedError);
-// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
-// return;
-// }
-// const promise = controller[abortSteps_](abortRequest.reason);
-// promise.then(
-// _ => {
-// abortRequest.resolve();
-// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
-// },
-// error => {
-// abortRequest.reject(error);
-// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
-// }
-// );
-// }
-
-// export function writableStreamFinishInFlightWrite<InputType>(
-// stream: WritableStream<InputType>
-// ): void {
-// // Assert: stream.[[inFlightWriteRequest]] is not undefined.
-// stream[inFlightWriteRequest_]!.resolve(undefined);
-// stream[inFlightWriteRequest_] = undefined;
-// }
-
-// export function writableStreamFinishInFlightWriteWithError<InputType>(
-// stream: WritableStream<InputType>,
-// error: shared.ErrorResult
-// ): void {
-// // Assert: stream.[[inFlightWriteRequest]] is not undefined.
-// stream[inFlightWriteRequest_]!.reject(error);
-// stream[inFlightWriteRequest_] = undefined;
-// // Assert: stream.[[state]] is "writable" or "erroring".
-// writableStreamDealWithRejection(stream, error);
-// }
-
-// export function writableStreamFinishInFlightClose<InputType>(
-// stream: WritableStream<InputType>
-// ): void {
-// // Assert: stream.[[inFlightCloseRequest]] is not undefined.
-// stream[inFlightCloseRequest_]!.resolve(undefined);
-// stream[inFlightCloseRequest_] = undefined;
-// const state = stream[shared.state_];
-// // Assert: stream.[[state]] is "writable" or "erroring".
-// if (state === "erroring") {
-// stream[shared.storedError_] = undefined;
-// if (stream[pendingAbortRequest_] !== undefined) {
-// stream[pendingAbortRequest_]!.resolve();
-// stream[pendingAbortRequest_] = undefined;
-// }
-// }
-// stream[shared.state_] = "closed";
-// const writer = stream[writer_];
-// if (writer !== undefined) {
-// writer[closedPromise_].resolve(undefined);
-// }
-// // Assert: stream.[[pendingAbortRequest]] is undefined.
-// // Assert: stream.[[storedError]] is undefined.
-// }
-
-// export function writableStreamFinishInFlightCloseWithError<InputType>(
-// stream: WritableStream<InputType>,
-// error: shared.ErrorResult
-// ): void {
-// // Assert: stream.[[inFlightCloseRequest]] is not undefined.
-// stream[inFlightCloseRequest_]!.reject(error);
-// stream[inFlightCloseRequest_] = undefined;
-// // Assert: stream.[[state]] is "writable" or "erroring".
-// if (stream[pendingAbortRequest_] !== undefined) {
-// stream[pendingAbortRequest_]!.reject(error);
-// stream[pendingAbortRequest_] = undefined;
-// }
-// writableStreamDealWithRejection(stream, error);
-// }
-
-// export function writableStreamCloseQueuedOrInFlight<InputType>(
-// stream: WritableStream<InputType>
-// ): boolean {
-// return (
-// stream[closeRequest_] !== undefined ||
-// stream[inFlightCloseRequest_] !== undefined
-// );
-// }
-
-// export function writableStreamHasOperationMarkedInFlight<InputType>(
-// stream: WritableStream<InputType>
-// ): boolean {
-// return (
-// stream[inFlightWriteRequest_] !== undefined ||
-// stream[inFlightCloseRequest_] !== undefined
-// );
-// }
-
-// export function writableStreamMarkCloseRequestInFlight<InputType>(
-// stream: WritableStream<InputType>
-// ): void {
-// // Assert: stream.[[inFlightCloseRequest]] is undefined.
-// // Assert: stream.[[closeRequest]] is not undefined.
-// stream[inFlightCloseRequest_] = stream[closeRequest_];
-// stream[closeRequest_] = undefined;
-// }
-
-// export function writableStreamMarkFirstWriteRequestInFlight<InputType>(
-// stream: WritableStream<InputType>
-// ): void {
-// // Assert: stream.[[inFlightWriteRequest]] is undefined.
-// // Assert: stream.[[writeRequests]] is not empty.
-// const writeRequest = stream[writeRequests_].shift()!;
-// stream[inFlightWriteRequest_] = writeRequest;
-// }
-
-// export function writableStreamRejectCloseAndClosedPromiseIfNeeded<InputType>(
-// stream: WritableStream<InputType>
-// ): void {
-// // Assert: stream.[[state]] is "errored".
-// const closeRequest = stream[closeRequest_];
-// if (closeRequest !== undefined) {
-// // Assert: stream.[[inFlightCloseRequest]] is undefined.
-// closeRequest.reject(stream[shared.storedError_]);
-// stream[closeRequest_] = undefined;
-// }
-// const writer = stream[writer_];
-// if (writer !== undefined) {
-// writer[closedPromise_].reject(stream[shared.storedError_]);
-// writer[closedPromise_].promise.catch(() => {});
-// }
-// }
-
-// export function writableStreamUpdateBackpressure<InputType>(
-// stream: WritableStream<InputType>,
-// backpressure: boolean
-// ): void {
-// // Assert: stream.[[state]] is "writable".
-// // Assert: !WritableStreamCloseQueuedOrInFlight(stream) is false.
-// const writer = stream[writer_];
-// if (writer !== undefined && backpressure !== stream[backpressure_]) {
-// if (backpressure) {
-// writer[readyPromise_] = shared.createControlledPromise<void>();
-// } else {
-// writer[readyPromise_].resolve(undefined);
-// }
-// }
-// stream[backpressure_] = backpressure;
-// }
-
-// // ---- Writers
-
-// export function isWritableStreamDefaultWriter(
-// value: unknown
-// ): value is WritableStreamDefaultWriter<any> {
-// if (typeof value !== "object" || value === null) {
-// return false;
-// }
-// return ownerWritableStream_ in value;
-// }
-
-// export function writableStreamDefaultWriterAbort<InputType>(
-// writer: WritableStreamDefaultWriter<InputType>,
-// reason: shared.ErrorResult
-// ): Promise<void> {
-// const stream = writer[ownerWritableStream_]!;
-// // Assert: stream is not undefined.
-// return writableStreamAbort(stream, reason);
-// }
-
-// export function writableStreamDefaultWriterClose<InputType>(
-// writer: WritableStreamDefaultWriter<InputType>
-// ): Promise<void> {
-// const stream = writer[ownerWritableStream_]!;
-// // Assert: stream is not undefined.
-// const state = stream[shared.state_];
-// if (state === "closed" || state === "errored") {
-// return Promise.reject(
-// new TypeError("Writer stream is already closed or errored")
-// );
-// }
-// // Assert: state is "writable" or "erroring".
-// // Assert: writableStreamCloseQueuedOrInFlight(stream) is false.
-// const closePromise = shared.createControlledPromise<void>();
-// stream[closeRequest_] = closePromise;
-// if (stream[backpressure_] && state === "writable") {
-// writer[readyPromise_].resolve(undefined);
-// }
-// writableStreamDefaultControllerClose(stream[writableStreamController_]!);
-// return closePromise.promise;
-// }
-
-// export function writableStreamDefaultWriterCloseWithErrorPropagation<InputType>(
-// writer: WritableStreamDefaultWriter<InputType>
-// ): Promise<void> {
-// const stream = writer[ownerWritableStream_]!;
-// // Assert: stream is not undefined.
-// const state = stream[shared.state_];
-// if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
-// return Promise.resolve(undefined);
-// }
-// if (state === "errored") {
-// return Promise.reject(stream[shared.storedError_]);
-// }
-// // Assert: state is "writable" or "erroring".
-// return writableStreamDefaultWriterClose(writer);
-// }
-
-// export function writableStreamDefaultWriterEnsureClosedPromiseRejected<
-// InputType
-// >(
-// writer: WritableStreamDefaultWriter<InputType>,
-// error: shared.ErrorResult
-// ): void {
-// const closedPromise = writer[closedPromise_];
-// if (closedPromise.state === shared.ControlledPromiseState.Pending) {
-// closedPromise.reject(error);
-// } else {
-// writer[closedPromise_] = shared.createControlledPromise<void>();
-// writer[closedPromise_].reject(error);
-// }
-// writer[closedPromise_].promise.catch(() => {});
-// }
-
-// export function writableStreamDefaultWriterEnsureReadyPromiseRejected<
-// InputType
-// >(
-// writer: WritableStreamDefaultWriter<InputType>,
-// error: shared.ErrorResult
-// ): void {
-// const readyPromise = writer[readyPromise_];
-// if (readyPromise.state === shared.ControlledPromiseState.Pending) {
-// readyPromise.reject(error);
-// } else {
-// writer[readyPromise_] = shared.createControlledPromise<void>();
-// writer[readyPromise_].reject(error);
-// }
-// writer[readyPromise_].promise.catch(() => {});
-// }
-
-// export function writableStreamDefaultWriterGetDesiredSize<InputType>(
-// writer: WritableStreamDefaultWriter<InputType>
-// ): number | null {
-// const stream = writer[ownerWritableStream_]!;
-// const state = stream[shared.state_];
-// if (state === "errored" || state === "erroring") {
-// return null;
-// }
-// if (state === "closed") {
-// return 0;
-// }
-// return writableStreamDefaultControllerGetDesiredSize(
-// stream[writableStreamController_]!
-// );
-// }
-
-// export function writableStreamDefaultWriterRelease<InputType>(
-// writer: WritableStreamDefaultWriter<InputType>
-// ): void {
-// const stream = writer[ownerWritableStream_]!;
-// // Assert: stream is not undefined.
-// // Assert: stream.[[writer]] is writer.
-// const releasedError = new TypeError();
-// writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError);
-// writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError);
-// stream[writer_] = undefined;
-// writer[ownerWritableStream_] = undefined;
-// }
-
-// export function writableStreamDefaultWriterWrite<InputType>(
-// writer: WritableStreamDefaultWriter<InputType>,
-// chunk: InputType
-// ): Promise<void> {
-// const stream = writer[ownerWritableStream_]!;
-// // Assert: stream is not undefined.
-// const controller = stream[writableStreamController_]!;
-// const chunkSize = writableStreamDefaultControllerGetChunkSize(
-// controller,
-// chunk
-// );
-// if (writer[ownerWritableStream_] !== stream) {
-// return Promise.reject(new TypeError());
-// }
-// const state = stream[shared.state_];
-// if (state === "errored") {
-// return Promise.reject(stream[shared.storedError_]);
-// }
-// if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
-// return Promise.reject(
-// new TypeError("Cannot write to a closing or closed stream")
-// );
-// }
-// if (state === "erroring") {
-// return Promise.reject(stream[shared.storedError_]);
-// }
-// // Assert: state is "writable".
-// const promise = writableStreamAddWriteRequest(stream);
-// writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
-// return promise;
-// }
-
-// // ---- Controller
-
-// export function setUpWritableStreamDefaultController<InputType>(
-// stream: WritableStream<InputType>,
-// controller: WritableStreamDefaultController<InputType>,
-// startAlgorithm: StartAlgorithm,
-// writeAlgorithm: WriteAlgorithm<InputType>,
-// closeAlgorithm: CloseAlgorithm,
-// abortAlgorithm: AbortAlgorithm,
-// highWaterMark: number,
-// sizeAlgorithm: QueuingStrategySizeCallback<InputType>
-// ): void {
-// if (!isWritableStream(stream)) {
-// throw new TypeError();
-// }
-// if (stream[writableStreamController_] !== undefined) {
-// throw new TypeError();
-// }
-
-// controller[controlledWritableStream_] = stream;
-// stream[writableStreamController_] = controller;
-// q.resetQueue(controller);
-// controller[started_] = false;
-// controller[strategySizeAlgorithm_] = sizeAlgorithm;
-// controller[strategyHWM_] = highWaterMark;
-// controller[writeAlgorithm_] = writeAlgorithm;
-// controller[closeAlgorithm_] = closeAlgorithm;
-// controller[abortAlgorithm_] = abortAlgorithm;
-// const backpressure = writableStreamDefaultControllerGetBackpressure(
-// controller
-// );
-// writableStreamUpdateBackpressure(stream, backpressure);
-
-// const startResult = startAlgorithm();
-// Promise.resolve(startResult).then(
-// _ => {
-// // Assert: stream.[[state]] is "writable" or "erroring".
-// controller[started_] = true;
-// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
-// },
-// error => {
-// // Assert: stream.[[state]] is "writable" or "erroring".
-// controller[started_] = true;
-// writableStreamDealWithRejection(stream, error);
-// }
-// );
-// }
-
-// export function isWritableStreamDefaultController(
-// value: unknown
-// ): value is WritableStreamDefaultController<any> {
-// if (typeof value !== "object" || value === null) {
-// return false;
-// }
-// return controlledWritableStream_ in value;
-// }
-
-// export function writableStreamDefaultControllerClearAlgorithms<InputType>(
-// controller: WritableStreamDefaultController<InputType>
-// ): void {
-// // Use ! assertions to override type check here, this way we don't
-// // have to perform type checks/assertions everywhere else.
-// controller[writeAlgorithm_] = undefined!;
-// controller[closeAlgorithm_] = undefined!;
-// controller[abortAlgorithm_] = undefined!;
-// controller[strategySizeAlgorithm_] = undefined!;
-// }
-
-// export function writableStreamDefaultControllerClose<InputType>(
-// controller: WritableStreamDefaultController<InputType>
-// ): void {
-// q.enqueueValueWithSize(controller, "close", 0);
-// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
-// }
-
-// export function writableStreamDefaultControllerGetChunkSize<InputType>(
-// controller: WritableStreamDefaultController<InputType>,
-// chunk: InputType
-// ): number {
-// let chunkSize: number;
-// try {
-// chunkSize = controller[strategySizeAlgorithm_](chunk);
-// } catch (error) {
-// writableStreamDefaultControllerErrorIfNeeded(controller, error);
-// chunkSize = 1;
-// }
-// return chunkSize;
-// }
-
-// export function writableStreamDefaultControllerGetDesiredSize<InputType>(
-// controller: WritableStreamDefaultController<InputType>
-// ): number {
-// return controller[strategyHWM_] - controller[q.queueTotalSize_];
-// }
-
-// export function writableStreamDefaultControllerWrite<InputType>(
-// controller: WritableStreamDefaultController<InputType>,
-// chunk: InputType,
-// chunkSize: number
-// ): void {
-// try {
-// q.enqueueValueWithSize(controller, { chunk }, chunkSize);
-// } catch (error) {
-// writableStreamDefaultControllerErrorIfNeeded(controller, error);
-// return;
-// }
-// const stream = controller[controlledWritableStream_];
-// if (
-// !writableStreamCloseQueuedOrInFlight(stream) &&
-// stream[shared.state_] === "writable"
-// ) {
-// const backpressure = writableStreamDefaultControllerGetBackpressure(
-// controller
-// );
-// writableStreamUpdateBackpressure(stream, backpressure);
-// }
-// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
-// }
-
-// export function writableStreamDefaultControllerAdvanceQueueIfNeeded<InputType>(
-// controller: WritableStreamDefaultController<InputType>
-// ): void {
-// if (!controller[started_]) {
-// return;
-// }
-// const stream = controller[controlledWritableStream_];
-// if (stream[inFlightWriteRequest_] !== undefined) {
-// return;
-// }
-// const state = stream[shared.state_];
-// if (state === "closed" || state === "errored") {
-// return;
-// }
-// if (state === "erroring") {
-// writableStreamFinishErroring(stream);
-// return;
-// }
-// if (controller[q.queue_].length === 0) {
-// return;
-// }
-// const writeRecord = q.peekQueueValue(controller);
-// if (writeRecord === "close") {
-// writableStreamDefaultControllerProcessClose(controller);
-// } else {
-// writableStreamDefaultControllerProcessWrite(controller, writeRecord.chunk);
-// }
-// }
-
-// export function writableStreamDefaultControllerErrorIfNeeded<InputType>(
-// controller: WritableStreamDefaultController<InputType>,
-// error: shared.ErrorResult
-// ): void {
-// if (controller[controlledWritableStream_][shared.state_] === "writable") {
-// writableStreamDefaultControllerError(controller, error);
-// }
-// }
-
-// export function writableStreamDefaultControllerProcessClose<InputType>(
-// controller: WritableStreamDefaultController<InputType>
-// ): void {
-// const stream = controller[controlledWritableStream_];
-// writableStreamMarkCloseRequestInFlight(stream);
-// q.dequeueValue(controller);
-// // Assert: controller.[[queue]] is empty.
-// const sinkClosePromise = controller[closeAlgorithm_]();
-// writableStreamDefaultControllerClearAlgorithms(controller);
-// sinkClosePromise.then(
-// _ => {
-// writableStreamFinishInFlightClose(stream);
-// },
-// error => {
-// writableStreamFinishInFlightCloseWithError(stream, error);
-// }
-// );
-// }
-
-// export function writableStreamDefaultControllerProcessWrite<InputType>(
-// controller: WritableStreamDefaultController<InputType>,
-// chunk: InputType
-// ): void {
-// const stream = controller[controlledWritableStream_];
-// writableStreamMarkFirstWriteRequestInFlight(stream);
-// controller[writeAlgorithm_](chunk).then(
-// _ => {
-// writableStreamFinishInFlightWrite(stream);
-// const state = stream[shared.state_];
-// // Assert: state is "writable" or "erroring".
-// q.dequeueValue(controller);
-// if (
-// !writableStreamCloseQueuedOrInFlight(stream) &&
-// state === "writable"
-// ) {
-// const backpressure = writableStreamDefaultControllerGetBackpressure(
-// controller
-// );
-// writableStreamUpdateBackpressure(stream, backpressure);
-// }
-// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
-// },
-// error => {
-// if (stream[shared.state_] === "writable") {
-// writableStreamDefaultControllerClearAlgorithms(controller);
-// }
-// writableStreamFinishInFlightWriteWithError(stream, error);
-// }
-// );
-// }
-
-// export function writableStreamDefaultControllerGetBackpressure<InputType>(
-// controller: WritableStreamDefaultController<InputType>
-// ): boolean {
-// const desiredSize = writableStreamDefaultControllerGetDesiredSize(controller);
-// return desiredSize <= 0;
-// }
-
-// export function writableStreamDefaultControllerError<InputType>(
-// controller: WritableStreamDefaultController<InputType>,
-// error: shared.ErrorResult
-// ): void {
-// const stream = controller[controlledWritableStream_];
-// // Assert: stream.[[state]] is "writable".
-// writableStreamDefaultControllerClearAlgorithms(controller);
-// writableStreamStartErroring(stream, error);
-// }
diff --git a/cli/js/web/streams/writable-stream-default-controller.ts b/cli/js/web/streams/writable-stream-default-controller.ts
deleted file mode 100644
index 181edede8..000000000
--- a/cli/js/web/streams/writable-stream-default-controller.ts
+++ /dev/null
@@ -1,101 +0,0 @@
-// TODO reenable this code when we enable writableStreams and transport types
-// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-// /**
-// * streams/writable-stream-default-controller - WritableStreamDefaultController class implementation
-// * Part of Stardazed
-// * (c) 2018-Present by Arthur Langereis - @zenmumbler
-// * https://github.com/stardazed/sd-streams
-// */
-
-// /* eslint-disable @typescript-eslint/no-explicit-any */
-// // TODO reenable this lint here
-
-// import * as ws from "./writable-internals.ts";
-// import * as shared from "./shared-internals.ts";
-// import * as q from "./queue-mixin.ts";
-// import { Queue } from "./queue.ts";
-// import { QueuingStrategySizeCallback } from "../dom_types.d.ts";
-
-// export class WritableStreamDefaultController<InputType>
-// implements ws.WritableStreamDefaultController<InputType> {
-// [ws.abortAlgorithm_]: ws.AbortAlgorithm;
-// [ws.closeAlgorithm_]: ws.CloseAlgorithm;
-// [ws.controlledWritableStream_]: ws.WritableStream<InputType>;
-// [ws.started_]: boolean;
-// [ws.strategyHWM_]: number;
-// [ws.strategySizeAlgorithm_]: QueuingStrategySizeCallback<InputType>;
-// [ws.writeAlgorithm_]: ws.WriteAlgorithm<InputType>;
-
-// [q.queue_]: Queue<q.QueueElement<ws.WriteRecord<InputType> | "close">>;
-// [q.queueTotalSize_]: number;
-
-// constructor() {
-// throw new TypeError();
-// }
-
-// error(e?: shared.ErrorResult): void {
-// if (!ws.isWritableStreamDefaultController(this)) {
-// throw new TypeError();
-// }
-// const state = this[ws.controlledWritableStream_][shared.state_];
-// if (state !== "writable") {
-// return;
-// }
-// ws.writableStreamDefaultControllerError(this, e);
-// }
-
-// [ws.abortSteps_](reason: shared.ErrorResult): Promise<void> {
-// const result = this[ws.abortAlgorithm_](reason);
-// ws.writableStreamDefaultControllerClearAlgorithms(this);
-// return result;
-// }
-
-// [ws.errorSteps_](): void {
-// q.resetQueue(this);
-// }
-// }
-
-// export function setUpWritableStreamDefaultControllerFromUnderlyingSink<
-// InputType
-// >(
-// stream: ws.WritableStream<InputType>,
-// underlyingSink: ws.WritableStreamSink<InputType>,
-// highWaterMark: number,
-// sizeAlgorithm: QueuingStrategySizeCallback<InputType>
-// ): void {
-// // Assert: underlyingSink is not undefined.
-// const controller = Object.create(
-// WritableStreamDefaultController.prototype
-// ) as WritableStreamDefaultController<InputType>;
-
-// const startAlgorithm = function(): any {
-// return shared.invokeOrNoop(underlyingSink, "start", [controller]);
-// };
-// const writeAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
-// underlyingSink,
-// "write",
-// [controller]
-// );
-// const closeAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
-// underlyingSink,
-// "close",
-// []
-// );
-// const abortAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
-// underlyingSink,
-// "abort",
-// []
-// );
-// ws.setUpWritableStreamDefaultController(
-// stream,
-// controller,
-// startAlgorithm,
-// writeAlgorithm,
-// closeAlgorithm,
-// abortAlgorithm,
-// highWaterMark,
-// sizeAlgorithm
-// );
-// }
diff --git a/cli/js/web/streams/writable-stream-default-writer.ts b/cli/js/web/streams/writable-stream-default-writer.ts
deleted file mode 100644
index f38aa26bb..000000000
--- a/cli/js/web/streams/writable-stream-default-writer.ts
+++ /dev/null
@@ -1,136 +0,0 @@
-// TODO reenable this code when we enable writableStreams and transport types
-// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-// /**
-// * streams/writable-stream-default-writer - WritableStreamDefaultWriter class implementation
-// * Part of Stardazed
-// * (c) 2018-Present by Arthur Langereis - @zenmumbler
-// * https://github.com/stardazed/sd-streams
-// */
-
-// import * as ws from "./writable-internals.ts";
-// import * as shared from "./shared-internals.ts";
-
-// export class WritableStreamDefaultWriter<InputType>
-// implements ws.WritableStreamDefaultWriter<InputType> {
-// [ws.ownerWritableStream_]: ws.WritableStream<InputType> | undefined;
-// [ws.readyPromise_]: shared.ControlledPromise<void>;
-// [ws.closedPromise_]: shared.ControlledPromise<void>;
-
-// constructor(stream: ws.WritableStream<InputType>) {
-// if (!ws.isWritableStream(stream)) {
-// throw new TypeError();
-// }
-// if (ws.isWritableStreamLocked(stream)) {
-// throw new TypeError("Stream is already locked");
-// }
-// this[ws.ownerWritableStream_] = stream;
-// stream[ws.writer_] = this;
-
-// const readyPromise = shared.createControlledPromise<void>();
-// const closedPromise = shared.createControlledPromise<void>();
-// this[ws.readyPromise_] = readyPromise;
-// this[ws.closedPromise_] = closedPromise;
-
-// const state = stream[shared.state_];
-// if (state === "writable") {
-// if (
-// !ws.writableStreamCloseQueuedOrInFlight(stream) &&
-// stream[ws.backpressure_]
-// ) {
-// // OK Set this.[[readyPromise]] to a new promise.
-// } else {
-// readyPromise.resolve(undefined);
-// }
-// // OK Set this.[[closedPromise]] to a new promise.
-// } else if (state === "erroring") {
-// readyPromise.reject(stream[shared.storedError_]);
-// readyPromise.promise.catch(() => {});
-// // OK Set this.[[closedPromise]] to a new promise.
-// } else if (state === "closed") {
-// readyPromise.resolve(undefined);
-// closedPromise.resolve(undefined);
-// } else {
-// // Assert: state is "errored".
-// const storedError = stream[shared.storedError_];
-// readyPromise.reject(storedError);
-// readyPromise.promise.catch(() => {});
-// closedPromise.reject(storedError);
-// closedPromise.promise.catch(() => {});
-// }
-// }
-
-// abort(reason: shared.ErrorResult): Promise<void> {
-// if (!ws.isWritableStreamDefaultWriter(this)) {
-// return Promise.reject(new TypeError());
-// }
-// if (this[ws.ownerWritableStream_] === undefined) {
-// return Promise.reject(
-// new TypeError("Writer is not connected to a stream")
-// );
-// }
-// return ws.writableStreamDefaultWriterAbort(this, reason);
-// }
-
-// close(): Promise<void> {
-// if (!ws.isWritableStreamDefaultWriter(this)) {
-// return Promise.reject(new TypeError());
-// }
-// const stream = this[ws.ownerWritableStream_];
-// if (stream === undefined) {
-// return Promise.reject(
-// new TypeError("Writer is not connected to a stream")
-// );
-// }
-// if (ws.writableStreamCloseQueuedOrInFlight(stream)) {
-// return Promise.reject(new TypeError());
-// }
-// return ws.writableStreamDefaultWriterClose(this);
-// }
-
-// releaseLock(): void {
-// const stream = this[ws.ownerWritableStream_];
-// if (stream === undefined) {
-// return;
-// }
-// // Assert: stream.[[writer]] is not undefined.
-// ws.writableStreamDefaultWriterRelease(this);
-// }
-
-// write(chunk: InputType): Promise<void> {
-// if (!ws.isWritableStreamDefaultWriter(this)) {
-// return Promise.reject(new TypeError());
-// }
-// if (this[ws.ownerWritableStream_] === undefined) {
-// return Promise.reject(
-// new TypeError("Writer is not connected to a stream")
-// );
-// }
-// return ws.writableStreamDefaultWriterWrite(this, chunk);
-// }
-
-// get closed(): Promise<void> {
-// if (!ws.isWritableStreamDefaultWriter(this)) {
-// return Promise.reject(new TypeError());
-// }
-// return this[ws.closedPromise_].promise;
-// }
-
-// get desiredSize(): number | null {
-// if (!ws.isWritableStreamDefaultWriter(this)) {
-// throw new TypeError();
-// }
-// if (this[ws.ownerWritableStream_] === undefined) {
-// throw new TypeError("Writer is not connected to stream");
-// }
-// return ws.writableStreamDefaultWriterGetDesiredSize(this);
-// }
-
-// get ready(): Promise<void> {
-// if (!ws.isWritableStreamDefaultWriter(this)) {
-// return Promise.reject(new TypeError());
-// }
-// return this[ws.readyPromise_].promise;
-// }
-// }
diff --git a/cli/js/web/streams/writable-stream.ts b/cli/js/web/streams/writable-stream.ts
deleted file mode 100644
index f231d78dc..000000000
--- a/cli/js/web/streams/writable-stream.ts
+++ /dev/null
@@ -1,118 +0,0 @@
-// TODO reenable this code when we enable writableStreams and transport types
-// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
-// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
-
-// /**
-// * streams/writable-stream - WritableStream class implementation
-// * Part of Stardazed
-// * (c) 2018-Present by Arthur Langereis - @zenmumbler
-// * https://github.com/stardazed/sd-streams
-// */
-
-// import * as ws from "./writable-internals.ts";
-// import * as shared from "./shared-internals.ts";
-// import {
-// WritableStreamDefaultController,
-// setUpWritableStreamDefaultControllerFromUnderlyingSink
-// } from "./writable-stream-default-controller.ts";
-// import { WritableStreamDefaultWriter } from "./writable-stream-default-writer.ts";
-// import { QueuingStrategy, QueuingStrategySizeCallback } from "../dom_types.d.ts";
-
-// export class WritableStream<InputType> {
-// [shared.state_]: ws.WritableStreamState;
-// [shared.storedError_]: shared.ErrorResult;
-// [ws.backpressure_]: boolean;
-// [ws.closeRequest_]: shared.ControlledPromise<void> | undefined;
-// [ws.inFlightWriteRequest_]: shared.ControlledPromise<void> | undefined;
-// [ws.inFlightCloseRequest_]: shared.ControlledPromise<void> | undefined;
-// [ws.pendingAbortRequest_]: ws.AbortRequest | undefined;
-// [ws.writableStreamController_]:
-// | ws.WritableStreamDefaultController<InputType>
-// | undefined;
-// [ws.writer_]: ws.WritableStreamDefaultWriter<InputType> | undefined;
-// [ws.writeRequests_]: Array<shared.ControlledPromise<void>>;
-
-// constructor(
-// sink: ws.WritableStreamSink<InputType> = {},
-// strategy: QueuingStrategy<InputType> = {}
-// ) {
-// ws.initializeWritableStream(this);
-// const sizeFunc = strategy.size;
-// const stratHWM = strategy.highWaterMark;
-// if (sink.type !== undefined) {
-// throw new RangeError("The type of an underlying sink must be undefined");
-// }
-
-// const sizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction(sizeFunc);
-// const highWaterMark = shared.validateAndNormalizeHighWaterMark(
-// stratHWM === undefined ? 1 : stratHWM
-// );
-
-// setUpWritableStreamDefaultControllerFromUnderlyingSink(
-// this,
-// sink,
-// highWaterMark,
-// sizeAlgorithm
-// );
-// }
-
-// get locked(): boolean {
-// if (!ws.isWritableStream(this)) {
-// throw new TypeError();
-// }
-// return ws.isWritableStreamLocked(this);
-// }
-
-// abort(reason?: shared.ErrorResult): Promise<void> {
-// if (!ws.isWritableStream(this)) {
-// return Promise.reject(new TypeError());
-// }
-// if (ws.isWritableStreamLocked(this)) {
-// return Promise.reject(new TypeError("Cannot abort a locked stream"));
-// }
-// return ws.writableStreamAbort(this, reason);
-// }
-
-// getWriter(): ws.WritableStreamWriter<InputType> {
-// if (!ws.isWritableStream(this)) {
-// throw new TypeError();
-// }
-// return new WritableStreamDefaultWriter(this);
-// }
-// }
-
-// export function createWritableStream<InputType>(
-// startAlgorithm: ws.StartAlgorithm,
-// writeAlgorithm: ws.WriteAlgorithm<InputType>,
-// closeAlgorithm: ws.CloseAlgorithm,
-// abortAlgorithm: ws.AbortAlgorithm,
-// highWaterMark?: number,
-// sizeAlgorithm?: QueuingStrategySizeCallback<InputType>
-// ): WritableStream<InputType> {
-// if (highWaterMark === undefined) {
-// highWaterMark = 1;
-// }
-// if (sizeAlgorithm === undefined) {
-// sizeAlgorithm = (): number => 1;
-// }
-// // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
-
-// const stream = Object.create(WritableStream.prototype) as WritableStream<
-// InputType
-// >;
-// ws.initializeWritableStream(stream);
-// const controller = Object.create(
-// WritableStreamDefaultController.prototype
-// ) as WritableStreamDefaultController<InputType>;
-// ws.setUpWritableStreamDefaultController(
-// stream,
-// controller,
-// startAlgorithm,
-// writeAlgorithm,
-// closeAlgorithm,
-// abortAlgorithm,
-// highWaterMark,
-// sizeAlgorithm
-// );
-// return stream;
-// }
diff --git a/cli/js/web/util.ts b/cli/js/web/util.ts
index 32e73c443..824d00d4b 100644
--- a/cli/js/web/util.ts
+++ b/cli/js/web/util.ts
@@ -1,5 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+import { DOMExceptionImpl as DOMException } from "./dom_exception.ts";
+
export type TypedArray =
| Int8Array
| Uint8Array
@@ -13,17 +15,7 @@ export type TypedArray =
// @internal
export function isTypedArray(x: unknown): x is TypedArray {
- return (
- x instanceof Int8Array ||
- x instanceof Uint8Array ||
- x instanceof Uint8ClampedArray ||
- x instanceof Int16Array ||
- x instanceof Uint16Array ||
- x instanceof Int32Array ||
- x instanceof Uint32Array ||
- x instanceof Float32Array ||
- x instanceof Float64Array
- );
+ return ArrayBuffer.isView(x) && !(x instanceof DataView);
}
// @internal
@@ -78,6 +70,105 @@ export function isIterable<T, P extends keyof T, K extends T[P]>(
);
}
+const objectCloneMemo = new WeakMap();
+
+function cloneArrayBuffer(
+ srcBuffer: ArrayBufferLike,
+ srcByteOffset: number,
+ srcLength: number,
+ cloneConstructor: ArrayBufferConstructor | SharedArrayBufferConstructor
+): InstanceType<typeof cloneConstructor> {
+ // this function fudges the return type but SharedArrayBuffer is disabled for a while anyway
+ return srcBuffer.slice(
+ srcByteOffset,
+ srcByteOffset + srcLength
+ ) as InstanceType<typeof cloneConstructor>;
+}
+
+/** Clone a value in a similar way to structured cloning. It is similar to a
+ * StructureDeserialize(StructuredSerialize(...)). */
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export function cloneValue(value: any): any {
+ switch (typeof value) {
+ case "number":
+ case "string":
+ case "boolean":
+ case "undefined":
+ case "bigint":
+ return value;
+ case "object": {
+ if (objectCloneMemo.has(value)) {
+ return objectCloneMemo.get(value);
+ }
+ if (value === null) {
+ return value;
+ }
+ if (value instanceof Date) {
+ return new Date(value.valueOf());
+ }
+ if (value instanceof RegExp) {
+ return new RegExp(value);
+ }
+ if (value instanceof SharedArrayBuffer) {
+ return value;
+ }
+ if (value instanceof ArrayBuffer) {
+ const cloned = cloneArrayBuffer(
+ value,
+ 0,
+ value.byteLength,
+ ArrayBuffer
+ );
+ objectCloneMemo.set(value, cloned);
+ return cloned;
+ }
+ if (ArrayBuffer.isView(value)) {
+ const clonedBuffer = cloneValue(value.buffer) as ArrayBufferLike;
+ // Use DataViewConstructor type purely for type-checking, can be a
+ // DataView or TypedArray. They use the same constructor signature,
+ // only DataView has a length in bytes and TypedArrays use a length in
+ // terms of elements, so we adjust for that.
+ let length: number;
+ if (value instanceof DataView) {
+ length = value.byteLength;
+ } else {
+ length = (value as Uint8Array).length;
+ }
+ return new (value.constructor as DataViewConstructor)(
+ clonedBuffer,
+ value.byteOffset,
+ length
+ );
+ }
+ if (value instanceof Map) {
+ const clonedMap = new Map();
+ objectCloneMemo.set(value, clonedMap);
+ value.forEach((v, k) => clonedMap.set(k, cloneValue(v)));
+ return clonedMap;
+ }
+ if (value instanceof Set) {
+ const clonedSet = new Map();
+ objectCloneMemo.set(value, clonedSet);
+ value.forEach((v, k) => clonedSet.set(k, cloneValue(v)));
+ return clonedSet;
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ const clonedObj = {} as Record<string, any>;
+ objectCloneMemo.set(value, clonedObj);
+ const sourceKeys = Object.getOwnPropertyNames(value);
+ for (const key of sourceKeys) {
+ clonedObj[key] = cloneValue(value[key]);
+ }
+ return clonedObj;
+ }
+ case "symbol":
+ case "function":
+ default:
+ throw new DOMException("Uncloneable value in stream", "DataCloneError");
+ }
+}
+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
interface GenericConstructor<T = any> {
prototype: T;