diff options
Diffstat (limited to 'cli/js/web/streams/readable_stream.ts')
-rw-r--r-- | cli/js/web/streams/readable_stream.ts | 158 |
1 files changed, 83 insertions, 75 deletions
diff --git a/cli/js/web/streams/readable_stream.ts b/cli/js/web/streams/readable_stream.ts index e234c909d..27a733d9d 100644 --- a/cli/js/web/streams/readable_stream.ts +++ b/cli/js/web/streams/readable_stream.ts @@ -6,9 +6,14 @@ import { isReadableStream, isReadableStreamLocked, isUnderlyingByteSource, + isWritableStream, + isWritableStreamLocked, makeSizeAlgorithmFromSizeFunction, + setFunctionName, + setPromiseIsHandledToTrue, readableStreamCancel, ReadableStreamGenericReader, + readableStreamPipeTo, readableStreamTee, setUpReadableByteStreamControllerFromUnderlyingSource, setUpReadableStreamDefaultControllerFromUnderlyingSource, @@ -18,8 +23,8 @@ import { ReadableByteStreamControllerImpl } from "./readable_byte_stream_control import { ReadableStreamAsyncIteratorPrototype } from "./readable_stream_async_iterator.ts"; import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts"; import * as sym from "./symbols.ts"; -import { notImplemented } from "../../util.ts"; -import { customInspect } from "../../web/console.ts"; +import { customInspect } from "../console.ts"; +import { AbortSignalImpl } from "../abort_signal.ts"; // eslint-disable-next-line @typescript-eslint/no-explicit-any export class ReadableStreamImpl<R = any> implements ReadableStream<R> { @@ -119,80 +124,81 @@ export class ReadableStreamImpl<R = any> implements ReadableStream<R> { throw new RangeError(`Unsupported mode "${mode}"`); } - pipeThrough<T>(): // { - // writable, - // readable, - // }: { - // writable: WritableStream<R>; - // readable: ReadableStream<T>; - // }, - // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {}, - ReadableStream<T> { - return notImplemented(); - // if (!isReadableStream(this)) { - // throw new TypeError("Invalid ReadableStream."); - // } - // if (!isWritableStream(writable)) { - // throw new TypeError("writable is not a valid WritableStream."); - // } - // if (!isReadableStream(readable)) { - // throw new TypeError("readable is not a valid ReadableStream."); - // } - // preventClose = Boolean(preventClose); - // preventAbort = Boolean(preventAbort); - // preventCancel = Boolean(preventCancel); - // if (signal && !(signal instanceof AbortSignalImpl)) { - // throw new TypeError("Invalid signal."); - // } - // if (isReadableStreamLocked(this)) { - // throw new TypeError("ReadableStream is locked."); - // } - // if (isWritableStreamLocked(writable)) { - // throw new TypeError("writable is locked."); - // } - // readableStreamPipeTo( - // this, - // writable, - // preventClose, - // preventAbort, - // preventCancel, - // signal, - // ); - // return readable; + pipeThrough<T>( + { + writable, + readable, + }: { + writable: WritableStream<R>; + readable: ReadableStream<T>; + }, + { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {} + ): ReadableStream<T> { + if (!isReadableStream(this)) { + throw new TypeError("Invalid ReadableStream."); + } + if (!isWritableStream(writable)) { + throw new TypeError("writable is not a valid WritableStream."); + } + if (!isReadableStream(readable)) { + throw new TypeError("readable is not a valid ReadableStream."); + } + preventClose = Boolean(preventClose); + preventAbort = Boolean(preventAbort); + preventCancel = Boolean(preventCancel); + if (signal && !(signal instanceof AbortSignalImpl)) { + throw new TypeError("Invalid signal."); + } + if (isReadableStreamLocked(this)) { + throw new TypeError("ReadableStream is locked."); + } + if (isWritableStreamLocked(writable)) { + throw new TypeError("writable is locked."); + } + const promise = readableStreamPipeTo( + this, + writable, + preventClose, + preventAbort, + preventCancel, + signal + ); + setPromiseIsHandledToTrue(promise); + return readable; } - pipeTo(): // dest: WritableStream<R>, - // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {}, - Promise<void> { - return notImplemented(); - // if (!isReadableStream(this)) { - // return Promise.reject(new TypeError("Invalid ReadableStream.")); - // } - // if (!isWritableStream(dest)) { - // return Promise.reject( - // new TypeError("dest is not a valid WritableStream."), - // ); - // } - // preventClose = Boolean(preventClose); - // preventAbort = Boolean(preventAbort); - // preventCancel = Boolean(preventCancel); - // if (signal && !(signal instanceof AbortSignalImpl)) { - // return Promise.reject(new TypeError("Invalid signal.")); - // } - // if (isReadableStreamLocked(this)) { - // return Promise.reject(new TypeError("ReadableStream is locked.")); - // } - // if (isWritableStreamLocked(this)) { - // return Promise.reject(new TypeError("dest is locked.")); - // } - // return readableStreamPipeTo( - // this, - // dest, - // preventClose, - // preventAbort, - // preventCancel, - // signal, - // ); + pipeTo( + dest: WritableStream<R>, + { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {} + ): Promise<void> { + if (!isReadableStream(this)) { + return Promise.reject(new TypeError("Invalid ReadableStream.")); + } + if (!isWritableStream(dest)) { + return Promise.reject( + new TypeError("dest is not a valid WritableStream.") + ); + } + preventClose = Boolean(preventClose); + preventAbort = Boolean(preventAbort); + preventCancel = Boolean(preventCancel); + if (signal && !(signal instanceof AbortSignalImpl)) { + return Promise.reject(new TypeError("Invalid signal.")); + } + if (isReadableStreamLocked(this)) { + return Promise.reject(new TypeError("ReadableStream is locked.")); + } + if (isWritableStreamLocked(dest)) { + return Promise.reject(new TypeError("dest is locked.")); + } + return readableStreamPipeTo( + this, + dest, + preventClose, + preventAbort, + preventCancel, + signal + ); } tee(): [ReadableStreamImpl<R>, ReadableStreamImpl<R>] { @@ -203,7 +209,7 @@ export class ReadableStreamImpl<R = any> implements ReadableStream<R> { } [customInspect](): string { - return `ReadableStream { locked: ${String(this.locked)} }`; + return `${this.constructor.name} { locked: ${String(this.locked)} }`; } [Symbol.asyncIterator]( @@ -214,3 +220,5 @@ export class ReadableStreamImpl<R = any> implements ReadableStream<R> { return this.getIterator(options); } } + +setFunctionName(ReadableStreamImpl, "ReadableStream"); |