diff options
Diffstat (limited to 'std/node/_stream')
25 files changed, 0 insertions, 6779 deletions
diff --git a/std/node/_stream/async_iterator.ts b/std/node/_stream/async_iterator.ts deleted file mode 100644 index 5369ef39c..000000000 --- a/std/node/_stream/async_iterator.ts +++ /dev/null @@ -1,243 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import type { Buffer } from "../buffer.ts"; -import finished from "./end_of_stream.ts"; -import Readable from "./readable.ts"; -import type Stream from "./stream.ts"; -import { destroyer } from "./destroy.ts"; - -const kLastResolve = Symbol("lastResolve"); -const kLastReject = Symbol("lastReject"); -const kError = Symbol("error"); -const kEnded = Symbol("ended"); -const kLastPromise = Symbol("lastPromise"); -const kHandlePromise = Symbol("handlePromise"); -const kStream = Symbol("stream"); - -// TODO(Soremwar) -// Add Duplex streams -type IterableStreams = Stream | Readable; - -type IterableItem = Buffer | string | Uint8Array | undefined; -type ReadableIteratorResult = IteratorResult<IterableItem>; - -function initIteratorSymbols( - o: ReadableStreamAsyncIterator, - symbols: symbol[], -) { - const properties: PropertyDescriptorMap = {}; - for (const sym in symbols) { - properties[sym] = { - configurable: false, - enumerable: false, - writable: true, - }; - } - Object.defineProperties(o, properties); -} - -function createIterResult( - value: IterableItem, - done: boolean, -): ReadableIteratorResult { - return { value, done }; -} - -function readAndResolve(iter: ReadableStreamAsyncIterator) { - const resolve = iter[kLastResolve]; - if (resolve !== null) { - const data = iter[kStream].read(); - if (data !== null) { - iter[kLastPromise] = null; - iter[kLastResolve] = null; - iter[kLastReject] = null; - resolve(createIterResult(data, false)); - } - } -} - -function onReadable(iter: ReadableStreamAsyncIterator) { - queueMicrotask(() => readAndResolve(iter)); -} - -function wrapForNext( - lastPromise: Promise<ReadableIteratorResult>, - iter: ReadableStreamAsyncIterator, -) { - return ( - resolve: (value: ReadableIteratorResult) => void, - reject: (error: Error) => void, - ) => { - lastPromise.then(() => { - if (iter[kEnded]) { - resolve(createIterResult(undefined, true)); - return; - } - - iter[kHandlePromise](resolve, reject); - }, reject); - }; -} - -function finish(self: ReadableStreamAsyncIterator, err?: Error) { - return new Promise( - ( - resolve: (result: ReadableIteratorResult) => void, - reject: (error: Error) => void, - ) => { - const stream = self[kStream]; - - finished(stream, (err) => { - if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") { - reject(err); - } else { - resolve(createIterResult(undefined, true)); - } - }); - destroyer(stream, err); - }, - ); -} - -const AsyncIteratorPrototype = Object.getPrototypeOf( - Object.getPrototypeOf(async function* () {}).prototype, -); - -export class ReadableStreamAsyncIterator - implements AsyncIterableIterator<IterableItem> { - [kEnded]: boolean; - [kError]: Error | null = null; - [kHandlePromise] = ( - resolve: (value: ReadableIteratorResult) => void, - reject: (value: Error) => void, - ) => { - const data = this[kStream].read(); - if (data) { - this[kLastPromise] = null; - this[kLastResolve] = null; - this[kLastReject] = null; - resolve(createIterResult(data, false)); - } else { - this[kLastResolve] = resolve; - this[kLastReject] = reject; - } - }; - [kLastPromise]: null | Promise<ReadableIteratorResult>; - [kLastReject]: null | ((value: Error) => void) = null; - [kLastResolve]: null | ((value: ReadableIteratorResult) => void) = null; - [kStream]: Readable; - [Symbol.asyncIterator] = AsyncIteratorPrototype[Symbol.asyncIterator]; - - constructor(stream: Readable) { - this[kEnded] = stream.readableEnded || stream._readableState.endEmitted; - this[kStream] = stream; - initIteratorSymbols(this, [ - kEnded, - kError, - kHandlePromise, - kLastPromise, - kLastReject, - kLastResolve, - kStream, - ]); - } - - get stream() { - return this[kStream]; - } - - next(): Promise<ReadableIteratorResult> { - const error = this[kError]; - if (error !== null) { - return Promise.reject(error); - } - - if (this[kEnded]) { - return Promise.resolve(createIterResult(undefined, true)); - } - - if (this[kStream].destroyed) { - return new Promise((resolve, reject) => { - if (this[kError]) { - reject(this[kError]); - } else if (this[kEnded]) { - resolve(createIterResult(undefined, true)); - } else { - finished(this[kStream], (err) => { - if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") { - reject(err); - } else { - resolve(createIterResult(undefined, true)); - } - }); - } - }); - } - - const lastPromise = this[kLastPromise]; - let promise; - - if (lastPromise) { - promise = new Promise(wrapForNext(lastPromise, this)); - } else { - const data = this[kStream].read(); - if (data !== null) { - return Promise.resolve(createIterResult(data, false)); - } - - promise = new Promise(this[kHandlePromise]); - } - - this[kLastPromise] = promise; - - return promise; - } - - return(): Promise<ReadableIteratorResult> { - return finish(this); - } - - throw(err: Error): Promise<ReadableIteratorResult> { - return finish(this, err); - } -} - -const createReadableStreamAsyncIterator = (stream: IterableStreams) => { - // deno-lint-ignore no-explicit-any - if (typeof (stream as any).read !== "function") { - const src = stream; - stream = new Readable({ objectMode: true }).wrap(src); - finished(stream, (err) => destroyer(src, err)); - } - - const iterator = new ReadableStreamAsyncIterator(stream as Readable); - iterator[kLastPromise] = null; - - finished(stream, { writable: false }, (err) => { - if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") { - const reject = iterator[kLastReject]; - if (reject !== null) { - iterator[kLastPromise] = null; - iterator[kLastResolve] = null; - iterator[kLastReject] = null; - reject(err); - } - iterator[kError] = err; - return; - } - - const resolve = iterator[kLastResolve]; - if (resolve !== null) { - iterator[kLastPromise] = null; - iterator[kLastResolve] = null; - iterator[kLastReject] = null; - resolve(createIterResult(undefined, true)); - } - iterator[kEnded] = true; - }); - - stream.on("readable", onReadable.bind(null, iterator)); - - return iterator; -}; - -export default createReadableStreamAsyncIterator; diff --git a/std/node/_stream/async_iterator_test.ts b/std/node/_stream/async_iterator_test.ts deleted file mode 100644 index 17698e0fd..000000000 --- a/std/node/_stream/async_iterator_test.ts +++ /dev/null @@ -1,249 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import Readable from "./readable.ts"; -import Stream from "./stream.ts"; -import toReadableAsyncIterator from "./async_iterator.ts"; -import { deferred } from "../../async/mod.ts"; -import { assertEquals, assertThrowsAsync } from "../../testing/asserts.ts"; - -Deno.test("Stream to async iterator", async () => { - let destroyExecuted = 0; - const destroyExecutedExpected = 1; - const destroyExpectedExecutions = deferred(); - - class AsyncIteratorStream extends Stream { - constructor() { - super(); - } - - destroy() { - destroyExecuted++; - if (destroyExecuted == destroyExecutedExpected) { - destroyExpectedExecutions.resolve(); - } - } - - [Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator]; - } - - const stream = new AsyncIteratorStream(); - - queueMicrotask(() => { - stream.emit("data", "hello"); - stream.emit("data", "world"); - stream.emit("end"); - }); - - let res = ""; - - for await (const d of stream) { - res += d; - } - assertEquals(res, "helloworld"); - - const destroyTimeout = setTimeout( - () => destroyExpectedExecutions.reject(), - 1000, - ); - await destroyExpectedExecutions; - clearTimeout(destroyTimeout); - assertEquals(destroyExecuted, destroyExecutedExpected); -}); - -Deno.test("Stream to async iterator throws on 'error' emitted", async () => { - let closeExecuted = 0; - const closeExecutedExpected = 1; - const closeExpectedExecutions = deferred(); - - let errorExecuted = 0; - const errorExecutedExpected = 1; - const errorExpectedExecutions = deferred(); - - class StreamImplementation extends Stream { - close() { - closeExecuted++; - if (closeExecuted == closeExecutedExpected) { - closeExpectedExecutions.resolve(); - } - } - } - - const stream = new StreamImplementation(); - queueMicrotask(() => { - stream.emit("data", 0); - stream.emit("data", 1); - stream.emit("error", new Error("asd")); - }); - - toReadableAsyncIterator(stream) - .next() - .catch((err) => { - errorExecuted++; - if (errorExecuted == errorExecutedExpected) { - errorExpectedExecutions.resolve(); - } - assertEquals(err.message, "asd"); - }); - - const closeTimeout = setTimeout( - () => closeExpectedExecutions.reject(), - 1000, - ); - const errorTimeout = setTimeout( - () => errorExpectedExecutions.reject(), - 1000, - ); - await closeExpectedExecutions; - await errorExpectedExecutions; - clearTimeout(closeTimeout); - clearTimeout(errorTimeout); - assertEquals(closeExecuted, closeExecutedExpected); - assertEquals(errorExecuted, errorExecutedExpected); -}); - -Deno.test("Async iterator matches values of Readable", async () => { - const readable = new Readable({ - objectMode: true, - read() {}, - }); - readable.push(0); - readable.push(1); - readable.push(null); - - const iter = readable[Symbol.asyncIterator](); - - assertEquals( - await iter.next().then(({ value }) => value), - 0, - ); - for await (const d of iter) { - assertEquals(d, 1); - } -}); - -Deno.test("Async iterator throws on Readable destroyed sync", async () => { - const message = "kaboom from read"; - - const readable = new Readable({ - objectMode: true, - read() { - this.destroy(new Error(message)); - }, - }); - - await assertThrowsAsync( - async () => { - // deno-lint-ignore no-empty - for await (const k of readable) {} - }, - Error, - message, - ); -}); - -Deno.test("Async iterator throws on Readable destroyed async", async () => { - const message = "kaboom"; - const readable = new Readable({ - read() {}, - }); - const iterator = readable[Symbol.asyncIterator](); - - readable.destroy(new Error(message)); - - await assertThrowsAsync( - iterator.next.bind(iterator), - Error, - message, - ); -}); - -Deno.test("Async iterator finishes the iterator when Readable destroyed", async () => { - const readable = new Readable({ - read() {}, - }); - - readable.destroy(); - - const { done } = await readable[Symbol.asyncIterator]().next(); - assertEquals(done, true); -}); - -Deno.test("Async iterator finishes all item promises when Readable destroyed", async () => { - const r = new Readable({ - objectMode: true, - read() { - }, - }); - - const b = r[Symbol.asyncIterator](); - const c = b.next(); - const d = b.next(); - r.destroy(); - assertEquals(await c, { done: true, value: undefined }); - assertEquals(await d, { done: true, value: undefined }); -}); - -Deno.test("Async iterator: 'next' is triggered by Readable push", async () => { - const max = 42; - let readed = 0; - let received = 0; - const readable = new Readable({ - objectMode: true, - read() { - this.push("hello"); - if (++readed === max) { - this.push(null); - } - }, - }); - - for await (const k of readable) { - received++; - assertEquals(k, "hello"); - } - - assertEquals(readed, received); -}); - -Deno.test("Async iterator: 'close' called on forced iteration end", async () => { - let closeExecuted = 0; - const closeExecutedExpected = 1; - const closeExpectedExecutions = deferred(); - - class IndestructibleReadable extends Readable { - constructor() { - super({ - autoDestroy: false, - read() {}, - }); - } - - close() { - closeExecuted++; - if (closeExecuted == closeExecutedExpected) { - closeExpectedExecutions.resolve(); - } - readable.emit("close"); - } - - // deno-lint-ignore ban-ts-comment - //@ts-ignore - destroy = null; - } - - const readable = new IndestructibleReadable(); - readable.push("asd"); - readable.push("asd"); - - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const d of readable) { - break; - } - - const closeTimeout = setTimeout( - () => closeExpectedExecutions.reject(), - 1000, - ); - await closeExpectedExecutions; - clearTimeout(closeTimeout); - assertEquals(closeExecuted, closeExecutedExpected); -}); diff --git a/std/node/_stream/buffer_list.ts b/std/node/_stream/buffer_list.ts deleted file mode 100644 index fe1a693c0..000000000 --- a/std/node/_stream/buffer_list.ts +++ /dev/null @@ -1,183 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { Buffer } from "../buffer.ts"; - -type BufferListItem = { - data: Buffer | string | Uint8Array; - next: BufferListItem | null; -}; - -export default class BufferList { - head: BufferListItem | null = null; - tail: BufferListItem | null = null; - length: number; - - constructor() { - this.head = null; - this.tail = null; - this.length = 0; - } - - push(v: Buffer | string | Uint8Array) { - const entry = { data: v, next: null }; - if (this.length > 0) { - (this.tail as BufferListItem).next = entry; - } else { - this.head = entry; - } - this.tail = entry; - ++this.length; - } - - unshift(v: Buffer | string | Uint8Array) { - const entry = { data: v, next: this.head }; - if (this.length === 0) { - this.tail = entry; - } - this.head = entry; - ++this.length; - } - - shift() { - if (this.length === 0) { - return; - } - const ret = (this.head as BufferListItem).data; - if (this.length === 1) { - this.head = this.tail = null; - } else { - this.head = (this.head as BufferListItem).next; - } - --this.length; - return ret; - } - - clear() { - this.head = this.tail = null; - this.length = 0; - } - - join(s: string) { - if (this.length === 0) { - return ""; - } - let p: BufferListItem | null = (this.head as BufferListItem); - let ret = "" + p.data; - p = p.next; - while (p) { - ret += s + p.data; - p = p.next; - } - return ret; - } - - concat(n: number) { - if (this.length === 0) { - return Buffer.alloc(0); - } - const ret = Buffer.allocUnsafe(n >>> 0); - let p = this.head; - let i = 0; - while (p) { - ret.set(p.data as Buffer, i); - i += p.data.length; - p = p.next; - } - return ret; - } - - // Consumes a specified amount of bytes or characters from the buffered data. - consume(n: number, hasStrings: boolean) { - const data = (this.head as BufferListItem).data; - if (n < data.length) { - // `slice` is the same for buffers and strings. - const slice = data.slice(0, n); - (this.head as BufferListItem).data = data.slice(n); - return slice; - } - if (n === data.length) { - // First chunk is a perfect match. - return this.shift(); - } - // Result spans more than one buffer. - return hasStrings ? this._getString(n) : this._getBuffer(n); - } - - first() { - return (this.head as BufferListItem).data; - } - - *[Symbol.iterator]() { - for (let p = this.head; p; p = p.next) { - yield p.data; - } - } - - // Consumes a specified amount of characters from the buffered data. - _getString(n: number) { - let ret = ""; - let p: BufferListItem | null = (this.head as BufferListItem); - let c = 0; - p = p.next as BufferListItem; - do { - const str = p.data; - if (n > str.length) { - ret += str; - n -= str.length; - } else { - if (n === str.length) { - ret += str; - ++c; - if (p.next) { - this.head = p.next; - } else { - this.head = this.tail = null; - } - } else { - ret += str.slice(0, n); - this.head = p; - p.data = str.slice(n); - } - break; - } - ++c; - p = p.next; - } while (p); - this.length -= c; - return ret; - } - - // Consumes a specified amount of bytes from the buffered data. - _getBuffer(n: number) { - const ret = Buffer.allocUnsafe(n); - const retLen = n; - let p: BufferListItem | null = (this.head as BufferListItem); - let c = 0; - p = p.next as BufferListItem; - do { - const buf = p.data as Buffer; - if (n > buf.length) { - ret.set(buf, retLen - n); - n -= buf.length; - } else { - if (n === buf.length) { - ret.set(buf, retLen - n); - ++c; - if (p.next) { - this.head = p.next; - } else { - this.head = this.tail = null; - } - } else { - ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n); - this.head = p; - p.data = buf.slice(n); - } - break; - } - ++c; - p = p.next; - } while (p); - this.length -= c; - return ret; - } -} diff --git a/std/node/_stream/destroy.ts b/std/node/_stream/destroy.ts deleted file mode 100644 index d13e12de2..000000000 --- a/std/node/_stream/destroy.ts +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import type Duplex from "./duplex.ts"; -import type Readable from "./readable.ts"; -import type Stream from "./stream.ts"; -import type Writable from "./writable.ts"; - -//This whole module acts as a 'normalizer' -//Idea behind it is you can pass any kind of streams and functions will execute anyways - -//TODO(Soremwar) -//Should be any implementation of stream -//This is a guard to check executed methods exist inside the implementation -type StreamImplementations = Duplex | Readable | Writable; - -// TODO(Soremwar) -// Bring back once requests are implemented -// function isRequest(stream: any) { -// return stream && stream.setHeader && typeof stream.abort === "function"; -// } - -export function destroyer(stream: Stream, err?: Error | null) { - // TODO(Soremwar) - // Bring back once requests are implemented - // if (isRequest(stream)) return stream.abort(); - // if (isRequest(stream.req)) return stream.req.abort(); - if ( - typeof (stream as StreamImplementations).destroy === "function" - ) { - return (stream as StreamImplementations).destroy(err); - } - // A test of async iterator mocks an upcoming implementation of stream - // his is casted to any in the meanwhile - // deno-lint-ignore no-explicit-any - if (typeof (stream as any).close === "function") { - // deno-lint-ignore no-explicit-any - return (stream as any).close(); - } -} diff --git a/std/node/_stream/duplex.ts b/std/node/_stream/duplex.ts deleted file mode 100644 index b5c429f0a..000000000 --- a/std/node/_stream/duplex.ts +++ /dev/null @@ -1,682 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { captureRejectionSymbol } from "../events.ts"; -import Readable, { ReadableState } from "./readable.ts"; -import Stream from "./stream.ts"; -import Writable, { WritableState } from "./writable.ts"; -import { Buffer } from "../buffer.ts"; -import { - ERR_STREAM_ALREADY_FINISHED, - ERR_STREAM_DESTROYED, - ERR_UNKNOWN_ENCODING, -} from "../_errors.ts"; -import type { Encodings } from "../_utils.ts"; -import createReadableStreamAsyncIterator from "./async_iterator.ts"; -import type { ReadableStreamAsyncIterator } from "./async_iterator.ts"; -import { - _destroy, - computeNewHighWaterMark, - emitReadable, - fromList, - howMuchToRead, - nReadingNextTick, - updateReadableListening, -} from "./readable_internal.ts"; -import { kOnFinished, writeV } from "./writable_internal.ts"; -import { - endDuplex, - finishMaybe, - onwrite, - readableAddChunk, -} from "./duplex_internal.ts"; -export { errorOrDestroy } from "./duplex_internal.ts"; - -export interface DuplexOptions { - allowHalfOpen?: boolean; - autoDestroy?: boolean; - decodeStrings?: boolean; - defaultEncoding?: Encodings; - destroy?( - this: Duplex, - error: Error | null, - callback: (error: Error | null) => void, - ): void; - emitClose?: boolean; - encoding?: Encodings; - final?(this: Duplex, callback: (error?: Error | null) => void): void; - highWaterMark?: number; - objectMode?: boolean; - read?(this: Duplex, size: number): void; - readable?: boolean; - readableHighWaterMark?: number; - readableObjectMode?: boolean; - writable?: boolean; - writableCorked?: number; - writableHighWaterMark?: number; - writableObjectMode?: boolean; - write?( - this: Duplex, - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: Encodings, - callback: (error?: Error | null) => void, - ): void; - writev?: writeV; -} - -interface Duplex extends Readable, Writable {} - -/** - * A duplex is an implementation of a stream that has both Readable and Writable - * attributes and capabilities - */ -class Duplex extends Stream { - allowHalfOpen = true; - _final?: ( - callback: (error?: Error | null | undefined) => void, - ) => void; - _readableState: ReadableState; - _writableState: WritableState; - _writev?: writeV | null; - - constructor(options?: DuplexOptions) { - super(); - - if (options) { - if (options.allowHalfOpen === false) { - this.allowHalfOpen = false; - } - if (typeof options.destroy === "function") { - this._destroy = options.destroy; - } - if (typeof options.final === "function") { - this._final = options.final; - } - if (typeof options.read === "function") { - this._read = options.read; - } - if (options.readable === false) { - this.readable = false; - } - if (options.writable === false) { - this.writable = false; - } - if (typeof options.write === "function") { - this._write = options.write; - } - if (typeof options.writev === "function") { - this._writev = options.writev; - } - } - - const readableOptions = { - autoDestroy: options?.autoDestroy, - defaultEncoding: options?.defaultEncoding, - destroy: options?.destroy as unknown as ( - this: Readable, - error: Error | null, - callback: (error: Error | null) => void, - ) => void, - emitClose: options?.emitClose, - encoding: options?.encoding, - highWaterMark: options?.highWaterMark ?? options?.readableHighWaterMark, - objectMode: options?.objectMode ?? options?.readableObjectMode, - read: options?.read as unknown as (this: Readable) => void, - }; - - const writableOptions = { - autoDestroy: options?.autoDestroy, - decodeStrings: options?.decodeStrings, - defaultEncoding: options?.defaultEncoding, - destroy: options?.destroy as unknown as ( - this: Writable, - error: Error | null, - callback: (error: Error | null) => void, - ) => void, - emitClose: options?.emitClose, - final: options?.final as unknown as ( - this: Writable, - callback: (error?: Error | null) => void, - ) => void, - highWaterMark: options?.highWaterMark ?? options?.writableHighWaterMark, - objectMode: options?.objectMode ?? options?.writableObjectMode, - write: options?.write as unknown as ( - this: Writable, - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: string, - callback: (error?: Error | null) => void, - ) => void, - writev: options?.writev as unknown as ( - this: Writable, - // deno-lint-ignore no-explicit-any - chunks: Array<{ chunk: any; encoding: Encodings }>, - callback: (error?: Error | null) => void, - ) => void, - }; - - this._readableState = new ReadableState(readableOptions); - this._writableState = new WritableState( - writableOptions, - this as unknown as Writable, - ); - //Very important to override onwrite here, duplex implementation adds a check - //on the readable side - this._writableState.onwrite = onwrite.bind(undefined, this); - } - - [captureRejectionSymbol](err?: Error) { - this.destroy(err); - } - - [Symbol.asyncIterator](): ReadableStreamAsyncIterator { - return createReadableStreamAsyncIterator(this); - } - - _destroy( - error: Error | null, - callback: (error?: Error | null) => void, - ): void { - callback(error); - } - - _read = Readable.prototype._read; - - _undestroy = Readable.prototype._undestroy; - - destroy(err?: Error | null, cb?: (error?: Error | null) => void) { - const r = this._readableState; - const w = this._writableState; - - if (w.destroyed || r.destroyed) { - if (typeof cb === "function") { - cb(); - } - - return this; - } - - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!w.errored) { - w.errored = err; - } - if (!r.errored) { - r.errored = err; - } - } - - w.destroyed = true; - r.destroyed = true; - - this._destroy(err || null, (err) => { - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!w.errored) { - w.errored = err; - } - if (!r.errored) { - r.errored = err; - } - } - - w.closed = true; - r.closed = true; - - if (typeof cb === "function") { - cb(err); - } - - if (err) { - queueMicrotask(() => { - const r = this._readableState; - const w = this._writableState; - - if (!w.errorEmitted && !r.errorEmitted) { - w.errorEmitted = true; - r.errorEmitted = true; - - this.emit("error", err); - } - - r.closeEmitted = true; - - if (w.emitClose || r.emitClose) { - this.emit("close"); - } - }); - } else { - queueMicrotask(() => { - const r = this._readableState; - const w = this._writableState; - - r.closeEmitted = true; - - if (w.emitClose || r.emitClose) { - this.emit("close"); - } - }); - } - }); - - return this; - } - - isPaused = Readable.prototype.isPaused; - - off = this.removeListener; - - on( - event: "close" | "end" | "pause" | "readable" | "resume", - listener: () => void, - ): this; - // deno-lint-ignore no-explicit-any - on(event: "data", listener: (chunk: any) => void): this; - on(event: "error", listener: (err: Error) => void): this; - // deno-lint-ignore no-explicit-any - on(event: string | symbol, listener: (...args: any[]) => void): this; - on( - ev: string | symbol, - fn: - | (() => void) - // deno-lint-ignore no-explicit-any - | ((chunk: any) => void) - | ((err: Error) => void) - // deno-lint-ignore no-explicit-any - | ((...args: any[]) => void), - ) { - const res = super.on.call(this, ev, fn); - const state = this._readableState; - - if (ev === "data") { - state.readableListening = this.listenerCount("readable") > 0; - - if (state.flowing !== false) { - this.resume(); - } - } else if (ev === "readable") { - if (!state.endEmitted && !state.readableListening) { - state.readableListening = state.needReadable = true; - state.flowing = false; - state.emittedReadable = false; - if (state.length) { - emitReadable(this); - } else if (!state.reading) { - queueMicrotask(() => nReadingNextTick(this)); - } - } - } - - return res; - } - - pause = Readable.prototype.pause as () => this; - - pipe = Readable.prototype.pipe; - - // deno-lint-ignore no-explicit-any - push(chunk: any, encoding?: Encodings): boolean { - return readableAddChunk(this, chunk, encoding, false); - } - - /** You can override either this method, or the async `_read` method */ - read(n?: number) { - // Same as parseInt(undefined, 10), however V8 7.3 performance regressed - // in this scenario, so we are doing it manually. - if (n === undefined) { - n = NaN; - } - const state = this._readableState; - const nOrig = n; - - if (n > state.highWaterMark) { - state.highWaterMark = computeNewHighWaterMark(n); - } - - if (n !== 0) { - state.emittedReadable = false; - } - - if ( - n === 0 && - state.needReadable && - ((state.highWaterMark !== 0 - ? state.length >= state.highWaterMark - : state.length > 0) || - state.ended) - ) { - if (state.length === 0 && state.ended) { - endDuplex(this); - } else { - emitReadable(this); - } - return null; - } - - n = howMuchToRead(n, state); - - if (n === 0 && state.ended) { - if (state.length === 0) { - endDuplex(this); - } - return null; - } - - let doRead = state.needReadable; - if ( - state.length === 0 || state.length - (n as number) < state.highWaterMark - ) { - doRead = true; - } - - if ( - state.ended || state.reading || state.destroyed || state.errored || - !state.constructed - ) { - doRead = false; - } else if (doRead) { - state.reading = true; - state.sync = true; - if (state.length === 0) { - state.needReadable = true; - } - this._read(); - state.sync = false; - if (!state.reading) { - n = howMuchToRead(nOrig, state); - } - } - - let ret; - if ((n as number) > 0) { - ret = fromList((n as number), state); - } else { - ret = null; - } - - if (ret === null) { - state.needReadable = state.length <= state.highWaterMark; - n = 0; - } else { - state.length -= n as number; - if (state.multiAwaitDrain) { - (state.awaitDrainWriters as Set<Writable>).clear(); - } else { - state.awaitDrainWriters = null; - } - } - - if (state.length === 0) { - if (!state.ended) { - state.needReadable = true; - } - - if (nOrig !== n && state.ended) { - endDuplex(this); - } - } - - if (ret !== null) { - this.emit("data", ret); - } - - return ret; - } - - removeAllListeners( - ev: - | "close" - | "data" - | "end" - | "error" - | "pause" - | "readable" - | "resume" - | symbol - | undefined, - ) { - const res = super.removeAllListeners(ev); - - if (ev === "readable" || ev === undefined) { - queueMicrotask(() => updateReadableListening(this)); - } - - return res; - } - - removeListener( - event: "close" | "end" | "pause" | "readable" | "resume", - listener: () => void, - ): this; - // deno-lint-ignore no-explicit-any - removeListener(event: "data", listener: (chunk: any) => void): this; - removeListener(event: "error", listener: (err: Error) => void): this; - removeListener( - event: string | symbol, - // deno-lint-ignore no-explicit-any - listener: (...args: any[]) => void, - ): this; - removeListener( - ev: string | symbol, - fn: - | (() => void) - // deno-lint-ignore no-explicit-any - | ((chunk: any) => void) - | ((err: Error) => void) - // deno-lint-ignore no-explicit-any - | ((...args: any[]) => void), - ) { - const res = super.removeListener.call(this, ev, fn); - - if (ev === "readable") { - queueMicrotask(() => updateReadableListening(this)); - } - - return res; - } - - resume = Readable.prototype.resume as () => this; - - setEncoding = Readable.prototype.setEncoding as (enc: string) => this; - - // deno-lint-ignore no-explicit-any - unshift(chunk: any, encoding?: Encodings): boolean { - return readableAddChunk(this, chunk, encoding, true); - } - - unpipe = Readable.prototype.unpipe as (dest?: Writable | undefined) => this; - - wrap = Readable.prototype.wrap as (stream: Stream) => this; - - get readable(): boolean { - return this._readableState?.readable && - !this._readableState?.destroyed && - !this._readableState?.errorEmitted && - !this._readableState?.endEmitted; - } - set readable(val: boolean) { - if (this._readableState) { - this._readableState.readable = val; - } - } - - get readableHighWaterMark(): number { - return this._readableState.highWaterMark; - } - - get readableBuffer() { - return this._readableState && this._readableState.buffer; - } - - get readableFlowing(): boolean | null { - return this._readableState.flowing; - } - - set readableFlowing(state: boolean | null) { - if (this._readableState) { - this._readableState.flowing = state; - } - } - - get readableLength() { - return this._readableState.length; - } - - get readableObjectMode() { - return this._readableState ? this._readableState.objectMode : false; - } - - get readableEncoding() { - return this._readableState ? this._readableState.encoding : null; - } - - get readableEnded() { - return this._readableState ? this._readableState.endEmitted : false; - } - - _write = Writable.prototype._write; - - write = Writable.prototype.write; - - cork = Writable.prototype.cork; - - uncork = Writable.prototype.uncork; - - setDefaultEncoding(encoding: string) { - // node::ParseEncoding() requires lower case. - if (typeof encoding === "string") { - encoding = encoding.toLowerCase(); - } - if (!Buffer.isEncoding(encoding)) { - throw new ERR_UNKNOWN_ENCODING(encoding); - } - this._writableState.defaultEncoding = encoding as Encodings; - return this; - } - - end(cb?: () => void): void; - // deno-lint-ignore no-explicit-any - end(chunk: any, cb?: () => void): void; - // deno-lint-ignore no-explicit-any - end(chunk: any, encoding: Encodings, cb?: () => void): void; - - end( - // deno-lint-ignore no-explicit-any - x?: any | (() => void), - y?: Encodings | (() => void), - z?: () => void, - ) { - const state = this._writableState; - // deno-lint-ignore no-explicit-any - let chunk: any | null; - let encoding: Encodings | null; - let cb: undefined | ((error?: Error) => void); - - if (typeof x === "function") { - chunk = null; - encoding = null; - cb = x; - } else if (typeof y === "function") { - chunk = x; - encoding = null; - cb = y; - } else { - chunk = x; - encoding = y as Encodings; - cb = z; - } - - if (chunk !== null && chunk !== undefined) { - this.write(chunk, encoding); - } - - if (state.corked) { - state.corked = 1; - this.uncork(); - } - - let err: Error | undefined; - if (!state.errored && !state.ending) { - state.ending = true; - finishMaybe(this, state, true); - state.ended = true; - } else if (state.finished) { - err = new ERR_STREAM_ALREADY_FINISHED("end"); - } else if (state.destroyed) { - err = new ERR_STREAM_DESTROYED("end"); - } - - if (typeof cb === "function") { - if (err || state.finished) { - queueMicrotask(() => { - (cb as (error?: Error | undefined) => void)(err); - }); - } else { - state[kOnFinished].push(cb); - } - } - - return this; - } - - get destroyed() { - if ( - this._readableState === undefined || - this._writableState === undefined - ) { - return false; - } - return this._readableState.destroyed && this._writableState.destroyed; - } - - set destroyed(value: boolean) { - if (this._readableState && this._writableState) { - this._readableState.destroyed = value; - this._writableState.destroyed = value; - } - } - - get writable() { - const w = this._writableState; - return !w.destroyed && !w.errored && !w.ending && !w.ended; - } - - set writable(val) { - if (this._writableState) { - this._writableState.writable = !!val; - } - } - - get writableFinished() { - return this._writableState ? this._writableState.finished : false; - } - - get writableObjectMode() { - return this._writableState ? this._writableState.objectMode : false; - } - - get writableBuffer() { - return this._writableState && this._writableState.getBuffer(); - } - - get writableEnded() { - return this._writableState ? this._writableState.ending : false; - } - - get writableHighWaterMark() { - return this._writableState && this._writableState.highWaterMark; - } - - get writableCorked() { - return this._writableState ? this._writableState.corked : 0; - } - - get writableLength() { - return this._writableState && this._writableState.length; - } -} - -export default Duplex; diff --git a/std/node/_stream/duplex_internal.ts b/std/node/_stream/duplex_internal.ts deleted file mode 100644 index bfd9749f8..000000000 --- a/std/node/_stream/duplex_internal.ts +++ /dev/null @@ -1,296 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import type { ReadableState } from "./readable.ts"; -import { addChunk, maybeReadMore, onEofChunk } from "./readable_internal.ts"; -import type Writable from "./writable.ts"; -import type { WritableState } from "./writable.ts"; -import { - afterWrite, - AfterWriteTick, - afterWriteTick, - clearBuffer, - errorBuffer, - kOnFinished, - needFinish, - prefinish, -} from "./writable_internal.ts"; -import { Buffer } from "../buffer.ts"; -import type Duplex from "./duplex.ts"; -import { - ERR_MULTIPLE_CALLBACK, - ERR_STREAM_PUSH_AFTER_EOF, - ERR_STREAM_UNSHIFT_AFTER_END_EVENT, -} from "../_errors.ts"; - -export function endDuplex(stream: Duplex) { - const state = stream._readableState; - - if (!state.endEmitted) { - state.ended = true; - queueMicrotask(() => endReadableNT(state, stream)); - } -} - -function endReadableNT(state: ReadableState, stream: Duplex) { - // Check that we didn't get one last unshift. - if ( - !state.errorEmitted && !state.closeEmitted && - !state.endEmitted && state.length === 0 - ) { - state.endEmitted = true; - stream.emit("end"); - - if (stream.writable && stream.allowHalfOpen === false) { - queueMicrotask(() => endWritableNT(state, stream)); - } else if (state.autoDestroy) { - // In case of duplex streams we need a way to detect - // if the writable side is ready for autoDestroy as well. - const wState = stream._writableState; - const autoDestroy = !wState || ( - wState.autoDestroy && - // We don't expect the writable to ever 'finish' - // if writable is explicitly set to false. - (wState.finished || wState.writable === false) - ); - - if (autoDestroy) { - stream.destroy(); - } - } - } -} - -function endWritableNT(state: ReadableState, stream: Duplex) { - const writable = stream.writable && - !stream.writableEnded && - !stream.destroyed; - if (writable) { - stream.end(); - } -} - -export function errorOrDestroy( - // deno-lint-ignore no-explicit-any - this: any, - stream: Duplex, - err: Error, - sync = false, -) { - const r = stream._readableState; - const w = stream._writableState; - - if (w.destroyed || r.destroyed) { - return this; - } - - if (r.autoDestroy || w.autoDestroy) { - stream.destroy(err); - } else if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (w && !w.errored) { - w.errored = err; - } - if (r && !r.errored) { - r.errored = err; - } - - if (sync) { - queueMicrotask(() => { - if (w.errorEmitted || r.errorEmitted) { - return; - } - - w.errorEmitted = true; - r.errorEmitted = true; - - stream.emit("error", err); - }); - } else { - if (w.errorEmitted || r.errorEmitted) { - return; - } - - w.errorEmitted = true; - r.errorEmitted = true; - - stream.emit("error", err); - } - } -} - -function finish(stream: Duplex, state: WritableState) { - state.pendingcb--; - if (state.errorEmitted || state.closeEmitted) { - return; - } - - state.finished = true; - - for (const callback of state[kOnFinished].splice(0)) { - callback(); - } - - stream.emit("finish"); - - if (state.autoDestroy) { - stream.destroy(); - } -} - -export function finishMaybe( - stream: Duplex, - state: WritableState, - sync?: boolean, -) { - if (needFinish(state)) { - prefinish(stream as Writable, state); - if (state.pendingcb === 0 && needFinish(state)) { - state.pendingcb++; - if (sync) { - queueMicrotask(() => finish(stream, state)); - } else { - finish(stream, state); - } - } - } -} - -export function onwrite(stream: Duplex, er?: Error | null) { - const state = stream._writableState; - const sync = state.sync; - const cb = state.writecb; - - if (typeof cb !== "function") { - errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); - return; - } - - state.writing = false; - state.writecb = null; - state.length -= state.writelen; - state.writelen = 0; - - if (er) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - er.stack; - - if (!state.errored) { - state.errored = er; - } - - if (stream._readableState && !stream._readableState.errored) { - stream._readableState.errored = er; - } - - if (sync) { - queueMicrotask(() => onwriteError(stream, state, er, cb)); - } else { - onwriteError(stream, state, er, cb); - } - } else { - if (state.buffered.length > state.bufferedIndex) { - clearBuffer(stream, state); - } - - if (sync) { - if ( - state.afterWriteTickInfo !== null && - state.afterWriteTickInfo.cb === cb - ) { - state.afterWriteTickInfo.count++; - } else { - state.afterWriteTickInfo = { - count: 1, - cb: (cb as (error?: Error) => void), - stream: stream as Writable, - state, - }; - queueMicrotask(() => - afterWriteTick(state.afterWriteTickInfo as AfterWriteTick) - ); - } - } else { - afterWrite(stream as Writable, state, 1, cb as (error?: Error) => void); - } - } -} - -function onwriteError( - stream: Duplex, - state: WritableState, - er: Error, - cb: (error: Error) => void, -) { - --state.pendingcb; - - cb(er); - errorBuffer(state); - errorOrDestroy(stream, er); -} - -export function readableAddChunk( - stream: Duplex, - chunk: string | Buffer | Uint8Array | null, - encoding: undefined | string = undefined, - addToFront: boolean, -) { - const state = stream._readableState; - let usedEncoding = encoding; - - let err; - if (!state.objectMode) { - if (typeof chunk === "string") { - usedEncoding = encoding || state.defaultEncoding; - if (state.encoding !== usedEncoding) { - if (addToFront && state.encoding) { - chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding); - } else { - chunk = Buffer.from(chunk, usedEncoding); - usedEncoding = ""; - } - } - } else if (chunk instanceof Uint8Array) { - chunk = Buffer.from(chunk); - } - } - - if (err) { - errorOrDestroy(stream, err); - } else if (chunk === null) { - state.reading = false; - onEofChunk(stream, state); - } else if (state.objectMode || (chunk.length > 0)) { - if (addToFront) { - if (state.endEmitted) { - errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); - } else { - addChunk(stream, state, chunk, true); - } - } else if (state.ended) { - errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); - } else if (state.destroyed || state.errored) { - return false; - } else { - state.reading = false; - if (state.decoder && !usedEncoding) { - //TODO(Soremwar) - //I don't think this cast is right - chunk = state.decoder.write(Buffer.from(chunk as Uint8Array)); - if (state.objectMode || chunk.length !== 0) { - addChunk(stream, state, chunk, false); - } else { - maybeReadMore(stream, state); - } - } else { - addChunk(stream, state, chunk, false); - } - } - } else if (!addToFront) { - state.reading = false; - maybeReadMore(stream, state); - } - - return !state.ended && - (state.length < state.highWaterMark || state.length === 0); -} diff --git a/std/node/_stream/duplex_test.ts b/std/node/_stream/duplex_test.ts deleted file mode 100644 index 1596ec218..000000000 --- a/std/node/_stream/duplex_test.ts +++ /dev/null @@ -1,698 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { Buffer } from "../buffer.ts"; -import Duplex from "./duplex.ts"; -import finished from "./end_of_stream.ts"; -import { - assert, - assertEquals, - assertStrictEquals, - assertThrows, -} from "../../testing/asserts.ts"; -import { deferred, delay } from "../../async/mod.ts"; - -Deno.test("Duplex stream works normally", () => { - const stream = new Duplex({ objectMode: true }); - - assert(stream._readableState.objectMode); - assert(stream._writableState.objectMode); - assert(stream.allowHalfOpen); - assertEquals(stream.listenerCount("end"), 0); - - let written: { val: number }; - let read: { val: number }; - - stream._write = (obj, _, cb) => { - written = obj; - cb(); - }; - - stream._read = () => {}; - - stream.on("data", (obj) => { - read = obj; - }); - - stream.push({ val: 1 }); - stream.end({ val: 2 }); - - stream.on("finish", () => { - assertEquals(read.val, 1); - assertEquals(written.val, 2); - }); -}); - -Deno.test("Duplex stream gets constructed correctly", () => { - const d1 = new Duplex({ - objectMode: true, - highWaterMark: 100, - }); - - assertEquals(d1.readableObjectMode, true); - assertEquals(d1.readableHighWaterMark, 100); - assertEquals(d1.writableObjectMode, true); - assertEquals(d1.writableHighWaterMark, 100); - - const d2 = new Duplex({ - readableObjectMode: false, - readableHighWaterMark: 10, - writableObjectMode: true, - writableHighWaterMark: 100, - }); - - assertEquals(d2.writableObjectMode, true); - assertEquals(d2.writableHighWaterMark, 100); - assertEquals(d2.readableObjectMode, false); - assertEquals(d2.readableHighWaterMark, 10); -}); - -Deno.test("Duplex stream can be paused", () => { - const readable = new Duplex(); - - // _read is a noop, here. - readable._read = () => {}; - - // Default state of a stream is not "paused" - assert(!readable.isPaused()); - - // Make the stream start flowing... - readable.on("data", () => {}); - - // still not paused. - assert(!readable.isPaused()); - - readable.pause(); - assert(readable.isPaused()); - readable.resume(); - assert(!readable.isPaused()); -}); - -Deno.test("Duplex stream sets enconding correctly", () => { - const readable = new Duplex({ - read() {}, - }); - - readable.setEncoding("utf8"); - - readable.push(new TextEncoder().encode("DEF")); - readable.unshift(new TextEncoder().encode("ABC")); - - assertStrictEquals(readable.read(), "ABCDEF"); -}); - -Deno.test("Duplex stream sets encoding correctly", () => { - const readable = new Duplex({ - read() {}, - }); - - readable.setEncoding("utf8"); - - readable.push(new TextEncoder().encode("DEF")); - readable.unshift(new TextEncoder().encode("ABC")); - - assertStrictEquals(readable.read(), "ABCDEF"); -}); - -Deno.test("Duplex stream holds up a big push", async () => { - let readExecuted = 0; - const readExecutedExpected = 3; - const readExpectedExecutions = deferred(); - - let endExecuted = 0; - const endExecutedExpected = 1; - const endExpectedExecutions = deferred(); - - const str = "asdfasdfasdfasdfasdf"; - - const r = new Duplex({ - highWaterMark: 5, - encoding: "utf8", - }); - - let reads = 0; - - function _read() { - if (reads === 0) { - setTimeout(() => { - r.push(str); - }, 1); - reads++; - } else if (reads === 1) { - const ret = r.push(str); - assertEquals(ret, false); - reads++; - } else { - r.push(null); - } - } - - r._read = () => { - readExecuted++; - if (readExecuted == readExecutedExpected) { - readExpectedExecutions.resolve(); - } - _read(); - }; - - r.on("end", () => { - endExecuted++; - if (endExecuted == endExecutedExpected) { - endExpectedExecutions.resolve(); - } - }); - - // Push some data in to start. - // We've never gotten any read event at this point. - const ret = r.push(str); - assert(!ret); - let chunk = r.read(); - assertEquals(chunk, str); - chunk = r.read(); - assertEquals(chunk, null); - - r.once("readable", () => { - // This time, we'll get *all* the remaining data, because - // it's been added synchronously, as the read WOULD take - // us below the hwm, and so it triggered a _read() again, - // which synchronously added more, which we then return. - chunk = r.read(); - assertEquals(chunk, str + str); - - chunk = r.read(); - assertEquals(chunk, null); - }); - - const readTimeout = setTimeout( - () => readExpectedExecutions.reject(), - 1000, - ); - const endTimeout = setTimeout( - () => endExpectedExecutions.reject(), - 1000, - ); - await readExpectedExecutions; - await endExpectedExecutions; - clearTimeout(readTimeout); - clearTimeout(endTimeout); - assertEquals(readExecuted, readExecutedExpected); - assertEquals(endExecuted, endExecutedExpected); -}); - -Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length exceeded", async () => { - let readableExecuted = 0; - const readableExecutedExpected = 1; - const readableExpectedExecutions = deferred(); - - const r = new Duplex({ - highWaterMark: 3, - }); - - r._read = () => { - throw new Error("_read must not be called"); - }; - r.push(Buffer.from("blerg")); - - setTimeout(function () { - assert(!r._readableState.reading); - r.on("readable", () => { - readableExecuted++; - if (readableExecuted == readableExecutedExpected) { - readableExpectedExecutions.resolve(); - } - }); - }, 1); - - const readableTimeout = setTimeout( - () => readableExpectedExecutions.reject(), - 1000, - ); - await readableExpectedExecutions; - clearTimeout(readableTimeout); - assertEquals(readableExecuted, readableExecutedExpected); -}); - -Deno.test("Duplex stream: 'readable' and 'read' events are emitted on highWaterMark length not reached", async () => { - let readableExecuted = 0; - const readableExecutedExpected = 1; - const readableExpectedExecutions = deferred(); - - let readExecuted = 0; - const readExecutedExpected = 1; - const readExpectedExecutions = deferred(); - - const r = new Duplex({ - highWaterMark: 3, - }); - - r._read = () => { - readExecuted++; - if (readExecuted == readExecutedExpected) { - readExpectedExecutions.resolve(); - } - }; - - r.push(Buffer.from("bl")); - - setTimeout(function () { - assert(r._readableState.reading); - r.on("readable", () => { - readableExecuted++; - if (readableExecuted == readableExecutedExpected) { - readableExpectedExecutions.resolve(); - } - }); - }, 1); - - const readableTimeout = setTimeout( - () => readableExpectedExecutions.reject(), - 1000, - ); - const readTimeout = setTimeout( - () => readExpectedExecutions.reject(), - 1000, - ); - await readableExpectedExecutions; - await readExpectedExecutions; - clearTimeout(readableTimeout); - clearTimeout(readTimeout); - assertEquals(readableExecuted, readableExecutedExpected); - assertEquals(readExecuted, readExecutedExpected); -}); - -Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length not reached and stream ended", async () => { - let readableExecuted = 0; - const readableExecutedExpected = 1; - const readableExpectedExecutions = deferred(); - - const r = new Duplex({ - highWaterMark: 30, - }); - - r._read = () => { - throw new Error("Must not be executed"); - }; - - r.push(Buffer.from("blerg")); - //This ends the stream and triggers end - r.push(null); - - setTimeout(function () { - // Assert we're testing what we think we are - assert(!r._readableState.reading); - r.on("readable", () => { - readableExecuted++; - if (readableExecuted == readableExecutedExpected) { - readableExpectedExecutions.resolve(); - } - }); - }, 1); - - const readableTimeout = setTimeout( - () => readableExpectedExecutions.reject(), - 1000, - ); - await readableExpectedExecutions; - clearTimeout(readableTimeout); - assertEquals(readableExecuted, readableExecutedExpected); -}); - -Deno.test("Duplex stream: 'read' is emitted on empty string pushed in non-object mode", async () => { - let endExecuted = 0; - const endExecutedExpected = 1; - const endExpectedExecutions = deferred(); - - const underlyingData = ["", "x", "y", "", "z"]; - const expected = underlyingData.filter((data) => data); - const result: unknown[] = []; - - const r = new Duplex({ - encoding: "utf8", - }); - r._read = function () { - queueMicrotask(() => { - if (!underlyingData.length) { - this.push(null); - } else { - this.push(underlyingData.shift()); - } - }); - }; - - r.on("readable", () => { - const data = r.read(); - if (data !== null) result.push(data); - }); - - r.on("end", () => { - endExecuted++; - if (endExecuted == endExecutedExpected) { - endExpectedExecutions.resolve(); - } - assertEquals(result, expected); - }); - - const endTimeout = setTimeout( - () => endExpectedExecutions.reject(), - 1000, - ); - await endExpectedExecutions; - clearTimeout(endTimeout); - assertEquals(endExecuted, endExecutedExpected); -}); - -Deno.test("Duplex stream: listeners can be removed", () => { - const r = new Duplex(); - r._read = () => {}; - r.on("data", () => {}); - - r.removeAllListeners("data"); - - assertEquals(r.eventNames().length, 0); -}); - -Deno.test("Duplex stream writes correctly", async () => { - let callback: undefined | ((error?: Error | null | undefined) => void); - - let writeExecuted = 0; - const writeExecutedExpected = 1; - const writeExpectedExecutions = deferred(); - - let writevExecuted = 0; - const writevExecutedExpected = 1; - const writevExpectedExecutions = deferred(); - - const writable = new Duplex({ - write: (chunk, encoding, cb) => { - writeExecuted++; - if (writeExecuted == writeExecutedExpected) { - writeExpectedExecutions.resolve(); - } - assert(chunk instanceof Buffer); - assertStrictEquals(encoding, "buffer"); - assertStrictEquals(String(chunk), "ABC"); - callback = cb; - }, - writev: (chunks) => { - writevExecuted++; - if (writevExecuted == writevExecutedExpected) { - writevExpectedExecutions.resolve(); - } - assertStrictEquals(chunks.length, 2); - assertStrictEquals(chunks[0].encoding, "buffer"); - assertStrictEquals(chunks[1].encoding, "buffer"); - assertStrictEquals(chunks[0].chunk + chunks[1].chunk, "DEFGHI"); - }, - }); - - writable.write(new TextEncoder().encode("ABC")); - writable.write(new TextEncoder().encode("DEF")); - writable.end(new TextEncoder().encode("GHI")); - callback?.(); - - const writeTimeout = setTimeout( - () => writeExpectedExecutions.reject(), - 1000, - ); - const writevTimeout = setTimeout( - () => writevExpectedExecutions.reject(), - 1000, - ); - await writeExpectedExecutions; - await writevExpectedExecutions; - clearTimeout(writeTimeout); - clearTimeout(writevTimeout); - assertEquals(writeExecuted, writeExecutedExpected); - assertEquals(writevExecuted, writevExecutedExpected); -}); - -Deno.test("Duplex stream writes Uint8Array in object mode", async () => { - let writeExecuted = 0; - const writeExecutedExpected = 1; - const writeExpectedExecutions = deferred(); - - const ABC = new TextEncoder().encode("ABC"); - - const writable = new Duplex({ - objectMode: true, - write: (chunk, encoding, cb) => { - writeExecuted++; - if (writeExecuted == writeExecutedExpected) { - writeExpectedExecutions.resolve(); - } - assert(!(chunk instanceof Buffer)); - assert(chunk instanceof Uint8Array); - assertEquals(chunk, ABC); - assertEquals(encoding, "utf8"); - cb(); - }, - }); - - writable.end(ABC); - - const writeTimeout = setTimeout( - () => writeExpectedExecutions.reject(), - 1000, - ); - await writeExpectedExecutions; - clearTimeout(writeTimeout); - assertEquals(writeExecuted, writeExecutedExpected); -}); - -Deno.test("Duplex stream throws on unexpected close", async () => { - let finishedExecuted = 0; - const finishedExecutedExpected = 1; - const finishedExpectedExecutions = deferred(); - - const writable = new Duplex({ - write: () => {}, - }); - writable.writable = false; - writable.destroy(); - - finished(writable, (err) => { - finishedExecuted++; - if (finishedExecuted == finishedExecutedExpected) { - finishedExpectedExecutions.resolve(); - } - assertEquals(err?.code, "ERR_STREAM_PREMATURE_CLOSE"); - }); - - const finishedTimeout = setTimeout( - () => finishedExpectedExecutions.reject(), - 1000, - ); - await finishedExpectedExecutions; - clearTimeout(finishedTimeout); - assertEquals(finishedExecuted, finishedExecutedExpected); -}); - -Deno.test("Duplex stream finishes correctly after error", async () => { - let errorExecuted = 0; - const errorExecutedExpected = 1; - const errorExpectedExecutions = deferred(); - - let finishedExecuted = 0; - const finishedExecutedExpected = 1; - const finishedExpectedExecutions = deferred(); - - const w = new Duplex({ - write(_chunk, _encoding, cb) { - cb(new Error()); - }, - autoDestroy: false, - }); - w.write("asd"); - w.on("error", () => { - errorExecuted++; - if (errorExecuted == errorExecutedExpected) { - errorExpectedExecutions.resolve(); - } - finished(w, () => { - finishedExecuted++; - if (finishedExecuted == finishedExecutedExpected) { - finishedExpectedExecutions.resolve(); - } - }); - }); - - const errorTimeout = setTimeout( - () => errorExpectedExecutions.reject(), - 1000, - ); - const finishedTimeout = setTimeout( - () => finishedExpectedExecutions.reject(), - 1000, - ); - await finishedExpectedExecutions; - await errorExpectedExecutions; - clearTimeout(finishedTimeout); - clearTimeout(errorTimeout); - assertEquals(finishedExecuted, finishedExecutedExpected); - assertEquals(errorExecuted, errorExecutedExpected); -}); - -Deno.test("Duplex stream fails on 'write' null value", () => { - const writable = new Duplex(); - assertThrows(() => writable.write(null)); -}); - -Deno.test("Duplex stream is destroyed correctly", async () => { - let closeExecuted = 0; - const closeExecutedExpected = 1; - const closeExpectedExecutions = deferred(); - - const unexpectedExecution = deferred(); - - const duplex = new Duplex({ - write(_chunk, _enc, cb) { - cb(); - }, - read() {}, - }); - - duplex.resume(); - - function never() { - unexpectedExecution.reject(); - } - - duplex.on("end", never); - duplex.on("finish", never); - duplex.on("close", () => { - closeExecuted++; - if (closeExecuted == closeExecutedExpected) { - closeExpectedExecutions.resolve(); - } - }); - - duplex.destroy(); - assertEquals(duplex.destroyed, true); - - const closeTimeout = setTimeout( - () => closeExpectedExecutions.reject(), - 1000, - ); - await Promise.race([ - unexpectedExecution, - delay(100), - ]); - await closeExpectedExecutions; - clearTimeout(closeTimeout); - assertEquals(closeExecuted, closeExecutedExpected); -}); - -Deno.test("Duplex stream errors correctly on destroy", async () => { - let errorExecuted = 0; - const errorExecutedExpected = 1; - const errorExpectedExecutions = deferred(); - - const unexpectedExecution = deferred(); - - const duplex = new Duplex({ - write(_chunk, _enc, cb) { - cb(); - }, - read() {}, - }); - duplex.resume(); - - const expected = new Error("kaboom"); - - function never() { - unexpectedExecution.reject(); - } - - duplex.on("end", never); - duplex.on("finish", never); - duplex.on("error", (err) => { - errorExecuted++; - if (errorExecuted == errorExecutedExpected) { - errorExpectedExecutions.resolve(); - } - assertStrictEquals(err, expected); - }); - - duplex.destroy(expected); - assertEquals(duplex.destroyed, true); - - const errorTimeout = setTimeout( - () => errorExpectedExecutions.reject(), - 1000, - ); - await Promise.race([ - unexpectedExecution, - delay(100), - ]); - await errorExpectedExecutions; - clearTimeout(errorTimeout); - assertEquals(errorExecuted, errorExecutedExpected); -}); - -Deno.test("Duplex stream doesn't finish on allowHalfOpen", async () => { - const unexpectedExecution = deferred(); - - const duplex = new Duplex({ - read() {}, - }); - - assertEquals(duplex.allowHalfOpen, true); - duplex.on("finish", () => unexpectedExecution.reject()); - assertEquals(duplex.listenerCount("end"), 0); - duplex.resume(); - duplex.push(null); - - await Promise.race([ - unexpectedExecution, - delay(100), - ]); -}); - -Deno.test("Duplex stream finishes when allowHalfOpen is disabled", async () => { - let finishExecuted = 0; - const finishExecutedExpected = 1; - const finishExpectedExecutions = deferred(); - - const duplex = new Duplex({ - read() {}, - allowHalfOpen: false, - }); - - assertEquals(duplex.allowHalfOpen, false); - duplex.on("finish", () => { - finishExecuted++; - if (finishExecuted == finishExecutedExpected) { - finishExpectedExecutions.resolve(); - } - }); - assertEquals(duplex.listenerCount("end"), 0); - duplex.resume(); - duplex.push(null); - - const finishTimeout = setTimeout( - () => finishExpectedExecutions.reject(), - 1000, - ); - await finishExpectedExecutions; - clearTimeout(finishTimeout); - assertEquals(finishExecuted, finishExecutedExpected); -}); - -Deno.test("Duplex stream doesn't finish when allowHalfOpen is disabled but stream ended", async () => { - const unexpectedExecution = deferred(); - - const duplex = new Duplex({ - read() {}, - allowHalfOpen: false, - }); - - assertEquals(duplex.allowHalfOpen, false); - duplex._writableState.ended = true; - duplex.on("finish", () => unexpectedExecution.reject()); - assertEquals(duplex.listenerCount("end"), 0); - duplex.resume(); - duplex.push(null); - - await Promise.race([ - unexpectedExecution, - delay(100), - ]); -}); diff --git a/std/node/_stream/end_of_stream.ts b/std/node/_stream/end_of_stream.ts deleted file mode 100644 index 6179e7fc4..000000000 --- a/std/node/_stream/end_of_stream.ts +++ /dev/null @@ -1,241 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { once } from "../_utils.ts"; -import type Duplex from "./duplex.ts"; -import type Readable from "./readable.ts"; -import type Stream from "./stream.ts"; -import type { ReadableState } from "./readable.ts"; -import type Writable from "./writable.ts"; -import type { WritableState } from "./writable.ts"; -import { - ERR_INVALID_ARG_TYPE, - ERR_STREAM_PREMATURE_CLOSE, - NodeErrorAbstraction, -} from "../_errors.ts"; - -export type StreamImplementations = Duplex | Readable | Stream | Writable; - -// TODO(Soremwar) -// Bring back once requests are implemented -// function isRequest(stream: Stream) { -// return stream.setHeader && typeof stream.abort === "function"; -// } - -// deno-lint-ignore no-explicit-any -function isReadable(stream: any) { - return typeof stream.readable === "boolean" || - typeof stream.readableEnded === "boolean" || - !!stream._readableState; -} - -// deno-lint-ignore no-explicit-any -function isWritable(stream: any) { - return typeof stream.writable === "boolean" || - typeof stream.writableEnded === "boolean" || - !!stream._writableState; -} - -function isWritableFinished(stream: Writable) { - if (stream.writableFinished) return true; - const wState = stream._writableState; - if (!wState || wState.errored) return false; - return wState.finished || (wState.ended && wState.length === 0); -} - -function nop() {} - -function isReadableEnded(stream: Readable) { - if (stream.readableEnded) return true; - const rState = stream._readableState; - if (!rState || rState.errored) return false; - return rState.endEmitted || (rState.ended && rState.length === 0); -} - -export interface FinishedOptions { - error?: boolean; - readable?: boolean; - writable?: boolean; -} - -/** - * Appends an ending callback triggered when a stream is no longer readable, - * writable or has experienced an error or a premature close event -*/ -export default function eos( - stream: StreamImplementations, - options: FinishedOptions | null, - callback: (err?: NodeErrorAbstraction | null) => void, -): () => void; -export default function eos( - stream: StreamImplementations, - callback: (err?: NodeErrorAbstraction | null) => void, -): () => void; -export default function eos( - stream: StreamImplementations, - x: FinishedOptions | ((err?: NodeErrorAbstraction | null) => void) | null, - y?: (err?: NodeErrorAbstraction | null) => void, -) { - let opts: FinishedOptions; - let callback: (err?: NodeErrorAbstraction | null) => void; - - if (!y) { - if (typeof x !== "function") { - throw new ERR_INVALID_ARG_TYPE("callback", "function", x); - } - opts = {}; - callback = x; - } else { - if (!x || Array.isArray(x) || typeof x !== "object") { - throw new ERR_INVALID_ARG_TYPE("opts", "object", x); - } - opts = x; - - if (typeof y !== "function") { - throw new ERR_INVALID_ARG_TYPE("callback", "function", y); - } - callback = y; - } - - callback = once(callback); - - const readable = opts.readable ?? isReadable(stream); - const writable = opts.writable ?? isWritable(stream); - - // deno-lint-ignore no-explicit-any - const wState: WritableState | undefined = (stream as any)._writableState; - // deno-lint-ignore no-explicit-any - const rState: ReadableState | undefined = (stream as any)._readableState; - const validState = wState || rState; - - const onlegacyfinish = () => { - if (!(stream as Writable).writable) { - onfinish(); - } - }; - - let willEmitClose = ( - validState?.autoDestroy && - validState?.emitClose && - validState?.closed === false && - isReadable(stream) === readable && - isWritable(stream) === writable - ); - - let writableFinished = (stream as Writable).writableFinished || - wState?.finished; - const onfinish = () => { - writableFinished = true; - // deno-lint-ignore no-explicit-any - if ((stream as any).destroyed) { - willEmitClose = false; - } - - if (willEmitClose && (!(stream as Readable).readable || readable)) { - return; - } - if (!readable || readableEnded) { - callback.call(stream); - } - }; - - let readableEnded = (stream as Readable).readableEnded || rState?.endEmitted; - const onend = () => { - readableEnded = true; - // deno-lint-ignore no-explicit-any - if ((stream as any).destroyed) { - willEmitClose = false; - } - - if (willEmitClose && (!(stream as Writable).writable || writable)) { - return; - } - if (!writable || writableFinished) { - callback.call(stream); - } - }; - - const onerror = (err: NodeErrorAbstraction) => { - callback.call(stream, err); - }; - - const onclose = () => { - if (readable && !readableEnded) { - if (!isReadableEnded(stream as Readable)) { - return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); - } - } - if (writable && !writableFinished) { - if (!isWritableFinished(stream as Writable)) { - return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); - } - } - callback.call(stream); - }; - - // TODO(Soremwar) - // Bring back once requests are implemented - // const onrequest = () => { - // stream.req.on("finish", onfinish); - // }; - - // TODO(Soremwar) - // Bring back once requests are implemented - // if (isRequest(stream)) { - // stream.on("complete", onfinish); - // stream.on("abort", onclose); - // if (stream.req) { - // onrequest(); - // } else { - // stream.on("request", onrequest); - // } - // } else - if (writable && !wState) { - stream.on("end", onlegacyfinish); - stream.on("close", onlegacyfinish); - } - - // TODO(Soremwar) - // Bring back once requests are implemented - // if (typeof stream.aborted === "boolean") { - // stream.on("aborted", onclose); - // } - - stream.on("end", onend); - stream.on("finish", onfinish); - if (opts.error !== false) stream.on("error", onerror); - stream.on("close", onclose); - - const closed = ( - wState?.closed || - rState?.closed || - wState?.errorEmitted || - rState?.errorEmitted || - // TODO(Soremwar) - // Bring back once requests are implemented - // (rState && stream.req && stream.aborted) || - ( - (!writable || wState?.finished) && - (!readable || rState?.endEmitted) - ) - ); - - if (closed) { - queueMicrotask(callback); - } - - return function () { - callback = nop; - stream.removeListener("aborted", onclose); - stream.removeListener("complete", onfinish); - stream.removeListener("abort", onclose); - // TODO(Soremwar) - // Bring back once requests are implemented - // stream.removeListener("request", onrequest); - // if (stream.req) stream.req.removeListener("finish", onfinish); - stream.removeListener("end", onlegacyfinish); - stream.removeListener("close", onlegacyfinish); - stream.removeListener("finish", onfinish); - stream.removeListener("end", onend); - stream.removeListener("error", onerror); - stream.removeListener("close", onclose); - }; -} diff --git a/std/node/_stream/end_of_stream_test.ts b/std/node/_stream/end_of_stream_test.ts deleted file mode 100644 index 571e75b99..000000000 --- a/std/node/_stream/end_of_stream_test.ts +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import finished from "./end_of_stream.ts"; -import Readable from "./readable.ts"; -import Transform from "./transform.ts"; -import Writable from "./writable.ts"; -import { mustCall } from "../_utils.ts"; -import { assert, fail } from "../../testing/asserts.ts"; -import { deferred, delay } from "../../async/mod.ts"; - -Deno.test("Finished appends to Readable correctly", async () => { - const rs = new Readable({ - read() {}, - }); - - const [finishedExecution, finishedCb] = mustCall((err) => { - assert(!err); - }); - - finished(rs, finishedCb); - - rs.push(null); - rs.resume(); - - await finishedExecution; -}); - -Deno.test("Finished appends to Writable correctly", async () => { - const ws = new Writable({ - write(_data, _enc, cb) { - cb(); - }, - }); - - const [finishedExecution, finishedCb] = mustCall((err) => { - assert(!err); - }); - - finished(ws, finishedCb); - - ws.end(); - - await finishedExecution; -}); - -Deno.test("Finished appends to Transform correctly", async () => { - const tr = new Transform({ - transform(_data, _enc, cb) { - cb(); - }, - }); - - let finish = false; - let ended = false; - - tr.on("end", () => { - ended = true; - }); - - tr.on("finish", () => { - finish = true; - }); - - const [finishedExecution, finishedCb] = mustCall((err) => { - assert(!err); - assert(finish); - assert(ended); - }); - - finished(tr, finishedCb); - - tr.end(); - tr.resume(); - - await finishedExecution; -}); - -Deno.test("The function returned by Finished clears the listeners", async () => { - const finishedExecution = deferred(); - - const ws = new Writable({ - write(_data, _env, cb) { - cb(); - }, - }); - - const removeListener = finished(ws, () => { - finishedExecution.reject(); - }); - removeListener(); - ws.end(); - - await Promise.race([ - delay(100), - finishedExecution, - ]) - .catch(() => fail("Finished was executed")); -}); diff --git a/std/node/_stream/from.ts b/std/node/_stream/from.ts deleted file mode 100644 index 652c17715..000000000 --- a/std/node/_stream/from.ts +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { Buffer } from "../buffer.ts"; -import Readable from "./readable.ts"; -import type { ReadableOptions } from "./readable.ts"; -import { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } from "../_errors.ts"; - -export default function from( - // deno-lint-ignore no-explicit-any - iterable: Iterable<any> | AsyncIterable<any>, - opts?: ReadableOptions, -) { - let iterator: - // deno-lint-ignore no-explicit-any - | Iterator<any, any, undefined> - // deno-lint-ignore no-explicit-any - | AsyncIterator<any, any, undefined>; - if (typeof iterable === "string" || iterable instanceof Buffer) { - return new Readable({ - objectMode: true, - ...opts, - read() { - this.push(iterable); - this.push(null); - }, - }); - } - - if (Symbol.asyncIterator in iterable) { - // deno-lint-ignore no-explicit-any - iterator = (iterable as AsyncIterable<any>)[Symbol.asyncIterator](); - } else if (Symbol.iterator in iterable) { - // deno-lint-ignore no-explicit-any - iterator = (iterable as Iterable<any>)[Symbol.iterator](); - } else { - throw new ERR_INVALID_ARG_TYPE("iterable", ["Iterable"], iterable); - } - - const readable = new Readable({ - objectMode: true, - highWaterMark: 1, - ...opts, - }); - - // Reading boolean to protect against _read - // being called before last iteration completion. - let reading = false; - - // needToClose boolean if iterator needs to be explicitly closed - let needToClose = false; - - readable._read = function () { - if (!reading) { - reading = true; - next(); - } - }; - - readable._destroy = function (error, cb) { - if (needToClose) { - needToClose = false; - close().then( - () => queueMicrotask(() => cb(error)), - (e) => queueMicrotask(() => cb(error || e)), - ); - } else { - cb(error); - } - }; - - async function close() { - if (typeof iterator.return === "function") { - const { value } = await iterator.return(); - await value; - } - } - - async function next() { - try { - needToClose = false; - const { value, done } = await iterator.next(); - needToClose = !done; - if (done) { - readable.push(null); - } else if (readable.destroyed) { - await close(); - } else { - const res = await value; - if (res === null) { - reading = false; - throw new ERR_STREAM_NULL_VALUES(); - } else if (readable.push(res)) { - next(); - } else { - reading = false; - } - } - } catch (err) { - readable.destroy(err); - } - } - return readable; -} diff --git a/std/node/_stream/passthrough.ts b/std/node/_stream/passthrough.ts deleted file mode 100644 index 9126420e5..000000000 --- a/std/node/_stream/passthrough.ts +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import Transform from "./transform.ts"; -import type { TransformOptions } from "./transform.ts"; -import type { Encodings } from "../_utils.ts"; - -export default class PassThrough extends Transform { - constructor(options?: TransformOptions) { - super(options); - } - - _transform( - // deno-lint-ignore no-explicit-any - chunk: any, - _encoding: Encodings, - // deno-lint-ignore no-explicit-any - cb: (error?: Error | null, data?: any) => void, - ) { - cb(null, chunk); - } -} diff --git a/std/node/_stream/pipeline.ts b/std/node/_stream/pipeline.ts deleted file mode 100644 index d02a92870..000000000 --- a/std/node/_stream/pipeline.ts +++ /dev/null @@ -1,308 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { once } from "../_utils.ts"; -import { destroyer as implDestroyer } from "./destroy.ts"; -import eos from "./end_of_stream.ts"; -import createReadableStreamAsyncIterator from "./async_iterator.ts"; -import * as events from "../events.ts"; -import PassThrough from "./passthrough.ts"; -import { - ERR_INVALID_ARG_TYPE, - ERR_INVALID_CALLBACK, - ERR_INVALID_RETURN_VALUE, - ERR_MISSING_ARGS, - ERR_STREAM_DESTROYED, - NodeErrorAbstraction, -} from "../_errors.ts"; -import type Duplex from "./duplex.ts"; -import type Readable from "./readable.ts"; -import type Stream from "./stream.ts"; -import type Transform from "./transform.ts"; -import type Writable from "./writable.ts"; - -type Streams = Duplex | Readable | Writable; -// deno-lint-ignore no-explicit-any -type EndCallback = (err?: NodeErrorAbstraction | null, val?: any) => void; -type TransformCallback = - // deno-lint-ignore no-explicit-any - | ((value?: any) => AsyncGenerator<any>) - // deno-lint-ignore no-explicit-any - | ((value?: any) => Promise<any>); -/** - * This type represents an array that contains a data source, - * many Transform Streams, a writable stream destination - * and end in an optional callback - * */ -type DataSource = - // deno-lint-ignore no-explicit-any - | (() => AsyncGenerator<any>) - | // deno-lint-ignore no-explicit-any - AsyncIterable<any> - | Duplex - | // deno-lint-ignore no-explicit-any - Iterable<any> - | // deno-lint-ignore no-explicit-any - (() => Generator<any>) - | Readable; -type Transformers = Duplex | Transform | TransformCallback | Writable; -export type PipelineArguments = [ - DataSource, - ...Array<Transformers | EndCallback>, -]; - -function destroyer( - stream: Streams, - reading: boolean, - writing: boolean, - callback: EndCallback, -) { - callback = once(callback); - - let finished = false; - stream.on("close", () => { - finished = true; - }); - - eos(stream, { readable: reading, writable: writing }, (err) => { - finished = !err; - - // deno-lint-ignore no-explicit-any - const rState = (stream as any)?._readableState; - if ( - err && - err.code === "ERR_STREAM_PREMATURE_CLOSE" && - reading && - (rState?.ended && !rState?.errored && !rState?.errorEmitted) - ) { - stream - .once("end", callback) - .once("error", callback); - } else { - callback(err); - } - }); - - return (err: NodeErrorAbstraction) => { - if (finished) return; - finished = true; - implDestroyer(stream, err); - callback(err || new ERR_STREAM_DESTROYED("pipe")); - }; -} - -function popCallback(streams: PipelineArguments): EndCallback { - if (typeof streams[streams.length - 1] !== "function") { - throw new ERR_INVALID_CALLBACK(streams[streams.length - 1]); - } - return streams.pop() as EndCallback; -} - -// function isPromise(obj) { -// return !!(obj && typeof obj.then === "function"); -// } - -// deno-lint-ignore no-explicit-any -function isReadable(obj: any): obj is Stream { - return !!(obj && typeof obj.pipe === "function"); -} - -// deno-lint-ignore no-explicit-any -function isWritable(obj: any) { - return !!(obj && typeof obj.write === "function"); -} - -// deno-lint-ignore no-explicit-any -function isStream(obj: any) { - return isReadable(obj) || isWritable(obj); -} - -// deno-lint-ignore no-explicit-any -function isIterable(obj: any, isAsync?: boolean) { - if (!obj) return false; - if (isAsync === true) return typeof obj[Symbol.asyncIterator] === "function"; - if (isAsync === false) return typeof obj[Symbol.iterator] === "function"; - return typeof obj[Symbol.asyncIterator] === "function" || - typeof obj[Symbol.iterator] === "function"; -} - -// deno-lint-ignore no-explicit-any -function makeAsyncIterable(val: Readable | Iterable<any> | AsyncIterable<any>) { - if (isIterable(val)) { - return val; - } else if (isReadable(val)) { - return fromReadable(val as Readable); - } - throw new ERR_INVALID_ARG_TYPE( - "val", - ["Readable", "Iterable", "AsyncIterable"], - val, - ); -} - -async function* fromReadable(val: Readable) { - yield* createReadableStreamAsyncIterator(val); -} - -async function pump( - // deno-lint-ignore no-explicit-any - iterable: Iterable<any>, - writable: Duplex | Writable, - finish: (err?: NodeErrorAbstraction | null) => void, -) { - let error; - try { - for await (const chunk of iterable) { - if (!writable.write(chunk)) { - if (writable.destroyed) return; - await events.once(writable, "drain"); - } - } - writable.end(); - } catch (err) { - error = err; - } finally { - finish(error); - } -} - -export default function pipeline(...args: PipelineArguments) { - const callback: EndCallback = once(popCallback(args)); - - let streams: [DataSource, ...Transformers[]]; - if (args.length > 1) { - streams = args as [DataSource, ...Transformers[]]; - } else { - throw new ERR_MISSING_ARGS("streams"); - } - - let error: NodeErrorAbstraction; - // deno-lint-ignore no-explicit-any - let value: any; - const destroys: Array<(err: NodeErrorAbstraction) => void> = []; - - let finishCount = 0; - - function finish(err?: NodeErrorAbstraction | null) { - const final = --finishCount === 0; - - if (err && (!error || error.code === "ERR_STREAM_PREMATURE_CLOSE")) { - error = err; - } - - if (!error && !final) { - return; - } - - while (destroys.length) { - (destroys.shift() as (err: NodeErrorAbstraction) => void)(error); - } - - if (final) { - callback(error, value); - } - } - - // TODO(Soremwar) - // Simplify the hell out of this - // deno-lint-ignore no-explicit-any - let ret: any; - for (let i = 0; i < streams.length; i++) { - const stream = streams[i]; - const reading = i < streams.length - 1; - const writing = i > 0; - - if (isStream(stream)) { - finishCount++; - destroys.push(destroyer(stream as Streams, reading, writing, finish)); - } - - if (i === 0) { - if (typeof stream === "function") { - ret = stream(); - if (!isIterable(ret)) { - throw new ERR_INVALID_RETURN_VALUE( - "Iterable, AsyncIterable or Stream", - "source", - ret, - ); - } - } else if (isIterable(stream) || isReadable(stream)) { - ret = stream; - } else { - throw new ERR_INVALID_ARG_TYPE( - "source", - ["Stream", "Iterable", "AsyncIterable", "Function"], - stream, - ); - } - } else if (typeof stream === "function") { - ret = makeAsyncIterable(ret); - ret = stream(ret); - - if (reading) { - if (!isIterable(ret, true)) { - throw new ERR_INVALID_RETURN_VALUE( - "AsyncIterable", - `transform[${i - 1}]`, - ret, - ); - } - } else { - // If the last argument to pipeline is not a stream - // we must create a proxy stream so that pipeline(...) - // always returns a stream which can be further - // composed through `.pipe(stream)`. - const pt = new PassThrough({ - objectMode: true, - }); - if (ret instanceof Promise) { - ret - .then((val) => { - value = val; - pt.end(val); - }, (err) => { - pt.destroy(err); - }); - } else if (isIterable(ret, true)) { - finishCount++; - pump(ret, pt, finish); - } else { - throw new ERR_INVALID_RETURN_VALUE( - "AsyncIterable or Promise", - "destination", - ret, - ); - } - - ret = pt; - - finishCount++; - destroys.push(destroyer(ret, false, true, finish)); - } - } else if (isStream(stream)) { - if (isReadable(ret)) { - ret.pipe(stream as Readable); - - // TODO(Soremwar) - // Reimplement after stdout and stderr are implemented - // if (stream === process.stdout || stream === process.stderr) { - // ret.on("end", () => stream.end()); - // } - } else { - ret = makeAsyncIterable(ret); - - finishCount++; - pump(ret, stream as Writable, finish); - } - ret = stream; - } else { - const name = reading ? `transform[${i - 1}]` : "destination"; - throw new ERR_INVALID_ARG_TYPE( - name, - ["Stream", "Function"], - ret, - ); - } - } - - return ret as unknown as Readable; -} diff --git a/std/node/_stream/pipeline_test.ts b/std/node/_stream/pipeline_test.ts deleted file mode 100644 index aa1869416..000000000 --- a/std/node/_stream/pipeline_test.ts +++ /dev/null @@ -1,387 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { Buffer } from "../buffer.ts"; -import PassThrough from "./passthrough.ts"; -import pipeline from "./pipeline.ts"; -import Readable from "./readable.ts"; -import Transform from "./transform.ts"; -import Writable from "./writable.ts"; -import { mustCall } from "../_utils.ts"; -import { - assert, - assertEquals, - assertStrictEquals, -} from "../../testing/asserts.ts"; -import type { NodeErrorAbstraction } from "../_errors.ts"; - -Deno.test("Pipeline ends on stream finished", async () => { - let finished = false; - - // deno-lint-ignore no-explicit-any - const processed: any[] = []; - const expected = [ - Buffer.from("a"), - Buffer.from("b"), - Buffer.from("c"), - ]; - - const read = new Readable({ - read() {}, - }); - - const write = new Writable({ - write(data, _enc, cb) { - processed.push(data); - cb(); - }, - }); - - write.on("finish", () => { - finished = true; - }); - - for (let i = 0; i < expected.length; i++) { - read.push(expected[i]); - } - read.push(null); - - const [finishedCompleted, finishedCb] = mustCall( - (err?: NodeErrorAbstraction | null) => { - assert(!err); - assert(finished); - assertEquals(processed, expected); - }, - 1, - ); - - pipeline(read, write, finishedCb); - - await finishedCompleted; -}); - -Deno.test("Pipeline fails on stream destroyed", async () => { - const read = new Readable({ - read() {}, - }); - - const write = new Writable({ - write(_data, _enc, cb) { - cb(); - }, - }); - - read.push("data"); - queueMicrotask(() => read.destroy()); - - const [pipelineExecuted, pipelineCb] = mustCall( - (err?: NodeErrorAbstraction | null) => { - assert(err); - }, - 1, - ); - pipeline(read, write, pipelineCb); - - await pipelineExecuted; -}); - -Deno.test("Pipeline exits on stream error", async () => { - const read = new Readable({ - read() {}, - }); - - const transform = new Transform({ - transform(_data, _enc, cb) { - cb(new Error("kaboom")); - }, - }); - - const write = new Writable({ - write(_data, _enc, cb) { - cb(); - }, - }); - - const [readExecution, readCb] = mustCall(); - read.on("close", readCb); - const [closeExecution, closeCb] = mustCall(); - transform.on("close", closeCb); - const [writeExecution, writeCb] = mustCall(); - write.on("close", writeCb); - - const errorExecutions = [read, transform, write] - .map((stream) => { - const [execution, cb] = mustCall((err?: NodeErrorAbstraction | null) => { - assertEquals(err, new Error("kaboom")); - }); - - stream.on("error", cb); - return execution; - }); - - const [pipelineExecution, pipelineCb] = mustCall( - (err?: NodeErrorAbstraction | null) => { - assertEquals(err, new Error("kaboom")); - }, - ); - const dst = pipeline(read, transform, write, pipelineCb); - - assertStrictEquals(dst, write); - - read.push("hello"); - - await readExecution; - await closeExecution; - await writeExecution; - await Promise.all(errorExecutions); - await pipelineExecution; -}); - -Deno.test("Pipeline processes iterators correctly", async () => { - let res = ""; - const w = new Writable({ - write(chunk, _encoding, callback) { - res += chunk; - callback(); - }, - }); - - const [pipelineExecution, pipelineCb] = mustCall( - (err?: NodeErrorAbstraction | null) => { - assert(!err); - assertEquals(res, "helloworld"); - }, - ); - pipeline( - function* () { - yield "hello"; - yield "world"; - }(), - w, - pipelineCb, - ); - - await pipelineExecution; -}); - -Deno.test("Pipeline processes async iterators correctly", async () => { - let res = ""; - const w = new Writable({ - write(chunk, _encoding, callback) { - res += chunk; - callback(); - }, - }); - - const [pipelineExecution, pipelineCb] = mustCall( - (err?: NodeErrorAbstraction | null) => { - assert(!err); - assertEquals(res, "helloworld"); - }, - ); - pipeline( - async function* () { - await Promise.resolve(); - yield "hello"; - yield "world"; - }(), - w, - pipelineCb, - ); - - await pipelineExecution; -}); - -Deno.test("Pipeline processes generators correctly", async () => { - let res = ""; - const w = new Writable({ - write(chunk, _encoding, callback) { - res += chunk; - callback(); - }, - }); - - const [pipelineExecution, pipelineCb] = mustCall( - (err?: NodeErrorAbstraction | null) => { - assert(!err); - assertEquals(res, "helloworld"); - }, - ); - pipeline( - function* () { - yield "hello"; - yield "world"; - }, - w, - pipelineCb, - ); - - await pipelineExecution; -}); - -Deno.test("Pipeline processes async generators correctly", async () => { - let res = ""; - const w = new Writable({ - write(chunk, _encoding, callback) { - res += chunk; - callback(); - }, - }); - - const [pipelineExecution, pipelineCb] = mustCall( - (err?: NodeErrorAbstraction | null) => { - assert(!err); - assertEquals(res, "helloworld"); - }, - ); - pipeline( - async function* () { - await Promise.resolve(); - yield "hello"; - yield "world"; - }, - w, - pipelineCb, - ); - - await pipelineExecution; -}); - -Deno.test("Pipeline handles generator transforms", async () => { - let res = ""; - - const [pipelineExecuted, pipelineCb] = mustCall( - (err?: NodeErrorAbstraction | null) => { - assert(!err); - assertEquals(res, "HELLOWORLD"); - }, - ); - pipeline( - async function* () { - await Promise.resolve(); - yield "hello"; - yield "world"; - }, - async function* (source: string[]) { - for await (const chunk of source) { - yield chunk.toUpperCase(); - } - }, - async function (source: string[]) { - for await (const chunk of source) { - res += chunk; - } - }, - pipelineCb, - ); - - await pipelineExecuted; -}); - -Deno.test("Pipeline passes result to final callback", async () => { - const [pipelineExecuted, pipelineCb] = mustCall( - (err?: NodeErrorAbstraction | null, val?: unknown) => { - assert(!err); - assertEquals(val, "HELLOWORLD"); - }, - ); - pipeline( - async function* () { - await Promise.resolve(); - yield "hello"; - yield "world"; - }, - async function* (source: string[]) { - for await (const chunk of source) { - yield chunk.toUpperCase(); - } - }, - async function (source: string[]) { - let ret = ""; - for await (const chunk of source) { - ret += chunk; - } - return ret; - }, - pipelineCb, - ); - - await pipelineExecuted; -}); - -Deno.test("Pipeline returns a stream after ending", async () => { - const [pipelineExecuted, pipelineCb] = mustCall( - (err?: NodeErrorAbstraction | null) => { - assertEquals(err, undefined); - }, - ); - const ret = pipeline( - async function* () { - await Promise.resolve(); - yield "hello"; - }, - // deno-lint-ignore require-yield - async function* (source: string[]) { - for await (const chunk of source) { - chunk; - } - }, - pipelineCb, - ); - - ret.resume(); - - assertEquals(typeof ret.pipe, "function"); - - await pipelineExecuted; -}); - -Deno.test("Pipeline returns a stream after erroring", async () => { - const errorText = "kaboom"; - - const [pipelineExecuted, pipelineCb] = mustCall( - (err?: NodeErrorAbstraction | null) => { - assertEquals(err?.message, errorText); - }, - ); - const ret = pipeline( - // deno-lint-ignore require-yield - async function* () { - await Promise.resolve(); - throw new Error(errorText); - }, - // deno-lint-ignore require-yield - async function* (source: string[]) { - for await (const chunk of source) { - chunk; - } - }, - pipelineCb, - ); - - ret.resume(); - - assertEquals(typeof ret.pipe, "function"); - - await pipelineExecuted; -}); - -Deno.test("Pipeline destination gets destroyed on error", async () => { - const errorText = "kaboom"; - const s = new PassThrough(); - - const [pipelineExecution, pipelineCb] = mustCall( - (err?: NodeErrorAbstraction | null) => { - assertEquals(err?.message, errorText); - assertEquals(s.destroyed, true); - }, - ); - pipeline( - // deno-lint-ignore require-yield - async function* () { - throw new Error(errorText); - }, - s, - pipelineCb, - ); - - await pipelineExecution; -}); diff --git a/std/node/_stream/promises.ts b/std/node/_stream/promises.ts deleted file mode 100644 index 1adf4ea3f..000000000 --- a/std/node/_stream/promises.ts +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import pl from "./pipeline.ts"; -import type { PipelineArguments } from "./pipeline.ts"; -import eos from "./end_of_stream.ts"; -import type { - FinishedOptions, - StreamImplementations as FinishedStreams, -} from "./end_of_stream.ts"; - -export function pipeline(...streams: PipelineArguments) { - return new Promise((resolve, reject) => { - pl( - ...streams, - (err, value) => { - if (err) { - reject(err); - } else { - resolve(value); - } - }, - ); - }); -} - -export function finished( - stream: FinishedStreams, - opts?: FinishedOptions, -) { - return new Promise<void>((resolve, reject) => { - eos( - stream, - opts || null, - (err) => { - if (err) { - reject(err); - } else { - resolve(); - } - }, - ); - }); -} diff --git a/std/node/_stream/promises_test.ts b/std/node/_stream/promises_test.ts deleted file mode 100644 index 90803b4af..000000000 --- a/std/node/_stream/promises_test.ts +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { Buffer } from "../buffer.ts"; -import Readable from "./readable.ts"; -import Writable from "./writable.ts"; -import { pipeline } from "./promises.ts"; -import { deferred } from "../../async/mod.ts"; -import { - assert, - assertEquals, - assertThrowsAsync, -} from "../../testing/asserts.ts"; - -Deno.test("Promise pipeline works correctly", async () => { - let pipelineExecuted = 0; - const pipelineExecutedExpected = 1; - const pipelineExpectedExecutions = deferred(); - - let finished = false; - // deno-lint-ignore no-explicit-any - const processed: any[] = []; - const expected = [ - Buffer.from("a"), - Buffer.from("b"), - Buffer.from("c"), - ]; - - const read = new Readable({ - read() {}, - }); - - const write = new Writable({ - write(data, _enc, cb) { - processed.push(data); - cb(); - }, - }); - - write.on("finish", () => { - finished = true; - }); - - for (let i = 0; i < expected.length; i++) { - read.push(expected[i]); - } - read.push(null); - - pipeline(read, write).then(() => { - pipelineExecuted++; - if (pipelineExecuted == pipelineExecutedExpected) { - pipelineExpectedExecutions.resolve(); - } - assert(finished); - assertEquals(processed, expected); - }); - - const pipelineTimeout = setTimeout( - () => pipelineExpectedExecutions.reject(), - 1000, - ); - await pipelineExpectedExecutions; - clearTimeout(pipelineTimeout); - assertEquals(pipelineExecuted, pipelineExecutedExpected); -}); - -Deno.test("Promise pipeline throws on readable destroyed", async () => { - const read = new Readable({ - read() {}, - }); - - const write = new Writable({ - write(_data, _enc, cb) { - cb(); - }, - }); - - read.push("data"); - read.destroy(); - - await assertThrowsAsync( - () => pipeline(read, write), - Error, - "Premature close", - ); -}); diff --git a/std/node/_stream/readable.ts b/std/node/_stream/readable.ts deleted file mode 100644 index 54e0d8ecd..000000000 --- a/std/node/_stream/readable.ts +++ /dev/null @@ -1,788 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { captureRejectionSymbol } from "../events.ts"; -import Stream from "./stream.ts"; -import type { Buffer } from "../buffer.ts"; -import BufferList from "./buffer_list.ts"; -import { - ERR_INVALID_OPT_VALUE, - ERR_METHOD_NOT_IMPLEMENTED, -} from "../_errors.ts"; -import type { Encodings } from "../_utils.ts"; -import { StringDecoder } from "../string_decoder.ts"; -import createReadableStreamAsyncIterator from "./async_iterator.ts"; -import streamFrom from "./from.ts"; -import { kDestroy, kPaused } from "./symbols.ts"; -import { - _destroy, - computeNewHighWaterMark, - emitReadable, - endReadable, - errorOrDestroy, - fromList, - howMuchToRead, - nReadingNextTick, - pipeOnDrain, - prependListener, - readableAddChunk, - resume, - updateReadableListening, -} from "./readable_internal.ts"; -import Writable from "./writable.ts"; -import { errorOrDestroy as errorOrDestroyWritable } from "./writable_internal.ts"; -import Duplex, { errorOrDestroy as errorOrDestroyDuplex } from "./duplex.ts"; - -export interface ReadableOptions { - autoDestroy?: boolean; - defaultEncoding?: Encodings; - destroy?( - this: Readable, - error: Error | null, - callback: (error: Error | null) => void, - ): void; - emitClose?: boolean; - encoding?: Encodings; - highWaterMark?: number; - objectMode?: boolean; - read?(this: Readable): void; -} - -export class ReadableState { - [kPaused]: boolean | null = null; - awaitDrainWriters: Duplex | Writable | Set<Duplex | Writable> | null = null; - buffer = new BufferList(); - closed = false; - closeEmitted = false; - constructed: boolean; - decoder: StringDecoder | null = null; - destroyed = false; - emittedReadable = false; - encoding: Encodings | null = null; - ended = false; - endEmitted = false; - errored: Error | null = null; - errorEmitted = false; - flowing: boolean | null = null; - highWaterMark: number; - length = 0; - multiAwaitDrain = false; - needReadable = false; - objectMode: boolean; - pipes: Array<Duplex | Writable> = []; - readable = true; - readableListening = false; - reading = false; - readingMore = false; - resumeScheduled = false; - sync = true; - emitClose: boolean; - autoDestroy: boolean; - defaultEncoding: string; - - constructor(options?: ReadableOptions) { - this.objectMode = !!options?.objectMode; - - this.highWaterMark = options?.highWaterMark ?? - (this.objectMode ? 16 : 16 * 1024); - if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) { - this.highWaterMark = Math.floor(this.highWaterMark); - } else { - throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark); - } - - this.emitClose = options?.emitClose ?? true; - this.autoDestroy = options?.autoDestroy ?? true; - this.defaultEncoding = options?.defaultEncoding || "utf8"; - - if (options?.encoding) { - this.decoder = new StringDecoder(options.encoding); - this.encoding = options.encoding; - } - - this.constructed = true; - } -} - -class Readable extends Stream { - _readableState: ReadableState; - - constructor(options?: ReadableOptions) { - super(); - if (options) { - if (typeof options.read === "function") { - this._read = options.read; - } - if (typeof options.destroy === "function") { - this._destroy = options.destroy; - } - } - this._readableState = new ReadableState(options); - } - - static from( - // deno-lint-ignore no-explicit-any - iterable: Iterable<any> | AsyncIterable<any>, - opts?: ReadableOptions, - ): Readable { - return streamFrom(iterable, opts); - } - - static ReadableState = ReadableState; - - static _fromList = fromList; - - // You can override either this method, or the async _read(n) below. - read(n?: number) { - // Same as parseInt(undefined, 10), however V8 7.3 performance regressed - // in this scenario, so we are doing it manually. - if (n === undefined) { - n = NaN; - } - const state = this._readableState; - const nOrig = n; - - if (n > state.highWaterMark) { - state.highWaterMark = computeNewHighWaterMark(n); - } - - if (n !== 0) { - state.emittedReadable = false; - } - - if ( - n === 0 && - state.needReadable && - ((state.highWaterMark !== 0 - ? state.length >= state.highWaterMark - : state.length > 0) || - state.ended) - ) { - if (state.length === 0 && state.ended) { - endReadable(this); - } else { - emitReadable(this); - } - return null; - } - - n = howMuchToRead(n, state); - - if (n === 0 && state.ended) { - if (state.length === 0) { - endReadable(this); - } - return null; - } - - let doRead = state.needReadable; - if ( - state.length === 0 || state.length - (n as number) < state.highWaterMark - ) { - doRead = true; - } - - if ( - state.ended || state.reading || state.destroyed || state.errored || - !state.constructed - ) { - doRead = false; - } else if (doRead) { - state.reading = true; - state.sync = true; - if (state.length === 0) { - state.needReadable = true; - } - this._read(); - state.sync = false; - if (!state.reading) { - n = howMuchToRead(nOrig, state); - } - } - - let ret; - if ((n as number) > 0) { - ret = fromList((n as number), state); - } else { - ret = null; - } - - if (ret === null) { - state.needReadable = state.length <= state.highWaterMark; - n = 0; - } else { - state.length -= n as number; - if (state.multiAwaitDrain) { - (state.awaitDrainWriters as Set<Writable>).clear(); - } else { - state.awaitDrainWriters = null; - } - } - - if (state.length === 0) { - if (!state.ended) { - state.needReadable = true; - } - - if (nOrig !== n && state.ended) { - endReadable(this); - } - } - - if (ret !== null) { - this.emit("data", ret); - } - - return ret; - } - - _read(_size?: number) { - throw new ERR_METHOD_NOT_IMPLEMENTED("_read()"); - } - - pipe<T extends Duplex | Writable>(dest: T, pipeOpts?: { end?: boolean }): T { - // deno-lint-ignore no-this-alias - const src = this; - const state = this._readableState; - - if (state.pipes.length === 1) { - if (!state.multiAwaitDrain) { - state.multiAwaitDrain = true; - state.awaitDrainWriters = new Set( - state.awaitDrainWriters ? [state.awaitDrainWriters as Writable] : [], - ); - } - } - - state.pipes.push(dest); - - const doEnd = (!pipeOpts || pipeOpts.end !== false); - - //TODO(Soremwar) - //Part of doEnd condition - //In node, output/input are a duplex Stream - // && - // dest !== stdout && - // dest !== stderr - - const endFn = doEnd ? onend : unpipe; - if (state.endEmitted) { - queueMicrotask(endFn); - } else { - this.once("end", endFn); - } - - dest.on("unpipe", onunpipe); - function onunpipe(readable: Readable, unpipeInfo: { hasUnpiped: boolean }) { - if (readable === src) { - if (unpipeInfo && unpipeInfo.hasUnpiped === false) { - unpipeInfo.hasUnpiped = true; - cleanup(); - } - } - } - - function onend() { - dest.end(); - } - - let ondrain: () => void; - - let cleanedUp = false; - function cleanup() { - dest.removeListener("close", onclose); - dest.removeListener("finish", onfinish); - if (ondrain) { - dest.removeListener("drain", ondrain); - } - dest.removeListener("error", onerror); - dest.removeListener("unpipe", onunpipe); - src.removeListener("end", onend); - src.removeListener("end", unpipe); - src.removeListener("data", ondata); - - cleanedUp = true; - if ( - ondrain && state.awaitDrainWriters && - (!dest._writableState || dest._writableState.needDrain) - ) { - ondrain(); - } - } - - this.on("data", ondata); - // deno-lint-ignore no-explicit-any - function ondata(chunk: any) { - const ret = dest.write(chunk); - if (ret === false) { - if (!cleanedUp) { - if (state.pipes.length === 1 && state.pipes[0] === dest) { - state.awaitDrainWriters = dest; - state.multiAwaitDrain = false; - } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { - (state.awaitDrainWriters as Set<Duplex | Writable>).add(dest); - } - src.pause(); - } - if (!ondrain) { - ondrain = pipeOnDrain(src, dest); - dest.on("drain", ondrain); - } - } - } - - function onerror(er: Error) { - unpipe(); - dest.removeListener("error", onerror); - if (dest.listenerCount("error") === 0) { - const s = dest._writableState || (dest as Duplex)._readableState; - if (s && !s.errorEmitted) { - if (dest instanceof Duplex) { - errorOrDestroyDuplex(dest as unknown as Duplex, er); - } else { - errorOrDestroyWritable(dest as Writable, er); - } - } else { - dest.emit("error", er); - } - } - } - - prependListener(dest, "error", onerror); - - function onclose() { - dest.removeListener("finish", onfinish); - unpipe(); - } - dest.once("close", onclose); - function onfinish() { - dest.removeListener("close", onclose); - unpipe(); - } - dest.once("finish", onfinish); - - function unpipe() { - src.unpipe(dest as Writable); - } - - dest.emit("pipe", this); - - if (!state.flowing) { - this.resume(); - } - - return dest; - } - - isPaused() { - return this._readableState[kPaused] === true || - this._readableState.flowing === false; - } - - setEncoding(enc: Encodings) { - const decoder = new StringDecoder(enc); - this._readableState.decoder = decoder; - this._readableState.encoding = this._readableState.decoder - .encoding as Encodings; - - const buffer = this._readableState.buffer; - let content = ""; - for (const data of buffer) { - content += decoder.write(data as Buffer); - } - buffer.clear(); - if (content !== "") { - buffer.push(content); - } - this._readableState.length = content.length; - return this; - } - - on( - event: "close" | "end" | "pause" | "readable" | "resume", - listener: () => void, - ): this; - // deno-lint-ignore no-explicit-any - on(event: "data", listener: (chunk: any) => void): this; - on(event: "error", listener: (err: Error) => void): this; - // deno-lint-ignore no-explicit-any - on(event: string | symbol, listener: (...args: any[]) => void): this; - on( - ev: string | symbol, - fn: - | (() => void) - // deno-lint-ignore no-explicit-any - | ((chunk: any) => void) - | ((err: Error) => void) - // deno-lint-ignore no-explicit-any - | ((...args: any[]) => void), - ) { - const res = super.on.call(this, ev, fn); - const state = this._readableState; - - if (ev === "data") { - state.readableListening = this.listenerCount("readable") > 0; - - if (state.flowing !== false) { - this.resume(); - } - } else if (ev === "readable") { - if (!state.endEmitted && !state.readableListening) { - state.readableListening = state.needReadable = true; - state.flowing = false; - state.emittedReadable = false; - if (state.length) { - emitReadable(this); - } else if (!state.reading) { - queueMicrotask(() => nReadingNextTick(this)); - } - } - } - - return res; - } - - removeListener( - event: "close" | "end" | "pause" | "readable" | "resume", - listener: () => void, - ): this; - // deno-lint-ignore no-explicit-any - removeListener(event: "data", listener: (chunk: any) => void): this; - removeListener(event: "error", listener: (err: Error) => void): this; - removeListener( - event: string | symbol, - // deno-lint-ignore no-explicit-any - listener: (...args: any[]) => void, - ): this; - removeListener( - ev: string | symbol, - fn: - | (() => void) - // deno-lint-ignore no-explicit-any - | ((chunk: any) => void) - | ((err: Error) => void) - // deno-lint-ignore no-explicit-any - | ((...args: any[]) => void), - ) { - const res = super.removeListener.call(this, ev, fn); - - if (ev === "readable") { - queueMicrotask(() => updateReadableListening(this)); - } - - return res; - } - - off = this.removeListener; - - destroy(err?: Error | null, cb?: () => void) { - const r = this._readableState; - - if (r.destroyed) { - if (typeof cb === "function") { - cb(); - } - - return this; - } - - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!r.errored) { - r.errored = err; - } - } - - r.destroyed = true; - - // If still constructing then defer calling _destroy. - if (!r.constructed) { - this.once(kDestroy, (er: Error) => { - _destroy(this, err || er, cb); - }); - } else { - _destroy(this, err, cb); - } - - return this; - } - - _undestroy() { - const r = this._readableState; - r.constructed = true; - r.closed = false; - r.closeEmitted = false; - r.destroyed = false; - r.errored = null; - r.errorEmitted = false; - r.reading = false; - r.ended = false; - r.endEmitted = false; - } - - _destroy( - error: Error | null, - callback: (error?: Error | null) => void, - ): void { - callback(error); - } - - [captureRejectionSymbol](err: Error) { - this.destroy(err); - } - - // deno-lint-ignore no-explicit-any - push(chunk: any, encoding?: Encodings): boolean { - return readableAddChunk(this, chunk, encoding, false); - } - - // deno-lint-ignore no-explicit-any - unshift(chunk: any, encoding?: string): boolean { - return readableAddChunk(this, chunk, encoding, true); - } - - unpipe(dest?: Writable): this { - const state = this._readableState; - const unpipeInfo = { hasUnpiped: false }; - - if (state.pipes.length === 0) { - return this; - } - - if (!dest) { - // remove all. - const dests = state.pipes; - state.pipes = []; - this.pause(); - - for (const dest of dests) { - dest.emit("unpipe", this, { hasUnpiped: false }); - } - return this; - } - - const index = state.pipes.indexOf(dest); - if (index === -1) { - return this; - } - - state.pipes.splice(index, 1); - if (state.pipes.length === 0) { - this.pause(); - } - - dest.emit("unpipe", this, unpipeInfo); - - return this; - } - - removeAllListeners( - ev: - | "close" - | "data" - | "end" - | "error" - | "pause" - | "readable" - | "resume" - | symbol - | undefined, - ) { - const res = super.removeAllListeners(ev); - - if (ev === "readable" || ev === undefined) { - queueMicrotask(() => updateReadableListening(this)); - } - - return res; - } - - resume() { - const state = this._readableState; - if (!state.flowing) { - // We flow only if there is no one listening - // for readable, but we still have to call - // resume(). - state.flowing = !state.readableListening; - resume(this, state); - } - state[kPaused] = false; - return this; - } - - pause() { - if (this._readableState.flowing !== false) { - this._readableState.flowing = false; - this.emit("pause"); - } - this._readableState[kPaused] = true; - return this; - } - - /** Wrap an old-style stream as the async data source. */ - wrap(stream: Stream): this { - const state = this._readableState; - let paused = false; - - stream.on("end", () => { - if (state.decoder && !state.ended) { - const chunk = state.decoder.end(); - if (chunk && chunk.length) { - this.push(chunk); - } - } - - this.push(null); - }); - - stream.on("data", (chunk) => { - if (state.decoder) { - chunk = state.decoder.write(chunk); - } - - if (state.objectMode && (chunk === null || chunk === undefined)) { - return; - } else if (!state.objectMode && (!chunk || !chunk.length)) { - return; - } - - const ret = this.push(chunk); - if (!ret) { - paused = true; - // By the time this is triggered, stream will be a readable stream - // deno-lint-ignore ban-ts-comment - // @ts-ignore - stream.pause(); - } - }); - - // TODO(Soremwar) - // There must be a clean way to implement this on TypeScript - // Proxy all the other methods. Important when wrapping filters and duplexes. - for (const i in stream) { - // deno-lint-ignore ban-ts-comment - //@ts-ignore - if (this[i] === undefined && typeof stream[i] === "function") { - // deno-lint-ignore ban-ts-comment - //@ts-ignore - this[i] = function methodWrap(method) { - return function methodWrapReturnFunction() { - // deno-lint-ignore ban-ts-comment - //@ts-ignore - return stream[method].apply(stream); - }; - }(i); - } - } - - stream.on("error", (err) => { - errorOrDestroy(this, err); - }); - - stream.on("close", () => { - this.emit("close"); - }); - - stream.on("destroy", () => { - this.emit("destroy"); - }); - - stream.on("pause", () => { - this.emit("pause"); - }); - - stream.on("resume", () => { - this.emit("resume"); - }); - - this._read = () => { - if (paused) { - paused = false; - // By the time this is triggered, stream will be a readable stream - // deno-lint-ignore ban-ts-comment - //@ts-ignore - stream.resume(); - } - }; - - return this; - } - - [Symbol.asyncIterator]() { - return createReadableStreamAsyncIterator(this); - } - - get readable(): boolean { - return this._readableState?.readable && - !this._readableState?.destroyed && - !this._readableState?.errorEmitted && - !this._readableState?.endEmitted; - } - set readable(val: boolean) { - if (this._readableState) { - this._readableState.readable = val; - } - } - - get readableHighWaterMark(): number { - return this._readableState.highWaterMark; - } - - get readableBuffer() { - return this._readableState && this._readableState.buffer; - } - - get readableFlowing(): boolean | null { - return this._readableState.flowing; - } - - set readableFlowing(state: boolean | null) { - if (this._readableState) { - this._readableState.flowing = state; - } - } - - get readableLength() { - return this._readableState.length; - } - - get readableObjectMode() { - return this._readableState ? this._readableState.objectMode : false; - } - - get readableEncoding() { - return this._readableState ? this._readableState.encoding : null; - } - - get destroyed() { - if (this._readableState === undefined) { - return false; - } - return this._readableState.destroyed; - } - - set destroyed(value: boolean) { - if (!this._readableState) { - return; - } - this._readableState.destroyed = value; - } - - get readableEnded() { - return this._readableState ? this._readableState.endEmitted : false; - } -} - -Object.defineProperties(Readable, { - _readableState: { enumerable: false }, - destroyed: { enumerable: false }, - readableBuffer: { enumerable: false }, - readableEncoding: { enumerable: false }, - readableEnded: { enumerable: false }, - readableFlowing: { enumerable: false }, - readableHighWaterMark: { enumerable: false }, - readableLength: { enumerable: false }, - readableObjectMode: { enumerable: false }, -}); - -export default Readable; diff --git a/std/node/_stream/readable_internal.ts b/std/node/_stream/readable_internal.ts deleted file mode 100644 index 0ef261d4d..000000000 --- a/std/node/_stream/readable_internal.ts +++ /dev/null @@ -1,438 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { Buffer } from "../buffer.ts"; -import type Duplex from "./duplex.ts"; -import type EventEmitter from "../events.ts"; -import type Readable from "./readable.ts"; -import type Writable from "./writable.ts"; -import type { ReadableState } from "./readable.ts"; -import { kPaused } from "./symbols.ts"; -import { - ERR_STREAM_PUSH_AFTER_EOF, - ERR_STREAM_UNSHIFT_AFTER_END_EVENT, -} from "../_errors.ts"; - -export function _destroy( - self: Readable, - err?: Error | null, - cb?: (error?: Error | null) => void, -) { - self._destroy(err || null, (err) => { - const r = (self as Readable)._readableState; - - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!r.errored) { - r.errored = err; - } - } - - r.closed = true; - - if (typeof cb === "function") { - cb(err); - } - - if (err) { - queueMicrotask(() => { - if (!r.errorEmitted) { - r.errorEmitted = true; - self.emit("error", err); - } - r.closeEmitted = true; - if (r.emitClose) { - self.emit("close"); - } - }); - } else { - queueMicrotask(() => { - r.closeEmitted = true; - if (r.emitClose) { - self.emit("close"); - } - }); - } - }); -} - -export function addChunk( - stream: Duplex | Readable, - state: ReadableState, - chunk: string | Buffer | Uint8Array, - addToFront: boolean, -) { - if (state.flowing && state.length === 0 && !state.sync) { - if (state.multiAwaitDrain) { - (state.awaitDrainWriters as Set<Writable>).clear(); - } else { - state.awaitDrainWriters = null; - } - stream.emit("data", chunk); - } else { - // Update the buffer info. - state.length += state.objectMode ? 1 : chunk.length; - if (addToFront) { - state.buffer.unshift(chunk); - } else { - state.buffer.push(chunk); - } - - if (state.needReadable) { - emitReadable(stream); - } - } - maybeReadMore(stream, state); -} - -// Don't raise the hwm > 1GB. -const MAX_HWM = 0x40000000; -export function computeNewHighWaterMark(n: number) { - if (n >= MAX_HWM) { - n = MAX_HWM; - } else { - n--; - n |= n >>> 1; - n |= n >>> 2; - n |= n >>> 4; - n |= n >>> 8; - n |= n >>> 16; - n++; - } - return n; -} - -export function emitReadable(stream: Duplex | Readable) { - const state = stream._readableState; - state.needReadable = false; - if (!state.emittedReadable) { - state.emittedReadable = true; - queueMicrotask(() => emitReadable_(stream)); - } -} - -function emitReadable_(stream: Duplex | Readable) { - const state = stream._readableState; - if (!state.destroyed && !state.errored && (state.length || state.ended)) { - stream.emit("readable"); - state.emittedReadable = false; - } - - state.needReadable = !state.flowing && - !state.ended && - state.length <= state.highWaterMark; - flow(stream); -} - -export function endReadable(stream: Readable) { - const state = stream._readableState; - - if (!state.endEmitted) { - state.ended = true; - queueMicrotask(() => endReadableNT(state, stream)); - } -} - -function endReadableNT(state: ReadableState, stream: Readable) { - if ( - !state.errorEmitted && !state.closeEmitted && - !state.endEmitted && state.length === 0 - ) { - state.endEmitted = true; - stream.emit("end"); - - if (state.autoDestroy) { - stream.destroy(); - } - } -} - -export function errorOrDestroy( - stream: Duplex | Readable, - err: Error, - sync = false, -) { - const r = stream._readableState; - - if (r.destroyed) { - return stream; - } - - if (r.autoDestroy) { - stream.destroy(err); - } else if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!r.errored) { - r.errored = err; - } - if (sync) { - queueMicrotask(() => { - if (!r.errorEmitted) { - r.errorEmitted = true; - stream.emit("error", err); - } - }); - } else if (!r.errorEmitted) { - r.errorEmitted = true; - stream.emit("error", err); - } - } -} - -function flow(stream: Duplex | Readable) { - const state = stream._readableState; - while (state.flowing && stream.read() !== null); -} - -/** Pluck off n bytes from an array of buffers. -* Length is the combined lengths of all the buffers in the list. -* This function is designed to be inlinable, so please take care when making -* changes to the function body. -*/ -export function fromList(n: number, state: ReadableState) { - // nothing buffered. - if (state.length === 0) { - return null; - } - - let ret; - if (state.objectMode) { - ret = state.buffer.shift(); - } else if (!n || n >= state.length) { - if (state.decoder) { - ret = state.buffer.join(""); - } else if (state.buffer.length === 1) { - ret = state.buffer.first(); - } else { - ret = state.buffer.concat(state.length); - } - state.buffer.clear(); - } else { - ret = state.buffer.consume(n, !!state.decoder); - } - - return ret; -} - -export function howMuchToRead(n: number, state: ReadableState) { - if (n <= 0 || (state.length === 0 && state.ended)) { - return 0; - } - if (state.objectMode) { - return 1; - } - if (Number.isNaN(n)) { - // Only flow one buffer at a time. - if (state.flowing && state.length) { - return state.buffer.first().length; - } - return state.length; - } - if (n <= state.length) { - return n; - } - return state.ended ? state.length : 0; -} - -export function maybeReadMore(stream: Readable, state: ReadableState) { - if (!state.readingMore && state.constructed) { - state.readingMore = true; - queueMicrotask(() => maybeReadMore_(stream, state)); - } -} - -function maybeReadMore_(stream: Readable, state: ReadableState) { - while ( - !state.reading && !state.ended && - (state.length < state.highWaterMark || - (state.flowing && state.length === 0)) - ) { - const len = state.length; - stream.read(0); - if (len === state.length) { - // Didn't get any data, stop spinning. - break; - } - } - state.readingMore = false; -} - -export function nReadingNextTick(self: Duplex | Readable) { - self.read(0); -} - -export function onEofChunk(stream: Duplex | Readable, state: ReadableState) { - if (state.ended) return; - if (state.decoder) { - const chunk = state.decoder.end(); - if (chunk && chunk.length) { - state.buffer.push(chunk); - state.length += state.objectMode ? 1 : chunk.length; - } - } - state.ended = true; - - if (state.sync) { - emitReadable(stream); - } else { - state.needReadable = false; - state.emittedReadable = true; - emitReadable_(stream); - } -} - -export function pipeOnDrain(src: Duplex | Readable, dest: Duplex | Writable) { - return function pipeOnDrainFunctionResult() { - const state = src._readableState; - - if (state.awaitDrainWriters === dest) { - state.awaitDrainWriters = null; - } else if (state.multiAwaitDrain) { - (state.awaitDrainWriters as Set<Duplex | Writable>).delete(dest); - } - - if ( - (!state.awaitDrainWriters || - (state.awaitDrainWriters as Set<Writable>).size === 0) && - src.listenerCount("data") - ) { - state.flowing = true; - flow(src); - } - }; -} - -export function prependListener( - emitter: EventEmitter, - event: string, - // deno-lint-ignore no-explicit-any - fn: (...args: any[]) => any, -) { - if (typeof emitter.prependListener === "function") { - return emitter.prependListener(event, fn); - } - - // This is a hack to make sure that our error handler is attached before any - // userland ones. NEVER DO THIS. This is here only because this code needs - // to continue to work with older versions of Node.js that do not include - //the prependListener() method. The goal is to eventually remove this hack. - // TODO(Soremwar) - // Burn it with fire - // deno-lint-ignore ban-ts-comment - //@ts-ignore - if (emitter._events.get(event)?.length) { - // deno-lint-ignore ban-ts-comment - //@ts-ignore - const listeners = [fn, ...emitter._events.get(event)]; - // deno-lint-ignore ban-ts-comment - //@ts-ignore - emitter._events.set(event, listeners); - } else { - emitter.on(event, fn); - } -} - -export function readableAddChunk( - stream: Duplex | Readable, - chunk: string | Buffer | Uint8Array | null, - encoding: undefined | string = undefined, - addToFront: boolean, -) { - const state = stream._readableState; - let usedEncoding = encoding; - - let err; - if (!state.objectMode) { - if (typeof chunk === "string") { - usedEncoding = encoding || state.defaultEncoding; - if (state.encoding !== usedEncoding) { - if (addToFront && state.encoding) { - chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding); - } else { - chunk = Buffer.from(chunk, usedEncoding); - usedEncoding = ""; - } - } - } else if (chunk instanceof Uint8Array) { - chunk = Buffer.from(chunk); - } - } - - if (err) { - errorOrDestroy(stream, err); - } else if (chunk === null) { - state.reading = false; - onEofChunk(stream, state); - } else if (state.objectMode || (chunk.length > 0)) { - if (addToFront) { - if (state.endEmitted) { - errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); - } else { - addChunk(stream, state, chunk, true); - } - } else if (state.ended) { - errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); - } else if (state.destroyed || state.errored) { - return false; - } else { - state.reading = false; - if (state.decoder && !usedEncoding) { - //TODO(Soremwar) - //I don't think this cast is right - chunk = state.decoder.write(Buffer.from(chunk as Uint8Array)); - if (state.objectMode || chunk.length !== 0) { - addChunk(stream, state, chunk, false); - } else { - maybeReadMore(stream, state); - } - } else { - addChunk(stream, state, chunk, false); - } - } - } else if (!addToFront) { - state.reading = false; - maybeReadMore(stream, state); - } - - return !state.ended && - (state.length < state.highWaterMark || state.length === 0); -} - -export function resume(stream: Duplex | Readable, state: ReadableState) { - if (!state.resumeScheduled) { - state.resumeScheduled = true; - queueMicrotask(() => resume_(stream, state)); - } -} - -function resume_(stream: Duplex | Readable, state: ReadableState) { - if (!state.reading) { - stream.read(0); - } - - state.resumeScheduled = false; - stream.emit("resume"); - flow(stream); - if (state.flowing && !state.reading) { - stream.read(0); - } -} - -export function updateReadableListening(self: Duplex | Readable) { - const state = self._readableState; - state.readableListening = self.listenerCount("readable") > 0; - - if (state.resumeScheduled && state[kPaused] === false) { - // Flowing needs to be set to true now, otherwise - // the upcoming resume will not flow. - state.flowing = true; - - // Crude way to check if we should resume. - } else if (self.listenerCount("data") > 0) { - self.resume(); - } else if (!state.readableListening) { - state.flowing = null; - } -} diff --git a/std/node/_stream/readable_test.ts b/std/node/_stream/readable_test.ts deleted file mode 100644 index 72767e28f..000000000 --- a/std/node/_stream/readable_test.ts +++ /dev/null @@ -1,489 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { Buffer } from "../buffer.ts"; -import Readable from "../_stream/readable.ts"; -import { once } from "../events.ts"; -import { deferred } from "../../async/mod.ts"; -import { - assert, - assertEquals, - assertStrictEquals, -} from "../../testing/asserts.ts"; - -Deno.test("Readable stream from iterator", async () => { - function* generate() { - yield "a"; - yield "b"; - yield "c"; - } - - const stream = Readable.from(generate()); - - const expected = ["a", "b", "c"]; - - for await (const chunk of stream) { - assertStrictEquals(chunk, expected.shift()); - } -}); - -Deno.test("Readable stream from async iterator", async () => { - async function* generate() { - yield "a"; - yield "b"; - yield "c"; - } - - const stream = Readable.from(generate()); - - const expected = ["a", "b", "c"]; - - for await (const chunk of stream) { - assertStrictEquals(chunk, expected.shift()); - } -}); - -Deno.test("Readable stream from promise", async () => { - const promises = [ - Promise.resolve("a"), - Promise.resolve("b"), - Promise.resolve("c"), - ]; - - const stream = Readable.from(promises); - - const expected = ["a", "b", "c"]; - - for await (const chunk of stream) { - assertStrictEquals(chunk, expected.shift()); - } -}); - -Deno.test("Readable stream from string", async () => { - const string = "abc"; - const stream = Readable.from(string); - - for await (const chunk of stream) { - assertStrictEquals(chunk, string); - } -}); - -Deno.test("Readable stream from Buffer", async () => { - const string = "abc"; - const stream = Readable.from(Buffer.from(string)); - - for await (const chunk of stream) { - assertStrictEquals((chunk as Buffer).toString(), string); - } -}); - -Deno.test("Readable stream gets destroyed on error", async () => { - // deno-lint-ignore require-yield - async function* generate() { - throw new Error("kaboom"); - } - - const stream = Readable.from(generate()); - - stream.read(); - - const [err] = await once(stream, "error"); - assertStrictEquals(err.message, "kaboom"); - assertStrictEquals(stream.destroyed, true); -}); - -Deno.test("Readable stream works as Transform stream", async () => { - async function* generate(stream: Readable) { - for await (const chunk of stream) { - yield (chunk as string).toUpperCase(); - } - } - - const source = new Readable({ - objectMode: true, - read() { - this.push("a"); - this.push("b"); - this.push("c"); - this.push(null); - }, - }); - - const stream = Readable.from(generate(source)); - - const expected = ["A", "B", "C"]; - - for await (const chunk of stream) { - assertStrictEquals(chunk, expected.shift()); - } -}); - -Deno.test("Readable stream can be paused", () => { - const readable = new Readable(); - - // _read is a noop, here. - readable._read = () => {}; - - // Default state of a stream is not "paused" - assert(!readable.isPaused()); - - // Make the stream start flowing... - readable.on("data", () => {}); - - // still not paused. - assert(!readable.isPaused()); - - readable.pause(); - assert(readable.isPaused()); - readable.resume(); - assert(!readable.isPaused()); -}); - -Deno.test("Readable stream sets enconding correctly", () => { - const readable = new Readable({ - read() {}, - }); - - readable.setEncoding("utf8"); - - readable.push(new TextEncoder().encode("DEF")); - readable.unshift(new TextEncoder().encode("ABC")); - - assertStrictEquals(readable.read(), "ABCDEF"); -}); - -Deno.test("Readable stream sets encoding correctly", () => { - const readable = new Readable({ - read() {}, - }); - - readable.setEncoding("utf8"); - - readable.push(new TextEncoder().encode("DEF")); - readable.unshift(new TextEncoder().encode("ABC")); - - assertStrictEquals(readable.read(), "ABCDEF"); -}); - -Deno.test("Readable stream holds up a big push", async () => { - let readExecuted = 0; - const readExecutedExpected = 3; - const readExpectedExecutions = deferred(); - - let endExecuted = 0; - const endExecutedExpected = 1; - const endExpectedExecutions = deferred(); - - const str = "asdfasdfasdfasdfasdf"; - - const r = new Readable({ - highWaterMark: 5, - encoding: "utf8", - }); - - let reads = 0; - - function _read() { - if (reads === 0) { - setTimeout(() => { - r.push(str); - }, 1); - reads++; - } else if (reads === 1) { - const ret = r.push(str); - assertEquals(ret, false); - reads++; - } else { - r.push(null); - } - } - - r._read = () => { - readExecuted++; - if (readExecuted == readExecutedExpected) { - readExpectedExecutions.resolve(); - } - _read(); - }; - - r.on("end", () => { - endExecuted++; - if (endExecuted == endExecutedExpected) { - endExpectedExecutions.resolve(); - } - }); - - // Push some data in to start. - // We've never gotten any read event at this point. - const ret = r.push(str); - assert(!ret); - let chunk = r.read(); - assertEquals(chunk, str); - chunk = r.read(); - assertEquals(chunk, null); - - r.once("readable", () => { - // This time, we'll get *all* the remaining data, because - // it's been added synchronously, as the read WOULD take - // us below the hwm, and so it triggered a _read() again, - // which synchronously added more, which we then return. - chunk = r.read(); - assertEquals(chunk, str + str); - - chunk = r.read(); - assertEquals(chunk, null); - }); - - const readTimeout = setTimeout( - () => readExpectedExecutions.reject(), - 1000, - ); - const endTimeout = setTimeout( - () => endExpectedExecutions.reject(), - 1000, - ); - await readExpectedExecutions; - await endExpectedExecutions; - clearTimeout(readTimeout); - clearTimeout(endTimeout); - assertEquals(readExecuted, readExecutedExpected); - assertEquals(endExecuted, endExecutedExpected); -}); - -Deno.test("Readable stream: 'on' event", async () => { - async function* generate() { - yield "a"; - yield "b"; - yield "c"; - } - - const stream = Readable.from(generate()); - - let iterations = 0; - const expected = ["a", "b", "c"]; - - stream.on("data", (chunk) => { - iterations++; - assertStrictEquals(chunk, expected.shift()); - }); - - await once(stream, "end"); - - assertStrictEquals(iterations, 3); -}); - -Deno.test("Readable stream: 'data' event", async () => { - async function* generate() { - yield "a"; - yield "b"; - yield "c"; - } - - const stream = Readable.from(generate(), { objectMode: false }); - - let iterations = 0; - const expected = ["a", "b", "c"]; - - stream.on("data", (chunk) => { - iterations++; - assertStrictEquals(chunk instanceof Buffer, true); - assertStrictEquals(chunk.toString(), expected.shift()); - }); - - await once(stream, "end"); - - assertStrictEquals(iterations, 3); -}); - -Deno.test("Readable stream: 'data' event on non-object", async () => { - async function* generate() { - yield "a"; - yield "b"; - yield "c"; - } - - const stream = Readable.from(generate(), { objectMode: false }); - - let iterations = 0; - const expected = ["a", "b", "c"]; - - stream.on("data", (chunk) => { - iterations++; - assertStrictEquals(chunk instanceof Buffer, true); - assertStrictEquals(chunk.toString(), expected.shift()); - }); - - await once(stream, "end"); - - assertStrictEquals(iterations, 3); -}); - -Deno.test("Readable stream: 'readable' event is emitted but 'read' is not on highWaterMark length exceeded", async () => { - let readableExecuted = 0; - const readableExecutedExpected = 1; - const readableExpectedExecutions = deferred(); - - const r = new Readable({ - highWaterMark: 3, - }); - - r._read = () => { - throw new Error("_read must not be called"); - }; - r.push(Buffer.from("blerg")); - - setTimeout(function () { - assert(!r._readableState.reading); - r.on("readable", () => { - readableExecuted++; - if (readableExecuted == readableExecutedExpected) { - readableExpectedExecutions.resolve(); - } - }); - }, 1); - - const readableTimeout = setTimeout( - () => readableExpectedExecutions.reject(), - 1000, - ); - await readableExpectedExecutions; - clearTimeout(readableTimeout); - assertEquals(readableExecuted, readableExecutedExpected); -}); - -Deno.test("Readable stream: 'readable' and 'read' events are emitted on highWaterMark length not reached", async () => { - let readableExecuted = 0; - const readableExecutedExpected = 1; - const readableExpectedExecutions = deferred(); - - let readExecuted = 0; - const readExecutedExpected = 1; - const readExpectedExecutions = deferred(); - - const r = new Readable({ - highWaterMark: 3, - }); - - r._read = () => { - readExecuted++; - if (readExecuted == readExecutedExpected) { - readExpectedExecutions.resolve(); - } - }; - - r.push(Buffer.from("bl")); - - setTimeout(function () { - assert(r._readableState.reading); - r.on("readable", () => { - readableExecuted++; - if (readableExecuted == readableExecutedExpected) { - readableExpectedExecutions.resolve(); - } - }); - }, 1); - - const readableTimeout = setTimeout( - () => readableExpectedExecutions.reject(), - 1000, - ); - const readTimeout = setTimeout( - () => readExpectedExecutions.reject(), - 1000, - ); - await readableExpectedExecutions; - await readExpectedExecutions; - clearTimeout(readableTimeout); - clearTimeout(readTimeout); - assertEquals(readableExecuted, readableExecutedExpected); - assertEquals(readExecuted, readExecutedExpected); -}); - -Deno.test("Readable stream: 'readable' event is emitted but 'read' is not on highWaterMark length not reached and stream ended", async () => { - let readableExecuted = 0; - const readableExecutedExpected = 1; - const readableExpectedExecutions = deferred(); - - const r = new Readable({ - highWaterMark: 30, - }); - - r._read = () => { - throw new Error("Must not be executed"); - }; - - r.push(Buffer.from("blerg")); - //This ends the stream and triggers end - r.push(null); - - setTimeout(function () { - // Assert we're testing what we think we are - assert(!r._readableState.reading); - r.on("readable", () => { - readableExecuted++; - if (readableExecuted == readableExecutedExpected) { - readableExpectedExecutions.resolve(); - } - }); - }, 1); - - const readableTimeout = setTimeout( - () => readableExpectedExecutions.reject(), - 1000, - ); - await readableExpectedExecutions; - clearTimeout(readableTimeout); - assertEquals(readableExecuted, readableExecutedExpected); -}); - -Deno.test("Readable stream: 'read' is emitted on empty string pushed in non-object mode", async () => { - let endExecuted = 0; - const endExecutedExpected = 1; - const endExpectedExecutions = deferred(); - - const underlyingData = ["", "x", "y", "", "z"]; - const expected = underlyingData.filter((data) => data); - const result: unknown[] = []; - - const r = new Readable({ - encoding: "utf8", - }); - r._read = function () { - queueMicrotask(() => { - if (!underlyingData.length) { - this.push(null); - } else { - this.push(underlyingData.shift()); - } - }); - }; - - r.on("readable", () => { - const data = r.read(); - if (data !== null) result.push(data); - }); - - r.on("end", () => { - endExecuted++; - if (endExecuted == endExecutedExpected) { - endExpectedExecutions.resolve(); - } - assertEquals(result, expected); - }); - - const endTimeout = setTimeout( - () => endExpectedExecutions.reject(), - 1000, - ); - await endExpectedExecutions; - clearTimeout(endTimeout); - assertEquals(endExecuted, endExecutedExpected); -}); - -Deno.test("Readable stream: listeners can be removed", () => { - const r = new Readable(); - r._read = () => {}; - r.on("data", () => {}); - - r.removeAllListeners("data"); - - assertEquals(r.eventNames().length, 0); -}); diff --git a/std/node/_stream/stream.ts b/std/node/_stream/stream.ts deleted file mode 100644 index 4daafc77b..000000000 --- a/std/node/_stream/stream.ts +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { Buffer } from "../buffer.ts"; -import EventEmitter from "../events.ts"; -import type Readable from "./readable.ts"; -import type Writable from "./writable.ts"; -import { types } from "../util.ts"; - -class Stream extends EventEmitter { - constructor() { - super(); - } - - static _isUint8Array = types.isUint8Array; - static _uint8ArrayToBuffer = (chunk: Uint8Array) => Buffer.from(chunk); - - pipe(dest: Readable | Writable, options?: { end?: boolean }) { - // deno-lint-ignore no-this-alias - const source = this; - - //TODO(Soremwar) - //isStdio exist on stdin || stdout only, which extend from Duplex - //if (!dest._isStdio && (options?.end ?? true)) { - //Find an alternative to be able to pipe streams to stdin & stdout - //Port them as well? - if (options?.end ?? true) { - source.on("end", onend); - source.on("close", onclose); - } - - let didOnEnd = false; - function onend() { - if (didOnEnd) return; - didOnEnd = true; - - // 'end' is only called on Writable streams - (dest as Writable).end(); - } - - function onclose() { - if (didOnEnd) return; - didOnEnd = true; - - if (typeof dest.destroy === "function") dest.destroy(); - } - - // Don't leave dangling pipes when there are errors. - function onerror(this: Stream, er: Error) { - cleanup(); - if (this.listenerCount("error") === 0) { - throw er; // Unhandled stream error in pipe. - } - } - - source.on("error", onerror); - dest.on("error", onerror); - - // Remove all the event listeners that were added. - function cleanup() { - source.removeListener("end", onend); - source.removeListener("close", onclose); - - source.removeListener("error", onerror); - dest.removeListener("error", onerror); - - source.removeListener("end", cleanup); - source.removeListener("close", cleanup); - - dest.removeListener("close", cleanup); - } - - source.on("end", cleanup); - source.on("close", cleanup); - - dest.on("close", cleanup); - dest.emit("pipe", source); - - return dest; - } -} - -export default Stream; diff --git a/std/node/_stream/symbols.ts b/std/node/_stream/symbols.ts deleted file mode 100644 index addb969d3..000000000 --- a/std/node/_stream/symbols.ts +++ /dev/null @@ -1,4 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -export const kConstruct = Symbol("kConstruct"); -export const kDestroy = Symbol("kDestroy"); -export const kPaused = Symbol("kPaused"); diff --git a/std/node/_stream/transform.ts b/std/node/_stream/transform.ts deleted file mode 100644 index a4246e81a..000000000 --- a/std/node/_stream/transform.ts +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { Encodings } from "../_utils.ts"; -import Duplex from "./duplex.ts"; -import type { DuplexOptions } from "./duplex.ts"; -import type { writeV } from "./writable_internal.ts"; -import { ERR_METHOD_NOT_IMPLEMENTED } from "../_errors.ts"; - -const kCallback = Symbol("kCallback"); - -type TransformFlush = ( - this: Transform, - // deno-lint-ignore no-explicit-any - callback: (error?: Error | null, data?: any) => void, -) => void; - -export interface TransformOptions extends DuplexOptions { - read?(this: Transform, size: number): void; - write?( - this: Transform, - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: Encodings, - callback: (error?: Error | null) => void, - ): void; - writev?: writeV; - final?(this: Transform, callback: (error?: Error | null) => void): void; - destroy?( - this: Transform, - error: Error | null, - callback: (error: Error | null) => void, - ): void; - transform?( - this: Transform, - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: Encodings, - // deno-lint-ignore no-explicit-any - callback: (error?: Error | null, data?: any) => void, - ): void; - flush?: TransformFlush; -} - -export default class Transform extends Duplex { - [kCallback]: null | ((error?: Error | null) => void); - _flush?: TransformFlush; - - constructor(options?: TransformOptions) { - super(options); - this._readableState.sync = false; - - this[kCallback] = null; - - if (options) { - if (typeof options.transform === "function") { - this._transform = options.transform; - } - - if (typeof options.flush === "function") { - this._flush = options.flush; - } - } - - this.on("prefinish", function (this: Transform) { - if (typeof this._flush === "function" && !this.destroyed) { - this._flush((er, data) => { - if (er) { - this.destroy(er); - return; - } - - if (data != null) { - this.push(data); - } - this.push(null); - }); - } else { - this.push(null); - } - }); - } - - _read = () => { - if (this[kCallback]) { - const callback = this[kCallback] as (error?: Error | null) => void; - this[kCallback] = null; - callback(); - } - }; - - _transform( - // deno-lint-ignore no-explicit-any - _chunk: any, - _encoding: string, - // deno-lint-ignore no-explicit-any - _callback: (error?: Error | null, data?: any) => void, - ) { - throw new ERR_METHOD_NOT_IMPLEMENTED("_transform()"); - } - - _write = ( - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: string, - callback: (error?: Error | null) => void, - ) => { - const rState = this._readableState; - const wState = this._writableState; - const length = rState.length; - - this._transform(chunk, encoding, (err, val) => { - if (err) { - callback(err); - return; - } - - if (val != null) { - this.push(val); - } - - if ( - wState.ended || // Backwards compat. - length === rState.length || // Backwards compat. - rState.length < rState.highWaterMark || - rState.length === 0 - ) { - callback(); - } else { - this[kCallback] = callback; - } - }); - }; -} diff --git a/std/node/_stream/transform_test.ts b/std/node/_stream/transform_test.ts deleted file mode 100644 index d3b90ff01..000000000 --- a/std/node/_stream/transform_test.ts +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { Buffer } from "../buffer.ts"; -import Transform from "./transform.ts"; -import finished from "./end_of_stream.ts"; -import { deferred } from "../../async/mod.ts"; -import { assert, assertEquals } from "../../testing/asserts.ts"; - -Deno.test("Transform stream finishes correctly", async () => { - let finishedExecuted = 0; - const finishedExecutedExpected = 1; - const finishedExecution = deferred(); - - const tr = new Transform({ - transform(_data, _enc, cb) { - cb(); - }, - }); - - let finish = false; - let ended = false; - - tr.on("end", () => { - ended = true; - }); - - tr.on("finish", () => { - finish = true; - }); - - finished(tr, (err) => { - finishedExecuted++; - if (finishedExecuted === finishedExecutedExpected) { - finishedExecution.resolve(); - } - assert(!err, "no error"); - assert(finish); - assert(ended); - }); - - tr.end(); - tr.resume(); - - const finishedTimeout = setTimeout( - () => finishedExecution.reject(), - 1000, - ); - await finishedExecution; - clearTimeout(finishedTimeout); - assertEquals(finishedExecuted, finishedExecutedExpected); -}); - -Deno.test("Transform stream flushes data correctly", () => { - const expected = "asdf"; - - const t = new Transform({ - transform: (_d, _e, n) => { - n(); - }, - flush: (n) => { - n(null, expected); - }, - }); - - t.end(Buffer.from("blerg")); - t.on("data", (data) => { - assertEquals(data.toString(), expected); - }); -}); diff --git a/std/node/_stream/writable.ts b/std/node/_stream/writable.ts deleted file mode 100644 index 534fc22fb..000000000 --- a/std/node/_stream/writable.ts +++ /dev/null @@ -1,443 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { Buffer } from "../buffer.ts"; -import Stream from "./stream.ts"; -import { captureRejectionSymbol } from "../events.ts"; -import { - ERR_INVALID_ARG_TYPE, - ERR_INVALID_OPT_VALUE, - ERR_METHOD_NOT_IMPLEMENTED, - ERR_STREAM_ALREADY_FINISHED, - ERR_STREAM_CANNOT_PIPE, - ERR_STREAM_DESTROYED, - ERR_STREAM_NULL_VALUES, - ERR_STREAM_WRITE_AFTER_END, - ERR_UNKNOWN_ENCODING, -} from "../_errors.ts"; -import type { AfterWriteTick, writeV } from "./writable_internal.ts"; -import { - clearBuffer, - destroy, - errorBuffer, - errorOrDestroy, - finishMaybe, - kOnFinished, - nop, - onwrite, - resetBuffer, - writeOrBuffer, -} from "./writable_internal.ts"; -import type { Encodings } from "../_utils.ts"; - -type WritableEncodings = Encodings | "buffer"; - -export interface WritableOptions { - autoDestroy?: boolean; - decodeStrings?: boolean; - defaultEncoding?: WritableEncodings; - destroy?( - this: Writable, - error: Error | null, - callback: (error: Error | null) => void, - ): void; - emitClose?: boolean; - final?(this: Writable, callback: (error?: Error | null) => void): void; - highWaterMark?: number; - objectMode?: boolean; - write?( - this: Writable, - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: WritableEncodings, - callback: (error?: Error | null) => void, - ): void; - writev?( - this: Writable, - // deno-lint-ignore no-explicit-any - chunks: Array<{ chunk: any; encoding: string }>, - callback: (error?: Error | null) => void, - ): void; -} - -export class WritableState { - [kOnFinished]: Array<(error?: Error) => void> = []; - afterWriteTickInfo: null | AfterWriteTick = null; - allBuffers = true; - allNoop = true; - autoDestroy: boolean; - buffered: Array<{ - allBuffers?: boolean; - // deno-lint-ignore no-explicit-any - chunk: any; - encoding: string; - callback: (error: Error) => void; - }> = []; - bufferedIndex = 0; - bufferProcessing = false; - closed = false; - closeEmitted = false; - constructed: boolean; - corked = 0; - decodeStrings: boolean; - defaultEncoding: WritableEncodings; - destroyed = false; - emitClose: boolean; - ended = false; - ending = false; - errored: Error | null = null; - errorEmitted = false; - finalCalled = false; - finished = false; - highWaterMark: number; - length = 0; - needDrain = false; - objectMode: boolean; - onwrite: (error?: Error | null) => void; - pendingcb = 0; - prefinished = false; - sync = true; - writecb: null | ((error: Error) => void) = null; - writable = true; - writelen = 0; - writing = false; - - constructor(options: WritableOptions | undefined, stream: Writable) { - this.objectMode = !!options?.objectMode; - - this.highWaterMark = options?.highWaterMark ?? - (this.objectMode ? 16 : 16 * 1024); - - if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) { - this.highWaterMark = Math.floor(this.highWaterMark); - } else { - throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark); - } - - this.decodeStrings = !options?.decodeStrings === false; - - this.defaultEncoding = options?.defaultEncoding || "utf8"; - - this.onwrite = onwrite.bind(undefined, stream); - - resetBuffer(this); - - this.emitClose = options?.emitClose ?? true; - this.autoDestroy = options?.autoDestroy ?? true; - this.constructed = true; - } - - getBuffer() { - return this.buffered.slice(this.bufferedIndex); - } - - get bufferedRequestCount() { - return this.buffered.length - this.bufferedIndex; - } -} - -/** A bit simpler than readable streams. -* Implement an async `._write(chunk, encoding, cb)`, and it'll handle all -* the drain event emission and buffering. -*/ -class Writable extends Stream { - _final?: ( - this: Writable, - callback: (error?: Error | null | undefined) => void, - ) => void; - _writableState: WritableState; - _writev?: writeV | null = null; - - constructor(options?: WritableOptions) { - super(); - this._writableState = new WritableState(options, this); - - if (options) { - if (typeof options.write === "function") { - this._write = options.write; - } - - if (typeof options.writev === "function") { - this._writev = options.writev; - } - - if (typeof options.destroy === "function") { - this._destroy = options.destroy; - } - - if (typeof options.final === "function") { - this._final = options.final; - } - } - } - - [captureRejectionSymbol](err?: Error) { - this.destroy(err); - } - - static WritableState = WritableState; - - get destroyed() { - return this._writableState ? this._writableState.destroyed : false; - } - - set destroyed(value) { - if (this._writableState) { - this._writableState.destroyed = value; - } - } - - get writable() { - const w = this._writableState; - return !w.destroyed && !w.errored && !w.ending && !w.ended; - } - - set writable(val) { - if (this._writableState) { - this._writableState.writable = !!val; - } - } - - get writableFinished() { - return this._writableState ? this._writableState.finished : false; - } - - get writableObjectMode() { - return this._writableState ? this._writableState.objectMode : false; - } - - get writableBuffer() { - return this._writableState && this._writableState.getBuffer(); - } - - get writableEnded() { - return this._writableState ? this._writableState.ending : false; - } - - get writableHighWaterMark() { - return this._writableState && this._writableState.highWaterMark; - } - - get writableCorked() { - return this._writableState ? this._writableState.corked : 0; - } - - get writableLength() { - return this._writableState && this._writableState.length; - } - - _undestroy() { - const w = this._writableState; - w.constructed = true; - w.destroyed = false; - w.closed = false; - w.closeEmitted = false; - w.errored = null; - w.errorEmitted = false; - w.ended = false; - w.ending = false; - w.finalCalled = false; - w.prefinished = false; - w.finished = false; - } - - _destroy(err: Error | null, cb: (error?: Error | null) => void) { - cb(err); - } - - destroy(err?: Error | null, cb?: () => void) { - const state = this._writableState; - if (!state.destroyed) { - queueMicrotask(() => errorBuffer(state)); - } - destroy.call(this, err, cb); - return this; - } - - end(cb?: () => void): void; - // deno-lint-ignore no-explicit-any - end(chunk: any, cb?: () => void): void; - // deno-lint-ignore no-explicit-any - end(chunk: any, encoding: WritableEncodings, cb?: () => void): void; - - end( - // deno-lint-ignore no-explicit-any - x?: any | (() => void), - y?: WritableEncodings | (() => void), - z?: () => void, - ) { - const state = this._writableState; - // deno-lint-ignore no-explicit-any - let chunk: any | null; - let encoding: WritableEncodings | null; - let cb: undefined | ((error?: Error) => void); - - if (typeof x === "function") { - chunk = null; - encoding = null; - cb = x; - } else if (typeof y === "function") { - chunk = x; - encoding = null; - cb = y; - } else { - chunk = x; - encoding = y as WritableEncodings; - cb = z; - } - - if (chunk !== null && chunk !== undefined) { - this.write(chunk, encoding); - } - - if (state.corked) { - state.corked = 1; - this.uncork(); - } - - let err: Error | undefined; - if (!state.errored && !state.ending) { - state.ending = true; - finishMaybe(this, state, true); - state.ended = true; - } else if (state.finished) { - err = new ERR_STREAM_ALREADY_FINISHED("end"); - } else if (state.destroyed) { - err = new ERR_STREAM_DESTROYED("end"); - } - - if (typeof cb === "function") { - if (err || state.finished) { - queueMicrotask(() => { - (cb as (error?: Error | undefined) => void)(err); - }); - } else { - state[kOnFinished].push(cb); - } - } - - return this; - } - - _write( - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: string, - cb: (error?: Error | null) => void, - ): void { - if (this._writev) { - this._writev([{ chunk, encoding }], cb); - } else { - throw new ERR_METHOD_NOT_IMPLEMENTED("_write()"); - } - } - - //This signature was changed to keep inheritance coherent - pipe(dest: Writable): Writable { - errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); - return dest; - } - - // deno-lint-ignore no-explicit-any - write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean; - write( - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: WritableEncodings | null, - cb?: (error: Error | null | undefined) => void, - ): boolean; - - write( - // deno-lint-ignore no-explicit-any - chunk: any, - x?: WritableEncodings | null | ((error: Error | null | undefined) => void), - y?: ((error: Error | null | undefined) => void), - ) { - const state = this._writableState; - let encoding: WritableEncodings; - let cb: (error?: Error | null) => void; - - if (typeof x === "function") { - cb = x; - encoding = state.defaultEncoding; - } else { - if (!x) { - encoding = state.defaultEncoding; - } else if (x !== "buffer" && !Buffer.isEncoding(x)) { - throw new ERR_UNKNOWN_ENCODING(x); - } else { - encoding = x; - } - if (typeof y !== "function") { - cb = nop; - } else { - cb = y; - } - } - - if (chunk === null) { - throw new ERR_STREAM_NULL_VALUES(); - } else if (!state.objectMode) { - if (typeof chunk === "string") { - if (state.decodeStrings !== false) { - chunk = Buffer.from(chunk, encoding); - encoding = "buffer"; - } - } else if (chunk instanceof Buffer) { - encoding = "buffer"; - } else if (Stream._isUint8Array(chunk)) { - chunk = Stream._uint8ArrayToBuffer(chunk); - encoding = "buffer"; - } else { - throw new ERR_INVALID_ARG_TYPE( - "chunk", - ["string", "Buffer", "Uint8Array"], - chunk, - ); - } - } - - let err: Error | undefined; - if (state.ending) { - err = new ERR_STREAM_WRITE_AFTER_END(); - } else if (state.destroyed) { - err = new ERR_STREAM_DESTROYED("write"); - } - - if (err) { - queueMicrotask(() => cb(err)); - errorOrDestroy(this, err, true); - return false; - } - state.pendingcb++; - return writeOrBuffer(this, state, chunk, encoding, cb); - } - - cork() { - this._writableState.corked++; - } - - uncork() { - const state = this._writableState; - - if (state.corked) { - state.corked--; - - if (!state.writing) { - clearBuffer(this, state); - } - } - } - - setDefaultEncoding(encoding: string) { - // node::ParseEncoding() requires lower case. - if (typeof encoding === "string") { - encoding = encoding.toLowerCase(); - } - if (!Buffer.isEncoding(encoding)) { - throw new ERR_UNKNOWN_ENCODING(encoding); - } - this._writableState.defaultEncoding = encoding as WritableEncodings; - return this; - } -} - -export default Writable; diff --git a/std/node/_stream/writable_internal.ts b/std/node/_stream/writable_internal.ts deleted file mode 100644 index e8c001af0..000000000 --- a/std/node/_stream/writable_internal.ts +++ /dev/null @@ -1,457 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import type Duplex from "./duplex.ts"; -import type Writable from "./writable.ts"; -import type { WritableState } from "./writable.ts"; -import { kDestroy } from "./symbols.ts"; -import { ERR_MULTIPLE_CALLBACK, ERR_STREAM_DESTROYED } from "../_errors.ts"; - -export type writeV = ( - // deno-lint-ignore no-explicit-any - chunks: Array<{ chunk: any; encoding: string }>, - callback: (error?: Error | null) => void, -) => void; - -export type AfterWriteTick = { - cb: (error?: Error) => void; - count: number; - state: WritableState; - stream: Writable; -}; - -export const kOnFinished = Symbol("kOnFinished"); - -function _destroy( - self: Writable, - err?: Error | null, - cb?: (error?: Error | null) => void, -) { - self._destroy(err || null, (err) => { - const w = self._writableState; - - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!w.errored) { - w.errored = err; - } - } - - w.closed = true; - - if (typeof cb === "function") { - cb(err); - } - - if (err) { - queueMicrotask(() => { - if (!w.errorEmitted) { - w.errorEmitted = true; - self.emit("error", err); - } - w.closeEmitted = true; - if (w.emitClose) { - self.emit("close"); - } - }); - } else { - queueMicrotask(() => { - w.closeEmitted = true; - if (w.emitClose) { - self.emit("close"); - } - }); - } - }); -} - -export function afterWrite( - stream: Writable, - state: WritableState, - count: number, - cb: (error?: Error) => void, -) { - const needDrain = !state.ending && !stream.destroyed && state.length === 0 && - state.needDrain; - if (needDrain) { - state.needDrain = false; - stream.emit("drain"); - } - - while (count-- > 0) { - state.pendingcb--; - cb(); - } - - if (state.destroyed) { - errorBuffer(state); - } - - finishMaybe(stream, state); -} - -export function afterWriteTick({ - cb, - count, - state, - stream, -}: AfterWriteTick) { - state.afterWriteTickInfo = null; - return afterWrite(stream, state, count, cb); -} - -/** If there's something in the buffer waiting, then process it.*/ -export function clearBuffer(stream: Duplex | Writable, state: WritableState) { - if ( - state.corked || - state.bufferProcessing || - state.destroyed || - !state.constructed - ) { - return; - } - - const { buffered, bufferedIndex, objectMode } = state; - const bufferedLength = buffered.length - bufferedIndex; - - if (!bufferedLength) { - return; - } - - const i = bufferedIndex; - - state.bufferProcessing = true; - if (bufferedLength > 1 && stream._writev) { - state.pendingcb -= bufferedLength - 1; - - const callback = state.allNoop ? nop : (err: Error) => { - for (let n = i; n < buffered.length; ++n) { - buffered[n].callback(err); - } - }; - const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i); - - doWrite(stream, state, true, state.length, chunks, "", callback); - - resetBuffer(state); - } else { - do { - const { chunk, encoding, callback } = buffered[i]; - const len = objectMode ? 1 : chunk.length; - doWrite(stream, state, false, len, chunk, encoding, callback); - } while (i < buffered.length && !state.writing); - - if (i === buffered.length) { - resetBuffer(state); - } else if (i > 256) { - buffered.splice(0, i); - state.bufferedIndex = 0; - } else { - state.bufferedIndex = i; - } - } - state.bufferProcessing = false; -} - -export function destroy(this: Writable, err?: Error | null, cb?: () => void) { - const w = this._writableState; - - if (w.destroyed) { - if (typeof cb === "function") { - cb(); - } - - return this; - } - - if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!w.errored) { - w.errored = err; - } - } - - w.destroyed = true; - - if (!w.constructed) { - this.once(kDestroy, (er) => { - _destroy(this, err || er, cb); - }); - } else { - _destroy(this, err, cb); - } - - return this; -} - -function doWrite( - stream: Duplex | Writable, - state: WritableState, - writev: boolean, - len: number, - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: string, - cb: (error: Error) => void, -) { - state.writelen = len; - state.writecb = cb; - state.writing = true; - state.sync = true; - if (state.destroyed) { - state.onwrite(new ERR_STREAM_DESTROYED("write")); - } else if (writev) { - (stream._writev as unknown as writeV)(chunk, state.onwrite); - } else { - stream._write(chunk, encoding, state.onwrite); - } - state.sync = false; -} - -/** If there's something in the buffer waiting, then invoke callbacks.*/ -export function errorBuffer(state: WritableState) { - if (state.writing) { - return; - } - - for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { - const { chunk, callback } = state.buffered[n]; - const len = state.objectMode ? 1 : chunk.length; - state.length -= len; - callback(new ERR_STREAM_DESTROYED("write")); - } - - for (const callback of state[kOnFinished].splice(0)) { - callback(new ERR_STREAM_DESTROYED("end")); - } - - resetBuffer(state); -} - -export function errorOrDestroy(stream: Writable, err: Error, sync = false) { - const w = stream._writableState; - - if (w.destroyed) { - return stream; - } - - if (w.autoDestroy) { - stream.destroy(err); - } else if (err) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; - - if (!w.errored) { - w.errored = err; - } - if (sync) { - queueMicrotask(() => { - if (w.errorEmitted) { - return; - } - w.errorEmitted = true; - stream.emit("error", err); - }); - } else { - if (w.errorEmitted) { - return; - } - w.errorEmitted = true; - stream.emit("error", err); - } - } -} - -function finish(stream: Writable, state: WritableState) { - state.pendingcb--; - if (state.errorEmitted || state.closeEmitted) { - return; - } - - state.finished = true; - - for (const callback of state[kOnFinished].splice(0)) { - callback(); - } - - stream.emit("finish"); - - if (state.autoDestroy) { - stream.destroy(); - } -} - -export function finishMaybe( - stream: Writable, - state: WritableState, - sync?: boolean, -) { - if (needFinish(state)) { - prefinish(stream, state); - if (state.pendingcb === 0 && needFinish(state)) { - state.pendingcb++; - if (sync) { - queueMicrotask(() => finish(stream, state)); - } else { - finish(stream, state); - } - } - } -} - -export function needFinish(state: WritableState) { - return (state.ending && - state.constructed && - state.length === 0 && - !state.errored && - state.buffered.length === 0 && - !state.finished && - !state.writing); -} - -export function nop() {} - -export function resetBuffer(state: WritableState) { - state.buffered = []; - state.bufferedIndex = 0; - state.allBuffers = true; - state.allNoop = true; -} - -function onwriteError( - stream: Writable, - state: WritableState, - er: Error, - cb: (error: Error) => void, -) { - --state.pendingcb; - - cb(er); - errorBuffer(state); - errorOrDestroy(stream, er); -} - -export function onwrite(stream: Writable, er?: Error | null) { - const state = stream._writableState; - const sync = state.sync; - const cb = state.writecb; - - if (typeof cb !== "function") { - errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); - return; - } - - state.writing = false; - state.writecb = null; - state.length -= state.writelen; - state.writelen = 0; - - if (er) { - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - er.stack; - - if (!state.errored) { - state.errored = er; - } - - if (sync) { - queueMicrotask(() => onwriteError(stream, state, er, cb)); - } else { - onwriteError(stream, state, er, cb); - } - } else { - if (state.buffered.length > state.bufferedIndex) { - clearBuffer(stream, state); - } - - if (sync) { - if ( - state.afterWriteTickInfo !== null && - state.afterWriteTickInfo.cb === cb - ) { - state.afterWriteTickInfo.count++; - } else { - state.afterWriteTickInfo = { - count: 1, - cb: (cb as (error?: Error) => void), - stream, - state, - }; - queueMicrotask(() => - afterWriteTick(state.afterWriteTickInfo as AfterWriteTick) - ); - } - } else { - afterWrite(stream, state, 1, cb as (error?: Error) => void); - } - } -} - -export function prefinish(stream: Writable, state: WritableState) { - if (!state.prefinished && !state.finalCalled) { - if (typeof stream._final === "function" && !state.destroyed) { - state.finalCalled = true; - - state.sync = true; - state.pendingcb++; - stream._final((err) => { - state.pendingcb--; - if (err) { - for (const callback of state[kOnFinished].splice(0)) { - callback(err); - } - errorOrDestroy(stream, err, state.sync); - } else if (needFinish(state)) { - state.prefinished = true; - stream.emit("prefinish"); - state.pendingcb++; - queueMicrotask(() => finish(stream, state)); - } - }); - state.sync = false; - } else { - state.prefinished = true; - stream.emit("prefinish"); - } - } -} - -export function writeOrBuffer( - stream: Duplex | Writable, - state: WritableState, - // deno-lint-ignore no-explicit-any - chunk: any, - encoding: string, - callback: (error: Error) => void, -) { - const len = state.objectMode ? 1 : chunk.length; - - state.length += len; - - if (state.writing || state.corked || state.errored || !state.constructed) { - state.buffered.push({ chunk, encoding, callback }); - if (state.allBuffers && encoding !== "buffer") { - state.allBuffers = false; - } - if (state.allNoop && callback !== nop) { - state.allNoop = false; - } - } else { - state.writelen = len; - state.writecb = callback; - state.writing = true; - state.sync = true; - stream._write(chunk, encoding, state.onwrite); - state.sync = false; - } - - const ret = state.length < state.highWaterMark; - - if (!ret) { - state.needDrain = true; - } - - return ret && !state.errored && !state.destroyed; -} diff --git a/std/node/_stream/writable_test.ts b/std/node/_stream/writable_test.ts deleted file mode 100644 index d6133b65f..000000000 --- a/std/node/_stream/writable_test.ts +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright Node.js contributors. All rights reserved. MIT License. -import { Buffer } from "../buffer.ts"; -import finished from "./end_of_stream.ts"; -import Writable from "../_stream/writable.ts"; -import { deferred } from "../../async/mod.ts"; -import { - assert, - assertEquals, - assertStrictEquals, - assertThrows, -} from "../../testing/asserts.ts"; - -Deno.test("Writable stream writes correctly", async () => { - let callback: undefined | ((error?: Error | null | undefined) => void); - - let writeExecuted = 0; - const writeExecutedExpected = 1; - const writeExpectedExecutions = deferred(); - - let writevExecuted = 0; - const writevExecutedExpected = 1; - const writevExpectedExecutions = deferred(); - - const writable = new Writable({ - write: (chunk, encoding, cb) => { - writeExecuted++; - if (writeExecuted == writeExecutedExpected) { - writeExpectedExecutions.resolve(); - } - assert(chunk instanceof Buffer); - assertStrictEquals(encoding, "buffer"); - assertStrictEquals(String(chunk), "ABC"); - callback = cb; - }, - writev: (chunks) => { - writevExecuted++; - if (writevExecuted == writevExecutedExpected) { - writevExpectedExecutions.resolve(); - } - assertStrictEquals(chunks.length, 2); - assertStrictEquals(chunks[0].encoding, "buffer"); - assertStrictEquals(chunks[1].encoding, "buffer"); - assertStrictEquals(chunks[0].chunk + chunks[1].chunk, "DEFGHI"); - }, - }); - - writable.write(new TextEncoder().encode("ABC")); - writable.write(new TextEncoder().encode("DEF")); - writable.end(new TextEncoder().encode("GHI")); - callback?.(); - - const writeTimeout = setTimeout( - () => writeExpectedExecutions.reject(), - 1000, - ); - const writevTimeout = setTimeout( - () => writevExpectedExecutions.reject(), - 1000, - ); - await writeExpectedExecutions; - await writevExpectedExecutions; - clearTimeout(writeTimeout); - clearTimeout(writevTimeout); - assertEquals(writeExecuted, writeExecutedExpected); - assertEquals(writevExecuted, writevExecutedExpected); -}); - -Deno.test("Writable stream writes Uint8Array in object mode", async () => { - let writeExecuted = 0; - const writeExecutedExpected = 1; - const writeExpectedExecutions = deferred(); - - const ABC = new TextEncoder().encode("ABC"); - - const writable = new Writable({ - objectMode: true, - write: (chunk, encoding, cb) => { - writeExecuted++; - if (writeExecuted == writeExecutedExpected) { - writeExpectedExecutions.resolve(); - } - assert(!(chunk instanceof Buffer)); - assert(chunk instanceof Uint8Array); - assertEquals(chunk, ABC); - assertEquals(encoding, "utf8"); - cb(); - }, - }); - - writable.end(ABC); - - const writeTimeout = setTimeout( - () => writeExpectedExecutions.reject(), - 1000, - ); - await writeExpectedExecutions; - clearTimeout(writeTimeout); - assertEquals(writeExecuted, writeExecutedExpected); -}); - -Deno.test("Writable stream throws on unexpected close", async () => { - let finishedExecuted = 0; - const finishedExecutedExpected = 1; - const finishedExpectedExecutions = deferred(); - - const writable = new Writable({ - write: () => {}, - }); - writable.writable = false; - writable.destroy(); - - finished(writable, (err) => { - finishedExecuted++; - if (finishedExecuted == finishedExecutedExpected) { - finishedExpectedExecutions.resolve(); - } - assertEquals(err?.code, "ERR_STREAM_PREMATURE_CLOSE"); - }); - - const finishedTimeout = setTimeout( - () => finishedExpectedExecutions.reject(), - 1000, - ); - await finishedExpectedExecutions; - clearTimeout(finishedTimeout); - assertEquals(finishedExecuted, finishedExecutedExpected); -}); - -Deno.test("Writable stream finishes correctly", async () => { - let finishedExecuted = 0; - const finishedExecutedExpected = 1; - const finishedExpectedExecutions = deferred(); - - const w = new Writable({ - write(_chunk, _encoding, cb) { - cb(); - }, - autoDestroy: false, - }); - - w.end("asd"); - - queueMicrotask(() => { - finished(w, () => { - finishedExecuted++; - if (finishedExecuted == finishedExecutedExpected) { - finishedExpectedExecutions.resolve(); - } - }); - }); - - const finishedTimeout = setTimeout( - () => finishedExpectedExecutions.reject(), - 1000, - ); - await finishedExpectedExecutions; - clearTimeout(finishedTimeout); - assertEquals(finishedExecuted, finishedExecutedExpected); -}); - -Deno.test("Writable stream finishes correctly after error", async () => { - let errorExecuted = 0; - const errorExecutedExpected = 1; - const errorExpectedExecutions = deferred(); - - let finishedExecuted = 0; - const finishedExecutedExpected = 1; - const finishedExpectedExecutions = deferred(); - - const w = new Writable({ - write(_chunk, _encoding, cb) { - cb(new Error()); - }, - autoDestroy: false, - }); - w.write("asd"); - w.on("error", () => { - errorExecuted++; - if (errorExecuted == errorExecutedExpected) { - errorExpectedExecutions.resolve(); - } - finished(w, () => { - finishedExecuted++; - if (finishedExecuted == finishedExecutedExpected) { - finishedExpectedExecutions.resolve(); - } - }); - }); - - const errorTimeout = setTimeout( - () => errorExpectedExecutions.reject(), - 1000, - ); - const finishedTimeout = setTimeout( - () => finishedExpectedExecutions.reject(), - 1000, - ); - await finishedExpectedExecutions; - await errorExpectedExecutions; - clearTimeout(finishedTimeout); - clearTimeout(errorTimeout); - assertEquals(finishedExecuted, finishedExecutedExpected); - assertEquals(errorExecuted, errorExecutedExpected); -}); - -Deno.test("Writable stream fails on 'write' null value", () => { - const writable = new Writable(); - assertThrows(() => writable.write(null)); -}); |