diff options
Diffstat (limited to 'std/node/_stream/duplex.ts')
-rw-r--r-- | std/node/_stream/duplex.ts | 683 |
1 files changed, 681 insertions, 2 deletions
diff --git a/std/node/_stream/duplex.ts b/std/node/_stream/duplex.ts index c5faed6f8..b5c429f0a 100644 --- a/std/node/_stream/duplex.ts +++ b/std/node/_stream/duplex.ts @@ -1,3 +1,682 @@ // Copyright Node.js contributors. All rights reserved. MIT License. -// deno-lint-ignore no-explicit-any -export const errorOrDestroy = (...args: any[]) => {}; +import { captureRejectionSymbol } from "../events.ts"; +import Readable, { ReadableState } from "./readable.ts"; +import Stream from "./stream.ts"; +import Writable, { WritableState } from "./writable.ts"; +import { Buffer } from "../buffer.ts"; +import { + ERR_STREAM_ALREADY_FINISHED, + ERR_STREAM_DESTROYED, + ERR_UNKNOWN_ENCODING, +} from "../_errors.ts"; +import type { Encodings } from "../_utils.ts"; +import createReadableStreamAsyncIterator from "./async_iterator.ts"; +import type { ReadableStreamAsyncIterator } from "./async_iterator.ts"; +import { + _destroy, + computeNewHighWaterMark, + emitReadable, + fromList, + howMuchToRead, + nReadingNextTick, + updateReadableListening, +} from "./readable_internal.ts"; +import { kOnFinished, writeV } from "./writable_internal.ts"; +import { + endDuplex, + finishMaybe, + onwrite, + readableAddChunk, +} from "./duplex_internal.ts"; +export { errorOrDestroy } from "./duplex_internal.ts"; + +export interface DuplexOptions { + allowHalfOpen?: boolean; + autoDestroy?: boolean; + decodeStrings?: boolean; + defaultEncoding?: Encodings; + destroy?( + this: Duplex, + error: Error | null, + callback: (error: Error | null) => void, + ): void; + emitClose?: boolean; + encoding?: Encodings; + final?(this: Duplex, callback: (error?: Error | null) => void): void; + highWaterMark?: number; + objectMode?: boolean; + read?(this: Duplex, size: number): void; + readable?: boolean; + readableHighWaterMark?: number; + readableObjectMode?: boolean; + writable?: boolean; + writableCorked?: number; + writableHighWaterMark?: number; + writableObjectMode?: boolean; + write?( + this: Duplex, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: Encodings, + callback: (error?: Error | null) => void, + ): void; + writev?: writeV; +} + +interface Duplex extends Readable, Writable {} + +/** + * A duplex is an implementation of a stream that has both Readable and Writable + * attributes and capabilities + */ +class Duplex extends Stream { + allowHalfOpen = true; + _final?: ( + callback: (error?: Error | null | undefined) => void, + ) => void; + _readableState: ReadableState; + _writableState: WritableState; + _writev?: writeV | null; + + constructor(options?: DuplexOptions) { + super(); + + if (options) { + if (options.allowHalfOpen === false) { + this.allowHalfOpen = false; + } + if (typeof options.destroy === "function") { + this._destroy = options.destroy; + } + if (typeof options.final === "function") { + this._final = options.final; + } + if (typeof options.read === "function") { + this._read = options.read; + } + if (options.readable === false) { + this.readable = false; + } + if (options.writable === false) { + this.writable = false; + } + if (typeof options.write === "function") { + this._write = options.write; + } + if (typeof options.writev === "function") { + this._writev = options.writev; + } + } + + const readableOptions = { + autoDestroy: options?.autoDestroy, + defaultEncoding: options?.defaultEncoding, + destroy: options?.destroy as unknown as ( + this: Readable, + error: Error | null, + callback: (error: Error | null) => void, + ) => void, + emitClose: options?.emitClose, + encoding: options?.encoding, + highWaterMark: options?.highWaterMark ?? options?.readableHighWaterMark, + objectMode: options?.objectMode ?? options?.readableObjectMode, + read: options?.read as unknown as (this: Readable) => void, + }; + + const writableOptions = { + autoDestroy: options?.autoDestroy, + decodeStrings: options?.decodeStrings, + defaultEncoding: options?.defaultEncoding, + destroy: options?.destroy as unknown as ( + this: Writable, + error: Error | null, + callback: (error: Error | null) => void, + ) => void, + emitClose: options?.emitClose, + final: options?.final as unknown as ( + this: Writable, + callback: (error?: Error | null) => void, + ) => void, + highWaterMark: options?.highWaterMark ?? options?.writableHighWaterMark, + objectMode: options?.objectMode ?? options?.writableObjectMode, + write: options?.write as unknown as ( + this: Writable, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + callback: (error?: Error | null) => void, + ) => void, + writev: options?.writev as unknown as ( + this: Writable, + // deno-lint-ignore no-explicit-any + chunks: Array<{ chunk: any; encoding: Encodings }>, + callback: (error?: Error | null) => void, + ) => void, + }; + + this._readableState = new ReadableState(readableOptions); + this._writableState = new WritableState( + writableOptions, + this as unknown as Writable, + ); + //Very important to override onwrite here, duplex implementation adds a check + //on the readable side + this._writableState.onwrite = onwrite.bind(undefined, this); + } + + [captureRejectionSymbol](err?: Error) { + this.destroy(err); + } + + [Symbol.asyncIterator](): ReadableStreamAsyncIterator { + return createReadableStreamAsyncIterator(this); + } + + _destroy( + error: Error | null, + callback: (error?: Error | null) => void, + ): void { + callback(error); + } + + _read = Readable.prototype._read; + + _undestroy = Readable.prototype._undestroy; + + destroy(err?: Error | null, cb?: (error?: Error | null) => void) { + const r = this._readableState; + const w = this._writableState; + + if (w.destroyed || r.destroyed) { + if (typeof cb === "function") { + cb(); + } + + return this; + } + + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + if (!r.errored) { + r.errored = err; + } + } + + w.destroyed = true; + r.destroyed = true; + + this._destroy(err || null, (err) => { + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + if (!r.errored) { + r.errored = err; + } + } + + w.closed = true; + r.closed = true; + + if (typeof cb === "function") { + cb(err); + } + + if (err) { + queueMicrotask(() => { + const r = this._readableState; + const w = this._writableState; + + if (!w.errorEmitted && !r.errorEmitted) { + w.errorEmitted = true; + r.errorEmitted = true; + + this.emit("error", err); + } + + r.closeEmitted = true; + + if (w.emitClose || r.emitClose) { + this.emit("close"); + } + }); + } else { + queueMicrotask(() => { + const r = this._readableState; + const w = this._writableState; + + r.closeEmitted = true; + + if (w.emitClose || r.emitClose) { + this.emit("close"); + } + }); + } + }); + + return this; + } + + isPaused = Readable.prototype.isPaused; + + off = this.removeListener; + + on( + event: "close" | "end" | "pause" | "readable" | "resume", + listener: () => void, + ): this; + // deno-lint-ignore no-explicit-any + on(event: "data", listener: (chunk: any) => void): this; + on(event: "error", listener: (err: Error) => void): this; + // deno-lint-ignore no-explicit-any + on(event: string | symbol, listener: (...args: any[]) => void): this; + on( + ev: string | symbol, + fn: + | (() => void) + // deno-lint-ignore no-explicit-any + | ((chunk: any) => void) + | ((err: Error) => void) + // deno-lint-ignore no-explicit-any + | ((...args: any[]) => void), + ) { + const res = super.on.call(this, ev, fn); + const state = this._readableState; + + if (ev === "data") { + state.readableListening = this.listenerCount("readable") > 0; + + if (state.flowing !== false) { + this.resume(); + } + } else if (ev === "readable") { + if (!state.endEmitted && !state.readableListening) { + state.readableListening = state.needReadable = true; + state.flowing = false; + state.emittedReadable = false; + if (state.length) { + emitReadable(this); + } else if (!state.reading) { + queueMicrotask(() => nReadingNextTick(this)); + } + } + } + + return res; + } + + pause = Readable.prototype.pause as () => this; + + pipe = Readable.prototype.pipe; + + // deno-lint-ignore no-explicit-any + push(chunk: any, encoding?: Encodings): boolean { + return readableAddChunk(this, chunk, encoding, false); + } + + /** You can override either this method, or the async `_read` method */ + read(n?: number) { + // Same as parseInt(undefined, 10), however V8 7.3 performance regressed + // in this scenario, so we are doing it manually. + if (n === undefined) { + n = NaN; + } + const state = this._readableState; + const nOrig = n; + + if (n > state.highWaterMark) { + state.highWaterMark = computeNewHighWaterMark(n); + } + + if (n !== 0) { + state.emittedReadable = false; + } + + if ( + n === 0 && + state.needReadable && + ((state.highWaterMark !== 0 + ? state.length >= state.highWaterMark + : state.length > 0) || + state.ended) + ) { + if (state.length === 0 && state.ended) { + endDuplex(this); + } else { + emitReadable(this); + } + return null; + } + + n = howMuchToRead(n, state); + + if (n === 0 && state.ended) { + if (state.length === 0) { + endDuplex(this); + } + return null; + } + + let doRead = state.needReadable; + if ( + state.length === 0 || state.length - (n as number) < state.highWaterMark + ) { + doRead = true; + } + + if ( + state.ended || state.reading || state.destroyed || state.errored || + !state.constructed + ) { + doRead = false; + } else if (doRead) { + state.reading = true; + state.sync = true; + if (state.length === 0) { + state.needReadable = true; + } + this._read(); + state.sync = false; + if (!state.reading) { + n = howMuchToRead(nOrig, state); + } + } + + let ret; + if ((n as number) > 0) { + ret = fromList((n as number), state); + } else { + ret = null; + } + + if (ret === null) { + state.needReadable = state.length <= state.highWaterMark; + n = 0; + } else { + state.length -= n as number; + if (state.multiAwaitDrain) { + (state.awaitDrainWriters as Set<Writable>).clear(); + } else { + state.awaitDrainWriters = null; + } + } + + if (state.length === 0) { + if (!state.ended) { + state.needReadable = true; + } + + if (nOrig !== n && state.ended) { + endDuplex(this); + } + } + + if (ret !== null) { + this.emit("data", ret); + } + + return ret; + } + + removeAllListeners( + ev: + | "close" + | "data" + | "end" + | "error" + | "pause" + | "readable" + | "resume" + | symbol + | undefined, + ) { + const res = super.removeAllListeners(ev); + + if (ev === "readable" || ev === undefined) { + queueMicrotask(() => updateReadableListening(this)); + } + + return res; + } + + removeListener( + event: "close" | "end" | "pause" | "readable" | "resume", + listener: () => void, + ): this; + // deno-lint-ignore no-explicit-any + removeListener(event: "data", listener: (chunk: any) => void): this; + removeListener(event: "error", listener: (err: Error) => void): this; + removeListener( + event: string | symbol, + // deno-lint-ignore no-explicit-any + listener: (...args: any[]) => void, + ): this; + removeListener( + ev: string | symbol, + fn: + | (() => void) + // deno-lint-ignore no-explicit-any + | ((chunk: any) => void) + | ((err: Error) => void) + // deno-lint-ignore no-explicit-any + | ((...args: any[]) => void), + ) { + const res = super.removeListener.call(this, ev, fn); + + if (ev === "readable") { + queueMicrotask(() => updateReadableListening(this)); + } + + return res; + } + + resume = Readable.prototype.resume as () => this; + + setEncoding = Readable.prototype.setEncoding as (enc: string) => this; + + // deno-lint-ignore no-explicit-any + unshift(chunk: any, encoding?: Encodings): boolean { + return readableAddChunk(this, chunk, encoding, true); + } + + unpipe = Readable.prototype.unpipe as (dest?: Writable | undefined) => this; + + wrap = Readable.prototype.wrap as (stream: Stream) => this; + + get readable(): boolean { + return this._readableState?.readable && + !this._readableState?.destroyed && + !this._readableState?.errorEmitted && + !this._readableState?.endEmitted; + } + set readable(val: boolean) { + if (this._readableState) { + this._readableState.readable = val; + } + } + + get readableHighWaterMark(): number { + return this._readableState.highWaterMark; + } + + get readableBuffer() { + return this._readableState && this._readableState.buffer; + } + + get readableFlowing(): boolean | null { + return this._readableState.flowing; + } + + set readableFlowing(state: boolean | null) { + if (this._readableState) { + this._readableState.flowing = state; + } + } + + get readableLength() { + return this._readableState.length; + } + + get readableObjectMode() { + return this._readableState ? this._readableState.objectMode : false; + } + + get readableEncoding() { + return this._readableState ? this._readableState.encoding : null; + } + + get readableEnded() { + return this._readableState ? this._readableState.endEmitted : false; + } + + _write = Writable.prototype._write; + + write = Writable.prototype.write; + + cork = Writable.prototype.cork; + + uncork = Writable.prototype.uncork; + + setDefaultEncoding(encoding: string) { + // node::ParseEncoding() requires lower case. + if (typeof encoding === "string") { + encoding = encoding.toLowerCase(); + } + if (!Buffer.isEncoding(encoding)) { + throw new ERR_UNKNOWN_ENCODING(encoding); + } + this._writableState.defaultEncoding = encoding as Encodings; + return this; + } + + end(cb?: () => void): void; + // deno-lint-ignore no-explicit-any + end(chunk: any, cb?: () => void): void; + // deno-lint-ignore no-explicit-any + end(chunk: any, encoding: Encodings, cb?: () => void): void; + + end( + // deno-lint-ignore no-explicit-any + x?: any | (() => void), + y?: Encodings | (() => void), + z?: () => void, + ) { + const state = this._writableState; + // deno-lint-ignore no-explicit-any + let chunk: any | null; + let encoding: Encodings | null; + let cb: undefined | ((error?: Error) => void); + + if (typeof x === "function") { + chunk = null; + encoding = null; + cb = x; + } else if (typeof y === "function") { + chunk = x; + encoding = null; + cb = y; + } else { + chunk = x; + encoding = y as Encodings; + cb = z; + } + + if (chunk !== null && chunk !== undefined) { + this.write(chunk, encoding); + } + + if (state.corked) { + state.corked = 1; + this.uncork(); + } + + let err: Error | undefined; + if (!state.errored && !state.ending) { + state.ending = true; + finishMaybe(this, state, true); + state.ended = true; + } else if (state.finished) { + err = new ERR_STREAM_ALREADY_FINISHED("end"); + } else if (state.destroyed) { + err = new ERR_STREAM_DESTROYED("end"); + } + + if (typeof cb === "function") { + if (err || state.finished) { + queueMicrotask(() => { + (cb as (error?: Error | undefined) => void)(err); + }); + } else { + state[kOnFinished].push(cb); + } + } + + return this; + } + + get destroyed() { + if ( + this._readableState === undefined || + this._writableState === undefined + ) { + return false; + } + return this._readableState.destroyed && this._writableState.destroyed; + } + + set destroyed(value: boolean) { + if (this._readableState && this._writableState) { + this._readableState.destroyed = value; + this._writableState.destroyed = value; + } + } + + get writable() { + const w = this._writableState; + return !w.destroyed && !w.errored && !w.ending && !w.ended; + } + + set writable(val) { + if (this._writableState) { + this._writableState.writable = !!val; + } + } + + get writableFinished() { + return this._writableState ? this._writableState.finished : false; + } + + get writableObjectMode() { + return this._writableState ? this._writableState.objectMode : false; + } + + get writableBuffer() { + return this._writableState && this._writableState.getBuffer(); + } + + get writableEnded() { + return this._writableState ? this._writableState.ending : false; + } + + get writableHighWaterMark() { + return this._writableState && this._writableState.highWaterMark; + } + + get writableCorked() { + return this._writableState ? this._writableState.corked : 0; + } + + get writableLength() { + return this._writableState && this._writableState.length; + } +} + +export default Duplex; |