summaryrefslogtreecommitdiff
path: root/cli/js
diff options
context:
space:
mode:
authorNick Stott <nick@nickstott.com>2019-10-28 12:41:36 -0400
committerRy Dahl <ry@tinyclouds.org>2019-10-28 12:41:36 -0400
commit65d9286203cf239f68c6015818e82e8521e600a1 (patch)
tree0af1a7be449036f2f4ae9d3ecf06b7d645c8bddc /cli/js
parent967c236fa5fb1e87e1b5ee788fe77d3a07361da1 (diff)
Re-enable basic stream support for fetch bodies (#3192)
* Add sd-streams from https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/ * change the interfaces in dom_types to match what sd-streams expects
Diffstat (limited to 'cli/js')
-rw-r--r--cli/js/body.ts77
-rw-r--r--cli/js/dom_types.ts140
-rw-r--r--cli/js/errors.ts5
-rw-r--r--cli/js/globals.ts1
-rw-r--r--cli/js/request.ts10
-rw-r--r--cli/js/request_test.ts34
-rw-r--r--cli/js/streams/mod.ts20
-rw-r--r--cli/js/streams/pipe-to.ts237
-rw-r--r--cli/js/streams/queue-mixin.ts84
-rw-r--r--cli/js/streams/queue.ts65
-rw-r--r--cli/js/streams/readable-byte-stream-controller.ts214
-rw-r--r--cli/js/streams/readable-internals.ts1357
-rw-r--r--cli/js/streams/readable-stream-byob-reader.ts93
-rw-r--r--cli/js/streams/readable-stream-byob-request.ts60
-rw-r--r--cli/js/streams/readable-stream-default-controller.ts139
-rw-r--r--cli/js/streams/readable-stream-default-reader.ts75
-rw-r--r--cli/js/streams/readable-stream.ts387
-rw-r--r--cli/js/streams/shared-internals.ts310
-rw-r--r--cli/js/streams/strategies.ts39
-rw-r--r--cli/js/streams/transform-internals.ts371
-rw-r--r--cli/js/streams/transform-stream-default-controller.ts58
-rw-r--r--cli/js/streams/transform-stream.ts147
-rw-r--r--cli/js/streams/writable-internals.ts800
-rw-r--r--cli/js/streams/writable-stream-default-controller.ts101
-rw-r--r--cli/js/streams/writable-stream-default-writer.ts136
-rw-r--r--cli/js/streams/writable-stream.ts118
26 files changed, 5059 insertions, 19 deletions
diff --git a/cli/js/body.ts b/cli/js/body.ts
index 6567b1934..e00cd30b9 100644
--- a/cli/js/body.ts
+++ b/cli/js/body.ts
@@ -3,6 +3,7 @@ import * as blob from "./blob.ts";
import * as encoding from "./text_encoding.ts";
import * as headers from "./headers.ts";
import * as domTypes from "./dom_types.ts";
+import { ReadableStream } from "./streams/mod.ts";
const { Headers } = headers;
@@ -12,6 +13,13 @@ const { TextEncoder, TextDecoder } = encoding;
const Blob = blob.DenoBlob;
const DenoBlob = blob.DenoBlob;
+type ReadableStreamReader = domTypes.ReadableStreamReader;
+
+interface ReadableStreamController {
+ enqueue(chunk: string | ArrayBuffer): void;
+ close(): void;
+}
+
export type BodySource =
| domTypes.Blob
| domTypes.BufferSource
@@ -37,6 +45,8 @@ function validateBodyType(owner: Body, bodySource: BodySource): boolean {
return true;
} else if (typeof bodySource === "string") {
return true;
+ } else if (bodySource instanceof ReadableStream) {
+ return true;
} else if (bodySource instanceof FormData) {
return true;
} else if (!bodySource) {
@@ -47,6 +57,58 @@ function validateBodyType(owner: Body, bodySource: BodySource): boolean {
);
}
+function concatenate(...arrays: Uint8Array[]): ArrayBuffer {
+ let totalLength = 0;
+ for (const arr of arrays) {
+ totalLength += arr.length;
+ }
+ const result = new Uint8Array(totalLength);
+ let offset = 0;
+ for (const arr of arrays) {
+ result.set(arr, offset);
+ offset += arr.length;
+ }
+ return result.buffer as ArrayBuffer;
+}
+
+function bufferFromStream(stream: ReadableStreamReader): Promise<ArrayBuffer> {
+ return new Promise(
+ (resolve, reject): void => {
+ const parts: Uint8Array[] = [];
+ const encoder = new TextEncoder();
+ // recurse
+ (function pump(): void {
+ stream
+ .read()
+ .then(
+ ({ done, value }): void => {
+ if (done) {
+ return resolve(concatenate(...parts));
+ }
+
+ if (typeof value === "string") {
+ parts.push(encoder.encode(value));
+ } else if (value instanceof ArrayBuffer) {
+ parts.push(new Uint8Array(value));
+ } else if (!value) {
+ // noop for undefined
+ } else {
+ reject("unhandled type on stream read");
+ }
+
+ return pump();
+ }
+ )
+ .catch(
+ (err): void => {
+ reject(err);
+ }
+ );
+ })();
+ }
+ );
+}
+
function getHeaderValueParams(value: string): Map<string, string> {
const params = new Map();
// Forced to do so for some Map constructor param mismatch
@@ -81,8 +143,18 @@ export class Body implements domTypes.Body {
if (this._stream) {
return this._stream;
}
+
+ if (this._bodySource instanceof ReadableStream) {
+ // @ts-ignore
+ this._stream = this._bodySource;
+ }
if (typeof this._bodySource === "string") {
- throw Error("not implemented");
+ this._stream = new ReadableStream({
+ start(controller: ReadableStreamController): void {
+ controller.enqueue(this._bodySource);
+ controller.close();
+ }
+ });
}
return this._stream;
}
@@ -259,6 +331,9 @@ export class Body implements domTypes.Body {
} else if (typeof this._bodySource === "string") {
const enc = new TextEncoder();
return enc.encode(this._bodySource).buffer as ArrayBuffer;
+ } else if (this._bodySource instanceof ReadableStream) {
+ // @ts-ignore
+ return bufferFromStream(this._bodySource.getReader());
} else if (this._bodySource instanceof FormData) {
const enc = new TextEncoder();
return enc.encode(this._bodySource.toString()).buffer as ArrayBuffer;
diff --git a/cli/js/dom_types.ts b/cli/js/dom_types.ts
index 308505cf5..0b654d750 100644
--- a/cli/js/dom_types.ts
+++ b/cli/js/dom_types.ts
@@ -248,7 +248,7 @@ export interface AddEventListenerOptions extends EventListenerOptions {
passive: boolean;
}
-interface AbortSignal extends EventTarget {
+export interface AbortSignal extends EventTarget {
readonly aborted: boolean;
onabort: ((this: AbortSignal, ev: ProgressEvent) => any) | null;
addEventListener<K extends keyof AbortSignalEventMap>(
@@ -273,19 +273,6 @@ interface AbortSignal extends EventTarget {
): void;
}
-export interface ReadableStream {
- readonly locked: boolean;
- cancel(): Promise<void>;
- getReader(): ReadableStreamReader;
- tee(): [ReadableStream, ReadableStream];
-}
-
-export interface ReadableStreamReader {
- cancel(): Promise<void>;
- read(): Promise<any>;
- releaseLock(): void;
-}
-
export interface FormData extends DomIterable<string, FormDataEntryValue> {
append(name: string, value: string | Blob, fileName?: string): void;
delete(name: string): void;
@@ -343,6 +330,131 @@ export interface Body {
text(): Promise<string>;
}
+export interface ReadableStream {
+ readonly locked: boolean;
+ cancel(reason?: any): Promise<void>;
+ getReader(): ReadableStreamReader;
+ tee(): ReadableStream[];
+}
+
+export interface UnderlyingSource<R = any> {
+ cancel?: ReadableStreamErrorCallback;
+ pull?: ReadableStreamDefaultControllerCallback<R>;
+ start?: ReadableStreamDefaultControllerCallback<R>;
+ type?: undefined;
+}
+
+export interface UnderlyingByteSource {
+ autoAllocateChunkSize?: number;
+ cancel?: ReadableStreamErrorCallback;
+ pull?: ReadableByteStreamControllerCallback;
+ start?: ReadableByteStreamControllerCallback;
+ type: "bytes";
+}
+
+export interface ReadableStreamReader {
+ cancel(reason?: any): Promise<void>;
+ read(): Promise<any>;
+ releaseLock(): void;
+}
+
+export interface ReadableStreamErrorCallback {
+ (reason: any): void | PromiseLike<void>;
+}
+
+export interface ReadableByteStreamControllerCallback {
+ (controller: ReadableByteStreamController): void | PromiseLike<void>;
+}
+
+export interface ReadableStreamDefaultControllerCallback<R> {
+ (controller: ReadableStreamDefaultController<R>): void | PromiseLike<void>;
+}
+
+export interface ReadableStreamDefaultController<R = any> {
+ readonly desiredSize: number | null;
+ close(): void;
+ enqueue(chunk: R): void;
+ error(error?: any): void;
+}
+
+export interface ReadableByteStreamController {
+ readonly byobRequest: ReadableStreamBYOBRequest | undefined;
+ readonly desiredSize: number | null;
+ close(): void;
+ enqueue(chunk: ArrayBufferView): void;
+ error(error?: any): void;
+}
+
+export interface ReadableStreamBYOBRequest {
+ readonly view: ArrayBufferView;
+ respond(bytesWritten: number): void;
+ respondWithNewView(view: ArrayBufferView): void;
+}
+/* TODO reenable these interfaces. These are needed to enable WritableStreams in js/streams/
+export interface WritableStream<W = any> {
+ readonly locked: boolean;
+ abort(reason?: any): Promise<void>;
+ getWriter(): WritableStreamDefaultWriter<W>;
+}
+
+TODO reenable these interfaces. These are needed to enable WritableStreams in js/streams/
+export interface UnderlyingSink<W = any> {
+ abort?: WritableStreamErrorCallback;
+ close?: WritableStreamDefaultControllerCloseCallback;
+ start?: WritableStreamDefaultControllerStartCallback;
+ type?: undefined;
+ write?: WritableStreamDefaultControllerWriteCallback<W>;
+}
+
+export interface PipeOptions {
+ preventAbort?: boolean;
+ preventCancel?: boolean;
+ preventClose?: boolean;
+ signal?: AbortSignal;
+}
+
+
+export interface WritableStreamDefaultWriter<W = any> {
+ readonly closed: Promise<void>;
+ readonly desiredSize: number | null;
+ readonly ready: Promise<void>;
+ abort(reason?: any): Promise<void>;
+ close(): Promise<void>;
+ releaseLock(): void;
+ write(chunk: W): Promise<void>;
+}
+
+export interface WritableStreamErrorCallback {
+ (reason: any): void | PromiseLike<void>;
+}
+
+export interface WritableStreamDefaultControllerCloseCallback {
+ (): void | PromiseLike<void>;
+}
+
+export interface WritableStreamDefaultControllerStartCallback {
+ (controller: WritableStreamDefaultController): void | PromiseLike<void>;
+}
+
+export interface WritableStreamDefaultControllerWriteCallback<W> {
+ (chunk: W, controller: WritableStreamDefaultController): void | PromiseLike<
+ void
+ >;
+}
+
+export interface WritableStreamDefaultController {
+ error(error?: any): void;
+}
+*/
+export interface QueuingStrategy<T = any> {
+ highWaterMark?: number;
+ size?: QueuingStrategySizeCallback<T>;
+}
+
+export interface QueuingStrategySizeCallback<T = any> {
+ (chunk: T): number;
+}
+
export interface Headers extends DomIterable<string, string> {
/** Appends a new value onto an existing header inside a `Headers` object, or
* adds the header if it does not already exist.
diff --git a/cli/js/errors.ts b/cli/js/errors.ts
index 8cd7a76be..286a004e4 100644
--- a/cli/js/errors.ts
+++ b/cli/js/errors.ts
@@ -76,5 +76,8 @@ export enum ErrorKind {
TooManyRedirects = 48,
Diagnostic = 49,
JSError = 50,
- TypeError = 51
+ TypeError = 51,
+
+ /** TODO this is a DomException type, and should be moved out of here when possible */
+ DataCloneError = 52
}
diff --git a/cli/js/globals.ts b/cli/js/globals.ts
index b734b8da3..93f47ff1b 100644
--- a/cli/js/globals.ts
+++ b/cli/js/globals.ts
@@ -26,7 +26,6 @@ import * as url from "./url.ts";
import * as urlSearchParams from "./url_search_params.ts";
import * as workers from "./workers.ts";
import * as performanceUtil from "./performance.ts";
-
import * as request from "./request.ts";
// These imports are not exposed and therefore are fine to just import the
diff --git a/cli/js/request.ts b/cli/js/request.ts
index 0c77b8854..345792c5c 100644
--- a/cli/js/request.ts
+++ b/cli/js/request.ts
@@ -2,8 +2,10 @@
import * as headers from "./headers.ts";
import * as body from "./body.ts";
import * as domTypes from "./dom_types.ts";
+import * as streams from "./streams/mod.ts";
const { Headers } = headers;
+const { ReadableStream } = streams;
function byteUpperCase(s: string): string {
return String(s).replace(/[a-z]/g, function byteUpperCaseReplace(c): string {
@@ -138,7 +140,13 @@ export class Request extends body.Body implements domTypes.Request {
headersList.push(header);
}
- const body2 = this._bodySource;
+ let body2 = this._bodySource;
+
+ if (this._bodySource instanceof ReadableStream) {
+ const tees = (this._bodySource as domTypes.ReadableStream).tee();
+ this._stream = this._bodySource = tees[0];
+ body2 = tees[1];
+ }
const cloned = new Request(this.url, {
body: body2,
diff --git a/cli/js/request_test.ts b/cli/js/request_test.ts
index e9e1f5164..3daca8f5a 100644
--- a/cli/js/request_test.ts
+++ b/cli/js/request_test.ts
@@ -1,5 +1,5 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-import { test, assertEquals } from "./test_util.ts";
+import { test, assert, assertEquals } from "./test_util.ts";
test(function fromInit(): void {
const req = new Request("https://example.com", {
@@ -15,3 +15,35 @@ test(function fromInit(): void {
assertEquals(req.url, "https://example.com");
assertEquals(req.headers.get("test-header"), "value");
});
+
+test(function fromRequest(): void {
+ const r = new Request("https://example.com");
+ // @ts-ignore
+ r._bodySource = "ahoyhoy";
+ r.headers.set("test-header", "value");
+
+ const req = new Request(r);
+
+ // @ts-ignore
+ assertEquals(req._bodySource, r._bodySource);
+ assertEquals(req.url, r.url);
+ assertEquals(req.headers.get("test-header"), r.headers.get("test-header"));
+});
+
+test(async function cloneRequestBodyStream(): Promise<void> {
+ // hack to get a stream
+ const stream = new Request("", { body: "a test body" }).body;
+ const r1 = new Request("https://example.com", {
+ body: stream
+ });
+
+ const r2 = r1.clone();
+
+ const b1 = await r1.text();
+ const b2 = await r2.text();
+
+ assertEquals(b1, b2);
+
+ // @ts-ignore
+ assert(r1._bodySource !== r2._bodySource);
+});
diff --git a/cli/js/streams/mod.ts b/cli/js/streams/mod.ts
new file mode 100644
index 000000000..5389aaf6d
--- /dev/null
+++ b/cli/js/streams/mod.ts
@@ -0,0 +1,20 @@
+// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+/**
+ * @stardazed/streams - implementation of the web streams standard
+ * Part of Stardazed
+ * (c) 2018-Present by Arthur Langereis - @zenmumbler
+ * https://github.com/stardazed/sd-streams
+ */
+
+export { SDReadableStream as ReadableStream } from "./readable-stream.ts";
+/* TODO The following are currently unused so not exported for clarity.
+export { WritableStream } from "./writable-stream.ts";
+
+export { TransformStream } from "./transform-stream.ts";
+export {
+ ByteLengthQueuingStrategy,
+ CountQueuingStrategy
+} from "./strategies.ts";
+*/
diff --git a/cli/js/streams/pipe-to.ts b/cli/js/streams/pipe-to.ts
new file mode 100644
index 000000000..3764e605b
--- /dev/null
+++ b/cli/js/streams/pipe-to.ts
@@ -0,0 +1,237 @@
+// TODO reenable this code when we enable writableStreams and transport types
+// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+// /**
+// * streams/pipe-to - pipeTo algorithm implementation
+// * Part of Stardazed
+// * (c) 2018-Present by Arthur Langereis - @zenmumbler
+// * https://github.com/stardazed/sd-streams
+// */
+
+// /* eslint-disable @typescript-eslint/no-explicit-any */
+// // TODO reenable this lint here
+
+// import * as rs from "./readable-internals.ts";
+// import * as ws from "./writable-internals.ts";
+// import * as shared from "./shared-internals.ts";
+
+// import { ReadableStreamDefaultReader } from "./readable-stream-default-reader.ts";
+// import { WritableStreamDefaultWriter } from "./writable-stream-default-writer.ts";
+// import { PipeOptions } from "../dom_types.ts";
+// import { DenoError, ErrorKind } from "../errors.ts";
+
+// // add a wrapper to handle falsy rejections
+// interface ErrorWrapper {
+// actualError: shared.ErrorResult;
+// }
+
+// export function pipeTo<ChunkType>(
+// source: rs.SDReadableStream<ChunkType>,
+// dest: ws.WritableStream<ChunkType>,
+// options: PipeOptions
+// ): Promise<void> {
+// const preventClose = !!options.preventClose;
+// const preventAbort = !!options.preventAbort;
+// const preventCancel = !!options.preventCancel;
+// const signal = options.signal;
+
+// let shuttingDown = false;
+// let latestWrite = Promise.resolve();
+// const promise = shared.createControlledPromise<void>();
+
+// // If IsReadableByteStreamController(this.[[readableStreamController]]) is true, let reader be either ! AcquireReadableStreamBYOBReader(this) or ! AcquireReadableStreamDefaultReader(this), at the user agent’s discretion.
+// // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(this).
+// const reader = new ReadableStreamDefaultReader(source);
+// const writer = new WritableStreamDefaultWriter(dest);
+
+// let abortAlgorithm: () => any;
+// if (signal !== undefined) {
+// abortAlgorithm = (): void => {
+// // TODO this should be a DOMException,
+// // https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/pipe-to.ts#L38
+// const error = new DenoError(ErrorKind.AbortError, "Aborted");
+// const actions: Array<() => Promise<void>> = [];
+// if (preventAbort === false) {
+// actions.push(() => {
+// if (dest[shared.state_] === "writable") {
+// return ws.writableStreamAbort(dest, error);
+// }
+// return Promise.resolve();
+// });
+// }
+// if (preventCancel === false) {
+// actions.push(() => {
+// if (source[shared.state_] === "readable") {
+// return rs.readableStreamCancel(source, error);
+// }
+// return Promise.resolve();
+// });
+// }
+// shutDown(
+// () => {
+// return Promise.all(actions.map(a => a())).then(_ => undefined);
+// },
+// { actualError: error }
+// );
+// };
+
+// if (signal.aborted === true) {
+// abortAlgorithm();
+// } else {
+// signal.addEventListener("abort", abortAlgorithm);
+// }
+// }
+
+// function onStreamErrored(
+// stream: rs.SDReadableStream<ChunkType> | ws.WritableStream<ChunkType>,
+// promise: Promise<void>,
+// action: (error: shared.ErrorResult) => void
+// ): void {
+// if (stream[shared.state_] === "errored") {
+// action(stream[shared.storedError_]);
+// } else {
+// promise.catch(action);
+// }
+// }
+
+// function onStreamClosed(
+// stream: rs.SDReadableStream<ChunkType> | ws.WritableStream<ChunkType>,
+// promise: Promise<void>,
+// action: () => void
+// ): void {
+// if (stream[shared.state_] === "closed") {
+// action();
+// } else {
+// promise.then(action);
+// }
+// }
+
+// onStreamErrored(source, reader[rs.closedPromise_].promise, error => {
+// if (!preventAbort) {
+// shutDown(() => ws.writableStreamAbort(dest, error), {
+// actualError: error
+// });
+// } else {
+// shutDown(undefined, { actualError: error });
+// }
+// });
+
+// onStreamErrored(dest, writer[ws.closedPromise_].promise, error => {
+// if (!preventCancel) {
+// shutDown(() => rs.readableStreamCancel(source, error), {
+// actualError: error
+// });
+// } else {
+// shutDown(undefined, { actualError: error });
+// }
+// });
+
+// onStreamClosed(source, reader[rs.closedPromise_].promise, () => {
+// if (!preventClose) {
+// shutDown(() =>
+// ws.writableStreamDefaultWriterCloseWithErrorPropagation(writer)
+// );
+// } else {
+// shutDown();
+// }
+// });
+
+// if (
+// ws.writableStreamCloseQueuedOrInFlight(dest) ||
+// dest[shared.state_] === "closed"
+// ) {
+// // Assert: no chunks have been read or written.
+// const destClosed = new TypeError();
+// if (!preventCancel) {
+// shutDown(() => rs.readableStreamCancel(source, destClosed), {
+// actualError: destClosed
+// });
+// } else {
+// shutDown(undefined, { actualError: destClosed });
+// }
+// }
+
+// function awaitLatestWrite(): Promise<void> {
+// const curLatestWrite = latestWrite;
+// return latestWrite.then(() =>
+// curLatestWrite === latestWrite ? undefined : awaitLatestWrite()
+// );
+// }
+
+// function flushRemainder(): Promise<void> | undefined {
+// if (
+// dest[shared.state_] === "writable" &&
+// !ws.writableStreamCloseQueuedOrInFlight(dest)
+// ) {
+// return awaitLatestWrite();
+// } else {
+// return undefined;
+// }
+// }
+
+// function shutDown(action?: () => Promise<void>, error?: ErrorWrapper): void {
+// if (shuttingDown) {
+// return;
+// }
+// shuttingDown = true;
+
+// if (action === undefined) {
+// action = (): Promise<void> => Promise.resolve();
+// }
+
+// function finishShutDown(): void {
+// action!().then(
+// _ => finalize(error),
+// newError => finalize({ actualError: newError })
+// );
+// }
+
+// const flushWait = flushRemainder();
+// if (flushWait) {
+// flushWait.then(finishShutDown);
+// } else {
+// finishShutDown();
+// }
+// }
+
+// function finalize(error?: ErrorWrapper): void {
+// ws.writableStreamDefaultWriterRelease(writer);
+// rs.readableStreamReaderGenericRelease(reader);
+// if (signal && abortAlgorithm) {
+// signal.removeEventListener("abort", abortAlgorithm);
+// }
+// if (error) {
+// promise.reject(error.actualError);
+// } else {
+// promise.resolve(undefined);
+// }
+// }
+
+// function next(): Promise<void> | undefined {
+// if (shuttingDown) {
+// return;
+// }
+
+// writer[ws.readyPromise_].promise.then(() => {
+// rs.readableStreamDefaultReaderRead(reader).then(
+// ({ value, done }) => {
+// if (done) {
+// return;
+// }
+// latestWrite = ws
+// .writableStreamDefaultWriterWrite(writer, value!)
+// .catch(() => {});
+// next();
+// },
+// _error => {
+// latestWrite = Promise.resolve();
+// }
+// );
+// });
+// }
+
+// next();
+
+// return promise.promise;
+// }
diff --git a/cli/js/streams/queue-mixin.ts b/cli/js/streams/queue-mixin.ts
new file mode 100644
index 000000000..23c57d75f
--- /dev/null
+++ b/cli/js/streams/queue-mixin.ts
@@ -0,0 +1,84 @@
+// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+/**
+ * streams/queue-mixin - internal queue operations for stream controllers
+ * Part of Stardazed
+ * (c) 2018-Present by Arthur Langereis - @zenmumbler
+ * https://github.com/stardazed/sd-streams
+ */
+
+/* eslint-disable @typescript-eslint/no-explicit-any */
+// TODO reenable this lint here
+
+import { Queue, QueueImpl } from "./queue.ts";
+import { isFiniteNonNegativeNumber } from "./shared-internals.ts";
+
+export const queue_ = Symbol("queue_");
+export const queueTotalSize_ = Symbol("queueTotalSize_");
+
+export interface QueueElement<V> {
+ value: V;
+ size: number;
+}
+
+export interface QueueContainer<V> {
+ [queue_]: Queue<QueueElement<V>>;
+ [queueTotalSize_]: number;
+}
+
+export interface ByteQueueContainer {
+ [queue_]: Queue<{
+ buffer: ArrayBufferLike;
+ byteOffset: number;
+ byteLength: number;
+ }>;
+ [queueTotalSize_]: number;
+}
+
+export function dequeueValue<V>(container: QueueContainer<V>): V {
+ // Assert: container has[[queue]] and[[queueTotalSize]] internal slots.
+ // Assert: container.[[queue]] is not empty.
+ const pair = container[queue_].shift()!;
+ const newTotalSize = container[queueTotalSize_] - pair.size;
+ container[queueTotalSize_] = Math.max(0, newTotalSize); // < 0 can occur due to rounding errors.
+ return pair.value;
+}
+
+export function enqueueValueWithSize<V>(
+ container: QueueContainer<V>,
+ value: V,
+ size: number
+): void {
+ // Assert: container has[[queue]] and[[queueTotalSize]] internal slots.
+ if (!isFiniteNonNegativeNumber(size)) {
+ throw new RangeError("Chunk size must be a non-negative, finite numbers");
+ }
+ container[queue_].push({ value, size });
+ container[queueTotalSize_] += size;
+}
+
+export function peekQueueValue<V>(container: QueueContainer<V>): V {
+ // Assert: container has[[queue]] and[[queueTotalSize]] internal slots.
+ // Assert: container.[[queue]] is not empty.
+ return container[queue_].front()!.value;
+}
+
+export function resetQueue<V>(
+ container: ByteQueueContainer | QueueContainer<V>
+): void {
+ // Chrome (as of v67) has a steep performance cliff with large arrays
+ // and shift(), around about 50k elements. While this is an unusual case
+ // we use a simple wrapper around shift and push that is chunked to
+ // avoid this pitfall.
+ // @see: https://github.com/stardazed/sd-streams/issues/1
+ container[queue_] = new QueueImpl<any>();
+
+ // The code below can be used as a plain array implementation of the
+ // Queue interface.
+ // const q = [] as any;
+ // q.front = function() { return this[0]; };
+ // container[queue_] = q;
+
+ container[queueTotalSize_] = 0;
+}
diff --git a/cli/js/streams/queue.ts b/cli/js/streams/queue.ts
new file mode 100644
index 000000000..264851baf
--- /dev/null
+++ b/cli/js/streams/queue.ts
@@ -0,0 +1,65 @@
+// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+/**
+ * streams/queue - simple queue type with chunked array backing
+ * Part of Stardazed
+ * (c) 2018-Present by Arthur Langereis - @zenmumbler
+ * https://github.com/stardazed/sd-streams
+ */
+
+const CHUNK_SIZE = 16384;
+
+export interface Queue<T> {
+ push(t: T): void;
+ shift(): T | undefined;
+ front(): T | undefined;
+ readonly length: number;
+}
+
+export class QueueImpl<T> implements Queue<T> {
+ private readonly chunks_: T[][];
+ private readChunk_: T[];
+ private writeChunk_: T[];
+ private length_: number;
+
+ constructor() {
+ this.chunks_ = [[]];
+ this.readChunk_ = this.writeChunk_ = this.chunks_[0];
+ this.length_ = 0;
+ }
+
+ push(t: T): void {
+ this.writeChunk_.push(t);
+ this.length_ += 1;
+ if (this.writeChunk_.length === CHUNK_SIZE) {
+ this.writeChunk_ = [];
+ this.chunks_.push(this.writeChunk_);
+ }
+ }
+
+ front(): T | undefined {
+ if (this.length_ === 0) {
+ return undefined;
+ }
+ return this.readChunk_[0];
+ }
+
+ shift(): T | undefined {
+ if (this.length_ === 0) {
+ return undefined;
+ }
+ const t = this.readChunk_.shift();
+
+ this.length_ -= 1;
+ if (this.readChunk_.length === 0 && this.readChunk_ !== this.writeChunk_) {
+ this.chunks_.shift();
+ this.readChunk_ = this.chunks_[0];
+ }
+ return t;
+ }
+
+ get length(): number {
+ return this.length_;
+ }
+}
diff --git a/cli/js/streams/readable-byte-stream-controller.ts b/cli/js/streams/readable-byte-stream-controller.ts
new file mode 100644
index 000000000..86efd416c
--- /dev/null
+++ b/cli/js/streams/readable-byte-stream-controller.ts
@@ -0,0 +1,214 @@
+// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+/**
+ * streams/readable-byte-stream-controller - ReadableByteStreamController class implementation
+ * Part of Stardazed
+ * (c) 2018-Present by Arthur Langereis - @zenmumbler
+ * https://github.com/stardazed/sd-streams
+ */
+
+/* eslint-disable @typescript-eslint/no-explicit-any */
+// TODO reenable this lint here
+
+import * as rs from "./readable-internals.ts";
+import * as q from "./queue-mixin.ts";
+import * as shared from "./shared-internals.ts";
+import { ReadableStreamBYOBRequest } from "./readable-stream-byob-request.ts";
+import { Queue } from "./queue.ts";
+import { UnderlyingByteSource } from "../dom_types.ts";
+
+export class ReadableByteStreamController
+ implements rs.SDReadableByteStreamController {
+ [rs.autoAllocateChunkSize_]: number | undefined;
+ [rs.byobRequest_]: rs.SDReadableStreamBYOBRequest | undefined;
+ [rs.cancelAlgorithm_]: rs.CancelAlgorithm;
+ [rs.closeRequested_]: boolean;
+ [rs.controlledReadableByteStream_]: rs.SDReadableStream<ArrayBufferView>;
+ [rs.pullAgain_]: boolean;
+ [rs.pullAlgorithm_]: rs.PullAlgorithm<ArrayBufferView>;
+ [rs.pulling_]: boolean;
+ [rs.pendingPullIntos_]: rs.PullIntoDescriptor[];
+ [rs.started_]: boolean;
+ [rs.strategyHWM_]: number;
+
+ [q.queue_]: Queue<{
+ buffer: ArrayBufferLike;
+ byteOffset: number;
+ byteLength: number;
+ }>;
+ [q.queueTotalSize_]: number;
+
+ constructor() {
+ throw new TypeError();
+ }
+
+ get byobRequest(): rs.SDReadableStreamBYOBRequest | undefined {
+ if (!rs.isReadableByteStreamController(this)) {
+ throw new TypeError();
+ }
+ if (
+ this[rs.byobRequest_] === undefined &&
+ this[rs.pendingPullIntos_].length > 0
+ ) {
+ const firstDescriptor = this[rs.pendingPullIntos_][0];
+ const view = new Uint8Array(
+ firstDescriptor.buffer,
+ firstDescriptor.byteOffset + firstDescriptor.bytesFilled,
+ firstDescriptor.byteLength - firstDescriptor.bytesFilled
+ );
+ const byobRequest = Object.create(
+ ReadableStreamBYOBRequest.prototype
+ ) as ReadableStreamBYOBRequest;
+ rs.setUpReadableStreamBYOBRequest(byobRequest, this, view);
+ this[rs.byobRequest_] = byobRequest;
+ }
+ return this[rs.byobRequest_];
+ }
+
+ get desiredSize(): number | null {
+ if (!rs.isReadableByteStreamController(this)) {
+ throw new TypeError();
+ }
+ return rs.readableByteStreamControllerGetDesiredSize(this);
+ }
+
+ close(): void {
+ if (!rs.isReadableByteStreamController(this)) {
+ throw new TypeError();
+ }
+ if (this[rs.closeRequested_]) {
+ throw new TypeError("Stream is already closing");
+ }
+ if (this[rs.controlledReadableByteStream_][shared.state_] !== "readable") {
+ throw new TypeError("Stream is closed or errored");
+ }
+ rs.readableByteStreamControllerClose(this);
+ }
+
+ enqueue(chunk: ArrayBufferView): void {
+ if (!rs.isReadableByteStreamController(this)) {
+ throw new TypeError();
+ }
+ if (this[rs.closeRequested_]) {
+ throw new TypeError("Stream is already closing");
+ }
+ if (this[rs.controlledReadableByteStream_][shared.state_] !== "readable") {
+ throw new TypeError("Stream is closed or errored");
+ }
+ if (!ArrayBuffer.isView(chunk)) {
+ throw new TypeError("chunk must be a valid ArrayBufferView");
+ }
+ // If ! IsDetachedBuffer(chunk.[[ViewedArrayBuffer]]) is true, throw a TypeError exception.
+ return rs.readableByteStreamControllerEnqueue(this, chunk);
+ }
+
+ error(error?: shared.ErrorResult): void {
+ if (!rs.isReadableByteStreamController(this)) {
+ throw new TypeError();
+ }
+ rs.readableByteStreamControllerError(this, error);
+ }
+
+ [rs.cancelSteps_](reason: shared.ErrorResult): Promise<void> {
+ if (this[rs.pendingPullIntos_].length > 0) {
+ const firstDescriptor = this[rs.pendingPullIntos_][0];
+ firstDescriptor.bytesFilled = 0;
+ }
+ q.resetQueue(this);
+ const result = this[rs.cancelAlgorithm_](reason);
+ rs.readableByteStreamControllerClearAlgorithms(this);
+ return result;
+ }
+
+ [rs.pullSteps_](
+ forAuthorCode: boolean
+ ): Promise<IteratorResult<ArrayBufferView, any>> {
+ const stream = this[rs.controlledReadableByteStream_];
+ // Assert: ! ReadableStreamHasDefaultReader(stream) is true.
+ if (this[q.queueTotalSize_] > 0) {
+ // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
+ const entry = this[q.queue_].shift()!;
+ this[q.queueTotalSize_] -= entry.byteLength;
+ rs.readableByteStreamControllerHandleQueueDrain(this);
+ const view = new Uint8Array(
+ entry.buffer,
+ entry.byteOffset,
+ entry.byteLength
+ );
+ return Promise.resolve(
+ rs.readableStreamCreateReadResult(view, false, forAuthorCode)
+ );
+ }
+ const autoAllocateChunkSize = this[rs.autoAllocateChunkSize_];
+ if (autoAllocateChunkSize !== undefined) {
+ let buffer: ArrayBuffer;
+ try {
+ buffer = new ArrayBuffer(autoAllocateChunkSize);
+ } catch (error) {
+ return Promise.reject(error);
+ }
+ const pullIntoDescriptor: rs.PullIntoDescriptor = {
+ buffer,
+ byteOffset: 0,
+ byteLength: autoAllocateChunkSize,
+ bytesFilled: 0,
+ elementSize: 1,
+ ctor: Uint8Array,
+ readerType: "default"
+ };
+ this[rs.pendingPullIntos_].push(pullIntoDescriptor);
+ }
+
+ const promise = rs.readableStreamAddReadRequest(stream, forAuthorCode);
+ rs.readableByteStreamControllerCallPullIfNeeded(this);
+ return promise;
+ }
+}
+
+export function setUpReadableByteStreamControllerFromUnderlyingSource(
+ stream: rs.SDReadableStream<ArrayBufferView>,
+ underlyingByteSource: UnderlyingByteSource,
+ highWaterMark: number
+): void {
+ // Assert: underlyingByteSource is not undefined.
+ const controller = Object.create(
+ ReadableByteStreamController.prototype
+ ) as ReadableByteStreamController;
+
+ const startAlgorithm = (): any => {
+ return shared.invokeOrNoop(underlyingByteSource, "start", [controller]);
+ };
+ const pullAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
+ underlyingByteSource,
+ "pull",
+ [controller]
+ );
+ const cancelAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
+ underlyingByteSource,
+ "cancel",
+ []
+ );
+
+ let autoAllocateChunkSize = underlyingByteSource.autoAllocateChunkSize;
+ if (autoAllocateChunkSize !== undefined) {
+ autoAllocateChunkSize = Number(autoAllocateChunkSize);
+ if (
+ !shared.isInteger(autoAllocateChunkSize) ||
+ autoAllocateChunkSize <= 0
+ ) {
+ throw new RangeError(
+ "autoAllocateChunkSize must be a positive, finite integer"
+ );
+ }
+ }
+ rs.setUpReadableByteStreamController(
+ stream,
+ controller,
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ highWaterMark,
+ autoAllocateChunkSize
+ );
+}
diff --git a/cli/js/streams/readable-internals.ts b/cli/js/streams/readable-internals.ts
new file mode 100644
index 000000000..36f4223d7
--- /dev/null
+++ b/cli/js/streams/readable-internals.ts
@@ -0,0 +1,1357 @@
+// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+/**
+ * streams/readable-internals - internal types and functions for readable streams
+ * Part of Stardazed
+ * (c) 2018-Present by Arthur Langereis - @zenmumbler
+ * https://github.com/stardazed/sd-streams
+ */
+
+/* eslint-disable @typescript-eslint/no-explicit-any */
+// TODO reenable this lint here
+
+import * as shared from "./shared-internals.ts";
+import * as q from "./queue-mixin.ts";
+import {
+ QueuingStrategy,
+ QueuingStrategySizeCallback,
+ UnderlyingSource,
+ UnderlyingByteSource
+} from "../dom_types.ts";
+
+// ReadableStreamDefaultController
+export const controlledReadableStream_ = Symbol("controlledReadableStream_");
+export const pullAlgorithm_ = Symbol("pullAlgorithm_");
+export const cancelAlgorithm_ = Symbol("cancelAlgorithm_");
+export const strategySizeAlgorithm_ = Symbol("strategySizeAlgorithm_");
+export const strategyHWM_ = Symbol("strategyHWM_");
+export const started_ = Symbol("started_");
+export const closeRequested_ = Symbol("closeRequested_");
+export const pullAgain_ = Symbol("pullAgain_");
+export const pulling_ = Symbol("pulling_");
+export const cancelSteps_ = Symbol("cancelSteps_");
+export const pullSteps_ = Symbol("pullSteps_");
+
+// ReadableByteStreamController
+export const autoAllocateChunkSize_ = Symbol("autoAllocateChunkSize_");
+export const byobRequest_ = Symbol("byobRequest_");
+export const controlledReadableByteStream_ = Symbol(
+ "controlledReadableByteStream_"
+);
+export const pendingPullIntos_ = Symbol("pendingPullIntos_");
+
+// ReadableStreamDefaultReader
+export const closedPromise_ = Symbol("closedPromise_");
+export const ownerReadableStream_ = Symbol("ownerReadableStream_");
+export const readRequests_ = Symbol("readRequests_");
+export const readIntoRequests_ = Symbol("readIntoRequests_");
+
+// ReadableStreamBYOBRequest
+export const associatedReadableByteStreamController_ = Symbol(
+ "associatedReadableByteStreamController_"
+);
+export const view_ = Symbol("view_");
+
+// ReadableStreamBYOBReader
+
+// ReadableStream
+export const reader_ = Symbol("reader_");
+export const readableStreamController_ = Symbol("readableStreamController_");
+
+export type StartFunction<OutputType> = (
+ controller: SDReadableStreamControllerBase<OutputType>
+) => void | PromiseLike<void>;
+export type StartAlgorithm = () => Promise<void> | void;
+export type PullFunction<OutputType> = (
+ controller: SDReadableStreamControllerBase<OutputType>
+) => void | PromiseLike<void>;
+export type PullAlgorithm<OutputType> = (
+ controller: SDReadableStreamControllerBase<OutputType>
+) => PromiseLike<void>;
+export type CancelAlgorithm = (reason?: shared.ErrorResult) => Promise<void>;
+
+// ----
+
+export interface SDReadableStreamControllerBase<OutputType> {
+ readonly desiredSize: number | null;
+ close(): void;
+ error(e?: shared.ErrorResult): void;
+
+ [cancelSteps_](reason: shared.ErrorResult): Promise<void>;
+ [pullSteps_](forAuthorCode: boolean): Promise<IteratorResult<OutputType>>;
+}
+
+export interface SDReadableStreamBYOBRequest {
+ readonly view: ArrayBufferView;
+ respond(bytesWritten: number): void;
+ respondWithNewView(view: ArrayBufferView): void;
+
+ [associatedReadableByteStreamController_]:
+ | SDReadableByteStreamController
+ | undefined;
+ [view_]: ArrayBufferView | undefined;
+}
+
+interface ArrayBufferViewCtor {
+ new (
+ buffer: ArrayBufferLike,
+ byteOffset?: number,
+ byteLength?: number
+ ): ArrayBufferView;
+}
+
+export interface PullIntoDescriptor {
+ readerType: "default" | "byob";
+ ctor: ArrayBufferViewCtor;
+ buffer: ArrayBufferLike;
+ byteOffset: number;
+ byteLength: number;
+ bytesFilled: number;
+ elementSize: number;
+}
+
+export interface SDReadableByteStreamController
+ extends SDReadableStreamControllerBase<ArrayBufferView>,
+ q.ByteQueueContainer {
+ readonly byobRequest: SDReadableStreamBYOBRequest | undefined;
+ enqueue(chunk: ArrayBufferView): void;
+
+ [autoAllocateChunkSize_]: number | undefined; // A positive integer, when the automatic buffer allocation feature is enabled. In that case, this value specifies the size of buffer to allocate. It is undefined otherwise.
+ [byobRequest_]: SDReadableStreamBYOBRequest | undefined; // A ReadableStreamBYOBRequest instance representing the current BYOB pull request
+ [cancelAlgorithm_]: CancelAlgorithm; // A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying source
+ [closeRequested_]: boolean; // A boolean flag indicating whether the stream has been closed by its underlying byte source, but still has chunks in its internal queue that have not yet been read
+ [controlledReadableByteStream_]: SDReadableStream<ArrayBufferView>; // The ReadableStream instance controlled
+ [pullAgain_]: boolean; // A boolean flag set to true if the stream’s mechanisms requested a call to the underlying byte source’s pull() method to pull more data, but the pull could not yet be done since a previous call is still executing
+ [pullAlgorithm_]: PullAlgorithm<ArrayBufferView>; // A promise-returning algorithm that pulls data from the underlying source
+ [pulling_]: boolean; // A boolean flag set to true while the underlying byte source’s pull() method is executing and has not yet fulfilled, used to prevent reentrant calls
+ [pendingPullIntos_]: PullIntoDescriptor[]; // A List of descriptors representing pending BYOB pull requests
+ [started_]: boolean; // A boolean flag indicating whether the underlying source has finished starting
+ [strategyHWM_]: number; // A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source
+}
+
+export interface SDReadableStreamDefaultController<OutputType>
+ extends SDReadableStreamControllerBase<OutputType>,
+ q.QueueContainer<OutputType> {
+ enqueue(chunk?: OutputType): void;
+
+ [controlledReadableStream_]: SDReadableStream<OutputType>;
+ [pullAlgorithm_]: PullAlgorithm<OutputType>;
+ [cancelAlgorithm_]: CancelAlgorithm;
+ [strategySizeAlgorithm_]: QueuingStrategySizeCallback<OutputType>;
+ [strategyHWM_]: number;
+
+ [started_]: boolean;
+ [closeRequested_]: boolean;
+ [pullAgain_]: boolean;
+ [pulling_]: boolean;
+}
+
+// ----
+
+export interface SDReadableStreamReader<OutputType> {
+ readonly closed: Promise<void>;
+ cancel(reason: shared.ErrorResult): Promise<void>;
+ releaseLock(): void;
+
+ [ownerReadableStream_]: SDReadableStream<OutputType> | undefined;
+ [closedPromise_]: shared.ControlledPromise<void>;
+}
+
+export interface ReadRequest<V> extends shared.ControlledPromise<V> {
+ forAuthorCode: boolean;
+}
+
+export declare class SDReadableStreamDefaultReader<OutputType>
+ implements SDReadableStreamReader<OutputType> {
+ constructor(stream: SDReadableStream<OutputType>);
+
+ readonly closed: Promise<void>;
+ cancel(reason: shared.ErrorResult): Promise<void>;
+ releaseLock(): void;
+ read(): Promise<IteratorResult<OutputType | undefined>>;
+
+ [ownerReadableStream_]: SDReadableStream<OutputType> | undefined;
+ [closedPromise_]: shared.ControlledPromise<void>;
+ [readRequests_]: Array<ReadRequest<IteratorResult<OutputType>>>;
+}
+
+export declare class SDReadableStreamBYOBReader
+ implements SDReadableStreamReader<ArrayBufferView> {
+ constructor(stream: SDReadableStream<ArrayBufferView>);
+
+ readonly closed: Promise<void>;
+ cancel(reason: shared.ErrorResult): Promise<void>;
+ releaseLock(): void;
+ read(view: ArrayBufferView): Promise<IteratorResult<ArrayBufferView>>;
+
+ [ownerReadableStream_]: SDReadableStream<ArrayBufferView> | undefined;
+ [closedPromise_]: shared.ControlledPromise<void>;
+ [readIntoRequests_]: Array<ReadRequest<IteratorResult<ArrayBufferView>>>;
+}
+
+/* TODO reenable this when we add WritableStreams and Transforms
+export interface GenericTransformStream<InputType, OutputType> {
+ readable: SDReadableStream<OutputType>;
+ writable: ws.WritableStream<InputType>;
+}
+*/
+
+export type ReadableStreamState = "readable" | "closed" | "errored";
+
+export declare class SDReadableStream<OutputType> {
+ constructor(
+ underlyingSource: UnderlyingByteSource,
+ strategy?: { highWaterMark?: number; size?: undefined }
+ );
+ constructor(
+ underlyingSource?: UnderlyingSource<OutputType>,
+ strategy?: QueuingStrategy<OutputType>
+ );
+
+ readonly locked: boolean;
+ cancel(reason?: shared.ErrorResult): Promise<void>;
+ getReader(): SDReadableStreamReader<OutputType>;
+ getReader(options: { mode: "byob" }): SDReadableStreamBYOBReader;
+ tee(): Array<SDReadableStream<OutputType>>;
+
+ /* TODO reenable these methods when we bring in writableStreams and transport types
+ pipeThrough<ResultType>(
+ transform: GenericTransformStream<OutputType, ResultType>,
+ options?: PipeOptions
+ ): SDReadableStream<ResultType>;
+ pipeTo(
+ dest: ws.WritableStream<OutputType>,
+ options?: PipeOptions
+ ): Promise<void>;
+ */
+ [shared.state_]: ReadableStreamState;
+ [shared.storedError_]: shared.ErrorResult;
+ [reader_]: SDReadableStreamReader<OutputType> | undefined;
+ [readableStreamController_]: SDReadableStreamControllerBase<OutputType>;
+}
+
+// ---- Stream
+
+export function initializeReadableStream<OutputType>(
+ stream: SDReadableStream<OutputType>
+): void {
+ stream[shared.state_] = "readable";
+ stream[reader_] = undefined;
+ stream[shared.storedError_] = undefined;
+ stream[readableStreamController_] = undefined!; // mark slot as used for brand check
+}
+
+export function isReadableStream(
+ value: unknown
+): value is SDReadableStream<any> {
+ if (typeof value !== "object" || value === null) {
+ return false;
+ }
+ return readableStreamController_ in value;
+}
+
+export function isReadableStreamLocked<OutputType>(
+ stream: SDReadableStream<OutputType>
+): boolean {
+ return stream[reader_] !== undefined;
+}
+
+export function readableStreamGetNumReadIntoRequests<OutputType>(
+ stream: SDReadableStream<OutputType>
+): number | undefined {
+ // TODO remove the "as unknown" cast
+ // This is in to workaround a compiler error
+ // error TS2352: Conversion of type 'SDReadableStreamReader<OutputType>' to type 'SDReadableStreamBYOBReader' may be a mistake because neither type sufficiently overlaps with the other. If this was intentional, convert the expression to 'unknown' first.
+ // Type 'SDReadableStreamReader<OutputType>' is missing the following properties from type 'SDReadableStreamBYOBReader': read, [readIntoRequests_]
+ const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader;
+ if (reader === undefined) {
+ return 0;
+ }
+ return reader[readIntoRequests_].length;
+}
+
+export function readableStreamGetNumReadRequests<OutputType>(
+ stream: SDReadableStream<OutputType>
+): number {
+ const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
+ if (reader === undefined) {
+ return 0;
+ }
+ return reader[readRequests_].length;
+}
+
+export function readableStreamCreateReadResult<T>(
+ value: T,
+ done: boolean,
+ forAuthorCode: boolean
+): IteratorResult<T> {
+ const prototype = forAuthorCode ? Object.prototype : null;
+ const result = Object.create(prototype);
+ result.value = value;
+ result.done = done;
+ return result;
+}
+
+export function readableStreamAddReadIntoRequest(
+ stream: SDReadableStream<ArrayBufferView>,
+ forAuthorCode: boolean
+): Promise<IteratorResult<ArrayBufferView, any>> {
+ // Assert: ! IsReadableStreamBYOBReader(stream.[[reader]]) is true.
+ // Assert: stream.[[state]] is "readable" or "closed".
+ const reader = stream[reader_] as SDReadableStreamBYOBReader;
+ const conProm = shared.createControlledPromise<
+ IteratorResult<ArrayBufferView>
+ >() as ReadRequest<IteratorResult<ArrayBufferView>>;
+ conProm.forAuthorCode = forAuthorCode;
+ reader[readIntoRequests_].push(conProm);
+ return conProm.promise;
+}
+
+export function readableStreamAddReadRequest<OutputType>(
+ stream: SDReadableStream<OutputType>,
+ forAuthorCode: boolean
+): Promise<IteratorResult<OutputType, any>> {
+ // Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true.
+ // Assert: stream.[[state]] is "readable".
+ const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
+ const conProm = shared.createControlledPromise<
+ IteratorResult<OutputType>
+ >() as ReadRequest<IteratorResult<OutputType>>;
+ conProm.forAuthorCode = forAuthorCode;
+ reader[readRequests_].push(conProm);
+ return conProm.promise;
+}
+
+export function readableStreamHasBYOBReader<OutputType>(
+ stream: SDReadableStream<OutputType>
+): boolean {
+ const reader = stream[reader_];
+ return isReadableStreamBYOBReader(reader);
+}
+
+export function readableStreamHasDefaultReader<OutputType>(
+ stream: SDReadableStream<OutputType>
+): boolean {
+ const reader = stream[reader_];
+ return isReadableStreamDefaultReader(reader);
+}
+
+export function readableStreamCancel<OutputType>(
+ stream: SDReadableStream<OutputType>,
+ reason: shared.ErrorResult
+): Promise<undefined> {
+ if (stream[shared.state_] === "closed") {
+ return Promise.resolve(undefined);
+ }
+ if (stream[shared.state_] === "errored") {
+ return Promise.reject(stream[shared.storedError_]);
+ }
+ readableStreamClose(stream);
+
+ const sourceCancelPromise = stream[readableStreamController_][cancelSteps_](
+ reason
+ );
+ return sourceCancelPromise.then(_ => undefined);
+}
+
+export function readableStreamClose<OutputType>(
+ stream: SDReadableStream<OutputType>
+): void {
+ // Assert: stream.[[state]] is "readable".
+ stream[shared.state_] = "closed";
+ const reader = stream[reader_];
+ if (reader === undefined) {
+ return;
+ }
+
+ if (isReadableStreamDefaultReader(reader)) {
+ for (const readRequest of reader[readRequests_]) {
+ readRequest.resolve(
+ readableStreamCreateReadResult(
+ undefined,
+ true,
+ readRequest.forAuthorCode
+ )
+ );
+ }
+ reader[readRequests_] = [];
+ }
+ reader[closedPromise_].resolve();
+ reader[closedPromise_].promise.catch(() => {});
+}
+
+export function readableStreamError<OutputType>(
+ stream: SDReadableStream<OutputType>,
+ error: shared.ErrorResult
+): void {
+ if (stream[shared.state_] !== "readable") {
+ throw new RangeError("Stream is in an invalid state");
+ }
+ stream[shared.state_] = "errored";
+ stream[shared.storedError_] = error;
+
+ const reader = stream[reader_];
+ if (reader === undefined) {
+ return;
+ }
+ if (isReadableStreamDefaultReader(reader)) {
+ for (const readRequest of reader[readRequests_]) {
+ readRequest.reject(error);
+ }
+ reader[readRequests_] = [];
+ } else {
+ // Assert: IsReadableStreamBYOBReader(reader).
+ // TODO remove the "as unknown" cast
+ const readIntoRequests = ((reader as unknown) as SDReadableStreamBYOBReader)[
+ readIntoRequests_
+ ];
+ for (const readIntoRequest of readIntoRequests) {
+ readIntoRequest.reject(error);
+ }
+ // TODO remove the "as unknown" cast
+ ((reader as unknown) as SDReadableStreamBYOBReader)[readIntoRequests_] = [];
+ }
+
+ reader[closedPromise_].reject(error);
+}
+
+// ---- Readers
+
+export function isReadableStreamDefaultReader(
+ reader: unknown
+): reader is SDReadableStreamDefaultReader<any> {
+ if (typeof reader !== "object" || reader === null) {
+ return false;
+ }
+ return readRequests_ in reader;
+}
+
+export function isReadableStreamBYOBReader(
+ reader: unknown
+): reader is SDReadableStreamBYOBReader {
+ if (typeof reader !== "object" || reader === null) {
+ return false;
+ }
+ return readIntoRequests_ in reader;
+}
+
+export function readableStreamReaderGenericInitialize<OutputType>(
+ reader: SDReadableStreamReader<OutputType>,
+ stream: SDReadableStream<OutputType>
+): void {
+ reader[ownerReadableStream_] = stream;
+ stream[reader_] = reader;
+ const streamState = stream[shared.state_];
+
+ reader[closedPromise_] = shared.createControlledPromise<void>();
+ if (streamState === "readable") {
+ // leave as is
+ } else if (streamState === "closed") {
+ reader[closedPromise_].resolve(undefined);
+ } else {
+ reader[closedPromise_].reject(stream[shared.storedError_]);
+ reader[closedPromise_].promise.catch(() => {});
+ }
+}
+
+export function readableStreamReaderGenericRelease<OutputType>(
+ reader: SDReadableStreamReader<OutputType>
+): void {
+ // Assert: reader.[[ownerReadableStream]] is not undefined.
+ // Assert: reader.[[ownerReadableStream]].[[reader]] is reader.
+ const stream = reader[ownerReadableStream_];
+ if (stream === undefined) {
+ throw new TypeError("Reader is in an inconsistent state");
+ }
+
+ if (stream[shared.state_] === "readable") {
+ // code moved out
+ } else {
+ reader[closedPromise_] = shared.createControlledPromise<void>();
+ }
+ reader[closedPromise_].reject(new TypeError());
+ reader[closedPromise_].promise.catch(() => {});
+
+ stream[reader_] = undefined;
+ reader[ownerReadableStream_] = undefined;
+}
+
+export function readableStreamBYOBReaderRead(
+ reader: SDReadableStreamBYOBReader,
+ view: ArrayBufferView,
+ forAuthorCode = false
+): Promise<IteratorResult<ArrayBufferView, any>> {
+ const stream = reader[ownerReadableStream_]!;
+ // Assert: stream is not undefined.
+
+ if (stream[shared.state_] === "errored") {
+ return Promise.reject(stream[shared.storedError_]);
+ }
+ return readableByteStreamControllerPullInto(
+ stream[readableStreamController_] as SDReadableByteStreamController,
+ view,
+ forAuthorCode
+ );
+}
+
+export function readableStreamDefaultReaderRead<OutputType>(
+ reader: SDReadableStreamDefaultReader<OutputType>,
+ forAuthorCode = false
+): Promise<IteratorResult<OutputType | undefined>> {
+ const stream = reader[ownerReadableStream_]!;
+ // Assert: stream is not undefined.
+
+ if (stream[shared.state_] === "closed") {
+ return Promise.resolve(
+ readableStreamCreateReadResult(undefined, true, forAuthorCode)
+ );
+ }
+ if (stream[shared.state_] === "errored") {
+ return Promise.reject(stream[shared.storedError_]);
+ }
+ // Assert: stream.[[state]] is "readable".
+ return stream[readableStreamController_][pullSteps_](forAuthorCode);
+}
+
+export function readableStreamFulfillReadIntoRequest<OutputType>(
+ stream: SDReadableStream<OutputType>,
+ chunk: ArrayBufferView,
+ done: boolean
+): void {
+ // TODO remove the "as unknown" cast
+ const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader;
+ const readIntoRequest = reader[readIntoRequests_].shift()!; // <-- length check done in caller
+ readIntoRequest.resolve(
+ readableStreamCreateReadResult(chunk, done, readIntoRequest.forAuthorCode)
+ );
+}
+
+export function readableStreamFulfillReadRequest<OutputType>(
+ stream: SDReadableStream<OutputType>,
+ chunk: OutputType,
+ done: boolean
+): void {
+ const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
+ const readRequest = reader[readRequests_].shift()!; // <-- length check done in caller
+ readRequest.resolve(
+ readableStreamCreateReadResult(chunk, done, readRequest.forAuthorCode)
+ );
+}
+
+// ---- DefaultController
+
+export function setUpReadableStreamDefaultController<OutputType>(
+ stream: SDReadableStream<OutputType>,
+ controller: SDReadableStreamDefaultController<OutputType>,
+ startAlgorithm: StartAlgorithm,
+ pullAlgorithm: PullAlgorithm<OutputType>,
+ cancelAlgorithm: CancelAlgorithm,
+ highWaterMark: number,
+ sizeAlgorithm: QueuingStrategySizeCallback<OutputType>
+): void {
+ // Assert: stream.[[readableStreamController]] is undefined.
+ controller[controlledReadableStream_] = stream;
+ q.resetQueue(controller);
+ controller[started_] = false;
+ controller[closeRequested_] = false;
+ controller[pullAgain_] = false;
+ controller[pulling_] = false;
+ controller[strategySizeAlgorithm_] = sizeAlgorithm;
+ controller[strategyHWM_] = highWaterMark;
+ controller[pullAlgorithm_] = pullAlgorithm;
+ controller[cancelAlgorithm_] = cancelAlgorithm;
+ stream[readableStreamController_] = controller;
+
+ const startResult = startAlgorithm();
+ Promise.resolve(startResult).then(
+ _ => {
+ controller[started_] = true;
+ // Assert: controller.[[pulling]] is false.
+ // Assert: controller.[[pullAgain]] is false.
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+ },
+ error => {
+ readableStreamDefaultControllerError(controller, error);
+ }
+ );
+}
+
+export function isReadableStreamDefaultController(
+ value: unknown
+): value is SDReadableStreamDefaultController<any> {
+ if (typeof value !== "object" || value === null) {
+ return false;
+ }
+ return controlledReadableStream_ in value;
+}
+
+export function readableStreamDefaultControllerHasBackpressure<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>
+): boolean {
+ return !readableStreamDefaultControllerShouldCallPull(controller);
+}
+
+export function readableStreamDefaultControllerCanCloseOrEnqueue<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>
+): boolean {
+ const state = controller[controlledReadableStream_][shared.state_];
+ return controller[closeRequested_] === false && state === "readable";
+}
+
+export function readableStreamDefaultControllerGetDesiredSize<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>
+): number | null {
+ const state = controller[controlledReadableStream_][shared.state_];
+ if (state === "errored") {
+ return null;
+ }
+ if (state === "closed") {
+ return 0;
+ }
+ return controller[strategyHWM_] - controller[q.queueTotalSize_];
+}
+
+export function readableStreamDefaultControllerClose<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>
+): void {
+ // Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.
+ controller[closeRequested_] = true;
+ const stream = controller[controlledReadableStream_];
+ if (controller[q.queue_].length === 0) {
+ readableStreamDefaultControllerClearAlgorithms(controller);
+ readableStreamClose(stream);
+ }
+}
+
+export function readableStreamDefaultControllerEnqueue<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>,
+ chunk: OutputType
+): void {
+ const stream = controller[controlledReadableStream_];
+ // Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.
+ if (
+ isReadableStreamLocked(stream) &&
+ readableStreamGetNumReadRequests(stream) > 0
+ ) {
+ readableStreamFulfillReadRequest(stream, chunk, false);
+ } else {
+ // Let result be the result of performing controller.[[strategySizeAlgorithm]], passing in chunk,
+ // and interpreting the result as an ECMAScript completion value.
+ // impl note: assuming that in JS land this just means try/catch with rethrow
+ let chunkSize: number;
+ try {
+ chunkSize = controller[strategySizeAlgorithm_](chunk);
+ } catch (error) {
+ readableStreamDefaultControllerError(controller, error);
+ throw error;
+ }
+ try {
+ q.enqueueValueWithSize(controller, chunk, chunkSize);
+ } catch (error) {
+ readableStreamDefaultControllerError(controller, error);
+ throw error;
+ }
+ }
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+}
+
+export function readableStreamDefaultControllerError<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>,
+ error: shared.ErrorResult
+): void {
+ const stream = controller[controlledReadableStream_];
+ if (stream[shared.state_] !== "readable") {
+ return;
+ }
+ q.resetQueue(controller);
+ readableStreamDefaultControllerClearAlgorithms(controller);
+ readableStreamError(stream, error);
+}
+
+export function readableStreamDefaultControllerCallPullIfNeeded<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>
+): void {
+ if (!readableStreamDefaultControllerShouldCallPull(controller)) {
+ return;
+ }
+ if (controller[pulling_]) {
+ controller[pullAgain_] = true;
+ return;
+ }
+ if (controller[pullAgain_]) {
+ throw new RangeError("Stream controller is in an invalid state.");
+ }
+
+ controller[pulling_] = true;
+ controller[pullAlgorithm_](controller).then(
+ _ => {
+ controller[pulling_] = false;
+ if (controller[pullAgain_]) {
+ controller[pullAgain_] = false;
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+ }
+ },
+ error => {
+ readableStreamDefaultControllerError(controller, error);
+ }
+ );
+}
+
+export function readableStreamDefaultControllerShouldCallPull<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>
+): boolean {
+ const stream = controller[controlledReadableStream_];
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
+ return false;
+ }
+ if (controller[started_] === false) {
+ return false;
+ }
+ if (
+ isReadableStreamLocked(stream) &&
+ readableStreamGetNumReadRequests(stream) > 0
+ ) {
+ return true;
+ }
+ const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller);
+ if (desiredSize === null) {
+ throw new RangeError("Stream is in an invalid state.");
+ }
+ return desiredSize > 0;
+}
+
+export function readableStreamDefaultControllerClearAlgorithms<OutputType>(
+ controller: SDReadableStreamDefaultController<OutputType>
+): void {
+ controller[pullAlgorithm_] = undefined!;
+ controller[cancelAlgorithm_] = undefined!;
+ controller[strategySizeAlgorithm_] = undefined!;
+}
+
+// ---- BYOBController
+
+export function setUpReadableByteStreamController(
+ stream: SDReadableStream<ArrayBufferView>,
+ controller: SDReadableByteStreamController,
+ startAlgorithm: StartAlgorithm,
+ pullAlgorithm: PullAlgorithm<ArrayBufferView>,
+ cancelAlgorithm: CancelAlgorithm,
+ highWaterMark: number,
+ autoAllocateChunkSize: number | undefined
+): void {
+ // Assert: stream.[[readableStreamController]] is undefined.
+ if (stream[readableStreamController_] !== undefined) {
+ throw new TypeError("Cannot reuse streams");
+ }
+ if (autoAllocateChunkSize !== undefined) {
+ if (
+ !shared.isInteger(autoAllocateChunkSize) ||
+ autoAllocateChunkSize <= 0
+ ) {
+ throw new RangeError(
+ "autoAllocateChunkSize must be a positive, finite integer"
+ );
+ }
+ }
+ // Set controller.[[controlledReadableByteStream]] to stream.
+ controller[controlledReadableByteStream_] = stream;
+ // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
+ controller[pullAgain_] = false;
+ controller[pulling_] = false;
+ readableByteStreamControllerClearPendingPullIntos(controller);
+ q.resetQueue(controller);
+ controller[closeRequested_] = false;
+ controller[started_] = false;
+ controller[strategyHWM_] = shared.validateAndNormalizeHighWaterMark(
+ highWaterMark
+ );
+ controller[pullAlgorithm_] = pullAlgorithm;
+ controller[cancelAlgorithm_] = cancelAlgorithm;
+ controller[autoAllocateChunkSize_] = autoAllocateChunkSize;
+ controller[pendingPullIntos_] = [];
+ stream[readableStreamController_] = controller;
+
+ // Let startResult be the result of performing startAlgorithm.
+ const startResult = startAlgorithm();
+ Promise.resolve(startResult).then(
+ _ => {
+ controller[started_] = true;
+ // Assert: controller.[[pulling]] is false.
+ // Assert: controller.[[pullAgain]] is false.
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ },
+ error => {
+ readableByteStreamControllerError(controller, error);
+ }
+ );
+}
+
+export function isReadableStreamBYOBRequest(
+ value: unknown
+): value is SDReadableStreamBYOBRequest {
+ if (typeof value !== "object" || value === null) {
+ return false;
+ }
+ return associatedReadableByteStreamController_ in value;
+}
+
+export function isReadableByteStreamController(
+ value: unknown
+): value is SDReadableByteStreamController {
+ if (typeof value !== "object" || value === null) {
+ return false;
+ }
+ return controlledReadableByteStream_ in value;
+}
+
+export function readableByteStreamControllerCallPullIfNeeded(
+ controller: SDReadableByteStreamController
+): void {
+ if (!readableByteStreamControllerShouldCallPull(controller)) {
+ return;
+ }
+ if (controller[pulling_]) {
+ controller[pullAgain_] = true;
+ return;
+ }
+ // Assert: controller.[[pullAgain]] is false.
+ controller[pulling_] = true;
+ controller[pullAlgorithm_](controller).then(
+ _ => {
+ controller[pulling_] = false;
+ if (controller[pullAgain_]) {
+ controller[pullAgain_] = false;
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+ },
+ error => {
+ readableByteStreamControllerError(controller, error);
+ }
+ );
+}
+
+export function readableByteStreamControllerClearAlgorithms(
+ controller: SDReadableByteStreamController
+): void {
+ controller[pullAlgorithm_] = undefined!;
+ controller[cancelAlgorithm_] = undefined!;
+}
+
+export function readableByteStreamControllerClearPendingPullIntos(
+ controller: SDReadableByteStreamController
+): void {
+ readableByteStreamControllerInvalidateBYOBRequest(controller);
+ controller[pendingPullIntos_] = [];
+}
+
+export function readableByteStreamControllerClose(
+ controller: SDReadableByteStreamController
+): void {
+ const stream = controller[controlledReadableByteStream_];
+ // Assert: controller.[[closeRequested]] is false.
+ // Assert: stream.[[state]] is "readable".
+ if (controller[q.queueTotalSize_] > 0) {
+ controller[closeRequested_] = true;
+ return;
+ }
+ if (controller[pendingPullIntos_].length > 0) {
+ const firstPendingPullInto = controller[pendingPullIntos_][0];
+ if (firstPendingPullInto.bytesFilled > 0) {
+ const error = new TypeError();
+ readableByteStreamControllerError(controller, error);
+ throw error;
+ }
+ }
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamClose(stream);
+}
+
+export function readableByteStreamControllerCommitPullIntoDescriptor(
+ stream: SDReadableStream<ArrayBufferView>,
+ pullIntoDescriptor: PullIntoDescriptor
+): void {
+ // Assert: stream.[[state]] is not "errored".
+ let done = false;
+ if (stream[shared.state_] === "closed") {
+ // Assert: pullIntoDescriptor.[[bytesFilled]] is 0.
+ done = true;
+ }
+ const filledView = readableByteStreamControllerConvertPullIntoDescriptor(
+ pullIntoDescriptor
+ );
+ if (pullIntoDescriptor.readerType === "default") {
+ readableStreamFulfillReadRequest(stream, filledView, done);
+ } else {
+ // Assert: pullIntoDescriptor.[[readerType]] is "byob".
+ readableStreamFulfillReadIntoRequest(stream, filledView, done);
+ }
+}
+
+export function readableByteStreamControllerConvertPullIntoDescriptor(
+ pullIntoDescriptor: PullIntoDescriptor
+): ArrayBufferView {
+ const { bytesFilled, elementSize } = pullIntoDescriptor;
+ // Assert: bytesFilled <= pullIntoDescriptor.byteLength
+ // Assert: bytesFilled mod elementSize is 0
+ return new pullIntoDescriptor.ctor(
+ pullIntoDescriptor.buffer,
+ pullIntoDescriptor.byteOffset,
+ bytesFilled / elementSize
+ );
+}
+
+export function readableByteStreamControllerEnqueue(
+ controller: SDReadableByteStreamController,
+ chunk: ArrayBufferView
+): void {
+ const stream = controller[controlledReadableByteStream_];
+ // Assert: controller.[[closeRequested]] is false.
+ // Assert: stream.[[state]] is "readable".
+ const { buffer, byteOffset, byteLength } = chunk;
+
+ const transferredBuffer = shared.transferArrayBuffer(buffer);
+
+ if (readableStreamHasDefaultReader(stream)) {
+ if (readableStreamGetNumReadRequests(stream) === 0) {
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ transferredBuffer,
+ byteOffset,
+ byteLength
+ );
+ } else {
+ // Assert: controller.[[queue]] is empty.
+ const transferredView = new Uint8Array(
+ transferredBuffer,
+ byteOffset,
+ byteLength
+ );
+ readableStreamFulfillReadRequest(stream, transferredView, false);
+ }
+ } else if (readableStreamHasBYOBReader(stream)) {
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ transferredBuffer,
+ byteOffset,
+ byteLength
+ );
+ readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
+ controller
+ );
+ } else {
+ // Assert: !IsReadableStreamLocked(stream) is false.
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ transferredBuffer,
+ byteOffset,
+ byteLength
+ );
+ }
+ readableByteStreamControllerCallPullIfNeeded(controller);
+}
+
+export function readableByteStreamControllerEnqueueChunkToQueue(
+ controller: SDReadableByteStreamController,
+ buffer: ArrayBufferLike,
+ byteOffset: number,
+ byteLength: number
+): void {
+ controller[q.queue_].push({ buffer, byteOffset, byteLength });
+ controller[q.queueTotalSize_] += byteLength;
+}
+
+export function readableByteStreamControllerError(
+ controller: SDReadableByteStreamController,
+ error: shared.ErrorResult
+): void {
+ const stream = controller[controlledReadableByteStream_];
+ if (stream[shared.state_] !== "readable") {
+ return;
+ }
+ readableByteStreamControllerClearPendingPullIntos(controller);
+ q.resetQueue(controller);
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamError(stream, error);
+}
+
+export function readableByteStreamControllerFillHeadPullIntoDescriptor(
+ controller: SDReadableByteStreamController,
+ size: number,
+ pullIntoDescriptor: PullIntoDescriptor
+): void {
+ // Assert: either controller.[[pendingPullIntos]] is empty, or the first element of controller.[[pendingPullIntos]] is pullIntoDescriptor.
+ readableByteStreamControllerInvalidateBYOBRequest(controller);
+ pullIntoDescriptor.bytesFilled += size;
+}
+
+export function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
+ controller: SDReadableByteStreamController,
+ pullIntoDescriptor: PullIntoDescriptor
+): boolean {
+ const elementSize = pullIntoDescriptor.elementSize;
+ const currentAlignedBytes =
+ pullIntoDescriptor.bytesFilled -
+ (pullIntoDescriptor.bytesFilled % elementSize);
+ const maxBytesToCopy = Math.min(
+ controller[q.queueTotalSize_],
+ pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled
+ );
+ const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
+ const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
+ let totalBytesToCopyRemaining = maxBytesToCopy;
+ let ready = false;
+
+ if (maxAlignedBytes > currentAlignedBytes) {
+ totalBytesToCopyRemaining =
+ maxAlignedBytes - pullIntoDescriptor.bytesFilled;
+ ready = true;
+ }
+ const queue = controller[q.queue_];
+
+ while (totalBytesToCopyRemaining > 0) {
+ const headOfQueue = queue.front()!;
+ const bytesToCopy = Math.min(
+ totalBytesToCopyRemaining,
+ headOfQueue.byteLength
+ );
+ const destStart =
+ pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
+ shared.copyDataBlockBytes(
+ pullIntoDescriptor.buffer,
+ destStart,
+ headOfQueue.buffer,
+ headOfQueue.byteOffset,
+ bytesToCopy
+ );
+ if (headOfQueue.byteLength === bytesToCopy) {
+ queue.shift();
+ } else {
+ headOfQueue.byteOffset += bytesToCopy;
+ headOfQueue.byteLength -= bytesToCopy;
+ }
+ controller[q.queueTotalSize_] -= bytesToCopy;
+ readableByteStreamControllerFillHeadPullIntoDescriptor(
+ controller,
+ bytesToCopy,
+ pullIntoDescriptor
+ );
+ totalBytesToCopyRemaining -= bytesToCopy;
+ }
+ if (!ready) {
+ // Assert: controller[queueTotalSize_] === 0
+ // Assert: pullIntoDescriptor.bytesFilled > 0
+ // Assert: pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize
+ }
+ return ready;
+}
+
+export function readableByteStreamControllerGetDesiredSize(
+ controller: SDReadableByteStreamController
+): number | null {
+ const stream = controller[controlledReadableByteStream_];
+ const state = stream[shared.state_];
+ if (state === "errored") {
+ return null;
+ }
+ if (state === "closed") {
+ return 0;
+ }
+ return controller[strategyHWM_] - controller[q.queueTotalSize_];
+}
+
+export function readableByteStreamControllerHandleQueueDrain(
+ controller: SDReadableByteStreamController
+): void {
+ // Assert: controller.[[controlledReadableByteStream]].[[state]] is "readable".
+ if (controller[q.queueTotalSize_] === 0 && controller[closeRequested_]) {
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamClose(controller[controlledReadableByteStream_]);
+ } else {
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+}
+
+export function readableByteStreamControllerInvalidateBYOBRequest(
+ controller: SDReadableByteStreamController
+): void {
+ const byobRequest = controller[byobRequest_];
+ if (byobRequest === undefined) {
+ return;
+ }
+ byobRequest[associatedReadableByteStreamController_] = undefined;
+ byobRequest[view_] = undefined;
+ controller[byobRequest_] = undefined;
+}
+
+export function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
+ controller: SDReadableByteStreamController
+): void {
+ // Assert: controller.[[closeRequested]] is false.
+ const pendingPullIntos = controller[pendingPullIntos_];
+ while (pendingPullIntos.length > 0) {
+ if (controller[q.queueTotalSize_] === 0) {
+ return;
+ }
+ const pullIntoDescriptor = pendingPullIntos[0];
+ if (
+ readableByteStreamControllerFillPullIntoDescriptorFromQueue(
+ controller,
+ pullIntoDescriptor
+ )
+ ) {
+ readableByteStreamControllerShiftPendingPullInto(controller);
+ readableByteStreamControllerCommitPullIntoDescriptor(
+ controller[controlledReadableByteStream_],
+ pullIntoDescriptor
+ );
+ }
+ }
+}
+
+export function readableByteStreamControllerPullInto(
+ controller: SDReadableByteStreamController,
+ view: ArrayBufferView,
+ forAuthorCode: boolean
+): Promise<IteratorResult<ArrayBufferView, any>> {
+ const stream = controller[controlledReadableByteStream_];
+
+ const elementSize = (view as Uint8Array).BYTES_PER_ELEMENT || 1; // DataView exposes this in Webkit as 1, is not present in FF or Blink
+ const ctor = view.constructor as Uint8ArrayConstructor; // the typecast here is just for TS typing, it does not influence buffer creation
+
+ const byteOffset = view.byteOffset;
+ const byteLength = view.byteLength;
+ const buffer = shared.transferArrayBuffer(view.buffer);
+ const pullIntoDescriptor: PullIntoDescriptor = {
+ buffer,
+ byteOffset,
+ byteLength,
+ bytesFilled: 0,
+ elementSize,
+ ctor,
+ readerType: "byob"
+ };
+
+ if (controller[pendingPullIntos_].length > 0) {
+ controller[pendingPullIntos_].push(pullIntoDescriptor);
+ return readableStreamAddReadIntoRequest(stream, forAuthorCode);
+ }
+ if (stream[shared.state_] === "closed") {
+ const emptyView = new ctor(
+ pullIntoDescriptor.buffer,
+ pullIntoDescriptor.byteOffset,
+ 0
+ );
+ return Promise.resolve(
+ readableStreamCreateReadResult(emptyView, true, forAuthorCode)
+ );
+ }
+
+ if (controller[q.queueTotalSize_] > 0) {
+ if (
+ readableByteStreamControllerFillPullIntoDescriptorFromQueue(
+ controller,
+ pullIntoDescriptor
+ )
+ ) {
+ const filledView = readableByteStreamControllerConvertPullIntoDescriptor(
+ pullIntoDescriptor
+ );
+ readableByteStreamControllerHandleQueueDrain(controller);
+ return Promise.resolve(
+ readableStreamCreateReadResult(filledView, false, forAuthorCode)
+ );
+ }
+ if (controller[closeRequested_]) {
+ const error = new TypeError();
+ readableByteStreamControllerError(controller, error);
+ return Promise.reject(error);
+ }
+ }
+
+ controller[pendingPullIntos_].push(pullIntoDescriptor);
+ const promise = readableStreamAddReadIntoRequest(stream, forAuthorCode);
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ return promise;
+}
+
+export function readableByteStreamControllerRespond(
+ controller: SDReadableByteStreamController,
+ bytesWritten: number
+): void {
+ bytesWritten = Number(bytesWritten);
+ if (!shared.isFiniteNonNegativeNumber(bytesWritten)) {
+ throw new RangeError("bytesWritten must be a finite, non-negative number");
+ }
+ // Assert: controller.[[pendingPullIntos]] is not empty.
+ readableByteStreamControllerRespondInternal(controller, bytesWritten);
+}
+
+export function readableByteStreamControllerRespondInClosedState(
+ controller: SDReadableByteStreamController,
+ firstDescriptor: PullIntoDescriptor
+): void {
+ firstDescriptor.buffer = shared.transferArrayBuffer(firstDescriptor.buffer);
+ // Assert: firstDescriptor.[[bytesFilled]] is 0.
+ const stream = controller[controlledReadableByteStream_];
+ if (readableStreamHasBYOBReader(stream)) {
+ while (readableStreamGetNumReadIntoRequests(stream) > 0) {
+ const pullIntoDescriptor = readableByteStreamControllerShiftPendingPullInto(
+ controller
+ )!;
+ readableByteStreamControllerCommitPullIntoDescriptor(
+ stream,
+ pullIntoDescriptor
+ );
+ }
+ }
+}
+
+export function readableByteStreamControllerRespondInReadableState(
+ controller: SDReadableByteStreamController,
+ bytesWritten: number,
+ pullIntoDescriptor: PullIntoDescriptor
+): void {
+ if (
+ pullIntoDescriptor.bytesFilled + bytesWritten >
+ pullIntoDescriptor.byteLength
+ ) {
+ throw new RangeError();
+ }
+ readableByteStreamControllerFillHeadPullIntoDescriptor(
+ controller,
+ bytesWritten,
+ pullIntoDescriptor
+ );
+ if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {
+ return;
+ }
+ readableByteStreamControllerShiftPendingPullInto(controller);
+ const remainderSize =
+ pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;
+ if (remainderSize > 0) {
+ const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
+ const remainder = shared.cloneArrayBuffer(
+ pullIntoDescriptor.buffer,
+ end - remainderSize,
+ remainderSize,
+ ArrayBuffer
+ );
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ remainder,
+ 0,
+ remainder.byteLength
+ );
+ }
+ pullIntoDescriptor.buffer = shared.transferArrayBuffer(
+ pullIntoDescriptor.buffer
+ );
+ pullIntoDescriptor.bytesFilled =
+ pullIntoDescriptor.bytesFilled - remainderSize;
+ readableByteStreamControllerCommitPullIntoDescriptor(
+ controller[controlledReadableByteStream_],
+ pullIntoDescriptor
+ );
+ readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);
+}
+
+export function readableByteStreamControllerRespondInternal(
+ controller: SDReadableByteStreamController,
+ bytesWritten: number
+): void {
+ const firstDescriptor = controller[pendingPullIntos_][0];
+ const stream = controller[controlledReadableByteStream_];
+ if (stream[shared.state_] === "closed") {
+ if (bytesWritten !== 0) {
+ throw new TypeError();
+ }
+ readableByteStreamControllerRespondInClosedState(
+ controller,
+ firstDescriptor
+ );
+ } else {
+ // Assert: stream.[[state]] is "readable".
+ readableByteStreamControllerRespondInReadableState(
+ controller,
+ bytesWritten,
+ firstDescriptor
+ );
+ }
+ readableByteStreamControllerCallPullIfNeeded(controller);
+}
+
+export function readableByteStreamControllerRespondWithNewView(
+ controller: SDReadableByteStreamController,
+ view: ArrayBufferView
+): void {
+ // Assert: controller.[[pendingPullIntos]] is not empty.
+ const firstDescriptor = controller[pendingPullIntos_][0];
+ if (
+ firstDescriptor.byteOffset + firstDescriptor.bytesFilled !==
+ view.byteOffset
+ ) {
+ throw new RangeError();
+ }
+ if (firstDescriptor.byteLength !== view.byteLength) {
+ throw new RangeError();
+ }
+ firstDescriptor.buffer = view.buffer;
+ readableByteStreamControllerRespondInternal(controller, view.byteLength);
+}
+
+export function readableByteStreamControllerShiftPendingPullInto(
+ controller: SDReadableByteStreamController
+): PullIntoDescriptor | undefined {
+ const descriptor = controller[pendingPullIntos_].shift();
+ readableByteStreamControllerInvalidateBYOBRequest(controller);
+ return descriptor;
+}
+
+export function readableByteStreamControllerShouldCallPull(
+ controller: SDReadableByteStreamController
+): boolean {
+ // Let stream be controller.[[controlledReadableByteStream]].
+ const stream = controller[controlledReadableByteStream_];
+ if (stream[shared.state_] !== "readable") {
+ return false;
+ }
+ if (controller[closeRequested_]) {
+ return false;
+ }
+ if (!controller[started_]) {
+ return false;
+ }
+ if (
+ readableStreamHasDefaultReader(stream) &&
+ readableStreamGetNumReadRequests(stream) > 0
+ ) {
+ return true;
+ }
+ if (
+ readableStreamHasBYOBReader(stream) &&
+ readableStreamGetNumReadIntoRequests(stream) > 0
+ ) {
+ return true;
+ }
+ const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
+ // Assert: desiredSize is not null.
+ return desiredSize! > 0;
+}
+
+export function setUpReadableStreamBYOBRequest(
+ request: SDReadableStreamBYOBRequest,
+ controller: SDReadableByteStreamController,
+ view: ArrayBufferView
+): void {
+ if (!isReadableByteStreamController(controller)) {
+ throw new TypeError();
+ }
+ if (!ArrayBuffer.isView(view)) {
+ throw new TypeError();
+ }
+ // Assert: !IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
+
+ request[associatedReadableByteStreamController_] = controller;
+ request[view_] = view;
+}
diff --git a/cli/js/streams/readable-stream-byob-reader.ts b/cli/js/streams/readable-stream-byob-reader.ts
new file mode 100644
index 000000000..0f9bfb037
--- /dev/null
+++ b/cli/js/streams/readable-stream-byob-reader.ts
@@ -0,0 +1,93 @@
+// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+/**
+ * streams/readable-stream-byob-reader - ReadableStreamBYOBReader class implementation
+ * Part of Stardazed
+ * (c) 2018-Present by Arthur Langereis - @zenmumbler
+ * https://github.com/stardazed/sd-streams
+ */
+
+import * as rs from "./readable-internals.ts";
+import * as shared from "./shared-internals.ts";
+
+export class SDReadableStreamBYOBReader
+ implements rs.SDReadableStreamBYOBReader {
+ [rs.closedPromise_]: shared.ControlledPromise<void>;
+ [rs.ownerReadableStream_]: rs.SDReadableStream<ArrayBufferView> | undefined;
+ [rs.readIntoRequests_]: Array<
+ rs.ReadRequest<IteratorResult<ArrayBufferView>>
+ >;
+
+ constructor(stream: rs.SDReadableStream<ArrayBufferView>) {
+ if (!rs.isReadableStream(stream)) {
+ throw new TypeError();
+ }
+ if (
+ !rs.isReadableByteStreamController(stream[rs.readableStreamController_])
+ ) {
+ throw new TypeError();
+ }
+ if (rs.isReadableStreamLocked(stream)) {
+ throw new TypeError("The stream is locked.");
+ }
+ rs.readableStreamReaderGenericInitialize(this, stream);
+ this[rs.readIntoRequests_] = [];
+ }
+
+ get closed(): Promise<void> {
+ if (!rs.isReadableStreamBYOBReader(this)) {
+ return Promise.reject(new TypeError());
+ }
+ return this[rs.closedPromise_].promise;
+ }
+
+ cancel(reason: shared.ErrorResult): Promise<void> {
+ if (!rs.isReadableStreamBYOBReader(this)) {
+ return Promise.reject(new TypeError());
+ }
+ const stream = this[rs.ownerReadableStream_];
+ if (stream === undefined) {
+ return Promise.reject(
+ new TypeError("Reader is not associated with a stream")
+ );
+ }
+ return rs.readableStreamCancel(stream, reason);
+ }
+
+ read(view: ArrayBufferView): Promise<IteratorResult<ArrayBufferView>> {
+ if (!rs.isReadableStreamBYOBReader(this)) {
+ return Promise.reject(new TypeError());
+ }
+ if (this[rs.ownerReadableStream_] === undefined) {
+ return Promise.reject(
+ new TypeError("Reader is not associated with a stream")
+ );
+ }
+ if (!ArrayBuffer.isView(view)) {
+ return Promise.reject(
+ new TypeError("view argument must be a valid ArrayBufferView")
+ );
+ }
+ // If ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true, return a promise rejected with a TypeError exception.
+ if (view.byteLength === 0) {
+ return Promise.reject(
+ new TypeError("supplied buffer view must be > 0 bytes")
+ );
+ }
+ return rs.readableStreamBYOBReaderRead(this, view, true);
+ }
+
+ releaseLock(): void {
+ if (!rs.isReadableStreamBYOBReader(this)) {
+ throw new TypeError();
+ }
+ if (this[rs.ownerReadableStream_] === undefined) {
+ throw new TypeError("Reader is not associated with a stream");
+ }
+ if (this[rs.readIntoRequests_].length > 0) {
+ throw new TypeError();
+ }
+ rs.readableStreamReaderGenericRelease(this);
+ }
+}
diff --git a/cli/js/streams/readable-stream-byob-request.ts b/cli/js/streams/readable-stream-byob-request.ts
new file mode 100644
index 000000000..25b937f10
--- /dev/null
+++ b/cli/js/streams/readable-stream-byob-request.ts
@@ -0,0 +1,60 @@
+// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+/**
+ * streams/readable-stream-byob-request - ReadableStreamBYOBRequest class implementation
+ * Part of Stardazed
+ * (c) 2018-Present by Arthur Langereis - @zenmumbler
+ * https://github.com/stardazed/sd-streams
+ */
+
+import * as rs from "./readable-internals.ts";
+
+export class ReadableStreamBYOBRequest {
+ [rs.associatedReadableByteStreamController_]:
+ | rs.SDReadableByteStreamController
+ | undefined;
+ [rs.view_]: ArrayBufferView | undefined;
+
+ constructor() {
+ throw new TypeError();
+ }
+
+ get view(): ArrayBufferView {
+ if (!rs.isReadableStreamBYOBRequest(this)) {
+ throw new TypeError();
+ }
+ return this[rs.view_]!;
+ }
+
+ respond(bytesWritten: number): void {
+ if (!rs.isReadableStreamBYOBRequest(this)) {
+ throw new TypeError();
+ }
+ if (this[rs.associatedReadableByteStreamController_] === undefined) {
+ throw new TypeError();
+ }
+ // If! IsDetachedBuffer(this.[[view]].[[ViewedArrayBuffer]]) is true, throw a TypeError exception.
+ return rs.readableByteStreamControllerRespond(
+ this[rs.associatedReadableByteStreamController_]!,
+ bytesWritten
+ );
+ }
+
+ respondWithNewView(view: ArrayBufferView): void {
+ if (!rs.isReadableStreamBYOBRequest(this)) {
+ throw new TypeError();
+ }
+ if (this[rs.associatedReadableByteStreamController_] === undefined) {
+ throw new TypeError();
+ }
+ if (!ArrayBuffer.isView(view)) {
+ throw new TypeError("view parameter must be a TypedArray");
+ }
+ // If! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is true, throw a TypeError exception.
+ return rs.readableByteStreamControllerRespondWithNewView(
+ this[rs.associatedReadableByteStreamController_]!,
+ view
+ );
+ }
+}
diff --git a/cli/js/streams/readable-stream-default-controller.ts b/cli/js/streams/readable-stream-default-controller.ts
new file mode 100644
index 000000000..e9ddce1bc
--- /dev/null
+++ b/cli/js/streams/readable-stream-default-controller.ts
@@ -0,0 +1,139 @@
+// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+/**
+ * streams/readable-stream-default-controller - ReadableStreamDefaultController class implementation
+ * Part of Stardazed
+ * (c) 2018-Present by Arthur Langereis - @zenmumbler
+ * https://github.com/stardazed/sd-streams
+ */
+
+/* eslint-disable @typescript-eslint/no-explicit-any */
+// TODO reenable this lint here
+
+import * as rs from "./readable-internals.ts";
+import * as shared from "./shared-internals.ts";
+import * as q from "./queue-mixin.ts";
+import { Queue } from "./queue.ts";
+import { QueuingStrategySizeCallback, UnderlyingSource } from "../dom_types.ts";
+
+export class ReadableStreamDefaultController<OutputType>
+ implements rs.SDReadableStreamDefaultController<OutputType> {
+ [rs.cancelAlgorithm_]: rs.CancelAlgorithm;
+ [rs.closeRequested_]: boolean;
+ [rs.controlledReadableStream_]: rs.SDReadableStream<OutputType>;
+ [rs.pullAgain_]: boolean;
+ [rs.pullAlgorithm_]: rs.PullAlgorithm<OutputType>;
+ [rs.pulling_]: boolean;
+ [rs.strategyHWM_]: number;
+ [rs.strategySizeAlgorithm_]: QueuingStrategySizeCallback<OutputType>;
+ [rs.started_]: boolean;
+
+ [q.queue_]: Queue<q.QueueElement<OutputType>>;
+ [q.queueTotalSize_]: number;
+
+ constructor() {
+ throw new TypeError();
+ }
+
+ get desiredSize(): number | null {
+ return rs.readableStreamDefaultControllerGetDesiredSize(this);
+ }
+
+ close(): void {
+ if (!rs.isReadableStreamDefaultController(this)) {
+ throw new TypeError();
+ }
+ if (!rs.readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
+ throw new TypeError(
+ "Cannot close, the stream is already closing or not readable"
+ );
+ }
+ rs.readableStreamDefaultControllerClose(this);
+ }
+
+ enqueue(chunk?: OutputType): void {
+ if (!rs.isReadableStreamDefaultController(this)) {
+ throw new TypeError();
+ }
+ if (!rs.readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
+ throw new TypeError(
+ "Cannot enqueue, the stream is closing or not readable"
+ );
+ }
+ rs.readableStreamDefaultControllerEnqueue(this, chunk!);
+ }
+
+ error(e?: shared.ErrorResult): void {
+ if (!rs.isReadableStreamDefaultController(this)) {
+ throw new TypeError();
+ }
+ rs.readableStreamDefaultControllerError(this, e);
+ }
+
+ [rs.cancelSteps_](reason: shared.ErrorResult): Promise<void> {
+ q.resetQueue(this);
+ const result = this[rs.cancelAlgorithm_](reason);
+ rs.readableStreamDefaultControllerClearAlgorithms(this);
+ return result;
+ }
+
+ [rs.pullSteps_](
+ forAuthorCode: boolean
+ ): Promise<IteratorResult<OutputType, any>> {
+ const stream = this[rs.controlledReadableStream_];
+ if (this[q.queue_].length > 0) {
+ const chunk = q.dequeueValue(this);
+ if (this[rs.closeRequested_] && this[q.queue_].length === 0) {
+ rs.readableStreamDefaultControllerClearAlgorithms(this);
+ rs.readableStreamClose(stream);
+ } else {
+ rs.readableStreamDefaultControllerCallPullIfNeeded(this);
+ }
+ return Promise.resolve(
+ rs.readableStreamCreateReadResult(chunk, false, forAuthorCode)
+ );
+ }
+
+ const pendingPromise = rs.readableStreamAddReadRequest(
+ stream,
+ forAuthorCode
+ );
+ rs.readableStreamDefaultControllerCallPullIfNeeded(this);
+ return pendingPromise;
+ }
+}
+
+export function setUpReadableStreamDefaultControllerFromUnderlyingSource<
+ OutputType
+>(
+ stream: rs.SDReadableStream<OutputType>,
+ underlyingSource: UnderlyingSource<OutputType>,
+ highWaterMark: number,
+ sizeAlgorithm: QueuingStrategySizeCallback<OutputType>
+): void {
+ // Assert: underlyingSource is not undefined.
+ const controller = Object.create(ReadableStreamDefaultController.prototype);
+ const startAlgorithm = (): any => {
+ return shared.invokeOrNoop(underlyingSource, "start", [controller]);
+ };
+ const pullAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
+ underlyingSource,
+ "pull",
+ [controller]
+ );
+ const cancelAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
+ underlyingSource,
+ "cancel",
+ []
+ );
+ rs.setUpReadableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ highWaterMark,
+ sizeAlgorithm
+ );
+}
diff --git a/cli/js/streams/readable-stream-default-reader.ts b/cli/js/streams/readable-stream-default-reader.ts
new file mode 100644
index 000000000..eb1910a9d
--- /dev/null
+++ b/cli/js/streams/readable-stream-default-reader.ts
@@ -0,0 +1,75 @@
+// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+/**
+ * streams/readable-stream-default-reader - ReadableStreamDefaultReader class implementation
+ * Part of Stardazed
+ * (c) 2018-Present by Arthur Langereis - @zenmumbler
+ * https://github.com/stardazed/sd-streams
+ */
+
+import * as rs from "./readable-internals.ts";
+import * as shared from "./shared-internals.ts";
+
+export class ReadableStreamDefaultReader<OutputType>
+ implements rs.SDReadableStreamReader<OutputType> {
+ [rs.closedPromise_]: shared.ControlledPromise<void>;
+ [rs.ownerReadableStream_]: rs.SDReadableStream<OutputType> | undefined;
+ [rs.readRequests_]: Array<rs.ReadRequest<IteratorResult<OutputType>>>;
+
+ constructor(stream: rs.SDReadableStream<OutputType>) {
+ if (!rs.isReadableStream(stream)) {
+ throw new TypeError();
+ }
+ if (rs.isReadableStreamLocked(stream)) {
+ throw new TypeError("The stream is locked.");
+ }
+ rs.readableStreamReaderGenericInitialize(this, stream);
+ this[rs.readRequests_] = [];
+ }
+
+ get closed(): Promise<void> {
+ if (!rs.isReadableStreamDefaultReader(this)) {
+ return Promise.reject(new TypeError());
+ }
+ return this[rs.closedPromise_].promise;
+ }
+
+ cancel(reason: shared.ErrorResult): Promise<void> {
+ if (!rs.isReadableStreamDefaultReader(this)) {
+ return Promise.reject(new TypeError());
+ }
+ const stream = this[rs.ownerReadableStream_];
+ if (stream === undefined) {
+ return Promise.reject(
+ new TypeError("Reader is not associated with a stream")
+ );
+ }
+ return rs.readableStreamCancel(stream, reason);
+ }
+
+ read(): Promise<IteratorResult<OutputType | undefined>> {
+ if (!rs.isReadableStreamDefaultReader(this)) {
+ return Promise.reject(new TypeError());
+ }
+ if (this[rs.ownerReadableStream_] === undefined) {
+ return Promise.reject(
+ new TypeError("Reader is not associated with a stream")
+ );
+ }
+ return rs.readableStreamDefaultReaderRead(this, true);
+ }
+
+ releaseLock(): void {
+ if (!rs.isReadableStreamDefaultReader(this)) {
+ throw new TypeError();
+ }
+ if (this[rs.ownerReadableStream_] === undefined) {
+ return;
+ }
+ if (this[rs.readRequests_].length !== 0) {
+ throw new TypeError("Cannot release a stream with pending read requests");
+ }
+ rs.readableStreamReaderGenericRelease(this);
+ }
+}
diff --git a/cli/js/streams/readable-stream.ts b/cli/js/streams/readable-stream.ts
new file mode 100644
index 000000000..0c06a1041
--- /dev/null
+++ b/cli/js/streams/readable-stream.ts
@@ -0,0 +1,387 @@
+// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+/**
+ * streams/readable-stream - ReadableStream class implementation
+ * Part of Stardazed
+ * (c) 2018-Present by Arthur Langereis - @zenmumbler
+ * https://github.com/stardazed/sd-streams
+ */
+
+/* eslint prefer-const: "off" */
+// TODO remove this, surpressed because of
+// 284:7 error 'branch1' is never reassigned. Use 'const' instead prefer-const
+
+import * as rs from "./readable-internals.ts";
+import * as shared from "./shared-internals.ts";
+import {
+ QueuingStrategy,
+ QueuingStrategySizeCallback,
+ UnderlyingSource,
+ UnderlyingByteSource
+} from "../dom_types.ts";
+
+import {
+ ReadableStreamDefaultController,
+ setUpReadableStreamDefaultControllerFromUnderlyingSource
+} from "./readable-stream-default-controller.ts";
+import { ReadableStreamDefaultReader } from "./readable-stream-default-reader.ts";
+
+import {
+ ReadableByteStreamController,
+ setUpReadableByteStreamControllerFromUnderlyingSource
+} from "./readable-byte-stream-controller.ts";
+import { SDReadableStreamBYOBReader } from "./readable-stream-byob-reader.ts";
+
+export class SDReadableStream<OutputType>
+ implements rs.SDReadableStream<OutputType> {
+ [shared.state_]: rs.ReadableStreamState;
+ [shared.storedError_]: shared.ErrorResult;
+ [rs.reader_]: rs.SDReadableStreamReader<OutputType> | undefined;
+ [rs.readableStreamController_]: rs.SDReadableStreamControllerBase<OutputType>;
+
+ constructor(
+ underlyingSource: UnderlyingByteSource,
+ strategy?: { highWaterMark?: number; size?: undefined }
+ );
+ constructor(
+ underlyingSource?: UnderlyingSource<OutputType>,
+ strategy?: QueuingStrategy<OutputType>
+ );
+ constructor(
+ underlyingSource: UnderlyingSource<OutputType> | UnderlyingByteSource = {},
+ strategy:
+ | QueuingStrategy<OutputType>
+ | { highWaterMark?: number; size?: undefined } = {}
+ ) {
+ rs.initializeReadableStream(this);
+
+ const sizeFunc = strategy.size;
+ const stratHWM = strategy.highWaterMark;
+ const sourceType = underlyingSource.type;
+
+ if (sourceType === undefined) {
+ const sizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction(sizeFunc);
+ const highWaterMark = shared.validateAndNormalizeHighWaterMark(
+ stratHWM === undefined ? 1 : stratHWM
+ );
+ setUpReadableStreamDefaultControllerFromUnderlyingSource(
+ this,
+ underlyingSource as UnderlyingSource<OutputType>,
+ highWaterMark,
+ sizeAlgorithm
+ );
+ } else if (String(sourceType) === "bytes") {
+ if (sizeFunc !== undefined) {
+ throw new RangeError(
+ "bytes streams cannot have a strategy with a `size` field"
+ );
+ }
+ const highWaterMark = shared.validateAndNormalizeHighWaterMark(
+ stratHWM === undefined ? 0 : stratHWM
+ );
+ setUpReadableByteStreamControllerFromUnderlyingSource(
+ (this as unknown) as rs.SDReadableStream<ArrayBufferView>,
+ underlyingSource as UnderlyingByteSource,
+ highWaterMark
+ );
+ } else {
+ throw new RangeError(
+ "The underlying source's `type` field must be undefined or 'bytes'"
+ );
+ }
+ }
+
+ get locked(): boolean {
+ return rs.isReadableStreamLocked(this);
+ }
+
+ getReader(): rs.SDReadableStreamDefaultReader<OutputType>;
+ getReader(options: { mode?: "byob" }): rs.SDReadableStreamBYOBReader;
+ getReader(options?: {
+ mode?: "byob";
+ }):
+ | rs.SDReadableStreamDefaultReader<OutputType>
+ | rs.SDReadableStreamBYOBReader {
+ if (!rs.isReadableStream(this)) {
+ throw new TypeError();
+ }
+ if (options === undefined) {
+ options = {};
+ }
+ const { mode } = options;
+ if (mode === undefined) {
+ return new ReadableStreamDefaultReader(this);
+ } else if (String(mode) === "byob") {
+ return new SDReadableStreamBYOBReader(
+ (this as unknown) as rs.SDReadableStream<ArrayBufferView>
+ );
+ }
+ throw RangeError("mode option must be undefined or `byob`");
+ }
+
+ cancel(reason: shared.ErrorResult): Promise<void> {
+ if (!rs.isReadableStream(this)) {
+ return Promise.reject(new TypeError());
+ }
+ if (rs.isReadableStreamLocked(this)) {
+ return Promise.reject(new TypeError("Cannot cancel a locked stream"));
+ }
+ return rs.readableStreamCancel(this, reason);
+ }
+
+ tee(): Array<SDReadableStream<OutputType>> {
+ return readableStreamTee(this, false);
+ }
+
+ /* TODO reenable these methods when we bring in writableStreams and transport types
+ pipeThrough<ResultType>(
+ transform: rs.GenericTransformStream<OutputType, ResultType>,
+ options: PipeOptions = {}
+ ): rs.SDReadableStream<ResultType> {
+ const { readable, writable } = transform;
+ if (!rs.isReadableStream(this)) {
+ throw new TypeError();
+ }
+ if (!ws.isWritableStream(writable)) {
+ throw new TypeError("writable must be a WritableStream");
+ }
+ if (!rs.isReadableStream(readable)) {
+ throw new TypeError("readable must be a ReadableStream");
+ }
+ if (options.signal !== undefined && !shared.isAbortSignal(options.signal)) {
+ throw new TypeError("options.signal must be an AbortSignal instance");
+ }
+ if (rs.isReadableStreamLocked(this)) {
+ throw new TypeError("Cannot pipeThrough on a locked stream");
+ }
+ if (ws.isWritableStreamLocked(writable)) {
+ throw new TypeError("Cannot pipeThrough to a locked stream");
+ }
+
+ const pipeResult = pipeTo(this, writable, options);
+ pipeResult.catch(() => {});
+
+ return readable;
+ }
+
+ pipeTo(
+ dest: ws.WritableStream<OutputType>,
+ options: PipeOptions = {}
+ ): Promise<void> {
+ if (!rs.isReadableStream(this)) {
+ return Promise.reject(new TypeError());
+ }
+ if (!ws.isWritableStream(dest)) {
+ return Promise.reject(
+ new TypeError("destination must be a WritableStream")
+ );
+ }
+ if (options.signal !== undefined && !shared.isAbortSignal(options.signal)) {
+ return Promise.reject(
+ new TypeError("options.signal must be an AbortSignal instance")
+ );
+ }
+ if (rs.isReadableStreamLocked(this)) {
+ return Promise.reject(new TypeError("Cannot pipe from a locked stream"));
+ }
+ if (ws.isWritableStreamLocked(dest)) {
+ return Promise.reject(new TypeError("Cannot pipe to a locked stream"));
+ }
+
+ return pipeTo(this, dest, options);
+ }
+ */
+}
+
+export function createReadableStream<OutputType>(
+ startAlgorithm: rs.StartAlgorithm,
+ pullAlgorithm: rs.PullAlgorithm<OutputType>,
+ cancelAlgorithm: rs.CancelAlgorithm,
+ highWaterMark?: number,
+ sizeAlgorithm?: QueuingStrategySizeCallback<OutputType>
+): SDReadableStream<OutputType> {
+ if (highWaterMark === undefined) {
+ highWaterMark = 1;
+ }
+ if (sizeAlgorithm === undefined) {
+ sizeAlgorithm = (): number => 1;
+ }
+ // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
+
+ const stream = Object.create(SDReadableStream.prototype) as SDReadableStream<
+ OutputType
+ >;
+ rs.initializeReadableStream(stream);
+ const controller = Object.create(
+ ReadableStreamDefaultController.prototype
+ ) as ReadableStreamDefaultController<OutputType>;
+ rs.setUpReadableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ highWaterMark,
+ sizeAlgorithm
+ );
+ return stream;
+}
+
+export function createReadableByteStream<OutputType>(
+ startAlgorithm: rs.StartAlgorithm,
+ pullAlgorithm: rs.PullAlgorithm<OutputType>,
+ cancelAlgorithm: rs.CancelAlgorithm,
+ highWaterMark?: number,
+ autoAllocateChunkSize?: number
+): SDReadableStream<OutputType> {
+ if (highWaterMark === undefined) {
+ highWaterMark = 0;
+ }
+ // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
+ if (autoAllocateChunkSize !== undefined) {
+ if (
+ !shared.isInteger(autoAllocateChunkSize) ||
+ autoAllocateChunkSize <= 0
+ ) {
+ throw new RangeError(
+ "autoAllocateChunkSize must be a positive, finite integer"
+ );
+ }
+ }
+
+ const stream = Object.create(SDReadableStream.prototype) as SDReadableStream<
+ OutputType
+ >;
+ rs.initializeReadableStream(stream);
+ const controller = Object.create(
+ ReadableByteStreamController.prototype
+ ) as ReadableByteStreamController;
+ rs.setUpReadableByteStreamController(
+ (stream as unknown) as SDReadableStream<ArrayBufferView>,
+ controller,
+ startAlgorithm,
+ (pullAlgorithm as unknown) as rs.PullAlgorithm<ArrayBufferView>,
+ cancelAlgorithm,
+ highWaterMark,
+ autoAllocateChunkSize
+ );
+ return stream;
+}
+
+export function readableStreamTee<OutputType>(
+ stream: SDReadableStream<OutputType>,
+ cloneForBranch2: boolean
+): [SDReadableStream<OutputType>, SDReadableStream<OutputType>] {
+ if (!rs.isReadableStream(stream)) {
+ throw new TypeError();
+ }
+
+ const reader = new ReadableStreamDefaultReader(stream);
+ let closedOrErrored = false;
+ let canceled1 = false;
+ let canceled2 = false;
+ let reason1: shared.ErrorResult;
+ let reason2: shared.ErrorResult;
+ let branch1: SDReadableStream<OutputType>;
+ let branch2: SDReadableStream<OutputType>;
+
+ let cancelResolve: (reason: shared.ErrorResult) => void;
+ const cancelPromise = new Promise<void>(resolve => (cancelResolve = resolve));
+
+ const pullAlgorithm = (): Promise<void> => {
+ return rs
+ .readableStreamDefaultReaderRead(reader)
+ .then(({ value, done }) => {
+ if (done && !closedOrErrored) {
+ if (!canceled1) {
+ rs.readableStreamDefaultControllerClose(branch1![
+ rs.readableStreamController_
+ ] as ReadableStreamDefaultController<OutputType>);
+ }
+ if (!canceled2) {
+ rs.readableStreamDefaultControllerClose(branch2![
+ rs.readableStreamController_
+ ] as ReadableStreamDefaultController<OutputType>);
+ }
+ closedOrErrored = true;
+ }
+ if (closedOrErrored) {
+ return;
+ }
+ const value1 = value;
+ let value2 = value;
+ if (!canceled1) {
+ rs.readableStreamDefaultControllerEnqueue(
+ branch1![
+ rs.readableStreamController_
+ ] as ReadableStreamDefaultController<OutputType>,
+ value1!
+ );
+ }
+ if (!canceled2) {
+ if (cloneForBranch2) {
+ value2 = shared.cloneValue(value2);
+ }
+ rs.readableStreamDefaultControllerEnqueue(
+ branch2![
+ rs.readableStreamController_
+ ] as ReadableStreamDefaultController<OutputType>,
+ value2!
+ );
+ }
+ });
+ };
+
+ const cancel1Algorithm = (reason: shared.ErrorResult): Promise<void> => {
+ canceled1 = true;
+ reason1 = reason;
+ if (canceled2) {
+ const cancelResult = rs.readableStreamCancel(stream, [reason1, reason2]);
+ cancelResolve(cancelResult);
+ }
+ return cancelPromise;
+ };
+
+ const cancel2Algorithm = (reason: shared.ErrorResult): Promise<void> => {
+ canceled2 = true;
+ reason2 = reason;
+ if (canceled1) {
+ const cancelResult = rs.readableStreamCancel(stream, [reason1, reason2]);
+ cancelResolve(cancelResult);
+ }
+ return cancelPromise;
+ };
+
+ const startAlgorithm = (): undefined => undefined;
+ branch1 = createReadableStream(
+ startAlgorithm,
+ pullAlgorithm,
+ cancel1Algorithm
+ );
+ branch2 = createReadableStream(
+ startAlgorithm,
+ pullAlgorithm,
+ cancel2Algorithm
+ );
+
+ reader[rs.closedPromise_].promise.catch(error => {
+ if (!closedOrErrored) {
+ rs.readableStreamDefaultControllerError(
+ branch1![
+ rs.readableStreamController_
+ ] as ReadableStreamDefaultController<OutputType>,
+ error
+ );
+ rs.readableStreamDefaultControllerError(
+ branch2![
+ rs.readableStreamController_
+ ] as ReadableStreamDefaultController<OutputType>,
+ error
+ );
+ closedOrErrored = true;
+ }
+ });
+
+ return [branch1, branch2];
+}
diff --git a/cli/js/streams/shared-internals.ts b/cli/js/streams/shared-internals.ts
new file mode 100644
index 000000000..90e66e591
--- /dev/null
+++ b/cli/js/streams/shared-internals.ts
@@ -0,0 +1,310 @@
+// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+/**
+ * streams/shared-internals - common types and methods for streams
+ * Part of Stardazed
+ * (c) 2018-Present by Arthur Langereis - @zenmumbler
+ * https://github.com/stardazed/sd-streams
+ */
+
+/* eslint-disable @typescript-eslint/no-explicit-any */
+// TODO don't disable this warning
+
+import { AbortSignal, QueuingStrategySizeCallback } from "../dom_types.ts";
+import { DenoError, ErrorKind } from "../errors.ts";
+
+// common stream fields
+
+export const state_ = Symbol("state_");
+export const storedError_ = Symbol("storedError_");
+
+// ---------
+
+/** An error reason / result can be anything */
+export type ErrorResult = any;
+
+// ---------
+
+export function isInteger(value: number): boolean {
+ if (!isFinite(value)) {
+ // covers NaN, +Infinity and -Infinity
+ return false;
+ }
+ const absValue = Math.abs(value);
+ return Math.floor(absValue) === absValue;
+}
+
+export function isFiniteNonNegativeNumber(value: unknown): boolean {
+ if (!(typeof value === "number" && isFinite(value))) {
+ // covers NaN, +Infinity and -Infinity
+ return false;
+ }
+ return value >= 0;
+}
+
+export function isAbortSignal(signal: any): signal is AbortSignal {
+ if (typeof signal !== "object" || signal === null) {
+ return false;
+ }
+ try {
+ // TODO
+ // calling signal.aborted() probably isn't the right way to perform this test
+ // https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/shared-internals.ts#L41
+ signal.aborted();
+ return true;
+ } catch (err) {
+ return false;
+ }
+}
+
+export function invokeOrNoop<O extends object, P extends keyof O>(
+ o: O,
+ p: P,
+ args: any[]
+): any {
+ // Assert: O is not undefined.
+ // Assert: IsPropertyKey(P) is true.
+ // Assert: args is a List.
+ const method: Function | undefined = (o as any)[p]; // tslint:disable-line:ban-types
+ if (method === undefined) {
+ return undefined;
+ }
+ return Function.prototype.apply.call(method, o, args);
+}
+
+export function cloneArrayBuffer(
+ srcBuffer: ArrayBufferLike,
+ srcByteOffset: number,
+ srcLength: number,
+ cloneConstructor: ArrayBufferConstructor | SharedArrayBufferConstructor
+): InstanceType<typeof cloneConstructor> {
+ // this function fudges the return type but SharedArrayBuffer is disabled for a while anyway
+ return srcBuffer.slice(
+ srcByteOffset,
+ srcByteOffset + srcLength
+ ) as InstanceType<typeof cloneConstructor>;
+}
+
+export function transferArrayBuffer(buffer: ArrayBufferLike): ArrayBuffer {
+ // This would in a JS engine context detach the buffer's backing store and return
+ // a new ArrayBuffer with the same backing store, invalidating `buffer`,
+ // i.e. a move operation in C++ parlance.
+ // Sadly ArrayBuffer.transfer is yet to be implemented by a single browser vendor.
+ return buffer.slice(0); // copies instead of moves
+}
+
+export function copyDataBlockBytes(
+ toBlock: ArrayBufferLike,
+ toIndex: number,
+ fromBlock: ArrayBufferLike,
+ fromIndex: number,
+ count: number
+): void {
+ new Uint8Array(toBlock, toIndex, count).set(
+ new Uint8Array(fromBlock, fromIndex, count)
+ );
+}
+
+// helper memoisation map for object values
+// weak so it doesn't keep memoized versions of old objects indefinitely.
+const objectCloneMemo = new WeakMap<object, object>();
+
+let sharedArrayBufferSupported_: boolean | undefined;
+function supportsSharedArrayBuffer(): boolean {
+ if (sharedArrayBufferSupported_ === undefined) {
+ try {
+ new SharedArrayBuffer(16);
+ sharedArrayBufferSupported_ = true;
+ } catch (e) {
+ sharedArrayBufferSupported_ = false;
+ }
+ }
+ return sharedArrayBufferSupported_;
+}
+
+/**
+ * Implement a method of value cloning that is reasonably close to performing `StructuredSerialize(StructuredDeserialize(value))`
+ * from the HTML standard. Used by the internal `readableStreamTee` method to clone values for connected implementations.
+ * @see https://html.spec.whatwg.org/multipage/structured-data.html#structuredserializeinternal
+ */
+export function cloneValue(value: any): any {
+ const valueType = typeof value;
+ switch (valueType) {
+ case "number":
+ case "string":
+ case "boolean":
+ case "undefined":
+ // @ts-ignore
+ case "bigint":
+ return value;
+ case "object": {
+ if (objectCloneMemo.has(value)) {
+ return objectCloneMemo.get(value);
+ }
+ if (value === null) {
+ return value;
+ }
+ if (value instanceof Date) {
+ return new Date(value.valueOf());
+ }
+ if (value instanceof RegExp) {
+ return new RegExp(value);
+ }
+ if (supportsSharedArrayBuffer() && value instanceof SharedArrayBuffer) {
+ return value;
+ }
+ if (value instanceof ArrayBuffer) {
+ const cloned = cloneArrayBuffer(
+ value,
+ 0,
+ value.byteLength,
+ ArrayBuffer
+ );
+ objectCloneMemo.set(value, cloned);
+ return cloned;
+ }
+ if (ArrayBuffer.isView(value)) {
+ const clonedBuffer = cloneValue(value.buffer) as ArrayBufferLike;
+ // Use DataViewConstructor type purely for type-checking, can be a DataView or TypedArray.
+ // They use the same constructor signature, only DataView has a length in bytes and TypedArrays
+ // use a length in terms of elements, so we adjust for that.
+ let length: number;
+ if (value instanceof DataView) {
+ length = value.byteLength;
+ } else {
+ length = (value as Uint8Array).length;
+ }
+ return new (value.constructor as DataViewConstructor)(
+ clonedBuffer,
+ value.byteOffset,
+ length
+ );
+ }
+ if (value instanceof Map) {
+ const clonedMap = new Map();
+ objectCloneMemo.set(value, clonedMap);
+ value.forEach((v, k) => clonedMap.set(k, cloneValue(v)));
+ return clonedMap;
+ }
+ if (value instanceof Set) {
+ const clonedSet = new Map();
+ objectCloneMemo.set(value, clonedSet);
+ value.forEach((v, k) => clonedSet.set(k, cloneValue(v)));
+ return clonedSet;
+ }
+
+ // generic object
+ const clonedObj = {} as any;
+ objectCloneMemo.set(value, clonedObj);
+ const sourceKeys = Object.getOwnPropertyNames(value);
+ for (const key of sourceKeys) {
+ clonedObj[key] = cloneValue(value[key]);
+ }
+ return clonedObj;
+ }
+ case "symbol":
+ case "function":
+ default:
+ // TODO this should be a DOMException,
+ // https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/shared-internals.ts#L171
+ throw new DenoError(
+ ErrorKind.DataCloneError,
+ "Uncloneable value in stream"
+ );
+ }
+}
+
+export function promiseCall<F extends Function>(
+ f: F,
+ v: object | undefined,
+ args: any[]
+): Promise<any> {
+ // tslint:disable-line:ban-types
+ try {
+ const result = Function.prototype.apply.call(f, v, args);
+ return Promise.resolve(result);
+ } catch (err) {
+ return Promise.reject(err);
+ }
+}
+
+export function createAlgorithmFromUnderlyingMethod<
+ O extends object,
+ K extends keyof O
+>(obj: O, methodName: K, extraArgs: any[]): any {
+ const method = obj[methodName];
+ if (method === undefined) {
+ return (): any => Promise.resolve(undefined);
+ }
+ if (typeof method !== "function") {
+ throw new TypeError(`Field "${methodName}" is not a function.`);
+ }
+ return function(...fnArgs: any[]): any {
+ return promiseCall(method, obj, fnArgs.concat(extraArgs));
+ };
+}
+
+/*
+Deprecated for now, all usages replaced by readableStreamCreateReadResult
+
+function createIterResultObject<T>(value: T, done: boolean): IteratorResult<T> {
+ return { value, done };
+}
+*/
+
+export function validateAndNormalizeHighWaterMark(hwm: unknown): number {
+ const highWaterMark = Number(hwm);
+ if (isNaN(highWaterMark) || highWaterMark < 0) {
+ throw new RangeError(
+ "highWaterMark must be a valid, non-negative integer."
+ );
+ }
+ return highWaterMark;
+}
+
+export function makeSizeAlgorithmFromSizeFunction<T>(
+ sizeFn: undefined | ((chunk: T) => number)
+): QueuingStrategySizeCallback<T> {
+ if (typeof sizeFn !== "function" && typeof sizeFn !== "undefined") {
+ throw new TypeError("size function must be undefined or a function");
+ }
+ return function(chunk: T): number {
+ if (typeof sizeFn === "function") {
+ return sizeFn(chunk);
+ }
+ return 1;
+ };
+}
+
+// ----
+
+export const enum ControlledPromiseState {
+ Pending,
+ Resolved,
+ Rejected
+}
+
+export interface ControlledPromise<V> {
+ resolve(value?: V): void;
+ reject(error: ErrorResult): void;
+ promise: Promise<V>;
+ state: ControlledPromiseState;
+}
+
+export function createControlledPromise<V>(): ControlledPromise<V> {
+ const conProm = {
+ state: ControlledPromiseState.Pending
+ } as ControlledPromise<V>;
+ conProm.promise = new Promise<V>(function(resolve, reject) {
+ conProm.resolve = function(v?: V): void {
+ conProm.state = ControlledPromiseState.Resolved;
+ resolve(v);
+ };
+ conProm.reject = function(e?: ErrorResult): void {
+ conProm.state = ControlledPromiseState.Rejected;
+ reject(e);
+ };
+ });
+ return conProm;
+}
diff --git a/cli/js/streams/strategies.ts b/cli/js/streams/strategies.ts
new file mode 100644
index 000000000..5f7ffc632
--- /dev/null
+++ b/cli/js/streams/strategies.ts
@@ -0,0 +1,39 @@
+// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+/**
+ * streams/strategies - implementation of the built-in stream strategies
+ * Part of Stardazed
+ * (c) 2018-Present by Arthur Langereis - @zenmumbler
+ * https://github.com/stardazed/sd-streams
+ */
+
+/* eslint-disable @typescript-eslint/no-explicit-any */
+// TODO reenable this lint here
+
+import { QueuingStrategy } from "../dom_types.ts";
+
+export class ByteLengthQueuingStrategy
+ implements QueuingStrategy<ArrayBufferView> {
+ highWaterMark: number;
+
+ constructor(options: { highWaterMark: number }) {
+ this.highWaterMark = options.highWaterMark;
+ }
+
+ size(chunk: ArrayBufferView): number {
+ return chunk.byteLength;
+ }
+}
+
+export class CountQueuingStrategy implements QueuingStrategy<any> {
+ highWaterMark: number;
+
+ constructor(options: { highWaterMark: number }) {
+ this.highWaterMark = options.highWaterMark;
+ }
+
+ size(): number {
+ return 1;
+ }
+}
diff --git a/cli/js/streams/transform-internals.ts b/cli/js/streams/transform-internals.ts
new file mode 100644
index 000000000..4c5e3657d
--- /dev/null
+++ b/cli/js/streams/transform-internals.ts
@@ -0,0 +1,371 @@
+// TODO reenable this code when we enable writableStreams and transport types
+// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+// /**
+// * streams/transform-internals - internal types and functions for transform streams
+// * Part of Stardazed
+// * (c) 2018-Present by Arthur Langereis - @zenmumbler
+// * https://github.com/stardazed/sd-streams
+// */
+
+// /* eslint-disable @typescript-eslint/no-explicit-any */
+// // TODO reenable this lint here
+
+// import * as rs from "./readable-internals.ts";
+// import * as ws from "./writable-internals.ts";
+// import * as shared from "./shared-internals.ts";
+
+// import { createReadableStream } from "./readable-stream.ts";
+// import { createWritableStream } from "./writable-stream.ts";
+
+// import { QueuingStrategy, QueuingStrategySizeCallback } from "../dom_types.ts";
+
+// export const state_ = Symbol("transformState_");
+// export const backpressure_ = Symbol("backpressure_");
+// export const backpressureChangePromise_ = Symbol("backpressureChangePromise_");
+// export const readable_ = Symbol("readable_");
+// export const transformStreamController_ = Symbol("transformStreamController_");
+// export const writable_ = Symbol("writable_");
+
+// export const controlledTransformStream_ = Symbol("controlledTransformStream_");
+// export const flushAlgorithm_ = Symbol("flushAlgorithm_");
+// export const transformAlgorithm_ = Symbol("transformAlgorithm_");
+
+// // ----
+
+// export type TransformFunction<InputType, OutputType> = (
+// chunk: InputType,
+// controller: TransformStreamDefaultController<InputType, OutputType>
+// ) => void | PromiseLike<void>;
+// export type TransformAlgorithm<InputType> = (chunk: InputType) => Promise<void>;
+// export type FlushFunction<InputType, OutputType> = (
+// controller: TransformStreamDefaultController<InputType, OutputType>
+// ) => void | PromiseLike<void>;
+// export type FlushAlgorithm = () => Promise<void>;
+
+// // ----
+
+// export interface TransformStreamDefaultController<InputType, OutputType> {
+// readonly desiredSize: number | null;
+// enqueue(chunk: OutputType): void;
+// error(reason: shared.ErrorResult): void;
+// terminate(): void;
+
+// [controlledTransformStream_]: TransformStream<InputType, OutputType>; // The TransformStream instance controlled; also used for the IsTransformStreamDefaultController brand check
+// [flushAlgorithm_]: FlushAlgorithm; // A promise - returning algorithm which communicates a requested close to the transformer
+// [transformAlgorithm_]: TransformAlgorithm<InputType>; // A promise - returning algorithm, taking one argument(the chunk to transform), which requests the transformer perform its transformation
+// }
+
+// export interface Transformer<InputType, OutputType> {
+// start?(
+// controller: TransformStreamDefaultController<InputType, OutputType>
+// ): void | PromiseLike<void>;
+// transform?: TransformFunction<InputType, OutputType>;
+// flush?: FlushFunction<InputType, OutputType>;
+
+// readableType?: undefined; // for future spec changes
+// writableType?: undefined; // for future spec changes
+// }
+
+// export declare class TransformStream<InputType, OutputType> {
+// constructor(
+// transformer: Transformer<InputType, OutputType>,
+// writableStrategy: QueuingStrategy<InputType>,
+// readableStrategy: QueuingStrategy<OutputType>
+// );
+
+// readonly readable: rs.SDReadableStream<OutputType>;
+// readonly writable: ws.WritableStream<InputType>;
+
+// [backpressure_]: boolean | undefined; // Whether there was backpressure on [[readable]] the last time it was observed
+// [backpressureChangePromise_]: shared.ControlledPromise<void> | undefined; // A promise which is fulfilled and replaced every time the value of[[backpressure]] changes
+// [readable_]: rs.SDReadableStream<OutputType>; // The ReadableStream instance controlled by this object
+// [transformStreamController_]: TransformStreamDefaultController<
+// InputType,
+// OutputType
+// >; // A TransformStreamDefaultController created with the ability to control[[readable]] and[[writable]]; also used for the IsTransformStream brand check
+// [writable_]: ws.WritableStream<InputType>; // The WritableStream instance controlled by this object
+// }
+
+// // ---- TransformStream
+
+// export function isTransformStream(
+// value: unknown
+// ): value is TransformStream<any, any> {
+// if (typeof value !== "object" || value === null) {
+// return false;
+// }
+// return transformStreamController_ in value;
+// }
+
+// export function initializeTransformStream<InputType, OutputType>(
+// stream: TransformStream<InputType, OutputType>,
+// startPromise: Promise<void>,
+// writableHighWaterMark: number,
+// writableSizeAlgorithm: QueuingStrategySizeCallback<InputType>,
+// readableHighWaterMark: number,
+// readableSizeAlgorithm: QueuingStrategySizeCallback<OutputType>
+// ): void {
+// const startAlgorithm = function(): Promise<void> {
+// return startPromise;
+// };
+// const writeAlgorithm = function(chunk: InputType): Promise<void> {
+// return transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
+// };
+// const abortAlgorithm = function(reason: shared.ErrorResult): Promise<void> {
+// return transformStreamDefaultSinkAbortAlgorithm(stream, reason);
+// };
+// const closeAlgorithm = function(): Promise<void> {
+// return transformStreamDefaultSinkCloseAlgorithm(stream);
+// };
+// stream[writable_] = createWritableStream<InputType>(
+// startAlgorithm,
+// writeAlgorithm,
+// closeAlgorithm,
+// abortAlgorithm,
+// writableHighWaterMark,
+// writableSizeAlgorithm
+// );
+
+// const pullAlgorithm = function(): Promise<void> {
+// return transformStreamDefaultSourcePullAlgorithm(stream);
+// };
+// const cancelAlgorithm = function(
+// reason: shared.ErrorResult
+// ): Promise<undefined> {
+// transformStreamErrorWritableAndUnblockWrite(stream, reason);
+// return Promise.resolve(undefined);
+// };
+// stream[readable_] = createReadableStream(
+// startAlgorithm,
+// pullAlgorithm,
+// cancelAlgorithm,
+// readableHighWaterMark,
+// readableSizeAlgorithm
+// );
+
+// stream[backpressure_] = undefined;
+// stream[backpressureChangePromise_] = undefined;
+// transformStreamSetBackpressure(stream, true);
+// stream[transformStreamController_] = undefined!; // initialize slot for brand-check
+// }
+
+// export function transformStreamError<InputType, OutputType>(
+// stream: TransformStream<InputType, OutputType>,
+// error: shared.ErrorResult
+// ): void {
+// rs.readableStreamDefaultControllerError(
+// stream[readable_][
+// rs.readableStreamController_
+// ] as rs.SDReadableStreamDefaultController<OutputType>,
+// error
+// );
+// transformStreamErrorWritableAndUnblockWrite(stream, error);
+// }
+
+// export function transformStreamErrorWritableAndUnblockWrite<
+// InputType,
+// OutputType
+// >(
+// stream: TransformStream<InputType, OutputType>,
+// error: shared.ErrorResult
+// ): void {
+// transformStreamDefaultControllerClearAlgorithms(
+// stream[transformStreamController_]
+// );
+// ws.writableStreamDefaultControllerErrorIfNeeded(
+// stream[writable_][ws.writableStreamController_]!,
+// error
+// );
+// if (stream[backpressure_]) {
+// transformStreamSetBackpressure(stream, false);
+// }
+// }
+
+// export function transformStreamSetBackpressure<InputType, OutputType>(
+// stream: TransformStream<InputType, OutputType>,
+// backpressure: boolean
+// ): void {
+// // Assert: stream.[[backpressure]] is not backpressure.
+// if (stream[backpressure_] !== undefined) {
+// stream[backpressureChangePromise_]!.resolve(undefined);
+// }
+// stream[backpressureChangePromise_] = shared.createControlledPromise<void>();
+// stream[backpressure_] = backpressure;
+// }
+
+// // ---- TransformStreamDefaultController
+
+// export function isTransformStreamDefaultController(
+// value: unknown
+// ): value is TransformStreamDefaultController<any, any> {
+// if (typeof value !== "object" || value === null) {
+// return false;
+// }
+// return controlledTransformStream_ in value;
+// }
+
+// export function setUpTransformStreamDefaultController<InputType, OutputType>(
+// stream: TransformStream<InputType, OutputType>,
+// controller: TransformStreamDefaultController<InputType, OutputType>,
+// transformAlgorithm: TransformAlgorithm<InputType>,
+// flushAlgorithm: FlushAlgorithm
+// ): void {
+// // Assert: ! IsTransformStream(stream) is true.
+// // Assert: stream.[[transformStreamController]] is undefined.
+// controller[controlledTransformStream_] = stream;
+// stream[transformStreamController_] = controller;
+// controller[transformAlgorithm_] = transformAlgorithm;
+// controller[flushAlgorithm_] = flushAlgorithm;
+// }
+
+// export function transformStreamDefaultControllerClearAlgorithms<
+// InputType,
+// OutputType
+// >(controller: TransformStreamDefaultController<InputType, OutputType>): void {
+// // Use ! assertions to override type check here, this way we don't
+// // have to perform type checks/assertions everywhere else.
+// controller[transformAlgorithm_] = undefined!;
+// controller[flushAlgorithm_] = undefined!;
+// }
+
+// export function transformStreamDefaultControllerEnqueue<InputType, OutputType>(
+// controller: TransformStreamDefaultController<InputType, OutputType>,
+// chunk: OutputType
+// ): void {
+// const stream = controller[controlledTransformStream_];
+// const readableController = stream[readable_][
+// rs.readableStreamController_
+// ] as rs.SDReadableStreamDefaultController<OutputType>;
+// if (
+// !rs.readableStreamDefaultControllerCanCloseOrEnqueue(readableController)
+// ) {
+// throw new TypeError();
+// }
+// try {
+// rs.readableStreamDefaultControllerEnqueue(readableController, chunk);
+// } catch (error) {
+// transformStreamErrorWritableAndUnblockWrite(stream, error);
+// throw stream[readable_][shared.storedError_];
+// }
+// const backpressure = rs.readableStreamDefaultControllerHasBackpressure(
+// readableController
+// );
+// if (backpressure !== stream[backpressure_]) {
+// // Assert: backpressure is true.
+// transformStreamSetBackpressure(stream, true);
+// }
+// }
+
+// export function transformStreamDefaultControllerError<InputType, OutputType>(
+// controller: TransformStreamDefaultController<InputType, OutputType>,
+// error: shared.ErrorResult
+// ): void {
+// transformStreamError(controller[controlledTransformStream_], error);
+// }
+
+// export function transformStreamDefaultControllerPerformTransform<
+// InputType,
+// OutputType
+// >(
+// controller: TransformStreamDefaultController<InputType, OutputType>,
+// chunk: InputType
+// ): Promise<void> {
+// const transformPromise = controller[transformAlgorithm_](chunk);
+// return transformPromise.catch(error => {
+// transformStreamError(controller[controlledTransformStream_], error);
+// throw error;
+// });
+// }
+
+// export function transformStreamDefaultControllerTerminate<
+// InputType,
+// OutputType
+// >(controller: TransformStreamDefaultController<InputType, OutputType>): void {
+// const stream = controller[controlledTransformStream_];
+// const readableController = stream[readable_][
+// rs.readableStreamController_
+// ] as rs.SDReadableStreamDefaultController<OutputType>;
+// if (rs.readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) {
+// rs.readableStreamDefaultControllerClose(readableController);
+// }
+// const error = new TypeError("The transform stream has been terminated");
+// transformStreamErrorWritableAndUnblockWrite(stream, error);
+// }
+
+// // ---- Transform Sinks
+
+// export function transformStreamDefaultSinkWriteAlgorithm<InputType, OutputType>(
+// stream: TransformStream<InputType, OutputType>,
+// chunk: InputType
+// ): Promise<void> {
+// // Assert: stream.[[writable]].[[state]] is "writable".
+// const controller = stream[transformStreamController_];
+// if (stream[backpressure_]) {
+// const backpressureChangePromise = stream[backpressureChangePromise_]!;
+// // Assert: backpressureChangePromise is not undefined.
+// return backpressureChangePromise.promise.then(_ => {
+// const writable = stream[writable_];
+// const state = writable[shared.state_];
+// if (state === "erroring") {
+// throw writable[shared.storedError_];
+// }
+// // Assert: state is "writable".
+// return transformStreamDefaultControllerPerformTransform(
+// controller,
+// chunk
+// );
+// });
+// }
+// return transformStreamDefaultControllerPerformTransform(controller, chunk);
+// }
+
+// export function transformStreamDefaultSinkAbortAlgorithm<InputType, OutputType>(
+// stream: TransformStream<InputType, OutputType>,
+// reason: shared.ErrorResult
+// ): Promise<void> {
+// transformStreamError(stream, reason);
+// return Promise.resolve(undefined);
+// }
+
+// export function transformStreamDefaultSinkCloseAlgorithm<InputType, OutputType>(
+// stream: TransformStream<InputType, OutputType>
+// ): Promise<void> {
+// const readable = stream[readable_];
+// const controller = stream[transformStreamController_];
+// const flushPromise = controller[flushAlgorithm_]();
+// transformStreamDefaultControllerClearAlgorithms(controller);
+
+// return flushPromise.then(
+// _ => {
+// if (readable[shared.state_] === "errored") {
+// throw readable[shared.storedError_];
+// }
+// const readableController = readable[
+// rs.readableStreamController_
+// ] as rs.SDReadableStreamDefaultController<OutputType>;
+// if (
+// rs.readableStreamDefaultControllerCanCloseOrEnqueue(readableController)
+// ) {
+// rs.readableStreamDefaultControllerClose(readableController);
+// }
+// },
+// error => {
+// transformStreamError(stream, error);
+// throw readable[shared.storedError_];
+// }
+// );
+// }
+
+// // ---- Transform Sources
+
+// export function transformStreamDefaultSourcePullAlgorithm<
+// InputType,
+// OutputType
+// >(stream: TransformStream<InputType, OutputType>): Promise<void> {
+// // Assert: stream.[[backpressure]] is true.
+// // Assert: stream.[[backpressureChangePromise]] is not undefined.
+// transformStreamSetBackpressure(stream, false);
+// return stream[backpressureChangePromise_]!.promise;
+// }
diff --git a/cli/js/streams/transform-stream-default-controller.ts b/cli/js/streams/transform-stream-default-controller.ts
new file mode 100644
index 000000000..24a8d08fd
--- /dev/null
+++ b/cli/js/streams/transform-stream-default-controller.ts
@@ -0,0 +1,58 @@
+// TODO reenable this code when we enable writableStreams and transport types
+// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+// /**
+// * streams/transform-stream-default-controller - TransformStreamDefaultController class implementation
+// * Part of Stardazed
+// * (c) 2018-Present by Arthur Langereis - @zenmumbler
+// * https://github.com/stardazed/sd-streams
+// */
+
+// import * as rs from "./readable-internals.ts";
+// import * as ts from "./transform-internals.ts";
+// import { ErrorResult } from "./shared-internals.ts";
+
+// export class TransformStreamDefaultController<InputType, OutputType>
+// implements ts.TransformStreamDefaultController<InputType, OutputType> {
+// [ts.controlledTransformStream_]: ts.TransformStream<InputType, OutputType>;
+// [ts.flushAlgorithm_]: ts.FlushAlgorithm;
+// [ts.transformAlgorithm_]: ts.TransformAlgorithm<InputType>;
+
+// constructor() {
+// throw new TypeError();
+// }
+
+// get desiredSize(): number | null {
+// if (!ts.isTransformStreamDefaultController(this)) {
+// throw new TypeError();
+// }
+// const readableController = this[ts.controlledTransformStream_][
+// ts.readable_
+// ][rs.readableStreamController_] as rs.SDReadableStreamDefaultController<
+// OutputType
+// >;
+// return rs.readableStreamDefaultControllerGetDesiredSize(readableController);
+// }
+
+// enqueue(chunk: OutputType): void {
+// if (!ts.isTransformStreamDefaultController(this)) {
+// throw new TypeError();
+// }
+// ts.transformStreamDefaultControllerEnqueue(this, chunk);
+// }
+
+// error(reason: ErrorResult): void {
+// if (!ts.isTransformStreamDefaultController(this)) {
+// throw new TypeError();
+// }
+// ts.transformStreamDefaultControllerError(this, reason);
+// }
+
+// terminate(): void {
+// if (!ts.isTransformStreamDefaultController(this)) {
+// throw new TypeError();
+// }
+// ts.transformStreamDefaultControllerTerminate(this);
+// }
+// }
diff --git a/cli/js/streams/transform-stream.ts b/cli/js/streams/transform-stream.ts
new file mode 100644
index 000000000..090f78135
--- /dev/null
+++ b/cli/js/streams/transform-stream.ts
@@ -0,0 +1,147 @@
+// TODO reenable this code when we enable writableStreams and transport types
+// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+// /**
+// * streams/transform-stream - TransformStream class implementation
+// * Part of Stardazed
+// * (c) 2018-Present by Arthur Langereis - @zenmumbler
+// * https://github.com/stardazed/sd-streams
+// */
+
+// /* eslint-disable @typescript-eslint/no-explicit-any */
+// // TODO reenable this lint here
+
+// import * as rs from "./readable-internals.ts";
+// import * as ws from "./writable-internals.ts";
+// import * as ts from "./transform-internals.ts";
+// import * as shared from "./shared-internals.ts";
+// import { TransformStreamDefaultController } from "./transform-stream-default-controller.ts";
+// import { QueuingStrategy } from "../dom_types.ts";
+
+// export class TransformStream<InputType, OutputType> {
+// [ts.backpressure_]: boolean | undefined; // Whether there was backpressure on [[readable]] the last time it was observed
+// [ts.backpressureChangePromise_]: shared.ControlledPromise<void>; // A promise which is fulfilled and replaced every time the value of[[backpressure]] changes
+// [ts.readable_]: rs.SDReadableStream<OutputType>; // The ReadableStream instance controlled by this object
+// [ts.transformStreamController_]: TransformStreamDefaultController<
+// InputType,
+// OutputType
+// >; // A TransformStreamDefaultController created with the ability to control[[readable]] and[[writable]]; also used for the IsTransformStream brand check
+// [ts.writable_]: ws.WritableStream<InputType>; // The WritableStream instance controlled by this object
+
+// constructor(
+// transformer: ts.Transformer<InputType, OutputType> = {},
+// writableStrategy: QueuingStrategy<InputType> = {},
+// readableStrategy: QueuingStrategy<OutputType> = {}
+// ) {
+// const writableSizeFunction = writableStrategy.size;
+// const writableHighWaterMark = writableStrategy.highWaterMark;
+// const readableSizeFunction = readableStrategy.size;
+// const readableHighWaterMark = readableStrategy.highWaterMark;
+
+// const writableType = transformer.writableType;
+// if (writableType !== undefined) {
+// throw new RangeError(
+// "The transformer's `writableType` field must be undefined"
+// );
+// }
+// const writableSizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction(
+// writableSizeFunction
+// );
+// const writableHWM = shared.validateAndNormalizeHighWaterMark(
+// writableHighWaterMark === undefined ? 1 : writableHighWaterMark
+// );
+
+// const readableType = transformer.readableType;
+// if (readableType !== undefined) {
+// throw new RangeError(
+// "The transformer's `readableType` field must be undefined"
+// );
+// }
+// const readableSizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction(
+// readableSizeFunction
+// );
+// const readableHWM = shared.validateAndNormalizeHighWaterMark(
+// readableHighWaterMark === undefined ? 0 : readableHighWaterMark
+// );
+
+// const startPromise = shared.createControlledPromise<void>();
+// ts.initializeTransformStream(
+// this,
+// startPromise.promise,
+// writableHWM,
+// writableSizeAlgorithm,
+// readableHWM,
+// readableSizeAlgorithm
+// );
+// setUpTransformStreamDefaultControllerFromTransformer(this, transformer);
+
+// const startResult = shared.invokeOrNoop(transformer, "start", [
+// this[ts.transformStreamController_]
+// ]);
+// startPromise.resolve(startResult);
+// }
+
+// get readable(): rs.SDReadableStream<OutputType> {
+// if (!ts.isTransformStream(this)) {
+// throw new TypeError();
+// }
+// return this[ts.readable_];
+// }
+
+// get writable(): ws.WritableStream<InputType> {
+// if (!ts.isTransformStream(this)) {
+// throw new TypeError();
+// }
+// return this[ts.writable_];
+// }
+// }
+
+// function setUpTransformStreamDefaultControllerFromTransformer<
+// InputType,
+// OutputType
+// >(
+// stream: TransformStream<InputType, OutputType>,
+// transformer: ts.Transformer<InputType, OutputType>
+// ): void {
+// const controller = Object.create(
+// TransformStreamDefaultController.prototype
+// ) as TransformStreamDefaultController<InputType, OutputType>;
+// let transformAlgorithm: ts.TransformAlgorithm<InputType>;
+
+// const transformMethod = transformer.transform;
+// if (transformMethod !== undefined) {
+// if (typeof transformMethod !== "function") {
+// throw new TypeError(
+// "`transform` field of the transformer must be a function"
+// );
+// }
+// transformAlgorithm = (chunk: InputType): Promise<any> =>
+// shared.promiseCall(transformMethod, transformer, [chunk, controller]);
+// } else {
+// // use identity transform
+// transformAlgorithm = function(chunk: InputType): Promise<void> {
+// try {
+// // OutputType and InputType are the same here
+// ts.transformStreamDefaultControllerEnqueue(
+// controller,
+// (chunk as unknown) as OutputType
+// );
+// } catch (error) {
+// return Promise.reject(error);
+// }
+// return Promise.resolve(undefined);
+// };
+// }
+// const flushAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
+// transformer,
+// "flush",
+// [controller]
+// );
+// ts.setUpTransformStreamDefaultController(
+// stream,
+// controller,
+// transformAlgorithm,
+// flushAlgorithm
+// );
+// }
diff --git a/cli/js/streams/writable-internals.ts b/cli/js/streams/writable-internals.ts
new file mode 100644
index 000000000..78bb19a28
--- /dev/null
+++ b/cli/js/streams/writable-internals.ts
@@ -0,0 +1,800 @@
+// TODO reenable this code when we enable writableStreams and transport types
+// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+// /**
+// * streams/writable-internals - internal types and functions for writable streams
+// * Part of Stardazed
+// * (c) 2018-Present by Arthur Langereis - @zenmumbler
+// * https://github.com/stardazed/sd-streams
+// */
+
+// /* eslint-disable @typescript-eslint/no-explicit-any */
+// // TODO reenable this lint here
+
+// import * as shared from "./shared-internals.ts";
+// import * as q from "./queue-mixin.ts";
+
+// import { QueuingStrategy, QueuingStrategySizeCallback } from "../dom_types.ts";
+
+// export const backpressure_ = Symbol("backpressure_");
+// export const closeRequest_ = Symbol("closeRequest_");
+// export const inFlightWriteRequest_ = Symbol("inFlightWriteRequest_");
+// export const inFlightCloseRequest_ = Symbol("inFlightCloseRequest_");
+// export const pendingAbortRequest_ = Symbol("pendingAbortRequest_");
+// export const writableStreamController_ = Symbol("writableStreamController_");
+// export const writer_ = Symbol("writer_");
+// export const writeRequests_ = Symbol("writeRequests_");
+
+// export const abortAlgorithm_ = Symbol("abortAlgorithm_");
+// export const closeAlgorithm_ = Symbol("closeAlgorithm_");
+// export const controlledWritableStream_ = Symbol("controlledWritableStream_");
+// export const started_ = Symbol("started_");
+// export const strategyHWM_ = Symbol("strategyHWM_");
+// export const strategySizeAlgorithm_ = Symbol("strategySizeAlgorithm_");
+// export const writeAlgorithm_ = Symbol("writeAlgorithm_");
+
+// export const ownerWritableStream_ = Symbol("ownerWritableStream_");
+// export const closedPromise_ = Symbol("closedPromise_");
+// export const readyPromise_ = Symbol("readyPromise_");
+
+// export const errorSteps_ = Symbol("errorSteps_");
+// export const abortSteps_ = Symbol("abortSteps_");
+
+// export type StartFunction = (
+// controller: WritableStreamController
+// ) => void | PromiseLike<void>;
+// export type StartAlgorithm = () => Promise<void> | void;
+// export type WriteFunction<InputType> = (
+// chunk: InputType,
+// controller: WritableStreamController
+// ) => void | PromiseLike<void>;
+// export type WriteAlgorithm<InputType> = (chunk: InputType) => Promise<void>;
+// export type CloseAlgorithm = () => Promise<void>;
+// export type AbortAlgorithm = (reason?: shared.ErrorResult) => Promise<void>;
+
+// // ----
+
+// export interface WritableStreamController {
+// error(e?: shared.ErrorResult): void;
+
+// [errorSteps_](): void;
+// [abortSteps_](reason: shared.ErrorResult): Promise<void>;
+// }
+
+// export interface WriteRecord<InputType> {
+// chunk: InputType;
+// }
+
+// export interface WritableStreamDefaultController<InputType>
+// extends WritableStreamController,
+// q.QueueContainer<WriteRecord<InputType> | "close"> {
+// [abortAlgorithm_]: AbortAlgorithm; // A promise - returning algorithm, taking one argument(the abort reason), which communicates a requested abort to the underlying sink
+// [closeAlgorithm_]: CloseAlgorithm; // A promise - returning algorithm which communicates a requested close to the underlying sink
+// [controlledWritableStream_]: WritableStream<InputType>; // The WritableStream instance controlled
+// [started_]: boolean; // A boolean flag indicating whether the underlying sink has finished starting
+// [strategyHWM_]: number; // A number supplied by the creator of the stream as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying sink
+// [strategySizeAlgorithm_]: QueuingStrategySizeCallback<InputType>; // An algorithm to calculate the size of enqueued chunks, as part of the stream’s queuing strategy
+// [writeAlgorithm_]: WriteAlgorithm<InputType>; // A promise-returning algorithm, taking one argument (the chunk to write), which writes data to the underlying sink
+// }
+
+// // ----
+
+// export interface WritableStreamWriter<InputType> {
+// readonly closed: Promise<void>;
+// readonly desiredSize: number | null;
+// readonly ready: Promise<void>;
+
+// abort(reason: shared.ErrorResult): Promise<void>;
+// close(): Promise<void>;
+// releaseLock(): void;
+// write(chunk: InputType): Promise<void>;
+// }
+
+// export interface WritableStreamDefaultWriter<InputType>
+// extends WritableStreamWriter<InputType> {
+// [ownerWritableStream_]: WritableStream<InputType> | undefined;
+// [closedPromise_]: shared.ControlledPromise<void>;
+// [readyPromise_]: shared.ControlledPromise<void>;
+// }
+
+// // ----
+
+// export type WritableStreamState =
+// | "writable"
+// | "closed"
+// | "erroring"
+// | "errored";
+
+// export interface WritableStreamSink<InputType> {
+// start?: StartFunction;
+// write?: WriteFunction<InputType>;
+// close?(): void | PromiseLike<void>;
+// abort?(reason?: shared.ErrorResult): void;
+
+// type?: undefined; // unused, for future revisions
+// }
+
+// export interface AbortRequest {
+// reason: shared.ErrorResult;
+// wasAlreadyErroring: boolean;
+// promise: Promise<void>;
+// resolve(): void;
+// reject(error: shared.ErrorResult): void;
+// }
+
+// export declare class WritableStream<InputType> {
+// constructor(
+// underlyingSink?: WritableStreamSink<InputType>,
+// strategy?: QueuingStrategy<InputType>
+// );
+
+// readonly locked: boolean;
+// abort(reason?: shared.ErrorResult): Promise<void>;
+// getWriter(): WritableStreamWriter<InputType>;
+
+// [shared.state_]: WritableStreamState;
+// [backpressure_]: boolean;
+// [closeRequest_]: shared.ControlledPromise<void> | undefined;
+// [inFlightWriteRequest_]: shared.ControlledPromise<void> | undefined;
+// [inFlightCloseRequest_]: shared.ControlledPromise<void> | undefined;
+// [pendingAbortRequest_]: AbortRequest | undefined;
+// [shared.storedError_]: shared.ErrorResult;
+// [writableStreamController_]:
+// | WritableStreamDefaultController<InputType>
+// | undefined;
+// [writer_]: WritableStreamDefaultWriter<InputType> | undefined;
+// [writeRequests_]: Array<shared.ControlledPromise<void>>;
+// }
+
+// // ---- Stream
+
+// export function initializeWritableStream<InputType>(
+// stream: WritableStream<InputType>
+// ): void {
+// stream[shared.state_] = "writable";
+// stream[shared.storedError_] = undefined;
+// stream[writer_] = undefined;
+// stream[writableStreamController_] = undefined;
+// stream[inFlightWriteRequest_] = undefined;
+// stream[closeRequest_] = undefined;
+// stream[inFlightCloseRequest_] = undefined;
+// stream[pendingAbortRequest_] = undefined;
+// stream[writeRequests_] = [];
+// stream[backpressure_] = false;
+// }
+
+// export function isWritableStream(value: unknown): value is WritableStream<any> {
+// if (typeof value !== "object" || value === null) {
+// return false;
+// }
+// return writableStreamController_ in value;
+// }
+
+// export function isWritableStreamLocked<InputType>(
+// stream: WritableStream<InputType>
+// ): boolean {
+// return stream[writer_] !== undefined;
+// }
+
+// export function writableStreamAbort<InputType>(
+// stream: WritableStream<InputType>,
+// reason: shared.ErrorResult
+// ): Promise<void> {
+// const state = stream[shared.state_];
+// if (state === "closed" || state === "errored") {
+// return Promise.resolve(undefined);
+// }
+// let pending = stream[pendingAbortRequest_];
+// if (pending !== undefined) {
+// return pending.promise;
+// }
+// // Assert: state is "writable" or "erroring".
+// let wasAlreadyErroring = false;
+// if (state === "erroring") {
+// wasAlreadyErroring = true;
+// reason = undefined;
+// }
+
+// pending = {
+// reason,
+// wasAlreadyErroring
+// } as AbortRequest;
+// const promise = new Promise<void>((resolve, reject) => {
+// pending!.resolve = resolve;
+// pending!.reject = reject;
+// });
+// pending.promise = promise;
+// stream[pendingAbortRequest_] = pending;
+// if (!wasAlreadyErroring) {
+// writableStreamStartErroring(stream, reason);
+// }
+// return promise;
+// }
+
+// export function writableStreamAddWriteRequest<InputType>(
+// stream: WritableStream<InputType>
+// ): Promise<void> {
+// // Assert: !IsWritableStreamLocked(stream) is true.
+// // Assert: stream.[[state]] is "writable".
+// const writePromise = shared.createControlledPromise<void>();
+// stream[writeRequests_].push(writePromise);
+// return writePromise.promise;
+// }
+
+// export function writableStreamDealWithRejection<InputType>(
+// stream: WritableStream<InputType>,
+// error: shared.ErrorResult
+// ): void {
+// const state = stream[shared.state_];
+// if (state === "writable") {
+// writableStreamStartErroring(stream, error);
+// return;
+// }
+// // Assert: state is "erroring"
+// writableStreamFinishErroring(stream);
+// }
+
+// export function writableStreamStartErroring<InputType>(
+// stream: WritableStream<InputType>,
+// reason: shared.ErrorResult
+// ): void {
+// // Assert: stream.[[storedError]] is undefined.
+// // Assert: stream.[[state]] is "writable".
+// const controller = stream[writableStreamController_]!;
+// // Assert: controller is not undefined.
+// stream[shared.state_] = "erroring";
+// stream[shared.storedError_] = reason;
+// const writer = stream[writer_];
+// if (writer !== undefined) {
+// writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
+// }
+// if (
+// !writableStreamHasOperationMarkedInFlight(stream) &&
+// controller[started_]
+// ) {
+// writableStreamFinishErroring(stream);
+// }
+// }
+
+// export function writableStreamFinishErroring<InputType>(
+// stream: WritableStream<InputType>
+// ): void {
+// // Assert: stream.[[state]] is "erroring".
+// // Assert: writableStreamHasOperationMarkedInFlight(stream) is false.
+// stream[shared.state_] = "errored";
+// const controller = stream[writableStreamController_]!;
+// controller[errorSteps_]();
+// const storedError = stream[shared.storedError_];
+// for (const writeRequest of stream[writeRequests_]) {
+// writeRequest.reject(storedError);
+// }
+// stream[writeRequests_] = [];
+
+// const abortRequest = stream[pendingAbortRequest_];
+// if (abortRequest === undefined) {
+// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+// return;
+// }
+// stream[pendingAbortRequest_] = undefined;
+// if (abortRequest.wasAlreadyErroring) {
+// abortRequest.reject(storedError);
+// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+// return;
+// }
+// const promise = controller[abortSteps_](abortRequest.reason);
+// promise.then(
+// _ => {
+// abortRequest.resolve();
+// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+// },
+// error => {
+// abortRequest.reject(error);
+// writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+// }
+// );
+// }
+
+// export function writableStreamFinishInFlightWrite<InputType>(
+// stream: WritableStream<InputType>
+// ): void {
+// // Assert: stream.[[inFlightWriteRequest]] is not undefined.
+// stream[inFlightWriteRequest_]!.resolve(undefined);
+// stream[inFlightWriteRequest_] = undefined;
+// }
+
+// export function writableStreamFinishInFlightWriteWithError<InputType>(
+// stream: WritableStream<InputType>,
+// error: shared.ErrorResult
+// ): void {
+// // Assert: stream.[[inFlightWriteRequest]] is not undefined.
+// stream[inFlightWriteRequest_]!.reject(error);
+// stream[inFlightWriteRequest_] = undefined;
+// // Assert: stream.[[state]] is "writable" or "erroring".
+// writableStreamDealWithRejection(stream, error);
+// }
+
+// export function writableStreamFinishInFlightClose<InputType>(
+// stream: WritableStream<InputType>
+// ): void {
+// // Assert: stream.[[inFlightCloseRequest]] is not undefined.
+// stream[inFlightCloseRequest_]!.resolve(undefined);
+// stream[inFlightCloseRequest_] = undefined;
+// const state = stream[shared.state_];
+// // Assert: stream.[[state]] is "writable" or "erroring".
+// if (state === "erroring") {
+// stream[shared.storedError_] = undefined;
+// if (stream[pendingAbortRequest_] !== undefined) {
+// stream[pendingAbortRequest_]!.resolve();
+// stream[pendingAbortRequest_] = undefined;
+// }
+// }
+// stream[shared.state_] = "closed";
+// const writer = stream[writer_];
+// if (writer !== undefined) {
+// writer[closedPromise_].resolve(undefined);
+// }
+// // Assert: stream.[[pendingAbortRequest]] is undefined.
+// // Assert: stream.[[storedError]] is undefined.
+// }
+
+// export function writableStreamFinishInFlightCloseWithError<InputType>(
+// stream: WritableStream<InputType>,
+// error: shared.ErrorResult
+// ): void {
+// // Assert: stream.[[inFlightCloseRequest]] is not undefined.
+// stream[inFlightCloseRequest_]!.reject(error);
+// stream[inFlightCloseRequest_] = undefined;
+// // Assert: stream.[[state]] is "writable" or "erroring".
+// if (stream[pendingAbortRequest_] !== undefined) {
+// stream[pendingAbortRequest_]!.reject(error);
+// stream[pendingAbortRequest_] = undefined;
+// }
+// writableStreamDealWithRejection(stream, error);
+// }
+
+// export function writableStreamCloseQueuedOrInFlight<InputType>(
+// stream: WritableStream<InputType>
+// ): boolean {
+// return (
+// stream[closeRequest_] !== undefined ||
+// stream[inFlightCloseRequest_] !== undefined
+// );
+// }
+
+// export function writableStreamHasOperationMarkedInFlight<InputType>(
+// stream: WritableStream<InputType>
+// ): boolean {
+// return (
+// stream[inFlightWriteRequest_] !== undefined ||
+// stream[inFlightCloseRequest_] !== undefined
+// );
+// }
+
+// export function writableStreamMarkCloseRequestInFlight<InputType>(
+// stream: WritableStream<InputType>
+// ): void {
+// // Assert: stream.[[inFlightCloseRequest]] is undefined.
+// // Assert: stream.[[closeRequest]] is not undefined.
+// stream[inFlightCloseRequest_] = stream[closeRequest_];
+// stream[closeRequest_] = undefined;
+// }
+
+// export function writableStreamMarkFirstWriteRequestInFlight<InputType>(
+// stream: WritableStream<InputType>
+// ): void {
+// // Assert: stream.[[inFlightWriteRequest]] is undefined.
+// // Assert: stream.[[writeRequests]] is not empty.
+// const writeRequest = stream[writeRequests_].shift()!;
+// stream[inFlightWriteRequest_] = writeRequest;
+// }
+
+// export function writableStreamRejectCloseAndClosedPromiseIfNeeded<InputType>(
+// stream: WritableStream<InputType>
+// ): void {
+// // Assert: stream.[[state]] is "errored".
+// const closeRequest = stream[closeRequest_];
+// if (closeRequest !== undefined) {
+// // Assert: stream.[[inFlightCloseRequest]] is undefined.
+// closeRequest.reject(stream[shared.storedError_]);
+// stream[closeRequest_] = undefined;
+// }
+// const writer = stream[writer_];
+// if (writer !== undefined) {
+// writer[closedPromise_].reject(stream[shared.storedError_]);
+// writer[closedPromise_].promise.catch(() => {});
+// }
+// }
+
+// export function writableStreamUpdateBackpressure<InputType>(
+// stream: WritableStream<InputType>,
+// backpressure: boolean
+// ): void {
+// // Assert: stream.[[state]] is "writable".
+// // Assert: !WritableStreamCloseQueuedOrInFlight(stream) is false.
+// const writer = stream[writer_];
+// if (writer !== undefined && backpressure !== stream[backpressure_]) {
+// if (backpressure) {
+// writer[readyPromise_] = shared.createControlledPromise<void>();
+// } else {
+// writer[readyPromise_].resolve(undefined);
+// }
+// }
+// stream[backpressure_] = backpressure;
+// }
+
+// // ---- Writers
+
+// export function isWritableStreamDefaultWriter(
+// value: unknown
+// ): value is WritableStreamDefaultWriter<any> {
+// if (typeof value !== "object" || value === null) {
+// return false;
+// }
+// return ownerWritableStream_ in value;
+// }
+
+// export function writableStreamDefaultWriterAbort<InputType>(
+// writer: WritableStreamDefaultWriter<InputType>,
+// reason: shared.ErrorResult
+// ): Promise<void> {
+// const stream = writer[ownerWritableStream_]!;
+// // Assert: stream is not undefined.
+// return writableStreamAbort(stream, reason);
+// }
+
+// export function writableStreamDefaultWriterClose<InputType>(
+// writer: WritableStreamDefaultWriter<InputType>
+// ): Promise<void> {
+// const stream = writer[ownerWritableStream_]!;
+// // Assert: stream is not undefined.
+// const state = stream[shared.state_];
+// if (state === "closed" || state === "errored") {
+// return Promise.reject(
+// new TypeError("Writer stream is already closed or errored")
+// );
+// }
+// // Assert: state is "writable" or "erroring".
+// // Assert: writableStreamCloseQueuedOrInFlight(stream) is false.
+// const closePromise = shared.createControlledPromise<void>();
+// stream[closeRequest_] = closePromise;
+// if (stream[backpressure_] && state === "writable") {
+// writer[readyPromise_].resolve(undefined);
+// }
+// writableStreamDefaultControllerClose(stream[writableStreamController_]!);
+// return closePromise.promise;
+// }
+
+// export function writableStreamDefaultWriterCloseWithErrorPropagation<InputType>(
+// writer: WritableStreamDefaultWriter<InputType>
+// ): Promise<void> {
+// const stream = writer[ownerWritableStream_]!;
+// // Assert: stream is not undefined.
+// const state = stream[shared.state_];
+// if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
+// return Promise.resolve(undefined);
+// }
+// if (state === "errored") {
+// return Promise.reject(stream[shared.storedError_]);
+// }
+// // Assert: state is "writable" or "erroring".
+// return writableStreamDefaultWriterClose(writer);
+// }
+
+// export function writableStreamDefaultWriterEnsureClosedPromiseRejected<
+// InputType
+// >(
+// writer: WritableStreamDefaultWriter<InputType>,
+// error: shared.ErrorResult
+// ): void {
+// const closedPromise = writer[closedPromise_];
+// if (closedPromise.state === shared.ControlledPromiseState.Pending) {
+// closedPromise.reject(error);
+// } else {
+// writer[closedPromise_] = shared.createControlledPromise<void>();
+// writer[closedPromise_].reject(error);
+// }
+// writer[closedPromise_].promise.catch(() => {});
+// }
+
+// export function writableStreamDefaultWriterEnsureReadyPromiseRejected<
+// InputType
+// >(
+// writer: WritableStreamDefaultWriter<InputType>,
+// error: shared.ErrorResult
+// ): void {
+// const readyPromise = writer[readyPromise_];
+// if (readyPromise.state === shared.ControlledPromiseState.Pending) {
+// readyPromise.reject(error);
+// } else {
+// writer[readyPromise_] = shared.createControlledPromise<void>();
+// writer[readyPromise_].reject(error);
+// }
+// writer[readyPromise_].promise.catch(() => {});
+// }
+
+// export function writableStreamDefaultWriterGetDesiredSize<InputType>(
+// writer: WritableStreamDefaultWriter<InputType>
+// ): number | null {
+// const stream = writer[ownerWritableStream_]!;
+// const state = stream[shared.state_];
+// if (state === "errored" || state === "erroring") {
+// return null;
+// }
+// if (state === "closed") {
+// return 0;
+// }
+// return writableStreamDefaultControllerGetDesiredSize(
+// stream[writableStreamController_]!
+// );
+// }
+
+// export function writableStreamDefaultWriterRelease<InputType>(
+// writer: WritableStreamDefaultWriter<InputType>
+// ): void {
+// const stream = writer[ownerWritableStream_]!;
+// // Assert: stream is not undefined.
+// // Assert: stream.[[writer]] is writer.
+// const releasedError = new TypeError();
+// writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError);
+// writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError);
+// stream[writer_] = undefined;
+// writer[ownerWritableStream_] = undefined;
+// }
+
+// export function writableStreamDefaultWriterWrite<InputType>(
+// writer: WritableStreamDefaultWriter<InputType>,
+// chunk: InputType
+// ): Promise<void> {
+// const stream = writer[ownerWritableStream_]!;
+// // Assert: stream is not undefined.
+// const controller = stream[writableStreamController_]!;
+// const chunkSize = writableStreamDefaultControllerGetChunkSize(
+// controller,
+// chunk
+// );
+// if (writer[ownerWritableStream_] !== stream) {
+// return Promise.reject(new TypeError());
+// }
+// const state = stream[shared.state_];
+// if (state === "errored") {
+// return Promise.reject(stream[shared.storedError_]);
+// }
+// if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
+// return Promise.reject(
+// new TypeError("Cannot write to a closing or closed stream")
+// );
+// }
+// if (state === "erroring") {
+// return Promise.reject(stream[shared.storedError_]);
+// }
+// // Assert: state is "writable".
+// const promise = writableStreamAddWriteRequest(stream);
+// writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
+// return promise;
+// }
+
+// // ---- Controller
+
+// export function setUpWritableStreamDefaultController<InputType>(
+// stream: WritableStream<InputType>,
+// controller: WritableStreamDefaultController<InputType>,
+// startAlgorithm: StartAlgorithm,
+// writeAlgorithm: WriteAlgorithm<InputType>,
+// closeAlgorithm: CloseAlgorithm,
+// abortAlgorithm: AbortAlgorithm,
+// highWaterMark: number,
+// sizeAlgorithm: QueuingStrategySizeCallback<InputType>
+// ): void {
+// if (!isWritableStream(stream)) {
+// throw new TypeError();
+// }
+// if (stream[writableStreamController_] !== undefined) {
+// throw new TypeError();
+// }
+
+// controller[controlledWritableStream_] = stream;
+// stream[writableStreamController_] = controller;
+// q.resetQueue(controller);
+// controller[started_] = false;
+// controller[strategySizeAlgorithm_] = sizeAlgorithm;
+// controller[strategyHWM_] = highWaterMark;
+// controller[writeAlgorithm_] = writeAlgorithm;
+// controller[closeAlgorithm_] = closeAlgorithm;
+// controller[abortAlgorithm_] = abortAlgorithm;
+// const backpressure = writableStreamDefaultControllerGetBackpressure(
+// controller
+// );
+// writableStreamUpdateBackpressure(stream, backpressure);
+
+// const startResult = startAlgorithm();
+// Promise.resolve(startResult).then(
+// _ => {
+// // Assert: stream.[[state]] is "writable" or "erroring".
+// controller[started_] = true;
+// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+// },
+// error => {
+// // Assert: stream.[[state]] is "writable" or "erroring".
+// controller[started_] = true;
+// writableStreamDealWithRejection(stream, error);
+// }
+// );
+// }
+
+// export function isWritableStreamDefaultController(
+// value: unknown
+// ): value is WritableStreamDefaultController<any> {
+// if (typeof value !== "object" || value === null) {
+// return false;
+// }
+// return controlledWritableStream_ in value;
+// }
+
+// export function writableStreamDefaultControllerClearAlgorithms<InputType>(
+// controller: WritableStreamDefaultController<InputType>
+// ): void {
+// // Use ! assertions to override type check here, this way we don't
+// // have to perform type checks/assertions everywhere else.
+// controller[writeAlgorithm_] = undefined!;
+// controller[closeAlgorithm_] = undefined!;
+// controller[abortAlgorithm_] = undefined!;
+// controller[strategySizeAlgorithm_] = undefined!;
+// }
+
+// export function writableStreamDefaultControllerClose<InputType>(
+// controller: WritableStreamDefaultController<InputType>
+// ): void {
+// q.enqueueValueWithSize(controller, "close", 0);
+// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+// }
+
+// export function writableStreamDefaultControllerGetChunkSize<InputType>(
+// controller: WritableStreamDefaultController<InputType>,
+// chunk: InputType
+// ): number {
+// let chunkSize: number;
+// try {
+// chunkSize = controller[strategySizeAlgorithm_](chunk);
+// } catch (error) {
+// writableStreamDefaultControllerErrorIfNeeded(controller, error);
+// chunkSize = 1;
+// }
+// return chunkSize;
+// }
+
+// export function writableStreamDefaultControllerGetDesiredSize<InputType>(
+// controller: WritableStreamDefaultController<InputType>
+// ): number {
+// return controller[strategyHWM_] - controller[q.queueTotalSize_];
+// }
+
+// export function writableStreamDefaultControllerWrite<InputType>(
+// controller: WritableStreamDefaultController<InputType>,
+// chunk: InputType,
+// chunkSize: number
+// ): void {
+// try {
+// q.enqueueValueWithSize(controller, { chunk }, chunkSize);
+// } catch (error) {
+// writableStreamDefaultControllerErrorIfNeeded(controller, error);
+// return;
+// }
+// const stream = controller[controlledWritableStream_];
+// if (
+// !writableStreamCloseQueuedOrInFlight(stream) &&
+// stream[shared.state_] === "writable"
+// ) {
+// const backpressure = writableStreamDefaultControllerGetBackpressure(
+// controller
+// );
+// writableStreamUpdateBackpressure(stream, backpressure);
+// }
+// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+// }
+
+// export function writableStreamDefaultControllerAdvanceQueueIfNeeded<InputType>(
+// controller: WritableStreamDefaultController<InputType>
+// ): void {
+// if (!controller[started_]) {
+// return;
+// }
+// const stream = controller[controlledWritableStream_];
+// if (stream[inFlightWriteRequest_] !== undefined) {
+// return;
+// }
+// const state = stream[shared.state_];
+// if (state === "closed" || state === "errored") {
+// return;
+// }
+// if (state === "erroring") {
+// writableStreamFinishErroring(stream);
+// return;
+// }
+// if (controller[q.queue_].length === 0) {
+// return;
+// }
+// const writeRecord = q.peekQueueValue(controller);
+// if (writeRecord === "close") {
+// writableStreamDefaultControllerProcessClose(controller);
+// } else {
+// writableStreamDefaultControllerProcessWrite(controller, writeRecord.chunk);
+// }
+// }
+
+// export function writableStreamDefaultControllerErrorIfNeeded<InputType>(
+// controller: WritableStreamDefaultController<InputType>,
+// error: shared.ErrorResult
+// ): void {
+// if (controller[controlledWritableStream_][shared.state_] === "writable") {
+// writableStreamDefaultControllerError(controller, error);
+// }
+// }
+
+// export function writableStreamDefaultControllerProcessClose<InputType>(
+// controller: WritableStreamDefaultController<InputType>
+// ): void {
+// const stream = controller[controlledWritableStream_];
+// writableStreamMarkCloseRequestInFlight(stream);
+// q.dequeueValue(controller);
+// // Assert: controller.[[queue]] is empty.
+// const sinkClosePromise = controller[closeAlgorithm_]();
+// writableStreamDefaultControllerClearAlgorithms(controller);
+// sinkClosePromise.then(
+// _ => {
+// writableStreamFinishInFlightClose(stream);
+// },
+// error => {
+// writableStreamFinishInFlightCloseWithError(stream, error);
+// }
+// );
+// }
+
+// export function writableStreamDefaultControllerProcessWrite<InputType>(
+// controller: WritableStreamDefaultController<InputType>,
+// chunk: InputType
+// ): void {
+// const stream = controller[controlledWritableStream_];
+// writableStreamMarkFirstWriteRequestInFlight(stream);
+// controller[writeAlgorithm_](chunk).then(
+// _ => {
+// writableStreamFinishInFlightWrite(stream);
+// const state = stream[shared.state_];
+// // Assert: state is "writable" or "erroring".
+// q.dequeueValue(controller);
+// if (
+// !writableStreamCloseQueuedOrInFlight(stream) &&
+// state === "writable"
+// ) {
+// const backpressure = writableStreamDefaultControllerGetBackpressure(
+// controller
+// );
+// writableStreamUpdateBackpressure(stream, backpressure);
+// }
+// writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+// },
+// error => {
+// if (stream[shared.state_] === "writable") {
+// writableStreamDefaultControllerClearAlgorithms(controller);
+// }
+// writableStreamFinishInFlightWriteWithError(stream, error);
+// }
+// );
+// }
+
+// export function writableStreamDefaultControllerGetBackpressure<InputType>(
+// controller: WritableStreamDefaultController<InputType>
+// ): boolean {
+// const desiredSize = writableStreamDefaultControllerGetDesiredSize(controller);
+// return desiredSize <= 0;
+// }
+
+// export function writableStreamDefaultControllerError<InputType>(
+// controller: WritableStreamDefaultController<InputType>,
+// error: shared.ErrorResult
+// ): void {
+// const stream = controller[controlledWritableStream_];
+// // Assert: stream.[[state]] is "writable".
+// writableStreamDefaultControllerClearAlgorithms(controller);
+// writableStreamStartErroring(stream, error);
+// }
diff --git a/cli/js/streams/writable-stream-default-controller.ts b/cli/js/streams/writable-stream-default-controller.ts
new file mode 100644
index 000000000..57ffe08fd
--- /dev/null
+++ b/cli/js/streams/writable-stream-default-controller.ts
@@ -0,0 +1,101 @@
+// TODO reenable this code when we enable writableStreams and transport types
+// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+// /**
+// * streams/writable-stream-default-controller - WritableStreamDefaultController class implementation
+// * Part of Stardazed
+// * (c) 2018-Present by Arthur Langereis - @zenmumbler
+// * https://github.com/stardazed/sd-streams
+// */
+
+// /* eslint-disable @typescript-eslint/no-explicit-any */
+// // TODO reenable this lint here
+
+// import * as ws from "./writable-internals.ts";
+// import * as shared from "./shared-internals.ts";
+// import * as q from "./queue-mixin.ts";
+// import { Queue } from "./queue.ts";
+// import { QueuingStrategySizeCallback } from "../dom_types.ts";
+
+// export class WritableStreamDefaultController<InputType>
+// implements ws.WritableStreamDefaultController<InputType> {
+// [ws.abortAlgorithm_]: ws.AbortAlgorithm;
+// [ws.closeAlgorithm_]: ws.CloseAlgorithm;
+// [ws.controlledWritableStream_]: ws.WritableStream<InputType>;
+// [ws.started_]: boolean;
+// [ws.strategyHWM_]: number;
+// [ws.strategySizeAlgorithm_]: QueuingStrategySizeCallback<InputType>;
+// [ws.writeAlgorithm_]: ws.WriteAlgorithm<InputType>;
+
+// [q.queue_]: Queue<q.QueueElement<ws.WriteRecord<InputType> | "close">>;
+// [q.queueTotalSize_]: number;
+
+// constructor() {
+// throw new TypeError();
+// }
+
+// error(e?: shared.ErrorResult): void {
+// if (!ws.isWritableStreamDefaultController(this)) {
+// throw new TypeError();
+// }
+// const state = this[ws.controlledWritableStream_][shared.state_];
+// if (state !== "writable") {
+// return;
+// }
+// ws.writableStreamDefaultControllerError(this, e);
+// }
+
+// [ws.abortSteps_](reason: shared.ErrorResult): Promise<void> {
+// const result = this[ws.abortAlgorithm_](reason);
+// ws.writableStreamDefaultControllerClearAlgorithms(this);
+// return result;
+// }
+
+// [ws.errorSteps_](): void {
+// q.resetQueue(this);
+// }
+// }
+
+// export function setUpWritableStreamDefaultControllerFromUnderlyingSink<
+// InputType
+// >(
+// stream: ws.WritableStream<InputType>,
+// underlyingSink: ws.WritableStreamSink<InputType>,
+// highWaterMark: number,
+// sizeAlgorithm: QueuingStrategySizeCallback<InputType>
+// ): void {
+// // Assert: underlyingSink is not undefined.
+// const controller = Object.create(
+// WritableStreamDefaultController.prototype
+// ) as WritableStreamDefaultController<InputType>;
+
+// const startAlgorithm = function(): any {
+// return shared.invokeOrNoop(underlyingSink, "start", [controller]);
+// };
+// const writeAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
+// underlyingSink,
+// "write",
+// [controller]
+// );
+// const closeAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
+// underlyingSink,
+// "close",
+// []
+// );
+// const abortAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
+// underlyingSink,
+// "abort",
+// []
+// );
+// ws.setUpWritableStreamDefaultController(
+// stream,
+// controller,
+// startAlgorithm,
+// writeAlgorithm,
+// closeAlgorithm,
+// abortAlgorithm,
+// highWaterMark,
+// sizeAlgorithm
+// );
+// }
diff --git a/cli/js/streams/writable-stream-default-writer.ts b/cli/js/streams/writable-stream-default-writer.ts
new file mode 100644
index 000000000..f38aa26bb
--- /dev/null
+++ b/cli/js/streams/writable-stream-default-writer.ts
@@ -0,0 +1,136 @@
+// TODO reenable this code when we enable writableStreams and transport types
+// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+// /**
+// * streams/writable-stream-default-writer - WritableStreamDefaultWriter class implementation
+// * Part of Stardazed
+// * (c) 2018-Present by Arthur Langereis - @zenmumbler
+// * https://github.com/stardazed/sd-streams
+// */
+
+// import * as ws from "./writable-internals.ts";
+// import * as shared from "./shared-internals.ts";
+
+// export class WritableStreamDefaultWriter<InputType>
+// implements ws.WritableStreamDefaultWriter<InputType> {
+// [ws.ownerWritableStream_]: ws.WritableStream<InputType> | undefined;
+// [ws.readyPromise_]: shared.ControlledPromise<void>;
+// [ws.closedPromise_]: shared.ControlledPromise<void>;
+
+// constructor(stream: ws.WritableStream<InputType>) {
+// if (!ws.isWritableStream(stream)) {
+// throw new TypeError();
+// }
+// if (ws.isWritableStreamLocked(stream)) {
+// throw new TypeError("Stream is already locked");
+// }
+// this[ws.ownerWritableStream_] = stream;
+// stream[ws.writer_] = this;
+
+// const readyPromise = shared.createControlledPromise<void>();
+// const closedPromise = shared.createControlledPromise<void>();
+// this[ws.readyPromise_] = readyPromise;
+// this[ws.closedPromise_] = closedPromise;
+
+// const state = stream[shared.state_];
+// if (state === "writable") {
+// if (
+// !ws.writableStreamCloseQueuedOrInFlight(stream) &&
+// stream[ws.backpressure_]
+// ) {
+// // OK Set this.[[readyPromise]] to a new promise.
+// } else {
+// readyPromise.resolve(undefined);
+// }
+// // OK Set this.[[closedPromise]] to a new promise.
+// } else if (state === "erroring") {
+// readyPromise.reject(stream[shared.storedError_]);
+// readyPromise.promise.catch(() => {});
+// // OK Set this.[[closedPromise]] to a new promise.
+// } else if (state === "closed") {
+// readyPromise.resolve(undefined);
+// closedPromise.resolve(undefined);
+// } else {
+// // Assert: state is "errored".
+// const storedError = stream[shared.storedError_];
+// readyPromise.reject(storedError);
+// readyPromise.promise.catch(() => {});
+// closedPromise.reject(storedError);
+// closedPromise.promise.catch(() => {});
+// }
+// }
+
+// abort(reason: shared.ErrorResult): Promise<void> {
+// if (!ws.isWritableStreamDefaultWriter(this)) {
+// return Promise.reject(new TypeError());
+// }
+// if (this[ws.ownerWritableStream_] === undefined) {
+// return Promise.reject(
+// new TypeError("Writer is not connected to a stream")
+// );
+// }
+// return ws.writableStreamDefaultWriterAbort(this, reason);
+// }
+
+// close(): Promise<void> {
+// if (!ws.isWritableStreamDefaultWriter(this)) {
+// return Promise.reject(new TypeError());
+// }
+// const stream = this[ws.ownerWritableStream_];
+// if (stream === undefined) {
+// return Promise.reject(
+// new TypeError("Writer is not connected to a stream")
+// );
+// }
+// if (ws.writableStreamCloseQueuedOrInFlight(stream)) {
+// return Promise.reject(new TypeError());
+// }
+// return ws.writableStreamDefaultWriterClose(this);
+// }
+
+// releaseLock(): void {
+// const stream = this[ws.ownerWritableStream_];
+// if (stream === undefined) {
+// return;
+// }
+// // Assert: stream.[[writer]] is not undefined.
+// ws.writableStreamDefaultWriterRelease(this);
+// }
+
+// write(chunk: InputType): Promise<void> {
+// if (!ws.isWritableStreamDefaultWriter(this)) {
+// return Promise.reject(new TypeError());
+// }
+// if (this[ws.ownerWritableStream_] === undefined) {
+// return Promise.reject(
+// new TypeError("Writer is not connected to a stream")
+// );
+// }
+// return ws.writableStreamDefaultWriterWrite(this, chunk);
+// }
+
+// get closed(): Promise<void> {
+// if (!ws.isWritableStreamDefaultWriter(this)) {
+// return Promise.reject(new TypeError());
+// }
+// return this[ws.closedPromise_].promise;
+// }
+
+// get desiredSize(): number | null {
+// if (!ws.isWritableStreamDefaultWriter(this)) {
+// throw new TypeError();
+// }
+// if (this[ws.ownerWritableStream_] === undefined) {
+// throw new TypeError("Writer is not connected to stream");
+// }
+// return ws.writableStreamDefaultWriterGetDesiredSize(this);
+// }
+
+// get ready(): Promise<void> {
+// if (!ws.isWritableStreamDefaultWriter(this)) {
+// return Promise.reject(new TypeError());
+// }
+// return this[ws.readyPromise_].promise;
+// }
+// }
diff --git a/cli/js/streams/writable-stream.ts b/cli/js/streams/writable-stream.ts
new file mode 100644
index 000000000..a6131c5d0
--- /dev/null
+++ b/cli/js/streams/writable-stream.ts
@@ -0,0 +1,118 @@
+// TODO reenable this code when we enable writableStreams and transport types
+// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+// /**
+// * streams/writable-stream - WritableStream class implementation
+// * Part of Stardazed
+// * (c) 2018-Present by Arthur Langereis - @zenmumbler
+// * https://github.com/stardazed/sd-streams
+// */
+
+// import * as ws from "./writable-internals.ts";
+// import * as shared from "./shared-internals.ts";
+// import {
+// WritableStreamDefaultController,
+// setUpWritableStreamDefaultControllerFromUnderlyingSink
+// } from "./writable-stream-default-controller.ts";
+// import { WritableStreamDefaultWriter } from "./writable-stream-default-writer.ts";
+// import { QueuingStrategy, QueuingStrategySizeCallback } from "../dom_types.ts";
+
+// export class WritableStream<InputType> {
+// [shared.state_]: ws.WritableStreamState;
+// [shared.storedError_]: shared.ErrorResult;
+// [ws.backpressure_]: boolean;
+// [ws.closeRequest_]: shared.ControlledPromise<void> | undefined;
+// [ws.inFlightWriteRequest_]: shared.ControlledPromise<void> | undefined;
+// [ws.inFlightCloseRequest_]: shared.ControlledPromise<void> | undefined;
+// [ws.pendingAbortRequest_]: ws.AbortRequest | undefined;
+// [ws.writableStreamController_]:
+// | ws.WritableStreamDefaultController<InputType>
+// | undefined;
+// [ws.writer_]: ws.WritableStreamDefaultWriter<InputType> | undefined;
+// [ws.writeRequests_]: Array<shared.ControlledPromise<void>>;
+
+// constructor(
+// sink: ws.WritableStreamSink<InputType> = {},
+// strategy: QueuingStrategy<InputType> = {}
+// ) {
+// ws.initializeWritableStream(this);
+// const sizeFunc = strategy.size;
+// const stratHWM = strategy.highWaterMark;
+// if (sink.type !== undefined) {
+// throw new RangeError("The type of an underlying sink must be undefined");
+// }
+
+// const sizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction(sizeFunc);
+// const highWaterMark = shared.validateAndNormalizeHighWaterMark(
+// stratHWM === undefined ? 1 : stratHWM
+// );
+
+// setUpWritableStreamDefaultControllerFromUnderlyingSink(
+// this,
+// sink,
+// highWaterMark,
+// sizeAlgorithm
+// );
+// }
+
+// get locked(): boolean {
+// if (!ws.isWritableStream(this)) {
+// throw new TypeError();
+// }
+// return ws.isWritableStreamLocked(this);
+// }
+
+// abort(reason?: shared.ErrorResult): Promise<void> {
+// if (!ws.isWritableStream(this)) {
+// return Promise.reject(new TypeError());
+// }
+// if (ws.isWritableStreamLocked(this)) {
+// return Promise.reject(new TypeError("Cannot abort a locked stream"));
+// }
+// return ws.writableStreamAbort(this, reason);
+// }
+
+// getWriter(): ws.WritableStreamWriter<InputType> {
+// if (!ws.isWritableStream(this)) {
+// throw new TypeError();
+// }
+// return new WritableStreamDefaultWriter(this);
+// }
+// }
+
+// export function createWritableStream<InputType>(
+// startAlgorithm: ws.StartAlgorithm,
+// writeAlgorithm: ws.WriteAlgorithm<InputType>,
+// closeAlgorithm: ws.CloseAlgorithm,
+// abortAlgorithm: ws.AbortAlgorithm,
+// highWaterMark?: number,
+// sizeAlgorithm?: QueuingStrategySizeCallback<InputType>
+// ): WritableStream<InputType> {
+// if (highWaterMark === undefined) {
+// highWaterMark = 1;
+// }
+// if (sizeAlgorithm === undefined) {
+// sizeAlgorithm = (): number => 1;
+// }
+// // Assert: ! IsNonNegativeNumber(highWaterMark) is true.
+
+// const stream = Object.create(WritableStream.prototype) as WritableStream<
+// InputType
+// >;
+// ws.initializeWritableStream(stream);
+// const controller = Object.create(
+// WritableStreamDefaultController.prototype
+// ) as WritableStreamDefaultController<InputType>;
+// ws.setUpWritableStreamDefaultController(
+// stream,
+// controller,
+// startAlgorithm,
+// writeAlgorithm,
+// closeAlgorithm,
+// abortAlgorithm,
+// highWaterMark,
+// sizeAlgorithm
+// );
+// return stream;
+// }