summaryrefslogtreecommitdiff
path: root/cli/js/web/streams/readable_stream.ts
diff options
context:
space:
mode:
Diffstat (limited to 'cli/js/web/streams/readable_stream.ts')
-rw-r--r--cli/js/web/streams/readable_stream.ts158
1 files changed, 83 insertions, 75 deletions
diff --git a/cli/js/web/streams/readable_stream.ts b/cli/js/web/streams/readable_stream.ts
index e234c909d..27a733d9d 100644
--- a/cli/js/web/streams/readable_stream.ts
+++ b/cli/js/web/streams/readable_stream.ts
@@ -6,9 +6,14 @@ import {
isReadableStream,
isReadableStreamLocked,
isUnderlyingByteSource,
+ isWritableStream,
+ isWritableStreamLocked,
makeSizeAlgorithmFromSizeFunction,
+ setFunctionName,
+ setPromiseIsHandledToTrue,
readableStreamCancel,
ReadableStreamGenericReader,
+ readableStreamPipeTo,
readableStreamTee,
setUpReadableByteStreamControllerFromUnderlyingSource,
setUpReadableStreamDefaultControllerFromUnderlyingSource,
@@ -18,8 +23,8 @@ import { ReadableByteStreamControllerImpl } from "./readable_byte_stream_control
import { ReadableStreamAsyncIteratorPrototype } from "./readable_stream_async_iterator.ts";
import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts";
import * as sym from "./symbols.ts";
-import { notImplemented } from "../../util.ts";
-import { customInspect } from "../../web/console.ts";
+import { customInspect } from "../console.ts";
+import { AbortSignalImpl } from "../abort_signal.ts";
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class ReadableStreamImpl<R = any> implements ReadableStream<R> {
@@ -119,80 +124,81 @@ export class ReadableStreamImpl<R = any> implements ReadableStream<R> {
throw new RangeError(`Unsupported mode "${mode}"`);
}
- pipeThrough<T>(): // {
- // writable,
- // readable,
- // }: {
- // writable: WritableStream<R>;
- // readable: ReadableStream<T>;
- // },
- // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {},
- ReadableStream<T> {
- return notImplemented();
- // if (!isReadableStream(this)) {
- // throw new TypeError("Invalid ReadableStream.");
- // }
- // if (!isWritableStream(writable)) {
- // throw new TypeError("writable is not a valid WritableStream.");
- // }
- // if (!isReadableStream(readable)) {
- // throw new TypeError("readable is not a valid ReadableStream.");
- // }
- // preventClose = Boolean(preventClose);
- // preventAbort = Boolean(preventAbort);
- // preventCancel = Boolean(preventCancel);
- // if (signal && !(signal instanceof AbortSignalImpl)) {
- // throw new TypeError("Invalid signal.");
- // }
- // if (isReadableStreamLocked(this)) {
- // throw new TypeError("ReadableStream is locked.");
- // }
- // if (isWritableStreamLocked(writable)) {
- // throw new TypeError("writable is locked.");
- // }
- // readableStreamPipeTo(
- // this,
- // writable,
- // preventClose,
- // preventAbort,
- // preventCancel,
- // signal,
- // );
- // return readable;
+ pipeThrough<T>(
+ {
+ writable,
+ readable,
+ }: {
+ writable: WritableStream<R>;
+ readable: ReadableStream<T>;
+ },
+ { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {}
+ ): ReadableStream<T> {
+ if (!isReadableStream(this)) {
+ throw new TypeError("Invalid ReadableStream.");
+ }
+ if (!isWritableStream(writable)) {
+ throw new TypeError("writable is not a valid WritableStream.");
+ }
+ if (!isReadableStream(readable)) {
+ throw new TypeError("readable is not a valid ReadableStream.");
+ }
+ preventClose = Boolean(preventClose);
+ preventAbort = Boolean(preventAbort);
+ preventCancel = Boolean(preventCancel);
+ if (signal && !(signal instanceof AbortSignalImpl)) {
+ throw new TypeError("Invalid signal.");
+ }
+ if (isReadableStreamLocked(this)) {
+ throw new TypeError("ReadableStream is locked.");
+ }
+ if (isWritableStreamLocked(writable)) {
+ throw new TypeError("writable is locked.");
+ }
+ const promise = readableStreamPipeTo(
+ this,
+ writable,
+ preventClose,
+ preventAbort,
+ preventCancel,
+ signal
+ );
+ setPromiseIsHandledToTrue(promise);
+ return readable;
}
- pipeTo(): // dest: WritableStream<R>,
- // { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {},
- Promise<void> {
- return notImplemented();
- // if (!isReadableStream(this)) {
- // return Promise.reject(new TypeError("Invalid ReadableStream."));
- // }
- // if (!isWritableStream(dest)) {
- // return Promise.reject(
- // new TypeError("dest is not a valid WritableStream."),
- // );
- // }
- // preventClose = Boolean(preventClose);
- // preventAbort = Boolean(preventAbort);
- // preventCancel = Boolean(preventCancel);
- // if (signal && !(signal instanceof AbortSignalImpl)) {
- // return Promise.reject(new TypeError("Invalid signal."));
- // }
- // if (isReadableStreamLocked(this)) {
- // return Promise.reject(new TypeError("ReadableStream is locked."));
- // }
- // if (isWritableStreamLocked(this)) {
- // return Promise.reject(new TypeError("dest is locked."));
- // }
- // return readableStreamPipeTo(
- // this,
- // dest,
- // preventClose,
- // preventAbort,
- // preventCancel,
- // signal,
- // );
+ pipeTo(
+ dest: WritableStream<R>,
+ { preventClose, preventAbort, preventCancel, signal }: PipeOptions = {}
+ ): Promise<void> {
+ if (!isReadableStream(this)) {
+ return Promise.reject(new TypeError("Invalid ReadableStream."));
+ }
+ if (!isWritableStream(dest)) {
+ return Promise.reject(
+ new TypeError("dest is not a valid WritableStream.")
+ );
+ }
+ preventClose = Boolean(preventClose);
+ preventAbort = Boolean(preventAbort);
+ preventCancel = Boolean(preventCancel);
+ if (signal && !(signal instanceof AbortSignalImpl)) {
+ return Promise.reject(new TypeError("Invalid signal."));
+ }
+ if (isReadableStreamLocked(this)) {
+ return Promise.reject(new TypeError("ReadableStream is locked."));
+ }
+ if (isWritableStreamLocked(dest)) {
+ return Promise.reject(new TypeError("dest is locked."));
+ }
+ return readableStreamPipeTo(
+ this,
+ dest,
+ preventClose,
+ preventAbort,
+ preventCancel,
+ signal
+ );
}
tee(): [ReadableStreamImpl<R>, ReadableStreamImpl<R>] {
@@ -203,7 +209,7 @@ export class ReadableStreamImpl<R = any> implements ReadableStream<R> {
}
[customInspect](): string {
- return `ReadableStream { locked: ${String(this.locked)} }`;
+ return `${this.constructor.name} { locked: ${String(this.locked)} }`;
}
[Symbol.asyncIterator](
@@ -214,3 +220,5 @@ export class ReadableStreamImpl<R = any> implements ReadableStream<R> {
return this.getIterator(options);
}
}
+
+setFunctionName(ReadableStreamImpl, "ReadableStream");