diff options
Diffstat (limited to 'std/node/_stream/writable.ts')
-rw-r--r-- | std/node/_stream/writable.ts | 573 |
1 files changed, 32 insertions, 541 deletions
diff --git a/std/node/_stream/writable.ts b/std/node/_stream/writable.ts index 158af8325..534fc22fb 100644 --- a/std/node/_stream/writable.ts +++ b/std/node/_stream/writable.ts @@ -2,12 +2,10 @@ import { Buffer } from "../buffer.ts"; import Stream from "./stream.ts"; import { captureRejectionSymbol } from "../events.ts"; -import { kConstruct, kDestroy } from "./symbols.ts"; import { ERR_INVALID_ARG_TYPE, ERR_INVALID_OPT_VALUE, ERR_METHOD_NOT_IMPLEMENTED, - ERR_MULTIPLE_CALLBACK, ERR_STREAM_ALREADY_FINISHED, ERR_STREAM_CANNOT_PIPE, ERR_STREAM_DESTROYED, @@ -15,493 +13,27 @@ import { ERR_STREAM_WRITE_AFTER_END, ERR_UNKNOWN_ENCODING, } from "../_errors.ts"; - -function nop() {} - -//TODO(Soremwar) -//Bring in encodings -type write_v = ( - // deno-lint-ignore no-explicit-any - chunks: Array<{ chunk: any; encoding: string }>, - callback: (error?: Error | null) => void, -) => void; - -type AfterWriteTick = { - cb: (error?: Error) => void; - count: number; - state: WritableState; - stream: Writable; -}; - -const kOnFinished = Symbol("kOnFinished"); - -function destroy(this: Writable, err?: Error, cb?: () => void) { - const w = this._writableState; - - if (w.destroyed) { - if (typeof cb === "function") { - cb(); - } - - return this; - } - - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!w.errored) { - w.errored = err; - } - } - - w.destroyed = true; - - if (!w.constructed) { - this.once(kDestroy, (er) => { - _destroy(this, err || er, cb); - }); - } else { - _destroy(this, err, cb); - } - - return this; -} - -function _destroy( - self: Writable, - err?: Error, - cb?: (error?: Error | null) => void, -) { - self._destroy(err || null, (err) => { - const w = self._writableState; - - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!w.errored) { - w.errored = err; - } - } - - w.closed = true; - - if (typeof cb === "function") { - cb(err); - } - - if (err) { - queueMicrotask(() => { - if (!w.errorEmitted) { - w.errorEmitted = true; - self.emit("error", err); - } - w.closeEmitted = true; - if (w.emitClose) { - self.emit("close"); - } - }); - } else { - queueMicrotask(() => { - w.closeEmitted = true; - if (w.emitClose) { - self.emit("close"); - } - }); - } - }); -} - -function errorOrDestroy(stream: Writable, err: Error, sync = false) { - const w = stream._writableState; - - if (w.destroyed) { - return stream; - } - - if (w.autoDestroy) { - stream.destroy(err); - } else if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!w.errored) { - w.errored = err; - } - if (sync) { - queueMicrotask(() => { - if (w.errorEmitted) { - return; - } - w.errorEmitted = true; - stream.emit("error", err); - }); - } else { - if (w.errorEmitted) { - return; - } - w.errorEmitted = true; - stream.emit("error", err); - } - } -} - -function construct(stream: Writable, cb: (error: Error) => void) { - if (!stream._construct) { - return; - } - - stream.once(kConstruct, cb); - const w = stream._writableState; - - w.constructed = false; - - queueMicrotask(() => { - let called = false; - stream._construct?.((err) => { - w.constructed = true; - - if (called) { - err = new ERR_MULTIPLE_CALLBACK(); - } else { - called = true; - } - - if (w.destroyed) { - stream.emit(kDestroy, err); - } else if (err) { - errorOrDestroy(stream, err, true); - } else { - queueMicrotask(() => { - stream.emit(kConstruct); - }); - } - }); - }); -} - -//TODO(Soremwar) -//Bring encodings in -function writeOrBuffer( - stream: Writable, - state: WritableState, - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: string, - callback: (error: Error) => void, -) { - const len = state.objectMode ? 1 : chunk.length; - - state.length += len; - - if (state.writing || state.corked || state.errored || !state.constructed) { - state.buffered.push({ chunk, encoding, callback }); - if (state.allBuffers && encoding !== "buffer") { - state.allBuffers = false; - } - if (state.allNoop && callback !== nop) { - state.allNoop = false; - } - } else { - state.writelen = len; - state.writecb = callback; - state.writing = true; - state.sync = true; - stream._write(chunk, encoding, state.onwrite); - state.sync = false; - } - - const ret = state.length < state.highWaterMark; - - if (!ret) { - state.needDrain = true; - } - - return ret && !state.errored && !state.destroyed; -} - -//TODO(Soremwar) -//Bring encodings in -function doWrite( - stream: Writable, - state: WritableState, - writev: boolean, - len: number, - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: string, - cb: (error: Error) => void, -) { - state.writelen = len; - state.writecb = cb; - state.writing = true; - state.sync = true; - if (state.destroyed) { - state.onwrite(new ERR_STREAM_DESTROYED("write")); - } else if (writev) { - (stream._writev as unknown as write_v)(chunk, state.onwrite); - } else { - stream._write(chunk, encoding, state.onwrite); - } - state.sync = false; -} - -function onwriteError( - stream: Writable, - state: WritableState, - er: Error, - cb: (error: Error) => void, -) { - --state.pendingcb; - - cb(er); - errorBuffer(state); - errorOrDestroy(stream, er); -} - -function onwrite(stream: Writable, er?: Error | null) { - const state = stream._writableState; - const sync = state.sync; - const cb = state.writecb; - - if (typeof cb !== "function") { - errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); - return; - } - - state.writing = false; - state.writecb = null; - state.length -= state.writelen; - state.writelen = 0; - - if (er) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - er.stack; - - if (!state.errored) { - state.errored = er; - } - - if (sync) { - queueMicrotask(() => onwriteError(stream, state, er, cb)); - } else { - onwriteError(stream, state, er, cb); - } - } else { - if (state.buffered.length > state.bufferedIndex) { - clearBuffer(stream, state); - } - - if (sync) { - if ( - state.afterWriteTickInfo !== null && - state.afterWriteTickInfo.cb === cb - ) { - state.afterWriteTickInfo.count++; - } else { - state.afterWriteTickInfo = { - count: 1, - cb: (cb as (error?: Error) => void), - stream, - state, - }; - queueMicrotask(() => - afterWriteTick(state.afterWriteTickInfo as AfterWriteTick) - ); - } - } else { - afterWrite(stream, state, 1, cb as (error?: Error) => void); - } - } -} - -function afterWriteTick({ - cb, - count, - state, - stream, -}: AfterWriteTick) { - state.afterWriteTickInfo = null; - return afterWrite(stream, state, count, cb); -} - -function afterWrite( - stream: Writable, - state: WritableState, - count: number, - cb: (error?: Error) => void, -) { - const needDrain = !state.ending && !stream.destroyed && state.length === 0 && - state.needDrain; - if (needDrain) { - state.needDrain = false; - stream.emit("drain"); - } - - while (count-- > 0) { - state.pendingcb--; - cb(); - } - - if (state.destroyed) { - errorBuffer(state); - } - - finishMaybe(stream, state); -} - -/** If there's something in the buffer waiting, then invoke callbacks.*/ -function errorBuffer(state: WritableState) { - if (state.writing) { - return; - } - - for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { - const { chunk, callback } = state.buffered[n]; - const len = state.objectMode ? 1 : chunk.length; - state.length -= len; - callback(new ERR_STREAM_DESTROYED("write")); - } - - for (const callback of state[kOnFinished].splice(0)) { - callback(new ERR_STREAM_DESTROYED("end")); - } - - resetBuffer(state); -} - -/** If there's something in the buffer waiting, then process it.*/ -function clearBuffer(stream: Writable, state: WritableState) { - if ( - state.corked || - state.bufferProcessing || - state.destroyed || - !state.constructed - ) { - return; - } - - const { buffered, bufferedIndex, objectMode } = state; - const bufferedLength = buffered.length - bufferedIndex; - - if (!bufferedLength) { - return; - } - - const i = bufferedIndex; - - state.bufferProcessing = true; - if (bufferedLength > 1 && stream._writev) { - state.pendingcb -= bufferedLength - 1; - - const callback = state.allNoop ? nop : (err: Error) => { - for (let n = i; n < buffered.length; ++n) { - buffered[n].callback(err); - } - }; - const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i); - - doWrite(stream, state, true, state.length, chunks, "", callback); - - resetBuffer(state); - } else { - do { - const { chunk, encoding, callback } = buffered[i]; - const len = objectMode ? 1 : chunk.length; - doWrite(stream, state, false, len, chunk, encoding, callback); - } while (i < buffered.length && !state.writing); - - if (i === buffered.length) { - resetBuffer(state); - } else if (i > 256) { - buffered.splice(0, i); - state.bufferedIndex = 0; - } else { - state.bufferedIndex = i; - } - } - state.bufferProcessing = false; -} - -function finish(stream: Writable, state: WritableState) { - state.pendingcb--; - if (state.errorEmitted || state.closeEmitted) { - return; - } - - state.finished = true; - - for (const callback of state[kOnFinished].splice(0)) { - callback(); - } - - stream.emit("finish"); - - if (state.autoDestroy) { - stream.destroy(); - } -} - -function finishMaybe(stream: Writable, state: WritableState, sync?: boolean) { - if (needFinish(state)) { - prefinish(stream, state); - if (state.pendingcb === 0 && needFinish(state)) { - state.pendingcb++; - if (sync) { - queueMicrotask(() => finish(stream, state)); - } else { - finish(stream, state); - } - } - } -} - -function prefinish(stream: Writable, state: WritableState) { - if (!state.prefinished && !state.finalCalled) { - if (typeof stream._final === "function" && !state.destroyed) { - state.finalCalled = true; - - state.sync = true; - state.pendingcb++; - stream._final((err) => { - state.pendingcb--; - if (err) { - for (const callback of state[kOnFinished].splice(0)) { - callback(err); - } - errorOrDestroy(stream, err, state.sync); - } else if (needFinish(state)) { - state.prefinished = true; - stream.emit("prefinish"); - state.pendingcb++; - queueMicrotask(() => finish(stream, state)); - } - }); - state.sync = false; - } else { - state.prefinished = true; - stream.emit("prefinish"); - } - } -} - -function needFinish(state: WritableState) { - return (state.ending && - state.constructed && - state.length === 0 && - !state.errored && - state.buffered.length === 0 && - !state.finished && - !state.writing); -} - -interface WritableOptions { +import type { AfterWriteTick, writeV } from "./writable_internal.ts"; +import { + clearBuffer, + destroy, + errorBuffer, + errorOrDestroy, + finishMaybe, + kOnFinished, + nop, + onwrite, + resetBuffer, + writeOrBuffer, +} from "./writable_internal.ts"; +import type { Encodings } from "../_utils.ts"; + +type WritableEncodings = Encodings | "buffer"; + +export interface WritableOptions { autoDestroy?: boolean; decodeStrings?: boolean; - //TODO(Soremwar) - //Bring encodings in - defaultEncoding?: string; + defaultEncoding?: WritableEncodings; destroy?( this: Writable, error: Error | null, @@ -511,17 +43,13 @@ interface WritableOptions { final?(this: Writable, callback: (error?: Error | null) => void): void; highWaterMark?: number; objectMode?: boolean; - //TODO(Soremwar) - //Bring encodings in write?( this: Writable, // deno-lint-ignore no-explicit-any chunk: any, - encoding: string, + encoding: WritableEncodings, callback: (error?: Error | null) => void, ): void; - //TODO(Soremwar) - //Bring encodings in writev?( this: Writable, // deno-lint-ignore no-explicit-any @@ -530,14 +58,12 @@ interface WritableOptions { ): void; } -class WritableState { +export class WritableState { [kOnFinished]: Array<(error?: Error) => void> = []; afterWriteTickInfo: null | AfterWriteTick = null; allBuffers = true; allNoop = true; autoDestroy: boolean; - //TODO(Soremwar) - //Bring in encodings buffered: Array<{ allBuffers?: boolean; // deno-lint-ignore no-explicit-any @@ -552,7 +78,7 @@ class WritableState { constructed: boolean; corked = 0; decodeStrings: boolean; - defaultEncoding: string; + defaultEncoding: WritableEncodings; destroyed = false; emitClose: boolean; ended = false; @@ -608,25 +134,17 @@ class WritableState { } } -function resetBuffer(state: WritableState) { - state.buffered = []; - state.bufferedIndex = 0; - state.allBuffers = true; - state.allNoop = true; -} - /** A bit simpler than readable streams. * Implement an async `._write(chunk, encoding, cb)`, and it'll handle all * the drain event emission and buffering. */ class Writable extends Stream { - _construct?: (cb: (error?: Error) => void) => void; _final?: ( this: Writable, callback: (error?: Error | null | undefined) => void, ) => void; _writableState: WritableState; - _writev?: write_v | null = null; + _writev?: writeV | null = null; constructor(options?: WritableOptions) { super(); @@ -649,16 +167,6 @@ class Writable extends Stream { this._final = options.final; } } - - construct(this, () => { - const state = this._writableState; - - if (!state.writing) { - clearBuffer(this, state); - } - - finishMaybe(this, state); - }); } [captureRejectionSymbol](err?: Error) { @@ -735,7 +243,7 @@ class Writable extends Stream { cb(err); } - destroy(err?: Error, cb?: () => void) { + destroy(err?: Error | null, cb?: () => void) { const state = this._writableState; if (!state.destroyed) { queueMicrotask(() => errorBuffer(state)); @@ -747,25 +255,19 @@ class Writable extends Stream { end(cb?: () => void): void; // deno-lint-ignore no-explicit-any end(chunk: any, cb?: () => void): void; - //TODO(Soremwar) - //Bring in encodings // deno-lint-ignore no-explicit-any - end(chunk: any, encoding: string, cb?: () => void): void; + end(chunk: any, encoding: WritableEncodings, cb?: () => void): void; end( // deno-lint-ignore no-explicit-any x?: any | (() => void), - //TODO(Soremwar) - //Bring in encodings - y?: string | (() => void), + y?: WritableEncodings | (() => void), z?: () => void, ) { const state = this._writableState; // deno-lint-ignore no-explicit-any let chunk: any | null; - //TODO(Soremwar) - //Bring in encodings - let encoding: string | null; + let encoding: WritableEncodings | null; let cb: undefined | ((error?: Error) => void); if (typeof x === "function") { @@ -778,7 +280,7 @@ class Writable extends Stream { cb = y; } else { chunk = x; - encoding = y as string; + encoding = y as WritableEncodings; cb = z; } @@ -815,8 +317,6 @@ class Writable extends Stream { return this; } - //TODO(Soremwar) - //Bring in encodings _write( // deno-lint-ignore no-explicit-any chunk: any, @@ -838,27 +338,21 @@ class Writable extends Stream { // deno-lint-ignore no-explicit-any write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean; - //TODO(Soremwar) - //Bring in encodings write( // deno-lint-ignore no-explicit-any chunk: any, - encoding: string | null, + encoding: WritableEncodings | null, cb?: (error: Error | null | undefined) => void, ): boolean; - //TODO(Soremwar) - //Bring in encodings write( // deno-lint-ignore no-explicit-any chunk: any, - x?: string | null | ((error: Error | null | undefined) => void), + x?: WritableEncodings | null | ((error: Error | null | undefined) => void), y?: ((error: Error | null | undefined) => void), ) { const state = this._writableState; - //TODO(Soremwar) - //Bring in encodings - let encoding: string; + let encoding: WritableEncodings; let cb: (error?: Error | null) => void; if (typeof x === "function") { @@ -933,8 +427,6 @@ class Writable extends Stream { } } - //TODO(Soremwar) - //Bring allowed encodings setDefaultEncoding(encoding: string) { // node::ParseEncoding() requires lower case. if (typeof encoding === "string") { @@ -943,10 +435,9 @@ class Writable extends Stream { if (!Buffer.isEncoding(encoding)) { throw new ERR_UNKNOWN_ENCODING(encoding); } - this._writableState.defaultEncoding = encoding; + this._writableState.defaultEncoding = encoding as WritableEncodings; return this; } } export default Writable; -export { WritableState }; |