summaryrefslogtreecommitdiff
path: root/std/node/_stream/pipeline.ts
diff options
context:
space:
mode:
authorSteven Guerrero <stephenguerrero43@gmail.com>2020-11-26 07:50:08 -0500
committerGitHub <noreply@github.com>2020-11-26 13:50:08 +0100
commit9042fcc12e7774cdd0ca3a5d08918a07dae8102b (patch)
tree8b5ff11412aae9bb714e0bb0b9b0358db64a8657 /std/node/_stream/pipeline.ts
parent60e980c78180ee3b0a14d692307be275dc181c8d (diff)
feat(std/node/stream): Add Duplex, Transform, Passthrough, pipeline, finished and promises (#7940)
Diffstat (limited to 'std/node/_stream/pipeline.ts')
-rw-r--r--std/node/_stream/pipeline.ts308
1 files changed, 308 insertions, 0 deletions
diff --git a/std/node/_stream/pipeline.ts b/std/node/_stream/pipeline.ts
new file mode 100644
index 000000000..d02a92870
--- /dev/null
+++ b/std/node/_stream/pipeline.ts
@@ -0,0 +1,308 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { once } from "../_utils.ts";
+import { destroyer as implDestroyer } from "./destroy.ts";
+import eos from "./end_of_stream.ts";
+import createReadableStreamAsyncIterator from "./async_iterator.ts";
+import * as events from "../events.ts";
+import PassThrough from "./passthrough.ts";
+import {
+ ERR_INVALID_ARG_TYPE,
+ ERR_INVALID_CALLBACK,
+ ERR_INVALID_RETURN_VALUE,
+ ERR_MISSING_ARGS,
+ ERR_STREAM_DESTROYED,
+ NodeErrorAbstraction,
+} from "../_errors.ts";
+import type Duplex from "./duplex.ts";
+import type Readable from "./readable.ts";
+import type Stream from "./stream.ts";
+import type Transform from "./transform.ts";
+import type Writable from "./writable.ts";
+
+type Streams = Duplex | Readable | Writable;
+// deno-lint-ignore no-explicit-any
+type EndCallback = (err?: NodeErrorAbstraction | null, val?: any) => void;
+type TransformCallback =
+ // deno-lint-ignore no-explicit-any
+ | ((value?: any) => AsyncGenerator<any>)
+ // deno-lint-ignore no-explicit-any
+ | ((value?: any) => Promise<any>);
+/**
+ * This type represents an array that contains a data source,
+ * many Transform Streams, a writable stream destination
+ * and end in an optional callback
+ * */
+type DataSource =
+ // deno-lint-ignore no-explicit-any
+ | (() => AsyncGenerator<any>)
+ | // deno-lint-ignore no-explicit-any
+ AsyncIterable<any>
+ | Duplex
+ | // deno-lint-ignore no-explicit-any
+ Iterable<any>
+ | // deno-lint-ignore no-explicit-any
+ (() => Generator<any>)
+ | Readable;
+type Transformers = Duplex | Transform | TransformCallback | Writable;
+export type PipelineArguments = [
+ DataSource,
+ ...Array<Transformers | EndCallback>,
+];
+
+function destroyer(
+ stream: Streams,
+ reading: boolean,
+ writing: boolean,
+ callback: EndCallback,
+) {
+ callback = once(callback);
+
+ let finished = false;
+ stream.on("close", () => {
+ finished = true;
+ });
+
+ eos(stream, { readable: reading, writable: writing }, (err) => {
+ finished = !err;
+
+ // deno-lint-ignore no-explicit-any
+ const rState = (stream as any)?._readableState;
+ if (
+ err &&
+ err.code === "ERR_STREAM_PREMATURE_CLOSE" &&
+ reading &&
+ (rState?.ended && !rState?.errored && !rState?.errorEmitted)
+ ) {
+ stream
+ .once("end", callback)
+ .once("error", callback);
+ } else {
+ callback(err);
+ }
+ });
+
+ return (err: NodeErrorAbstraction) => {
+ if (finished) return;
+ finished = true;
+ implDestroyer(stream, err);
+ callback(err || new ERR_STREAM_DESTROYED("pipe"));
+ };
+}
+
+function popCallback(streams: PipelineArguments): EndCallback {
+ if (typeof streams[streams.length - 1] !== "function") {
+ throw new ERR_INVALID_CALLBACK(streams[streams.length - 1]);
+ }
+ return streams.pop() as EndCallback;
+}
+
+// function isPromise(obj) {
+// return !!(obj && typeof obj.then === "function");
+// }
+
+// deno-lint-ignore no-explicit-any
+function isReadable(obj: any): obj is Stream {
+ return !!(obj && typeof obj.pipe === "function");
+}
+
+// deno-lint-ignore no-explicit-any
+function isWritable(obj: any) {
+ return !!(obj && typeof obj.write === "function");
+}
+
+// deno-lint-ignore no-explicit-any
+function isStream(obj: any) {
+ return isReadable(obj) || isWritable(obj);
+}
+
+// deno-lint-ignore no-explicit-any
+function isIterable(obj: any, isAsync?: boolean) {
+ if (!obj) return false;
+ if (isAsync === true) return typeof obj[Symbol.asyncIterator] === "function";
+ if (isAsync === false) return typeof obj[Symbol.iterator] === "function";
+ return typeof obj[Symbol.asyncIterator] === "function" ||
+ typeof obj[Symbol.iterator] === "function";
+}
+
+// deno-lint-ignore no-explicit-any
+function makeAsyncIterable(val: Readable | Iterable<any> | AsyncIterable<any>) {
+ if (isIterable(val)) {
+ return val;
+ } else if (isReadable(val)) {
+ return fromReadable(val as Readable);
+ }
+ throw new ERR_INVALID_ARG_TYPE(
+ "val",
+ ["Readable", "Iterable", "AsyncIterable"],
+ val,
+ );
+}
+
+async function* fromReadable(val: Readable) {
+ yield* createReadableStreamAsyncIterator(val);
+}
+
+async function pump(
+ // deno-lint-ignore no-explicit-any
+ iterable: Iterable<any>,
+ writable: Duplex | Writable,
+ finish: (err?: NodeErrorAbstraction | null) => void,
+) {
+ let error;
+ try {
+ for await (const chunk of iterable) {
+ if (!writable.write(chunk)) {
+ if (writable.destroyed) return;
+ await events.once(writable, "drain");
+ }
+ }
+ writable.end();
+ } catch (err) {
+ error = err;
+ } finally {
+ finish(error);
+ }
+}
+
+export default function pipeline(...args: PipelineArguments) {
+ const callback: EndCallback = once(popCallback(args));
+
+ let streams: [DataSource, ...Transformers[]];
+ if (args.length > 1) {
+ streams = args as [DataSource, ...Transformers[]];
+ } else {
+ throw new ERR_MISSING_ARGS("streams");
+ }
+
+ let error: NodeErrorAbstraction;
+ // deno-lint-ignore no-explicit-any
+ let value: any;
+ const destroys: Array<(err: NodeErrorAbstraction) => void> = [];
+
+ let finishCount = 0;
+
+ function finish(err?: NodeErrorAbstraction | null) {
+ const final = --finishCount === 0;
+
+ if (err && (!error || error.code === "ERR_STREAM_PREMATURE_CLOSE")) {
+ error = err;
+ }
+
+ if (!error && !final) {
+ return;
+ }
+
+ while (destroys.length) {
+ (destroys.shift() as (err: NodeErrorAbstraction) => void)(error);
+ }
+
+ if (final) {
+ callback(error, value);
+ }
+ }
+
+ // TODO(Soremwar)
+ // Simplify the hell out of this
+ // deno-lint-ignore no-explicit-any
+ let ret: any;
+ for (let i = 0; i < streams.length; i++) {
+ const stream = streams[i];
+ const reading = i < streams.length - 1;
+ const writing = i > 0;
+
+ if (isStream(stream)) {
+ finishCount++;
+ destroys.push(destroyer(stream as Streams, reading, writing, finish));
+ }
+
+ if (i === 0) {
+ if (typeof stream === "function") {
+ ret = stream();
+ if (!isIterable(ret)) {
+ throw new ERR_INVALID_RETURN_VALUE(
+ "Iterable, AsyncIterable or Stream",
+ "source",
+ ret,
+ );
+ }
+ } else if (isIterable(stream) || isReadable(stream)) {
+ ret = stream;
+ } else {
+ throw new ERR_INVALID_ARG_TYPE(
+ "source",
+ ["Stream", "Iterable", "AsyncIterable", "Function"],
+ stream,
+ );
+ }
+ } else if (typeof stream === "function") {
+ ret = makeAsyncIterable(ret);
+ ret = stream(ret);
+
+ if (reading) {
+ if (!isIterable(ret, true)) {
+ throw new ERR_INVALID_RETURN_VALUE(
+ "AsyncIterable",
+ `transform[${i - 1}]`,
+ ret,
+ );
+ }
+ } else {
+ // If the last argument to pipeline is not a stream
+ // we must create a proxy stream so that pipeline(...)
+ // always returns a stream which can be further
+ // composed through `.pipe(stream)`.
+ const pt = new PassThrough({
+ objectMode: true,
+ });
+ if (ret instanceof Promise) {
+ ret
+ .then((val) => {
+ value = val;
+ pt.end(val);
+ }, (err) => {
+ pt.destroy(err);
+ });
+ } else if (isIterable(ret, true)) {
+ finishCount++;
+ pump(ret, pt, finish);
+ } else {
+ throw new ERR_INVALID_RETURN_VALUE(
+ "AsyncIterable or Promise",
+ "destination",
+ ret,
+ );
+ }
+
+ ret = pt;
+
+ finishCount++;
+ destroys.push(destroyer(ret, false, true, finish));
+ }
+ } else if (isStream(stream)) {
+ if (isReadable(ret)) {
+ ret.pipe(stream as Readable);
+
+ // TODO(Soremwar)
+ // Reimplement after stdout and stderr are implemented
+ // if (stream === process.stdout || stream === process.stderr) {
+ // ret.on("end", () => stream.end());
+ // }
+ } else {
+ ret = makeAsyncIterable(ret);
+
+ finishCount++;
+ pump(ret, stream as Writable, finish);
+ }
+ ret = stream;
+ } else {
+ const name = reading ? `transform[${i - 1}]` : "destination";
+ throw new ERR_INVALID_ARG_TYPE(
+ name,
+ ["Stream", "Function"],
+ ret,
+ );
+ }
+ }
+
+ return ret as unknown as Readable;
+}