summaryrefslogtreecommitdiff
path: root/std/node/_stream/pipeline.ts
diff options
context:
space:
mode:
Diffstat (limited to 'std/node/_stream/pipeline.ts')
-rw-r--r--std/node/_stream/pipeline.ts308
1 files changed, 0 insertions, 308 deletions
diff --git a/std/node/_stream/pipeline.ts b/std/node/_stream/pipeline.ts
deleted file mode 100644
index d02a92870..000000000
--- a/std/node/_stream/pipeline.ts
+++ /dev/null
@@ -1,308 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { once } from "../_utils.ts";
-import { destroyer as implDestroyer } from "./destroy.ts";
-import eos from "./end_of_stream.ts";
-import createReadableStreamAsyncIterator from "./async_iterator.ts";
-import * as events from "../events.ts";
-import PassThrough from "./passthrough.ts";
-import {
- ERR_INVALID_ARG_TYPE,
- ERR_INVALID_CALLBACK,
- ERR_INVALID_RETURN_VALUE,
- ERR_MISSING_ARGS,
- ERR_STREAM_DESTROYED,
- NodeErrorAbstraction,
-} from "../_errors.ts";
-import type Duplex from "./duplex.ts";
-import type Readable from "./readable.ts";
-import type Stream from "./stream.ts";
-import type Transform from "./transform.ts";
-import type Writable from "./writable.ts";
-
-type Streams = Duplex | Readable | Writable;
-// deno-lint-ignore no-explicit-any
-type EndCallback = (err?: NodeErrorAbstraction | null, val?: any) => void;
-type TransformCallback =
- // deno-lint-ignore no-explicit-any
- | ((value?: any) => AsyncGenerator<any>)
- // deno-lint-ignore no-explicit-any
- | ((value?: any) => Promise<any>);
-/**
- * This type represents an array that contains a data source,
- * many Transform Streams, a writable stream destination
- * and end in an optional callback
- * */
-type DataSource =
- // deno-lint-ignore no-explicit-any
- | (() => AsyncGenerator<any>)
- | // deno-lint-ignore no-explicit-any
- AsyncIterable<any>
- | Duplex
- | // deno-lint-ignore no-explicit-any
- Iterable<any>
- | // deno-lint-ignore no-explicit-any
- (() => Generator<any>)
- | Readable;
-type Transformers = Duplex | Transform | TransformCallback | Writable;
-export type PipelineArguments = [
- DataSource,
- ...Array<Transformers | EndCallback>,
-];
-
-function destroyer(
- stream: Streams,
- reading: boolean,
- writing: boolean,
- callback: EndCallback,
-) {
- callback = once(callback);
-
- let finished = false;
- stream.on("close", () => {
- finished = true;
- });
-
- eos(stream, { readable: reading, writable: writing }, (err) => {
- finished = !err;
-
- // deno-lint-ignore no-explicit-any
- const rState = (stream as any)?._readableState;
- if (
- err &&
- err.code === "ERR_STREAM_PREMATURE_CLOSE" &&
- reading &&
- (rState?.ended && !rState?.errored && !rState?.errorEmitted)
- ) {
- stream
- .once("end", callback)
- .once("error", callback);
- } else {
- callback(err);
- }
- });
-
- return (err: NodeErrorAbstraction) => {
- if (finished) return;
- finished = true;
- implDestroyer(stream, err);
- callback(err || new ERR_STREAM_DESTROYED("pipe"));
- };
-}
-
-function popCallback(streams: PipelineArguments): EndCallback {
- if (typeof streams[streams.length - 1] !== "function") {
- throw new ERR_INVALID_CALLBACK(streams[streams.length - 1]);
- }
- return streams.pop() as EndCallback;
-}
-
-// function isPromise(obj) {
-// return !!(obj && typeof obj.then === "function");
-// }
-
-// deno-lint-ignore no-explicit-any
-function isReadable(obj: any): obj is Stream {
- return !!(obj && typeof obj.pipe === "function");
-}
-
-// deno-lint-ignore no-explicit-any
-function isWritable(obj: any) {
- return !!(obj && typeof obj.write === "function");
-}
-
-// deno-lint-ignore no-explicit-any
-function isStream(obj: any) {
- return isReadable(obj) || isWritable(obj);
-}
-
-// deno-lint-ignore no-explicit-any
-function isIterable(obj: any, isAsync?: boolean) {
- if (!obj) return false;
- if (isAsync === true) return typeof obj[Symbol.asyncIterator] === "function";
- if (isAsync === false) return typeof obj[Symbol.iterator] === "function";
- return typeof obj[Symbol.asyncIterator] === "function" ||
- typeof obj[Symbol.iterator] === "function";
-}
-
-// deno-lint-ignore no-explicit-any
-function makeAsyncIterable(val: Readable | Iterable<any> | AsyncIterable<any>) {
- if (isIterable(val)) {
- return val;
- } else if (isReadable(val)) {
- return fromReadable(val as Readable);
- }
- throw new ERR_INVALID_ARG_TYPE(
- "val",
- ["Readable", "Iterable", "AsyncIterable"],
- val,
- );
-}
-
-async function* fromReadable(val: Readable) {
- yield* createReadableStreamAsyncIterator(val);
-}
-
-async function pump(
- // deno-lint-ignore no-explicit-any
- iterable: Iterable<any>,
- writable: Duplex | Writable,
- finish: (err?: NodeErrorAbstraction | null) => void,
-) {
- let error;
- try {
- for await (const chunk of iterable) {
- if (!writable.write(chunk)) {
- if (writable.destroyed) return;
- await events.once(writable, "drain");
- }
- }
- writable.end();
- } catch (err) {
- error = err;
- } finally {
- finish(error);
- }
-}
-
-export default function pipeline(...args: PipelineArguments) {
- const callback: EndCallback = once(popCallback(args));
-
- let streams: [DataSource, ...Transformers[]];
- if (args.length > 1) {
- streams = args as [DataSource, ...Transformers[]];
- } else {
- throw new ERR_MISSING_ARGS("streams");
- }
-
- let error: NodeErrorAbstraction;
- // deno-lint-ignore no-explicit-any
- let value: any;
- const destroys: Array<(err: NodeErrorAbstraction) => void> = [];
-
- let finishCount = 0;
-
- function finish(err?: NodeErrorAbstraction | null) {
- const final = --finishCount === 0;
-
- if (err && (!error || error.code === "ERR_STREAM_PREMATURE_CLOSE")) {
- error = err;
- }
-
- if (!error && !final) {
- return;
- }
-
- while (destroys.length) {
- (destroys.shift() as (err: NodeErrorAbstraction) => void)(error);
- }
-
- if (final) {
- callback(error, value);
- }
- }
-
- // TODO(Soremwar)
- // Simplify the hell out of this
- // deno-lint-ignore no-explicit-any
- let ret: any;
- for (let i = 0; i < streams.length; i++) {
- const stream = streams[i];
- const reading = i < streams.length - 1;
- const writing = i > 0;
-
- if (isStream(stream)) {
- finishCount++;
- destroys.push(destroyer(stream as Streams, reading, writing, finish));
- }
-
- if (i === 0) {
- if (typeof stream === "function") {
- ret = stream();
- if (!isIterable(ret)) {
- throw new ERR_INVALID_RETURN_VALUE(
- "Iterable, AsyncIterable or Stream",
- "source",
- ret,
- );
- }
- } else if (isIterable(stream) || isReadable(stream)) {
- ret = stream;
- } else {
- throw new ERR_INVALID_ARG_TYPE(
- "source",
- ["Stream", "Iterable", "AsyncIterable", "Function"],
- stream,
- );
- }
- } else if (typeof stream === "function") {
- ret = makeAsyncIterable(ret);
- ret = stream(ret);
-
- if (reading) {
- if (!isIterable(ret, true)) {
- throw new ERR_INVALID_RETURN_VALUE(
- "AsyncIterable",
- `transform[${i - 1}]`,
- ret,
- );
- }
- } else {
- // If the last argument to pipeline is not a stream
- // we must create a proxy stream so that pipeline(...)
- // always returns a stream which can be further
- // composed through `.pipe(stream)`.
- const pt = new PassThrough({
- objectMode: true,
- });
- if (ret instanceof Promise) {
- ret
- .then((val) => {
- value = val;
- pt.end(val);
- }, (err) => {
- pt.destroy(err);
- });
- } else if (isIterable(ret, true)) {
- finishCount++;
- pump(ret, pt, finish);
- } else {
- throw new ERR_INVALID_RETURN_VALUE(
- "AsyncIterable or Promise",
- "destination",
- ret,
- );
- }
-
- ret = pt;
-
- finishCount++;
- destroys.push(destroyer(ret, false, true, finish));
- }
- } else if (isStream(stream)) {
- if (isReadable(ret)) {
- ret.pipe(stream as Readable);
-
- // TODO(Soremwar)
- // Reimplement after stdout and stderr are implemented
- // if (stream === process.stdout || stream === process.stderr) {
- // ret.on("end", () => stream.end());
- // }
- } else {
- ret = makeAsyncIterable(ret);
-
- finishCount++;
- pump(ret, stream as Writable, finish);
- }
- ret = stream;
- } else {
- const name = reading ? `transform[${i - 1}]` : "destination";
- throw new ERR_INVALID_ARG_TYPE(
- name,
- ["Stream", "Function"],
- ret,
- );
- }
- }
-
- return ret as unknown as Readable;
-}