diff options
Diffstat (limited to 'std/node/_stream/duplex_internal.ts')
-rw-r--r-- | std/node/_stream/duplex_internal.ts | 296 |
1 files changed, 0 insertions, 296 deletions
diff --git a/std/node/_stream/duplex_internal.ts b/std/node/_stream/duplex_internal.ts deleted file mode 100644 index bfd9749f8..000000000 --- a/std/node/_stream/duplex_internal.ts +++ /dev/null @@ -1,296 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import type { ReadableState } from "./readable.ts"; -import { addChunk, maybeReadMore, onEofChunk } from "./readable_internal.ts"; -import type Writable from "./writable.ts"; -import type { WritableState } from "./writable.ts"; -import { - afterWrite, - AfterWriteTick, - afterWriteTick, - clearBuffer, - errorBuffer, - kOnFinished, - needFinish, - prefinish, -} from "./writable_internal.ts"; -import { Buffer } from "../buffer.ts"; -import type Duplex from "./duplex.ts"; -import { - ERR_MULTIPLE_CALLBACK, - ERR_STREAM_PUSH_AFTER_EOF, - ERR_STREAM_UNSHIFT_AFTER_END_EVENT, -} from "../_errors.ts"; - -export function endDuplex(stream: Duplex) { - const state = stream._readableState; - - if (!state.endEmitted) { - state.ended = true; - queueMicrotask(() => endReadableNT(state, stream)); - } -} - -function endReadableNT(state: ReadableState, stream: Duplex) { - // Check that we didn't get one last unshift. - if ( - !state.errorEmitted && !state.closeEmitted && - !state.endEmitted && state.length === 0 - ) { - state.endEmitted = true; - stream.emit("end"); - - if (stream.writable && stream.allowHalfOpen === false) { - queueMicrotask(() => endWritableNT(state, stream)); - } else if (state.autoDestroy) { - // In case of duplex streams we need a way to detect - // if the writable side is ready for autoDestroy as well. - const wState = stream._writableState; - const autoDestroy = !wState || ( - wState.autoDestroy && - // We don't expect the writable to ever 'finish' - // if writable is explicitly set to false. - (wState.finished || wState.writable === false) - ); - - if (autoDestroy) { - stream.destroy(); - } - } - } -} - -function endWritableNT(state: ReadableState, stream: Duplex) { - const writable = stream.writable && - !stream.writableEnded && - !stream.destroyed; - if (writable) { - stream.end(); - } -} - -export function errorOrDestroy( - // deno-lint-ignore no-explicit-any - this: any, - stream: Duplex, - err: Error, - sync = false, -) { - const r = stream._readableState; - const w = stream._writableState; - - if (w.destroyed || r.destroyed) { - return this; - } - - if (r.autoDestroy || w.autoDestroy) { - stream.destroy(err); - } else if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (w && !w.errored) { - w.errored = err; - } - if (r && !r.errored) { - r.errored = err; - } - - if (sync) { - queueMicrotask(() => { - if (w.errorEmitted || r.errorEmitted) { - return; - } - - w.errorEmitted = true; - r.errorEmitted = true; - - stream.emit("error", err); - }); - } else { - if (w.errorEmitted || r.errorEmitted) { - return; - } - - w.errorEmitted = true; - r.errorEmitted = true; - - stream.emit("error", err); - } - } -} - -function finish(stream: Duplex, state: WritableState) { - state.pendingcb--; - if (state.errorEmitted || state.closeEmitted) { - return; - } - - state.finished = true; - - for (const callback of state[kOnFinished].splice(0)) { - callback(); - } - - stream.emit("finish"); - - if (state.autoDestroy) { - stream.destroy(); - } -} - -export function finishMaybe( - stream: Duplex, - state: WritableState, - sync?: boolean, -) { - if (needFinish(state)) { - prefinish(stream as Writable, state); - if (state.pendingcb === 0 && needFinish(state)) { - state.pendingcb++; - if (sync) { - queueMicrotask(() => finish(stream, state)); - } else { - finish(stream, state); - } - } - } -} - -export function onwrite(stream: Duplex, er?: Error | null) { - const state = stream._writableState; - const sync = state.sync; - const cb = state.writecb; - - if (typeof cb !== "function") { - errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); - return; - } - - state.writing = false; - state.writecb = null; - state.length -= state.writelen; - state.writelen = 0; - - if (er) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - er.stack; - - if (!state.errored) { - state.errored = er; - } - - if (stream._readableState && !stream._readableState.errored) { - stream._readableState.errored = er; - } - - if (sync) { - queueMicrotask(() => onwriteError(stream, state, er, cb)); - } else { - onwriteError(stream, state, er, cb); - } - } else { - if (state.buffered.length > state.bufferedIndex) { - clearBuffer(stream, state); - } - - if (sync) { - if ( - state.afterWriteTickInfo !== null && - state.afterWriteTickInfo.cb === cb - ) { - state.afterWriteTickInfo.count++; - } else { - state.afterWriteTickInfo = { - count: 1, - cb: (cb as (error?: Error) => void), - stream: stream as Writable, - state, - }; - queueMicrotask(() => - afterWriteTick(state.afterWriteTickInfo as AfterWriteTick) - ); - } - } else { - afterWrite(stream as Writable, state, 1, cb as (error?: Error) => void); - } - } -} - -function onwriteError( - stream: Duplex, - state: WritableState, - er: Error, - cb: (error: Error) => void, -) { - --state.pendingcb; - - cb(er); - errorBuffer(state); - errorOrDestroy(stream, er); -} - -export function readableAddChunk( - stream: Duplex, - chunk: string | Buffer | Uint8Array | null, - encoding: undefined | string = undefined, - addToFront: boolean, -) { - const state = stream._readableState; - let usedEncoding = encoding; - - let err; - if (!state.objectMode) { - if (typeof chunk === "string") { - usedEncoding = encoding || state.defaultEncoding; - if (state.encoding !== usedEncoding) { - if (addToFront && state.encoding) { - chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding); - } else { - chunk = Buffer.from(chunk, usedEncoding); - usedEncoding = ""; - } - } - } else if (chunk instanceof Uint8Array) { - chunk = Buffer.from(chunk); - } - } - - if (err) { - errorOrDestroy(stream, err); - } else if (chunk === null) { - state.reading = false; - onEofChunk(stream, state); - } else if (state.objectMode || (chunk.length > 0)) { - if (addToFront) { - if (state.endEmitted) { - errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); - } else { - addChunk(stream, state, chunk, true); - } - } else if (state.ended) { - errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); - } else if (state.destroyed || state.errored) { - return false; - } else { - state.reading = false; - if (state.decoder && !usedEncoding) { - //TODO(Soremwar) - //I don't think this cast is right - chunk = state.decoder.write(Buffer.from(chunk as Uint8Array)); - if (state.objectMode || chunk.length !== 0) { - addChunk(stream, state, chunk, false); - } else { - maybeReadMore(stream, state); - } - } else { - addChunk(stream, state, chunk, false); - } - } - } else if (!addToFront) { - state.reading = false; - maybeReadMore(stream, state); - } - - return !state.ended && - (state.length < state.highWaterMark || state.length === 0); -} |