diff options
author | Steven Guerrero <stephenguerrero43@gmail.com> | 2020-11-26 07:50:08 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-26 13:50:08 +0100 |
commit | 9042fcc12e7774cdd0ca3a5d08918a07dae8102b (patch) | |
tree | 8b5ff11412aae9bb714e0bb0b9b0358db64a8657 /std/node/_stream/transform.ts | |
parent | 60e980c78180ee3b0a14d692307be275dc181c8d (diff) |
feat(std/node/stream): Add Duplex, Transform, Passthrough, pipeline, finished and promises (#7940)
Diffstat (limited to 'std/node/_stream/transform.ts')
-rw-r--r-- | std/node/_stream/transform.ts | 132 |
1 files changed, 132 insertions, 0 deletions
diff --git a/std/node/_stream/transform.ts b/std/node/_stream/transform.ts new file mode 100644 index 000000000..a4246e81a --- /dev/null +++ b/std/node/_stream/transform.ts @@ -0,0 +1,132 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { Encodings } from "../_utils.ts"; +import Duplex from "./duplex.ts"; +import type { DuplexOptions } from "./duplex.ts"; +import type { writeV } from "./writable_internal.ts"; +import { ERR_METHOD_NOT_IMPLEMENTED } from "../_errors.ts"; + +const kCallback = Symbol("kCallback"); + +type TransformFlush = ( + this: Transform, + // deno-lint-ignore no-explicit-any + callback: (error?: Error | null, data?: any) => void, +) => void; + +export interface TransformOptions extends DuplexOptions { + read?(this: Transform, size: number): void; + write?( + this: Transform, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: Encodings, + callback: (error?: Error | null) => void, + ): void; + writev?: writeV; + final?(this: Transform, callback: (error?: Error | null) => void): void; + destroy?( + this: Transform, + error: Error | null, + callback: (error: Error | null) => void, + ): void; + transform?( + this: Transform, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: Encodings, + // deno-lint-ignore no-explicit-any + callback: (error?: Error | null, data?: any) => void, + ): void; + flush?: TransformFlush; +} + +export default class Transform extends Duplex { + [kCallback]: null | ((error?: Error | null) => void); + _flush?: TransformFlush; + + constructor(options?: TransformOptions) { + super(options); + this._readableState.sync = false; + + this[kCallback] = null; + + if (options) { + if (typeof options.transform === "function") { + this._transform = options.transform; + } + + if (typeof options.flush === "function") { + this._flush = options.flush; + } + } + + this.on("prefinish", function (this: Transform) { + if (typeof this._flush === "function" && !this.destroyed) { + this._flush((er, data) => { + if (er) { + this.destroy(er); + return; + } + + if (data != null) { + this.push(data); + } + this.push(null); + }); + } else { + this.push(null); + } + }); + } + + _read = () => { + if (this[kCallback]) { + const callback = this[kCallback] as (error?: Error | null) => void; + this[kCallback] = null; + callback(); + } + }; + + _transform( + // deno-lint-ignore no-explicit-any + _chunk: any, + _encoding: string, + // deno-lint-ignore no-explicit-any + _callback: (error?: Error | null, data?: any) => void, + ) { + throw new ERR_METHOD_NOT_IMPLEMENTED("_transform()"); + } + + _write = ( + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + callback: (error?: Error | null) => void, + ) => { + const rState = this._readableState; + const wState = this._writableState; + const length = rState.length; + + this._transform(chunk, encoding, (err, val) => { + if (err) { + callback(err); + return; + } + + if (val != null) { + this.push(val); + } + + if ( + wState.ended || // Backwards compat. + length === rState.length || // Backwards compat. + rState.length < rState.highWaterMark || + rState.length === 0 + ) { + callback(); + } else { + this[kCallback] = callback; + } + }); + }; +} |