diff options
Diffstat (limited to 'std/node')
34 files changed, 4121 insertions, 1109 deletions
diff --git a/std/node/_buffer.ts b/std/node/_buffer.ts index 8cbd117ca..d7e8af6ee 100644 --- a/std/node/_buffer.ts +++ b/std/node/_buffer.ts @@ -1,7 +1,7 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. import * as hex from "../encoding/hex.ts"; import * as base64 from "../encoding/base64.ts"; -import { normalizeEncoding, notImplemented } from "./_utils.ts"; +import { Encodings, normalizeEncoding, notImplemented } from "./_utils.ts"; const notImplementedEncodings = [ "ascii", @@ -11,7 +11,7 @@ const notImplementedEncodings = [ "utf16le", ]; -function checkEncoding(encoding = "utf8", strict = true): string { +function checkEncoding(encoding = "utf8", strict = true): Encodings { if (typeof encoding !== "string" || (strict && encoding === "")) { if (!strict) return "utf8"; throw new TypeError(`Unkown encoding: ${encoding}`); @@ -93,14 +93,14 @@ export class Buffer extends Uint8Array { let bufFill; if (typeof fill === "string") { - encoding = checkEncoding(encoding); + const clearEncoding = checkEncoding(encoding); if ( typeof fill === "string" && fill.length === 1 && - encoding === "utf8" + clearEncoding === "utf8" ) { buf.fill(fill.charCodeAt(0)); - } else bufFill = Buffer.from(fill, encoding); + } else bufFill = Buffer.from(fill, clearEncoding); } else if (typeof fill === "number") { buf.fill(fill); } else if (fill instanceof Uint8Array) { diff --git a/std/node/_errors.ts b/std/node/_errors.ts index 2424303b1..db37e3186 100644 --- a/std/node/_errors.ts +++ b/std/node/_errors.ts @@ -1232,7 +1232,7 @@ export class ERR_INVALID_BUFFER_SIZE extends NodeRangeError { } } export class ERR_INVALID_CALLBACK extends NodeTypeError { - constructor(object: { [key: string]: unknown }) { + constructor(object: unknown) { super( "ERR_INVALID_CALLBACK", `Callback must be a function. Received ${JSON.stringify(object)}`, diff --git a/std/node/_fs/_fs_appendFile.ts b/std/node/_fs/_fs_appendFile.ts index b7fce274f..5443b0374 100644 --- a/std/node/_fs/_fs_appendFile.ts +++ b/std/node/_fs/_fs_appendFile.ts @@ -1,12 +1,11 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. import { CallbackWithError, - Encodings, getOpenOptions, isFileOptions, WriteFileOptions, } from "./_fs_common.ts"; -import { notImplemented } from "../_utils.ts"; +import { Encodings, notImplemented } from "../_utils.ts"; import { fromFileUrl } from "../path.ts"; /** diff --git a/std/node/_fs/_fs_common.ts b/std/node/_fs/_fs_common.ts index 0b1ffe0bb..6fe77d54c 100644 --- a/std/node/_fs/_fs_common.ts +++ b/std/node/_fs/_fs_common.ts @@ -1,21 +1,13 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -import { notImplemented } from "../_utils.ts"; +import { + BinaryEncodings, + Encodings, + notImplemented, + TextEncodings, +} from "../_utils.ts"; export type CallbackWithError = (err?: Error | null) => void; -export type TextEncodings = - | "ascii" - | "utf8" - | "utf-8" - | "utf16le" - | "ucs2" - | "ucs-2" - | "base64" - | "latin1" - | "hex"; -export type BinaryEncodings = "binary"; -export type Encodings = TextEncodings | BinaryEncodings; - export interface FileOptions { encoding?: Encodings; flag?: string; diff --git a/std/node/_fs/_fs_readFile.ts b/std/node/_fs/_fs_readFile.ts index e4a7d2031..d380161fd 100644 --- a/std/node/_fs/_fs_readFile.ts +++ b/std/node/_fs/_fs_readFile.ts @@ -1,15 +1,13 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. import { - BinaryEncodings, BinaryOptionsArgument, - Encodings, FileOptionsArgument, getEncoding, - TextEncodings, TextOptionsArgument, } from "./_fs_common.ts"; import { Buffer } from "../buffer.ts"; import { fromFileUrl } from "../path.ts"; +import { BinaryEncodings, Encodings, TextEncodings } from "../_utils.ts"; function maybeDecode(data: Uint8Array, encoding: TextEncodings): string; function maybeDecode( diff --git a/std/node/_fs/_fs_writeFile.ts b/std/node/_fs/_fs_writeFile.ts index 19181ee21..434e779e3 100644 --- a/std/node/_fs/_fs_writeFile.ts +++ b/std/node/_fs/_fs_writeFile.ts @@ -1,12 +1,10 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -import { notImplemented } from "../_utils.ts"; +import { Encodings, notImplemented } from "../_utils.ts"; import { fromFileUrl } from "../path.ts"; import { Buffer } from "../buffer.ts"; - import { CallbackWithError, checkEncoding, - Encodings, getEncoding, getOpenOptions, isFileOptions, diff --git a/std/node/_fs/_fs_writeFile_test.ts b/std/node/_fs/_fs_writeFile_test.ts index 845d4a155..6e1e47e1a 100644 --- a/std/node/_fs/_fs_writeFile_test.ts +++ b/std/node/_fs/_fs_writeFile_test.ts @@ -6,7 +6,7 @@ import { assertThrows, } from "../../testing/asserts.ts"; import { writeFile, writeFileSync } from "./_fs_writeFile.ts"; -import type { TextEncodings } from "./_fs_common.ts"; +import type { TextEncodings } from "../_utils.ts"; import * as path from "../../path/mod.ts"; const moduleDir = path.dirname(path.fromFileUrl(import.meta.url)); diff --git a/std/node/_fs/promises/_fs_writeFile.ts b/std/node/_fs/promises/_fs_writeFile.ts index 1c9ea5032..bc398b1a2 100644 --- a/std/node/_fs/promises/_fs_writeFile.ts +++ b/std/node/_fs/promises/_fs_writeFile.ts @@ -1,5 +1,6 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -import type { Encodings, WriteFileOptions } from "../_fs_common.ts"; +import type { WriteFileOptions } from "../_fs_common.ts"; +import type { Encodings } from "../../_utils.ts"; import { writeFile as writeFileCallback } from "../_fs_writeFile.ts"; diff --git a/std/node/_fs/promises/_fs_writeFile_test.ts b/std/node/_fs/promises/_fs_writeFile_test.ts index 698284057..296387827 100644 --- a/std/node/_fs/promises/_fs_writeFile_test.ts +++ b/std/node/_fs/promises/_fs_writeFile_test.ts @@ -6,7 +6,7 @@ import { assertThrowsAsync, } from "../../../testing/asserts.ts"; import { writeFile } from "./_fs_writeFile.ts"; -import type { TextEncodings } from "../_fs_common.ts"; +import type { TextEncodings } from "../../_utils.ts"; const decoder = new TextDecoder("utf-8"); diff --git a/std/node/_stream/async_iterator.ts b/std/node/_stream/async_iterator.ts index cd1b6db3c..5369ef39c 100644 --- a/std/node/_stream/async_iterator.ts +++ b/std/node/_stream/async_iterator.ts @@ -1,8 +1,9 @@ // Copyright Node.js contributors. All rights reserved. MIT License. import type { Buffer } from "../buffer.ts"; -import finished from "./end-of-stream.ts"; +import finished from "./end_of_stream.ts"; import Readable from "./readable.ts"; import type Stream from "./stream.ts"; +import { destroyer } from "./destroy.ts"; const kLastResolve = Symbol("lastResolve"); const kLastReject = Symbol("lastReject"); @@ -34,24 +35,6 @@ function initIteratorSymbols( Object.defineProperties(o, properties); } -// TODO(Soremwar) -// Bring back once requests are implemented -// function isRequest(stream: any) { -// return stream && stream.setHeader && typeof stream.abort === "function"; -// } - -//TODO(Soremwar) -//Should be any implementation of stream -// deno-lint-ignore no-explicit-any -function destroyer(stream: any, err?: Error | null) { - // TODO(Soremwar) - // Bring back once requests are implemented - // if (isRequest(stream)) return stream.abort(); - // if (isRequest(stream.req)) return stream.req.abort(); - if (typeof stream.destroy === "function") return stream.destroy(err); - if (typeof stream.close === "function") return stream.close(); -} - function createIterResult( value: IterableItem, done: boolean, @@ -119,7 +102,7 @@ const AsyncIteratorPrototype = Object.getPrototypeOf( Object.getPrototypeOf(async function* () {}).prototype, ); -class ReadableStreamAsyncIterator +export class ReadableStreamAsyncIterator implements AsyncIterableIterator<IterableItem> { [kEnded]: boolean; [kError]: Error | null = null; diff --git a/std/node/_stream/destroy.ts b/std/node/_stream/destroy.ts new file mode 100644 index 000000000..d13e12de2 --- /dev/null +++ b/std/node/_stream/destroy.ts @@ -0,0 +1,38 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import type Duplex from "./duplex.ts"; +import type Readable from "./readable.ts"; +import type Stream from "./stream.ts"; +import type Writable from "./writable.ts"; + +//This whole module acts as a 'normalizer' +//Idea behind it is you can pass any kind of streams and functions will execute anyways + +//TODO(Soremwar) +//Should be any implementation of stream +//This is a guard to check executed methods exist inside the implementation +type StreamImplementations = Duplex | Readable | Writable; + +// TODO(Soremwar) +// Bring back once requests are implemented +// function isRequest(stream: any) { +// return stream && stream.setHeader && typeof stream.abort === "function"; +// } + +export function destroyer(stream: Stream, err?: Error | null) { + // TODO(Soremwar) + // Bring back once requests are implemented + // if (isRequest(stream)) return stream.abort(); + // if (isRequest(stream.req)) return stream.req.abort(); + if ( + typeof (stream as StreamImplementations).destroy === "function" + ) { + return (stream as StreamImplementations).destroy(err); + } + // A test of async iterator mocks an upcoming implementation of stream + // his is casted to any in the meanwhile + // deno-lint-ignore no-explicit-any + if (typeof (stream as any).close === "function") { + // deno-lint-ignore no-explicit-any + return (stream as any).close(); + } +} diff --git a/std/node/_stream/duplex.ts b/std/node/_stream/duplex.ts index c5faed6f8..b5c429f0a 100644 --- a/std/node/_stream/duplex.ts +++ b/std/node/_stream/duplex.ts @@ -1,3 +1,682 @@ // Copyright Node.js contributors. All rights reserved. MIT License. -// deno-lint-ignore no-explicit-any -export const errorOrDestroy = (...args: any[]) => {}; +import { captureRejectionSymbol } from "../events.ts"; +import Readable, { ReadableState } from "./readable.ts"; +import Stream from "./stream.ts"; +import Writable, { WritableState } from "./writable.ts"; +import { Buffer } from "../buffer.ts"; +import { + ERR_STREAM_ALREADY_FINISHED, + ERR_STREAM_DESTROYED, + ERR_UNKNOWN_ENCODING, +} from "../_errors.ts"; +import type { Encodings } from "../_utils.ts"; +import createReadableStreamAsyncIterator from "./async_iterator.ts"; +import type { ReadableStreamAsyncIterator } from "./async_iterator.ts"; +import { + _destroy, + computeNewHighWaterMark, + emitReadable, + fromList, + howMuchToRead, + nReadingNextTick, + updateReadableListening, +} from "./readable_internal.ts"; +import { kOnFinished, writeV } from "./writable_internal.ts"; +import { + endDuplex, + finishMaybe, + onwrite, + readableAddChunk, +} from "./duplex_internal.ts"; +export { errorOrDestroy } from "./duplex_internal.ts"; + +export interface DuplexOptions { + allowHalfOpen?: boolean; + autoDestroy?: boolean; + decodeStrings?: boolean; + defaultEncoding?: Encodings; + destroy?( + this: Duplex, + error: Error | null, + callback: (error: Error | null) => void, + ): void; + emitClose?: boolean; + encoding?: Encodings; + final?(this: Duplex, callback: (error?: Error | null) => void): void; + highWaterMark?: number; + objectMode?: boolean; + read?(this: Duplex, size: number): void; + readable?: boolean; + readableHighWaterMark?: number; + readableObjectMode?: boolean; + writable?: boolean; + writableCorked?: number; + writableHighWaterMark?: number; + writableObjectMode?: boolean; + write?( + this: Duplex, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: Encodings, + callback: (error?: Error | null) => void, + ): void; + writev?: writeV; +} + +interface Duplex extends Readable, Writable {} + +/** + * A duplex is an implementation of a stream that has both Readable and Writable + * attributes and capabilities + */ +class Duplex extends Stream { + allowHalfOpen = true; + _final?: ( + callback: (error?: Error | null | undefined) => void, + ) => void; + _readableState: ReadableState; + _writableState: WritableState; + _writev?: writeV | null; + + constructor(options?: DuplexOptions) { + super(); + + if (options) { + if (options.allowHalfOpen === false) { + this.allowHalfOpen = false; + } + if (typeof options.destroy === "function") { + this._destroy = options.destroy; + } + if (typeof options.final === "function") { + this._final = options.final; + } + if (typeof options.read === "function") { + this._read = options.read; + } + if (options.readable === false) { + this.readable = false; + } + if (options.writable === false) { + this.writable = false; + } + if (typeof options.write === "function") { + this._write = options.write; + } + if (typeof options.writev === "function") { + this._writev = options.writev; + } + } + + const readableOptions = { + autoDestroy: options?.autoDestroy, + defaultEncoding: options?.defaultEncoding, + destroy: options?.destroy as unknown as ( + this: Readable, + error: Error | null, + callback: (error: Error | null) => void, + ) => void, + emitClose: options?.emitClose, + encoding: options?.encoding, + highWaterMark: options?.highWaterMark ?? options?.readableHighWaterMark, + objectMode: options?.objectMode ?? options?.readableObjectMode, + read: options?.read as unknown as (this: Readable) => void, + }; + + const writableOptions = { + autoDestroy: options?.autoDestroy, + decodeStrings: options?.decodeStrings, + defaultEncoding: options?.defaultEncoding, + destroy: options?.destroy as unknown as ( + this: Writable, + error: Error | null, + callback: (error: Error | null) => void, + ) => void, + emitClose: options?.emitClose, + final: options?.final as unknown as ( + this: Writable, + callback: (error?: Error | null) => void, + ) => void, + highWaterMark: options?.highWaterMark ?? options?.writableHighWaterMark, + objectMode: options?.objectMode ?? options?.writableObjectMode, + write: options?.write as unknown as ( + this: Writable, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + callback: (error?: Error | null) => void, + ) => void, + writev: options?.writev as unknown as ( + this: Writable, + // deno-lint-ignore no-explicit-any + chunks: Array<{ chunk: any; encoding: Encodings }>, + callback: (error?: Error | null) => void, + ) => void, + }; + + this._readableState = new ReadableState(readableOptions); + this._writableState = new WritableState( + writableOptions, + this as unknown as Writable, + ); + //Very important to override onwrite here, duplex implementation adds a check + //on the readable side + this._writableState.onwrite = onwrite.bind(undefined, this); + } + + [captureRejectionSymbol](err?: Error) { + this.destroy(err); + } + + [Symbol.asyncIterator](): ReadableStreamAsyncIterator { + return createReadableStreamAsyncIterator(this); + } + + _destroy( + error: Error | null, + callback: (error?: Error | null) => void, + ): void { + callback(error); + } + + _read = Readable.prototype._read; + + _undestroy = Readable.prototype._undestroy; + + destroy(err?: Error | null, cb?: (error?: Error | null) => void) { + const r = this._readableState; + const w = this._writableState; + + if (w.destroyed || r.destroyed) { + if (typeof cb === "function") { + cb(); + } + + return this; + } + + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + if (!r.errored) { + r.errored = err; + } + } + + w.destroyed = true; + r.destroyed = true; + + this._destroy(err || null, (err) => { + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + if (!r.errored) { + r.errored = err; + } + } + + w.closed = true; + r.closed = true; + + if (typeof cb === "function") { + cb(err); + } + + if (err) { + queueMicrotask(() => { + const r = this._readableState; + const w = this._writableState; + + if (!w.errorEmitted && !r.errorEmitted) { + w.errorEmitted = true; + r.errorEmitted = true; + + this.emit("error", err); + } + + r.closeEmitted = true; + + if (w.emitClose || r.emitClose) { + this.emit("close"); + } + }); + } else { + queueMicrotask(() => { + const r = this._readableState; + const w = this._writableState; + + r.closeEmitted = true; + + if (w.emitClose || r.emitClose) { + this.emit("close"); + } + }); + } + }); + + return this; + } + + isPaused = Readable.prototype.isPaused; + + off = this.removeListener; + + on( + event: "close" | "end" | "pause" | "readable" | "resume", + listener: () => void, + ): this; + // deno-lint-ignore no-explicit-any + on(event: "data", listener: (chunk: any) => void): this; + on(event: "error", listener: (err: Error) => void): this; + // deno-lint-ignore no-explicit-any + on(event: string | symbol, listener: (...args: any[]) => void): this; + on( + ev: string | symbol, + fn: + | (() => void) + // deno-lint-ignore no-explicit-any + | ((chunk: any) => void) + | ((err: Error) => void) + // deno-lint-ignore no-explicit-any + | ((...args: any[]) => void), + ) { + const res = super.on.call(this, ev, fn); + const state = this._readableState; + + if (ev === "data") { + state.readableListening = this.listenerCount("readable") > 0; + + if (state.flowing !== false) { + this.resume(); + } + } else if (ev === "readable") { + if (!state.endEmitted && !state.readableListening) { + state.readableListening = state.needReadable = true; + state.flowing = false; + state.emittedReadable = false; + if (state.length) { + emitReadable(this); + } else if (!state.reading) { + queueMicrotask(() => nReadingNextTick(this)); + } + } + } + + return res; + } + + pause = Readable.prototype.pause as () => this; + + pipe = Readable.prototype.pipe; + + // deno-lint-ignore no-explicit-any + push(chunk: any, encoding?: Encodings): boolean { + return readableAddChunk(this, chunk, encoding, false); + } + + /** You can override either this method, or the async `_read` method */ + read(n?: number) { + // Same as parseInt(undefined, 10), however V8 7.3 performance regressed + // in this scenario, so we are doing it manually. + if (n === undefined) { + n = NaN; + } + const state = this._readableState; + const nOrig = n; + + if (n > state.highWaterMark) { + state.highWaterMark = computeNewHighWaterMark(n); + } + + if (n !== 0) { + state.emittedReadable = false; + } + + if ( + n === 0 && + state.needReadable && + ((state.highWaterMark !== 0 + ? state.length >= state.highWaterMark + : state.length > 0) || + state.ended) + ) { + if (state.length === 0 && state.ended) { + endDuplex(this); + } else { + emitReadable(this); + } + return null; + } + + n = howMuchToRead(n, state); + + if (n === 0 && state.ended) { + if (state.length === 0) { + endDuplex(this); + } + return null; + } + + let doRead = state.needReadable; + if ( + state.length === 0 || state.length - (n as number) < state.highWaterMark + ) { + doRead = true; + } + + if ( + state.ended || state.reading || state.destroyed || state.errored || + !state.constructed + ) { + doRead = false; + } else if (doRead) { + state.reading = true; + state.sync = true; + if (state.length === 0) { + state.needReadable = true; + } + this._read(); + state.sync = false; + if (!state.reading) { + n = howMuchToRead(nOrig, state); + } + } + + let ret; + if ((n as number) > 0) { + ret = fromList((n as number), state); + } else { + ret = null; + } + + if (ret === null) { + state.needReadable = state.length <= state.highWaterMark; + n = 0; + } else { + state.length -= n as number; + if (state.multiAwaitDrain) { + (state.awaitDrainWriters as Set<Writable>).clear(); + } else { + state.awaitDrainWriters = null; + } + } + + if (state.length === 0) { + if (!state.ended) { + state.needReadable = true; + } + + if (nOrig !== n && state.ended) { + endDuplex(this); + } + } + + if (ret !== null) { + this.emit("data", ret); + } + + return ret; + } + + removeAllListeners( + ev: + | "close" + | "data" + | "end" + | "error" + | "pause" + | "readable" + | "resume" + | symbol + | undefined, + ) { + const res = super.removeAllListeners(ev); + + if (ev === "readable" || ev === undefined) { + queueMicrotask(() => updateReadableListening(this)); + } + + return res; + } + + removeListener( + event: "close" | "end" | "pause" | "readable" | "resume", + listener: () => void, + ): this; + // deno-lint-ignore no-explicit-any + removeListener(event: "data", listener: (chunk: any) => void): this; + removeListener(event: "error", listener: (err: Error) => void): this; + removeListener( + event: string | symbol, + // deno-lint-ignore no-explicit-any + listener: (...args: any[]) => void, + ): this; + removeListener( + ev: string | symbol, + fn: + | (() => void) + // deno-lint-ignore no-explicit-any + | ((chunk: any) => void) + | ((err: Error) => void) + // deno-lint-ignore no-explicit-any + | ((...args: any[]) => void), + ) { + const res = super.removeListener.call(this, ev, fn); + + if (ev === "readable") { + queueMicrotask(() => updateReadableListening(this)); + } + + return res; + } + + resume = Readable.prototype.resume as () => this; + + setEncoding = Readable.prototype.setEncoding as (enc: string) => this; + + // deno-lint-ignore no-explicit-any + unshift(chunk: any, encoding?: Encodings): boolean { + return readableAddChunk(this, chunk, encoding, true); + } + + unpipe = Readable.prototype.unpipe as (dest?: Writable | undefined) => this; + + wrap = Readable.prototype.wrap as (stream: Stream) => this; + + get readable(): boolean { + return this._readableState?.readable && + !this._readableState?.destroyed && + !this._readableState?.errorEmitted && + !this._readableState?.endEmitted; + } + set readable(val: boolean) { + if (this._readableState) { + this._readableState.readable = val; + } + } + + get readableHighWaterMark(): number { + return this._readableState.highWaterMark; + } + + get readableBuffer() { + return this._readableState && this._readableState.buffer; + } + + get readableFlowing(): boolean | null { + return this._readableState.flowing; + } + + set readableFlowing(state: boolean | null) { + if (this._readableState) { + this._readableState.flowing = state; + } + } + + get readableLength() { + return this._readableState.length; + } + + get readableObjectMode() { + return this._readableState ? this._readableState.objectMode : false; + } + + get readableEncoding() { + return this._readableState ? this._readableState.encoding : null; + } + + get readableEnded() { + return this._readableState ? this._readableState.endEmitted : false; + } + + _write = Writable.prototype._write; + + write = Writable.prototype.write; + + cork = Writable.prototype.cork; + + uncork = Writable.prototype.uncork; + + setDefaultEncoding(encoding: string) { + // node::ParseEncoding() requires lower case. + if (typeof encoding === "string") { + encoding = encoding.toLowerCase(); + } + if (!Buffer.isEncoding(encoding)) { + throw new ERR_UNKNOWN_ENCODING(encoding); + } + this._writableState.defaultEncoding = encoding as Encodings; + return this; + } + + end(cb?: () => void): void; + // deno-lint-ignore no-explicit-any + end(chunk: any, cb?: () => void): void; + // deno-lint-ignore no-explicit-any + end(chunk: any, encoding: Encodings, cb?: () => void): void; + + end( + // deno-lint-ignore no-explicit-any + x?: any | (() => void), + y?: Encodings | (() => void), + z?: () => void, + ) { + const state = this._writableState; + // deno-lint-ignore no-explicit-any + let chunk: any | null; + let encoding: Encodings | null; + let cb: undefined | ((error?: Error) => void); + + if (typeof x === "function") { + chunk = null; + encoding = null; + cb = x; + } else if (typeof y === "function") { + chunk = x; + encoding = null; + cb = y; + } else { + chunk = x; + encoding = y as Encodings; + cb = z; + } + + if (chunk !== null && chunk !== undefined) { + this.write(chunk, encoding); + } + + if (state.corked) { + state.corked = 1; + this.uncork(); + } + + let err: Error | undefined; + if (!state.errored && !state.ending) { + state.ending = true; + finishMaybe(this, state, true); + state.ended = true; + } else if (state.finished) { + err = new ERR_STREAM_ALREADY_FINISHED("end"); + } else if (state.destroyed) { + err = new ERR_STREAM_DESTROYED("end"); + } + + if (typeof cb === "function") { + if (err || state.finished) { + queueMicrotask(() => { + (cb as (error?: Error | undefined) => void)(err); + }); + } else { + state[kOnFinished].push(cb); + } + } + + return this; + } + + get destroyed() { + if ( + this._readableState === undefined || + this._writableState === undefined + ) { + return false; + } + return this._readableState.destroyed && this._writableState.destroyed; + } + + set destroyed(value: boolean) { + if (this._readableState && this._writableState) { + this._readableState.destroyed = value; + this._writableState.destroyed = value; + } + } + + get writable() { + const w = this._writableState; + return !w.destroyed && !w.errored && !w.ending && !w.ended; + } + + set writable(val) { + if (this._writableState) { + this._writableState.writable = !!val; + } + } + + get writableFinished() { + return this._writableState ? this._writableState.finished : false; + } + + get writableObjectMode() { + return this._writableState ? this._writableState.objectMode : false; + } + + get writableBuffer() { + return this._writableState && this._writableState.getBuffer(); + } + + get writableEnded() { + return this._writableState ? this._writableState.ending : false; + } + + get writableHighWaterMark() { + return this._writableState && this._writableState.highWaterMark; + } + + get writableCorked() { + return this._writableState ? this._writableState.corked : 0; + } + + get writableLength() { + return this._writableState && this._writableState.length; + } +} + +export default Duplex; diff --git a/std/node/_stream/duplex_internal.ts b/std/node/_stream/duplex_internal.ts new file mode 100644 index 000000000..bfd9749f8 --- /dev/null +++ b/std/node/_stream/duplex_internal.ts @@ -0,0 +1,296 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import type { ReadableState } from "./readable.ts"; +import { addChunk, maybeReadMore, onEofChunk } from "./readable_internal.ts"; +import type Writable from "./writable.ts"; +import type { WritableState } from "./writable.ts"; +import { + afterWrite, + AfterWriteTick, + afterWriteTick, + clearBuffer, + errorBuffer, + kOnFinished, + needFinish, + prefinish, +} from "./writable_internal.ts"; +import { Buffer } from "../buffer.ts"; +import type Duplex from "./duplex.ts"; +import { + ERR_MULTIPLE_CALLBACK, + ERR_STREAM_PUSH_AFTER_EOF, + ERR_STREAM_UNSHIFT_AFTER_END_EVENT, +} from "../_errors.ts"; + +export function endDuplex(stream: Duplex) { + const state = stream._readableState; + + if (!state.endEmitted) { + state.ended = true; + queueMicrotask(() => endReadableNT(state, stream)); + } +} + +function endReadableNT(state: ReadableState, stream: Duplex) { + // Check that we didn't get one last unshift. + if ( + !state.errorEmitted && !state.closeEmitted && + !state.endEmitted && state.length === 0 + ) { + state.endEmitted = true; + stream.emit("end"); + + if (stream.writable && stream.allowHalfOpen === false) { + queueMicrotask(() => endWritableNT(state, stream)); + } else if (state.autoDestroy) { + // In case of duplex streams we need a way to detect + // if the writable side is ready for autoDestroy as well. + const wState = stream._writableState; + const autoDestroy = !wState || ( + wState.autoDestroy && + // We don't expect the writable to ever 'finish' + // if writable is explicitly set to false. + (wState.finished || wState.writable === false) + ); + + if (autoDestroy) { + stream.destroy(); + } + } + } +} + +function endWritableNT(state: ReadableState, stream: Duplex) { + const writable = stream.writable && + !stream.writableEnded && + !stream.destroyed; + if (writable) { + stream.end(); + } +} + +export function errorOrDestroy( + // deno-lint-ignore no-explicit-any + this: any, + stream: Duplex, + err: Error, + sync = false, +) { + const r = stream._readableState; + const w = stream._writableState; + + if (w.destroyed || r.destroyed) { + return this; + } + + if (r.autoDestroy || w.autoDestroy) { + stream.destroy(err); + } else if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (w && !w.errored) { + w.errored = err; + } + if (r && !r.errored) { + r.errored = err; + } + + if (sync) { + queueMicrotask(() => { + if (w.errorEmitted || r.errorEmitted) { + return; + } + + w.errorEmitted = true; + r.errorEmitted = true; + + stream.emit("error", err); + }); + } else { + if (w.errorEmitted || r.errorEmitted) { + return; + } + + w.errorEmitted = true; + r.errorEmitted = true; + + stream.emit("error", err); + } + } +} + +function finish(stream: Duplex, state: WritableState) { + state.pendingcb--; + if (state.errorEmitted || state.closeEmitted) { + return; + } + + state.finished = true; + + for (const callback of state[kOnFinished].splice(0)) { + callback(); + } + + stream.emit("finish"); + + if (state.autoDestroy) { + stream.destroy(); + } +} + +export function finishMaybe( + stream: Duplex, + state: WritableState, + sync?: boolean, +) { + if (needFinish(state)) { + prefinish(stream as Writable, state); + if (state.pendingcb === 0 && needFinish(state)) { + state.pendingcb++; + if (sync) { + queueMicrotask(() => finish(stream, state)); + } else { + finish(stream, state); + } + } + } +} + +export function onwrite(stream: Duplex, er?: Error | null) { + const state = stream._writableState; + const sync = state.sync; + const cb = state.writecb; + + if (typeof cb !== "function") { + errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); + return; + } + + state.writing = false; + state.writecb = null; + state.length -= state.writelen; + state.writelen = 0; + + if (er) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + er.stack; + + if (!state.errored) { + state.errored = er; + } + + if (stream._readableState && !stream._readableState.errored) { + stream._readableState.errored = er; + } + + if (sync) { + queueMicrotask(() => onwriteError(stream, state, er, cb)); + } else { + onwriteError(stream, state, er, cb); + } + } else { + if (state.buffered.length > state.bufferedIndex) { + clearBuffer(stream, state); + } + + if (sync) { + if ( + state.afterWriteTickInfo !== null && + state.afterWriteTickInfo.cb === cb + ) { + state.afterWriteTickInfo.count++; + } else { + state.afterWriteTickInfo = { + count: 1, + cb: (cb as (error?: Error) => void), + stream: stream as Writable, + state, + }; + queueMicrotask(() => + afterWriteTick(state.afterWriteTickInfo as AfterWriteTick) + ); + } + } else { + afterWrite(stream as Writable, state, 1, cb as (error?: Error) => void); + } + } +} + +function onwriteError( + stream: Duplex, + state: WritableState, + er: Error, + cb: (error: Error) => void, +) { + --state.pendingcb; + + cb(er); + errorBuffer(state); + errorOrDestroy(stream, er); +} + +export function readableAddChunk( + stream: Duplex, + chunk: string | Buffer | Uint8Array | null, + encoding: undefined | string = undefined, + addToFront: boolean, +) { + const state = stream._readableState; + let usedEncoding = encoding; + + let err; + if (!state.objectMode) { + if (typeof chunk === "string") { + usedEncoding = encoding || state.defaultEncoding; + if (state.encoding !== usedEncoding) { + if (addToFront && state.encoding) { + chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding); + } else { + chunk = Buffer.from(chunk, usedEncoding); + usedEncoding = ""; + } + } + } else if (chunk instanceof Uint8Array) { + chunk = Buffer.from(chunk); + } + } + + if (err) { + errorOrDestroy(stream, err); + } else if (chunk === null) { + state.reading = false; + onEofChunk(stream, state); + } else if (state.objectMode || (chunk.length > 0)) { + if (addToFront) { + if (state.endEmitted) { + errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); + } else { + addChunk(stream, state, chunk, true); + } + } else if (state.ended) { + errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); + } else if (state.destroyed || state.errored) { + return false; + } else { + state.reading = false; + if (state.decoder && !usedEncoding) { + //TODO(Soremwar) + //I don't think this cast is right + chunk = state.decoder.write(Buffer.from(chunk as Uint8Array)); + if (state.objectMode || chunk.length !== 0) { + addChunk(stream, state, chunk, false); + } else { + maybeReadMore(stream, state); + } + } else { + addChunk(stream, state, chunk, false); + } + } + } else if (!addToFront) { + state.reading = false; + maybeReadMore(stream, state); + } + + return !state.ended && + (state.length < state.highWaterMark || state.length === 0); +} diff --git a/std/node/_stream/duplex_test.ts b/std/node/_stream/duplex_test.ts new file mode 100644 index 000000000..1596ec218 --- /dev/null +++ b/std/node/_stream/duplex_test.ts @@ -0,0 +1,698 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { Buffer } from "../buffer.ts"; +import Duplex from "./duplex.ts"; +import finished from "./end_of_stream.ts"; +import { + assert, + assertEquals, + assertStrictEquals, + assertThrows, +} from "../../testing/asserts.ts"; +import { deferred, delay } from "../../async/mod.ts"; + +Deno.test("Duplex stream works normally", () => { + const stream = new Duplex({ objectMode: true }); + + assert(stream._readableState.objectMode); + assert(stream._writableState.objectMode); + assert(stream.allowHalfOpen); + assertEquals(stream.listenerCount("end"), 0); + + let written: { val: number }; + let read: { val: number }; + + stream._write = (obj, _, cb) => { + written = obj; + cb(); + }; + + stream._read = () => {}; + + stream.on("data", (obj) => { + read = obj; + }); + + stream.push({ val: 1 }); + stream.end({ val: 2 }); + + stream.on("finish", () => { + assertEquals(read.val, 1); + assertEquals(written.val, 2); + }); +}); + +Deno.test("Duplex stream gets constructed correctly", () => { + const d1 = new Duplex({ + objectMode: true, + highWaterMark: 100, + }); + + assertEquals(d1.readableObjectMode, true); + assertEquals(d1.readableHighWaterMark, 100); + assertEquals(d1.writableObjectMode, true); + assertEquals(d1.writableHighWaterMark, 100); + + const d2 = new Duplex({ + readableObjectMode: false, + readableHighWaterMark: 10, + writableObjectMode: true, + writableHighWaterMark: 100, + }); + + assertEquals(d2.writableObjectMode, true); + assertEquals(d2.writableHighWaterMark, 100); + assertEquals(d2.readableObjectMode, false); + assertEquals(d2.readableHighWaterMark, 10); +}); + +Deno.test("Duplex stream can be paused", () => { + const readable = new Duplex(); + + // _read is a noop, here. + readable._read = () => {}; + + // Default state of a stream is not "paused" + assert(!readable.isPaused()); + + // Make the stream start flowing... + readable.on("data", () => {}); + + // still not paused. + assert(!readable.isPaused()); + + readable.pause(); + assert(readable.isPaused()); + readable.resume(); + assert(!readable.isPaused()); +}); + +Deno.test("Duplex stream sets enconding correctly", () => { + const readable = new Duplex({ + read() {}, + }); + + readable.setEncoding("utf8"); + + readable.push(new TextEncoder().encode("DEF")); + readable.unshift(new TextEncoder().encode("ABC")); + + assertStrictEquals(readable.read(), "ABCDEF"); +}); + +Deno.test("Duplex stream sets encoding correctly", () => { + const readable = new Duplex({ + read() {}, + }); + + readable.setEncoding("utf8"); + + readable.push(new TextEncoder().encode("DEF")); + readable.unshift(new TextEncoder().encode("ABC")); + + assertStrictEquals(readable.read(), "ABCDEF"); +}); + +Deno.test("Duplex stream holds up a big push", async () => { + let readExecuted = 0; + const readExecutedExpected = 3; + const readExpectedExecutions = deferred(); + + let endExecuted = 0; + const endExecutedExpected = 1; + const endExpectedExecutions = deferred(); + + const str = "asdfasdfasdfasdfasdf"; + + const r = new Duplex({ + highWaterMark: 5, + encoding: "utf8", + }); + + let reads = 0; + + function _read() { + if (reads === 0) { + setTimeout(() => { + r.push(str); + }, 1); + reads++; + } else if (reads === 1) { + const ret = r.push(str); + assertEquals(ret, false); + reads++; + } else { + r.push(null); + } + } + + r._read = () => { + readExecuted++; + if (readExecuted == readExecutedExpected) { + readExpectedExecutions.resolve(); + } + _read(); + }; + + r.on("end", () => { + endExecuted++; + if (endExecuted == endExecutedExpected) { + endExpectedExecutions.resolve(); + } + }); + + // Push some data in to start. + // We've never gotten any read event at this point. + const ret = r.push(str); + assert(!ret); + let chunk = r.read(); + assertEquals(chunk, str); + chunk = r.read(); + assertEquals(chunk, null); + + r.once("readable", () => { + // This time, we'll get *all* the remaining data, because + // it's been added synchronously, as the read WOULD take + // us below the hwm, and so it triggered a _read() again, + // which synchronously added more, which we then return. + chunk = r.read(); + assertEquals(chunk, str + str); + + chunk = r.read(); + assertEquals(chunk, null); + }); + + const readTimeout = setTimeout( + () => readExpectedExecutions.reject(), + 1000, + ); + const endTimeout = setTimeout( + () => endExpectedExecutions.reject(), + 1000, + ); + await readExpectedExecutions; + await endExpectedExecutions; + clearTimeout(readTimeout); + clearTimeout(endTimeout); + assertEquals(readExecuted, readExecutedExpected); + assertEquals(endExecuted, endExecutedExpected); +}); + +Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length exceeded", async () => { + let readableExecuted = 0; + const readableExecutedExpected = 1; + const readableExpectedExecutions = deferred(); + + const r = new Duplex({ + highWaterMark: 3, + }); + + r._read = () => { + throw new Error("_read must not be called"); + }; + r.push(Buffer.from("blerg")); + + setTimeout(function () { + assert(!r._readableState.reading); + r.on("readable", () => { + readableExecuted++; + if (readableExecuted == readableExecutedExpected) { + readableExpectedExecutions.resolve(); + } + }); + }, 1); + + const readableTimeout = setTimeout( + () => readableExpectedExecutions.reject(), + 1000, + ); + await readableExpectedExecutions; + clearTimeout(readableTimeout); + assertEquals(readableExecuted, readableExecutedExpected); +}); + +Deno.test("Duplex stream: 'readable' and 'read' events are emitted on highWaterMark length not reached", async () => { + let readableExecuted = 0; + const readableExecutedExpected = 1; + const readableExpectedExecutions = deferred(); + + let readExecuted = 0; + const readExecutedExpected = 1; + const readExpectedExecutions = deferred(); + + const r = new Duplex({ + highWaterMark: 3, + }); + + r._read = () => { + readExecuted++; + if (readExecuted == readExecutedExpected) { + readExpectedExecutions.resolve(); + } + }; + + r.push(Buffer.from("bl")); + + setTimeout(function () { + assert(r._readableState.reading); + r.on("readable", () => { + readableExecuted++; + if (readableExecuted == readableExecutedExpected) { + readableExpectedExecutions.resolve(); + } + }); + }, 1); + + const readableTimeout = setTimeout( + () => readableExpectedExecutions.reject(), + 1000, + ); + const readTimeout = setTimeout( + () => readExpectedExecutions.reject(), + 1000, + ); + await readableExpectedExecutions; + await readExpectedExecutions; + clearTimeout(readableTimeout); + clearTimeout(readTimeout); + assertEquals(readableExecuted, readableExecutedExpected); + assertEquals(readExecuted, readExecutedExpected); +}); + +Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length not reached and stream ended", async () => { + let readableExecuted = 0; + const readableExecutedExpected = 1; + const readableExpectedExecutions = deferred(); + + const r = new Duplex({ + highWaterMark: 30, + }); + + r._read = () => { + throw new Error("Must not be executed"); + }; + + r.push(Buffer.from("blerg")); + //This ends the stream and triggers end + r.push(null); + + setTimeout(function () { + // Assert we're testing what we think we are + assert(!r._readableState.reading); + r.on("readable", () => { + readableExecuted++; + if (readableExecuted == readableExecutedExpected) { + readableExpectedExecutions.resolve(); + } + }); + }, 1); + + const readableTimeout = setTimeout( + () => readableExpectedExecutions.reject(), + 1000, + ); + await readableExpectedExecutions; + clearTimeout(readableTimeout); + assertEquals(readableExecuted, readableExecutedExpected); +}); + +Deno.test("Duplex stream: 'read' is emitted on empty string pushed in non-object mode", async () => { + let endExecuted = 0; + const endExecutedExpected = 1; + const endExpectedExecutions = deferred(); + + const underlyingData = ["", "x", "y", "", "z"]; + const expected = underlyingData.filter((data) => data); + const result: unknown[] = []; + + const r = new Duplex({ + encoding: "utf8", + }); + r._read = function () { + queueMicrotask(() => { + if (!underlyingData.length) { + this.push(null); + } else { + this.push(underlyingData.shift()); + } + }); + }; + + r.on("readable", () => { + const data = r.read(); + if (data !== null) result.push(data); + }); + + r.on("end", () => { + endExecuted++; + if (endExecuted == endExecutedExpected) { + endExpectedExecutions.resolve(); + } + assertEquals(result, expected); + }); + + const endTimeout = setTimeout( + () => endExpectedExecutions.reject(), + 1000, + ); + await endExpectedExecutions; + clearTimeout(endTimeout); + assertEquals(endExecuted, endExecutedExpected); +}); + +Deno.test("Duplex stream: listeners can be removed", () => { + const r = new Duplex(); + r._read = () => {}; + r.on("data", () => {}); + + r.removeAllListeners("data"); + + assertEquals(r.eventNames().length, 0); +}); + +Deno.test("Duplex stream writes correctly", async () => { + let callback: undefined | ((error?: Error | null | undefined) => void); + + let writeExecuted = 0; + const writeExecutedExpected = 1; + const writeExpectedExecutions = deferred(); + + let writevExecuted = 0; + const writevExecutedExpected = 1; + const writevExpectedExecutions = deferred(); + + const writable = new Duplex({ + write: (chunk, encoding, cb) => { + writeExecuted++; + if (writeExecuted == writeExecutedExpected) { + writeExpectedExecutions.resolve(); + } + assert(chunk instanceof Buffer); + assertStrictEquals(encoding, "buffer"); + assertStrictEquals(String(chunk), "ABC"); + callback = cb; + }, + writev: (chunks) => { + writevExecuted++; + if (writevExecuted == writevExecutedExpected) { + writevExpectedExecutions.resolve(); + } + assertStrictEquals(chunks.length, 2); + assertStrictEquals(chunks[0].encoding, "buffer"); + assertStrictEquals(chunks[1].encoding, "buffer"); + assertStrictEquals(chunks[0].chunk + chunks[1].chunk, "DEFGHI"); + }, + }); + + writable.write(new TextEncoder().encode("ABC")); + writable.write(new TextEncoder().encode("DEF")); + writable.end(new TextEncoder().encode("GHI")); + callback?.(); + + const writeTimeout = setTimeout( + () => writeExpectedExecutions.reject(), + 1000, + ); + const writevTimeout = setTimeout( + () => writevExpectedExecutions.reject(), + 1000, + ); + await writeExpectedExecutions; + await writevExpectedExecutions; + clearTimeout(writeTimeout); + clearTimeout(writevTimeout); + assertEquals(writeExecuted, writeExecutedExpected); + assertEquals(writevExecuted, writevExecutedExpected); +}); + +Deno.test("Duplex stream writes Uint8Array in object mode", async () => { + let writeExecuted = 0; + const writeExecutedExpected = 1; + const writeExpectedExecutions = deferred(); + + const ABC = new TextEncoder().encode("ABC"); + + const writable = new Duplex({ + objectMode: true, + write: (chunk, encoding, cb) => { + writeExecuted++; + if (writeExecuted == writeExecutedExpected) { + writeExpectedExecutions.resolve(); + } + assert(!(chunk instanceof Buffer)); + assert(chunk instanceof Uint8Array); + assertEquals(chunk, ABC); + assertEquals(encoding, "utf8"); + cb(); + }, + }); + + writable.end(ABC); + + const writeTimeout = setTimeout( + () => writeExpectedExecutions.reject(), + 1000, + ); + await writeExpectedExecutions; + clearTimeout(writeTimeout); + assertEquals(writeExecuted, writeExecutedExpected); +}); + +Deno.test("Duplex stream throws on unexpected close", async () => { + let finishedExecuted = 0; + const finishedExecutedExpected = 1; + const finishedExpectedExecutions = deferred(); + + const writable = new Duplex({ + write: () => {}, + }); + writable.writable = false; + writable.destroy(); + + finished(writable, (err) => { + finishedExecuted++; + if (finishedExecuted == finishedExecutedExpected) { + finishedExpectedExecutions.resolve(); + } + assertEquals(err?.code, "ERR_STREAM_PREMATURE_CLOSE"); + }); + + const finishedTimeout = setTimeout( + () => finishedExpectedExecutions.reject(), + 1000, + ); + await finishedExpectedExecutions; + clearTimeout(finishedTimeout); + assertEquals(finishedExecuted, finishedExecutedExpected); +}); + +Deno.test("Duplex stream finishes correctly after error", async () => { + let errorExecuted = 0; + const errorExecutedExpected = 1; + const errorExpectedExecutions = deferred(); + + let finishedExecuted = 0; + const finishedExecutedExpected = 1; + const finishedExpectedExecutions = deferred(); + + const w = new Duplex({ + write(_chunk, _encoding, cb) { + cb(new Error()); + }, + autoDestroy: false, + }); + w.write("asd"); + w.on("error", () => { + errorExecuted++; + if (errorExecuted == errorExecutedExpected) { + errorExpectedExecutions.resolve(); + } + finished(w, () => { + finishedExecuted++; + if (finishedExecuted == finishedExecutedExpected) { + finishedExpectedExecutions.resolve(); + } + }); + }); + + const errorTimeout = setTimeout( + () => errorExpectedExecutions.reject(), + 1000, + ); + const finishedTimeout = setTimeout( + () => finishedExpectedExecutions.reject(), + 1000, + ); + await finishedExpectedExecutions; + await errorExpectedExecutions; + clearTimeout(finishedTimeout); + clearTimeout(errorTimeout); + assertEquals(finishedExecuted, finishedExecutedExpected); + assertEquals(errorExecuted, errorExecutedExpected); +}); + +Deno.test("Duplex stream fails on 'write' null value", () => { + const writable = new Duplex(); + assertThrows(() => writable.write(null)); +}); + +Deno.test("Duplex stream is destroyed correctly", async () => { + let closeExecuted = 0; + const closeExecutedExpected = 1; + const closeExpectedExecutions = deferred(); + + const unexpectedExecution = deferred(); + + const duplex = new Duplex({ + write(_chunk, _enc, cb) { + cb(); + }, + read() {}, + }); + + duplex.resume(); + + function never() { + unexpectedExecution.reject(); + } + + duplex.on("end", never); + duplex.on("finish", never); + duplex.on("close", () => { + closeExecuted++; + if (closeExecuted == closeExecutedExpected) { + closeExpectedExecutions.resolve(); + } + }); + + duplex.destroy(); + assertEquals(duplex.destroyed, true); + + const closeTimeout = setTimeout( + () => closeExpectedExecutions.reject(), + 1000, + ); + await Promise.race([ + unexpectedExecution, + delay(100), + ]); + await closeExpectedExecutions; + clearTimeout(closeTimeout); + assertEquals(closeExecuted, closeExecutedExpected); +}); + +Deno.test("Duplex stream errors correctly on destroy", async () => { + let errorExecuted = 0; + const errorExecutedExpected = 1; + const errorExpectedExecutions = deferred(); + + const unexpectedExecution = deferred(); + + const duplex = new Duplex({ + write(_chunk, _enc, cb) { + cb(); + }, + read() {}, + }); + duplex.resume(); + + const expected = new Error("kaboom"); + + function never() { + unexpectedExecution.reject(); + } + + duplex.on("end", never); + duplex.on("finish", never); + duplex.on("error", (err) => { + errorExecuted++; + if (errorExecuted == errorExecutedExpected) { + errorExpectedExecutions.resolve(); + } + assertStrictEquals(err, expected); + }); + + duplex.destroy(expected); + assertEquals(duplex.destroyed, true); + + const errorTimeout = setTimeout( + () => errorExpectedExecutions.reject(), + 1000, + ); + await Promise.race([ + unexpectedExecution, + delay(100), + ]); + await errorExpectedExecutions; + clearTimeout(errorTimeout); + assertEquals(errorExecuted, errorExecutedExpected); +}); + +Deno.test("Duplex stream doesn't finish on allowHalfOpen", async () => { + const unexpectedExecution = deferred(); + + const duplex = new Duplex({ + read() {}, + }); + + assertEquals(duplex.allowHalfOpen, true); + duplex.on("finish", () => unexpectedExecution.reject()); + assertEquals(duplex.listenerCount("end"), 0); + duplex.resume(); + duplex.push(null); + + await Promise.race([ + unexpectedExecution, + delay(100), + ]); +}); + +Deno.test("Duplex stream finishes when allowHalfOpen is disabled", async () => { + let finishExecuted = 0; + const finishExecutedExpected = 1; + const finishExpectedExecutions = deferred(); + + const duplex = new Duplex({ + read() {}, + allowHalfOpen: false, + }); + + assertEquals(duplex.allowHalfOpen, false); + duplex.on("finish", () => { + finishExecuted++; + if (finishExecuted == finishExecutedExpected) { + finishExpectedExecutions.resolve(); + } + }); + assertEquals(duplex.listenerCount("end"), 0); + duplex.resume(); + duplex.push(null); + + const finishTimeout = setTimeout( + () => finishExpectedExecutions.reject(), + 1000, + ); + await finishExpectedExecutions; + clearTimeout(finishTimeout); + assertEquals(finishExecuted, finishExecutedExpected); +}); + +Deno.test("Duplex stream doesn't finish when allowHalfOpen is disabled but stream ended", async () => { + const unexpectedExecution = deferred(); + + const duplex = new Duplex({ + read() {}, + allowHalfOpen: false, + }); + + assertEquals(duplex.allowHalfOpen, false); + duplex._writableState.ended = true; + duplex.on("finish", () => unexpectedExecution.reject()); + assertEquals(duplex.listenerCount("end"), 0); + duplex.resume(); + duplex.push(null); + + await Promise.race([ + unexpectedExecution, + delay(100), + ]); +}); diff --git a/std/node/_stream/end-of-stream.ts b/std/node/_stream/end_of_stream.ts index c42bb0e1c..6179e7fc4 100644 --- a/std/node/_stream/end-of-stream.ts +++ b/std/node/_stream/end_of_stream.ts @@ -1,5 +1,6 @@ // Copyright Node.js contributors. All rights reserved. MIT License. import { once } from "../_utils.ts"; +import type Duplex from "./duplex.ts"; import type Readable from "./readable.ts"; import type Stream from "./stream.ts"; import type { ReadableState } from "./readable.ts"; @@ -11,7 +12,7 @@ import { NodeErrorAbstraction, } from "../_errors.ts"; -type StreamImplementations = Readable | Stream | Writable; +export type StreamImplementations = Duplex | Readable | Stream | Writable; // TODO(Soremwar) // Bring back once requests are implemented @@ -49,7 +50,7 @@ function isReadableEnded(stream: Readable) { return rState.endEmitted || (rState.ended && rState.length === 0); } -interface FinishedOptions { +export interface FinishedOptions { error?: boolean; readable?: boolean; writable?: boolean; diff --git a/std/node/_stream/end_of_stream_test.ts b/std/node/_stream/end_of_stream_test.ts new file mode 100644 index 000000000..571e75b99 --- /dev/null +++ b/std/node/_stream/end_of_stream_test.ts @@ -0,0 +1,97 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import finished from "./end_of_stream.ts"; +import Readable from "./readable.ts"; +import Transform from "./transform.ts"; +import Writable from "./writable.ts"; +import { mustCall } from "../_utils.ts"; +import { assert, fail } from "../../testing/asserts.ts"; +import { deferred, delay } from "../../async/mod.ts"; + +Deno.test("Finished appends to Readable correctly", async () => { + const rs = new Readable({ + read() {}, + }); + + const [finishedExecution, finishedCb] = mustCall((err) => { + assert(!err); + }); + + finished(rs, finishedCb); + + rs.push(null); + rs.resume(); + + await finishedExecution; +}); + +Deno.test("Finished appends to Writable correctly", async () => { + const ws = new Writable({ + write(_data, _enc, cb) { + cb(); + }, + }); + + const [finishedExecution, finishedCb] = mustCall((err) => { + assert(!err); + }); + + finished(ws, finishedCb); + + ws.end(); + + await finishedExecution; +}); + +Deno.test("Finished appends to Transform correctly", async () => { + const tr = new Transform({ + transform(_data, _enc, cb) { + cb(); + }, + }); + + let finish = false; + let ended = false; + + tr.on("end", () => { + ended = true; + }); + + tr.on("finish", () => { + finish = true; + }); + + const [finishedExecution, finishedCb] = mustCall((err) => { + assert(!err); + assert(finish); + assert(ended); + }); + + finished(tr, finishedCb); + + tr.end(); + tr.resume(); + + await finishedExecution; +}); + +Deno.test("The function returned by Finished clears the listeners", async () => { + const finishedExecution = deferred(); + + const ws = new Writable({ + write(_data, _env, cb) { + cb(); + }, + }); + + const removeListener = finished(ws, () => { + finishedExecution.reject(); + }); + removeListener(); + ws.end(); + + await Promise.race([ + delay(100), + finishedExecution, + ]) + .catch(() => fail("Finished was executed")); +}); diff --git a/std/node/_stream/passthrough.ts b/std/node/_stream/passthrough.ts new file mode 100644 index 000000000..9126420e5 --- /dev/null +++ b/std/node/_stream/passthrough.ts @@ -0,0 +1,20 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import Transform from "./transform.ts"; +import type { TransformOptions } from "./transform.ts"; +import type { Encodings } from "../_utils.ts"; + +export default class PassThrough extends Transform { + constructor(options?: TransformOptions) { + super(options); + } + + _transform( + // deno-lint-ignore no-explicit-any + chunk: any, + _encoding: Encodings, + // deno-lint-ignore no-explicit-any + cb: (error?: Error | null, data?: any) => void, + ) { + cb(null, chunk); + } +} diff --git a/std/node/_stream/pipeline.ts b/std/node/_stream/pipeline.ts new file mode 100644 index 000000000..d02a92870 --- /dev/null +++ b/std/node/_stream/pipeline.ts @@ -0,0 +1,308 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { once } from "../_utils.ts"; +import { destroyer as implDestroyer } from "./destroy.ts"; +import eos from "./end_of_stream.ts"; +import createReadableStreamAsyncIterator from "./async_iterator.ts"; +import * as events from "../events.ts"; +import PassThrough from "./passthrough.ts"; +import { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_CALLBACK, + ERR_INVALID_RETURN_VALUE, + ERR_MISSING_ARGS, + ERR_STREAM_DESTROYED, + NodeErrorAbstraction, +} from "../_errors.ts"; +import type Duplex from "./duplex.ts"; +import type Readable from "./readable.ts"; +import type Stream from "./stream.ts"; +import type Transform from "./transform.ts"; +import type Writable from "./writable.ts"; + +type Streams = Duplex | Readable | Writable; +// deno-lint-ignore no-explicit-any +type EndCallback = (err?: NodeErrorAbstraction | null, val?: any) => void; +type TransformCallback = + // deno-lint-ignore no-explicit-any + | ((value?: any) => AsyncGenerator<any>) + // deno-lint-ignore no-explicit-any + | ((value?: any) => Promise<any>); +/** + * This type represents an array that contains a data source, + * many Transform Streams, a writable stream destination + * and end in an optional callback + * */ +type DataSource = + // deno-lint-ignore no-explicit-any + | (() => AsyncGenerator<any>) + | // deno-lint-ignore no-explicit-any + AsyncIterable<any> + | Duplex + | // deno-lint-ignore no-explicit-any + Iterable<any> + | // deno-lint-ignore no-explicit-any + (() => Generator<any>) + | Readable; +type Transformers = Duplex | Transform | TransformCallback | Writable; +export type PipelineArguments = [ + DataSource, + ...Array<Transformers | EndCallback>, +]; + +function destroyer( + stream: Streams, + reading: boolean, + writing: boolean, + callback: EndCallback, +) { + callback = once(callback); + + let finished = false; + stream.on("close", () => { + finished = true; + }); + + eos(stream, { readable: reading, writable: writing }, (err) => { + finished = !err; + + // deno-lint-ignore no-explicit-any + const rState = (stream as any)?._readableState; + if ( + err && + err.code === "ERR_STREAM_PREMATURE_CLOSE" && + reading && + (rState?.ended && !rState?.errored && !rState?.errorEmitted) + ) { + stream + .once("end", callback) + .once("error", callback); + } else { + callback(err); + } + }); + + return (err: NodeErrorAbstraction) => { + if (finished) return; + finished = true; + implDestroyer(stream, err); + callback(err || new ERR_STREAM_DESTROYED("pipe")); + }; +} + +function popCallback(streams: PipelineArguments): EndCallback { + if (typeof streams[streams.length - 1] !== "function") { + throw new ERR_INVALID_CALLBACK(streams[streams.length - 1]); + } + return streams.pop() as EndCallback; +} + +// function isPromise(obj) { +// return !!(obj && typeof obj.then === "function"); +// } + +// deno-lint-ignore no-explicit-any +function isReadable(obj: any): obj is Stream { + return !!(obj && typeof obj.pipe === "function"); +} + +// deno-lint-ignore no-explicit-any +function isWritable(obj: any) { + return !!(obj && typeof obj.write === "function"); +} + +// deno-lint-ignore no-explicit-any +function isStream(obj: any) { + return isReadable(obj) || isWritable(obj); +} + +// deno-lint-ignore no-explicit-any +function isIterable(obj: any, isAsync?: boolean) { + if (!obj) return false; + if (isAsync === true) return typeof obj[Symbol.asyncIterator] === "function"; + if (isAsync === false) return typeof obj[Symbol.iterator] === "function"; + return typeof obj[Symbol.asyncIterator] === "function" || + typeof obj[Symbol.iterator] === "function"; +} + +// deno-lint-ignore no-explicit-any +function makeAsyncIterable(val: Readable | Iterable<any> | AsyncIterable<any>) { + if (isIterable(val)) { + return val; + } else if (isReadable(val)) { + return fromReadable(val as Readable); + } + throw new ERR_INVALID_ARG_TYPE( + "val", + ["Readable", "Iterable", "AsyncIterable"], + val, + ); +} + +async function* fromReadable(val: Readable) { + yield* createReadableStreamAsyncIterator(val); +} + +async function pump( + // deno-lint-ignore no-explicit-any + iterable: Iterable<any>, + writable: Duplex | Writable, + finish: (err?: NodeErrorAbstraction | null) => void, +) { + let error; + try { + for await (const chunk of iterable) { + if (!writable.write(chunk)) { + if (writable.destroyed) return; + await events.once(writable, "drain"); + } + } + writable.end(); + } catch (err) { + error = err; + } finally { + finish(error); + } +} + +export default function pipeline(...args: PipelineArguments) { + const callback: EndCallback = once(popCallback(args)); + + let streams: [DataSource, ...Transformers[]]; + if (args.length > 1) { + streams = args as [DataSource, ...Transformers[]]; + } else { + throw new ERR_MISSING_ARGS("streams"); + } + + let error: NodeErrorAbstraction; + // deno-lint-ignore no-explicit-any + let value: any; + const destroys: Array<(err: NodeErrorAbstraction) => void> = []; + + let finishCount = 0; + + function finish(err?: NodeErrorAbstraction | null) { + const final = --finishCount === 0; + + if (err && (!error || error.code === "ERR_STREAM_PREMATURE_CLOSE")) { + error = err; + } + + if (!error && !final) { + return; + } + + while (destroys.length) { + (destroys.shift() as (err: NodeErrorAbstraction) => void)(error); + } + + if (final) { + callback(error, value); + } + } + + // TODO(Soremwar) + // Simplify the hell out of this + // deno-lint-ignore no-explicit-any + let ret: any; + for (let i = 0; i < streams.length; i++) { + const stream = streams[i]; + const reading = i < streams.length - 1; + const writing = i > 0; + + if (isStream(stream)) { + finishCount++; + destroys.push(destroyer(stream as Streams, reading, writing, finish)); + } + + if (i === 0) { + if (typeof stream === "function") { + ret = stream(); + if (!isIterable(ret)) { + throw new ERR_INVALID_RETURN_VALUE( + "Iterable, AsyncIterable or Stream", + "source", + ret, + ); + } + } else if (isIterable(stream) || isReadable(stream)) { + ret = stream; + } else { + throw new ERR_INVALID_ARG_TYPE( + "source", + ["Stream", "Iterable", "AsyncIterable", "Function"], + stream, + ); + } + } else if (typeof stream === "function") { + ret = makeAsyncIterable(ret); + ret = stream(ret); + + if (reading) { + if (!isIterable(ret, true)) { + throw new ERR_INVALID_RETURN_VALUE( + "AsyncIterable", + `transform[${i - 1}]`, + ret, + ); + } + } else { + // If the last argument to pipeline is not a stream + // we must create a proxy stream so that pipeline(...) + // always returns a stream which can be further + // composed through `.pipe(stream)`. + const pt = new PassThrough({ + objectMode: true, + }); + if (ret instanceof Promise) { + ret + .then((val) => { + value = val; + pt.end(val); + }, (err) => { + pt.destroy(err); + }); + } else if (isIterable(ret, true)) { + finishCount++; + pump(ret, pt, finish); + } else { + throw new ERR_INVALID_RETURN_VALUE( + "AsyncIterable or Promise", + "destination", + ret, + ); + } + + ret = pt; + + finishCount++; + destroys.push(destroyer(ret, false, true, finish)); + } + } else if (isStream(stream)) { + if (isReadable(ret)) { + ret.pipe(stream as Readable); + + // TODO(Soremwar) + // Reimplement after stdout and stderr are implemented + // if (stream === process.stdout || stream === process.stderr) { + // ret.on("end", () => stream.end()); + // } + } else { + ret = makeAsyncIterable(ret); + + finishCount++; + pump(ret, stream as Writable, finish); + } + ret = stream; + } else { + const name = reading ? `transform[${i - 1}]` : "destination"; + throw new ERR_INVALID_ARG_TYPE( + name, + ["Stream", "Function"], + ret, + ); + } + } + + return ret as unknown as Readable; +} diff --git a/std/node/_stream/pipeline_test.ts b/std/node/_stream/pipeline_test.ts new file mode 100644 index 000000000..aa1869416 --- /dev/null +++ b/std/node/_stream/pipeline_test.ts @@ -0,0 +1,387 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { Buffer } from "../buffer.ts"; +import PassThrough from "./passthrough.ts"; +import pipeline from "./pipeline.ts"; +import Readable from "./readable.ts"; +import Transform from "./transform.ts"; +import Writable from "./writable.ts"; +import { mustCall } from "../_utils.ts"; +import { + assert, + assertEquals, + assertStrictEquals, +} from "../../testing/asserts.ts"; +import type { NodeErrorAbstraction } from "../_errors.ts"; + +Deno.test("Pipeline ends on stream finished", async () => { + let finished = false; + + // deno-lint-ignore no-explicit-any + const processed: any[] = []; + const expected = [ + Buffer.from("a"), + Buffer.from("b"), + Buffer.from("c"), + ]; + + const read = new Readable({ + read() {}, + }); + + const write = new Writable({ + write(data, _enc, cb) { + processed.push(data); + cb(); + }, + }); + + write.on("finish", () => { + finished = true; + }); + + for (let i = 0; i < expected.length; i++) { + read.push(expected[i]); + } + read.push(null); + + const [finishedCompleted, finishedCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assert(!err); + assert(finished); + assertEquals(processed, expected); + }, + 1, + ); + + pipeline(read, write, finishedCb); + + await finishedCompleted; +}); + +Deno.test("Pipeline fails on stream destroyed", async () => { + const read = new Readable({ + read() {}, + }); + + const write = new Writable({ + write(_data, _enc, cb) { + cb(); + }, + }); + + read.push("data"); + queueMicrotask(() => read.destroy()); + + const [pipelineExecuted, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assert(err); + }, + 1, + ); + pipeline(read, write, pipelineCb); + + await pipelineExecuted; +}); + +Deno.test("Pipeline exits on stream error", async () => { + const read = new Readable({ + read() {}, + }); + + const transform = new Transform({ + transform(_data, _enc, cb) { + cb(new Error("kaboom")); + }, + }); + + const write = new Writable({ + write(_data, _enc, cb) { + cb(); + }, + }); + + const [readExecution, readCb] = mustCall(); + read.on("close", readCb); + const [closeExecution, closeCb] = mustCall(); + transform.on("close", closeCb); + const [writeExecution, writeCb] = mustCall(); + write.on("close", writeCb); + + const errorExecutions = [read, transform, write] + .map((stream) => { + const [execution, cb] = mustCall((err?: NodeErrorAbstraction | null) => { + assertEquals(err, new Error("kaboom")); + }); + + stream.on("error", cb); + return execution; + }); + + const [pipelineExecution, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assertEquals(err, new Error("kaboom")); + }, + ); + const dst = pipeline(read, transform, write, pipelineCb); + + assertStrictEquals(dst, write); + + read.push("hello"); + + await readExecution; + await closeExecution; + await writeExecution; + await Promise.all(errorExecutions); + await pipelineExecution; +}); + +Deno.test("Pipeline processes iterators correctly", async () => { + let res = ""; + const w = new Writable({ + write(chunk, _encoding, callback) { + res += chunk; + callback(); + }, + }); + + const [pipelineExecution, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assert(!err); + assertEquals(res, "helloworld"); + }, + ); + pipeline( + function* () { + yield "hello"; + yield "world"; + }(), + w, + pipelineCb, + ); + + await pipelineExecution; +}); + +Deno.test("Pipeline processes async iterators correctly", async () => { + let res = ""; + const w = new Writable({ + write(chunk, _encoding, callback) { + res += chunk; + callback(); + }, + }); + + const [pipelineExecution, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assert(!err); + assertEquals(res, "helloworld"); + }, + ); + pipeline( + async function* () { + await Promise.resolve(); + yield "hello"; + yield "world"; + }(), + w, + pipelineCb, + ); + + await pipelineExecution; +}); + +Deno.test("Pipeline processes generators correctly", async () => { + let res = ""; + const w = new Writable({ + write(chunk, _encoding, callback) { + res += chunk; + callback(); + }, + }); + + const [pipelineExecution, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assert(!err); + assertEquals(res, "helloworld"); + }, + ); + pipeline( + function* () { + yield "hello"; + yield "world"; + }, + w, + pipelineCb, + ); + + await pipelineExecution; +}); + +Deno.test("Pipeline processes async generators correctly", async () => { + let res = ""; + const w = new Writable({ + write(chunk, _encoding, callback) { + res += chunk; + callback(); + }, + }); + + const [pipelineExecution, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assert(!err); + assertEquals(res, "helloworld"); + }, + ); + pipeline( + async function* () { + await Promise.resolve(); + yield "hello"; + yield "world"; + }, + w, + pipelineCb, + ); + + await pipelineExecution; +}); + +Deno.test("Pipeline handles generator transforms", async () => { + let res = ""; + + const [pipelineExecuted, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assert(!err); + assertEquals(res, "HELLOWORLD"); + }, + ); + pipeline( + async function* () { + await Promise.resolve(); + yield "hello"; + yield "world"; + }, + async function* (source: string[]) { + for await (const chunk of source) { + yield chunk.toUpperCase(); + } + }, + async function (source: string[]) { + for await (const chunk of source) { + res += chunk; + } + }, + pipelineCb, + ); + + await pipelineExecuted; +}); + +Deno.test("Pipeline passes result to final callback", async () => { + const [pipelineExecuted, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null, val?: unknown) => { + assert(!err); + assertEquals(val, "HELLOWORLD"); + }, + ); + pipeline( + async function* () { + await Promise.resolve(); + yield "hello"; + yield "world"; + }, + async function* (source: string[]) { + for await (const chunk of source) { + yield chunk.toUpperCase(); + } + }, + async function (source: string[]) { + let ret = ""; + for await (const chunk of source) { + ret += chunk; + } + return ret; + }, + pipelineCb, + ); + + await pipelineExecuted; +}); + +Deno.test("Pipeline returns a stream after ending", async () => { + const [pipelineExecuted, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assertEquals(err, undefined); + }, + ); + const ret = pipeline( + async function* () { + await Promise.resolve(); + yield "hello"; + }, + // deno-lint-ignore require-yield + async function* (source: string[]) { + for await (const chunk of source) { + chunk; + } + }, + pipelineCb, + ); + + ret.resume(); + + assertEquals(typeof ret.pipe, "function"); + + await pipelineExecuted; +}); + +Deno.test("Pipeline returns a stream after erroring", async () => { + const errorText = "kaboom"; + + const [pipelineExecuted, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assertEquals(err?.message, errorText); + }, + ); + const ret = pipeline( + // deno-lint-ignore require-yield + async function* () { + await Promise.resolve(); + throw new Error(errorText); + }, + // deno-lint-ignore require-yield + async function* (source: string[]) { + for await (const chunk of source) { + chunk; + } + }, + pipelineCb, + ); + + ret.resume(); + + assertEquals(typeof ret.pipe, "function"); + + await pipelineExecuted; +}); + +Deno.test("Pipeline destination gets destroyed on error", async () => { + const errorText = "kaboom"; + const s = new PassThrough(); + + const [pipelineExecution, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assertEquals(err?.message, errorText); + assertEquals(s.destroyed, true); + }, + ); + pipeline( + // deno-lint-ignore require-yield + async function* () { + throw new Error(errorText); + }, + s, + pipelineCb, + ); + + await pipelineExecution; +}); diff --git a/std/node/_stream/promises.ts b/std/node/_stream/promises.ts new file mode 100644 index 000000000..1adf4ea3f --- /dev/null +++ b/std/node/_stream/promises.ts @@ -0,0 +1,42 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import pl from "./pipeline.ts"; +import type { PipelineArguments } from "./pipeline.ts"; +import eos from "./end_of_stream.ts"; +import type { + FinishedOptions, + StreamImplementations as FinishedStreams, +} from "./end_of_stream.ts"; + +export function pipeline(...streams: PipelineArguments) { + return new Promise((resolve, reject) => { + pl( + ...streams, + (err, value) => { + if (err) { + reject(err); + } else { + resolve(value); + } + }, + ); + }); +} + +export function finished( + stream: FinishedStreams, + opts?: FinishedOptions, +) { + return new Promise<void>((resolve, reject) => { + eos( + stream, + opts || null, + (err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }, + ); + }); +} diff --git a/std/node/_stream/promises_test.ts b/std/node/_stream/promises_test.ts new file mode 100644 index 000000000..90803b4af --- /dev/null +++ b/std/node/_stream/promises_test.ts @@ -0,0 +1,84 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { Buffer } from "../buffer.ts"; +import Readable from "./readable.ts"; +import Writable from "./writable.ts"; +import { pipeline } from "./promises.ts"; +import { deferred } from "../../async/mod.ts"; +import { + assert, + assertEquals, + assertThrowsAsync, +} from "../../testing/asserts.ts"; + +Deno.test("Promise pipeline works correctly", async () => { + let pipelineExecuted = 0; + const pipelineExecutedExpected = 1; + const pipelineExpectedExecutions = deferred(); + + let finished = false; + // deno-lint-ignore no-explicit-any + const processed: any[] = []; + const expected = [ + Buffer.from("a"), + Buffer.from("b"), + Buffer.from("c"), + ]; + + const read = new Readable({ + read() {}, + }); + + const write = new Writable({ + write(data, _enc, cb) { + processed.push(data); + cb(); + }, + }); + + write.on("finish", () => { + finished = true; + }); + + for (let i = 0; i < expected.length; i++) { + read.push(expected[i]); + } + read.push(null); + + pipeline(read, write).then(() => { + pipelineExecuted++; + if (pipelineExecuted == pipelineExecutedExpected) { + pipelineExpectedExecutions.resolve(); + } + assert(finished); + assertEquals(processed, expected); + }); + + const pipelineTimeout = setTimeout( + () => pipelineExpectedExecutions.reject(), + 1000, + ); + await pipelineExpectedExecutions; + clearTimeout(pipelineTimeout); + assertEquals(pipelineExecuted, pipelineExecutedExpected); +}); + +Deno.test("Promise pipeline throws on readable destroyed", async () => { + const read = new Readable({ + read() {}, + }); + + const write = new Writable({ + write(_data, _enc, cb) { + cb(); + }, + }); + + read.push("data"); + read.destroy(); + + await assertThrowsAsync( + () => pipeline(read, write), + Error, + "Premature close", + ); +}); diff --git a/std/node/_stream/readable.ts b/std/node/_stream/readable.ts index 72e61dff7..c8ed29953 100644 --- a/std/node/_stream/readable.ts +++ b/std/node/_stream/readable.ts @@ -1,492 +1,46 @@ // Copyright Node.js contributors. All rights reserved. MIT License. -import EventEmitter, { captureRejectionSymbol } from "../events.ts"; +import { captureRejectionSymbol } from "../events.ts"; import Stream from "./stream.ts"; -import { Buffer } from "../buffer.ts"; +import type { Buffer } from "../buffer.ts"; import BufferList from "./buffer_list.ts"; import { ERR_INVALID_OPT_VALUE, ERR_METHOD_NOT_IMPLEMENTED, - ERR_MULTIPLE_CALLBACK, - ERR_STREAM_PUSH_AFTER_EOF, - ERR_STREAM_UNSHIFT_AFTER_END_EVENT, } from "../_errors.ts"; +import type { Encodings } from "../_utils.ts"; import { StringDecoder } from "../string_decoder.ts"; import createReadableStreamAsyncIterator from "./async_iterator.ts"; import streamFrom from "./from.ts"; -import { kConstruct, kDestroy, kPaused } from "./symbols.ts"; -import type Writable from "./writable.ts"; -import { errorOrDestroy as errorOrDestroyDuplex } from "./duplex.ts"; - -function construct(stream: Readable, cb: () => void) { - const r = stream._readableState; - - if (!stream._construct) { - return; - } - - stream.once(kConstruct, cb); - - r.constructed = false; - - queueMicrotask(() => { - let called = false; - stream._construct?.((err?: Error) => { - r.constructed = true; - - if (called) { - err = new ERR_MULTIPLE_CALLBACK(); - } else { - called = true; - } - - if (r.destroyed) { - stream.emit(kDestroy, err); - } else if (err) { - errorOrDestroy(stream, err, true); - } else { - queueMicrotask(() => stream.emit(kConstruct)); - } - }); - }); -} - -function _destroy( - self: Readable, - err?: Error, - cb?: (error?: Error | null) => void, -) { - self._destroy(err || null, (err) => { - const r = (self as Readable)._readableState; - - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!r.errored) { - r.errored = err; - } - } - - r.closed = true; - - if (typeof cb === "function") { - cb(err); - } - - if (err) { - queueMicrotask(() => { - if (!r.errorEmitted) { - r.errorEmitted = true; - self.emit("error", err); - } - r.closeEmitted = true; - if (r.emitClose) { - self.emit("close"); - } - }); - } else { - queueMicrotask(() => { - r.closeEmitted = true; - if (r.emitClose) { - self.emit("close"); - } - }); - } - }); -} - -function errorOrDestroy(stream: Readable, err: Error, sync = false) { - const r = stream._readableState; - - if (r.destroyed) { - return stream; - } - - if (r.autoDestroy) { - stream.destroy(err); - } else if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!r.errored) { - r.errored = err; - } - if (sync) { - queueMicrotask(() => { - if (!r.errorEmitted) { - r.errorEmitted = true; - stream.emit("error", err); - } - }); - } else if (!r.errorEmitted) { - r.errorEmitted = true; - stream.emit("error", err); - } - } -} - -function flow(stream: Readable) { - const state = stream._readableState; - while (state.flowing && stream.read() !== null); -} - -function pipeOnDrain(src: Readable, dest: Writable) { - return function pipeOnDrainFunctionResult() { - const state = src._readableState; - - if (state.awaitDrainWriters === dest) { - state.awaitDrainWriters = null; - } else if (state.multiAwaitDrain) { - (state.awaitDrainWriters as Set<Writable>).delete(dest); - } - - if ( - (!state.awaitDrainWriters || - (state.awaitDrainWriters as Set<Writable>).size === 0) && - src.listenerCount("data") - ) { - state.flowing = true; - flow(src); - } - }; -} - -function updateReadableListening(self: Readable) { - const state = self._readableState; - state.readableListening = self.listenerCount("readable") > 0; - - if (state.resumeScheduled && state[kPaused] === false) { - // Flowing needs to be set to true now, otherwise - // the upcoming resume will not flow. - state.flowing = true; - - // Crude way to check if we should resume. - } else if (self.listenerCount("data") > 0) { - self.resume(); - } else if (!state.readableListening) { - state.flowing = null; - } -} - -function nReadingNextTick(self: Readable) { - self.read(0); -} - -function resume(stream: Readable, state: ReadableState) { - if (!state.resumeScheduled) { - state.resumeScheduled = true; - queueMicrotask(() => resume_(stream, state)); - } -} - -function resume_(stream: Readable, state: ReadableState) { - if (!state.reading) { - stream.read(0); - } - - state.resumeScheduled = false; - stream.emit("resume"); - flow(stream); - if (state.flowing && !state.reading) { - stream.read(0); - } -} - -function readableAddChunk( - stream: Readable, - chunk: string | Buffer | Uint8Array | null, - encoding: undefined | string = undefined, - addToFront: boolean, -) { - const state = stream._readableState; - let usedEncoding = encoding; - - let err; - if (!state.objectMode) { - if (typeof chunk === "string") { - usedEncoding = encoding || state.defaultEncoding; - if (state.encoding !== usedEncoding) { - if (addToFront && state.encoding) { - chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding); - } else { - chunk = Buffer.from(chunk, usedEncoding); - usedEncoding = ""; - } - } - } else if (chunk instanceof Uint8Array) { - chunk = Buffer.from(chunk); - } - } - - if (err) { - errorOrDestroy(stream, err); - } else if (chunk === null) { - state.reading = false; - onEofChunk(stream, state); - } else if (state.objectMode || (chunk.length > 0)) { - if (addToFront) { - if (state.endEmitted) { - errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); - } else { - addChunk(stream, state, chunk, true); - } - } else if (state.ended) { - errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); - } else if (state.destroyed || state.errored) { - return false; - } else { - state.reading = false; - if (state.decoder && !usedEncoding) { - //TODO(Soremwar) - //I don't think this cast is right - chunk = state.decoder.write(Buffer.from(chunk as Uint8Array)); - if (state.objectMode || chunk.length !== 0) { - addChunk(stream, state, chunk, false); - } else { - maybeReadMore(stream, state); - } - } else { - addChunk(stream, state, chunk, false); - } - } - } else if (!addToFront) { - state.reading = false; - maybeReadMore(stream, state); - } - - return !state.ended && - (state.length < state.highWaterMark || state.length === 0); -} - -function addChunk( - stream: Readable, - state: ReadableState, - chunk: string | Buffer | Uint8Array, - addToFront: boolean, -) { - if (state.flowing && state.length === 0 && !state.sync) { - if (state.multiAwaitDrain) { - (state.awaitDrainWriters as Set<Writable>).clear(); - } else { - state.awaitDrainWriters = null; - } - stream.emit("data", chunk); - } else { - // Update the buffer info. - state.length += state.objectMode ? 1 : chunk.length; - if (addToFront) { - state.buffer.unshift(chunk); - } else { - state.buffer.push(chunk); - } - - if (state.needReadable) { - emitReadable(stream); - } - } - maybeReadMore(stream, state); -} - -function prependListener( - emitter: EventEmitter, - event: string, - // deno-lint-ignore no-explicit-any - fn: (...args: any[]) => any, -) { - if (typeof emitter.prependListener === "function") { - return emitter.prependListener(event, fn); - } - - // This is a hack to make sure that our error handler is attached before any - // userland ones. NEVER DO THIS. This is here only because this code needs - // to continue to work with older versions of Node.js that do not include - //the prependListener() method. The goal is to eventually remove this hack. - // TODO(Soremwar) - // Burn it with fire - // deno-lint-ignore ban-ts-comment - //@ts-ignore - if (emitter._events.get(event)?.length) { - // deno-lint-ignore ban-ts-comment - //@ts-ignore - const listeners = [fn, ...emitter._events.get(event)]; - // deno-lint-ignore ban-ts-comment - //@ts-ignore - emitter._events.set(event, listeners); - } else { - emitter.on(event, fn); - } -} - -/** Pluck off n bytes from an array of buffers. -* Length is the combined lengths of all the buffers in the list. -* This function is designed to be inlinable, so please take care when making -* changes to the function body. -*/ -function fromList(n: number, state: ReadableState) { - // nothing buffered. - if (state.length === 0) { - return null; - } - - let ret; - if (state.objectMode) { - ret = state.buffer.shift(); - } else if (!n || n >= state.length) { - if (state.decoder) { - ret = state.buffer.join(""); - } else if (state.buffer.length === 1) { - ret = state.buffer.first(); - } else { - ret = state.buffer.concat(state.length); - } - state.buffer.clear(); - } else { - ret = state.buffer.consume(n, !!state.decoder); - } - - return ret; -} - -function endReadable(stream: Readable) { - const state = stream._readableState; - - if (!state.endEmitted) { - state.ended = true; - queueMicrotask(() => endReadableNT(state, stream)); - } -} - -function endReadableNT(state: ReadableState, stream: Readable) { - if ( - !state.errorEmitted && !state.closeEmitted && - !state.endEmitted && state.length === 0 - ) { - state.endEmitted = true; - stream.emit("end"); - - if (state.autoDestroy) { - stream.destroy(); - } - } -} - -// Don't raise the hwm > 1GB. -const MAX_HWM = 0x40000000; -function computeNewHighWaterMark(n: number) { - if (n >= MAX_HWM) { - n = MAX_HWM; - } else { - n--; - n |= n >>> 1; - n |= n >>> 2; - n |= n >>> 4; - n |= n >>> 8; - n |= n >>> 16; - n++; - } - return n; -} - -function howMuchToRead(n: number, state: ReadableState) { - if (n <= 0 || (state.length === 0 && state.ended)) { - return 0; - } - if (state.objectMode) { - return 1; - } - if (Number.isNaN(n)) { - // Only flow one buffer at a time. - if (state.flowing && state.length) { - return state.buffer.first().length; - } - return state.length; - } - if (n <= state.length) { - return n; - } - return state.ended ? state.length : 0; -} - -function onEofChunk(stream: Readable, state: ReadableState) { - if (state.ended) return; - if (state.decoder) { - const chunk = state.decoder.end(); - if (chunk && chunk.length) { - state.buffer.push(chunk); - state.length += state.objectMode ? 1 : chunk.length; - } - } - state.ended = true; - - if (state.sync) { - emitReadable(stream); - } else { - state.needReadable = false; - state.emittedReadable = true; - emitReadable_(stream); - } -} - -function emitReadable(stream: Readable) { - const state = stream._readableState; - state.needReadable = false; - if (!state.emittedReadable) { - state.emittedReadable = true; - queueMicrotask(() => emitReadable_(stream)); - } -} - -function emitReadable_(stream: Readable) { - const state = stream._readableState; - if (!state.destroyed && !state.errored && (state.length || state.ended)) { - stream.emit("readable"); - state.emittedReadable = false; - } - - state.needReadable = !state.flowing && - !state.ended && - state.length <= state.highWaterMark; - flow(stream); -} - -function maybeReadMore(stream: Readable, state: ReadableState) { - if (!state.readingMore && state.constructed) { - state.readingMore = true; - queueMicrotask(() => maybeReadMore_(stream, state)); - } -} - -function maybeReadMore_(stream: Readable, state: ReadableState) { - while ( - !state.reading && !state.ended && - (state.length < state.highWaterMark || - (state.flowing && state.length === 0)) - ) { - const len = state.length; - stream.read(0); - if (len === state.length) { - // Didn't get any data, stop spinning. - break; - } - } - state.readingMore = false; -} +import { kDestroy, kPaused } from "./symbols.ts"; +import { + _destroy, + computeNewHighWaterMark, + emitReadable, + endReadable, + errorOrDestroy, + fromList, + howMuchToRead, + nReadingNextTick, + pipeOnDrain, + prependListener, + readableAddChunk, + resume, + updateReadableListening, +} from "./readable_internal.ts"; +import Writable from "./writable.ts"; +import { errorOrDestroy as errorOrDestroyWritable } from "./writable_internal.ts"; +import Duplex, { errorOrDestroy as errorOrDestroyDuplex } from "./duplex.ts"; export interface ReadableOptions { autoDestroy?: boolean; - construct?: () => void; - //TODO(Soremwar) - //Import available encodings - defaultEncoding?: string; + defaultEncoding?: Encodings; destroy?( this: Readable, error: Error | null, callback: (error: Error | null) => void, ): void; emitClose?: boolean; - //TODO(Soremwar) - //Import available encodings - encoding?: string; + encoding?: Encodings; highWaterMark?: number; objectMode?: boolean; read?(this: Readable): void; @@ -494,7 +48,7 @@ export interface ReadableOptions { export class ReadableState { [kPaused]: boolean | null = null; - awaitDrainWriters: Writable | Set<Writable> | null = null; + awaitDrainWriters: Duplex | Writable | Set<Duplex | Writable> | null = null; buffer = new BufferList(); closed = false; closeEmitted = false; @@ -502,9 +56,7 @@ export class ReadableState { decoder: StringDecoder | null = null; destroyed = false; emittedReadable = false; - //TODO(Soremwar) - //Import available encodings - encoding: string | null = null; + encoding: Encodings | null = null; ended = false; endEmitted = false; errored: Error | null = null; @@ -515,7 +67,7 @@ export class ReadableState { multiAwaitDrain = false; needReadable = false; objectMode: boolean; - pipes: Writable[] = []; + pipes: Array<Duplex | Writable> = []; readable = true; readableListening = false; reading = false; @@ -551,7 +103,6 @@ export class ReadableState { } class Readable extends Stream { - _construct?: (cb: (error?: Error) => void) => void; _readableState: ReadableState; constructor(options?: ReadableOptions) { @@ -563,15 +114,8 @@ class Readable extends Stream { if (typeof options.destroy === "function") { this._destroy = options.destroy; } - if (typeof options.construct === "function") { - this._construct = options.construct; - } } this._readableState = new ReadableState(options); - - construct(this, () => { - maybeReadMore(this, this._readableState); - }); } static from( @@ -690,13 +234,11 @@ class Readable extends Stream { return ret; } - _read() { + _read(_size?: number) { throw new ERR_METHOD_NOT_IMPLEMENTED("_read()"); } - //TODO(Soremwar) - //Should be duplex - pipe<T extends Writable>(dest: T, pipeOpts?: { end?: boolean }): T { + pipe<T extends Duplex | Writable>(dest: T, pipeOpts?: { end?: boolean }): T { // deno-lint-ignore no-this-alias const src = this; const state = this._readableState; @@ -776,7 +318,7 @@ class Readable extends Stream { state.awaitDrainWriters = dest; state.multiAwaitDrain = false; } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { - (state.awaitDrainWriters as Set<Writable>).add(dest); + (state.awaitDrainWriters as Set<Duplex | Writable>).add(dest); } src.pause(); } @@ -791,12 +333,13 @@ class Readable extends Stream { unpipe(); dest.removeListener("error", onerror); if (dest.listenerCount("error") === 0) { - //TODO(Soremwar) - //Should be const s = dest._writableState || dest._readableState; - const s = dest._writableState; + const s = dest._writableState || (dest as Duplex)._readableState; if (s && !s.errorEmitted) { - // User incorrectly emitted 'error' directly on the stream. - errorOrDestroyDuplex(dest, er); + if (dest instanceof Duplex) { + errorOrDestroyDuplex(dest as unknown as Duplex, er); + } else { + errorOrDestroyWritable(dest as Writable, er); + } } else { dest.emit("error", er); } @@ -817,7 +360,7 @@ class Readable extends Stream { dest.once("finish", onfinish); function unpipe() { - src.unpipe(dest); + src.unpipe(dest as Writable); } dest.emit("pipe", this); @@ -834,12 +377,11 @@ class Readable extends Stream { this._readableState.flowing === false; } - //TODO(Soremwar) - //Replace string with encoding types - setEncoding(enc: string) { + setEncoding(enc: Encodings) { const decoder = new StringDecoder(enc); this._readableState.decoder = decoder; - this._readableState.encoding = this._readableState.decoder.encoding; + this._readableState.encoding = this._readableState.decoder + .encoding as Encodings; const buffer = this._readableState.buffer; let content = ""; @@ -931,7 +473,7 @@ class Readable extends Stream { off = this.removeListener; - destroy(err?: Error, cb?: () => void) { + destroy(err?: Error | null, cb?: () => void) { const r = this._readableState; if (r.destroyed) { @@ -989,10 +531,8 @@ class Readable extends Stream { this.destroy(err); } - //TODO(Soremwar) - //Same deal, string => encodings // deno-lint-ignore no-explicit-any - push(chunk: any, encoding?: string): boolean { + push(chunk: any, encoding?: Encodings): boolean { return readableAddChunk(this, chunk, encoding, false); } @@ -1233,7 +773,7 @@ class Readable extends Stream { } } -Object.defineProperties(Stream, { +Object.defineProperties(Readable, { _readableState: { enumerable: false }, destroyed: { enumerable: false }, readableBuffer: { enumerable: false }, diff --git a/std/node/_stream/readable_internal.ts b/std/node/_stream/readable_internal.ts new file mode 100644 index 000000000..0ef261d4d --- /dev/null +++ b/std/node/_stream/readable_internal.ts @@ -0,0 +1,438 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { Buffer } from "../buffer.ts"; +import type Duplex from "./duplex.ts"; +import type EventEmitter from "../events.ts"; +import type Readable from "./readable.ts"; +import type Writable from "./writable.ts"; +import type { ReadableState } from "./readable.ts"; +import { kPaused } from "./symbols.ts"; +import { + ERR_STREAM_PUSH_AFTER_EOF, + ERR_STREAM_UNSHIFT_AFTER_END_EVENT, +} from "../_errors.ts"; + +export function _destroy( + self: Readable, + err?: Error | null, + cb?: (error?: Error | null) => void, +) { + self._destroy(err || null, (err) => { + const r = (self as Readable)._readableState; + + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!r.errored) { + r.errored = err; + } + } + + r.closed = true; + + if (typeof cb === "function") { + cb(err); + } + + if (err) { + queueMicrotask(() => { + if (!r.errorEmitted) { + r.errorEmitted = true; + self.emit("error", err); + } + r.closeEmitted = true; + if (r.emitClose) { + self.emit("close"); + } + }); + } else { + queueMicrotask(() => { + r.closeEmitted = true; + if (r.emitClose) { + self.emit("close"); + } + }); + } + }); +} + +export function addChunk( + stream: Duplex | Readable, + state: ReadableState, + chunk: string | Buffer | Uint8Array, + addToFront: boolean, +) { + if (state.flowing && state.length === 0 && !state.sync) { + if (state.multiAwaitDrain) { + (state.awaitDrainWriters as Set<Writable>).clear(); + } else { + state.awaitDrainWriters = null; + } + stream.emit("data", chunk); + } else { + // Update the buffer info. + state.length += state.objectMode ? 1 : chunk.length; + if (addToFront) { + state.buffer.unshift(chunk); + } else { + state.buffer.push(chunk); + } + + if (state.needReadable) { + emitReadable(stream); + } + } + maybeReadMore(stream, state); +} + +// Don't raise the hwm > 1GB. +const MAX_HWM = 0x40000000; +export function computeNewHighWaterMark(n: number) { + if (n >= MAX_HWM) { + n = MAX_HWM; + } else { + n--; + n |= n >>> 1; + n |= n >>> 2; + n |= n >>> 4; + n |= n >>> 8; + n |= n >>> 16; + n++; + } + return n; +} + +export function emitReadable(stream: Duplex | Readable) { + const state = stream._readableState; + state.needReadable = false; + if (!state.emittedReadable) { + state.emittedReadable = true; + queueMicrotask(() => emitReadable_(stream)); + } +} + +function emitReadable_(stream: Duplex | Readable) { + const state = stream._readableState; + if (!state.destroyed && !state.errored && (state.length || state.ended)) { + stream.emit("readable"); + state.emittedReadable = false; + } + + state.needReadable = !state.flowing && + !state.ended && + state.length <= state.highWaterMark; + flow(stream); +} + +export function endReadable(stream: Readable) { + const state = stream._readableState; + + if (!state.endEmitted) { + state.ended = true; + queueMicrotask(() => endReadableNT(state, stream)); + } +} + +function endReadableNT(state: ReadableState, stream: Readable) { + if ( + !state.errorEmitted && !state.closeEmitted && + !state.endEmitted && state.length === 0 + ) { + state.endEmitted = true; + stream.emit("end"); + + if (state.autoDestroy) { + stream.destroy(); + } + } +} + +export function errorOrDestroy( + stream: Duplex | Readable, + err: Error, + sync = false, +) { + const r = stream._readableState; + + if (r.destroyed) { + return stream; + } + + if (r.autoDestroy) { + stream.destroy(err); + } else if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!r.errored) { + r.errored = err; + } + if (sync) { + queueMicrotask(() => { + if (!r.errorEmitted) { + r.errorEmitted = true; + stream.emit("error", err); + } + }); + } else if (!r.errorEmitted) { + r.errorEmitted = true; + stream.emit("error", err); + } + } +} + +function flow(stream: Duplex | Readable) { + const state = stream._readableState; + while (state.flowing && stream.read() !== null); +} + +/** Pluck off n bytes from an array of buffers. +* Length is the combined lengths of all the buffers in the list. +* This function is designed to be inlinable, so please take care when making +* changes to the function body. +*/ +export function fromList(n: number, state: ReadableState) { + // nothing buffered. + if (state.length === 0) { + return null; + } + + let ret; + if (state.objectMode) { + ret = state.buffer.shift(); + } else if (!n || n >= state.length) { + if (state.decoder) { + ret = state.buffer.join(""); + } else if (state.buffer.length === 1) { + ret = state.buffer.first(); + } else { + ret = state.buffer.concat(state.length); + } + state.buffer.clear(); + } else { + ret = state.buffer.consume(n, !!state.decoder); + } + + return ret; +} + +export function howMuchToRead(n: number, state: ReadableState) { + if (n <= 0 || (state.length === 0 && state.ended)) { + return 0; + } + if (state.objectMode) { + return 1; + } + if (Number.isNaN(n)) { + // Only flow one buffer at a time. + if (state.flowing && state.length) { + return state.buffer.first().length; + } + return state.length; + } + if (n <= state.length) { + return n; + } + return state.ended ? state.length : 0; +} + +export function maybeReadMore(stream: Readable, state: ReadableState) { + if (!state.readingMore && state.constructed) { + state.readingMore = true; + queueMicrotask(() => maybeReadMore_(stream, state)); + } +} + +function maybeReadMore_(stream: Readable, state: ReadableState) { + while ( + !state.reading && !state.ended && + (state.length < state.highWaterMark || + (state.flowing && state.length === 0)) + ) { + const len = state.length; + stream.read(0); + if (len === state.length) { + // Didn't get any data, stop spinning. + break; + } + } + state.readingMore = false; +} + +export function nReadingNextTick(self: Duplex | Readable) { + self.read(0); +} + +export function onEofChunk(stream: Duplex | Readable, state: ReadableState) { + if (state.ended) return; + if (state.decoder) { + const chunk = state.decoder.end(); + if (chunk && chunk.length) { + state.buffer.push(chunk); + state.length += state.objectMode ? 1 : chunk.length; + } + } + state.ended = true; + + if (state.sync) { + emitReadable(stream); + } else { + state.needReadable = false; + state.emittedReadable = true; + emitReadable_(stream); + } +} + +export function pipeOnDrain(src: Duplex | Readable, dest: Duplex | Writable) { + return function pipeOnDrainFunctionResult() { + const state = src._readableState; + + if (state.awaitDrainWriters === dest) { + state.awaitDrainWriters = null; + } else if (state.multiAwaitDrain) { + (state.awaitDrainWriters as Set<Duplex | Writable>).delete(dest); + } + + if ( + (!state.awaitDrainWriters || + (state.awaitDrainWriters as Set<Writable>).size === 0) && + src.listenerCount("data") + ) { + state.flowing = true; + flow(src); + } + }; +} + +export function prependListener( + emitter: EventEmitter, + event: string, + // deno-lint-ignore no-explicit-any + fn: (...args: any[]) => any, +) { + if (typeof emitter.prependListener === "function") { + return emitter.prependListener(event, fn); + } + + // This is a hack to make sure that our error handler is attached before any + // userland ones. NEVER DO THIS. This is here only because this code needs + // to continue to work with older versions of Node.js that do not include + //the prependListener() method. The goal is to eventually remove this hack. + // TODO(Soremwar) + // Burn it with fire + // deno-lint-ignore ban-ts-comment + //@ts-ignore + if (emitter._events.get(event)?.length) { + // deno-lint-ignore ban-ts-comment + //@ts-ignore + const listeners = [fn, ...emitter._events.get(event)]; + // deno-lint-ignore ban-ts-comment + //@ts-ignore + emitter._events.set(event, listeners); + } else { + emitter.on(event, fn); + } +} + +export function readableAddChunk( + stream: Duplex | Readable, + chunk: string | Buffer | Uint8Array | null, + encoding: undefined | string = undefined, + addToFront: boolean, +) { + const state = stream._readableState; + let usedEncoding = encoding; + + let err; + if (!state.objectMode) { + if (typeof chunk === "string") { + usedEncoding = encoding || state.defaultEncoding; + if (state.encoding !== usedEncoding) { + if (addToFront && state.encoding) { + chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding); + } else { + chunk = Buffer.from(chunk, usedEncoding); + usedEncoding = ""; + } + } + } else if (chunk instanceof Uint8Array) { + chunk = Buffer.from(chunk); + } + } + + if (err) { + errorOrDestroy(stream, err); + } else if (chunk === null) { + state.reading = false; + onEofChunk(stream, state); + } else if (state.objectMode || (chunk.length > 0)) { + if (addToFront) { + if (state.endEmitted) { + errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); + } else { + addChunk(stream, state, chunk, true); + } + } else if (state.ended) { + errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); + } else if (state.destroyed || state.errored) { + return false; + } else { + state.reading = false; + if (state.decoder && !usedEncoding) { + //TODO(Soremwar) + //I don't think this cast is right + chunk = state.decoder.write(Buffer.from(chunk as Uint8Array)); + if (state.objectMode || chunk.length !== 0) { + addChunk(stream, state, chunk, false); + } else { + maybeReadMore(stream, state); + } + } else { + addChunk(stream, state, chunk, false); + } + } + } else if (!addToFront) { + state.reading = false; + maybeReadMore(stream, state); + } + + return !state.ended && + (state.length < state.highWaterMark || state.length === 0); +} + +export function resume(stream: Duplex | Readable, state: ReadableState) { + if (!state.resumeScheduled) { + state.resumeScheduled = true; + queueMicrotask(() => resume_(stream, state)); + } +} + +function resume_(stream: Duplex | Readable, state: ReadableState) { + if (!state.reading) { + stream.read(0); + } + + state.resumeScheduled = false; + stream.emit("resume"); + flow(stream); + if (state.flowing && !state.reading) { + stream.read(0); + } +} + +export function updateReadableListening(self: Duplex | Readable) { + const state = self._readableState; + state.readableListening = self.listenerCount("readable") > 0; + + if (state.resumeScheduled && state[kPaused] === false) { + // Flowing needs to be set to true now, otherwise + // the upcoming resume will not flow. + state.flowing = true; + + // Crude way to check if we should resume. + } else if (self.listenerCount("data") > 0) { + self.resume(); + } else if (!state.readableListening) { + state.flowing = null; + } +} diff --git a/std/node/_stream/stream.ts b/std/node/_stream/stream.ts index 708b8bcd3..4daafc77b 100644 --- a/std/node/_stream/stream.ts +++ b/std/node/_stream/stream.ts @@ -1,6 +1,7 @@ // Copyright Node.js contributors. All rights reserved. MIT License. import { Buffer } from "../buffer.ts"; import EventEmitter from "../events.ts"; +import type Readable from "./readable.ts"; import type Writable from "./writable.ts"; import { types } from "../util.ts"; @@ -12,7 +13,7 @@ class Stream extends EventEmitter { static _isUint8Array = types.isUint8Array; static _uint8ArrayToBuffer = (chunk: Uint8Array) => Buffer.from(chunk); - pipe(dest: Writable, options: { end: boolean }) { + pipe(dest: Readable | Writable, options?: { end?: boolean }) { // deno-lint-ignore no-this-alias const source = this; @@ -31,7 +32,8 @@ class Stream extends EventEmitter { if (didOnEnd) return; didOnEnd = true; - dest.end(); + // 'end' is only called on Writable streams + (dest as Writable).end(); } function onclose() { diff --git a/std/node/_stream/transform.ts b/std/node/_stream/transform.ts new file mode 100644 index 000000000..a4246e81a --- /dev/null +++ b/std/node/_stream/transform.ts @@ -0,0 +1,132 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { Encodings } from "../_utils.ts"; +import Duplex from "./duplex.ts"; +import type { DuplexOptions } from "./duplex.ts"; +import type { writeV } from "./writable_internal.ts"; +import { ERR_METHOD_NOT_IMPLEMENTED } from "../_errors.ts"; + +const kCallback = Symbol("kCallback"); + +type TransformFlush = ( + this: Transform, + // deno-lint-ignore no-explicit-any + callback: (error?: Error | null, data?: any) => void, +) => void; + +export interface TransformOptions extends DuplexOptions { + read?(this: Transform, size: number): void; + write?( + this: Transform, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: Encodings, + callback: (error?: Error | null) => void, + ): void; + writev?: writeV; + final?(this: Transform, callback: (error?: Error | null) => void): void; + destroy?( + this: Transform, + error: Error | null, + callback: (error: Error | null) => void, + ): void; + transform?( + this: Transform, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: Encodings, + // deno-lint-ignore no-explicit-any + callback: (error?: Error | null, data?: any) => void, + ): void; + flush?: TransformFlush; +} + +export default class Transform extends Duplex { + [kCallback]: null | ((error?: Error | null) => void); + _flush?: TransformFlush; + + constructor(options?: TransformOptions) { + super(options); + this._readableState.sync = false; + + this[kCallback] = null; + + if (options) { + if (typeof options.transform === "function") { + this._transform = options.transform; + } + + if (typeof options.flush === "function") { + this._flush = options.flush; + } + } + + this.on("prefinish", function (this: Transform) { + if (typeof this._flush === "function" && !this.destroyed) { + this._flush((er, data) => { + if (er) { + this.destroy(er); + return; + } + + if (data != null) { + this.push(data); + } + this.push(null); + }); + } else { + this.push(null); + } + }); + } + + _read = () => { + if (this[kCallback]) { + const callback = this[kCallback] as (error?: Error | null) => void; + this[kCallback] = null; + callback(); + } + }; + + _transform( + // deno-lint-ignore no-explicit-any + _chunk: any, + _encoding: string, + // deno-lint-ignore no-explicit-any + _callback: (error?: Error | null, data?: any) => void, + ) { + throw new ERR_METHOD_NOT_IMPLEMENTED("_transform()"); + } + + _write = ( + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + callback: (error?: Error | null) => void, + ) => { + const rState = this._readableState; + const wState = this._writableState; + const length = rState.length; + + this._transform(chunk, encoding, (err, val) => { + if (err) { + callback(err); + return; + } + + if (val != null) { + this.push(val); + } + + if ( + wState.ended || // Backwards compat. + length === rState.length || // Backwards compat. + rState.length < rState.highWaterMark || + rState.length === 0 + ) { + callback(); + } else { + this[kCallback] = callback; + } + }); + }; +} diff --git a/std/node/_stream/transform_test.ts b/std/node/_stream/transform_test.ts new file mode 100644 index 000000000..d3b90ff01 --- /dev/null +++ b/std/node/_stream/transform_test.ts @@ -0,0 +1,68 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { Buffer } from "../buffer.ts"; +import Transform from "./transform.ts"; +import finished from "./end_of_stream.ts"; +import { deferred } from "../../async/mod.ts"; +import { assert, assertEquals } from "../../testing/asserts.ts"; + +Deno.test("Transform stream finishes correctly", async () => { + let finishedExecuted = 0; + const finishedExecutedExpected = 1; + const finishedExecution = deferred(); + + const tr = new Transform({ + transform(_data, _enc, cb) { + cb(); + }, + }); + + let finish = false; + let ended = false; + + tr.on("end", () => { + ended = true; + }); + + tr.on("finish", () => { + finish = true; + }); + + finished(tr, (err) => { + finishedExecuted++; + if (finishedExecuted === finishedExecutedExpected) { + finishedExecution.resolve(); + } + assert(!err, "no error"); + assert(finish); + assert(ended); + }); + + tr.end(); + tr.resume(); + + const finishedTimeout = setTimeout( + () => finishedExecution.reject(), + 1000, + ); + await finishedExecution; + clearTimeout(finishedTimeout); + assertEquals(finishedExecuted, finishedExecutedExpected); +}); + +Deno.test("Transform stream flushes data correctly", () => { + const expected = "asdf"; + + const t = new Transform({ + transform: (_d, _e, n) => { + n(); + }, + flush: (n) => { + n(null, expected); + }, + }); + + t.end(Buffer.from("blerg")); + t.on("data", (data) => { + assertEquals(data.toString(), expected); + }); +}); diff --git a/std/node/_stream/writable.ts b/std/node/_stream/writable.ts index 158af8325..534fc22fb 100644 --- a/std/node/_stream/writable.ts +++ b/std/node/_stream/writable.ts @@ -2,12 +2,10 @@ import { Buffer } from "../buffer.ts"; import Stream from "./stream.ts"; import { captureRejectionSymbol } from "../events.ts"; -import { kConstruct, kDestroy } from "./symbols.ts"; import { ERR_INVALID_ARG_TYPE, ERR_INVALID_OPT_VALUE, ERR_METHOD_NOT_IMPLEMENTED, - ERR_MULTIPLE_CALLBACK, ERR_STREAM_ALREADY_FINISHED, ERR_STREAM_CANNOT_PIPE, ERR_STREAM_DESTROYED, @@ -15,493 +13,27 @@ import { ERR_STREAM_WRITE_AFTER_END, ERR_UNKNOWN_ENCODING, } from "../_errors.ts"; - -function nop() {} - -//TODO(Soremwar) -//Bring in encodings -type write_v = ( - // deno-lint-ignore no-explicit-any - chunks: Array<{ chunk: any; encoding: string }>, - callback: (error?: Error | null) => void, -) => void; - -type AfterWriteTick = { - cb: (error?: Error) => void; - count: number; - state: WritableState; - stream: Writable; -}; - -const kOnFinished = Symbol("kOnFinished"); - -function destroy(this: Writable, err?: Error, cb?: () => void) { - const w = this._writableState; - - if (w.destroyed) { - if (typeof cb === "function") { - cb(); - } - - return this; - } - - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!w.errored) { - w.errored = err; - } - } - - w.destroyed = true; - - if (!w.constructed) { - this.once(kDestroy, (er) => { - _destroy(this, err || er, cb); - }); - } else { - _destroy(this, err, cb); - } - - return this; -} - -function _destroy( - self: Writable, - err?: Error, - cb?: (error?: Error | null) => void, -) { - self._destroy(err || null, (err) => { - const w = self._writableState; - - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!w.errored) { - w.errored = err; - } - } - - w.closed = true; - - if (typeof cb === "function") { - cb(err); - } - - if (err) { - queueMicrotask(() => { - if (!w.errorEmitted) { - w.errorEmitted = true; - self.emit("error", err); - } - w.closeEmitted = true; - if (w.emitClose) { - self.emit("close"); - } - }); - } else { - queueMicrotask(() => { - w.closeEmitted = true; - if (w.emitClose) { - self.emit("close"); - } - }); - } - }); -} - -function errorOrDestroy(stream: Writable, err: Error, sync = false) { - const w = stream._writableState; - - if (w.destroyed) { - return stream; - } - - if (w.autoDestroy) { - stream.destroy(err); - } else if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!w.errored) { - w.errored = err; - } - if (sync) { - queueMicrotask(() => { - if (w.errorEmitted) { - return; - } - w.errorEmitted = true; - stream.emit("error", err); - }); - } else { - if (w.errorEmitted) { - return; - } - w.errorEmitted = true; - stream.emit("error", err); - } - } -} - -function construct(stream: Writable, cb: (error: Error) => void) { - if (!stream._construct) { - return; - } - - stream.once(kConstruct, cb); - const w = stream._writableState; - - w.constructed = false; - - queueMicrotask(() => { - let called = false; - stream._construct?.((err) => { - w.constructed = true; - - if (called) { - err = new ERR_MULTIPLE_CALLBACK(); - } else { - called = true; - } - - if (w.destroyed) { - stream.emit(kDestroy, err); - } else if (err) { - errorOrDestroy(stream, err, true); - } else { - queueMicrotask(() => { - stream.emit(kConstruct); - }); - } - }); - }); -} - -//TODO(Soremwar) -//Bring encodings in -function writeOrBuffer( - stream: Writable, - state: WritableState, - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: string, - callback: (error: Error) => void, -) { - const len = state.objectMode ? 1 : chunk.length; - - state.length += len; - - if (state.writing || state.corked || state.errored || !state.constructed) { - state.buffered.push({ chunk, encoding, callback }); - if (state.allBuffers && encoding !== "buffer") { - state.allBuffers = false; - } - if (state.allNoop && callback !== nop) { - state.allNoop = false; - } - } else { - state.writelen = len; - state.writecb = callback; - state.writing = true; - state.sync = true; - stream._write(chunk, encoding, state.onwrite); - state.sync = false; - } - - const ret = state.length < state.highWaterMark; - - if (!ret) { - state.needDrain = true; - } - - return ret && !state.errored && !state.destroyed; -} - -//TODO(Soremwar) -//Bring encodings in -function doWrite( - stream: Writable, - state: WritableState, - writev: boolean, - len: number, - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: string, - cb: (error: Error) => void, -) { - state.writelen = len; - state.writecb = cb; - state.writing = true; - state.sync = true; - if (state.destroyed) { - state.onwrite(new ERR_STREAM_DESTROYED("write")); - } else if (writev) { - (stream._writev as unknown as write_v)(chunk, state.onwrite); - } else { - stream._write(chunk, encoding, state.onwrite); - } - state.sync = false; -} - -function onwriteError( - stream: Writable, - state: WritableState, - er: Error, - cb: (error: Error) => void, -) { - --state.pendingcb; - - cb(er); - errorBuffer(state); - errorOrDestroy(stream, er); -} - -function onwrite(stream: Writable, er?: Error | null) { - const state = stream._writableState; - const sync = state.sync; - const cb = state.writecb; - - if (typeof cb !== "function") { - errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); - return; - } - - state.writing = false; - state.writecb = null; - state.length -= state.writelen; - state.writelen = 0; - - if (er) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - er.stack; - - if (!state.errored) { - state.errored = er; - } - - if (sync) { - queueMicrotask(() => onwriteError(stream, state, er, cb)); - } else { - onwriteError(stream, state, er, cb); - } - } else { - if (state.buffered.length > state.bufferedIndex) { - clearBuffer(stream, state); - } - - if (sync) { - if ( - state.afterWriteTickInfo !== null && - state.afterWriteTickInfo.cb === cb - ) { - state.afterWriteTickInfo.count++; - } else { - state.afterWriteTickInfo = { - count: 1, - cb: (cb as (error?: Error) => void), - stream, - state, - }; - queueMicrotask(() => - afterWriteTick(state.afterWriteTickInfo as AfterWriteTick) - ); - } - } else { - afterWrite(stream, state, 1, cb as (error?: Error) => void); - } - } -} - -function afterWriteTick({ - cb, - count, - state, - stream, -}: AfterWriteTick) { - state.afterWriteTickInfo = null; - return afterWrite(stream, state, count, cb); -} - -function afterWrite( - stream: Writable, - state: WritableState, - count: number, - cb: (error?: Error) => void, -) { - const needDrain = !state.ending && !stream.destroyed && state.length === 0 && - state.needDrain; - if (needDrain) { - state.needDrain = false; - stream.emit("drain"); - } - - while (count-- > 0) { - state.pendingcb--; - cb(); - } - - if (state.destroyed) { - errorBuffer(state); - } - - finishMaybe(stream, state); -} - -/** If there's something in the buffer waiting, then invoke callbacks.*/ -function errorBuffer(state: WritableState) { - if (state.writing) { - return; - } - - for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { - const { chunk, callback } = state.buffered[n]; - const len = state.objectMode ? 1 : chunk.length; - state.length -= len; - callback(new ERR_STREAM_DESTROYED("write")); - } - - for (const callback of state[kOnFinished].splice(0)) { - callback(new ERR_STREAM_DESTROYED("end")); - } - - resetBuffer(state); -} - -/** If there's something in the buffer waiting, then process it.*/ -function clearBuffer(stream: Writable, state: WritableState) { - if ( - state.corked || - state.bufferProcessing || - state.destroyed || - !state.constructed - ) { - return; - } - - const { buffered, bufferedIndex, objectMode } = state; - const bufferedLength = buffered.length - bufferedIndex; - - if (!bufferedLength) { - return; - } - - const i = bufferedIndex; - - state.bufferProcessing = true; - if (bufferedLength > 1 && stream._writev) { - state.pendingcb -= bufferedLength - 1; - - const callback = state.allNoop ? nop : (err: Error) => { - for (let n = i; n < buffered.length; ++n) { - buffered[n].callback(err); - } - }; - const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i); - - doWrite(stream, state, true, state.length, chunks, "", callback); - - resetBuffer(state); - } else { - do { - const { chunk, encoding, callback } = buffered[i]; - const len = objectMode ? 1 : chunk.length; - doWrite(stream, state, false, len, chunk, encoding, callback); - } while (i < buffered.length && !state.writing); - - if (i === buffered.length) { - resetBuffer(state); - } else if (i > 256) { - buffered.splice(0, i); - state.bufferedIndex = 0; - } else { - state.bufferedIndex = i; - } - } - state.bufferProcessing = false; -} - -function finish(stream: Writable, state: WritableState) { - state.pendingcb--; - if (state.errorEmitted || state.closeEmitted) { - return; - } - - state.finished = true; - - for (const callback of state[kOnFinished].splice(0)) { - callback(); - } - - stream.emit("finish"); - - if (state.autoDestroy) { - stream.destroy(); - } -} - -function finishMaybe(stream: Writable, state: WritableState, sync?: boolean) { - if (needFinish(state)) { - prefinish(stream, state); - if (state.pendingcb === 0 && needFinish(state)) { - state.pendingcb++; - if (sync) { - queueMicrotask(() => finish(stream, state)); - } else { - finish(stream, state); - } - } - } -} - -function prefinish(stream: Writable, state: WritableState) { - if (!state.prefinished && !state.finalCalled) { - if (typeof stream._final === "function" && !state.destroyed) { - state.finalCalled = true; - - state.sync = true; - state.pendingcb++; - stream._final((err) => { - state.pendingcb--; - if (err) { - for (const callback of state[kOnFinished].splice(0)) { - callback(err); - } - errorOrDestroy(stream, err, state.sync); - } else if (needFinish(state)) { - state.prefinished = true; - stream.emit("prefinish"); - state.pendingcb++; - queueMicrotask(() => finish(stream, state)); - } - }); - state.sync = false; - } else { - state.prefinished = true; - stream.emit("prefinish"); - } - } -} - -function needFinish(state: WritableState) { - return (state.ending && - state.constructed && - state.length === 0 && - !state.errored && - state.buffered.length === 0 && - !state.finished && - !state.writing); -} - -interface WritableOptions { +import type { AfterWriteTick, writeV } from "./writable_internal.ts"; +import { + clearBuffer, + destroy, + errorBuffer, + errorOrDestroy, + finishMaybe, + kOnFinished, + nop, + onwrite, + resetBuffer, + writeOrBuffer, +} from "./writable_internal.ts"; +import type { Encodings } from "../_utils.ts"; + +type WritableEncodings = Encodings | "buffer"; + +export interface WritableOptions { autoDestroy?: boolean; decodeStrings?: boolean; - //TODO(Soremwar) - //Bring encodings in - defaultEncoding?: string; + defaultEncoding?: WritableEncodings; destroy?( this: Writable, error: Error | null, @@ -511,17 +43,13 @@ interface WritableOptions { final?(this: Writable, callback: (error?: Error | null) => void): void; highWaterMark?: number; objectMode?: boolean; - //TODO(Soremwar) - //Bring encodings in write?( this: Writable, // deno-lint-ignore no-explicit-any chunk: any, - encoding: string, + encoding: WritableEncodings, callback: (error?: Error | null) => void, ): void; - //TODO(Soremwar) - //Bring encodings in writev?( this: Writable, // deno-lint-ignore no-explicit-any @@ -530,14 +58,12 @@ interface WritableOptions { ): void; } -class WritableState { +export class WritableState { [kOnFinished]: Array<(error?: Error) => void> = []; afterWriteTickInfo: null | AfterWriteTick = null; allBuffers = true; allNoop = true; autoDestroy: boolean; - //TODO(Soremwar) - //Bring in encodings buffered: Array<{ allBuffers?: boolean; // deno-lint-ignore no-explicit-any @@ -552,7 +78,7 @@ class WritableState { constructed: boolean; corked = 0; decodeStrings: boolean; - defaultEncoding: string; + defaultEncoding: WritableEncodings; destroyed = false; emitClose: boolean; ended = false; @@ -608,25 +134,17 @@ class WritableState { } } -function resetBuffer(state: WritableState) { - state.buffered = []; - state.bufferedIndex = 0; - state.allBuffers = true; - state.allNoop = true; -} - /** A bit simpler than readable streams. * Implement an async `._write(chunk, encoding, cb)`, and it'll handle all * the drain event emission and buffering. */ class Writable extends Stream { - _construct?: (cb: (error?: Error) => void) => void; _final?: ( this: Writable, callback: (error?: Error | null | undefined) => void, ) => void; _writableState: WritableState; - _writev?: write_v | null = null; + _writev?: writeV | null = null; constructor(options?: WritableOptions) { super(); @@ -649,16 +167,6 @@ class Writable extends Stream { this._final = options.final; } } - - construct(this, () => { - const state = this._writableState; - - if (!state.writing) { - clearBuffer(this, state); - } - - finishMaybe(this, state); - }); } [captureRejectionSymbol](err?: Error) { @@ -735,7 +243,7 @@ class Writable extends Stream { cb(err); } - destroy(err?: Error, cb?: () => void) { + destroy(err?: Error | null, cb?: () => void) { const state = this._writableState; if (!state.destroyed) { queueMicrotask(() => errorBuffer(state)); @@ -747,25 +255,19 @@ class Writable extends Stream { end(cb?: () => void): void; // deno-lint-ignore no-explicit-any end(chunk: any, cb?: () => void): void; - //TODO(Soremwar) - //Bring in encodings // deno-lint-ignore no-explicit-any - end(chunk: any, encoding: string, cb?: () => void): void; + end(chunk: any, encoding: WritableEncodings, cb?: () => void): void; end( // deno-lint-ignore no-explicit-any x?: any | (() => void), - //TODO(Soremwar) - //Bring in encodings - y?: string | (() => void), + y?: WritableEncodings | (() => void), z?: () => void, ) { const state = this._writableState; // deno-lint-ignore no-explicit-any let chunk: any | null; - //TODO(Soremwar) - //Bring in encodings - let encoding: string | null; + let encoding: WritableEncodings | null; let cb: undefined | ((error?: Error) => void); if (typeof x === "function") { @@ -778,7 +280,7 @@ class Writable extends Stream { cb = y; } else { chunk = x; - encoding = y as string; + encoding = y as WritableEncodings; cb = z; } @@ -815,8 +317,6 @@ class Writable extends Stream { return this; } - //TODO(Soremwar) - //Bring in encodings _write( // deno-lint-ignore no-explicit-any chunk: any, @@ -838,27 +338,21 @@ class Writable extends Stream { // deno-lint-ignore no-explicit-any write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean; - //TODO(Soremwar) - //Bring in encodings write( // deno-lint-ignore no-explicit-any chunk: any, - encoding: string | null, + encoding: WritableEncodings | null, cb?: (error: Error | null | undefined) => void, ): boolean; - //TODO(Soremwar) - //Bring in encodings write( // deno-lint-ignore no-explicit-any chunk: any, - x?: string | null | ((error: Error | null | undefined) => void), + x?: WritableEncodings | null | ((error: Error | null | undefined) => void), y?: ((error: Error | null | undefined) => void), ) { const state = this._writableState; - //TODO(Soremwar) - //Bring in encodings - let encoding: string; + let encoding: WritableEncodings; let cb: (error?: Error | null) => void; if (typeof x === "function") { @@ -933,8 +427,6 @@ class Writable extends Stream { } } - //TODO(Soremwar) - //Bring allowed encodings setDefaultEncoding(encoding: string) { // node::ParseEncoding() requires lower case. if (typeof encoding === "string") { @@ -943,10 +435,9 @@ class Writable extends Stream { if (!Buffer.isEncoding(encoding)) { throw new ERR_UNKNOWN_ENCODING(encoding); } - this._writableState.defaultEncoding = encoding; + this._writableState.defaultEncoding = encoding as WritableEncodings; return this; } } export default Writable; -export { WritableState }; diff --git a/std/node/_stream/writable_internal.ts b/std/node/_stream/writable_internal.ts new file mode 100644 index 000000000..e8c001af0 --- /dev/null +++ b/std/node/_stream/writable_internal.ts @@ -0,0 +1,457 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import type Duplex from "./duplex.ts"; +import type Writable from "./writable.ts"; +import type { WritableState } from "./writable.ts"; +import { kDestroy } from "./symbols.ts"; +import { ERR_MULTIPLE_CALLBACK, ERR_STREAM_DESTROYED } from "../_errors.ts"; + +export type writeV = ( + // deno-lint-ignore no-explicit-any + chunks: Array<{ chunk: any; encoding: string }>, + callback: (error?: Error | null) => void, +) => void; + +export type AfterWriteTick = { + cb: (error?: Error) => void; + count: number; + state: WritableState; + stream: Writable; +}; + +export const kOnFinished = Symbol("kOnFinished"); + +function _destroy( + self: Writable, + err?: Error | null, + cb?: (error?: Error | null) => void, +) { + self._destroy(err || null, (err) => { + const w = self._writableState; + + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + } + + w.closed = true; + + if (typeof cb === "function") { + cb(err); + } + + if (err) { + queueMicrotask(() => { + if (!w.errorEmitted) { + w.errorEmitted = true; + self.emit("error", err); + } + w.closeEmitted = true; + if (w.emitClose) { + self.emit("close"); + } + }); + } else { + queueMicrotask(() => { + w.closeEmitted = true; + if (w.emitClose) { + self.emit("close"); + } + }); + } + }); +} + +export function afterWrite( + stream: Writable, + state: WritableState, + count: number, + cb: (error?: Error) => void, +) { + const needDrain = !state.ending && !stream.destroyed && state.length === 0 && + state.needDrain; + if (needDrain) { + state.needDrain = false; + stream.emit("drain"); + } + + while (count-- > 0) { + state.pendingcb--; + cb(); + } + + if (state.destroyed) { + errorBuffer(state); + } + + finishMaybe(stream, state); +} + +export function afterWriteTick({ + cb, + count, + state, + stream, +}: AfterWriteTick) { + state.afterWriteTickInfo = null; + return afterWrite(stream, state, count, cb); +} + +/** If there's something in the buffer waiting, then process it.*/ +export function clearBuffer(stream: Duplex | Writable, state: WritableState) { + if ( + state.corked || + state.bufferProcessing || + state.destroyed || + !state.constructed + ) { + return; + } + + const { buffered, bufferedIndex, objectMode } = state; + const bufferedLength = buffered.length - bufferedIndex; + + if (!bufferedLength) { + return; + } + + const i = bufferedIndex; + + state.bufferProcessing = true; + if (bufferedLength > 1 && stream._writev) { + state.pendingcb -= bufferedLength - 1; + + const callback = state.allNoop ? nop : (err: Error) => { + for (let n = i; n < buffered.length; ++n) { + buffered[n].callback(err); + } + }; + const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i); + + doWrite(stream, state, true, state.length, chunks, "", callback); + + resetBuffer(state); + } else { + do { + const { chunk, encoding, callback } = buffered[i]; + const len = objectMode ? 1 : chunk.length; + doWrite(stream, state, false, len, chunk, encoding, callback); + } while (i < buffered.length && !state.writing); + + if (i === buffered.length) { + resetBuffer(state); + } else if (i > 256) { + buffered.splice(0, i); + state.bufferedIndex = 0; + } else { + state.bufferedIndex = i; + } + } + state.bufferProcessing = false; +} + +export function destroy(this: Writable, err?: Error | null, cb?: () => void) { + const w = this._writableState; + + if (w.destroyed) { + if (typeof cb === "function") { + cb(); + } + + return this; + } + + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + } + + w.destroyed = true; + + if (!w.constructed) { + this.once(kDestroy, (er) => { + _destroy(this, err || er, cb); + }); + } else { + _destroy(this, err, cb); + } + + return this; +} + +function doWrite( + stream: Duplex | Writable, + state: WritableState, + writev: boolean, + len: number, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + cb: (error: Error) => void, +) { + state.writelen = len; + state.writecb = cb; + state.writing = true; + state.sync = true; + if (state.destroyed) { + state.onwrite(new ERR_STREAM_DESTROYED("write")); + } else if (writev) { + (stream._writev as unknown as writeV)(chunk, state.onwrite); + } else { + stream._write(chunk, encoding, state.onwrite); + } + state.sync = false; +} + +/** If there's something in the buffer waiting, then invoke callbacks.*/ +export function errorBuffer(state: WritableState) { + if (state.writing) { + return; + } + + for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { + const { chunk, callback } = state.buffered[n]; + const len = state.objectMode ? 1 : chunk.length; + state.length -= len; + callback(new ERR_STREAM_DESTROYED("write")); + } + + for (const callback of state[kOnFinished].splice(0)) { + callback(new ERR_STREAM_DESTROYED("end")); + } + + resetBuffer(state); +} + +export function errorOrDestroy(stream: Writable, err: Error, sync = false) { + const w = stream._writableState; + + if (w.destroyed) { + return stream; + } + + if (w.autoDestroy) { + stream.destroy(err); + } else if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + if (sync) { + queueMicrotask(() => { + if (w.errorEmitted) { + return; + } + w.errorEmitted = true; + stream.emit("error", err); + }); + } else { + if (w.errorEmitted) { + return; + } + w.errorEmitted = true; + stream.emit("error", err); + } + } +} + +function finish(stream: Writable, state: WritableState) { + state.pendingcb--; + if (state.errorEmitted || state.closeEmitted) { + return; + } + + state.finished = true; + + for (const callback of state[kOnFinished].splice(0)) { + callback(); + } + + stream.emit("finish"); + + if (state.autoDestroy) { + stream.destroy(); + } +} + +export function finishMaybe( + stream: Writable, + state: WritableState, + sync?: boolean, +) { + if (needFinish(state)) { + prefinish(stream, state); + if (state.pendingcb === 0 && needFinish(state)) { + state.pendingcb++; + if (sync) { + queueMicrotask(() => finish(stream, state)); + } else { + finish(stream, state); + } + } + } +} + +export function needFinish(state: WritableState) { + return (state.ending && + state.constructed && + state.length === 0 && + !state.errored && + state.buffered.length === 0 && + !state.finished && + !state.writing); +} + +export function nop() {} + +export function resetBuffer(state: WritableState) { + state.buffered = []; + state.bufferedIndex = 0; + state.allBuffers = true; + state.allNoop = true; +} + +function onwriteError( + stream: Writable, + state: WritableState, + er: Error, + cb: (error: Error) => void, +) { + --state.pendingcb; + + cb(er); + errorBuffer(state); + errorOrDestroy(stream, er); +} + +export function onwrite(stream: Writable, er?: Error | null) { + const state = stream._writableState; + const sync = state.sync; + const cb = state.writecb; + + if (typeof cb !== "function") { + errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); + return; + } + + state.writing = false; + state.writecb = null; + state.length -= state.writelen; + state.writelen = 0; + + if (er) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + er.stack; + + if (!state.errored) { + state.errored = er; + } + + if (sync) { + queueMicrotask(() => onwriteError(stream, state, er, cb)); + } else { + onwriteError(stream, state, er, cb); + } + } else { + if (state.buffered.length > state.bufferedIndex) { + clearBuffer(stream, state); + } + + if (sync) { + if ( + state.afterWriteTickInfo !== null && + state.afterWriteTickInfo.cb === cb + ) { + state.afterWriteTickInfo.count++; + } else { + state.afterWriteTickInfo = { + count: 1, + cb: (cb as (error?: Error) => void), + stream, + state, + }; + queueMicrotask(() => + afterWriteTick(state.afterWriteTickInfo as AfterWriteTick) + ); + } + } else { + afterWrite(stream, state, 1, cb as (error?: Error) => void); + } + } +} + +export function prefinish(stream: Writable, state: WritableState) { + if (!state.prefinished && !state.finalCalled) { + if (typeof stream._final === "function" && !state.destroyed) { + state.finalCalled = true; + + state.sync = true; + state.pendingcb++; + stream._final((err) => { + state.pendingcb--; + if (err) { + for (const callback of state[kOnFinished].splice(0)) { + callback(err); + } + errorOrDestroy(stream, err, state.sync); + } else if (needFinish(state)) { + state.prefinished = true; + stream.emit("prefinish"); + state.pendingcb++; + queueMicrotask(() => finish(stream, state)); + } + }); + state.sync = false; + } else { + state.prefinished = true; + stream.emit("prefinish"); + } + } +} + +export function writeOrBuffer( + stream: Duplex | Writable, + state: WritableState, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + callback: (error: Error) => void, +) { + const len = state.objectMode ? 1 : chunk.length; + + state.length += len; + + if (state.writing || state.corked || state.errored || !state.constructed) { + state.buffered.push({ chunk, encoding, callback }); + if (state.allBuffers && encoding !== "buffer") { + state.allBuffers = false; + } + if (state.allNoop && callback !== nop) { + state.allNoop = false; + } + } else { + state.writelen = len; + state.writecb = callback; + state.writing = true; + state.sync = true; + stream._write(chunk, encoding, state.onwrite); + state.sync = false; + } + + const ret = state.length < state.highWaterMark; + + if (!ret) { + state.needDrain = true; + } + + return ret && !state.errored && !state.destroyed; +} diff --git a/std/node/_stream/writable_test.ts b/std/node/_stream/writable_test.ts index d0650c49e..d6133b65f 100644 --- a/std/node/_stream/writable_test.ts +++ b/std/node/_stream/writable_test.ts @@ -1,6 +1,6 @@ // Copyright Node.js contributors. All rights reserved. MIT License. import { Buffer } from "../buffer.ts"; -import finished from "./end-of-stream.ts"; +import finished from "./end_of_stream.ts"; import Writable from "../_stream/writable.ts"; import { deferred } from "../../async/mod.ts"; import { diff --git a/std/node/_string_decoder.ts b/std/node/_string_decoder.ts index ce7c19538..623070f58 100644 --- a/std/node/_string_decoder.ts +++ b/std/node/_string_decoder.ts @@ -172,7 +172,13 @@ function utf8End(this: Utf8Decoder, buf?: Buffer): string { return r; } -function utf8Write(this: Utf8Decoder | Base64Decoder, buf: Buffer): string { +function utf8Write( + this: Utf8Decoder | Base64Decoder, + buf: Buffer | string, +): string { + if (typeof buf === "string") { + return buf; + } if (buf.length === 0) return ""; let r; let i; @@ -210,7 +216,13 @@ function base64End(this: Base64Decoder, buf?: Buffer): string { return r; } -function simpleWrite(this: StringDecoderBase, buf: Buffer): string { +function simpleWrite( + this: StringDecoderBase, + buf: Buffer | string, +): string { + if (typeof buf === "string") { + return buf; + } return buf.toString(this.encoding); } diff --git a/std/node/_utils.ts b/std/node/_utils.ts index cb91fac27..b25bdf34e 100644 --- a/std/node/_utils.ts +++ b/std/node/_utils.ts @@ -1,4 +1,21 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +import { deferred } from "../async/mod.ts"; +import { fail } from "../testing/asserts.ts"; + +export type BinaryEncodings = "binary"; + +export type TextEncodings = + | "ascii" + | "utf8" + | "utf-8" + | "utf16le" + | "ucs2" + | "ucs-2" + | "base64" + | "latin1" + | "hex"; + +export type Encodings = BinaryEncodings | TextEncodings; export function notImplemented(msg?: string): never { const message = msg ? `Not implemented: ${msg}` : "Not implemented"; @@ -51,13 +68,15 @@ export function spliceOne(list: string[], index: number): void { // Return undefined if there is no match. // Move the "slow cases" to a separate function to make sure this function gets // inlined properly. That prioritizes the common case. -export function normalizeEncoding(enc: string | null): string | undefined { +export function normalizeEncoding( + enc: string | null, +): TextEncodings | undefined { if (enc == null || enc === "utf8" || enc === "utf-8") return "utf8"; return slowCases(enc); } // https://github.com/nodejs/node/blob/ba684805b6c0eded76e5cd89ee00328ac7a59365/lib/internal/util.js#L130 -function slowCases(enc: string): string | undefined { +function slowCases(enc: string): TextEncodings | undefined { switch (enc.length) { case 4: if (enc === "UTF8") return "utf8"; @@ -135,11 +154,52 @@ export function validateIntegerRange( type OptionalSpread<T> = T extends undefined ? [] : [T]; -export function once(callback: (...args: OptionalSpread<undefined>) => void) { +export function once<T = undefined>( + callback: (...args: OptionalSpread<T>) => void, +) { let called = false; - return function (this: unknown, ...args: OptionalSpread<undefined>) { + return function (this: unknown, ...args: OptionalSpread<T>) { if (called) return; called = true; callback.apply(this, args); }; } + +/** + * @param {number} [expectedExecutions = 1] + * @param {number} [timeout = 1000] Milliseconds to wait before the promise is forcefully exited +*/ +export function mustCall<T extends unknown[]>( + fn: ((...args: T) => void) = () => {}, + expectedExecutions = 1, + timeout = 1000, +): [Promise<void>, (...args: T) => void] { + if (expectedExecutions < 1) { + throw new Error("Expected executions can't be lower than 1"); + } + let timesExecuted = 0; + const completed = deferred(); + + const abort = setTimeout(() => completed.reject(), timeout); + + function callback(this: unknown, ...args: T) { + timesExecuted++; + if (timesExecuted === expectedExecutions) { + completed.resolve(); + } + fn.apply(this, args); + } + + const result = completed + .then(() => clearTimeout(abort)) + .catch(() => + fail( + `Async operation not completed: Expected ${expectedExecutions}, executed ${timesExecuted}`, + ) + ); + + return [ + result, + callback, + ]; +} diff --git a/std/node/module.ts b/std/node/module.ts index 597da09db..f10fc2ca7 100644 --- a/std/node/module.ts +++ b/std/node/module.ts @@ -26,9 +26,10 @@ import * as nodeEvents from "./events.ts"; import * as nodeFS from "./fs.ts"; import * as nodeOs from "./os.ts"; import * as nodePath from "./path.ts"; -import * as nodeTimers from "./timers.ts"; import * as nodeQueryString from "./querystring.ts"; +import * as nodeStream from "./stream.ts"; import * as nodeStringDecoder from "./string_decoder.ts"; +import * as nodeTimers from "./timers.ts"; import * as nodeUtil from "./util.ts"; import * as path from "../path/mod.ts"; @@ -604,6 +605,10 @@ nativeModulePolyfill.set( createNativeModule("querystring", nodeQueryString), ); nativeModulePolyfill.set( + "stream", + createNativeModule("string_decoder", nodeStream), +); +nativeModulePolyfill.set( "string_decoder", createNativeModule("string_decoder", nodeStringDecoder), ); diff --git a/std/node/stream.ts b/std/node/stream.ts new file mode 100644 index 000000000..230c5a9d6 --- /dev/null +++ b/std/node/stream.ts @@ -0,0 +1,53 @@ +// Copyright Node.js contributors. All rights reserved. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to +// deal in the Software without restriction, including without limitation the +// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +// sell copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +// IN THE SOFTWARE. +import Duplex from "./_stream/duplex.ts"; +import eos from "./_stream/end_of_stream.ts"; +import PassThrough from "./_stream/passthrough.ts"; +import pipeline from "./_stream/pipeline.ts"; +import * as promises from "./_stream/promises.ts"; +import Readable from "./_stream/readable.ts"; +import Stream from "./_stream/stream.ts"; +import Transform from "./_stream/transform.ts"; +import Writable from "./_stream/writable.ts"; + +const exports = { + Duplex, + finished: eos, + PassThrough, + pipeline, + promises, + Readable, + Stream, + Transform, + Writable, +}; + +export default exports; +export { + Duplex, + eos as finished, + PassThrough, + pipeline, + promises, + Readable, + Stream, + Transform, + Writable, +}; diff --git a/std/node/stream_test.ts b/std/node/stream_test.ts new file mode 100644 index 000000000..f8d4ecfa5 --- /dev/null +++ b/std/node/stream_test.ts @@ -0,0 +1,133 @@ +// Copyright Node.js contributors. All rights reserved. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to +// deal in the Software without restriction, including without limitation the +// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +// sell copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +// IN THE SOFTWARE. +import { Readable, Transform, Writable } from "./stream.ts"; +import { Buffer } from "./buffer.ts"; +import { deferred } from "../async/mod.ts"; +import { assert, assertEquals } from "../testing/asserts.ts"; +import { mustCall } from "./_utils.ts"; + +Deno.test("Readable and Writable stream backpressure test", async () => { + let pushes = 0; + const total = 65500 + 40 * 1024; + + let rsExecuted = 0; + const rsExecutedExpected = 11; + const rsExpectedExecutions = deferred(); + + let wsExecuted = 0; + const wsExecutedExpected = 410; + const wsExpectedExecutions = deferred(); + + const rs = new Readable({ + read: function () { + rsExecuted++; + if (rsExecuted == rsExecutedExpected) { + rsExpectedExecutions.resolve(); + } + + if (pushes++ === 10) { + this.push(null); + return; + } + + assert(this._readableState.length <= total); + + this.push(Buffer.alloc(65500)); + for (let i = 0; i < 40; i++) { + this.push(Buffer.alloc(1024)); + } + }, + }); + + const ws = new Writable({ + write: function (_data, _enc, cb) { + wsExecuted++; + if (wsExecuted == wsExecutedExpected) { + wsExpectedExecutions.resolve(); + } + cb(); + }, + }); + + rs.pipe(ws); + + const rsTimeout = setTimeout(() => rsExpectedExecutions.reject(), 1000); + const wsTimeout = setTimeout(() => wsExpectedExecutions.reject(), 1000); + await rsExpectedExecutions; + await wsExpectedExecutions; + clearTimeout(rsTimeout); + clearTimeout(wsTimeout); + assertEquals(rsExecuted, rsExecutedExpected); + assertEquals(wsExecuted, wsExecutedExpected); +}); + +Deno.test("Readable can be piped through Transform", async () => { + const [readExecution, readCb] = mustCall(function (this: Readable) { + this.push("content"); + this.push(null); + }); + + const r = new Readable({ + read: readCb, + }); + + const [transformExecution, transformCb] = mustCall( + function ( + this: Transform, + chunk: unknown, + _e, + callback: (error?: Error | null) => void, + ) { + this.push(chunk); + callback(); + }, + ); + + const [flushExecution, flushCb] = mustCall( + function (this: Transform, callback: (error?: Error | null) => void) { + callback(); + }, + ); + + const t = new Transform({ + transform: transformCb, + flush: flushCb, + }); + + r.pipe(t); + + const [readableExecution, readableCb] = mustCall(function () { + while (true) { + const chunk = t.read(); + if (!chunk) { + break; + } + + assertEquals(chunk.toString(), "content"); + } + }, 2); + + t.on("readable", readableCb); + + await readExecution; + await transformExecution; + await flushExecution; + await readableExecution; +}); |