diff options
Diffstat (limited to 'std/node/_stream/writable.ts')
-rw-r--r-- | std/node/_stream/writable.ts | 443 |
1 files changed, 0 insertions, 443 deletions
diff --git a/std/node/_stream/writable.ts b/std/node/_stream/writable.ts deleted file mode 100644 index 534fc22fb..000000000 --- a/std/node/_stream/writable.ts +++ /dev/null @@ -1,443 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { Buffer } from "../buffer.ts"; -import Stream from "./stream.ts"; -import { captureRejectionSymbol } from "../events.ts"; -import { - ERR_INVALID_ARG_TYPE, - ERR_INVALID_OPT_VALUE, - ERR_METHOD_NOT_IMPLEMENTED, - ERR_STREAM_ALREADY_FINISHED, - ERR_STREAM_CANNOT_PIPE, - ERR_STREAM_DESTROYED, - ERR_STREAM_NULL_VALUES, - ERR_STREAM_WRITE_AFTER_END, - ERR_UNKNOWN_ENCODING, -} from "../_errors.ts"; -import type { AfterWriteTick, writeV } from "./writable_internal.ts"; -import { - clearBuffer, - destroy, - errorBuffer, - errorOrDestroy, - finishMaybe, - kOnFinished, - nop, - onwrite, - resetBuffer, - writeOrBuffer, -} from "./writable_internal.ts"; -import type { Encodings } from "../_utils.ts"; - -type WritableEncodings = Encodings | "buffer"; - -export interface WritableOptions { - autoDestroy?: boolean; - decodeStrings?: boolean; - defaultEncoding?: WritableEncodings; - destroy?( - this: Writable, - error: Error | null, - callback: (error: Error | null) => void, - ): void; - emitClose?: boolean; - final?(this: Writable, callback: (error?: Error | null) => void): void; - highWaterMark?: number; - objectMode?: boolean; - write?( - this: Writable, - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: WritableEncodings, - callback: (error?: Error | null) => void, - ): void; - writev?( - this: Writable, - // deno-lint-ignore no-explicit-any - chunks: Array<{ chunk: any; encoding: string }>, - callback: (error?: Error | null) => void, - ): void; -} - -export class WritableState { - [kOnFinished]: Array<(error?: Error) => void> = []; - afterWriteTickInfo: null | AfterWriteTick = null; - allBuffers = true; - allNoop = true; - autoDestroy: boolean; - buffered: Array<{ - allBuffers?: boolean; - // deno-lint-ignore no-explicit-any - chunk: any; - encoding: string; - callback: (error: Error) => void; - }> = []; - bufferedIndex = 0; - bufferProcessing = false; - closed = false; - closeEmitted = false; - constructed: boolean; - corked = 0; - decodeStrings: boolean; - defaultEncoding: WritableEncodings; - destroyed = false; - emitClose: boolean; - ended = false; - ending = false; - errored: Error | null = null; - errorEmitted = false; - finalCalled = false; - finished = false; - highWaterMark: number; - length = 0; - needDrain = false; - objectMode: boolean; - onwrite: (error?: Error | null) => void; - pendingcb = 0; - prefinished = false; - sync = true; - writecb: null | ((error: Error) => void) = null; - writable = true; - writelen = 0; - writing = false; - - constructor(options: WritableOptions | undefined, stream: Writable) { - this.objectMode = !!options?.objectMode; - - this.highWaterMark = options?.highWaterMark ?? - (this.objectMode ? 16 : 16 * 1024); - - if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) { - this.highWaterMark = Math.floor(this.highWaterMark); - } else { - throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark); - } - - this.decodeStrings = !options?.decodeStrings === false; - - this.defaultEncoding = options?.defaultEncoding || "utf8"; - - this.onwrite = onwrite.bind(undefined, stream); - - resetBuffer(this); - - this.emitClose = options?.emitClose ?? true; - this.autoDestroy = options?.autoDestroy ?? true; - this.constructed = true; - } - - getBuffer() { - return this.buffered.slice(this.bufferedIndex); - } - - get bufferedRequestCount() { - return this.buffered.length - this.bufferedIndex; - } -} - -/** A bit simpler than readable streams. -* Implement an async `._write(chunk, encoding, cb)`, and it'll handle all -* the drain event emission and buffering. -*/ -class Writable extends Stream { - _final?: ( - this: Writable, - callback: (error?: Error | null | undefined) => void, - ) => void; - _writableState: WritableState; - _writev?: writeV | null = null; - - constructor(options?: WritableOptions) { - super(); - this._writableState = new WritableState(options, this); - - if (options) { - if (typeof options.write === "function") { - this._write = options.write; - } - - if (typeof options.writev === "function") { - this._writev = options.writev; - } - - if (typeof options.destroy === "function") { - this._destroy = options.destroy; - } - - if (typeof options.final === "function") { - this._final = options.final; - } - } - } - - [captureRejectionSymbol](err?: Error) { - this.destroy(err); - } - - static WritableState = WritableState; - - get destroyed() { - return this._writableState ? this._writableState.destroyed : false; - } - - set destroyed(value) { - if (this._writableState) { - this._writableState.destroyed = value; - } - } - - get writable() { - const w = this._writableState; - return !w.destroyed && !w.errored && !w.ending && !w.ended; - } - - set writable(val) { - if (this._writableState) { - this._writableState.writable = !!val; - } - } - - get writableFinished() { - return this._writableState ? this._writableState.finished : false; - } - - get writableObjectMode() { - return this._writableState ? this._writableState.objectMode : false; - } - - get writableBuffer() { - return this._writableState && this._writableState.getBuffer(); - } - - get writableEnded() { - return this._writableState ? this._writableState.ending : false; - } - - get writableHighWaterMark() { - return this._writableState && this._writableState.highWaterMark; - } - - get writableCorked() { - return this._writableState ? this._writableState.corked : 0; - } - - get writableLength() { - return this._writableState && this._writableState.length; - } - - _undestroy() { - const w = this._writableState; - w.constructed = true; - w.destroyed = false; - w.closed = false; - w.closeEmitted = false; - w.errored = null; - w.errorEmitted = false; - w.ended = false; - w.ending = false; - w.finalCalled = false; - w.prefinished = false; - w.finished = false; - } - - _destroy(err: Error | null, cb: (error?: Error | null) => void) { - cb(err); - } - - destroy(err?: Error | null, cb?: () => void) { - const state = this._writableState; - if (!state.destroyed) { - queueMicrotask(() => errorBuffer(state)); - } - destroy.call(this, err, cb); - return this; - } - - end(cb?: () => void): void; - // deno-lint-ignore no-explicit-any - end(chunk: any, cb?: () => void): void; - // deno-lint-ignore no-explicit-any - end(chunk: any, encoding: WritableEncodings, cb?: () => void): void; - - end( - // deno-lint-ignore no-explicit-any - x?: any | (() => void), - y?: WritableEncodings | (() => void), - z?: () => void, - ) { - const state = this._writableState; - // deno-lint-ignore no-explicit-any - let chunk: any | null; - let encoding: WritableEncodings | null; - let cb: undefined | ((error?: Error) => void); - - if (typeof x === "function") { - chunk = null; - encoding = null; - cb = x; - } else if (typeof y === "function") { - chunk = x; - encoding = null; - cb = y; - } else { - chunk = x; - encoding = y as WritableEncodings; - cb = z; - } - - if (chunk !== null && chunk !== undefined) { - this.write(chunk, encoding); - } - - if (state.corked) { - state.corked = 1; - this.uncork(); - } - - let err: Error | undefined; - if (!state.errored && !state.ending) { - state.ending = true; - finishMaybe(this, state, true); - state.ended = true; - } else if (state.finished) { - err = new ERR_STREAM_ALREADY_FINISHED("end"); - } else if (state.destroyed) { - err = new ERR_STREAM_DESTROYED("end"); - } - - if (typeof cb === "function") { - if (err || state.finished) { - queueMicrotask(() => { - (cb as (error?: Error | undefined) => void)(err); - }); - } else { - state[kOnFinished].push(cb); - } - } - - return this; - } - - _write( - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: string, - cb: (error?: Error | null) => void, - ): void { - if (this._writev) { - this._writev([{ chunk, encoding }], cb); - } else { - throw new ERR_METHOD_NOT_IMPLEMENTED("_write()"); - } - } - - //This signature was changed to keep inheritance coherent - pipe(dest: Writable): Writable { - errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); - return dest; - } - - // deno-lint-ignore no-explicit-any - write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean; - write( - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: WritableEncodings | null, - cb?: (error: Error | null | undefined) => void, - ): boolean; - - write( - // deno-lint-ignore no-explicit-any - chunk: any, - x?: WritableEncodings | null | ((error: Error | null | undefined) => void), - y?: ((error: Error | null | undefined) => void), - ) { - const state = this._writableState; - let encoding: WritableEncodings; - let cb: (error?: Error | null) => void; - - if (typeof x === "function") { - cb = x; - encoding = state.defaultEncoding; - } else { - if (!x) { - encoding = state.defaultEncoding; - } else if (x !== "buffer" && !Buffer.isEncoding(x)) { - throw new ERR_UNKNOWN_ENCODING(x); - } else { - encoding = x; - } - if (typeof y !== "function") { - cb = nop; - } else { - cb = y; - } - } - - if (chunk === null) { - throw new ERR_STREAM_NULL_VALUES(); - } else if (!state.objectMode) { - if (typeof chunk === "string") { - if (state.decodeStrings !== false) { - chunk = Buffer.from(chunk, encoding); - encoding = "buffer"; - } - } else if (chunk instanceof Buffer) { - encoding = "buffer"; - } else if (Stream._isUint8Array(chunk)) { - chunk = Stream._uint8ArrayToBuffer(chunk); - encoding = "buffer"; - } else { - throw new ERR_INVALID_ARG_TYPE( - "chunk", - ["string", "Buffer", "Uint8Array"], - chunk, - ); - } - } - - let err: Error | undefined; - if (state.ending) { - err = new ERR_STREAM_WRITE_AFTER_END(); - } else if (state.destroyed) { - err = new ERR_STREAM_DESTROYED("write"); - } - - if (err) { - queueMicrotask(() => cb(err)); - errorOrDestroy(this, err, true); - return false; - } - state.pendingcb++; - return writeOrBuffer(this, state, chunk, encoding, cb); - } - - cork() { - this._writableState.corked++; - } - - uncork() { - const state = this._writableState; - - if (state.corked) { - state.corked--; - - if (!state.writing) { - clearBuffer(this, state); - } - } - } - - setDefaultEncoding(encoding: string) { - // node::ParseEncoding() requires lower case. - if (typeof encoding === "string") { - encoding = encoding.toLowerCase(); - } - if (!Buffer.isEncoding(encoding)) { - throw new ERR_UNKNOWN_ENCODING(encoding); - } - this._writableState.defaultEncoding = encoding as WritableEncodings; - return this; - } -} - -export default Writable; |