summaryrefslogtreecommitdiff
path: root/std/node/_stream
diff options
context:
space:
mode:
Diffstat (limited to 'std/node/_stream')
-rw-r--r--std/node/_stream/async_iterator.ts23
-rw-r--r--std/node/_stream/destroy.ts38
-rw-r--r--std/node/_stream/duplex.ts683
-rw-r--r--std/node/_stream/duplex_internal.ts296
-rw-r--r--std/node/_stream/duplex_test.ts698
-rw-r--r--std/node/_stream/end_of_stream.ts (renamed from std/node/_stream/end-of-stream.ts)5
-rw-r--r--std/node/_stream/end_of_stream_test.ts97
-rw-r--r--std/node/_stream/passthrough.ts20
-rw-r--r--std/node/_stream/pipeline.ts308
-rw-r--r--std/node/_stream/pipeline_test.ts387
-rw-r--r--std/node/_stream/promises.ts42
-rw-r--r--std/node/_stream/promises_test.ts84
-rw-r--r--std/node/_stream/readable.ts546
-rw-r--r--std/node/_stream/readable_internal.ts438
-rw-r--r--std/node/_stream/stream.ts6
-rw-r--r--std/node/_stream/transform.ts132
-rw-r--r--std/node/_stream/transform_test.ts68
-rw-r--r--std/node/_stream/writable.ts573
-rw-r--r--std/node/_stream/writable_internal.ts457
-rw-r--r--std/node/_stream/writable_test.ts2
20 files changed, 3832 insertions, 1071 deletions
diff --git a/std/node/_stream/async_iterator.ts b/std/node/_stream/async_iterator.ts
index cd1b6db3c..5369ef39c 100644
--- a/std/node/_stream/async_iterator.ts
+++ b/std/node/_stream/async_iterator.ts
@@ -1,8 +1,9 @@
// Copyright Node.js contributors. All rights reserved. MIT License.
import type { Buffer } from "../buffer.ts";
-import finished from "./end-of-stream.ts";
+import finished from "./end_of_stream.ts";
import Readable from "./readable.ts";
import type Stream from "./stream.ts";
+import { destroyer } from "./destroy.ts";
const kLastResolve = Symbol("lastResolve");
const kLastReject = Symbol("lastReject");
@@ -34,24 +35,6 @@ function initIteratorSymbols(
Object.defineProperties(o, properties);
}
-// TODO(Soremwar)
-// Bring back once requests are implemented
-// function isRequest(stream: any) {
-// return stream && stream.setHeader && typeof stream.abort === "function";
-// }
-
-//TODO(Soremwar)
-//Should be any implementation of stream
-// deno-lint-ignore no-explicit-any
-function destroyer(stream: any, err?: Error | null) {
- // TODO(Soremwar)
- // Bring back once requests are implemented
- // if (isRequest(stream)) return stream.abort();
- // if (isRequest(stream.req)) return stream.req.abort();
- if (typeof stream.destroy === "function") return stream.destroy(err);
- if (typeof stream.close === "function") return stream.close();
-}
-
function createIterResult(
value: IterableItem,
done: boolean,
@@ -119,7 +102,7 @@ const AsyncIteratorPrototype = Object.getPrototypeOf(
Object.getPrototypeOf(async function* () {}).prototype,
);
-class ReadableStreamAsyncIterator
+export class ReadableStreamAsyncIterator
implements AsyncIterableIterator<IterableItem> {
[kEnded]: boolean;
[kError]: Error | null = null;
diff --git a/std/node/_stream/destroy.ts b/std/node/_stream/destroy.ts
new file mode 100644
index 000000000..d13e12de2
--- /dev/null
+++ b/std/node/_stream/destroy.ts
@@ -0,0 +1,38 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import type Duplex from "./duplex.ts";
+import type Readable from "./readable.ts";
+import type Stream from "./stream.ts";
+import type Writable from "./writable.ts";
+
+//This whole module acts as a 'normalizer'
+//Idea behind it is you can pass any kind of streams and functions will execute anyways
+
+//TODO(Soremwar)
+//Should be any implementation of stream
+//This is a guard to check executed methods exist inside the implementation
+type StreamImplementations = Duplex | Readable | Writable;
+
+// TODO(Soremwar)
+// Bring back once requests are implemented
+// function isRequest(stream: any) {
+// return stream && stream.setHeader && typeof stream.abort === "function";
+// }
+
+export function destroyer(stream: Stream, err?: Error | null) {
+ // TODO(Soremwar)
+ // Bring back once requests are implemented
+ // if (isRequest(stream)) return stream.abort();
+ // if (isRequest(stream.req)) return stream.req.abort();
+ if (
+ typeof (stream as StreamImplementations).destroy === "function"
+ ) {
+ return (stream as StreamImplementations).destroy(err);
+ }
+ // A test of async iterator mocks an upcoming implementation of stream
+ // his is casted to any in the meanwhile
+ // deno-lint-ignore no-explicit-any
+ if (typeof (stream as any).close === "function") {
+ // deno-lint-ignore no-explicit-any
+ return (stream as any).close();
+ }
+}
diff --git a/std/node/_stream/duplex.ts b/std/node/_stream/duplex.ts
index c5faed6f8..b5c429f0a 100644
--- a/std/node/_stream/duplex.ts
+++ b/std/node/_stream/duplex.ts
@@ -1,3 +1,682 @@
// Copyright Node.js contributors. All rights reserved. MIT License.
-// deno-lint-ignore no-explicit-any
-export const errorOrDestroy = (...args: any[]) => {};
+import { captureRejectionSymbol } from "../events.ts";
+import Readable, { ReadableState } from "./readable.ts";
+import Stream from "./stream.ts";
+import Writable, { WritableState } from "./writable.ts";
+import { Buffer } from "../buffer.ts";
+import {
+ ERR_STREAM_ALREADY_FINISHED,
+ ERR_STREAM_DESTROYED,
+ ERR_UNKNOWN_ENCODING,
+} from "../_errors.ts";
+import type { Encodings } from "../_utils.ts";
+import createReadableStreamAsyncIterator from "./async_iterator.ts";
+import type { ReadableStreamAsyncIterator } from "./async_iterator.ts";
+import {
+ _destroy,
+ computeNewHighWaterMark,
+ emitReadable,
+ fromList,
+ howMuchToRead,
+ nReadingNextTick,
+ updateReadableListening,
+} from "./readable_internal.ts";
+import { kOnFinished, writeV } from "./writable_internal.ts";
+import {
+ endDuplex,
+ finishMaybe,
+ onwrite,
+ readableAddChunk,
+} from "./duplex_internal.ts";
+export { errorOrDestroy } from "./duplex_internal.ts";
+
+export interface DuplexOptions {
+ allowHalfOpen?: boolean;
+ autoDestroy?: boolean;
+ decodeStrings?: boolean;
+ defaultEncoding?: Encodings;
+ destroy?(
+ this: Duplex,
+ error: Error | null,
+ callback: (error: Error | null) => void,
+ ): void;
+ emitClose?: boolean;
+ encoding?: Encodings;
+ final?(this: Duplex, callback: (error?: Error | null) => void): void;
+ highWaterMark?: number;
+ objectMode?: boolean;
+ read?(this: Duplex, size: number): void;
+ readable?: boolean;
+ readableHighWaterMark?: number;
+ readableObjectMode?: boolean;
+ writable?: boolean;
+ writableCorked?: number;
+ writableHighWaterMark?: number;
+ writableObjectMode?: boolean;
+ write?(
+ this: Duplex,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: Encodings,
+ callback: (error?: Error | null) => void,
+ ): void;
+ writev?: writeV;
+}
+
+interface Duplex extends Readable, Writable {}
+
+/**
+ * A duplex is an implementation of a stream that has both Readable and Writable
+ * attributes and capabilities
+ */
+class Duplex extends Stream {
+ allowHalfOpen = true;
+ _final?: (
+ callback: (error?: Error | null | undefined) => void,
+ ) => void;
+ _readableState: ReadableState;
+ _writableState: WritableState;
+ _writev?: writeV | null;
+
+ constructor(options?: DuplexOptions) {
+ super();
+
+ if (options) {
+ if (options.allowHalfOpen === false) {
+ this.allowHalfOpen = false;
+ }
+ if (typeof options.destroy === "function") {
+ this._destroy = options.destroy;
+ }
+ if (typeof options.final === "function") {
+ this._final = options.final;
+ }
+ if (typeof options.read === "function") {
+ this._read = options.read;
+ }
+ if (options.readable === false) {
+ this.readable = false;
+ }
+ if (options.writable === false) {
+ this.writable = false;
+ }
+ if (typeof options.write === "function") {
+ this._write = options.write;
+ }
+ if (typeof options.writev === "function") {
+ this._writev = options.writev;
+ }
+ }
+
+ const readableOptions = {
+ autoDestroy: options?.autoDestroy,
+ defaultEncoding: options?.defaultEncoding,
+ destroy: options?.destroy as unknown as (
+ this: Readable,
+ error: Error | null,
+ callback: (error: Error | null) => void,
+ ) => void,
+ emitClose: options?.emitClose,
+ encoding: options?.encoding,
+ highWaterMark: options?.highWaterMark ?? options?.readableHighWaterMark,
+ objectMode: options?.objectMode ?? options?.readableObjectMode,
+ read: options?.read as unknown as (this: Readable) => void,
+ };
+
+ const writableOptions = {
+ autoDestroy: options?.autoDestroy,
+ decodeStrings: options?.decodeStrings,
+ defaultEncoding: options?.defaultEncoding,
+ destroy: options?.destroy as unknown as (
+ this: Writable,
+ error: Error | null,
+ callback: (error: Error | null) => void,
+ ) => void,
+ emitClose: options?.emitClose,
+ final: options?.final as unknown as (
+ this: Writable,
+ callback: (error?: Error | null) => void,
+ ) => void,
+ highWaterMark: options?.highWaterMark ?? options?.writableHighWaterMark,
+ objectMode: options?.objectMode ?? options?.writableObjectMode,
+ write: options?.write as unknown as (
+ this: Writable,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ callback: (error?: Error | null) => void,
+ ) => void,
+ writev: options?.writev as unknown as (
+ this: Writable,
+ // deno-lint-ignore no-explicit-any
+ chunks: Array<{ chunk: any; encoding: Encodings }>,
+ callback: (error?: Error | null) => void,
+ ) => void,
+ };
+
+ this._readableState = new ReadableState(readableOptions);
+ this._writableState = new WritableState(
+ writableOptions,
+ this as unknown as Writable,
+ );
+ //Very important to override onwrite here, duplex implementation adds a check
+ //on the readable side
+ this._writableState.onwrite = onwrite.bind(undefined, this);
+ }
+
+ [captureRejectionSymbol](err?: Error) {
+ this.destroy(err);
+ }
+
+ [Symbol.asyncIterator](): ReadableStreamAsyncIterator {
+ return createReadableStreamAsyncIterator(this);
+ }
+
+ _destroy(
+ error: Error | null,
+ callback: (error?: Error | null) => void,
+ ): void {
+ callback(error);
+ }
+
+ _read = Readable.prototype._read;
+
+ _undestroy = Readable.prototype._undestroy;
+
+ destroy(err?: Error | null, cb?: (error?: Error | null) => void) {
+ const r = this._readableState;
+ const w = this._writableState;
+
+ if (w.destroyed || r.destroyed) {
+ if (typeof cb === "function") {
+ cb();
+ }
+
+ return this;
+ }
+
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ if (!r.errored) {
+ r.errored = err;
+ }
+ }
+
+ w.destroyed = true;
+ r.destroyed = true;
+
+ this._destroy(err || null, (err) => {
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ if (!r.errored) {
+ r.errored = err;
+ }
+ }
+
+ w.closed = true;
+ r.closed = true;
+
+ if (typeof cb === "function") {
+ cb(err);
+ }
+
+ if (err) {
+ queueMicrotask(() => {
+ const r = this._readableState;
+ const w = this._writableState;
+
+ if (!w.errorEmitted && !r.errorEmitted) {
+ w.errorEmitted = true;
+ r.errorEmitted = true;
+
+ this.emit("error", err);
+ }
+
+ r.closeEmitted = true;
+
+ if (w.emitClose || r.emitClose) {
+ this.emit("close");
+ }
+ });
+ } else {
+ queueMicrotask(() => {
+ const r = this._readableState;
+ const w = this._writableState;
+
+ r.closeEmitted = true;
+
+ if (w.emitClose || r.emitClose) {
+ this.emit("close");
+ }
+ });
+ }
+ });
+
+ return this;
+ }
+
+ isPaused = Readable.prototype.isPaused;
+
+ off = this.removeListener;
+
+ on(
+ event: "close" | "end" | "pause" | "readable" | "resume",
+ listener: () => void,
+ ): this;
+ // deno-lint-ignore no-explicit-any
+ on(event: "data", listener: (chunk: any) => void): this;
+ on(event: "error", listener: (err: Error) => void): this;
+ // deno-lint-ignore no-explicit-any
+ on(event: string | symbol, listener: (...args: any[]) => void): this;
+ on(
+ ev: string | symbol,
+ fn:
+ | (() => void)
+ // deno-lint-ignore no-explicit-any
+ | ((chunk: any) => void)
+ | ((err: Error) => void)
+ // deno-lint-ignore no-explicit-any
+ | ((...args: any[]) => void),
+ ) {
+ const res = super.on.call(this, ev, fn);
+ const state = this._readableState;
+
+ if (ev === "data") {
+ state.readableListening = this.listenerCount("readable") > 0;
+
+ if (state.flowing !== false) {
+ this.resume();
+ }
+ } else if (ev === "readable") {
+ if (!state.endEmitted && !state.readableListening) {
+ state.readableListening = state.needReadable = true;
+ state.flowing = false;
+ state.emittedReadable = false;
+ if (state.length) {
+ emitReadable(this);
+ } else if (!state.reading) {
+ queueMicrotask(() => nReadingNextTick(this));
+ }
+ }
+ }
+
+ return res;
+ }
+
+ pause = Readable.prototype.pause as () => this;
+
+ pipe = Readable.prototype.pipe;
+
+ // deno-lint-ignore no-explicit-any
+ push(chunk: any, encoding?: Encodings): boolean {
+ return readableAddChunk(this, chunk, encoding, false);
+ }
+
+ /** You can override either this method, or the async `_read` method */
+ read(n?: number) {
+ // Same as parseInt(undefined, 10), however V8 7.3 performance regressed
+ // in this scenario, so we are doing it manually.
+ if (n === undefined) {
+ n = NaN;
+ }
+ const state = this._readableState;
+ const nOrig = n;
+
+ if (n > state.highWaterMark) {
+ state.highWaterMark = computeNewHighWaterMark(n);
+ }
+
+ if (n !== 0) {
+ state.emittedReadable = false;
+ }
+
+ if (
+ n === 0 &&
+ state.needReadable &&
+ ((state.highWaterMark !== 0
+ ? state.length >= state.highWaterMark
+ : state.length > 0) ||
+ state.ended)
+ ) {
+ if (state.length === 0 && state.ended) {
+ endDuplex(this);
+ } else {
+ emitReadable(this);
+ }
+ return null;
+ }
+
+ n = howMuchToRead(n, state);
+
+ if (n === 0 && state.ended) {
+ if (state.length === 0) {
+ endDuplex(this);
+ }
+ return null;
+ }
+
+ let doRead = state.needReadable;
+ if (
+ state.length === 0 || state.length - (n as number) < state.highWaterMark
+ ) {
+ doRead = true;
+ }
+
+ if (
+ state.ended || state.reading || state.destroyed || state.errored ||
+ !state.constructed
+ ) {
+ doRead = false;
+ } else if (doRead) {
+ state.reading = true;
+ state.sync = true;
+ if (state.length === 0) {
+ state.needReadable = true;
+ }
+ this._read();
+ state.sync = false;
+ if (!state.reading) {
+ n = howMuchToRead(nOrig, state);
+ }
+ }
+
+ let ret;
+ if ((n as number) > 0) {
+ ret = fromList((n as number), state);
+ } else {
+ ret = null;
+ }
+
+ if (ret === null) {
+ state.needReadable = state.length <= state.highWaterMark;
+ n = 0;
+ } else {
+ state.length -= n as number;
+ if (state.multiAwaitDrain) {
+ (state.awaitDrainWriters as Set<Writable>).clear();
+ } else {
+ state.awaitDrainWriters = null;
+ }
+ }
+
+ if (state.length === 0) {
+ if (!state.ended) {
+ state.needReadable = true;
+ }
+
+ if (nOrig !== n && state.ended) {
+ endDuplex(this);
+ }
+ }
+
+ if (ret !== null) {
+ this.emit("data", ret);
+ }
+
+ return ret;
+ }
+
+ removeAllListeners(
+ ev:
+ | "close"
+ | "data"
+ | "end"
+ | "error"
+ | "pause"
+ | "readable"
+ | "resume"
+ | symbol
+ | undefined,
+ ) {
+ const res = super.removeAllListeners(ev);
+
+ if (ev === "readable" || ev === undefined) {
+ queueMicrotask(() => updateReadableListening(this));
+ }
+
+ return res;
+ }
+
+ removeListener(
+ event: "close" | "end" | "pause" | "readable" | "resume",
+ listener: () => void,
+ ): this;
+ // deno-lint-ignore no-explicit-any
+ removeListener(event: "data", listener: (chunk: any) => void): this;
+ removeListener(event: "error", listener: (err: Error) => void): this;
+ removeListener(
+ event: string | symbol,
+ // deno-lint-ignore no-explicit-any
+ listener: (...args: any[]) => void,
+ ): this;
+ removeListener(
+ ev: string | symbol,
+ fn:
+ | (() => void)
+ // deno-lint-ignore no-explicit-any
+ | ((chunk: any) => void)
+ | ((err: Error) => void)
+ // deno-lint-ignore no-explicit-any
+ | ((...args: any[]) => void),
+ ) {
+ const res = super.removeListener.call(this, ev, fn);
+
+ if (ev === "readable") {
+ queueMicrotask(() => updateReadableListening(this));
+ }
+
+ return res;
+ }
+
+ resume = Readable.prototype.resume as () => this;
+
+ setEncoding = Readable.prototype.setEncoding as (enc: string) => this;
+
+ // deno-lint-ignore no-explicit-any
+ unshift(chunk: any, encoding?: Encodings): boolean {
+ return readableAddChunk(this, chunk, encoding, true);
+ }
+
+ unpipe = Readable.prototype.unpipe as (dest?: Writable | undefined) => this;
+
+ wrap = Readable.prototype.wrap as (stream: Stream) => this;
+
+ get readable(): boolean {
+ return this._readableState?.readable &&
+ !this._readableState?.destroyed &&
+ !this._readableState?.errorEmitted &&
+ !this._readableState?.endEmitted;
+ }
+ set readable(val: boolean) {
+ if (this._readableState) {
+ this._readableState.readable = val;
+ }
+ }
+
+ get readableHighWaterMark(): number {
+ return this._readableState.highWaterMark;
+ }
+
+ get readableBuffer() {
+ return this._readableState && this._readableState.buffer;
+ }
+
+ get readableFlowing(): boolean | null {
+ return this._readableState.flowing;
+ }
+
+ set readableFlowing(state: boolean | null) {
+ if (this._readableState) {
+ this._readableState.flowing = state;
+ }
+ }
+
+ get readableLength() {
+ return this._readableState.length;
+ }
+
+ get readableObjectMode() {
+ return this._readableState ? this._readableState.objectMode : false;
+ }
+
+ get readableEncoding() {
+ return this._readableState ? this._readableState.encoding : null;
+ }
+
+ get readableEnded() {
+ return this._readableState ? this._readableState.endEmitted : false;
+ }
+
+ _write = Writable.prototype._write;
+
+ write = Writable.prototype.write;
+
+ cork = Writable.prototype.cork;
+
+ uncork = Writable.prototype.uncork;
+
+ setDefaultEncoding(encoding: string) {
+ // node::ParseEncoding() requires lower case.
+ if (typeof encoding === "string") {
+ encoding = encoding.toLowerCase();
+ }
+ if (!Buffer.isEncoding(encoding)) {
+ throw new ERR_UNKNOWN_ENCODING(encoding);
+ }
+ this._writableState.defaultEncoding = encoding as Encodings;
+ return this;
+ }
+
+ end(cb?: () => void): void;
+ // deno-lint-ignore no-explicit-any
+ end(chunk: any, cb?: () => void): void;
+ // deno-lint-ignore no-explicit-any
+ end(chunk: any, encoding: Encodings, cb?: () => void): void;
+
+ end(
+ // deno-lint-ignore no-explicit-any
+ x?: any | (() => void),
+ y?: Encodings | (() => void),
+ z?: () => void,
+ ) {
+ const state = this._writableState;
+ // deno-lint-ignore no-explicit-any
+ let chunk: any | null;
+ let encoding: Encodings | null;
+ let cb: undefined | ((error?: Error) => void);
+
+ if (typeof x === "function") {
+ chunk = null;
+ encoding = null;
+ cb = x;
+ } else if (typeof y === "function") {
+ chunk = x;
+ encoding = null;
+ cb = y;
+ } else {
+ chunk = x;
+ encoding = y as Encodings;
+ cb = z;
+ }
+
+ if (chunk !== null && chunk !== undefined) {
+ this.write(chunk, encoding);
+ }
+
+ if (state.corked) {
+ state.corked = 1;
+ this.uncork();
+ }
+
+ let err: Error | undefined;
+ if (!state.errored && !state.ending) {
+ state.ending = true;
+ finishMaybe(this, state, true);
+ state.ended = true;
+ } else if (state.finished) {
+ err = new ERR_STREAM_ALREADY_FINISHED("end");
+ } else if (state.destroyed) {
+ err = new ERR_STREAM_DESTROYED("end");
+ }
+
+ if (typeof cb === "function") {
+ if (err || state.finished) {
+ queueMicrotask(() => {
+ (cb as (error?: Error | undefined) => void)(err);
+ });
+ } else {
+ state[kOnFinished].push(cb);
+ }
+ }
+
+ return this;
+ }
+
+ get destroyed() {
+ if (
+ this._readableState === undefined ||
+ this._writableState === undefined
+ ) {
+ return false;
+ }
+ return this._readableState.destroyed && this._writableState.destroyed;
+ }
+
+ set destroyed(value: boolean) {
+ if (this._readableState && this._writableState) {
+ this._readableState.destroyed = value;
+ this._writableState.destroyed = value;
+ }
+ }
+
+ get writable() {
+ const w = this._writableState;
+ return !w.destroyed && !w.errored && !w.ending && !w.ended;
+ }
+
+ set writable(val) {
+ if (this._writableState) {
+ this._writableState.writable = !!val;
+ }
+ }
+
+ get writableFinished() {
+ return this._writableState ? this._writableState.finished : false;
+ }
+
+ get writableObjectMode() {
+ return this._writableState ? this._writableState.objectMode : false;
+ }
+
+ get writableBuffer() {
+ return this._writableState && this._writableState.getBuffer();
+ }
+
+ get writableEnded() {
+ return this._writableState ? this._writableState.ending : false;
+ }
+
+ get writableHighWaterMark() {
+ return this._writableState && this._writableState.highWaterMark;
+ }
+
+ get writableCorked() {
+ return this._writableState ? this._writableState.corked : 0;
+ }
+
+ get writableLength() {
+ return this._writableState && this._writableState.length;
+ }
+}
+
+export default Duplex;
diff --git a/std/node/_stream/duplex_internal.ts b/std/node/_stream/duplex_internal.ts
new file mode 100644
index 000000000..bfd9749f8
--- /dev/null
+++ b/std/node/_stream/duplex_internal.ts
@@ -0,0 +1,296 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import type { ReadableState } from "./readable.ts";
+import { addChunk, maybeReadMore, onEofChunk } from "./readable_internal.ts";
+import type Writable from "./writable.ts";
+import type { WritableState } from "./writable.ts";
+import {
+ afterWrite,
+ AfterWriteTick,
+ afterWriteTick,
+ clearBuffer,
+ errorBuffer,
+ kOnFinished,
+ needFinish,
+ prefinish,
+} from "./writable_internal.ts";
+import { Buffer } from "../buffer.ts";
+import type Duplex from "./duplex.ts";
+import {
+ ERR_MULTIPLE_CALLBACK,
+ ERR_STREAM_PUSH_AFTER_EOF,
+ ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
+} from "../_errors.ts";
+
+export function endDuplex(stream: Duplex) {
+ const state = stream._readableState;
+
+ if (!state.endEmitted) {
+ state.ended = true;
+ queueMicrotask(() => endReadableNT(state, stream));
+ }
+}
+
+function endReadableNT(state: ReadableState, stream: Duplex) {
+ // Check that we didn't get one last unshift.
+ if (
+ !state.errorEmitted && !state.closeEmitted &&
+ !state.endEmitted && state.length === 0
+ ) {
+ state.endEmitted = true;
+ stream.emit("end");
+
+ if (stream.writable && stream.allowHalfOpen === false) {
+ queueMicrotask(() => endWritableNT(state, stream));
+ } else if (state.autoDestroy) {
+ // In case of duplex streams we need a way to detect
+ // if the writable side is ready for autoDestroy as well.
+ const wState = stream._writableState;
+ const autoDestroy = !wState || (
+ wState.autoDestroy &&
+ // We don't expect the writable to ever 'finish'
+ // if writable is explicitly set to false.
+ (wState.finished || wState.writable === false)
+ );
+
+ if (autoDestroy) {
+ stream.destroy();
+ }
+ }
+ }
+}
+
+function endWritableNT(state: ReadableState, stream: Duplex) {
+ const writable = stream.writable &&
+ !stream.writableEnded &&
+ !stream.destroyed;
+ if (writable) {
+ stream.end();
+ }
+}
+
+export function errorOrDestroy(
+ // deno-lint-ignore no-explicit-any
+ this: any,
+ stream: Duplex,
+ err: Error,
+ sync = false,
+) {
+ const r = stream._readableState;
+ const w = stream._writableState;
+
+ if (w.destroyed || r.destroyed) {
+ return this;
+ }
+
+ if (r.autoDestroy || w.autoDestroy) {
+ stream.destroy(err);
+ } else if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (w && !w.errored) {
+ w.errored = err;
+ }
+ if (r && !r.errored) {
+ r.errored = err;
+ }
+
+ if (sync) {
+ queueMicrotask(() => {
+ if (w.errorEmitted || r.errorEmitted) {
+ return;
+ }
+
+ w.errorEmitted = true;
+ r.errorEmitted = true;
+
+ stream.emit("error", err);
+ });
+ } else {
+ if (w.errorEmitted || r.errorEmitted) {
+ return;
+ }
+
+ w.errorEmitted = true;
+ r.errorEmitted = true;
+
+ stream.emit("error", err);
+ }
+ }
+}
+
+function finish(stream: Duplex, state: WritableState) {
+ state.pendingcb--;
+ if (state.errorEmitted || state.closeEmitted) {
+ return;
+ }
+
+ state.finished = true;
+
+ for (const callback of state[kOnFinished].splice(0)) {
+ callback();
+ }
+
+ stream.emit("finish");
+
+ if (state.autoDestroy) {
+ stream.destroy();
+ }
+}
+
+export function finishMaybe(
+ stream: Duplex,
+ state: WritableState,
+ sync?: boolean,
+) {
+ if (needFinish(state)) {
+ prefinish(stream as Writable, state);
+ if (state.pendingcb === 0 && needFinish(state)) {
+ state.pendingcb++;
+ if (sync) {
+ queueMicrotask(() => finish(stream, state));
+ } else {
+ finish(stream, state);
+ }
+ }
+ }
+}
+
+export function onwrite(stream: Duplex, er?: Error | null) {
+ const state = stream._writableState;
+ const sync = state.sync;
+ const cb = state.writecb;
+
+ if (typeof cb !== "function") {
+ errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
+ return;
+ }
+
+ state.writing = false;
+ state.writecb = null;
+ state.length -= state.writelen;
+ state.writelen = 0;
+
+ if (er) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ er.stack;
+
+ if (!state.errored) {
+ state.errored = er;
+ }
+
+ if (stream._readableState && !stream._readableState.errored) {
+ stream._readableState.errored = er;
+ }
+
+ if (sync) {
+ queueMicrotask(() => onwriteError(stream, state, er, cb));
+ } else {
+ onwriteError(stream, state, er, cb);
+ }
+ } else {
+ if (state.buffered.length > state.bufferedIndex) {
+ clearBuffer(stream, state);
+ }
+
+ if (sync) {
+ if (
+ state.afterWriteTickInfo !== null &&
+ state.afterWriteTickInfo.cb === cb
+ ) {
+ state.afterWriteTickInfo.count++;
+ } else {
+ state.afterWriteTickInfo = {
+ count: 1,
+ cb: (cb as (error?: Error) => void),
+ stream: stream as Writable,
+ state,
+ };
+ queueMicrotask(() =>
+ afterWriteTick(state.afterWriteTickInfo as AfterWriteTick)
+ );
+ }
+ } else {
+ afterWrite(stream as Writable, state, 1, cb as (error?: Error) => void);
+ }
+ }
+}
+
+function onwriteError(
+ stream: Duplex,
+ state: WritableState,
+ er: Error,
+ cb: (error: Error) => void,
+) {
+ --state.pendingcb;
+
+ cb(er);
+ errorBuffer(state);
+ errorOrDestroy(stream, er);
+}
+
+export function readableAddChunk(
+ stream: Duplex,
+ chunk: string | Buffer | Uint8Array | null,
+ encoding: undefined | string = undefined,
+ addToFront: boolean,
+) {
+ const state = stream._readableState;
+ let usedEncoding = encoding;
+
+ let err;
+ if (!state.objectMode) {
+ if (typeof chunk === "string") {
+ usedEncoding = encoding || state.defaultEncoding;
+ if (state.encoding !== usedEncoding) {
+ if (addToFront && state.encoding) {
+ chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding);
+ } else {
+ chunk = Buffer.from(chunk, usedEncoding);
+ usedEncoding = "";
+ }
+ }
+ } else if (chunk instanceof Uint8Array) {
+ chunk = Buffer.from(chunk);
+ }
+ }
+
+ if (err) {
+ errorOrDestroy(stream, err);
+ } else if (chunk === null) {
+ state.reading = false;
+ onEofChunk(stream, state);
+ } else if (state.objectMode || (chunk.length > 0)) {
+ if (addToFront) {
+ if (state.endEmitted) {
+ errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
+ } else {
+ addChunk(stream, state, chunk, true);
+ }
+ } else if (state.ended) {
+ errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
+ } else if (state.destroyed || state.errored) {
+ return false;
+ } else {
+ state.reading = false;
+ if (state.decoder && !usedEncoding) {
+ //TODO(Soremwar)
+ //I don't think this cast is right
+ chunk = state.decoder.write(Buffer.from(chunk as Uint8Array));
+ if (state.objectMode || chunk.length !== 0) {
+ addChunk(stream, state, chunk, false);
+ } else {
+ maybeReadMore(stream, state);
+ }
+ } else {
+ addChunk(stream, state, chunk, false);
+ }
+ }
+ } else if (!addToFront) {
+ state.reading = false;
+ maybeReadMore(stream, state);
+ }
+
+ return !state.ended &&
+ (state.length < state.highWaterMark || state.length === 0);
+}
diff --git a/std/node/_stream/duplex_test.ts b/std/node/_stream/duplex_test.ts
new file mode 100644
index 000000000..1596ec218
--- /dev/null
+++ b/std/node/_stream/duplex_test.ts
@@ -0,0 +1,698 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import Duplex from "./duplex.ts";
+import finished from "./end_of_stream.ts";
+import {
+ assert,
+ assertEquals,
+ assertStrictEquals,
+ assertThrows,
+} from "../../testing/asserts.ts";
+import { deferred, delay } from "../../async/mod.ts";
+
+Deno.test("Duplex stream works normally", () => {
+ const stream = new Duplex({ objectMode: true });
+
+ assert(stream._readableState.objectMode);
+ assert(stream._writableState.objectMode);
+ assert(stream.allowHalfOpen);
+ assertEquals(stream.listenerCount("end"), 0);
+
+ let written: { val: number };
+ let read: { val: number };
+
+ stream._write = (obj, _, cb) => {
+ written = obj;
+ cb();
+ };
+
+ stream._read = () => {};
+
+ stream.on("data", (obj) => {
+ read = obj;
+ });
+
+ stream.push({ val: 1 });
+ stream.end({ val: 2 });
+
+ stream.on("finish", () => {
+ assertEquals(read.val, 1);
+ assertEquals(written.val, 2);
+ });
+});
+
+Deno.test("Duplex stream gets constructed correctly", () => {
+ const d1 = new Duplex({
+ objectMode: true,
+ highWaterMark: 100,
+ });
+
+ assertEquals(d1.readableObjectMode, true);
+ assertEquals(d1.readableHighWaterMark, 100);
+ assertEquals(d1.writableObjectMode, true);
+ assertEquals(d1.writableHighWaterMark, 100);
+
+ const d2 = new Duplex({
+ readableObjectMode: false,
+ readableHighWaterMark: 10,
+ writableObjectMode: true,
+ writableHighWaterMark: 100,
+ });
+
+ assertEquals(d2.writableObjectMode, true);
+ assertEquals(d2.writableHighWaterMark, 100);
+ assertEquals(d2.readableObjectMode, false);
+ assertEquals(d2.readableHighWaterMark, 10);
+});
+
+Deno.test("Duplex stream can be paused", () => {
+ const readable = new Duplex();
+
+ // _read is a noop, here.
+ readable._read = () => {};
+
+ // Default state of a stream is not "paused"
+ assert(!readable.isPaused());
+
+ // Make the stream start flowing...
+ readable.on("data", () => {});
+
+ // still not paused.
+ assert(!readable.isPaused());
+
+ readable.pause();
+ assert(readable.isPaused());
+ readable.resume();
+ assert(!readable.isPaused());
+});
+
+Deno.test("Duplex stream sets enconding correctly", () => {
+ const readable = new Duplex({
+ read() {},
+ });
+
+ readable.setEncoding("utf8");
+
+ readable.push(new TextEncoder().encode("DEF"));
+ readable.unshift(new TextEncoder().encode("ABC"));
+
+ assertStrictEquals(readable.read(), "ABCDEF");
+});
+
+Deno.test("Duplex stream sets encoding correctly", () => {
+ const readable = new Duplex({
+ read() {},
+ });
+
+ readable.setEncoding("utf8");
+
+ readable.push(new TextEncoder().encode("DEF"));
+ readable.unshift(new TextEncoder().encode("ABC"));
+
+ assertStrictEquals(readable.read(), "ABCDEF");
+});
+
+Deno.test("Duplex stream holds up a big push", async () => {
+ let readExecuted = 0;
+ const readExecutedExpected = 3;
+ const readExpectedExecutions = deferred();
+
+ let endExecuted = 0;
+ const endExecutedExpected = 1;
+ const endExpectedExecutions = deferred();
+
+ const str = "asdfasdfasdfasdfasdf";
+
+ const r = new Duplex({
+ highWaterMark: 5,
+ encoding: "utf8",
+ });
+
+ let reads = 0;
+
+ function _read() {
+ if (reads === 0) {
+ setTimeout(() => {
+ r.push(str);
+ }, 1);
+ reads++;
+ } else if (reads === 1) {
+ const ret = r.push(str);
+ assertEquals(ret, false);
+ reads++;
+ } else {
+ r.push(null);
+ }
+ }
+
+ r._read = () => {
+ readExecuted++;
+ if (readExecuted == readExecutedExpected) {
+ readExpectedExecutions.resolve();
+ }
+ _read();
+ };
+
+ r.on("end", () => {
+ endExecuted++;
+ if (endExecuted == endExecutedExpected) {
+ endExpectedExecutions.resolve();
+ }
+ });
+
+ // Push some data in to start.
+ // We've never gotten any read event at this point.
+ const ret = r.push(str);
+ assert(!ret);
+ let chunk = r.read();
+ assertEquals(chunk, str);
+ chunk = r.read();
+ assertEquals(chunk, null);
+
+ r.once("readable", () => {
+ // This time, we'll get *all* the remaining data, because
+ // it's been added synchronously, as the read WOULD take
+ // us below the hwm, and so it triggered a _read() again,
+ // which synchronously added more, which we then return.
+ chunk = r.read();
+ assertEquals(chunk, str + str);
+
+ chunk = r.read();
+ assertEquals(chunk, null);
+ });
+
+ const readTimeout = setTimeout(
+ () => readExpectedExecutions.reject(),
+ 1000,
+ );
+ const endTimeout = setTimeout(
+ () => endExpectedExecutions.reject(),
+ 1000,
+ );
+ await readExpectedExecutions;
+ await endExpectedExecutions;
+ clearTimeout(readTimeout);
+ clearTimeout(endTimeout);
+ assertEquals(readExecuted, readExecutedExpected);
+ assertEquals(endExecuted, endExecutedExpected);
+});
+
+Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length exceeded", async () => {
+ let readableExecuted = 0;
+ const readableExecutedExpected = 1;
+ const readableExpectedExecutions = deferred();
+
+ const r = new Duplex({
+ highWaterMark: 3,
+ });
+
+ r._read = () => {
+ throw new Error("_read must not be called");
+ };
+ r.push(Buffer.from("blerg"));
+
+ setTimeout(function () {
+ assert(!r._readableState.reading);
+ r.on("readable", () => {
+ readableExecuted++;
+ if (readableExecuted == readableExecutedExpected) {
+ readableExpectedExecutions.resolve();
+ }
+ });
+ }, 1);
+
+ const readableTimeout = setTimeout(
+ () => readableExpectedExecutions.reject(),
+ 1000,
+ );
+ await readableExpectedExecutions;
+ clearTimeout(readableTimeout);
+ assertEquals(readableExecuted, readableExecutedExpected);
+});
+
+Deno.test("Duplex stream: 'readable' and 'read' events are emitted on highWaterMark length not reached", async () => {
+ let readableExecuted = 0;
+ const readableExecutedExpected = 1;
+ const readableExpectedExecutions = deferred();
+
+ let readExecuted = 0;
+ const readExecutedExpected = 1;
+ const readExpectedExecutions = deferred();
+
+ const r = new Duplex({
+ highWaterMark: 3,
+ });
+
+ r._read = () => {
+ readExecuted++;
+ if (readExecuted == readExecutedExpected) {
+ readExpectedExecutions.resolve();
+ }
+ };
+
+ r.push(Buffer.from("bl"));
+
+ setTimeout(function () {
+ assert(r._readableState.reading);
+ r.on("readable", () => {
+ readableExecuted++;
+ if (readableExecuted == readableExecutedExpected) {
+ readableExpectedExecutions.resolve();
+ }
+ });
+ }, 1);
+
+ const readableTimeout = setTimeout(
+ () => readableExpectedExecutions.reject(),
+ 1000,
+ );
+ const readTimeout = setTimeout(
+ () => readExpectedExecutions.reject(),
+ 1000,
+ );
+ await readableExpectedExecutions;
+ await readExpectedExecutions;
+ clearTimeout(readableTimeout);
+ clearTimeout(readTimeout);
+ assertEquals(readableExecuted, readableExecutedExpected);
+ assertEquals(readExecuted, readExecutedExpected);
+});
+
+Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length not reached and stream ended", async () => {
+ let readableExecuted = 0;
+ const readableExecutedExpected = 1;
+ const readableExpectedExecutions = deferred();
+
+ const r = new Duplex({
+ highWaterMark: 30,
+ });
+
+ r._read = () => {
+ throw new Error("Must not be executed");
+ };
+
+ r.push(Buffer.from("blerg"));
+ //This ends the stream and triggers end
+ r.push(null);
+
+ setTimeout(function () {
+ // Assert we're testing what we think we are
+ assert(!r._readableState.reading);
+ r.on("readable", () => {
+ readableExecuted++;
+ if (readableExecuted == readableExecutedExpected) {
+ readableExpectedExecutions.resolve();
+ }
+ });
+ }, 1);
+
+ const readableTimeout = setTimeout(
+ () => readableExpectedExecutions.reject(),
+ 1000,
+ );
+ await readableExpectedExecutions;
+ clearTimeout(readableTimeout);
+ assertEquals(readableExecuted, readableExecutedExpected);
+});
+
+Deno.test("Duplex stream: 'read' is emitted on empty string pushed in non-object mode", async () => {
+ let endExecuted = 0;
+ const endExecutedExpected = 1;
+ const endExpectedExecutions = deferred();
+
+ const underlyingData = ["", "x", "y", "", "z"];
+ const expected = underlyingData.filter((data) => data);
+ const result: unknown[] = [];
+
+ const r = new Duplex({
+ encoding: "utf8",
+ });
+ r._read = function () {
+ queueMicrotask(() => {
+ if (!underlyingData.length) {
+ this.push(null);
+ } else {
+ this.push(underlyingData.shift());
+ }
+ });
+ };
+
+ r.on("readable", () => {
+ const data = r.read();
+ if (data !== null) result.push(data);
+ });
+
+ r.on("end", () => {
+ endExecuted++;
+ if (endExecuted == endExecutedExpected) {
+ endExpectedExecutions.resolve();
+ }
+ assertEquals(result, expected);
+ });
+
+ const endTimeout = setTimeout(
+ () => endExpectedExecutions.reject(),
+ 1000,
+ );
+ await endExpectedExecutions;
+ clearTimeout(endTimeout);
+ assertEquals(endExecuted, endExecutedExpected);
+});
+
+Deno.test("Duplex stream: listeners can be removed", () => {
+ const r = new Duplex();
+ r._read = () => {};
+ r.on("data", () => {});
+
+ r.removeAllListeners("data");
+
+ assertEquals(r.eventNames().length, 0);
+});
+
+Deno.test("Duplex stream writes correctly", async () => {
+ let callback: undefined | ((error?: Error | null | undefined) => void);
+
+ let writeExecuted = 0;
+ const writeExecutedExpected = 1;
+ const writeExpectedExecutions = deferred();
+
+ let writevExecuted = 0;
+ const writevExecutedExpected = 1;
+ const writevExpectedExecutions = deferred();
+
+ const writable = new Duplex({
+ write: (chunk, encoding, cb) => {
+ writeExecuted++;
+ if (writeExecuted == writeExecutedExpected) {
+ writeExpectedExecutions.resolve();
+ }
+ assert(chunk instanceof Buffer);
+ assertStrictEquals(encoding, "buffer");
+ assertStrictEquals(String(chunk), "ABC");
+ callback = cb;
+ },
+ writev: (chunks) => {
+ writevExecuted++;
+ if (writevExecuted == writevExecutedExpected) {
+ writevExpectedExecutions.resolve();
+ }
+ assertStrictEquals(chunks.length, 2);
+ assertStrictEquals(chunks[0].encoding, "buffer");
+ assertStrictEquals(chunks[1].encoding, "buffer");
+ assertStrictEquals(chunks[0].chunk + chunks[1].chunk, "DEFGHI");
+ },
+ });
+
+ writable.write(new TextEncoder().encode("ABC"));
+ writable.write(new TextEncoder().encode("DEF"));
+ writable.end(new TextEncoder().encode("GHI"));
+ callback?.();
+
+ const writeTimeout = setTimeout(
+ () => writeExpectedExecutions.reject(),
+ 1000,
+ );
+ const writevTimeout = setTimeout(
+ () => writevExpectedExecutions.reject(),
+ 1000,
+ );
+ await writeExpectedExecutions;
+ await writevExpectedExecutions;
+ clearTimeout(writeTimeout);
+ clearTimeout(writevTimeout);
+ assertEquals(writeExecuted, writeExecutedExpected);
+ assertEquals(writevExecuted, writevExecutedExpected);
+});
+
+Deno.test("Duplex stream writes Uint8Array in object mode", async () => {
+ let writeExecuted = 0;
+ const writeExecutedExpected = 1;
+ const writeExpectedExecutions = deferred();
+
+ const ABC = new TextEncoder().encode("ABC");
+
+ const writable = new Duplex({
+ objectMode: true,
+ write: (chunk, encoding, cb) => {
+ writeExecuted++;
+ if (writeExecuted == writeExecutedExpected) {
+ writeExpectedExecutions.resolve();
+ }
+ assert(!(chunk instanceof Buffer));
+ assert(chunk instanceof Uint8Array);
+ assertEquals(chunk, ABC);
+ assertEquals(encoding, "utf8");
+ cb();
+ },
+ });
+
+ writable.end(ABC);
+
+ const writeTimeout = setTimeout(
+ () => writeExpectedExecutions.reject(),
+ 1000,
+ );
+ await writeExpectedExecutions;
+ clearTimeout(writeTimeout);
+ assertEquals(writeExecuted, writeExecutedExpected);
+});
+
+Deno.test("Duplex stream throws on unexpected close", async () => {
+ let finishedExecuted = 0;
+ const finishedExecutedExpected = 1;
+ const finishedExpectedExecutions = deferred();
+
+ const writable = new Duplex({
+ write: () => {},
+ });
+ writable.writable = false;
+ writable.destroy();
+
+ finished(writable, (err) => {
+ finishedExecuted++;
+ if (finishedExecuted == finishedExecutedExpected) {
+ finishedExpectedExecutions.resolve();
+ }
+ assertEquals(err?.code, "ERR_STREAM_PREMATURE_CLOSE");
+ });
+
+ const finishedTimeout = setTimeout(
+ () => finishedExpectedExecutions.reject(),
+ 1000,
+ );
+ await finishedExpectedExecutions;
+ clearTimeout(finishedTimeout);
+ assertEquals(finishedExecuted, finishedExecutedExpected);
+});
+
+Deno.test("Duplex stream finishes correctly after error", async () => {
+ let errorExecuted = 0;
+ const errorExecutedExpected = 1;
+ const errorExpectedExecutions = deferred();
+
+ let finishedExecuted = 0;
+ const finishedExecutedExpected = 1;
+ const finishedExpectedExecutions = deferred();
+
+ const w = new Duplex({
+ write(_chunk, _encoding, cb) {
+ cb(new Error());
+ },
+ autoDestroy: false,
+ });
+ w.write("asd");
+ w.on("error", () => {
+ errorExecuted++;
+ if (errorExecuted == errorExecutedExpected) {
+ errorExpectedExecutions.resolve();
+ }
+ finished(w, () => {
+ finishedExecuted++;
+ if (finishedExecuted == finishedExecutedExpected) {
+ finishedExpectedExecutions.resolve();
+ }
+ });
+ });
+
+ const errorTimeout = setTimeout(
+ () => errorExpectedExecutions.reject(),
+ 1000,
+ );
+ const finishedTimeout = setTimeout(
+ () => finishedExpectedExecutions.reject(),
+ 1000,
+ );
+ await finishedExpectedExecutions;
+ await errorExpectedExecutions;
+ clearTimeout(finishedTimeout);
+ clearTimeout(errorTimeout);
+ assertEquals(finishedExecuted, finishedExecutedExpected);
+ assertEquals(errorExecuted, errorExecutedExpected);
+});
+
+Deno.test("Duplex stream fails on 'write' null value", () => {
+ const writable = new Duplex();
+ assertThrows(() => writable.write(null));
+});
+
+Deno.test("Duplex stream is destroyed correctly", async () => {
+ let closeExecuted = 0;
+ const closeExecutedExpected = 1;
+ const closeExpectedExecutions = deferred();
+
+ const unexpectedExecution = deferred();
+
+ const duplex = new Duplex({
+ write(_chunk, _enc, cb) {
+ cb();
+ },
+ read() {},
+ });
+
+ duplex.resume();
+
+ function never() {
+ unexpectedExecution.reject();
+ }
+
+ duplex.on("end", never);
+ duplex.on("finish", never);
+ duplex.on("close", () => {
+ closeExecuted++;
+ if (closeExecuted == closeExecutedExpected) {
+ closeExpectedExecutions.resolve();
+ }
+ });
+
+ duplex.destroy();
+ assertEquals(duplex.destroyed, true);
+
+ const closeTimeout = setTimeout(
+ () => closeExpectedExecutions.reject(),
+ 1000,
+ );
+ await Promise.race([
+ unexpectedExecution,
+ delay(100),
+ ]);
+ await closeExpectedExecutions;
+ clearTimeout(closeTimeout);
+ assertEquals(closeExecuted, closeExecutedExpected);
+});
+
+Deno.test("Duplex stream errors correctly on destroy", async () => {
+ let errorExecuted = 0;
+ const errorExecutedExpected = 1;
+ const errorExpectedExecutions = deferred();
+
+ const unexpectedExecution = deferred();
+
+ const duplex = new Duplex({
+ write(_chunk, _enc, cb) {
+ cb();
+ },
+ read() {},
+ });
+ duplex.resume();
+
+ const expected = new Error("kaboom");
+
+ function never() {
+ unexpectedExecution.reject();
+ }
+
+ duplex.on("end", never);
+ duplex.on("finish", never);
+ duplex.on("error", (err) => {
+ errorExecuted++;
+ if (errorExecuted == errorExecutedExpected) {
+ errorExpectedExecutions.resolve();
+ }
+ assertStrictEquals(err, expected);
+ });
+
+ duplex.destroy(expected);
+ assertEquals(duplex.destroyed, true);
+
+ const errorTimeout = setTimeout(
+ () => errorExpectedExecutions.reject(),
+ 1000,
+ );
+ await Promise.race([
+ unexpectedExecution,
+ delay(100),
+ ]);
+ await errorExpectedExecutions;
+ clearTimeout(errorTimeout);
+ assertEquals(errorExecuted, errorExecutedExpected);
+});
+
+Deno.test("Duplex stream doesn't finish on allowHalfOpen", async () => {
+ const unexpectedExecution = deferred();
+
+ const duplex = new Duplex({
+ read() {},
+ });
+
+ assertEquals(duplex.allowHalfOpen, true);
+ duplex.on("finish", () => unexpectedExecution.reject());
+ assertEquals(duplex.listenerCount("end"), 0);
+ duplex.resume();
+ duplex.push(null);
+
+ await Promise.race([
+ unexpectedExecution,
+ delay(100),
+ ]);
+});
+
+Deno.test("Duplex stream finishes when allowHalfOpen is disabled", async () => {
+ let finishExecuted = 0;
+ const finishExecutedExpected = 1;
+ const finishExpectedExecutions = deferred();
+
+ const duplex = new Duplex({
+ read() {},
+ allowHalfOpen: false,
+ });
+
+ assertEquals(duplex.allowHalfOpen, false);
+ duplex.on("finish", () => {
+ finishExecuted++;
+ if (finishExecuted == finishExecutedExpected) {
+ finishExpectedExecutions.resolve();
+ }
+ });
+ assertEquals(duplex.listenerCount("end"), 0);
+ duplex.resume();
+ duplex.push(null);
+
+ const finishTimeout = setTimeout(
+ () => finishExpectedExecutions.reject(),
+ 1000,
+ );
+ await finishExpectedExecutions;
+ clearTimeout(finishTimeout);
+ assertEquals(finishExecuted, finishExecutedExpected);
+});
+
+Deno.test("Duplex stream doesn't finish when allowHalfOpen is disabled but stream ended", async () => {
+ const unexpectedExecution = deferred();
+
+ const duplex = new Duplex({
+ read() {},
+ allowHalfOpen: false,
+ });
+
+ assertEquals(duplex.allowHalfOpen, false);
+ duplex._writableState.ended = true;
+ duplex.on("finish", () => unexpectedExecution.reject());
+ assertEquals(duplex.listenerCount("end"), 0);
+ duplex.resume();
+ duplex.push(null);
+
+ await Promise.race([
+ unexpectedExecution,
+ delay(100),
+ ]);
+});
diff --git a/std/node/_stream/end-of-stream.ts b/std/node/_stream/end_of_stream.ts
index c42bb0e1c..6179e7fc4 100644
--- a/std/node/_stream/end-of-stream.ts
+++ b/std/node/_stream/end_of_stream.ts
@@ -1,5 +1,6 @@
// Copyright Node.js contributors. All rights reserved. MIT License.
import { once } from "../_utils.ts";
+import type Duplex from "./duplex.ts";
import type Readable from "./readable.ts";
import type Stream from "./stream.ts";
import type { ReadableState } from "./readable.ts";
@@ -11,7 +12,7 @@ import {
NodeErrorAbstraction,
} from "../_errors.ts";
-type StreamImplementations = Readable | Stream | Writable;
+export type StreamImplementations = Duplex | Readable | Stream | Writable;
// TODO(Soremwar)
// Bring back once requests are implemented
@@ -49,7 +50,7 @@ function isReadableEnded(stream: Readable) {
return rState.endEmitted || (rState.ended && rState.length === 0);
}
-interface FinishedOptions {
+export interface FinishedOptions {
error?: boolean;
readable?: boolean;
writable?: boolean;
diff --git a/std/node/_stream/end_of_stream_test.ts b/std/node/_stream/end_of_stream_test.ts
new file mode 100644
index 000000000..571e75b99
--- /dev/null
+++ b/std/node/_stream/end_of_stream_test.ts
@@ -0,0 +1,97 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import finished from "./end_of_stream.ts";
+import Readable from "./readable.ts";
+import Transform from "./transform.ts";
+import Writable from "./writable.ts";
+import { mustCall } from "../_utils.ts";
+import { assert, fail } from "../../testing/asserts.ts";
+import { deferred, delay } from "../../async/mod.ts";
+
+Deno.test("Finished appends to Readable correctly", async () => {
+ const rs = new Readable({
+ read() {},
+ });
+
+ const [finishedExecution, finishedCb] = mustCall((err) => {
+ assert(!err);
+ });
+
+ finished(rs, finishedCb);
+
+ rs.push(null);
+ rs.resume();
+
+ await finishedExecution;
+});
+
+Deno.test("Finished appends to Writable correctly", async () => {
+ const ws = new Writable({
+ write(_data, _enc, cb) {
+ cb();
+ },
+ });
+
+ const [finishedExecution, finishedCb] = mustCall((err) => {
+ assert(!err);
+ });
+
+ finished(ws, finishedCb);
+
+ ws.end();
+
+ await finishedExecution;
+});
+
+Deno.test("Finished appends to Transform correctly", async () => {
+ const tr = new Transform({
+ transform(_data, _enc, cb) {
+ cb();
+ },
+ });
+
+ let finish = false;
+ let ended = false;
+
+ tr.on("end", () => {
+ ended = true;
+ });
+
+ tr.on("finish", () => {
+ finish = true;
+ });
+
+ const [finishedExecution, finishedCb] = mustCall((err) => {
+ assert(!err);
+ assert(finish);
+ assert(ended);
+ });
+
+ finished(tr, finishedCb);
+
+ tr.end();
+ tr.resume();
+
+ await finishedExecution;
+});
+
+Deno.test("The function returned by Finished clears the listeners", async () => {
+ const finishedExecution = deferred();
+
+ const ws = new Writable({
+ write(_data, _env, cb) {
+ cb();
+ },
+ });
+
+ const removeListener = finished(ws, () => {
+ finishedExecution.reject();
+ });
+ removeListener();
+ ws.end();
+
+ await Promise.race([
+ delay(100),
+ finishedExecution,
+ ])
+ .catch(() => fail("Finished was executed"));
+});
diff --git a/std/node/_stream/passthrough.ts b/std/node/_stream/passthrough.ts
new file mode 100644
index 000000000..9126420e5
--- /dev/null
+++ b/std/node/_stream/passthrough.ts
@@ -0,0 +1,20 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import Transform from "./transform.ts";
+import type { TransformOptions } from "./transform.ts";
+import type { Encodings } from "../_utils.ts";
+
+export default class PassThrough extends Transform {
+ constructor(options?: TransformOptions) {
+ super(options);
+ }
+
+ _transform(
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ _encoding: Encodings,
+ // deno-lint-ignore no-explicit-any
+ cb: (error?: Error | null, data?: any) => void,
+ ) {
+ cb(null, chunk);
+ }
+}
diff --git a/std/node/_stream/pipeline.ts b/std/node/_stream/pipeline.ts
new file mode 100644
index 000000000..d02a92870
--- /dev/null
+++ b/std/node/_stream/pipeline.ts
@@ -0,0 +1,308 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { once } from "../_utils.ts";
+import { destroyer as implDestroyer } from "./destroy.ts";
+import eos from "./end_of_stream.ts";
+import createReadableStreamAsyncIterator from "./async_iterator.ts";
+import * as events from "../events.ts";
+import PassThrough from "./passthrough.ts";
+import {
+ ERR_INVALID_ARG_TYPE,
+ ERR_INVALID_CALLBACK,
+ ERR_INVALID_RETURN_VALUE,
+ ERR_MISSING_ARGS,
+ ERR_STREAM_DESTROYED,
+ NodeErrorAbstraction,
+} from "../_errors.ts";
+import type Duplex from "./duplex.ts";
+import type Readable from "./readable.ts";
+import type Stream from "./stream.ts";
+import type Transform from "./transform.ts";
+import type Writable from "./writable.ts";
+
+type Streams = Duplex | Readable | Writable;
+// deno-lint-ignore no-explicit-any
+type EndCallback = (err?: NodeErrorAbstraction | null, val?: any) => void;
+type TransformCallback =
+ // deno-lint-ignore no-explicit-any
+ | ((value?: any) => AsyncGenerator<any>)
+ // deno-lint-ignore no-explicit-any
+ | ((value?: any) => Promise<any>);
+/**
+ * This type represents an array that contains a data source,
+ * many Transform Streams, a writable stream destination
+ * and end in an optional callback
+ * */
+type DataSource =
+ // deno-lint-ignore no-explicit-any
+ | (() => AsyncGenerator<any>)
+ | // deno-lint-ignore no-explicit-any
+ AsyncIterable<any>
+ | Duplex
+ | // deno-lint-ignore no-explicit-any
+ Iterable<any>
+ | // deno-lint-ignore no-explicit-any
+ (() => Generator<any>)
+ | Readable;
+type Transformers = Duplex | Transform | TransformCallback | Writable;
+export type PipelineArguments = [
+ DataSource,
+ ...Array<Transformers | EndCallback>,
+];
+
+function destroyer(
+ stream: Streams,
+ reading: boolean,
+ writing: boolean,
+ callback: EndCallback,
+) {
+ callback = once(callback);
+
+ let finished = false;
+ stream.on("close", () => {
+ finished = true;
+ });
+
+ eos(stream, { readable: reading, writable: writing }, (err) => {
+ finished = !err;
+
+ // deno-lint-ignore no-explicit-any
+ const rState = (stream as any)?._readableState;
+ if (
+ err &&
+ err.code === "ERR_STREAM_PREMATURE_CLOSE" &&
+ reading &&
+ (rState?.ended && !rState?.errored && !rState?.errorEmitted)
+ ) {
+ stream
+ .once("end", callback)
+ .once("error", callback);
+ } else {
+ callback(err);
+ }
+ });
+
+ return (err: NodeErrorAbstraction) => {
+ if (finished) return;
+ finished = true;
+ implDestroyer(stream, err);
+ callback(err || new ERR_STREAM_DESTROYED("pipe"));
+ };
+}
+
+function popCallback(streams: PipelineArguments): EndCallback {
+ if (typeof streams[streams.length - 1] !== "function") {
+ throw new ERR_INVALID_CALLBACK(streams[streams.length - 1]);
+ }
+ return streams.pop() as EndCallback;
+}
+
+// function isPromise(obj) {
+// return !!(obj && typeof obj.then === "function");
+// }
+
+// deno-lint-ignore no-explicit-any
+function isReadable(obj: any): obj is Stream {
+ return !!(obj && typeof obj.pipe === "function");
+}
+
+// deno-lint-ignore no-explicit-any
+function isWritable(obj: any) {
+ return !!(obj && typeof obj.write === "function");
+}
+
+// deno-lint-ignore no-explicit-any
+function isStream(obj: any) {
+ return isReadable(obj) || isWritable(obj);
+}
+
+// deno-lint-ignore no-explicit-any
+function isIterable(obj: any, isAsync?: boolean) {
+ if (!obj) return false;
+ if (isAsync === true) return typeof obj[Symbol.asyncIterator] === "function";
+ if (isAsync === false) return typeof obj[Symbol.iterator] === "function";
+ return typeof obj[Symbol.asyncIterator] === "function" ||
+ typeof obj[Symbol.iterator] === "function";
+}
+
+// deno-lint-ignore no-explicit-any
+function makeAsyncIterable(val: Readable | Iterable<any> | AsyncIterable<any>) {
+ if (isIterable(val)) {
+ return val;
+ } else if (isReadable(val)) {
+ return fromReadable(val as Readable);
+ }
+ throw new ERR_INVALID_ARG_TYPE(
+ "val",
+ ["Readable", "Iterable", "AsyncIterable"],
+ val,
+ );
+}
+
+async function* fromReadable(val: Readable) {
+ yield* createReadableStreamAsyncIterator(val);
+}
+
+async function pump(
+ // deno-lint-ignore no-explicit-any
+ iterable: Iterable<any>,
+ writable: Duplex | Writable,
+ finish: (err?: NodeErrorAbstraction | null) => void,
+) {
+ let error;
+ try {
+ for await (const chunk of iterable) {
+ if (!writable.write(chunk)) {
+ if (writable.destroyed) return;
+ await events.once(writable, "drain");
+ }
+ }
+ writable.end();
+ } catch (err) {
+ error = err;
+ } finally {
+ finish(error);
+ }
+}
+
+export default function pipeline(...args: PipelineArguments) {
+ const callback: EndCallback = once(popCallback(args));
+
+ let streams: [DataSource, ...Transformers[]];
+ if (args.length > 1) {
+ streams = args as [DataSource, ...Transformers[]];
+ } else {
+ throw new ERR_MISSING_ARGS("streams");
+ }
+
+ let error: NodeErrorAbstraction;
+ // deno-lint-ignore no-explicit-any
+ let value: any;
+ const destroys: Array<(err: NodeErrorAbstraction) => void> = [];
+
+ let finishCount = 0;
+
+ function finish(err?: NodeErrorAbstraction | null) {
+ const final = --finishCount === 0;
+
+ if (err && (!error || error.code === "ERR_STREAM_PREMATURE_CLOSE")) {
+ error = err;
+ }
+
+ if (!error && !final) {
+ return;
+ }
+
+ while (destroys.length) {
+ (destroys.shift() as (err: NodeErrorAbstraction) => void)(error);
+ }
+
+ if (final) {
+ callback(error, value);
+ }
+ }
+
+ // TODO(Soremwar)
+ // Simplify the hell out of this
+ // deno-lint-ignore no-explicit-any
+ let ret: any;
+ for (let i = 0; i < streams.length; i++) {
+ const stream = streams[i];
+ const reading = i < streams.length - 1;
+ const writing = i > 0;
+
+ if (isStream(stream)) {
+ finishCount++;
+ destroys.push(destroyer(stream as Streams, reading, writing, finish));
+ }
+
+ if (i === 0) {
+ if (typeof stream === "function") {
+ ret = stream();
+ if (!isIterable(ret)) {
+ throw new ERR_INVALID_RETURN_VALUE(
+ "Iterable, AsyncIterable or Stream",
+ "source",
+ ret,
+ );
+ }
+ } else if (isIterable(stream) || isReadable(stream)) {
+ ret = stream;
+ } else {
+ throw new ERR_INVALID_ARG_TYPE(
+ "source",
+ ["Stream", "Iterable", "AsyncIterable", "Function"],
+ stream,
+ );
+ }
+ } else if (typeof stream === "function") {
+ ret = makeAsyncIterable(ret);
+ ret = stream(ret);
+
+ if (reading) {
+ if (!isIterable(ret, true)) {
+ throw new ERR_INVALID_RETURN_VALUE(
+ "AsyncIterable",
+ `transform[${i - 1}]`,
+ ret,
+ );
+ }
+ } else {
+ // If the last argument to pipeline is not a stream
+ // we must create a proxy stream so that pipeline(...)
+ // always returns a stream which can be further
+ // composed through `.pipe(stream)`.
+ const pt = new PassThrough({
+ objectMode: true,
+ });
+ if (ret instanceof Promise) {
+ ret
+ .then((val) => {
+ value = val;
+ pt.end(val);
+ }, (err) => {
+ pt.destroy(err);
+ });
+ } else if (isIterable(ret, true)) {
+ finishCount++;
+ pump(ret, pt, finish);
+ } else {
+ throw new ERR_INVALID_RETURN_VALUE(
+ "AsyncIterable or Promise",
+ "destination",
+ ret,
+ );
+ }
+
+ ret = pt;
+
+ finishCount++;
+ destroys.push(destroyer(ret, false, true, finish));
+ }
+ } else if (isStream(stream)) {
+ if (isReadable(ret)) {
+ ret.pipe(stream as Readable);
+
+ // TODO(Soremwar)
+ // Reimplement after stdout and stderr are implemented
+ // if (stream === process.stdout || stream === process.stderr) {
+ // ret.on("end", () => stream.end());
+ // }
+ } else {
+ ret = makeAsyncIterable(ret);
+
+ finishCount++;
+ pump(ret, stream as Writable, finish);
+ }
+ ret = stream;
+ } else {
+ const name = reading ? `transform[${i - 1}]` : "destination";
+ throw new ERR_INVALID_ARG_TYPE(
+ name,
+ ["Stream", "Function"],
+ ret,
+ );
+ }
+ }
+
+ return ret as unknown as Readable;
+}
diff --git a/std/node/_stream/pipeline_test.ts b/std/node/_stream/pipeline_test.ts
new file mode 100644
index 000000000..aa1869416
--- /dev/null
+++ b/std/node/_stream/pipeline_test.ts
@@ -0,0 +1,387 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import PassThrough from "./passthrough.ts";
+import pipeline from "./pipeline.ts";
+import Readable from "./readable.ts";
+import Transform from "./transform.ts";
+import Writable from "./writable.ts";
+import { mustCall } from "../_utils.ts";
+import {
+ assert,
+ assertEquals,
+ assertStrictEquals,
+} from "../../testing/asserts.ts";
+import type { NodeErrorAbstraction } from "../_errors.ts";
+
+Deno.test("Pipeline ends on stream finished", async () => {
+ let finished = false;
+
+ // deno-lint-ignore no-explicit-any
+ const processed: any[] = [];
+ const expected = [
+ Buffer.from("a"),
+ Buffer.from("b"),
+ Buffer.from("c"),
+ ];
+
+ const read = new Readable({
+ read() {},
+ });
+
+ const write = new Writable({
+ write(data, _enc, cb) {
+ processed.push(data);
+ cb();
+ },
+ });
+
+ write.on("finish", () => {
+ finished = true;
+ });
+
+ for (let i = 0; i < expected.length; i++) {
+ read.push(expected[i]);
+ }
+ read.push(null);
+
+ const [finishedCompleted, finishedCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assert(!err);
+ assert(finished);
+ assertEquals(processed, expected);
+ },
+ 1,
+ );
+
+ pipeline(read, write, finishedCb);
+
+ await finishedCompleted;
+});
+
+Deno.test("Pipeline fails on stream destroyed", async () => {
+ const read = new Readable({
+ read() {},
+ });
+
+ const write = new Writable({
+ write(_data, _enc, cb) {
+ cb();
+ },
+ });
+
+ read.push("data");
+ queueMicrotask(() => read.destroy());
+
+ const [pipelineExecuted, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assert(err);
+ },
+ 1,
+ );
+ pipeline(read, write, pipelineCb);
+
+ await pipelineExecuted;
+});
+
+Deno.test("Pipeline exits on stream error", async () => {
+ const read = new Readable({
+ read() {},
+ });
+
+ const transform = new Transform({
+ transform(_data, _enc, cb) {
+ cb(new Error("kaboom"));
+ },
+ });
+
+ const write = new Writable({
+ write(_data, _enc, cb) {
+ cb();
+ },
+ });
+
+ const [readExecution, readCb] = mustCall();
+ read.on("close", readCb);
+ const [closeExecution, closeCb] = mustCall();
+ transform.on("close", closeCb);
+ const [writeExecution, writeCb] = mustCall();
+ write.on("close", writeCb);
+
+ const errorExecutions = [read, transform, write]
+ .map((stream) => {
+ const [execution, cb] = mustCall((err?: NodeErrorAbstraction | null) => {
+ assertEquals(err, new Error("kaboom"));
+ });
+
+ stream.on("error", cb);
+ return execution;
+ });
+
+ const [pipelineExecution, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assertEquals(err, new Error("kaboom"));
+ },
+ );
+ const dst = pipeline(read, transform, write, pipelineCb);
+
+ assertStrictEquals(dst, write);
+
+ read.push("hello");
+
+ await readExecution;
+ await closeExecution;
+ await writeExecution;
+ await Promise.all(errorExecutions);
+ await pipelineExecution;
+});
+
+Deno.test("Pipeline processes iterators correctly", async () => {
+ let res = "";
+ const w = new Writable({
+ write(chunk, _encoding, callback) {
+ res += chunk;
+ callback();
+ },
+ });
+
+ const [pipelineExecution, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assert(!err);
+ assertEquals(res, "helloworld");
+ },
+ );
+ pipeline(
+ function* () {
+ yield "hello";
+ yield "world";
+ }(),
+ w,
+ pipelineCb,
+ );
+
+ await pipelineExecution;
+});
+
+Deno.test("Pipeline processes async iterators correctly", async () => {
+ let res = "";
+ const w = new Writable({
+ write(chunk, _encoding, callback) {
+ res += chunk;
+ callback();
+ },
+ });
+
+ const [pipelineExecution, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assert(!err);
+ assertEquals(res, "helloworld");
+ },
+ );
+ pipeline(
+ async function* () {
+ await Promise.resolve();
+ yield "hello";
+ yield "world";
+ }(),
+ w,
+ pipelineCb,
+ );
+
+ await pipelineExecution;
+});
+
+Deno.test("Pipeline processes generators correctly", async () => {
+ let res = "";
+ const w = new Writable({
+ write(chunk, _encoding, callback) {
+ res += chunk;
+ callback();
+ },
+ });
+
+ const [pipelineExecution, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assert(!err);
+ assertEquals(res, "helloworld");
+ },
+ );
+ pipeline(
+ function* () {
+ yield "hello";
+ yield "world";
+ },
+ w,
+ pipelineCb,
+ );
+
+ await pipelineExecution;
+});
+
+Deno.test("Pipeline processes async generators correctly", async () => {
+ let res = "";
+ const w = new Writable({
+ write(chunk, _encoding, callback) {
+ res += chunk;
+ callback();
+ },
+ });
+
+ const [pipelineExecution, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assert(!err);
+ assertEquals(res, "helloworld");
+ },
+ );
+ pipeline(
+ async function* () {
+ await Promise.resolve();
+ yield "hello";
+ yield "world";
+ },
+ w,
+ pipelineCb,
+ );
+
+ await pipelineExecution;
+});
+
+Deno.test("Pipeline handles generator transforms", async () => {
+ let res = "";
+
+ const [pipelineExecuted, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assert(!err);
+ assertEquals(res, "HELLOWORLD");
+ },
+ );
+ pipeline(
+ async function* () {
+ await Promise.resolve();
+ yield "hello";
+ yield "world";
+ },
+ async function* (source: string[]) {
+ for await (const chunk of source) {
+ yield chunk.toUpperCase();
+ }
+ },
+ async function (source: string[]) {
+ for await (const chunk of source) {
+ res += chunk;
+ }
+ },
+ pipelineCb,
+ );
+
+ await pipelineExecuted;
+});
+
+Deno.test("Pipeline passes result to final callback", async () => {
+ const [pipelineExecuted, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null, val?: unknown) => {
+ assert(!err);
+ assertEquals(val, "HELLOWORLD");
+ },
+ );
+ pipeline(
+ async function* () {
+ await Promise.resolve();
+ yield "hello";
+ yield "world";
+ },
+ async function* (source: string[]) {
+ for await (const chunk of source) {
+ yield chunk.toUpperCase();
+ }
+ },
+ async function (source: string[]) {
+ let ret = "";
+ for await (const chunk of source) {
+ ret += chunk;
+ }
+ return ret;
+ },
+ pipelineCb,
+ );
+
+ await pipelineExecuted;
+});
+
+Deno.test("Pipeline returns a stream after ending", async () => {
+ const [pipelineExecuted, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assertEquals(err, undefined);
+ },
+ );
+ const ret = pipeline(
+ async function* () {
+ await Promise.resolve();
+ yield "hello";
+ },
+ // deno-lint-ignore require-yield
+ async function* (source: string[]) {
+ for await (const chunk of source) {
+ chunk;
+ }
+ },
+ pipelineCb,
+ );
+
+ ret.resume();
+
+ assertEquals(typeof ret.pipe, "function");
+
+ await pipelineExecuted;
+});
+
+Deno.test("Pipeline returns a stream after erroring", async () => {
+ const errorText = "kaboom";
+
+ const [pipelineExecuted, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assertEquals(err?.message, errorText);
+ },
+ );
+ const ret = pipeline(
+ // deno-lint-ignore require-yield
+ async function* () {
+ await Promise.resolve();
+ throw new Error(errorText);
+ },
+ // deno-lint-ignore require-yield
+ async function* (source: string[]) {
+ for await (const chunk of source) {
+ chunk;
+ }
+ },
+ pipelineCb,
+ );
+
+ ret.resume();
+
+ assertEquals(typeof ret.pipe, "function");
+
+ await pipelineExecuted;
+});
+
+Deno.test("Pipeline destination gets destroyed on error", async () => {
+ const errorText = "kaboom";
+ const s = new PassThrough();
+
+ const [pipelineExecution, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assertEquals(err?.message, errorText);
+ assertEquals(s.destroyed, true);
+ },
+ );
+ pipeline(
+ // deno-lint-ignore require-yield
+ async function* () {
+ throw new Error(errorText);
+ },
+ s,
+ pipelineCb,
+ );
+
+ await pipelineExecution;
+});
diff --git a/std/node/_stream/promises.ts b/std/node/_stream/promises.ts
new file mode 100644
index 000000000..1adf4ea3f
--- /dev/null
+++ b/std/node/_stream/promises.ts
@@ -0,0 +1,42 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import pl from "./pipeline.ts";
+import type { PipelineArguments } from "./pipeline.ts";
+import eos from "./end_of_stream.ts";
+import type {
+ FinishedOptions,
+ StreamImplementations as FinishedStreams,
+} from "./end_of_stream.ts";
+
+export function pipeline(...streams: PipelineArguments) {
+ return new Promise((resolve, reject) => {
+ pl(
+ ...streams,
+ (err, value) => {
+ if (err) {
+ reject(err);
+ } else {
+ resolve(value);
+ }
+ },
+ );
+ });
+}
+
+export function finished(
+ stream: FinishedStreams,
+ opts?: FinishedOptions,
+) {
+ return new Promise<void>((resolve, reject) => {
+ eos(
+ stream,
+ opts || null,
+ (err) => {
+ if (err) {
+ reject(err);
+ } else {
+ resolve();
+ }
+ },
+ );
+ });
+}
diff --git a/std/node/_stream/promises_test.ts b/std/node/_stream/promises_test.ts
new file mode 100644
index 000000000..90803b4af
--- /dev/null
+++ b/std/node/_stream/promises_test.ts
@@ -0,0 +1,84 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import Readable from "./readable.ts";
+import Writable from "./writable.ts";
+import { pipeline } from "./promises.ts";
+import { deferred } from "../../async/mod.ts";
+import {
+ assert,
+ assertEquals,
+ assertThrowsAsync,
+} from "../../testing/asserts.ts";
+
+Deno.test("Promise pipeline works correctly", async () => {
+ let pipelineExecuted = 0;
+ const pipelineExecutedExpected = 1;
+ const pipelineExpectedExecutions = deferred();
+
+ let finished = false;
+ // deno-lint-ignore no-explicit-any
+ const processed: any[] = [];
+ const expected = [
+ Buffer.from("a"),
+ Buffer.from("b"),
+ Buffer.from("c"),
+ ];
+
+ const read = new Readable({
+ read() {},
+ });
+
+ const write = new Writable({
+ write(data, _enc, cb) {
+ processed.push(data);
+ cb();
+ },
+ });
+
+ write.on("finish", () => {
+ finished = true;
+ });
+
+ for (let i = 0; i < expected.length; i++) {
+ read.push(expected[i]);
+ }
+ read.push(null);
+
+ pipeline(read, write).then(() => {
+ pipelineExecuted++;
+ if (pipelineExecuted == pipelineExecutedExpected) {
+ pipelineExpectedExecutions.resolve();
+ }
+ assert(finished);
+ assertEquals(processed, expected);
+ });
+
+ const pipelineTimeout = setTimeout(
+ () => pipelineExpectedExecutions.reject(),
+ 1000,
+ );
+ await pipelineExpectedExecutions;
+ clearTimeout(pipelineTimeout);
+ assertEquals(pipelineExecuted, pipelineExecutedExpected);
+});
+
+Deno.test("Promise pipeline throws on readable destroyed", async () => {
+ const read = new Readable({
+ read() {},
+ });
+
+ const write = new Writable({
+ write(_data, _enc, cb) {
+ cb();
+ },
+ });
+
+ read.push("data");
+ read.destroy();
+
+ await assertThrowsAsync(
+ () => pipeline(read, write),
+ Error,
+ "Premature close",
+ );
+});
diff --git a/std/node/_stream/readable.ts b/std/node/_stream/readable.ts
index 72e61dff7..c8ed29953 100644
--- a/std/node/_stream/readable.ts
+++ b/std/node/_stream/readable.ts
@@ -1,492 +1,46 @@
// Copyright Node.js contributors. All rights reserved. MIT License.
-import EventEmitter, { captureRejectionSymbol } from "../events.ts";
+import { captureRejectionSymbol } from "../events.ts";
import Stream from "./stream.ts";
-import { Buffer } from "../buffer.ts";
+import type { Buffer } from "../buffer.ts";
import BufferList from "./buffer_list.ts";
import {
ERR_INVALID_OPT_VALUE,
ERR_METHOD_NOT_IMPLEMENTED,
- ERR_MULTIPLE_CALLBACK,
- ERR_STREAM_PUSH_AFTER_EOF,
- ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
} from "../_errors.ts";
+import type { Encodings } from "../_utils.ts";
import { StringDecoder } from "../string_decoder.ts";
import createReadableStreamAsyncIterator from "./async_iterator.ts";
import streamFrom from "./from.ts";
-import { kConstruct, kDestroy, kPaused } from "./symbols.ts";
-import type Writable from "./writable.ts";
-import { errorOrDestroy as errorOrDestroyDuplex } from "./duplex.ts";
-
-function construct(stream: Readable, cb: () => void) {
- const r = stream._readableState;
-
- if (!stream._construct) {
- return;
- }
-
- stream.once(kConstruct, cb);
-
- r.constructed = false;
-
- queueMicrotask(() => {
- let called = false;
- stream._construct?.((err?: Error) => {
- r.constructed = true;
-
- if (called) {
- err = new ERR_MULTIPLE_CALLBACK();
- } else {
- called = true;
- }
-
- if (r.destroyed) {
- stream.emit(kDestroy, err);
- } else if (err) {
- errorOrDestroy(stream, err, true);
- } else {
- queueMicrotask(() => stream.emit(kConstruct));
- }
- });
- });
-}
-
-function _destroy(
- self: Readable,
- err?: Error,
- cb?: (error?: Error | null) => void,
-) {
- self._destroy(err || null, (err) => {
- const r = (self as Readable)._readableState;
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!r.errored) {
- r.errored = err;
- }
- }
-
- r.closed = true;
-
- if (typeof cb === "function") {
- cb(err);
- }
-
- if (err) {
- queueMicrotask(() => {
- if (!r.errorEmitted) {
- r.errorEmitted = true;
- self.emit("error", err);
- }
- r.closeEmitted = true;
- if (r.emitClose) {
- self.emit("close");
- }
- });
- } else {
- queueMicrotask(() => {
- r.closeEmitted = true;
- if (r.emitClose) {
- self.emit("close");
- }
- });
- }
- });
-}
-
-function errorOrDestroy(stream: Readable, err: Error, sync = false) {
- const r = stream._readableState;
-
- if (r.destroyed) {
- return stream;
- }
-
- if (r.autoDestroy) {
- stream.destroy(err);
- } else if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!r.errored) {
- r.errored = err;
- }
- if (sync) {
- queueMicrotask(() => {
- if (!r.errorEmitted) {
- r.errorEmitted = true;
- stream.emit("error", err);
- }
- });
- } else if (!r.errorEmitted) {
- r.errorEmitted = true;
- stream.emit("error", err);
- }
- }
-}
-
-function flow(stream: Readable) {
- const state = stream._readableState;
- while (state.flowing && stream.read() !== null);
-}
-
-function pipeOnDrain(src: Readable, dest: Writable) {
- return function pipeOnDrainFunctionResult() {
- const state = src._readableState;
-
- if (state.awaitDrainWriters === dest) {
- state.awaitDrainWriters = null;
- } else if (state.multiAwaitDrain) {
- (state.awaitDrainWriters as Set<Writable>).delete(dest);
- }
-
- if (
- (!state.awaitDrainWriters ||
- (state.awaitDrainWriters as Set<Writable>).size === 0) &&
- src.listenerCount("data")
- ) {
- state.flowing = true;
- flow(src);
- }
- };
-}
-
-function updateReadableListening(self: Readable) {
- const state = self._readableState;
- state.readableListening = self.listenerCount("readable") > 0;
-
- if (state.resumeScheduled && state[kPaused] === false) {
- // Flowing needs to be set to true now, otherwise
- // the upcoming resume will not flow.
- state.flowing = true;
-
- // Crude way to check if we should resume.
- } else if (self.listenerCount("data") > 0) {
- self.resume();
- } else if (!state.readableListening) {
- state.flowing = null;
- }
-}
-
-function nReadingNextTick(self: Readable) {
- self.read(0);
-}
-
-function resume(stream: Readable, state: ReadableState) {
- if (!state.resumeScheduled) {
- state.resumeScheduled = true;
- queueMicrotask(() => resume_(stream, state));
- }
-}
-
-function resume_(stream: Readable, state: ReadableState) {
- if (!state.reading) {
- stream.read(0);
- }
-
- state.resumeScheduled = false;
- stream.emit("resume");
- flow(stream);
- if (state.flowing && !state.reading) {
- stream.read(0);
- }
-}
-
-function readableAddChunk(
- stream: Readable,
- chunk: string | Buffer | Uint8Array | null,
- encoding: undefined | string = undefined,
- addToFront: boolean,
-) {
- const state = stream._readableState;
- let usedEncoding = encoding;
-
- let err;
- if (!state.objectMode) {
- if (typeof chunk === "string") {
- usedEncoding = encoding || state.defaultEncoding;
- if (state.encoding !== usedEncoding) {
- if (addToFront && state.encoding) {
- chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding);
- } else {
- chunk = Buffer.from(chunk, usedEncoding);
- usedEncoding = "";
- }
- }
- } else if (chunk instanceof Uint8Array) {
- chunk = Buffer.from(chunk);
- }
- }
-
- if (err) {
- errorOrDestroy(stream, err);
- } else if (chunk === null) {
- state.reading = false;
- onEofChunk(stream, state);
- } else if (state.objectMode || (chunk.length > 0)) {
- if (addToFront) {
- if (state.endEmitted) {
- errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
- } else {
- addChunk(stream, state, chunk, true);
- }
- } else if (state.ended) {
- errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
- } else if (state.destroyed || state.errored) {
- return false;
- } else {
- state.reading = false;
- if (state.decoder && !usedEncoding) {
- //TODO(Soremwar)
- //I don't think this cast is right
- chunk = state.decoder.write(Buffer.from(chunk as Uint8Array));
- if (state.objectMode || chunk.length !== 0) {
- addChunk(stream, state, chunk, false);
- } else {
- maybeReadMore(stream, state);
- }
- } else {
- addChunk(stream, state, chunk, false);
- }
- }
- } else if (!addToFront) {
- state.reading = false;
- maybeReadMore(stream, state);
- }
-
- return !state.ended &&
- (state.length < state.highWaterMark || state.length === 0);
-}
-
-function addChunk(
- stream: Readable,
- state: ReadableState,
- chunk: string | Buffer | Uint8Array,
- addToFront: boolean,
-) {
- if (state.flowing && state.length === 0 && !state.sync) {
- if (state.multiAwaitDrain) {
- (state.awaitDrainWriters as Set<Writable>).clear();
- } else {
- state.awaitDrainWriters = null;
- }
- stream.emit("data", chunk);
- } else {
- // Update the buffer info.
- state.length += state.objectMode ? 1 : chunk.length;
- if (addToFront) {
- state.buffer.unshift(chunk);
- } else {
- state.buffer.push(chunk);
- }
-
- if (state.needReadable) {
- emitReadable(stream);
- }
- }
- maybeReadMore(stream, state);
-}
-
-function prependListener(
- emitter: EventEmitter,
- event: string,
- // deno-lint-ignore no-explicit-any
- fn: (...args: any[]) => any,
-) {
- if (typeof emitter.prependListener === "function") {
- return emitter.prependListener(event, fn);
- }
-
- // This is a hack to make sure that our error handler is attached before any
- // userland ones. NEVER DO THIS. This is here only because this code needs
- // to continue to work with older versions of Node.js that do not include
- //the prependListener() method. The goal is to eventually remove this hack.
- // TODO(Soremwar)
- // Burn it with fire
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- if (emitter._events.get(event)?.length) {
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- const listeners = [fn, ...emitter._events.get(event)];
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- emitter._events.set(event, listeners);
- } else {
- emitter.on(event, fn);
- }
-}
-
-/** Pluck off n bytes from an array of buffers.
-* Length is the combined lengths of all the buffers in the list.
-* This function is designed to be inlinable, so please take care when making
-* changes to the function body.
-*/
-function fromList(n: number, state: ReadableState) {
- // nothing buffered.
- if (state.length === 0) {
- return null;
- }
-
- let ret;
- if (state.objectMode) {
- ret = state.buffer.shift();
- } else if (!n || n >= state.length) {
- if (state.decoder) {
- ret = state.buffer.join("");
- } else if (state.buffer.length === 1) {
- ret = state.buffer.first();
- } else {
- ret = state.buffer.concat(state.length);
- }
- state.buffer.clear();
- } else {
- ret = state.buffer.consume(n, !!state.decoder);
- }
-
- return ret;
-}
-
-function endReadable(stream: Readable) {
- const state = stream._readableState;
-
- if (!state.endEmitted) {
- state.ended = true;
- queueMicrotask(() => endReadableNT(state, stream));
- }
-}
-
-function endReadableNT(state: ReadableState, stream: Readable) {
- if (
- !state.errorEmitted && !state.closeEmitted &&
- !state.endEmitted && state.length === 0
- ) {
- state.endEmitted = true;
- stream.emit("end");
-
- if (state.autoDestroy) {
- stream.destroy();
- }
- }
-}
-
-// Don't raise the hwm > 1GB.
-const MAX_HWM = 0x40000000;
-function computeNewHighWaterMark(n: number) {
- if (n >= MAX_HWM) {
- n = MAX_HWM;
- } else {
- n--;
- n |= n >>> 1;
- n |= n >>> 2;
- n |= n >>> 4;
- n |= n >>> 8;
- n |= n >>> 16;
- n++;
- }
- return n;
-}
-
-function howMuchToRead(n: number, state: ReadableState) {
- if (n <= 0 || (state.length === 0 && state.ended)) {
- return 0;
- }
- if (state.objectMode) {
- return 1;
- }
- if (Number.isNaN(n)) {
- // Only flow one buffer at a time.
- if (state.flowing && state.length) {
- return state.buffer.first().length;
- }
- return state.length;
- }
- if (n <= state.length) {
- return n;
- }
- return state.ended ? state.length : 0;
-}
-
-function onEofChunk(stream: Readable, state: ReadableState) {
- if (state.ended) return;
- if (state.decoder) {
- const chunk = state.decoder.end();
- if (chunk && chunk.length) {
- state.buffer.push(chunk);
- state.length += state.objectMode ? 1 : chunk.length;
- }
- }
- state.ended = true;
-
- if (state.sync) {
- emitReadable(stream);
- } else {
- state.needReadable = false;
- state.emittedReadable = true;
- emitReadable_(stream);
- }
-}
-
-function emitReadable(stream: Readable) {
- const state = stream._readableState;
- state.needReadable = false;
- if (!state.emittedReadable) {
- state.emittedReadable = true;
- queueMicrotask(() => emitReadable_(stream));
- }
-}
-
-function emitReadable_(stream: Readable) {
- const state = stream._readableState;
- if (!state.destroyed && !state.errored && (state.length || state.ended)) {
- stream.emit("readable");
- state.emittedReadable = false;
- }
-
- state.needReadable = !state.flowing &&
- !state.ended &&
- state.length <= state.highWaterMark;
- flow(stream);
-}
-
-function maybeReadMore(stream: Readable, state: ReadableState) {
- if (!state.readingMore && state.constructed) {
- state.readingMore = true;
- queueMicrotask(() => maybeReadMore_(stream, state));
- }
-}
-
-function maybeReadMore_(stream: Readable, state: ReadableState) {
- while (
- !state.reading && !state.ended &&
- (state.length < state.highWaterMark ||
- (state.flowing && state.length === 0))
- ) {
- const len = state.length;
- stream.read(0);
- if (len === state.length) {
- // Didn't get any data, stop spinning.
- break;
- }
- }
- state.readingMore = false;
-}
+import { kDestroy, kPaused } from "./symbols.ts";
+import {
+ _destroy,
+ computeNewHighWaterMark,
+ emitReadable,
+ endReadable,
+ errorOrDestroy,
+ fromList,
+ howMuchToRead,
+ nReadingNextTick,
+ pipeOnDrain,
+ prependListener,
+ readableAddChunk,
+ resume,
+ updateReadableListening,
+} from "./readable_internal.ts";
+import Writable from "./writable.ts";
+import { errorOrDestroy as errorOrDestroyWritable } from "./writable_internal.ts";
+import Duplex, { errorOrDestroy as errorOrDestroyDuplex } from "./duplex.ts";
export interface ReadableOptions {
autoDestroy?: boolean;
- construct?: () => void;
- //TODO(Soremwar)
- //Import available encodings
- defaultEncoding?: string;
+ defaultEncoding?: Encodings;
destroy?(
this: Readable,
error: Error | null,
callback: (error: Error | null) => void,
): void;
emitClose?: boolean;
- //TODO(Soremwar)
- //Import available encodings
- encoding?: string;
+ encoding?: Encodings;
highWaterMark?: number;
objectMode?: boolean;
read?(this: Readable): void;
@@ -494,7 +48,7 @@ export interface ReadableOptions {
export class ReadableState {
[kPaused]: boolean | null = null;
- awaitDrainWriters: Writable | Set<Writable> | null = null;
+ awaitDrainWriters: Duplex | Writable | Set<Duplex | Writable> | null = null;
buffer = new BufferList();
closed = false;
closeEmitted = false;
@@ -502,9 +56,7 @@ export class ReadableState {
decoder: StringDecoder | null = null;
destroyed = false;
emittedReadable = false;
- //TODO(Soremwar)
- //Import available encodings
- encoding: string | null = null;
+ encoding: Encodings | null = null;
ended = false;
endEmitted = false;
errored: Error | null = null;
@@ -515,7 +67,7 @@ export class ReadableState {
multiAwaitDrain = false;
needReadable = false;
objectMode: boolean;
- pipes: Writable[] = [];
+ pipes: Array<Duplex | Writable> = [];
readable = true;
readableListening = false;
reading = false;
@@ -551,7 +103,6 @@ export class ReadableState {
}
class Readable extends Stream {
- _construct?: (cb: (error?: Error) => void) => void;
_readableState: ReadableState;
constructor(options?: ReadableOptions) {
@@ -563,15 +114,8 @@ class Readable extends Stream {
if (typeof options.destroy === "function") {
this._destroy = options.destroy;
}
- if (typeof options.construct === "function") {
- this._construct = options.construct;
- }
}
this._readableState = new ReadableState(options);
-
- construct(this, () => {
- maybeReadMore(this, this._readableState);
- });
}
static from(
@@ -690,13 +234,11 @@ class Readable extends Stream {
return ret;
}
- _read() {
+ _read(_size?: number) {
throw new ERR_METHOD_NOT_IMPLEMENTED("_read()");
}
- //TODO(Soremwar)
- //Should be duplex
- pipe<T extends Writable>(dest: T, pipeOpts?: { end?: boolean }): T {
+ pipe<T extends Duplex | Writable>(dest: T, pipeOpts?: { end?: boolean }): T {
// deno-lint-ignore no-this-alias
const src = this;
const state = this._readableState;
@@ -776,7 +318,7 @@ class Readable extends Stream {
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
- (state.awaitDrainWriters as Set<Writable>).add(dest);
+ (state.awaitDrainWriters as Set<Duplex | Writable>).add(dest);
}
src.pause();
}
@@ -791,12 +333,13 @@ class Readable extends Stream {
unpipe();
dest.removeListener("error", onerror);
if (dest.listenerCount("error") === 0) {
- //TODO(Soremwar)
- //Should be const s = dest._writableState || dest._readableState;
- const s = dest._writableState;
+ const s = dest._writableState || (dest as Duplex)._readableState;
if (s && !s.errorEmitted) {
- // User incorrectly emitted 'error' directly on the stream.
- errorOrDestroyDuplex(dest, er);
+ if (dest instanceof Duplex) {
+ errorOrDestroyDuplex(dest as unknown as Duplex, er);
+ } else {
+ errorOrDestroyWritable(dest as Writable, er);
+ }
} else {
dest.emit("error", er);
}
@@ -817,7 +360,7 @@ class Readable extends Stream {
dest.once("finish", onfinish);
function unpipe() {
- src.unpipe(dest);
+ src.unpipe(dest as Writable);
}
dest.emit("pipe", this);
@@ -834,12 +377,11 @@ class Readable extends Stream {
this._readableState.flowing === false;
}
- //TODO(Soremwar)
- //Replace string with encoding types
- setEncoding(enc: string) {
+ setEncoding(enc: Encodings) {
const decoder = new StringDecoder(enc);
this._readableState.decoder = decoder;
- this._readableState.encoding = this._readableState.decoder.encoding;
+ this._readableState.encoding = this._readableState.decoder
+ .encoding as Encodings;
const buffer = this._readableState.buffer;
let content = "";
@@ -931,7 +473,7 @@ class Readable extends Stream {
off = this.removeListener;
- destroy(err?: Error, cb?: () => void) {
+ destroy(err?: Error | null, cb?: () => void) {
const r = this._readableState;
if (r.destroyed) {
@@ -989,10 +531,8 @@ class Readable extends Stream {
this.destroy(err);
}
- //TODO(Soremwar)
- //Same deal, string => encodings
// deno-lint-ignore no-explicit-any
- push(chunk: any, encoding?: string): boolean {
+ push(chunk: any, encoding?: Encodings): boolean {
return readableAddChunk(this, chunk, encoding, false);
}
@@ -1233,7 +773,7 @@ class Readable extends Stream {
}
}
-Object.defineProperties(Stream, {
+Object.defineProperties(Readable, {
_readableState: { enumerable: false },
destroyed: { enumerable: false },
readableBuffer: { enumerable: false },
diff --git a/std/node/_stream/readable_internal.ts b/std/node/_stream/readable_internal.ts
new file mode 100644
index 000000000..0ef261d4d
--- /dev/null
+++ b/std/node/_stream/readable_internal.ts
@@ -0,0 +1,438 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import type Duplex from "./duplex.ts";
+import type EventEmitter from "../events.ts";
+import type Readable from "./readable.ts";
+import type Writable from "./writable.ts";
+import type { ReadableState } from "./readable.ts";
+import { kPaused } from "./symbols.ts";
+import {
+ ERR_STREAM_PUSH_AFTER_EOF,
+ ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
+} from "../_errors.ts";
+
+export function _destroy(
+ self: Readable,
+ err?: Error | null,
+ cb?: (error?: Error | null) => void,
+) {
+ self._destroy(err || null, (err) => {
+ const r = (self as Readable)._readableState;
+
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!r.errored) {
+ r.errored = err;
+ }
+ }
+
+ r.closed = true;
+
+ if (typeof cb === "function") {
+ cb(err);
+ }
+
+ if (err) {
+ queueMicrotask(() => {
+ if (!r.errorEmitted) {
+ r.errorEmitted = true;
+ self.emit("error", err);
+ }
+ r.closeEmitted = true;
+ if (r.emitClose) {
+ self.emit("close");
+ }
+ });
+ } else {
+ queueMicrotask(() => {
+ r.closeEmitted = true;
+ if (r.emitClose) {
+ self.emit("close");
+ }
+ });
+ }
+ });
+}
+
+export function addChunk(
+ stream: Duplex | Readable,
+ state: ReadableState,
+ chunk: string | Buffer | Uint8Array,
+ addToFront: boolean,
+) {
+ if (state.flowing && state.length === 0 && !state.sync) {
+ if (state.multiAwaitDrain) {
+ (state.awaitDrainWriters as Set<Writable>).clear();
+ } else {
+ state.awaitDrainWriters = null;
+ }
+ stream.emit("data", chunk);
+ } else {
+ // Update the buffer info.
+ state.length += state.objectMode ? 1 : chunk.length;
+ if (addToFront) {
+ state.buffer.unshift(chunk);
+ } else {
+ state.buffer.push(chunk);
+ }
+
+ if (state.needReadable) {
+ emitReadable(stream);
+ }
+ }
+ maybeReadMore(stream, state);
+}
+
+// Don't raise the hwm > 1GB.
+const MAX_HWM = 0x40000000;
+export function computeNewHighWaterMark(n: number) {
+ if (n >= MAX_HWM) {
+ n = MAX_HWM;
+ } else {
+ n--;
+ n |= n >>> 1;
+ n |= n >>> 2;
+ n |= n >>> 4;
+ n |= n >>> 8;
+ n |= n >>> 16;
+ n++;
+ }
+ return n;
+}
+
+export function emitReadable(stream: Duplex | Readable) {
+ const state = stream._readableState;
+ state.needReadable = false;
+ if (!state.emittedReadable) {
+ state.emittedReadable = true;
+ queueMicrotask(() => emitReadable_(stream));
+ }
+}
+
+function emitReadable_(stream: Duplex | Readable) {
+ const state = stream._readableState;
+ if (!state.destroyed && !state.errored && (state.length || state.ended)) {
+ stream.emit("readable");
+ state.emittedReadable = false;
+ }
+
+ state.needReadable = !state.flowing &&
+ !state.ended &&
+ state.length <= state.highWaterMark;
+ flow(stream);
+}
+
+export function endReadable(stream: Readable) {
+ const state = stream._readableState;
+
+ if (!state.endEmitted) {
+ state.ended = true;
+ queueMicrotask(() => endReadableNT(state, stream));
+ }
+}
+
+function endReadableNT(state: ReadableState, stream: Readable) {
+ if (
+ !state.errorEmitted && !state.closeEmitted &&
+ !state.endEmitted && state.length === 0
+ ) {
+ state.endEmitted = true;
+ stream.emit("end");
+
+ if (state.autoDestroy) {
+ stream.destroy();
+ }
+ }
+}
+
+export function errorOrDestroy(
+ stream: Duplex | Readable,
+ err: Error,
+ sync = false,
+) {
+ const r = stream._readableState;
+
+ if (r.destroyed) {
+ return stream;
+ }
+
+ if (r.autoDestroy) {
+ stream.destroy(err);
+ } else if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!r.errored) {
+ r.errored = err;
+ }
+ if (sync) {
+ queueMicrotask(() => {
+ if (!r.errorEmitted) {
+ r.errorEmitted = true;
+ stream.emit("error", err);
+ }
+ });
+ } else if (!r.errorEmitted) {
+ r.errorEmitted = true;
+ stream.emit("error", err);
+ }
+ }
+}
+
+function flow(stream: Duplex | Readable) {
+ const state = stream._readableState;
+ while (state.flowing && stream.read() !== null);
+}
+
+/** Pluck off n bytes from an array of buffers.
+* Length is the combined lengths of all the buffers in the list.
+* This function is designed to be inlinable, so please take care when making
+* changes to the function body.
+*/
+export function fromList(n: number, state: ReadableState) {
+ // nothing buffered.
+ if (state.length === 0) {
+ return null;
+ }
+
+ let ret;
+ if (state.objectMode) {
+ ret = state.buffer.shift();
+ } else if (!n || n >= state.length) {
+ if (state.decoder) {
+ ret = state.buffer.join("");
+ } else if (state.buffer.length === 1) {
+ ret = state.buffer.first();
+ } else {
+ ret = state.buffer.concat(state.length);
+ }
+ state.buffer.clear();
+ } else {
+ ret = state.buffer.consume(n, !!state.decoder);
+ }
+
+ return ret;
+}
+
+export function howMuchToRead(n: number, state: ReadableState) {
+ if (n <= 0 || (state.length === 0 && state.ended)) {
+ return 0;
+ }
+ if (state.objectMode) {
+ return 1;
+ }
+ if (Number.isNaN(n)) {
+ // Only flow one buffer at a time.
+ if (state.flowing && state.length) {
+ return state.buffer.first().length;
+ }
+ return state.length;
+ }
+ if (n <= state.length) {
+ return n;
+ }
+ return state.ended ? state.length : 0;
+}
+
+export function maybeReadMore(stream: Readable, state: ReadableState) {
+ if (!state.readingMore && state.constructed) {
+ state.readingMore = true;
+ queueMicrotask(() => maybeReadMore_(stream, state));
+ }
+}
+
+function maybeReadMore_(stream: Readable, state: ReadableState) {
+ while (
+ !state.reading && !state.ended &&
+ (state.length < state.highWaterMark ||
+ (state.flowing && state.length === 0))
+ ) {
+ const len = state.length;
+ stream.read(0);
+ if (len === state.length) {
+ // Didn't get any data, stop spinning.
+ break;
+ }
+ }
+ state.readingMore = false;
+}
+
+export function nReadingNextTick(self: Duplex | Readable) {
+ self.read(0);
+}
+
+export function onEofChunk(stream: Duplex | Readable, state: ReadableState) {
+ if (state.ended) return;
+ if (state.decoder) {
+ const chunk = state.decoder.end();
+ if (chunk && chunk.length) {
+ state.buffer.push(chunk);
+ state.length += state.objectMode ? 1 : chunk.length;
+ }
+ }
+ state.ended = true;
+
+ if (state.sync) {
+ emitReadable(stream);
+ } else {
+ state.needReadable = false;
+ state.emittedReadable = true;
+ emitReadable_(stream);
+ }
+}
+
+export function pipeOnDrain(src: Duplex | Readable, dest: Duplex | Writable) {
+ return function pipeOnDrainFunctionResult() {
+ const state = src._readableState;
+
+ if (state.awaitDrainWriters === dest) {
+ state.awaitDrainWriters = null;
+ } else if (state.multiAwaitDrain) {
+ (state.awaitDrainWriters as Set<Duplex | Writable>).delete(dest);
+ }
+
+ if (
+ (!state.awaitDrainWriters ||
+ (state.awaitDrainWriters as Set<Writable>).size === 0) &&
+ src.listenerCount("data")
+ ) {
+ state.flowing = true;
+ flow(src);
+ }
+ };
+}
+
+export function prependListener(
+ emitter: EventEmitter,
+ event: string,
+ // deno-lint-ignore no-explicit-any
+ fn: (...args: any[]) => any,
+) {
+ if (typeof emitter.prependListener === "function") {
+ return emitter.prependListener(event, fn);
+ }
+
+ // This is a hack to make sure that our error handler is attached before any
+ // userland ones. NEVER DO THIS. This is here only because this code needs
+ // to continue to work with older versions of Node.js that do not include
+ //the prependListener() method. The goal is to eventually remove this hack.
+ // TODO(Soremwar)
+ // Burn it with fire
+ // deno-lint-ignore ban-ts-comment
+ //@ts-ignore
+ if (emitter._events.get(event)?.length) {
+ // deno-lint-ignore ban-ts-comment
+ //@ts-ignore
+ const listeners = [fn, ...emitter._events.get(event)];
+ // deno-lint-ignore ban-ts-comment
+ //@ts-ignore
+ emitter._events.set(event, listeners);
+ } else {
+ emitter.on(event, fn);
+ }
+}
+
+export function readableAddChunk(
+ stream: Duplex | Readable,
+ chunk: string | Buffer | Uint8Array | null,
+ encoding: undefined | string = undefined,
+ addToFront: boolean,
+) {
+ const state = stream._readableState;
+ let usedEncoding = encoding;
+
+ let err;
+ if (!state.objectMode) {
+ if (typeof chunk === "string") {
+ usedEncoding = encoding || state.defaultEncoding;
+ if (state.encoding !== usedEncoding) {
+ if (addToFront && state.encoding) {
+ chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding);
+ } else {
+ chunk = Buffer.from(chunk, usedEncoding);
+ usedEncoding = "";
+ }
+ }
+ } else if (chunk instanceof Uint8Array) {
+ chunk = Buffer.from(chunk);
+ }
+ }
+
+ if (err) {
+ errorOrDestroy(stream, err);
+ } else if (chunk === null) {
+ state.reading = false;
+ onEofChunk(stream, state);
+ } else if (state.objectMode || (chunk.length > 0)) {
+ if (addToFront) {
+ if (state.endEmitted) {
+ errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
+ } else {
+ addChunk(stream, state, chunk, true);
+ }
+ } else if (state.ended) {
+ errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
+ } else if (state.destroyed || state.errored) {
+ return false;
+ } else {
+ state.reading = false;
+ if (state.decoder && !usedEncoding) {
+ //TODO(Soremwar)
+ //I don't think this cast is right
+ chunk = state.decoder.write(Buffer.from(chunk as Uint8Array));
+ if (state.objectMode || chunk.length !== 0) {
+ addChunk(stream, state, chunk, false);
+ } else {
+ maybeReadMore(stream, state);
+ }
+ } else {
+ addChunk(stream, state, chunk, false);
+ }
+ }
+ } else if (!addToFront) {
+ state.reading = false;
+ maybeReadMore(stream, state);
+ }
+
+ return !state.ended &&
+ (state.length < state.highWaterMark || state.length === 0);
+}
+
+export function resume(stream: Duplex | Readable, state: ReadableState) {
+ if (!state.resumeScheduled) {
+ state.resumeScheduled = true;
+ queueMicrotask(() => resume_(stream, state));
+ }
+}
+
+function resume_(stream: Duplex | Readable, state: ReadableState) {
+ if (!state.reading) {
+ stream.read(0);
+ }
+
+ state.resumeScheduled = false;
+ stream.emit("resume");
+ flow(stream);
+ if (state.flowing && !state.reading) {
+ stream.read(0);
+ }
+}
+
+export function updateReadableListening(self: Duplex | Readable) {
+ const state = self._readableState;
+ state.readableListening = self.listenerCount("readable") > 0;
+
+ if (state.resumeScheduled && state[kPaused] === false) {
+ // Flowing needs to be set to true now, otherwise
+ // the upcoming resume will not flow.
+ state.flowing = true;
+
+ // Crude way to check if we should resume.
+ } else if (self.listenerCount("data") > 0) {
+ self.resume();
+ } else if (!state.readableListening) {
+ state.flowing = null;
+ }
+}
diff --git a/std/node/_stream/stream.ts b/std/node/_stream/stream.ts
index 708b8bcd3..4daafc77b 100644
--- a/std/node/_stream/stream.ts
+++ b/std/node/_stream/stream.ts
@@ -1,6 +1,7 @@
// Copyright Node.js contributors. All rights reserved. MIT License.
import { Buffer } from "../buffer.ts";
import EventEmitter from "../events.ts";
+import type Readable from "./readable.ts";
import type Writable from "./writable.ts";
import { types } from "../util.ts";
@@ -12,7 +13,7 @@ class Stream extends EventEmitter {
static _isUint8Array = types.isUint8Array;
static _uint8ArrayToBuffer = (chunk: Uint8Array) => Buffer.from(chunk);
- pipe(dest: Writable, options: { end: boolean }) {
+ pipe(dest: Readable | Writable, options?: { end?: boolean }) {
// deno-lint-ignore no-this-alias
const source = this;
@@ -31,7 +32,8 @@ class Stream extends EventEmitter {
if (didOnEnd) return;
didOnEnd = true;
- dest.end();
+ // 'end' is only called on Writable streams
+ (dest as Writable).end();
}
function onclose() {
diff --git a/std/node/_stream/transform.ts b/std/node/_stream/transform.ts
new file mode 100644
index 000000000..a4246e81a
--- /dev/null
+++ b/std/node/_stream/transform.ts
@@ -0,0 +1,132 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Encodings } from "../_utils.ts";
+import Duplex from "./duplex.ts";
+import type { DuplexOptions } from "./duplex.ts";
+import type { writeV } from "./writable_internal.ts";
+import { ERR_METHOD_NOT_IMPLEMENTED } from "../_errors.ts";
+
+const kCallback = Symbol("kCallback");
+
+type TransformFlush = (
+ this: Transform,
+ // deno-lint-ignore no-explicit-any
+ callback: (error?: Error | null, data?: any) => void,
+) => void;
+
+export interface TransformOptions extends DuplexOptions {
+ read?(this: Transform, size: number): void;
+ write?(
+ this: Transform,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: Encodings,
+ callback: (error?: Error | null) => void,
+ ): void;
+ writev?: writeV;
+ final?(this: Transform, callback: (error?: Error | null) => void): void;
+ destroy?(
+ this: Transform,
+ error: Error | null,
+ callback: (error: Error | null) => void,
+ ): void;
+ transform?(
+ this: Transform,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: Encodings,
+ // deno-lint-ignore no-explicit-any
+ callback: (error?: Error | null, data?: any) => void,
+ ): void;
+ flush?: TransformFlush;
+}
+
+export default class Transform extends Duplex {
+ [kCallback]: null | ((error?: Error | null) => void);
+ _flush?: TransformFlush;
+
+ constructor(options?: TransformOptions) {
+ super(options);
+ this._readableState.sync = false;
+
+ this[kCallback] = null;
+
+ if (options) {
+ if (typeof options.transform === "function") {
+ this._transform = options.transform;
+ }
+
+ if (typeof options.flush === "function") {
+ this._flush = options.flush;
+ }
+ }
+
+ this.on("prefinish", function (this: Transform) {
+ if (typeof this._flush === "function" && !this.destroyed) {
+ this._flush((er, data) => {
+ if (er) {
+ this.destroy(er);
+ return;
+ }
+
+ if (data != null) {
+ this.push(data);
+ }
+ this.push(null);
+ });
+ } else {
+ this.push(null);
+ }
+ });
+ }
+
+ _read = () => {
+ if (this[kCallback]) {
+ const callback = this[kCallback] as (error?: Error | null) => void;
+ this[kCallback] = null;
+ callback();
+ }
+ };
+
+ _transform(
+ // deno-lint-ignore no-explicit-any
+ _chunk: any,
+ _encoding: string,
+ // deno-lint-ignore no-explicit-any
+ _callback: (error?: Error | null, data?: any) => void,
+ ) {
+ throw new ERR_METHOD_NOT_IMPLEMENTED("_transform()");
+ }
+
+ _write = (
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ callback: (error?: Error | null) => void,
+ ) => {
+ const rState = this._readableState;
+ const wState = this._writableState;
+ const length = rState.length;
+
+ this._transform(chunk, encoding, (err, val) => {
+ if (err) {
+ callback(err);
+ return;
+ }
+
+ if (val != null) {
+ this.push(val);
+ }
+
+ if (
+ wState.ended || // Backwards compat.
+ length === rState.length || // Backwards compat.
+ rState.length < rState.highWaterMark ||
+ rState.length === 0
+ ) {
+ callback();
+ } else {
+ this[kCallback] = callback;
+ }
+ });
+ };
+}
diff --git a/std/node/_stream/transform_test.ts b/std/node/_stream/transform_test.ts
new file mode 100644
index 000000000..d3b90ff01
--- /dev/null
+++ b/std/node/_stream/transform_test.ts
@@ -0,0 +1,68 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import Transform from "./transform.ts";
+import finished from "./end_of_stream.ts";
+import { deferred } from "../../async/mod.ts";
+import { assert, assertEquals } from "../../testing/asserts.ts";
+
+Deno.test("Transform stream finishes correctly", async () => {
+ let finishedExecuted = 0;
+ const finishedExecutedExpected = 1;
+ const finishedExecution = deferred();
+
+ const tr = new Transform({
+ transform(_data, _enc, cb) {
+ cb();
+ },
+ });
+
+ let finish = false;
+ let ended = false;
+
+ tr.on("end", () => {
+ ended = true;
+ });
+
+ tr.on("finish", () => {
+ finish = true;
+ });
+
+ finished(tr, (err) => {
+ finishedExecuted++;
+ if (finishedExecuted === finishedExecutedExpected) {
+ finishedExecution.resolve();
+ }
+ assert(!err, "no error");
+ assert(finish);
+ assert(ended);
+ });
+
+ tr.end();
+ tr.resume();
+
+ const finishedTimeout = setTimeout(
+ () => finishedExecution.reject(),
+ 1000,
+ );
+ await finishedExecution;
+ clearTimeout(finishedTimeout);
+ assertEquals(finishedExecuted, finishedExecutedExpected);
+});
+
+Deno.test("Transform stream flushes data correctly", () => {
+ const expected = "asdf";
+
+ const t = new Transform({
+ transform: (_d, _e, n) => {
+ n();
+ },
+ flush: (n) => {
+ n(null, expected);
+ },
+ });
+
+ t.end(Buffer.from("blerg"));
+ t.on("data", (data) => {
+ assertEquals(data.toString(), expected);
+ });
+});
diff --git a/std/node/_stream/writable.ts b/std/node/_stream/writable.ts
index 158af8325..534fc22fb 100644
--- a/std/node/_stream/writable.ts
+++ b/std/node/_stream/writable.ts
@@ -2,12 +2,10 @@
import { Buffer } from "../buffer.ts";
import Stream from "./stream.ts";
import { captureRejectionSymbol } from "../events.ts";
-import { kConstruct, kDestroy } from "./symbols.ts";
import {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_OPT_VALUE,
ERR_METHOD_NOT_IMPLEMENTED,
- ERR_MULTIPLE_CALLBACK,
ERR_STREAM_ALREADY_FINISHED,
ERR_STREAM_CANNOT_PIPE,
ERR_STREAM_DESTROYED,
@@ -15,493 +13,27 @@ import {
ERR_STREAM_WRITE_AFTER_END,
ERR_UNKNOWN_ENCODING,
} from "../_errors.ts";
-
-function nop() {}
-
-//TODO(Soremwar)
-//Bring in encodings
-type write_v = (
- // deno-lint-ignore no-explicit-any
- chunks: Array<{ chunk: any; encoding: string }>,
- callback: (error?: Error | null) => void,
-) => void;
-
-type AfterWriteTick = {
- cb: (error?: Error) => void;
- count: number;
- state: WritableState;
- stream: Writable;
-};
-
-const kOnFinished = Symbol("kOnFinished");
-
-function destroy(this: Writable, err?: Error, cb?: () => void) {
- const w = this._writableState;
-
- if (w.destroyed) {
- if (typeof cb === "function") {
- cb();
- }
-
- return this;
- }
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- }
-
- w.destroyed = true;
-
- if (!w.constructed) {
- this.once(kDestroy, (er) => {
- _destroy(this, err || er, cb);
- });
- } else {
- _destroy(this, err, cb);
- }
-
- return this;
-}
-
-function _destroy(
- self: Writable,
- err?: Error,
- cb?: (error?: Error | null) => void,
-) {
- self._destroy(err || null, (err) => {
- const w = self._writableState;
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- }
-
- w.closed = true;
-
- if (typeof cb === "function") {
- cb(err);
- }
-
- if (err) {
- queueMicrotask(() => {
- if (!w.errorEmitted) {
- w.errorEmitted = true;
- self.emit("error", err);
- }
- w.closeEmitted = true;
- if (w.emitClose) {
- self.emit("close");
- }
- });
- } else {
- queueMicrotask(() => {
- w.closeEmitted = true;
- if (w.emitClose) {
- self.emit("close");
- }
- });
- }
- });
-}
-
-function errorOrDestroy(stream: Writable, err: Error, sync = false) {
- const w = stream._writableState;
-
- if (w.destroyed) {
- return stream;
- }
-
- if (w.autoDestroy) {
- stream.destroy(err);
- } else if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- if (sync) {
- queueMicrotask(() => {
- if (w.errorEmitted) {
- return;
- }
- w.errorEmitted = true;
- stream.emit("error", err);
- });
- } else {
- if (w.errorEmitted) {
- return;
- }
- w.errorEmitted = true;
- stream.emit("error", err);
- }
- }
-}
-
-function construct(stream: Writable, cb: (error: Error) => void) {
- if (!stream._construct) {
- return;
- }
-
- stream.once(kConstruct, cb);
- const w = stream._writableState;
-
- w.constructed = false;
-
- queueMicrotask(() => {
- let called = false;
- stream._construct?.((err) => {
- w.constructed = true;
-
- if (called) {
- err = new ERR_MULTIPLE_CALLBACK();
- } else {
- called = true;
- }
-
- if (w.destroyed) {
- stream.emit(kDestroy, err);
- } else if (err) {
- errorOrDestroy(stream, err, true);
- } else {
- queueMicrotask(() => {
- stream.emit(kConstruct);
- });
- }
- });
- });
-}
-
-//TODO(Soremwar)
-//Bring encodings in
-function writeOrBuffer(
- stream: Writable,
- state: WritableState,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- callback: (error: Error) => void,
-) {
- const len = state.objectMode ? 1 : chunk.length;
-
- state.length += len;
-
- if (state.writing || state.corked || state.errored || !state.constructed) {
- state.buffered.push({ chunk, encoding, callback });
- if (state.allBuffers && encoding !== "buffer") {
- state.allBuffers = false;
- }
- if (state.allNoop && callback !== nop) {
- state.allNoop = false;
- }
- } else {
- state.writelen = len;
- state.writecb = callback;
- state.writing = true;
- state.sync = true;
- stream._write(chunk, encoding, state.onwrite);
- state.sync = false;
- }
-
- const ret = state.length < state.highWaterMark;
-
- if (!ret) {
- state.needDrain = true;
- }
-
- return ret && !state.errored && !state.destroyed;
-}
-
-//TODO(Soremwar)
-//Bring encodings in
-function doWrite(
- stream: Writable,
- state: WritableState,
- writev: boolean,
- len: number,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- cb: (error: Error) => void,
-) {
- state.writelen = len;
- state.writecb = cb;
- state.writing = true;
- state.sync = true;
- if (state.destroyed) {
- state.onwrite(new ERR_STREAM_DESTROYED("write"));
- } else if (writev) {
- (stream._writev as unknown as write_v)(chunk, state.onwrite);
- } else {
- stream._write(chunk, encoding, state.onwrite);
- }
- state.sync = false;
-}
-
-function onwriteError(
- stream: Writable,
- state: WritableState,
- er: Error,
- cb: (error: Error) => void,
-) {
- --state.pendingcb;
-
- cb(er);
- errorBuffer(state);
- errorOrDestroy(stream, er);
-}
-
-function onwrite(stream: Writable, er?: Error | null) {
- const state = stream._writableState;
- const sync = state.sync;
- const cb = state.writecb;
-
- if (typeof cb !== "function") {
- errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
- return;
- }
-
- state.writing = false;
- state.writecb = null;
- state.length -= state.writelen;
- state.writelen = 0;
-
- if (er) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- er.stack;
-
- if (!state.errored) {
- state.errored = er;
- }
-
- if (sync) {
- queueMicrotask(() => onwriteError(stream, state, er, cb));
- } else {
- onwriteError(stream, state, er, cb);
- }
- } else {
- if (state.buffered.length > state.bufferedIndex) {
- clearBuffer(stream, state);
- }
-
- if (sync) {
- if (
- state.afterWriteTickInfo !== null &&
- state.afterWriteTickInfo.cb === cb
- ) {
- state.afterWriteTickInfo.count++;
- } else {
- state.afterWriteTickInfo = {
- count: 1,
- cb: (cb as (error?: Error) => void),
- stream,
- state,
- };
- queueMicrotask(() =>
- afterWriteTick(state.afterWriteTickInfo as AfterWriteTick)
- );
- }
- } else {
- afterWrite(stream, state, 1, cb as (error?: Error) => void);
- }
- }
-}
-
-function afterWriteTick({
- cb,
- count,
- state,
- stream,
-}: AfterWriteTick) {
- state.afterWriteTickInfo = null;
- return afterWrite(stream, state, count, cb);
-}
-
-function afterWrite(
- stream: Writable,
- state: WritableState,
- count: number,
- cb: (error?: Error) => void,
-) {
- const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
- state.needDrain;
- if (needDrain) {
- state.needDrain = false;
- stream.emit("drain");
- }
-
- while (count-- > 0) {
- state.pendingcb--;
- cb();
- }
-
- if (state.destroyed) {
- errorBuffer(state);
- }
-
- finishMaybe(stream, state);
-}
-
-/** If there's something in the buffer waiting, then invoke callbacks.*/
-function errorBuffer(state: WritableState) {
- if (state.writing) {
- return;
- }
-
- for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
- const { chunk, callback } = state.buffered[n];
- const len = state.objectMode ? 1 : chunk.length;
- state.length -= len;
- callback(new ERR_STREAM_DESTROYED("write"));
- }
-
- for (const callback of state[kOnFinished].splice(0)) {
- callback(new ERR_STREAM_DESTROYED("end"));
- }
-
- resetBuffer(state);
-}
-
-/** If there's something in the buffer waiting, then process it.*/
-function clearBuffer(stream: Writable, state: WritableState) {
- if (
- state.corked ||
- state.bufferProcessing ||
- state.destroyed ||
- !state.constructed
- ) {
- return;
- }
-
- const { buffered, bufferedIndex, objectMode } = state;
- const bufferedLength = buffered.length - bufferedIndex;
-
- if (!bufferedLength) {
- return;
- }
-
- const i = bufferedIndex;
-
- state.bufferProcessing = true;
- if (bufferedLength > 1 && stream._writev) {
- state.pendingcb -= bufferedLength - 1;
-
- const callback = state.allNoop ? nop : (err: Error) => {
- for (let n = i; n < buffered.length; ++n) {
- buffered[n].callback(err);
- }
- };
- const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i);
-
- doWrite(stream, state, true, state.length, chunks, "", callback);
-
- resetBuffer(state);
- } else {
- do {
- const { chunk, encoding, callback } = buffered[i];
- const len = objectMode ? 1 : chunk.length;
- doWrite(stream, state, false, len, chunk, encoding, callback);
- } while (i < buffered.length && !state.writing);
-
- if (i === buffered.length) {
- resetBuffer(state);
- } else if (i > 256) {
- buffered.splice(0, i);
- state.bufferedIndex = 0;
- } else {
- state.bufferedIndex = i;
- }
- }
- state.bufferProcessing = false;
-}
-
-function finish(stream: Writable, state: WritableState) {
- state.pendingcb--;
- if (state.errorEmitted || state.closeEmitted) {
- return;
- }
-
- state.finished = true;
-
- for (const callback of state[kOnFinished].splice(0)) {
- callback();
- }
-
- stream.emit("finish");
-
- if (state.autoDestroy) {
- stream.destroy();
- }
-}
-
-function finishMaybe(stream: Writable, state: WritableState, sync?: boolean) {
- if (needFinish(state)) {
- prefinish(stream, state);
- if (state.pendingcb === 0 && needFinish(state)) {
- state.pendingcb++;
- if (sync) {
- queueMicrotask(() => finish(stream, state));
- } else {
- finish(stream, state);
- }
- }
- }
-}
-
-function prefinish(stream: Writable, state: WritableState) {
- if (!state.prefinished && !state.finalCalled) {
- if (typeof stream._final === "function" && !state.destroyed) {
- state.finalCalled = true;
-
- state.sync = true;
- state.pendingcb++;
- stream._final((err) => {
- state.pendingcb--;
- if (err) {
- for (const callback of state[kOnFinished].splice(0)) {
- callback(err);
- }
- errorOrDestroy(stream, err, state.sync);
- } else if (needFinish(state)) {
- state.prefinished = true;
- stream.emit("prefinish");
- state.pendingcb++;
- queueMicrotask(() => finish(stream, state));
- }
- });
- state.sync = false;
- } else {
- state.prefinished = true;
- stream.emit("prefinish");
- }
- }
-}
-
-function needFinish(state: WritableState) {
- return (state.ending &&
- state.constructed &&
- state.length === 0 &&
- !state.errored &&
- state.buffered.length === 0 &&
- !state.finished &&
- !state.writing);
-}
-
-interface WritableOptions {
+import type { AfterWriteTick, writeV } from "./writable_internal.ts";
+import {
+ clearBuffer,
+ destroy,
+ errorBuffer,
+ errorOrDestroy,
+ finishMaybe,
+ kOnFinished,
+ nop,
+ onwrite,
+ resetBuffer,
+ writeOrBuffer,
+} from "./writable_internal.ts";
+import type { Encodings } from "../_utils.ts";
+
+type WritableEncodings = Encodings | "buffer";
+
+export interface WritableOptions {
autoDestroy?: boolean;
decodeStrings?: boolean;
- //TODO(Soremwar)
- //Bring encodings in
- defaultEncoding?: string;
+ defaultEncoding?: WritableEncodings;
destroy?(
this: Writable,
error: Error | null,
@@ -511,17 +43,13 @@ interface WritableOptions {
final?(this: Writable, callback: (error?: Error | null) => void): void;
highWaterMark?: number;
objectMode?: boolean;
- //TODO(Soremwar)
- //Bring encodings in
write?(
this: Writable,
// deno-lint-ignore no-explicit-any
chunk: any,
- encoding: string,
+ encoding: WritableEncodings,
callback: (error?: Error | null) => void,
): void;
- //TODO(Soremwar)
- //Bring encodings in
writev?(
this: Writable,
// deno-lint-ignore no-explicit-any
@@ -530,14 +58,12 @@ interface WritableOptions {
): void;
}
-class WritableState {
+export class WritableState {
[kOnFinished]: Array<(error?: Error) => void> = [];
afterWriteTickInfo: null | AfterWriteTick = null;
allBuffers = true;
allNoop = true;
autoDestroy: boolean;
- //TODO(Soremwar)
- //Bring in encodings
buffered: Array<{
allBuffers?: boolean;
// deno-lint-ignore no-explicit-any
@@ -552,7 +78,7 @@ class WritableState {
constructed: boolean;
corked = 0;
decodeStrings: boolean;
- defaultEncoding: string;
+ defaultEncoding: WritableEncodings;
destroyed = false;
emitClose: boolean;
ended = false;
@@ -608,25 +134,17 @@ class WritableState {
}
}
-function resetBuffer(state: WritableState) {
- state.buffered = [];
- state.bufferedIndex = 0;
- state.allBuffers = true;
- state.allNoop = true;
-}
-
/** A bit simpler than readable streams.
* Implement an async `._write(chunk, encoding, cb)`, and it'll handle all
* the drain event emission and buffering.
*/
class Writable extends Stream {
- _construct?: (cb: (error?: Error) => void) => void;
_final?: (
this: Writable,
callback: (error?: Error | null | undefined) => void,
) => void;
_writableState: WritableState;
- _writev?: write_v | null = null;
+ _writev?: writeV | null = null;
constructor(options?: WritableOptions) {
super();
@@ -649,16 +167,6 @@ class Writable extends Stream {
this._final = options.final;
}
}
-
- construct(this, () => {
- const state = this._writableState;
-
- if (!state.writing) {
- clearBuffer(this, state);
- }
-
- finishMaybe(this, state);
- });
}
[captureRejectionSymbol](err?: Error) {
@@ -735,7 +243,7 @@ class Writable extends Stream {
cb(err);
}
- destroy(err?: Error, cb?: () => void) {
+ destroy(err?: Error | null, cb?: () => void) {
const state = this._writableState;
if (!state.destroyed) {
queueMicrotask(() => errorBuffer(state));
@@ -747,25 +255,19 @@ class Writable extends Stream {
end(cb?: () => void): void;
// deno-lint-ignore no-explicit-any
end(chunk: any, cb?: () => void): void;
- //TODO(Soremwar)
- //Bring in encodings
// deno-lint-ignore no-explicit-any
- end(chunk: any, encoding: string, cb?: () => void): void;
+ end(chunk: any, encoding: WritableEncodings, cb?: () => void): void;
end(
// deno-lint-ignore no-explicit-any
x?: any | (() => void),
- //TODO(Soremwar)
- //Bring in encodings
- y?: string | (() => void),
+ y?: WritableEncodings | (() => void),
z?: () => void,
) {
const state = this._writableState;
// deno-lint-ignore no-explicit-any
let chunk: any | null;
- //TODO(Soremwar)
- //Bring in encodings
- let encoding: string | null;
+ let encoding: WritableEncodings | null;
let cb: undefined | ((error?: Error) => void);
if (typeof x === "function") {
@@ -778,7 +280,7 @@ class Writable extends Stream {
cb = y;
} else {
chunk = x;
- encoding = y as string;
+ encoding = y as WritableEncodings;
cb = z;
}
@@ -815,8 +317,6 @@ class Writable extends Stream {
return this;
}
- //TODO(Soremwar)
- //Bring in encodings
_write(
// deno-lint-ignore no-explicit-any
chunk: any,
@@ -838,27 +338,21 @@ class Writable extends Stream {
// deno-lint-ignore no-explicit-any
write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean;
- //TODO(Soremwar)
- //Bring in encodings
write(
// deno-lint-ignore no-explicit-any
chunk: any,
- encoding: string | null,
+ encoding: WritableEncodings | null,
cb?: (error: Error | null | undefined) => void,
): boolean;
- //TODO(Soremwar)
- //Bring in encodings
write(
// deno-lint-ignore no-explicit-any
chunk: any,
- x?: string | null | ((error: Error | null | undefined) => void),
+ x?: WritableEncodings | null | ((error: Error | null | undefined) => void),
y?: ((error: Error | null | undefined) => void),
) {
const state = this._writableState;
- //TODO(Soremwar)
- //Bring in encodings
- let encoding: string;
+ let encoding: WritableEncodings;
let cb: (error?: Error | null) => void;
if (typeof x === "function") {
@@ -933,8 +427,6 @@ class Writable extends Stream {
}
}
- //TODO(Soremwar)
- //Bring allowed encodings
setDefaultEncoding(encoding: string) {
// node::ParseEncoding() requires lower case.
if (typeof encoding === "string") {
@@ -943,10 +435,9 @@ class Writable extends Stream {
if (!Buffer.isEncoding(encoding)) {
throw new ERR_UNKNOWN_ENCODING(encoding);
}
- this._writableState.defaultEncoding = encoding;
+ this._writableState.defaultEncoding = encoding as WritableEncodings;
return this;
}
}
export default Writable;
-export { WritableState };
diff --git a/std/node/_stream/writable_internal.ts b/std/node/_stream/writable_internal.ts
new file mode 100644
index 000000000..e8c001af0
--- /dev/null
+++ b/std/node/_stream/writable_internal.ts
@@ -0,0 +1,457 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import type Duplex from "./duplex.ts";
+import type Writable from "./writable.ts";
+import type { WritableState } from "./writable.ts";
+import { kDestroy } from "./symbols.ts";
+import { ERR_MULTIPLE_CALLBACK, ERR_STREAM_DESTROYED } from "../_errors.ts";
+
+export type writeV = (
+ // deno-lint-ignore no-explicit-any
+ chunks: Array<{ chunk: any; encoding: string }>,
+ callback: (error?: Error | null) => void,
+) => void;
+
+export type AfterWriteTick = {
+ cb: (error?: Error) => void;
+ count: number;
+ state: WritableState;
+ stream: Writable;
+};
+
+export const kOnFinished = Symbol("kOnFinished");
+
+function _destroy(
+ self: Writable,
+ err?: Error | null,
+ cb?: (error?: Error | null) => void,
+) {
+ self._destroy(err || null, (err) => {
+ const w = self._writableState;
+
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ }
+
+ w.closed = true;
+
+ if (typeof cb === "function") {
+ cb(err);
+ }
+
+ if (err) {
+ queueMicrotask(() => {
+ if (!w.errorEmitted) {
+ w.errorEmitted = true;
+ self.emit("error", err);
+ }
+ w.closeEmitted = true;
+ if (w.emitClose) {
+ self.emit("close");
+ }
+ });
+ } else {
+ queueMicrotask(() => {
+ w.closeEmitted = true;
+ if (w.emitClose) {
+ self.emit("close");
+ }
+ });
+ }
+ });
+}
+
+export function afterWrite(
+ stream: Writable,
+ state: WritableState,
+ count: number,
+ cb: (error?: Error) => void,
+) {
+ const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
+ state.needDrain;
+ if (needDrain) {
+ state.needDrain = false;
+ stream.emit("drain");
+ }
+
+ while (count-- > 0) {
+ state.pendingcb--;
+ cb();
+ }
+
+ if (state.destroyed) {
+ errorBuffer(state);
+ }
+
+ finishMaybe(stream, state);
+}
+
+export function afterWriteTick({
+ cb,
+ count,
+ state,
+ stream,
+}: AfterWriteTick) {
+ state.afterWriteTickInfo = null;
+ return afterWrite(stream, state, count, cb);
+}
+
+/** If there's something in the buffer waiting, then process it.*/
+export function clearBuffer(stream: Duplex | Writable, state: WritableState) {
+ if (
+ state.corked ||
+ state.bufferProcessing ||
+ state.destroyed ||
+ !state.constructed
+ ) {
+ return;
+ }
+
+ const { buffered, bufferedIndex, objectMode } = state;
+ const bufferedLength = buffered.length - bufferedIndex;
+
+ if (!bufferedLength) {
+ return;
+ }
+
+ const i = bufferedIndex;
+
+ state.bufferProcessing = true;
+ if (bufferedLength > 1 && stream._writev) {
+ state.pendingcb -= bufferedLength - 1;
+
+ const callback = state.allNoop ? nop : (err: Error) => {
+ for (let n = i; n < buffered.length; ++n) {
+ buffered[n].callback(err);
+ }
+ };
+ const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i);
+
+ doWrite(stream, state, true, state.length, chunks, "", callback);
+
+ resetBuffer(state);
+ } else {
+ do {
+ const { chunk, encoding, callback } = buffered[i];
+ const len = objectMode ? 1 : chunk.length;
+ doWrite(stream, state, false, len, chunk, encoding, callback);
+ } while (i < buffered.length && !state.writing);
+
+ if (i === buffered.length) {
+ resetBuffer(state);
+ } else if (i > 256) {
+ buffered.splice(0, i);
+ state.bufferedIndex = 0;
+ } else {
+ state.bufferedIndex = i;
+ }
+ }
+ state.bufferProcessing = false;
+}
+
+export function destroy(this: Writable, err?: Error | null, cb?: () => void) {
+ const w = this._writableState;
+
+ if (w.destroyed) {
+ if (typeof cb === "function") {
+ cb();
+ }
+
+ return this;
+ }
+
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ }
+
+ w.destroyed = true;
+
+ if (!w.constructed) {
+ this.once(kDestroy, (er) => {
+ _destroy(this, err || er, cb);
+ });
+ } else {
+ _destroy(this, err, cb);
+ }
+
+ return this;
+}
+
+function doWrite(
+ stream: Duplex | Writable,
+ state: WritableState,
+ writev: boolean,
+ len: number,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ cb: (error: Error) => void,
+) {
+ state.writelen = len;
+ state.writecb = cb;
+ state.writing = true;
+ state.sync = true;
+ if (state.destroyed) {
+ state.onwrite(new ERR_STREAM_DESTROYED("write"));
+ } else if (writev) {
+ (stream._writev as unknown as writeV)(chunk, state.onwrite);
+ } else {
+ stream._write(chunk, encoding, state.onwrite);
+ }
+ state.sync = false;
+}
+
+/** If there's something in the buffer waiting, then invoke callbacks.*/
+export function errorBuffer(state: WritableState) {
+ if (state.writing) {
+ return;
+ }
+
+ for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
+ const { chunk, callback } = state.buffered[n];
+ const len = state.objectMode ? 1 : chunk.length;
+ state.length -= len;
+ callback(new ERR_STREAM_DESTROYED("write"));
+ }
+
+ for (const callback of state[kOnFinished].splice(0)) {
+ callback(new ERR_STREAM_DESTROYED("end"));
+ }
+
+ resetBuffer(state);
+}
+
+export function errorOrDestroy(stream: Writable, err: Error, sync = false) {
+ const w = stream._writableState;
+
+ if (w.destroyed) {
+ return stream;
+ }
+
+ if (w.autoDestroy) {
+ stream.destroy(err);
+ } else if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ if (sync) {
+ queueMicrotask(() => {
+ if (w.errorEmitted) {
+ return;
+ }
+ w.errorEmitted = true;
+ stream.emit("error", err);
+ });
+ } else {
+ if (w.errorEmitted) {
+ return;
+ }
+ w.errorEmitted = true;
+ stream.emit("error", err);
+ }
+ }
+}
+
+function finish(stream: Writable, state: WritableState) {
+ state.pendingcb--;
+ if (state.errorEmitted || state.closeEmitted) {
+ return;
+ }
+
+ state.finished = true;
+
+ for (const callback of state[kOnFinished].splice(0)) {
+ callback();
+ }
+
+ stream.emit("finish");
+
+ if (state.autoDestroy) {
+ stream.destroy();
+ }
+}
+
+export function finishMaybe(
+ stream: Writable,
+ state: WritableState,
+ sync?: boolean,
+) {
+ if (needFinish(state)) {
+ prefinish(stream, state);
+ if (state.pendingcb === 0 && needFinish(state)) {
+ state.pendingcb++;
+ if (sync) {
+ queueMicrotask(() => finish(stream, state));
+ } else {
+ finish(stream, state);
+ }
+ }
+ }
+}
+
+export function needFinish(state: WritableState) {
+ return (state.ending &&
+ state.constructed &&
+ state.length === 0 &&
+ !state.errored &&
+ state.buffered.length === 0 &&
+ !state.finished &&
+ !state.writing);
+}
+
+export function nop() {}
+
+export function resetBuffer(state: WritableState) {
+ state.buffered = [];
+ state.bufferedIndex = 0;
+ state.allBuffers = true;
+ state.allNoop = true;
+}
+
+function onwriteError(
+ stream: Writable,
+ state: WritableState,
+ er: Error,
+ cb: (error: Error) => void,
+) {
+ --state.pendingcb;
+
+ cb(er);
+ errorBuffer(state);
+ errorOrDestroy(stream, er);
+}
+
+export function onwrite(stream: Writable, er?: Error | null) {
+ const state = stream._writableState;
+ const sync = state.sync;
+ const cb = state.writecb;
+
+ if (typeof cb !== "function") {
+ errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
+ return;
+ }
+
+ state.writing = false;
+ state.writecb = null;
+ state.length -= state.writelen;
+ state.writelen = 0;
+
+ if (er) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ er.stack;
+
+ if (!state.errored) {
+ state.errored = er;
+ }
+
+ if (sync) {
+ queueMicrotask(() => onwriteError(stream, state, er, cb));
+ } else {
+ onwriteError(stream, state, er, cb);
+ }
+ } else {
+ if (state.buffered.length > state.bufferedIndex) {
+ clearBuffer(stream, state);
+ }
+
+ if (sync) {
+ if (
+ state.afterWriteTickInfo !== null &&
+ state.afterWriteTickInfo.cb === cb
+ ) {
+ state.afterWriteTickInfo.count++;
+ } else {
+ state.afterWriteTickInfo = {
+ count: 1,
+ cb: (cb as (error?: Error) => void),
+ stream,
+ state,
+ };
+ queueMicrotask(() =>
+ afterWriteTick(state.afterWriteTickInfo as AfterWriteTick)
+ );
+ }
+ } else {
+ afterWrite(stream, state, 1, cb as (error?: Error) => void);
+ }
+ }
+}
+
+export function prefinish(stream: Writable, state: WritableState) {
+ if (!state.prefinished && !state.finalCalled) {
+ if (typeof stream._final === "function" && !state.destroyed) {
+ state.finalCalled = true;
+
+ state.sync = true;
+ state.pendingcb++;
+ stream._final((err) => {
+ state.pendingcb--;
+ if (err) {
+ for (const callback of state[kOnFinished].splice(0)) {
+ callback(err);
+ }
+ errorOrDestroy(stream, err, state.sync);
+ } else if (needFinish(state)) {
+ state.prefinished = true;
+ stream.emit("prefinish");
+ state.pendingcb++;
+ queueMicrotask(() => finish(stream, state));
+ }
+ });
+ state.sync = false;
+ } else {
+ state.prefinished = true;
+ stream.emit("prefinish");
+ }
+ }
+}
+
+export function writeOrBuffer(
+ stream: Duplex | Writable,
+ state: WritableState,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ callback: (error: Error) => void,
+) {
+ const len = state.objectMode ? 1 : chunk.length;
+
+ state.length += len;
+
+ if (state.writing || state.corked || state.errored || !state.constructed) {
+ state.buffered.push({ chunk, encoding, callback });
+ if (state.allBuffers && encoding !== "buffer") {
+ state.allBuffers = false;
+ }
+ if (state.allNoop && callback !== nop) {
+ state.allNoop = false;
+ }
+ } else {
+ state.writelen = len;
+ state.writecb = callback;
+ state.writing = true;
+ state.sync = true;
+ stream._write(chunk, encoding, state.onwrite);
+ state.sync = false;
+ }
+
+ const ret = state.length < state.highWaterMark;
+
+ if (!ret) {
+ state.needDrain = true;
+ }
+
+ return ret && !state.errored && !state.destroyed;
+}
diff --git a/std/node/_stream/writable_test.ts b/std/node/_stream/writable_test.ts
index d0650c49e..d6133b65f 100644
--- a/std/node/_stream/writable_test.ts
+++ b/std/node/_stream/writable_test.ts
@@ -1,6 +1,6 @@
// Copyright Node.js contributors. All rights reserved. MIT License.
import { Buffer } from "../buffer.ts";
-import finished from "./end-of-stream.ts";
+import finished from "./end_of_stream.ts";
import Writable from "../_stream/writable.ts";
import { deferred } from "../../async/mod.ts";
import {