summaryrefslogtreecommitdiff
path: root/std/node/_stream/duplex.ts
diff options
context:
space:
mode:
Diffstat (limited to 'std/node/_stream/duplex.ts')
-rw-r--r--std/node/_stream/duplex.ts683
1 files changed, 681 insertions, 2 deletions
diff --git a/std/node/_stream/duplex.ts b/std/node/_stream/duplex.ts
index c5faed6f8..b5c429f0a 100644
--- a/std/node/_stream/duplex.ts
+++ b/std/node/_stream/duplex.ts
@@ -1,3 +1,682 @@
// Copyright Node.js contributors. All rights reserved. MIT License.
-// deno-lint-ignore no-explicit-any
-export const errorOrDestroy = (...args: any[]) => {};
+import { captureRejectionSymbol } from "../events.ts";
+import Readable, { ReadableState } from "./readable.ts";
+import Stream from "./stream.ts";
+import Writable, { WritableState } from "./writable.ts";
+import { Buffer } from "../buffer.ts";
+import {
+ ERR_STREAM_ALREADY_FINISHED,
+ ERR_STREAM_DESTROYED,
+ ERR_UNKNOWN_ENCODING,
+} from "../_errors.ts";
+import type { Encodings } from "../_utils.ts";
+import createReadableStreamAsyncIterator from "./async_iterator.ts";
+import type { ReadableStreamAsyncIterator } from "./async_iterator.ts";
+import {
+ _destroy,
+ computeNewHighWaterMark,
+ emitReadable,
+ fromList,
+ howMuchToRead,
+ nReadingNextTick,
+ updateReadableListening,
+} from "./readable_internal.ts";
+import { kOnFinished, writeV } from "./writable_internal.ts";
+import {
+ endDuplex,
+ finishMaybe,
+ onwrite,
+ readableAddChunk,
+} from "./duplex_internal.ts";
+export { errorOrDestroy } from "./duplex_internal.ts";
+
+export interface DuplexOptions {
+ allowHalfOpen?: boolean;
+ autoDestroy?: boolean;
+ decodeStrings?: boolean;
+ defaultEncoding?: Encodings;
+ destroy?(
+ this: Duplex,
+ error: Error | null,
+ callback: (error: Error | null) => void,
+ ): void;
+ emitClose?: boolean;
+ encoding?: Encodings;
+ final?(this: Duplex, callback: (error?: Error | null) => void): void;
+ highWaterMark?: number;
+ objectMode?: boolean;
+ read?(this: Duplex, size: number): void;
+ readable?: boolean;
+ readableHighWaterMark?: number;
+ readableObjectMode?: boolean;
+ writable?: boolean;
+ writableCorked?: number;
+ writableHighWaterMark?: number;
+ writableObjectMode?: boolean;
+ write?(
+ this: Duplex,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: Encodings,
+ callback: (error?: Error | null) => void,
+ ): void;
+ writev?: writeV;
+}
+
+interface Duplex extends Readable, Writable {}
+
+/**
+ * A duplex is an implementation of a stream that has both Readable and Writable
+ * attributes and capabilities
+ */
+class Duplex extends Stream {
+ allowHalfOpen = true;
+ _final?: (
+ callback: (error?: Error | null | undefined) => void,
+ ) => void;
+ _readableState: ReadableState;
+ _writableState: WritableState;
+ _writev?: writeV | null;
+
+ constructor(options?: DuplexOptions) {
+ super();
+
+ if (options) {
+ if (options.allowHalfOpen === false) {
+ this.allowHalfOpen = false;
+ }
+ if (typeof options.destroy === "function") {
+ this._destroy = options.destroy;
+ }
+ if (typeof options.final === "function") {
+ this._final = options.final;
+ }
+ if (typeof options.read === "function") {
+ this._read = options.read;
+ }
+ if (options.readable === false) {
+ this.readable = false;
+ }
+ if (options.writable === false) {
+ this.writable = false;
+ }
+ if (typeof options.write === "function") {
+ this._write = options.write;
+ }
+ if (typeof options.writev === "function") {
+ this._writev = options.writev;
+ }
+ }
+
+ const readableOptions = {
+ autoDestroy: options?.autoDestroy,
+ defaultEncoding: options?.defaultEncoding,
+ destroy: options?.destroy as unknown as (
+ this: Readable,
+ error: Error | null,
+ callback: (error: Error | null) => void,
+ ) => void,
+ emitClose: options?.emitClose,
+ encoding: options?.encoding,
+ highWaterMark: options?.highWaterMark ?? options?.readableHighWaterMark,
+ objectMode: options?.objectMode ?? options?.readableObjectMode,
+ read: options?.read as unknown as (this: Readable) => void,
+ };
+
+ const writableOptions = {
+ autoDestroy: options?.autoDestroy,
+ decodeStrings: options?.decodeStrings,
+ defaultEncoding: options?.defaultEncoding,
+ destroy: options?.destroy as unknown as (
+ this: Writable,
+ error: Error | null,
+ callback: (error: Error | null) => void,
+ ) => void,
+ emitClose: options?.emitClose,
+ final: options?.final as unknown as (
+ this: Writable,
+ callback: (error?: Error | null) => void,
+ ) => void,
+ highWaterMark: options?.highWaterMark ?? options?.writableHighWaterMark,
+ objectMode: options?.objectMode ?? options?.writableObjectMode,
+ write: options?.write as unknown as (
+ this: Writable,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ callback: (error?: Error | null) => void,
+ ) => void,
+ writev: options?.writev as unknown as (
+ this: Writable,
+ // deno-lint-ignore no-explicit-any
+ chunks: Array<{ chunk: any; encoding: Encodings }>,
+ callback: (error?: Error | null) => void,
+ ) => void,
+ };
+
+ this._readableState = new ReadableState(readableOptions);
+ this._writableState = new WritableState(
+ writableOptions,
+ this as unknown as Writable,
+ );
+ //Very important to override onwrite here, duplex implementation adds a check
+ //on the readable side
+ this._writableState.onwrite = onwrite.bind(undefined, this);
+ }
+
+ [captureRejectionSymbol](err?: Error) {
+ this.destroy(err);
+ }
+
+ [Symbol.asyncIterator](): ReadableStreamAsyncIterator {
+ return createReadableStreamAsyncIterator(this);
+ }
+
+ _destroy(
+ error: Error | null,
+ callback: (error?: Error | null) => void,
+ ): void {
+ callback(error);
+ }
+
+ _read = Readable.prototype._read;
+
+ _undestroy = Readable.prototype._undestroy;
+
+ destroy(err?: Error | null, cb?: (error?: Error | null) => void) {
+ const r = this._readableState;
+ const w = this._writableState;
+
+ if (w.destroyed || r.destroyed) {
+ if (typeof cb === "function") {
+ cb();
+ }
+
+ return this;
+ }
+
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ if (!r.errored) {
+ r.errored = err;
+ }
+ }
+
+ w.destroyed = true;
+ r.destroyed = true;
+
+ this._destroy(err || null, (err) => {
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ if (!r.errored) {
+ r.errored = err;
+ }
+ }
+
+ w.closed = true;
+ r.closed = true;
+
+ if (typeof cb === "function") {
+ cb(err);
+ }
+
+ if (err) {
+ queueMicrotask(() => {
+ const r = this._readableState;
+ const w = this._writableState;
+
+ if (!w.errorEmitted && !r.errorEmitted) {
+ w.errorEmitted = true;
+ r.errorEmitted = true;
+
+ this.emit("error", err);
+ }
+
+ r.closeEmitted = true;
+
+ if (w.emitClose || r.emitClose) {
+ this.emit("close");
+ }
+ });
+ } else {
+ queueMicrotask(() => {
+ const r = this._readableState;
+ const w = this._writableState;
+
+ r.closeEmitted = true;
+
+ if (w.emitClose || r.emitClose) {
+ this.emit("close");
+ }
+ });
+ }
+ });
+
+ return this;
+ }
+
+ isPaused = Readable.prototype.isPaused;
+
+ off = this.removeListener;
+
+ on(
+ event: "close" | "end" | "pause" | "readable" | "resume",
+ listener: () => void,
+ ): this;
+ // deno-lint-ignore no-explicit-any
+ on(event: "data", listener: (chunk: any) => void): this;
+ on(event: "error", listener: (err: Error) => void): this;
+ // deno-lint-ignore no-explicit-any
+ on(event: string | symbol, listener: (...args: any[]) => void): this;
+ on(
+ ev: string | symbol,
+ fn:
+ | (() => void)
+ // deno-lint-ignore no-explicit-any
+ | ((chunk: any) => void)
+ | ((err: Error) => void)
+ // deno-lint-ignore no-explicit-any
+ | ((...args: any[]) => void),
+ ) {
+ const res = super.on.call(this, ev, fn);
+ const state = this._readableState;
+
+ if (ev === "data") {
+ state.readableListening = this.listenerCount("readable") > 0;
+
+ if (state.flowing !== false) {
+ this.resume();
+ }
+ } else if (ev === "readable") {
+ if (!state.endEmitted && !state.readableListening) {
+ state.readableListening = state.needReadable = true;
+ state.flowing = false;
+ state.emittedReadable = false;
+ if (state.length) {
+ emitReadable(this);
+ } else if (!state.reading) {
+ queueMicrotask(() => nReadingNextTick(this));
+ }
+ }
+ }
+
+ return res;
+ }
+
+ pause = Readable.prototype.pause as () => this;
+
+ pipe = Readable.prototype.pipe;
+
+ // deno-lint-ignore no-explicit-any
+ push(chunk: any, encoding?: Encodings): boolean {
+ return readableAddChunk(this, chunk, encoding, false);
+ }
+
+ /** You can override either this method, or the async `_read` method */
+ read(n?: number) {
+ // Same as parseInt(undefined, 10), however V8 7.3 performance regressed
+ // in this scenario, so we are doing it manually.
+ if (n === undefined) {
+ n = NaN;
+ }
+ const state = this._readableState;
+ const nOrig = n;
+
+ if (n > state.highWaterMark) {
+ state.highWaterMark = computeNewHighWaterMark(n);
+ }
+
+ if (n !== 0) {
+ state.emittedReadable = false;
+ }
+
+ if (
+ n === 0 &&
+ state.needReadable &&
+ ((state.highWaterMark !== 0
+ ? state.length >= state.highWaterMark
+ : state.length > 0) ||
+ state.ended)
+ ) {
+ if (state.length === 0 && state.ended) {
+ endDuplex(this);
+ } else {
+ emitReadable(this);
+ }
+ return null;
+ }
+
+ n = howMuchToRead(n, state);
+
+ if (n === 0 && state.ended) {
+ if (state.length === 0) {
+ endDuplex(this);
+ }
+ return null;
+ }
+
+ let doRead = state.needReadable;
+ if (
+ state.length === 0 || state.length - (n as number) < state.highWaterMark
+ ) {
+ doRead = true;
+ }
+
+ if (
+ state.ended || state.reading || state.destroyed || state.errored ||
+ !state.constructed
+ ) {
+ doRead = false;
+ } else if (doRead) {
+ state.reading = true;
+ state.sync = true;
+ if (state.length === 0) {
+ state.needReadable = true;
+ }
+ this._read();
+ state.sync = false;
+ if (!state.reading) {
+ n = howMuchToRead(nOrig, state);
+ }
+ }
+
+ let ret;
+ if ((n as number) > 0) {
+ ret = fromList((n as number), state);
+ } else {
+ ret = null;
+ }
+
+ if (ret === null) {
+ state.needReadable = state.length <= state.highWaterMark;
+ n = 0;
+ } else {
+ state.length -= n as number;
+ if (state.multiAwaitDrain) {
+ (state.awaitDrainWriters as Set<Writable>).clear();
+ } else {
+ state.awaitDrainWriters = null;
+ }
+ }
+
+ if (state.length === 0) {
+ if (!state.ended) {
+ state.needReadable = true;
+ }
+
+ if (nOrig !== n && state.ended) {
+ endDuplex(this);
+ }
+ }
+
+ if (ret !== null) {
+ this.emit("data", ret);
+ }
+
+ return ret;
+ }
+
+ removeAllListeners(
+ ev:
+ | "close"
+ | "data"
+ | "end"
+ | "error"
+ | "pause"
+ | "readable"
+ | "resume"
+ | symbol
+ | undefined,
+ ) {
+ const res = super.removeAllListeners(ev);
+
+ if (ev === "readable" || ev === undefined) {
+ queueMicrotask(() => updateReadableListening(this));
+ }
+
+ return res;
+ }
+
+ removeListener(
+ event: "close" | "end" | "pause" | "readable" | "resume",
+ listener: () => void,
+ ): this;
+ // deno-lint-ignore no-explicit-any
+ removeListener(event: "data", listener: (chunk: any) => void): this;
+ removeListener(event: "error", listener: (err: Error) => void): this;
+ removeListener(
+ event: string | symbol,
+ // deno-lint-ignore no-explicit-any
+ listener: (...args: any[]) => void,
+ ): this;
+ removeListener(
+ ev: string | symbol,
+ fn:
+ | (() => void)
+ // deno-lint-ignore no-explicit-any
+ | ((chunk: any) => void)
+ | ((err: Error) => void)
+ // deno-lint-ignore no-explicit-any
+ | ((...args: any[]) => void),
+ ) {
+ const res = super.removeListener.call(this, ev, fn);
+
+ if (ev === "readable") {
+ queueMicrotask(() => updateReadableListening(this));
+ }
+
+ return res;
+ }
+
+ resume = Readable.prototype.resume as () => this;
+
+ setEncoding = Readable.prototype.setEncoding as (enc: string) => this;
+
+ // deno-lint-ignore no-explicit-any
+ unshift(chunk: any, encoding?: Encodings): boolean {
+ return readableAddChunk(this, chunk, encoding, true);
+ }
+
+ unpipe = Readable.prototype.unpipe as (dest?: Writable | undefined) => this;
+
+ wrap = Readable.prototype.wrap as (stream: Stream) => this;
+
+ get readable(): boolean {
+ return this._readableState?.readable &&
+ !this._readableState?.destroyed &&
+ !this._readableState?.errorEmitted &&
+ !this._readableState?.endEmitted;
+ }
+ set readable(val: boolean) {
+ if (this._readableState) {
+ this._readableState.readable = val;
+ }
+ }
+
+ get readableHighWaterMark(): number {
+ return this._readableState.highWaterMark;
+ }
+
+ get readableBuffer() {
+ return this._readableState && this._readableState.buffer;
+ }
+
+ get readableFlowing(): boolean | null {
+ return this._readableState.flowing;
+ }
+
+ set readableFlowing(state: boolean | null) {
+ if (this._readableState) {
+ this._readableState.flowing = state;
+ }
+ }
+
+ get readableLength() {
+ return this._readableState.length;
+ }
+
+ get readableObjectMode() {
+ return this._readableState ? this._readableState.objectMode : false;
+ }
+
+ get readableEncoding() {
+ return this._readableState ? this._readableState.encoding : null;
+ }
+
+ get readableEnded() {
+ return this._readableState ? this._readableState.endEmitted : false;
+ }
+
+ _write = Writable.prototype._write;
+
+ write = Writable.prototype.write;
+
+ cork = Writable.prototype.cork;
+
+ uncork = Writable.prototype.uncork;
+
+ setDefaultEncoding(encoding: string) {
+ // node::ParseEncoding() requires lower case.
+ if (typeof encoding === "string") {
+ encoding = encoding.toLowerCase();
+ }
+ if (!Buffer.isEncoding(encoding)) {
+ throw new ERR_UNKNOWN_ENCODING(encoding);
+ }
+ this._writableState.defaultEncoding = encoding as Encodings;
+ return this;
+ }
+
+ end(cb?: () => void): void;
+ // deno-lint-ignore no-explicit-any
+ end(chunk: any, cb?: () => void): void;
+ // deno-lint-ignore no-explicit-any
+ end(chunk: any, encoding: Encodings, cb?: () => void): void;
+
+ end(
+ // deno-lint-ignore no-explicit-any
+ x?: any | (() => void),
+ y?: Encodings | (() => void),
+ z?: () => void,
+ ) {
+ const state = this._writableState;
+ // deno-lint-ignore no-explicit-any
+ let chunk: any | null;
+ let encoding: Encodings | null;
+ let cb: undefined | ((error?: Error) => void);
+
+ if (typeof x === "function") {
+ chunk = null;
+ encoding = null;
+ cb = x;
+ } else if (typeof y === "function") {
+ chunk = x;
+ encoding = null;
+ cb = y;
+ } else {
+ chunk = x;
+ encoding = y as Encodings;
+ cb = z;
+ }
+
+ if (chunk !== null && chunk !== undefined) {
+ this.write(chunk, encoding);
+ }
+
+ if (state.corked) {
+ state.corked = 1;
+ this.uncork();
+ }
+
+ let err: Error | undefined;
+ if (!state.errored && !state.ending) {
+ state.ending = true;
+ finishMaybe(this, state, true);
+ state.ended = true;
+ } else if (state.finished) {
+ err = new ERR_STREAM_ALREADY_FINISHED("end");
+ } else if (state.destroyed) {
+ err = new ERR_STREAM_DESTROYED("end");
+ }
+
+ if (typeof cb === "function") {
+ if (err || state.finished) {
+ queueMicrotask(() => {
+ (cb as (error?: Error | undefined) => void)(err);
+ });
+ } else {
+ state[kOnFinished].push(cb);
+ }
+ }
+
+ return this;
+ }
+
+ get destroyed() {
+ if (
+ this._readableState === undefined ||
+ this._writableState === undefined
+ ) {
+ return false;
+ }
+ return this._readableState.destroyed && this._writableState.destroyed;
+ }
+
+ set destroyed(value: boolean) {
+ if (this._readableState && this._writableState) {
+ this._readableState.destroyed = value;
+ this._writableState.destroyed = value;
+ }
+ }
+
+ get writable() {
+ const w = this._writableState;
+ return !w.destroyed && !w.errored && !w.ending && !w.ended;
+ }
+
+ set writable(val) {
+ if (this._writableState) {
+ this._writableState.writable = !!val;
+ }
+ }
+
+ get writableFinished() {
+ return this._writableState ? this._writableState.finished : false;
+ }
+
+ get writableObjectMode() {
+ return this._writableState ? this._writableState.objectMode : false;
+ }
+
+ get writableBuffer() {
+ return this._writableState && this._writableState.getBuffer();
+ }
+
+ get writableEnded() {
+ return this._writableState ? this._writableState.ending : false;
+ }
+
+ get writableHighWaterMark() {
+ return this._writableState && this._writableState.highWaterMark;
+ }
+
+ get writableCorked() {
+ return this._writableState ? this._writableState.corked : 0;
+ }
+
+ get writableLength() {
+ return this._writableState && this._writableState.length;
+ }
+}
+
+export default Duplex;