summaryrefslogtreecommitdiff
path: root/cli/js/streams/readable-byte-stream-controller.ts
diff options
context:
space:
mode:
Diffstat (limited to 'cli/js/streams/readable-byte-stream-controller.ts')
-rw-r--r--cli/js/streams/readable-byte-stream-controller.ts214
1 files changed, 214 insertions, 0 deletions
diff --git a/cli/js/streams/readable-byte-stream-controller.ts b/cli/js/streams/readable-byte-stream-controller.ts
new file mode 100644
index 000000000..86efd416c
--- /dev/null
+++ b/cli/js/streams/readable-byte-stream-controller.ts
@@ -0,0 +1,214 @@
+// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+/**
+ * streams/readable-byte-stream-controller - ReadableByteStreamController class implementation
+ * Part of Stardazed
+ * (c) 2018-Present by Arthur Langereis - @zenmumbler
+ * https://github.com/stardazed/sd-streams
+ */
+
+/* eslint-disable @typescript-eslint/no-explicit-any */
+// TODO reenable this lint here
+
+import * as rs from "./readable-internals.ts";
+import * as q from "./queue-mixin.ts";
+import * as shared from "./shared-internals.ts";
+import { ReadableStreamBYOBRequest } from "./readable-stream-byob-request.ts";
+import { Queue } from "./queue.ts";
+import { UnderlyingByteSource } from "../dom_types.ts";
+
+export class ReadableByteStreamController
+ implements rs.SDReadableByteStreamController {
+ [rs.autoAllocateChunkSize_]: number | undefined;
+ [rs.byobRequest_]: rs.SDReadableStreamBYOBRequest | undefined;
+ [rs.cancelAlgorithm_]: rs.CancelAlgorithm;
+ [rs.closeRequested_]: boolean;
+ [rs.controlledReadableByteStream_]: rs.SDReadableStream<ArrayBufferView>;
+ [rs.pullAgain_]: boolean;
+ [rs.pullAlgorithm_]: rs.PullAlgorithm<ArrayBufferView>;
+ [rs.pulling_]: boolean;
+ [rs.pendingPullIntos_]: rs.PullIntoDescriptor[];
+ [rs.started_]: boolean;
+ [rs.strategyHWM_]: number;
+
+ [q.queue_]: Queue<{
+ buffer: ArrayBufferLike;
+ byteOffset: number;
+ byteLength: number;
+ }>;
+ [q.queueTotalSize_]: number;
+
+ constructor() {
+ throw new TypeError();
+ }
+
+ get byobRequest(): rs.SDReadableStreamBYOBRequest | undefined {
+ if (!rs.isReadableByteStreamController(this)) {
+ throw new TypeError();
+ }
+ if (
+ this[rs.byobRequest_] === undefined &&
+ this[rs.pendingPullIntos_].length > 0
+ ) {
+ const firstDescriptor = this[rs.pendingPullIntos_][0];
+ const view = new Uint8Array(
+ firstDescriptor.buffer,
+ firstDescriptor.byteOffset + firstDescriptor.bytesFilled,
+ firstDescriptor.byteLength - firstDescriptor.bytesFilled
+ );
+ const byobRequest = Object.create(
+ ReadableStreamBYOBRequest.prototype
+ ) as ReadableStreamBYOBRequest;
+ rs.setUpReadableStreamBYOBRequest(byobRequest, this, view);
+ this[rs.byobRequest_] = byobRequest;
+ }
+ return this[rs.byobRequest_];
+ }
+
+ get desiredSize(): number | null {
+ if (!rs.isReadableByteStreamController(this)) {
+ throw new TypeError();
+ }
+ return rs.readableByteStreamControllerGetDesiredSize(this);
+ }
+
+ close(): void {
+ if (!rs.isReadableByteStreamController(this)) {
+ throw new TypeError();
+ }
+ if (this[rs.closeRequested_]) {
+ throw new TypeError("Stream is already closing");
+ }
+ if (this[rs.controlledReadableByteStream_][shared.state_] !== "readable") {
+ throw new TypeError("Stream is closed or errored");
+ }
+ rs.readableByteStreamControllerClose(this);
+ }
+
+ enqueue(chunk: ArrayBufferView): void {
+ if (!rs.isReadableByteStreamController(this)) {
+ throw new TypeError();
+ }
+ if (this[rs.closeRequested_]) {
+ throw new TypeError("Stream is already closing");
+ }
+ if (this[rs.controlledReadableByteStream_][shared.state_] !== "readable") {
+ throw new TypeError("Stream is closed or errored");
+ }
+ if (!ArrayBuffer.isView(chunk)) {
+ throw new TypeError("chunk must be a valid ArrayBufferView");
+ }
+ // If ! IsDetachedBuffer(chunk.[[ViewedArrayBuffer]]) is true, throw a TypeError exception.
+ return rs.readableByteStreamControllerEnqueue(this, chunk);
+ }
+
+ error(error?: shared.ErrorResult): void {
+ if (!rs.isReadableByteStreamController(this)) {
+ throw new TypeError();
+ }
+ rs.readableByteStreamControllerError(this, error);
+ }
+
+ [rs.cancelSteps_](reason: shared.ErrorResult): Promise<void> {
+ if (this[rs.pendingPullIntos_].length > 0) {
+ const firstDescriptor = this[rs.pendingPullIntos_][0];
+ firstDescriptor.bytesFilled = 0;
+ }
+ q.resetQueue(this);
+ const result = this[rs.cancelAlgorithm_](reason);
+ rs.readableByteStreamControllerClearAlgorithms(this);
+ return result;
+ }
+
+ [rs.pullSteps_](
+ forAuthorCode: boolean
+ ): Promise<IteratorResult<ArrayBufferView, any>> {
+ const stream = this[rs.controlledReadableByteStream_];
+ // Assert: ! ReadableStreamHasDefaultReader(stream) is true.
+ if (this[q.queueTotalSize_] > 0) {
+ // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
+ const entry = this[q.queue_].shift()!;
+ this[q.queueTotalSize_] -= entry.byteLength;
+ rs.readableByteStreamControllerHandleQueueDrain(this);
+ const view = new Uint8Array(
+ entry.buffer,
+ entry.byteOffset,
+ entry.byteLength
+ );
+ return Promise.resolve(
+ rs.readableStreamCreateReadResult(view, false, forAuthorCode)
+ );
+ }
+ const autoAllocateChunkSize = this[rs.autoAllocateChunkSize_];
+ if (autoAllocateChunkSize !== undefined) {
+ let buffer: ArrayBuffer;
+ try {
+ buffer = new ArrayBuffer(autoAllocateChunkSize);
+ } catch (error) {
+ return Promise.reject(error);
+ }
+ const pullIntoDescriptor: rs.PullIntoDescriptor = {
+ buffer,
+ byteOffset: 0,
+ byteLength: autoAllocateChunkSize,
+ bytesFilled: 0,
+ elementSize: 1,
+ ctor: Uint8Array,
+ readerType: "default"
+ };
+ this[rs.pendingPullIntos_].push(pullIntoDescriptor);
+ }
+
+ const promise = rs.readableStreamAddReadRequest(stream, forAuthorCode);
+ rs.readableByteStreamControllerCallPullIfNeeded(this);
+ return promise;
+ }
+}
+
+export function setUpReadableByteStreamControllerFromUnderlyingSource(
+ stream: rs.SDReadableStream<ArrayBufferView>,
+ underlyingByteSource: UnderlyingByteSource,
+ highWaterMark: number
+): void {
+ // Assert: underlyingByteSource is not undefined.
+ const controller = Object.create(
+ ReadableByteStreamController.prototype
+ ) as ReadableByteStreamController;
+
+ const startAlgorithm = (): any => {
+ return shared.invokeOrNoop(underlyingByteSource, "start", [controller]);
+ };
+ const pullAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
+ underlyingByteSource,
+ "pull",
+ [controller]
+ );
+ const cancelAlgorithm = shared.createAlgorithmFromUnderlyingMethod(
+ underlyingByteSource,
+ "cancel",
+ []
+ );
+
+ let autoAllocateChunkSize = underlyingByteSource.autoAllocateChunkSize;
+ if (autoAllocateChunkSize !== undefined) {
+ autoAllocateChunkSize = Number(autoAllocateChunkSize);
+ if (
+ !shared.isInteger(autoAllocateChunkSize) ||
+ autoAllocateChunkSize <= 0
+ ) {
+ throw new RangeError(
+ "autoAllocateChunkSize must be a positive, finite integer"
+ );
+ }
+ }
+ rs.setUpReadableByteStreamController(
+ stream,
+ controller,
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ highWaterMark,
+ autoAllocateChunkSize
+ );
+}