summaryrefslogtreecommitdiff
path: root/std/node/_stream/transform.ts
diff options
context:
space:
mode:
authorSteven Guerrero <stephenguerrero43@gmail.com>2020-11-26 07:50:08 -0500
committerGitHub <noreply@github.com>2020-11-26 13:50:08 +0100
commit9042fcc12e7774cdd0ca3a5d08918a07dae8102b (patch)
tree8b5ff11412aae9bb714e0bb0b9b0358db64a8657 /std/node/_stream/transform.ts
parent60e980c78180ee3b0a14d692307be275dc181c8d (diff)
feat(std/node/stream): Add Duplex, Transform, Passthrough, pipeline, finished and promises (#7940)
Diffstat (limited to 'std/node/_stream/transform.ts')
-rw-r--r--std/node/_stream/transform.ts132
1 files changed, 132 insertions, 0 deletions
diff --git a/std/node/_stream/transform.ts b/std/node/_stream/transform.ts
new file mode 100644
index 000000000..a4246e81a
--- /dev/null
+++ b/std/node/_stream/transform.ts
@@ -0,0 +1,132 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Encodings } from "../_utils.ts";
+import Duplex from "./duplex.ts";
+import type { DuplexOptions } from "./duplex.ts";
+import type { writeV } from "./writable_internal.ts";
+import { ERR_METHOD_NOT_IMPLEMENTED } from "../_errors.ts";
+
+const kCallback = Symbol("kCallback");
+
+type TransformFlush = (
+ this: Transform,
+ // deno-lint-ignore no-explicit-any
+ callback: (error?: Error | null, data?: any) => void,
+) => void;
+
+export interface TransformOptions extends DuplexOptions {
+ read?(this: Transform, size: number): void;
+ write?(
+ this: Transform,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: Encodings,
+ callback: (error?: Error | null) => void,
+ ): void;
+ writev?: writeV;
+ final?(this: Transform, callback: (error?: Error | null) => void): void;
+ destroy?(
+ this: Transform,
+ error: Error | null,
+ callback: (error: Error | null) => void,
+ ): void;
+ transform?(
+ this: Transform,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: Encodings,
+ // deno-lint-ignore no-explicit-any
+ callback: (error?: Error | null, data?: any) => void,
+ ): void;
+ flush?: TransformFlush;
+}
+
+export default class Transform extends Duplex {
+ [kCallback]: null | ((error?: Error | null) => void);
+ _flush?: TransformFlush;
+
+ constructor(options?: TransformOptions) {
+ super(options);
+ this._readableState.sync = false;
+
+ this[kCallback] = null;
+
+ if (options) {
+ if (typeof options.transform === "function") {
+ this._transform = options.transform;
+ }
+
+ if (typeof options.flush === "function") {
+ this._flush = options.flush;
+ }
+ }
+
+ this.on("prefinish", function (this: Transform) {
+ if (typeof this._flush === "function" && !this.destroyed) {
+ this._flush((er, data) => {
+ if (er) {
+ this.destroy(er);
+ return;
+ }
+
+ if (data != null) {
+ this.push(data);
+ }
+ this.push(null);
+ });
+ } else {
+ this.push(null);
+ }
+ });
+ }
+
+ _read = () => {
+ if (this[kCallback]) {
+ const callback = this[kCallback] as (error?: Error | null) => void;
+ this[kCallback] = null;
+ callback();
+ }
+ };
+
+ _transform(
+ // deno-lint-ignore no-explicit-any
+ _chunk: any,
+ _encoding: string,
+ // deno-lint-ignore no-explicit-any
+ _callback: (error?: Error | null, data?: any) => void,
+ ) {
+ throw new ERR_METHOD_NOT_IMPLEMENTED("_transform()");
+ }
+
+ _write = (
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ callback: (error?: Error | null) => void,
+ ) => {
+ const rState = this._readableState;
+ const wState = this._writableState;
+ const length = rState.length;
+
+ this._transform(chunk, encoding, (err, val) => {
+ if (err) {
+ callback(err);
+ return;
+ }
+
+ if (val != null) {
+ this.push(val);
+ }
+
+ if (
+ wState.ended || // Backwards compat.
+ length === rState.length || // Backwards compat.
+ rState.length < rState.highWaterMark ||
+ rState.length === 0
+ ) {
+ callback();
+ } else {
+ this[kCallback] = callback;
+ }
+ });
+ };
+}