summaryrefslogtreecommitdiff
path: root/std/node/_stream/duplex_internal.ts
diff options
context:
space:
mode:
Diffstat (limited to 'std/node/_stream/duplex_internal.ts')
-rw-r--r--std/node/_stream/duplex_internal.ts296
1 files changed, 296 insertions, 0 deletions
diff --git a/std/node/_stream/duplex_internal.ts b/std/node/_stream/duplex_internal.ts
new file mode 100644
index 000000000..bfd9749f8
--- /dev/null
+++ b/std/node/_stream/duplex_internal.ts
@@ -0,0 +1,296 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import type { ReadableState } from "./readable.ts";
+import { addChunk, maybeReadMore, onEofChunk } from "./readable_internal.ts";
+import type Writable from "./writable.ts";
+import type { WritableState } from "./writable.ts";
+import {
+ afterWrite,
+ AfterWriteTick,
+ afterWriteTick,
+ clearBuffer,
+ errorBuffer,
+ kOnFinished,
+ needFinish,
+ prefinish,
+} from "./writable_internal.ts";
+import { Buffer } from "../buffer.ts";
+import type Duplex from "./duplex.ts";
+import {
+ ERR_MULTIPLE_CALLBACK,
+ ERR_STREAM_PUSH_AFTER_EOF,
+ ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
+} from "../_errors.ts";
+
+export function endDuplex(stream: Duplex) {
+ const state = stream._readableState;
+
+ if (!state.endEmitted) {
+ state.ended = true;
+ queueMicrotask(() => endReadableNT(state, stream));
+ }
+}
+
+function endReadableNT(state: ReadableState, stream: Duplex) {
+ // Check that we didn't get one last unshift.
+ if (
+ !state.errorEmitted && !state.closeEmitted &&
+ !state.endEmitted && state.length === 0
+ ) {
+ state.endEmitted = true;
+ stream.emit("end");
+
+ if (stream.writable && stream.allowHalfOpen === false) {
+ queueMicrotask(() => endWritableNT(state, stream));
+ } else if (state.autoDestroy) {
+ // In case of duplex streams we need a way to detect
+ // if the writable side is ready for autoDestroy as well.
+ const wState = stream._writableState;
+ const autoDestroy = !wState || (
+ wState.autoDestroy &&
+ // We don't expect the writable to ever 'finish'
+ // if writable is explicitly set to false.
+ (wState.finished || wState.writable === false)
+ );
+
+ if (autoDestroy) {
+ stream.destroy();
+ }
+ }
+ }
+}
+
+function endWritableNT(state: ReadableState, stream: Duplex) {
+ const writable = stream.writable &&
+ !stream.writableEnded &&
+ !stream.destroyed;
+ if (writable) {
+ stream.end();
+ }
+}
+
+export function errorOrDestroy(
+ // deno-lint-ignore no-explicit-any
+ this: any,
+ stream: Duplex,
+ err: Error,
+ sync = false,
+) {
+ const r = stream._readableState;
+ const w = stream._writableState;
+
+ if (w.destroyed || r.destroyed) {
+ return this;
+ }
+
+ if (r.autoDestroy || w.autoDestroy) {
+ stream.destroy(err);
+ } else if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (w && !w.errored) {
+ w.errored = err;
+ }
+ if (r && !r.errored) {
+ r.errored = err;
+ }
+
+ if (sync) {
+ queueMicrotask(() => {
+ if (w.errorEmitted || r.errorEmitted) {
+ return;
+ }
+
+ w.errorEmitted = true;
+ r.errorEmitted = true;
+
+ stream.emit("error", err);
+ });
+ } else {
+ if (w.errorEmitted || r.errorEmitted) {
+ return;
+ }
+
+ w.errorEmitted = true;
+ r.errorEmitted = true;
+
+ stream.emit("error", err);
+ }
+ }
+}
+
+function finish(stream: Duplex, state: WritableState) {
+ state.pendingcb--;
+ if (state.errorEmitted || state.closeEmitted) {
+ return;
+ }
+
+ state.finished = true;
+
+ for (const callback of state[kOnFinished].splice(0)) {
+ callback();
+ }
+
+ stream.emit("finish");
+
+ if (state.autoDestroy) {
+ stream.destroy();
+ }
+}
+
+export function finishMaybe(
+ stream: Duplex,
+ state: WritableState,
+ sync?: boolean,
+) {
+ if (needFinish(state)) {
+ prefinish(stream as Writable, state);
+ if (state.pendingcb === 0 && needFinish(state)) {
+ state.pendingcb++;
+ if (sync) {
+ queueMicrotask(() => finish(stream, state));
+ } else {
+ finish(stream, state);
+ }
+ }
+ }
+}
+
+export function onwrite(stream: Duplex, er?: Error | null) {
+ const state = stream._writableState;
+ const sync = state.sync;
+ const cb = state.writecb;
+
+ if (typeof cb !== "function") {
+ errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
+ return;
+ }
+
+ state.writing = false;
+ state.writecb = null;
+ state.length -= state.writelen;
+ state.writelen = 0;
+
+ if (er) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ er.stack;
+
+ if (!state.errored) {
+ state.errored = er;
+ }
+
+ if (stream._readableState && !stream._readableState.errored) {
+ stream._readableState.errored = er;
+ }
+
+ if (sync) {
+ queueMicrotask(() => onwriteError(stream, state, er, cb));
+ } else {
+ onwriteError(stream, state, er, cb);
+ }
+ } else {
+ if (state.buffered.length > state.bufferedIndex) {
+ clearBuffer(stream, state);
+ }
+
+ if (sync) {
+ if (
+ state.afterWriteTickInfo !== null &&
+ state.afterWriteTickInfo.cb === cb
+ ) {
+ state.afterWriteTickInfo.count++;
+ } else {
+ state.afterWriteTickInfo = {
+ count: 1,
+ cb: (cb as (error?: Error) => void),
+ stream: stream as Writable,
+ state,
+ };
+ queueMicrotask(() =>
+ afterWriteTick(state.afterWriteTickInfo as AfterWriteTick)
+ );
+ }
+ } else {
+ afterWrite(stream as Writable, state, 1, cb as (error?: Error) => void);
+ }
+ }
+}
+
+function onwriteError(
+ stream: Duplex,
+ state: WritableState,
+ er: Error,
+ cb: (error: Error) => void,
+) {
+ --state.pendingcb;
+
+ cb(er);
+ errorBuffer(state);
+ errorOrDestroy(stream, er);
+}
+
+export function readableAddChunk(
+ stream: Duplex,
+ chunk: string | Buffer | Uint8Array | null,
+ encoding: undefined | string = undefined,
+ addToFront: boolean,
+) {
+ const state = stream._readableState;
+ let usedEncoding = encoding;
+
+ let err;
+ if (!state.objectMode) {
+ if (typeof chunk === "string") {
+ usedEncoding = encoding || state.defaultEncoding;
+ if (state.encoding !== usedEncoding) {
+ if (addToFront && state.encoding) {
+ chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding);
+ } else {
+ chunk = Buffer.from(chunk, usedEncoding);
+ usedEncoding = "";
+ }
+ }
+ } else if (chunk instanceof Uint8Array) {
+ chunk = Buffer.from(chunk);
+ }
+ }
+
+ if (err) {
+ errorOrDestroy(stream, err);
+ } else if (chunk === null) {
+ state.reading = false;
+ onEofChunk(stream, state);
+ } else if (state.objectMode || (chunk.length > 0)) {
+ if (addToFront) {
+ if (state.endEmitted) {
+ errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
+ } else {
+ addChunk(stream, state, chunk, true);
+ }
+ } else if (state.ended) {
+ errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
+ } else if (state.destroyed || state.errored) {
+ return false;
+ } else {
+ state.reading = false;
+ if (state.decoder && !usedEncoding) {
+ //TODO(Soremwar)
+ //I don't think this cast is right
+ chunk = state.decoder.write(Buffer.from(chunk as Uint8Array));
+ if (state.objectMode || chunk.length !== 0) {
+ addChunk(stream, state, chunk, false);
+ } else {
+ maybeReadMore(stream, state);
+ }
+ } else {
+ addChunk(stream, state, chunk, false);
+ }
+ }
+ } else if (!addToFront) {
+ state.reading = false;
+ maybeReadMore(stream, state);
+ }
+
+ return !state.ended &&
+ (state.length < state.highWaterMark || state.length === 0);
+}