summaryrefslogtreecommitdiff
path: root/std/node/_stream
diff options
context:
space:
mode:
authorCasper Beyer <caspervonb@pm.me>2021-02-02 19:05:46 +0800
committerGitHub <noreply@github.com>2021-02-02 12:05:46 +0100
commit6abf126c2a7a451cded8c6b5e6ddf1b69c84055d (patch)
treefd94c013a19fcb38954844085821ec1601c20e18 /std/node/_stream
parenta2b5d44f1aa9d64f448a2a3cc2001272e2f60b98 (diff)
chore: remove std directory (#9361)
This removes the std folder from the tree. Various parts of the tests are pretty tightly dependent on std (47 direct imports and 75 indirect imports, not counting the cli tests that use them as fixtures) so I've added std as a submodule for now.
Diffstat (limited to 'std/node/_stream')
-rw-r--r--std/node/_stream/async_iterator.ts243
-rw-r--r--std/node/_stream/async_iterator_test.ts249
-rw-r--r--std/node/_stream/buffer_list.ts183
-rw-r--r--std/node/_stream/destroy.ts38
-rw-r--r--std/node/_stream/duplex.ts682
-rw-r--r--std/node/_stream/duplex_internal.ts296
-rw-r--r--std/node/_stream/duplex_test.ts698
-rw-r--r--std/node/_stream/end_of_stream.ts241
-rw-r--r--std/node/_stream/end_of_stream_test.ts97
-rw-r--r--std/node/_stream/from.ts102
-rw-r--r--std/node/_stream/passthrough.ts20
-rw-r--r--std/node/_stream/pipeline.ts308
-rw-r--r--std/node/_stream/pipeline_test.ts387
-rw-r--r--std/node/_stream/promises.ts42
-rw-r--r--std/node/_stream/promises_test.ts84
-rw-r--r--std/node/_stream/readable.ts788
-rw-r--r--std/node/_stream/readable_internal.ts438
-rw-r--r--std/node/_stream/readable_test.ts489
-rw-r--r--std/node/_stream/stream.ts81
-rw-r--r--std/node/_stream/symbols.ts4
-rw-r--r--std/node/_stream/transform.ts132
-rw-r--r--std/node/_stream/transform_test.ts68
-rw-r--r--std/node/_stream/writable.ts443
-rw-r--r--std/node/_stream/writable_internal.ts457
-rw-r--r--std/node/_stream/writable_test.ts209
25 files changed, 0 insertions, 6779 deletions
diff --git a/std/node/_stream/async_iterator.ts b/std/node/_stream/async_iterator.ts
deleted file mode 100644
index 5369ef39c..000000000
--- a/std/node/_stream/async_iterator.ts
+++ /dev/null
@@ -1,243 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import type { Buffer } from "../buffer.ts";
-import finished from "./end_of_stream.ts";
-import Readable from "./readable.ts";
-import type Stream from "./stream.ts";
-import { destroyer } from "./destroy.ts";
-
-const kLastResolve = Symbol("lastResolve");
-const kLastReject = Symbol("lastReject");
-const kError = Symbol("error");
-const kEnded = Symbol("ended");
-const kLastPromise = Symbol("lastPromise");
-const kHandlePromise = Symbol("handlePromise");
-const kStream = Symbol("stream");
-
-// TODO(Soremwar)
-// Add Duplex streams
-type IterableStreams = Stream | Readable;
-
-type IterableItem = Buffer | string | Uint8Array | undefined;
-type ReadableIteratorResult = IteratorResult<IterableItem>;
-
-function initIteratorSymbols(
- o: ReadableStreamAsyncIterator,
- symbols: symbol[],
-) {
- const properties: PropertyDescriptorMap = {};
- for (const sym in symbols) {
- properties[sym] = {
- configurable: false,
- enumerable: false,
- writable: true,
- };
- }
- Object.defineProperties(o, properties);
-}
-
-function createIterResult(
- value: IterableItem,
- done: boolean,
-): ReadableIteratorResult {
- return { value, done };
-}
-
-function readAndResolve(iter: ReadableStreamAsyncIterator) {
- const resolve = iter[kLastResolve];
- if (resolve !== null) {
- const data = iter[kStream].read();
- if (data !== null) {
- iter[kLastPromise] = null;
- iter[kLastResolve] = null;
- iter[kLastReject] = null;
- resolve(createIterResult(data, false));
- }
- }
-}
-
-function onReadable(iter: ReadableStreamAsyncIterator) {
- queueMicrotask(() => readAndResolve(iter));
-}
-
-function wrapForNext(
- lastPromise: Promise<ReadableIteratorResult>,
- iter: ReadableStreamAsyncIterator,
-) {
- return (
- resolve: (value: ReadableIteratorResult) => void,
- reject: (error: Error) => void,
- ) => {
- lastPromise.then(() => {
- if (iter[kEnded]) {
- resolve(createIterResult(undefined, true));
- return;
- }
-
- iter[kHandlePromise](resolve, reject);
- }, reject);
- };
-}
-
-function finish(self: ReadableStreamAsyncIterator, err?: Error) {
- return new Promise(
- (
- resolve: (result: ReadableIteratorResult) => void,
- reject: (error: Error) => void,
- ) => {
- const stream = self[kStream];
-
- finished(stream, (err) => {
- if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
- reject(err);
- } else {
- resolve(createIterResult(undefined, true));
- }
- });
- destroyer(stream, err);
- },
- );
-}
-
-const AsyncIteratorPrototype = Object.getPrototypeOf(
- Object.getPrototypeOf(async function* () {}).prototype,
-);
-
-export class ReadableStreamAsyncIterator
- implements AsyncIterableIterator<IterableItem> {
- [kEnded]: boolean;
- [kError]: Error | null = null;
- [kHandlePromise] = (
- resolve: (value: ReadableIteratorResult) => void,
- reject: (value: Error) => void,
- ) => {
- const data = this[kStream].read();
- if (data) {
- this[kLastPromise] = null;
- this[kLastResolve] = null;
- this[kLastReject] = null;
- resolve(createIterResult(data, false));
- } else {
- this[kLastResolve] = resolve;
- this[kLastReject] = reject;
- }
- };
- [kLastPromise]: null | Promise<ReadableIteratorResult>;
- [kLastReject]: null | ((value: Error) => void) = null;
- [kLastResolve]: null | ((value: ReadableIteratorResult) => void) = null;
- [kStream]: Readable;
- [Symbol.asyncIterator] = AsyncIteratorPrototype[Symbol.asyncIterator];
-
- constructor(stream: Readable) {
- this[kEnded] = stream.readableEnded || stream._readableState.endEmitted;
- this[kStream] = stream;
- initIteratorSymbols(this, [
- kEnded,
- kError,
- kHandlePromise,
- kLastPromise,
- kLastReject,
- kLastResolve,
- kStream,
- ]);
- }
-
- get stream() {
- return this[kStream];
- }
-
- next(): Promise<ReadableIteratorResult> {
- const error = this[kError];
- if (error !== null) {
- return Promise.reject(error);
- }
-
- if (this[kEnded]) {
- return Promise.resolve(createIterResult(undefined, true));
- }
-
- if (this[kStream].destroyed) {
- return new Promise((resolve, reject) => {
- if (this[kError]) {
- reject(this[kError]);
- } else if (this[kEnded]) {
- resolve(createIterResult(undefined, true));
- } else {
- finished(this[kStream], (err) => {
- if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
- reject(err);
- } else {
- resolve(createIterResult(undefined, true));
- }
- });
- }
- });
- }
-
- const lastPromise = this[kLastPromise];
- let promise;
-
- if (lastPromise) {
- promise = new Promise(wrapForNext(lastPromise, this));
- } else {
- const data = this[kStream].read();
- if (data !== null) {
- return Promise.resolve(createIterResult(data, false));
- }
-
- promise = new Promise(this[kHandlePromise]);
- }
-
- this[kLastPromise] = promise;
-
- return promise;
- }
-
- return(): Promise<ReadableIteratorResult> {
- return finish(this);
- }
-
- throw(err: Error): Promise<ReadableIteratorResult> {
- return finish(this, err);
- }
-}
-
-const createReadableStreamAsyncIterator = (stream: IterableStreams) => {
- // deno-lint-ignore no-explicit-any
- if (typeof (stream as any).read !== "function") {
- const src = stream;
- stream = new Readable({ objectMode: true }).wrap(src);
- finished(stream, (err) => destroyer(src, err));
- }
-
- const iterator = new ReadableStreamAsyncIterator(stream as Readable);
- iterator[kLastPromise] = null;
-
- finished(stream, { writable: false }, (err) => {
- if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
- const reject = iterator[kLastReject];
- if (reject !== null) {
- iterator[kLastPromise] = null;
- iterator[kLastResolve] = null;
- iterator[kLastReject] = null;
- reject(err);
- }
- iterator[kError] = err;
- return;
- }
-
- const resolve = iterator[kLastResolve];
- if (resolve !== null) {
- iterator[kLastPromise] = null;
- iterator[kLastResolve] = null;
- iterator[kLastReject] = null;
- resolve(createIterResult(undefined, true));
- }
- iterator[kEnded] = true;
- });
-
- stream.on("readable", onReadable.bind(null, iterator));
-
- return iterator;
-};
-
-export default createReadableStreamAsyncIterator;
diff --git a/std/node/_stream/async_iterator_test.ts b/std/node/_stream/async_iterator_test.ts
deleted file mode 100644
index 17698e0fd..000000000
--- a/std/node/_stream/async_iterator_test.ts
+++ /dev/null
@@ -1,249 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import Readable from "./readable.ts";
-import Stream from "./stream.ts";
-import toReadableAsyncIterator from "./async_iterator.ts";
-import { deferred } from "../../async/mod.ts";
-import { assertEquals, assertThrowsAsync } from "../../testing/asserts.ts";
-
-Deno.test("Stream to async iterator", async () => {
- let destroyExecuted = 0;
- const destroyExecutedExpected = 1;
- const destroyExpectedExecutions = deferred();
-
- class AsyncIteratorStream extends Stream {
- constructor() {
- super();
- }
-
- destroy() {
- destroyExecuted++;
- if (destroyExecuted == destroyExecutedExpected) {
- destroyExpectedExecutions.resolve();
- }
- }
-
- [Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator];
- }
-
- const stream = new AsyncIteratorStream();
-
- queueMicrotask(() => {
- stream.emit("data", "hello");
- stream.emit("data", "world");
- stream.emit("end");
- });
-
- let res = "";
-
- for await (const d of stream) {
- res += d;
- }
- assertEquals(res, "helloworld");
-
- const destroyTimeout = setTimeout(
- () => destroyExpectedExecutions.reject(),
- 1000,
- );
- await destroyExpectedExecutions;
- clearTimeout(destroyTimeout);
- assertEquals(destroyExecuted, destroyExecutedExpected);
-});
-
-Deno.test("Stream to async iterator throws on 'error' emitted", async () => {
- let closeExecuted = 0;
- const closeExecutedExpected = 1;
- const closeExpectedExecutions = deferred();
-
- let errorExecuted = 0;
- const errorExecutedExpected = 1;
- const errorExpectedExecutions = deferred();
-
- class StreamImplementation extends Stream {
- close() {
- closeExecuted++;
- if (closeExecuted == closeExecutedExpected) {
- closeExpectedExecutions.resolve();
- }
- }
- }
-
- const stream = new StreamImplementation();
- queueMicrotask(() => {
- stream.emit("data", 0);
- stream.emit("data", 1);
- stream.emit("error", new Error("asd"));
- });
-
- toReadableAsyncIterator(stream)
- .next()
- .catch((err) => {
- errorExecuted++;
- if (errorExecuted == errorExecutedExpected) {
- errorExpectedExecutions.resolve();
- }
- assertEquals(err.message, "asd");
- });
-
- const closeTimeout = setTimeout(
- () => closeExpectedExecutions.reject(),
- 1000,
- );
- const errorTimeout = setTimeout(
- () => errorExpectedExecutions.reject(),
- 1000,
- );
- await closeExpectedExecutions;
- await errorExpectedExecutions;
- clearTimeout(closeTimeout);
- clearTimeout(errorTimeout);
- assertEquals(closeExecuted, closeExecutedExpected);
- assertEquals(errorExecuted, errorExecutedExpected);
-});
-
-Deno.test("Async iterator matches values of Readable", async () => {
- const readable = new Readable({
- objectMode: true,
- read() {},
- });
- readable.push(0);
- readable.push(1);
- readable.push(null);
-
- const iter = readable[Symbol.asyncIterator]();
-
- assertEquals(
- await iter.next().then(({ value }) => value),
- 0,
- );
- for await (const d of iter) {
- assertEquals(d, 1);
- }
-});
-
-Deno.test("Async iterator throws on Readable destroyed sync", async () => {
- const message = "kaboom from read";
-
- const readable = new Readable({
- objectMode: true,
- read() {
- this.destroy(new Error(message));
- },
- });
-
- await assertThrowsAsync(
- async () => {
- // deno-lint-ignore no-empty
- for await (const k of readable) {}
- },
- Error,
- message,
- );
-});
-
-Deno.test("Async iterator throws on Readable destroyed async", async () => {
- const message = "kaboom";
- const readable = new Readable({
- read() {},
- });
- const iterator = readable[Symbol.asyncIterator]();
-
- readable.destroy(new Error(message));
-
- await assertThrowsAsync(
- iterator.next.bind(iterator),
- Error,
- message,
- );
-});
-
-Deno.test("Async iterator finishes the iterator when Readable destroyed", async () => {
- const readable = new Readable({
- read() {},
- });
-
- readable.destroy();
-
- const { done } = await readable[Symbol.asyncIterator]().next();
- assertEquals(done, true);
-});
-
-Deno.test("Async iterator finishes all item promises when Readable destroyed", async () => {
- const r = new Readable({
- objectMode: true,
- read() {
- },
- });
-
- const b = r[Symbol.asyncIterator]();
- const c = b.next();
- const d = b.next();
- r.destroy();
- assertEquals(await c, { done: true, value: undefined });
- assertEquals(await d, { done: true, value: undefined });
-});
-
-Deno.test("Async iterator: 'next' is triggered by Readable push", async () => {
- const max = 42;
- let readed = 0;
- let received = 0;
- const readable = new Readable({
- objectMode: true,
- read() {
- this.push("hello");
- if (++readed === max) {
- this.push(null);
- }
- },
- });
-
- for await (const k of readable) {
- received++;
- assertEquals(k, "hello");
- }
-
- assertEquals(readed, received);
-});
-
-Deno.test("Async iterator: 'close' called on forced iteration end", async () => {
- let closeExecuted = 0;
- const closeExecutedExpected = 1;
- const closeExpectedExecutions = deferred();
-
- class IndestructibleReadable extends Readable {
- constructor() {
- super({
- autoDestroy: false,
- read() {},
- });
- }
-
- close() {
- closeExecuted++;
- if (closeExecuted == closeExecutedExpected) {
- closeExpectedExecutions.resolve();
- }
- readable.emit("close");
- }
-
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- destroy = null;
- }
-
- const readable = new IndestructibleReadable();
- readable.push("asd");
- readable.push("asd");
-
- // eslint-disable-next-line @typescript-eslint/no-unused-vars
- for await (const d of readable) {
- break;
- }
-
- const closeTimeout = setTimeout(
- () => closeExpectedExecutions.reject(),
- 1000,
- );
- await closeExpectedExecutions;
- clearTimeout(closeTimeout);
- assertEquals(closeExecuted, closeExecutedExpected);
-});
diff --git a/std/node/_stream/buffer_list.ts b/std/node/_stream/buffer_list.ts
deleted file mode 100644
index fe1a693c0..000000000
--- a/std/node/_stream/buffer_list.ts
+++ /dev/null
@@ -1,183 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-
-type BufferListItem = {
- data: Buffer | string | Uint8Array;
- next: BufferListItem | null;
-};
-
-export default class BufferList {
- head: BufferListItem | null = null;
- tail: BufferListItem | null = null;
- length: number;
-
- constructor() {
- this.head = null;
- this.tail = null;
- this.length = 0;
- }
-
- push(v: Buffer | string | Uint8Array) {
- const entry = { data: v, next: null };
- if (this.length > 0) {
- (this.tail as BufferListItem).next = entry;
- } else {
- this.head = entry;
- }
- this.tail = entry;
- ++this.length;
- }
-
- unshift(v: Buffer | string | Uint8Array) {
- const entry = { data: v, next: this.head };
- if (this.length === 0) {
- this.tail = entry;
- }
- this.head = entry;
- ++this.length;
- }
-
- shift() {
- if (this.length === 0) {
- return;
- }
- const ret = (this.head as BufferListItem).data;
- if (this.length === 1) {
- this.head = this.tail = null;
- } else {
- this.head = (this.head as BufferListItem).next;
- }
- --this.length;
- return ret;
- }
-
- clear() {
- this.head = this.tail = null;
- this.length = 0;
- }
-
- join(s: string) {
- if (this.length === 0) {
- return "";
- }
- let p: BufferListItem | null = (this.head as BufferListItem);
- let ret = "" + p.data;
- p = p.next;
- while (p) {
- ret += s + p.data;
- p = p.next;
- }
- return ret;
- }
-
- concat(n: number) {
- if (this.length === 0) {
- return Buffer.alloc(0);
- }
- const ret = Buffer.allocUnsafe(n >>> 0);
- let p = this.head;
- let i = 0;
- while (p) {
- ret.set(p.data as Buffer, i);
- i += p.data.length;
- p = p.next;
- }
- return ret;
- }
-
- // Consumes a specified amount of bytes or characters from the buffered data.
- consume(n: number, hasStrings: boolean) {
- const data = (this.head as BufferListItem).data;
- if (n < data.length) {
- // `slice` is the same for buffers and strings.
- const slice = data.slice(0, n);
- (this.head as BufferListItem).data = data.slice(n);
- return slice;
- }
- if (n === data.length) {
- // First chunk is a perfect match.
- return this.shift();
- }
- // Result spans more than one buffer.
- return hasStrings ? this._getString(n) : this._getBuffer(n);
- }
-
- first() {
- return (this.head as BufferListItem).data;
- }
-
- *[Symbol.iterator]() {
- for (let p = this.head; p; p = p.next) {
- yield p.data;
- }
- }
-
- // Consumes a specified amount of characters from the buffered data.
- _getString(n: number) {
- let ret = "";
- let p: BufferListItem | null = (this.head as BufferListItem);
- let c = 0;
- p = p.next as BufferListItem;
- do {
- const str = p.data;
- if (n > str.length) {
- ret += str;
- n -= str.length;
- } else {
- if (n === str.length) {
- ret += str;
- ++c;
- if (p.next) {
- this.head = p.next;
- } else {
- this.head = this.tail = null;
- }
- } else {
- ret += str.slice(0, n);
- this.head = p;
- p.data = str.slice(n);
- }
- break;
- }
- ++c;
- p = p.next;
- } while (p);
- this.length -= c;
- return ret;
- }
-
- // Consumes a specified amount of bytes from the buffered data.
- _getBuffer(n: number) {
- const ret = Buffer.allocUnsafe(n);
- const retLen = n;
- let p: BufferListItem | null = (this.head as BufferListItem);
- let c = 0;
- p = p.next as BufferListItem;
- do {
- const buf = p.data as Buffer;
- if (n > buf.length) {
- ret.set(buf, retLen - n);
- n -= buf.length;
- } else {
- if (n === buf.length) {
- ret.set(buf, retLen - n);
- ++c;
- if (p.next) {
- this.head = p.next;
- } else {
- this.head = this.tail = null;
- }
- } else {
- ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n);
- this.head = p;
- p.data = buf.slice(n);
- }
- break;
- }
- ++c;
- p = p.next;
- } while (p);
- this.length -= c;
- return ret;
- }
-}
diff --git a/std/node/_stream/destroy.ts b/std/node/_stream/destroy.ts
deleted file mode 100644
index d13e12de2..000000000
--- a/std/node/_stream/destroy.ts
+++ /dev/null
@@ -1,38 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import type Duplex from "./duplex.ts";
-import type Readable from "./readable.ts";
-import type Stream from "./stream.ts";
-import type Writable from "./writable.ts";
-
-//This whole module acts as a 'normalizer'
-//Idea behind it is you can pass any kind of streams and functions will execute anyways
-
-//TODO(Soremwar)
-//Should be any implementation of stream
-//This is a guard to check executed methods exist inside the implementation
-type StreamImplementations = Duplex | Readable | Writable;
-
-// TODO(Soremwar)
-// Bring back once requests are implemented
-// function isRequest(stream: any) {
-// return stream && stream.setHeader && typeof stream.abort === "function";
-// }
-
-export function destroyer(stream: Stream, err?: Error | null) {
- // TODO(Soremwar)
- // Bring back once requests are implemented
- // if (isRequest(stream)) return stream.abort();
- // if (isRequest(stream.req)) return stream.req.abort();
- if (
- typeof (stream as StreamImplementations).destroy === "function"
- ) {
- return (stream as StreamImplementations).destroy(err);
- }
- // A test of async iterator mocks an upcoming implementation of stream
- // his is casted to any in the meanwhile
- // deno-lint-ignore no-explicit-any
- if (typeof (stream as any).close === "function") {
- // deno-lint-ignore no-explicit-any
- return (stream as any).close();
- }
-}
diff --git a/std/node/_stream/duplex.ts b/std/node/_stream/duplex.ts
deleted file mode 100644
index b5c429f0a..000000000
--- a/std/node/_stream/duplex.ts
+++ /dev/null
@@ -1,682 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { captureRejectionSymbol } from "../events.ts";
-import Readable, { ReadableState } from "./readable.ts";
-import Stream from "./stream.ts";
-import Writable, { WritableState } from "./writable.ts";
-import { Buffer } from "../buffer.ts";
-import {
- ERR_STREAM_ALREADY_FINISHED,
- ERR_STREAM_DESTROYED,
- ERR_UNKNOWN_ENCODING,
-} from "../_errors.ts";
-import type { Encodings } from "../_utils.ts";
-import createReadableStreamAsyncIterator from "./async_iterator.ts";
-import type { ReadableStreamAsyncIterator } from "./async_iterator.ts";
-import {
- _destroy,
- computeNewHighWaterMark,
- emitReadable,
- fromList,
- howMuchToRead,
- nReadingNextTick,
- updateReadableListening,
-} from "./readable_internal.ts";
-import { kOnFinished, writeV } from "./writable_internal.ts";
-import {
- endDuplex,
- finishMaybe,
- onwrite,
- readableAddChunk,
-} from "./duplex_internal.ts";
-export { errorOrDestroy } from "./duplex_internal.ts";
-
-export interface DuplexOptions {
- allowHalfOpen?: boolean;
- autoDestroy?: boolean;
- decodeStrings?: boolean;
- defaultEncoding?: Encodings;
- destroy?(
- this: Duplex,
- error: Error | null,
- callback: (error: Error | null) => void,
- ): void;
- emitClose?: boolean;
- encoding?: Encodings;
- final?(this: Duplex, callback: (error?: Error | null) => void): void;
- highWaterMark?: number;
- objectMode?: boolean;
- read?(this: Duplex, size: number): void;
- readable?: boolean;
- readableHighWaterMark?: number;
- readableObjectMode?: boolean;
- writable?: boolean;
- writableCorked?: number;
- writableHighWaterMark?: number;
- writableObjectMode?: boolean;
- write?(
- this: Duplex,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: Encodings,
- callback: (error?: Error | null) => void,
- ): void;
- writev?: writeV;
-}
-
-interface Duplex extends Readable, Writable {}
-
-/**
- * A duplex is an implementation of a stream that has both Readable and Writable
- * attributes and capabilities
- */
-class Duplex extends Stream {
- allowHalfOpen = true;
- _final?: (
- callback: (error?: Error | null | undefined) => void,
- ) => void;
- _readableState: ReadableState;
- _writableState: WritableState;
- _writev?: writeV | null;
-
- constructor(options?: DuplexOptions) {
- super();
-
- if (options) {
- if (options.allowHalfOpen === false) {
- this.allowHalfOpen = false;
- }
- if (typeof options.destroy === "function") {
- this._destroy = options.destroy;
- }
- if (typeof options.final === "function") {
- this._final = options.final;
- }
- if (typeof options.read === "function") {
- this._read = options.read;
- }
- if (options.readable === false) {
- this.readable = false;
- }
- if (options.writable === false) {
- this.writable = false;
- }
- if (typeof options.write === "function") {
- this._write = options.write;
- }
- if (typeof options.writev === "function") {
- this._writev = options.writev;
- }
- }
-
- const readableOptions = {
- autoDestroy: options?.autoDestroy,
- defaultEncoding: options?.defaultEncoding,
- destroy: options?.destroy as unknown as (
- this: Readable,
- error: Error | null,
- callback: (error: Error | null) => void,
- ) => void,
- emitClose: options?.emitClose,
- encoding: options?.encoding,
- highWaterMark: options?.highWaterMark ?? options?.readableHighWaterMark,
- objectMode: options?.objectMode ?? options?.readableObjectMode,
- read: options?.read as unknown as (this: Readable) => void,
- };
-
- const writableOptions = {
- autoDestroy: options?.autoDestroy,
- decodeStrings: options?.decodeStrings,
- defaultEncoding: options?.defaultEncoding,
- destroy: options?.destroy as unknown as (
- this: Writable,
- error: Error | null,
- callback: (error: Error | null) => void,
- ) => void,
- emitClose: options?.emitClose,
- final: options?.final as unknown as (
- this: Writable,
- callback: (error?: Error | null) => void,
- ) => void,
- highWaterMark: options?.highWaterMark ?? options?.writableHighWaterMark,
- objectMode: options?.objectMode ?? options?.writableObjectMode,
- write: options?.write as unknown as (
- this: Writable,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- callback: (error?: Error | null) => void,
- ) => void,
- writev: options?.writev as unknown as (
- this: Writable,
- // deno-lint-ignore no-explicit-any
- chunks: Array<{ chunk: any; encoding: Encodings }>,
- callback: (error?: Error | null) => void,
- ) => void,
- };
-
- this._readableState = new ReadableState(readableOptions);
- this._writableState = new WritableState(
- writableOptions,
- this as unknown as Writable,
- );
- //Very important to override onwrite here, duplex implementation adds a check
- //on the readable side
- this._writableState.onwrite = onwrite.bind(undefined, this);
- }
-
- [captureRejectionSymbol](err?: Error) {
- this.destroy(err);
- }
-
- [Symbol.asyncIterator](): ReadableStreamAsyncIterator {
- return createReadableStreamAsyncIterator(this);
- }
-
- _destroy(
- error: Error | null,
- callback: (error?: Error | null) => void,
- ): void {
- callback(error);
- }
-
- _read = Readable.prototype._read;
-
- _undestroy = Readable.prototype._undestroy;
-
- destroy(err?: Error | null, cb?: (error?: Error | null) => void) {
- const r = this._readableState;
- const w = this._writableState;
-
- if (w.destroyed || r.destroyed) {
- if (typeof cb === "function") {
- cb();
- }
-
- return this;
- }
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- if (!r.errored) {
- r.errored = err;
- }
- }
-
- w.destroyed = true;
- r.destroyed = true;
-
- this._destroy(err || null, (err) => {
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- if (!r.errored) {
- r.errored = err;
- }
- }
-
- w.closed = true;
- r.closed = true;
-
- if (typeof cb === "function") {
- cb(err);
- }
-
- if (err) {
- queueMicrotask(() => {
- const r = this._readableState;
- const w = this._writableState;
-
- if (!w.errorEmitted && !r.errorEmitted) {
- w.errorEmitted = true;
- r.errorEmitted = true;
-
- this.emit("error", err);
- }
-
- r.closeEmitted = true;
-
- if (w.emitClose || r.emitClose) {
- this.emit("close");
- }
- });
- } else {
- queueMicrotask(() => {
- const r = this._readableState;
- const w = this._writableState;
-
- r.closeEmitted = true;
-
- if (w.emitClose || r.emitClose) {
- this.emit("close");
- }
- });
- }
- });
-
- return this;
- }
-
- isPaused = Readable.prototype.isPaused;
-
- off = this.removeListener;
-
- on(
- event: "close" | "end" | "pause" | "readable" | "resume",
- listener: () => void,
- ): this;
- // deno-lint-ignore no-explicit-any
- on(event: "data", listener: (chunk: any) => void): this;
- on(event: "error", listener: (err: Error) => void): this;
- // deno-lint-ignore no-explicit-any
- on(event: string | symbol, listener: (...args: any[]) => void): this;
- on(
- ev: string | symbol,
- fn:
- | (() => void)
- // deno-lint-ignore no-explicit-any
- | ((chunk: any) => void)
- | ((err: Error) => void)
- // deno-lint-ignore no-explicit-any
- | ((...args: any[]) => void),
- ) {
- const res = super.on.call(this, ev, fn);
- const state = this._readableState;
-
- if (ev === "data") {
- state.readableListening = this.listenerCount("readable") > 0;
-
- if (state.flowing !== false) {
- this.resume();
- }
- } else if (ev === "readable") {
- if (!state.endEmitted && !state.readableListening) {
- state.readableListening = state.needReadable = true;
- state.flowing = false;
- state.emittedReadable = false;
- if (state.length) {
- emitReadable(this);
- } else if (!state.reading) {
- queueMicrotask(() => nReadingNextTick(this));
- }
- }
- }
-
- return res;
- }
-
- pause = Readable.prototype.pause as () => this;
-
- pipe = Readable.prototype.pipe;
-
- // deno-lint-ignore no-explicit-any
- push(chunk: any, encoding?: Encodings): boolean {
- return readableAddChunk(this, chunk, encoding, false);
- }
-
- /** You can override either this method, or the async `_read` method */
- read(n?: number) {
- // Same as parseInt(undefined, 10), however V8 7.3 performance regressed
- // in this scenario, so we are doing it manually.
- if (n === undefined) {
- n = NaN;
- }
- const state = this._readableState;
- const nOrig = n;
-
- if (n > state.highWaterMark) {
- state.highWaterMark = computeNewHighWaterMark(n);
- }
-
- if (n !== 0) {
- state.emittedReadable = false;
- }
-
- if (
- n === 0 &&
- state.needReadable &&
- ((state.highWaterMark !== 0
- ? state.length >= state.highWaterMark
- : state.length > 0) ||
- state.ended)
- ) {
- if (state.length === 0 && state.ended) {
- endDuplex(this);
- } else {
- emitReadable(this);
- }
- return null;
- }
-
- n = howMuchToRead(n, state);
-
- if (n === 0 && state.ended) {
- if (state.length === 0) {
- endDuplex(this);
- }
- return null;
- }
-
- let doRead = state.needReadable;
- if (
- state.length === 0 || state.length - (n as number) < state.highWaterMark
- ) {
- doRead = true;
- }
-
- if (
- state.ended || state.reading || state.destroyed || state.errored ||
- !state.constructed
- ) {
- doRead = false;
- } else if (doRead) {
- state.reading = true;
- state.sync = true;
- if (state.length === 0) {
- state.needReadable = true;
- }
- this._read();
- state.sync = false;
- if (!state.reading) {
- n = howMuchToRead(nOrig, state);
- }
- }
-
- let ret;
- if ((n as number) > 0) {
- ret = fromList((n as number), state);
- } else {
- ret = null;
- }
-
- if (ret === null) {
- state.needReadable = state.length <= state.highWaterMark;
- n = 0;
- } else {
- state.length -= n as number;
- if (state.multiAwaitDrain) {
- (state.awaitDrainWriters as Set<Writable>).clear();
- } else {
- state.awaitDrainWriters = null;
- }
- }
-
- if (state.length === 0) {
- if (!state.ended) {
- state.needReadable = true;
- }
-
- if (nOrig !== n && state.ended) {
- endDuplex(this);
- }
- }
-
- if (ret !== null) {
- this.emit("data", ret);
- }
-
- return ret;
- }
-
- removeAllListeners(
- ev:
- | "close"
- | "data"
- | "end"
- | "error"
- | "pause"
- | "readable"
- | "resume"
- | symbol
- | undefined,
- ) {
- const res = super.removeAllListeners(ev);
-
- if (ev === "readable" || ev === undefined) {
- queueMicrotask(() => updateReadableListening(this));
- }
-
- return res;
- }
-
- removeListener(
- event: "close" | "end" | "pause" | "readable" | "resume",
- listener: () => void,
- ): this;
- // deno-lint-ignore no-explicit-any
- removeListener(event: "data", listener: (chunk: any) => void): this;
- removeListener(event: "error", listener: (err: Error) => void): this;
- removeListener(
- event: string | symbol,
- // deno-lint-ignore no-explicit-any
- listener: (...args: any[]) => void,
- ): this;
- removeListener(
- ev: string | symbol,
- fn:
- | (() => void)
- // deno-lint-ignore no-explicit-any
- | ((chunk: any) => void)
- | ((err: Error) => void)
- // deno-lint-ignore no-explicit-any
- | ((...args: any[]) => void),
- ) {
- const res = super.removeListener.call(this, ev, fn);
-
- if (ev === "readable") {
- queueMicrotask(() => updateReadableListening(this));
- }
-
- return res;
- }
-
- resume = Readable.prototype.resume as () => this;
-
- setEncoding = Readable.prototype.setEncoding as (enc: string) => this;
-
- // deno-lint-ignore no-explicit-any
- unshift(chunk: any, encoding?: Encodings): boolean {
- return readableAddChunk(this, chunk, encoding, true);
- }
-
- unpipe = Readable.prototype.unpipe as (dest?: Writable | undefined) => this;
-
- wrap = Readable.prototype.wrap as (stream: Stream) => this;
-
- get readable(): boolean {
- return this._readableState?.readable &&
- !this._readableState?.destroyed &&
- !this._readableState?.errorEmitted &&
- !this._readableState?.endEmitted;
- }
- set readable(val: boolean) {
- if (this._readableState) {
- this._readableState.readable = val;
- }
- }
-
- get readableHighWaterMark(): number {
- return this._readableState.highWaterMark;
- }
-
- get readableBuffer() {
- return this._readableState && this._readableState.buffer;
- }
-
- get readableFlowing(): boolean | null {
- return this._readableState.flowing;
- }
-
- set readableFlowing(state: boolean | null) {
- if (this._readableState) {
- this._readableState.flowing = state;
- }
- }
-
- get readableLength() {
- return this._readableState.length;
- }
-
- get readableObjectMode() {
- return this._readableState ? this._readableState.objectMode : false;
- }
-
- get readableEncoding() {
- return this._readableState ? this._readableState.encoding : null;
- }
-
- get readableEnded() {
- return this._readableState ? this._readableState.endEmitted : false;
- }
-
- _write = Writable.prototype._write;
-
- write = Writable.prototype.write;
-
- cork = Writable.prototype.cork;
-
- uncork = Writable.prototype.uncork;
-
- setDefaultEncoding(encoding: string) {
- // node::ParseEncoding() requires lower case.
- if (typeof encoding === "string") {
- encoding = encoding.toLowerCase();
- }
- if (!Buffer.isEncoding(encoding)) {
- throw new ERR_UNKNOWN_ENCODING(encoding);
- }
- this._writableState.defaultEncoding = encoding as Encodings;
- return this;
- }
-
- end(cb?: () => void): void;
- // deno-lint-ignore no-explicit-any
- end(chunk: any, cb?: () => void): void;
- // deno-lint-ignore no-explicit-any
- end(chunk: any, encoding: Encodings, cb?: () => void): void;
-
- end(
- // deno-lint-ignore no-explicit-any
- x?: any | (() => void),
- y?: Encodings | (() => void),
- z?: () => void,
- ) {
- const state = this._writableState;
- // deno-lint-ignore no-explicit-any
- let chunk: any | null;
- let encoding: Encodings | null;
- let cb: undefined | ((error?: Error) => void);
-
- if (typeof x === "function") {
- chunk = null;
- encoding = null;
- cb = x;
- } else if (typeof y === "function") {
- chunk = x;
- encoding = null;
- cb = y;
- } else {
- chunk = x;
- encoding = y as Encodings;
- cb = z;
- }
-
- if (chunk !== null && chunk !== undefined) {
- this.write(chunk, encoding);
- }
-
- if (state.corked) {
- state.corked = 1;
- this.uncork();
- }
-
- let err: Error | undefined;
- if (!state.errored && !state.ending) {
- state.ending = true;
- finishMaybe(this, state, true);
- state.ended = true;
- } else if (state.finished) {
- err = new ERR_STREAM_ALREADY_FINISHED("end");
- } else if (state.destroyed) {
- err = new ERR_STREAM_DESTROYED("end");
- }
-
- if (typeof cb === "function") {
- if (err || state.finished) {
- queueMicrotask(() => {
- (cb as (error?: Error | undefined) => void)(err);
- });
- } else {
- state[kOnFinished].push(cb);
- }
- }
-
- return this;
- }
-
- get destroyed() {
- if (
- this._readableState === undefined ||
- this._writableState === undefined
- ) {
- return false;
- }
- return this._readableState.destroyed && this._writableState.destroyed;
- }
-
- set destroyed(value: boolean) {
- if (this._readableState && this._writableState) {
- this._readableState.destroyed = value;
- this._writableState.destroyed = value;
- }
- }
-
- get writable() {
- const w = this._writableState;
- return !w.destroyed && !w.errored && !w.ending && !w.ended;
- }
-
- set writable(val) {
- if (this._writableState) {
- this._writableState.writable = !!val;
- }
- }
-
- get writableFinished() {
- return this._writableState ? this._writableState.finished : false;
- }
-
- get writableObjectMode() {
- return this._writableState ? this._writableState.objectMode : false;
- }
-
- get writableBuffer() {
- return this._writableState && this._writableState.getBuffer();
- }
-
- get writableEnded() {
- return this._writableState ? this._writableState.ending : false;
- }
-
- get writableHighWaterMark() {
- return this._writableState && this._writableState.highWaterMark;
- }
-
- get writableCorked() {
- return this._writableState ? this._writableState.corked : 0;
- }
-
- get writableLength() {
- return this._writableState && this._writableState.length;
- }
-}
-
-export default Duplex;
diff --git a/std/node/_stream/duplex_internal.ts b/std/node/_stream/duplex_internal.ts
deleted file mode 100644
index bfd9749f8..000000000
--- a/std/node/_stream/duplex_internal.ts
+++ /dev/null
@@ -1,296 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import type { ReadableState } from "./readable.ts";
-import { addChunk, maybeReadMore, onEofChunk } from "./readable_internal.ts";
-import type Writable from "./writable.ts";
-import type { WritableState } from "./writable.ts";
-import {
- afterWrite,
- AfterWriteTick,
- afterWriteTick,
- clearBuffer,
- errorBuffer,
- kOnFinished,
- needFinish,
- prefinish,
-} from "./writable_internal.ts";
-import { Buffer } from "../buffer.ts";
-import type Duplex from "./duplex.ts";
-import {
- ERR_MULTIPLE_CALLBACK,
- ERR_STREAM_PUSH_AFTER_EOF,
- ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
-} from "../_errors.ts";
-
-export function endDuplex(stream: Duplex) {
- const state = stream._readableState;
-
- if (!state.endEmitted) {
- state.ended = true;
- queueMicrotask(() => endReadableNT(state, stream));
- }
-}
-
-function endReadableNT(state: ReadableState, stream: Duplex) {
- // Check that we didn't get one last unshift.
- if (
- !state.errorEmitted && !state.closeEmitted &&
- !state.endEmitted && state.length === 0
- ) {
- state.endEmitted = true;
- stream.emit("end");
-
- if (stream.writable && stream.allowHalfOpen === false) {
- queueMicrotask(() => endWritableNT(state, stream));
- } else if (state.autoDestroy) {
- // In case of duplex streams we need a way to detect
- // if the writable side is ready for autoDestroy as well.
- const wState = stream._writableState;
- const autoDestroy = !wState || (
- wState.autoDestroy &&
- // We don't expect the writable to ever 'finish'
- // if writable is explicitly set to false.
- (wState.finished || wState.writable === false)
- );
-
- if (autoDestroy) {
- stream.destroy();
- }
- }
- }
-}
-
-function endWritableNT(state: ReadableState, stream: Duplex) {
- const writable = stream.writable &&
- !stream.writableEnded &&
- !stream.destroyed;
- if (writable) {
- stream.end();
- }
-}
-
-export function errorOrDestroy(
- // deno-lint-ignore no-explicit-any
- this: any,
- stream: Duplex,
- err: Error,
- sync = false,
-) {
- const r = stream._readableState;
- const w = stream._writableState;
-
- if (w.destroyed || r.destroyed) {
- return this;
- }
-
- if (r.autoDestroy || w.autoDestroy) {
- stream.destroy(err);
- } else if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (w && !w.errored) {
- w.errored = err;
- }
- if (r && !r.errored) {
- r.errored = err;
- }
-
- if (sync) {
- queueMicrotask(() => {
- if (w.errorEmitted || r.errorEmitted) {
- return;
- }
-
- w.errorEmitted = true;
- r.errorEmitted = true;
-
- stream.emit("error", err);
- });
- } else {
- if (w.errorEmitted || r.errorEmitted) {
- return;
- }
-
- w.errorEmitted = true;
- r.errorEmitted = true;
-
- stream.emit("error", err);
- }
- }
-}
-
-function finish(stream: Duplex, state: WritableState) {
- state.pendingcb--;
- if (state.errorEmitted || state.closeEmitted) {
- return;
- }
-
- state.finished = true;
-
- for (const callback of state[kOnFinished].splice(0)) {
- callback();
- }
-
- stream.emit("finish");
-
- if (state.autoDestroy) {
- stream.destroy();
- }
-}
-
-export function finishMaybe(
- stream: Duplex,
- state: WritableState,
- sync?: boolean,
-) {
- if (needFinish(state)) {
- prefinish(stream as Writable, state);
- if (state.pendingcb === 0 && needFinish(state)) {
- state.pendingcb++;
- if (sync) {
- queueMicrotask(() => finish(stream, state));
- } else {
- finish(stream, state);
- }
- }
- }
-}
-
-export function onwrite(stream: Duplex, er?: Error | null) {
- const state = stream._writableState;
- const sync = state.sync;
- const cb = state.writecb;
-
- if (typeof cb !== "function") {
- errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
- return;
- }
-
- state.writing = false;
- state.writecb = null;
- state.length -= state.writelen;
- state.writelen = 0;
-
- if (er) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- er.stack;
-
- if (!state.errored) {
- state.errored = er;
- }
-
- if (stream._readableState && !stream._readableState.errored) {
- stream._readableState.errored = er;
- }
-
- if (sync) {
- queueMicrotask(() => onwriteError(stream, state, er, cb));
- } else {
- onwriteError(stream, state, er, cb);
- }
- } else {
- if (state.buffered.length > state.bufferedIndex) {
- clearBuffer(stream, state);
- }
-
- if (sync) {
- if (
- state.afterWriteTickInfo !== null &&
- state.afterWriteTickInfo.cb === cb
- ) {
- state.afterWriteTickInfo.count++;
- } else {
- state.afterWriteTickInfo = {
- count: 1,
- cb: (cb as (error?: Error) => void),
- stream: stream as Writable,
- state,
- };
- queueMicrotask(() =>
- afterWriteTick(state.afterWriteTickInfo as AfterWriteTick)
- );
- }
- } else {
- afterWrite(stream as Writable, state, 1, cb as (error?: Error) => void);
- }
- }
-}
-
-function onwriteError(
- stream: Duplex,
- state: WritableState,
- er: Error,
- cb: (error: Error) => void,
-) {
- --state.pendingcb;
-
- cb(er);
- errorBuffer(state);
- errorOrDestroy(stream, er);
-}
-
-export function readableAddChunk(
- stream: Duplex,
- chunk: string | Buffer | Uint8Array | null,
- encoding: undefined | string = undefined,
- addToFront: boolean,
-) {
- const state = stream._readableState;
- let usedEncoding = encoding;
-
- let err;
- if (!state.objectMode) {
- if (typeof chunk === "string") {
- usedEncoding = encoding || state.defaultEncoding;
- if (state.encoding !== usedEncoding) {
- if (addToFront && state.encoding) {
- chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding);
- } else {
- chunk = Buffer.from(chunk, usedEncoding);
- usedEncoding = "";
- }
- }
- } else if (chunk instanceof Uint8Array) {
- chunk = Buffer.from(chunk);
- }
- }
-
- if (err) {
- errorOrDestroy(stream, err);
- } else if (chunk === null) {
- state.reading = false;
- onEofChunk(stream, state);
- } else if (state.objectMode || (chunk.length > 0)) {
- if (addToFront) {
- if (state.endEmitted) {
- errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
- } else {
- addChunk(stream, state, chunk, true);
- }
- } else if (state.ended) {
- errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
- } else if (state.destroyed || state.errored) {
- return false;
- } else {
- state.reading = false;
- if (state.decoder && !usedEncoding) {
- //TODO(Soremwar)
- //I don't think this cast is right
- chunk = state.decoder.write(Buffer.from(chunk as Uint8Array));
- if (state.objectMode || chunk.length !== 0) {
- addChunk(stream, state, chunk, false);
- } else {
- maybeReadMore(stream, state);
- }
- } else {
- addChunk(stream, state, chunk, false);
- }
- }
- } else if (!addToFront) {
- state.reading = false;
- maybeReadMore(stream, state);
- }
-
- return !state.ended &&
- (state.length < state.highWaterMark || state.length === 0);
-}
diff --git a/std/node/_stream/duplex_test.ts b/std/node/_stream/duplex_test.ts
deleted file mode 100644
index 1596ec218..000000000
--- a/std/node/_stream/duplex_test.ts
+++ /dev/null
@@ -1,698 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import Duplex from "./duplex.ts";
-import finished from "./end_of_stream.ts";
-import {
- assert,
- assertEquals,
- assertStrictEquals,
- assertThrows,
-} from "../../testing/asserts.ts";
-import { deferred, delay } from "../../async/mod.ts";
-
-Deno.test("Duplex stream works normally", () => {
- const stream = new Duplex({ objectMode: true });
-
- assert(stream._readableState.objectMode);
- assert(stream._writableState.objectMode);
- assert(stream.allowHalfOpen);
- assertEquals(stream.listenerCount("end"), 0);
-
- let written: { val: number };
- let read: { val: number };
-
- stream._write = (obj, _, cb) => {
- written = obj;
- cb();
- };
-
- stream._read = () => {};
-
- stream.on("data", (obj) => {
- read = obj;
- });
-
- stream.push({ val: 1 });
- stream.end({ val: 2 });
-
- stream.on("finish", () => {
- assertEquals(read.val, 1);
- assertEquals(written.val, 2);
- });
-});
-
-Deno.test("Duplex stream gets constructed correctly", () => {
- const d1 = new Duplex({
- objectMode: true,
- highWaterMark: 100,
- });
-
- assertEquals(d1.readableObjectMode, true);
- assertEquals(d1.readableHighWaterMark, 100);
- assertEquals(d1.writableObjectMode, true);
- assertEquals(d1.writableHighWaterMark, 100);
-
- const d2 = new Duplex({
- readableObjectMode: false,
- readableHighWaterMark: 10,
- writableObjectMode: true,
- writableHighWaterMark: 100,
- });
-
- assertEquals(d2.writableObjectMode, true);
- assertEquals(d2.writableHighWaterMark, 100);
- assertEquals(d2.readableObjectMode, false);
- assertEquals(d2.readableHighWaterMark, 10);
-});
-
-Deno.test("Duplex stream can be paused", () => {
- const readable = new Duplex();
-
- // _read is a noop, here.
- readable._read = () => {};
-
- // Default state of a stream is not "paused"
- assert(!readable.isPaused());
-
- // Make the stream start flowing...
- readable.on("data", () => {});
-
- // still not paused.
- assert(!readable.isPaused());
-
- readable.pause();
- assert(readable.isPaused());
- readable.resume();
- assert(!readable.isPaused());
-});
-
-Deno.test("Duplex stream sets enconding correctly", () => {
- const readable = new Duplex({
- read() {},
- });
-
- readable.setEncoding("utf8");
-
- readable.push(new TextEncoder().encode("DEF"));
- readable.unshift(new TextEncoder().encode("ABC"));
-
- assertStrictEquals(readable.read(), "ABCDEF");
-});
-
-Deno.test("Duplex stream sets encoding correctly", () => {
- const readable = new Duplex({
- read() {},
- });
-
- readable.setEncoding("utf8");
-
- readable.push(new TextEncoder().encode("DEF"));
- readable.unshift(new TextEncoder().encode("ABC"));
-
- assertStrictEquals(readable.read(), "ABCDEF");
-});
-
-Deno.test("Duplex stream holds up a big push", async () => {
- let readExecuted = 0;
- const readExecutedExpected = 3;
- const readExpectedExecutions = deferred();
-
- let endExecuted = 0;
- const endExecutedExpected = 1;
- const endExpectedExecutions = deferred();
-
- const str = "asdfasdfasdfasdfasdf";
-
- const r = new Duplex({
- highWaterMark: 5,
- encoding: "utf8",
- });
-
- let reads = 0;
-
- function _read() {
- if (reads === 0) {
- setTimeout(() => {
- r.push(str);
- }, 1);
- reads++;
- } else if (reads === 1) {
- const ret = r.push(str);
- assertEquals(ret, false);
- reads++;
- } else {
- r.push(null);
- }
- }
-
- r._read = () => {
- readExecuted++;
- if (readExecuted == readExecutedExpected) {
- readExpectedExecutions.resolve();
- }
- _read();
- };
-
- r.on("end", () => {
- endExecuted++;
- if (endExecuted == endExecutedExpected) {
- endExpectedExecutions.resolve();
- }
- });
-
- // Push some data in to start.
- // We've never gotten any read event at this point.
- const ret = r.push(str);
- assert(!ret);
- let chunk = r.read();
- assertEquals(chunk, str);
- chunk = r.read();
- assertEquals(chunk, null);
-
- r.once("readable", () => {
- // This time, we'll get *all* the remaining data, because
- // it's been added synchronously, as the read WOULD take
- // us below the hwm, and so it triggered a _read() again,
- // which synchronously added more, which we then return.
- chunk = r.read();
- assertEquals(chunk, str + str);
-
- chunk = r.read();
- assertEquals(chunk, null);
- });
-
- const readTimeout = setTimeout(
- () => readExpectedExecutions.reject(),
- 1000,
- );
- const endTimeout = setTimeout(
- () => endExpectedExecutions.reject(),
- 1000,
- );
- await readExpectedExecutions;
- await endExpectedExecutions;
- clearTimeout(readTimeout);
- clearTimeout(endTimeout);
- assertEquals(readExecuted, readExecutedExpected);
- assertEquals(endExecuted, endExecutedExpected);
-});
-
-Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length exceeded", async () => {
- let readableExecuted = 0;
- const readableExecutedExpected = 1;
- const readableExpectedExecutions = deferred();
-
- const r = new Duplex({
- highWaterMark: 3,
- });
-
- r._read = () => {
- throw new Error("_read must not be called");
- };
- r.push(Buffer.from("blerg"));
-
- setTimeout(function () {
- assert(!r._readableState.reading);
- r.on("readable", () => {
- readableExecuted++;
- if (readableExecuted == readableExecutedExpected) {
- readableExpectedExecutions.resolve();
- }
- });
- }, 1);
-
- const readableTimeout = setTimeout(
- () => readableExpectedExecutions.reject(),
- 1000,
- );
- await readableExpectedExecutions;
- clearTimeout(readableTimeout);
- assertEquals(readableExecuted, readableExecutedExpected);
-});
-
-Deno.test("Duplex stream: 'readable' and 'read' events are emitted on highWaterMark length not reached", async () => {
- let readableExecuted = 0;
- const readableExecutedExpected = 1;
- const readableExpectedExecutions = deferred();
-
- let readExecuted = 0;
- const readExecutedExpected = 1;
- const readExpectedExecutions = deferred();
-
- const r = new Duplex({
- highWaterMark: 3,
- });
-
- r._read = () => {
- readExecuted++;
- if (readExecuted == readExecutedExpected) {
- readExpectedExecutions.resolve();
- }
- };
-
- r.push(Buffer.from("bl"));
-
- setTimeout(function () {
- assert(r._readableState.reading);
- r.on("readable", () => {
- readableExecuted++;
- if (readableExecuted == readableExecutedExpected) {
- readableExpectedExecutions.resolve();
- }
- });
- }, 1);
-
- const readableTimeout = setTimeout(
- () => readableExpectedExecutions.reject(),
- 1000,
- );
- const readTimeout = setTimeout(
- () => readExpectedExecutions.reject(),
- 1000,
- );
- await readableExpectedExecutions;
- await readExpectedExecutions;
- clearTimeout(readableTimeout);
- clearTimeout(readTimeout);
- assertEquals(readableExecuted, readableExecutedExpected);
- assertEquals(readExecuted, readExecutedExpected);
-});
-
-Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length not reached and stream ended", async () => {
- let readableExecuted = 0;
- const readableExecutedExpected = 1;
- const readableExpectedExecutions = deferred();
-
- const r = new Duplex({
- highWaterMark: 30,
- });
-
- r._read = () => {
- throw new Error("Must not be executed");
- };
-
- r.push(Buffer.from("blerg"));
- //This ends the stream and triggers end
- r.push(null);
-
- setTimeout(function () {
- // Assert we're testing what we think we are
- assert(!r._readableState.reading);
- r.on("readable", () => {
- readableExecuted++;
- if (readableExecuted == readableExecutedExpected) {
- readableExpectedExecutions.resolve();
- }
- });
- }, 1);
-
- const readableTimeout = setTimeout(
- () => readableExpectedExecutions.reject(),
- 1000,
- );
- await readableExpectedExecutions;
- clearTimeout(readableTimeout);
- assertEquals(readableExecuted, readableExecutedExpected);
-});
-
-Deno.test("Duplex stream: 'read' is emitted on empty string pushed in non-object mode", async () => {
- let endExecuted = 0;
- const endExecutedExpected = 1;
- const endExpectedExecutions = deferred();
-
- const underlyingData = ["", "x", "y", "", "z"];
- const expected = underlyingData.filter((data) => data);
- const result: unknown[] = [];
-
- const r = new Duplex({
- encoding: "utf8",
- });
- r._read = function () {
- queueMicrotask(() => {
- if (!underlyingData.length) {
- this.push(null);
- } else {
- this.push(underlyingData.shift());
- }
- });
- };
-
- r.on("readable", () => {
- const data = r.read();
- if (data !== null) result.push(data);
- });
-
- r.on("end", () => {
- endExecuted++;
- if (endExecuted == endExecutedExpected) {
- endExpectedExecutions.resolve();
- }
- assertEquals(result, expected);
- });
-
- const endTimeout = setTimeout(
- () => endExpectedExecutions.reject(),
- 1000,
- );
- await endExpectedExecutions;
- clearTimeout(endTimeout);
- assertEquals(endExecuted, endExecutedExpected);
-});
-
-Deno.test("Duplex stream: listeners can be removed", () => {
- const r = new Duplex();
- r._read = () => {};
- r.on("data", () => {});
-
- r.removeAllListeners("data");
-
- assertEquals(r.eventNames().length, 0);
-});
-
-Deno.test("Duplex stream writes correctly", async () => {
- let callback: undefined | ((error?: Error | null | undefined) => void);
-
- let writeExecuted = 0;
- const writeExecutedExpected = 1;
- const writeExpectedExecutions = deferred();
-
- let writevExecuted = 0;
- const writevExecutedExpected = 1;
- const writevExpectedExecutions = deferred();
-
- const writable = new Duplex({
- write: (chunk, encoding, cb) => {
- writeExecuted++;
- if (writeExecuted == writeExecutedExpected) {
- writeExpectedExecutions.resolve();
- }
- assert(chunk instanceof Buffer);
- assertStrictEquals(encoding, "buffer");
- assertStrictEquals(String(chunk), "ABC");
- callback = cb;
- },
- writev: (chunks) => {
- writevExecuted++;
- if (writevExecuted == writevExecutedExpected) {
- writevExpectedExecutions.resolve();
- }
- assertStrictEquals(chunks.length, 2);
- assertStrictEquals(chunks[0].encoding, "buffer");
- assertStrictEquals(chunks[1].encoding, "buffer");
- assertStrictEquals(chunks[0].chunk + chunks[1].chunk, "DEFGHI");
- },
- });
-
- writable.write(new TextEncoder().encode("ABC"));
- writable.write(new TextEncoder().encode("DEF"));
- writable.end(new TextEncoder().encode("GHI"));
- callback?.();
-
- const writeTimeout = setTimeout(
- () => writeExpectedExecutions.reject(),
- 1000,
- );
- const writevTimeout = setTimeout(
- () => writevExpectedExecutions.reject(),
- 1000,
- );
- await writeExpectedExecutions;
- await writevExpectedExecutions;
- clearTimeout(writeTimeout);
- clearTimeout(writevTimeout);
- assertEquals(writeExecuted, writeExecutedExpected);
- assertEquals(writevExecuted, writevExecutedExpected);
-});
-
-Deno.test("Duplex stream writes Uint8Array in object mode", async () => {
- let writeExecuted = 0;
- const writeExecutedExpected = 1;
- const writeExpectedExecutions = deferred();
-
- const ABC = new TextEncoder().encode("ABC");
-
- const writable = new Duplex({
- objectMode: true,
- write: (chunk, encoding, cb) => {
- writeExecuted++;
- if (writeExecuted == writeExecutedExpected) {
- writeExpectedExecutions.resolve();
- }
- assert(!(chunk instanceof Buffer));
- assert(chunk instanceof Uint8Array);
- assertEquals(chunk, ABC);
- assertEquals(encoding, "utf8");
- cb();
- },
- });
-
- writable.end(ABC);
-
- const writeTimeout = setTimeout(
- () => writeExpectedExecutions.reject(),
- 1000,
- );
- await writeExpectedExecutions;
- clearTimeout(writeTimeout);
- assertEquals(writeExecuted, writeExecutedExpected);
-});
-
-Deno.test("Duplex stream throws on unexpected close", async () => {
- let finishedExecuted = 0;
- const finishedExecutedExpected = 1;
- const finishedExpectedExecutions = deferred();
-
- const writable = new Duplex({
- write: () => {},
- });
- writable.writable = false;
- writable.destroy();
-
- finished(writable, (err) => {
- finishedExecuted++;
- if (finishedExecuted == finishedExecutedExpected) {
- finishedExpectedExecutions.resolve();
- }
- assertEquals(err?.code, "ERR_STREAM_PREMATURE_CLOSE");
- });
-
- const finishedTimeout = setTimeout(
- () => finishedExpectedExecutions.reject(),
- 1000,
- );
- await finishedExpectedExecutions;
- clearTimeout(finishedTimeout);
- assertEquals(finishedExecuted, finishedExecutedExpected);
-});
-
-Deno.test("Duplex stream finishes correctly after error", async () => {
- let errorExecuted = 0;
- const errorExecutedExpected = 1;
- const errorExpectedExecutions = deferred();
-
- let finishedExecuted = 0;
- const finishedExecutedExpected = 1;
- const finishedExpectedExecutions = deferred();
-
- const w = new Duplex({
- write(_chunk, _encoding, cb) {
- cb(new Error());
- },
- autoDestroy: false,
- });
- w.write("asd");
- w.on("error", () => {
- errorExecuted++;
- if (errorExecuted == errorExecutedExpected) {
- errorExpectedExecutions.resolve();
- }
- finished(w, () => {
- finishedExecuted++;
- if (finishedExecuted == finishedExecutedExpected) {
- finishedExpectedExecutions.resolve();
- }
- });
- });
-
- const errorTimeout = setTimeout(
- () => errorExpectedExecutions.reject(),
- 1000,
- );
- const finishedTimeout = setTimeout(
- () => finishedExpectedExecutions.reject(),
- 1000,
- );
- await finishedExpectedExecutions;
- await errorExpectedExecutions;
- clearTimeout(finishedTimeout);
- clearTimeout(errorTimeout);
- assertEquals(finishedExecuted, finishedExecutedExpected);
- assertEquals(errorExecuted, errorExecutedExpected);
-});
-
-Deno.test("Duplex stream fails on 'write' null value", () => {
- const writable = new Duplex();
- assertThrows(() => writable.write(null));
-});
-
-Deno.test("Duplex stream is destroyed correctly", async () => {
- let closeExecuted = 0;
- const closeExecutedExpected = 1;
- const closeExpectedExecutions = deferred();
-
- const unexpectedExecution = deferred();
-
- const duplex = new Duplex({
- write(_chunk, _enc, cb) {
- cb();
- },
- read() {},
- });
-
- duplex.resume();
-
- function never() {
- unexpectedExecution.reject();
- }
-
- duplex.on("end", never);
- duplex.on("finish", never);
- duplex.on("close", () => {
- closeExecuted++;
- if (closeExecuted == closeExecutedExpected) {
- closeExpectedExecutions.resolve();
- }
- });
-
- duplex.destroy();
- assertEquals(duplex.destroyed, true);
-
- const closeTimeout = setTimeout(
- () => closeExpectedExecutions.reject(),
- 1000,
- );
- await Promise.race([
- unexpectedExecution,
- delay(100),
- ]);
- await closeExpectedExecutions;
- clearTimeout(closeTimeout);
- assertEquals(closeExecuted, closeExecutedExpected);
-});
-
-Deno.test("Duplex stream errors correctly on destroy", async () => {
- let errorExecuted = 0;
- const errorExecutedExpected = 1;
- const errorExpectedExecutions = deferred();
-
- const unexpectedExecution = deferred();
-
- const duplex = new Duplex({
- write(_chunk, _enc, cb) {
- cb();
- },
- read() {},
- });
- duplex.resume();
-
- const expected = new Error("kaboom");
-
- function never() {
- unexpectedExecution.reject();
- }
-
- duplex.on("end", never);
- duplex.on("finish", never);
- duplex.on("error", (err) => {
- errorExecuted++;
- if (errorExecuted == errorExecutedExpected) {
- errorExpectedExecutions.resolve();
- }
- assertStrictEquals(err, expected);
- });
-
- duplex.destroy(expected);
- assertEquals(duplex.destroyed, true);
-
- const errorTimeout = setTimeout(
- () => errorExpectedExecutions.reject(),
- 1000,
- );
- await Promise.race([
- unexpectedExecution,
- delay(100),
- ]);
- await errorExpectedExecutions;
- clearTimeout(errorTimeout);
- assertEquals(errorExecuted, errorExecutedExpected);
-});
-
-Deno.test("Duplex stream doesn't finish on allowHalfOpen", async () => {
- const unexpectedExecution = deferred();
-
- const duplex = new Duplex({
- read() {},
- });
-
- assertEquals(duplex.allowHalfOpen, true);
- duplex.on("finish", () => unexpectedExecution.reject());
- assertEquals(duplex.listenerCount("end"), 0);
- duplex.resume();
- duplex.push(null);
-
- await Promise.race([
- unexpectedExecution,
- delay(100),
- ]);
-});
-
-Deno.test("Duplex stream finishes when allowHalfOpen is disabled", async () => {
- let finishExecuted = 0;
- const finishExecutedExpected = 1;
- const finishExpectedExecutions = deferred();
-
- const duplex = new Duplex({
- read() {},
- allowHalfOpen: false,
- });
-
- assertEquals(duplex.allowHalfOpen, false);
- duplex.on("finish", () => {
- finishExecuted++;
- if (finishExecuted == finishExecutedExpected) {
- finishExpectedExecutions.resolve();
- }
- });
- assertEquals(duplex.listenerCount("end"), 0);
- duplex.resume();
- duplex.push(null);
-
- const finishTimeout = setTimeout(
- () => finishExpectedExecutions.reject(),
- 1000,
- );
- await finishExpectedExecutions;
- clearTimeout(finishTimeout);
- assertEquals(finishExecuted, finishExecutedExpected);
-});
-
-Deno.test("Duplex stream doesn't finish when allowHalfOpen is disabled but stream ended", async () => {
- const unexpectedExecution = deferred();
-
- const duplex = new Duplex({
- read() {},
- allowHalfOpen: false,
- });
-
- assertEquals(duplex.allowHalfOpen, false);
- duplex._writableState.ended = true;
- duplex.on("finish", () => unexpectedExecution.reject());
- assertEquals(duplex.listenerCount("end"), 0);
- duplex.resume();
- duplex.push(null);
-
- await Promise.race([
- unexpectedExecution,
- delay(100),
- ]);
-});
diff --git a/std/node/_stream/end_of_stream.ts b/std/node/_stream/end_of_stream.ts
deleted file mode 100644
index 6179e7fc4..000000000
--- a/std/node/_stream/end_of_stream.ts
+++ /dev/null
@@ -1,241 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { once } from "../_utils.ts";
-import type Duplex from "./duplex.ts";
-import type Readable from "./readable.ts";
-import type Stream from "./stream.ts";
-import type { ReadableState } from "./readable.ts";
-import type Writable from "./writable.ts";
-import type { WritableState } from "./writable.ts";
-import {
- ERR_INVALID_ARG_TYPE,
- ERR_STREAM_PREMATURE_CLOSE,
- NodeErrorAbstraction,
-} from "../_errors.ts";
-
-export type StreamImplementations = Duplex | Readable | Stream | Writable;
-
-// TODO(Soremwar)
-// Bring back once requests are implemented
-// function isRequest(stream: Stream) {
-// return stream.setHeader && typeof stream.abort === "function";
-// }
-
-// deno-lint-ignore no-explicit-any
-function isReadable(stream: any) {
- return typeof stream.readable === "boolean" ||
- typeof stream.readableEnded === "boolean" ||
- !!stream._readableState;
-}
-
-// deno-lint-ignore no-explicit-any
-function isWritable(stream: any) {
- return typeof stream.writable === "boolean" ||
- typeof stream.writableEnded === "boolean" ||
- !!stream._writableState;
-}
-
-function isWritableFinished(stream: Writable) {
- if (stream.writableFinished) return true;
- const wState = stream._writableState;
- if (!wState || wState.errored) return false;
- return wState.finished || (wState.ended && wState.length === 0);
-}
-
-function nop() {}
-
-function isReadableEnded(stream: Readable) {
- if (stream.readableEnded) return true;
- const rState = stream._readableState;
- if (!rState || rState.errored) return false;
- return rState.endEmitted || (rState.ended && rState.length === 0);
-}
-
-export interface FinishedOptions {
- error?: boolean;
- readable?: boolean;
- writable?: boolean;
-}
-
-/**
- * Appends an ending callback triggered when a stream is no longer readable,
- * writable or has experienced an error or a premature close event
-*/
-export default function eos(
- stream: StreamImplementations,
- options: FinishedOptions | null,
- callback: (err?: NodeErrorAbstraction | null) => void,
-): () => void;
-export default function eos(
- stream: StreamImplementations,
- callback: (err?: NodeErrorAbstraction | null) => void,
-): () => void;
-export default function eos(
- stream: StreamImplementations,
- x: FinishedOptions | ((err?: NodeErrorAbstraction | null) => void) | null,
- y?: (err?: NodeErrorAbstraction | null) => void,
-) {
- let opts: FinishedOptions;
- let callback: (err?: NodeErrorAbstraction | null) => void;
-
- if (!y) {
- if (typeof x !== "function") {
- throw new ERR_INVALID_ARG_TYPE("callback", "function", x);
- }
- opts = {};
- callback = x;
- } else {
- if (!x || Array.isArray(x) || typeof x !== "object") {
- throw new ERR_INVALID_ARG_TYPE("opts", "object", x);
- }
- opts = x;
-
- if (typeof y !== "function") {
- throw new ERR_INVALID_ARG_TYPE("callback", "function", y);
- }
- callback = y;
- }
-
- callback = once(callback);
-
- const readable = opts.readable ?? isReadable(stream);
- const writable = opts.writable ?? isWritable(stream);
-
- // deno-lint-ignore no-explicit-any
- const wState: WritableState | undefined = (stream as any)._writableState;
- // deno-lint-ignore no-explicit-any
- const rState: ReadableState | undefined = (stream as any)._readableState;
- const validState = wState || rState;
-
- const onlegacyfinish = () => {
- if (!(stream as Writable).writable) {
- onfinish();
- }
- };
-
- let willEmitClose = (
- validState?.autoDestroy &&
- validState?.emitClose &&
- validState?.closed === false &&
- isReadable(stream) === readable &&
- isWritable(stream) === writable
- );
-
- let writableFinished = (stream as Writable).writableFinished ||
- wState?.finished;
- const onfinish = () => {
- writableFinished = true;
- // deno-lint-ignore no-explicit-any
- if ((stream as any).destroyed) {
- willEmitClose = false;
- }
-
- if (willEmitClose && (!(stream as Readable).readable || readable)) {
- return;
- }
- if (!readable || readableEnded) {
- callback.call(stream);
- }
- };
-
- let readableEnded = (stream as Readable).readableEnded || rState?.endEmitted;
- const onend = () => {
- readableEnded = true;
- // deno-lint-ignore no-explicit-any
- if ((stream as any).destroyed) {
- willEmitClose = false;
- }
-
- if (willEmitClose && (!(stream as Writable).writable || writable)) {
- return;
- }
- if (!writable || writableFinished) {
- callback.call(stream);
- }
- };
-
- const onerror = (err: NodeErrorAbstraction) => {
- callback.call(stream, err);
- };
-
- const onclose = () => {
- if (readable && !readableEnded) {
- if (!isReadableEnded(stream as Readable)) {
- return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
- }
- }
- if (writable && !writableFinished) {
- if (!isWritableFinished(stream as Writable)) {
- return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
- }
- }
- callback.call(stream);
- };
-
- // TODO(Soremwar)
- // Bring back once requests are implemented
- // const onrequest = () => {
- // stream.req.on("finish", onfinish);
- // };
-
- // TODO(Soremwar)
- // Bring back once requests are implemented
- // if (isRequest(stream)) {
- // stream.on("complete", onfinish);
- // stream.on("abort", onclose);
- // if (stream.req) {
- // onrequest();
- // } else {
- // stream.on("request", onrequest);
- // }
- // } else
- if (writable && !wState) {
- stream.on("end", onlegacyfinish);
- stream.on("close", onlegacyfinish);
- }
-
- // TODO(Soremwar)
- // Bring back once requests are implemented
- // if (typeof stream.aborted === "boolean") {
- // stream.on("aborted", onclose);
- // }
-
- stream.on("end", onend);
- stream.on("finish", onfinish);
- if (opts.error !== false) stream.on("error", onerror);
- stream.on("close", onclose);
-
- const closed = (
- wState?.closed ||
- rState?.closed ||
- wState?.errorEmitted ||
- rState?.errorEmitted ||
- // TODO(Soremwar)
- // Bring back once requests are implemented
- // (rState && stream.req && stream.aborted) ||
- (
- (!writable || wState?.finished) &&
- (!readable || rState?.endEmitted)
- )
- );
-
- if (closed) {
- queueMicrotask(callback);
- }
-
- return function () {
- callback = nop;
- stream.removeListener("aborted", onclose);
- stream.removeListener("complete", onfinish);
- stream.removeListener("abort", onclose);
- // TODO(Soremwar)
- // Bring back once requests are implemented
- // stream.removeListener("request", onrequest);
- // if (stream.req) stream.req.removeListener("finish", onfinish);
- stream.removeListener("end", onlegacyfinish);
- stream.removeListener("close", onlegacyfinish);
- stream.removeListener("finish", onfinish);
- stream.removeListener("end", onend);
- stream.removeListener("error", onerror);
- stream.removeListener("close", onclose);
- };
-}
diff --git a/std/node/_stream/end_of_stream_test.ts b/std/node/_stream/end_of_stream_test.ts
deleted file mode 100644
index 571e75b99..000000000
--- a/std/node/_stream/end_of_stream_test.ts
+++ /dev/null
@@ -1,97 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import finished from "./end_of_stream.ts";
-import Readable from "./readable.ts";
-import Transform from "./transform.ts";
-import Writable from "./writable.ts";
-import { mustCall } from "../_utils.ts";
-import { assert, fail } from "../../testing/asserts.ts";
-import { deferred, delay } from "../../async/mod.ts";
-
-Deno.test("Finished appends to Readable correctly", async () => {
- const rs = new Readable({
- read() {},
- });
-
- const [finishedExecution, finishedCb] = mustCall((err) => {
- assert(!err);
- });
-
- finished(rs, finishedCb);
-
- rs.push(null);
- rs.resume();
-
- await finishedExecution;
-});
-
-Deno.test("Finished appends to Writable correctly", async () => {
- const ws = new Writable({
- write(_data, _enc, cb) {
- cb();
- },
- });
-
- const [finishedExecution, finishedCb] = mustCall((err) => {
- assert(!err);
- });
-
- finished(ws, finishedCb);
-
- ws.end();
-
- await finishedExecution;
-});
-
-Deno.test("Finished appends to Transform correctly", async () => {
- const tr = new Transform({
- transform(_data, _enc, cb) {
- cb();
- },
- });
-
- let finish = false;
- let ended = false;
-
- tr.on("end", () => {
- ended = true;
- });
-
- tr.on("finish", () => {
- finish = true;
- });
-
- const [finishedExecution, finishedCb] = mustCall((err) => {
- assert(!err);
- assert(finish);
- assert(ended);
- });
-
- finished(tr, finishedCb);
-
- tr.end();
- tr.resume();
-
- await finishedExecution;
-});
-
-Deno.test("The function returned by Finished clears the listeners", async () => {
- const finishedExecution = deferred();
-
- const ws = new Writable({
- write(_data, _env, cb) {
- cb();
- },
- });
-
- const removeListener = finished(ws, () => {
- finishedExecution.reject();
- });
- removeListener();
- ws.end();
-
- await Promise.race([
- delay(100),
- finishedExecution,
- ])
- .catch(() => fail("Finished was executed"));
-});
diff --git a/std/node/_stream/from.ts b/std/node/_stream/from.ts
deleted file mode 100644
index 652c17715..000000000
--- a/std/node/_stream/from.ts
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import Readable from "./readable.ts";
-import type { ReadableOptions } from "./readable.ts";
-import { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } from "../_errors.ts";
-
-export default function from(
- // deno-lint-ignore no-explicit-any
- iterable: Iterable<any> | AsyncIterable<any>,
- opts?: ReadableOptions,
-) {
- let iterator:
- // deno-lint-ignore no-explicit-any
- | Iterator<any, any, undefined>
- // deno-lint-ignore no-explicit-any
- | AsyncIterator<any, any, undefined>;
- if (typeof iterable === "string" || iterable instanceof Buffer) {
- return new Readable({
- objectMode: true,
- ...opts,
- read() {
- this.push(iterable);
- this.push(null);
- },
- });
- }
-
- if (Symbol.asyncIterator in iterable) {
- // deno-lint-ignore no-explicit-any
- iterator = (iterable as AsyncIterable<any>)[Symbol.asyncIterator]();
- } else if (Symbol.iterator in iterable) {
- // deno-lint-ignore no-explicit-any
- iterator = (iterable as Iterable<any>)[Symbol.iterator]();
- } else {
- throw new ERR_INVALID_ARG_TYPE("iterable", ["Iterable"], iterable);
- }
-
- const readable = new Readable({
- objectMode: true,
- highWaterMark: 1,
- ...opts,
- });
-
- // Reading boolean to protect against _read
- // being called before last iteration completion.
- let reading = false;
-
- // needToClose boolean if iterator needs to be explicitly closed
- let needToClose = false;
-
- readable._read = function () {
- if (!reading) {
- reading = true;
- next();
- }
- };
-
- readable._destroy = function (error, cb) {
- if (needToClose) {
- needToClose = false;
- close().then(
- () => queueMicrotask(() => cb(error)),
- (e) => queueMicrotask(() => cb(error || e)),
- );
- } else {
- cb(error);
- }
- };
-
- async function close() {
- if (typeof iterator.return === "function") {
- const { value } = await iterator.return();
- await value;
- }
- }
-
- async function next() {
- try {
- needToClose = false;
- const { value, done } = await iterator.next();
- needToClose = !done;
- if (done) {
- readable.push(null);
- } else if (readable.destroyed) {
- await close();
- } else {
- const res = await value;
- if (res === null) {
- reading = false;
- throw new ERR_STREAM_NULL_VALUES();
- } else if (readable.push(res)) {
- next();
- } else {
- reading = false;
- }
- }
- } catch (err) {
- readable.destroy(err);
- }
- }
- return readable;
-}
diff --git a/std/node/_stream/passthrough.ts b/std/node/_stream/passthrough.ts
deleted file mode 100644
index 9126420e5..000000000
--- a/std/node/_stream/passthrough.ts
+++ /dev/null
@@ -1,20 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import Transform from "./transform.ts";
-import type { TransformOptions } from "./transform.ts";
-import type { Encodings } from "../_utils.ts";
-
-export default class PassThrough extends Transform {
- constructor(options?: TransformOptions) {
- super(options);
- }
-
- _transform(
- // deno-lint-ignore no-explicit-any
- chunk: any,
- _encoding: Encodings,
- // deno-lint-ignore no-explicit-any
- cb: (error?: Error | null, data?: any) => void,
- ) {
- cb(null, chunk);
- }
-}
diff --git a/std/node/_stream/pipeline.ts b/std/node/_stream/pipeline.ts
deleted file mode 100644
index d02a92870..000000000
--- a/std/node/_stream/pipeline.ts
+++ /dev/null
@@ -1,308 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { once } from "../_utils.ts";
-import { destroyer as implDestroyer } from "./destroy.ts";
-import eos from "./end_of_stream.ts";
-import createReadableStreamAsyncIterator from "./async_iterator.ts";
-import * as events from "../events.ts";
-import PassThrough from "./passthrough.ts";
-import {
- ERR_INVALID_ARG_TYPE,
- ERR_INVALID_CALLBACK,
- ERR_INVALID_RETURN_VALUE,
- ERR_MISSING_ARGS,
- ERR_STREAM_DESTROYED,
- NodeErrorAbstraction,
-} from "../_errors.ts";
-import type Duplex from "./duplex.ts";
-import type Readable from "./readable.ts";
-import type Stream from "./stream.ts";
-import type Transform from "./transform.ts";
-import type Writable from "./writable.ts";
-
-type Streams = Duplex | Readable | Writable;
-// deno-lint-ignore no-explicit-any
-type EndCallback = (err?: NodeErrorAbstraction | null, val?: any) => void;
-type TransformCallback =
- // deno-lint-ignore no-explicit-any
- | ((value?: any) => AsyncGenerator<any>)
- // deno-lint-ignore no-explicit-any
- | ((value?: any) => Promise<any>);
-/**
- * This type represents an array that contains a data source,
- * many Transform Streams, a writable stream destination
- * and end in an optional callback
- * */
-type DataSource =
- // deno-lint-ignore no-explicit-any
- | (() => AsyncGenerator<any>)
- | // deno-lint-ignore no-explicit-any
- AsyncIterable<any>
- | Duplex
- | // deno-lint-ignore no-explicit-any
- Iterable<any>
- | // deno-lint-ignore no-explicit-any
- (() => Generator<any>)
- | Readable;
-type Transformers = Duplex | Transform | TransformCallback | Writable;
-export type PipelineArguments = [
- DataSource,
- ...Array<Transformers | EndCallback>,
-];
-
-function destroyer(
- stream: Streams,
- reading: boolean,
- writing: boolean,
- callback: EndCallback,
-) {
- callback = once(callback);
-
- let finished = false;
- stream.on("close", () => {
- finished = true;
- });
-
- eos(stream, { readable: reading, writable: writing }, (err) => {
- finished = !err;
-
- // deno-lint-ignore no-explicit-any
- const rState = (stream as any)?._readableState;
- if (
- err &&
- err.code === "ERR_STREAM_PREMATURE_CLOSE" &&
- reading &&
- (rState?.ended && !rState?.errored && !rState?.errorEmitted)
- ) {
- stream
- .once("end", callback)
- .once("error", callback);
- } else {
- callback(err);
- }
- });
-
- return (err: NodeErrorAbstraction) => {
- if (finished) return;
- finished = true;
- implDestroyer(stream, err);
- callback(err || new ERR_STREAM_DESTROYED("pipe"));
- };
-}
-
-function popCallback(streams: PipelineArguments): EndCallback {
- if (typeof streams[streams.length - 1] !== "function") {
- throw new ERR_INVALID_CALLBACK(streams[streams.length - 1]);
- }
- return streams.pop() as EndCallback;
-}
-
-// function isPromise(obj) {
-// return !!(obj && typeof obj.then === "function");
-// }
-
-// deno-lint-ignore no-explicit-any
-function isReadable(obj: any): obj is Stream {
- return !!(obj && typeof obj.pipe === "function");
-}
-
-// deno-lint-ignore no-explicit-any
-function isWritable(obj: any) {
- return !!(obj && typeof obj.write === "function");
-}
-
-// deno-lint-ignore no-explicit-any
-function isStream(obj: any) {
- return isReadable(obj) || isWritable(obj);
-}
-
-// deno-lint-ignore no-explicit-any
-function isIterable(obj: any, isAsync?: boolean) {
- if (!obj) return false;
- if (isAsync === true) return typeof obj[Symbol.asyncIterator] === "function";
- if (isAsync === false) return typeof obj[Symbol.iterator] === "function";
- return typeof obj[Symbol.asyncIterator] === "function" ||
- typeof obj[Symbol.iterator] === "function";
-}
-
-// deno-lint-ignore no-explicit-any
-function makeAsyncIterable(val: Readable | Iterable<any> | AsyncIterable<any>) {
- if (isIterable(val)) {
- return val;
- } else if (isReadable(val)) {
- return fromReadable(val as Readable);
- }
- throw new ERR_INVALID_ARG_TYPE(
- "val",
- ["Readable", "Iterable", "AsyncIterable"],
- val,
- );
-}
-
-async function* fromReadable(val: Readable) {
- yield* createReadableStreamAsyncIterator(val);
-}
-
-async function pump(
- // deno-lint-ignore no-explicit-any
- iterable: Iterable<any>,
- writable: Duplex | Writable,
- finish: (err?: NodeErrorAbstraction | null) => void,
-) {
- let error;
- try {
- for await (const chunk of iterable) {
- if (!writable.write(chunk)) {
- if (writable.destroyed) return;
- await events.once(writable, "drain");
- }
- }
- writable.end();
- } catch (err) {
- error = err;
- } finally {
- finish(error);
- }
-}
-
-export default function pipeline(...args: PipelineArguments) {
- const callback: EndCallback = once(popCallback(args));
-
- let streams: [DataSource, ...Transformers[]];
- if (args.length > 1) {
- streams = args as [DataSource, ...Transformers[]];
- } else {
- throw new ERR_MISSING_ARGS("streams");
- }
-
- let error: NodeErrorAbstraction;
- // deno-lint-ignore no-explicit-any
- let value: any;
- const destroys: Array<(err: NodeErrorAbstraction) => void> = [];
-
- let finishCount = 0;
-
- function finish(err?: NodeErrorAbstraction | null) {
- const final = --finishCount === 0;
-
- if (err && (!error || error.code === "ERR_STREAM_PREMATURE_CLOSE")) {
- error = err;
- }
-
- if (!error && !final) {
- return;
- }
-
- while (destroys.length) {
- (destroys.shift() as (err: NodeErrorAbstraction) => void)(error);
- }
-
- if (final) {
- callback(error, value);
- }
- }
-
- // TODO(Soremwar)
- // Simplify the hell out of this
- // deno-lint-ignore no-explicit-any
- let ret: any;
- for (let i = 0; i < streams.length; i++) {
- const stream = streams[i];
- const reading = i < streams.length - 1;
- const writing = i > 0;
-
- if (isStream(stream)) {
- finishCount++;
- destroys.push(destroyer(stream as Streams, reading, writing, finish));
- }
-
- if (i === 0) {
- if (typeof stream === "function") {
- ret = stream();
- if (!isIterable(ret)) {
- throw new ERR_INVALID_RETURN_VALUE(
- "Iterable, AsyncIterable or Stream",
- "source",
- ret,
- );
- }
- } else if (isIterable(stream) || isReadable(stream)) {
- ret = stream;
- } else {
- throw new ERR_INVALID_ARG_TYPE(
- "source",
- ["Stream", "Iterable", "AsyncIterable", "Function"],
- stream,
- );
- }
- } else if (typeof stream === "function") {
- ret = makeAsyncIterable(ret);
- ret = stream(ret);
-
- if (reading) {
- if (!isIterable(ret, true)) {
- throw new ERR_INVALID_RETURN_VALUE(
- "AsyncIterable",
- `transform[${i - 1}]`,
- ret,
- );
- }
- } else {
- // If the last argument to pipeline is not a stream
- // we must create a proxy stream so that pipeline(...)
- // always returns a stream which can be further
- // composed through `.pipe(stream)`.
- const pt = new PassThrough({
- objectMode: true,
- });
- if (ret instanceof Promise) {
- ret
- .then((val) => {
- value = val;
- pt.end(val);
- }, (err) => {
- pt.destroy(err);
- });
- } else if (isIterable(ret, true)) {
- finishCount++;
- pump(ret, pt, finish);
- } else {
- throw new ERR_INVALID_RETURN_VALUE(
- "AsyncIterable or Promise",
- "destination",
- ret,
- );
- }
-
- ret = pt;
-
- finishCount++;
- destroys.push(destroyer(ret, false, true, finish));
- }
- } else if (isStream(stream)) {
- if (isReadable(ret)) {
- ret.pipe(stream as Readable);
-
- // TODO(Soremwar)
- // Reimplement after stdout and stderr are implemented
- // if (stream === process.stdout || stream === process.stderr) {
- // ret.on("end", () => stream.end());
- // }
- } else {
- ret = makeAsyncIterable(ret);
-
- finishCount++;
- pump(ret, stream as Writable, finish);
- }
- ret = stream;
- } else {
- const name = reading ? `transform[${i - 1}]` : "destination";
- throw new ERR_INVALID_ARG_TYPE(
- name,
- ["Stream", "Function"],
- ret,
- );
- }
- }
-
- return ret as unknown as Readable;
-}
diff --git a/std/node/_stream/pipeline_test.ts b/std/node/_stream/pipeline_test.ts
deleted file mode 100644
index aa1869416..000000000
--- a/std/node/_stream/pipeline_test.ts
+++ /dev/null
@@ -1,387 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import PassThrough from "./passthrough.ts";
-import pipeline from "./pipeline.ts";
-import Readable from "./readable.ts";
-import Transform from "./transform.ts";
-import Writable from "./writable.ts";
-import { mustCall } from "../_utils.ts";
-import {
- assert,
- assertEquals,
- assertStrictEquals,
-} from "../../testing/asserts.ts";
-import type { NodeErrorAbstraction } from "../_errors.ts";
-
-Deno.test("Pipeline ends on stream finished", async () => {
- let finished = false;
-
- // deno-lint-ignore no-explicit-any
- const processed: any[] = [];
- const expected = [
- Buffer.from("a"),
- Buffer.from("b"),
- Buffer.from("c"),
- ];
-
- const read = new Readable({
- read() {},
- });
-
- const write = new Writable({
- write(data, _enc, cb) {
- processed.push(data);
- cb();
- },
- });
-
- write.on("finish", () => {
- finished = true;
- });
-
- for (let i = 0; i < expected.length; i++) {
- read.push(expected[i]);
- }
- read.push(null);
-
- const [finishedCompleted, finishedCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assert(!err);
- assert(finished);
- assertEquals(processed, expected);
- },
- 1,
- );
-
- pipeline(read, write, finishedCb);
-
- await finishedCompleted;
-});
-
-Deno.test("Pipeline fails on stream destroyed", async () => {
- const read = new Readable({
- read() {},
- });
-
- const write = new Writable({
- write(_data, _enc, cb) {
- cb();
- },
- });
-
- read.push("data");
- queueMicrotask(() => read.destroy());
-
- const [pipelineExecuted, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assert(err);
- },
- 1,
- );
- pipeline(read, write, pipelineCb);
-
- await pipelineExecuted;
-});
-
-Deno.test("Pipeline exits on stream error", async () => {
- const read = new Readable({
- read() {},
- });
-
- const transform = new Transform({
- transform(_data, _enc, cb) {
- cb(new Error("kaboom"));
- },
- });
-
- const write = new Writable({
- write(_data, _enc, cb) {
- cb();
- },
- });
-
- const [readExecution, readCb] = mustCall();
- read.on("close", readCb);
- const [closeExecution, closeCb] = mustCall();
- transform.on("close", closeCb);
- const [writeExecution, writeCb] = mustCall();
- write.on("close", writeCb);
-
- const errorExecutions = [read, transform, write]
- .map((stream) => {
- const [execution, cb] = mustCall((err?: NodeErrorAbstraction | null) => {
- assertEquals(err, new Error("kaboom"));
- });
-
- stream.on("error", cb);
- return execution;
- });
-
- const [pipelineExecution, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assertEquals(err, new Error("kaboom"));
- },
- );
- const dst = pipeline(read, transform, write, pipelineCb);
-
- assertStrictEquals(dst, write);
-
- read.push("hello");
-
- await readExecution;
- await closeExecution;
- await writeExecution;
- await Promise.all(errorExecutions);
- await pipelineExecution;
-});
-
-Deno.test("Pipeline processes iterators correctly", async () => {
- let res = "";
- const w = new Writable({
- write(chunk, _encoding, callback) {
- res += chunk;
- callback();
- },
- });
-
- const [pipelineExecution, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assert(!err);
- assertEquals(res, "helloworld");
- },
- );
- pipeline(
- function* () {
- yield "hello";
- yield "world";
- }(),
- w,
- pipelineCb,
- );
-
- await pipelineExecution;
-});
-
-Deno.test("Pipeline processes async iterators correctly", async () => {
- let res = "";
- const w = new Writable({
- write(chunk, _encoding, callback) {
- res += chunk;
- callback();
- },
- });
-
- const [pipelineExecution, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assert(!err);
- assertEquals(res, "helloworld");
- },
- );
- pipeline(
- async function* () {
- await Promise.resolve();
- yield "hello";
- yield "world";
- }(),
- w,
- pipelineCb,
- );
-
- await pipelineExecution;
-});
-
-Deno.test("Pipeline processes generators correctly", async () => {
- let res = "";
- const w = new Writable({
- write(chunk, _encoding, callback) {
- res += chunk;
- callback();
- },
- });
-
- const [pipelineExecution, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assert(!err);
- assertEquals(res, "helloworld");
- },
- );
- pipeline(
- function* () {
- yield "hello";
- yield "world";
- },
- w,
- pipelineCb,
- );
-
- await pipelineExecution;
-});
-
-Deno.test("Pipeline processes async generators correctly", async () => {
- let res = "";
- const w = new Writable({
- write(chunk, _encoding, callback) {
- res += chunk;
- callback();
- },
- });
-
- const [pipelineExecution, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assert(!err);
- assertEquals(res, "helloworld");
- },
- );
- pipeline(
- async function* () {
- await Promise.resolve();
- yield "hello";
- yield "world";
- },
- w,
- pipelineCb,
- );
-
- await pipelineExecution;
-});
-
-Deno.test("Pipeline handles generator transforms", async () => {
- let res = "";
-
- const [pipelineExecuted, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assert(!err);
- assertEquals(res, "HELLOWORLD");
- },
- );
- pipeline(
- async function* () {
- await Promise.resolve();
- yield "hello";
- yield "world";
- },
- async function* (source: string[]) {
- for await (const chunk of source) {
- yield chunk.toUpperCase();
- }
- },
- async function (source: string[]) {
- for await (const chunk of source) {
- res += chunk;
- }
- },
- pipelineCb,
- );
-
- await pipelineExecuted;
-});
-
-Deno.test("Pipeline passes result to final callback", async () => {
- const [pipelineExecuted, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null, val?: unknown) => {
- assert(!err);
- assertEquals(val, "HELLOWORLD");
- },
- );
- pipeline(
- async function* () {
- await Promise.resolve();
- yield "hello";
- yield "world";
- },
- async function* (source: string[]) {
- for await (const chunk of source) {
- yield chunk.toUpperCase();
- }
- },
- async function (source: string[]) {
- let ret = "";
- for await (const chunk of source) {
- ret += chunk;
- }
- return ret;
- },
- pipelineCb,
- );
-
- await pipelineExecuted;
-});
-
-Deno.test("Pipeline returns a stream after ending", async () => {
- const [pipelineExecuted, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assertEquals(err, undefined);
- },
- );
- const ret = pipeline(
- async function* () {
- await Promise.resolve();
- yield "hello";
- },
- // deno-lint-ignore require-yield
- async function* (source: string[]) {
- for await (const chunk of source) {
- chunk;
- }
- },
- pipelineCb,
- );
-
- ret.resume();
-
- assertEquals(typeof ret.pipe, "function");
-
- await pipelineExecuted;
-});
-
-Deno.test("Pipeline returns a stream after erroring", async () => {
- const errorText = "kaboom";
-
- const [pipelineExecuted, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assertEquals(err?.message, errorText);
- },
- );
- const ret = pipeline(
- // deno-lint-ignore require-yield
- async function* () {
- await Promise.resolve();
- throw new Error(errorText);
- },
- // deno-lint-ignore require-yield
- async function* (source: string[]) {
- for await (const chunk of source) {
- chunk;
- }
- },
- pipelineCb,
- );
-
- ret.resume();
-
- assertEquals(typeof ret.pipe, "function");
-
- await pipelineExecuted;
-});
-
-Deno.test("Pipeline destination gets destroyed on error", async () => {
- const errorText = "kaboom";
- const s = new PassThrough();
-
- const [pipelineExecution, pipelineCb] = mustCall(
- (err?: NodeErrorAbstraction | null) => {
- assertEquals(err?.message, errorText);
- assertEquals(s.destroyed, true);
- },
- );
- pipeline(
- // deno-lint-ignore require-yield
- async function* () {
- throw new Error(errorText);
- },
- s,
- pipelineCb,
- );
-
- await pipelineExecution;
-});
diff --git a/std/node/_stream/promises.ts b/std/node/_stream/promises.ts
deleted file mode 100644
index 1adf4ea3f..000000000
--- a/std/node/_stream/promises.ts
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import pl from "./pipeline.ts";
-import type { PipelineArguments } from "./pipeline.ts";
-import eos from "./end_of_stream.ts";
-import type {
- FinishedOptions,
- StreamImplementations as FinishedStreams,
-} from "./end_of_stream.ts";
-
-export function pipeline(...streams: PipelineArguments) {
- return new Promise((resolve, reject) => {
- pl(
- ...streams,
- (err, value) => {
- if (err) {
- reject(err);
- } else {
- resolve(value);
- }
- },
- );
- });
-}
-
-export function finished(
- stream: FinishedStreams,
- opts?: FinishedOptions,
-) {
- return new Promise<void>((resolve, reject) => {
- eos(
- stream,
- opts || null,
- (err) => {
- if (err) {
- reject(err);
- } else {
- resolve();
- }
- },
- );
- });
-}
diff --git a/std/node/_stream/promises_test.ts b/std/node/_stream/promises_test.ts
deleted file mode 100644
index 90803b4af..000000000
--- a/std/node/_stream/promises_test.ts
+++ /dev/null
@@ -1,84 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import Readable from "./readable.ts";
-import Writable from "./writable.ts";
-import { pipeline } from "./promises.ts";
-import { deferred } from "../../async/mod.ts";
-import {
- assert,
- assertEquals,
- assertThrowsAsync,
-} from "../../testing/asserts.ts";
-
-Deno.test("Promise pipeline works correctly", async () => {
- let pipelineExecuted = 0;
- const pipelineExecutedExpected = 1;
- const pipelineExpectedExecutions = deferred();
-
- let finished = false;
- // deno-lint-ignore no-explicit-any
- const processed: any[] = [];
- const expected = [
- Buffer.from("a"),
- Buffer.from("b"),
- Buffer.from("c"),
- ];
-
- const read = new Readable({
- read() {},
- });
-
- const write = new Writable({
- write(data, _enc, cb) {
- processed.push(data);
- cb();
- },
- });
-
- write.on("finish", () => {
- finished = true;
- });
-
- for (let i = 0; i < expected.length; i++) {
- read.push(expected[i]);
- }
- read.push(null);
-
- pipeline(read, write).then(() => {
- pipelineExecuted++;
- if (pipelineExecuted == pipelineExecutedExpected) {
- pipelineExpectedExecutions.resolve();
- }
- assert(finished);
- assertEquals(processed, expected);
- });
-
- const pipelineTimeout = setTimeout(
- () => pipelineExpectedExecutions.reject(),
- 1000,
- );
- await pipelineExpectedExecutions;
- clearTimeout(pipelineTimeout);
- assertEquals(pipelineExecuted, pipelineExecutedExpected);
-});
-
-Deno.test("Promise pipeline throws on readable destroyed", async () => {
- const read = new Readable({
- read() {},
- });
-
- const write = new Writable({
- write(_data, _enc, cb) {
- cb();
- },
- });
-
- read.push("data");
- read.destroy();
-
- await assertThrowsAsync(
- () => pipeline(read, write),
- Error,
- "Premature close",
- );
-});
diff --git a/std/node/_stream/readable.ts b/std/node/_stream/readable.ts
deleted file mode 100644
index 54e0d8ecd..000000000
--- a/std/node/_stream/readable.ts
+++ /dev/null
@@ -1,788 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { captureRejectionSymbol } from "../events.ts";
-import Stream from "./stream.ts";
-import type { Buffer } from "../buffer.ts";
-import BufferList from "./buffer_list.ts";
-import {
- ERR_INVALID_OPT_VALUE,
- ERR_METHOD_NOT_IMPLEMENTED,
-} from "../_errors.ts";
-import type { Encodings } from "../_utils.ts";
-import { StringDecoder } from "../string_decoder.ts";
-import createReadableStreamAsyncIterator from "./async_iterator.ts";
-import streamFrom from "./from.ts";
-import { kDestroy, kPaused } from "./symbols.ts";
-import {
- _destroy,
- computeNewHighWaterMark,
- emitReadable,
- endReadable,
- errorOrDestroy,
- fromList,
- howMuchToRead,
- nReadingNextTick,
- pipeOnDrain,
- prependListener,
- readableAddChunk,
- resume,
- updateReadableListening,
-} from "./readable_internal.ts";
-import Writable from "./writable.ts";
-import { errorOrDestroy as errorOrDestroyWritable } from "./writable_internal.ts";
-import Duplex, { errorOrDestroy as errorOrDestroyDuplex } from "./duplex.ts";
-
-export interface ReadableOptions {
- autoDestroy?: boolean;
- defaultEncoding?: Encodings;
- destroy?(
- this: Readable,
- error: Error | null,
- callback: (error: Error | null) => void,
- ): void;
- emitClose?: boolean;
- encoding?: Encodings;
- highWaterMark?: number;
- objectMode?: boolean;
- read?(this: Readable): void;
-}
-
-export class ReadableState {
- [kPaused]: boolean | null = null;
- awaitDrainWriters: Duplex | Writable | Set<Duplex | Writable> | null = null;
- buffer = new BufferList();
- closed = false;
- closeEmitted = false;
- constructed: boolean;
- decoder: StringDecoder | null = null;
- destroyed = false;
- emittedReadable = false;
- encoding: Encodings | null = null;
- ended = false;
- endEmitted = false;
- errored: Error | null = null;
- errorEmitted = false;
- flowing: boolean | null = null;
- highWaterMark: number;
- length = 0;
- multiAwaitDrain = false;
- needReadable = false;
- objectMode: boolean;
- pipes: Array<Duplex | Writable> = [];
- readable = true;
- readableListening = false;
- reading = false;
- readingMore = false;
- resumeScheduled = false;
- sync = true;
- emitClose: boolean;
- autoDestroy: boolean;
- defaultEncoding: string;
-
- constructor(options?: ReadableOptions) {
- this.objectMode = !!options?.objectMode;
-
- this.highWaterMark = options?.highWaterMark ??
- (this.objectMode ? 16 : 16 * 1024);
- if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) {
- this.highWaterMark = Math.floor(this.highWaterMark);
- } else {
- throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark);
- }
-
- this.emitClose = options?.emitClose ?? true;
- this.autoDestroy = options?.autoDestroy ?? true;
- this.defaultEncoding = options?.defaultEncoding || "utf8";
-
- if (options?.encoding) {
- this.decoder = new StringDecoder(options.encoding);
- this.encoding = options.encoding;
- }
-
- this.constructed = true;
- }
-}
-
-class Readable extends Stream {
- _readableState: ReadableState;
-
- constructor(options?: ReadableOptions) {
- super();
- if (options) {
- if (typeof options.read === "function") {
- this._read = options.read;
- }
- if (typeof options.destroy === "function") {
- this._destroy = options.destroy;
- }
- }
- this._readableState = new ReadableState(options);
- }
-
- static from(
- // deno-lint-ignore no-explicit-any
- iterable: Iterable<any> | AsyncIterable<any>,
- opts?: ReadableOptions,
- ): Readable {
- return streamFrom(iterable, opts);
- }
-
- static ReadableState = ReadableState;
-
- static _fromList = fromList;
-
- // You can override either this method, or the async _read(n) below.
- read(n?: number) {
- // Same as parseInt(undefined, 10), however V8 7.3 performance regressed
- // in this scenario, so we are doing it manually.
- if (n === undefined) {
- n = NaN;
- }
- const state = this._readableState;
- const nOrig = n;
-
- if (n > state.highWaterMark) {
- state.highWaterMark = computeNewHighWaterMark(n);
- }
-
- if (n !== 0) {
- state.emittedReadable = false;
- }
-
- if (
- n === 0 &&
- state.needReadable &&
- ((state.highWaterMark !== 0
- ? state.length >= state.highWaterMark
- : state.length > 0) ||
- state.ended)
- ) {
- if (state.length === 0 && state.ended) {
- endReadable(this);
- } else {
- emitReadable(this);
- }
- return null;
- }
-
- n = howMuchToRead(n, state);
-
- if (n === 0 && state.ended) {
- if (state.length === 0) {
- endReadable(this);
- }
- return null;
- }
-
- let doRead = state.needReadable;
- if (
- state.length === 0 || state.length - (n as number) < state.highWaterMark
- ) {
- doRead = true;
- }
-
- if (
- state.ended || state.reading || state.destroyed || state.errored ||
- !state.constructed
- ) {
- doRead = false;
- } else if (doRead) {
- state.reading = true;
- state.sync = true;
- if (state.length === 0) {
- state.needReadable = true;
- }
- this._read();
- state.sync = false;
- if (!state.reading) {
- n = howMuchToRead(nOrig, state);
- }
- }
-
- let ret;
- if ((n as number) > 0) {
- ret = fromList((n as number), state);
- } else {
- ret = null;
- }
-
- if (ret === null) {
- state.needReadable = state.length <= state.highWaterMark;
- n = 0;
- } else {
- state.length -= n as number;
- if (state.multiAwaitDrain) {
- (state.awaitDrainWriters as Set<Writable>).clear();
- } else {
- state.awaitDrainWriters = null;
- }
- }
-
- if (state.length === 0) {
- if (!state.ended) {
- state.needReadable = true;
- }
-
- if (nOrig !== n && state.ended) {
- endReadable(this);
- }
- }
-
- if (ret !== null) {
- this.emit("data", ret);
- }
-
- return ret;
- }
-
- _read(_size?: number) {
- throw new ERR_METHOD_NOT_IMPLEMENTED("_read()");
- }
-
- pipe<T extends Duplex | Writable>(dest: T, pipeOpts?: { end?: boolean }): T {
- // deno-lint-ignore no-this-alias
- const src = this;
- const state = this._readableState;
-
- if (state.pipes.length === 1) {
- if (!state.multiAwaitDrain) {
- state.multiAwaitDrain = true;
- state.awaitDrainWriters = new Set(
- state.awaitDrainWriters ? [state.awaitDrainWriters as Writable] : [],
- );
- }
- }
-
- state.pipes.push(dest);
-
- const doEnd = (!pipeOpts || pipeOpts.end !== false);
-
- //TODO(Soremwar)
- //Part of doEnd condition
- //In node, output/input are a duplex Stream
- // &&
- // dest !== stdout &&
- // dest !== stderr
-
- const endFn = doEnd ? onend : unpipe;
- if (state.endEmitted) {
- queueMicrotask(endFn);
- } else {
- this.once("end", endFn);
- }
-
- dest.on("unpipe", onunpipe);
- function onunpipe(readable: Readable, unpipeInfo: { hasUnpiped: boolean }) {
- if (readable === src) {
- if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
- unpipeInfo.hasUnpiped = true;
- cleanup();
- }
- }
- }
-
- function onend() {
- dest.end();
- }
-
- let ondrain: () => void;
-
- let cleanedUp = false;
- function cleanup() {
- dest.removeListener("close", onclose);
- dest.removeListener("finish", onfinish);
- if (ondrain) {
- dest.removeListener("drain", ondrain);
- }
- dest.removeListener("error", onerror);
- dest.removeListener("unpipe", onunpipe);
- src.removeListener("end", onend);
- src.removeListener("end", unpipe);
- src.removeListener("data", ondata);
-
- cleanedUp = true;
- if (
- ondrain && state.awaitDrainWriters &&
- (!dest._writableState || dest._writableState.needDrain)
- ) {
- ondrain();
- }
- }
-
- this.on("data", ondata);
- // deno-lint-ignore no-explicit-any
- function ondata(chunk: any) {
- const ret = dest.write(chunk);
- if (ret === false) {
- if (!cleanedUp) {
- if (state.pipes.length === 1 && state.pipes[0] === dest) {
- state.awaitDrainWriters = dest;
- state.multiAwaitDrain = false;
- } else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
- (state.awaitDrainWriters as Set<Duplex | Writable>).add(dest);
- }
- src.pause();
- }
- if (!ondrain) {
- ondrain = pipeOnDrain(src, dest);
- dest.on("drain", ondrain);
- }
- }
- }
-
- function onerror(er: Error) {
- unpipe();
- dest.removeListener("error", onerror);
- if (dest.listenerCount("error") === 0) {
- const s = dest._writableState || (dest as Duplex)._readableState;
- if (s && !s.errorEmitted) {
- if (dest instanceof Duplex) {
- errorOrDestroyDuplex(dest as unknown as Duplex, er);
- } else {
- errorOrDestroyWritable(dest as Writable, er);
- }
- } else {
- dest.emit("error", er);
- }
- }
- }
-
- prependListener(dest, "error", onerror);
-
- function onclose() {
- dest.removeListener("finish", onfinish);
- unpipe();
- }
- dest.once("close", onclose);
- function onfinish() {
- dest.removeListener("close", onclose);
- unpipe();
- }
- dest.once("finish", onfinish);
-
- function unpipe() {
- src.unpipe(dest as Writable);
- }
-
- dest.emit("pipe", this);
-
- if (!state.flowing) {
- this.resume();
- }
-
- return dest;
- }
-
- isPaused() {
- return this._readableState[kPaused] === true ||
- this._readableState.flowing === false;
- }
-
- setEncoding(enc: Encodings) {
- const decoder = new StringDecoder(enc);
- this._readableState.decoder = decoder;
- this._readableState.encoding = this._readableState.decoder
- .encoding as Encodings;
-
- const buffer = this._readableState.buffer;
- let content = "";
- for (const data of buffer) {
- content += decoder.write(data as Buffer);
- }
- buffer.clear();
- if (content !== "") {
- buffer.push(content);
- }
- this._readableState.length = content.length;
- return this;
- }
-
- on(
- event: "close" | "end" | "pause" | "readable" | "resume",
- listener: () => void,
- ): this;
- // deno-lint-ignore no-explicit-any
- on(event: "data", listener: (chunk: any) => void): this;
- on(event: "error", listener: (err: Error) => void): this;
- // deno-lint-ignore no-explicit-any
- on(event: string | symbol, listener: (...args: any[]) => void): this;
- on(
- ev: string | symbol,
- fn:
- | (() => void)
- // deno-lint-ignore no-explicit-any
- | ((chunk: any) => void)
- | ((err: Error) => void)
- // deno-lint-ignore no-explicit-any
- | ((...args: any[]) => void),
- ) {
- const res = super.on.call(this, ev, fn);
- const state = this._readableState;
-
- if (ev === "data") {
- state.readableListening = this.listenerCount("readable") > 0;
-
- if (state.flowing !== false) {
- this.resume();
- }
- } else if (ev === "readable") {
- if (!state.endEmitted && !state.readableListening) {
- state.readableListening = state.needReadable = true;
- state.flowing = false;
- state.emittedReadable = false;
- if (state.length) {
- emitReadable(this);
- } else if (!state.reading) {
- queueMicrotask(() => nReadingNextTick(this));
- }
- }
- }
-
- return res;
- }
-
- removeListener(
- event: "close" | "end" | "pause" | "readable" | "resume",
- listener: () => void,
- ): this;
- // deno-lint-ignore no-explicit-any
- removeListener(event: "data", listener: (chunk: any) => void): this;
- removeListener(event: "error", listener: (err: Error) => void): this;
- removeListener(
- event: string | symbol,
- // deno-lint-ignore no-explicit-any
- listener: (...args: any[]) => void,
- ): this;
- removeListener(
- ev: string | symbol,
- fn:
- | (() => void)
- // deno-lint-ignore no-explicit-any
- | ((chunk: any) => void)
- | ((err: Error) => void)
- // deno-lint-ignore no-explicit-any
- | ((...args: any[]) => void),
- ) {
- const res = super.removeListener.call(this, ev, fn);
-
- if (ev === "readable") {
- queueMicrotask(() => updateReadableListening(this));
- }
-
- return res;
- }
-
- off = this.removeListener;
-
- destroy(err?: Error | null, cb?: () => void) {
- const r = this._readableState;
-
- if (r.destroyed) {
- if (typeof cb === "function") {
- cb();
- }
-
- return this;
- }
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!r.errored) {
- r.errored = err;
- }
- }
-
- r.destroyed = true;
-
- // If still constructing then defer calling _destroy.
- if (!r.constructed) {
- this.once(kDestroy, (er: Error) => {
- _destroy(this, err || er, cb);
- });
- } else {
- _destroy(this, err, cb);
- }
-
- return this;
- }
-
- _undestroy() {
- const r = this._readableState;
- r.constructed = true;
- r.closed = false;
- r.closeEmitted = false;
- r.destroyed = false;
- r.errored = null;
- r.errorEmitted = false;
- r.reading = false;
- r.ended = false;
- r.endEmitted = false;
- }
-
- _destroy(
- error: Error | null,
- callback: (error?: Error | null) => void,
- ): void {
- callback(error);
- }
-
- [captureRejectionSymbol](err: Error) {
- this.destroy(err);
- }
-
- // deno-lint-ignore no-explicit-any
- push(chunk: any, encoding?: Encodings): boolean {
- return readableAddChunk(this, chunk, encoding, false);
- }
-
- // deno-lint-ignore no-explicit-any
- unshift(chunk: any, encoding?: string): boolean {
- return readableAddChunk(this, chunk, encoding, true);
- }
-
- unpipe(dest?: Writable): this {
- const state = this._readableState;
- const unpipeInfo = { hasUnpiped: false };
-
- if (state.pipes.length === 0) {
- return this;
- }
-
- if (!dest) {
- // remove all.
- const dests = state.pipes;
- state.pipes = [];
- this.pause();
-
- for (const dest of dests) {
- dest.emit("unpipe", this, { hasUnpiped: false });
- }
- return this;
- }
-
- const index = state.pipes.indexOf(dest);
- if (index === -1) {
- return this;
- }
-
- state.pipes.splice(index, 1);
- if (state.pipes.length === 0) {
- this.pause();
- }
-
- dest.emit("unpipe", this, unpipeInfo);
-
- return this;
- }
-
- removeAllListeners(
- ev:
- | "close"
- | "data"
- | "end"
- | "error"
- | "pause"
- | "readable"
- | "resume"
- | symbol
- | undefined,
- ) {
- const res = super.removeAllListeners(ev);
-
- if (ev === "readable" || ev === undefined) {
- queueMicrotask(() => updateReadableListening(this));
- }
-
- return res;
- }
-
- resume() {
- const state = this._readableState;
- if (!state.flowing) {
- // We flow only if there is no one listening
- // for readable, but we still have to call
- // resume().
- state.flowing = !state.readableListening;
- resume(this, state);
- }
- state[kPaused] = false;
- return this;
- }
-
- pause() {
- if (this._readableState.flowing !== false) {
- this._readableState.flowing = false;
- this.emit("pause");
- }
- this._readableState[kPaused] = true;
- return this;
- }
-
- /** Wrap an old-style stream as the async data source. */
- wrap(stream: Stream): this {
- const state = this._readableState;
- let paused = false;
-
- stream.on("end", () => {
- if (state.decoder && !state.ended) {
- const chunk = state.decoder.end();
- if (chunk && chunk.length) {
- this.push(chunk);
- }
- }
-
- this.push(null);
- });
-
- stream.on("data", (chunk) => {
- if (state.decoder) {
- chunk = state.decoder.write(chunk);
- }
-
- if (state.objectMode && (chunk === null || chunk === undefined)) {
- return;
- } else if (!state.objectMode && (!chunk || !chunk.length)) {
- return;
- }
-
- const ret = this.push(chunk);
- if (!ret) {
- paused = true;
- // By the time this is triggered, stream will be a readable stream
- // deno-lint-ignore ban-ts-comment
- // @ts-ignore
- stream.pause();
- }
- });
-
- // TODO(Soremwar)
- // There must be a clean way to implement this on TypeScript
- // Proxy all the other methods. Important when wrapping filters and duplexes.
- for (const i in stream) {
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- if (this[i] === undefined && typeof stream[i] === "function") {
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- this[i] = function methodWrap(method) {
- return function methodWrapReturnFunction() {
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- return stream[method].apply(stream);
- };
- }(i);
- }
- }
-
- stream.on("error", (err) => {
- errorOrDestroy(this, err);
- });
-
- stream.on("close", () => {
- this.emit("close");
- });
-
- stream.on("destroy", () => {
- this.emit("destroy");
- });
-
- stream.on("pause", () => {
- this.emit("pause");
- });
-
- stream.on("resume", () => {
- this.emit("resume");
- });
-
- this._read = () => {
- if (paused) {
- paused = false;
- // By the time this is triggered, stream will be a readable stream
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- stream.resume();
- }
- };
-
- return this;
- }
-
- [Symbol.asyncIterator]() {
- return createReadableStreamAsyncIterator(this);
- }
-
- get readable(): boolean {
- return this._readableState?.readable &&
- !this._readableState?.destroyed &&
- !this._readableState?.errorEmitted &&
- !this._readableState?.endEmitted;
- }
- set readable(val: boolean) {
- if (this._readableState) {
- this._readableState.readable = val;
- }
- }
-
- get readableHighWaterMark(): number {
- return this._readableState.highWaterMark;
- }
-
- get readableBuffer() {
- return this._readableState && this._readableState.buffer;
- }
-
- get readableFlowing(): boolean | null {
- return this._readableState.flowing;
- }
-
- set readableFlowing(state: boolean | null) {
- if (this._readableState) {
- this._readableState.flowing = state;
- }
- }
-
- get readableLength() {
- return this._readableState.length;
- }
-
- get readableObjectMode() {
- return this._readableState ? this._readableState.objectMode : false;
- }
-
- get readableEncoding() {
- return this._readableState ? this._readableState.encoding : null;
- }
-
- get destroyed() {
- if (this._readableState === undefined) {
- return false;
- }
- return this._readableState.destroyed;
- }
-
- set destroyed(value: boolean) {
- if (!this._readableState) {
- return;
- }
- this._readableState.destroyed = value;
- }
-
- get readableEnded() {
- return this._readableState ? this._readableState.endEmitted : false;
- }
-}
-
-Object.defineProperties(Readable, {
- _readableState: { enumerable: false },
- destroyed: { enumerable: false },
- readableBuffer: { enumerable: false },
- readableEncoding: { enumerable: false },
- readableEnded: { enumerable: false },
- readableFlowing: { enumerable: false },
- readableHighWaterMark: { enumerable: false },
- readableLength: { enumerable: false },
- readableObjectMode: { enumerable: false },
-});
-
-export default Readable;
diff --git a/std/node/_stream/readable_internal.ts b/std/node/_stream/readable_internal.ts
deleted file mode 100644
index 0ef261d4d..000000000
--- a/std/node/_stream/readable_internal.ts
+++ /dev/null
@@ -1,438 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import type Duplex from "./duplex.ts";
-import type EventEmitter from "../events.ts";
-import type Readable from "./readable.ts";
-import type Writable from "./writable.ts";
-import type { ReadableState } from "./readable.ts";
-import { kPaused } from "./symbols.ts";
-import {
- ERR_STREAM_PUSH_AFTER_EOF,
- ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
-} from "../_errors.ts";
-
-export function _destroy(
- self: Readable,
- err?: Error | null,
- cb?: (error?: Error | null) => void,
-) {
- self._destroy(err || null, (err) => {
- const r = (self as Readable)._readableState;
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!r.errored) {
- r.errored = err;
- }
- }
-
- r.closed = true;
-
- if (typeof cb === "function") {
- cb(err);
- }
-
- if (err) {
- queueMicrotask(() => {
- if (!r.errorEmitted) {
- r.errorEmitted = true;
- self.emit("error", err);
- }
- r.closeEmitted = true;
- if (r.emitClose) {
- self.emit("close");
- }
- });
- } else {
- queueMicrotask(() => {
- r.closeEmitted = true;
- if (r.emitClose) {
- self.emit("close");
- }
- });
- }
- });
-}
-
-export function addChunk(
- stream: Duplex | Readable,
- state: ReadableState,
- chunk: string | Buffer | Uint8Array,
- addToFront: boolean,
-) {
- if (state.flowing && state.length === 0 && !state.sync) {
- if (state.multiAwaitDrain) {
- (state.awaitDrainWriters as Set<Writable>).clear();
- } else {
- state.awaitDrainWriters = null;
- }
- stream.emit("data", chunk);
- } else {
- // Update the buffer info.
- state.length += state.objectMode ? 1 : chunk.length;
- if (addToFront) {
- state.buffer.unshift(chunk);
- } else {
- state.buffer.push(chunk);
- }
-
- if (state.needReadable) {
- emitReadable(stream);
- }
- }
- maybeReadMore(stream, state);
-}
-
-// Don't raise the hwm > 1GB.
-const MAX_HWM = 0x40000000;
-export function computeNewHighWaterMark(n: number) {
- if (n >= MAX_HWM) {
- n = MAX_HWM;
- } else {
- n--;
- n |= n >>> 1;
- n |= n >>> 2;
- n |= n >>> 4;
- n |= n >>> 8;
- n |= n >>> 16;
- n++;
- }
- return n;
-}
-
-export function emitReadable(stream: Duplex | Readable) {
- const state = stream._readableState;
- state.needReadable = false;
- if (!state.emittedReadable) {
- state.emittedReadable = true;
- queueMicrotask(() => emitReadable_(stream));
- }
-}
-
-function emitReadable_(stream: Duplex | Readable) {
- const state = stream._readableState;
- if (!state.destroyed && !state.errored && (state.length || state.ended)) {
- stream.emit("readable");
- state.emittedReadable = false;
- }
-
- state.needReadable = !state.flowing &&
- !state.ended &&
- state.length <= state.highWaterMark;
- flow(stream);
-}
-
-export function endReadable(stream: Readable) {
- const state = stream._readableState;
-
- if (!state.endEmitted) {
- state.ended = true;
- queueMicrotask(() => endReadableNT(state, stream));
- }
-}
-
-function endReadableNT(state: ReadableState, stream: Readable) {
- if (
- !state.errorEmitted && !state.closeEmitted &&
- !state.endEmitted && state.length === 0
- ) {
- state.endEmitted = true;
- stream.emit("end");
-
- if (state.autoDestroy) {
- stream.destroy();
- }
- }
-}
-
-export function errorOrDestroy(
- stream: Duplex | Readable,
- err: Error,
- sync = false,
-) {
- const r = stream._readableState;
-
- if (r.destroyed) {
- return stream;
- }
-
- if (r.autoDestroy) {
- stream.destroy(err);
- } else if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!r.errored) {
- r.errored = err;
- }
- if (sync) {
- queueMicrotask(() => {
- if (!r.errorEmitted) {
- r.errorEmitted = true;
- stream.emit("error", err);
- }
- });
- } else if (!r.errorEmitted) {
- r.errorEmitted = true;
- stream.emit("error", err);
- }
- }
-}
-
-function flow(stream: Duplex | Readable) {
- const state = stream._readableState;
- while (state.flowing && stream.read() !== null);
-}
-
-/** Pluck off n bytes from an array of buffers.
-* Length is the combined lengths of all the buffers in the list.
-* This function is designed to be inlinable, so please take care when making
-* changes to the function body.
-*/
-export function fromList(n: number, state: ReadableState) {
- // nothing buffered.
- if (state.length === 0) {
- return null;
- }
-
- let ret;
- if (state.objectMode) {
- ret = state.buffer.shift();
- } else if (!n || n >= state.length) {
- if (state.decoder) {
- ret = state.buffer.join("");
- } else if (state.buffer.length === 1) {
- ret = state.buffer.first();
- } else {
- ret = state.buffer.concat(state.length);
- }
- state.buffer.clear();
- } else {
- ret = state.buffer.consume(n, !!state.decoder);
- }
-
- return ret;
-}
-
-export function howMuchToRead(n: number, state: ReadableState) {
- if (n <= 0 || (state.length === 0 && state.ended)) {
- return 0;
- }
- if (state.objectMode) {
- return 1;
- }
- if (Number.isNaN(n)) {
- // Only flow one buffer at a time.
- if (state.flowing && state.length) {
- return state.buffer.first().length;
- }
- return state.length;
- }
- if (n <= state.length) {
- return n;
- }
- return state.ended ? state.length : 0;
-}
-
-export function maybeReadMore(stream: Readable, state: ReadableState) {
- if (!state.readingMore && state.constructed) {
- state.readingMore = true;
- queueMicrotask(() => maybeReadMore_(stream, state));
- }
-}
-
-function maybeReadMore_(stream: Readable, state: ReadableState) {
- while (
- !state.reading && !state.ended &&
- (state.length < state.highWaterMark ||
- (state.flowing && state.length === 0))
- ) {
- const len = state.length;
- stream.read(0);
- if (len === state.length) {
- // Didn't get any data, stop spinning.
- break;
- }
- }
- state.readingMore = false;
-}
-
-export function nReadingNextTick(self: Duplex | Readable) {
- self.read(0);
-}
-
-export function onEofChunk(stream: Duplex | Readable, state: ReadableState) {
- if (state.ended) return;
- if (state.decoder) {
- const chunk = state.decoder.end();
- if (chunk && chunk.length) {
- state.buffer.push(chunk);
- state.length += state.objectMode ? 1 : chunk.length;
- }
- }
- state.ended = true;
-
- if (state.sync) {
- emitReadable(stream);
- } else {
- state.needReadable = false;
- state.emittedReadable = true;
- emitReadable_(stream);
- }
-}
-
-export function pipeOnDrain(src: Duplex | Readable, dest: Duplex | Writable) {
- return function pipeOnDrainFunctionResult() {
- const state = src._readableState;
-
- if (state.awaitDrainWriters === dest) {
- state.awaitDrainWriters = null;
- } else if (state.multiAwaitDrain) {
- (state.awaitDrainWriters as Set<Duplex | Writable>).delete(dest);
- }
-
- if (
- (!state.awaitDrainWriters ||
- (state.awaitDrainWriters as Set<Writable>).size === 0) &&
- src.listenerCount("data")
- ) {
- state.flowing = true;
- flow(src);
- }
- };
-}
-
-export function prependListener(
- emitter: EventEmitter,
- event: string,
- // deno-lint-ignore no-explicit-any
- fn: (...args: any[]) => any,
-) {
- if (typeof emitter.prependListener === "function") {
- return emitter.prependListener(event, fn);
- }
-
- // This is a hack to make sure that our error handler is attached before any
- // userland ones. NEVER DO THIS. This is here only because this code needs
- // to continue to work with older versions of Node.js that do not include
- //the prependListener() method. The goal is to eventually remove this hack.
- // TODO(Soremwar)
- // Burn it with fire
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- if (emitter._events.get(event)?.length) {
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- const listeners = [fn, ...emitter._events.get(event)];
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- emitter._events.set(event, listeners);
- } else {
- emitter.on(event, fn);
- }
-}
-
-export function readableAddChunk(
- stream: Duplex | Readable,
- chunk: string | Buffer | Uint8Array | null,
- encoding: undefined | string = undefined,
- addToFront: boolean,
-) {
- const state = stream._readableState;
- let usedEncoding = encoding;
-
- let err;
- if (!state.objectMode) {
- if (typeof chunk === "string") {
- usedEncoding = encoding || state.defaultEncoding;
- if (state.encoding !== usedEncoding) {
- if (addToFront && state.encoding) {
- chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding);
- } else {
- chunk = Buffer.from(chunk, usedEncoding);
- usedEncoding = "";
- }
- }
- } else if (chunk instanceof Uint8Array) {
- chunk = Buffer.from(chunk);
- }
- }
-
- if (err) {
- errorOrDestroy(stream, err);
- } else if (chunk === null) {
- state.reading = false;
- onEofChunk(stream, state);
- } else if (state.objectMode || (chunk.length > 0)) {
- if (addToFront) {
- if (state.endEmitted) {
- errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
- } else {
- addChunk(stream, state, chunk, true);
- }
- } else if (state.ended) {
- errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
- } else if (state.destroyed || state.errored) {
- return false;
- } else {
- state.reading = false;
- if (state.decoder && !usedEncoding) {
- //TODO(Soremwar)
- //I don't think this cast is right
- chunk = state.decoder.write(Buffer.from(chunk as Uint8Array));
- if (state.objectMode || chunk.length !== 0) {
- addChunk(stream, state, chunk, false);
- } else {
- maybeReadMore(stream, state);
- }
- } else {
- addChunk(stream, state, chunk, false);
- }
- }
- } else if (!addToFront) {
- state.reading = false;
- maybeReadMore(stream, state);
- }
-
- return !state.ended &&
- (state.length < state.highWaterMark || state.length === 0);
-}
-
-export function resume(stream: Duplex | Readable, state: ReadableState) {
- if (!state.resumeScheduled) {
- state.resumeScheduled = true;
- queueMicrotask(() => resume_(stream, state));
- }
-}
-
-function resume_(stream: Duplex | Readable, state: ReadableState) {
- if (!state.reading) {
- stream.read(0);
- }
-
- state.resumeScheduled = false;
- stream.emit("resume");
- flow(stream);
- if (state.flowing && !state.reading) {
- stream.read(0);
- }
-}
-
-export function updateReadableListening(self: Duplex | Readable) {
- const state = self._readableState;
- state.readableListening = self.listenerCount("readable") > 0;
-
- if (state.resumeScheduled && state[kPaused] === false) {
- // Flowing needs to be set to true now, otherwise
- // the upcoming resume will not flow.
- state.flowing = true;
-
- // Crude way to check if we should resume.
- } else if (self.listenerCount("data") > 0) {
- self.resume();
- } else if (!state.readableListening) {
- state.flowing = null;
- }
-}
diff --git a/std/node/_stream/readable_test.ts b/std/node/_stream/readable_test.ts
deleted file mode 100644
index 72767e28f..000000000
--- a/std/node/_stream/readable_test.ts
+++ /dev/null
@@ -1,489 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import Readable from "../_stream/readable.ts";
-import { once } from "../events.ts";
-import { deferred } from "../../async/mod.ts";
-import {
- assert,
- assertEquals,
- assertStrictEquals,
-} from "../../testing/asserts.ts";
-
-Deno.test("Readable stream from iterator", async () => {
- function* generate() {
- yield "a";
- yield "b";
- yield "c";
- }
-
- const stream = Readable.from(generate());
-
- const expected = ["a", "b", "c"];
-
- for await (const chunk of stream) {
- assertStrictEquals(chunk, expected.shift());
- }
-});
-
-Deno.test("Readable stream from async iterator", async () => {
- async function* generate() {
- yield "a";
- yield "b";
- yield "c";
- }
-
- const stream = Readable.from(generate());
-
- const expected = ["a", "b", "c"];
-
- for await (const chunk of stream) {
- assertStrictEquals(chunk, expected.shift());
- }
-});
-
-Deno.test("Readable stream from promise", async () => {
- const promises = [
- Promise.resolve("a"),
- Promise.resolve("b"),
- Promise.resolve("c"),
- ];
-
- const stream = Readable.from(promises);
-
- const expected = ["a", "b", "c"];
-
- for await (const chunk of stream) {
- assertStrictEquals(chunk, expected.shift());
- }
-});
-
-Deno.test("Readable stream from string", async () => {
- const string = "abc";
- const stream = Readable.from(string);
-
- for await (const chunk of stream) {
- assertStrictEquals(chunk, string);
- }
-});
-
-Deno.test("Readable stream from Buffer", async () => {
- const string = "abc";
- const stream = Readable.from(Buffer.from(string));
-
- for await (const chunk of stream) {
- assertStrictEquals((chunk as Buffer).toString(), string);
- }
-});
-
-Deno.test("Readable stream gets destroyed on error", async () => {
- // deno-lint-ignore require-yield
- async function* generate() {
- throw new Error("kaboom");
- }
-
- const stream = Readable.from(generate());
-
- stream.read();
-
- const [err] = await once(stream, "error");
- assertStrictEquals(err.message, "kaboom");
- assertStrictEquals(stream.destroyed, true);
-});
-
-Deno.test("Readable stream works as Transform stream", async () => {
- async function* generate(stream: Readable) {
- for await (const chunk of stream) {
- yield (chunk as string).toUpperCase();
- }
- }
-
- const source = new Readable({
- objectMode: true,
- read() {
- this.push("a");
- this.push("b");
- this.push("c");
- this.push(null);
- },
- });
-
- const stream = Readable.from(generate(source));
-
- const expected = ["A", "B", "C"];
-
- for await (const chunk of stream) {
- assertStrictEquals(chunk, expected.shift());
- }
-});
-
-Deno.test("Readable stream can be paused", () => {
- const readable = new Readable();
-
- // _read is a noop, here.
- readable._read = () => {};
-
- // Default state of a stream is not "paused"
- assert(!readable.isPaused());
-
- // Make the stream start flowing...
- readable.on("data", () => {});
-
- // still not paused.
- assert(!readable.isPaused());
-
- readable.pause();
- assert(readable.isPaused());
- readable.resume();
- assert(!readable.isPaused());
-});
-
-Deno.test("Readable stream sets enconding correctly", () => {
- const readable = new Readable({
- read() {},
- });
-
- readable.setEncoding("utf8");
-
- readable.push(new TextEncoder().encode("DEF"));
- readable.unshift(new TextEncoder().encode("ABC"));
-
- assertStrictEquals(readable.read(), "ABCDEF");
-});
-
-Deno.test("Readable stream sets encoding correctly", () => {
- const readable = new Readable({
- read() {},
- });
-
- readable.setEncoding("utf8");
-
- readable.push(new TextEncoder().encode("DEF"));
- readable.unshift(new TextEncoder().encode("ABC"));
-
- assertStrictEquals(readable.read(), "ABCDEF");
-});
-
-Deno.test("Readable stream holds up a big push", async () => {
- let readExecuted = 0;
- const readExecutedExpected = 3;
- const readExpectedExecutions = deferred();
-
- let endExecuted = 0;
- const endExecutedExpected = 1;
- const endExpectedExecutions = deferred();
-
- const str = "asdfasdfasdfasdfasdf";
-
- const r = new Readable({
- highWaterMark: 5,
- encoding: "utf8",
- });
-
- let reads = 0;
-
- function _read() {
- if (reads === 0) {
- setTimeout(() => {
- r.push(str);
- }, 1);
- reads++;
- } else if (reads === 1) {
- const ret = r.push(str);
- assertEquals(ret, false);
- reads++;
- } else {
- r.push(null);
- }
- }
-
- r._read = () => {
- readExecuted++;
- if (readExecuted == readExecutedExpected) {
- readExpectedExecutions.resolve();
- }
- _read();
- };
-
- r.on("end", () => {
- endExecuted++;
- if (endExecuted == endExecutedExpected) {
- endExpectedExecutions.resolve();
- }
- });
-
- // Push some data in to start.
- // We've never gotten any read event at this point.
- const ret = r.push(str);
- assert(!ret);
- let chunk = r.read();
- assertEquals(chunk, str);
- chunk = r.read();
- assertEquals(chunk, null);
-
- r.once("readable", () => {
- // This time, we'll get *all* the remaining data, because
- // it's been added synchronously, as the read WOULD take
- // us below the hwm, and so it triggered a _read() again,
- // which synchronously added more, which we then return.
- chunk = r.read();
- assertEquals(chunk, str + str);
-
- chunk = r.read();
- assertEquals(chunk, null);
- });
-
- const readTimeout = setTimeout(
- () => readExpectedExecutions.reject(),
- 1000,
- );
- const endTimeout = setTimeout(
- () => endExpectedExecutions.reject(),
- 1000,
- );
- await readExpectedExecutions;
- await endExpectedExecutions;
- clearTimeout(readTimeout);
- clearTimeout(endTimeout);
- assertEquals(readExecuted, readExecutedExpected);
- assertEquals(endExecuted, endExecutedExpected);
-});
-
-Deno.test("Readable stream: 'on' event", async () => {
- async function* generate() {
- yield "a";
- yield "b";
- yield "c";
- }
-
- const stream = Readable.from(generate());
-
- let iterations = 0;
- const expected = ["a", "b", "c"];
-
- stream.on("data", (chunk) => {
- iterations++;
- assertStrictEquals(chunk, expected.shift());
- });
-
- await once(stream, "end");
-
- assertStrictEquals(iterations, 3);
-});
-
-Deno.test("Readable stream: 'data' event", async () => {
- async function* generate() {
- yield "a";
- yield "b";
- yield "c";
- }
-
- const stream = Readable.from(generate(), { objectMode: false });
-
- let iterations = 0;
- const expected = ["a", "b", "c"];
-
- stream.on("data", (chunk) => {
- iterations++;
- assertStrictEquals(chunk instanceof Buffer, true);
- assertStrictEquals(chunk.toString(), expected.shift());
- });
-
- await once(stream, "end");
-
- assertStrictEquals(iterations, 3);
-});
-
-Deno.test("Readable stream: 'data' event on non-object", async () => {
- async function* generate() {
- yield "a";
- yield "b";
- yield "c";
- }
-
- const stream = Readable.from(generate(), { objectMode: false });
-
- let iterations = 0;
- const expected = ["a", "b", "c"];
-
- stream.on("data", (chunk) => {
- iterations++;
- assertStrictEquals(chunk instanceof Buffer, true);
- assertStrictEquals(chunk.toString(), expected.shift());
- });
-
- await once(stream, "end");
-
- assertStrictEquals(iterations, 3);
-});
-
-Deno.test("Readable stream: 'readable' event is emitted but 'read' is not on highWaterMark length exceeded", async () => {
- let readableExecuted = 0;
- const readableExecutedExpected = 1;
- const readableExpectedExecutions = deferred();
-
- const r = new Readable({
- highWaterMark: 3,
- });
-
- r._read = () => {
- throw new Error("_read must not be called");
- };
- r.push(Buffer.from("blerg"));
-
- setTimeout(function () {
- assert(!r._readableState.reading);
- r.on("readable", () => {
- readableExecuted++;
- if (readableExecuted == readableExecutedExpected) {
- readableExpectedExecutions.resolve();
- }
- });
- }, 1);
-
- const readableTimeout = setTimeout(
- () => readableExpectedExecutions.reject(),
- 1000,
- );
- await readableExpectedExecutions;
- clearTimeout(readableTimeout);
- assertEquals(readableExecuted, readableExecutedExpected);
-});
-
-Deno.test("Readable stream: 'readable' and 'read' events are emitted on highWaterMark length not reached", async () => {
- let readableExecuted = 0;
- const readableExecutedExpected = 1;
- const readableExpectedExecutions = deferred();
-
- let readExecuted = 0;
- const readExecutedExpected = 1;
- const readExpectedExecutions = deferred();
-
- const r = new Readable({
- highWaterMark: 3,
- });
-
- r._read = () => {
- readExecuted++;
- if (readExecuted == readExecutedExpected) {
- readExpectedExecutions.resolve();
- }
- };
-
- r.push(Buffer.from("bl"));
-
- setTimeout(function () {
- assert(r._readableState.reading);
- r.on("readable", () => {
- readableExecuted++;
- if (readableExecuted == readableExecutedExpected) {
- readableExpectedExecutions.resolve();
- }
- });
- }, 1);
-
- const readableTimeout = setTimeout(
- () => readableExpectedExecutions.reject(),
- 1000,
- );
- const readTimeout = setTimeout(
- () => readExpectedExecutions.reject(),
- 1000,
- );
- await readableExpectedExecutions;
- await readExpectedExecutions;
- clearTimeout(readableTimeout);
- clearTimeout(readTimeout);
- assertEquals(readableExecuted, readableExecutedExpected);
- assertEquals(readExecuted, readExecutedExpected);
-});
-
-Deno.test("Readable stream: 'readable' event is emitted but 'read' is not on highWaterMark length not reached and stream ended", async () => {
- let readableExecuted = 0;
- const readableExecutedExpected = 1;
- const readableExpectedExecutions = deferred();
-
- const r = new Readable({
- highWaterMark: 30,
- });
-
- r._read = () => {
- throw new Error("Must not be executed");
- };
-
- r.push(Buffer.from("blerg"));
- //This ends the stream and triggers end
- r.push(null);
-
- setTimeout(function () {
- // Assert we're testing what we think we are
- assert(!r._readableState.reading);
- r.on("readable", () => {
- readableExecuted++;
- if (readableExecuted == readableExecutedExpected) {
- readableExpectedExecutions.resolve();
- }
- });
- }, 1);
-
- const readableTimeout = setTimeout(
- () => readableExpectedExecutions.reject(),
- 1000,
- );
- await readableExpectedExecutions;
- clearTimeout(readableTimeout);
- assertEquals(readableExecuted, readableExecutedExpected);
-});
-
-Deno.test("Readable stream: 'read' is emitted on empty string pushed in non-object mode", async () => {
- let endExecuted = 0;
- const endExecutedExpected = 1;
- const endExpectedExecutions = deferred();
-
- const underlyingData = ["", "x", "y", "", "z"];
- const expected = underlyingData.filter((data) => data);
- const result: unknown[] = [];
-
- const r = new Readable({
- encoding: "utf8",
- });
- r._read = function () {
- queueMicrotask(() => {
- if (!underlyingData.length) {
- this.push(null);
- } else {
- this.push(underlyingData.shift());
- }
- });
- };
-
- r.on("readable", () => {
- const data = r.read();
- if (data !== null) result.push(data);
- });
-
- r.on("end", () => {
- endExecuted++;
- if (endExecuted == endExecutedExpected) {
- endExpectedExecutions.resolve();
- }
- assertEquals(result, expected);
- });
-
- const endTimeout = setTimeout(
- () => endExpectedExecutions.reject(),
- 1000,
- );
- await endExpectedExecutions;
- clearTimeout(endTimeout);
- assertEquals(endExecuted, endExecutedExpected);
-});
-
-Deno.test("Readable stream: listeners can be removed", () => {
- const r = new Readable();
- r._read = () => {};
- r.on("data", () => {});
-
- r.removeAllListeners("data");
-
- assertEquals(r.eventNames().length, 0);
-});
diff --git a/std/node/_stream/stream.ts b/std/node/_stream/stream.ts
deleted file mode 100644
index 4daafc77b..000000000
--- a/std/node/_stream/stream.ts
+++ /dev/null
@@ -1,81 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import EventEmitter from "../events.ts";
-import type Readable from "./readable.ts";
-import type Writable from "./writable.ts";
-import { types } from "../util.ts";
-
-class Stream extends EventEmitter {
- constructor() {
- super();
- }
-
- static _isUint8Array = types.isUint8Array;
- static _uint8ArrayToBuffer = (chunk: Uint8Array) => Buffer.from(chunk);
-
- pipe(dest: Readable | Writable, options?: { end?: boolean }) {
- // deno-lint-ignore no-this-alias
- const source = this;
-
- //TODO(Soremwar)
- //isStdio exist on stdin || stdout only, which extend from Duplex
- //if (!dest._isStdio && (options?.end ?? true)) {
- //Find an alternative to be able to pipe streams to stdin & stdout
- //Port them as well?
- if (options?.end ?? true) {
- source.on("end", onend);
- source.on("close", onclose);
- }
-
- let didOnEnd = false;
- function onend() {
- if (didOnEnd) return;
- didOnEnd = true;
-
- // 'end' is only called on Writable streams
- (dest as Writable).end();
- }
-
- function onclose() {
- if (didOnEnd) return;
- didOnEnd = true;
-
- if (typeof dest.destroy === "function") dest.destroy();
- }
-
- // Don't leave dangling pipes when there are errors.
- function onerror(this: Stream, er: Error) {
- cleanup();
- if (this.listenerCount("error") === 0) {
- throw er; // Unhandled stream error in pipe.
- }
- }
-
- source.on("error", onerror);
- dest.on("error", onerror);
-
- // Remove all the event listeners that were added.
- function cleanup() {
- source.removeListener("end", onend);
- source.removeListener("close", onclose);
-
- source.removeListener("error", onerror);
- dest.removeListener("error", onerror);
-
- source.removeListener("end", cleanup);
- source.removeListener("close", cleanup);
-
- dest.removeListener("close", cleanup);
- }
-
- source.on("end", cleanup);
- source.on("close", cleanup);
-
- dest.on("close", cleanup);
- dest.emit("pipe", source);
-
- return dest;
- }
-}
-
-export default Stream;
diff --git a/std/node/_stream/symbols.ts b/std/node/_stream/symbols.ts
deleted file mode 100644
index addb969d3..000000000
--- a/std/node/_stream/symbols.ts
+++ /dev/null
@@ -1,4 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-export const kConstruct = Symbol("kConstruct");
-export const kDestroy = Symbol("kDestroy");
-export const kPaused = Symbol("kPaused");
diff --git a/std/node/_stream/transform.ts b/std/node/_stream/transform.ts
deleted file mode 100644
index a4246e81a..000000000
--- a/std/node/_stream/transform.ts
+++ /dev/null
@@ -1,132 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Encodings } from "../_utils.ts";
-import Duplex from "./duplex.ts";
-import type { DuplexOptions } from "./duplex.ts";
-import type { writeV } from "./writable_internal.ts";
-import { ERR_METHOD_NOT_IMPLEMENTED } from "../_errors.ts";
-
-const kCallback = Symbol("kCallback");
-
-type TransformFlush = (
- this: Transform,
- // deno-lint-ignore no-explicit-any
- callback: (error?: Error | null, data?: any) => void,
-) => void;
-
-export interface TransformOptions extends DuplexOptions {
- read?(this: Transform, size: number): void;
- write?(
- this: Transform,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: Encodings,
- callback: (error?: Error | null) => void,
- ): void;
- writev?: writeV;
- final?(this: Transform, callback: (error?: Error | null) => void): void;
- destroy?(
- this: Transform,
- error: Error | null,
- callback: (error: Error | null) => void,
- ): void;
- transform?(
- this: Transform,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: Encodings,
- // deno-lint-ignore no-explicit-any
- callback: (error?: Error | null, data?: any) => void,
- ): void;
- flush?: TransformFlush;
-}
-
-export default class Transform extends Duplex {
- [kCallback]: null | ((error?: Error | null) => void);
- _flush?: TransformFlush;
-
- constructor(options?: TransformOptions) {
- super(options);
- this._readableState.sync = false;
-
- this[kCallback] = null;
-
- if (options) {
- if (typeof options.transform === "function") {
- this._transform = options.transform;
- }
-
- if (typeof options.flush === "function") {
- this._flush = options.flush;
- }
- }
-
- this.on("prefinish", function (this: Transform) {
- if (typeof this._flush === "function" && !this.destroyed) {
- this._flush((er, data) => {
- if (er) {
- this.destroy(er);
- return;
- }
-
- if (data != null) {
- this.push(data);
- }
- this.push(null);
- });
- } else {
- this.push(null);
- }
- });
- }
-
- _read = () => {
- if (this[kCallback]) {
- const callback = this[kCallback] as (error?: Error | null) => void;
- this[kCallback] = null;
- callback();
- }
- };
-
- _transform(
- // deno-lint-ignore no-explicit-any
- _chunk: any,
- _encoding: string,
- // deno-lint-ignore no-explicit-any
- _callback: (error?: Error | null, data?: any) => void,
- ) {
- throw new ERR_METHOD_NOT_IMPLEMENTED("_transform()");
- }
-
- _write = (
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- callback: (error?: Error | null) => void,
- ) => {
- const rState = this._readableState;
- const wState = this._writableState;
- const length = rState.length;
-
- this._transform(chunk, encoding, (err, val) => {
- if (err) {
- callback(err);
- return;
- }
-
- if (val != null) {
- this.push(val);
- }
-
- if (
- wState.ended || // Backwards compat.
- length === rState.length || // Backwards compat.
- rState.length < rState.highWaterMark ||
- rState.length === 0
- ) {
- callback();
- } else {
- this[kCallback] = callback;
- }
- });
- };
-}
diff --git a/std/node/_stream/transform_test.ts b/std/node/_stream/transform_test.ts
deleted file mode 100644
index d3b90ff01..000000000
--- a/std/node/_stream/transform_test.ts
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import Transform from "./transform.ts";
-import finished from "./end_of_stream.ts";
-import { deferred } from "../../async/mod.ts";
-import { assert, assertEquals } from "../../testing/asserts.ts";
-
-Deno.test("Transform stream finishes correctly", async () => {
- let finishedExecuted = 0;
- const finishedExecutedExpected = 1;
- const finishedExecution = deferred();
-
- const tr = new Transform({
- transform(_data, _enc, cb) {
- cb();
- },
- });
-
- let finish = false;
- let ended = false;
-
- tr.on("end", () => {
- ended = true;
- });
-
- tr.on("finish", () => {
- finish = true;
- });
-
- finished(tr, (err) => {
- finishedExecuted++;
- if (finishedExecuted === finishedExecutedExpected) {
- finishedExecution.resolve();
- }
- assert(!err, "no error");
- assert(finish);
- assert(ended);
- });
-
- tr.end();
- tr.resume();
-
- const finishedTimeout = setTimeout(
- () => finishedExecution.reject(),
- 1000,
- );
- await finishedExecution;
- clearTimeout(finishedTimeout);
- assertEquals(finishedExecuted, finishedExecutedExpected);
-});
-
-Deno.test("Transform stream flushes data correctly", () => {
- const expected = "asdf";
-
- const t = new Transform({
- transform: (_d, _e, n) => {
- n();
- },
- flush: (n) => {
- n(null, expected);
- },
- });
-
- t.end(Buffer.from("blerg"));
- t.on("data", (data) => {
- assertEquals(data.toString(), expected);
- });
-});
diff --git a/std/node/_stream/writable.ts b/std/node/_stream/writable.ts
deleted file mode 100644
index 534fc22fb..000000000
--- a/std/node/_stream/writable.ts
+++ /dev/null
@@ -1,443 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import Stream from "./stream.ts";
-import { captureRejectionSymbol } from "../events.ts";
-import {
- ERR_INVALID_ARG_TYPE,
- ERR_INVALID_OPT_VALUE,
- ERR_METHOD_NOT_IMPLEMENTED,
- ERR_STREAM_ALREADY_FINISHED,
- ERR_STREAM_CANNOT_PIPE,
- ERR_STREAM_DESTROYED,
- ERR_STREAM_NULL_VALUES,
- ERR_STREAM_WRITE_AFTER_END,
- ERR_UNKNOWN_ENCODING,
-} from "../_errors.ts";
-import type { AfterWriteTick, writeV } from "./writable_internal.ts";
-import {
- clearBuffer,
- destroy,
- errorBuffer,
- errorOrDestroy,
- finishMaybe,
- kOnFinished,
- nop,
- onwrite,
- resetBuffer,
- writeOrBuffer,
-} from "./writable_internal.ts";
-import type { Encodings } from "../_utils.ts";
-
-type WritableEncodings = Encodings | "buffer";
-
-export interface WritableOptions {
- autoDestroy?: boolean;
- decodeStrings?: boolean;
- defaultEncoding?: WritableEncodings;
- destroy?(
- this: Writable,
- error: Error | null,
- callback: (error: Error | null) => void,
- ): void;
- emitClose?: boolean;
- final?(this: Writable, callback: (error?: Error | null) => void): void;
- highWaterMark?: number;
- objectMode?: boolean;
- write?(
- this: Writable,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: WritableEncodings,
- callback: (error?: Error | null) => void,
- ): void;
- writev?(
- this: Writable,
- // deno-lint-ignore no-explicit-any
- chunks: Array<{ chunk: any; encoding: string }>,
- callback: (error?: Error | null) => void,
- ): void;
-}
-
-export class WritableState {
- [kOnFinished]: Array<(error?: Error) => void> = [];
- afterWriteTickInfo: null | AfterWriteTick = null;
- allBuffers = true;
- allNoop = true;
- autoDestroy: boolean;
- buffered: Array<{
- allBuffers?: boolean;
- // deno-lint-ignore no-explicit-any
- chunk: any;
- encoding: string;
- callback: (error: Error) => void;
- }> = [];
- bufferedIndex = 0;
- bufferProcessing = false;
- closed = false;
- closeEmitted = false;
- constructed: boolean;
- corked = 0;
- decodeStrings: boolean;
- defaultEncoding: WritableEncodings;
- destroyed = false;
- emitClose: boolean;
- ended = false;
- ending = false;
- errored: Error | null = null;
- errorEmitted = false;
- finalCalled = false;
- finished = false;
- highWaterMark: number;
- length = 0;
- needDrain = false;
- objectMode: boolean;
- onwrite: (error?: Error | null) => void;
- pendingcb = 0;
- prefinished = false;
- sync = true;
- writecb: null | ((error: Error) => void) = null;
- writable = true;
- writelen = 0;
- writing = false;
-
- constructor(options: WritableOptions | undefined, stream: Writable) {
- this.objectMode = !!options?.objectMode;
-
- this.highWaterMark = options?.highWaterMark ??
- (this.objectMode ? 16 : 16 * 1024);
-
- if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) {
- this.highWaterMark = Math.floor(this.highWaterMark);
- } else {
- throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark);
- }
-
- this.decodeStrings = !options?.decodeStrings === false;
-
- this.defaultEncoding = options?.defaultEncoding || "utf8";
-
- this.onwrite = onwrite.bind(undefined, stream);
-
- resetBuffer(this);
-
- this.emitClose = options?.emitClose ?? true;
- this.autoDestroy = options?.autoDestroy ?? true;
- this.constructed = true;
- }
-
- getBuffer() {
- return this.buffered.slice(this.bufferedIndex);
- }
-
- get bufferedRequestCount() {
- return this.buffered.length - this.bufferedIndex;
- }
-}
-
-/** A bit simpler than readable streams.
-* Implement an async `._write(chunk, encoding, cb)`, and it'll handle all
-* the drain event emission and buffering.
-*/
-class Writable extends Stream {
- _final?: (
- this: Writable,
- callback: (error?: Error | null | undefined) => void,
- ) => void;
- _writableState: WritableState;
- _writev?: writeV | null = null;
-
- constructor(options?: WritableOptions) {
- super();
- this._writableState = new WritableState(options, this);
-
- if (options) {
- if (typeof options.write === "function") {
- this._write = options.write;
- }
-
- if (typeof options.writev === "function") {
- this._writev = options.writev;
- }
-
- if (typeof options.destroy === "function") {
- this._destroy = options.destroy;
- }
-
- if (typeof options.final === "function") {
- this._final = options.final;
- }
- }
- }
-
- [captureRejectionSymbol](err?: Error) {
- this.destroy(err);
- }
-
- static WritableState = WritableState;
-
- get destroyed() {
- return this._writableState ? this._writableState.destroyed : false;
- }
-
- set destroyed(value) {
- if (this._writableState) {
- this._writableState.destroyed = value;
- }
- }
-
- get writable() {
- const w = this._writableState;
- return !w.destroyed && !w.errored && !w.ending && !w.ended;
- }
-
- set writable(val) {
- if (this._writableState) {
- this._writableState.writable = !!val;
- }
- }
-
- get writableFinished() {
- return this._writableState ? this._writableState.finished : false;
- }
-
- get writableObjectMode() {
- return this._writableState ? this._writableState.objectMode : false;
- }
-
- get writableBuffer() {
- return this._writableState && this._writableState.getBuffer();
- }
-
- get writableEnded() {
- return this._writableState ? this._writableState.ending : false;
- }
-
- get writableHighWaterMark() {
- return this._writableState && this._writableState.highWaterMark;
- }
-
- get writableCorked() {
- return this._writableState ? this._writableState.corked : 0;
- }
-
- get writableLength() {
- return this._writableState && this._writableState.length;
- }
-
- _undestroy() {
- const w = this._writableState;
- w.constructed = true;
- w.destroyed = false;
- w.closed = false;
- w.closeEmitted = false;
- w.errored = null;
- w.errorEmitted = false;
- w.ended = false;
- w.ending = false;
- w.finalCalled = false;
- w.prefinished = false;
- w.finished = false;
- }
-
- _destroy(err: Error | null, cb: (error?: Error | null) => void) {
- cb(err);
- }
-
- destroy(err?: Error | null, cb?: () => void) {
- const state = this._writableState;
- if (!state.destroyed) {
- queueMicrotask(() => errorBuffer(state));
- }
- destroy.call(this, err, cb);
- return this;
- }
-
- end(cb?: () => void): void;
- // deno-lint-ignore no-explicit-any
- end(chunk: any, cb?: () => void): void;
- // deno-lint-ignore no-explicit-any
- end(chunk: any, encoding: WritableEncodings, cb?: () => void): void;
-
- end(
- // deno-lint-ignore no-explicit-any
- x?: any | (() => void),
- y?: WritableEncodings | (() => void),
- z?: () => void,
- ) {
- const state = this._writableState;
- // deno-lint-ignore no-explicit-any
- let chunk: any | null;
- let encoding: WritableEncodings | null;
- let cb: undefined | ((error?: Error) => void);
-
- if (typeof x === "function") {
- chunk = null;
- encoding = null;
- cb = x;
- } else if (typeof y === "function") {
- chunk = x;
- encoding = null;
- cb = y;
- } else {
- chunk = x;
- encoding = y as WritableEncodings;
- cb = z;
- }
-
- if (chunk !== null && chunk !== undefined) {
- this.write(chunk, encoding);
- }
-
- if (state.corked) {
- state.corked = 1;
- this.uncork();
- }
-
- let err: Error | undefined;
- if (!state.errored && !state.ending) {
- state.ending = true;
- finishMaybe(this, state, true);
- state.ended = true;
- } else if (state.finished) {
- err = new ERR_STREAM_ALREADY_FINISHED("end");
- } else if (state.destroyed) {
- err = new ERR_STREAM_DESTROYED("end");
- }
-
- if (typeof cb === "function") {
- if (err || state.finished) {
- queueMicrotask(() => {
- (cb as (error?: Error | undefined) => void)(err);
- });
- } else {
- state[kOnFinished].push(cb);
- }
- }
-
- return this;
- }
-
- _write(
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- cb: (error?: Error | null) => void,
- ): void {
- if (this._writev) {
- this._writev([{ chunk, encoding }], cb);
- } else {
- throw new ERR_METHOD_NOT_IMPLEMENTED("_write()");
- }
- }
-
- //This signature was changed to keep inheritance coherent
- pipe(dest: Writable): Writable {
- errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
- return dest;
- }
-
- // deno-lint-ignore no-explicit-any
- write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean;
- write(
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: WritableEncodings | null,
- cb?: (error: Error | null | undefined) => void,
- ): boolean;
-
- write(
- // deno-lint-ignore no-explicit-any
- chunk: any,
- x?: WritableEncodings | null | ((error: Error | null | undefined) => void),
- y?: ((error: Error | null | undefined) => void),
- ) {
- const state = this._writableState;
- let encoding: WritableEncodings;
- let cb: (error?: Error | null) => void;
-
- if (typeof x === "function") {
- cb = x;
- encoding = state.defaultEncoding;
- } else {
- if (!x) {
- encoding = state.defaultEncoding;
- } else if (x !== "buffer" && !Buffer.isEncoding(x)) {
- throw new ERR_UNKNOWN_ENCODING(x);
- } else {
- encoding = x;
- }
- if (typeof y !== "function") {
- cb = nop;
- } else {
- cb = y;
- }
- }
-
- if (chunk === null) {
- throw new ERR_STREAM_NULL_VALUES();
- } else if (!state.objectMode) {
- if (typeof chunk === "string") {
- if (state.decodeStrings !== false) {
- chunk = Buffer.from(chunk, encoding);
- encoding = "buffer";
- }
- } else if (chunk instanceof Buffer) {
- encoding = "buffer";
- } else if (Stream._isUint8Array(chunk)) {
- chunk = Stream._uint8ArrayToBuffer(chunk);
- encoding = "buffer";
- } else {
- throw new ERR_INVALID_ARG_TYPE(
- "chunk",
- ["string", "Buffer", "Uint8Array"],
- chunk,
- );
- }
- }
-
- let err: Error | undefined;
- if (state.ending) {
- err = new ERR_STREAM_WRITE_AFTER_END();
- } else if (state.destroyed) {
- err = new ERR_STREAM_DESTROYED("write");
- }
-
- if (err) {
- queueMicrotask(() => cb(err));
- errorOrDestroy(this, err, true);
- return false;
- }
- state.pendingcb++;
- return writeOrBuffer(this, state, chunk, encoding, cb);
- }
-
- cork() {
- this._writableState.corked++;
- }
-
- uncork() {
- const state = this._writableState;
-
- if (state.corked) {
- state.corked--;
-
- if (!state.writing) {
- clearBuffer(this, state);
- }
- }
- }
-
- setDefaultEncoding(encoding: string) {
- // node::ParseEncoding() requires lower case.
- if (typeof encoding === "string") {
- encoding = encoding.toLowerCase();
- }
- if (!Buffer.isEncoding(encoding)) {
- throw new ERR_UNKNOWN_ENCODING(encoding);
- }
- this._writableState.defaultEncoding = encoding as WritableEncodings;
- return this;
- }
-}
-
-export default Writable;
diff --git a/std/node/_stream/writable_internal.ts b/std/node/_stream/writable_internal.ts
deleted file mode 100644
index e8c001af0..000000000
--- a/std/node/_stream/writable_internal.ts
+++ /dev/null
@@ -1,457 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import type Duplex from "./duplex.ts";
-import type Writable from "./writable.ts";
-import type { WritableState } from "./writable.ts";
-import { kDestroy } from "./symbols.ts";
-import { ERR_MULTIPLE_CALLBACK, ERR_STREAM_DESTROYED } from "../_errors.ts";
-
-export type writeV = (
- // deno-lint-ignore no-explicit-any
- chunks: Array<{ chunk: any; encoding: string }>,
- callback: (error?: Error | null) => void,
-) => void;
-
-export type AfterWriteTick = {
- cb: (error?: Error) => void;
- count: number;
- state: WritableState;
- stream: Writable;
-};
-
-export const kOnFinished = Symbol("kOnFinished");
-
-function _destroy(
- self: Writable,
- err?: Error | null,
- cb?: (error?: Error | null) => void,
-) {
- self._destroy(err || null, (err) => {
- const w = self._writableState;
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- }
-
- w.closed = true;
-
- if (typeof cb === "function") {
- cb(err);
- }
-
- if (err) {
- queueMicrotask(() => {
- if (!w.errorEmitted) {
- w.errorEmitted = true;
- self.emit("error", err);
- }
- w.closeEmitted = true;
- if (w.emitClose) {
- self.emit("close");
- }
- });
- } else {
- queueMicrotask(() => {
- w.closeEmitted = true;
- if (w.emitClose) {
- self.emit("close");
- }
- });
- }
- });
-}
-
-export function afterWrite(
- stream: Writable,
- state: WritableState,
- count: number,
- cb: (error?: Error) => void,
-) {
- const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
- state.needDrain;
- if (needDrain) {
- state.needDrain = false;
- stream.emit("drain");
- }
-
- while (count-- > 0) {
- state.pendingcb--;
- cb();
- }
-
- if (state.destroyed) {
- errorBuffer(state);
- }
-
- finishMaybe(stream, state);
-}
-
-export function afterWriteTick({
- cb,
- count,
- state,
- stream,
-}: AfterWriteTick) {
- state.afterWriteTickInfo = null;
- return afterWrite(stream, state, count, cb);
-}
-
-/** If there's something in the buffer waiting, then process it.*/
-export function clearBuffer(stream: Duplex | Writable, state: WritableState) {
- if (
- state.corked ||
- state.bufferProcessing ||
- state.destroyed ||
- !state.constructed
- ) {
- return;
- }
-
- const { buffered, bufferedIndex, objectMode } = state;
- const bufferedLength = buffered.length - bufferedIndex;
-
- if (!bufferedLength) {
- return;
- }
-
- const i = bufferedIndex;
-
- state.bufferProcessing = true;
- if (bufferedLength > 1 && stream._writev) {
- state.pendingcb -= bufferedLength - 1;
-
- const callback = state.allNoop ? nop : (err: Error) => {
- for (let n = i; n < buffered.length; ++n) {
- buffered[n].callback(err);
- }
- };
- const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i);
-
- doWrite(stream, state, true, state.length, chunks, "", callback);
-
- resetBuffer(state);
- } else {
- do {
- const { chunk, encoding, callback } = buffered[i];
- const len = objectMode ? 1 : chunk.length;
- doWrite(stream, state, false, len, chunk, encoding, callback);
- } while (i < buffered.length && !state.writing);
-
- if (i === buffered.length) {
- resetBuffer(state);
- } else if (i > 256) {
- buffered.splice(0, i);
- state.bufferedIndex = 0;
- } else {
- state.bufferedIndex = i;
- }
- }
- state.bufferProcessing = false;
-}
-
-export function destroy(this: Writable, err?: Error | null, cb?: () => void) {
- const w = this._writableState;
-
- if (w.destroyed) {
- if (typeof cb === "function") {
- cb();
- }
-
- return this;
- }
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- }
-
- w.destroyed = true;
-
- if (!w.constructed) {
- this.once(kDestroy, (er) => {
- _destroy(this, err || er, cb);
- });
- } else {
- _destroy(this, err, cb);
- }
-
- return this;
-}
-
-function doWrite(
- stream: Duplex | Writable,
- state: WritableState,
- writev: boolean,
- len: number,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- cb: (error: Error) => void,
-) {
- state.writelen = len;
- state.writecb = cb;
- state.writing = true;
- state.sync = true;
- if (state.destroyed) {
- state.onwrite(new ERR_STREAM_DESTROYED("write"));
- } else if (writev) {
- (stream._writev as unknown as writeV)(chunk, state.onwrite);
- } else {
- stream._write(chunk, encoding, state.onwrite);
- }
- state.sync = false;
-}
-
-/** If there's something in the buffer waiting, then invoke callbacks.*/
-export function errorBuffer(state: WritableState) {
- if (state.writing) {
- return;
- }
-
- for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
- const { chunk, callback } = state.buffered[n];
- const len = state.objectMode ? 1 : chunk.length;
- state.length -= len;
- callback(new ERR_STREAM_DESTROYED("write"));
- }
-
- for (const callback of state[kOnFinished].splice(0)) {
- callback(new ERR_STREAM_DESTROYED("end"));
- }
-
- resetBuffer(state);
-}
-
-export function errorOrDestroy(stream: Writable, err: Error, sync = false) {
- const w = stream._writableState;
-
- if (w.destroyed) {
- return stream;
- }
-
- if (w.autoDestroy) {
- stream.destroy(err);
- } else if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!w.errored) {
- w.errored = err;
- }
- if (sync) {
- queueMicrotask(() => {
- if (w.errorEmitted) {
- return;
- }
- w.errorEmitted = true;
- stream.emit("error", err);
- });
- } else {
- if (w.errorEmitted) {
- return;
- }
- w.errorEmitted = true;
- stream.emit("error", err);
- }
- }
-}
-
-function finish(stream: Writable, state: WritableState) {
- state.pendingcb--;
- if (state.errorEmitted || state.closeEmitted) {
- return;
- }
-
- state.finished = true;
-
- for (const callback of state[kOnFinished].splice(0)) {
- callback();
- }
-
- stream.emit("finish");
-
- if (state.autoDestroy) {
- stream.destroy();
- }
-}
-
-export function finishMaybe(
- stream: Writable,
- state: WritableState,
- sync?: boolean,
-) {
- if (needFinish(state)) {
- prefinish(stream, state);
- if (state.pendingcb === 0 && needFinish(state)) {
- state.pendingcb++;
- if (sync) {
- queueMicrotask(() => finish(stream, state));
- } else {
- finish(stream, state);
- }
- }
- }
-}
-
-export function needFinish(state: WritableState) {
- return (state.ending &&
- state.constructed &&
- state.length === 0 &&
- !state.errored &&
- state.buffered.length === 0 &&
- !state.finished &&
- !state.writing);
-}
-
-export function nop() {}
-
-export function resetBuffer(state: WritableState) {
- state.buffered = [];
- state.bufferedIndex = 0;
- state.allBuffers = true;
- state.allNoop = true;
-}
-
-function onwriteError(
- stream: Writable,
- state: WritableState,
- er: Error,
- cb: (error: Error) => void,
-) {
- --state.pendingcb;
-
- cb(er);
- errorBuffer(state);
- errorOrDestroy(stream, er);
-}
-
-export function onwrite(stream: Writable, er?: Error | null) {
- const state = stream._writableState;
- const sync = state.sync;
- const cb = state.writecb;
-
- if (typeof cb !== "function") {
- errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
- return;
- }
-
- state.writing = false;
- state.writecb = null;
- state.length -= state.writelen;
- state.writelen = 0;
-
- if (er) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- er.stack;
-
- if (!state.errored) {
- state.errored = er;
- }
-
- if (sync) {
- queueMicrotask(() => onwriteError(stream, state, er, cb));
- } else {
- onwriteError(stream, state, er, cb);
- }
- } else {
- if (state.buffered.length > state.bufferedIndex) {
- clearBuffer(stream, state);
- }
-
- if (sync) {
- if (
- state.afterWriteTickInfo !== null &&
- state.afterWriteTickInfo.cb === cb
- ) {
- state.afterWriteTickInfo.count++;
- } else {
- state.afterWriteTickInfo = {
- count: 1,
- cb: (cb as (error?: Error) => void),
- stream,
- state,
- };
- queueMicrotask(() =>
- afterWriteTick(state.afterWriteTickInfo as AfterWriteTick)
- );
- }
- } else {
- afterWrite(stream, state, 1, cb as (error?: Error) => void);
- }
- }
-}
-
-export function prefinish(stream: Writable, state: WritableState) {
- if (!state.prefinished && !state.finalCalled) {
- if (typeof stream._final === "function" && !state.destroyed) {
- state.finalCalled = true;
-
- state.sync = true;
- state.pendingcb++;
- stream._final((err) => {
- state.pendingcb--;
- if (err) {
- for (const callback of state[kOnFinished].splice(0)) {
- callback(err);
- }
- errorOrDestroy(stream, err, state.sync);
- } else if (needFinish(state)) {
- state.prefinished = true;
- stream.emit("prefinish");
- state.pendingcb++;
- queueMicrotask(() => finish(stream, state));
- }
- });
- state.sync = false;
- } else {
- state.prefinished = true;
- stream.emit("prefinish");
- }
- }
-}
-
-export function writeOrBuffer(
- stream: Duplex | Writable,
- state: WritableState,
- // deno-lint-ignore no-explicit-any
- chunk: any,
- encoding: string,
- callback: (error: Error) => void,
-) {
- const len = state.objectMode ? 1 : chunk.length;
-
- state.length += len;
-
- if (state.writing || state.corked || state.errored || !state.constructed) {
- state.buffered.push({ chunk, encoding, callback });
- if (state.allBuffers && encoding !== "buffer") {
- state.allBuffers = false;
- }
- if (state.allNoop && callback !== nop) {
- state.allNoop = false;
- }
- } else {
- state.writelen = len;
- state.writecb = callback;
- state.writing = true;
- state.sync = true;
- stream._write(chunk, encoding, state.onwrite);
- state.sync = false;
- }
-
- const ret = state.length < state.highWaterMark;
-
- if (!ret) {
- state.needDrain = true;
- }
-
- return ret && !state.errored && !state.destroyed;
-}
diff --git a/std/node/_stream/writable_test.ts b/std/node/_stream/writable_test.ts
deleted file mode 100644
index d6133b65f..000000000
--- a/std/node/_stream/writable_test.ts
+++ /dev/null
@@ -1,209 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import finished from "./end_of_stream.ts";
-import Writable from "../_stream/writable.ts";
-import { deferred } from "../../async/mod.ts";
-import {
- assert,
- assertEquals,
- assertStrictEquals,
- assertThrows,
-} from "../../testing/asserts.ts";
-
-Deno.test("Writable stream writes correctly", async () => {
- let callback: undefined | ((error?: Error | null | undefined) => void);
-
- let writeExecuted = 0;
- const writeExecutedExpected = 1;
- const writeExpectedExecutions = deferred();
-
- let writevExecuted = 0;
- const writevExecutedExpected = 1;
- const writevExpectedExecutions = deferred();
-
- const writable = new Writable({
- write: (chunk, encoding, cb) => {
- writeExecuted++;
- if (writeExecuted == writeExecutedExpected) {
- writeExpectedExecutions.resolve();
- }
- assert(chunk instanceof Buffer);
- assertStrictEquals(encoding, "buffer");
- assertStrictEquals(String(chunk), "ABC");
- callback = cb;
- },
- writev: (chunks) => {
- writevExecuted++;
- if (writevExecuted == writevExecutedExpected) {
- writevExpectedExecutions.resolve();
- }
- assertStrictEquals(chunks.length, 2);
- assertStrictEquals(chunks[0].encoding, "buffer");
- assertStrictEquals(chunks[1].encoding, "buffer");
- assertStrictEquals(chunks[0].chunk + chunks[1].chunk, "DEFGHI");
- },
- });
-
- writable.write(new TextEncoder().encode("ABC"));
- writable.write(new TextEncoder().encode("DEF"));
- writable.end(new TextEncoder().encode("GHI"));
- callback?.();
-
- const writeTimeout = setTimeout(
- () => writeExpectedExecutions.reject(),
- 1000,
- );
- const writevTimeout = setTimeout(
- () => writevExpectedExecutions.reject(),
- 1000,
- );
- await writeExpectedExecutions;
- await writevExpectedExecutions;
- clearTimeout(writeTimeout);
- clearTimeout(writevTimeout);
- assertEquals(writeExecuted, writeExecutedExpected);
- assertEquals(writevExecuted, writevExecutedExpected);
-});
-
-Deno.test("Writable stream writes Uint8Array in object mode", async () => {
- let writeExecuted = 0;
- const writeExecutedExpected = 1;
- const writeExpectedExecutions = deferred();
-
- const ABC = new TextEncoder().encode("ABC");
-
- const writable = new Writable({
- objectMode: true,
- write: (chunk, encoding, cb) => {
- writeExecuted++;
- if (writeExecuted == writeExecutedExpected) {
- writeExpectedExecutions.resolve();
- }
- assert(!(chunk instanceof Buffer));
- assert(chunk instanceof Uint8Array);
- assertEquals(chunk, ABC);
- assertEquals(encoding, "utf8");
- cb();
- },
- });
-
- writable.end(ABC);
-
- const writeTimeout = setTimeout(
- () => writeExpectedExecutions.reject(),
- 1000,
- );
- await writeExpectedExecutions;
- clearTimeout(writeTimeout);
- assertEquals(writeExecuted, writeExecutedExpected);
-});
-
-Deno.test("Writable stream throws on unexpected close", async () => {
- let finishedExecuted = 0;
- const finishedExecutedExpected = 1;
- const finishedExpectedExecutions = deferred();
-
- const writable = new Writable({
- write: () => {},
- });
- writable.writable = false;
- writable.destroy();
-
- finished(writable, (err) => {
- finishedExecuted++;
- if (finishedExecuted == finishedExecutedExpected) {
- finishedExpectedExecutions.resolve();
- }
- assertEquals(err?.code, "ERR_STREAM_PREMATURE_CLOSE");
- });
-
- const finishedTimeout = setTimeout(
- () => finishedExpectedExecutions.reject(),
- 1000,
- );
- await finishedExpectedExecutions;
- clearTimeout(finishedTimeout);
- assertEquals(finishedExecuted, finishedExecutedExpected);
-});
-
-Deno.test("Writable stream finishes correctly", async () => {
- let finishedExecuted = 0;
- const finishedExecutedExpected = 1;
- const finishedExpectedExecutions = deferred();
-
- const w = new Writable({
- write(_chunk, _encoding, cb) {
- cb();
- },
- autoDestroy: false,
- });
-
- w.end("asd");
-
- queueMicrotask(() => {
- finished(w, () => {
- finishedExecuted++;
- if (finishedExecuted == finishedExecutedExpected) {
- finishedExpectedExecutions.resolve();
- }
- });
- });
-
- const finishedTimeout = setTimeout(
- () => finishedExpectedExecutions.reject(),
- 1000,
- );
- await finishedExpectedExecutions;
- clearTimeout(finishedTimeout);
- assertEquals(finishedExecuted, finishedExecutedExpected);
-});
-
-Deno.test("Writable stream finishes correctly after error", async () => {
- let errorExecuted = 0;
- const errorExecutedExpected = 1;
- const errorExpectedExecutions = deferred();
-
- let finishedExecuted = 0;
- const finishedExecutedExpected = 1;
- const finishedExpectedExecutions = deferred();
-
- const w = new Writable({
- write(_chunk, _encoding, cb) {
- cb(new Error());
- },
- autoDestroy: false,
- });
- w.write("asd");
- w.on("error", () => {
- errorExecuted++;
- if (errorExecuted == errorExecutedExpected) {
- errorExpectedExecutions.resolve();
- }
- finished(w, () => {
- finishedExecuted++;
- if (finishedExecuted == finishedExecutedExpected) {
- finishedExpectedExecutions.resolve();
- }
- });
- });
-
- const errorTimeout = setTimeout(
- () => errorExpectedExecutions.reject(),
- 1000,
- );
- const finishedTimeout = setTimeout(
- () => finishedExpectedExecutions.reject(),
- 1000,
- );
- await finishedExpectedExecutions;
- await errorExpectedExecutions;
- clearTimeout(finishedTimeout);
- clearTimeout(errorTimeout);
- assertEquals(finishedExecuted, finishedExecutedExpected);
- assertEquals(errorExecuted, errorExecutedExpected);
-});
-
-Deno.test("Writable stream fails on 'write' null value", () => {
- const writable = new Writable();
- assertThrows(() => writable.write(null));
-});