summaryrefslogtreecommitdiff
path: root/std/node/_stream/duplex_internal.ts
diff options
context:
space:
mode:
authorCasper Beyer <caspervonb@pm.me>2021-02-02 19:05:46 +0800
committerGitHub <noreply@github.com>2021-02-02 12:05:46 +0100
commit6abf126c2a7a451cded8c6b5e6ddf1b69c84055d (patch)
treefd94c013a19fcb38954844085821ec1601c20e18 /std/node/_stream/duplex_internal.ts
parenta2b5d44f1aa9d64f448a2a3cc2001272e2f60b98 (diff)
chore: remove std directory (#9361)
This removes the std folder from the tree. Various parts of the tests are pretty tightly dependent on std (47 direct imports and 75 indirect imports, not counting the cli tests that use them as fixtures) so I've added std as a submodule for now.
Diffstat (limited to 'std/node/_stream/duplex_internal.ts')
-rw-r--r--std/node/_stream/duplex_internal.ts296
1 files changed, 0 insertions, 296 deletions
diff --git a/std/node/_stream/duplex_internal.ts b/std/node/_stream/duplex_internal.ts
deleted file mode 100644
index bfd9749f8..000000000
--- a/std/node/_stream/duplex_internal.ts
+++ /dev/null
@@ -1,296 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import type { ReadableState } from "./readable.ts";
-import { addChunk, maybeReadMore, onEofChunk } from "./readable_internal.ts";
-import type Writable from "./writable.ts";
-import type { WritableState } from "./writable.ts";
-import {
- afterWrite,
- AfterWriteTick,
- afterWriteTick,
- clearBuffer,
- errorBuffer,
- kOnFinished,
- needFinish,
- prefinish,
-} from "./writable_internal.ts";
-import { Buffer } from "../buffer.ts";
-import type Duplex from "./duplex.ts";
-import {
- ERR_MULTIPLE_CALLBACK,
- ERR_STREAM_PUSH_AFTER_EOF,
- ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
-} from "../_errors.ts";
-
-export function endDuplex(stream: Duplex) {
- const state = stream._readableState;
-
- if (!state.endEmitted) {
- state.ended = true;
- queueMicrotask(() => endReadableNT(state, stream));
- }
-}
-
-function endReadableNT(state: ReadableState, stream: Duplex) {
- // Check that we didn't get one last unshift.
- if (
- !state.errorEmitted && !state.closeEmitted &&
- !state.endEmitted && state.length === 0
- ) {
- state.endEmitted = true;
- stream.emit("end");
-
- if (stream.writable && stream.allowHalfOpen === false) {
- queueMicrotask(() => endWritableNT(state, stream));
- } else if (state.autoDestroy) {
- // In case of duplex streams we need a way to detect
- // if the writable side is ready for autoDestroy as well.
- const wState = stream._writableState;
- const autoDestroy = !wState || (
- wState.autoDestroy &&
- // We don't expect the writable to ever 'finish'
- // if writable is explicitly set to false.
- (wState.finished || wState.writable === false)
- );
-
- if (autoDestroy) {
- stream.destroy();
- }
- }
- }
-}
-
-function endWritableNT(state: ReadableState, stream: Duplex) {
- const writable = stream.writable &&
- !stream.writableEnded &&
- !stream.destroyed;
- if (writable) {
- stream.end();
- }
-}
-
-export function errorOrDestroy(
- // deno-lint-ignore no-explicit-any
- this: any,
- stream: Duplex,
- err: Error,
- sync = false,
-) {
- const r = stream._readableState;
- const w = stream._writableState;
-
- if (w.destroyed || r.destroyed) {
- return this;
- }
-
- if (r.autoDestroy || w.autoDestroy) {
- stream.destroy(err);
- } else if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (w && !w.errored) {
- w.errored = err;
- }
- if (r && !r.errored) {
- r.errored = err;
- }
-
- if (sync) {
- queueMicrotask(() => {
- if (w.errorEmitted || r.errorEmitted) {
- return;
- }
-
- w.errorEmitted = true;
- r.errorEmitted = true;
-
- stream.emit("error", err);
- });
- } else {
- if (w.errorEmitted || r.errorEmitted) {
- return;
- }
-
- w.errorEmitted = true;
- r.errorEmitted = true;
-
- stream.emit("error", err);
- }
- }
-}
-
-function finish(stream: Duplex, state: WritableState) {
- state.pendingcb--;
- if (state.errorEmitted || state.closeEmitted) {
- return;
- }
-
- state.finished = true;
-
- for (const callback of state[kOnFinished].splice(0)) {
- callback();
- }
-
- stream.emit("finish");
-
- if (state.autoDestroy) {
- stream.destroy();
- }
-}
-
-export function finishMaybe(
- stream: Duplex,
- state: WritableState,
- sync?: boolean,
-) {
- if (needFinish(state)) {
- prefinish(stream as Writable, state);
- if (state.pendingcb === 0 && needFinish(state)) {
- state.pendingcb++;
- if (sync) {
- queueMicrotask(() => finish(stream, state));
- } else {
- finish(stream, state);
- }
- }
- }
-}
-
-export function onwrite(stream: Duplex, er?: Error | null) {
- const state = stream._writableState;
- const sync = state.sync;
- const cb = state.writecb;
-
- if (typeof cb !== "function") {
- errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
- return;
- }
-
- state.writing = false;
- state.writecb = null;
- state.length -= state.writelen;
- state.writelen = 0;
-
- if (er) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- er.stack;
-
- if (!state.errored) {
- state.errored = er;
- }
-
- if (stream._readableState && !stream._readableState.errored) {
- stream._readableState.errored = er;
- }
-
- if (sync) {
- queueMicrotask(() => onwriteError(stream, state, er, cb));
- } else {
- onwriteError(stream, state, er, cb);
- }
- } else {
- if (state.buffered.length > state.bufferedIndex) {
- clearBuffer(stream, state);
- }
-
- if (sync) {
- if (
- state.afterWriteTickInfo !== null &&
- state.afterWriteTickInfo.cb === cb
- ) {
- state.afterWriteTickInfo.count++;
- } else {
- state.afterWriteTickInfo = {
- count: 1,
- cb: (cb as (error?: Error) => void),
- stream: stream as Writable,
- state,
- };
- queueMicrotask(() =>
- afterWriteTick(state.afterWriteTickInfo as AfterWriteTick)
- );
- }
- } else {
- afterWrite(stream as Writable, state, 1, cb as (error?: Error) => void);
- }
- }
-}
-
-function onwriteError(
- stream: Duplex,
- state: WritableState,
- er: Error,
- cb: (error: Error) => void,
-) {
- --state.pendingcb;
-
- cb(er);
- errorBuffer(state);
- errorOrDestroy(stream, er);
-}
-
-export function readableAddChunk(
- stream: Duplex,
- chunk: string | Buffer | Uint8Array | null,
- encoding: undefined | string = undefined,
- addToFront: boolean,
-) {
- const state = stream._readableState;
- let usedEncoding = encoding;
-
- let err;
- if (!state.objectMode) {
- if (typeof chunk === "string") {
- usedEncoding = encoding || state.defaultEncoding;
- if (state.encoding !== usedEncoding) {
- if (addToFront && state.encoding) {
- chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding);
- } else {
- chunk = Buffer.from(chunk, usedEncoding);
- usedEncoding = "";
- }
- }
- } else if (chunk instanceof Uint8Array) {
- chunk = Buffer.from(chunk);
- }
- }
-
- if (err) {
- errorOrDestroy(stream, err);
- } else if (chunk === null) {
- state.reading = false;
- onEofChunk(stream, state);
- } else if (state.objectMode || (chunk.length > 0)) {
- if (addToFront) {
- if (state.endEmitted) {
- errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
- } else {
- addChunk(stream, state, chunk, true);
- }
- } else if (state.ended) {
- errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
- } else if (state.destroyed || state.errored) {
- return false;
- } else {
- state.reading = false;
- if (state.decoder && !usedEncoding) {
- //TODO(Soremwar)
- //I don't think this cast is right
- chunk = state.decoder.write(Buffer.from(chunk as Uint8Array));
- if (state.objectMode || chunk.length !== 0) {
- addChunk(stream, state, chunk, false);
- } else {
- maybeReadMore(stream, state);
- }
- } else {
- addChunk(stream, state, chunk, false);
- }
- }
- } else if (!addToFront) {
- state.reading = false;
- maybeReadMore(stream, state);
- }
-
- return !state.ended &&
- (state.length < state.highWaterMark || state.length === 0);
-}