summaryrefslogtreecommitdiff
path: root/cli/js/web/streams/readable_stream_default_controller.ts
diff options
context:
space:
mode:
authorKitson Kelly <me@kitsonkelly.com>2020-04-23 00:06:51 +1000
committerGitHub <noreply@github.com>2020-04-22 10:06:51 -0400
commit8bcfc03d71cbd2cfd7ab68035ec0968d9f93b5b8 (patch)
treee1769ca51d2afde57ae18eb25b7a91388fcbf00a /cli/js/web/streams/readable_stream_default_controller.ts
parentb270d6c8d090669601465f8c9c94512d6c6a07d4 (diff)
Rewrite streams (#4842)
Diffstat (limited to 'cli/js/web/streams/readable_stream_default_controller.ts')
-rw-r--r--cli/js/web/streams/readable_stream_default_controller.ts120
1 files changed, 120 insertions, 0 deletions
diff --git a/cli/js/web/streams/readable_stream_default_controller.ts b/cli/js/web/streams/readable_stream_default_controller.ts
new file mode 100644
index 000000000..866c6d79e
--- /dev/null
+++ b/cli/js/web/streams/readable_stream_default_controller.ts
@@ -0,0 +1,120 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import {
+ CancelAlgorithm,
+ dequeueValue,
+ isReadableStreamDefaultController,
+ Pair,
+ PullAlgorithm,
+ readableStreamAddReadRequest,
+ readableStreamClose,
+ readableStreamCreateReadResult,
+ readableStreamDefaultControllerCallPullIfNeeded,
+ readableStreamDefaultControllerCanCloseOrEnqueue,
+ readableStreamDefaultControllerClearAlgorithms,
+ readableStreamDefaultControllerClose,
+ readableStreamDefaultControllerEnqueue,
+ readableStreamDefaultControllerError,
+ readableStreamDefaultControllerGetDesiredSize,
+ resetQueue,
+ SizeAlgorithm,
+} from "./internals.ts";
+import { ReadableStreamImpl } from "./readable_stream.ts";
+import * as sym from "./symbols.ts";
+import { symbols } from "../../symbols.ts";
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export class ReadableStreamDefaultControllerImpl<R = any>
+ implements ReadableStreamDefaultController<R> {
+ [sym.cancelAlgorithm]: CancelAlgorithm;
+ [sym.closeRequested]: boolean;
+ [sym.controlledReadableStream]: ReadableStreamImpl<R>;
+ [sym.pullAgain]: boolean;
+ [sym.pullAlgorithm]: PullAlgorithm;
+ [sym.pulling]: boolean;
+ [sym.queue]: Array<Pair<R>>;
+ [sym.queueTotalSize]: number;
+ [sym.started]: boolean;
+ [sym.strategyHWM]: number;
+ [sym.strategySizeAlgorithm]: SizeAlgorithm<R>;
+
+ private constructor() {
+ throw new TypeError(
+ "ReadableStreamDefaultController's constructor cannot be called."
+ );
+ }
+
+ get desiredSize(): number | null {
+ if (!isReadableStreamDefaultController(this)) {
+ throw new TypeError("Invalid ReadableStreamDefaultController.");
+ }
+ return readableStreamDefaultControllerGetDesiredSize(this);
+ }
+
+ close(): void {
+ if (!isReadableStreamDefaultController(this)) {
+ throw new TypeError("Invalid ReadableStreamDefaultController.");
+ }
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
+ throw new TypeError(
+ "ReadableStreamDefaultController cannot close or enqueue."
+ );
+ }
+ readableStreamDefaultControllerClose(this);
+ }
+
+ enqueue(chunk: R): void {
+ if (!isReadableStreamDefaultController(this)) {
+ throw new TypeError("Invalid ReadableStreamDefaultController.");
+ }
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
+ throw new TypeError("ReadableSteamController cannot enqueue.");
+ }
+ return readableStreamDefaultControllerEnqueue(this, chunk);
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ error(error?: any): void {
+ if (!isReadableStreamDefaultController(this)) {
+ throw new TypeError("Invalid ReadableStreamDefaultController.");
+ }
+ readableStreamDefaultControllerError(this, error);
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ [sym.cancelSteps](reason?: any): PromiseLike<void> {
+ resetQueue(this);
+ const result = this[sym.cancelAlgorithm](reason);
+ readableStreamDefaultControllerClearAlgorithms(this);
+ return result;
+ }
+
+ [sym.pullSteps](): Promise<ReadableStreamReadResult<R>> {
+ const stream = this[sym.controlledReadableStream];
+ if (this[sym.queue].length) {
+ const chunk = dequeueValue<R>(this);
+ if (this[sym.closeRequested] && this[sym.queue].length === 0) {
+ readableStreamDefaultControllerClearAlgorithms(this);
+ readableStreamClose(stream);
+ } else {
+ readableStreamDefaultControllerCallPullIfNeeded(this);
+ }
+ return Promise.resolve(
+ readableStreamCreateReadResult(
+ chunk,
+ false,
+ stream[sym.reader]![sym.forAuthorCode]
+ )
+ );
+ }
+ const pendingPromise = readableStreamAddReadRequest(stream);
+ readableStreamDefaultControllerCallPullIfNeeded(this);
+ return pendingPromise;
+ }
+
+ [symbols.customInspect](): string {
+ return `ReadableStreamDefaultController { desiredSize: ${String(
+ this.desiredSize
+ )} }`;
+ }
+}