diff options
Diffstat (limited to 'std/node/_stream/writable.ts')
-rw-r--r-- | std/node/_stream/writable.ts | 952 |
1 files changed, 952 insertions, 0 deletions
diff --git a/std/node/_stream/writable.ts b/std/node/_stream/writable.ts new file mode 100644 index 000000000..158af8325 --- /dev/null +++ b/std/node/_stream/writable.ts @@ -0,0 +1,952 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { Buffer } from "../buffer.ts"; +import Stream from "./stream.ts"; +import { captureRejectionSymbol } from "../events.ts"; +import { kConstruct, kDestroy } from "./symbols.ts"; +import { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_OPT_VALUE, + ERR_METHOD_NOT_IMPLEMENTED, + ERR_MULTIPLE_CALLBACK, + ERR_STREAM_ALREADY_FINISHED, + ERR_STREAM_CANNOT_PIPE, + ERR_STREAM_DESTROYED, + ERR_STREAM_NULL_VALUES, + ERR_STREAM_WRITE_AFTER_END, + ERR_UNKNOWN_ENCODING, +} from "../_errors.ts"; + +function nop() {} + +//TODO(Soremwar) +//Bring in encodings +type write_v = ( + // deno-lint-ignore no-explicit-any + chunks: Array<{ chunk: any; encoding: string }>, + callback: (error?: Error | null) => void, +) => void; + +type AfterWriteTick = { + cb: (error?: Error) => void; + count: number; + state: WritableState; + stream: Writable; +}; + +const kOnFinished = Symbol("kOnFinished"); + +function destroy(this: Writable, err?: Error, cb?: () => void) { + const w = this._writableState; + + if (w.destroyed) { + if (typeof cb === "function") { + cb(); + } + + return this; + } + + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + } + + w.destroyed = true; + + if (!w.constructed) { + this.once(kDestroy, (er) => { + _destroy(this, err || er, cb); + }); + } else { + _destroy(this, err, cb); + } + + return this; +} + +function _destroy( + self: Writable, + err?: Error, + cb?: (error?: Error | null) => void, +) { + self._destroy(err || null, (err) => { + const w = self._writableState; + + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + } + + w.closed = true; + + if (typeof cb === "function") { + cb(err); + } + + if (err) { + queueMicrotask(() => { + if (!w.errorEmitted) { + w.errorEmitted = true; + self.emit("error", err); + } + w.closeEmitted = true; + if (w.emitClose) { + self.emit("close"); + } + }); + } else { + queueMicrotask(() => { + w.closeEmitted = true; + if (w.emitClose) { + self.emit("close"); + } + }); + } + }); +} + +function errorOrDestroy(stream: Writable, err: Error, sync = false) { + const w = stream._writableState; + + if (w.destroyed) { + return stream; + } + + if (w.autoDestroy) { + stream.destroy(err); + } else if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + if (sync) { + queueMicrotask(() => { + if (w.errorEmitted) { + return; + } + w.errorEmitted = true; + stream.emit("error", err); + }); + } else { + if (w.errorEmitted) { + return; + } + w.errorEmitted = true; + stream.emit("error", err); + } + } +} + +function construct(stream: Writable, cb: (error: Error) => void) { + if (!stream._construct) { + return; + } + + stream.once(kConstruct, cb); + const w = stream._writableState; + + w.constructed = false; + + queueMicrotask(() => { + let called = false; + stream._construct?.((err) => { + w.constructed = true; + + if (called) { + err = new ERR_MULTIPLE_CALLBACK(); + } else { + called = true; + } + + if (w.destroyed) { + stream.emit(kDestroy, err); + } else if (err) { + errorOrDestroy(stream, err, true); + } else { + queueMicrotask(() => { + stream.emit(kConstruct); + }); + } + }); + }); +} + +//TODO(Soremwar) +//Bring encodings in +function writeOrBuffer( + stream: Writable, + state: WritableState, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + callback: (error: Error) => void, +) { + const len = state.objectMode ? 1 : chunk.length; + + state.length += len; + + if (state.writing || state.corked || state.errored || !state.constructed) { + state.buffered.push({ chunk, encoding, callback }); + if (state.allBuffers && encoding !== "buffer") { + state.allBuffers = false; + } + if (state.allNoop && callback !== nop) { + state.allNoop = false; + } + } else { + state.writelen = len; + state.writecb = callback; + state.writing = true; + state.sync = true; + stream._write(chunk, encoding, state.onwrite); + state.sync = false; + } + + const ret = state.length < state.highWaterMark; + + if (!ret) { + state.needDrain = true; + } + + return ret && !state.errored && !state.destroyed; +} + +//TODO(Soremwar) +//Bring encodings in +function doWrite( + stream: Writable, + state: WritableState, + writev: boolean, + len: number, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + cb: (error: Error) => void, +) { + state.writelen = len; + state.writecb = cb; + state.writing = true; + state.sync = true; + if (state.destroyed) { + state.onwrite(new ERR_STREAM_DESTROYED("write")); + } else if (writev) { + (stream._writev as unknown as write_v)(chunk, state.onwrite); + } else { + stream._write(chunk, encoding, state.onwrite); + } + state.sync = false; +} + +function onwriteError( + stream: Writable, + state: WritableState, + er: Error, + cb: (error: Error) => void, +) { + --state.pendingcb; + + cb(er); + errorBuffer(state); + errorOrDestroy(stream, er); +} + +function onwrite(stream: Writable, er?: Error | null) { + const state = stream._writableState; + const sync = state.sync; + const cb = state.writecb; + + if (typeof cb !== "function") { + errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); + return; + } + + state.writing = false; + state.writecb = null; + state.length -= state.writelen; + state.writelen = 0; + + if (er) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + er.stack; + + if (!state.errored) { + state.errored = er; + } + + if (sync) { + queueMicrotask(() => onwriteError(stream, state, er, cb)); + } else { + onwriteError(stream, state, er, cb); + } + } else { + if (state.buffered.length > state.bufferedIndex) { + clearBuffer(stream, state); + } + + if (sync) { + if ( + state.afterWriteTickInfo !== null && + state.afterWriteTickInfo.cb === cb + ) { + state.afterWriteTickInfo.count++; + } else { + state.afterWriteTickInfo = { + count: 1, + cb: (cb as (error?: Error) => void), + stream, + state, + }; + queueMicrotask(() => + afterWriteTick(state.afterWriteTickInfo as AfterWriteTick) + ); + } + } else { + afterWrite(stream, state, 1, cb as (error?: Error) => void); + } + } +} + +function afterWriteTick({ + cb, + count, + state, + stream, +}: AfterWriteTick) { + state.afterWriteTickInfo = null; + return afterWrite(stream, state, count, cb); +} + +function afterWrite( + stream: Writable, + state: WritableState, + count: number, + cb: (error?: Error) => void, +) { + const needDrain = !state.ending && !stream.destroyed && state.length === 0 && + state.needDrain; + if (needDrain) { + state.needDrain = false; + stream.emit("drain"); + } + + while (count-- > 0) { + state.pendingcb--; + cb(); + } + + if (state.destroyed) { + errorBuffer(state); + } + + finishMaybe(stream, state); +} + +/** If there's something in the buffer waiting, then invoke callbacks.*/ +function errorBuffer(state: WritableState) { + if (state.writing) { + return; + } + + for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { + const { chunk, callback } = state.buffered[n]; + const len = state.objectMode ? 1 : chunk.length; + state.length -= len; + callback(new ERR_STREAM_DESTROYED("write")); + } + + for (const callback of state[kOnFinished].splice(0)) { + callback(new ERR_STREAM_DESTROYED("end")); + } + + resetBuffer(state); +} + +/** If there's something in the buffer waiting, then process it.*/ +function clearBuffer(stream: Writable, state: WritableState) { + if ( + state.corked || + state.bufferProcessing || + state.destroyed || + !state.constructed + ) { + return; + } + + const { buffered, bufferedIndex, objectMode } = state; + const bufferedLength = buffered.length - bufferedIndex; + + if (!bufferedLength) { + return; + } + + const i = bufferedIndex; + + state.bufferProcessing = true; + if (bufferedLength > 1 && stream._writev) { + state.pendingcb -= bufferedLength - 1; + + const callback = state.allNoop ? nop : (err: Error) => { + for (let n = i; n < buffered.length; ++n) { + buffered[n].callback(err); + } + }; + const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i); + + doWrite(stream, state, true, state.length, chunks, "", callback); + + resetBuffer(state); + } else { + do { + const { chunk, encoding, callback } = buffered[i]; + const len = objectMode ? 1 : chunk.length; + doWrite(stream, state, false, len, chunk, encoding, callback); + } while (i < buffered.length && !state.writing); + + if (i === buffered.length) { + resetBuffer(state); + } else if (i > 256) { + buffered.splice(0, i); + state.bufferedIndex = 0; + } else { + state.bufferedIndex = i; + } + } + state.bufferProcessing = false; +} + +function finish(stream: Writable, state: WritableState) { + state.pendingcb--; + if (state.errorEmitted || state.closeEmitted) { + return; + } + + state.finished = true; + + for (const callback of state[kOnFinished].splice(0)) { + callback(); + } + + stream.emit("finish"); + + if (state.autoDestroy) { + stream.destroy(); + } +} + +function finishMaybe(stream: Writable, state: WritableState, sync?: boolean) { + if (needFinish(state)) { + prefinish(stream, state); + if (state.pendingcb === 0 && needFinish(state)) { + state.pendingcb++; + if (sync) { + queueMicrotask(() => finish(stream, state)); + } else { + finish(stream, state); + } + } + } +} + +function prefinish(stream: Writable, state: WritableState) { + if (!state.prefinished && !state.finalCalled) { + if (typeof stream._final === "function" && !state.destroyed) { + state.finalCalled = true; + + state.sync = true; + state.pendingcb++; + stream._final((err) => { + state.pendingcb--; + if (err) { + for (const callback of state[kOnFinished].splice(0)) { + callback(err); + } + errorOrDestroy(stream, err, state.sync); + } else if (needFinish(state)) { + state.prefinished = true; + stream.emit("prefinish"); + state.pendingcb++; + queueMicrotask(() => finish(stream, state)); + } + }); + state.sync = false; + } else { + state.prefinished = true; + stream.emit("prefinish"); + } + } +} + +function needFinish(state: WritableState) { + return (state.ending && + state.constructed && + state.length === 0 && + !state.errored && + state.buffered.length === 0 && + !state.finished && + !state.writing); +} + +interface WritableOptions { + autoDestroy?: boolean; + decodeStrings?: boolean; + //TODO(Soremwar) + //Bring encodings in + defaultEncoding?: string; + destroy?( + this: Writable, + error: Error | null, + callback: (error: Error | null) => void, + ): void; + emitClose?: boolean; + final?(this: Writable, callback: (error?: Error | null) => void): void; + highWaterMark?: number; + objectMode?: boolean; + //TODO(Soremwar) + //Bring encodings in + write?( + this: Writable, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + callback: (error?: Error | null) => void, + ): void; + //TODO(Soremwar) + //Bring encodings in + writev?( + this: Writable, + // deno-lint-ignore no-explicit-any + chunks: Array<{ chunk: any; encoding: string }>, + callback: (error?: Error | null) => void, + ): void; +} + +class WritableState { + [kOnFinished]: Array<(error?: Error) => void> = []; + afterWriteTickInfo: null | AfterWriteTick = null; + allBuffers = true; + allNoop = true; + autoDestroy: boolean; + //TODO(Soremwar) + //Bring in encodings + buffered: Array<{ + allBuffers?: boolean; + // deno-lint-ignore no-explicit-any + chunk: any; + encoding: string; + callback: (error: Error) => void; + }> = []; + bufferedIndex = 0; + bufferProcessing = false; + closed = false; + closeEmitted = false; + constructed: boolean; + corked = 0; + decodeStrings: boolean; + defaultEncoding: string; + destroyed = false; + emitClose: boolean; + ended = false; + ending = false; + errored: Error | null = null; + errorEmitted = false; + finalCalled = false; + finished = false; + highWaterMark: number; + length = 0; + needDrain = false; + objectMode: boolean; + onwrite: (error?: Error | null) => void; + pendingcb = 0; + prefinished = false; + sync = true; + writecb: null | ((error: Error) => void) = null; + writable = true; + writelen = 0; + writing = false; + + constructor(options: WritableOptions | undefined, stream: Writable) { + this.objectMode = !!options?.objectMode; + + this.highWaterMark = options?.highWaterMark ?? + (this.objectMode ? 16 : 16 * 1024); + + if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) { + this.highWaterMark = Math.floor(this.highWaterMark); + } else { + throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark); + } + + this.decodeStrings = !options?.decodeStrings === false; + + this.defaultEncoding = options?.defaultEncoding || "utf8"; + + this.onwrite = onwrite.bind(undefined, stream); + + resetBuffer(this); + + this.emitClose = options?.emitClose ?? true; + this.autoDestroy = options?.autoDestroy ?? true; + this.constructed = true; + } + + getBuffer() { + return this.buffered.slice(this.bufferedIndex); + } + + get bufferedRequestCount() { + return this.buffered.length - this.bufferedIndex; + } +} + +function resetBuffer(state: WritableState) { + state.buffered = []; + state.bufferedIndex = 0; + state.allBuffers = true; + state.allNoop = true; +} + +/** A bit simpler than readable streams. +* Implement an async `._write(chunk, encoding, cb)`, and it'll handle all +* the drain event emission and buffering. +*/ +class Writable extends Stream { + _construct?: (cb: (error?: Error) => void) => void; + _final?: ( + this: Writable, + callback: (error?: Error | null | undefined) => void, + ) => void; + _writableState: WritableState; + _writev?: write_v | null = null; + + constructor(options?: WritableOptions) { + super(); + this._writableState = new WritableState(options, this); + + if (options) { + if (typeof options.write === "function") { + this._write = options.write; + } + + if (typeof options.writev === "function") { + this._writev = options.writev; + } + + if (typeof options.destroy === "function") { + this._destroy = options.destroy; + } + + if (typeof options.final === "function") { + this._final = options.final; + } + } + + construct(this, () => { + const state = this._writableState; + + if (!state.writing) { + clearBuffer(this, state); + } + + finishMaybe(this, state); + }); + } + + [captureRejectionSymbol](err?: Error) { + this.destroy(err); + } + + static WritableState = WritableState; + + get destroyed() { + return this._writableState ? this._writableState.destroyed : false; + } + + set destroyed(value) { + if (this._writableState) { + this._writableState.destroyed = value; + } + } + + get writable() { + const w = this._writableState; + return !w.destroyed && !w.errored && !w.ending && !w.ended; + } + + set writable(val) { + if (this._writableState) { + this._writableState.writable = !!val; + } + } + + get writableFinished() { + return this._writableState ? this._writableState.finished : false; + } + + get writableObjectMode() { + return this._writableState ? this._writableState.objectMode : false; + } + + get writableBuffer() { + return this._writableState && this._writableState.getBuffer(); + } + + get writableEnded() { + return this._writableState ? this._writableState.ending : false; + } + + get writableHighWaterMark() { + return this._writableState && this._writableState.highWaterMark; + } + + get writableCorked() { + return this._writableState ? this._writableState.corked : 0; + } + + get writableLength() { + return this._writableState && this._writableState.length; + } + + _undestroy() { + const w = this._writableState; + w.constructed = true; + w.destroyed = false; + w.closed = false; + w.closeEmitted = false; + w.errored = null; + w.errorEmitted = false; + w.ended = false; + w.ending = false; + w.finalCalled = false; + w.prefinished = false; + w.finished = false; + } + + _destroy(err: Error | null, cb: (error?: Error | null) => void) { + cb(err); + } + + destroy(err?: Error, cb?: () => void) { + const state = this._writableState; + if (!state.destroyed) { + queueMicrotask(() => errorBuffer(state)); + } + destroy.call(this, err, cb); + return this; + } + + end(cb?: () => void): void; + // deno-lint-ignore no-explicit-any + end(chunk: any, cb?: () => void): void; + //TODO(Soremwar) + //Bring in encodings + // deno-lint-ignore no-explicit-any + end(chunk: any, encoding: string, cb?: () => void): void; + + end( + // deno-lint-ignore no-explicit-any + x?: any | (() => void), + //TODO(Soremwar) + //Bring in encodings + y?: string | (() => void), + z?: () => void, + ) { + const state = this._writableState; + // deno-lint-ignore no-explicit-any + let chunk: any | null; + //TODO(Soremwar) + //Bring in encodings + let encoding: string | null; + let cb: undefined | ((error?: Error) => void); + + if (typeof x === "function") { + chunk = null; + encoding = null; + cb = x; + } else if (typeof y === "function") { + chunk = x; + encoding = null; + cb = y; + } else { + chunk = x; + encoding = y as string; + cb = z; + } + + if (chunk !== null && chunk !== undefined) { + this.write(chunk, encoding); + } + + if (state.corked) { + state.corked = 1; + this.uncork(); + } + + let err: Error | undefined; + if (!state.errored && !state.ending) { + state.ending = true; + finishMaybe(this, state, true); + state.ended = true; + } else if (state.finished) { + err = new ERR_STREAM_ALREADY_FINISHED("end"); + } else if (state.destroyed) { + err = new ERR_STREAM_DESTROYED("end"); + } + + if (typeof cb === "function") { + if (err || state.finished) { + queueMicrotask(() => { + (cb as (error?: Error | undefined) => void)(err); + }); + } else { + state[kOnFinished].push(cb); + } + } + + return this; + } + + //TODO(Soremwar) + //Bring in encodings + _write( + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + cb: (error?: Error | null) => void, + ): void { + if (this._writev) { + this._writev([{ chunk, encoding }], cb); + } else { + throw new ERR_METHOD_NOT_IMPLEMENTED("_write()"); + } + } + + //This signature was changed to keep inheritance coherent + pipe(dest: Writable): Writable { + errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); + return dest; + } + + // deno-lint-ignore no-explicit-any + write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean; + //TODO(Soremwar) + //Bring in encodings + write( + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string | null, + cb?: (error: Error | null | undefined) => void, + ): boolean; + + //TODO(Soremwar) + //Bring in encodings + write( + // deno-lint-ignore no-explicit-any + chunk: any, + x?: string | null | ((error: Error | null | undefined) => void), + y?: ((error: Error | null | undefined) => void), + ) { + const state = this._writableState; + //TODO(Soremwar) + //Bring in encodings + let encoding: string; + let cb: (error?: Error | null) => void; + + if (typeof x === "function") { + cb = x; + encoding = state.defaultEncoding; + } else { + if (!x) { + encoding = state.defaultEncoding; + } else if (x !== "buffer" && !Buffer.isEncoding(x)) { + throw new ERR_UNKNOWN_ENCODING(x); + } else { + encoding = x; + } + if (typeof y !== "function") { + cb = nop; + } else { + cb = y; + } + } + + if (chunk === null) { + throw new ERR_STREAM_NULL_VALUES(); + } else if (!state.objectMode) { + if (typeof chunk === "string") { + if (state.decodeStrings !== false) { + chunk = Buffer.from(chunk, encoding); + encoding = "buffer"; + } + } else if (chunk instanceof Buffer) { + encoding = "buffer"; + } else if (Stream._isUint8Array(chunk)) { + chunk = Stream._uint8ArrayToBuffer(chunk); + encoding = "buffer"; + } else { + throw new ERR_INVALID_ARG_TYPE( + "chunk", + ["string", "Buffer", "Uint8Array"], + chunk, + ); + } + } + + let err: Error | undefined; + if (state.ending) { + err = new ERR_STREAM_WRITE_AFTER_END(); + } else if (state.destroyed) { + err = new ERR_STREAM_DESTROYED("write"); + } + + if (err) { + queueMicrotask(() => cb(err)); + errorOrDestroy(this, err, true); + return false; + } + state.pendingcb++; + return writeOrBuffer(this, state, chunk, encoding, cb); + } + + cork() { + this._writableState.corked++; + } + + uncork() { + const state = this._writableState; + + if (state.corked) { + state.corked--; + + if (!state.writing) { + clearBuffer(this, state); + } + } + } + + //TODO(Soremwar) + //Bring allowed encodings + setDefaultEncoding(encoding: string) { + // node::ParseEncoding() requires lower case. + if (typeof encoding === "string") { + encoding = encoding.toLowerCase(); + } + if (!Buffer.isEncoding(encoding)) { + throw new ERR_UNKNOWN_ENCODING(encoding); + } + this._writableState.defaultEncoding = encoding; + return this; + } +} + +export default Writable; +export { WritableState }; |