summaryrefslogtreecommitdiff
path: root/std/node/_stream/writable_internal.ts
diff options
context:
space:
mode:
Diffstat (limited to 'std/node/_stream/writable_internal.ts')
-rw-r--r--std/node/_stream/writable_internal.ts457
1 files changed, 0 insertions, 457 deletions
diff --git a/std/node/_stream/writable_internal.ts b/std/node/_stream/writable_internal.ts
deleted file mode 100644
index e8c001af0..000000000
--- a/std/node/_stream/writable_internal.ts
+++ /dev/null
@@ -1,457 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import type Duplex from "./duplex.ts";
-import type Writable from "./writable.ts";
-import type { WritableState } from "./writable.ts";
-import { kDestroy } from "./symbols.ts";
-import { ERR_MULTIPLE_CALLBACK, ERR_STREAM_DESTROYED } from "../_errors.ts";
-
-export type writeV = (
- // deno-lint-ignore no-explicit-any
- chunks: Array<{ chunk: any; encoding: string }>,
- callback: (error?: Error | null) => void,
-) => void;
-
-export type AfterWriteTick = {
- cb: (error?: Error) => void;
- count: number;
- state: WritableState;
- stream: Writable;
-};
-
-export const kOnFinished = Symbol("kOnFinished");
-
-function _destroy(
- self: Writable,
- err?: Error | null,
- cb?: (error?: Error | null) => void,
-) {
- self._destroy(err || null, (err) => {
- const w = self._writableState;
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- }
-
- w.closed = true;
-
- if (typeof cb === "function") {
- cb(err);
- }
-
- if (err) {
- queueMicrotask(() => {
- if (!w.errorEmitted) {
- w.errorEmitted = true;
- self.emit("error", err);
- }
- w.closeEmitted = true;
- if (w.emitClose) {
- self.emit("close");
- }
- });
- } else {
- queueMicrotask(() => {
- w.closeEmitted = true;
- if (w.emitClose) {
- self.emit("close");
- }
- });
- }
- });
-}
-
-export function afterWrite(
- stream: Writable,
- state: WritableState,
- count: number,
- cb: (error?: Error) => void,
-) {
- const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
- state.needDrain;
- if (needDrain) {
- state.needDrain = false;
- stream.emit("drain");
- }
-
- while (count-- > 0) {
- state.pendingcb--;
- cb();
- }
-
- if (state.destroyed) {
- errorBuffer(state);
- }
-
- finishMaybe(stream, state);
-}
-
-export function afterWriteTick({
- cb,
- count,
- state,
- stream,
-}: AfterWriteTick) {
- state.afterWriteTickInfo = null;
- return afterWrite(stream, state, count, cb);
-}
-
-/** If there's something in the buffer waiting, then process it.*/
-export function clearBuffer(stream: Duplex | Writable, state: WritableState) {
- if (
- state.corked ||
- state.bufferProcessing ||
- state.destroyed ||
- !state.constructed
- ) {
- return;
- }
-
- const { buffered, bufferedIndex, objectMode } = state;
- const bufferedLength = buffered.length - bufferedIndex;
-
- if (!bufferedLength) {
- return;
- }
-
- const i = bufferedIndex;
-
- state.bufferProcessing = true;
- if (bufferedLength > 1 && stream._writev) {
- state.pendingcb -= bufferedLength - 1;
-
- const callback = state.allNoop ? nop : (err: Error) => {
- for (let n = i; n < buffered.length; ++n) {
- buffered[n].callback(err);
- }
- };
- const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i);
-
- doWrite(stream, state, true, state.length, chunks, "", callback);
-
- resetBuffer(state);
- } else {
- do {
- const { chunk, encoding, callback } = buffered[i];
- const len = objectMode ? 1 : chunk.length;
- doWrite(stream, state, false, len, chunk, encoding, callback);
- } while (i < buffered.length && !state.writing);
-
- if (i === buffered.length) {
- resetBuffer(state);
- } else if (i > 256) {
- buffered.splice(0, i);
- state.bufferedIndex = 0;
- } else {
- state.bufferedIndex = i;
- }
- }
- state.bufferProcessing = false;
-}
-
-export function destroy(this: Writable, err?: Error | null, cb?: () => void) {
- const w = this._writableState;
-
- if (w.destroyed) {
- if (typeof cb === "function") {
- cb();
- }
-
- return this;
- }
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- }
-
- w.destroyed = true;
-
- if (!w.constructed) {
- this.once(kDestroy, (er) => {
- _destroy(this, err || er, cb);
- });
- } else {
- _destroy(this, err, cb);
- }
-
- return this;
-}
-
-function doWrite(
- stream: Duplex | Writable,
- state: WritableState,
- writev: boolean,
- len: number,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- cb: (error: Error) => void,
-) {
- state.writelen = len;
- state.writecb = cb;
- state.writing = true;
- state.sync = true;
- if (state.destroyed) {
- state.onwrite(new ERR_STREAM_DESTROYED("write"));
- } else if (writev) {
- (stream._writev as unknown as writeV)(chunk, state.onwrite);
- } else {
- stream._write(chunk, encoding, state.onwrite);
- }
- state.sync = false;
-}
-
-/** If there's something in the buffer waiting, then invoke callbacks.*/
-export function errorBuffer(state: WritableState) {
- if (state.writing) {
- return;
- }
-
- for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
- const { chunk, callback } = state.buffered[n];
- const len = state.objectMode ? 1 : chunk.length;
- state.length -= len;
- callback(new ERR_STREAM_DESTROYED("write"));
- }
-
- for (const callback of state[kOnFinished].splice(0)) {
- callback(new ERR_STREAM_DESTROYED("end"));
- }
-
- resetBuffer(state);
-}
-
-export function errorOrDestroy(stream: Writable, err: Error, sync = false) {
- const w = stream._writableState;
-
- if (w.destroyed) {
- return stream;
- }
-
- if (w.autoDestroy) {
- stream.destroy(err);
- } else if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- if (sync) {
- queueMicrotask(() => {
- if (w.errorEmitted) {
- return;
- }
- w.errorEmitted = true;
- stream.emit("error", err);
- });
- } else {
- if (w.errorEmitted) {
- return;
- }
- w.errorEmitted = true;
- stream.emit("error", err);
- }
- }
-}
-
-function finish(stream: Writable, state: WritableState) {
- state.pendingcb--;
- if (state.errorEmitted || state.closeEmitted) {
- return;
- }
-
- state.finished = true;
-
- for (const callback of state[kOnFinished].splice(0)) {
- callback();
- }
-
- stream.emit("finish");
-
- if (state.autoDestroy) {
- stream.destroy();
- }
-}
-
-export function finishMaybe(
- stream: Writable,
- state: WritableState,
- sync?: boolean,
-) {
- if (needFinish(state)) {
- prefinish(stream, state);
- if (state.pendingcb === 0 && needFinish(state)) {
- state.pendingcb++;
- if (sync) {
- queueMicrotask(() => finish(stream, state));
- } else {
- finish(stream, state);
- }
- }
- }
-}
-
-export function needFinish(state: WritableState) {
- return (state.ending &&
- state.constructed &&
- state.length === 0 &&
- !state.errored &&
- state.buffered.length === 0 &&
- !state.finished &&
- !state.writing);
-}
-
-export function nop() {}
-
-export function resetBuffer(state: WritableState) {
- state.buffered = [];
- state.bufferedIndex = 0;
- state.allBuffers = true;
- state.allNoop = true;
-}
-
-function onwriteError(
- stream: Writable,
- state: WritableState,
- er: Error,
- cb: (error: Error) => void,
-) {
- --state.pendingcb;
-
- cb(er);
- errorBuffer(state);
- errorOrDestroy(stream, er);
-}
-
-export function onwrite(stream: Writable, er?: Error | null) {
- const state = stream._writableState;
- const sync = state.sync;
- const cb = state.writecb;
-
- if (typeof cb !== "function") {
- errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
- return;
- }
-
- state.writing = false;
- state.writecb = null;
- state.length -= state.writelen;
- state.writelen = 0;
-
- if (er) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- er.stack;
-
- if (!state.errored) {
- state.errored = er;
- }
-
- if (sync) {
- queueMicrotask(() => onwriteError(stream, state, er, cb));
- } else {
- onwriteError(stream, state, er, cb);
- }
- } else {
- if (state.buffered.length > state.bufferedIndex) {
- clearBuffer(stream, state);
- }
-
- if (sync) {
- if (
- state.afterWriteTickInfo !== null &&
- state.afterWriteTickInfo.cb === cb
- ) {
- state.afterWriteTickInfo.count++;
- } else {
- state.afterWriteTickInfo = {
- count: 1,
- cb: (cb as (error?: Error) => void),
- stream,
- state,
- };
- queueMicrotask(() =>
- afterWriteTick(state.afterWriteTickInfo as AfterWriteTick)
- );
- }
- } else {
- afterWrite(stream, state, 1, cb as (error?: Error) => void);
- }
- }
-}
-
-export function prefinish(stream: Writable, state: WritableState) {
- if (!state.prefinished && !state.finalCalled) {
- if (typeof stream._final === "function" && !state.destroyed) {
- state.finalCalled = true;
-
- state.sync = true;
- state.pendingcb++;
- stream._final((err) => {
- state.pendingcb--;
- if (err) {
- for (const callback of state[kOnFinished].splice(0)) {
- callback(err);
- }
- errorOrDestroy(stream, err, state.sync);
- } else if (needFinish(state)) {
- state.prefinished = true;
- stream.emit("prefinish");
- state.pendingcb++;
- queueMicrotask(() => finish(stream, state));
- }
- });
- state.sync = false;
- } else {
- state.prefinished = true;
- stream.emit("prefinish");
- }
- }
-}
-
-export function writeOrBuffer(
- stream: Duplex | Writable,
- state: WritableState,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- callback: (error: Error) => void,
-) {
- const len = state.objectMode ? 1 : chunk.length;
-
- state.length += len;
-
- if (state.writing || state.corked || state.errored || !state.constructed) {
- state.buffered.push({ chunk, encoding, callback });
- if (state.allBuffers && encoding !== "buffer") {
- state.allBuffers = false;
- }
- if (state.allNoop && callback !== nop) {
- state.allNoop = false;
- }
- } else {
- state.writelen = len;
- state.writecb = callback;
- state.writing = true;
- state.sync = true;
- stream._write(chunk, encoding, state.onwrite);
- state.sync = false;
- }
-
- const ret = state.length < state.highWaterMark;
-
- if (!ret) {
- state.needDrain = true;
- }
-
- return ret && !state.errored && !state.destroyed;
-}