diff options
Diffstat (limited to 'std/node/_stream/readable.ts')
-rw-r--r-- | std/node/_stream/readable.ts | 788 |
1 files changed, 0 insertions, 788 deletions
diff --git a/std/node/_stream/readable.ts b/std/node/_stream/readable.ts deleted file mode 100644 index 54e0d8ecd..000000000 --- a/std/node/_stream/readable.ts +++ /dev/null @@ -1,788 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { captureRejectionSymbol } from "../events.ts"; -import Stream from "./stream.ts"; -import type { Buffer } from "../buffer.ts"; -import BufferList from "./buffer_list.ts"; -import { - ERR_INVALID_OPT_VALUE, - ERR_METHOD_NOT_IMPLEMENTED, -} from "../_errors.ts"; -import type { Encodings } from "../_utils.ts"; -import { StringDecoder } from "../string_decoder.ts"; -import createReadableStreamAsyncIterator from "./async_iterator.ts"; -import streamFrom from "./from.ts"; -import { kDestroy, kPaused } from "./symbols.ts"; -import { - _destroy, - computeNewHighWaterMark, - emitReadable, - endReadable, - errorOrDestroy, - fromList, - howMuchToRead, - nReadingNextTick, - pipeOnDrain, - prependListener, - readableAddChunk, - resume, - updateReadableListening, -} from "./readable_internal.ts"; -import Writable from "./writable.ts"; -import { errorOrDestroy as errorOrDestroyWritable } from "./writable_internal.ts"; -import Duplex, { errorOrDestroy as errorOrDestroyDuplex } from "./duplex.ts"; - -export interface ReadableOptions { - autoDestroy?: boolean; - defaultEncoding?: Encodings; - destroy?( - this: Readable, - error: Error | null, - callback: (error: Error | null) => void, - ): void; - emitClose?: boolean; - encoding?: Encodings; - highWaterMark?: number; - objectMode?: boolean; - read?(this: Readable): void; -} - -export class ReadableState { - [kPaused]: boolean | null = null; - awaitDrainWriters: Duplex | Writable | Set<Duplex | Writable> | null = null; - buffer = new BufferList(); - closed = false; - closeEmitted = false; - constructed: boolean; - decoder: StringDecoder | null = null; - destroyed = false; - emittedReadable = false; - encoding: Encodings | null = null; - ended = false; - endEmitted = false; - errored: Error | null = null; - errorEmitted = false; - flowing: boolean | null = null; - highWaterMark: number; - length = 0; - multiAwaitDrain = false; - needReadable = false; - objectMode: boolean; - pipes: Array<Duplex | Writable> = []; - readable = true; - readableListening = false; - reading = false; - readingMore = false; - resumeScheduled = false; - sync = true; - emitClose: boolean; - autoDestroy: boolean; - defaultEncoding: string; - - constructor(options?: ReadableOptions) { - this.objectMode = !!options?.objectMode; - - this.highWaterMark = options?.highWaterMark ?? - (this.objectMode ? 16 : 16 * 1024); - if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) { - this.highWaterMark = Math.floor(this.highWaterMark); - } else { - throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark); - } - - this.emitClose = options?.emitClose ?? true; - this.autoDestroy = options?.autoDestroy ?? true; - this.defaultEncoding = options?.defaultEncoding || "utf8"; - - if (options?.encoding) { - this.decoder = new StringDecoder(options.encoding); - this.encoding = options.encoding; - } - - this.constructed = true; - } -} - -class Readable extends Stream { - _readableState: ReadableState; - - constructor(options?: ReadableOptions) { - super(); - if (options) { - if (typeof options.read === "function") { - this._read = options.read; - } - if (typeof options.destroy === "function") { - this._destroy = options.destroy; - } - } - this._readableState = new ReadableState(options); - } - - static from( - // deno-lint-ignore no-explicit-any - iterable: Iterable<any> | AsyncIterable<any>, - opts?: ReadableOptions, - ): Readable { - return streamFrom(iterable, opts); - } - - static ReadableState = ReadableState; - - static _fromList = fromList; - - // You can override either this method, or the async _read(n) below. - read(n?: number) { - // Same as parseInt(undefined, 10), however V8 7.3 performance regressed - // in this scenario, so we are doing it manually. - if (n === undefined) { - n = NaN; - } - const state = this._readableState; - const nOrig = n; - - if (n > state.highWaterMark) { - state.highWaterMark = computeNewHighWaterMark(n); - } - - if (n !== 0) { - state.emittedReadable = false; - } - - if ( - n === 0 && - state.needReadable && - ((state.highWaterMark !== 0 - ? state.length >= state.highWaterMark - : state.length > 0) || - state.ended) - ) { - if (state.length === 0 && state.ended) { - endReadable(this); - } else { - emitReadable(this); - } - return null; - } - - n = howMuchToRead(n, state); - - if (n === 0 && state.ended) { - if (state.length === 0) { - endReadable(this); - } - return null; - } - - let doRead = state.needReadable; - if ( - state.length === 0 || state.length - (n as number) < state.highWaterMark - ) { - doRead = true; - } - - if ( - state.ended || state.reading || state.destroyed || state.errored || - !state.constructed - ) { - doRead = false; - } else if (doRead) { - state.reading = true; - state.sync = true; - if (state.length === 0) { - state.needReadable = true; - } - this._read(); - state.sync = false; - if (!state.reading) { - n = howMuchToRead(nOrig, state); - } - } - - let ret; - if ((n as number) > 0) { - ret = fromList((n as number), state); - } else { - ret = null; - } - - if (ret === null) { - state.needReadable = state.length <= state.highWaterMark; - n = 0; - } else { - state.length -= n as number; - if (state.multiAwaitDrain) { - (state.awaitDrainWriters as Set<Writable>).clear(); - } else { - state.awaitDrainWriters = null; - } - } - - if (state.length === 0) { - if (!state.ended) { - state.needReadable = true; - } - - if (nOrig !== n && state.ended) { - endReadable(this); - } - } - - if (ret !== null) { - this.emit("data", ret); - } - - return ret; - } - - _read(_size?: number) { - throw new ERR_METHOD_NOT_IMPLEMENTED("_read()"); - } - - pipe<T extends Duplex | Writable>(dest: T, pipeOpts?: { end?: boolean }): T { - // deno-lint-ignore no-this-alias - const src = this; - const state = this._readableState; - - if (state.pipes.length === 1) { - if (!state.multiAwaitDrain) { - state.multiAwaitDrain = true; - state.awaitDrainWriters = new Set( - state.awaitDrainWriters ? [state.awaitDrainWriters as Writable] : [], - ); - } - } - - state.pipes.push(dest); - - const doEnd = (!pipeOpts || pipeOpts.end !== false); - - //TODO(Soremwar) - //Part of doEnd condition - //In node, output/input are a duplex Stream - // && - // dest !== stdout && - // dest !== stderr - - const endFn = doEnd ? onend : unpipe; - if (state.endEmitted) { - queueMicrotask(endFn); - } else { - this.once("end", endFn); - } - - dest.on("unpipe", onunpipe); - function onunpipe(readable: Readable, unpipeInfo: { hasUnpiped: boolean }) { - if (readable === src) { - if (unpipeInfo && unpipeInfo.hasUnpiped === false) { - unpipeInfo.hasUnpiped = true; - cleanup(); - } - } - } - - function onend() { - dest.end(); - } - - let ondrain: () => void; - - let cleanedUp = false; - function cleanup() { - dest.removeListener("close", onclose); - dest.removeListener("finish", onfinish); - if (ondrain) { - dest.removeListener("drain", ondrain); - } - dest.removeListener("error", onerror); - dest.removeListener("unpipe", onunpipe); - src.removeListener("end", onend); - src.removeListener("end", unpipe); - src.removeListener("data", ondata); - - cleanedUp = true; - if ( - ondrain && state.awaitDrainWriters && - (!dest._writableState || dest._writableState.needDrain) - ) { - ondrain(); - } - } - - this.on("data", ondata); - // deno-lint-ignore no-explicit-any - function ondata(chunk: any) { - const ret = dest.write(chunk); - if (ret === false) { - if (!cleanedUp) { - if (state.pipes.length === 1 && state.pipes[0] === dest) { - state.awaitDrainWriters = dest; - state.multiAwaitDrain = false; - } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { - (state.awaitDrainWriters as Set<Duplex | Writable>).add(dest); - } - src.pause(); - } - if (!ondrain) { - ondrain = pipeOnDrain(src, dest); - dest.on("drain", ondrain); - } - } - } - - function onerror(er: Error) { - unpipe(); - dest.removeListener("error", onerror); - if (dest.listenerCount("error") === 0) { - const s = dest._writableState || (dest as Duplex)._readableState; - if (s && !s.errorEmitted) { - if (dest instanceof Duplex) { - errorOrDestroyDuplex(dest as unknown as Duplex, er); - } else { - errorOrDestroyWritable(dest as Writable, er); - } - } else { - dest.emit("error", er); - } - } - } - - prependListener(dest, "error", onerror); - - function onclose() { - dest.removeListener("finish", onfinish); - unpipe(); - } - dest.once("close", onclose); - function onfinish() { - dest.removeListener("close", onclose); - unpipe(); - } - dest.once("finish", onfinish); - - function unpipe() { - src.unpipe(dest as Writable); - } - - dest.emit("pipe", this); - - if (!state.flowing) { - this.resume(); - } - - return dest; - } - - isPaused() { - return this._readableState[kPaused] === true || - this._readableState.flowing === false; - } - - setEncoding(enc: Encodings) { - const decoder = new StringDecoder(enc); - this._readableState.decoder = decoder; - this._readableState.encoding = this._readableState.decoder - .encoding as Encodings; - - const buffer = this._readableState.buffer; - let content = ""; - for (const data of buffer) { - content += decoder.write(data as Buffer); - } - buffer.clear(); - if (content !== "") { - buffer.push(content); - } - this._readableState.length = content.length; - return this; - } - - on( - event: "close" | "end" | "pause" | "readable" | "resume", - listener: () => void, - ): this; - // deno-lint-ignore no-explicit-any - on(event: "data", listener: (chunk: any) => void): this; - on(event: "error", listener: (err: Error) => void): this; - // deno-lint-ignore no-explicit-any - on(event: string | symbol, listener: (...args: any[]) => void): this; - on( - ev: string | symbol, - fn: - | (() => void) - // deno-lint-ignore no-explicit-any - | ((chunk: any) => void) - | ((err: Error) => void) - // deno-lint-ignore no-explicit-any - | ((...args: any[]) => void), - ) { - const res = super.on.call(this, ev, fn); - const state = this._readableState; - - if (ev === "data") { - state.readableListening = this.listenerCount("readable") > 0; - - if (state.flowing !== false) { - this.resume(); - } - } else if (ev === "readable") { - if (!state.endEmitted && !state.readableListening) { - state.readableListening = state.needReadable = true; - state.flowing = false; - state.emittedReadable = false; - if (state.length) { - emitReadable(this); - } else if (!state.reading) { - queueMicrotask(() => nReadingNextTick(this)); - } - } - } - - return res; - } - - removeListener( - event: "close" | "end" | "pause" | "readable" | "resume", - listener: () => void, - ): this; - // deno-lint-ignore no-explicit-any - removeListener(event: "data", listener: (chunk: any) => void): this; - removeListener(event: "error", listener: (err: Error) => void): this; - removeListener( - event: string | symbol, - // deno-lint-ignore no-explicit-any - listener: (...args: any[]) => void, - ): this; - removeListener( - ev: string | symbol, - fn: - | (() => void) - // deno-lint-ignore no-explicit-any - | ((chunk: any) => void) - | ((err: Error) => void) - // deno-lint-ignore no-explicit-any - | ((...args: any[]) => void), - ) { - const res = super.removeListener.call(this, ev, fn); - - if (ev === "readable") { - queueMicrotask(() => updateReadableListening(this)); - } - - return res; - } - - off = this.removeListener; - - destroy(err?: Error | null, cb?: () => void) { - const r = this._readableState; - - if (r.destroyed) { - if (typeof cb === "function") { - cb(); - } - - return this; - } - - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!r.errored) { - r.errored = err; - } - } - - r.destroyed = true; - - // If still constructing then defer calling _destroy. - if (!r.constructed) { - this.once(kDestroy, (er: Error) => { - _destroy(this, err || er, cb); - }); - } else { - _destroy(this, err, cb); - } - - return this; - } - - _undestroy() { - const r = this._readableState; - r.constructed = true; - r.closed = false; - r.closeEmitted = false; - r.destroyed = false; - r.errored = null; - r.errorEmitted = false; - r.reading = false; - r.ended = false; - r.endEmitted = false; - } - - _destroy( - error: Error | null, - callback: (error?: Error | null) => void, - ): void { - callback(error); - } - - [captureRejectionSymbol](err: Error) { - this.destroy(err); - } - - // deno-lint-ignore no-explicit-any - push(chunk: any, encoding?: Encodings): boolean { - return readableAddChunk(this, chunk, encoding, false); - } - - // deno-lint-ignore no-explicit-any - unshift(chunk: any, encoding?: string): boolean { - return readableAddChunk(this, chunk, encoding, true); - } - - unpipe(dest?: Writable): this { - const state = this._readableState; - const unpipeInfo = { hasUnpiped: false }; - - if (state.pipes.length === 0) { - return this; - } - - if (!dest) { - // remove all. - const dests = state.pipes; - state.pipes = []; - this.pause(); - - for (const dest of dests) { - dest.emit("unpipe", this, { hasUnpiped: false }); - } - return this; - } - - const index = state.pipes.indexOf(dest); - if (index === -1) { - return this; - } - - state.pipes.splice(index, 1); - if (state.pipes.length === 0) { - this.pause(); - } - - dest.emit("unpipe", this, unpipeInfo); - - return this; - } - - removeAllListeners( - ev: - | "close" - | "data" - | "end" - | "error" - | "pause" - | "readable" - | "resume" - | symbol - | undefined, - ) { - const res = super.removeAllListeners(ev); - - if (ev === "readable" || ev === undefined) { - queueMicrotask(() => updateReadableListening(this)); - } - - return res; - } - - resume() { - const state = this._readableState; - if (!state.flowing) { - // We flow only if there is no one listening - // for readable, but we still have to call - // resume(). - state.flowing = !state.readableListening; - resume(this, state); - } - state[kPaused] = false; - return this; - } - - pause() { - if (this._readableState.flowing !== false) { - this._readableState.flowing = false; - this.emit("pause"); - } - this._readableState[kPaused] = true; - return this; - } - - /** Wrap an old-style stream as the async data source. */ - wrap(stream: Stream): this { - const state = this._readableState; - let paused = false; - - stream.on("end", () => { - if (state.decoder && !state.ended) { - const chunk = state.decoder.end(); - if (chunk && chunk.length) { - this.push(chunk); - } - } - - this.push(null); - }); - - stream.on("data", (chunk) => { - if (state.decoder) { - chunk = state.decoder.write(chunk); - } - - if (state.objectMode && (chunk === null || chunk === undefined)) { - return; - } else if (!state.objectMode && (!chunk || !chunk.length)) { - return; - } - - const ret = this.push(chunk); - if (!ret) { - paused = true; - // By the time this is triggered, stream will be a readable stream - // deno-lint-ignore ban-ts-comment - // @ts-ignore - stream.pause(); - } - }); - - // TODO(Soremwar) - // There must be a clean way to implement this on TypeScript - // Proxy all the other methods. Important when wrapping filters and duplexes. - for (const i in stream) { - // deno-lint-ignore ban-ts-comment - //@ts-ignore - if (this[i] === undefined && typeof stream[i] === "function") { - // deno-lint-ignore ban-ts-comment - //@ts-ignore - this[i] = function methodWrap(method) { - return function methodWrapReturnFunction() { - // deno-lint-ignore ban-ts-comment - //@ts-ignore - return stream[method].apply(stream); - }; - }(i); - } - } - - stream.on("error", (err) => { - errorOrDestroy(this, err); - }); - - stream.on("close", () => { - this.emit("close"); - }); - - stream.on("destroy", () => { - this.emit("destroy"); - }); - - stream.on("pause", () => { - this.emit("pause"); - }); - - stream.on("resume", () => { - this.emit("resume"); - }); - - this._read = () => { - if (paused) { - paused = false; - // By the time this is triggered, stream will be a readable stream - // deno-lint-ignore ban-ts-comment - //@ts-ignore - stream.resume(); - } - }; - - return this; - } - - [Symbol.asyncIterator]() { - return createReadableStreamAsyncIterator(this); - } - - get readable(): boolean { - return this._readableState?.readable && - !this._readableState?.destroyed && - !this._readableState?.errorEmitted && - !this._readableState?.endEmitted; - } - set readable(val: boolean) { - if (this._readableState) { - this._readableState.readable = val; - } - } - - get readableHighWaterMark(): number { - return this._readableState.highWaterMark; - } - - get readableBuffer() { - return this._readableState && this._readableState.buffer; - } - - get readableFlowing(): boolean | null { - return this._readableState.flowing; - } - - set readableFlowing(state: boolean | null) { - if (this._readableState) { - this._readableState.flowing = state; - } - } - - get readableLength() { - return this._readableState.length; - } - - get readableObjectMode() { - return this._readableState ? this._readableState.objectMode : false; - } - - get readableEncoding() { - return this._readableState ? this._readableState.encoding : null; - } - - get destroyed() { - if (this._readableState === undefined) { - return false; - } - return this._readableState.destroyed; - } - - set destroyed(value: boolean) { - if (!this._readableState) { - return; - } - this._readableState.destroyed = value; - } - - get readableEnded() { - return this._readableState ? this._readableState.endEmitted : false; - } -} - -Object.defineProperties(Readable, { - _readableState: { enumerable: false }, - destroyed: { enumerable: false }, - readableBuffer: { enumerable: false }, - readableEncoding: { enumerable: false }, - readableEnded: { enumerable: false }, - readableFlowing: { enumerable: false }, - readableHighWaterMark: { enumerable: false }, - readableLength: { enumerable: false }, - readableObjectMode: { enumerable: false }, -}); - -export default Readable; |