diff options
Diffstat (limited to 'std/node/_stream/readable_internal.ts')
-rw-r--r-- | std/node/_stream/readable_internal.ts | 438 |
1 files changed, 0 insertions, 438 deletions
diff --git a/std/node/_stream/readable_internal.ts b/std/node/_stream/readable_internal.ts deleted file mode 100644 index 0ef261d4d..000000000 --- a/std/node/_stream/readable_internal.ts +++ /dev/null @@ -1,438 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { Buffer } from "../buffer.ts"; -import type Duplex from "./duplex.ts"; -import type EventEmitter from "../events.ts"; -import type Readable from "./readable.ts"; -import type Writable from "./writable.ts"; -import type { ReadableState } from "./readable.ts"; -import { kPaused } from "./symbols.ts"; -import { - ERR_STREAM_PUSH_AFTER_EOF, - ERR_STREAM_UNSHIFT_AFTER_END_EVENT, -} from "../_errors.ts"; - -export function _destroy( - self: Readable, - err?: Error | null, - cb?: (error?: Error | null) => void, -) { - self._destroy(err || null, (err) => { - const r = (self as Readable)._readableState; - - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!r.errored) { - r.errored = err; - } - } - - r.closed = true; - - if (typeof cb === "function") { - cb(err); - } - - if (err) { - queueMicrotask(() => { - if (!r.errorEmitted) { - r.errorEmitted = true; - self.emit("error", err); - } - r.closeEmitted = true; - if (r.emitClose) { - self.emit("close"); - } - }); - } else { - queueMicrotask(() => { - r.closeEmitted = true; - if (r.emitClose) { - self.emit("close"); - } - }); - } - }); -} - -export function addChunk( - stream: Duplex | Readable, - state: ReadableState, - chunk: string | Buffer | Uint8Array, - addToFront: boolean, -) { - if (state.flowing && state.length === 0 && !state.sync) { - if (state.multiAwaitDrain) { - (state.awaitDrainWriters as Set<Writable>).clear(); - } else { - state.awaitDrainWriters = null; - } - stream.emit("data", chunk); - } else { - // Update the buffer info. - state.length += state.objectMode ? 1 : chunk.length; - if (addToFront) { - state.buffer.unshift(chunk); - } else { - state.buffer.push(chunk); - } - - if (state.needReadable) { - emitReadable(stream); - } - } - maybeReadMore(stream, state); -} - -// Don't raise the hwm > 1GB. -const MAX_HWM = 0x40000000; -export function computeNewHighWaterMark(n: number) { - if (n >= MAX_HWM) { - n = MAX_HWM; - } else { - n--; - n |= n >>> 1; - n |= n >>> 2; - n |= n >>> 4; - n |= n >>> 8; - n |= n >>> 16; - n++; - } - return n; -} - -export function emitReadable(stream: Duplex | Readable) { - const state = stream._readableState; - state.needReadable = false; - if (!state.emittedReadable) { - state.emittedReadable = true; - queueMicrotask(() => emitReadable_(stream)); - } -} - -function emitReadable_(stream: Duplex | Readable) { - const state = stream._readableState; - if (!state.destroyed && !state.errored && (state.length || state.ended)) { - stream.emit("readable"); - state.emittedReadable = false; - } - - state.needReadable = !state.flowing && - !state.ended && - state.length <= state.highWaterMark; - flow(stream); -} - -export function endReadable(stream: Readable) { - const state = stream._readableState; - - if (!state.endEmitted) { - state.ended = true; - queueMicrotask(() => endReadableNT(state, stream)); - } -} - -function endReadableNT(state: ReadableState, stream: Readable) { - if ( - !state.errorEmitted && !state.closeEmitted && - !state.endEmitted && state.length === 0 - ) { - state.endEmitted = true; - stream.emit("end"); - - if (state.autoDestroy) { - stream.destroy(); - } - } -} - -export function errorOrDestroy( - stream: Duplex | Readable, - err: Error, - sync = false, -) { - const r = stream._readableState; - - if (r.destroyed) { - return stream; - } - - if (r.autoDestroy) { - stream.destroy(err); - } else if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!r.errored) { - r.errored = err; - } - if (sync) { - queueMicrotask(() => { - if (!r.errorEmitted) { - r.errorEmitted = true; - stream.emit("error", err); - } - }); - } else if (!r.errorEmitted) { - r.errorEmitted = true; - stream.emit("error", err); - } - } -} - -function flow(stream: Duplex | Readable) { - const state = stream._readableState; - while (state.flowing && stream.read() !== null); -} - -/** Pluck off n bytes from an array of buffers. -* Length is the combined lengths of all the buffers in the list. -* This function is designed to be inlinable, so please take care when making -* changes to the function body. -*/ -export function fromList(n: number, state: ReadableState) { - // nothing buffered. - if (state.length === 0) { - return null; - } - - let ret; - if (state.objectMode) { - ret = state.buffer.shift(); - } else if (!n || n >= state.length) { - if (state.decoder) { - ret = state.buffer.join(""); - } else if (state.buffer.length === 1) { - ret = state.buffer.first(); - } else { - ret = state.buffer.concat(state.length); - } - state.buffer.clear(); - } else { - ret = state.buffer.consume(n, !!state.decoder); - } - - return ret; -} - -export function howMuchToRead(n: number, state: ReadableState) { - if (n <= 0 || (state.length === 0 && state.ended)) { - return 0; - } - if (state.objectMode) { - return 1; - } - if (Number.isNaN(n)) { - // Only flow one buffer at a time. - if (state.flowing && state.length) { - return state.buffer.first().length; - } - return state.length; - } - if (n <= state.length) { - return n; - } - return state.ended ? state.length : 0; -} - -export function maybeReadMore(stream: Readable, state: ReadableState) { - if (!state.readingMore && state.constructed) { - state.readingMore = true; - queueMicrotask(() => maybeReadMore_(stream, state)); - } -} - -function maybeReadMore_(stream: Readable, state: ReadableState) { - while ( - !state.reading && !state.ended && - (state.length < state.highWaterMark || - (state.flowing && state.length === 0)) - ) { - const len = state.length; - stream.read(0); - if (len === state.length) { - // Didn't get any data, stop spinning. - break; - } - } - state.readingMore = false; -} - -export function nReadingNextTick(self: Duplex | Readable) { - self.read(0); -} - -export function onEofChunk(stream: Duplex | Readable, state: ReadableState) { - if (state.ended) return; - if (state.decoder) { - const chunk = state.decoder.end(); - if (chunk && chunk.length) { - state.buffer.push(chunk); - state.length += state.objectMode ? 1 : chunk.length; - } - } - state.ended = true; - - if (state.sync) { - emitReadable(stream); - } else { - state.needReadable = false; - state.emittedReadable = true; - emitReadable_(stream); - } -} - -export function pipeOnDrain(src: Duplex | Readable, dest: Duplex | Writable) { - return function pipeOnDrainFunctionResult() { - const state = src._readableState; - - if (state.awaitDrainWriters === dest) { - state.awaitDrainWriters = null; - } else if (state.multiAwaitDrain) { - (state.awaitDrainWriters as Set<Duplex | Writable>).delete(dest); - } - - if ( - (!state.awaitDrainWriters || - (state.awaitDrainWriters as Set<Writable>).size === 0) && - src.listenerCount("data") - ) { - state.flowing = true; - flow(src); - } - }; -} - -export function prependListener( - emitter: EventEmitter, - event: string, - // deno-lint-ignore no-explicit-any - fn: (...args: any[]) => any, -) { - if (typeof emitter.prependListener === "function") { - return emitter.prependListener(event, fn); - } - - // This is a hack to make sure that our error handler is attached before any - // userland ones. NEVER DO THIS. This is here only because this code needs - // to continue to work with older versions of Node.js that do not include - //the prependListener() method. The goal is to eventually remove this hack. - // TODO(Soremwar) - // Burn it with fire - // deno-lint-ignore ban-ts-comment - //@ts-ignore - if (emitter._events.get(event)?.length) { - // deno-lint-ignore ban-ts-comment - //@ts-ignore - const listeners = [fn, ...emitter._events.get(event)]; - // deno-lint-ignore ban-ts-comment - //@ts-ignore - emitter._events.set(event, listeners); - } else { - emitter.on(event, fn); - } -} - -export function readableAddChunk( - stream: Duplex | Readable, - chunk: string | Buffer | Uint8Array | null, - encoding: undefined | string = undefined, - addToFront: boolean, -) { - const state = stream._readableState; - let usedEncoding = encoding; - - let err; - if (!state.objectMode) { - if (typeof chunk === "string") { - usedEncoding = encoding || state.defaultEncoding; - if (state.encoding !== usedEncoding) { - if (addToFront && state.encoding) { - chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding); - } else { - chunk = Buffer.from(chunk, usedEncoding); - usedEncoding = ""; - } - } - } else if (chunk instanceof Uint8Array) { - chunk = Buffer.from(chunk); - } - } - - if (err) { - errorOrDestroy(stream, err); - } else if (chunk === null) { - state.reading = false; - onEofChunk(stream, state); - } else if (state.objectMode || (chunk.length > 0)) { - if (addToFront) { - if (state.endEmitted) { - errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); - } else { - addChunk(stream, state, chunk, true); - } - } else if (state.ended) { - errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); - } else if (state.destroyed || state.errored) { - return false; - } else { - state.reading = false; - if (state.decoder && !usedEncoding) { - //TODO(Soremwar) - //I don't think this cast is right - chunk = state.decoder.write(Buffer.from(chunk as Uint8Array)); - if (state.objectMode || chunk.length !== 0) { - addChunk(stream, state, chunk, false); - } else { - maybeReadMore(stream, state); - } - } else { - addChunk(stream, state, chunk, false); - } - } - } else if (!addToFront) { - state.reading = false; - maybeReadMore(stream, state); - } - - return !state.ended && - (state.length < state.highWaterMark || state.length === 0); -} - -export function resume(stream: Duplex | Readable, state: ReadableState) { - if (!state.resumeScheduled) { - state.resumeScheduled = true; - queueMicrotask(() => resume_(stream, state)); - } -} - -function resume_(stream: Duplex | Readable, state: ReadableState) { - if (!state.reading) { - stream.read(0); - } - - state.resumeScheduled = false; - stream.emit("resume"); - flow(stream); - if (state.flowing && !state.reading) { - stream.read(0); - } -} - -export function updateReadableListening(self: Duplex | Readable) { - const state = self._readableState; - state.readableListening = self.listenerCount("readable") > 0; - - if (state.resumeScheduled && state[kPaused] === false) { - // Flowing needs to be set to true now, otherwise - // the upcoming resume will not flow. - state.flowing = true; - - // Crude way to check if we should resume. - } else if (self.listenerCount("data") > 0) { - self.resume(); - } else if (!state.readableListening) { - state.flowing = null; - } -} |