summaryrefslogtreecommitdiff
path: root/std/node/_stream/duplex.ts
diff options
context:
space:
mode:
Diffstat (limited to 'std/node/_stream/duplex.ts')
-rw-r--r--std/node/_stream/duplex.ts682
1 files changed, 0 insertions, 682 deletions
diff --git a/std/node/_stream/duplex.ts b/std/node/_stream/duplex.ts
deleted file mode 100644
index b5c429f0a..000000000
--- a/std/node/_stream/duplex.ts
+++ /dev/null
@@ -1,682 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { captureRejectionSymbol } from "../events.ts";
-import Readable, { ReadableState } from "./readable.ts";
-import Stream from "./stream.ts";
-import Writable, { WritableState } from "./writable.ts";
-import { Buffer } from "../buffer.ts";
-import {
- ERR_STREAM_ALREADY_FINISHED,
- ERR_STREAM_DESTROYED,
- ERR_UNKNOWN_ENCODING,
-} from "../_errors.ts";
-import type { Encodings } from "../_utils.ts";
-import createReadableStreamAsyncIterator from "./async_iterator.ts";
-import type { ReadableStreamAsyncIterator } from "./async_iterator.ts";
-import {
- _destroy,
- computeNewHighWaterMark,
- emitReadable,
- fromList,
- howMuchToRead,
- nReadingNextTick,
- updateReadableListening,
-} from "./readable_internal.ts";
-import { kOnFinished, writeV } from "./writable_internal.ts";
-import {
- endDuplex,
- finishMaybe,
- onwrite,
- readableAddChunk,
-} from "./duplex_internal.ts";
-export { errorOrDestroy } from "./duplex_internal.ts";
-
-export interface DuplexOptions {
- allowHalfOpen?: boolean;
- autoDestroy?: boolean;
- decodeStrings?: boolean;
- defaultEncoding?: Encodings;
- destroy?(
- this: Duplex,
- error: Error | null,
- callback: (error: Error | null) => void,
- ): void;
- emitClose?: boolean;
- encoding?: Encodings;
- final?(this: Duplex, callback: (error?: Error | null) => void): void;
- highWaterMark?: number;
- objectMode?: boolean;
- read?(this: Duplex, size: number): void;
- readable?: boolean;
- readableHighWaterMark?: number;
- readableObjectMode?: boolean;
- writable?: boolean;
- writableCorked?: number;
- writableHighWaterMark?: number;
- writableObjectMode?: boolean;
- write?(
- this: Duplex,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: Encodings,
- callback: (error?: Error | null) => void,
- ): void;
- writev?: writeV;
-}
-
-interface Duplex extends Readable, Writable {}
-
-/**
- * A duplex is an implementation of a stream that has both Readable and Writable
- * attributes and capabilities
- */
-class Duplex extends Stream {
- allowHalfOpen = true;
- _final?: (
- callback: (error?: Error | null | undefined) => void,
- ) => void;
- _readableState: ReadableState;
- _writableState: WritableState;
- _writev?: writeV | null;
-
- constructor(options?: DuplexOptions) {
- super();
-
- if (options) {
- if (options.allowHalfOpen === false) {
- this.allowHalfOpen = false;
- }
- if (typeof options.destroy === "function") {
- this._destroy = options.destroy;
- }
- if (typeof options.final === "function") {
- this._final = options.final;
- }
- if (typeof options.read === "function") {
- this._read = options.read;
- }
- if (options.readable === false) {
- this.readable = false;
- }
- if (options.writable === false) {
- this.writable = false;
- }
- if (typeof options.write === "function") {
- this._write = options.write;
- }
- if (typeof options.writev === "function") {
- this._writev = options.writev;
- }
- }
-
- const readableOptions = {
- autoDestroy: options?.autoDestroy,
- defaultEncoding: options?.defaultEncoding,
- destroy: options?.destroy as unknown as (
- this: Readable,
- error: Error | null,
- callback: (error: Error | null) => void,
- ) => void,
- emitClose: options?.emitClose,
- encoding: options?.encoding,
- highWaterMark: options?.highWaterMark ?? options?.readableHighWaterMark,
- objectMode: options?.objectMode ?? options?.readableObjectMode,
- read: options?.read as unknown as (this: Readable) => void,
- };
-
- const writableOptions = {
- autoDestroy: options?.autoDestroy,
- decodeStrings: options?.decodeStrings,
- defaultEncoding: options?.defaultEncoding,
- destroy: options?.destroy as unknown as (
- this: Writable,
- error: Error | null,
- callback: (error: Error | null) => void,
- ) => void,
- emitClose: options?.emitClose,
- final: options?.final as unknown as (
- this: Writable,
- callback: (error?: Error | null) => void,
- ) => void,
- highWaterMark: options?.highWaterMark ?? options?.writableHighWaterMark,
- objectMode: options?.objectMode ?? options?.writableObjectMode,
- write: options?.write as unknown as (
- this: Writable,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- callback: (error?: Error | null) => void,
- ) => void,
- writev: options?.writev as unknown as (
- this: Writable,
- // deno-lint-ignore no-explicit-any
- chunks: Array<{ chunk: any; encoding: Encodings }>,
- callback: (error?: Error | null) => void,
- ) => void,
- };
-
- this._readableState = new ReadableState(readableOptions);
- this._writableState = new WritableState(
- writableOptions,
- this as unknown as Writable,
- );
- //Very important to override onwrite here, duplex implementation adds a check
- //on the readable side
- this._writableState.onwrite = onwrite.bind(undefined, this);
- }
-
- [captureRejectionSymbol](err?: Error) {
- this.destroy(err);
- }
-
- [Symbol.asyncIterator](): ReadableStreamAsyncIterator {
- return createReadableStreamAsyncIterator(this);
- }
-
- _destroy(
- error: Error | null,
- callback: (error?: Error | null) => void,
- ): void {
- callback(error);
- }
-
- _read = Readable.prototype._read;
-
- _undestroy = Readable.prototype._undestroy;
-
- destroy(err?: Error | null, cb?: (error?: Error | null) => void) {
- const r = this._readableState;
- const w = this._writableState;
-
- if (w.destroyed || r.destroyed) {
- if (typeof cb === "function") {
- cb();
- }
-
- return this;
- }
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- if (!r.errored) {
- r.errored = err;
- }
- }
-
- w.destroyed = true;
- r.destroyed = true;
-
- this._destroy(err || null, (err) => {
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- if (!r.errored) {
- r.errored = err;
- }
- }
-
- w.closed = true;
- r.closed = true;
-
- if (typeof cb === "function") {
- cb(err);
- }
-
- if (err) {
- queueMicrotask(() => {
- const r = this._readableState;
- const w = this._writableState;
-
- if (!w.errorEmitted && !r.errorEmitted) {
- w.errorEmitted = true;
- r.errorEmitted = true;
-
- this.emit("error", err);
- }
-
- r.closeEmitted = true;
-
- if (w.emitClose || r.emitClose) {
- this.emit("close");
- }
- });
- } else {
- queueMicrotask(() => {
- const r = this._readableState;
- const w = this._writableState;
-
- r.closeEmitted = true;
-
- if (w.emitClose || r.emitClose) {
- this.emit("close");
- }
- });
- }
- });
-
- return this;
- }
-
- isPaused = Readable.prototype.isPaused;
-
- off = this.removeListener;
-
- on(
- event: "close" | "end" | "pause" | "readable" | "resume",
- listener: () => void,
- ): this;
- // deno-lint-ignore no-explicit-any
- on(event: "data", listener: (chunk: any) => void): this;
- on(event: "error", listener: (err: Error) => void): this;
- // deno-lint-ignore no-explicit-any
- on(event: string | symbol, listener: (...args: any[]) => void): this;
- on(
- ev: string | symbol,
- fn:
- | (() => void)
- // deno-lint-ignore no-explicit-any
- | ((chunk: any) => void)
- | ((err: Error) => void)
- // deno-lint-ignore no-explicit-any
- | ((...args: any[]) => void),
- ) {
- const res = super.on.call(this, ev, fn);
- const state = this._readableState;
-
- if (ev === "data") {
- state.readableListening = this.listenerCount("readable") > 0;
-
- if (state.flowing !== false) {
- this.resume();
- }
- } else if (ev === "readable") {
- if (!state.endEmitted && !state.readableListening) {
- state.readableListening = state.needReadable = true;
- state.flowing = false;
- state.emittedReadable = false;
- if (state.length) {
- emitReadable(this);
- } else if (!state.reading) {
- queueMicrotask(() => nReadingNextTick(this));
- }
- }
- }
-
- return res;
- }
-
- pause = Readable.prototype.pause as () => this;
-
- pipe = Readable.prototype.pipe;
-
- // deno-lint-ignore no-explicit-any
- push(chunk: any, encoding?: Encodings): boolean {
- return readableAddChunk(this, chunk, encoding, false);
- }
-
- /** You can override either this method, or the async `_read` method */
- read(n?: number) {
- // Same as parseInt(undefined, 10), however V8 7.3 performance regressed
- // in this scenario, so we are doing it manually.
- if (n === undefined) {
- n = NaN;
- }
- const state = this._readableState;
- const nOrig = n;
-
- if (n > state.highWaterMark) {
- state.highWaterMark = computeNewHighWaterMark(n);
- }
-
- if (n !== 0) {
- state.emittedReadable = false;
- }
-
- if (
- n === 0 &&
- state.needReadable &&
- ((state.highWaterMark !== 0
- ? state.length >= state.highWaterMark
- : state.length > 0) ||
- state.ended)
- ) {
- if (state.length === 0 && state.ended) {
- endDuplex(this);
- } else {
- emitReadable(this);
- }
- return null;
- }
-
- n = howMuchToRead(n, state);
-
- if (n === 0 && state.ended) {
- if (state.length === 0) {
- endDuplex(this);
- }
- return null;
- }
-
- let doRead = state.needReadable;
- if (
- state.length === 0 || state.length - (n as number) < state.highWaterMark
- ) {
- doRead = true;
- }
-
- if (
- state.ended || state.reading || state.destroyed || state.errored ||
- !state.constructed
- ) {
- doRead = false;
- } else if (doRead) {
- state.reading = true;
- state.sync = true;
- if (state.length === 0) {
- state.needReadable = true;
- }
- this._read();
- state.sync = false;
- if (!state.reading) {
- n = howMuchToRead(nOrig, state);
- }
- }
-
- let ret;
- if ((n as number) > 0) {
- ret = fromList((n as number), state);
- } else {
- ret = null;
- }
-
- if (ret === null) {
- state.needReadable = state.length <= state.highWaterMark;
- n = 0;
- } else {
- state.length -= n as number;
- if (state.multiAwaitDrain) {
- (state.awaitDrainWriters as Set<Writable>).clear();
- } else {
- state.awaitDrainWriters = null;
- }
- }
-
- if (state.length === 0) {
- if (!state.ended) {
- state.needReadable = true;
- }
-
- if (nOrig !== n && state.ended) {
- endDuplex(this);
- }
- }
-
- if (ret !== null) {
- this.emit("data", ret);
- }
-
- return ret;
- }
-
- removeAllListeners(
- ev:
- | "close"
- | "data"
- | "end"
- | "error"
- | "pause"
- | "readable"
- | "resume"
- | symbol
- | undefined,
- ) {
- const res = super.removeAllListeners(ev);
-
- if (ev === "readable" || ev === undefined) {
- queueMicrotask(() => updateReadableListening(this));
- }
-
- return res;
- }
-
- removeListener(
- event: "close" | "end" | "pause" | "readable" | "resume",
- listener: () => void,
- ): this;
- // deno-lint-ignore no-explicit-any
- removeListener(event: "data", listener: (chunk: any) => void): this;
- removeListener(event: "error", listener: (err: Error) => void): this;
- removeListener(
- event: string | symbol,
- // deno-lint-ignore no-explicit-any
- listener: (...args: any[]) => void,
- ): this;
- removeListener(
- ev: string | symbol,
- fn:
- | (() => void)
- // deno-lint-ignore no-explicit-any
- | ((chunk: any) => void)
- | ((err: Error) => void)
- // deno-lint-ignore no-explicit-any
- | ((...args: any[]) => void),
- ) {
- const res = super.removeListener.call(this, ev, fn);
-
- if (ev === "readable") {
- queueMicrotask(() => updateReadableListening(this));
- }
-
- return res;
- }
-
- resume = Readable.prototype.resume as () => this;
-
- setEncoding = Readable.prototype.setEncoding as (enc: string) => this;
-
- // deno-lint-ignore no-explicit-any
- unshift(chunk: any, encoding?: Encodings): boolean {
- return readableAddChunk(this, chunk, encoding, true);
- }
-
- unpipe = Readable.prototype.unpipe as (dest?: Writable | undefined) => this;
-
- wrap = Readable.prototype.wrap as (stream: Stream) => this;
-
- get readable(): boolean {
- return this._readableState?.readable &&
- !this._readableState?.destroyed &&
- !this._readableState?.errorEmitted &&
- !this._readableState?.endEmitted;
- }
- set readable(val: boolean) {
- if (this._readableState) {
- this._readableState.readable = val;
- }
- }
-
- get readableHighWaterMark(): number {
- return this._readableState.highWaterMark;
- }
-
- get readableBuffer() {
- return this._readableState && this._readableState.buffer;
- }
-
- get readableFlowing(): boolean | null {
- return this._readableState.flowing;
- }
-
- set readableFlowing(state: boolean | null) {
- if (this._readableState) {
- this._readableState.flowing = state;
- }
- }
-
- get readableLength() {
- return this._readableState.length;
- }
-
- get readableObjectMode() {
- return this._readableState ? this._readableState.objectMode : false;
- }
-
- get readableEncoding() {
- return this._readableState ? this._readableState.encoding : null;
- }
-
- get readableEnded() {
- return this._readableState ? this._readableState.endEmitted : false;
- }
-
- _write = Writable.prototype._write;
-
- write = Writable.prototype.write;
-
- cork = Writable.prototype.cork;
-
- uncork = Writable.prototype.uncork;
-
- setDefaultEncoding(encoding: string) {
- // node::ParseEncoding() requires lower case.
- if (typeof encoding === "string") {
- encoding = encoding.toLowerCase();
- }
- if (!Buffer.isEncoding(encoding)) {
- throw new ERR_UNKNOWN_ENCODING(encoding);
- }
- this._writableState.defaultEncoding = encoding as Encodings;
- return this;
- }
-
- end(cb?: () => void): void;
- // deno-lint-ignore no-explicit-any
- end(chunk: any, cb?: () => void): void;
- // deno-lint-ignore no-explicit-any
- end(chunk: any, encoding: Encodings, cb?: () => void): void;
-
- end(
- // deno-lint-ignore no-explicit-any
- x?: any | (() => void),
- y?: Encodings | (() => void),
- z?: () => void,
- ) {
- const state = this._writableState;
- // deno-lint-ignore no-explicit-any
- let chunk: any | null;
- let encoding: Encodings | null;
- let cb: undefined | ((error?: Error) => void);
-
- if (typeof x === "function") {
- chunk = null;
- encoding = null;
- cb = x;
- } else if (typeof y === "function") {
- chunk = x;
- encoding = null;
- cb = y;
- } else {
- chunk = x;
- encoding = y as Encodings;
- cb = z;
- }
-
- if (chunk !== null && chunk !== undefined) {
- this.write(chunk, encoding);
- }
-
- if (state.corked) {
- state.corked = 1;
- this.uncork();
- }
-
- let err: Error | undefined;
- if (!state.errored && !state.ending) {
- state.ending = true;
- finishMaybe(this, state, true);
- state.ended = true;
- } else if (state.finished) {
- err = new ERR_STREAM_ALREADY_FINISHED("end");
- } else if (state.destroyed) {
- err = new ERR_STREAM_DESTROYED("end");
- }
-
- if (typeof cb === "function") {
- if (err || state.finished) {
- queueMicrotask(() => {
- (cb as (error?: Error | undefined) => void)(err);
- });
- } else {
- state[kOnFinished].push(cb);
- }
- }
-
- return this;
- }
-
- get destroyed() {
- if (
- this._readableState === undefined ||
- this._writableState === undefined
- ) {
- return false;
- }
- return this._readableState.destroyed && this._writableState.destroyed;
- }
-
- set destroyed(value: boolean) {
- if (this._readableState && this._writableState) {
- this._readableState.destroyed = value;
- this._writableState.destroyed = value;
- }
- }
-
- get writable() {
- const w = this._writableState;
- return !w.destroyed && !w.errored && !w.ending && !w.ended;
- }
-
- set writable(val) {
- if (this._writableState) {
- this._writableState.writable = !!val;
- }
- }
-
- get writableFinished() {
- return this._writableState ? this._writableState.finished : false;
- }
-
- get writableObjectMode() {
- return this._writableState ? this._writableState.objectMode : false;
- }
-
- get writableBuffer() {
- return this._writableState && this._writableState.getBuffer();
- }
-
- get writableEnded() {
- return this._writableState ? this._writableState.ending : false;
- }
-
- get writableHighWaterMark() {
- return this._writableState && this._writableState.highWaterMark;
- }
-
- get writableCorked() {
- return this._writableState ? this._writableState.corked : 0;
- }
-
- get writableLength() {
- return this._writableState && this._writableState.length;
- }
-}
-
-export default Duplex;