diff options
Diffstat (limited to 'std/node/_stream/writable_internal.ts')
-rw-r--r-- | std/node/_stream/writable_internal.ts | 457 |
1 files changed, 0 insertions, 457 deletions
diff --git a/std/node/_stream/writable_internal.ts b/std/node/_stream/writable_internal.ts deleted file mode 100644 index e8c001af0..000000000 --- a/std/node/_stream/writable_internal.ts +++ /dev/null @@ -1,457 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import type Duplex from "./duplex.ts"; -import type Writable from "./writable.ts"; -import type { WritableState } from "./writable.ts"; -import { kDestroy } from "./symbols.ts"; -import { ERR_MULTIPLE_CALLBACK, ERR_STREAM_DESTROYED } from "../_errors.ts"; - -export type writeV = ( - // deno-lint-ignore no-explicit-any - chunks: Array<{ chunk: any; encoding: string }>, - callback: (error?: Error | null) => void, -) => void; - -export type AfterWriteTick = { - cb: (error?: Error) => void; - count: number; - state: WritableState; - stream: Writable; -}; - -export const kOnFinished = Symbol("kOnFinished"); - -function _destroy( - self: Writable, - err?: Error | null, - cb?: (error?: Error | null) => void, -) { - self._destroy(err || null, (err) => { - const w = self._writableState; - - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!w.errored) { - w.errored = err; - } - } - - w.closed = true; - - if (typeof cb === "function") { - cb(err); - } - - if (err) { - queueMicrotask(() => { - if (!w.errorEmitted) { - w.errorEmitted = true; - self.emit("error", err); - } - w.closeEmitted = true; - if (w.emitClose) { - self.emit("close"); - } - }); - } else { - queueMicrotask(() => { - w.closeEmitted = true; - if (w.emitClose) { - self.emit("close"); - } - }); - } - }); -} - -export function afterWrite( - stream: Writable, - state: WritableState, - count: number, - cb: (error?: Error) => void, -) { - const needDrain = !state.ending && !stream.destroyed && state.length === 0 && - state.needDrain; - if (needDrain) { - state.needDrain = false; - stream.emit("drain"); - } - - while (count-- > 0) { - state.pendingcb--; - cb(); - } - - if (state.destroyed) { - errorBuffer(state); - } - - finishMaybe(stream, state); -} - -export function afterWriteTick({ - cb, - count, - state, - stream, -}: AfterWriteTick) { - state.afterWriteTickInfo = null; - return afterWrite(stream, state, count, cb); -} - -/** If there's something in the buffer waiting, then process it.*/ -export function clearBuffer(stream: Duplex | Writable, state: WritableState) { - if ( - state.corked || - state.bufferProcessing || - state.destroyed || - !state.constructed - ) { - return; - } - - const { buffered, bufferedIndex, objectMode } = state; - const bufferedLength = buffered.length - bufferedIndex; - - if (!bufferedLength) { - return; - } - - const i = bufferedIndex; - - state.bufferProcessing = true; - if (bufferedLength > 1 && stream._writev) { - state.pendingcb -= bufferedLength - 1; - - const callback = state.allNoop ? nop : (err: Error) => { - for (let n = i; n < buffered.length; ++n) { - buffered[n].callback(err); - } - }; - const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i); - - doWrite(stream, state, true, state.length, chunks, "", callback); - - resetBuffer(state); - } else { - do { - const { chunk, encoding, callback } = buffered[i]; - const len = objectMode ? 1 : chunk.length; - doWrite(stream, state, false, len, chunk, encoding, callback); - } while (i < buffered.length && !state.writing); - - if (i === buffered.length) { - resetBuffer(state); - } else if (i > 256) { - buffered.splice(0, i); - state.bufferedIndex = 0; - } else { - state.bufferedIndex = i; - } - } - state.bufferProcessing = false; -} - -export function destroy(this: Writable, err?: Error | null, cb?: () => void) { - const w = this._writableState; - - if (w.destroyed) { - if (typeof cb === "function") { - cb(); - } - - return this; - } - - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!w.errored) { - w.errored = err; - } - } - - w.destroyed = true; - - if (!w.constructed) { - this.once(kDestroy, (er) => { - _destroy(this, err || er, cb); - }); - } else { - _destroy(this, err, cb); - } - - return this; -} - -function doWrite( - stream: Duplex | Writable, - state: WritableState, - writev: boolean, - len: number, - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: string, - cb: (error: Error) => void, -) { - state.writelen = len; - state.writecb = cb; - state.writing = true; - state.sync = true; - if (state.destroyed) { - state.onwrite(new ERR_STREAM_DESTROYED("write")); - } else if (writev) { - (stream._writev as unknown as writeV)(chunk, state.onwrite); - } else { - stream._write(chunk, encoding, state.onwrite); - } - state.sync = false; -} - -/** If there's something in the buffer waiting, then invoke callbacks.*/ -export function errorBuffer(state: WritableState) { - if (state.writing) { - return; - } - - for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { - const { chunk, callback } = state.buffered[n]; - const len = state.objectMode ? 1 : chunk.length; - state.length -= len; - callback(new ERR_STREAM_DESTROYED("write")); - } - - for (const callback of state[kOnFinished].splice(0)) { - callback(new ERR_STREAM_DESTROYED("end")); - } - - resetBuffer(state); -} - -export function errorOrDestroy(stream: Writable, err: Error, sync = false) { - const w = stream._writableState; - - if (w.destroyed) { - return stream; - } - - if (w.autoDestroy) { - stream.destroy(err); - } else if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!w.errored) { - w.errored = err; - } - if (sync) { - queueMicrotask(() => { - if (w.errorEmitted) { - return; - } - w.errorEmitted = true; - stream.emit("error", err); - }); - } else { - if (w.errorEmitted) { - return; - } - w.errorEmitted = true; - stream.emit("error", err); - } - } -} - -function finish(stream: Writable, state: WritableState) { - state.pendingcb--; - if (state.errorEmitted || state.closeEmitted) { - return; - } - - state.finished = true; - - for (const callback of state[kOnFinished].splice(0)) { - callback(); - } - - stream.emit("finish"); - - if (state.autoDestroy) { - stream.destroy(); - } -} - -export function finishMaybe( - stream: Writable, - state: WritableState, - sync?: boolean, -) { - if (needFinish(state)) { - prefinish(stream, state); - if (state.pendingcb === 0 && needFinish(state)) { - state.pendingcb++; - if (sync) { - queueMicrotask(() => finish(stream, state)); - } else { - finish(stream, state); - } - } - } -} - -export function needFinish(state: WritableState) { - return (state.ending && - state.constructed && - state.length === 0 && - !state.errored && - state.buffered.length === 0 && - !state.finished && - !state.writing); -} - -export function nop() {} - -export function resetBuffer(state: WritableState) { - state.buffered = []; - state.bufferedIndex = 0; - state.allBuffers = true; - state.allNoop = true; -} - -function onwriteError( - stream: Writable, - state: WritableState, - er: Error, - cb: (error: Error) => void, -) { - --state.pendingcb; - - cb(er); - errorBuffer(state); - errorOrDestroy(stream, er); -} - -export function onwrite(stream: Writable, er?: Error | null) { - const state = stream._writableState; - const sync = state.sync; - const cb = state.writecb; - - if (typeof cb !== "function") { - errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); - return; - } - - state.writing = false; - state.writecb = null; - state.length -= state.writelen; - state.writelen = 0; - - if (er) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - er.stack; - - if (!state.errored) { - state.errored = er; - } - - if (sync) { - queueMicrotask(() => onwriteError(stream, state, er, cb)); - } else { - onwriteError(stream, state, er, cb); - } - } else { - if (state.buffered.length > state.bufferedIndex) { - clearBuffer(stream, state); - } - - if (sync) { - if ( - state.afterWriteTickInfo !== null && - state.afterWriteTickInfo.cb === cb - ) { - state.afterWriteTickInfo.count++; - } else { - state.afterWriteTickInfo = { - count: 1, - cb: (cb as (error?: Error) => void), - stream, - state, - }; - queueMicrotask(() => - afterWriteTick(state.afterWriteTickInfo as AfterWriteTick) - ); - } - } else { - afterWrite(stream, state, 1, cb as (error?: Error) => void); - } - } -} - -export function prefinish(stream: Writable, state: WritableState) { - if (!state.prefinished && !state.finalCalled) { - if (typeof stream._final === "function" && !state.destroyed) { - state.finalCalled = true; - - state.sync = true; - state.pendingcb++; - stream._final((err) => { - state.pendingcb--; - if (err) { - for (const callback of state[kOnFinished].splice(0)) { - callback(err); - } - errorOrDestroy(stream, err, state.sync); - } else if (needFinish(state)) { - state.prefinished = true; - stream.emit("prefinish"); - state.pendingcb++; - queueMicrotask(() => finish(stream, state)); - } - }); - state.sync = false; - } else { - state.prefinished = true; - stream.emit("prefinish"); - } - } -} - -export function writeOrBuffer( - stream: Duplex | Writable, - state: WritableState, - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: string, - callback: (error: Error) => void, -) { - const len = state.objectMode ? 1 : chunk.length; - - state.length += len; - - if (state.writing || state.corked || state.errored || !state.constructed) { - state.buffered.push({ chunk, encoding, callback }); - if (state.allBuffers && encoding !== "buffer") { - state.allBuffers = false; - } - if (state.allNoop && callback !== nop) { - state.allNoop = false; - } - } else { - state.writelen = len; - state.writecb = callback; - state.writing = true; - state.sync = true; - stream._write(chunk, encoding, state.onwrite); - state.sync = false; - } - - const ret = state.length < state.highWaterMark; - - if (!ret) { - state.needDrain = true; - } - - return ret && !state.errored && !state.destroyed; -} |