diff options
Diffstat (limited to 'std/node/_stream/pipeline.ts')
-rw-r--r-- | std/node/_stream/pipeline.ts | 308 |
1 files changed, 0 insertions, 308 deletions
diff --git a/std/node/_stream/pipeline.ts b/std/node/_stream/pipeline.ts deleted file mode 100644 index d02a92870..000000000 --- a/std/node/_stream/pipeline.ts +++ /dev/null @@ -1,308 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { once } from "../_utils.ts"; -import { destroyer as implDestroyer } from "./destroy.ts"; -import eos from "./end_of_stream.ts"; -import createReadableStreamAsyncIterator from "./async_iterator.ts"; -import * as events from "../events.ts"; -import PassThrough from "./passthrough.ts"; -import { - ERR_INVALID_ARG_TYPE, - ERR_INVALID_CALLBACK, - ERR_INVALID_RETURN_VALUE, - ERR_MISSING_ARGS, - ERR_STREAM_DESTROYED, - NodeErrorAbstraction, -} from "../_errors.ts"; -import type Duplex from "./duplex.ts"; -import type Readable from "./readable.ts"; -import type Stream from "./stream.ts"; -import type Transform from "./transform.ts"; -import type Writable from "./writable.ts"; - -type Streams = Duplex | Readable | Writable; -// deno-lint-ignore no-explicit-any -type EndCallback = (err?: NodeErrorAbstraction | null, val?: any) => void; -type TransformCallback = - // deno-lint-ignore no-explicit-any - | ((value?: any) => AsyncGenerator<any>) - // deno-lint-ignore no-explicit-any - | ((value?: any) => Promise<any>); -/** - * This type represents an array that contains a data source, - * many Transform Streams, a writable stream destination - * and end in an optional callback - * */ -type DataSource = - // deno-lint-ignore no-explicit-any - | (() => AsyncGenerator<any>) - | // deno-lint-ignore no-explicit-any - AsyncIterable<any> - | Duplex - | // deno-lint-ignore no-explicit-any - Iterable<any> - | // deno-lint-ignore no-explicit-any - (() => Generator<any>) - | Readable; -type Transformers = Duplex | Transform | TransformCallback | Writable; -export type PipelineArguments = [ - DataSource, - ...Array<Transformers | EndCallback>, -]; - -function destroyer( - stream: Streams, - reading: boolean, - writing: boolean, - callback: EndCallback, -) { - callback = once(callback); - - let finished = false; - stream.on("close", () => { - finished = true; - }); - - eos(stream, { readable: reading, writable: writing }, (err) => { - finished = !err; - - // deno-lint-ignore no-explicit-any - const rState = (stream as any)?._readableState; - if ( - err && - err.code === "ERR_STREAM_PREMATURE_CLOSE" && - reading && - (rState?.ended && !rState?.errored && !rState?.errorEmitted) - ) { - stream - .once("end", callback) - .once("error", callback); - } else { - callback(err); - } - }); - - return (err: NodeErrorAbstraction) => { - if (finished) return; - finished = true; - implDestroyer(stream, err); - callback(err || new ERR_STREAM_DESTROYED("pipe")); - }; -} - -function popCallback(streams: PipelineArguments): EndCallback { - if (typeof streams[streams.length - 1] !== "function") { - throw new ERR_INVALID_CALLBACK(streams[streams.length - 1]); - } - return streams.pop() as EndCallback; -} - -// function isPromise(obj) { -// return !!(obj && typeof obj.then === "function"); -// } - -// deno-lint-ignore no-explicit-any -function isReadable(obj: any): obj is Stream { - return !!(obj && typeof obj.pipe === "function"); -} - -// deno-lint-ignore no-explicit-any -function isWritable(obj: any) { - return !!(obj && typeof obj.write === "function"); -} - -// deno-lint-ignore no-explicit-any -function isStream(obj: any) { - return isReadable(obj) || isWritable(obj); -} - -// deno-lint-ignore no-explicit-any -function isIterable(obj: any, isAsync?: boolean) { - if (!obj) return false; - if (isAsync === true) return typeof obj[Symbol.asyncIterator] === "function"; - if (isAsync === false) return typeof obj[Symbol.iterator] === "function"; - return typeof obj[Symbol.asyncIterator] === "function" || - typeof obj[Symbol.iterator] === "function"; -} - -// deno-lint-ignore no-explicit-any -function makeAsyncIterable(val: Readable | Iterable<any> | AsyncIterable<any>) { - if (isIterable(val)) { - return val; - } else if (isReadable(val)) { - return fromReadable(val as Readable); - } - throw new ERR_INVALID_ARG_TYPE( - "val", - ["Readable", "Iterable", "AsyncIterable"], - val, - ); -} - -async function* fromReadable(val: Readable) { - yield* createReadableStreamAsyncIterator(val); -} - -async function pump( - // deno-lint-ignore no-explicit-any - iterable: Iterable<any>, - writable: Duplex | Writable, - finish: (err?: NodeErrorAbstraction | null) => void, -) { - let error; - try { - for await (const chunk of iterable) { - if (!writable.write(chunk)) { - if (writable.destroyed) return; - await events.once(writable, "drain"); - } - } - writable.end(); - } catch (err) { - error = err; - } finally { - finish(error); - } -} - -export default function pipeline(...args: PipelineArguments) { - const callback: EndCallback = once(popCallback(args)); - - let streams: [DataSource, ...Transformers[]]; - if (args.length > 1) { - streams = args as [DataSource, ...Transformers[]]; - } else { - throw new ERR_MISSING_ARGS("streams"); - } - - let error: NodeErrorAbstraction; - // deno-lint-ignore no-explicit-any - let value: any; - const destroys: Array<(err: NodeErrorAbstraction) => void> = []; - - let finishCount = 0; - - function finish(err?: NodeErrorAbstraction | null) { - const final = --finishCount === 0; - - if (err && (!error || error.code === "ERR_STREAM_PREMATURE_CLOSE")) { - error = err; - } - - if (!error && !final) { - return; - } - - while (destroys.length) { - (destroys.shift() as (err: NodeErrorAbstraction) => void)(error); - } - - if (final) { - callback(error, value); - } - } - - // TODO(Soremwar) - // Simplify the hell out of this - // deno-lint-ignore no-explicit-any - let ret: any; - for (let i = 0; i < streams.length; i++) { - const stream = streams[i]; - const reading = i < streams.length - 1; - const writing = i > 0; - - if (isStream(stream)) { - finishCount++; - destroys.push(destroyer(stream as Streams, reading, writing, finish)); - } - - if (i === 0) { - if (typeof stream === "function") { - ret = stream(); - if (!isIterable(ret)) { - throw new ERR_INVALID_RETURN_VALUE( - "Iterable, AsyncIterable or Stream", - "source", - ret, - ); - } - } else if (isIterable(stream) || isReadable(stream)) { - ret = stream; - } else { - throw new ERR_INVALID_ARG_TYPE( - "source", - ["Stream", "Iterable", "AsyncIterable", "Function"], - stream, - ); - } - } else if (typeof stream === "function") { - ret = makeAsyncIterable(ret); - ret = stream(ret); - - if (reading) { - if (!isIterable(ret, true)) { - throw new ERR_INVALID_RETURN_VALUE( - "AsyncIterable", - `transform[${i - 1}]`, - ret, - ); - } - } else { - // If the last argument to pipeline is not a stream - // we must create a proxy stream so that pipeline(...) - // always returns a stream which can be further - // composed through `.pipe(stream)`. - const pt = new PassThrough({ - objectMode: true, - }); - if (ret instanceof Promise) { - ret - .then((val) => { - value = val; - pt.end(val); - }, (err) => { - pt.destroy(err); - }); - } else if (isIterable(ret, true)) { - finishCount++; - pump(ret, pt, finish); - } else { - throw new ERR_INVALID_RETURN_VALUE( - "AsyncIterable or Promise", - "destination", - ret, - ); - } - - ret = pt; - - finishCount++; - destroys.push(destroyer(ret, false, true, finish)); - } - } else if (isStream(stream)) { - if (isReadable(ret)) { - ret.pipe(stream as Readable); - - // TODO(Soremwar) - // Reimplement after stdout and stderr are implemented - // if (stream === process.stdout || stream === process.stderr) { - // ret.on("end", () => stream.end()); - // } - } else { - ret = makeAsyncIterable(ret); - - finishCount++; - pump(ret, stream as Writable, finish); - } - ret = stream; - } else { - const name = reading ? `transform[${i - 1}]` : "destination"; - throw new ERR_INVALID_ARG_TYPE( - name, - ["Stream", "Function"], - ret, - ); - } - } - - return ret as unknown as Readable; -} |