summaryrefslogtreecommitdiff
path: root/cli/js/web/streams/readable_stream.ts
diff options
context:
space:
mode:
Diffstat (limited to 'cli/js/web/streams/readable_stream.ts')
-rw-r--r--cli/js/web/streams/readable_stream.ts216
1 files changed, 216 insertions, 0 deletions
diff --git a/cli/js/web/streams/readable_stream.ts b/cli/js/web/streams/readable_stream.ts
new file mode 100644
index 000000000..f606319b1
--- /dev/null
+++ b/cli/js/web/streams/readable_stream.ts
@@ -0,0 +1,216 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import {
+ acquireReadableStreamDefaultReader,
+ initializeReadableStream,
+ isReadableStream,
+ isReadableStreamLocked,
+ isUnderlyingByteSource,
+ makeSizeAlgorithmFromSizeFunction,
+ readableStreamCancel,
+ ReadableStreamGenericReader,
+ readableStreamTee,
+ setUpReadableByteStreamControllerFromUnderlyingSource,
+ setUpReadableStreamDefaultControllerFromUnderlyingSource,
+ validateAndNormalizeHighWaterMark,
+} from "./internals.ts";
+import { ReadableByteStreamControllerImpl } from "./readable_byte_stream_controller.ts";
+import { ReadableStreamAsyncIteratorPrototype } from "./readable_stream_async_iterator.ts";
+import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts";
+import * as sym from "./symbols.ts";
+import { symbols } from "../../symbols.ts";
+import { notImplemented } from "../../util.ts";
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export class ReadableStreamImpl<R = any> implements ReadableStream<R> {
+ [sym.disturbed]: boolean;
+ [sym.readableStreamController]:
+ | ReadableStreamDefaultControllerImpl<R>
+ | ReadableByteStreamControllerImpl;
+ [sym.reader]: ReadableStreamGenericReader<R> | undefined;
+ [sym.state]: "readable" | "closed" | "errored";
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ [sym.storedError]: any;
+
+ constructor(
+ underlyingSource: UnderlyingByteSource | UnderlyingSource<R> = {},
+ strategy:
+ | {
+ highWaterMark?: number;
+ size?: undefined;
+ }
+ | QueuingStrategy<R> = {}
+ ) {
+ initializeReadableStream(this);
+ const { size } = strategy;
+ let { highWaterMark } = strategy;
+ const { type } = underlyingSource;
+
+ if (isUnderlyingByteSource(underlyingSource)) {
+ if (size !== undefined) {
+ throw new RangeError(
+ `When underlying source is "bytes", strategy.size must be undefined.`
+ );
+ }
+ highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark ?? 0);
+ setUpReadableByteStreamControllerFromUnderlyingSource(
+ this,
+ underlyingSource,
+ highWaterMark
+ );
+ } else if (type === undefined) {
+ const sizeAlgorithm = makeSizeAlgorithmFromSizeFunction(size);
+ highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark ?? 1);
+ setUpReadableStreamDefaultControllerFromUnderlyingSource(
+ this,
+ underlyingSource,
+ highWaterMark,
+ sizeAlgorithm
+ );
+ } else {
+ throw new RangeError(
+ `Valid values for underlyingSource are "bytes" or undefined. Received: "${type}".`
+ );
+ }
+ }
+
+ get locked(): boolean {
+ if (!isReadableStream(this)) {
+ throw new TypeError("Invalid ReadableStream.");
+ }
+ return isReadableStreamLocked(this);
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ cancel(reason?: any): Promise<void> {
+ if (!isReadableStream(this)) {
+ return Promise.reject(new TypeError("Invalid ReadableStream."));
+ }
+ if (isReadableStreamLocked(this)) {
+ return Promise.reject(
+ new TypeError("Cannot cancel a locked ReadableStream.")
+ );
+ }
+ return readableStreamCancel(this, reason);
+ }
+
+ getIterator({
+ preventCancel,
+ }: { preventCancel?: boolean } = {}): AsyncIterableIterator<R> {
+ if (!isReadableStream(this)) {
+ throw new TypeError("Invalid ReadableStream.");
+ }
+ const reader = acquireReadableStreamDefaultReader(this);
+ const iterator = Object.create(ReadableStreamAsyncIteratorPrototype);
+ iterator[sym.asyncIteratorReader] = reader;
+ iterator[sym.preventCancel] = Boolean(preventCancel);
+ return iterator;
+ }
+
+ getReader({ mode }: { mode?: string } = {}): ReadableStreamDefaultReader<R> {
+ if (!isReadableStream(this)) {
+ throw new TypeError("Invalid ReadableStream.");
+ }
+ if (mode === undefined) {
+ return acquireReadableStreamDefaultReader(this, true);
+ }
+ mode = String(mode);
+ // 3.2.5.4.4 If mode is "byob", return ? AcquireReadableStreamBYOBReader(this, true).
+ throw new RangeError(`Unsupported mode "${mode}"`);
+ }
+
+ pipeThrough<T>(): // {
+ // writable,
+ // readable,
+ // }: {
+ // writable: WritableStream<R>;
+ // readable: ReadableStream<T>;
+ // },
+ // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {},
+ ReadableStream<T> {
+ return notImplemented();
+ // if (!isReadableStream(this)) {
+ // throw new TypeError("Invalid ReadableStream.");
+ // }
+ // if (!isWritableStream(writable)) {
+ // throw new TypeError("writable is not a valid WritableStream.");
+ // }
+ // if (!isReadableStream(readable)) {
+ // throw new TypeError("readable is not a valid ReadableStream.");
+ // }
+ // preventClose = Boolean(preventClose);
+ // preventAbort = Boolean(preventAbort);
+ // preventCancel = Boolean(preventCancel);
+ // if (signal && !(signal instanceof AbortSignalImpl)) {
+ // throw new TypeError("Invalid signal.");
+ // }
+ // if (isReadableStreamLocked(this)) {
+ // throw new TypeError("ReadableStream is locked.");
+ // }
+ // if (isWritableStreamLocked(writable)) {
+ // throw new TypeError("writable is locked.");
+ // }
+ // readableStreamPipeTo(
+ // this,
+ // writable,
+ // preventClose,
+ // preventAbort,
+ // preventCancel,
+ // signal,
+ // );
+ // return readable;
+ }
+
+ pipeTo(): // dest: WritableStream<R>,
+ // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {},
+ Promise<void> {
+ return notImplemented();
+ // if (!isReadableStream(this)) {
+ // return Promise.reject(new TypeError("Invalid ReadableStream."));
+ // }
+ // if (!isWritableStream(dest)) {
+ // return Promise.reject(
+ // new TypeError("dest is not a valid WritableStream."),
+ // );
+ // }
+ // preventClose = Boolean(preventClose);
+ // preventAbort = Boolean(preventAbort);
+ // preventCancel = Boolean(preventCancel);
+ // if (signal && !(signal instanceof AbortSignalImpl)) {
+ // return Promise.reject(new TypeError("Invalid signal."));
+ // }
+ // if (isReadableStreamLocked(this)) {
+ // return Promise.reject(new TypeError("ReadableStream is locked."));
+ // }
+ // if (isWritableStreamLocked(this)) {
+ // return Promise.reject(new TypeError("dest is locked."));
+ // }
+ // return readableStreamPipeTo(
+ // this,
+ // dest,
+ // preventClose,
+ // preventAbort,
+ // preventCancel,
+ // signal,
+ // );
+ }
+
+ tee(): [ReadableStreamImpl<R>, ReadableStreamImpl<R>] {
+ if (!isReadableStream(this)) {
+ throw new TypeError("Invalid ReadableStream.");
+ }
+ return readableStreamTee(this, false);
+ }
+
+ [symbols.customInspect](): string {
+ return `ReadableStream { locked: ${String(this.locked)} }`;
+ }
+
+ [Symbol.asyncIterator](
+ options: {
+ preventCancel?: boolean;
+ } = {}
+ ): AsyncIterableIterator<R> {
+ return this.getIterator(options);
+ }
+}