summaryrefslogtreecommitdiff
path: root/std/node/_stream
diff options
context:
space:
mode:
Diffstat (limited to 'std/node/_stream')
-rw-r--r--std/node/_stream/async_iterator.ts243
-rw-r--r--std/node/_stream/async_iterator_test.ts249
-rw-r--r--std/node/_stream/buffer_list.ts183
-rw-r--r--std/node/_stream/destroy.ts38
-rw-r--r--std/node/_stream/duplex.ts682
-rw-r--r--std/node/_stream/duplex_internal.ts296
-rw-r--r--std/node/_stream/duplex_test.ts698
-rw-r--r--std/node/_stream/end_of_stream.ts241
-rw-r--r--std/node/_stream/end_of_stream_test.ts97
-rw-r--r--std/node/_stream/from.ts102
-rw-r--r--std/node/_stream/passthrough.ts20
-rw-r--r--std/node/_stream/pipeline.ts308
-rw-r--r--std/node/_stream/pipeline_test.ts387
-rw-r--r--std/node/_stream/promises.ts42
-rw-r--r--std/node/_stream/promises_test.ts84
-rw-r--r--std/node/_stream/readable.ts788
-rw-r--r--std/node/_stream/readable_internal.ts438
-rw-r--r--std/node/_stream/readable_test.ts489
-rw-r--r--std/node/_stream/stream.ts81
-rw-r--r--std/node/_stream/symbols.ts4
-rw-r--r--std/node/_stream/transform.ts132
-rw-r--r--std/node/_stream/transform_test.ts68
-rw-r--r--std/node/_stream/writable.ts443
-rw-r--r--std/node/_stream/writable_internal.ts457
-rw-r--r--std/node/_stream/writable_test.ts209
25 files changed, 0 insertions, 6779 deletions
diff --git a/std/node/_stream/async_iterator.ts b/std/node/_stream/async_iterator.ts
deleted file mode 100644
index 5369ef39c..000000000
--- a/std/node/_stream/async_iterator.ts
+++ /dev/null
@@ -1,243 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import type { Buffer } from "../buffer.ts";
-import finished from "./end_of_stream.ts";
-import Readable from "./readable.ts";
-import type Stream from "./stream.ts";
-import { destroyer } from "./destroy.ts";
-
-const kLastResolve = Symbol("lastResolve");
-const kLastReject = Symbol("lastReject");
-const kError = Symbol("error");
-const kEnded = Symbol("ended");
-const kLastPromise = Symbol("lastPromise");
-const kHandlePromise = Symbol("handlePromise");
-const kStream = Symbol("stream");
-
-// TODO(Soremwar)
-// Add Duplex streams
-type IterableStreams = Stream | Readable;
-
-type IterableItem = Buffer | string | Uint8Array | undefined;
-type ReadableIteratorResult = IteratorResult<IterableItem>;
-
-function initIteratorSymbols(
- o: ReadableStreamAsyncIterator,
- symbols: symbol[],
-) {
- const properties: PropertyDescriptorMap = {};
- for (const sym in symbols) {
- properties[sym] = {
- configurable: false,
- enumerable: false,
- writable: true,
- };
- }
- Object.defineProperties(o, properties);
-}
-
-function createIterResult(
- value: IterableItem,
- done: boolean,
-): ReadableIteratorResult {
- return { value, done };
-}
-
-function readAndResolve(iter: ReadableStreamAsyncIterator) {
- const resolve = iter[kLastResolve];
- if (resolve !== null) {
- const data = iter[kStream].read();
- if (data !== null) {
- iter[kLastPromise] = null;
- iter[kLastResolve] = null;
- iter[kLastReject] = null;
- resolve(createIterResult(data, false));
- }
- }
-}
-
-function onReadable(iter: ReadableStreamAsyncIterator) {
- queueMicrotask(() => readAndResolve(iter));
-}
-
-function wrapForNext(
- lastPromise: Promise<ReadableIteratorResult>,
- iter: ReadableStreamAsyncIterator,
-) {
- return (
- resolve: (value: ReadableIteratorResult) => void,
- reject: (error: Error) => void,
- ) => {
- lastPromise.then(() => {
- if (iter[kEnded]) {
- resolve(createIterResult(undefined, true));
- return;
- }
-
- iter[kHandlePromise](resolve, reject);
- }, reject);
- };
-}
-
-function finish(self: ReadableStreamAsyncIterator, err?: Error) {
- return new Promise(
- (
- resolve: (result: ReadableIteratorResult) => void,
- reject: (error: Error) => void,
- ) => {
- const stream = self[kStream];
-
- finished(stream, (err) => {
- if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
- reject(err);
- } else {
- resolve(createIterResult(undefined, true));
- }
- });
- destroyer(stream, err);
- },
- );
-}
-
-const AsyncIteratorPrototype = Object.getPrototypeOf(
- Object.getPrototypeOf(async function* () {}).prototype,
-);
-
-export class ReadableStreamAsyncIterator
- implements AsyncIterableIterator<IterableItem> {
- [kEnded]: boolean;
- [kError]: Error | null = null;
- [kHandlePromise] = (
- resolve: (value: ReadableIteratorResult) => void,
- reject: (value: Error) => void,
- ) => {
- const data = this[kStream].read();
- if (data) {
- this[kLastPromise] = null;
- this[kLastResolve] = null;
- this[kLastReject] = null;
- resolve(createIterResult(data, false));
- } else {
- this[kLastResolve] = resolve;
- this[kLastReject] = reject;
- }
- };
- [kLastPromise]: null | Promise<ReadableIteratorResult>;
- [kLastReject]: null | ((value: Error) => void) = null;
- [kLastResolve]: null | ((value: ReadableIteratorResult) => void) = null;
- [kStream]: Readable;
- [Symbol.asyncIterator] = AsyncIteratorPrototype[Symbol.asyncIterator];
-
- constructor(stream: Readable) {
- this[kEnded] = stream.readableEnded || stream._readableState.endEmitted;
- this[kStream] = stream;
- initIteratorSymbols(this, [
- kEnded,
- kError,
- kHandlePromise,
- kLastPromise,
- kLastReject,
- kLastResolve,
- kStream,
- ]);
- }
-
- get stream() {
- return this[kStream];
- }
-
- next(): Promise<ReadableIteratorResult> {
- const error = this[kError];
- if (error !== null) {
- return Promise.reject(error);
- }
-
- if (this[kEnded]) {
- return Promise.resolve(createIterResult(undefined, true));
- }
-
- if (this[kStream].destroyed) {
- return new Promise((resolve, reject) => {
- if (this[kError]) {
- reject(this[kError]);
- } else if (this[kEnded]) {
- resolve(createIterResult(undefined, true));
- } else {
- finished(this[kStream], (err) => {
- if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
- reject(err);
- } else {
- resolve(createIterResult(undefined, true));
- }
- });
- }
- });
- }
-
- const lastPromise = this[kLastPromise];
- let promise;
-
- if (lastPromise) {
- promise = new Promise(wrapForNext(lastPromise, this));
- } else {
- const data = this[kStream].read();
- if (data !== null) {
- return Promise.resolve(createIterResult(data, false));
- }
-
- promise = new Promise(this[kHandlePromise]);
- }
-
- this[kLastPromise] = promise;
-
- return promise;
- }
-
- return(): Promise<ReadableIteratorResult> {
- return finish(this);
- }
-
- throw(err: Error): Promise<ReadableIteratorResult> {
- return finish(this, err);
- }
-}
-
-const createReadableStreamAsyncIterator = (stream: IterableStreams) => {
- // deno-lint-ignore no-explicit-any
- if (typeof (stream as any).read !== "function") {
- const src = stream;
- stream = new Readable({ objectMode: true }).wrap(src);
- finished(stream, (err) => destroyer(src, err));
- }
-
- const iterator = new ReadableStreamAsyncIterator(stream as Readable);
- iterator[kLastPromise] = null;
-
- finished(stream, { writable: false }, (err) => {
- if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
- const reject = iterator[kLastReject];
- if (reject !== null) {
- iterator[kLastPromise] = null;
- iterator[kLastResolve] = null;
- iterator[kLastReject] = null;
- reject(err);
- }
- iterator[kError] = err;
- return;
- }
-
- const resolve = iterator[kLastResolve];
- if (resolve !== null) {
- iterator[kLastPromise] = null;
- iterator[kLastResolve] = null;
- iterator[kLastReject] = null;
- resolve(createIterResult(undefined, true));
- }
- iterator[kEnded] = true;
- });
-
- stream.on("readable", onReadable.bind(null, iterator));
-
- return iterator;
-};
-
-export default createReadableStreamAsyncIterator;
diff --git a/std/node/_stream/async_iterator_test.ts b/std/node/_stream/async_iterator_test.ts
deleted file mode 100644
index 17698e0fd..000000000
--- a/std/node/_stream/async_iterator_test.ts
+++ /dev/null
@@ -1,249 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import Readable from "./readable.ts";
-import Stream from "./stream.ts";
-import toReadableAsyncIterator from "./async_iterator.ts";
-import { deferred } from "../../async/mod.ts";
-import { assertEquals, assertThrowsAsync } from "../../testing/asserts.ts";
-
-Deno.test("Stream to async iterator", async () => {
- let destroyExecuted = 0;
- const destroyExecutedExpected = 1;
- const destroyExpectedExecutions = deferred();
-
- class AsyncIteratorStream extends Stream {
- constructor() {
- super();
- }
-
- destroy() {
- destroyExecuted++;
- if (destroyExecuted == destroyExecutedExpected) {
- destroyExpectedExecutions.resolve();
- }
- }
-
- [Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator];
- }
-
- const stream = new AsyncIteratorStream();
-
- queueMicrotask(() => {
- stream.emit("data", "hello");
- stream.emit("data", "world");
- stream.emit("end");
- });
-
- let res = "";
-
- for await (const d of stream) {
- res += d;
- }
- assertEquals(res, "helloworld");
-
- const destroyTimeout = setTimeout(
- () => destroyExpectedExecutions.reject(),
- 1000,
- );
- await destroyExpectedExecutions;
- clearTimeout(destroyTimeout);
- assertEquals(destroyExecuted, destroyExecutedExpected);
-});
-
-Deno.test("Stream to async iterator throws on 'error' emitted", async () => {
- let closeExecuted = 0;
- const closeExecutedExpected = 1;
- const closeExpectedExecutions = deferred();
-
- let errorExecuted = 0;
- const errorExecutedExpected = 1;
- const errorExpectedExecutions = deferred();
-
- class StreamImplementation extends Stream {
- close() {
- closeExecuted++;
- if (closeExecuted == closeExecutedExpected) {
- closeExpectedExecutions.resolve();
- }
- }
- }
-
- const stream = new StreamImplementation();
- queueMicrotask(() => {
- stream.emit("data", 0);
- stream.emit("data", 1);
- stream.emit("error", new Error("asd"));
- });
-
- toReadableAsyncIterator(stream)
- .next()
- .catch((err) => {
- errorExecuted++;
- if (errorExecuted == errorExecutedExpected) {
- errorExpectedExecutions.resolve();
- }
- assertEquals(err.message, "asd");
- });
-
- const closeTimeout = setTimeout(
- () => closeExpectedExecutions.reject(),
- 1000,
- );
- const errorTimeout = setTimeout(
- () => errorExpectedExecutions.reject(),
- 1000,
- );
- await closeExpectedExecutions;
- await errorExpectedExecutions;
- clearTimeout(closeTimeout);
- clearTimeout(errorTimeout);
- assertEquals(closeExecuted, closeExecutedExpected);
- assertEquals(errorExecuted, errorExecutedExpected);
-});
-
-Deno.test("Async iterator matches values of Readable", async () => {
- const readable = new Readable({
- objectMode: true,
- read() {},
- });
- readable.push(0);
- readable.push(1);
- readable.push(null);
-
- const iter = readable[Symbol.asyncIterator]();
-
- assertEquals(
- await iter.next().then(({ value }) => value),
- 0,
- );
- for await (const d of iter) {
- assertEquals(d, 1);
- }
-});
-
-Deno.test("Async iterator throws on Readable destroyed sync", async () => {
- const message = "kaboom from read";
-
- const readable = new Readable({
- objectMode: true,
- read() {
- this.destroy(new Error(message));
- },
- });
-
- await assertThrowsAsync(
- async () => {
- // deno-lint-ignore no-empty
- for await (const k of readable) {}
- },
- Error,
- message,
- );
-});
-
-Deno.test("Async iterator throws on Readable destroyed async", async () => {
- const message = "kaboom";
- const readable = new Readable({
- read() {},
- });
- const iterator = readable[Symbol.asyncIterator]();
-
- readable.destroy(new Error(message));
-
- await assertThrowsAsync(
- iterator.next.bind(iterator),
- Error,
- message,
- );
-});
-
-Deno.test("Async iterator finishes the iterator when Readable destroyed", async () => {
- const readable = new Readable({
- read() {},
- });
-
- readable.destroy();
-
- const { done } = await readable[Symbol.asyncIterator]().next();
- assertEquals(done, true);
-});
-
-Deno.test("Async iterator finishes all item promises when Readable destroyed", async () => {
- const r = new Readable({
- objectMode: true,
- read() {
- },
- });
-
- const b = r[Symbol.asyncIterator]();
- const c = b.next();
- const d = b.next();
- r.destroy();
- assertEquals(await c, { done: true, value: undefined });
- assertEquals(await d, { done: true, value: undefined });
-});
-
-Deno.test("Async iterator: 'next' is triggered by Readable push", async () => {
- const max = 42;
- let readed = 0;
- let received = 0;
- const readable = new Readable({
- objectMode: true,
- read() {
- this.push("hello");
- if (++readed === max) {
- this.push(null);
- }
- },
- });
-
- for await (const k of readable) {
- received++;
- assertEquals(k, "hello");
- }
-
- assertEquals(readed, received);
-});
-
-Deno.test("Async iterator: 'close' called on forced iteration end", async () => {
- let closeExecuted = 0;
- const closeExecutedExpected = 1;
- const closeExpectedExecutions = deferred();
-
- class IndestructibleReadable extends Readable {
- constructor() {
- super({
- autoDestroy: false,
- read() {},
- });
- }
-
- close() {
- closeExecuted++;
- if (closeExecuted == closeExecutedExpected) {
- closeExpectedExecutions.resolve();
- }
- readable.emit("close");
- }
-
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- destroy = null;
- }
-
- const readable = new IndestructibleReadable();
- readable.push("asd");
- readable.push("asd");
-
- // eslint-disable-next-line @typescript-eslint/no-unused-vars
- for await (const d of readable) {
- break;
- }
-
- const closeTimeout = setTimeout(
- () => closeExpectedExecutions.reject(),
- 1000,
- );
- await closeExpectedExecutions;
- clearTimeout(closeTimeout);
- assertEquals(closeExecuted, closeExecutedExpected);
-});
diff --git a/std/node/_stream/buffer_list.ts b/std/node/_stream/buffer_list.ts
deleted file mode 100644
index fe1a693c0..000000000
--- a/std/node/_stream/buffer_list.ts
+++ /dev/null
@@ -1,183 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-
-type BufferListItem = {
- data: Buffer | string | Uint8Array;
- next: BufferListItem | null;
-};
-
-export default class BufferList {
- head: BufferListItem | null = null;
- tail: BufferListItem | null = null;
- length: number;
-
- constructor() {
- this.head = null;
- this.tail = null;
- this.length = 0;
- }
-
- push(v: Buffer | string | Uint8Array) {
- const entry = { data: v, next: null };
- if (this.length > 0) {
- (this.tail as BufferListItem).next = entry;
- } else {
- this.head = entry;
- }
- this.tail = entry;
- ++this.length;
- }
-
- unshift(v: Buffer | string | Uint8Array) {
- const entry = { data: v, next: this.head };
- if (this.length === 0) {
- this.tail = entry;
- }
- this.head = entry;
- ++this.length;
- }
-
- shift() {
- if (this.length === 0) {
- return;
- }
- const ret = (this.head as BufferListItem).data;
- if (this.length === 1) {
- this.head = this.tail = null;
- } else {
- this.head = (this.head as BufferListItem).next;
- }
- --this.length;
- return ret;
- }
-
- clear() {
- this.head = this.tail = null;
- this.length = 0;
- }
-
- join(s: string) {
- if (this.length === 0) {
- return "";
- }
- let p: BufferListItem | null = (this.head as BufferListItem);
- let ret = "" + p.data;
- p = p.next;
- while (p) {
- ret += s + p.data;
- p = p.next;
- }
- return ret;
- }
-
- concat(n: number) {
- if (this.length === 0) {
- return Buffer.alloc(0);
- }
- const ret = Buffer.allocUnsafe(n >>> 0);
- let p = this.head;
- let i = 0;
- while (p) {
- ret.set(p.data as Buffer, i);
- i += p.data.length;
- p = p.next;
- }
- return ret;
- }
-
- // Consumes a specified amount of bytes or characters from the buffered data.
- consume(n: number, hasStrings: boolean) {
- const data = (this.head as BufferListItem).data;
- if (n < data.length) {
- // `slice` is the same for buffers and strings.
- const slice = data.slice(0, n);
- (this.head as BufferListItem).data = data.slice(n);
- return slice;
- }
- if (n === data.length) {
- // First chunk is a perfect match.
- return this.shift();
- }
- // Result spans more than one buffer.
- return hasStrings ? this._getString(n) : this._getBuffer(n);
- }
-
- first() {
- return (this.head as BufferListItem).data;
- }
-
- *[Symbol.iterator]() {
- for (let p = this.head; p; p = p.next) {
- yield p.data;
- }
- }
-
- // Consumes a specified amount of characters from the buffered data.
- _getString(n: number) {
- let ret = "";
- let p: BufferListItem | null = (this.head as BufferListItem);
- let c = 0;
- p = p.next as BufferListItem;
- do {
- const str = p.data;
- if (n > str.length) {
- ret += str;
- n -= str.length;
- } else {
- if (n === str.length) {
- ret += str;
- ++c;
- if (p.next) {
- this.head = p.next;
- } else {
- this.head = this.tail = null;
- }
- } else {
- ret += str.slice(0, n);
- this.head = p;
- p.data = str.slice(n);
- }
- break;
- }
- ++c;
- p = p.next;
- } while (p);
- this.length -= c;
- return ret;
- }
-
- // Consumes a specified amount of bytes from the buffered data.
- _getBuffer(n: number) {
- const ret = Buffer.allocUnsafe(n);
- const retLen = n;
- let p: BufferListItem | null = (this.head as BufferListItem);
- let c = 0;
- p = p.next as BufferListItem;
- do {
- const buf = p.data as Buffer;
- if (n > buf.length) {
- ret.set(buf, retLen - n);
- n -= buf.length;
- } else {
- if (n === buf.length) {
- ret.set(buf, retLen - n);
- ++c;
- if (p.next) {
- this.head = p.next;
- } else {
- this.head = this.tail = null;
- }
- } else {
- ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n);
- this.head = p;
- p.data = buf.slice(n);
- }
- break;
- }
- ++c;
- p = p.next;
- } while (p);
- this.length -= c;
- return ret;
- }
-}
diff --git a/std/node/_stream/destroy.ts b/std/node/_stream/destroy.ts
deleted file mode 100644
index d13e12de2..000000000
--- a/std/node/_stream/destroy.ts
+++ /dev/null
@@ -1,38 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import type Duplex from "./duplex.ts";
-import type Readable from "./readable.ts";
-import type Stream from "./stream.ts";
-import type Writable from "./writable.ts";
-
-//This whole module acts as a 'normalizer'
-//Idea behind it is you can pass any kind of streams and functions will execute anyways
-
-//TODO(Soremwar)
-//Should be any implementation of stream
-//This is a guard to check executed methods exist inside the implementation
-type StreamImplementations = Duplex | Readable | Writable;
-
-// TODO(Soremwar)
-// Bring back once requests are implemented
-// function isRequest(stream: any) {
-// return stream && stream.setHeader && typeof stream.abort === "function";
-// }
-
-export function destroyer(stream: Stream, err?: Error | null) {
- // TODO(Soremwar)
- // Bring back once requests are implemented
- // if (isRequest(stream)) return stream.abort();
- // if (isRequest(stream.req)) return stream.req.abort();
- if (
- typeof (stream as StreamImplementations).destroy === "function"
- ) {
- return (stream as StreamImplementations).destroy(err);
- }
- // A test of async iterator mocks an upcoming implementation of stream
- // his is casted to any in the meanwhile
- // deno-lint-ignore no-explicit-any
- if (typeof (stream as any).close === "function") {
- // deno-lint-ignore no-explicit-any
- return (stream as any).close();
- }
-}
diff --git a/std/node/_stream/duplex.ts b/std/node/_stream/duplex.ts
deleted file mode 100644
index b5c429f0a..000000000
--- a/std/node/_stream/duplex.ts
+++ /dev/null
@@ -1,682 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { captureRejectionSymbol } from "../events.ts";
-import Readable, { ReadableState } from "./readable.ts";
-import Stream from "./stream.ts";
-import Writable, { WritableState } from "./writable.ts";
-import { Buffer } from "../buffer.ts";
-import {
- ERR_STREAM_ALREADY_FINISHED,
- ERR_STREAM_DESTROYED,
- ERR_UNKNOWN_ENCODING,
-} from "../_errors.ts";
-import type { Encodings } from "../_utils.ts";
-import createReadableStreamAsyncIterator from "./async_iterator.ts";
-import type { ReadableStreamAsyncIterator } from "./async_iterator.ts";
-import {
- _destroy,
- computeNewHighWaterMark,
- emitReadable,
- fromList,
- howMuchToRead,
- nReadingNextTick,
- updateReadableListening,
-} from "./readable_internal.ts";
-import { kOnFinished, writeV } from "./writable_internal.ts";
-import {
- endDuplex,
- finishMaybe,
- onwrite,
- readableAddChunk,
-} from "./duplex_internal.ts";
-export { errorOrDestroy } from "./duplex_internal.ts";
-
-export interface DuplexOptions {
- allowHalfOpen?: boolean;
- autoDestroy?: boolean;
- decodeStrings?: boolean;
- defaultEncoding?: Encodings;
- destroy?(
- this: Duplex,
- error: Error | null,
- callback: (error: Error | null) => void,
- ): void;
- emitClose?: boolean;
- encoding?: Encodings;
- final?(this: Duplex, callback: (error?: Error | null) => void): void;
- highWaterMark?: number;
- objectMode?: boolean;
- read?(this: Duplex, size: number): void;
- readable?: boolean;
- readableHighWaterMark?: number;
- readableObjectMode?: boolean;
- writable?: boolean;
- writableCorked?: number;
- writableHighWaterMark?: number;
- writableObjectMode?: boolean;
- write?(
- this: Duplex,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: Encodings,
- callback: (error?: Error | null) => void,
- ): void;
- writev?: writeV;
-}
-
-interface Duplex extends Readable, Writable {}
-
-/**
- * A duplex is an implementation of a stream that has both Readable and Writable
- * attributes and capabilities
- */
-class Duplex extends Stream {
- allowHalfOpen = true;
- _final?: (
- callback: (error?: Error | null | undefined) => void,
- ) => void;
- _readableState: ReadableState;
- _writableState: WritableState;
- _writev?: writeV | null;
-
- constructor(options?: DuplexOptions) {
- super();
-
- if (options) {
- if (options.allowHalfOpen === false) {
- this.allowHalfOpen = false;
- }
- if (typeof options.destroy === "function") {
- this._destroy = options.destroy;
- }
- if (typeof options.final === "function") {
- this._final = options.final;
- }
- if (typeof options.read === "function") {
- this._read = options.read;
- }
- if (options.readable === false) {
- this.readable = false;
- }
- if (options.writable === false) {
- this.writable = false;
- }
- if (typeof options.write === "function") {
- this._write = options.write;
- }
- if (typeof options.writev === "function") {
- this._writev = options.writev;
- }
- }
-
- const readableOptions = {
- autoDestroy: options?.autoDestroy,
- defaultEncoding: options?.defaultEncoding,
- destroy: options?.destroy as unknown as (
- this: Readable,
- error: Error | null,
- callback: (error: Error | null) => void,
- ) => void,
- emitClose: options?.emitClose,
- encoding: options?.encoding,
- highWaterMark: options?.highWaterMark ?? options?.readableHighWaterMark,
- objectMode: options?.objectMode ?? options?.readableObjectMode,
- read: options?.read as unknown as (this: Readable) => void,
- };
-
- const writableOptions = {
- autoDestroy: options?.autoDestroy,
- decodeStrings: options?.decodeStrings,
- defaultEncoding: options?.defaultEncoding,
- destroy: options?.destroy as unknown as (
- this: Writable,
- error: Error | null,
- callback: (error: Error | null) => void,
- ) => void,
- emitClose: options?.emitClose,
- final: options?.final as unknown as (
- this: Writable,
- callback: (error?: Error | null) => void,
- ) => void,
- highWaterMark: options?.highWaterMark ?? options?.writableHighWaterMark,
- objectMode: options?.objectMode ?? options?.writableObjectMode,
- write: options?.write as unknown as (
- this: Writable,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- callback: (error?: Error | null) => void,
- ) => void,
- writev: options?.writev as unknown as (
- this: Writable,
- // deno-lint-ignore no-explicit-any
- chunks: Array<{ chunk: any; encoding: Encodings }>,
- callback: (error?: Error | null) => void,
- ) => void,
- };
-
- this._readableState = new ReadableState(readableOptions);
- this._writableState = new WritableState(
- writableOptions,
- this as unknown as Writable,
- );
- //Very important to override onwrite here, duplex implementation adds a check
- //on the readable side
- this._writableState.onwrite = onwrite.bind(undefined, this);
- }
-
- [captureRejectionSymbol](err?: Error) {
- this.destroy(err);
- }
-
- [Symbol.asyncIterator](): ReadableStreamAsyncIterator {
- return createReadableStreamAsyncIterator(this);
- }
-
- _destroy(
- error: Error | null,
- callback: (error?: Error | null) => void,
- ): void {
- callback(error);
- }
-
- _read = Readable.prototype._read;
-
- _undestroy = Readable.prototype._undestroy;
-
- destroy(err?: Error | null, cb?: (error?: Error | null) => void) {
- const r = this._readableState;
- const w = this._writableState;
-
- if (w.destroyed || r.destroyed) {
- if (typeof cb === "function") {
- cb();
- }
-
- return this;
- }
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- if (!r.errored) {
- r.errored = err;
- }
- }
-
- w.destroyed = true;
- r.destroyed = true;
-
- this._destroy(err || null, (err) => {
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- if (!r.errored) {
- r.errored = err;
- }
- }
-
- w.closed = true;
- r.closed = true;
-
- if (typeof cb === "function") {
- cb(err);
- }
-
- if (err) {
- queueMicrotask(() => {
- const r = this._readableState;
- const w = this._writableState;
-
- if (!w.errorEmitted && !r.errorEmitted) {
- w.errorEmitted = true;
- r.errorEmitted = true;
-
- this.emit("error", err);
- }
-
- r.closeEmitted = true;
-
- if (w.emitClose || r.emitClose) {
- this.emit("close");
- }
- });
- } else {
- queueMicrotask(() => {
- const r = this._readableState;
- const w = this._writableState;
-
- r.closeEmitted = true;
-
- if (w.emitClose || r.emitClose) {
- this.emit("close");
- }
- });
- }
- });
-
- return this;
- }
-
- isPaused = Readable.prototype.isPaused;
-
- off = this.removeListener;
-
- on(
- event: "close" | "end" | "pause" | "readable" | "resume",
- listener: () => void,
- ): this;
- // deno-lint-ignore no-explicit-any
- on(event: "data", listener: (chunk: any) => void): this;
- on(event: "error", listener: (err: Error) => void): this;
- // deno-lint-ignore no-explicit-any
- on(event: string | symbol, listener: (...args: any[]) => void): this;
- on(
- ev: string | symbol,
- fn:
- | (() => void)
- // deno-lint-ignore no-explicit-any
- | ((chunk: any) => void)
- | ((err: Error) => void)
- // deno-lint-ignore no-explicit-any
- | ((...args: any[]) => void),
- ) {
- const res = super.on.call(this, ev, fn);
- const state = this._readableState;
-
- if (ev === "data") {
- state.readableListening = this.listenerCount("readable") > 0;
-
- if (state.flowing !== false) {
- this.resume();
- }
- } else if (ev === "readable") {
- if (!state.endEmitted && !state.readableListening) {
- state.readableListening = state.needReadable = true;
- state.flowing = false;
- state.emittedReadable = false;
- if (state.length) {
- emitReadable(this);
- } else if (!state.reading) {
- queueMicrotask(() => nReadingNextTick(this));
- }
- }
- }
-
- return res;
- }
-
- pause = Readable.prototype.pause as () => this;
-
- pipe = Readable.prototype.pipe;
-
- // deno-lint-ignore no-explicit-any
- push(chunk: any, encoding?: Encodings): boolean {
- return readableAddChunk(this, chunk, encoding, false);
- }
-
- /** You can override either this method, or the async `_read` method */
- read(n?: number) {
- // Same as parseInt(undefined, 10), however V8 7.3 performance regressed
- // in this scenario, so we are doing it manually.
- if (n === undefined) {
- n = NaN;
- }
- const state = this._readableState;
- const nOrig = n;
-
- if (n > state.highWaterMark) {
- state.highWaterMark = computeNewHighWaterMark(n);
- }
-
- if (n !== 0) {
- state.emittedReadable = false;
- }
-
- if (
- n === 0 &&
- state.needReadable &&
- ((state.highWaterMark !== 0
- ? state.length >= state.highWaterMark
- : state.length > 0) ||
- state.ended)
- ) {
- if (state.length === 0 && state.ended) {
- endDuplex(this);
- } else {
- emitReadable(this);
- }
- return null;
- }
-
- n = howMuchToRead(n, state);
-
- if (n === 0 && state.ended) {
- if (state.length === 0) {
- endDuplex(this);
- }
- return null;
- }
-
- let doRead = state.needReadable;
- if (
- state.length === 0 || state.length - (n as number) < state.highWaterMark
- ) {
- doRead = true;
- }
-
- if (
- state.ended || state.reading || state.destroyed || state.errored ||
- !state.constructed
- ) {
- doRead = false;
- } else if (doRead) {
- state.reading = true;
- state.sync = true;
- if (state.length === 0) {
- state.needReadable = true;
- }
- this._read();
- state.sync = false;
- if (!state.reading) {
- n = howMuchToRead(nOrig, state);
- }
- }
-
- let ret;
- if ((n as number) > 0) {
- ret = fromList((n as number), state);
- } else {
- ret = null;
- }
-
- if (ret === null) {
- state.needReadable = state.length <= state.highWaterMark;
- n = 0;
- } else {
- state.length -= n as number;
- if (state.multiAwaitDrain) {
- (state.awaitDrainWriters as Set<Writable>).clear();
- } else {
- state.awaitDrainWriters = null;
- }
- }
-
- if (state.length === 0) {
- if (!state.ended) {
- state.needReadable = true;
- }
-
- if (nOrig !== n && state.ended) {
- endDuplex(this);
- }
- }
-
- if (ret !== null) {
- this.emit("data", ret);
- }
-
- return ret;
- }
-
- removeAllListeners(
- ev:
- | "close"
- | "data"
- | "end"
- | "error"
- | "pause"
- | "readable"
- | "resume"
- | symbol
- | undefined,
- ) {
- const res = super.removeAllListeners(ev);
-
- if (ev === "readable" || ev === undefined) {
- queueMicrotask(() => updateReadableListening(this));
- }
-
- return res;
- }
-
- removeListener(
- event: "close" | "end" | "pause" | "readable" | "resume",
- listener: () => void,
- ): this;
- // deno-lint-ignore no-explicit-any
- removeListener(event: "data", listener: (chunk: any) => void): this;
- removeListener(event: "error", listener: (err: Error) => void): this;
- removeListener(
- event: string | symbol,
- // deno-lint-ignore no-explicit-any
- listener: (...args: any[]) => void,
- ): this;
- removeListener(
- ev: string | symbol,
- fn:
- | (() => void)
- // deno-lint-ignore no-explicit-any
- | ((chunk: any) => void)
- | ((err: Error) => void)
- // deno-lint-ignore no-explicit-any
- | ((...args: any[]) => void),
- ) {
- const res = super.removeListener.call(this, ev, fn);
-
- if (ev === "readable") {
- queueMicrotask(() => updateReadableListening(this));
- }
-
- return res;
- }
-
- resume = Readable.prototype.resume as () => this;
-
- setEncoding = Readable.prototype.setEncoding as (enc: string) => this;
-
- // deno-lint-ignore no-explicit-any
- unshift(chunk: any, encoding?: Encodings): boolean {
- return readableAddChunk(this, chunk, encoding, true);
- }
-
- unpipe = Readable.prototype.unpipe as (dest?: Writable | undefined) => this;
-
- wrap = Readable.prototype.wrap as (stream: Stream) => this;
-
- get readable(): boolean {
- return this._readableState?.readable &&
- !this._readableState?.destroyed &&
- !this._readableState?.errorEmitted &&
- !this._readableState?.endEmitted;
- }
- set readable(val: boolean) {
- if (this._readableState) {
- this._readableState.readable = val;
- }
- }
-
- get readableHighWaterMark(): number {
- return this._readableState.highWaterMark;
- }
-
- get readableBuffer() {
- return this._readableState && this._readableState.buffer;
- }
-
- get readableFlowing(): boolean | null {
- return this._readableState.flowing;
- }
-
- set readableFlowing(state: boolean | null) {
- if (this._readableState) {
- this._readableState.flowing = state;
- }
- }
-
- get readableLength() {
- return this._readableState.length;
- }
-
- get readableObjectMode() {
- return this._readableState ? this._readableState.objectMode : false;
- }
-
- get readableEncoding() {
- return this._readableState ? this._readableState.encoding : null;
- }
-
- get readableEnded() {
- return this._readableState ? this._readableState.endEmitted : false;
- }
-
- _write = Writable.prototype._write;
-
- write = Writable.prototype.write;
-
- cork = Writable.prototype.cork;
-
- uncork = Writable.prototype.uncork;
-
- setDefaultEncoding(encoding: string) {
- // node::ParseEncoding() requires lower case.
- if (typeof encoding === "string") {
- encoding = encoding.toLowerCase();
- }
- if (!Buffer.isEncoding(encoding)) {
- throw new ERR_UNKNOWN_ENCODING(encoding);
- }
- this._writableState.defaultEncoding = encoding as Encodings;
- return this;
- }
-
- end(cb?: () => void): void;
- // deno-lint-ignore no-explicit-any
- end(chunk: any, cb?: () => void): void;
- // deno-lint-ignore no-explicit-any
- end(chunk: any, encoding: Encodings, cb?: () => void): void;
-
- end(
- // deno-lint-ignore no-explicit-any
- x?: any | (() => void),
- y?: Encodings | (() => void),
- z?: () => void,
- ) {
- const state = this._writableState;
- // deno-lint-ignore no-explicit-any
- let chunk: any | null;
- let encoding: Encodings | null;
- let cb: undefined | ((error?: Error) => void);
-
- if (typeof x === "function") {
- chunk = null;
- encoding = null;
- cb = x;
- } else if (typeof y === "function") {
- chunk = x;
- encoding = null;
- cb = y;
- } else {
- chunk = x;
- encoding = y as Encodings;
- cb = z;
- }
-
- if (chunk !== null && chunk !== undefined) {
- this.write(chunk, encoding);
- }
-
- if (state.corked) {
- state.corked = 1;
- this.uncork();
- }
-
- let err: Error | undefined;
- if (!state.errored && !state.ending) {
- state.ending = true;
- finishMaybe(this, state, true);
- state.ended = true;
- } else if (state.finished) {
- err = new ERR_STREAM_ALREADY_FINISHED("end");
- } else if (state.destroyed) {
- err = new ERR_STREAM_DESTROYED("end");
- }
-
- if (typeof cb === "function") {
- if (err || state.finished) {
- queueMicrotask(() => {
- (cb as (error?: Error | undefined) => void)(err);
- });
- } else {
- state[kOnFinished].push(cb);
- }
- }
-
- return this;
- }
-
- get destroyed() {
- if (
- this._readableState === undefined ||
- this._writableState === undefined
- ) {
- return false;
- }
- return this._readableState.destroyed && this._writableState.destroyed;
- }
-
- set destroyed(value: boolean) {
- if (this._readableState && this._writableState) {
- this._readableState.destroyed = value;
- this._writableState.destroyed = value;
- }
- }
-
- get writable() {
- const w = this._writableState;
- return !w.destroyed && !w.errored && !w.ending && !w.ended;
- }
-
- set writable(val) {
- if (this._writableState) {
- this._writableState.writable = !!val;
- }
- }
-
- get writableFinished() {
- return this._writableState ? this._writableState.finished : false;
- }
-
- get writableObjectMode() {
- return this._writableState ? this._writableState.objectMode : false;
- }
-
- get writableBuffer() {
- return this._writableState && this._writableState.getBuffer();
- }
-
- get writableEnded() {
- return this._writableState ? this._writableState.ending : false;
- }
-
- get writableHighWaterMark() {
- return this._writableState && this._writableState.highWaterMark;
- }
-
- get writableCorked() {
- return this._writableState ? this._writableState.corked : 0;
- }
-
- get writableLength() {
- return this._writableState && this._writableState.length;
- }
-}
-
-export default Duplex;
diff --git a/std/node/_stream/duplex_internal.ts b/std/node/_stream/duplex_internal.ts
deleted file mode 100644
index bfd9749f8..000000000
--- a/std/node/_stream/duplex_internal.ts
+++ /dev/null
@@ -1,296 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import type { ReadableState } from "./readable.ts";
-import { addChunk, maybeReadMore, onEofChunk } from "./readable_internal.ts";
-import type Writable from "./writable.ts";
-import type { WritableState } from "./writable.ts";
-import {
- afterWrite,
- AfterWriteTick,
- afterWriteTick,
- clearBuffer,
- errorBuffer,
- kOnFinished,
- needFinish,
- prefinish,
-} from "./writable_internal.ts";
-import { Buffer } from "../buffer.ts";
-import type Duplex from "./duplex.ts";
-import {
- ERR_MULTIPLE_CALLBACK,
- ERR_STREAM_PUSH_AFTER_EOF,
- ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
-} from "../_errors.ts";
-
-export function endDuplex(stream: Duplex) {
- const state = stream._readableState;
-
- if (!state.endEmitted) {
- state.ended = true;
- queueMicrotask(() => endReadableNT(state, stream));
- }
-}
-
-function endReadableNT(state: ReadableState, stream: Duplex) {
- // Check that we didn't get one last unshift.
- if (
- !state.errorEmitted && !state.closeEmitted &&
- !state.endEmitted && state.length === 0
- ) {
- state.endEmitted = true;
- stream.emit("end");
-
- if (stream.writable && stream.allowHalfOpen === false) {
- queueMicrotask(() => endWritableNT(state, stream));
- } else if (state.autoDestroy) {
- // In case of duplex streams we need a way to detect
- // if the writable side is ready for autoDestroy as well.
- const wState = stream._writableState;
- const autoDestroy = !wState || (
- wState.autoDestroy &&
- // We don't expect the writable to ever 'finish'
- // if writable is explicitly set to false.
- (wState.finished || wState.writable === false)
- );
-
- if (autoDestroy) {
- stream.destroy();
- }
- }
- }
-}
-
-function endWritableNT(state: ReadableState, stream: Duplex) {
- const writable = stream.writable &&
- !stream.writableEnded &&
- !stream.destroyed;
- if (writable) {
- stream.end();
- }
-}
-
-export function errorOrDestroy(
- // deno-lint-ignore no-explicit-any
- this: any,
- stream: Duplex,
- err: Error,
- sync = false,
-) {
- const r = stream._readableState;
- const w = stream._writableState;
-
- if (w.destroyed || r.destroyed) {
- return this;
- }
-
- if (r.autoDestroy || w.autoDestroy) {
- stream.destroy(err);
- } else if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (w && !w.errored) {
- w.errored = err;
- }
- if (r && !r.errored) {
- r.errored = err;
- }
-
- if (sync) {
- queueMicrotask(() => {
- if (w.errorEmitted || r.errorEmitted) {
- return;
- }
-
- w.errorEmitted = true;
- r.errorEmitted = true;
-
- stream.emit("error", err);
- });
- } else {
- if (w.errorEmitted || r.errorEmitted) {
- return;
- }
-
- w.errorEmitted = true;
- r.errorEmitted = true;
-
- stream.emit("error", err);
- }
- }
-}
-
-function finish(stream: Duplex, state: WritableState) {
- state.pendingcb--;
- if (state.errorEmitted || state.closeEmitted) {
- return;
- }
-
- state.finished = true;
-
- for (const callback of state[kOnFinished].splice(0)) {
- callback();
- }
-
- stream.emit("finish");
-
- if (state.autoDestroy) {
- stream.destroy();
- }
-}
-
-export function finishMaybe(
- stream: Duplex,
- state: WritableState,
- sync?: boolean,
-) {
- if (needFinish(state)) {
- prefinish(stream as Writable, state);
- if (state.pendingcb === 0 && needFinish(state)) {
- state.pendingcb++;
- if (sync) {
- queueMicrotask(() => finish(stream, state));
- } else {
- finish(stream, state);
- }
- }
- }
-}
-
-export function onwrite(stream: Duplex, er?: Error | null) {
- const state = stream._writableState;
- const sync = state.sync;
- const cb = state.writecb;
-
- if (typeof cb !== "function") {
- errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
- return;
- }
-
- state.writing = false;
- state.writecb = null;
- state.length -= state.writelen;
- state.writelen = 0;
-
- if (er) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- er.stack;
-
- if (!state.errored) {
- state.errored = er;
- }
-
- if (stream._readableState && !stream._readableState.errored) {
- stream._readableState.errored = er;
- }
-
- if (sync) {
- queueMicrotask(() => onwriteError(stream, state, er, cb));
- } else {
- onwriteError(stream, state, er, cb);
- }
- } else {
- if (state.buffered.length > state.bufferedIndex) {
- clearBuffer(stream, state);
- }
-
- if (sync) {
- if (
- state.afterWriteTickInfo !== null &&
- state.afterWriteTickInfo.cb === cb
- ) {
- state.afterWriteTickInfo.count++;
- } else {
- state.afterWriteTickInfo = {
- count: 1,
- cb: (cb as (error?: Error) => void),
- stream: stream as Writable,
- state,
- };
- queueMicrotask(() =>
- afterWriteTick(state.afterWriteTickInfo as AfterWriteTick)
- );
- }
- } else {
- afterWrite(stream as Writable, state, 1, cb as (error?: Error) => void);
- }
- }
-}
-
-function onwriteError(
- stream: Duplex,
- state: WritableState,
- er: Error,
- cb: (error: Error) => void,
-) {
- --state.pendingcb;
-
- cb(er);
- errorBuffer(state);
- errorOrDestroy(stream, er);
-}
-
-export function readableAddChunk(
- stream: Duplex,
- chunk: string | Buffer | Uint8Array | null,
- encoding: undefined | string = undefined,
- addToFront: boolean,
-) {
- const state = stream._readableState;
- let usedEncoding = encoding;
-
- let err;
- if (!state.objectMode) {
- if (typeof chunk === "string") {
- usedEncoding = encoding || state.defaultEncoding;
- if (state.encoding !== usedEncoding) {
- if (addToFront && state.encoding) {
- chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding);
- } else {
- chunk = Buffer.from(chunk, usedEncoding);
- usedEncoding = "";
- }
- }
- } else if (chunk instanceof Uint8Array) {
- chunk = Buffer.from(chunk);
- }
- }
-
- if (err) {
- errorOrDestroy(stream, err);
- } else if (chunk === null) {
- state.reading = false;
- onEofChunk(stream, state);
- } else if (state.objectMode || (chunk.length > 0)) {
- if (addToFront) {
- if (state.endEmitted) {
- errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
- } else {
- addChunk(stream, state, chunk, true);
- }
- } else if (state.ended) {
- errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
- } else if (state.destroyed || state.errored) {
- return false;
- } else {
- state.reading = false;
- if (state.decoder && !usedEncoding) {
- //TODO(Soremwar)
- //I don't think this cast is right
- chunk = state.decoder.write(Buffer.from(chunk as Uint8Array));
- if (state.objectMode || chunk.length !== 0) {
- addChunk(stream, state, chunk, false);
- } else {
- maybeReadMore(stream, state);
- }
- } else {
- addChunk(stream, state, chunk, false);
- }
- }
- } else if (!addToFront) {
- state.reading = false;
- maybeReadMore(stream, state);
- }
-
- return !state.ended &&
- (state.length < state.highWaterMark || state.length === 0);
-}
diff --git a/std/node/_stream/duplex_test.ts b/std/node/_stream/duplex_test.ts
deleted file mode 100644
index 1596ec218..000000000
--- a/std/node/_stream/duplex_test.ts
+++ /dev/null
@@ -1,698 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import Duplex from "./duplex.ts";
-import finished from "./end_of_stream.ts";
-import {
- assert,
- assertEquals,
- assertStrictEquals,
- assertThrows,
-} from "../../testing/asserts.ts";
-import { deferred, delay } from "../../async/mod.ts";
-
-Deno.test("Duplex stream works normally", () => {
- const stream = new Duplex({ objectMode: true });
-
- assert(stream._readableState.objectMode);
- assert(stream._writableState.objectMode);
- assert(stream.allowHalfOpen);
- assertEquals(stream.listenerCount("end"), 0);
-
- let written: { val: number };
- let read: { val: number };
-
- stream._write = (obj, _, cb) => {
- written = obj;
- cb();
- };
-
- stream._read = () => {};
-
- stream.on("data", (obj) => {
- read = obj;
- });
-
- stream.push({ val: 1 });
- stream.end({ val: 2 });
-
- stream.on("finish", () => {
- assertEquals(read.val, 1);
- assertEquals(written.val, 2);
- });
-});
-
-Deno.test("Duplex stream gets constructed correctly", () => {
- const d1 = new Duplex({
- objectMode: true,
- highWaterMark: 100,
- });
-
- assertEquals(d1.readableObjectMode, true);
- assertEquals(d1.readableHighWaterMark, 100);
- assertEquals(d1.writableObjectMode, true);
- assertEquals(d1.writableHighWaterMark, 100);
-
- const d2 = new Duplex({
- readableObjectMode: false,
- readableHighWaterMark: 10,
- writableObjectMode: true,
- writableHighWaterMark: 100,
- });
-
- assertEquals(d2.writableObjectMode, true);
- assertEquals(d2.writableHighWaterMark, 100);
- assertEquals(d2.readableObjectMode, false);
- assertEquals(d2.readableHighWaterMark, 10);
-});
-
-Deno.test("Duplex stream can be paused", () => {
- const readable = new Duplex();
-
- // _read is a noop, here.
- readable._read = () => {};
-
- // Default state of a stream is not "paused"
- assert(!readable.isPaused());
-
- // Make the stream start flowing...
- readable.on("data", () => {});
-
- // still not paused.
- assert(!readable.isPaused());
-
- readable.pause();
- assert(readable.isPaused());
- readable.resume();
- assert(!readable.isPaused());
-});
-
-Deno.test("Duplex stream sets enconding correctly", () => {
- const readable = new Duplex({
- read() {},
- });
-
- readable.setEncoding("utf8");
-
- readable.push(new TextEncoder().encode("DEF"));
- readable.unshift(new TextEncoder().encode("ABC"));
-
- assertStrictEquals(readable.read(), "ABCDEF");
-});
-
-Deno.test("Duplex stream sets encoding correctly", () => {
- const readable = new Duplex({
- read() {},
- });
-
- readable.setEncoding("utf8");
-
- readable.push(new TextEncoder().encode("DEF"));
- readable.unshift(new TextEncoder().encode("ABC"));
-
- assertStrictEquals(readable.read(), "ABCDEF");
-});
-
-Deno.test("Duplex stream holds up a big push", async () => {
- let readExecuted = 0;
- const readExecutedExpected = 3;
- const readExpectedExecutions = deferred();
-
- let endExecuted = 0;
- const endExecutedExpected = 1;
- const endExpectedExecutions = deferred();
-
- const str = "asdfasdfasdfasdfasdf";
-
- const r = new Duplex({
- highWaterMark: 5,
- encoding: "utf8",
- });
-
- let reads = 0;
-
- function _read() {
- if (reads === 0) {
- setTimeout(() => {
- r.push(str);
- }, 1);
- reads++;
- } else if (reads === 1) {
- const ret = r.push(str);
- assertEquals(ret, false);
- reads++;
- } else {
- r.push(null);
- }
- }
-
- r._read = () => {
- readExecuted++;
- if (readExecuted == readExecutedExpected) {
- readExpectedExecutions.resolve();
- }
- _read();
- };
-
- r.on("end", () => {
- endExecuted++;
- if (endExecuted == endExecutedExpected) {
- endExpectedExecutions.resolve();
- }
- });
-
- // Push some data in to start.
- // We've never gotten any read event at this point.
- const ret = r.push(str);
- assert(!ret);
- let chunk = r.read();
- assertEquals(chunk, str);
- chunk = r.read();
- assertEquals(chunk, null);
-
- r.once("readable", () => {
- // This time, we'll get *all* the remaining data, because
- // it's been added synchronously, as the read WOULD take
- // us below the hwm, and so it triggered a _read() again,
- // which synchronously added more, which we then return.
- chunk = r.read();
- assertEquals(chunk, str + str);
-
- chunk = r.read();
- assertEquals(chunk, null);
- });
-
- const readTimeout = setTimeout(
- () => readExpectedExecutions.reject(),
- 1000,
- );
- const endTimeout = setTimeout(
- () => endExpectedExecutions.reject(),
- 1000,
- );
- await readExpectedExecutions;
- await endExpectedExecutions;
- clearTimeout(readTimeout);
- clearTimeout(endTimeout);
- assertEquals(readExecuted, readExecutedExpected);
- assertEquals(endExecuted, endExecutedExpected);
-});
-
-Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length exceeded", async () => {
- let readableExecuted = 0;
- const readableExecutedExpected = 1;
- const readableExpectedExecutions = deferred();
-
- const r = new Duplex({
- highWaterMark: 3,
- });
-
- r._read = () => {
- throw new Error("_read must not be called");
- };
- r.push(Buffer.from("blerg"));
-
- setTimeout(function () {
- assert(!r._readableState.reading);
- r.on("readable", () => {
- readableExecuted++;
- if (readableExecuted == readableExecutedExpected) {
- readableExpectedExecutions.resolve();
- }
- });
- }, 1);
-
- const readableTimeout = setTimeout(
- () => readableExpectedExecutions.reject(),
- 1000,
- );
- await readableExpectedExecutions;
- clearTimeout(readableTimeout);
- assertEquals(readableExecuted, readableExecutedExpected);
-});
-
-Deno.test("Duplex stream: 'readable' and 'read' events are emitted on highWaterMark length not reached", async () => {
- let readableExecuted = 0;
- const readableExecutedExpected = 1;
- const readableExpectedExecutions = deferred();
-
- let readExecuted = 0;
- const readExecutedExpected = 1;
- const readExpectedExecutions = deferred();
-
- const r = new Duplex({
- highWaterMark: 3,
- });
-
- r._read = () => {
- readExecuted++;
- if (readExecuted == readExecutedExpected) {
- readExpectedExecutions.resolve();
- }
- };
-
- r.push(Buffer.from("bl"));
-
- setTimeout(function () {
- assert(r._readableState.reading);
- r.on("readable", () => {
- readableExecuted++;
- if (readableExecuted == readableExecutedExpected) {
- readableExpectedExecutions.resolve();
- }
- });
- }, 1);
-
- const readableTimeout = setTimeout(
- () => readableExpectedExecutions.reject(),
- 1000,
- );
- const readTimeout = setTimeout(
- () => readExpectedExecutions.reject(),
- 1000,
- );
- await readableExpectedExecutions;
- await readExpectedExecutions;
- clearTimeout(readableTimeout);
- clearTimeout(readTimeout);
- assertEquals(readableExecuted, readableExecutedExpected);
- assertEquals(readExecuted, readExecutedExpected);
-});
-
-Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length not reached and stream ended", async () => {
- let readableExecuted = 0;
- const readableExecutedExpected = 1;
- const readableExpectedExecutions = deferred();
-
- const r = new Duplex({
- highWaterMark: 30,
- });
-
- r._read = () => {
- throw new Error("Must not be executed");
- };
-
- r.push(Buffer.from("blerg"));
- //This ends the stream and triggers end
- r.push(null);
-
- setTimeout(function () {
- // Assert we're testing what we think we are
- assert(!r._readableState.reading);
- r.on("readable", () => {
- readableExecuted++;
- if (readableExecuted == readableExecutedExpected) {
- readableExpectedExecutions.resolve();
- }
- });
- }, 1);
-
- const readableTimeout = setTimeout(
- () => readableExpectedExecutions.reject(),
- 1000,
- );
- await readableExpectedExecutions;
- clearTimeout(readableTimeout);
- assertEquals(readableExecuted, readableExecutedExpected);
-});
-
-Deno.test("Duplex stream: 'read' is emitted on empty string pushed in non-object mode", async () => {
- let endExecuted = 0;
- const endExecutedExpected = 1;
- const endExpectedExecutions = deferred();
-
- const underlyingData = ["", "x", "y", "", "z"];
- const expected = underlyingData.filter((data) => data);
- const result: unknown[] = [];
-
- const r = new Duplex({
- encoding: "utf8",
- });
- r._read = function () {
- queueMicrotask(() => {
- if (!underlyingData.length) {
- this.push(null);
- } else {
- this.push(underlyingData.shift());
- }
- });
- };
-
- r.on("readable", () => {
- const data = r.read();
- if (data !== null) result.push(data);
- });
-
- r.on("end", () => {
- endExecuted++;
- if (endExecuted == endExecutedExpected) {
- endExpectedExecutions.resolve();
- }
- assertEquals(result, expected);
- });
-
- const endTimeout = setTimeout(
- () => endExpectedExecutions.reject(),
- 1000,
- );
- await endExpectedExecutions;
- clearTimeout(endTimeout);
- assertEquals(endExecuted, endExecutedExpected);
-});
-
-Deno.test("Duplex stream: listeners can be removed", () => {
- const r = new Duplex();
- r._read = () => {};
- r.on("data", () => {});
-
- r.removeAllListeners("data");
-
- assertEquals(r.eventNames().length, 0);
-});
-
-Deno.test("Duplex stream writes correctly", async () => {
- let callback: undefined | ((error?: Error | null | undefined) => void);
-
- let writeExecuted = 0;
- const writeExecutedExpected = 1;
- const writeExpectedExecutions = deferred();
-
- let writevExecuted = 0;
- const writevExecutedExpected = 1;
- const writevExpectedExecutions = deferred();
-
- const writable = new Duplex({
- write: (chunk, encoding, cb) => {
- writeExecuted++;
- if (writeExecuted == writeExecutedExpected) {
- writeExpectedExecutions.resolve();
- }
- assert(chunk instanceof Buffer);
- assertStrictEquals(encoding, "buffer");
- assertStrictEquals(String(chunk), "ABC");
- callback = cb;
- },
- writev: (chunks) => {
- writevExecuted++;
- if (writevExecuted == writevExecutedExpected) {
- writevExpectedExecutions.resolve();
- }
- assertStrictEquals(chunks.length, 2);
- assertStrictEquals(chunks[0].encoding, "buffer");
- assertStrictEquals(chunks[1].encoding, "buffer");
- assertStrictEquals(chunks[0].chunk + chunks[1].chunk, "DEFGHI");
- },
- });
-
- writable.write(new TextEncoder().encode("ABC"));
- writable.write(new TextEncoder().encode("DEF"));
- writable.end(new TextEncoder().encode("GHI"));
- callback?.();
-
- const writeTimeout = setTimeout(
- () => writeExpectedExecutions.reject(),
- 1000,
- );
- const writevTimeout = setTimeout(
- () => writevExpectedExecutions.reject(),
- 1000,
- );
- await writeExpectedExecutions;
- await writevExpectedExecutions;
- clearTimeout(writeTimeout);
- clearTimeout(writevTimeout);
- assertEquals(writeExecuted, writeExecutedExpected);
- assertEquals(writevExecuted, writevExecutedExpected);
-});
-
-Deno.test("Duplex stream writes Uint8Array in object mode", async () => {
- let writeExecuted = 0;
- const writeExecutedExpected = 1;
- const writeExpectedExecutions = deferred();
-
- const ABC = new TextEncoder().encode("ABC");
-
- const writable = new Duplex({
- objectMode: true,
- write: (chunk, encoding, cb) => {
- writeExecuted++;
- if (writeExecuted == writeExecutedExpected) {
- writeExpectedExecutions.resolve();
- }
- assert(!(chunk instanceof Buffer));
- assert(chunk instanceof Uint8Array);
- assertEquals(chunk, ABC);
- assertEquals(encoding, "utf8");
- cb();
- },
- });
-
- writable.end(ABC);
-
- const writeTimeout = setTimeout(
- () => writeExpectedExecutions.reject(),
- 1000,
- );
- await writeExpectedExecutions;
- clearTimeout(writeTimeout);
- assertEquals(writeExecuted, writeExecutedExpected);
-});
-
-Deno.test("Duplex stream throws on unexpected close", async () => {
- let finishedExecuted = 0;
- const finishedExecutedExpected = 1;
- const finishedExpectedExecutions = deferred();
-
- const writable = new Duplex({
- write: () => {},
- });
- writable.writable = false;
- writable.destroy();
-
- finished(writable, (err) => {
- finishedExecuted++;
- if (finishedExecuted == finishedExecutedExpected) {
- finishedExpectedExecutions.resolve();
- }
- assertEquals(err?.code, "ERR_STREAM_PREMATURE_CLOSE");
- });
-
- const finishedTimeout = setTimeout(
- () => finishedExpectedExecutions.reject(),
- 1000,
- );
- await finishedExpectedExecutions;
- clearTimeout(finishedTimeout);
- assertEquals(finishedExecuted, finishedExecutedExpected);
-});
-
-Deno.test("Duplex stream finishes correctly after error", async () => {
- let errorExecuted = 0;
- const errorExecutedExpected = 1;
- const errorExpectedExecutions = deferred();
-
- let finishedExecuted = 0;
- const finishedExecutedExpected = 1;
- const finishedExpectedExecutions = deferred();
-
- const w = new Duplex({
- write(_chunk, _encoding, cb) {
- cb(new Error());
- },
- autoDestroy: false,
- });
- w.write("asd");
- w.on("error", () => {
- errorExecuted++;
- if (errorExecuted == errorExecutedExpected) {
- errorExpectedExecutions.resolve();
- }
- finished(w, () => {
- finishedExecuted++;
- if (finishedExecuted == finishedExecutedExpected) {
- finishedExpectedExecutions.resolve();
- }
- });
- });
-
- const errorTimeout = setTimeout(
- () => errorExpectedExecutions.reject(),
- 1000,
- );
- const finishedTimeout = setTimeout(
- () => finishedExpectedExecutions.reject(),
- 1000,
- );
- await finishedExpectedExecutions;
- await errorExpectedExecutions;
- clearTimeout(finishedTimeout);
- clearTimeout(errorTimeout);
- assertEquals(finishedExecuted, finishedExecutedExpected);
- assertEquals(errorExecuted, errorExecutedExpected);
-});
-
-Deno.test("Duplex stream fails on 'write' null value", () => {
- const writable = new Duplex();
- assertThrows(() => writable.write(null));
-});
-
-Deno.test("Duplex stream is destroyed correctly", async () => {
- let closeExecuted = 0;
- const closeExecutedExpected = 1;
- const closeExpectedExecutions = deferred();
-
- const unexpectedExecution = deferred();
-
- const duplex = new Duplex({
- write(_chunk, _enc, cb) {
- cb();
- },
- read() {},
- });
-
- duplex.resume();
-
- function never() {
- unexpectedExecution.reject();
- }
-
- duplex.on("end", never);
- duplex.on("finish", never);
- duplex.on("close", () => {
- closeExecuted++;
- if (closeExecuted == closeExecutedExpected) {
- closeExpectedExecutions.resolve();
- }
- });
-
- duplex.destroy();
- assertEquals(duplex.destroyed, true);
-
- const closeTimeout = setTimeout(
- () => closeExpectedExecutions.reject(),
- 1000,
- );
- await Promise.race([
- unexpectedExecution,
- delay(100),
- ]);
- await closeExpectedExecutions;
- clearTimeout(closeTimeout);
- assertEquals(closeExecuted, closeExecutedExpected);
-});
-
-Deno.test("Duplex stream errors correctly on destroy", async () => {
- let errorExecuted = 0;
- const errorExecutedExpected = 1;
- const errorExpectedExecutions = deferred();
-
- const unexpectedExecution = deferred();
-
- const duplex = new Duplex({
- write(_chunk, _enc, cb) {
- cb();
- },
- read() {},
- });
- duplex.resume();
-
- const expected = new Error("kaboom");
-
- function never() {
- unexpectedExecution.reject();
- }
-
- duplex.on("end", never);
- duplex.on("finish", never);
- duplex.on("error", (err) => {
- errorExecuted++;
- if (errorExecuted == errorExecutedExpected) {
- errorExpectedExecutions.resolve();
- }
- assertStrictEquals(err, expected);
- });
-
- duplex.destroy(expected);
- assertEquals(duplex.destroyed, true);
-
- const errorTimeout = setTimeout(
- () => errorExpectedExecutions.reject(),
- 1000,
- );
- await Promise.race([
- unexpectedExecution,
- delay(100),
- ]);
- await errorExpectedExecutions;
- clearTimeout(errorTimeout);
- assertEquals(errorExecuted, errorExecutedExpected);
-});
-
-Deno.test("Duplex stream doesn't finish on allowHalfOpen", async () => {
- const unexpectedExecution = deferred();
-
- const duplex = new Duplex({
- read() {},
- });
-
- assertEquals(duplex.allowHalfOpen, true);
- duplex.on("finish", () => unexpectedExecution.reject());
- assertEquals(duplex.listenerCount("end"), 0);
- duplex.resume();
- duplex.push(null);
-
- await Promise.race([
- unexpectedExecution,
- delay(100),
- ]);
-});
-
-Deno.test("Duplex stream finishes when allowHalfOpen is disabled", async () => {
- let finishExecuted = 0;
- const finishExecutedExpected = 1;
- const finishExpectedExecutions = deferred();
-
- const duplex = new Duplex({
- read() {},
- allowHalfOpen: false,
- });
-
- assertEquals(duplex.allowHalfOpen, false);
- duplex.on("finish", () => {
- finishExecuted++;
- if (finishExecuted == finishExecutedExpected) {
- finishExpectedExecutions.resolve();
- }
- });
- assertEquals(duplex.listenerCount("end"), 0);
- duplex.resume();
- duplex.push(null);
-
- const finishTimeout = setTimeout(
- () => finishExpectedExecutions.reject(),
- 1000,
- );
- await finishExpectedExecutions;
- clearTimeout(finishTimeout);
- assertEquals(finishExecuted, finishExecutedExpected);
-});
-
-Deno.test("Duplex stream doesn't finish when allowHalfOpen is disabled but stream ended", async () => {
- const unexpectedExecution = deferred();
-
- const duplex = new Duplex({
- read() {},
- allowHalfOpen: false,
- });
-
- assertEquals(duplex.allowHalfOpen, false);
- duplex._writableState.ended = true;
- duplex.on("finish", () => unexpectedExecution.reject());
- assertEquals(duplex.listenerCount("end"), 0);
- duplex.resume();
- duplex.push(null);
-
- await Promise.race([
- unexpectedExecution,
- delay(100),
- ]);
-});
diff --git a/std/node/_stream/end_of_stream.ts b/std/node/_stream/end_of_stream.ts
deleted file mode 100644
index 6179e7fc4..000000000
--- a/std/node/_stream/end_of_stream.ts
+++ /dev/null
@@ -1,241 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { once } from "../_utils.ts";
-import type Duplex from "./duplex.ts";
-import type Readable from "./readable.ts";
-import type Stream from "./stream.ts";
-import type { ReadableState } from "./readable.ts";
-import type Writable from "./writable.ts";
-import type { WritableState } from "./writable.ts";
-import {
- ERR_INVALID_ARG_TYPE,
- ERR_STREAM_PREMATURE_CLOSE,
- NodeErrorAbstraction,
-} from "../_errors.ts";
-
-export type StreamImplementations = Duplex | Readable | Stream | Writable;
-
-// TODO(Soremwar)
-// Bring back once requests are implemented
-// function isRequest(stream: Stream) {
-// return stream.setHeader && typeof stream.abort === "function";
-// }
-
-// deno-lint-ignore no-explicit-any
-function isReadable(stream: any) {
- return typeof stream.readable === "boolean" ||
- typeof stream.readableEnded === "boolean" ||
- !!stream._readableState;
-}
-
-// deno-lint-ignore no-explicit-any
-function isWritable(stream: any) {
- return typeof stream.writable === "boolean" ||
- typeof stream.writableEnded === "boolean" ||
- !!stream._writableState;
-}
-
-function isWritableFinished(stream: Writable) {
- if (stream.writableFinished) return true;
- const wState = stream._writableState;
- if (!wState || wState.errored) return false;
- return wState.finished || (wState.ended && wState.length === 0);
-}
-
-function nop() {}
-
-function isReadableEnded(stream: Readable) {
- if (stream.readableEnded) return true;
- const rState = stream._readableState;
- if (!rState || rState.errored) return false;
- return rState.endEmitted || (rState.ended && rState.length === 0);
-}
-
-export interface FinishedOptions {
- error?: boolean;
- readable?: boolean;
- writable?: boolean;
-}
-
-/**
- * Appends an ending callback triggered when a stream is no longer readable,
- * writable or has experienced an error or a premature close event
-*/
-export default function eos(
- stream: StreamImplementations,
- options: FinishedOptions | null,
- callback: (err?: NodeErrorAbstraction | null) => void,
-): () => void;
-export default function eos(
- stream: StreamImplementations,
- callback: (err?: NodeErrorAbstraction | null) => void,
-): () => void;
-export default function eos(
- stream: StreamImplementations,
- x: FinishedOptions | ((err?: NodeErrorAbstraction | null) => void) | null,
- y?: (err?: NodeErrorAbstraction | null) => void,
-) {
- let opts: FinishedOptions;
- let callback: (err?: NodeErrorAbstraction | null) => void;
-
- if (!y) {
- if (typeof x !== "function") {
- throw new ERR_INVALID_ARG_TYPE("callback", "function", x);
- }
- opts = {};
- callback = x;
- } else {
- if (!x || Array.isArray(x) || typeof x !== "object") {
- throw new ERR_INVALID_ARG_TYPE("opts", "object", x);
- }
- opts = x;
-
- if (typeof y !== "function") {
- throw new ERR_INVALID_ARG_TYPE("callback", "function", y);
- }
- callback = y;
- }
-
- callback = once(callback);
-
- const readable = opts.readable ?? isReadable(stream);
- const writable = opts.writable ?? isWritable(stream);
-
- // deno-lint-ignore no-explicit-any
- const wState: WritableState | undefined = (stream as any)._writableState;
- // deno-lint-ignore no-explicit-any
- const rState: ReadableState | undefined = (stream as any)._readableState;
- const validState = wState || rState;
-
- const onlegacyfinish = () => {
- if (!(stream as Writable).writable) {
- onfinish();
- }
- };
-
- let willEmitClose = (
- validState?.autoDestroy &&
- validState?.emitClose &&
- validState?.closed === false &&
- isReadable(stream) === readable &&
- isWritable(stream) === writable
- );
-
- let writableFinished = (stream as Writable).writableFinished ||
- wState?.finished;
- const onfinish = () => {
- writableFinished = true;
- // deno-lint-ignore no-explicit-any
- if ((stream as any).destroyed) {
- willEmitClose = false;
- }
-
- if (willEmitClose && (!(stream as Readable).readable || readable)) {
- return;
- }
- if (!readable || readableEnded) {
- callback.call(stream);
- }
- };
-
- let readableEnded = (stream as Readable).readableEnded || rState?.endEmitted;
- const onend = () => {
- readableEnded = true;
- // deno-lint-ignore no-explicit-any
- if ((stream as any).destroyed) {
- willEmitClose = false;
- }
-
- if (willEmitClose && (!(stream as Writable).writable || writable)) {
- return;
- }
- if (!writable || writableFinished) {
- callback.call(stream);
- }
- };
-
- const onerror = (err: NodeErrorAbstraction) => {
- callback.call(stream, err);
- };
-
- const onclose = () => {
- if (readable && !readableEnded) {
- if (!isReadableEnded(stream as Readable)) {
- return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
- }
- }
- if (writable && !writableFinished) {
- if (!isWritableFinished(stream as Writable)) {
- return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
- }
- }
- callback.call(stream);
- };
-
- // TODO(Soremwar)
- // Bring back once requests are implemented
- // const onrequest = () => {
- // stream.req.on("finish", onfinish);
- // };
-
- // TODO(Soremwar)
- // Bring back once requests are implemented
- // if (isRequest(stream)) {
- // stream.on("complete", onfinish);
- // stream.on("abort", onclose);
- // if (stream.req) {
- // onrequest();
- // } else {
- // stream.on("request", onrequest);
- // }
- // } else
- if (writable && !wState) {
- stream.on("end", onlegacyfinish);
- stream.on("close", onlegacyfinish);
- }
-
- // TODO(Soremwar)
- // Bring back once requests are implemented
- // if (typeof stream.aborted === "boolean") {
- // stream.on("aborted", onclose);
- // }
-
- stream.on("end", onend);
- stream.on("finish", onfinish);
- if (opts.error !== false) stream.on("error", onerror);
- stream.on("close", onclose);
-
- const closed = (
- wState?.closed ||
- rState?.closed ||
- wState?.errorEmitted ||
- rState?.errorEmitted ||
- // TODO(Soremwar)
- // Bring back once requests are implemented
- // (rState && stream.req && stream.aborted) ||
- (
- (!writable || wState?.finished) &&
- (!readable || rState?.endEmitted)
- )
- );
-
- if (closed) {
- queueMicrotask(callback);
- }
-
- return function () {
- callback = nop;
- stream.removeListener("aborted", onclose);
- stream.removeListener("complete", onfinish);
- stream.removeListener("abort", onclose);
- // TODO(Soremwar)
- // Bring back once requests are implemented
- // stream.removeListener("request", onrequest);
- // if (stream.req) stream.req.removeListener("finish", onfinish);
- stream.removeListener("end", onlegacyfinish);
- stream.removeListener("close", onlegacyfinish);
- stream.removeListener("finish", onfinish);
- stream.removeListener("end", onend);
- stream.removeListener("error", onerror);
- stream.removeListener("close", onclose);
- };
-}
diff --git a/std/node/_stream/end_of_stream_test.ts b/std/node/_stream/end_of_stream_test.ts
deleted file mode 100644
index 571e75b99..000000000
--- a/std/node/_stream/end_of_stream_test.ts
+++ /dev/null
@@ -1,97 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import finished from "./end_of_stream.ts";
-import Readable from "./readable.ts";
-import Transform from "./transform.ts";
-import Writable from "./writable.ts";
-import { mustCall } from "../_utils.ts";
-import { assert, fail } from "../../testing/asserts.ts";
-import { deferred, delay } from "../../async/mod.ts";
-
-Deno.test("Finished appends to Readable correctly", async () => {
- const rs = new Readable({
- read() {},
- });
-
- const [finishedExecution, finishedCb] = mustCall((err) => {
- assert(!err);
- });
-
- finished(rs, finishedCb);
-
- rs.push(null);
- rs.resume();
-
- await finishedExecution;
-});
-
-Deno.test("Finished appends to Writable correctly", async () => {
- const ws = new Writable({
- write(_data, _enc, cb) {
- cb();
- },
- });
-
- const [finishedExecution, finishedCb] = mustCall((err) => {
- assert(!err);
- });
-
- finished(ws, finishedCb);
-
- ws.end();
-
- await finishedExecution;
-});
-
-Deno.test("Finished appends to Transform correctly", async () => {
- const tr = new Transform({
- transform(_data, _enc, cb) {
- cb();
- },
- });
-
- let finish = false;
- let ended = false;
-
- tr.on("end", () => {
- ended = true;
- });
-
- tr.on("finish", () => {
- finish = true;
- });
-
- const [finishedExecution, finishedCb] = mustCall((err) => {
- assert(!err);
- assert(finish);
- assert(ended);
- });
-
- finished(tr, finishedCb);
-
- tr.end();
- tr.resume();
-
- await finishedExecution;
-});
-
-Deno.test("The function returned by Finished clears the listeners", async () => {
- const finishedExecution = deferred();
-
- const ws = new Writable({
- write(_data, _env, cb) {
- cb();
- },
- });
-
- const removeListener = finished(ws, () => {
- finishedExecution.reject();
- });
- removeListener();
- ws.end();
-
- await Promise.race([
- delay(100),
- finishedExecution,
- ])
- .catch(() => fail("Finished was executed"));
-});
diff --git a/std/node/_stream/from.ts b/std/node/_stream/from.ts
deleted file mode 100644
index 652c17715..000000000
--- a/std/node/_stream/from.ts
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import Readable from "./readable.ts";
-import type { ReadableOptions } from "./readable.ts";
-import { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } from "../_errors.ts";
-
-export default function from(
- // deno-lint-ignore no-explicit-any
- iterable: Iterable<any> | AsyncIterable<any>,
- opts?: ReadableOptions,
-) {
- let iterator:
- // deno-lint-ignore no-explicit-any
- | Iterator<any, any, undefined>
- // deno-lint-ignore no-explicit-any
- | AsyncIterator<any, any, undefined>;
- if (typeof iterable === "string" || iterable instanceof Buffer) {
- return new Readable({
- objectMode: true,
- ...opts,
- read() {
- this.push(iterable);
- this.push(null);
- },
- });
- }
-
- if (Symbol.asyncIterator in iterable) {
- // deno-lint-ignore no-explicit-any
- iterator = (iterable as AsyncIterable<any>)[Symbol.asyncIterator]();
- } else if (Symbol.iterator in iterable) {
- // deno-lint-ignore no-explicit-any
- iterator = (iterable as Iterable<any>)[Symbol.iterator]();
- } else {
- throw new ERR_INVALID_ARG_TYPE("iterable", ["Iterable"], iterable);
- }
-
- const readable = new Readable({
- objectMode: true,
- highWaterMark: 1,
- ...opts,
- });
-
- // Reading boolean to protect against _read
- // being called before last iteration completion.
- let reading = false;
-
- // needToClose boolean if iterator needs to be explicitly closed
- let needToClose = false;
-
- readable._read = function () {
- if (!reading) {
- reading = true;
- next();
- }
- };
-
- readable._destroy = function (error, cb) {
- if (needToClose) {
- needToClose = false;
- close().then(
- () => queueMicrotask(() => cb(error)),
- (e) => queueMicrotask(() => cb(error || e)),
- );
- } else {
- cb(error);
- }
- };
-
- async function close() {
- if (typeof iterator.return === "function") {
- const { value } = await iterator.return();
- await value;
- }
- }
-
- async function next() {
- try {
- needToClose = false;
- const { value, done } = await iterator.next();
- needToClose = !done;
- if (done) {
- readable.push(null);
- } else if (readable.destroyed) {
- await close();
- } else {
- const res = await value;
- if (res === null) {
- reading = false;
- throw new ERR_STREAM_NULL_VALUES();
- } else if (readable.push(res)) {
- next();
- } else {
- reading = false;
- }
- }
- } catch (err) {
- readable.destroy(err);
- }
- }
- return readable;
-}
diff --git a/std/node/_stream/passthrough.ts b/std/node/_stream/passthrough.ts
deleted file mode 100644
index 9126420e5..000000000
--- a/std/node/_stream/passthrough.ts
+++ /dev/null
@@ -1,20 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import Transform from "./transform.ts";
-import type { TransformOptions } from "./transform.ts";
-import type { Encodings } from "../_utils.ts";
-
-export default class PassThrough extends Transform {
- constructor(options?: TransformOptions) {
- super(options);
- }
-
- _transform(
- // deno-lint-ignore no-explicit-any
- chunk: any,
- _encoding: Encodings,
- // deno-lint-ignore no-explicit-any
- cb: (error?: Error | null, data?: any) => void,
- ) {
- cb(null, chunk);
- }
-}
diff --git a/std/node/_stream/pipeline.ts b/std/node/_stream/pipeline.ts
deleted file mode 100644
index d02a92870..000000000
--- a/std/node/_stream/pipeline.ts
+++ /dev/null
@@ -1,308 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { once } from "../_utils.ts";
-import { destroyer as implDestroyer } from "./destroy.ts";
-import eos from "./end_of_stream.ts";
-import createReadableStreamAsyncIterator from "./async_iterator.ts";
-import * as events from "../events.ts";
-import PassThrough from "./passthrough.ts";
-import {
- ERR_INVALID_ARG_TYPE,
- ERR_INVALID_CALLBACK,
- ERR_INVALID_RETURN_VALUE,
- ERR_MISSING_ARGS,
- ERR_STREAM_DESTROYED,
- NodeErrorAbstraction,
-} from "../_errors.ts";
-import type Duplex from "./duplex.ts";
-import type Readable from "./readable.ts";
-import type Stream from "./stream.ts";
-import type Transform from "./transform.ts";
-import type Writable from "./writable.ts";
-
-type Streams = Duplex | Readable | Writable;
-// deno-lint-ignore no-explicit-any
-type EndCallback = (err?: NodeErrorAbstraction | null, val?: any) => void;
-type TransformCallback =
- // deno-lint-ignore no-explicit-any
- | ((value?: any) => AsyncGenerator<any>)
- // deno-lint-ignore no-explicit-any
- | ((value?: any) => Promise<any>);
-/**
- * This type represents an array that contains a data source,
- * many Transform Streams, a writable stream destination
- * and end in an optional callback
- * */
-type DataSource =
- // deno-lint-ignore no-explicit-any
- | (() => AsyncGenerator<any>)
- | // deno-lint-ignore no-explicit-any
- AsyncIterable<any>
- | Duplex
- | // deno-lint-ignore no-explicit-any
- Iterable<any>
- | // deno-lint-ignore no-explicit-any
- (() => Generator<any>)
- | Readable;
-type Transformers = Duplex | Transform | TransformCallback | Writable;
-export type PipelineArguments = [
- DataSource,
- ...Array<Transformers | EndCallback>,
-];
-
-function destroyer(
- stream: Streams,
- reading: boolean,
- writing: boolean,
- callback: EndCallback,
-) {
- callback = once(callback);
-
- let finished = false;
- stream.on("close", () => {
- finished = true;
- });
-
- eos(stream, { readable: reading, writable: writing }, (err) => {
- finished = !err;
-
- // deno-lint-ignore no-explicit-any
- const rState = (stream as any)?._readableState;
- if (
- err &&
- err.code === "ERR_STREAM_PREMATURE_CLOSE" &&
- reading &&
- (rState?.ended && !rState?.errored && !rState?.errorEmitted)
- ) {
- stream
- .once("end", callback)
- .once("error", callback);
- } else {
- callback(err);
- }
- });
-
- return (err: NodeErrorAbstraction) => {
- if (finished) return;
- finished = true;
- implDestroyer(stream, err);
- callback(err || new ERR_STREAM_DESTROYED("pipe"));
- };
-}
-
-function popCallback(streams: PipelineArguments): EndCallback {
- if (typeof streams[streams.length - 1] !== "function") {
- throw new ERR_INVALID_CALLBACK(streams[streams.length - 1]);
- }
- return streams.pop() as EndCallback;
-}
-
-// function isPromise(obj) {
-// return !!(obj && typeof obj.then === "function");
-// }
-
-// deno-lint-ignore no-explicit-any
-function isReadable(obj: any): obj is Stream {
- return !!(obj && typeof obj.pipe === "function");
-}
-
-// deno-lint-ignore no-explicit-any
-function isWritable(obj: any) {
- return !!(obj && typeof obj.write === "function");
-}
-
-// deno-lint-ignore no-explicit-any
-function isStream(obj: any) {
- return isReadable(obj) || isWritable(obj);
-}
-
-// deno-lint-ignore no-explicit-any
-function isIterable(obj: any, isAsync?: boolean) {
- if (!obj) return false;
- if (isAsync === true) return typeof obj[Symbol.asyncIterator] === "function";
- if (isAsync === false) return typeof obj[Symbol.iterator] === "function";
- return typeof obj[Symbol.asyncIterator] === "function" ||
- typeof obj[Symbol.iterator] === "function";
-}
-
-// deno-lint-ignore no-explicit-any
-function makeAsyncIterable(val: Readable | Iterable<any> | AsyncIterable<any>) {
- if (isIterable(val)) {
- return val;
- } else if (isReadable(val)) {
- return fromReadable(val as Readable);
- }
- throw new ERR_INVALID_ARG_TYPE(
- "val",
- ["Readable", "Iterable", "AsyncIterable"],
- val,
- );
-}
-
-async function* fromReadable(val: Readable) {
- yield* createReadableStreamAsyncIterator(val);
-}
-
-async function pump(
- // deno-lint-ignore no-explicit-any
- iterable: Iterable<any>,
- writable: Duplex | Writable,
- finish: (err?: NodeErrorAbstraction | null) => void,
-) {
- let error;
- try {
- for await (const chunk of iterable) {
- if (!writable.write(chunk)) {
- if (writable.destroyed) return;
- await events.once(writable, "drain");
- }
- }
- writable.end();
- } catch (err) {
- error = err;
- } finally {
- finish(error);
- }
-}
-
-export default function pipeline(...args: PipelineArguments) {
- const callback: EndCallback = once(popCallback(args));
-
- let streams: [DataSource, ...Transformers[]];
- if (args.length > 1) {
- streams = args as [DataSource, ...Transformers[]];
- } else {
- throw new ERR_MISSING_ARGS("streams");
- }
-
- let error: NodeErrorAbstraction;
- // deno-lint-ignore no-explicit-any
- let value: any;
- const destroys: Array<(err: NodeErrorAbstraction) => void> = [];
-
- let finishCount = 0;
-
- function finish(err?: NodeErrorAbstraction | null) {
- const final = --finishCount === 0;
-
- if (err && (!error || error.code === "ERR_STREAM_PREMATURE_CLOSE")) {
- error = err;
- }
-
- if (!error && !final) {
- return;
- }
-
- while (destroys.length) {
- (destroys.shift() as (err: NodeErrorAbstraction) => void)(error);
- }
-
- if (final) {
- callback(error, value);
- }
- }
-
- // TODO(Soremwar)
- // Simplify the hell out of this
- // deno-lint-ignore no-explicit-any
- let ret: any;
- for (let i = 0; i < streams.length; i++) {
- const stream = streams[i];
- const reading = i < streams.length - 1;
- const writing = i > 0;
-
- if (isStream(stream)) {
- finishCount++;
- destroys.push(destroyer(stream as Streams, reading, writing, finish));
- }
-
- if (i === 0) {
- if (typeof stream === "function") {
- ret = stream();
- if (!isIterable(ret)) {
- throw new ERR_INVALID_RETURN_VALUE(
- "Iterable, AsyncIterable or Stream",
- "source",
- ret,
- );
- }
- } else if (isIterable(stream) || isReadable(stream)) {
- ret = stream;
- } else {
- throw new ERR_INVALID_ARG_TYPE(
- "source",
- ["Stream", "Iterable", "AsyncIterable", "Function"],
- stream,
- );
- }
- } else if (typeof stream === "function") {
- ret = makeAsyncIterable(ret);
- ret = stream(ret);
-
- if (reading) {
- if (!isIterable(ret, true)) {
- throw new ERR_INVALID_RETURN_VALUE(
- "AsyncIterable",
- `transform[${i - 1}]`,
- ret,
- );
- }
- } else {
- // If the last argument to pipeline is not a stream
- // we must create a proxy stream so that pipeline(...)
- // always returns a stream which can be further
- // composed through `.pipe(stream)`.
- const pt = new PassThrough({
- objectMode: true,
- });
- if (ret instanceof Promise) {
- ret
- .then((val) => {
- value = val;
- pt.end(val);
- }, (err) => {
- pt.destroy(err);
- });
- } else if (isIterable(ret, true)) {
- finishCount++;
- pump(ret, pt, finish);
- } else {
- throw new ERR_INVALID_RETURN_VALUE(
- "AsyncIterable or Promise",
- "destination",
- ret,
- );
- }
-
- ret = pt;
-
- finishCount++;
- destroys.push(destroyer(ret, false, true, finish));
- }
- } else if (isStream(stream)) {
- if (isReadable(ret)) {
- ret.pipe(stream as Readable);
-
- // TODO(Soremwar)
- // Reimplement after stdout and stderr are implemented
- // if (stream === process.stdout || stream === process.stderr) {
- // ret.on("end", () => stream.end());
- // }
- } else {
- ret = makeAsyncIterable(ret);
-
- finishCount++;
- pump(ret, stream as Writable, finish);
- }
- ret = stream;
- } else {
- const name = reading ? `transform[${i - 1}]` : "destination";
- throw new ERR_INVALID_ARG_TYPE(
- name,
- ["Stream", "Function"],
- ret,
- );
- }
- }
-
- return ret as unknown as Readable;
-}
diff --git a/std/node/_stream/pipeline_test.ts b/std/node/_stream/pipeline_test.ts
deleted file mode 100644
index aa1869416..000000000
--- a/std/node/_stream/pipeline_test.ts
+++ /dev/null
@@ -1,387 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import PassThrough from "./passthrough.ts";
-import pipeline from "./pipeline.ts";
-import Readable from "./readable.ts";
-import Transform from "./transform.ts";
-import Writable from "./writable.ts";
-import { mustCall } from "../_utils.ts";
-import {
- assert,
- assertEquals,
- assertStrictEquals,
-} from "../../testing/asserts.ts";
-import type { NodeErrorAbstraction } from "../_errors.ts";
-
-Deno.test("Pipeline ends on stream finished", async () => {
- let finished = false;
-
- // deno-lint-ignore no-explicit-any
- const processed: any[] = [];
- const expected = [
- Buffer.from("a"),
- Buffer.from("b"),
- Buffer.from("c"),
- ];
-
- const read = new Readable({
- read() {},
- });
-
- const write = new Writable({
- write(data, _enc, cb) {
- processed.push(data);
- cb();
- },
- });
-
- write.on("finish", () => {
- finished = true;
- });
-
- for (let i = 0; i < expected.length; i++) {
- read.push(expected[i]);
- }
- read.push(null);
-
- const [finishedCompleted, finishedCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assert(!err);
- assert(finished);
- assertEquals(processed, expected);
- },
- 1,
- );
-
- pipeline(read, write, finishedCb);
-
- await finishedCompleted;
-});
-
-Deno.test("Pipeline fails on stream destroyed", async () => {
- const read = new Readable({
- read() {},
- });
-
- const write = new Writable({
- write(_data, _enc, cb) {
- cb();
- },
- });
-
- read.push("data");
- queueMicrotask(() => read.destroy());
-
- const [pipelineExecuted, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assert(err);
- },
- 1,
- );
- pipeline(read, write, pipelineCb);
-
- await pipelineExecuted;
-});
-
-Deno.test("Pipeline exits on stream error", async () => {
- const read = new Readable({
- read() {},
- });
-
- const transform = new Transform({
- transform(_data, _enc, cb) {
- cb(new Error("kaboom"));
- },
- });
-
- const write = new Writable({
- write(_data, _enc, cb) {
- cb();
- },
- });
-
- const [readExecution, readCb] = mustCall();
- read.on("close", readCb);
- const [closeExecution, closeCb] = mustCall();
- transform.on("close", closeCb);
- const [writeExecution, writeCb] = mustCall();
- write.on("close", writeCb);
-
- const errorExecutions = [read, transform, write]
- .map((stream) => {
- const [execution, cb] = mustCall((err?: NodeErrorAbstraction | null) => {
- assertEquals(err, new Error("kaboom"));
- });
-
- stream.on("error", cb);
- return execution;
- });
-
- const [pipelineExecution, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assertEquals(err, new Error("kaboom"));
- },
- );
- const dst = pipeline(read, transform, write, pipelineCb);
-
- assertStrictEquals(dst, write);
-
- read.push("hello");
-
- await readExecution;
- await closeExecution;
- await writeExecution;
- await Promise.all(errorExecutions);
- await pipelineExecution;
-});
-
-Deno.test("Pipeline processes iterators correctly", async () => {
- let res = "";
- const w = new Writable({
- write(chunk, _encoding, callback) {
- res += chunk;
- callback();
- },
- });
-
- const [pipelineExecution, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assert(!err);
- assertEquals(res, "helloworld");
- },
- );
- pipeline(
- function* () {
- yield "hello";
- yield "world";
- }(),
- w,
- pipelineCb,
- );
-
- await pipelineExecution;
-});
-
-Deno.test("Pipeline processes async iterators correctly", async () => {
- let res = "";
- const w = new Writable({
- write(chunk, _encoding, callback) {
- res += chunk;
- callback();
- },
- });
-
- const [pipelineExecution, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assert(!err);
- assertEquals(res, "helloworld");
- },
- );
- pipeline(
- async function* () {
- await Promise.resolve();
- yield "hello";
- yield "world";
- }(),
- w,
- pipelineCb,
- );
-
- await pipelineExecution;
-});
-
-Deno.test("Pipeline processes generators correctly", async () => {
- let res = "";
- const w = new Writable({
- write(chunk, _encoding, callback) {
- res += chunk;
- callback();
- },
- });
-
- const [pipelineExecution, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assert(!err);
- assertEquals(res, "helloworld");
- },
- );
- pipeline(
- function* () {
- yield "hello";
- yield "world";
- },
- w,
- pipelineCb,
- );
-
- await pipelineExecution;
-});
-
-Deno.test("Pipeline processes async generators correctly", async () => {
- let res = "";
- const w = new Writable({
- write(chunk, _encoding, callback) {
- res += chunk;
- callback();
- },
- });
-
- const [pipelineExecution, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assert(!err);
- assertEquals(res, "helloworld");
- },
- );
- pipeline(
- async function* () {
- await Promise.resolve();
- yield "hello";
- yield "world";
- },
- w,
- pipelineCb,
- );
-
- await pipelineExecution;
-});
-
-Deno.test("Pipeline handles generator transforms", async () => {
- let res = "";
-
- const [pipelineExecuted, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assert(!err);
- assertEquals(res, "HELLOWORLD");
- },
- );
- pipeline(
- async function* () {
- await Promise.resolve();
- yield "hello";
- yield "world";
- },
- async function* (source: string[]) {
- for await (const chunk of source) {
- yield chunk.toUpperCase();
- }
- },
- async function (source: string[]) {
- for await (const chunk of source) {
- res += chunk;
- }
- },
- pipelineCb,
- );
-
- await pipelineExecuted;
-});
-
-Deno.test("Pipeline passes result to final callback", async () => {
- const [pipelineExecuted, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null, val?: unknown) => {
- assert(!err);
- assertEquals(val, "HELLOWORLD");
- },
- );
- pipeline(
- async function* () {
- await Promise.resolve();
- yield "hello";
- yield "world";
- },
- async function* (source: string[]) {
- for await (const chunk of source) {
- yield chunk.toUpperCase();
- }
- },
- async function (source: string[]) {
- let ret = "";
- for await (const chunk of source) {
- ret += chunk;
- }
- return ret;
- },
- pipelineCb,
- );
-
- await pipelineExecuted;
-});
-
-Deno.test("Pipeline returns a stream after ending", async () => {
- const [pipelineExecuted, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assertEquals(err, undefined);
- },
- );
- const ret = pipeline(
- async function* () {
- await Promise.resolve();
- yield "hello";
- },
- // deno-lint-ignore require-yield
- async function* (source: string[]) {
- for await (const chunk of source) {
- chunk;
- }
- },
- pipelineCb,
- );
-
- ret.resume();
-
- assertEquals(typeof ret.pipe, "function");
-
- await pipelineExecuted;
-});
-
-Deno.test("Pipeline returns a stream after erroring", async () => {
- const errorText = "kaboom";
-
- const [pipelineExecuted, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assertEquals(err?.message, errorText);
- },
- );
- const ret = pipeline(
- // deno-lint-ignore require-yield
- async function* () {
- await Promise.resolve();
- throw new Error(errorText);
- },
- // deno-lint-ignore require-yield
- async function* (source: string[]) {
- for await (const chunk of source) {
- chunk;
- }
- },
- pipelineCb,
- );
-
- ret.resume();
-
- assertEquals(typeof ret.pipe, "function");
-
- await pipelineExecuted;
-});
-
-Deno.test("Pipeline destination gets destroyed on error", async () => {
- const errorText = "kaboom";
- const s = new PassThrough();
-
- const [pipelineExecution, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assertEquals(err?.message, errorText);
- assertEquals(s.destroyed, true);
- },
- );
- pipeline(
- // deno-lint-ignore require-yield
- async function* () {
- throw new Error(errorText);
- },
- s,
- pipelineCb,
- );
-
- await pipelineExecution;
-});
diff --git a/std/node/_stream/promises.ts b/std/node/_stream/promises.ts
deleted file mode 100644
index 1adf4ea3f..000000000
--- a/std/node/_stream/promises.ts
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import pl from "./pipeline.ts";
-import type { PipelineArguments } from "./pipeline.ts";
-import eos from "./end_of_stream.ts";
-import type {
- FinishedOptions,
- StreamImplementations as FinishedStreams,
-} from "./end_of_stream.ts";
-
-export function pipeline(...streams: PipelineArguments) {
- return new Promise((resolve, reject) => {
- pl(
- ...streams,
- (err, value) => {
- if (err) {
- reject(err);
- } else {
- resolve(value);
- }
- },
- );
- });
-}
-
-export function finished(
- stream: FinishedStreams,
- opts?: FinishedOptions,
-) {
- return new Promise<void>((resolve, reject) => {
- eos(
- stream,
- opts || null,
- (err) => {
- if (err) {
- reject(err);
- } else {
- resolve();
- }
- },
- );
- });
-}
diff --git a/std/node/_stream/promises_test.ts b/std/node/_stream/promises_test.ts
deleted file mode 100644
index 90803b4af..000000000
--- a/std/node/_stream/promises_test.ts
+++ /dev/null
@@ -1,84 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import Readable from "./readable.ts";
-import Writable from "./writable.ts";
-import { pipeline } from "./promises.ts";
-import { deferred } from "../../async/mod.ts";
-import {
- assert,
- assertEquals,
- assertThrowsAsync,
-} from "../../testing/asserts.ts";
-
-Deno.test("Promise pipeline works correctly", async () => {
- let pipelineExecuted = 0;
- const pipelineExecutedExpected = 1;
- const pipelineExpectedExecutions = deferred();
-
- let finished = false;
- // deno-lint-ignore no-explicit-any
- const processed: any[] = [];
- const expected = [
- Buffer.from("a"),
- Buffer.from("b"),
- Buffer.from("c"),
- ];
-
- const read = new Readable({
- read() {},
- });
-
- const write = new Writable({
- write(data, _enc, cb) {
- processed.push(data);
- cb();
- },
- });
-
- write.on("finish", () => {
- finished = true;
- });
-
- for (let i = 0; i < expected.length; i++) {
- read.push(expected[i]);
- }
- read.push(null);
-
- pipeline(read, write).then(() => {
- pipelineExecuted++;
- if (pipelineExecuted == pipelineExecutedExpected) {
- pipelineExpectedExecutions.resolve();
- }
- assert(finished);
- assertEquals(processed, expected);
- });
-
- const pipelineTimeout = setTimeout(
- () => pipelineExpectedExecutions.reject(),
- 1000,
- );
- await pipelineExpectedExecutions;
- clearTimeout(pipelineTimeout);
- assertEquals(pipelineExecuted, pipelineExecutedExpected);
-});
-
-Deno.test("Promise pipeline throws on readable destroyed", async () => {
- const read = new Readable({
- read() {},
- });
-
- const write = new Writable({
- write(_data, _enc, cb) {
- cb();
- },
- });
-
- read.push("data");
- read.destroy();
-
- await assertThrowsAsync(
- () => pipeline(read, write),
- Error,
- "Premature close",
- );
-});
diff --git a/std/node/_stream/readable.ts b/std/node/_stream/readable.ts
deleted file mode 100644
index 54e0d8ecd..000000000
--- a/std/node/_stream/readable.ts
+++ /dev/null
@@ -1,788 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { captureRejectionSymbol } from "../events.ts";
-import Stream from "./stream.ts";
-import type { Buffer } from "../buffer.ts";
-import BufferList from "./buffer_list.ts";
-import {
- ERR_INVALID_OPT_VALUE,
- ERR_METHOD_NOT_IMPLEMENTED,
-} from "../_errors.ts";
-import type { Encodings } from "../_utils.ts";
-import { StringDecoder } from "../string_decoder.ts";
-import createReadableStreamAsyncIterator from "./async_iterator.ts";
-import streamFrom from "./from.ts";
-import { kDestroy, kPaused } from "./symbols.ts";
-import {
- _destroy,
- computeNewHighWaterMark,
- emitReadable,
- endReadable,
- errorOrDestroy,
- fromList,
- howMuchToRead,
- nReadingNextTick,
- pipeOnDrain,
- prependListener,
- readableAddChunk,
- resume,
- updateReadableListening,
-} from "./readable_internal.ts";
-import Writable from "./writable.ts";
-import { errorOrDestroy as errorOrDestroyWritable } from "./writable_internal.ts";
-import Duplex, { errorOrDestroy as errorOrDestroyDuplex } from "./duplex.ts";
-
-export interface ReadableOptions {
- autoDestroy?: boolean;
- defaultEncoding?: Encodings;
- destroy?(
- this: Readable,
- error: Error | null,
- callback: (error: Error | null) => void,
- ): void;
- emitClose?: boolean;
- encoding?: Encodings;
- highWaterMark?: number;
- objectMode?: boolean;
- read?(this: Readable): void;
-}
-
-export class ReadableState {
- [kPaused]: boolean | null = null;
- awaitDrainWriters: Duplex | Writable | Set<Duplex | Writable> | null = null;
- buffer = new BufferList();
- closed = false;
- closeEmitted = false;
- constructed: boolean;
- decoder: StringDecoder | null = null;
- destroyed = false;
- emittedReadable = false;
- encoding: Encodings | null = null;
- ended = false;
- endEmitted = false;
- errored: Error | null = null;
- errorEmitted = false;
- flowing: boolean | null = null;
- highWaterMark: number;
- length = 0;
- multiAwaitDrain = false;
- needReadable = false;
- objectMode: boolean;
- pipes: Array<Duplex | Writable> = [];
- readable = true;
- readableListening = false;
- reading = false;
- readingMore = false;
- resumeScheduled = false;
- sync = true;
- emitClose: boolean;
- autoDestroy: boolean;
- defaultEncoding: string;
-
- constructor(options?: ReadableOptions) {
- this.objectMode = !!options?.objectMode;
-
- this.highWaterMark = options?.highWaterMark ??
- (this.objectMode ? 16 : 16 * 1024);
- if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) {
- this.highWaterMark = Math.floor(this.highWaterMark);
- } else {
- throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark);
- }
-
- this.emitClose = options?.emitClose ?? true;
- this.autoDestroy = options?.autoDestroy ?? true;
- this.defaultEncoding = options?.defaultEncoding || "utf8";
-
- if (options?.encoding) {
- this.decoder = new StringDecoder(options.encoding);
- this.encoding = options.encoding;
- }
-
- this.constructed = true;
- }
-}
-
-class Readable extends Stream {
- _readableState: ReadableState;
-
- constructor(options?: ReadableOptions) {
- super();
- if (options) {
- if (typeof options.read === "function") {
- this._read = options.read;
- }
- if (typeof options.destroy === "function") {
- this._destroy = options.destroy;
- }
- }
- this._readableState = new ReadableState(options);
- }
-
- static from(
- // deno-lint-ignore no-explicit-any
- iterable: Iterable<any> | AsyncIterable<any>,
- opts?: ReadableOptions,
- ): Readable {
- return streamFrom(iterable, opts);
- }
-
- static ReadableState = ReadableState;
-
- static _fromList = fromList;
-
- // You can override either this method, or the async _read(n) below.
- read(n?: number) {
- // Same as parseInt(undefined, 10), however V8 7.3 performance regressed
- // in this scenario, so we are doing it manually.
- if (n === undefined) {
- n = NaN;
- }
- const state = this._readableState;
- const nOrig = n;
-
- if (n > state.highWaterMark) {
- state.highWaterMark = computeNewHighWaterMark(n);
- }
-
- if (n !== 0) {
- state.emittedReadable = false;
- }
-
- if (
- n === 0 &&
- state.needReadable &&
- ((state.highWaterMark !== 0
- ? state.length >= state.highWaterMark
- : state.length > 0) ||
- state.ended)
- ) {
- if (state.length === 0 && state.ended) {
- endReadable(this);
- } else {
- emitReadable(this);
- }
- return null;
- }
-
- n = howMuchToRead(n, state);
-
- if (n === 0 && state.ended) {
- if (state.length === 0) {
- endReadable(this);
- }
- return null;
- }
-
- let doRead = state.needReadable;
- if (
- state.length === 0 || state.length - (n as number) < state.highWaterMark
- ) {
- doRead = true;
- }
-
- if (
- state.ended || state.reading || state.destroyed || state.errored ||
- !state.constructed
- ) {
- doRead = false;
- } else if (doRead) {
- state.reading = true;
- state.sync = true;
- if (state.length === 0) {
- state.needReadable = true;
- }
- this._read();
- state.sync = false;
- if (!state.reading) {
- n = howMuchToRead(nOrig, state);
- }
- }
-
- let ret;
- if ((n as number) > 0) {
- ret = fromList((n as number), state);
- } else {
- ret = null;
- }
-
- if (ret === null) {
- state.needReadable = state.length <= state.highWaterMark;
- n = 0;
- } else {
- state.length -= n as number;
- if (state.multiAwaitDrain) {
- (state.awaitDrainWriters as Set<Writable>).clear();
- } else {
- state.awaitDrainWriters = null;
- }
- }
-
- if (state.length === 0) {
- if (!state.ended) {
- state.needReadable = true;
- }
-
- if (nOrig !== n && state.ended) {
- endReadable(this);
- }
- }
-
- if (ret !== null) {
- this.emit("data", ret);
- }
-
- return ret;
- }
-
- _read(_size?: number) {
- throw new ERR_METHOD_NOT_IMPLEMENTED("_read()");
- }
-
- pipe<T extends Duplex | Writable>(dest: T, pipeOpts?: { end?: boolean }): T {
- // deno-lint-ignore no-this-alias
- const src = this;
- const state = this._readableState;
-
- if (state.pipes.length === 1) {
- if (!state.multiAwaitDrain) {
- state.multiAwaitDrain = true;
- state.awaitDrainWriters = new Set(
- state.awaitDrainWriters ? [state.awaitDrainWriters as Writable] : [],
- );
- }
- }
-
- state.pipes.push(dest);
-
- const doEnd = (!pipeOpts || pipeOpts.end !== false);
-
- //TODO(Soremwar)
- //Part of doEnd condition
- //In node, output/input are a duplex Stream
- // &&
- // dest !== stdout &&
- // dest !== stderr
-
- const endFn = doEnd ? onend : unpipe;
- if (state.endEmitted) {
- queueMicrotask(endFn);
- } else {
- this.once("end", endFn);
- }
-
- dest.on("unpipe", onunpipe);
- function onunpipe(readable: Readable, unpipeInfo: { hasUnpiped: boolean }) {
- if (readable === src) {
- if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
- unpipeInfo.hasUnpiped = true;
- cleanup();
- }
- }
- }
-
- function onend() {
- dest.end();
- }
-
- let ondrain: () => void;
-
- let cleanedUp = false;
- function cleanup() {
- dest.removeListener("close", onclose);
- dest.removeListener("finish", onfinish);
- if (ondrain) {
- dest.removeListener("drain", ondrain);
- }
- dest.removeListener("error", onerror);
- dest.removeListener("unpipe", onunpipe);
- src.removeListener("end", onend);
- src.removeListener("end", unpipe);
- src.removeListener("data", ondata);
-
- cleanedUp = true;
- if (
- ondrain && state.awaitDrainWriters &&
- (!dest._writableState || dest._writableState.needDrain)
- ) {
- ondrain();
- }
- }
-
- this.on("data", ondata);
- // deno-lint-ignore no-explicit-any
- function ondata(chunk: any) {
- const ret = dest.write(chunk);
- if (ret === false) {
- if (!cleanedUp) {
- if (state.pipes.length === 1 && state.pipes[0] === dest) {
- state.awaitDrainWriters = dest;
- state.multiAwaitDrain = false;
- } else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
- (state.awaitDrainWriters as Set<Duplex | Writable>).add(dest);
- }
- src.pause();
- }
- if (!ondrain) {
- ondrain = pipeOnDrain(src, dest);
- dest.on("drain", ondrain);
- }
- }
- }
-
- function onerror(er: Error) {
- unpipe();
- dest.removeListener("error", onerror);
- if (dest.listenerCount("error") === 0) {
- const s = dest._writableState || (dest as Duplex)._readableState;
- if (s && !s.errorEmitted) {
- if (dest instanceof Duplex) {
- errorOrDestroyDuplex(dest as unknown as Duplex, er);
- } else {
- errorOrDestroyWritable(dest as Writable, er);
- }
- } else {
- dest.emit("error", er);
- }
- }
- }
-
- prependListener(dest, "error", onerror);
-
- function onclose() {
- dest.removeListener("finish", onfinish);
- unpipe();
- }
- dest.once("close", onclose);
- function onfinish() {
- dest.removeListener("close", onclose);
- unpipe();
- }
- dest.once("finish", onfinish);
-
- function unpipe() {
- src.unpipe(dest as Writable);
- }
-
- dest.emit("pipe", this);
-
- if (!state.flowing) {
- this.resume();
- }
-
- return dest;
- }
-
- isPaused() {
- return this._readableState[kPaused] === true ||
- this._readableState.flowing === false;
- }
-
- setEncoding(enc: Encodings) {
- const decoder = new StringDecoder(enc);
- this._readableState.decoder = decoder;
- this._readableState.encoding = this._readableState.decoder
- .encoding as Encodings;
-
- const buffer = this._readableState.buffer;
- let content = "";
- for (const data of buffer) {
- content += decoder.write(data as Buffer);
- }
- buffer.clear();
- if (content !== "") {
- buffer.push(content);
- }
- this._readableState.length = content.length;
- return this;
- }
-
- on(
- event: "close" | "end" | "pause" | "readable" | "resume",
- listener: () => void,
- ): this;
- // deno-lint-ignore no-explicit-any
- on(event: "data", listener: (chunk: any) => void): this;
- on(event: "error", listener: (err: Error) => void): this;
- // deno-lint-ignore no-explicit-any
- on(event: string | symbol, listener: (...args: any[]) => void): this;
- on(
- ev: string | symbol,
- fn:
- | (() => void)
- // deno-lint-ignore no-explicit-any
- | ((chunk: any) => void)
- | ((err: Error) => void)
- // deno-lint-ignore no-explicit-any
- | ((...args: any[]) => void),
- ) {
- const res = super.on.call(this, ev, fn);
- const state = this._readableState;
-
- if (ev === "data") {
- state.readableListening = this.listenerCount("readable") > 0;
-
- if (state.flowing !== false) {
- this.resume();
- }
- } else if (ev === "readable") {
- if (!state.endEmitted && !state.readableListening) {
- state.readableListening = state.needReadable = true;
- state.flowing = false;
- state.emittedReadable = false;
- if (state.length) {
- emitReadable(this);
- } else if (!state.reading) {
- queueMicrotask(() => nReadingNextTick(this));
- }
- }
- }
-
- return res;
- }
-
- removeListener(
- event: "close" | "end" | "pause" | "readable" | "resume",
- listener: () => void,
- ): this;
- // deno-lint-ignore no-explicit-any
- removeListener(event: "data", listener: (chunk: any) => void): this;
- removeListener(event: "error", listener: (err: Error) => void): this;
- removeListener(
- event: string | symbol,
- // deno-lint-ignore no-explicit-any
- listener: (...args: any[]) => void,
- ): this;
- removeListener(
- ev: string | symbol,
- fn:
- | (() => void)
- // deno-lint-ignore no-explicit-any
- | ((chunk: any) => void)
- | ((err: Error) => void)
- // deno-lint-ignore no-explicit-any
- | ((...args: any[]) => void),
- ) {
- const res = super.removeListener.call(this, ev, fn);
-
- if (ev === "readable") {
- queueMicrotask(() => updateReadableListening(this));
- }
-
- return res;
- }
-
- off = this.removeListener;
-
- destroy(err?: Error | null, cb?: () => void) {
- const r = this._readableState;
-
- if (r.destroyed) {
- if (typeof cb === "function") {
- cb();
- }
-
- return this;
- }
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!r.errored) {
- r.errored = err;
- }
- }
-
- r.destroyed = true;
-
- // If still constructing then defer calling _destroy.
- if (!r.constructed) {
- this.once(kDestroy, (er: Error) => {
- _destroy(this, err || er, cb);
- });
- } else {
- _destroy(this, err, cb);
- }
-
- return this;
- }
-
- _undestroy() {
- const r = this._readableState;
- r.constructed = true;
- r.closed = false;
- r.closeEmitted = false;
- r.destroyed = false;
- r.errored = null;
- r.errorEmitted = false;
- r.reading = false;
- r.ended = false;
- r.endEmitted = false;
- }
-
- _destroy(
- error: Error | null,
- callback: (error?: Error | null) => void,
- ): void {
- callback(error);
- }
-
- [captureRejectionSymbol](err: Error) {
- this.destroy(err);
- }
-
- // deno-lint-ignore no-explicit-any
- push(chunk: any, encoding?: Encodings): boolean {
- return readableAddChunk(this, chunk, encoding, false);
- }
-
- // deno-lint-ignore no-explicit-any
- unshift(chunk: any, encoding?: string): boolean {
- return readableAddChunk(this, chunk, encoding, true);
- }
-
- unpipe(dest?: Writable): this {
- const state = this._readableState;
- const unpipeInfo = { hasUnpiped: false };
-
- if (state.pipes.length === 0) {
- return this;
- }
-
- if (!dest) {
- // remove all.
- const dests = state.pipes;
- state.pipes = [];
- this.pause();
-
- for (const dest of dests) {
- dest.emit("unpipe", this, { hasUnpiped: false });
- }
- return this;
- }
-
- const index = state.pipes.indexOf(dest);
- if (index === -1) {
- return this;
- }
-
- state.pipes.splice(index, 1);
- if (state.pipes.length === 0) {
- this.pause();
- }
-
- dest.emit("unpipe", this, unpipeInfo);
-
- return this;
- }
-
- removeAllListeners(
- ev:
- | "close"
- | "data"
- | "end"
- | "error"
- | "pause"
- | "readable"
- | "resume"
- | symbol
- | undefined,
- ) {
- const res = super.removeAllListeners(ev);
-
- if (ev === "readable" || ev === undefined) {
- queueMicrotask(() => updateReadableListening(this));
- }
-
- return res;
- }
-
- resume() {
- const state = this._readableState;
- if (!state.flowing) {
- // We flow only if there is no one listening
- // for readable, but we still have to call
- // resume().
- state.flowing = !state.readableListening;
- resume(this, state);
- }
- state[kPaused] = false;
- return this;
- }
-
- pause() {
- if (this._readableState.flowing !== false) {
- this._readableState.flowing = false;
- this.emit("pause");
- }
- this._readableState[kPaused] = true;
- return this;
- }
-
- /** Wrap an old-style stream as the async data source. */
- wrap(stream: Stream): this {
- const state = this._readableState;
- let paused = false;
-
- stream.on("end", () => {
- if (state.decoder && !state.ended) {
- const chunk = state.decoder.end();
- if (chunk && chunk.length) {
- this.push(chunk);
- }
- }
-
- this.push(null);
- });
-
- stream.on("data", (chunk) => {
- if (state.decoder) {
- chunk = state.decoder.write(chunk);
- }
-
- if (state.objectMode && (chunk === null || chunk === undefined)) {
- return;
- } else if (!state.objectMode && (!chunk || !chunk.length)) {
- return;
- }
-
- const ret = this.push(chunk);
- if (!ret) {
- paused = true;
- // By the time this is triggered, stream will be a readable stream
- // deno-lint-ignore ban-ts-comment
- // @ts-ignore
- stream.pause();
- }
- });
-
- // TODO(Soremwar)
- // There must be a clean way to implement this on TypeScript
- // Proxy all the other methods. Important when wrapping filters and duplexes.
- for (const i in stream) {
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- if (this[i] === undefined && typeof stream[i] === "function") {
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- this[i] = function methodWrap(method) {
- return function methodWrapReturnFunction() {
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- return stream[method].apply(stream);
- };
- }(i);
- }
- }
-
- stream.on("error", (err) => {
- errorOrDestroy(this, err);
- });
-
- stream.on("close", () => {
- this.emit("close");
- });
-
- stream.on("destroy", () => {
- this.emit("destroy");
- });
-
- stream.on("pause", () => {
- this.emit("pause");
- });
-
- stream.on("resume", () => {
- this.emit("resume");
- });
-
- this._read = () => {
- if (paused) {
- paused = false;
- // By the time this is triggered, stream will be a readable stream
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- stream.resume();
- }
- };
-
- return this;
- }
-
- [Symbol.asyncIterator]() {
- return createReadableStreamAsyncIterator(this);
- }
-
- get readable(): boolean {
- return this._readableState?.readable &&
- !this._readableState?.destroyed &&
- !this._readableState?.errorEmitted &&
- !this._readableState?.endEmitted;
- }
- set readable(val: boolean) {
- if (this._readableState) {
- this._readableState.readable = val;
- }
- }
-
- get readableHighWaterMark(): number {
- return this._readableState.highWaterMark;
- }
-
- get readableBuffer() {
- return this._readableState && this._readableState.buffer;
- }
-
- get readableFlowing(): boolean | null {
- return this._readableState.flowing;
- }
-
- set readableFlowing(state: boolean | null) {
- if (this._readableState) {
- this._readableState.flowing = state;
- }
- }
-
- get readableLength() {
- return this._readableState.length;
- }
-
- get readableObjectMode() {
- return this._readableState ? this._readableState.objectMode : false;
- }
-
- get readableEncoding() {
- return this._readableState ? this._readableState.encoding : null;
- }
-
- get destroyed() {
- if (this._readableState === undefined) {
- return false;
- }
- return this._readableState.destroyed;
- }
-
- set destroyed(value: boolean) {
- if (!this._readableState) {
- return;
- }
- this._readableState.destroyed = value;
- }
-
- get readableEnded() {
- return this._readableState ? this._readableState.endEmitted : false;
- }
-}
-
-Object.defineProperties(Readable, {
- _readableState: { enumerable: false },
- destroyed: { enumerable: false },
- readableBuffer: { enumerable: false },
- readableEncoding: { enumerable: false },
- readableEnded: { enumerable: false },
- readableFlowing: { enumerable: false },
- readableHighWaterMark: { enumerable: false },
- readableLength: { enumerable: false },
- readableObjectMode: { enumerable: false },
-});
-
-export default Readable;
diff --git a/std/node/_stream/readable_internal.ts b/std/node/_stream/readable_internal.ts
deleted file mode 100644
index 0ef261d4d..000000000
--- a/std/node/_stream/readable_internal.ts
+++ /dev/null
@@ -1,438 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import type Duplex from "./duplex.ts";
-import type EventEmitter from "../events.ts";
-import type Readable from "./readable.ts";
-import type Writable from "./writable.ts";
-import type { ReadableState } from "./readable.ts";
-import { kPaused } from "./symbols.ts";
-import {
- ERR_STREAM_PUSH_AFTER_EOF,
- ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
-} from "../_errors.ts";
-
-export function _destroy(
- self: Readable,
- err?: Error | null,
- cb?: (error?: Error | null) => void,
-) {
- self._destroy(err || null, (err) => {
- const r = (self as Readable)._readableState;
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!r.errored) {
- r.errored = err;
- }
- }
-
- r.closed = true;
-
- if (typeof cb === "function") {
- cb(err);
- }
-
- if (err) {
- queueMicrotask(() => {
- if (!r.errorEmitted) {
- r.errorEmitted = true;
- self.emit("error", err);
- }
- r.closeEmitted = true;
- if (r.emitClose) {
- self.emit("close");
- }
- });
- } else {
- queueMicrotask(() => {
- r.closeEmitted = true;
- if (r.emitClose) {
- self.emit("close");
- }
- });
- }
- });
-}
-
-export function addChunk(
- stream: Duplex | Readable,
- state: ReadableState,
- chunk: string | Buffer | Uint8Array,
- addToFront: boolean,
-) {
- if (state.flowing && state.length === 0 && !state.sync) {
- if (state.multiAwaitDrain) {
- (state.awaitDrainWriters as Set<Writable>).clear();
- } else {
- state.awaitDrainWriters = null;
- }
- stream.emit("data", chunk);
- } else {
- // Update the buffer info.
- state.length += state.objectMode ? 1 : chunk.length;
- if (addToFront) {
- state.buffer.unshift(chunk);
- } else {
- state.buffer.push(chunk);
- }
-
- if (state.needReadable) {
- emitReadable(stream);
- }
- }
- maybeReadMore(stream, state);
-}
-
-// Don't raise the hwm > 1GB.
-const MAX_HWM = 0x40000000;
-export function computeNewHighWaterMark(n: number) {
- if (n >= MAX_HWM) {
- n = MAX_HWM;
- } else {
- n--;
- n |= n >>> 1;
- n |= n >>> 2;
- n |= n >>> 4;
- n |= n >>> 8;
- n |= n >>> 16;
- n++;
- }
- return n;
-}
-
-export function emitReadable(stream: Duplex | Readable) {
- const state = stream._readableState;
- state.needReadable = false;
- if (!state.emittedReadable) {
- state.emittedReadable = true;
- queueMicrotask(() => emitReadable_(stream));
- }
-}
-
-function emitReadable_(stream: Duplex | Readable) {
- const state = stream._readableState;
- if (!state.destroyed && !state.errored && (state.length || state.ended)) {
- stream.emit("readable");
- state.emittedReadable = false;
- }
-
- state.needReadable = !state.flowing &&
- !state.ended &&
- state.length <= state.highWaterMark;
- flow(stream);
-}
-
-export function endReadable(stream: Readable) {
- const state = stream._readableState;
-
- if (!state.endEmitted) {
- state.ended = true;
- queueMicrotask(() => endReadableNT(state, stream));
- }
-}
-
-function endReadableNT(state: ReadableState, stream: Readable) {
- if (
- !state.errorEmitted && !state.closeEmitted &&
- !state.endEmitted && state.length === 0
- ) {
- state.endEmitted = true;
- stream.emit("end");
-
- if (state.autoDestroy) {
- stream.destroy();
- }
- }
-}
-
-export function errorOrDestroy(
- stream: Duplex | Readable,
- err: Error,
- sync = false,
-) {
- const r = stream._readableState;
-
- if (r.destroyed) {
- return stream;
- }
-
- if (r.autoDestroy) {
- stream.destroy(err);
- } else if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!r.errored) {
- r.errored = err;
- }
- if (sync) {
- queueMicrotask(() => {
- if (!r.errorEmitted) {
- r.errorEmitted = true;
- stream.emit("error", err);
- }
- });
- } else if (!r.errorEmitted) {
- r.errorEmitted = true;
- stream.emit("error", err);
- }
- }
-}
-
-function flow(stream: Duplex | Readable) {
- const state = stream._readableState;
- while (state.flowing && stream.read() !== null);
-}
-
-/** Pluck off n bytes from an array of buffers.
-* Length is the combined lengths of all the buffers in the list.
-* This function is designed to be inlinable, so please take care when making
-* changes to the function body.
-*/
-export function fromList(n: number, state: ReadableState) {
- // nothing buffered.
- if (state.length === 0) {
- return null;
- }
-
- let ret;
- if (state.objectMode) {
- ret = state.buffer.shift();
- } else if (!n || n >= state.length) {
- if (state.decoder) {
- ret = state.buffer.join("");
- } else if (state.buffer.length === 1) {
- ret = state.buffer.first();
- } else {
- ret = state.buffer.concat(state.length);
- }
- state.buffer.clear();
- } else {
- ret = state.buffer.consume(n, !!state.decoder);
- }
-
- return ret;
-}
-
-export function howMuchToRead(n: number, state: ReadableState) {
- if (n <= 0 || (state.length === 0 && state.ended)) {
- return 0;
- }
- if (state.objectMode) {
- return 1;
- }
- if (Number.isNaN(n)) {
- // Only flow one buffer at a time.
- if (state.flowing && state.length) {
- return state.buffer.first().length;
- }
- return state.length;
- }
- if (n <= state.length) {
- return n;
- }
- return state.ended ? state.length : 0;
-}
-
-export function maybeReadMore(stream: Readable, state: ReadableState) {
- if (!state.readingMore && state.constructed) {
- state.readingMore = true;
- queueMicrotask(() => maybeReadMore_(stream, state));
- }
-}
-
-function maybeReadMore_(stream: Readable, state: ReadableState) {
- while (
- !state.reading && !state.ended &&
- (state.length < state.highWaterMark ||
- (state.flowing && state.length === 0))
- ) {
- const len = state.length;
- stream.read(0);
- if (len === state.length) {
- // Didn't get any data, stop spinning.
- break;
- }
- }
- state.readingMore = false;
-}
-
-export function nReadingNextTick(self: Duplex | Readable) {
- self.read(0);
-}
-
-export function onEofChunk(stream: Duplex | Readable, state: ReadableState) {
- if (state.ended) return;
- if (state.decoder) {
- const chunk = state.decoder.end();
- if (chunk && chunk.length) {
- state.buffer.push(chunk);
- state.length += state.objectMode ? 1 : chunk.length;
- }
- }
- state.ended = true;
-
- if (state.sync) {
- emitReadable(stream);
- } else {
- state.needReadable = false;
- state.emittedReadable = true;
- emitReadable_(stream);
- }
-}
-
-export function pipeOnDrain(src: Duplex | Readable, dest: Duplex | Writable) {
- return function pipeOnDrainFunctionResult() {
- const state = src._readableState;
-
- if (state.awaitDrainWriters === dest) {
- state.awaitDrainWriters = null;
- } else if (state.multiAwaitDrain) {
- (state.awaitDrainWriters as Set<Duplex | Writable>).delete(dest);
- }
-
- if (
- (!state.awaitDrainWriters ||
- (state.awaitDrainWriters as Set<Writable>).size === 0) &&
- src.listenerCount("data")
- ) {
- state.flowing = true;
- flow(src);
- }
- };
-}
-
-export function prependListener(
- emitter: EventEmitter,
- event: string,
- // deno-lint-ignore no-explicit-any
- fn: (...args: any[]) => any,
-) {
- if (typeof emitter.prependListener === "function") {
- return emitter.prependListener(event, fn);
- }
-
- // This is a hack to make sure that our error handler is attached before any
- // userland ones. NEVER DO THIS. This is here only because this code needs
- // to continue to work with older versions of Node.js that do not include
- //the prependListener() method. The goal is to eventually remove this hack.
- // TODO(Soremwar)
- // Burn it with fire
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- if (emitter._events.get(event)?.length) {
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- const listeners = [fn, ...emitter._events.get(event)];
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- emitter._events.set(event, listeners);
- } else {
- emitter.on(event, fn);
- }
-}
-
-export function readableAddChunk(
- stream: Duplex | Readable,
- chunk: string | Buffer | Uint8Array | null,
- encoding: undefined | string = undefined,
- addToFront: boolean,
-) {
- const state = stream._readableState;
- let usedEncoding = encoding;
-
- let err;
- if (!state.objectMode) {
- if (typeof chunk === "string") {
- usedEncoding = encoding || state.defaultEncoding;
- if (state.encoding !== usedEncoding) {
- if (addToFront && state.encoding) {
- chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding);
- } else {
- chunk = Buffer.from(chunk, usedEncoding);
- usedEncoding = "";
- }
- }
- } else if (chunk instanceof Uint8Array) {
- chunk = Buffer.from(chunk);
- }
- }
-
- if (err) {
- errorOrDestroy(stream, err);
- } else if (chunk === null) {
- state.reading = false;
- onEofChunk(stream, state);
- } else if (state.objectMode || (chunk.length > 0)) {
- if (addToFront) {
- if (state.endEmitted) {
- errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
- } else {
- addChunk(stream, state, chunk, true);
- }
- } else if (state.ended) {
- errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
- } else if (state.destroyed || state.errored) {
- return false;
- } else {
- state.reading = false;
- if (state.decoder && !usedEncoding) {
- //TODO(Soremwar)
- //I don't think this cast is right
- chunk = state.decoder.write(Buffer.from(chunk as Uint8Array));
- if (state.objectMode || chunk.length !== 0) {
- addChunk(stream, state, chunk, false);
- } else {
- maybeReadMore(stream, state);
- }
- } else {
- addChunk(stream, state, chunk, false);
- }
- }
- } else if (!addToFront) {
- state.reading = false;
- maybeReadMore(stream, state);
- }
-
- return !state.ended &&
- (state.length < state.highWaterMark || state.length === 0);
-}
-
-export function resume(stream: Duplex | Readable, state: ReadableState) {
- if (!state.resumeScheduled) {
- state.resumeScheduled = true;
- queueMicrotask(() => resume_(stream, state));
- }
-}
-
-function resume_(stream: Duplex | Readable, state: ReadableState) {
- if (!state.reading) {
- stream.read(0);
- }
-
- state.resumeScheduled = false;
- stream.emit("resume");
- flow(stream);
- if (state.flowing && !state.reading) {
- stream.read(0);
- }
-}
-
-export function updateReadableListening(self: Duplex | Readable) {
- const state = self._readableState;
- state.readableListening = self.listenerCount("readable") > 0;
-
- if (state.resumeScheduled && state[kPaused] === false) {
- // Flowing needs to be set to true now, otherwise
- // the upcoming resume will not flow.
- state.flowing = true;
-
- // Crude way to check if we should resume.
- } else if (self.listenerCount("data") > 0) {
- self.resume();
- } else if (!state.readableListening) {
- state.flowing = null;
- }
-}
diff --git a/std/node/_stream/readable_test.ts b/std/node/_stream/readable_test.ts
deleted file mode 100644
index 72767e28f..000000000
--- a/std/node/_stream/readable_test.ts
+++ /dev/null
@@ -1,489 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import Readable from "../_stream/readable.ts";
-import { once } from "../events.ts";
-import { deferred } from "../../async/mod.ts";
-import {
- assert,
- assertEquals,
- assertStrictEquals,
-} from "../../testing/asserts.ts";
-
-Deno.test("Readable stream from iterator", async () => {
- function* generate() {
- yield "a";
- yield "b";
- yield "c";
- }
-
- const stream = Readable.from(generate());
-
- const expected = ["a", "b", "c"];
-
- for await (const chunk of stream) {
- assertStrictEquals(chunk, expected.shift());
- }
-});
-
-Deno.test("Readable stream from async iterator", async () => {
- async function* generate() {
- yield "a";
- yield "b";
- yield "c";
- }
-
- const stream = Readable.from(generate());
-
- const expected = ["a", "b", "c"];
-
- for await (const chunk of stream) {
- assertStrictEquals(chunk, expected.shift());
- }
-});
-
-Deno.test("Readable stream from promise", async () => {
- const promises = [
- Promise.resolve("a"),
- Promise.resolve("b"),
- Promise.resolve("c"),
- ];
-
- const stream = Readable.from(promises);
-
- const expected = ["a", "b", "c"];
-
- for await (const chunk of stream) {
- assertStrictEquals(chunk, expected.shift());
- }
-});
-
-Deno.test("Readable stream from string", async () => {
- const string = "abc";
- const stream = Readable.from(string);
-
- for await (const chunk of stream) {
- assertStrictEquals(chunk, string);
- }
-});
-
-Deno.test("Readable stream from Buffer", async () => {
- const string = "abc";
- const stream = Readable.from(Buffer.from(string));
-
- for await (const chunk of stream) {
- assertStrictEquals((chunk as Buffer).toString(), string);
- }
-});
-
-Deno.test("Readable stream gets destroyed on error", async () => {
- // deno-lint-ignore require-yield
- async function* generate() {
- throw new Error("kaboom");
- }
-
- const stream = Readable.from(generate());
-
- stream.read();
-
- const [err] = await once(stream, "error");
- assertStrictEquals(err.message, "kaboom");
- assertStrictEquals(stream.destroyed, true);
-});
-
-Deno.test("Readable stream works as Transform stream", async () => {
- async function* generate(stream: Readable) {
- for await (const chunk of stream) {
- yield (chunk as string).toUpperCase();
- }
- }
-
- const source = new Readable({
- objectMode: true,
- read() {
- this.push("a");
- this.push("b");
- this.push("c");
- this.push(null);
- },
- });
-
- const stream = Readable.from(generate(source));
-
- const expected = ["A", "B", "C"];
-
- for await (const chunk of stream) {
- assertStrictEquals(chunk, expected.shift());
- }
-});
-
-Deno.test("Readable stream can be paused", () => {
- const readable = new Readable();
-
- // _read is a noop, here.
- readable._read = () => {};
-
- // Default state of a stream is not "paused"
- assert(!readable.isPaused());
-
- // Make the stream start flowing...
- readable.on("data", () => {});
-
- // still not paused.
- assert(!readable.isPaused());
-
- readable.pause();
- assert(readable.isPaused());
- readable.resume();
- assert(!readable.isPaused());
-});
-
-Deno.test("Readable stream sets enconding correctly", () => {
- const readable = new Readable({
- read() {},
- });
-
- readable.setEncoding("utf8");
-
- readable.push(new TextEncoder().encode("DEF"));
- readable.unshift(new TextEncoder().encode("ABC"));
-
- assertStrictEquals(readable.read(), "ABCDEF");
-});
-
-Deno.test("Readable stream sets encoding correctly", () => {
- const readable = new Readable({
- read() {},
- });
-
- readable.setEncoding("utf8");
-
- readable.push(new TextEncoder().encode("DEF"));
- readable.unshift(new TextEncoder().encode("ABC"));
-
- assertStrictEquals(readable.read(), "ABCDEF");
-});
-
-Deno.test("Readable stream holds up a big push", async () => {
- let readExecuted = 0;
- const readExecutedExpected = 3;
- const readExpectedExecutions = deferred();
-
- let endExecuted = 0;
- const endExecutedExpected = 1;
- const endExpectedExecutions = deferred();
-
- const str = "asdfasdfasdfasdfasdf";
-
- const r = new Readable({
- highWaterMark: 5,
- encoding: "utf8",
- });
-
- let reads = 0;
-
- function _read() {
- if (reads === 0) {
- setTimeout(() => {
- r.push(str);
- }, 1);
- reads++;
- } else if (reads === 1) {
- const ret = r.push(str);
- assertEquals(ret, false);
- reads++;
- } else {
- r.push(null);
- }
- }
-
- r._read = () => {
- readExecuted++;
- if (readExecuted == readExecutedExpected) {
- readExpectedExecutions.resolve();
- }
- _read();
- };
-
- r.on("end", () => {
- endExecuted++;
- if (endExecuted == endExecutedExpected) {
- endExpectedExecutions.resolve();
- }
- });
-
- // Push some data in to start.
- // We've never gotten any read event at this point.
- const ret = r.push(str);
- assert(!ret);
- let chunk = r.read();
- assertEquals(chunk, str);
- chunk = r.read();
- assertEquals(chunk, null);
-
- r.once("readable", () => {
- // This time, we'll get *all* the remaining data, because
- // it's been added synchronously, as the read WOULD take
- // us below the hwm, and so it triggered a _read() again,
- // which synchronously added more, which we then return.
- chunk = r.read();
- assertEquals(chunk, str + str);
-
- chunk = r.read();
- assertEquals(chunk, null);
- });
-
- const readTimeout = setTimeout(
- () => readExpectedExecutions.reject(),
- 1000,
- );
- const endTimeout = setTimeout(
- () => endExpectedExecutions.reject(),
- 1000,
- );
- await readExpectedExecutions;
- await endExpectedExecutions;
- clearTimeout(readTimeout);
- clearTimeout(endTimeout);
- assertEquals(readExecuted, readExecutedExpected);
- assertEquals(endExecuted, endExecutedExpected);
-});
-
-Deno.test("Readable stream: 'on' event", async () => {
- async function* generate() {
- yield "a";
- yield "b";
- yield "c";
- }
-
- const stream = Readable.from(generate());
-
- let iterations = 0;
- const expected = ["a", "b", "c"];
-
- stream.on("data", (chunk) => {
- iterations++;
- assertStrictEquals(chunk, expected.shift());
- });
-
- await once(stream, "end");
-
- assertStrictEquals(iterations, 3);
-});
-
-Deno.test("Readable stream: 'data' event", async () => {
- async function* generate() {
- yield "a";
- yield "b";
- yield "c";
- }
-
- const stream = Readable.from(generate(), { objectMode: false });
-
- let iterations = 0;
- const expected = ["a", "b", "c"];
-
- stream.on("data", (chunk) => {
- iterations++;
- assertStrictEquals(chunk instanceof Buffer, true);
- assertStrictEquals(chunk.toString(), expected.shift());
- });
-
- await once(stream, "end");
-
- assertStrictEquals(iterations, 3);
-});
-
-Deno.test("Readable stream: 'data' event on non-object", async () => {
- async function* generate() {
- yield "a";
- yield "b";
- yield "c";
- }
-
- const stream = Readable.from(generate(), { objectMode: false });
-
- let iterations = 0;
- const expected = ["a", "b", "c"];
-
- stream.on("data", (chunk) => {
- iterations++;
- assertStrictEquals(chunk instanceof Buffer, true);
- assertStrictEquals(chunk.toString(), expected.shift());
- });
-
- await once(stream, "end");
-
- assertStrictEquals(iterations, 3);
-});
-
-Deno.test("Readable stream: 'readable' event is emitted but 'read' is not on highWaterMark length exceeded", async () => {
- let readableExecuted = 0;
- const readableExecutedExpected = 1;
- const readableExpectedExecutions = deferred();
-
- const r = new Readable({
- highWaterMark: 3,
- });
-
- r._read = () => {
- throw new Error("_read must not be called");
- };
- r.push(Buffer.from("blerg"));
-
- setTimeout(function () {
- assert(!r._readableState.reading);
- r.on("readable", () => {
- readableExecuted++;
- if (readableExecuted == readableExecutedExpected) {
- readableExpectedExecutions.resolve();
- }
- });
- }, 1);
-
- const readableTimeout = setTimeout(
- () => readableExpectedExecutions.reject(),
- 1000,
- );
- await readableExpectedExecutions;
- clearTimeout(readableTimeout);
- assertEquals(readableExecuted, readableExecutedExpected);
-});
-
-Deno.test("Readable stream: 'readable' and 'read' events are emitted on highWaterMark length not reached", async () => {
- let readableExecuted = 0;
- const readableExecutedExpected = 1;
- const readableExpectedExecutions = deferred();
-
- let readExecuted = 0;
- const readExecutedExpected = 1;
- const readExpectedExecutions = deferred();
-
- const r = new Readable({
- highWaterMark: 3,
- });
-
- r._read = () => {
- readExecuted++;
- if (readExecuted == readExecutedExpected) {
- readExpectedExecutions.resolve();
- }
- };
-
- r.push(Buffer.from("bl"));
-
- setTimeout(function () {
- assert(r._readableState.reading);
- r.on("readable", () => {
- readableExecuted++;
- if (readableExecuted == readableExecutedExpected) {
- readableExpectedExecutions.resolve();
- }
- });
- }, 1);
-
- const readableTimeout = setTimeout(
- () => readableExpectedExecutions.reject(),
- 1000,
- );
- const readTimeout = setTimeout(
- () => readExpectedExecutions.reject(),
- 1000,
- );
- await readableExpectedExecutions;
- await readExpectedExecutions;
- clearTimeout(readableTimeout);
- clearTimeout(readTimeout);
- assertEquals(readableExecuted, readableExecutedExpected);
- assertEquals(readExecuted, readExecutedExpected);
-});
-
-Deno.test("Readable stream: 'readable' event is emitted but 'read' is not on highWaterMark length not reached and stream ended", async () => {
- let readableExecuted = 0;
- const readableExecutedExpected = 1;
- const readableExpectedExecutions = deferred();
-
- const r = new Readable({
- highWaterMark: 30,
- });
-
- r._read = () => {
- throw new Error("Must not be executed");
- };
-
- r.push(Buffer.from("blerg"));
- //This ends the stream and triggers end
- r.push(null);
-
- setTimeout(function () {
- // Assert we're testing what we think we are
- assert(!r._readableState.reading);
- r.on("readable", () => {
- readableExecuted++;
- if (readableExecuted == readableExecutedExpected) {
- readableExpectedExecutions.resolve();
- }
- });
- }, 1);
-
- const readableTimeout = setTimeout(
- () => readableExpectedExecutions.reject(),
- 1000,
- );
- await readableExpectedExecutions;
- clearTimeout(readableTimeout);
- assertEquals(readableExecuted, readableExecutedExpected);
-});
-
-Deno.test("Readable stream: 'read' is emitted on empty string pushed in non-object mode", async () => {
- let endExecuted = 0;
- const endExecutedExpected = 1;
- const endExpectedExecutions = deferred();
-
- const underlyingData = ["", "x", "y", "", "z"];
- const expected = underlyingData.filter((data) => data);
- const result: unknown[] = [];
-
- const r = new Readable({
- encoding: "utf8",
- });
- r._read = function () {
- queueMicrotask(() => {
- if (!underlyingData.length) {
- this.push(null);
- } else {
- this.push(underlyingData.shift());
- }
- });
- };
-
- r.on("readable", () => {
- const data = r.read();
- if (data !== null) result.push(data);
- });
-
- r.on("end", () => {
- endExecuted++;
- if (endExecuted == endExecutedExpected) {
- endExpectedExecutions.resolve();
- }
- assertEquals(result, expected);
- });
-
- const endTimeout = setTimeout(
- () => endExpectedExecutions.reject(),
- 1000,
- );
- await endExpectedExecutions;
- clearTimeout(endTimeout);
- assertEquals(endExecuted, endExecutedExpected);
-});
-
-Deno.test("Readable stream: listeners can be removed", () => {
- const r = new Readable();
- r._read = () => {};
- r.on("data", () => {});
-
- r.removeAllListeners("data");
-
- assertEquals(r.eventNames().length, 0);
-});
diff --git a/std/node/_stream/stream.ts b/std/node/_stream/stream.ts
deleted file mode 100644
index 4daafc77b..000000000
--- a/std/node/_stream/stream.ts
+++ /dev/null
@@ -1,81 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import EventEmitter from "../events.ts";
-import type Readable from "./readable.ts";
-import type Writable from "./writable.ts";
-import { types } from "../util.ts";
-
-class Stream extends EventEmitter {
- constructor() {
- super();
- }
-
- static _isUint8Array = types.isUint8Array;
- static _uint8ArrayToBuffer = (chunk: Uint8Array) => Buffer.from(chunk);
-
- pipe(dest: Readable | Writable, options?: { end?: boolean }) {
- // deno-lint-ignore no-this-alias
- const source = this;
-
- //TODO(Soremwar)
- //isStdio exist on stdin || stdout only, which extend from Duplex
- //if (!dest._isStdio && (options?.end ?? true)) {
- //Find an alternative to be able to pipe streams to stdin & stdout
- //Port them as well?
- if (options?.end ?? true) {
- source.on("end", onend);
- source.on("close", onclose);
- }
-
- let didOnEnd = false;
- function onend() {
- if (didOnEnd) return;
- didOnEnd = true;
-
- // 'end' is only called on Writable streams
- (dest as Writable).end();
- }
-
- function onclose() {
- if (didOnEnd) return;
- didOnEnd = true;
-
- if (typeof dest.destroy === "function") dest.destroy();
- }
-
- // Don't leave dangling pipes when there are errors.
- function onerror(this: Stream, er: Error) {
- cleanup();
- if (this.listenerCount("error") === 0) {
- throw er; // Unhandled stream error in pipe.
- }
- }
-
- source.on("error", onerror);
- dest.on("error", onerror);
-
- // Remove all the event listeners that were added.
- function cleanup() {
- source.removeListener("end", onend);
- source.removeListener("close", onclose);
-
- source.removeListener("error", onerror);
- dest.removeListener("error", onerror);
-
- source.removeListener("end", cleanup);
- source.removeListener("close", cleanup);
-
- dest.removeListener("close", cleanup);
- }
-
- source.on("end", cleanup);
- source.on("close", cleanup);
-
- dest.on("close", cleanup);
- dest.emit("pipe", source);
-
- return dest;
- }
-}
-
-export default Stream;
diff --git a/std/node/_stream/symbols.ts b/std/node/_stream/symbols.ts
deleted file mode 100644
index addb969d3..000000000
--- a/std/node/_stream/symbols.ts
+++ /dev/null
@@ -1,4 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-export const kConstruct = Symbol("kConstruct");
-export const kDestroy = Symbol("kDestroy");
-export const kPaused = Symbol("kPaused");
diff --git a/std/node/_stream/transform.ts b/std/node/_stream/transform.ts
deleted file mode 100644
index a4246e81a..000000000
--- a/std/node/_stream/transform.ts
+++ /dev/null
@@ -1,132 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Encodings } from "../_utils.ts";
-import Duplex from "./duplex.ts";
-import type { DuplexOptions } from "./duplex.ts";
-import type { writeV } from "./writable_internal.ts";
-import { ERR_METHOD_NOT_IMPLEMENTED } from "../_errors.ts";
-
-const kCallback = Symbol("kCallback");
-
-type TransformFlush = (
- this: Transform,
- // deno-lint-ignore no-explicit-any
- callback: (error?: Error | null, data?: any) => void,
-) => void;
-
-export interface TransformOptions extends DuplexOptions {
- read?(this: Transform, size: number): void;
- write?(
- this: Transform,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: Encodings,
- callback: (error?: Error | null) => void,
- ): void;
- writev?: writeV;
- final?(this: Transform, callback: (error?: Error | null) => void): void;
- destroy?(
- this: Transform,
- error: Error | null,
- callback: (error: Error | null) => void,
- ): void;
- transform?(
- this: Transform,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: Encodings,
- // deno-lint-ignore no-explicit-any
- callback: (error?: Error | null, data?: any) => void,
- ): void;
- flush?: TransformFlush;
-}
-
-export default class Transform extends Duplex {
- [kCallback]: null | ((error?: Error | null) => void);
- _flush?: TransformFlush;
-
- constructor(options?: TransformOptions) {
- super(options);
- this._readableState.sync = false;
-
- this[kCallback] = null;
-
- if (options) {
- if (typeof options.transform === "function") {
- this._transform = options.transform;
- }
-
- if (typeof options.flush === "function") {
- this._flush = options.flush;
- }
- }
-
- this.on("prefinish", function (this: Transform) {
- if (typeof this._flush === "function" && !this.destroyed) {
- this._flush((er, data) => {
- if (er) {
- this.destroy(er);
- return;
- }
-
- if (data != null) {
- this.push(data);
- }
- this.push(null);
- });
- } else {
- this.push(null);
- }
- });
- }
-
- _read = () => {
- if (this[kCallback]) {
- const callback = this[kCallback] as (error?: Error | null) => void;
- this[kCallback] = null;
- callback();
- }
- };
-
- _transform(
- // deno-lint-ignore no-explicit-any
- _chunk: any,
- _encoding: string,
- // deno-lint-ignore no-explicit-any
- _callback: (error?: Error | null, data?: any) => void,
- ) {
- throw new ERR_METHOD_NOT_IMPLEMENTED("_transform()");
- }
-
- _write = (
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- callback: (error?: Error | null) => void,
- ) => {
- const rState = this._readableState;
- const wState = this._writableState;
- const length = rState.length;
-
- this._transform(chunk, encoding, (err, val) => {
- if (err) {
- callback(err);
- return;
- }
-
- if (val != null) {
- this.push(val);
- }
-
- if (
- wState.ended || // Backwards compat.
- length === rState.length || // Backwards compat.
- rState.length < rState.highWaterMark ||
- rState.length === 0
- ) {
- callback();
- } else {
- this[kCallback] = callback;
- }
- });
- };
-}
diff --git a/std/node/_stream/transform_test.ts b/std/node/_stream/transform_test.ts
deleted file mode 100644
index d3b90ff01..000000000
--- a/std/node/_stream/transform_test.ts
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import Transform from "./transform.ts";
-import finished from "./end_of_stream.ts";
-import { deferred } from "../../async/mod.ts";
-import { assert, assertEquals } from "../../testing/asserts.ts";
-
-Deno.test("Transform stream finishes correctly", async () => {
- let finishedExecuted = 0;
- const finishedExecutedExpected = 1;
- const finishedExecution = deferred();
-
- const tr = new Transform({
- transform(_data, _enc, cb) {
- cb();
- },
- });
-
- let finish = false;
- let ended = false;
-
- tr.on("end", () => {
- ended = true;
- });
-
- tr.on("finish", () => {
- finish = true;
- });
-
- finished(tr, (err) => {
- finishedExecuted++;
- if (finishedExecuted === finishedExecutedExpected) {
- finishedExecution.resolve();
- }
- assert(!err, "no error");
- assert(finish);
- assert(ended);
- });
-
- tr.end();
- tr.resume();
-
- const finishedTimeout = setTimeout(
- () => finishedExecution.reject(),
- 1000,
- );
- await finishedExecution;
- clearTimeout(finishedTimeout);
- assertEquals(finishedExecuted, finishedExecutedExpected);
-});
-
-Deno.test("Transform stream flushes data correctly", () => {
- const expected = "asdf";
-
- const t = new Transform({
- transform: (_d, _e, n) => {
- n();
- },
- flush: (n) => {
- n(null, expected);
- },
- });
-
- t.end(Buffer.from("blerg"));
- t.on("data", (data) => {
- assertEquals(data.toString(), expected);
- });
-});
diff --git a/std/node/_stream/writable.ts b/std/node/_stream/writable.ts
deleted file mode 100644
index 534fc22fb..000000000
--- a/std/node/_stream/writable.ts
+++ /dev/null
@@ -1,443 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import Stream from "./stream.ts";
-import { captureRejectionSymbol } from "../events.ts";
-import {
- ERR_INVALID_ARG_TYPE,
- ERR_INVALID_OPT_VALUE,
- ERR_METHOD_NOT_IMPLEMENTED,
- ERR_STREAM_ALREADY_FINISHED,
- ERR_STREAM_CANNOT_PIPE,
- ERR_STREAM_DESTROYED,
- ERR_STREAM_NULL_VALUES,
- ERR_STREAM_WRITE_AFTER_END,
- ERR_UNKNOWN_ENCODING,
-} from "../_errors.ts";
-import type { AfterWriteTick, writeV } from "./writable_internal.ts";
-import {
- clearBuffer,
- destroy,
- errorBuffer,
- errorOrDestroy,
- finishMaybe,
- kOnFinished,
- nop,
- onwrite,
- resetBuffer,
- writeOrBuffer,
-} from "./writable_internal.ts";
-import type { Encodings } from "../_utils.ts";
-
-type WritableEncodings = Encodings | "buffer";
-
-export interface WritableOptions {
- autoDestroy?: boolean;
- decodeStrings?: boolean;
- defaultEncoding?: WritableEncodings;
- destroy?(
- this: Writable,
- error: Error | null,
- callback: (error: Error | null) => void,
- ): void;
- emitClose?: boolean;
- final?(this: Writable, callback: (error?: Error | null) => void): void;
- highWaterMark?: number;
- objectMode?: boolean;
- write?(
- this: Writable,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: WritableEncodings,
- callback: (error?: Error | null) => void,
- ): void;
- writev?(
- this: Writable,
- // deno-lint-ignore no-explicit-any
- chunks: Array<{ chunk: any; encoding: string }>,
- callback: (error?: Error | null) => void,
- ): void;
-}
-
-export class WritableState {
- [kOnFinished]: Array<(error?: Error) => void> = [];
- afterWriteTickInfo: null | AfterWriteTick = null;
- allBuffers = true;
- allNoop = true;
- autoDestroy: boolean;
- buffered: Array<{
- allBuffers?: boolean;
- // deno-lint-ignore no-explicit-any
- chunk: any;
- encoding: string;
- callback: (error: Error) => void;
- }> = [];
- bufferedIndex = 0;
- bufferProcessing = false;
- closed = false;
- closeEmitted = false;
- constructed: boolean;
- corked = 0;
- decodeStrings: boolean;
- defaultEncoding: WritableEncodings;
- destroyed = false;
- emitClose: boolean;
- ended = false;
- ending = false;
- errored: Error | null = null;
- errorEmitted = false;
- finalCalled = false;
- finished = false;
- highWaterMark: number;
- length = 0;
- needDrain = false;
- objectMode: boolean;
- onwrite: (error?: Error | null) => void;
- pendingcb = 0;
- prefinished = false;
- sync = true;
- writecb: null | ((error: Error) => void) = null;
- writable = true;
- writelen = 0;
- writing = false;
-
- constructor(options: WritableOptions | undefined, stream: Writable) {
- this.objectMode = !!options?.objectMode;
-
- this.highWaterMark = options?.highWaterMark ??
- (this.objectMode ? 16 : 16 * 1024);
-
- if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) {
- this.highWaterMark = Math.floor(this.highWaterMark);
- } else {
- throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark);
- }
-
- this.decodeStrings = !options?.decodeStrings === false;
-
- this.defaultEncoding = options?.defaultEncoding || "utf8";
-
- this.onwrite = onwrite.bind(undefined, stream);
-
- resetBuffer(this);
-
- this.emitClose = options?.emitClose ?? true;
- this.autoDestroy = options?.autoDestroy ?? true;
- this.constructed = true;
- }
-
- getBuffer() {
- return this.buffered.slice(this.bufferedIndex);
- }
-
- get bufferedRequestCount() {
- return this.buffered.length - this.bufferedIndex;
- }
-}
-
-/** A bit simpler than readable streams.
-* Implement an async `._write(chunk, encoding, cb)`, and it'll handle all
-* the drain event emission and buffering.
-*/
-class Writable extends Stream {
- _final?: (
- this: Writable,
- callback: (error?: Error | null | undefined) => void,
- ) => void;
- _writableState: WritableState;
- _writev?: writeV | null = null;
-
- constructor(options?: WritableOptions) {
- super();
- this._writableState = new WritableState(options, this);
-
- if (options) {
- if (typeof options.write === "function") {
- this._write = options.write;
- }
-
- if (typeof options.writev === "function") {
- this._writev = options.writev;
- }
-
- if (typeof options.destroy === "function") {
- this._destroy = options.destroy;
- }
-
- if (typeof options.final === "function") {
- this._final = options.final;
- }
- }
- }
-
- [captureRejectionSymbol](err?: Error) {
- this.destroy(err);
- }
-
- static WritableState = WritableState;
-
- get destroyed() {
- return this._writableState ? this._writableState.destroyed : false;
- }
-
- set destroyed(value) {
- if (this._writableState) {
- this._writableState.destroyed = value;
- }
- }
-
- get writable() {
- const w = this._writableState;
- return !w.destroyed && !w.errored && !w.ending && !w.ended;
- }
-
- set writable(val) {
- if (this._writableState) {
- this._writableState.writable = !!val;
- }
- }
-
- get writableFinished() {
- return this._writableState ? this._writableState.finished : false;
- }
-
- get writableObjectMode() {
- return this._writableState ? this._writableState.objectMode : false;
- }
-
- get writableBuffer() {
- return this._writableState && this._writableState.getBuffer();
- }
-
- get writableEnded() {
- return this._writableState ? this._writableState.ending : false;
- }
-
- get writableHighWaterMark() {
- return this._writableState && this._writableState.highWaterMark;
- }
-
- get writableCorked() {
- return this._writableState ? this._writableState.corked : 0;
- }
-
- get writableLength() {
- return this._writableState && this._writableState.length;
- }
-
- _undestroy() {
- const w = this._writableState;
- w.constructed = true;
- w.destroyed = false;
- w.closed = false;
- w.closeEmitted = false;
- w.errored = null;
- w.errorEmitted = false;
- w.ended = false;
- w.ending = false;
- w.finalCalled = false;
- w.prefinished = false;
- w.finished = false;
- }
-
- _destroy(err: Error | null, cb: (error?: Error | null) => void) {
- cb(err);
- }
-
- destroy(err?: Error | null, cb?: () => void) {
- const state = this._writableState;
- if (!state.destroyed) {
- queueMicrotask(() => errorBuffer(state));
- }
- destroy.call(this, err, cb);
- return this;
- }
-
- end(cb?: () => void): void;
- // deno-lint-ignore no-explicit-any
- end(chunk: any, cb?: () => void): void;
- // deno-lint-ignore no-explicit-any
- end(chunk: any, encoding: WritableEncodings, cb?: () => void): void;
-
- end(
- // deno-lint-ignore no-explicit-any
- x?: any | (() => void),
- y?: WritableEncodings | (() => void),
- z?: () => void,
- ) {
- const state = this._writableState;
- // deno-lint-ignore no-explicit-any
- let chunk: any | null;
- let encoding: WritableEncodings | null;
- let cb: undefined | ((error?: Error) => void);
-
- if (typeof x === "function") {
- chunk = null;
- encoding = null;
- cb = x;
- } else if (typeof y === "function") {
- chunk = x;
- encoding = null;
- cb = y;
- } else {
- chunk = x;
- encoding = y as WritableEncodings;
- cb = z;
- }
-
- if (chunk !== null && chunk !== undefined) {
- this.write(chunk, encoding);
- }
-
- if (state.corked) {
- state.corked = 1;
- this.uncork();
- }
-
- let err: Error | undefined;
- if (!state.errored && !state.ending) {
- state.ending = true;
- finishMaybe(this, state, true);
- state.ended = true;
- } else if (state.finished) {
- err = new ERR_STREAM_ALREADY_FINISHED("end");
- } else if (state.destroyed) {
- err = new ERR_STREAM_DESTROYED("end");
- }
-
- if (typeof cb === "function") {
- if (err || state.finished) {
- queueMicrotask(() => {
- (cb as (error?: Error | undefined) => void)(err);
- });
- } else {
- state[kOnFinished].push(cb);
- }
- }
-
- return this;
- }
-
- _write(
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- cb: (error?: Error | null) => void,
- ): void {
- if (this._writev) {
- this._writev([{ chunk, encoding }], cb);
- } else {
- throw new ERR_METHOD_NOT_IMPLEMENTED("_write()");
- }
- }
-
- //This signature was changed to keep inheritance coherent
- pipe(dest: Writable): Writable {
- errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
- return dest;
- }
-
- // deno-lint-ignore no-explicit-any
- write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean;
- write(
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: WritableEncodings | null,
- cb?: (error: Error | null | undefined) => void,
- ): boolean;
-
- write(
- // deno-lint-ignore no-explicit-any
- chunk: any,
- x?: WritableEncodings | null | ((error: Error | null | undefined) => void),
- y?: ((error: Error | null | undefined) => void),
- ) {
- const state = this._writableState;
- let encoding: WritableEncodings;
- let cb: (error?: Error | null) => void;
-
- if (typeof x === "function") {
- cb = x;
- encoding = state.defaultEncoding;
- } else {
- if (!x) {
- encoding = state.defaultEncoding;
- } else if (x !== "buffer" && !Buffer.isEncoding(x)) {
- throw new ERR_UNKNOWN_ENCODING(x);
- } else {
- encoding = x;
- }
- if (typeof y !== "function") {
- cb = nop;
- } else {
- cb = y;
- }
- }
-
- if (chunk === null) {
- throw new ERR_STREAM_NULL_VALUES();
- } else if (!state.objectMode) {
- if (typeof chunk === "string") {
- if (state.decodeStrings !== false) {
- chunk = Buffer.from(chunk, encoding);
- encoding = "buffer";
- }
- } else if (chunk instanceof Buffer) {
- encoding = "buffer";
- } else if (Stream._isUint8Array(chunk)) {
- chunk = Stream._uint8ArrayToBuffer(chunk);
- encoding = "buffer";
- } else {
- throw new ERR_INVALID_ARG_TYPE(
- "chunk",
- ["string", "Buffer", "Uint8Array"],
- chunk,
- );
- }
- }
-
- let err: Error | undefined;
- if (state.ending) {
- err = new ERR_STREAM_WRITE_AFTER_END();
- } else if (state.destroyed) {
- err = new ERR_STREAM_DESTROYED("write");
- }
-
- if (err) {
- queueMicrotask(() => cb(err));
- errorOrDestroy(this, err, true);
- return false;
- }
- state.pendingcb++;
- return writeOrBuffer(this, state, chunk, encoding, cb);
- }
-
- cork() {
- this._writableState.corked++;
- }
-
- uncork() {
- const state = this._writableState;
-
- if (state.corked) {
- state.corked--;
-
- if (!state.writing) {
- clearBuffer(this, state);
- }
- }
- }
-
- setDefaultEncoding(encoding: string) {
- // node::ParseEncoding() requires lower case.
- if (typeof encoding === "string") {
- encoding = encoding.toLowerCase();
- }
- if (!Buffer.isEncoding(encoding)) {
- throw new ERR_UNKNOWN_ENCODING(encoding);
- }
- this._writableState.defaultEncoding = encoding as WritableEncodings;
- return this;
- }
-}
-
-export default Writable;
diff --git a/std/node/_stream/writable_internal.ts b/std/node/_stream/writable_internal.ts
deleted file mode 100644
index e8c001af0..000000000
--- a/std/node/_stream/writable_internal.ts
+++ /dev/null
@@ -1,457 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import type Duplex from "./duplex.ts";
-import type Writable from "./writable.ts";
-import type { WritableState } from "./writable.ts";
-import { kDestroy } from "./symbols.ts";
-import { ERR_MULTIPLE_CALLBACK, ERR_STREAM_DESTROYED } from "../_errors.ts";
-
-export type writeV = (
- // deno-lint-ignore no-explicit-any
- chunks: Array<{ chunk: any; encoding: string }>,
- callback: (error?: Error | null) => void,
-) => void;
-
-export type AfterWriteTick = {
- cb: (error?: Error) => void;
- count: number;
- state: WritableState;
- stream: Writable;
-};
-
-export const kOnFinished = Symbol("kOnFinished");
-
-function _destroy(
- self: Writable,
- err?: Error | null,
- cb?: (error?: Error | null) => void,
-) {
- self._destroy(err || null, (err) => {
- const w = self._writableState;
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- }
-
- w.closed = true;
-
- if (typeof cb === "function") {
- cb(err);
- }
-
- if (err) {
- queueMicrotask(() => {
- if (!w.errorEmitted) {
- w.errorEmitted = true;
- self.emit("error", err);
- }
- w.closeEmitted = true;
- if (w.emitClose) {
- self.emit("close");
- }
- });
- } else {
- queueMicrotask(() => {
- w.closeEmitted = true;
- if (w.emitClose) {
- self.emit("close");
- }
- });
- }
- });
-}
-
-export function afterWrite(
- stream: Writable,
- state: WritableState,
- count: number,
- cb: (error?: Error) => void,
-) {
- const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
- state.needDrain;
- if (needDrain) {
- state.needDrain = false;
- stream.emit("drain");
- }
-
- while (count-- > 0) {
- state.pendingcb--;
- cb();
- }
-
- if (state.destroyed) {
- errorBuffer(state);
- }
-
- finishMaybe(stream, state);
-}
-
-export function afterWriteTick({
- cb,
- count,
- state,
- stream,
-}: AfterWriteTick) {
- state.afterWriteTickInfo = null;
- return afterWrite(stream, state, count, cb);
-}
-
-/** If there's something in the buffer waiting, then process it.*/
-export function clearBuffer(stream: Duplex | Writable, state: WritableState) {
- if (
- state.corked ||
- state.bufferProcessing ||
- state.destroyed ||
- !state.constructed
- ) {
- return;
- }
-
- const { buffered, bufferedIndex, objectMode } = state;
- const bufferedLength = buffered.length - bufferedIndex;
-
- if (!bufferedLength) {
- return;
- }
-
- const i = bufferedIndex;
-
- state.bufferProcessing = true;
- if (bufferedLength > 1 && stream._writev) {
- state.pendingcb -= bufferedLength - 1;
-
- const callback = state.allNoop ? nop : (err: Error) => {
- for (let n = i; n < buffered.length; ++n) {
- buffered[n].callback(err);
- }
- };
- const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i);
-
- doWrite(stream, state, true, state.length, chunks, "", callback);
-
- resetBuffer(state);
- } else {
- do {
- const { chunk, encoding, callback } = buffered[i];
- const len = objectMode ? 1 : chunk.length;
- doWrite(stream, state, false, len, chunk, encoding, callback);
- } while (i < buffered.length && !state.writing);
-
- if (i === buffered.length) {
- resetBuffer(state);
- } else if (i > 256) {
- buffered.splice(0, i);
- state.bufferedIndex = 0;
- } else {
- state.bufferedIndex = i;
- }
- }
- state.bufferProcessing = false;
-}
-
-export function destroy(this: Writable, err?: Error | null, cb?: () => void) {
- const w = this._writableState;
-
- if (w.destroyed) {
- if (typeof cb === "function") {
- cb();
- }
-
- return this;
- }
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- }
-
- w.destroyed = true;
-
- if (!w.constructed) {
- this.once(kDestroy, (er) => {
- _destroy(this, err || er, cb);
- });
- } else {
- _destroy(this, err, cb);
- }
-
- return this;
-}
-
-function doWrite(
- stream: Duplex | Writable,
- state: WritableState,
- writev: boolean,
- len: number,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- cb: (error: Error) => void,
-) {
- state.writelen = len;
- state.writecb = cb;
- state.writing = true;
- state.sync = true;
- if (state.destroyed) {
- state.onwrite(new ERR_STREAM_DESTROYED("write"));
- } else if (writev) {
- (stream._writev as unknown as writeV)(chunk, state.onwrite);
- } else {
- stream._write(chunk, encoding, state.onwrite);
- }
- state.sync = false;
-}
-
-/** If there's something in the buffer waiting, then invoke callbacks.*/
-export function errorBuffer(state: WritableState) {
- if (state.writing) {
- return;
- }
-
- for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
- const { chunk, callback } = state.buffered[n];
- const len = state.objectMode ? 1 : chunk.length;
- state.length -= len;
- callback(new ERR_STREAM_DESTROYED("write"));
- }
-
- for (const callback of state[kOnFinished].splice(0)) {
- callback(new ERR_STREAM_DESTROYED("end"));
- }
-
- resetBuffer(state);
-}
-
-export function errorOrDestroy(stream: Writable, err: Error, sync = false) {
- const w = stream._writableState;
-
- if (w.destroyed) {
- return stream;
- }
-
- if (w.autoDestroy) {
- stream.destroy(err);
- } else if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- if (sync) {
- queueMicrotask(() => {
- if (w.errorEmitted) {
- return;
- }
- w.errorEmitted = true;
- stream.emit("error", err);
- });
- } else {
- if (w.errorEmitted) {
- return;
- }
- w.errorEmitted = true;
- stream.emit("error", err);
- }
- }
-}
-
-function finish(stream: Writable, state: WritableState) {
- state.pendingcb--;
- if (state.errorEmitted || state.closeEmitted) {
- return;
- }
-
- state.finished = true;
-
- for (const callback of state[kOnFinished].splice(0)) {
- callback();
- }
-
- stream.emit("finish");
-
- if (state.autoDestroy) {
- stream.destroy();
- }
-}
-
-export function finishMaybe(
- stream: Writable,
- state: WritableState,
- sync?: boolean,
-) {
- if (needFinish(state)) {
- prefinish(stream, state);
- if (state.pendingcb === 0 && needFinish(state)) {
- state.pendingcb++;
- if (sync) {
- queueMicrotask(() => finish(stream, state));
- } else {
- finish(stream, state);
- }
- }
- }
-}
-
-export function needFinish(state: WritableState) {
- return (state.ending &&
- state.constructed &&
- state.length === 0 &&
- !state.errored &&
- state.buffered.length === 0 &&
- !state.finished &&
- !state.writing);
-}
-
-export function nop() {}
-
-export function resetBuffer(state: WritableState) {
- state.buffered = [];
- state.bufferedIndex = 0;
- state.allBuffers = true;
- state.allNoop = true;
-}
-
-function onwriteError(
- stream: Writable,
- state: WritableState,
- er: Error,
- cb: (error: Error) => void,
-) {
- --state.pendingcb;
-
- cb(er);
- errorBuffer(state);
- errorOrDestroy(stream, er);
-}
-
-export function onwrite(stream: Writable, er?: Error | null) {
- const state = stream._writableState;
- const sync = state.sync;
- const cb = state.writecb;
-
- if (typeof cb !== "function") {
- errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
- return;
- }
-
- state.writing = false;
- state.writecb = null;
- state.length -= state.writelen;
- state.writelen = 0;
-
- if (er) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- er.stack;
-
- if (!state.errored) {
- state.errored = er;
- }
-
- if (sync) {
- queueMicrotask(() => onwriteError(stream, state, er, cb));
- } else {
- onwriteError(stream, state, er, cb);
- }
- } else {
- if (state.buffered.length > state.bufferedIndex) {
- clearBuffer(stream, state);
- }
-
- if (sync) {
- if (
- state.afterWriteTickInfo !== null &&
- state.afterWriteTickInfo.cb === cb
- ) {
- state.afterWriteTickInfo.count++;
- } else {
- state.afterWriteTickInfo = {
- count: 1,
- cb: (cb as (error?: Error) => void),
- stream,
- state,
- };
- queueMicrotask(() =>
- afterWriteTick(state.afterWriteTickInfo as AfterWriteTick)
- );
- }
- } else {
- afterWrite(stream, state, 1, cb as (error?: Error) => void);
- }
- }
-}
-
-export function prefinish(stream: Writable, state: WritableState) {
- if (!state.prefinished && !state.finalCalled) {
- if (typeof stream._final === "function" && !state.destroyed) {
- state.finalCalled = true;
-
- state.sync = true;
- state.pendingcb++;
- stream._final((err) => {
- state.pendingcb--;
- if (err) {
- for (const callback of state[kOnFinished].splice(0)) {
- callback(err);
- }
- errorOrDestroy(stream, err, state.sync);
- } else if (needFinish(state)) {
- state.prefinished = true;
- stream.emit("prefinish");
- state.pendingcb++;
- queueMicrotask(() => finish(stream, state));
- }
- });
- state.sync = false;
- } else {
- state.prefinished = true;
- stream.emit("prefinish");
- }
- }
-}
-
-export function writeOrBuffer(
- stream: Duplex | Writable,
- state: WritableState,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- callback: (error: Error) => void,
-) {
- const len = state.objectMode ? 1 : chunk.length;
-
- state.length += len;
-
- if (state.writing || state.corked || state.errored || !state.constructed) {
- state.buffered.push({ chunk, encoding, callback });
- if (state.allBuffers && encoding !== "buffer") {
- state.allBuffers = false;
- }
- if (state.allNoop && callback !== nop) {
- state.allNoop = false;
- }
- } else {
- state.writelen = len;
- state.writecb = callback;
- state.writing = true;
- state.sync = true;
- stream._write(chunk, encoding, state.onwrite);
- state.sync = false;
- }
-
- const ret = state.length < state.highWaterMark;
-
- if (!ret) {
- state.needDrain = true;
- }
-
- return ret && !state.errored && !state.destroyed;
-}
diff --git a/std/node/_stream/writable_test.ts b/std/node/_stream/writable_test.ts
deleted file mode 100644
index d6133b65f..000000000
--- a/std/node/_stream/writable_test.ts
+++ /dev/null
@@ -1,209 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import finished from "./end_of_stream.ts";
-import Writable from "../_stream/writable.ts";
-import { deferred } from "../../async/mod.ts";
-import {
- assert,
- assertEquals,
- assertStrictEquals,
- assertThrows,
-} from "../../testing/asserts.ts";
-
-Deno.test("Writable stream writes correctly", async () => {
- let callback: undefined | ((error?: Error | null | undefined) => void);
-
- let writeExecuted = 0;
- const writeExecutedExpected = 1;
- const writeExpectedExecutions = deferred();
-
- let writevExecuted = 0;
- const writevExecutedExpected = 1;
- const writevExpectedExecutions = deferred();
-
- const writable = new Writable({
- write: (chunk, encoding, cb) => {
- writeExecuted++;
- if (writeExecuted == writeExecutedExpected) {
- writeExpectedExecutions.resolve();
- }
- assert(chunk instanceof Buffer);
- assertStrictEquals(encoding, "buffer");
- assertStrictEquals(String(chunk), "ABC");
- callback = cb;
- },
- writev: (chunks) => {
- writevExecuted++;
- if (writevExecuted == writevExecutedExpected) {
- writevExpectedExecutions.resolve();
- }
- assertStrictEquals(chunks.length, 2);
- assertStrictEquals(chunks[0].encoding, "buffer");
- assertStrictEquals(chunks[1].encoding, "buffer");
- assertStrictEquals(chunks[0].chunk + chunks[1].chunk, "DEFGHI");
- },
- });
-
- writable.write(new TextEncoder().encode("ABC"));
- writable.write(new TextEncoder().encode("DEF"));
- writable.end(new TextEncoder().encode("GHI"));
- callback?.();
-
- const writeTimeout = setTimeout(
- () => writeExpectedExecutions.reject(),
- 1000,
- );
- const writevTimeout = setTimeout(
- () => writevExpectedExecutions.reject(),
- 1000,
- );
- await writeExpectedExecutions;
- await writevExpectedExecutions;
- clearTimeout(writeTimeout);
- clearTimeout(writevTimeout);
- assertEquals(writeExecuted, writeExecutedExpected);
- assertEquals(writevExecuted, writevExecutedExpected);
-});
-
-Deno.test("Writable stream writes Uint8Array in object mode", async () => {
- let writeExecuted = 0;
- const writeExecutedExpected = 1;
- const writeExpectedExecutions = deferred();
-
- const ABC = new TextEncoder().encode("ABC");
-
- const writable = new Writable({
- objectMode: true,
- write: (chunk, encoding, cb) => {
- writeExecuted++;
- if (writeExecuted == writeExecutedExpected) {
- writeExpectedExecutions.resolve();
- }
- assert(!(chunk instanceof Buffer));
- assert(chunk instanceof Uint8Array);
- assertEquals(chunk, ABC);
- assertEquals(encoding, "utf8");
- cb();
- },
- });
-
- writable.end(ABC);
-
- const writeTimeout = setTimeout(
- () => writeExpectedExecutions.reject(),
- 1000,
- );
- await writeExpectedExecutions;
- clearTimeout(writeTimeout);
- assertEquals(writeExecuted, writeExecutedExpected);
-});
-
-Deno.test("Writable stream throws on unexpected close", async () => {
- let finishedExecuted = 0;
- const finishedExecutedExpected = 1;
- const finishedExpectedExecutions = deferred();
-
- const writable = new Writable({
- write: () => {},
- });
- writable.writable = false;
- writable.destroy();
-
- finished(writable, (err) => {
- finishedExecuted++;
- if (finishedExecuted == finishedExecutedExpected) {
- finishedExpectedExecutions.resolve();
- }
- assertEquals(err?.code, "ERR_STREAM_PREMATURE_CLOSE");
- });
-
- const finishedTimeout = setTimeout(
- () => finishedExpectedExecutions.reject(),
- 1000,
- );
- await finishedExpectedExecutions;
- clearTimeout(finishedTimeout);
- assertEquals(finishedExecuted, finishedExecutedExpected);
-});
-
-Deno.test("Writable stream finishes correctly", async () => {
- let finishedExecuted = 0;
- const finishedExecutedExpected = 1;
- const finishedExpectedExecutions = deferred();
-
- const w = new Writable({
- write(_chunk, _encoding, cb) {
- cb();
- },
- autoDestroy: false,
- });
-
- w.end("asd");
-
- queueMicrotask(() => {
- finished(w, () => {
- finishedExecuted++;
- if (finishedExecuted == finishedExecutedExpected) {
- finishedExpectedExecutions.resolve();
- }
- });
- });
-
- const finishedTimeout = setTimeout(
- () => finishedExpectedExecutions.reject(),
- 1000,
- );
- await finishedExpectedExecutions;
- clearTimeout(finishedTimeout);
- assertEquals(finishedExecuted, finishedExecutedExpected);
-});
-
-Deno.test("Writable stream finishes correctly after error", async () => {
- let errorExecuted = 0;
- const errorExecutedExpected = 1;
- const errorExpectedExecutions = deferred();
-
- let finishedExecuted = 0;
- const finishedExecutedExpected = 1;
- const finishedExpectedExecutions = deferred();
-
- const w = new Writable({
- write(_chunk, _encoding, cb) {
- cb(new Error());
- },
- autoDestroy: false,
- });
- w.write("asd");
- w.on("error", () => {
- errorExecuted++;
- if (errorExecuted == errorExecutedExpected) {
- errorExpectedExecutions.resolve();
- }
- finished(w, () => {
- finishedExecuted++;
- if (finishedExecuted == finishedExecutedExpected) {
- finishedExpectedExecutions.resolve();
- }
- });
- });
-
- const errorTimeout = setTimeout(
- () => errorExpectedExecutions.reject(),
- 1000,
- );
- const finishedTimeout = setTimeout(
- () => finishedExpectedExecutions.reject(),
- 1000,
- );
- await finishedExpectedExecutions;
- await errorExpectedExecutions;
- clearTimeout(finishedTimeout);
- clearTimeout(errorTimeout);
- assertEquals(finishedExecuted, finishedExecutedExpected);
- assertEquals(errorExecuted, errorExecutedExpected);
-});
-
-Deno.test("Writable stream fails on 'write' null value", () => {
- const writable = new Writable();
- assertThrows(() => writable.write(null));
-});