diff options
author | Steven Guerrero <stephenguerrero43@gmail.com> | 2020-11-26 07:50:08 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-26 13:50:08 +0100 |
commit | 9042fcc12e7774cdd0ca3a5d08918a07dae8102b (patch) | |
tree | 8b5ff11412aae9bb714e0bb0b9b0358db64a8657 /std/node/_stream/duplex_internal.ts | |
parent | 60e980c78180ee3b0a14d692307be275dc181c8d (diff) |
feat(std/node/stream): Add Duplex, Transform, Passthrough, pipeline, finished and promises (#7940)
Diffstat (limited to 'std/node/_stream/duplex_internal.ts')
-rw-r--r-- | std/node/_stream/duplex_internal.ts | 296 |
1 files changed, 296 insertions, 0 deletions
diff --git a/std/node/_stream/duplex_internal.ts b/std/node/_stream/duplex_internal.ts new file mode 100644 index 000000000..bfd9749f8 --- /dev/null +++ b/std/node/_stream/duplex_internal.ts @@ -0,0 +1,296 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import type { ReadableState } from "./readable.ts"; +import { addChunk, maybeReadMore, onEofChunk } from "./readable_internal.ts"; +import type Writable from "./writable.ts"; +import type { WritableState } from "./writable.ts"; +import { + afterWrite, + AfterWriteTick, + afterWriteTick, + clearBuffer, + errorBuffer, + kOnFinished, + needFinish, + prefinish, +} from "./writable_internal.ts"; +import { Buffer } from "../buffer.ts"; +import type Duplex from "./duplex.ts"; +import { + ERR_MULTIPLE_CALLBACK, + ERR_STREAM_PUSH_AFTER_EOF, + ERR_STREAM_UNSHIFT_AFTER_END_EVENT, +} from "../_errors.ts"; + +export function endDuplex(stream: Duplex) { + const state = stream._readableState; + + if (!state.endEmitted) { + state.ended = true; + queueMicrotask(() => endReadableNT(state, stream)); + } +} + +function endReadableNT(state: ReadableState, stream: Duplex) { + // Check that we didn't get one last unshift. + if ( + !state.errorEmitted && !state.closeEmitted && + !state.endEmitted && state.length === 0 + ) { + state.endEmitted = true; + stream.emit("end"); + + if (stream.writable && stream.allowHalfOpen === false) { + queueMicrotask(() => endWritableNT(state, stream)); + } else if (state.autoDestroy) { + // In case of duplex streams we need a way to detect + // if the writable side is ready for autoDestroy as well. + const wState = stream._writableState; + const autoDestroy = !wState || ( + wState.autoDestroy && + // We don't expect the writable to ever 'finish' + // if writable is explicitly set to false. + (wState.finished || wState.writable === false) + ); + + if (autoDestroy) { + stream.destroy(); + } + } + } +} + +function endWritableNT(state: ReadableState, stream: Duplex) { + const writable = stream.writable && + !stream.writableEnded && + !stream.destroyed; + if (writable) { + stream.end(); + } +} + +export function errorOrDestroy( + // deno-lint-ignore no-explicit-any + this: any, + stream: Duplex, + err: Error, + sync = false, +) { + const r = stream._readableState; + const w = stream._writableState; + + if (w.destroyed || r.destroyed) { + return this; + } + + if (r.autoDestroy || w.autoDestroy) { + stream.destroy(err); + } else if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (w && !w.errored) { + w.errored = err; + } + if (r && !r.errored) { + r.errored = err; + } + + if (sync) { + queueMicrotask(() => { + if (w.errorEmitted || r.errorEmitted) { + return; + } + + w.errorEmitted = true; + r.errorEmitted = true; + + stream.emit("error", err); + }); + } else { + if (w.errorEmitted || r.errorEmitted) { + return; + } + + w.errorEmitted = true; + r.errorEmitted = true; + + stream.emit("error", err); + } + } +} + +function finish(stream: Duplex, state: WritableState) { + state.pendingcb--; + if (state.errorEmitted || state.closeEmitted) { + return; + } + + state.finished = true; + + for (const callback of state[kOnFinished].splice(0)) { + callback(); + } + + stream.emit("finish"); + + if (state.autoDestroy) { + stream.destroy(); + } +} + +export function finishMaybe( + stream: Duplex, + state: WritableState, + sync?: boolean, +) { + if (needFinish(state)) { + prefinish(stream as Writable, state); + if (state.pendingcb === 0 && needFinish(state)) { + state.pendingcb++; + if (sync) { + queueMicrotask(() => finish(stream, state)); + } else { + finish(stream, state); + } + } + } +} + +export function onwrite(stream: Duplex, er?: Error | null) { + const state = stream._writableState; + const sync = state.sync; + const cb = state.writecb; + + if (typeof cb !== "function") { + errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); + return; + } + + state.writing = false; + state.writecb = null; + state.length -= state.writelen; + state.writelen = 0; + + if (er) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + er.stack; + + if (!state.errored) { + state.errored = er; + } + + if (stream._readableState && !stream._readableState.errored) { + stream._readableState.errored = er; + } + + if (sync) { + queueMicrotask(() => onwriteError(stream, state, er, cb)); + } else { + onwriteError(stream, state, er, cb); + } + } else { + if (state.buffered.length > state.bufferedIndex) { + clearBuffer(stream, state); + } + + if (sync) { + if ( + state.afterWriteTickInfo !== null && + state.afterWriteTickInfo.cb === cb + ) { + state.afterWriteTickInfo.count++; + } else { + state.afterWriteTickInfo = { + count: 1, + cb: (cb as (error?: Error) => void), + stream: stream as Writable, + state, + }; + queueMicrotask(() => + afterWriteTick(state.afterWriteTickInfo as AfterWriteTick) + ); + } + } else { + afterWrite(stream as Writable, state, 1, cb as (error?: Error) => void); + } + } +} + +function onwriteError( + stream: Duplex, + state: WritableState, + er: Error, + cb: (error: Error) => void, +) { + --state.pendingcb; + + cb(er); + errorBuffer(state); + errorOrDestroy(stream, er); +} + +export function readableAddChunk( + stream: Duplex, + chunk: string | Buffer | Uint8Array | null, + encoding: undefined | string = undefined, + addToFront: boolean, +) { + const state = stream._readableState; + let usedEncoding = encoding; + + let err; + if (!state.objectMode) { + if (typeof chunk === "string") { + usedEncoding = encoding || state.defaultEncoding; + if (state.encoding !== usedEncoding) { + if (addToFront && state.encoding) { + chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding); + } else { + chunk = Buffer.from(chunk, usedEncoding); + usedEncoding = ""; + } + } + } else if (chunk instanceof Uint8Array) { + chunk = Buffer.from(chunk); + } + } + + if (err) { + errorOrDestroy(stream, err); + } else if (chunk === null) { + state.reading = false; + onEofChunk(stream, state); + } else if (state.objectMode || (chunk.length > 0)) { + if (addToFront) { + if (state.endEmitted) { + errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); + } else { + addChunk(stream, state, chunk, true); + } + } else if (state.ended) { + errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); + } else if (state.destroyed || state.errored) { + return false; + } else { + state.reading = false; + if (state.decoder && !usedEncoding) { + //TODO(Soremwar) + //I don't think this cast is right + chunk = state.decoder.write(Buffer.from(chunk as Uint8Array)); + if (state.objectMode || chunk.length !== 0) { + addChunk(stream, state, chunk, false); + } else { + maybeReadMore(stream, state); + } + } else { + addChunk(stream, state, chunk, false); + } + } + } else if (!addToFront) { + state.reading = false; + maybeReadMore(stream, state); + } + + return !state.ended && + (state.length < state.highWaterMark || state.length === 0); +} |