diff options
Diffstat (limited to 'cli/js/web/streams/readable_stream.ts')
-rw-r--r-- | cli/js/web/streams/readable_stream.ts | 216 |
1 files changed, 216 insertions, 0 deletions
diff --git a/cli/js/web/streams/readable_stream.ts b/cli/js/web/streams/readable_stream.ts new file mode 100644 index 000000000..f606319b1 --- /dev/null +++ b/cli/js/web/streams/readable_stream.ts @@ -0,0 +1,216 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +import { + acquireReadableStreamDefaultReader, + initializeReadableStream, + isReadableStream, + isReadableStreamLocked, + isUnderlyingByteSource, + makeSizeAlgorithmFromSizeFunction, + readableStreamCancel, + ReadableStreamGenericReader, + readableStreamTee, + setUpReadableByteStreamControllerFromUnderlyingSource, + setUpReadableStreamDefaultControllerFromUnderlyingSource, + validateAndNormalizeHighWaterMark, +} from "./internals.ts"; +import { ReadableByteStreamControllerImpl } from "./readable_byte_stream_controller.ts"; +import { ReadableStreamAsyncIteratorPrototype } from "./readable_stream_async_iterator.ts"; +import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts"; +import * as sym from "./symbols.ts"; +import { symbols } from "../../symbols.ts"; +import { notImplemented } from "../../util.ts"; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export class ReadableStreamImpl<R = any> implements ReadableStream<R> { + [sym.disturbed]: boolean; + [sym.readableStreamController]: + | ReadableStreamDefaultControllerImpl<R> + | ReadableByteStreamControllerImpl; + [sym.reader]: ReadableStreamGenericReader<R> | undefined; + [sym.state]: "readable" | "closed" | "errored"; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + [sym.storedError]: any; + + constructor( + underlyingSource: UnderlyingByteSource | UnderlyingSource<R> = {}, + strategy: + | { + highWaterMark?: number; + size?: undefined; + } + | QueuingStrategy<R> = {} + ) { + initializeReadableStream(this); + const { size } = strategy; + let { highWaterMark } = strategy; + const { type } = underlyingSource; + + if (isUnderlyingByteSource(underlyingSource)) { + if (size !== undefined) { + throw new RangeError( + `When underlying source is "bytes", strategy.size must be undefined.` + ); + } + highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark ?? 0); + setUpReadableByteStreamControllerFromUnderlyingSource( + this, + underlyingSource, + highWaterMark + ); + } else if (type === undefined) { + const sizeAlgorithm = makeSizeAlgorithmFromSizeFunction(size); + highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark ?? 1); + setUpReadableStreamDefaultControllerFromUnderlyingSource( + this, + underlyingSource, + highWaterMark, + sizeAlgorithm + ); + } else { + throw new RangeError( + `Valid values for underlyingSource are "bytes" or undefined. Received: "${type}".` + ); + } + } + + get locked(): boolean { + if (!isReadableStream(this)) { + throw new TypeError("Invalid ReadableStream."); + } + return isReadableStreamLocked(this); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + cancel(reason?: any): Promise<void> { + if (!isReadableStream(this)) { + return Promise.reject(new TypeError("Invalid ReadableStream.")); + } + if (isReadableStreamLocked(this)) { + return Promise.reject( + new TypeError("Cannot cancel a locked ReadableStream.") + ); + } + return readableStreamCancel(this, reason); + } + + getIterator({ + preventCancel, + }: { preventCancel?: boolean } = {}): AsyncIterableIterator<R> { + if (!isReadableStream(this)) { + throw new TypeError("Invalid ReadableStream."); + } + const reader = acquireReadableStreamDefaultReader(this); + const iterator = Object.create(ReadableStreamAsyncIteratorPrototype); + iterator[sym.asyncIteratorReader] = reader; + iterator[sym.preventCancel] = Boolean(preventCancel); + return iterator; + } + + getReader({ mode }: { mode?: string } = {}): ReadableStreamDefaultReader<R> { + if (!isReadableStream(this)) { + throw new TypeError("Invalid ReadableStream."); + } + if (mode === undefined) { + return acquireReadableStreamDefaultReader(this, true); + } + mode = String(mode); + // 3.2.5.4.4 If mode is "byob", return ? AcquireReadableStreamBYOBReader(this, true). + throw new RangeError(`Unsupported mode "${mode}"`); + } + + pipeThrough<T>(): // { + // writable, + // readable, + // }: { + // writable: WritableStream<R>; + // readable: ReadableStream<T>; + // }, + // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {}, + ReadableStream<T> { + return notImplemented(); + // if (!isReadableStream(this)) { + // throw new TypeError("Invalid ReadableStream."); + // } + // if (!isWritableStream(writable)) { + // throw new TypeError("writable is not a valid WritableStream."); + // } + // if (!isReadableStream(readable)) { + // throw new TypeError("readable is not a valid ReadableStream."); + // } + // preventClose = Boolean(preventClose); + // preventAbort = Boolean(preventAbort); + // preventCancel = Boolean(preventCancel); + // if (signal && !(signal instanceof AbortSignalImpl)) { + // throw new TypeError("Invalid signal."); + // } + // if (isReadableStreamLocked(this)) { + // throw new TypeError("ReadableStream is locked."); + // } + // if (isWritableStreamLocked(writable)) { + // throw new TypeError("writable is locked."); + // } + // readableStreamPipeTo( + // this, + // writable, + // preventClose, + // preventAbort, + // preventCancel, + // signal, + // ); + // return readable; + } + + pipeTo(): // dest: WritableStream<R>, + // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {}, + Promise<void> { + return notImplemented(); + // if (!isReadableStream(this)) { + // return Promise.reject(new TypeError("Invalid ReadableStream.")); + // } + // if (!isWritableStream(dest)) { + // return Promise.reject( + // new TypeError("dest is not a valid WritableStream."), + // ); + // } + // preventClose = Boolean(preventClose); + // preventAbort = Boolean(preventAbort); + // preventCancel = Boolean(preventCancel); + // if (signal && !(signal instanceof AbortSignalImpl)) { + // return Promise.reject(new TypeError("Invalid signal.")); + // } + // if (isReadableStreamLocked(this)) { + // return Promise.reject(new TypeError("ReadableStream is locked.")); + // } + // if (isWritableStreamLocked(this)) { + // return Promise.reject(new TypeError("dest is locked.")); + // } + // return readableStreamPipeTo( + // this, + // dest, + // preventClose, + // preventAbort, + // preventCancel, + // signal, + // ); + } + + tee(): [ReadableStreamImpl<R>, ReadableStreamImpl<R>] { + if (!isReadableStream(this)) { + throw new TypeError("Invalid ReadableStream."); + } + return readableStreamTee(this, false); + } + + [symbols.customInspect](): string { + return `ReadableStream { locked: ${String(this.locked)} }`; + } + + [Symbol.asyncIterator]( + options: { + preventCancel?: boolean; + } = {} + ): AsyncIterableIterator<R> { + return this.getIterator(options); + } +} |