summaryrefslogtreecommitdiff
path: root/std/node/_stream/writable.ts
diff options
context:
space:
mode:
authorSteven Guerrero <stephenguerrero43@gmail.com>2020-11-26 07:50:08 -0500
committerGitHub <noreply@github.com>2020-11-26 13:50:08 +0100
commit9042fcc12e7774cdd0ca3a5d08918a07dae8102b (patch)
tree8b5ff11412aae9bb714e0bb0b9b0358db64a8657 /std/node/_stream/writable.ts
parent60e980c78180ee3b0a14d692307be275dc181c8d (diff)
feat(std/node/stream): Add Duplex, Transform, Passthrough, pipeline, finished and promises (#7940)
Diffstat (limited to 'std/node/_stream/writable.ts')
-rw-r--r--std/node/_stream/writable.ts573
1 files changed, 32 insertions, 541 deletions
diff --git a/std/node/_stream/writable.ts b/std/node/_stream/writable.ts
index 158af8325..534fc22fb 100644
--- a/std/node/_stream/writable.ts
+++ b/std/node/_stream/writable.ts
@@ -2,12 +2,10 @@
import { Buffer } from "../buffer.ts";
import Stream from "./stream.ts";
import { captureRejectionSymbol } from "../events.ts";
-import { kConstruct, kDestroy } from "./symbols.ts";
import {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_OPT_VALUE,
ERR_METHOD_NOT_IMPLEMENTED,
- ERR_MULTIPLE_CALLBACK,
ERR_STREAM_ALREADY_FINISHED,
ERR_STREAM_CANNOT_PIPE,
ERR_STREAM_DESTROYED,
@@ -15,493 +13,27 @@ import {
ERR_STREAM_WRITE_AFTER_END,
ERR_UNKNOWN_ENCODING,
} from "../_errors.ts";
-
-function nop() {}
-
-//TODO(Soremwar)
-//Bring in encodings
-type write_v = (
- // deno-lint-ignore no-explicit-any
- chunks: Array<{ chunk: any; encoding: string }>,
- callback: (error?: Error | null) => void,
-) => void;
-
-type AfterWriteTick = {
- cb: (error?: Error) => void;
- count: number;
- state: WritableState;
- stream: Writable;
-};
-
-const kOnFinished = Symbol("kOnFinished");
-
-function destroy(this: Writable, err?: Error, cb?: () => void) {
- const w = this._writableState;
-
- if (w.destroyed) {
- if (typeof cb === "function") {
- cb();
- }
-
- return this;
- }
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- }
-
- w.destroyed = true;
-
- if (!w.constructed) {
- this.once(kDestroy, (er) => {
- _destroy(this, err || er, cb);
- });
- } else {
- _destroy(this, err, cb);
- }
-
- return this;
-}
-
-function _destroy(
- self: Writable,
- err?: Error,
- cb?: (error?: Error | null) => void,
-) {
- self._destroy(err || null, (err) => {
- const w = self._writableState;
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- }
-
- w.closed = true;
-
- if (typeof cb === "function") {
- cb(err);
- }
-
- if (err) {
- queueMicrotask(() => {
- if (!w.errorEmitted) {
- w.errorEmitted = true;
- self.emit("error", err);
- }
- w.closeEmitted = true;
- if (w.emitClose) {
- self.emit("close");
- }
- });
- } else {
- queueMicrotask(() => {
- w.closeEmitted = true;
- if (w.emitClose) {
- self.emit("close");
- }
- });
- }
- });
-}
-
-function errorOrDestroy(stream: Writable, err: Error, sync = false) {
- const w = stream._writableState;
-
- if (w.destroyed) {
- return stream;
- }
-
- if (w.autoDestroy) {
- stream.destroy(err);
- } else if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- if (sync) {
- queueMicrotask(() => {
- if (w.errorEmitted) {
- return;
- }
- w.errorEmitted = true;
- stream.emit("error", err);
- });
- } else {
- if (w.errorEmitted) {
- return;
- }
- w.errorEmitted = true;
- stream.emit("error", err);
- }
- }
-}
-
-function construct(stream: Writable, cb: (error: Error) => void) {
- if (!stream._construct) {
- return;
- }
-
- stream.once(kConstruct, cb);
- const w = stream._writableState;
-
- w.constructed = false;
-
- queueMicrotask(() => {
- let called = false;
- stream._construct?.((err) => {
- w.constructed = true;
-
- if (called) {
- err = new ERR_MULTIPLE_CALLBACK();
- } else {
- called = true;
- }
-
- if (w.destroyed) {
- stream.emit(kDestroy, err);
- } else if (err) {
- errorOrDestroy(stream, err, true);
- } else {
- queueMicrotask(() => {
- stream.emit(kConstruct);
- });
- }
- });
- });
-}
-
-//TODO(Soremwar)
-//Bring encodings in
-function writeOrBuffer(
- stream: Writable,
- state: WritableState,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- callback: (error: Error) => void,
-) {
- const len = state.objectMode ? 1 : chunk.length;
-
- state.length += len;
-
- if (state.writing || state.corked || state.errored || !state.constructed) {
- state.buffered.push({ chunk, encoding, callback });
- if (state.allBuffers && encoding !== "buffer") {
- state.allBuffers = false;
- }
- if (state.allNoop && callback !== nop) {
- state.allNoop = false;
- }
- } else {
- state.writelen = len;
- state.writecb = callback;
- state.writing = true;
- state.sync = true;
- stream._write(chunk, encoding, state.onwrite);
- state.sync = false;
- }
-
- const ret = state.length < state.highWaterMark;
-
- if (!ret) {
- state.needDrain = true;
- }
-
- return ret && !state.errored && !state.destroyed;
-}
-
-//TODO(Soremwar)
-//Bring encodings in
-function doWrite(
- stream: Writable,
- state: WritableState,
- writev: boolean,
- len: number,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- cb: (error: Error) => void,
-) {
- state.writelen = len;
- state.writecb = cb;
- state.writing = true;
- state.sync = true;
- if (state.destroyed) {
- state.onwrite(new ERR_STREAM_DESTROYED("write"));
- } else if (writev) {
- (stream._writev as unknown as write_v)(chunk, state.onwrite);
- } else {
- stream._write(chunk, encoding, state.onwrite);
- }
- state.sync = false;
-}
-
-function onwriteError(
- stream: Writable,
- state: WritableState,
- er: Error,
- cb: (error: Error) => void,
-) {
- --state.pendingcb;
-
- cb(er);
- errorBuffer(state);
- errorOrDestroy(stream, er);
-}
-
-function onwrite(stream: Writable, er?: Error | null) {
- const state = stream._writableState;
- const sync = state.sync;
- const cb = state.writecb;
-
- if (typeof cb !== "function") {
- errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
- return;
- }
-
- state.writing = false;
- state.writecb = null;
- state.length -= state.writelen;
- state.writelen = 0;
-
- if (er) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- er.stack;
-
- if (!state.errored) {
- state.errored = er;
- }
-
- if (sync) {
- queueMicrotask(() => onwriteError(stream, state, er, cb));
- } else {
- onwriteError(stream, state, er, cb);
- }
- } else {
- if (state.buffered.length > state.bufferedIndex) {
- clearBuffer(stream, state);
- }
-
- if (sync) {
- if (
- state.afterWriteTickInfo !== null &&
- state.afterWriteTickInfo.cb === cb
- ) {
- state.afterWriteTickInfo.count++;
- } else {
- state.afterWriteTickInfo = {
- count: 1,
- cb: (cb as (error?: Error) => void),
- stream,
- state,
- };
- queueMicrotask(() =>
- afterWriteTick(state.afterWriteTickInfo as AfterWriteTick)
- );
- }
- } else {
- afterWrite(stream, state, 1, cb as (error?: Error) => void);
- }
- }
-}
-
-function afterWriteTick({
- cb,
- count,
- state,
- stream,
-}: AfterWriteTick) {
- state.afterWriteTickInfo = null;
- return afterWrite(stream, state, count, cb);
-}
-
-function afterWrite(
- stream: Writable,
- state: WritableState,
- count: number,
- cb: (error?: Error) => void,
-) {
- const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
- state.needDrain;
- if (needDrain) {
- state.needDrain = false;
- stream.emit("drain");
- }
-
- while (count-- > 0) {
- state.pendingcb--;
- cb();
- }
-
- if (state.destroyed) {
- errorBuffer(state);
- }
-
- finishMaybe(stream, state);
-}
-
-/** If there's something in the buffer waiting, then invoke callbacks.*/
-function errorBuffer(state: WritableState) {
- if (state.writing) {
- return;
- }
-
- for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
- const { chunk, callback } = state.buffered[n];
- const len = state.objectMode ? 1 : chunk.length;
- state.length -= len;
- callback(new ERR_STREAM_DESTROYED("write"));
- }
-
- for (const callback of state[kOnFinished].splice(0)) {
- callback(new ERR_STREAM_DESTROYED("end"));
- }
-
- resetBuffer(state);
-}
-
-/** If there's something in the buffer waiting, then process it.*/
-function clearBuffer(stream: Writable, state: WritableState) {
- if (
- state.corked ||
- state.bufferProcessing ||
- state.destroyed ||
- !state.constructed
- ) {
- return;
- }
-
- const { buffered, bufferedIndex, objectMode } = state;
- const bufferedLength = buffered.length - bufferedIndex;
-
- if (!bufferedLength) {
- return;
- }
-
- const i = bufferedIndex;
-
- state.bufferProcessing = true;
- if (bufferedLength > 1 && stream._writev) {
- state.pendingcb -= bufferedLength - 1;
-
- const callback = state.allNoop ? nop : (err: Error) => {
- for (let n = i; n < buffered.length; ++n) {
- buffered[n].callback(err);
- }
- };
- const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i);
-
- doWrite(stream, state, true, state.length, chunks, "", callback);
-
- resetBuffer(state);
- } else {
- do {
- const { chunk, encoding, callback } = buffered[i];
- const len = objectMode ? 1 : chunk.length;
- doWrite(stream, state, false, len, chunk, encoding, callback);
- } while (i < buffered.length && !state.writing);
-
- if (i === buffered.length) {
- resetBuffer(state);
- } else if (i > 256) {
- buffered.splice(0, i);
- state.bufferedIndex = 0;
- } else {
- state.bufferedIndex = i;
- }
- }
- state.bufferProcessing = false;
-}
-
-function finish(stream: Writable, state: WritableState) {
- state.pendingcb--;
- if (state.errorEmitted || state.closeEmitted) {
- return;
- }
-
- state.finished = true;
-
- for (const callback of state[kOnFinished].splice(0)) {
- callback();
- }
-
- stream.emit("finish");
-
- if (state.autoDestroy) {
- stream.destroy();
- }
-}
-
-function finishMaybe(stream: Writable, state: WritableState, sync?: boolean) {
- if (needFinish(state)) {
- prefinish(stream, state);
- if (state.pendingcb === 0 && needFinish(state)) {
- state.pendingcb++;
- if (sync) {
- queueMicrotask(() => finish(stream, state));
- } else {
- finish(stream, state);
- }
- }
- }
-}
-
-function prefinish(stream: Writable, state: WritableState) {
- if (!state.prefinished && !state.finalCalled) {
- if (typeof stream._final === "function" && !state.destroyed) {
- state.finalCalled = true;
-
- state.sync = true;
- state.pendingcb++;
- stream._final((err) => {
- state.pendingcb--;
- if (err) {
- for (const callback of state[kOnFinished].splice(0)) {
- callback(err);
- }
- errorOrDestroy(stream, err, state.sync);
- } else if (needFinish(state)) {
- state.prefinished = true;
- stream.emit("prefinish");
- state.pendingcb++;
- queueMicrotask(() => finish(stream, state));
- }
- });
- state.sync = false;
- } else {
- state.prefinished = true;
- stream.emit("prefinish");
- }
- }
-}
-
-function needFinish(state: WritableState) {
- return (state.ending &&
- state.constructed &&
- state.length === 0 &&
- !state.errored &&
- state.buffered.length === 0 &&
- !state.finished &&
- !state.writing);
-}
-
-interface WritableOptions {
+import type { AfterWriteTick, writeV } from "./writable_internal.ts";
+import {
+ clearBuffer,
+ destroy,
+ errorBuffer,
+ errorOrDestroy,
+ finishMaybe,
+ kOnFinished,
+ nop,
+ onwrite,
+ resetBuffer,
+ writeOrBuffer,
+} from "./writable_internal.ts";
+import type { Encodings } from "../_utils.ts";
+
+type WritableEncodings = Encodings | "buffer";
+
+export interface WritableOptions {
autoDestroy?: boolean;
decodeStrings?: boolean;
- //TODO(Soremwar)
- //Bring encodings in
- defaultEncoding?: string;
+ defaultEncoding?: WritableEncodings;
destroy?(
this: Writable,
error: Error | null,
@@ -511,17 +43,13 @@ interface WritableOptions {
final?(this: Writable, callback: (error?: Error | null) => void): void;
highWaterMark?: number;
objectMode?: boolean;
- //TODO(Soremwar)
- //Bring encodings in
write?(
this: Writable,
// deno-lint-ignore no-explicit-any
chunk: any,
- encoding: string,
+ encoding: WritableEncodings,
callback: (error?: Error | null) => void,
): void;
- //TODO(Soremwar)
- //Bring encodings in
writev?(
this: Writable,
// deno-lint-ignore no-explicit-any
@@ -530,14 +58,12 @@ interface WritableOptions {
): void;
}
-class WritableState {
+export class WritableState {
[kOnFinished]: Array<(error?: Error) => void> = [];
afterWriteTickInfo: null | AfterWriteTick = null;
allBuffers = true;
allNoop = true;
autoDestroy: boolean;
- //TODO(Soremwar)
- //Bring in encodings
buffered: Array<{
allBuffers?: boolean;
// deno-lint-ignore no-explicit-any
@@ -552,7 +78,7 @@ class WritableState {
constructed: boolean;
corked = 0;
decodeStrings: boolean;
- defaultEncoding: string;
+ defaultEncoding: WritableEncodings;
destroyed = false;
emitClose: boolean;
ended = false;
@@ -608,25 +134,17 @@ class WritableState {
}
}
-function resetBuffer(state: WritableState) {
- state.buffered = [];
- state.bufferedIndex = 0;
- state.allBuffers = true;
- state.allNoop = true;
-}
-
/** A bit simpler than readable streams.
* Implement an async `._write(chunk, encoding, cb)`, and it'll handle all
* the drain event emission and buffering.
*/
class Writable extends Stream {
- _construct?: (cb: (error?: Error) => void) => void;
_final?: (
this: Writable,
callback: (error?: Error | null | undefined) => void,
) => void;
_writableState: WritableState;
- _writev?: write_v | null = null;
+ _writev?: writeV | null = null;
constructor(options?: WritableOptions) {
super();
@@ -649,16 +167,6 @@ class Writable extends Stream {
this._final = options.final;
}
}
-
- construct(this, () => {
- const state = this._writableState;
-
- if (!state.writing) {
- clearBuffer(this, state);
- }
-
- finishMaybe(this, state);
- });
}
[captureRejectionSymbol](err?: Error) {
@@ -735,7 +243,7 @@ class Writable extends Stream {
cb(err);
}
- destroy(err?: Error, cb?: () => void) {
+ destroy(err?: Error | null, cb?: () => void) {
const state = this._writableState;
if (!state.destroyed) {
queueMicrotask(() => errorBuffer(state));
@@ -747,25 +255,19 @@ class Writable extends Stream {
end(cb?: () => void): void;
// deno-lint-ignore no-explicit-any
end(chunk: any, cb?: () => void): void;
- //TODO(Soremwar)
- //Bring in encodings
// deno-lint-ignore no-explicit-any
- end(chunk: any, encoding: string, cb?: () => void): void;
+ end(chunk: any, encoding: WritableEncodings, cb?: () => void): void;
end(
// deno-lint-ignore no-explicit-any
x?: any | (() => void),
- //TODO(Soremwar)
- //Bring in encodings
- y?: string | (() => void),
+ y?: WritableEncodings | (() => void),
z?: () => void,
) {
const state = this._writableState;
// deno-lint-ignore no-explicit-any
let chunk: any | null;
- //TODO(Soremwar)
- //Bring in encodings
- let encoding: string | null;
+ let encoding: WritableEncodings | null;
let cb: undefined | ((error?: Error) => void);
if (typeof x === "function") {
@@ -778,7 +280,7 @@ class Writable extends Stream {
cb = y;
} else {
chunk = x;
- encoding = y as string;
+ encoding = y as WritableEncodings;
cb = z;
}
@@ -815,8 +317,6 @@ class Writable extends Stream {
return this;
}
- //TODO(Soremwar)
- //Bring in encodings
_write(
// deno-lint-ignore no-explicit-any
chunk: any,
@@ -838,27 +338,21 @@ class Writable extends Stream {
// deno-lint-ignore no-explicit-any
write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean;
- //TODO(Soremwar)
- //Bring in encodings
write(
// deno-lint-ignore no-explicit-any
chunk: any,
- encoding: string | null,
+ encoding: WritableEncodings | null,
cb?: (error: Error | null | undefined) => void,
): boolean;
- //TODO(Soremwar)
- //Bring in encodings
write(
// deno-lint-ignore no-explicit-any
chunk: any,
- x?: string | null | ((error: Error | null | undefined) => void),
+ x?: WritableEncodings | null | ((error: Error | null | undefined) => void),
y?: ((error: Error | null | undefined) => void),
) {
const state = this._writableState;
- //TODO(Soremwar)
- //Bring in encodings
- let encoding: string;
+ let encoding: WritableEncodings;
let cb: (error?: Error | null) => void;
if (typeof x === "function") {
@@ -933,8 +427,6 @@ class Writable extends Stream {
}
}
- //TODO(Soremwar)
- //Bring allowed encodings
setDefaultEncoding(encoding: string) {
// node::ParseEncoding() requires lower case.
if (typeof encoding === "string") {
@@ -943,10 +435,9 @@ class Writable extends Stream {
if (!Buffer.isEncoding(encoding)) {
throw new ERR_UNKNOWN_ENCODING(encoding);
}
- this._writableState.defaultEncoding = encoding;
+ this._writableState.defaultEncoding = encoding as WritableEncodings;
return this;
}
}
export default Writable;
-export { WritableState };