diff options
author | Nick Stott <nick@nickstott.com> | 2019-10-28 12:41:36 -0400 |
---|---|---|
committer | Ry Dahl <ry@tinyclouds.org> | 2019-10-28 12:41:36 -0400 |
commit | 65d9286203cf239f68c6015818e82e8521e600a1 (patch) | |
tree | 0af1a7be449036f2f4ae9d3ecf06b7d645c8bddc /cli/js/streams/readable-stream-default-controller.ts | |
parent | 967c236fa5fb1e87e1b5ee788fe77d3a07361da1 (diff) |
Re-enable basic stream support for fetch bodies (#3192)
* Add sd-streams from https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/
* change the interfaces in dom_types to match what sd-streams expects
Diffstat (limited to 'cli/js/streams/readable-stream-default-controller.ts')
-rw-r--r-- | cli/js/streams/readable-stream-default-controller.ts | 139 |
1 files changed, 139 insertions, 0 deletions
diff --git a/cli/js/streams/readable-stream-default-controller.ts b/cli/js/streams/readable-stream-default-controller.ts new file mode 100644 index 000000000..e9ddce1bc --- /dev/null +++ b/cli/js/streams/readable-stream-default-controller.ts @@ -0,0 +1,139 @@ +// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546 +// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT + +/** + * streams/readable-stream-default-controller - ReadableStreamDefaultController class implementation + * Part of Stardazed + * (c) 2018-Present by Arthur Langereis - @zenmumbler + * https://github.com/stardazed/sd-streams + */ + +/* eslint-disable @typescript-eslint/no-explicit-any */ +// TODO reenable this lint here + +import * as rs from "./readable-internals.ts"; +import * as shared from "./shared-internals.ts"; +import * as q from "./queue-mixin.ts"; +import { Queue } from "./queue.ts"; +import { QueuingStrategySizeCallback, UnderlyingSource } from "../dom_types.ts"; + +export class ReadableStreamDefaultController<OutputType> + implements rs.SDReadableStreamDefaultController<OutputType> { + [rs.cancelAlgorithm_]: rs.CancelAlgorithm; + [rs.closeRequested_]: boolean; + [rs.controlledReadableStream_]: rs.SDReadableStream<OutputType>; + [rs.pullAgain_]: boolean; + [rs.pullAlgorithm_]: rs.PullAlgorithm<OutputType>; + [rs.pulling_]: boolean; + [rs.strategyHWM_]: number; + [rs.strategySizeAlgorithm_]: QueuingStrategySizeCallback<OutputType>; + [rs.started_]: boolean; + + [q.queue_]: Queue<q.QueueElement<OutputType>>; + [q.queueTotalSize_]: number; + + constructor() { + throw new TypeError(); + } + + get desiredSize(): number | null { + return rs.readableStreamDefaultControllerGetDesiredSize(this); + } + + close(): void { + if (!rs.isReadableStreamDefaultController(this)) { + throw new TypeError(); + } + if (!rs.readableStreamDefaultControllerCanCloseOrEnqueue(this)) { + throw new TypeError( + "Cannot close, the stream is already closing or not readable" + ); + } + rs.readableStreamDefaultControllerClose(this); + } + + enqueue(chunk?: OutputType): void { + if (!rs.isReadableStreamDefaultController(this)) { + throw new TypeError(); + } + if (!rs.readableStreamDefaultControllerCanCloseOrEnqueue(this)) { + throw new TypeError( + "Cannot enqueue, the stream is closing or not readable" + ); + } + rs.readableStreamDefaultControllerEnqueue(this, chunk!); + } + + error(e?: shared.ErrorResult): void { + if (!rs.isReadableStreamDefaultController(this)) { + throw new TypeError(); + } + rs.readableStreamDefaultControllerError(this, e); + } + + [rs.cancelSteps_](reason: shared.ErrorResult): Promise<void> { + q.resetQueue(this); + const result = this[rs.cancelAlgorithm_](reason); + rs.readableStreamDefaultControllerClearAlgorithms(this); + return result; + } + + [rs.pullSteps_]( + forAuthorCode: boolean + ): Promise<IteratorResult<OutputType, any>> { + const stream = this[rs.controlledReadableStream_]; + if (this[q.queue_].length > 0) { + const chunk = q.dequeueValue(this); + if (this[rs.closeRequested_] && this[q.queue_].length === 0) { + rs.readableStreamDefaultControllerClearAlgorithms(this); + rs.readableStreamClose(stream); + } else { + rs.readableStreamDefaultControllerCallPullIfNeeded(this); + } + return Promise.resolve( + rs.readableStreamCreateReadResult(chunk, false, forAuthorCode) + ); + } + + const pendingPromise = rs.readableStreamAddReadRequest( + stream, + forAuthorCode + ); + rs.readableStreamDefaultControllerCallPullIfNeeded(this); + return pendingPromise; + } +} + +export function setUpReadableStreamDefaultControllerFromUnderlyingSource< + OutputType +>( + stream: rs.SDReadableStream<OutputType>, + underlyingSource: UnderlyingSource<OutputType>, + highWaterMark: number, + sizeAlgorithm: QueuingStrategySizeCallback<OutputType> +): void { + // Assert: underlyingSource is not undefined. + const controller = Object.create(ReadableStreamDefaultController.prototype); + const startAlgorithm = (): any => { + return shared.invokeOrNoop(underlyingSource, "start", [controller]); + }; + const pullAlgorithm = shared.createAlgorithmFromUnderlyingMethod( + underlyingSource, + "pull", + [controller] + ); + const cancelAlgorithm = shared.createAlgorithmFromUnderlyingMethod( + underlyingSource, + "cancel", + [] + ); + rs.setUpReadableStreamDefaultController( + stream, + controller, + startAlgorithm, + pullAlgorithm, + cancelAlgorithm, + highWaterMark, + sizeAlgorithm + ); +} |