summaryrefslogtreecommitdiff
path: root/std/node/_stream/writable.ts
diff options
context:
space:
mode:
Diffstat (limited to 'std/node/_stream/writable.ts')
-rw-r--r--std/node/_stream/writable.ts952
1 files changed, 952 insertions, 0 deletions
diff --git a/std/node/_stream/writable.ts b/std/node/_stream/writable.ts
new file mode 100644
index 000000000..158af8325
--- /dev/null
+++ b/std/node/_stream/writable.ts
@@ -0,0 +1,952 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import Stream from "./stream.ts";
+import { captureRejectionSymbol } from "../events.ts";
+import { kConstruct, kDestroy } from "./symbols.ts";
+import {
+ ERR_INVALID_ARG_TYPE,
+ ERR_INVALID_OPT_VALUE,
+ ERR_METHOD_NOT_IMPLEMENTED,
+ ERR_MULTIPLE_CALLBACK,
+ ERR_STREAM_ALREADY_FINISHED,
+ ERR_STREAM_CANNOT_PIPE,
+ ERR_STREAM_DESTROYED,
+ ERR_STREAM_NULL_VALUES,
+ ERR_STREAM_WRITE_AFTER_END,
+ ERR_UNKNOWN_ENCODING,
+} from "../_errors.ts";
+
+function nop() {}
+
+//TODO(Soremwar)
+//Bring in encodings
+type write_v = (
+ // deno-lint-ignore no-explicit-any
+ chunks: Array<{ chunk: any; encoding: string }>,
+ callback: (error?: Error | null) => void,
+) => void;
+
+type AfterWriteTick = {
+ cb: (error?: Error) => void;
+ count: number;
+ state: WritableState;
+ stream: Writable;
+};
+
+const kOnFinished = Symbol("kOnFinished");
+
+function destroy(this: Writable, err?: Error, cb?: () => void) {
+ const w = this._writableState;
+
+ if (w.destroyed) {
+ if (typeof cb === "function") {
+ cb();
+ }
+
+ return this;
+ }
+
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ }
+
+ w.destroyed = true;
+
+ if (!w.constructed) {
+ this.once(kDestroy, (er) => {
+ _destroy(this, err || er, cb);
+ });
+ } else {
+ _destroy(this, err, cb);
+ }
+
+ return this;
+}
+
+function _destroy(
+ self: Writable,
+ err?: Error,
+ cb?: (error?: Error | null) => void,
+) {
+ self._destroy(err || null, (err) => {
+ const w = self._writableState;
+
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ }
+
+ w.closed = true;
+
+ if (typeof cb === "function") {
+ cb(err);
+ }
+
+ if (err) {
+ queueMicrotask(() => {
+ if (!w.errorEmitted) {
+ w.errorEmitted = true;
+ self.emit("error", err);
+ }
+ w.closeEmitted = true;
+ if (w.emitClose) {
+ self.emit("close");
+ }
+ });
+ } else {
+ queueMicrotask(() => {
+ w.closeEmitted = true;
+ if (w.emitClose) {
+ self.emit("close");
+ }
+ });
+ }
+ });
+}
+
+function errorOrDestroy(stream: Writable, err: Error, sync = false) {
+ const w = stream._writableState;
+
+ if (w.destroyed) {
+ return stream;
+ }
+
+ if (w.autoDestroy) {
+ stream.destroy(err);
+ } else if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ if (sync) {
+ queueMicrotask(() => {
+ if (w.errorEmitted) {
+ return;
+ }
+ w.errorEmitted = true;
+ stream.emit("error", err);
+ });
+ } else {
+ if (w.errorEmitted) {
+ return;
+ }
+ w.errorEmitted = true;
+ stream.emit("error", err);
+ }
+ }
+}
+
+function construct(stream: Writable, cb: (error: Error) => void) {
+ if (!stream._construct) {
+ return;
+ }
+
+ stream.once(kConstruct, cb);
+ const w = stream._writableState;
+
+ w.constructed = false;
+
+ queueMicrotask(() => {
+ let called = false;
+ stream._construct?.((err) => {
+ w.constructed = true;
+
+ if (called) {
+ err = new ERR_MULTIPLE_CALLBACK();
+ } else {
+ called = true;
+ }
+
+ if (w.destroyed) {
+ stream.emit(kDestroy, err);
+ } else if (err) {
+ errorOrDestroy(stream, err, true);
+ } else {
+ queueMicrotask(() => {
+ stream.emit(kConstruct);
+ });
+ }
+ });
+ });
+}
+
+//TODO(Soremwar)
+//Bring encodings in
+function writeOrBuffer(
+ stream: Writable,
+ state: WritableState,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ callback: (error: Error) => void,
+) {
+ const len = state.objectMode ? 1 : chunk.length;
+
+ state.length += len;
+
+ if (state.writing || state.corked || state.errored || !state.constructed) {
+ state.buffered.push({ chunk, encoding, callback });
+ if (state.allBuffers && encoding !== "buffer") {
+ state.allBuffers = false;
+ }
+ if (state.allNoop && callback !== nop) {
+ state.allNoop = false;
+ }
+ } else {
+ state.writelen = len;
+ state.writecb = callback;
+ state.writing = true;
+ state.sync = true;
+ stream._write(chunk, encoding, state.onwrite);
+ state.sync = false;
+ }
+
+ const ret = state.length < state.highWaterMark;
+
+ if (!ret) {
+ state.needDrain = true;
+ }
+
+ return ret && !state.errored && !state.destroyed;
+}
+
+//TODO(Soremwar)
+//Bring encodings in
+function doWrite(
+ stream: Writable,
+ state: WritableState,
+ writev: boolean,
+ len: number,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ cb: (error: Error) => void,
+) {
+ state.writelen = len;
+ state.writecb = cb;
+ state.writing = true;
+ state.sync = true;
+ if (state.destroyed) {
+ state.onwrite(new ERR_STREAM_DESTROYED("write"));
+ } else if (writev) {
+ (stream._writev as unknown as write_v)(chunk, state.onwrite);
+ } else {
+ stream._write(chunk, encoding, state.onwrite);
+ }
+ state.sync = false;
+}
+
+function onwriteError(
+ stream: Writable,
+ state: WritableState,
+ er: Error,
+ cb: (error: Error) => void,
+) {
+ --state.pendingcb;
+
+ cb(er);
+ errorBuffer(state);
+ errorOrDestroy(stream, er);
+}
+
+function onwrite(stream: Writable, er?: Error | null) {
+ const state = stream._writableState;
+ const sync = state.sync;
+ const cb = state.writecb;
+
+ if (typeof cb !== "function") {
+ errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
+ return;
+ }
+
+ state.writing = false;
+ state.writecb = null;
+ state.length -= state.writelen;
+ state.writelen = 0;
+
+ if (er) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ er.stack;
+
+ if (!state.errored) {
+ state.errored = er;
+ }
+
+ if (sync) {
+ queueMicrotask(() => onwriteError(stream, state, er, cb));
+ } else {
+ onwriteError(stream, state, er, cb);
+ }
+ } else {
+ if (state.buffered.length > state.bufferedIndex) {
+ clearBuffer(stream, state);
+ }
+
+ if (sync) {
+ if (
+ state.afterWriteTickInfo !== null &&
+ state.afterWriteTickInfo.cb === cb
+ ) {
+ state.afterWriteTickInfo.count++;
+ } else {
+ state.afterWriteTickInfo = {
+ count: 1,
+ cb: (cb as (error?: Error) => void),
+ stream,
+ state,
+ };
+ queueMicrotask(() =>
+ afterWriteTick(state.afterWriteTickInfo as AfterWriteTick)
+ );
+ }
+ } else {
+ afterWrite(stream, state, 1, cb as (error?: Error) => void);
+ }
+ }
+}
+
+function afterWriteTick({
+ cb,
+ count,
+ state,
+ stream,
+}: AfterWriteTick) {
+ state.afterWriteTickInfo = null;
+ return afterWrite(stream, state, count, cb);
+}
+
+function afterWrite(
+ stream: Writable,
+ state: WritableState,
+ count: number,
+ cb: (error?: Error) => void,
+) {
+ const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
+ state.needDrain;
+ if (needDrain) {
+ state.needDrain = false;
+ stream.emit("drain");
+ }
+
+ while (count-- > 0) {
+ state.pendingcb--;
+ cb();
+ }
+
+ if (state.destroyed) {
+ errorBuffer(state);
+ }
+
+ finishMaybe(stream, state);
+}
+
+/** If there's something in the buffer waiting, then invoke callbacks.*/
+function errorBuffer(state: WritableState) {
+ if (state.writing) {
+ return;
+ }
+
+ for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
+ const { chunk, callback } = state.buffered[n];
+ const len = state.objectMode ? 1 : chunk.length;
+ state.length -= len;
+ callback(new ERR_STREAM_DESTROYED("write"));
+ }
+
+ for (const callback of state[kOnFinished].splice(0)) {
+ callback(new ERR_STREAM_DESTROYED("end"));
+ }
+
+ resetBuffer(state);
+}
+
+/** If there's something in the buffer waiting, then process it.*/
+function clearBuffer(stream: Writable, state: WritableState) {
+ if (
+ state.corked ||
+ state.bufferProcessing ||
+ state.destroyed ||
+ !state.constructed
+ ) {
+ return;
+ }
+
+ const { buffered, bufferedIndex, objectMode } = state;
+ const bufferedLength = buffered.length - bufferedIndex;
+
+ if (!bufferedLength) {
+ return;
+ }
+
+ const i = bufferedIndex;
+
+ state.bufferProcessing = true;
+ if (bufferedLength > 1 && stream._writev) {
+ state.pendingcb -= bufferedLength - 1;
+
+ const callback = state.allNoop ? nop : (err: Error) => {
+ for (let n = i; n < buffered.length; ++n) {
+ buffered[n].callback(err);
+ }
+ };
+ const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i);
+
+ doWrite(stream, state, true, state.length, chunks, "", callback);
+
+ resetBuffer(state);
+ } else {
+ do {
+ const { chunk, encoding, callback } = buffered[i];
+ const len = objectMode ? 1 : chunk.length;
+ doWrite(stream, state, false, len, chunk, encoding, callback);
+ } while (i < buffered.length && !state.writing);
+
+ if (i === buffered.length) {
+ resetBuffer(state);
+ } else if (i > 256) {
+ buffered.splice(0, i);
+ state.bufferedIndex = 0;
+ } else {
+ state.bufferedIndex = i;
+ }
+ }
+ state.bufferProcessing = false;
+}
+
+function finish(stream: Writable, state: WritableState) {
+ state.pendingcb--;
+ if (state.errorEmitted || state.closeEmitted) {
+ return;
+ }
+
+ state.finished = true;
+
+ for (const callback of state[kOnFinished].splice(0)) {
+ callback();
+ }
+
+ stream.emit("finish");
+
+ if (state.autoDestroy) {
+ stream.destroy();
+ }
+}
+
+function finishMaybe(stream: Writable, state: WritableState, sync?: boolean) {
+ if (needFinish(state)) {
+ prefinish(stream, state);
+ if (state.pendingcb === 0 && needFinish(state)) {
+ state.pendingcb++;
+ if (sync) {
+ queueMicrotask(() => finish(stream, state));
+ } else {
+ finish(stream, state);
+ }
+ }
+ }
+}
+
+function prefinish(stream: Writable, state: WritableState) {
+ if (!state.prefinished && !state.finalCalled) {
+ if (typeof stream._final === "function" && !state.destroyed) {
+ state.finalCalled = true;
+
+ state.sync = true;
+ state.pendingcb++;
+ stream._final((err) => {
+ state.pendingcb--;
+ if (err) {
+ for (const callback of state[kOnFinished].splice(0)) {
+ callback(err);
+ }
+ errorOrDestroy(stream, err, state.sync);
+ } else if (needFinish(state)) {
+ state.prefinished = true;
+ stream.emit("prefinish");
+ state.pendingcb++;
+ queueMicrotask(() => finish(stream, state));
+ }
+ });
+ state.sync = false;
+ } else {
+ state.prefinished = true;
+ stream.emit("prefinish");
+ }
+ }
+}
+
+function needFinish(state: WritableState) {
+ return (state.ending &&
+ state.constructed &&
+ state.length === 0 &&
+ !state.errored &&
+ state.buffered.length === 0 &&
+ !state.finished &&
+ !state.writing);
+}
+
+interface WritableOptions {
+ autoDestroy?: boolean;
+ decodeStrings?: boolean;
+ //TODO(Soremwar)
+ //Bring encodings in
+ defaultEncoding?: string;
+ destroy?(
+ this: Writable,
+ error: Error | null,
+ callback: (error: Error | null) => void,
+ ): void;
+ emitClose?: boolean;
+ final?(this: Writable, callback: (error?: Error | null) => void): void;
+ highWaterMark?: number;
+ objectMode?: boolean;
+ //TODO(Soremwar)
+ //Bring encodings in
+ write?(
+ this: Writable,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ callback: (error?: Error | null) => void,
+ ): void;
+ //TODO(Soremwar)
+ //Bring encodings in
+ writev?(
+ this: Writable,
+ // deno-lint-ignore no-explicit-any
+ chunks: Array<{ chunk: any; encoding: string }>,
+ callback: (error?: Error | null) => void,
+ ): void;
+}
+
+class WritableState {
+ [kOnFinished]: Array<(error?: Error) => void> = [];
+ afterWriteTickInfo: null | AfterWriteTick = null;
+ allBuffers = true;
+ allNoop = true;
+ autoDestroy: boolean;
+ //TODO(Soremwar)
+ //Bring in encodings
+ buffered: Array<{
+ allBuffers?: boolean;
+ // deno-lint-ignore no-explicit-any
+ chunk: any;
+ encoding: string;
+ callback: (error: Error) => void;
+ }> = [];
+ bufferedIndex = 0;
+ bufferProcessing = false;
+ closed = false;
+ closeEmitted = false;
+ constructed: boolean;
+ corked = 0;
+ decodeStrings: boolean;
+ defaultEncoding: string;
+ destroyed = false;
+ emitClose: boolean;
+ ended = false;
+ ending = false;
+ errored: Error | null = null;
+ errorEmitted = false;
+ finalCalled = false;
+ finished = false;
+ highWaterMark: number;
+ length = 0;
+ needDrain = false;
+ objectMode: boolean;
+ onwrite: (error?: Error | null) => void;
+ pendingcb = 0;
+ prefinished = false;
+ sync = true;
+ writecb: null | ((error: Error) => void) = null;
+ writable = true;
+ writelen = 0;
+ writing = false;
+
+ constructor(options: WritableOptions | undefined, stream: Writable) {
+ this.objectMode = !!options?.objectMode;
+
+ this.highWaterMark = options?.highWaterMark ??
+ (this.objectMode ? 16 : 16 * 1024);
+
+ if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) {
+ this.highWaterMark = Math.floor(this.highWaterMark);
+ } else {
+ throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark);
+ }
+
+ this.decodeStrings = !options?.decodeStrings === false;
+
+ this.defaultEncoding = options?.defaultEncoding || "utf8";
+
+ this.onwrite = onwrite.bind(undefined, stream);
+
+ resetBuffer(this);
+
+ this.emitClose = options?.emitClose ?? true;
+ this.autoDestroy = options?.autoDestroy ?? true;
+ this.constructed = true;
+ }
+
+ getBuffer() {
+ return this.buffered.slice(this.bufferedIndex);
+ }
+
+ get bufferedRequestCount() {
+ return this.buffered.length - this.bufferedIndex;
+ }
+}
+
+function resetBuffer(state: WritableState) {
+ state.buffered = [];
+ state.bufferedIndex = 0;
+ state.allBuffers = true;
+ state.allNoop = true;
+}
+
+/** A bit simpler than readable streams.
+* Implement an async `._write(chunk, encoding, cb)`, and it'll handle all
+* the drain event emission and buffering.
+*/
+class Writable extends Stream {
+ _construct?: (cb: (error?: Error) => void) => void;
+ _final?: (
+ this: Writable,
+ callback: (error?: Error | null | undefined) => void,
+ ) => void;
+ _writableState: WritableState;
+ _writev?: write_v | null = null;
+
+ constructor(options?: WritableOptions) {
+ super();
+ this._writableState = new WritableState(options, this);
+
+ if (options) {
+ if (typeof options.write === "function") {
+ this._write = options.write;
+ }
+
+ if (typeof options.writev === "function") {
+ this._writev = options.writev;
+ }
+
+ if (typeof options.destroy === "function") {
+ this._destroy = options.destroy;
+ }
+
+ if (typeof options.final === "function") {
+ this._final = options.final;
+ }
+ }
+
+ construct(this, () => {
+ const state = this._writableState;
+
+ if (!state.writing) {
+ clearBuffer(this, state);
+ }
+
+ finishMaybe(this, state);
+ });
+ }
+
+ [captureRejectionSymbol](err?: Error) {
+ this.destroy(err);
+ }
+
+ static WritableState = WritableState;
+
+ get destroyed() {
+ return this._writableState ? this._writableState.destroyed : false;
+ }
+
+ set destroyed(value) {
+ if (this._writableState) {
+ this._writableState.destroyed = value;
+ }
+ }
+
+ get writable() {
+ const w = this._writableState;
+ return !w.destroyed && !w.errored && !w.ending && !w.ended;
+ }
+
+ set writable(val) {
+ if (this._writableState) {
+ this._writableState.writable = !!val;
+ }
+ }
+
+ get writableFinished() {
+ return this._writableState ? this._writableState.finished : false;
+ }
+
+ get writableObjectMode() {
+ return this._writableState ? this._writableState.objectMode : false;
+ }
+
+ get writableBuffer() {
+ return this._writableState && this._writableState.getBuffer();
+ }
+
+ get writableEnded() {
+ return this._writableState ? this._writableState.ending : false;
+ }
+
+ get writableHighWaterMark() {
+ return this._writableState && this._writableState.highWaterMark;
+ }
+
+ get writableCorked() {
+ return this._writableState ? this._writableState.corked : 0;
+ }
+
+ get writableLength() {
+ return this._writableState && this._writableState.length;
+ }
+
+ _undestroy() {
+ const w = this._writableState;
+ w.constructed = true;
+ w.destroyed = false;
+ w.closed = false;
+ w.closeEmitted = false;
+ w.errored = null;
+ w.errorEmitted = false;
+ w.ended = false;
+ w.ending = false;
+ w.finalCalled = false;
+ w.prefinished = false;
+ w.finished = false;
+ }
+
+ _destroy(err: Error | null, cb: (error?: Error | null) => void) {
+ cb(err);
+ }
+
+ destroy(err?: Error, cb?: () => void) {
+ const state = this._writableState;
+ if (!state.destroyed) {
+ queueMicrotask(() => errorBuffer(state));
+ }
+ destroy.call(this, err, cb);
+ return this;
+ }
+
+ end(cb?: () => void): void;
+ // deno-lint-ignore no-explicit-any
+ end(chunk: any, cb?: () => void): void;
+ //TODO(Soremwar)
+ //Bring in encodings
+ // deno-lint-ignore no-explicit-any
+ end(chunk: any, encoding: string, cb?: () => void): void;
+
+ end(
+ // deno-lint-ignore no-explicit-any
+ x?: any | (() => void),
+ //TODO(Soremwar)
+ //Bring in encodings
+ y?: string | (() => void),
+ z?: () => void,
+ ) {
+ const state = this._writableState;
+ // deno-lint-ignore no-explicit-any
+ let chunk: any | null;
+ //TODO(Soremwar)
+ //Bring in encodings
+ let encoding: string | null;
+ let cb: undefined | ((error?: Error) => void);
+
+ if (typeof x === "function") {
+ chunk = null;
+ encoding = null;
+ cb = x;
+ } else if (typeof y === "function") {
+ chunk = x;
+ encoding = null;
+ cb = y;
+ } else {
+ chunk = x;
+ encoding = y as string;
+ cb = z;
+ }
+
+ if (chunk !== null && chunk !== undefined) {
+ this.write(chunk, encoding);
+ }
+
+ if (state.corked) {
+ state.corked = 1;
+ this.uncork();
+ }
+
+ let err: Error | undefined;
+ if (!state.errored && !state.ending) {
+ state.ending = true;
+ finishMaybe(this, state, true);
+ state.ended = true;
+ } else if (state.finished) {
+ err = new ERR_STREAM_ALREADY_FINISHED("end");
+ } else if (state.destroyed) {
+ err = new ERR_STREAM_DESTROYED("end");
+ }
+
+ if (typeof cb === "function") {
+ if (err || state.finished) {
+ queueMicrotask(() => {
+ (cb as (error?: Error | undefined) => void)(err);
+ });
+ } else {
+ state[kOnFinished].push(cb);
+ }
+ }
+
+ return this;
+ }
+
+ //TODO(Soremwar)
+ //Bring in encodings
+ _write(
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ cb: (error?: Error | null) => void,
+ ): void {
+ if (this._writev) {
+ this._writev([{ chunk, encoding }], cb);
+ } else {
+ throw new ERR_METHOD_NOT_IMPLEMENTED("_write()");
+ }
+ }
+
+ //This signature was changed to keep inheritance coherent
+ pipe(dest: Writable): Writable {
+ errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
+ return dest;
+ }
+
+ // deno-lint-ignore no-explicit-any
+ write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean;
+ //TODO(Soremwar)
+ //Bring in encodings
+ write(
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string | null,
+ cb?: (error: Error | null | undefined) => void,
+ ): boolean;
+
+ //TODO(Soremwar)
+ //Bring in encodings
+ write(
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ x?: string | null | ((error: Error | null | undefined) => void),
+ y?: ((error: Error | null | undefined) => void),
+ ) {
+ const state = this._writableState;
+ //TODO(Soremwar)
+ //Bring in encodings
+ let encoding: string;
+ let cb: (error?: Error | null) => void;
+
+ if (typeof x === "function") {
+ cb = x;
+ encoding = state.defaultEncoding;
+ } else {
+ if (!x) {
+ encoding = state.defaultEncoding;
+ } else if (x !== "buffer" && !Buffer.isEncoding(x)) {
+ throw new ERR_UNKNOWN_ENCODING(x);
+ } else {
+ encoding = x;
+ }
+ if (typeof y !== "function") {
+ cb = nop;
+ } else {
+ cb = y;
+ }
+ }
+
+ if (chunk === null) {
+ throw new ERR_STREAM_NULL_VALUES();
+ } else if (!state.objectMode) {
+ if (typeof chunk === "string") {
+ if (state.decodeStrings !== false) {
+ chunk = Buffer.from(chunk, encoding);
+ encoding = "buffer";
+ }
+ } else if (chunk instanceof Buffer) {
+ encoding = "buffer";
+ } else if (Stream._isUint8Array(chunk)) {
+ chunk = Stream._uint8ArrayToBuffer(chunk);
+ encoding = "buffer";
+ } else {
+ throw new ERR_INVALID_ARG_TYPE(
+ "chunk",
+ ["string", "Buffer", "Uint8Array"],
+ chunk,
+ );
+ }
+ }
+
+ let err: Error | undefined;
+ if (state.ending) {
+ err = new ERR_STREAM_WRITE_AFTER_END();
+ } else if (state.destroyed) {
+ err = new ERR_STREAM_DESTROYED("write");
+ }
+
+ if (err) {
+ queueMicrotask(() => cb(err));
+ errorOrDestroy(this, err, true);
+ return false;
+ }
+ state.pendingcb++;
+ return writeOrBuffer(this, state, chunk, encoding, cb);
+ }
+
+ cork() {
+ this._writableState.corked++;
+ }
+
+ uncork() {
+ const state = this._writableState;
+
+ if (state.corked) {
+ state.corked--;
+
+ if (!state.writing) {
+ clearBuffer(this, state);
+ }
+ }
+ }
+
+ //TODO(Soremwar)
+ //Bring allowed encodings
+ setDefaultEncoding(encoding: string) {
+ // node::ParseEncoding() requires lower case.
+ if (typeof encoding === "string") {
+ encoding = encoding.toLowerCase();
+ }
+ if (!Buffer.isEncoding(encoding)) {
+ throw new ERR_UNKNOWN_ENCODING(encoding);
+ }
+ this._writableState.defaultEncoding = encoding;
+ return this;
+ }
+}
+
+export default Writable;
+export { WritableState };