diff options
Diffstat (limited to 'std/node/_stream/writable_internal.ts')
-rw-r--r-- | std/node/_stream/writable_internal.ts | 457 |
1 files changed, 457 insertions, 0 deletions
diff --git a/std/node/_stream/writable_internal.ts b/std/node/_stream/writable_internal.ts new file mode 100644 index 000000000..e8c001af0 --- /dev/null +++ b/std/node/_stream/writable_internal.ts @@ -0,0 +1,457 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import type Duplex from "./duplex.ts"; +import type Writable from "./writable.ts"; +import type { WritableState } from "./writable.ts"; +import { kDestroy } from "./symbols.ts"; +import { ERR_MULTIPLE_CALLBACK, ERR_STREAM_DESTROYED } from "../_errors.ts"; + +export type writeV = ( + // deno-lint-ignore no-explicit-any + chunks: Array<{ chunk: any; encoding: string }>, + callback: (error?: Error | null) => void, +) => void; + +export type AfterWriteTick = { + cb: (error?: Error) => void; + count: number; + state: WritableState; + stream: Writable; +}; + +export const kOnFinished = Symbol("kOnFinished"); + +function _destroy( + self: Writable, + err?: Error | null, + cb?: (error?: Error | null) => void, +) { + self._destroy(err || null, (err) => { + const w = self._writableState; + + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + } + + w.closed = true; + + if (typeof cb === "function") { + cb(err); + } + + if (err) { + queueMicrotask(() => { + if (!w.errorEmitted) { + w.errorEmitted = true; + self.emit("error", err); + } + w.closeEmitted = true; + if (w.emitClose) { + self.emit("close"); + } + }); + } else { + queueMicrotask(() => { + w.closeEmitted = true; + if (w.emitClose) { + self.emit("close"); + } + }); + } + }); +} + +export function afterWrite( + stream: Writable, + state: WritableState, + count: number, + cb: (error?: Error) => void, +) { + const needDrain = !state.ending && !stream.destroyed && state.length === 0 && + state.needDrain; + if (needDrain) { + state.needDrain = false; + stream.emit("drain"); + } + + while (count-- > 0) { + state.pendingcb--; + cb(); + } + + if (state.destroyed) { + errorBuffer(state); + } + + finishMaybe(stream, state); +} + +export function afterWriteTick({ + cb, + count, + state, + stream, +}: AfterWriteTick) { + state.afterWriteTickInfo = null; + return afterWrite(stream, state, count, cb); +} + +/** If there's something in the buffer waiting, then process it.*/ +export function clearBuffer(stream: Duplex | Writable, state: WritableState) { + if ( + state.corked || + state.bufferProcessing || + state.destroyed || + !state.constructed + ) { + return; + } + + const { buffered, bufferedIndex, objectMode } = state; + const bufferedLength = buffered.length - bufferedIndex; + + if (!bufferedLength) { + return; + } + + const i = bufferedIndex; + + state.bufferProcessing = true; + if (bufferedLength > 1 && stream._writev) { + state.pendingcb -= bufferedLength - 1; + + const callback = state.allNoop ? nop : (err: Error) => { + for (let n = i; n < buffered.length; ++n) { + buffered[n].callback(err); + } + }; + const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i); + + doWrite(stream, state, true, state.length, chunks, "", callback); + + resetBuffer(state); + } else { + do { + const { chunk, encoding, callback } = buffered[i]; + const len = objectMode ? 1 : chunk.length; + doWrite(stream, state, false, len, chunk, encoding, callback); + } while (i < buffered.length && !state.writing); + + if (i === buffered.length) { + resetBuffer(state); + } else if (i > 256) { + buffered.splice(0, i); + state.bufferedIndex = 0; + } else { + state.bufferedIndex = i; + } + } + state.bufferProcessing = false; +} + +export function destroy(this: Writable, err?: Error | null, cb?: () => void) { + const w = this._writableState; + + if (w.destroyed) { + if (typeof cb === "function") { + cb(); + } + + return this; + } + + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + } + + w.destroyed = true; + + if (!w.constructed) { + this.once(kDestroy, (er) => { + _destroy(this, err || er, cb); + }); + } else { + _destroy(this, err, cb); + } + + return this; +} + +function doWrite( + stream: Duplex | Writable, + state: WritableState, + writev: boolean, + len: number, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + cb: (error: Error) => void, +) { + state.writelen = len; + state.writecb = cb; + state.writing = true; + state.sync = true; + if (state.destroyed) { + state.onwrite(new ERR_STREAM_DESTROYED("write")); + } else if (writev) { + (stream._writev as unknown as writeV)(chunk, state.onwrite); + } else { + stream._write(chunk, encoding, state.onwrite); + } + state.sync = false; +} + +/** If there's something in the buffer waiting, then invoke callbacks.*/ +export function errorBuffer(state: WritableState) { + if (state.writing) { + return; + } + + for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { + const { chunk, callback } = state.buffered[n]; + const len = state.objectMode ? 1 : chunk.length; + state.length -= len; + callback(new ERR_STREAM_DESTROYED("write")); + } + + for (const callback of state[kOnFinished].splice(0)) { + callback(new ERR_STREAM_DESTROYED("end")); + } + + resetBuffer(state); +} + +export function errorOrDestroy(stream: Writable, err: Error, sync = false) { + const w = stream._writableState; + + if (w.destroyed) { + return stream; + } + + if (w.autoDestroy) { + stream.destroy(err); + } else if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + if (sync) { + queueMicrotask(() => { + if (w.errorEmitted) { + return; + } + w.errorEmitted = true; + stream.emit("error", err); + }); + } else { + if (w.errorEmitted) { + return; + } + w.errorEmitted = true; + stream.emit("error", err); + } + } +} + +function finish(stream: Writable, state: WritableState) { + state.pendingcb--; + if (state.errorEmitted || state.closeEmitted) { + return; + } + + state.finished = true; + + for (const callback of state[kOnFinished].splice(0)) { + callback(); + } + + stream.emit("finish"); + + if (state.autoDestroy) { + stream.destroy(); + } +} + +export function finishMaybe( + stream: Writable, + state: WritableState, + sync?: boolean, +) { + if (needFinish(state)) { + prefinish(stream, state); + if (state.pendingcb === 0 && needFinish(state)) { + state.pendingcb++; + if (sync) { + queueMicrotask(() => finish(stream, state)); + } else { + finish(stream, state); + } + } + } +} + +export function needFinish(state: WritableState) { + return (state.ending && + state.constructed && + state.length === 0 && + !state.errored && + state.buffered.length === 0 && + !state.finished && + !state.writing); +} + +export function nop() {} + +export function resetBuffer(state: WritableState) { + state.buffered = []; + state.bufferedIndex = 0; + state.allBuffers = true; + state.allNoop = true; +} + +function onwriteError( + stream: Writable, + state: WritableState, + er: Error, + cb: (error: Error) => void, +) { + --state.pendingcb; + + cb(er); + errorBuffer(state); + errorOrDestroy(stream, er); +} + +export function onwrite(stream: Writable, er?: Error | null) { + const state = stream._writableState; + const sync = state.sync; + const cb = state.writecb; + + if (typeof cb !== "function") { + errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); + return; + } + + state.writing = false; + state.writecb = null; + state.length -= state.writelen; + state.writelen = 0; + + if (er) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + er.stack; + + if (!state.errored) { + state.errored = er; + } + + if (sync) { + queueMicrotask(() => onwriteError(stream, state, er, cb)); + } else { + onwriteError(stream, state, er, cb); + } + } else { + if (state.buffered.length > state.bufferedIndex) { + clearBuffer(stream, state); + } + + if (sync) { + if ( + state.afterWriteTickInfo !== null && + state.afterWriteTickInfo.cb === cb + ) { + state.afterWriteTickInfo.count++; + } else { + state.afterWriteTickInfo = { + count: 1, + cb: (cb as (error?: Error) => void), + stream, + state, + }; + queueMicrotask(() => + afterWriteTick(state.afterWriteTickInfo as AfterWriteTick) + ); + } + } else { + afterWrite(stream, state, 1, cb as (error?: Error) => void); + } + } +} + +export function prefinish(stream: Writable, state: WritableState) { + if (!state.prefinished && !state.finalCalled) { + if (typeof stream._final === "function" && !state.destroyed) { + state.finalCalled = true; + + state.sync = true; + state.pendingcb++; + stream._final((err) => { + state.pendingcb--; + if (err) { + for (const callback of state[kOnFinished].splice(0)) { + callback(err); + } + errorOrDestroy(stream, err, state.sync); + } else if (needFinish(state)) { + state.prefinished = true; + stream.emit("prefinish"); + state.pendingcb++; + queueMicrotask(() => finish(stream, state)); + } + }); + state.sync = false; + } else { + state.prefinished = true; + stream.emit("prefinish"); + } + } +} + +export function writeOrBuffer( + stream: Duplex | Writable, + state: WritableState, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + callback: (error: Error) => void, +) { + const len = state.objectMode ? 1 : chunk.length; + + state.length += len; + + if (state.writing || state.corked || state.errored || !state.constructed) { + state.buffered.push({ chunk, encoding, callback }); + if (state.allBuffers && encoding !== "buffer") { + state.allBuffers = false; + } + if (state.allNoop && callback !== nop) { + state.allNoop = false; + } + } else { + state.writelen = len; + state.writecb = callback; + state.writing = true; + state.sync = true; + stream._write(chunk, encoding, state.onwrite); + state.sync = false; + } + + const ret = state.length < state.highWaterMark; + + if (!ret) { + state.needDrain = true; + } + + return ret && !state.errored && !state.destroyed; +} |