summaryrefslogtreecommitdiff
path: root/std/node/_stream/writable_internal.ts
diff options
context:
space:
mode:
Diffstat (limited to 'std/node/_stream/writable_internal.ts')
-rw-r--r--std/node/_stream/writable_internal.ts457
1 files changed, 457 insertions, 0 deletions
diff --git a/std/node/_stream/writable_internal.ts b/std/node/_stream/writable_internal.ts
new file mode 100644
index 000000000..e8c001af0
--- /dev/null
+++ b/std/node/_stream/writable_internal.ts
@@ -0,0 +1,457 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import type Duplex from "./duplex.ts";
+import type Writable from "./writable.ts";
+import type { WritableState } from "./writable.ts";
+import { kDestroy } from "./symbols.ts";
+import { ERR_MULTIPLE_CALLBACK, ERR_STREAM_DESTROYED } from "../_errors.ts";
+
+export type writeV = (
+ // deno-lint-ignore no-explicit-any
+ chunks: Array<{ chunk: any; encoding: string }>,
+ callback: (error?: Error | null) => void,
+) => void;
+
+export type AfterWriteTick = {
+ cb: (error?: Error) => void;
+ count: number;
+ state: WritableState;
+ stream: Writable;
+};
+
+export const kOnFinished = Symbol("kOnFinished");
+
+function _destroy(
+ self: Writable,
+ err?: Error | null,
+ cb?: (error?: Error | null) => void,
+) {
+ self._destroy(err || null, (err) => {
+ const w = self._writableState;
+
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ }
+
+ w.closed = true;
+
+ if (typeof cb === "function") {
+ cb(err);
+ }
+
+ if (err) {
+ queueMicrotask(() => {
+ if (!w.errorEmitted) {
+ w.errorEmitted = true;
+ self.emit("error", err);
+ }
+ w.closeEmitted = true;
+ if (w.emitClose) {
+ self.emit("close");
+ }
+ });
+ } else {
+ queueMicrotask(() => {
+ w.closeEmitted = true;
+ if (w.emitClose) {
+ self.emit("close");
+ }
+ });
+ }
+ });
+}
+
+export function afterWrite(
+ stream: Writable,
+ state: WritableState,
+ count: number,
+ cb: (error?: Error) => void,
+) {
+ const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
+ state.needDrain;
+ if (needDrain) {
+ state.needDrain = false;
+ stream.emit("drain");
+ }
+
+ while (count-- > 0) {
+ state.pendingcb--;
+ cb();
+ }
+
+ if (state.destroyed) {
+ errorBuffer(state);
+ }
+
+ finishMaybe(stream, state);
+}
+
+export function afterWriteTick({
+ cb,
+ count,
+ state,
+ stream,
+}: AfterWriteTick) {
+ state.afterWriteTickInfo = null;
+ return afterWrite(stream, state, count, cb);
+}
+
+/** If there's something in the buffer waiting, then process it.*/
+export function clearBuffer(stream: Duplex | Writable, state: WritableState) {
+ if (
+ state.corked ||
+ state.bufferProcessing ||
+ state.destroyed ||
+ !state.constructed
+ ) {
+ return;
+ }
+
+ const { buffered, bufferedIndex, objectMode } = state;
+ const bufferedLength = buffered.length - bufferedIndex;
+
+ if (!bufferedLength) {
+ return;
+ }
+
+ const i = bufferedIndex;
+
+ state.bufferProcessing = true;
+ if (bufferedLength > 1 && stream._writev) {
+ state.pendingcb -= bufferedLength - 1;
+
+ const callback = state.allNoop ? nop : (err: Error) => {
+ for (let n = i; n < buffered.length; ++n) {
+ buffered[n].callback(err);
+ }
+ };
+ const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i);
+
+ doWrite(stream, state, true, state.length, chunks, "", callback);
+
+ resetBuffer(state);
+ } else {
+ do {
+ const { chunk, encoding, callback } = buffered[i];
+ const len = objectMode ? 1 : chunk.length;
+ doWrite(stream, state, false, len, chunk, encoding, callback);
+ } while (i < buffered.length && !state.writing);
+
+ if (i === buffered.length) {
+ resetBuffer(state);
+ } else if (i > 256) {
+ buffered.splice(0, i);
+ state.bufferedIndex = 0;
+ } else {
+ state.bufferedIndex = i;
+ }
+ }
+ state.bufferProcessing = false;
+}
+
+export function destroy(this: Writable, err?: Error | null, cb?: () => void) {
+ const w = this._writableState;
+
+ if (w.destroyed) {
+ if (typeof cb === "function") {
+ cb();
+ }
+
+ return this;
+ }
+
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ }
+
+ w.destroyed = true;
+
+ if (!w.constructed) {
+ this.once(kDestroy, (er) => {
+ _destroy(this, err || er, cb);
+ });
+ } else {
+ _destroy(this, err, cb);
+ }
+
+ return this;
+}
+
+function doWrite(
+ stream: Duplex | Writable,
+ state: WritableState,
+ writev: boolean,
+ len: number,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ cb: (error: Error) => void,
+) {
+ state.writelen = len;
+ state.writecb = cb;
+ state.writing = true;
+ state.sync = true;
+ if (state.destroyed) {
+ state.onwrite(new ERR_STREAM_DESTROYED("write"));
+ } else if (writev) {
+ (stream._writev as unknown as writeV)(chunk, state.onwrite);
+ } else {
+ stream._write(chunk, encoding, state.onwrite);
+ }
+ state.sync = false;
+}
+
+/** If there's something in the buffer waiting, then invoke callbacks.*/
+export function errorBuffer(state: WritableState) {
+ if (state.writing) {
+ return;
+ }
+
+ for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
+ const { chunk, callback } = state.buffered[n];
+ const len = state.objectMode ? 1 : chunk.length;
+ state.length -= len;
+ callback(new ERR_STREAM_DESTROYED("write"));
+ }
+
+ for (const callback of state[kOnFinished].splice(0)) {
+ callback(new ERR_STREAM_DESTROYED("end"));
+ }
+
+ resetBuffer(state);
+}
+
+export function errorOrDestroy(stream: Writable, err: Error, sync = false) {
+ const w = stream._writableState;
+
+ if (w.destroyed) {
+ return stream;
+ }
+
+ if (w.autoDestroy) {
+ stream.destroy(err);
+ } else if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ if (sync) {
+ queueMicrotask(() => {
+ if (w.errorEmitted) {
+ return;
+ }
+ w.errorEmitted = true;
+ stream.emit("error", err);
+ });
+ } else {
+ if (w.errorEmitted) {
+ return;
+ }
+ w.errorEmitted = true;
+ stream.emit("error", err);
+ }
+ }
+}
+
+function finish(stream: Writable, state: WritableState) {
+ state.pendingcb--;
+ if (state.errorEmitted || state.closeEmitted) {
+ return;
+ }
+
+ state.finished = true;
+
+ for (const callback of state[kOnFinished].splice(0)) {
+ callback();
+ }
+
+ stream.emit("finish");
+
+ if (state.autoDestroy) {
+ stream.destroy();
+ }
+}
+
+export function finishMaybe(
+ stream: Writable,
+ state: WritableState,
+ sync?: boolean,
+) {
+ if (needFinish(state)) {
+ prefinish(stream, state);
+ if (state.pendingcb === 0 && needFinish(state)) {
+ state.pendingcb++;
+ if (sync) {
+ queueMicrotask(() => finish(stream, state));
+ } else {
+ finish(stream, state);
+ }
+ }
+ }
+}
+
+export function needFinish(state: WritableState) {
+ return (state.ending &&
+ state.constructed &&
+ state.length === 0 &&
+ !state.errored &&
+ state.buffered.length === 0 &&
+ !state.finished &&
+ !state.writing);
+}
+
+export function nop() {}
+
+export function resetBuffer(state: WritableState) {
+ state.buffered = [];
+ state.bufferedIndex = 0;
+ state.allBuffers = true;
+ state.allNoop = true;
+}
+
+function onwriteError(
+ stream: Writable,
+ state: WritableState,
+ er: Error,
+ cb: (error: Error) => void,
+) {
+ --state.pendingcb;
+
+ cb(er);
+ errorBuffer(state);
+ errorOrDestroy(stream, er);
+}
+
+export function onwrite(stream: Writable, er?: Error | null) {
+ const state = stream._writableState;
+ const sync = state.sync;
+ const cb = state.writecb;
+
+ if (typeof cb !== "function") {
+ errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
+ return;
+ }
+
+ state.writing = false;
+ state.writecb = null;
+ state.length -= state.writelen;
+ state.writelen = 0;
+
+ if (er) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ er.stack;
+
+ if (!state.errored) {
+ state.errored = er;
+ }
+
+ if (sync) {
+ queueMicrotask(() => onwriteError(stream, state, er, cb));
+ } else {
+ onwriteError(stream, state, er, cb);
+ }
+ } else {
+ if (state.buffered.length > state.bufferedIndex) {
+ clearBuffer(stream, state);
+ }
+
+ if (sync) {
+ if (
+ state.afterWriteTickInfo !== null &&
+ state.afterWriteTickInfo.cb === cb
+ ) {
+ state.afterWriteTickInfo.count++;
+ } else {
+ state.afterWriteTickInfo = {
+ count: 1,
+ cb: (cb as (error?: Error) => void),
+ stream,
+ state,
+ };
+ queueMicrotask(() =>
+ afterWriteTick(state.afterWriteTickInfo as AfterWriteTick)
+ );
+ }
+ } else {
+ afterWrite(stream, state, 1, cb as (error?: Error) => void);
+ }
+ }
+}
+
+export function prefinish(stream: Writable, state: WritableState) {
+ if (!state.prefinished && !state.finalCalled) {
+ if (typeof stream._final === "function" && !state.destroyed) {
+ state.finalCalled = true;
+
+ state.sync = true;
+ state.pendingcb++;
+ stream._final((err) => {
+ state.pendingcb--;
+ if (err) {
+ for (const callback of state[kOnFinished].splice(0)) {
+ callback(err);
+ }
+ errorOrDestroy(stream, err, state.sync);
+ } else if (needFinish(state)) {
+ state.prefinished = true;
+ stream.emit("prefinish");
+ state.pendingcb++;
+ queueMicrotask(() => finish(stream, state));
+ }
+ });
+ state.sync = false;
+ } else {
+ state.prefinished = true;
+ stream.emit("prefinish");
+ }
+ }
+}
+
+export function writeOrBuffer(
+ stream: Duplex | Writable,
+ state: WritableState,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ callback: (error: Error) => void,
+) {
+ const len = state.objectMode ? 1 : chunk.length;
+
+ state.length += len;
+
+ if (state.writing || state.corked || state.errored || !state.constructed) {
+ state.buffered.push({ chunk, encoding, callback });
+ if (state.allBuffers && encoding !== "buffer") {
+ state.allBuffers = false;
+ }
+ if (state.allNoop && callback !== nop) {
+ state.allNoop = false;
+ }
+ } else {
+ state.writelen = len;
+ state.writecb = callback;
+ state.writing = true;
+ state.sync = true;
+ stream._write(chunk, encoding, state.onwrite);
+ state.sync = false;
+ }
+
+ const ret = state.length < state.highWaterMark;
+
+ if (!ret) {
+ state.needDrain = true;
+ }
+
+ return ret && !state.errored && !state.destroyed;
+}