summaryrefslogtreecommitdiff
path: root/cli/js/streams/pipe-to.ts
diff options
context:
space:
mode:
Diffstat (limited to 'cli/js/streams/pipe-to.ts')
-rw-r--r--cli/js/streams/pipe-to.ts237
1 files changed, 237 insertions, 0 deletions
diff --git a/cli/js/streams/pipe-to.ts b/cli/js/streams/pipe-to.ts
new file mode 100644
index 000000000..3764e605b
--- /dev/null
+++ b/cli/js/streams/pipe-to.ts
@@ -0,0 +1,237 @@
+// TODO reenable this code when we enable writableStreams and transport types
+// // Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
+// // Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
+
+// /**
+// * streams/pipe-to - pipeTo algorithm implementation
+// * Part of Stardazed
+// * (c) 2018-Present by Arthur Langereis - @zenmumbler
+// * https://github.com/stardazed/sd-streams
+// */
+
+// /* eslint-disable @typescript-eslint/no-explicit-any */
+// // TODO reenable this lint here
+
+// import * as rs from "./readable-internals.ts";
+// import * as ws from "./writable-internals.ts";
+// import * as shared from "./shared-internals.ts";
+
+// import { ReadableStreamDefaultReader } from "./readable-stream-default-reader.ts";
+// import { WritableStreamDefaultWriter } from "./writable-stream-default-writer.ts";
+// import { PipeOptions } from "../dom_types.ts";
+// import { DenoError, ErrorKind } from "../errors.ts";
+
+// // add a wrapper to handle falsy rejections
+// interface ErrorWrapper {
+// actualError: shared.ErrorResult;
+// }
+
+// export function pipeTo<ChunkType>(
+// source: rs.SDReadableStream<ChunkType>,
+// dest: ws.WritableStream<ChunkType>,
+// options: PipeOptions
+// ): Promise<void> {
+// const preventClose = !!options.preventClose;
+// const preventAbort = !!options.preventAbort;
+// const preventCancel = !!options.preventCancel;
+// const signal = options.signal;
+
+// let shuttingDown = false;
+// let latestWrite = Promise.resolve();
+// const promise = shared.createControlledPromise<void>();
+
+// // If IsReadableByteStreamController(this.[[readableStreamController]]) is true, let reader be either ! AcquireReadableStreamBYOBReader(this) or ! AcquireReadableStreamDefaultReader(this), at the user agent’s discretion.
+// // Otherwise, let reader be ! AcquireReadableStreamDefaultReader(this).
+// const reader = new ReadableStreamDefaultReader(source);
+// const writer = new WritableStreamDefaultWriter(dest);
+
+// let abortAlgorithm: () => any;
+// if (signal !== undefined) {
+// abortAlgorithm = (): void => {
+// // TODO this should be a DOMException,
+// // https://github.com/stardazed/sd-streams/blob/master/packages/streams/src/pipe-to.ts#L38
+// const error = new DenoError(ErrorKind.AbortError, "Aborted");
+// const actions: Array<() => Promise<void>> = [];
+// if (preventAbort === false) {
+// actions.push(() => {
+// if (dest[shared.state_] === "writable") {
+// return ws.writableStreamAbort(dest, error);
+// }
+// return Promise.resolve();
+// });
+// }
+// if (preventCancel === false) {
+// actions.push(() => {
+// if (source[shared.state_] === "readable") {
+// return rs.readableStreamCancel(source, error);
+// }
+// return Promise.resolve();
+// });
+// }
+// shutDown(
+// () => {
+// return Promise.all(actions.map(a => a())).then(_ => undefined);
+// },
+// { actualError: error }
+// );
+// };
+
+// if (signal.aborted === true) {
+// abortAlgorithm();
+// } else {
+// signal.addEventListener("abort", abortAlgorithm);
+// }
+// }
+
+// function onStreamErrored(
+// stream: rs.SDReadableStream<ChunkType> | ws.WritableStream<ChunkType>,
+// promise: Promise<void>,
+// action: (error: shared.ErrorResult) => void
+// ): void {
+// if (stream[shared.state_] === "errored") {
+// action(stream[shared.storedError_]);
+// } else {
+// promise.catch(action);
+// }
+// }
+
+// function onStreamClosed(
+// stream: rs.SDReadableStream<ChunkType> | ws.WritableStream<ChunkType>,
+// promise: Promise<void>,
+// action: () => void
+// ): void {
+// if (stream[shared.state_] === "closed") {
+// action();
+// } else {
+// promise.then(action);
+// }
+// }
+
+// onStreamErrored(source, reader[rs.closedPromise_].promise, error => {
+// if (!preventAbort) {
+// shutDown(() => ws.writableStreamAbort(dest, error), {
+// actualError: error
+// });
+// } else {
+// shutDown(undefined, { actualError: error });
+// }
+// });
+
+// onStreamErrored(dest, writer[ws.closedPromise_].promise, error => {
+// if (!preventCancel) {
+// shutDown(() => rs.readableStreamCancel(source, error), {
+// actualError: error
+// });
+// } else {
+// shutDown(undefined, { actualError: error });
+// }
+// });
+
+// onStreamClosed(source, reader[rs.closedPromise_].promise, () => {
+// if (!preventClose) {
+// shutDown(() =>
+// ws.writableStreamDefaultWriterCloseWithErrorPropagation(writer)
+// );
+// } else {
+// shutDown();
+// }
+// });
+
+// if (
+// ws.writableStreamCloseQueuedOrInFlight(dest) ||
+// dest[shared.state_] === "closed"
+// ) {
+// // Assert: no chunks have been read or written.
+// const destClosed = new TypeError();
+// if (!preventCancel) {
+// shutDown(() => rs.readableStreamCancel(source, destClosed), {
+// actualError: destClosed
+// });
+// } else {
+// shutDown(undefined, { actualError: destClosed });
+// }
+// }
+
+// function awaitLatestWrite(): Promise<void> {
+// const curLatestWrite = latestWrite;
+// return latestWrite.then(() =>
+// curLatestWrite === latestWrite ? undefined : awaitLatestWrite()
+// );
+// }
+
+// function flushRemainder(): Promise<void> | undefined {
+// if (
+// dest[shared.state_] === "writable" &&
+// !ws.writableStreamCloseQueuedOrInFlight(dest)
+// ) {
+// return awaitLatestWrite();
+// } else {
+// return undefined;
+// }
+// }
+
+// function shutDown(action?: () => Promise<void>, error?: ErrorWrapper): void {
+// if (shuttingDown) {
+// return;
+// }
+// shuttingDown = true;
+
+// if (action === undefined) {
+// action = (): Promise<void> => Promise.resolve();
+// }
+
+// function finishShutDown(): void {
+// action!().then(
+// _ => finalize(error),
+// newError => finalize({ actualError: newError })
+// );
+// }
+
+// const flushWait = flushRemainder();
+// if (flushWait) {
+// flushWait.then(finishShutDown);
+// } else {
+// finishShutDown();
+// }
+// }
+
+// function finalize(error?: ErrorWrapper): void {
+// ws.writableStreamDefaultWriterRelease(writer);
+// rs.readableStreamReaderGenericRelease(reader);
+// if (signal && abortAlgorithm) {
+// signal.removeEventListener("abort", abortAlgorithm);
+// }
+// if (error) {
+// promise.reject(error.actualError);
+// } else {
+// promise.resolve(undefined);
+// }
+// }
+
+// function next(): Promise<void> | undefined {
+// if (shuttingDown) {
+// return;
+// }
+
+// writer[ws.readyPromise_].promise.then(() => {
+// rs.readableStreamDefaultReaderRead(reader).then(
+// ({ value, done }) => {
+// if (done) {
+// return;
+// }
+// latestWrite = ws
+// .writableStreamDefaultWriterWrite(writer, value!)
+// .catch(() => {});
+// next();
+// },
+// _error => {
+// latestWrite = Promise.resolve();
+// }
+// );
+// });
+// }
+
+// next();
+
+// return promise.promise;
+// }