summaryrefslogtreecommitdiff
path: root/std/node/_stream/writable.ts
diff options
context:
space:
mode:
Diffstat (limited to 'std/node/_stream/writable.ts')
-rw-r--r--std/node/_stream/writable.ts443
1 files changed, 0 insertions, 443 deletions
diff --git a/std/node/_stream/writable.ts b/std/node/_stream/writable.ts
deleted file mode 100644
index 534fc22fb..000000000
--- a/std/node/_stream/writable.ts
+++ /dev/null
@@ -1,443 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import Stream from "./stream.ts";
-import { captureRejectionSymbol } from "../events.ts";
-import {
- ERR_INVALID_ARG_TYPE,
- ERR_INVALID_OPT_VALUE,
- ERR_METHOD_NOT_IMPLEMENTED,
- ERR_STREAM_ALREADY_FINISHED,
- ERR_STREAM_CANNOT_PIPE,
- ERR_STREAM_DESTROYED,
- ERR_STREAM_NULL_VALUES,
- ERR_STREAM_WRITE_AFTER_END,
- ERR_UNKNOWN_ENCODING,
-} from "../_errors.ts";
-import type { AfterWriteTick, writeV } from "./writable_internal.ts";
-import {
- clearBuffer,
- destroy,
- errorBuffer,
- errorOrDestroy,
- finishMaybe,
- kOnFinished,
- nop,
- onwrite,
- resetBuffer,
- writeOrBuffer,
-} from "./writable_internal.ts";
-import type { Encodings } from "../_utils.ts";
-
-type WritableEncodings = Encodings | "buffer";
-
-export interface WritableOptions {
- autoDestroy?: boolean;
- decodeStrings?: boolean;
- defaultEncoding?: WritableEncodings;
- destroy?(
- this: Writable,
- error: Error | null,
- callback: (error: Error | null) => void,
- ): void;
- emitClose?: boolean;
- final?(this: Writable, callback: (error?: Error | null) => void): void;
- highWaterMark?: number;
- objectMode?: boolean;
- write?(
- this: Writable,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: WritableEncodings,
- callback: (error?: Error | null) => void,
- ): void;
- writev?(
- this: Writable,
- // deno-lint-ignore no-explicit-any
- chunks: Array<{ chunk: any; encoding: string }>,
- callback: (error?: Error | null) => void,
- ): void;
-}
-
-export class WritableState {
- [kOnFinished]: Array<(error?: Error) => void> = [];
- afterWriteTickInfo: null | AfterWriteTick = null;
- allBuffers = true;
- allNoop = true;
- autoDestroy: boolean;
- buffered: Array<{
- allBuffers?: boolean;
- // deno-lint-ignore no-explicit-any
- chunk: any;
- encoding: string;
- callback: (error: Error) => void;
- }> = [];
- bufferedIndex = 0;
- bufferProcessing = false;
- closed = false;
- closeEmitted = false;
- constructed: boolean;
- corked = 0;
- decodeStrings: boolean;
- defaultEncoding: WritableEncodings;
- destroyed = false;
- emitClose: boolean;
- ended = false;
- ending = false;
- errored: Error | null = null;
- errorEmitted = false;
- finalCalled = false;
- finished = false;
- highWaterMark: number;
- length = 0;
- needDrain = false;
- objectMode: boolean;
- onwrite: (error?: Error | null) => void;
- pendingcb = 0;
- prefinished = false;
- sync = true;
- writecb: null | ((error: Error) => void) = null;
- writable = true;
- writelen = 0;
- writing = false;
-
- constructor(options: WritableOptions | undefined, stream: Writable) {
- this.objectMode = !!options?.objectMode;
-
- this.highWaterMark = options?.highWaterMark ??
- (this.objectMode ? 16 : 16 * 1024);
-
- if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) {
- this.highWaterMark = Math.floor(this.highWaterMark);
- } else {
- throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark);
- }
-
- this.decodeStrings = !options?.decodeStrings === false;
-
- this.defaultEncoding = options?.defaultEncoding || "utf8";
-
- this.onwrite = onwrite.bind(undefined, stream);
-
- resetBuffer(this);
-
- this.emitClose = options?.emitClose ?? true;
- this.autoDestroy = options?.autoDestroy ?? true;
- this.constructed = true;
- }
-
- getBuffer() {
- return this.buffered.slice(this.bufferedIndex);
- }
-
- get bufferedRequestCount() {
- return this.buffered.length - this.bufferedIndex;
- }
-}
-
-/** A bit simpler than readable streams.
-* Implement an async `._write(chunk, encoding, cb)`, and it'll handle all
-* the drain event emission and buffering.
-*/
-class Writable extends Stream {
- _final?: (
- this: Writable,
- callback: (error?: Error | null | undefined) => void,
- ) => void;
- _writableState: WritableState;
- _writev?: writeV | null = null;
-
- constructor(options?: WritableOptions) {
- super();
- this._writableState = new WritableState(options, this);
-
- if (options) {
- if (typeof options.write === "function") {
- this._write = options.write;
- }
-
- if (typeof options.writev === "function") {
- this._writev = options.writev;
- }
-
- if (typeof options.destroy === "function") {
- this._destroy = options.destroy;
- }
-
- if (typeof options.final === "function") {
- this._final = options.final;
- }
- }
- }
-
- [captureRejectionSymbol](err?: Error) {
- this.destroy(err);
- }
-
- static WritableState = WritableState;
-
- get destroyed() {
- return this._writableState ? this._writableState.destroyed : false;
- }
-
- set destroyed(value) {
- if (this._writableState) {
- this._writableState.destroyed = value;
- }
- }
-
- get writable() {
- const w = this._writableState;
- return !w.destroyed && !w.errored && !w.ending && !w.ended;
- }
-
- set writable(val) {
- if (this._writableState) {
- this._writableState.writable = !!val;
- }
- }
-
- get writableFinished() {
- return this._writableState ? this._writableState.finished : false;
- }
-
- get writableObjectMode() {
- return this._writableState ? this._writableState.objectMode : false;
- }
-
- get writableBuffer() {
- return this._writableState && this._writableState.getBuffer();
- }
-
- get writableEnded() {
- return this._writableState ? this._writableState.ending : false;
- }
-
- get writableHighWaterMark() {
- return this._writableState && this._writableState.highWaterMark;
- }
-
- get writableCorked() {
- return this._writableState ? this._writableState.corked : 0;
- }
-
- get writableLength() {
- return this._writableState && this._writableState.length;
- }
-
- _undestroy() {
- const w = this._writableState;
- w.constructed = true;
- w.destroyed = false;
- w.closed = false;
- w.closeEmitted = false;
- w.errored = null;
- w.errorEmitted = false;
- w.ended = false;
- w.ending = false;
- w.finalCalled = false;
- w.prefinished = false;
- w.finished = false;
- }
-
- _destroy(err: Error | null, cb: (error?: Error | null) => void) {
- cb(err);
- }
-
- destroy(err?: Error | null, cb?: () => void) {
- const state = this._writableState;
- if (!state.destroyed) {
- queueMicrotask(() => errorBuffer(state));
- }
- destroy.call(this, err, cb);
- return this;
- }
-
- end(cb?: () => void): void;
- // deno-lint-ignore no-explicit-any
- end(chunk: any, cb?: () => void): void;
- // deno-lint-ignore no-explicit-any
- end(chunk: any, encoding: WritableEncodings, cb?: () => void): void;
-
- end(
- // deno-lint-ignore no-explicit-any
- x?: any | (() => void),
- y?: WritableEncodings | (() => void),
- z?: () => void,
- ) {
- const state = this._writableState;
- // deno-lint-ignore no-explicit-any
- let chunk: any | null;
- let encoding: WritableEncodings | null;
- let cb: undefined | ((error?: Error) => void);
-
- if (typeof x === "function") {
- chunk = null;
- encoding = null;
- cb = x;
- } else if (typeof y === "function") {
- chunk = x;
- encoding = null;
- cb = y;
- } else {
- chunk = x;
- encoding = y as WritableEncodings;
- cb = z;
- }
-
- if (chunk !== null && chunk !== undefined) {
- this.write(chunk, encoding);
- }
-
- if (state.corked) {
- state.corked = 1;
- this.uncork();
- }
-
- let err: Error | undefined;
- if (!state.errored && !state.ending) {
- state.ending = true;
- finishMaybe(this, state, true);
- state.ended = true;
- } else if (state.finished) {
- err = new ERR_STREAM_ALREADY_FINISHED("end");
- } else if (state.destroyed) {
- err = new ERR_STREAM_DESTROYED("end");
- }
-
- if (typeof cb === "function") {
- if (err || state.finished) {
- queueMicrotask(() => {
- (cb as (error?: Error | undefined) => void)(err);
- });
- } else {
- state[kOnFinished].push(cb);
- }
- }
-
- return this;
- }
-
- _write(
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- cb: (error?: Error | null) => void,
- ): void {
- if (this._writev) {
- this._writev([{ chunk, encoding }], cb);
- } else {
- throw new ERR_METHOD_NOT_IMPLEMENTED("_write()");
- }
- }
-
- //This signature was changed to keep inheritance coherent
- pipe(dest: Writable): Writable {
- errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
- return dest;
- }
-
- // deno-lint-ignore no-explicit-any
- write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean;
- write(
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: WritableEncodings | null,
- cb?: (error: Error | null | undefined) => void,
- ): boolean;
-
- write(
- // deno-lint-ignore no-explicit-any
- chunk: any,
- x?: WritableEncodings | null | ((error: Error | null | undefined) => void),
- y?: ((error: Error | null | undefined) => void),
- ) {
- const state = this._writableState;
- let encoding: WritableEncodings;
- let cb: (error?: Error | null) => void;
-
- if (typeof x === "function") {
- cb = x;
- encoding = state.defaultEncoding;
- } else {
- if (!x) {
- encoding = state.defaultEncoding;
- } else if (x !== "buffer" && !Buffer.isEncoding(x)) {
- throw new ERR_UNKNOWN_ENCODING(x);
- } else {
- encoding = x;
- }
- if (typeof y !== "function") {
- cb = nop;
- } else {
- cb = y;
- }
- }
-
- if (chunk === null) {
- throw new ERR_STREAM_NULL_VALUES();
- } else if (!state.objectMode) {
- if (typeof chunk === "string") {
- if (state.decodeStrings !== false) {
- chunk = Buffer.from(chunk, encoding);
- encoding = "buffer";
- }
- } else if (chunk instanceof Buffer) {
- encoding = "buffer";
- } else if (Stream._isUint8Array(chunk)) {
- chunk = Stream._uint8ArrayToBuffer(chunk);
- encoding = "buffer";
- } else {
- throw new ERR_INVALID_ARG_TYPE(
- "chunk",
- ["string", "Buffer", "Uint8Array"],
- chunk,
- );
- }
- }
-
- let err: Error | undefined;
- if (state.ending) {
- err = new ERR_STREAM_WRITE_AFTER_END();
- } else if (state.destroyed) {
- err = new ERR_STREAM_DESTROYED("write");
- }
-
- if (err) {
- queueMicrotask(() => cb(err));
- errorOrDestroy(this, err, true);
- return false;
- }
- state.pendingcb++;
- return writeOrBuffer(this, state, chunk, encoding, cb);
- }
-
- cork() {
- this._writableState.corked++;
- }
-
- uncork() {
- const state = this._writableState;
-
- if (state.corked) {
- state.corked--;
-
- if (!state.writing) {
- clearBuffer(this, state);
- }
- }
- }
-
- setDefaultEncoding(encoding: string) {
- // node::ParseEncoding() requires lower case.
- if (typeof encoding === "string") {
- encoding = encoding.toLowerCase();
- }
- if (!Buffer.isEncoding(encoding)) {
- throw new ERR_UNKNOWN_ENCODING(encoding);
- }
- this._writableState.defaultEncoding = encoding as WritableEncodings;
- return this;
- }
-}
-
-export default Writable;