diff options
Diffstat (limited to 'std/node/_stream/readable.ts')
-rw-r--r-- | std/node/_stream/readable.ts | 546 |
1 files changed, 43 insertions, 503 deletions
diff --git a/std/node/_stream/readable.ts b/std/node/_stream/readable.ts index 72e61dff7..c8ed29953 100644 --- a/std/node/_stream/readable.ts +++ b/std/node/_stream/readable.ts @@ -1,492 +1,46 @@ // Copyright Node.js contributors. All rights reserved. MIT License. -import EventEmitter, { captureRejectionSymbol } from "../events.ts"; +import { captureRejectionSymbol } from "../events.ts"; import Stream from "./stream.ts"; -import { Buffer } from "../buffer.ts"; +import type { Buffer } from "../buffer.ts"; import BufferList from "./buffer_list.ts"; import { ERR_INVALID_OPT_VALUE, ERR_METHOD_NOT_IMPLEMENTED, - ERR_MULTIPLE_CALLBACK, - ERR_STREAM_PUSH_AFTER_EOF, - ERR_STREAM_UNSHIFT_AFTER_END_EVENT, } from "../_errors.ts"; +import type { Encodings } from "../_utils.ts"; import { StringDecoder } from "../string_decoder.ts"; import createReadableStreamAsyncIterator from "./async_iterator.ts"; import streamFrom from "./from.ts"; -import { kConstruct, kDestroy, kPaused } from "./symbols.ts"; -import type Writable from "./writable.ts"; -import { errorOrDestroy as errorOrDestroyDuplex } from "./duplex.ts"; - -function construct(stream: Readable, cb: () => void) { - const r = stream._readableState; - - if (!stream._construct) { - return; - } - - stream.once(kConstruct, cb); - - r.constructed = false; - - queueMicrotask(() => { - let called = false; - stream._construct?.((err?: Error) => { - r.constructed = true; - - if (called) { - err = new ERR_MULTIPLE_CALLBACK(); - } else { - called = true; - } - - if (r.destroyed) { - stream.emit(kDestroy, err); - } else if (err) { - errorOrDestroy(stream, err, true); - } else { - queueMicrotask(() => stream.emit(kConstruct)); - } - }); - }); -} - -function _destroy( - self: Readable, - err?: Error, - cb?: (error?: Error | null) => void, -) { - self._destroy(err || null, (err) => { - const r = (self as Readable)._readableState; - - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!r.errored) { - r.errored = err; - } - } - - r.closed = true; - - if (typeof cb === "function") { - cb(err); - } - - if (err) { - queueMicrotask(() => { - if (!r.errorEmitted) { - r.errorEmitted = true; - self.emit("error", err); - } - r.closeEmitted = true; - if (r.emitClose) { - self.emit("close"); - } - }); - } else { - queueMicrotask(() => { - r.closeEmitted = true; - if (r.emitClose) { - self.emit("close"); - } - }); - } - }); -} - -function errorOrDestroy(stream: Readable, err: Error, sync = false) { - const r = stream._readableState; - - if (r.destroyed) { - return stream; - } - - if (r.autoDestroy) { - stream.destroy(err); - } else if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!r.errored) { - r.errored = err; - } - if (sync) { - queueMicrotask(() => { - if (!r.errorEmitted) { - r.errorEmitted = true; - stream.emit("error", err); - } - }); - } else if (!r.errorEmitted) { - r.errorEmitted = true; - stream.emit("error", err); - } - } -} - -function flow(stream: Readable) { - const state = stream._readableState; - while (state.flowing && stream.read() !== null); -} - -function pipeOnDrain(src: Readable, dest: Writable) { - return function pipeOnDrainFunctionResult() { - const state = src._readableState; - - if (state.awaitDrainWriters === dest) { - state.awaitDrainWriters = null; - } else if (state.multiAwaitDrain) { - (state.awaitDrainWriters as Set<Writable>).delete(dest); - } - - if ( - (!state.awaitDrainWriters || - (state.awaitDrainWriters as Set<Writable>).size === 0) && - src.listenerCount("data") - ) { - state.flowing = true; - flow(src); - } - }; -} - -function updateReadableListening(self: Readable) { - const state = self._readableState; - state.readableListening = self.listenerCount("readable") > 0; - - if (state.resumeScheduled && state[kPaused] === false) { - // Flowing needs to be set to true now, otherwise - // the upcoming resume will not flow. - state.flowing = true; - - // Crude way to check if we should resume. - } else if (self.listenerCount("data") > 0) { - self.resume(); - } else if (!state.readableListening) { - state.flowing = null; - } -} - -function nReadingNextTick(self: Readable) { - self.read(0); -} - -function resume(stream: Readable, state: ReadableState) { - if (!state.resumeScheduled) { - state.resumeScheduled = true; - queueMicrotask(() => resume_(stream, state)); - } -} - -function resume_(stream: Readable, state: ReadableState) { - if (!state.reading) { - stream.read(0); - } - - state.resumeScheduled = false; - stream.emit("resume"); - flow(stream); - if (state.flowing && !state.reading) { - stream.read(0); - } -} - -function readableAddChunk( - stream: Readable, - chunk: string | Buffer | Uint8Array | null, - encoding: undefined | string = undefined, - addToFront: boolean, -) { - const state = stream._readableState; - let usedEncoding = encoding; - - let err; - if (!state.objectMode) { - if (typeof chunk === "string") { - usedEncoding = encoding || state.defaultEncoding; - if (state.encoding !== usedEncoding) { - if (addToFront && state.encoding) { - chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding); - } else { - chunk = Buffer.from(chunk, usedEncoding); - usedEncoding = ""; - } - } - } else if (chunk instanceof Uint8Array) { - chunk = Buffer.from(chunk); - } - } - - if (err) { - errorOrDestroy(stream, err); - } else if (chunk === null) { - state.reading = false; - onEofChunk(stream, state); - } else if (state.objectMode || (chunk.length > 0)) { - if (addToFront) { - if (state.endEmitted) { - errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); - } else { - addChunk(stream, state, chunk, true); - } - } else if (state.ended) { - errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); - } else if (state.destroyed || state.errored) { - return false; - } else { - state.reading = false; - if (state.decoder && !usedEncoding) { - //TODO(Soremwar) - //I don't think this cast is right - chunk = state.decoder.write(Buffer.from(chunk as Uint8Array)); - if (state.objectMode || chunk.length !== 0) { - addChunk(stream, state, chunk, false); - } else { - maybeReadMore(stream, state); - } - } else { - addChunk(stream, state, chunk, false); - } - } - } else if (!addToFront) { - state.reading = false; - maybeReadMore(stream, state); - } - - return !state.ended && - (state.length < state.highWaterMark || state.length === 0); -} - -function addChunk( - stream: Readable, - state: ReadableState, - chunk: string | Buffer | Uint8Array, - addToFront: boolean, -) { - if (state.flowing && state.length === 0 && !state.sync) { - if (state.multiAwaitDrain) { - (state.awaitDrainWriters as Set<Writable>).clear(); - } else { - state.awaitDrainWriters = null; - } - stream.emit("data", chunk); - } else { - // Update the buffer info. - state.length += state.objectMode ? 1 : chunk.length; - if (addToFront) { - state.buffer.unshift(chunk); - } else { - state.buffer.push(chunk); - } - - if (state.needReadable) { - emitReadable(stream); - } - } - maybeReadMore(stream, state); -} - -function prependListener( - emitter: EventEmitter, - event: string, - // deno-lint-ignore no-explicit-any - fn: (...args: any[]) => any, -) { - if (typeof emitter.prependListener === "function") { - return emitter.prependListener(event, fn); - } - - // This is a hack to make sure that our error handler is attached before any - // userland ones. NEVER DO THIS. This is here only because this code needs - // to continue to work with older versions of Node.js that do not include - //the prependListener() method. The goal is to eventually remove this hack. - // TODO(Soremwar) - // Burn it with fire - // deno-lint-ignore ban-ts-comment - //@ts-ignore - if (emitter._events.get(event)?.length) { - // deno-lint-ignore ban-ts-comment - //@ts-ignore - const listeners = [fn, ...emitter._events.get(event)]; - // deno-lint-ignore ban-ts-comment - //@ts-ignore - emitter._events.set(event, listeners); - } else { - emitter.on(event, fn); - } -} - -/** Pluck off n bytes from an array of buffers. -* Length is the combined lengths of all the buffers in the list. -* This function is designed to be inlinable, so please take care when making -* changes to the function body. -*/ -function fromList(n: number, state: ReadableState) { - // nothing buffered. - if (state.length === 0) { - return null; - } - - let ret; - if (state.objectMode) { - ret = state.buffer.shift(); - } else if (!n || n >= state.length) { - if (state.decoder) { - ret = state.buffer.join(""); - } else if (state.buffer.length === 1) { - ret = state.buffer.first(); - } else { - ret = state.buffer.concat(state.length); - } - state.buffer.clear(); - } else { - ret = state.buffer.consume(n, !!state.decoder); - } - - return ret; -} - -function endReadable(stream: Readable) { - const state = stream._readableState; - - if (!state.endEmitted) { - state.ended = true; - queueMicrotask(() => endReadableNT(state, stream)); - } -} - -function endReadableNT(state: ReadableState, stream: Readable) { - if ( - !state.errorEmitted && !state.closeEmitted && - !state.endEmitted && state.length === 0 - ) { - state.endEmitted = true; - stream.emit("end"); - - if (state.autoDestroy) { - stream.destroy(); - } - } -} - -// Don't raise the hwm > 1GB. -const MAX_HWM = 0x40000000; -function computeNewHighWaterMark(n: number) { - if (n >= MAX_HWM) { - n = MAX_HWM; - } else { - n--; - n |= n >>> 1; - n |= n >>> 2; - n |= n >>> 4; - n |= n >>> 8; - n |= n >>> 16; - n++; - } - return n; -} - -function howMuchToRead(n: number, state: ReadableState) { - if (n <= 0 || (state.length === 0 && state.ended)) { - return 0; - } - if (state.objectMode) { - return 1; - } - if (Number.isNaN(n)) { - // Only flow one buffer at a time. - if (state.flowing && state.length) { - return state.buffer.first().length; - } - return state.length; - } - if (n <= state.length) { - return n; - } - return state.ended ? state.length : 0; -} - -function onEofChunk(stream: Readable, state: ReadableState) { - if (state.ended) return; - if (state.decoder) { - const chunk = state.decoder.end(); - if (chunk && chunk.length) { - state.buffer.push(chunk); - state.length += state.objectMode ? 1 : chunk.length; - } - } - state.ended = true; - - if (state.sync) { - emitReadable(stream); - } else { - state.needReadable = false; - state.emittedReadable = true; - emitReadable_(stream); - } -} - -function emitReadable(stream: Readable) { - const state = stream._readableState; - state.needReadable = false; - if (!state.emittedReadable) { - state.emittedReadable = true; - queueMicrotask(() => emitReadable_(stream)); - } -} - -function emitReadable_(stream: Readable) { - const state = stream._readableState; - if (!state.destroyed && !state.errored && (state.length || state.ended)) { - stream.emit("readable"); - state.emittedReadable = false; - } - - state.needReadable = !state.flowing && - !state.ended && - state.length <= state.highWaterMark; - flow(stream); -} - -function maybeReadMore(stream: Readable, state: ReadableState) { - if (!state.readingMore && state.constructed) { - state.readingMore = true; - queueMicrotask(() => maybeReadMore_(stream, state)); - } -} - -function maybeReadMore_(stream: Readable, state: ReadableState) { - while ( - !state.reading && !state.ended && - (state.length < state.highWaterMark || - (state.flowing && state.length === 0)) - ) { - const len = state.length; - stream.read(0); - if (len === state.length) { - // Didn't get any data, stop spinning. - break; - } - } - state.readingMore = false; -} +import { kDestroy, kPaused } from "./symbols.ts"; +import { + _destroy, + computeNewHighWaterMark, + emitReadable, + endReadable, + errorOrDestroy, + fromList, + howMuchToRead, + nReadingNextTick, + pipeOnDrain, + prependListener, + readableAddChunk, + resume, + updateReadableListening, +} from "./readable_internal.ts"; +import Writable from "./writable.ts"; +import { errorOrDestroy as errorOrDestroyWritable } from "./writable_internal.ts"; +import Duplex, { errorOrDestroy as errorOrDestroyDuplex } from "./duplex.ts"; export interface ReadableOptions { autoDestroy?: boolean; - construct?: () => void; - //TODO(Soremwar) - //Import available encodings - defaultEncoding?: string; + defaultEncoding?: Encodings; destroy?( this: Readable, error: Error | null, callback: (error: Error | null) => void, ): void; emitClose?: boolean; - //TODO(Soremwar) - //Import available encodings - encoding?: string; + encoding?: Encodings; highWaterMark?: number; objectMode?: boolean; read?(this: Readable): void; @@ -494,7 +48,7 @@ export interface ReadableOptions { export class ReadableState { [kPaused]: boolean | null = null; - awaitDrainWriters: Writable | Set<Writable> | null = null; + awaitDrainWriters: Duplex | Writable | Set<Duplex | Writable> | null = null; buffer = new BufferList(); closed = false; closeEmitted = false; @@ -502,9 +56,7 @@ export class ReadableState { decoder: StringDecoder | null = null; destroyed = false; emittedReadable = false; - //TODO(Soremwar) - //Import available encodings - encoding: string | null = null; + encoding: Encodings | null = null; ended = false; endEmitted = false; errored: Error | null = null; @@ -515,7 +67,7 @@ export class ReadableState { multiAwaitDrain = false; needReadable = false; objectMode: boolean; - pipes: Writable[] = []; + pipes: Array<Duplex | Writable> = []; readable = true; readableListening = false; reading = false; @@ -551,7 +103,6 @@ export class ReadableState { } class Readable extends Stream { - _construct?: (cb: (error?: Error) => void) => void; _readableState: ReadableState; constructor(options?: ReadableOptions) { @@ -563,15 +114,8 @@ class Readable extends Stream { if (typeof options.destroy === "function") { this._destroy = options.destroy; } - if (typeof options.construct === "function") { - this._construct = options.construct; - } } this._readableState = new ReadableState(options); - - construct(this, () => { - maybeReadMore(this, this._readableState); - }); } static from( @@ -690,13 +234,11 @@ class Readable extends Stream { return ret; } - _read() { + _read(_size?: number) { throw new ERR_METHOD_NOT_IMPLEMENTED("_read()"); } - //TODO(Soremwar) - //Should be duplex - pipe<T extends Writable>(dest: T, pipeOpts?: { end?: boolean }): T { + pipe<T extends Duplex | Writable>(dest: T, pipeOpts?: { end?: boolean }): T { // deno-lint-ignore no-this-alias const src = this; const state = this._readableState; @@ -776,7 +318,7 @@ class Readable extends Stream { state.awaitDrainWriters = dest; state.multiAwaitDrain = false; } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { - (state.awaitDrainWriters as Set<Writable>).add(dest); + (state.awaitDrainWriters as Set<Duplex | Writable>).add(dest); } src.pause(); } @@ -791,12 +333,13 @@ class Readable extends Stream { unpipe(); dest.removeListener("error", onerror); if (dest.listenerCount("error") === 0) { - //TODO(Soremwar) - //Should be const s = dest._writableState || dest._readableState; - const s = dest._writableState; + const s = dest._writableState || (dest as Duplex)._readableState; if (s && !s.errorEmitted) { - // User incorrectly emitted 'error' directly on the stream. - errorOrDestroyDuplex(dest, er); + if (dest instanceof Duplex) { + errorOrDestroyDuplex(dest as unknown as Duplex, er); + } else { + errorOrDestroyWritable(dest as Writable, er); + } } else { dest.emit("error", er); } @@ -817,7 +360,7 @@ class Readable extends Stream { dest.once("finish", onfinish); function unpipe() { - src.unpipe(dest); + src.unpipe(dest as Writable); } dest.emit("pipe", this); @@ -834,12 +377,11 @@ class Readable extends Stream { this._readableState.flowing === false; } - //TODO(Soremwar) - //Replace string with encoding types - setEncoding(enc: string) { + setEncoding(enc: Encodings) { const decoder = new StringDecoder(enc); this._readableState.decoder = decoder; - this._readableState.encoding = this._readableState.decoder.encoding; + this._readableState.encoding = this._readableState.decoder + .encoding as Encodings; const buffer = this._readableState.buffer; let content = ""; @@ -931,7 +473,7 @@ class Readable extends Stream { off = this.removeListener; - destroy(err?: Error, cb?: () => void) { + destroy(err?: Error | null, cb?: () => void) { const r = this._readableState; if (r.destroyed) { @@ -989,10 +531,8 @@ class Readable extends Stream { this.destroy(err); } - //TODO(Soremwar) - //Same deal, string => encodings // deno-lint-ignore no-explicit-any - push(chunk: any, encoding?: string): boolean { + push(chunk: any, encoding?: Encodings): boolean { return readableAddChunk(this, chunk, encoding, false); } @@ -1233,7 +773,7 @@ class Readable extends Stream { } } -Object.defineProperties(Stream, { +Object.defineProperties(Readable, { _readableState: { enumerable: false }, destroyed: { enumerable: false }, readableBuffer: { enumerable: false }, |