summaryrefslogtreecommitdiff
path: root/std/node/_stream
diff options
context:
space:
mode:
authorSteven Guerrero <stephenguerrero43@gmail.com>2020-11-21 16:13:18 -0500
committerGitHub <noreply@github.com>2020-11-21 16:13:18 -0500
commita4f27c4d570ad9b47bbd560fbf9b017f852fc29f (patch)
tree84b3b0111fca262932aa2be7f1ef884fc6d5ddc3 /std/node/_stream
parentce890f2ae7e8b557211e8f529180d30dc44ea7b5 (diff)
feat(std/node): Add Readable Stream / Writable Stream / errors support (#7569)
Diffstat (limited to 'std/node/_stream')
-rw-r--r--std/node/_stream/async_iterator.ts260
-rw-r--r--std/node/_stream/async_iterator_test.ts249
-rw-r--r--std/node/_stream/buffer_list.ts183
-rw-r--r--std/node/_stream/duplex.ts3
-rw-r--r--std/node/_stream/end-of-stream.ts240
-rw-r--r--std/node/_stream/from.ts102
-rw-r--r--std/node/_stream/readable.ts1248
-rw-r--r--std/node/_stream/readable_test.ts489
-rw-r--r--std/node/_stream/stream.ts79
-rw-r--r--std/node/_stream/symbols.ts4
-rw-r--r--std/node/_stream/writable.ts952
-rw-r--r--std/node/_stream/writable_test.ts209
12 files changed, 4018 insertions, 0 deletions
diff --git a/std/node/_stream/async_iterator.ts b/std/node/_stream/async_iterator.ts
new file mode 100644
index 000000000..cd1b6db3c
--- /dev/null
+++ b/std/node/_stream/async_iterator.ts
@@ -0,0 +1,260 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import type { Buffer } from "../buffer.ts";
+import finished from "./end-of-stream.ts";
+import Readable from "./readable.ts";
+import type Stream from "./stream.ts";
+
+const kLastResolve = Symbol("lastResolve");
+const kLastReject = Symbol("lastReject");
+const kError = Symbol("error");
+const kEnded = Symbol("ended");
+const kLastPromise = Symbol("lastPromise");
+const kHandlePromise = Symbol("handlePromise");
+const kStream = Symbol("stream");
+
+// TODO(Soremwar)
+// Add Duplex streams
+type IterableStreams = Stream | Readable;
+
+type IterableItem = Buffer | string | Uint8Array | undefined;
+type ReadableIteratorResult = IteratorResult<IterableItem>;
+
+function initIteratorSymbols(
+ o: ReadableStreamAsyncIterator,
+ symbols: symbol[],
+) {
+ const properties: PropertyDescriptorMap = {};
+ for (const sym in symbols) {
+ properties[sym] = {
+ configurable: false,
+ enumerable: false,
+ writable: true,
+ };
+ }
+ Object.defineProperties(o, properties);
+}
+
+// TODO(Soremwar)
+// Bring back once requests are implemented
+// function isRequest(stream: any) {
+// return stream && stream.setHeader && typeof stream.abort === "function";
+// }
+
+//TODO(Soremwar)
+//Should be any implementation of stream
+// deno-lint-ignore no-explicit-any
+function destroyer(stream: any, err?: Error | null) {
+ // TODO(Soremwar)
+ // Bring back once requests are implemented
+ // if (isRequest(stream)) return stream.abort();
+ // if (isRequest(stream.req)) return stream.req.abort();
+ if (typeof stream.destroy === "function") return stream.destroy(err);
+ if (typeof stream.close === "function") return stream.close();
+}
+
+function createIterResult(
+ value: IterableItem,
+ done: boolean,
+): ReadableIteratorResult {
+ return { value, done };
+}
+
+function readAndResolve(iter: ReadableStreamAsyncIterator) {
+ const resolve = iter[kLastResolve];
+ if (resolve !== null) {
+ const data = iter[kStream].read();
+ if (data !== null) {
+ iter[kLastPromise] = null;
+ iter[kLastResolve] = null;
+ iter[kLastReject] = null;
+ resolve(createIterResult(data, false));
+ }
+ }
+}
+
+function onReadable(iter: ReadableStreamAsyncIterator) {
+ queueMicrotask(() => readAndResolve(iter));
+}
+
+function wrapForNext(
+ lastPromise: Promise<ReadableIteratorResult>,
+ iter: ReadableStreamAsyncIterator,
+) {
+ return (
+ resolve: (value: ReadableIteratorResult) => void,
+ reject: (error: Error) => void,
+ ) => {
+ lastPromise.then(() => {
+ if (iter[kEnded]) {
+ resolve(createIterResult(undefined, true));
+ return;
+ }
+
+ iter[kHandlePromise](resolve, reject);
+ }, reject);
+ };
+}
+
+function finish(self: ReadableStreamAsyncIterator, err?: Error) {
+ return new Promise(
+ (
+ resolve: (result: ReadableIteratorResult) => void,
+ reject: (error: Error) => void,
+ ) => {
+ const stream = self[kStream];
+
+ finished(stream, (err) => {
+ if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
+ reject(err);
+ } else {
+ resolve(createIterResult(undefined, true));
+ }
+ });
+ destroyer(stream, err);
+ },
+ );
+}
+
+const AsyncIteratorPrototype = Object.getPrototypeOf(
+ Object.getPrototypeOf(async function* () {}).prototype,
+);
+
+class ReadableStreamAsyncIterator
+ implements AsyncIterableIterator<IterableItem> {
+ [kEnded]: boolean;
+ [kError]: Error | null = null;
+ [kHandlePromise] = (
+ resolve: (value: ReadableIteratorResult) => void,
+ reject: (value: Error) => void,
+ ) => {
+ const data = this[kStream].read();
+ if (data) {
+ this[kLastPromise] = null;
+ this[kLastResolve] = null;
+ this[kLastReject] = null;
+ resolve(createIterResult(data, false));
+ } else {
+ this[kLastResolve] = resolve;
+ this[kLastReject] = reject;
+ }
+ };
+ [kLastPromise]: null | Promise<ReadableIteratorResult>;
+ [kLastReject]: null | ((value: Error) => void) = null;
+ [kLastResolve]: null | ((value: ReadableIteratorResult) => void) = null;
+ [kStream]: Readable;
+ [Symbol.asyncIterator] = AsyncIteratorPrototype[Symbol.asyncIterator];
+
+ constructor(stream: Readable) {
+ this[kEnded] = stream.readableEnded || stream._readableState.endEmitted;
+ this[kStream] = stream;
+ initIteratorSymbols(this, [
+ kEnded,
+ kError,
+ kHandlePromise,
+ kLastPromise,
+ kLastReject,
+ kLastResolve,
+ kStream,
+ ]);
+ }
+
+ get stream() {
+ return this[kStream];
+ }
+
+ next(): Promise<ReadableIteratorResult> {
+ const error = this[kError];
+ if (error !== null) {
+ return Promise.reject(error);
+ }
+
+ if (this[kEnded]) {
+ return Promise.resolve(createIterResult(undefined, true));
+ }
+
+ if (this[kStream].destroyed) {
+ return new Promise((resolve, reject) => {
+ if (this[kError]) {
+ reject(this[kError]);
+ } else if (this[kEnded]) {
+ resolve(createIterResult(undefined, true));
+ } else {
+ finished(this[kStream], (err) => {
+ if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
+ reject(err);
+ } else {
+ resolve(createIterResult(undefined, true));
+ }
+ });
+ }
+ });
+ }
+
+ const lastPromise = this[kLastPromise];
+ let promise;
+
+ if (lastPromise) {
+ promise = new Promise(wrapForNext(lastPromise, this));
+ } else {
+ const data = this[kStream].read();
+ if (data !== null) {
+ return Promise.resolve(createIterResult(data, false));
+ }
+
+ promise = new Promise(this[kHandlePromise]);
+ }
+
+ this[kLastPromise] = promise;
+
+ return promise;
+ }
+
+ return(): Promise<ReadableIteratorResult> {
+ return finish(this);
+ }
+
+ throw(err: Error): Promise<ReadableIteratorResult> {
+ return finish(this, err);
+ }
+}
+
+const createReadableStreamAsyncIterator = (stream: IterableStreams) => {
+ // deno-lint-ignore no-explicit-any
+ if (typeof (stream as any).read !== "function") {
+ const src = stream;
+ stream = new Readable({ objectMode: true }).wrap(src);
+ finished(stream, (err) => destroyer(src, err));
+ }
+
+ const iterator = new ReadableStreamAsyncIterator(stream as Readable);
+ iterator[kLastPromise] = null;
+
+ finished(stream, { writable: false }, (err) => {
+ if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
+ const reject = iterator[kLastReject];
+ if (reject !== null) {
+ iterator[kLastPromise] = null;
+ iterator[kLastResolve] = null;
+ iterator[kLastReject] = null;
+ reject(err);
+ }
+ iterator[kError] = err;
+ return;
+ }
+
+ const resolve = iterator[kLastResolve];
+ if (resolve !== null) {
+ iterator[kLastPromise] = null;
+ iterator[kLastResolve] = null;
+ iterator[kLastReject] = null;
+ resolve(createIterResult(undefined, true));
+ }
+ iterator[kEnded] = true;
+ });
+
+ stream.on("readable", onReadable.bind(null, iterator));
+
+ return iterator;
+};
+
+export default createReadableStreamAsyncIterator;
diff --git a/std/node/_stream/async_iterator_test.ts b/std/node/_stream/async_iterator_test.ts
new file mode 100644
index 000000000..17698e0fd
--- /dev/null
+++ b/std/node/_stream/async_iterator_test.ts
@@ -0,0 +1,249 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import Readable from "./readable.ts";
+import Stream from "./stream.ts";
+import toReadableAsyncIterator from "./async_iterator.ts";
+import { deferred } from "../../async/mod.ts";
+import { assertEquals, assertThrowsAsync } from "../../testing/asserts.ts";
+
+Deno.test("Stream to async iterator", async () => {
+ let destroyExecuted = 0;
+ const destroyExecutedExpected = 1;
+ const destroyExpectedExecutions = deferred();
+
+ class AsyncIteratorStream extends Stream {
+ constructor() {
+ super();
+ }
+
+ destroy() {
+ destroyExecuted++;
+ if (destroyExecuted == destroyExecutedExpected) {
+ destroyExpectedExecutions.resolve();
+ }
+ }
+
+ [Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator];
+ }
+
+ const stream = new AsyncIteratorStream();
+
+ queueMicrotask(() => {
+ stream.emit("data", "hello");
+ stream.emit("data", "world");
+ stream.emit("end");
+ });
+
+ let res = "";
+
+ for await (const d of stream) {
+ res += d;
+ }
+ assertEquals(res, "helloworld");
+
+ const destroyTimeout = setTimeout(
+ () => destroyExpectedExecutions.reject(),
+ 1000,
+ );
+ await destroyExpectedExecutions;
+ clearTimeout(destroyTimeout);
+ assertEquals(destroyExecuted, destroyExecutedExpected);
+});
+
+Deno.test("Stream to async iterator throws on 'error' emitted", async () => {
+ let closeExecuted = 0;
+ const closeExecutedExpected = 1;
+ const closeExpectedExecutions = deferred();
+
+ let errorExecuted = 0;
+ const errorExecutedExpected = 1;
+ const errorExpectedExecutions = deferred();
+
+ class StreamImplementation extends Stream {
+ close() {
+ closeExecuted++;
+ if (closeExecuted == closeExecutedExpected) {
+ closeExpectedExecutions.resolve();
+ }
+ }
+ }
+
+ const stream = new StreamImplementation();
+ queueMicrotask(() => {
+ stream.emit("data", 0);
+ stream.emit("data", 1);
+ stream.emit("error", new Error("asd"));
+ });
+
+ toReadableAsyncIterator(stream)
+ .next()
+ .catch((err) => {
+ errorExecuted++;
+ if (errorExecuted == errorExecutedExpected) {
+ errorExpectedExecutions.resolve();
+ }
+ assertEquals(err.message, "asd");
+ });
+
+ const closeTimeout = setTimeout(
+ () => closeExpectedExecutions.reject(),
+ 1000,
+ );
+ const errorTimeout = setTimeout(
+ () => errorExpectedExecutions.reject(),
+ 1000,
+ );
+ await closeExpectedExecutions;
+ await errorExpectedExecutions;
+ clearTimeout(closeTimeout);
+ clearTimeout(errorTimeout);
+ assertEquals(closeExecuted, closeExecutedExpected);
+ assertEquals(errorExecuted, errorExecutedExpected);
+});
+
+Deno.test("Async iterator matches values of Readable", async () => {
+ const readable = new Readable({
+ objectMode: true,
+ read() {},
+ });
+ readable.push(0);
+ readable.push(1);
+ readable.push(null);
+
+ const iter = readable[Symbol.asyncIterator]();
+
+ assertEquals(
+ await iter.next().then(({ value }) => value),
+ 0,
+ );
+ for await (const d of iter) {
+ assertEquals(d, 1);
+ }
+});
+
+Deno.test("Async iterator throws on Readable destroyed sync", async () => {
+ const message = "kaboom from read";
+
+ const readable = new Readable({
+ objectMode: true,
+ read() {
+ this.destroy(new Error(message));
+ },
+ });
+
+ await assertThrowsAsync(
+ async () => {
+ // deno-lint-ignore no-empty
+ for await (const k of readable) {}
+ },
+ Error,
+ message,
+ );
+});
+
+Deno.test("Async iterator throws on Readable destroyed async", async () => {
+ const message = "kaboom";
+ const readable = new Readable({
+ read() {},
+ });
+ const iterator = readable[Symbol.asyncIterator]();
+
+ readable.destroy(new Error(message));
+
+ await assertThrowsAsync(
+ iterator.next.bind(iterator),
+ Error,
+ message,
+ );
+});
+
+Deno.test("Async iterator finishes the iterator when Readable destroyed", async () => {
+ const readable = new Readable({
+ read() {},
+ });
+
+ readable.destroy();
+
+ const { done } = await readable[Symbol.asyncIterator]().next();
+ assertEquals(done, true);
+});
+
+Deno.test("Async iterator finishes all item promises when Readable destroyed", async () => {
+ const r = new Readable({
+ objectMode: true,
+ read() {
+ },
+ });
+
+ const b = r[Symbol.asyncIterator]();
+ const c = b.next();
+ const d = b.next();
+ r.destroy();
+ assertEquals(await c, { done: true, value: undefined });
+ assertEquals(await d, { done: true, value: undefined });
+});
+
+Deno.test("Async iterator: 'next' is triggered by Readable push", async () => {
+ const max = 42;
+ let readed = 0;
+ let received = 0;
+ const readable = new Readable({
+ objectMode: true,
+ read() {
+ this.push("hello");
+ if (++readed === max) {
+ this.push(null);
+ }
+ },
+ });
+
+ for await (const k of readable) {
+ received++;
+ assertEquals(k, "hello");
+ }
+
+ assertEquals(readed, received);
+});
+
+Deno.test("Async iterator: 'close' called on forced iteration end", async () => {
+ let closeExecuted = 0;
+ const closeExecutedExpected = 1;
+ const closeExpectedExecutions = deferred();
+
+ class IndestructibleReadable extends Readable {
+ constructor() {
+ super({
+ autoDestroy: false,
+ read() {},
+ });
+ }
+
+ close() {
+ closeExecuted++;
+ if (closeExecuted == closeExecutedExpected) {
+ closeExpectedExecutions.resolve();
+ }
+ readable.emit("close");
+ }
+
+ // deno-lint-ignore ban-ts-comment
+ //@ts-ignore
+ destroy = null;
+ }
+
+ const readable = new IndestructibleReadable();
+ readable.push("asd");
+ readable.push("asd");
+
+ // eslint-disable-next-line @typescript-eslint/no-unused-vars
+ for await (const d of readable) {
+ break;
+ }
+
+ const closeTimeout = setTimeout(
+ () => closeExpectedExecutions.reject(),
+ 1000,
+ );
+ await closeExpectedExecutions;
+ clearTimeout(closeTimeout);
+ assertEquals(closeExecuted, closeExecutedExpected);
+});
diff --git a/std/node/_stream/buffer_list.ts b/std/node/_stream/buffer_list.ts
new file mode 100644
index 000000000..fe1a693c0
--- /dev/null
+++ b/std/node/_stream/buffer_list.ts
@@ -0,0 +1,183 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+
+type BufferListItem = {
+ data: Buffer | string | Uint8Array;
+ next: BufferListItem | null;
+};
+
+export default class BufferList {
+ head: BufferListItem | null = null;
+ tail: BufferListItem | null = null;
+ length: number;
+
+ constructor() {
+ this.head = null;
+ this.tail = null;
+ this.length = 0;
+ }
+
+ push(v: Buffer | string | Uint8Array) {
+ const entry = { data: v, next: null };
+ if (this.length > 0) {
+ (this.tail as BufferListItem).next = entry;
+ } else {
+ this.head = entry;
+ }
+ this.tail = entry;
+ ++this.length;
+ }
+
+ unshift(v: Buffer | string | Uint8Array) {
+ const entry = { data: v, next: this.head };
+ if (this.length === 0) {
+ this.tail = entry;
+ }
+ this.head = entry;
+ ++this.length;
+ }
+
+ shift() {
+ if (this.length === 0) {
+ return;
+ }
+ const ret = (this.head as BufferListItem).data;
+ if (this.length === 1) {
+ this.head = this.tail = null;
+ } else {
+ this.head = (this.head as BufferListItem).next;
+ }
+ --this.length;
+ return ret;
+ }
+
+ clear() {
+ this.head = this.tail = null;
+ this.length = 0;
+ }
+
+ join(s: string) {
+ if (this.length === 0) {
+ return "";
+ }
+ let p: BufferListItem | null = (this.head as BufferListItem);
+ let ret = "" + p.data;
+ p = p.next;
+ while (p) {
+ ret += s + p.data;
+ p = p.next;
+ }
+ return ret;
+ }
+
+ concat(n: number) {
+ if (this.length === 0) {
+ return Buffer.alloc(0);
+ }
+ const ret = Buffer.allocUnsafe(n >>> 0);
+ let p = this.head;
+ let i = 0;
+ while (p) {
+ ret.set(p.data as Buffer, i);
+ i += p.data.length;
+ p = p.next;
+ }
+ return ret;
+ }
+
+ // Consumes a specified amount of bytes or characters from the buffered data.
+ consume(n: number, hasStrings: boolean) {
+ const data = (this.head as BufferListItem).data;
+ if (n < data.length) {
+ // `slice` is the same for buffers and strings.
+ const slice = data.slice(0, n);
+ (this.head as BufferListItem).data = data.slice(n);
+ return slice;
+ }
+ if (n === data.length) {
+ // First chunk is a perfect match.
+ return this.shift();
+ }
+ // Result spans more than one buffer.
+ return hasStrings ? this._getString(n) : this._getBuffer(n);
+ }
+
+ first() {
+ return (this.head as BufferListItem).data;
+ }
+
+ *[Symbol.iterator]() {
+ for (let p = this.head; p; p = p.next) {
+ yield p.data;
+ }
+ }
+
+ // Consumes a specified amount of characters from the buffered data.
+ _getString(n: number) {
+ let ret = "";
+ let p: BufferListItem | null = (this.head as BufferListItem);
+ let c = 0;
+ p = p.next as BufferListItem;
+ do {
+ const str = p.data;
+ if (n > str.length) {
+ ret += str;
+ n -= str.length;
+ } else {
+ if (n === str.length) {
+ ret += str;
+ ++c;
+ if (p.next) {
+ this.head = p.next;
+ } else {
+ this.head = this.tail = null;
+ }
+ } else {
+ ret += str.slice(0, n);
+ this.head = p;
+ p.data = str.slice(n);
+ }
+ break;
+ }
+ ++c;
+ p = p.next;
+ } while (p);
+ this.length -= c;
+ return ret;
+ }
+
+ // Consumes a specified amount of bytes from the buffered data.
+ _getBuffer(n: number) {
+ const ret = Buffer.allocUnsafe(n);
+ const retLen = n;
+ let p: BufferListItem | null = (this.head as BufferListItem);
+ let c = 0;
+ p = p.next as BufferListItem;
+ do {
+ const buf = p.data as Buffer;
+ if (n > buf.length) {
+ ret.set(buf, retLen - n);
+ n -= buf.length;
+ } else {
+ if (n === buf.length) {
+ ret.set(buf, retLen - n);
+ ++c;
+ if (p.next) {
+ this.head = p.next;
+ } else {
+ this.head = this.tail = null;
+ }
+ } else {
+ ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n);
+ this.head = p;
+ p.data = buf.slice(n);
+ }
+ break;
+ }
+ ++c;
+ p = p.next;
+ } while (p);
+ this.length -= c;
+ return ret;
+ }
+}
diff --git a/std/node/_stream/duplex.ts b/std/node/_stream/duplex.ts
new file mode 100644
index 000000000..c5faed6f8
--- /dev/null
+++ b/std/node/_stream/duplex.ts
@@ -0,0 +1,3 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+// deno-lint-ignore no-explicit-any
+export const errorOrDestroy = (...args: any[]) => {};
diff --git a/std/node/_stream/end-of-stream.ts b/std/node/_stream/end-of-stream.ts
new file mode 100644
index 000000000..c42bb0e1c
--- /dev/null
+++ b/std/node/_stream/end-of-stream.ts
@@ -0,0 +1,240 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { once } from "../_utils.ts";
+import type Readable from "./readable.ts";
+import type Stream from "./stream.ts";
+import type { ReadableState } from "./readable.ts";
+import type Writable from "./writable.ts";
+import type { WritableState } from "./writable.ts";
+import {
+ ERR_INVALID_ARG_TYPE,
+ ERR_STREAM_PREMATURE_CLOSE,
+ NodeErrorAbstraction,
+} from "../_errors.ts";
+
+type StreamImplementations = Readable | Stream | Writable;
+
+// TODO(Soremwar)
+// Bring back once requests are implemented
+// function isRequest(stream: Stream) {
+// return stream.setHeader && typeof stream.abort === "function";
+// }
+
+// deno-lint-ignore no-explicit-any
+function isReadable(stream: any) {
+ return typeof stream.readable === "boolean" ||
+ typeof stream.readableEnded === "boolean" ||
+ !!stream._readableState;
+}
+
+// deno-lint-ignore no-explicit-any
+function isWritable(stream: any) {
+ return typeof stream.writable === "boolean" ||
+ typeof stream.writableEnded === "boolean" ||
+ !!stream._writableState;
+}
+
+function isWritableFinished(stream: Writable) {
+ if (stream.writableFinished) return true;
+ const wState = stream._writableState;
+ if (!wState || wState.errored) return false;
+ return wState.finished || (wState.ended && wState.length === 0);
+}
+
+function nop() {}
+
+function isReadableEnded(stream: Readable) {
+ if (stream.readableEnded) return true;
+ const rState = stream._readableState;
+ if (!rState || rState.errored) return false;
+ return rState.endEmitted || (rState.ended && rState.length === 0);
+}
+
+interface FinishedOptions {
+ error?: boolean;
+ readable?: boolean;
+ writable?: boolean;
+}
+
+/**
+ * Appends an ending callback triggered when a stream is no longer readable,
+ * writable or has experienced an error or a premature close event
+*/
+export default function eos(
+ stream: StreamImplementations,
+ options: FinishedOptions | null,
+ callback: (err?: NodeErrorAbstraction | null) => void,
+): () => void;
+export default function eos(
+ stream: StreamImplementations,
+ callback: (err?: NodeErrorAbstraction | null) => void,
+): () => void;
+export default function eos(
+ stream: StreamImplementations,
+ x: FinishedOptions | ((err?: NodeErrorAbstraction | null) => void) | null,
+ y?: (err?: NodeErrorAbstraction | null) => void,
+) {
+ let opts: FinishedOptions;
+ let callback: (err?: NodeErrorAbstraction | null) => void;
+
+ if (!y) {
+ if (typeof x !== "function") {
+ throw new ERR_INVALID_ARG_TYPE("callback", "function", x);
+ }
+ opts = {};
+ callback = x;
+ } else {
+ if (!x || Array.isArray(x) || typeof x !== "object") {
+ throw new ERR_INVALID_ARG_TYPE("opts", "object", x);
+ }
+ opts = x;
+
+ if (typeof y !== "function") {
+ throw new ERR_INVALID_ARG_TYPE("callback", "function", y);
+ }
+ callback = y;
+ }
+
+ callback = once(callback);
+
+ const readable = opts.readable ?? isReadable(stream);
+ const writable = opts.writable ?? isWritable(stream);
+
+ // deno-lint-ignore no-explicit-any
+ const wState: WritableState | undefined = (stream as any)._writableState;
+ // deno-lint-ignore no-explicit-any
+ const rState: ReadableState | undefined = (stream as any)._readableState;
+ const validState = wState || rState;
+
+ const onlegacyfinish = () => {
+ if (!(stream as Writable).writable) {
+ onfinish();
+ }
+ };
+
+ let willEmitClose = (
+ validState?.autoDestroy &&
+ validState?.emitClose &&
+ validState?.closed === false &&
+ isReadable(stream) === readable &&
+ isWritable(stream) === writable
+ );
+
+ let writableFinished = (stream as Writable).writableFinished ||
+ wState?.finished;
+ const onfinish = () => {
+ writableFinished = true;
+ // deno-lint-ignore no-explicit-any
+ if ((stream as any).destroyed) {
+ willEmitClose = false;
+ }
+
+ if (willEmitClose && (!(stream as Readable).readable || readable)) {
+ return;
+ }
+ if (!readable || readableEnded) {
+ callback.call(stream);
+ }
+ };
+
+ let readableEnded = (stream as Readable).readableEnded || rState?.endEmitted;
+ const onend = () => {
+ readableEnded = true;
+ // deno-lint-ignore no-explicit-any
+ if ((stream as any).destroyed) {
+ willEmitClose = false;
+ }
+
+ if (willEmitClose && (!(stream as Writable).writable || writable)) {
+ return;
+ }
+ if (!writable || writableFinished) {
+ callback.call(stream);
+ }
+ };
+
+ const onerror = (err: NodeErrorAbstraction) => {
+ callback.call(stream, err);
+ };
+
+ const onclose = () => {
+ if (readable && !readableEnded) {
+ if (!isReadableEnded(stream as Readable)) {
+ return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
+ }
+ }
+ if (writable && !writableFinished) {
+ if (!isWritableFinished(stream as Writable)) {
+ return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
+ }
+ }
+ callback.call(stream);
+ };
+
+ // TODO(Soremwar)
+ // Bring back once requests are implemented
+ // const onrequest = () => {
+ // stream.req.on("finish", onfinish);
+ // };
+
+ // TODO(Soremwar)
+ // Bring back once requests are implemented
+ // if (isRequest(stream)) {
+ // stream.on("complete", onfinish);
+ // stream.on("abort", onclose);
+ // if (stream.req) {
+ // onrequest();
+ // } else {
+ // stream.on("request", onrequest);
+ // }
+ // } else
+ if (writable && !wState) {
+ stream.on("end", onlegacyfinish);
+ stream.on("close", onlegacyfinish);
+ }
+
+ // TODO(Soremwar)
+ // Bring back once requests are implemented
+ // if (typeof stream.aborted === "boolean") {
+ // stream.on("aborted", onclose);
+ // }
+
+ stream.on("end", onend);
+ stream.on("finish", onfinish);
+ if (opts.error !== false) stream.on("error", onerror);
+ stream.on("close", onclose);
+
+ const closed = (
+ wState?.closed ||
+ rState?.closed ||
+ wState?.errorEmitted ||
+ rState?.errorEmitted ||
+ // TODO(Soremwar)
+ // Bring back once requests are implemented
+ // (rState && stream.req && stream.aborted) ||
+ (
+ (!writable || wState?.finished) &&
+ (!readable || rState?.endEmitted)
+ )
+ );
+
+ if (closed) {
+ queueMicrotask(callback);
+ }
+
+ return function () {
+ callback = nop;
+ stream.removeListener("aborted", onclose);
+ stream.removeListener("complete", onfinish);
+ stream.removeListener("abort", onclose);
+ // TODO(Soremwar)
+ // Bring back once requests are implemented
+ // stream.removeListener("request", onrequest);
+ // if (stream.req) stream.req.removeListener("finish", onfinish);
+ stream.removeListener("end", onlegacyfinish);
+ stream.removeListener("close", onlegacyfinish);
+ stream.removeListener("finish", onfinish);
+ stream.removeListener("end", onend);
+ stream.removeListener("error", onerror);
+ stream.removeListener("close", onclose);
+ };
+}
diff --git a/std/node/_stream/from.ts b/std/node/_stream/from.ts
new file mode 100644
index 000000000..652c17715
--- /dev/null
+++ b/std/node/_stream/from.ts
@@ -0,0 +1,102 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import Readable from "./readable.ts";
+import type { ReadableOptions } from "./readable.ts";
+import { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } from "../_errors.ts";
+
+export default function from(
+ // deno-lint-ignore no-explicit-any
+ iterable: Iterable<any> | AsyncIterable<any>,
+ opts?: ReadableOptions,
+) {
+ let iterator:
+ // deno-lint-ignore no-explicit-any
+ | Iterator<any, any, undefined>
+ // deno-lint-ignore no-explicit-any
+ | AsyncIterator<any, any, undefined>;
+ if (typeof iterable === "string" || iterable instanceof Buffer) {
+ return new Readable({
+ objectMode: true,
+ ...opts,
+ read() {
+ this.push(iterable);
+ this.push(null);
+ },
+ });
+ }
+
+ if (Symbol.asyncIterator in iterable) {
+ // deno-lint-ignore no-explicit-any
+ iterator = (iterable as AsyncIterable<any>)[Symbol.asyncIterator]();
+ } else if (Symbol.iterator in iterable) {
+ // deno-lint-ignore no-explicit-any
+ iterator = (iterable as Iterable<any>)[Symbol.iterator]();
+ } else {
+ throw new ERR_INVALID_ARG_TYPE("iterable", ["Iterable"], iterable);
+ }
+
+ const readable = new Readable({
+ objectMode: true,
+ highWaterMark: 1,
+ ...opts,
+ });
+
+ // Reading boolean to protect against _read
+ // being called before last iteration completion.
+ let reading = false;
+
+ // needToClose boolean if iterator needs to be explicitly closed
+ let needToClose = false;
+
+ readable._read = function () {
+ if (!reading) {
+ reading = true;
+ next();
+ }
+ };
+
+ readable._destroy = function (error, cb) {
+ if (needToClose) {
+ needToClose = false;
+ close().then(
+ () => queueMicrotask(() => cb(error)),
+ (e) => queueMicrotask(() => cb(error || e)),
+ );
+ } else {
+ cb(error);
+ }
+ };
+
+ async function close() {
+ if (typeof iterator.return === "function") {
+ const { value } = await iterator.return();
+ await value;
+ }
+ }
+
+ async function next() {
+ try {
+ needToClose = false;
+ const { value, done } = await iterator.next();
+ needToClose = !done;
+ if (done) {
+ readable.push(null);
+ } else if (readable.destroyed) {
+ await close();
+ } else {
+ const res = await value;
+ if (res === null) {
+ reading = false;
+ throw new ERR_STREAM_NULL_VALUES();
+ } else if (readable.push(res)) {
+ next();
+ } else {
+ reading = false;
+ }
+ }
+ } catch (err) {
+ readable.destroy(err);
+ }
+ }
+ return readable;
+}
diff --git a/std/node/_stream/readable.ts b/std/node/_stream/readable.ts
new file mode 100644
index 000000000..72e61dff7
--- /dev/null
+++ b/std/node/_stream/readable.ts
@@ -0,0 +1,1248 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import EventEmitter, { captureRejectionSymbol } from "../events.ts";
+import Stream from "./stream.ts";
+import { Buffer } from "../buffer.ts";
+import BufferList from "./buffer_list.ts";
+import {
+ ERR_INVALID_OPT_VALUE,
+ ERR_METHOD_NOT_IMPLEMENTED,
+ ERR_MULTIPLE_CALLBACK,
+ ERR_STREAM_PUSH_AFTER_EOF,
+ ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
+} from "../_errors.ts";
+import { StringDecoder } from "../string_decoder.ts";
+import createReadableStreamAsyncIterator from "./async_iterator.ts";
+import streamFrom from "./from.ts";
+import { kConstruct, kDestroy, kPaused } from "./symbols.ts";
+import type Writable from "./writable.ts";
+import { errorOrDestroy as errorOrDestroyDuplex } from "./duplex.ts";
+
+function construct(stream: Readable, cb: () => void) {
+ const r = stream._readableState;
+
+ if (!stream._construct) {
+ return;
+ }
+
+ stream.once(kConstruct, cb);
+
+ r.constructed = false;
+
+ queueMicrotask(() => {
+ let called = false;
+ stream._construct?.((err?: Error) => {
+ r.constructed = true;
+
+ if (called) {
+ err = new ERR_MULTIPLE_CALLBACK();
+ } else {
+ called = true;
+ }
+
+ if (r.destroyed) {
+ stream.emit(kDestroy, err);
+ } else if (err) {
+ errorOrDestroy(stream, err, true);
+ } else {
+ queueMicrotask(() => stream.emit(kConstruct));
+ }
+ });
+ });
+}
+
+function _destroy(
+ self: Readable,
+ err?: Error,
+ cb?: (error?: Error | null) => void,
+) {
+ self._destroy(err || null, (err) => {
+ const r = (self as Readable)._readableState;
+
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!r.errored) {
+ r.errored = err;
+ }
+ }
+
+ r.closed = true;
+
+ if (typeof cb === "function") {
+ cb(err);
+ }
+
+ if (err) {
+ queueMicrotask(() => {
+ if (!r.errorEmitted) {
+ r.errorEmitted = true;
+ self.emit("error", err);
+ }
+ r.closeEmitted = true;
+ if (r.emitClose) {
+ self.emit("close");
+ }
+ });
+ } else {
+ queueMicrotask(() => {
+ r.closeEmitted = true;
+ if (r.emitClose) {
+ self.emit("close");
+ }
+ });
+ }
+ });
+}
+
+function errorOrDestroy(stream: Readable, err: Error, sync = false) {
+ const r = stream._readableState;
+
+ if (r.destroyed) {
+ return stream;
+ }
+
+ if (r.autoDestroy) {
+ stream.destroy(err);
+ } else if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!r.errored) {
+ r.errored = err;
+ }
+ if (sync) {
+ queueMicrotask(() => {
+ if (!r.errorEmitted) {
+ r.errorEmitted = true;
+ stream.emit("error", err);
+ }
+ });
+ } else if (!r.errorEmitted) {
+ r.errorEmitted = true;
+ stream.emit("error", err);
+ }
+ }
+}
+
+function flow(stream: Readable) {
+ const state = stream._readableState;
+ while (state.flowing && stream.read() !== null);
+}
+
+function pipeOnDrain(src: Readable, dest: Writable) {
+ return function pipeOnDrainFunctionResult() {
+ const state = src._readableState;
+
+ if (state.awaitDrainWriters === dest) {
+ state.awaitDrainWriters = null;
+ } else if (state.multiAwaitDrain) {
+ (state.awaitDrainWriters as Set<Writable>).delete(dest);
+ }
+
+ if (
+ (!state.awaitDrainWriters ||
+ (state.awaitDrainWriters as Set<Writable>).size === 0) &&
+ src.listenerCount("data")
+ ) {
+ state.flowing = true;
+ flow(src);
+ }
+ };
+}
+
+function updateReadableListening(self: Readable) {
+ const state = self._readableState;
+ state.readableListening = self.listenerCount("readable") > 0;
+
+ if (state.resumeScheduled && state[kPaused] === false) {
+ // Flowing needs to be set to true now, otherwise
+ // the upcoming resume will not flow.
+ state.flowing = true;
+
+ // Crude way to check if we should resume.
+ } else if (self.listenerCount("data") > 0) {
+ self.resume();
+ } else if (!state.readableListening) {
+ state.flowing = null;
+ }
+}
+
+function nReadingNextTick(self: Readable) {
+ self.read(0);
+}
+
+function resume(stream: Readable, state: ReadableState) {
+ if (!state.resumeScheduled) {
+ state.resumeScheduled = true;
+ queueMicrotask(() => resume_(stream, state));
+ }
+}
+
+function resume_(stream: Readable, state: ReadableState) {
+ if (!state.reading) {
+ stream.read(0);
+ }
+
+ state.resumeScheduled = false;
+ stream.emit("resume");
+ flow(stream);
+ if (state.flowing && !state.reading) {
+ stream.read(0);
+ }
+}
+
+function readableAddChunk(
+ stream: Readable,
+ chunk: string | Buffer | Uint8Array | null,
+ encoding: undefined | string = undefined,
+ addToFront: boolean,
+) {
+ const state = stream._readableState;
+ let usedEncoding = encoding;
+
+ let err;
+ if (!state.objectMode) {
+ if (typeof chunk === "string") {
+ usedEncoding = encoding || state.defaultEncoding;
+ if (state.encoding !== usedEncoding) {
+ if (addToFront && state.encoding) {
+ chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding);
+ } else {
+ chunk = Buffer.from(chunk, usedEncoding);
+ usedEncoding = "";
+ }
+ }
+ } else if (chunk instanceof Uint8Array) {
+ chunk = Buffer.from(chunk);
+ }
+ }
+
+ if (err) {
+ errorOrDestroy(stream, err);
+ } else if (chunk === null) {
+ state.reading = false;
+ onEofChunk(stream, state);
+ } else if (state.objectMode || (chunk.length > 0)) {
+ if (addToFront) {
+ if (state.endEmitted) {
+ errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
+ } else {
+ addChunk(stream, state, chunk, true);
+ }
+ } else if (state.ended) {
+ errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
+ } else if (state.destroyed || state.errored) {
+ return false;
+ } else {
+ state.reading = false;
+ if (state.decoder && !usedEncoding) {
+ //TODO(Soremwar)
+ //I don't think this cast is right
+ chunk = state.decoder.write(Buffer.from(chunk as Uint8Array));
+ if (state.objectMode || chunk.length !== 0) {
+ addChunk(stream, state, chunk, false);
+ } else {
+ maybeReadMore(stream, state);
+ }
+ } else {
+ addChunk(stream, state, chunk, false);
+ }
+ }
+ } else if (!addToFront) {
+ state.reading = false;
+ maybeReadMore(stream, state);
+ }
+
+ return !state.ended &&
+ (state.length < state.highWaterMark || state.length === 0);
+}
+
+function addChunk(
+ stream: Readable,
+ state: ReadableState,
+ chunk: string | Buffer | Uint8Array,
+ addToFront: boolean,
+) {
+ if (state.flowing && state.length === 0 && !state.sync) {
+ if (state.multiAwaitDrain) {
+ (state.awaitDrainWriters as Set<Writable>).clear();
+ } else {
+ state.awaitDrainWriters = null;
+ }
+ stream.emit("data", chunk);
+ } else {
+ // Update the buffer info.
+ state.length += state.objectMode ? 1 : chunk.length;
+ if (addToFront) {
+ state.buffer.unshift(chunk);
+ } else {
+ state.buffer.push(chunk);
+ }
+
+ if (state.needReadable) {
+ emitReadable(stream);
+ }
+ }
+ maybeReadMore(stream, state);
+}
+
+function prependListener(
+ emitter: EventEmitter,
+ event: string,
+ // deno-lint-ignore no-explicit-any
+ fn: (...args: any[]) => any,
+) {
+ if (typeof emitter.prependListener === "function") {
+ return emitter.prependListener(event, fn);
+ }
+
+ // This is a hack to make sure that our error handler is attached before any
+ // userland ones. NEVER DO THIS. This is here only because this code needs
+ // to continue to work with older versions of Node.js that do not include
+ //the prependListener() method. The goal is to eventually remove this hack.
+ // TODO(Soremwar)
+ // Burn it with fire
+ // deno-lint-ignore ban-ts-comment
+ //@ts-ignore
+ if (emitter._events.get(event)?.length) {
+ // deno-lint-ignore ban-ts-comment
+ //@ts-ignore
+ const listeners = [fn, ...emitter._events.get(event)];
+ // deno-lint-ignore ban-ts-comment
+ //@ts-ignore
+ emitter._events.set(event, listeners);
+ } else {
+ emitter.on(event, fn);
+ }
+}
+
+/** Pluck off n bytes from an array of buffers.
+* Length is the combined lengths of all the buffers in the list.
+* This function is designed to be inlinable, so please take care when making
+* changes to the function body.
+*/
+function fromList(n: number, state: ReadableState) {
+ // nothing buffered.
+ if (state.length === 0) {
+ return null;
+ }
+
+ let ret;
+ if (state.objectMode) {
+ ret = state.buffer.shift();
+ } else if (!n || n >= state.length) {
+ if (state.decoder) {
+ ret = state.buffer.join("");
+ } else if (state.buffer.length === 1) {
+ ret = state.buffer.first();
+ } else {
+ ret = state.buffer.concat(state.length);
+ }
+ state.buffer.clear();
+ } else {
+ ret = state.buffer.consume(n, !!state.decoder);
+ }
+
+ return ret;
+}
+
+function endReadable(stream: Readable) {
+ const state = stream._readableState;
+
+ if (!state.endEmitted) {
+ state.ended = true;
+ queueMicrotask(() => endReadableNT(state, stream));
+ }
+}
+
+function endReadableNT(state: ReadableState, stream: Readable) {
+ if (
+ !state.errorEmitted && !state.closeEmitted &&
+ !state.endEmitted && state.length === 0
+ ) {
+ state.endEmitted = true;
+ stream.emit("end");
+
+ if (state.autoDestroy) {
+ stream.destroy();
+ }
+ }
+}
+
+// Don't raise the hwm > 1GB.
+const MAX_HWM = 0x40000000;
+function computeNewHighWaterMark(n: number) {
+ if (n >= MAX_HWM) {
+ n = MAX_HWM;
+ } else {
+ n--;
+ n |= n >>> 1;
+ n |= n >>> 2;
+ n |= n >>> 4;
+ n |= n >>> 8;
+ n |= n >>> 16;
+ n++;
+ }
+ return n;
+}
+
+function howMuchToRead(n: number, state: ReadableState) {
+ if (n <= 0 || (state.length === 0 && state.ended)) {
+ return 0;
+ }
+ if (state.objectMode) {
+ return 1;
+ }
+ if (Number.isNaN(n)) {
+ // Only flow one buffer at a time.
+ if (state.flowing && state.length) {
+ return state.buffer.first().length;
+ }
+ return state.length;
+ }
+ if (n <= state.length) {
+ return n;
+ }
+ return state.ended ? state.length : 0;
+}
+
+function onEofChunk(stream: Readable, state: ReadableState) {
+ if (state.ended) return;
+ if (state.decoder) {
+ const chunk = state.decoder.end();
+ if (chunk && chunk.length) {
+ state.buffer.push(chunk);
+ state.length += state.objectMode ? 1 : chunk.length;
+ }
+ }
+ state.ended = true;
+
+ if (state.sync) {
+ emitReadable(stream);
+ } else {
+ state.needReadable = false;
+ state.emittedReadable = true;
+ emitReadable_(stream);
+ }
+}
+
+function emitReadable(stream: Readable) {
+ const state = stream._readableState;
+ state.needReadable = false;
+ if (!state.emittedReadable) {
+ state.emittedReadable = true;
+ queueMicrotask(() => emitReadable_(stream));
+ }
+}
+
+function emitReadable_(stream: Readable) {
+ const state = stream._readableState;
+ if (!state.destroyed && !state.errored && (state.length || state.ended)) {
+ stream.emit("readable");
+ state.emittedReadable = false;
+ }
+
+ state.needReadable = !state.flowing &&
+ !state.ended &&
+ state.length <= state.highWaterMark;
+ flow(stream);
+}
+
+function maybeReadMore(stream: Readable, state: ReadableState) {
+ if (!state.readingMore && state.constructed) {
+ state.readingMore = true;
+ queueMicrotask(() => maybeReadMore_(stream, state));
+ }
+}
+
+function maybeReadMore_(stream: Readable, state: ReadableState) {
+ while (
+ !state.reading && !state.ended &&
+ (state.length < state.highWaterMark ||
+ (state.flowing && state.length === 0))
+ ) {
+ const len = state.length;
+ stream.read(0);
+ if (len === state.length) {
+ // Didn't get any data, stop spinning.
+ break;
+ }
+ }
+ state.readingMore = false;
+}
+
+export interface ReadableOptions {
+ autoDestroy?: boolean;
+ construct?: () => void;
+ //TODO(Soremwar)
+ //Import available encodings
+ defaultEncoding?: string;
+ destroy?(
+ this: Readable,
+ error: Error | null,
+ callback: (error: Error | null) => void,
+ ): void;
+ emitClose?: boolean;
+ //TODO(Soremwar)
+ //Import available encodings
+ encoding?: string;
+ highWaterMark?: number;
+ objectMode?: boolean;
+ read?(this: Readable): void;
+}
+
+export class ReadableState {
+ [kPaused]: boolean | null = null;
+ awaitDrainWriters: Writable | Set<Writable> | null = null;
+ buffer = new BufferList();
+ closed = false;
+ closeEmitted = false;
+ constructed: boolean;
+ decoder: StringDecoder | null = null;
+ destroyed = false;
+ emittedReadable = false;
+ //TODO(Soremwar)
+ //Import available encodings
+ encoding: string | null = null;
+ ended = false;
+ endEmitted = false;
+ errored: Error | null = null;
+ errorEmitted = false;
+ flowing: boolean | null = null;
+ highWaterMark: number;
+ length = 0;
+ multiAwaitDrain = false;
+ needReadable = false;
+ objectMode: boolean;
+ pipes: Writable[] = [];
+ readable = true;
+ readableListening = false;
+ reading = false;
+ readingMore = false;
+ resumeScheduled = false;
+ sync = true;
+ emitClose: boolean;
+ autoDestroy: boolean;
+ defaultEncoding: string;
+
+ constructor(options?: ReadableOptions) {
+ this.objectMode = !!options?.objectMode;
+
+ this.highWaterMark = options?.highWaterMark ??
+ (this.objectMode ? 16 : 16 * 1024);
+ if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) {
+ this.highWaterMark = Math.floor(this.highWaterMark);
+ } else {
+ throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark);
+ }
+
+ this.emitClose = options?.emitClose ?? true;
+ this.autoDestroy = options?.autoDestroy ?? true;
+ this.defaultEncoding = options?.defaultEncoding || "utf8";
+
+ if (options?.encoding) {
+ this.decoder = new StringDecoder(options.encoding);
+ this.encoding = options.encoding;
+ }
+
+ this.constructed = true;
+ }
+}
+
+class Readable extends Stream {
+ _construct?: (cb: (error?: Error) => void) => void;
+ _readableState: ReadableState;
+
+ constructor(options?: ReadableOptions) {
+ super();
+ if (options) {
+ if (typeof options.read === "function") {
+ this._read = options.read;
+ }
+ if (typeof options.destroy === "function") {
+ this._destroy = options.destroy;
+ }
+ if (typeof options.construct === "function") {
+ this._construct = options.construct;
+ }
+ }
+ this._readableState = new ReadableState(options);
+
+ construct(this, () => {
+ maybeReadMore(this, this._readableState);
+ });
+ }
+
+ static from(
+ // deno-lint-ignore no-explicit-any
+ iterable: Iterable<any> | AsyncIterable<any>,
+ opts?: ReadableOptions,
+ ): Readable {
+ return streamFrom(iterable, opts);
+ }
+
+ static ReadableState = ReadableState;
+
+ static _fromList = fromList;
+
+ // You can override either this method, or the async _read(n) below.
+ read(n?: number) {
+ // Same as parseInt(undefined, 10), however V8 7.3 performance regressed
+ // in this scenario, so we are doing it manually.
+ if (n === undefined) {
+ n = NaN;
+ }
+ const state = this._readableState;
+ const nOrig = n;
+
+ if (n > state.highWaterMark) {
+ state.highWaterMark = computeNewHighWaterMark(n);
+ }
+
+ if (n !== 0) {
+ state.emittedReadable = false;
+ }
+
+ if (
+ n === 0 &&
+ state.needReadable &&
+ ((state.highWaterMark !== 0
+ ? state.length >= state.highWaterMark
+ : state.length > 0) ||
+ state.ended)
+ ) {
+ if (state.length === 0 && state.ended) {
+ endReadable(this);
+ } else {
+ emitReadable(this);
+ }
+ return null;
+ }
+
+ n = howMuchToRead(n, state);
+
+ if (n === 0 && state.ended) {
+ if (state.length === 0) {
+ endReadable(this);
+ }
+ return null;
+ }
+
+ let doRead = state.needReadable;
+ if (
+ state.length === 0 || state.length - (n as number) < state.highWaterMark
+ ) {
+ doRead = true;
+ }
+
+ if (
+ state.ended || state.reading || state.destroyed || state.errored ||
+ !state.constructed
+ ) {
+ doRead = false;
+ } else if (doRead) {
+ state.reading = true;
+ state.sync = true;
+ if (state.length === 0) {
+ state.needReadable = true;
+ }
+ this._read();
+ state.sync = false;
+ if (!state.reading) {
+ n = howMuchToRead(nOrig, state);
+ }
+ }
+
+ let ret;
+ if ((n as number) > 0) {
+ ret = fromList((n as number), state);
+ } else {
+ ret = null;
+ }
+
+ if (ret === null) {
+ state.needReadable = state.length <= state.highWaterMark;
+ n = 0;
+ } else {
+ state.length -= n as number;
+ if (state.multiAwaitDrain) {
+ (state.awaitDrainWriters as Set<Writable>).clear();
+ } else {
+ state.awaitDrainWriters = null;
+ }
+ }
+
+ if (state.length === 0) {
+ if (!state.ended) {
+ state.needReadable = true;
+ }
+
+ if (nOrig !== n && state.ended) {
+ endReadable(this);
+ }
+ }
+
+ if (ret !== null) {
+ this.emit("data", ret);
+ }
+
+ return ret;
+ }
+
+ _read() {
+ throw new ERR_METHOD_NOT_IMPLEMENTED("_read()");
+ }
+
+ //TODO(Soremwar)
+ //Should be duplex
+ pipe<T extends Writable>(dest: T, pipeOpts?: { end?: boolean }): T {
+ // deno-lint-ignore no-this-alias
+ const src = this;
+ const state = this._readableState;
+
+ if (state.pipes.length === 1) {
+ if (!state.multiAwaitDrain) {
+ state.multiAwaitDrain = true;
+ state.awaitDrainWriters = new Set(
+ state.awaitDrainWriters ? [state.awaitDrainWriters as Writable] : [],
+ );
+ }
+ }
+
+ state.pipes.push(dest);
+
+ const doEnd = (!pipeOpts || pipeOpts.end !== false);
+
+ //TODO(Soremwar)
+ //Part of doEnd condition
+ //In node, output/inout are a duplex Stream
+ // &&
+ // dest !== stdout &&
+ // dest !== stderr
+
+ const endFn = doEnd ? onend : unpipe;
+ if (state.endEmitted) {
+ queueMicrotask(endFn);
+ } else {
+ this.once("end", endFn);
+ }
+
+ dest.on("unpipe", onunpipe);
+ function onunpipe(readable: Readable, unpipeInfo: { hasUnpiped: boolean }) {
+ if (readable === src) {
+ if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
+ unpipeInfo.hasUnpiped = true;
+ cleanup();
+ }
+ }
+ }
+
+ function onend() {
+ dest.end();
+ }
+
+ let ondrain: () => void;
+
+ let cleanedUp = false;
+ function cleanup() {
+ dest.removeListener("close", onclose);
+ dest.removeListener("finish", onfinish);
+ if (ondrain) {
+ dest.removeListener("drain", ondrain);
+ }
+ dest.removeListener("error", onerror);
+ dest.removeListener("unpipe", onunpipe);
+ src.removeListener("end", onend);
+ src.removeListener("end", unpipe);
+ src.removeListener("data", ondata);
+
+ cleanedUp = true;
+ if (
+ ondrain && state.awaitDrainWriters &&
+ (!dest._writableState || dest._writableState.needDrain)
+ ) {
+ ondrain();
+ }
+ }
+
+ this.on("data", ondata);
+ // deno-lint-ignore no-explicit-any
+ function ondata(chunk: any) {
+ const ret = dest.write(chunk);
+ if (ret === false) {
+ if (!cleanedUp) {
+ if (state.pipes.length === 1 && state.pipes[0] === dest) {
+ state.awaitDrainWriters = dest;
+ state.multiAwaitDrain = false;
+ } else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
+ (state.awaitDrainWriters as Set<Writable>).add(dest);
+ }
+ src.pause();
+ }
+ if (!ondrain) {
+ ondrain = pipeOnDrain(src, dest);
+ dest.on("drain", ondrain);
+ }
+ }
+ }
+
+ function onerror(er: Error) {
+ unpipe();
+ dest.removeListener("error", onerror);
+ if (dest.listenerCount("error") === 0) {
+ //TODO(Soremwar)
+ //Should be const s = dest._writableState || dest._readableState;
+ const s = dest._writableState;
+ if (s && !s.errorEmitted) {
+ // User incorrectly emitted 'error' directly on the stream.
+ errorOrDestroyDuplex(dest, er);
+ } else {
+ dest.emit("error", er);
+ }
+ }
+ }
+
+ prependListener(dest, "error", onerror);
+
+ function onclose() {
+ dest.removeListener("finish", onfinish);
+ unpipe();
+ }
+ dest.once("close", onclose);
+ function onfinish() {
+ dest.removeListener("close", onclose);
+ unpipe();
+ }
+ dest.once("finish", onfinish);
+
+ function unpipe() {
+ src.unpipe(dest);
+ }
+
+ dest.emit("pipe", this);
+
+ if (!state.flowing) {
+ this.resume();
+ }
+
+ return dest;
+ }
+
+ isPaused() {
+ return this._readableState[kPaused] === true ||
+ this._readableState.flowing === false;
+ }
+
+ //TODO(Soremwar)
+ //Replace string with encoding types
+ setEncoding(enc: string) {
+ const decoder = new StringDecoder(enc);
+ this._readableState.decoder = decoder;
+ this._readableState.encoding = this._readableState.decoder.encoding;
+
+ const buffer = this._readableState.buffer;
+ let content = "";
+ for (const data of buffer) {
+ content += decoder.write(data as Buffer);
+ }
+ buffer.clear();
+ if (content !== "") {
+ buffer.push(content);
+ }
+ this._readableState.length = content.length;
+ return this;
+ }
+
+ on(
+ event: "close" | "end" | "pause" | "readable" | "resume",
+ listener: () => void,
+ ): this;
+ // deno-lint-ignore no-explicit-any
+ on(event: "data", listener: (chunk: any) => void): this;
+ on(event: "error", listener: (err: Error) => void): this;
+ // deno-lint-ignore no-explicit-any
+ on(event: string | symbol, listener: (...args: any[]) => void): this;
+ on(
+ ev: string | symbol,
+ fn:
+ | (() => void)
+ // deno-lint-ignore no-explicit-any
+ | ((chunk: any) => void)
+ | ((err: Error) => void)
+ // deno-lint-ignore no-explicit-any
+ | ((...args: any[]) => void),
+ ) {
+ const res = super.on.call(this, ev, fn);
+ const state = this._readableState;
+
+ if (ev === "data") {
+ state.readableListening = this.listenerCount("readable") > 0;
+
+ if (state.flowing !== false) {
+ this.resume();
+ }
+ } else if (ev === "readable") {
+ if (!state.endEmitted && !state.readableListening) {
+ state.readableListening = state.needReadable = true;
+ state.flowing = false;
+ state.emittedReadable = false;
+ if (state.length) {
+ emitReadable(this);
+ } else if (!state.reading) {
+ queueMicrotask(() => nReadingNextTick(this));
+ }
+ }
+ }
+
+ return res;
+ }
+
+ removeListener(
+ event: "close" | "end" | "pause" | "readable" | "resume",
+ listener: () => void,
+ ): this;
+ // deno-lint-ignore no-explicit-any
+ removeListener(event: "data", listener: (chunk: any) => void): this;
+ removeListener(event: "error", listener: (err: Error) => void): this;
+ removeListener(
+ event: string | symbol,
+ // deno-lint-ignore no-explicit-any
+ listener: (...args: any[]) => void,
+ ): this;
+ removeListener(
+ ev: string | symbol,
+ fn:
+ | (() => void)
+ // deno-lint-ignore no-explicit-any
+ | ((chunk: any) => void)
+ | ((err: Error) => void)
+ // deno-lint-ignore no-explicit-any
+ | ((...args: any[]) => void),
+ ) {
+ const res = super.removeListener.call(this, ev, fn);
+
+ if (ev === "readable") {
+ queueMicrotask(() => updateReadableListening(this));
+ }
+
+ return res;
+ }
+
+ off = this.removeListener;
+
+ destroy(err?: Error, cb?: () => void) {
+ const r = this._readableState;
+
+ if (r.destroyed) {
+ if (typeof cb === "function") {
+ cb();
+ }
+
+ return this;
+ }
+
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!r.errored) {
+ r.errored = err;
+ }
+ }
+
+ r.destroyed = true;
+
+ // If still constructing then defer calling _destroy.
+ if (!r.constructed) {
+ this.once(kDestroy, (er: Error) => {
+ _destroy(this, err || er, cb);
+ });
+ } else {
+ _destroy(this, err, cb);
+ }
+
+ return this;
+ }
+
+ _undestroy() {
+ const r = this._readableState;
+ r.constructed = true;
+ r.closed = false;
+ r.closeEmitted = false;
+ r.destroyed = false;
+ r.errored = null;
+ r.errorEmitted = false;
+ r.reading = false;
+ r.ended = false;
+ r.endEmitted = false;
+ }
+
+ _destroy(
+ error: Error | null,
+ callback: (error?: Error | null) => void,
+ ): void {
+ callback(error);
+ }
+
+ [captureRejectionSymbol](err: Error) {
+ this.destroy(err);
+ }
+
+ //TODO(Soremwar)
+ //Same deal, string => encodings
+ // deno-lint-ignore no-explicit-any
+ push(chunk: any, encoding?: string): boolean {
+ return readableAddChunk(this, chunk, encoding, false);
+ }
+
+ // deno-lint-ignore no-explicit-any
+ unshift(chunk: any, encoding?: string): boolean {
+ return readableAddChunk(this, chunk, encoding, true);
+ }
+
+ unpipe(dest?: Writable): this {
+ const state = this._readableState;
+ const unpipeInfo = { hasUnpiped: false };
+
+ if (state.pipes.length === 0) {
+ return this;
+ }
+
+ if (!dest) {
+ // remove all.
+ const dests = state.pipes;
+ state.pipes = [];
+ this.pause();
+
+ for (const dest of dests) {
+ dest.emit("unpipe", this, { hasUnpiped: false });
+ }
+ return this;
+ }
+
+ const index = state.pipes.indexOf(dest);
+ if (index === -1) {
+ return this;
+ }
+
+ state.pipes.splice(index, 1);
+ if (state.pipes.length === 0) {
+ this.pause();
+ }
+
+ dest.emit("unpipe", this, unpipeInfo);
+
+ return this;
+ }
+
+ removeAllListeners(
+ ev:
+ | "close"
+ | "data"
+ | "end"
+ | "error"
+ | "pause"
+ | "readable"
+ | "resume"
+ | symbol
+ | undefined,
+ ) {
+ const res = super.removeAllListeners(ev);
+
+ if (ev === "readable" || ev === undefined) {
+ queueMicrotask(() => updateReadableListening(this));
+ }
+
+ return res;
+ }
+
+ resume() {
+ const state = this._readableState;
+ if (!state.flowing) {
+ // We flow only if there is no one listening
+ // for readable, but we still have to call
+ // resume().
+ state.flowing = !state.readableListening;
+ resume(this, state);
+ }
+ state[kPaused] = false;
+ return this;
+ }
+
+ pause() {
+ if (this._readableState.flowing !== false) {
+ this._readableState.flowing = false;
+ this.emit("pause");
+ }
+ this._readableState[kPaused] = true;
+ return this;
+ }
+
+ /** Wrap an old-style stream as the async data source. */
+ wrap(stream: Stream): this {
+ const state = this._readableState;
+ let paused = false;
+
+ stream.on("end", () => {
+ if (state.decoder && !state.ended) {
+ const chunk = state.decoder.end();
+ if (chunk && chunk.length) {
+ this.push(chunk);
+ }
+ }
+
+ this.push(null);
+ });
+
+ stream.on("data", (chunk) => {
+ if (state.decoder) {
+ chunk = state.decoder.write(chunk);
+ }
+
+ if (state.objectMode && (chunk === null || chunk === undefined)) {
+ return;
+ } else if (!state.objectMode && (!chunk || !chunk.length)) {
+ return;
+ }
+
+ const ret = this.push(chunk);
+ if (!ret) {
+ paused = true;
+ // By the time this is triggered, stream will be a readable stream
+ // deno-lint-ignore ban-ts-comment
+ // @ts-ignore
+ stream.pause();
+ }
+ });
+
+ // TODO(Soremwar)
+ // There must be a clean way to implement this on TypeScript
+ // Proxy all the other methods. Important when wrapping filters and duplexes.
+ for (const i in stream) {
+ // deno-lint-ignore ban-ts-comment
+ //@ts-ignore
+ if (this[i] === undefined && typeof stream[i] === "function") {
+ // deno-lint-ignore ban-ts-comment
+ //@ts-ignore
+ this[i] = function methodWrap(method) {
+ return function methodWrapReturnFunction() {
+ // deno-lint-ignore ban-ts-comment
+ //@ts-ignore
+ return stream[method].apply(stream);
+ };
+ }(i);
+ }
+ }
+
+ stream.on("error", (err) => {
+ errorOrDestroy(this, err);
+ });
+
+ stream.on("close", () => {
+ this.emit("close");
+ });
+
+ stream.on("destroy", () => {
+ this.emit("destroy");
+ });
+
+ stream.on("pause", () => {
+ this.emit("pause");
+ });
+
+ stream.on("resume", () => {
+ this.emit("resume");
+ });
+
+ this._read = () => {
+ if (paused) {
+ paused = false;
+ // By the time this is triggered, stream will be a readable stream
+ // deno-lint-ignore ban-ts-comment
+ //@ts-ignore
+ stream.resume();
+ }
+ };
+
+ return this;
+ }
+
+ [Symbol.asyncIterator]() {
+ return createReadableStreamAsyncIterator(this);
+ }
+
+ get readable(): boolean {
+ return this._readableState?.readable &&
+ !this._readableState?.destroyed &&
+ !this._readableState?.errorEmitted &&
+ !this._readableState?.endEmitted;
+ }
+ set readable(val: boolean) {
+ if (this._readableState) {
+ this._readableState.readable = val;
+ }
+ }
+
+ get readableHighWaterMark(): number {
+ return this._readableState.highWaterMark;
+ }
+
+ get readableBuffer() {
+ return this._readableState && this._readableState.buffer;
+ }
+
+ get readableFlowing(): boolean | null {
+ return this._readableState.flowing;
+ }
+
+ set readableFlowing(state: boolean | null) {
+ if (this._readableState) {
+ this._readableState.flowing = state;
+ }
+ }
+
+ get readableLength() {
+ return this._readableState.length;
+ }
+
+ get readableObjectMode() {
+ return this._readableState ? this._readableState.objectMode : false;
+ }
+
+ get readableEncoding() {
+ return this._readableState ? this._readableState.encoding : null;
+ }
+
+ get destroyed() {
+ if (this._readableState === undefined) {
+ return false;
+ }
+ return this._readableState.destroyed;
+ }
+
+ set destroyed(value: boolean) {
+ if (!this._readableState) {
+ return;
+ }
+ this._readableState.destroyed = value;
+ }
+
+ get readableEnded() {
+ return this._readableState ? this._readableState.endEmitted : false;
+ }
+}
+
+Object.defineProperties(Stream, {
+ _readableState: { enumerable: false },
+ destroyed: { enumerable: false },
+ readableBuffer: { enumerable: false },
+ readableEncoding: { enumerable: false },
+ readableEnded: { enumerable: false },
+ readableFlowing: { enumerable: false },
+ readableHighWaterMark: { enumerable: false },
+ readableLength: { enumerable: false },
+ readableObjectMode: { enumerable: false },
+});
+
+export default Readable;
diff --git a/std/node/_stream/readable_test.ts b/std/node/_stream/readable_test.ts
new file mode 100644
index 000000000..72767e28f
--- /dev/null
+++ b/std/node/_stream/readable_test.ts
@@ -0,0 +1,489 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import Readable from "../_stream/readable.ts";
+import { once } from "../events.ts";
+import { deferred } from "../../async/mod.ts";
+import {
+ assert,
+ assertEquals,
+ assertStrictEquals,
+} from "../../testing/asserts.ts";
+
+Deno.test("Readable stream from iterator", async () => {
+ function* generate() {
+ yield "a";
+ yield "b";
+ yield "c";
+ }
+
+ const stream = Readable.from(generate());
+
+ const expected = ["a", "b", "c"];
+
+ for await (const chunk of stream) {
+ assertStrictEquals(chunk, expected.shift());
+ }
+});
+
+Deno.test("Readable stream from async iterator", async () => {
+ async function* generate() {
+ yield "a";
+ yield "b";
+ yield "c";
+ }
+
+ const stream = Readable.from(generate());
+
+ const expected = ["a", "b", "c"];
+
+ for await (const chunk of stream) {
+ assertStrictEquals(chunk, expected.shift());
+ }
+});
+
+Deno.test("Readable stream from promise", async () => {
+ const promises = [
+ Promise.resolve("a"),
+ Promise.resolve("b"),
+ Promise.resolve("c"),
+ ];
+
+ const stream = Readable.from(promises);
+
+ const expected = ["a", "b", "c"];
+
+ for await (const chunk of stream) {
+ assertStrictEquals(chunk, expected.shift());
+ }
+});
+
+Deno.test("Readable stream from string", async () => {
+ const string = "abc";
+ const stream = Readable.from(string);
+
+ for await (const chunk of stream) {
+ assertStrictEquals(chunk, string);
+ }
+});
+
+Deno.test("Readable stream from Buffer", async () => {
+ const string = "abc";
+ const stream = Readable.from(Buffer.from(string));
+
+ for await (const chunk of stream) {
+ assertStrictEquals((chunk as Buffer).toString(), string);
+ }
+});
+
+Deno.test("Readable stream gets destroyed on error", async () => {
+ // deno-lint-ignore require-yield
+ async function* generate() {
+ throw new Error("kaboom");
+ }
+
+ const stream = Readable.from(generate());
+
+ stream.read();
+
+ const [err] = await once(stream, "error");
+ assertStrictEquals(err.message, "kaboom");
+ assertStrictEquals(stream.destroyed, true);
+});
+
+Deno.test("Readable stream works as Transform stream", async () => {
+ async function* generate(stream: Readable) {
+ for await (const chunk of stream) {
+ yield (chunk as string).toUpperCase();
+ }
+ }
+
+ const source = new Readable({
+ objectMode: true,
+ read() {
+ this.push("a");
+ this.push("b");
+ this.push("c");
+ this.push(null);
+ },
+ });
+
+ const stream = Readable.from(generate(source));
+
+ const expected = ["A", "B", "C"];
+
+ for await (const chunk of stream) {
+ assertStrictEquals(chunk, expected.shift());
+ }
+});
+
+Deno.test("Readable stream can be paused", () => {
+ const readable = new Readable();
+
+ // _read is a noop, here.
+ readable._read = () => {};
+
+ // Default state of a stream is not "paused"
+ assert(!readable.isPaused());
+
+ // Make the stream start flowing...
+ readable.on("data", () => {});
+
+ // still not paused.
+ assert(!readable.isPaused());
+
+ readable.pause();
+ assert(readable.isPaused());
+ readable.resume();
+ assert(!readable.isPaused());
+});
+
+Deno.test("Readable stream sets enconding correctly", () => {
+ const readable = new Readable({
+ read() {},
+ });
+
+ readable.setEncoding("utf8");
+
+ readable.push(new TextEncoder().encode("DEF"));
+ readable.unshift(new TextEncoder().encode("ABC"));
+
+ assertStrictEquals(readable.read(), "ABCDEF");
+});
+
+Deno.test("Readable stream sets encoding correctly", () => {
+ const readable = new Readable({
+ read() {},
+ });
+
+ readable.setEncoding("utf8");
+
+ readable.push(new TextEncoder().encode("DEF"));
+ readable.unshift(new TextEncoder().encode("ABC"));
+
+ assertStrictEquals(readable.read(), "ABCDEF");
+});
+
+Deno.test("Readable stream holds up a big push", async () => {
+ let readExecuted = 0;
+ const readExecutedExpected = 3;
+ const readExpectedExecutions = deferred();
+
+ let endExecuted = 0;
+ const endExecutedExpected = 1;
+ const endExpectedExecutions = deferred();
+
+ const str = "asdfasdfasdfasdfasdf";
+
+ const r = new Readable({
+ highWaterMark: 5,
+ encoding: "utf8",
+ });
+
+ let reads = 0;
+
+ function _read() {
+ if (reads === 0) {
+ setTimeout(() => {
+ r.push(str);
+ }, 1);
+ reads++;
+ } else if (reads === 1) {
+ const ret = r.push(str);
+ assertEquals(ret, false);
+ reads++;
+ } else {
+ r.push(null);
+ }
+ }
+
+ r._read = () => {
+ readExecuted++;
+ if (readExecuted == readExecutedExpected) {
+ readExpectedExecutions.resolve();
+ }
+ _read();
+ };
+
+ r.on("end", () => {
+ endExecuted++;
+ if (endExecuted == endExecutedExpected) {
+ endExpectedExecutions.resolve();
+ }
+ });
+
+ // Push some data in to start.
+ // We've never gotten any read event at this point.
+ const ret = r.push(str);
+ assert(!ret);
+ let chunk = r.read();
+ assertEquals(chunk, str);
+ chunk = r.read();
+ assertEquals(chunk, null);
+
+ r.once("readable", () => {
+ // This time, we'll get *all* the remaining data, because
+ // it's been added synchronously, as the read WOULD take
+ // us below the hwm, and so it triggered a _read() again,
+ // which synchronously added more, which we then return.
+ chunk = r.read();
+ assertEquals(chunk, str + str);
+
+ chunk = r.read();
+ assertEquals(chunk, null);
+ });
+
+ const readTimeout = setTimeout(
+ () => readExpectedExecutions.reject(),
+ 1000,
+ );
+ const endTimeout = setTimeout(
+ () => endExpectedExecutions.reject(),
+ 1000,
+ );
+ await readExpectedExecutions;
+ await endExpectedExecutions;
+ clearTimeout(readTimeout);
+ clearTimeout(endTimeout);
+ assertEquals(readExecuted, readExecutedExpected);
+ assertEquals(endExecuted, endExecutedExpected);
+});
+
+Deno.test("Readable stream: 'on' event", async () => {
+ async function* generate() {
+ yield "a";
+ yield "b";
+ yield "c";
+ }
+
+ const stream = Readable.from(generate());
+
+ let iterations = 0;
+ const expected = ["a", "b", "c"];
+
+ stream.on("data", (chunk) => {
+ iterations++;
+ assertStrictEquals(chunk, expected.shift());
+ });
+
+ await once(stream, "end");
+
+ assertStrictEquals(iterations, 3);
+});
+
+Deno.test("Readable stream: 'data' event", async () => {
+ async function* generate() {
+ yield "a";
+ yield "b";
+ yield "c";
+ }
+
+ const stream = Readable.from(generate(), { objectMode: false });
+
+ let iterations = 0;
+ const expected = ["a", "b", "c"];
+
+ stream.on("data", (chunk) => {
+ iterations++;
+ assertStrictEquals(chunk instanceof Buffer, true);
+ assertStrictEquals(chunk.toString(), expected.shift());
+ });
+
+ await once(stream, "end");
+
+ assertStrictEquals(iterations, 3);
+});
+
+Deno.test("Readable stream: 'data' event on non-object", async () => {
+ async function* generate() {
+ yield "a";
+ yield "b";
+ yield "c";
+ }
+
+ const stream = Readable.from(generate(), { objectMode: false });
+
+ let iterations = 0;
+ const expected = ["a", "b", "c"];
+
+ stream.on("data", (chunk) => {
+ iterations++;
+ assertStrictEquals(chunk instanceof Buffer, true);
+ assertStrictEquals(chunk.toString(), expected.shift());
+ });
+
+ await once(stream, "end");
+
+ assertStrictEquals(iterations, 3);
+});
+
+Deno.test("Readable stream: 'readable' event is emitted but 'read' is not on highWaterMark length exceeded", async () => {
+ let readableExecuted = 0;
+ const readableExecutedExpected = 1;
+ const readableExpectedExecutions = deferred();
+
+ const r = new Readable({
+ highWaterMark: 3,
+ });
+
+ r._read = () => {
+ throw new Error("_read must not be called");
+ };
+ r.push(Buffer.from("blerg"));
+
+ setTimeout(function () {
+ assert(!r._readableState.reading);
+ r.on("readable", () => {
+ readableExecuted++;
+ if (readableExecuted == readableExecutedExpected) {
+ readableExpectedExecutions.resolve();
+ }
+ });
+ }, 1);
+
+ const readableTimeout = setTimeout(
+ () => readableExpectedExecutions.reject(),
+ 1000,
+ );
+ await readableExpectedExecutions;
+ clearTimeout(readableTimeout);
+ assertEquals(readableExecuted, readableExecutedExpected);
+});
+
+Deno.test("Readable stream: 'readable' and 'read' events are emitted on highWaterMark length not reached", async () => {
+ let readableExecuted = 0;
+ const readableExecutedExpected = 1;
+ const readableExpectedExecutions = deferred();
+
+ let readExecuted = 0;
+ const readExecutedExpected = 1;
+ const readExpectedExecutions = deferred();
+
+ const r = new Readable({
+ highWaterMark: 3,
+ });
+
+ r._read = () => {
+ readExecuted++;
+ if (readExecuted == readExecutedExpected) {
+ readExpectedExecutions.resolve();
+ }
+ };
+
+ r.push(Buffer.from("bl"));
+
+ setTimeout(function () {
+ assert(r._readableState.reading);
+ r.on("readable", () => {
+ readableExecuted++;
+ if (readableExecuted == readableExecutedExpected) {
+ readableExpectedExecutions.resolve();
+ }
+ });
+ }, 1);
+
+ const readableTimeout = setTimeout(
+ () => readableExpectedExecutions.reject(),
+ 1000,
+ );
+ const readTimeout = setTimeout(
+ () => readExpectedExecutions.reject(),
+ 1000,
+ );
+ await readableExpectedExecutions;
+ await readExpectedExecutions;
+ clearTimeout(readableTimeout);
+ clearTimeout(readTimeout);
+ assertEquals(readableExecuted, readableExecutedExpected);
+ assertEquals(readExecuted, readExecutedExpected);
+});
+
+Deno.test("Readable stream: 'readable' event is emitted but 'read' is not on highWaterMark length not reached and stream ended", async () => {
+ let readableExecuted = 0;
+ const readableExecutedExpected = 1;
+ const readableExpectedExecutions = deferred();
+
+ const r = new Readable({
+ highWaterMark: 30,
+ });
+
+ r._read = () => {
+ throw new Error("Must not be executed");
+ };
+
+ r.push(Buffer.from("blerg"));
+ //This ends the stream and triggers end
+ r.push(null);
+
+ setTimeout(function () {
+ // Assert we're testing what we think we are
+ assert(!r._readableState.reading);
+ r.on("readable", () => {
+ readableExecuted++;
+ if (readableExecuted == readableExecutedExpected) {
+ readableExpectedExecutions.resolve();
+ }
+ });
+ }, 1);
+
+ const readableTimeout = setTimeout(
+ () => readableExpectedExecutions.reject(),
+ 1000,
+ );
+ await readableExpectedExecutions;
+ clearTimeout(readableTimeout);
+ assertEquals(readableExecuted, readableExecutedExpected);
+});
+
+Deno.test("Readable stream: 'read' is emitted on empty string pushed in non-object mode", async () => {
+ let endExecuted = 0;
+ const endExecutedExpected = 1;
+ const endExpectedExecutions = deferred();
+
+ const underlyingData = ["", "x", "y", "", "z"];
+ const expected = underlyingData.filter((data) => data);
+ const result: unknown[] = [];
+
+ const r = new Readable({
+ encoding: "utf8",
+ });
+ r._read = function () {
+ queueMicrotask(() => {
+ if (!underlyingData.length) {
+ this.push(null);
+ } else {
+ this.push(underlyingData.shift());
+ }
+ });
+ };
+
+ r.on("readable", () => {
+ const data = r.read();
+ if (data !== null) result.push(data);
+ });
+
+ r.on("end", () => {
+ endExecuted++;
+ if (endExecuted == endExecutedExpected) {
+ endExpectedExecutions.resolve();
+ }
+ assertEquals(result, expected);
+ });
+
+ const endTimeout = setTimeout(
+ () => endExpectedExecutions.reject(),
+ 1000,
+ );
+ await endExpectedExecutions;
+ clearTimeout(endTimeout);
+ assertEquals(endExecuted, endExecutedExpected);
+});
+
+Deno.test("Readable stream: listeners can be removed", () => {
+ const r = new Readable();
+ r._read = () => {};
+ r.on("data", () => {});
+
+ r.removeAllListeners("data");
+
+ assertEquals(r.eventNames().length, 0);
+});
diff --git a/std/node/_stream/stream.ts b/std/node/_stream/stream.ts
new file mode 100644
index 000000000..708b8bcd3
--- /dev/null
+++ b/std/node/_stream/stream.ts
@@ -0,0 +1,79 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import EventEmitter from "../events.ts";
+import type Writable from "./writable.ts";
+import { types } from "../util.ts";
+
+class Stream extends EventEmitter {
+ constructor() {
+ super();
+ }
+
+ static _isUint8Array = types.isUint8Array;
+ static _uint8ArrayToBuffer = (chunk: Uint8Array) => Buffer.from(chunk);
+
+ pipe(dest: Writable, options: { end: boolean }) {
+ // deno-lint-ignore no-this-alias
+ const source = this;
+
+ //TODO(Soremwar)
+ //isStdio exist on stdin || stdout only, which extend from Duplex
+ //if (!dest._isStdio && (options?.end ?? true)) {
+ //Find an alternative to be able to pipe streams to stdin & stdout
+ //Port them as well?
+ if (options?.end ?? true) {
+ source.on("end", onend);
+ source.on("close", onclose);
+ }
+
+ let didOnEnd = false;
+ function onend() {
+ if (didOnEnd) return;
+ didOnEnd = true;
+
+ dest.end();
+ }
+
+ function onclose() {
+ if (didOnEnd) return;
+ didOnEnd = true;
+
+ if (typeof dest.destroy === "function") dest.destroy();
+ }
+
+ // Don't leave dangling pipes when there are errors.
+ function onerror(this: Stream, er: Error) {
+ cleanup();
+ if (this.listenerCount("error") === 0) {
+ throw er; // Unhandled stream error in pipe.
+ }
+ }
+
+ source.on("error", onerror);
+ dest.on("error", onerror);
+
+ // Remove all the event listeners that were added.
+ function cleanup() {
+ source.removeListener("end", onend);
+ source.removeListener("close", onclose);
+
+ source.removeListener("error", onerror);
+ dest.removeListener("error", onerror);
+
+ source.removeListener("end", cleanup);
+ source.removeListener("close", cleanup);
+
+ dest.removeListener("close", cleanup);
+ }
+
+ source.on("end", cleanup);
+ source.on("close", cleanup);
+
+ dest.on("close", cleanup);
+ dest.emit("pipe", source);
+
+ return dest;
+ }
+}
+
+export default Stream;
diff --git a/std/node/_stream/symbols.ts b/std/node/_stream/symbols.ts
new file mode 100644
index 000000000..addb969d3
--- /dev/null
+++ b/std/node/_stream/symbols.ts
@@ -0,0 +1,4 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+export const kConstruct = Symbol("kConstruct");
+export const kDestroy = Symbol("kDestroy");
+export const kPaused = Symbol("kPaused");
diff --git a/std/node/_stream/writable.ts b/std/node/_stream/writable.ts
new file mode 100644
index 000000000..158af8325
--- /dev/null
+++ b/std/node/_stream/writable.ts
@@ -0,0 +1,952 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import Stream from "./stream.ts";
+import { captureRejectionSymbol } from "../events.ts";
+import { kConstruct, kDestroy } from "./symbols.ts";
+import {
+ ERR_INVALID_ARG_TYPE,
+ ERR_INVALID_OPT_VALUE,
+ ERR_METHOD_NOT_IMPLEMENTED,
+ ERR_MULTIPLE_CALLBACK,
+ ERR_STREAM_ALREADY_FINISHED,
+ ERR_STREAM_CANNOT_PIPE,
+ ERR_STREAM_DESTROYED,
+ ERR_STREAM_NULL_VALUES,
+ ERR_STREAM_WRITE_AFTER_END,
+ ERR_UNKNOWN_ENCODING,
+} from "../_errors.ts";
+
+function nop() {}
+
+//TODO(Soremwar)
+//Bring in encodings
+type write_v = (
+ // deno-lint-ignore no-explicit-any
+ chunks: Array<{ chunk: any; encoding: string }>,
+ callback: (error?: Error | null) => void,
+) => void;
+
+type AfterWriteTick = {
+ cb: (error?: Error) => void;
+ count: number;
+ state: WritableState;
+ stream: Writable;
+};
+
+const kOnFinished = Symbol("kOnFinished");
+
+function destroy(this: Writable, err?: Error, cb?: () => void) {
+ const w = this._writableState;
+
+ if (w.destroyed) {
+ if (typeof cb === "function") {
+ cb();
+ }
+
+ return this;
+ }
+
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ }
+
+ w.destroyed = true;
+
+ if (!w.constructed) {
+ this.once(kDestroy, (er) => {
+ _destroy(this, err || er, cb);
+ });
+ } else {
+ _destroy(this, err, cb);
+ }
+
+ return this;
+}
+
+function _destroy(
+ self: Writable,
+ err?: Error,
+ cb?: (error?: Error | null) => void,
+) {
+ self._destroy(err || null, (err) => {
+ const w = self._writableState;
+
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ }
+
+ w.closed = true;
+
+ if (typeof cb === "function") {
+ cb(err);
+ }
+
+ if (err) {
+ queueMicrotask(() => {
+ if (!w.errorEmitted) {
+ w.errorEmitted = true;
+ self.emit("error", err);
+ }
+ w.closeEmitted = true;
+ if (w.emitClose) {
+ self.emit("close");
+ }
+ });
+ } else {
+ queueMicrotask(() => {
+ w.closeEmitted = true;
+ if (w.emitClose) {
+ self.emit("close");
+ }
+ });
+ }
+ });
+}
+
+function errorOrDestroy(stream: Writable, err: Error, sync = false) {
+ const w = stream._writableState;
+
+ if (w.destroyed) {
+ return stream;
+ }
+
+ if (w.autoDestroy) {
+ stream.destroy(err);
+ } else if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack;
+
+ if (!w.errored) {
+ w.errored = err;
+ }
+ if (sync) {
+ queueMicrotask(() => {
+ if (w.errorEmitted) {
+ return;
+ }
+ w.errorEmitted = true;
+ stream.emit("error", err);
+ });
+ } else {
+ if (w.errorEmitted) {
+ return;
+ }
+ w.errorEmitted = true;
+ stream.emit("error", err);
+ }
+ }
+}
+
+function construct(stream: Writable, cb: (error: Error) => void) {
+ if (!stream._construct) {
+ return;
+ }
+
+ stream.once(kConstruct, cb);
+ const w = stream._writableState;
+
+ w.constructed = false;
+
+ queueMicrotask(() => {
+ let called = false;
+ stream._construct?.((err) => {
+ w.constructed = true;
+
+ if (called) {
+ err = new ERR_MULTIPLE_CALLBACK();
+ } else {
+ called = true;
+ }
+
+ if (w.destroyed) {
+ stream.emit(kDestroy, err);
+ } else if (err) {
+ errorOrDestroy(stream, err, true);
+ } else {
+ queueMicrotask(() => {
+ stream.emit(kConstruct);
+ });
+ }
+ });
+ });
+}
+
+//TODO(Soremwar)
+//Bring encodings in
+function writeOrBuffer(
+ stream: Writable,
+ state: WritableState,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ callback: (error: Error) => void,
+) {
+ const len = state.objectMode ? 1 : chunk.length;
+
+ state.length += len;
+
+ if (state.writing || state.corked || state.errored || !state.constructed) {
+ state.buffered.push({ chunk, encoding, callback });
+ if (state.allBuffers && encoding !== "buffer") {
+ state.allBuffers = false;
+ }
+ if (state.allNoop && callback !== nop) {
+ state.allNoop = false;
+ }
+ } else {
+ state.writelen = len;
+ state.writecb = callback;
+ state.writing = true;
+ state.sync = true;
+ stream._write(chunk, encoding, state.onwrite);
+ state.sync = false;
+ }
+
+ const ret = state.length < state.highWaterMark;
+
+ if (!ret) {
+ state.needDrain = true;
+ }
+
+ return ret && !state.errored && !state.destroyed;
+}
+
+//TODO(Soremwar)
+//Bring encodings in
+function doWrite(
+ stream: Writable,
+ state: WritableState,
+ writev: boolean,
+ len: number,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ cb: (error: Error) => void,
+) {
+ state.writelen = len;
+ state.writecb = cb;
+ state.writing = true;
+ state.sync = true;
+ if (state.destroyed) {
+ state.onwrite(new ERR_STREAM_DESTROYED("write"));
+ } else if (writev) {
+ (stream._writev as unknown as write_v)(chunk, state.onwrite);
+ } else {
+ stream._write(chunk, encoding, state.onwrite);
+ }
+ state.sync = false;
+}
+
+function onwriteError(
+ stream: Writable,
+ state: WritableState,
+ er: Error,
+ cb: (error: Error) => void,
+) {
+ --state.pendingcb;
+
+ cb(er);
+ errorBuffer(state);
+ errorOrDestroy(stream, er);
+}
+
+function onwrite(stream: Writable, er?: Error | null) {
+ const state = stream._writableState;
+ const sync = state.sync;
+ const cb = state.writecb;
+
+ if (typeof cb !== "function") {
+ errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
+ return;
+ }
+
+ state.writing = false;
+ state.writecb = null;
+ state.length -= state.writelen;
+ state.writelen = 0;
+
+ if (er) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ er.stack;
+
+ if (!state.errored) {
+ state.errored = er;
+ }
+
+ if (sync) {
+ queueMicrotask(() => onwriteError(stream, state, er, cb));
+ } else {
+ onwriteError(stream, state, er, cb);
+ }
+ } else {
+ if (state.buffered.length > state.bufferedIndex) {
+ clearBuffer(stream, state);
+ }
+
+ if (sync) {
+ if (
+ state.afterWriteTickInfo !== null &&
+ state.afterWriteTickInfo.cb === cb
+ ) {
+ state.afterWriteTickInfo.count++;
+ } else {
+ state.afterWriteTickInfo = {
+ count: 1,
+ cb: (cb as (error?: Error) => void),
+ stream,
+ state,
+ };
+ queueMicrotask(() =>
+ afterWriteTick(state.afterWriteTickInfo as AfterWriteTick)
+ );
+ }
+ } else {
+ afterWrite(stream, state, 1, cb as (error?: Error) => void);
+ }
+ }
+}
+
+function afterWriteTick({
+ cb,
+ count,
+ state,
+ stream,
+}: AfterWriteTick) {
+ state.afterWriteTickInfo = null;
+ return afterWrite(stream, state, count, cb);
+}
+
+function afterWrite(
+ stream: Writable,
+ state: WritableState,
+ count: number,
+ cb: (error?: Error) => void,
+) {
+ const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
+ state.needDrain;
+ if (needDrain) {
+ state.needDrain = false;
+ stream.emit("drain");
+ }
+
+ while (count-- > 0) {
+ state.pendingcb--;
+ cb();
+ }
+
+ if (state.destroyed) {
+ errorBuffer(state);
+ }
+
+ finishMaybe(stream, state);
+}
+
+/** If there's something in the buffer waiting, then invoke callbacks.*/
+function errorBuffer(state: WritableState) {
+ if (state.writing) {
+ return;
+ }
+
+ for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
+ const { chunk, callback } = state.buffered[n];
+ const len = state.objectMode ? 1 : chunk.length;
+ state.length -= len;
+ callback(new ERR_STREAM_DESTROYED("write"));
+ }
+
+ for (const callback of state[kOnFinished].splice(0)) {
+ callback(new ERR_STREAM_DESTROYED("end"));
+ }
+
+ resetBuffer(state);
+}
+
+/** If there's something in the buffer waiting, then process it.*/
+function clearBuffer(stream: Writable, state: WritableState) {
+ if (
+ state.corked ||
+ state.bufferProcessing ||
+ state.destroyed ||
+ !state.constructed
+ ) {
+ return;
+ }
+
+ const { buffered, bufferedIndex, objectMode } = state;
+ const bufferedLength = buffered.length - bufferedIndex;
+
+ if (!bufferedLength) {
+ return;
+ }
+
+ const i = bufferedIndex;
+
+ state.bufferProcessing = true;
+ if (bufferedLength > 1 && stream._writev) {
+ state.pendingcb -= bufferedLength - 1;
+
+ const callback = state.allNoop ? nop : (err: Error) => {
+ for (let n = i; n < buffered.length; ++n) {
+ buffered[n].callback(err);
+ }
+ };
+ const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i);
+
+ doWrite(stream, state, true, state.length, chunks, "", callback);
+
+ resetBuffer(state);
+ } else {
+ do {
+ const { chunk, encoding, callback } = buffered[i];
+ const len = objectMode ? 1 : chunk.length;
+ doWrite(stream, state, false, len, chunk, encoding, callback);
+ } while (i < buffered.length && !state.writing);
+
+ if (i === buffered.length) {
+ resetBuffer(state);
+ } else if (i > 256) {
+ buffered.splice(0, i);
+ state.bufferedIndex = 0;
+ } else {
+ state.bufferedIndex = i;
+ }
+ }
+ state.bufferProcessing = false;
+}
+
+function finish(stream: Writable, state: WritableState) {
+ state.pendingcb--;
+ if (state.errorEmitted || state.closeEmitted) {
+ return;
+ }
+
+ state.finished = true;
+
+ for (const callback of state[kOnFinished].splice(0)) {
+ callback();
+ }
+
+ stream.emit("finish");
+
+ if (state.autoDestroy) {
+ stream.destroy();
+ }
+}
+
+function finishMaybe(stream: Writable, state: WritableState, sync?: boolean) {
+ if (needFinish(state)) {
+ prefinish(stream, state);
+ if (state.pendingcb === 0 && needFinish(state)) {
+ state.pendingcb++;
+ if (sync) {
+ queueMicrotask(() => finish(stream, state));
+ } else {
+ finish(stream, state);
+ }
+ }
+ }
+}
+
+function prefinish(stream: Writable, state: WritableState) {
+ if (!state.prefinished && !state.finalCalled) {
+ if (typeof stream._final === "function" && !state.destroyed) {
+ state.finalCalled = true;
+
+ state.sync = true;
+ state.pendingcb++;
+ stream._final((err) => {
+ state.pendingcb--;
+ if (err) {
+ for (const callback of state[kOnFinished].splice(0)) {
+ callback(err);
+ }
+ errorOrDestroy(stream, err, state.sync);
+ } else if (needFinish(state)) {
+ state.prefinished = true;
+ stream.emit("prefinish");
+ state.pendingcb++;
+ queueMicrotask(() => finish(stream, state));
+ }
+ });
+ state.sync = false;
+ } else {
+ state.prefinished = true;
+ stream.emit("prefinish");
+ }
+ }
+}
+
+function needFinish(state: WritableState) {
+ return (state.ending &&
+ state.constructed &&
+ state.length === 0 &&
+ !state.errored &&
+ state.buffered.length === 0 &&
+ !state.finished &&
+ !state.writing);
+}
+
+interface WritableOptions {
+ autoDestroy?: boolean;
+ decodeStrings?: boolean;
+ //TODO(Soremwar)
+ //Bring encodings in
+ defaultEncoding?: string;
+ destroy?(
+ this: Writable,
+ error: Error | null,
+ callback: (error: Error | null) => void,
+ ): void;
+ emitClose?: boolean;
+ final?(this: Writable, callback: (error?: Error | null) => void): void;
+ highWaterMark?: number;
+ objectMode?: boolean;
+ //TODO(Soremwar)
+ //Bring encodings in
+ write?(
+ this: Writable,
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ callback: (error?: Error | null) => void,
+ ): void;
+ //TODO(Soremwar)
+ //Bring encodings in
+ writev?(
+ this: Writable,
+ // deno-lint-ignore no-explicit-any
+ chunks: Array<{ chunk: any; encoding: string }>,
+ callback: (error?: Error | null) => void,
+ ): void;
+}
+
+class WritableState {
+ [kOnFinished]: Array<(error?: Error) => void> = [];
+ afterWriteTickInfo: null | AfterWriteTick = null;
+ allBuffers = true;
+ allNoop = true;
+ autoDestroy: boolean;
+ //TODO(Soremwar)
+ //Bring in encodings
+ buffered: Array<{
+ allBuffers?: boolean;
+ // deno-lint-ignore no-explicit-any
+ chunk: any;
+ encoding: string;
+ callback: (error: Error) => void;
+ }> = [];
+ bufferedIndex = 0;
+ bufferProcessing = false;
+ closed = false;
+ closeEmitted = false;
+ constructed: boolean;
+ corked = 0;
+ decodeStrings: boolean;
+ defaultEncoding: string;
+ destroyed = false;
+ emitClose: boolean;
+ ended = false;
+ ending = false;
+ errored: Error | null = null;
+ errorEmitted = false;
+ finalCalled = false;
+ finished = false;
+ highWaterMark: number;
+ length = 0;
+ needDrain = false;
+ objectMode: boolean;
+ onwrite: (error?: Error | null) => void;
+ pendingcb = 0;
+ prefinished = false;
+ sync = true;
+ writecb: null | ((error: Error) => void) = null;
+ writable = true;
+ writelen = 0;
+ writing = false;
+
+ constructor(options: WritableOptions | undefined, stream: Writable) {
+ this.objectMode = !!options?.objectMode;
+
+ this.highWaterMark = options?.highWaterMark ??
+ (this.objectMode ? 16 : 16 * 1024);
+
+ if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) {
+ this.highWaterMark = Math.floor(this.highWaterMark);
+ } else {
+ throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark);
+ }
+
+ this.decodeStrings = !options?.decodeStrings === false;
+
+ this.defaultEncoding = options?.defaultEncoding || "utf8";
+
+ this.onwrite = onwrite.bind(undefined, stream);
+
+ resetBuffer(this);
+
+ this.emitClose = options?.emitClose ?? true;
+ this.autoDestroy = options?.autoDestroy ?? true;
+ this.constructed = true;
+ }
+
+ getBuffer() {
+ return this.buffered.slice(this.bufferedIndex);
+ }
+
+ get bufferedRequestCount() {
+ return this.buffered.length - this.bufferedIndex;
+ }
+}
+
+function resetBuffer(state: WritableState) {
+ state.buffered = [];
+ state.bufferedIndex = 0;
+ state.allBuffers = true;
+ state.allNoop = true;
+}
+
+/** A bit simpler than readable streams.
+* Implement an async `._write(chunk, encoding, cb)`, and it'll handle all
+* the drain event emission and buffering.
+*/
+class Writable extends Stream {
+ _construct?: (cb: (error?: Error) => void) => void;
+ _final?: (
+ this: Writable,
+ callback: (error?: Error | null | undefined) => void,
+ ) => void;
+ _writableState: WritableState;
+ _writev?: write_v | null = null;
+
+ constructor(options?: WritableOptions) {
+ super();
+ this._writableState = new WritableState(options, this);
+
+ if (options) {
+ if (typeof options.write === "function") {
+ this._write = options.write;
+ }
+
+ if (typeof options.writev === "function") {
+ this._writev = options.writev;
+ }
+
+ if (typeof options.destroy === "function") {
+ this._destroy = options.destroy;
+ }
+
+ if (typeof options.final === "function") {
+ this._final = options.final;
+ }
+ }
+
+ construct(this, () => {
+ const state = this._writableState;
+
+ if (!state.writing) {
+ clearBuffer(this, state);
+ }
+
+ finishMaybe(this, state);
+ });
+ }
+
+ [captureRejectionSymbol](err?: Error) {
+ this.destroy(err);
+ }
+
+ static WritableState = WritableState;
+
+ get destroyed() {
+ return this._writableState ? this._writableState.destroyed : false;
+ }
+
+ set destroyed(value) {
+ if (this._writableState) {
+ this._writableState.destroyed = value;
+ }
+ }
+
+ get writable() {
+ const w = this._writableState;
+ return !w.destroyed && !w.errored && !w.ending && !w.ended;
+ }
+
+ set writable(val) {
+ if (this._writableState) {
+ this._writableState.writable = !!val;
+ }
+ }
+
+ get writableFinished() {
+ return this._writableState ? this._writableState.finished : false;
+ }
+
+ get writableObjectMode() {
+ return this._writableState ? this._writableState.objectMode : false;
+ }
+
+ get writableBuffer() {
+ return this._writableState && this._writableState.getBuffer();
+ }
+
+ get writableEnded() {
+ return this._writableState ? this._writableState.ending : false;
+ }
+
+ get writableHighWaterMark() {
+ return this._writableState && this._writableState.highWaterMark;
+ }
+
+ get writableCorked() {
+ return this._writableState ? this._writableState.corked : 0;
+ }
+
+ get writableLength() {
+ return this._writableState && this._writableState.length;
+ }
+
+ _undestroy() {
+ const w = this._writableState;
+ w.constructed = true;
+ w.destroyed = false;
+ w.closed = false;
+ w.closeEmitted = false;
+ w.errored = null;
+ w.errorEmitted = false;
+ w.ended = false;
+ w.ending = false;
+ w.finalCalled = false;
+ w.prefinished = false;
+ w.finished = false;
+ }
+
+ _destroy(err: Error | null, cb: (error?: Error | null) => void) {
+ cb(err);
+ }
+
+ destroy(err?: Error, cb?: () => void) {
+ const state = this._writableState;
+ if (!state.destroyed) {
+ queueMicrotask(() => errorBuffer(state));
+ }
+ destroy.call(this, err, cb);
+ return this;
+ }
+
+ end(cb?: () => void): void;
+ // deno-lint-ignore no-explicit-any
+ end(chunk: any, cb?: () => void): void;
+ //TODO(Soremwar)
+ //Bring in encodings
+ // deno-lint-ignore no-explicit-any
+ end(chunk: any, encoding: string, cb?: () => void): void;
+
+ end(
+ // deno-lint-ignore no-explicit-any
+ x?: any | (() => void),
+ //TODO(Soremwar)
+ //Bring in encodings
+ y?: string | (() => void),
+ z?: () => void,
+ ) {
+ const state = this._writableState;
+ // deno-lint-ignore no-explicit-any
+ let chunk: any | null;
+ //TODO(Soremwar)
+ //Bring in encodings
+ let encoding: string | null;
+ let cb: undefined | ((error?: Error) => void);
+
+ if (typeof x === "function") {
+ chunk = null;
+ encoding = null;
+ cb = x;
+ } else if (typeof y === "function") {
+ chunk = x;
+ encoding = null;
+ cb = y;
+ } else {
+ chunk = x;
+ encoding = y as string;
+ cb = z;
+ }
+
+ if (chunk !== null && chunk !== undefined) {
+ this.write(chunk, encoding);
+ }
+
+ if (state.corked) {
+ state.corked = 1;
+ this.uncork();
+ }
+
+ let err: Error | undefined;
+ if (!state.errored && !state.ending) {
+ state.ending = true;
+ finishMaybe(this, state, true);
+ state.ended = true;
+ } else if (state.finished) {
+ err = new ERR_STREAM_ALREADY_FINISHED("end");
+ } else if (state.destroyed) {
+ err = new ERR_STREAM_DESTROYED("end");
+ }
+
+ if (typeof cb === "function") {
+ if (err || state.finished) {
+ queueMicrotask(() => {
+ (cb as (error?: Error | undefined) => void)(err);
+ });
+ } else {
+ state[kOnFinished].push(cb);
+ }
+ }
+
+ return this;
+ }
+
+ //TODO(Soremwar)
+ //Bring in encodings
+ _write(
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string,
+ cb: (error?: Error | null) => void,
+ ): void {
+ if (this._writev) {
+ this._writev([{ chunk, encoding }], cb);
+ } else {
+ throw new ERR_METHOD_NOT_IMPLEMENTED("_write()");
+ }
+ }
+
+ //This signature was changed to keep inheritance coherent
+ pipe(dest: Writable): Writable {
+ errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
+ return dest;
+ }
+
+ // deno-lint-ignore no-explicit-any
+ write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean;
+ //TODO(Soremwar)
+ //Bring in encodings
+ write(
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ encoding: string | null,
+ cb?: (error: Error | null | undefined) => void,
+ ): boolean;
+
+ //TODO(Soremwar)
+ //Bring in encodings
+ write(
+ // deno-lint-ignore no-explicit-any
+ chunk: any,
+ x?: string | null | ((error: Error | null | undefined) => void),
+ y?: ((error: Error | null | undefined) => void),
+ ) {
+ const state = this._writableState;
+ //TODO(Soremwar)
+ //Bring in encodings
+ let encoding: string;
+ let cb: (error?: Error | null) => void;
+
+ if (typeof x === "function") {
+ cb = x;
+ encoding = state.defaultEncoding;
+ } else {
+ if (!x) {
+ encoding = state.defaultEncoding;
+ } else if (x !== "buffer" && !Buffer.isEncoding(x)) {
+ throw new ERR_UNKNOWN_ENCODING(x);
+ } else {
+ encoding = x;
+ }
+ if (typeof y !== "function") {
+ cb = nop;
+ } else {
+ cb = y;
+ }
+ }
+
+ if (chunk === null) {
+ throw new ERR_STREAM_NULL_VALUES();
+ } else if (!state.objectMode) {
+ if (typeof chunk === "string") {
+ if (state.decodeStrings !== false) {
+ chunk = Buffer.from(chunk, encoding);
+ encoding = "buffer";
+ }
+ } else if (chunk instanceof Buffer) {
+ encoding = "buffer";
+ } else if (Stream._isUint8Array(chunk)) {
+ chunk = Stream._uint8ArrayToBuffer(chunk);
+ encoding = "buffer";
+ } else {
+ throw new ERR_INVALID_ARG_TYPE(
+ "chunk",
+ ["string", "Buffer", "Uint8Array"],
+ chunk,
+ );
+ }
+ }
+
+ let err: Error | undefined;
+ if (state.ending) {
+ err = new ERR_STREAM_WRITE_AFTER_END();
+ } else if (state.destroyed) {
+ err = new ERR_STREAM_DESTROYED("write");
+ }
+
+ if (err) {
+ queueMicrotask(() => cb(err));
+ errorOrDestroy(this, err, true);
+ return false;
+ }
+ state.pendingcb++;
+ return writeOrBuffer(this, state, chunk, encoding, cb);
+ }
+
+ cork() {
+ this._writableState.corked++;
+ }
+
+ uncork() {
+ const state = this._writableState;
+
+ if (state.corked) {
+ state.corked--;
+
+ if (!state.writing) {
+ clearBuffer(this, state);
+ }
+ }
+ }
+
+ //TODO(Soremwar)
+ //Bring allowed encodings
+ setDefaultEncoding(encoding: string) {
+ // node::ParseEncoding() requires lower case.
+ if (typeof encoding === "string") {
+ encoding = encoding.toLowerCase();
+ }
+ if (!Buffer.isEncoding(encoding)) {
+ throw new ERR_UNKNOWN_ENCODING(encoding);
+ }
+ this._writableState.defaultEncoding = encoding;
+ return this;
+ }
+}
+
+export default Writable;
+export { WritableState };
diff --git a/std/node/_stream/writable_test.ts b/std/node/_stream/writable_test.ts
new file mode 100644
index 000000000..d0650c49e
--- /dev/null
+++ b/std/node/_stream/writable_test.ts
@@ -0,0 +1,209 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import finished from "./end-of-stream.ts";
+import Writable from "../_stream/writable.ts";
+import { deferred } from "../../async/mod.ts";
+import {
+ assert,
+ assertEquals,
+ assertStrictEquals,
+ assertThrows,
+} from "../../testing/asserts.ts";
+
+Deno.test("Writable stream writes correctly", async () => {
+ let callback: undefined | ((error?: Error | null | undefined) => void);
+
+ let writeExecuted = 0;
+ const writeExecutedExpected = 1;
+ const writeExpectedExecutions = deferred();
+
+ let writevExecuted = 0;
+ const writevExecutedExpected = 1;
+ const writevExpectedExecutions = deferred();
+
+ const writable = new Writable({
+ write: (chunk, encoding, cb) => {
+ writeExecuted++;
+ if (writeExecuted == writeExecutedExpected) {
+ writeExpectedExecutions.resolve();
+ }
+ assert(chunk instanceof Buffer);
+ assertStrictEquals(encoding, "buffer");
+ assertStrictEquals(String(chunk), "ABC");
+ callback = cb;
+ },
+ writev: (chunks) => {
+ writevExecuted++;
+ if (writevExecuted == writevExecutedExpected) {
+ writevExpectedExecutions.resolve();
+ }
+ assertStrictEquals(chunks.length, 2);
+ assertStrictEquals(chunks[0].encoding, "buffer");
+ assertStrictEquals(chunks[1].encoding, "buffer");
+ assertStrictEquals(chunks[0].chunk + chunks[1].chunk, "DEFGHI");
+ },
+ });
+
+ writable.write(new TextEncoder().encode("ABC"));
+ writable.write(new TextEncoder().encode("DEF"));
+ writable.end(new TextEncoder().encode("GHI"));
+ callback?.();
+
+ const writeTimeout = setTimeout(
+ () => writeExpectedExecutions.reject(),
+ 1000,
+ );
+ const writevTimeout = setTimeout(
+ () => writevExpectedExecutions.reject(),
+ 1000,
+ );
+ await writeExpectedExecutions;
+ await writevExpectedExecutions;
+ clearTimeout(writeTimeout);
+ clearTimeout(writevTimeout);
+ assertEquals(writeExecuted, writeExecutedExpected);
+ assertEquals(writevExecuted, writevExecutedExpected);
+});
+
+Deno.test("Writable stream writes Uint8Array in object mode", async () => {
+ let writeExecuted = 0;
+ const writeExecutedExpected = 1;
+ const writeExpectedExecutions = deferred();
+
+ const ABC = new TextEncoder().encode("ABC");
+
+ const writable = new Writable({
+ objectMode: true,
+ write: (chunk, encoding, cb) => {
+ writeExecuted++;
+ if (writeExecuted == writeExecutedExpected) {
+ writeExpectedExecutions.resolve();
+ }
+ assert(!(chunk instanceof Buffer));
+ assert(chunk instanceof Uint8Array);
+ assertEquals(chunk, ABC);
+ assertEquals(encoding, "utf8");
+ cb();
+ },
+ });
+
+ writable.end(ABC);
+
+ const writeTimeout = setTimeout(
+ () => writeExpectedExecutions.reject(),
+ 1000,
+ );
+ await writeExpectedExecutions;
+ clearTimeout(writeTimeout);
+ assertEquals(writeExecuted, writeExecutedExpected);
+});
+
+Deno.test("Writable stream throws on unexpected close", async () => {
+ let finishedExecuted = 0;
+ const finishedExecutedExpected = 1;
+ const finishedExpectedExecutions = deferred();
+
+ const writable = new Writable({
+ write: () => {},
+ });
+ writable.writable = false;
+ writable.destroy();
+
+ finished(writable, (err) => {
+ finishedExecuted++;
+ if (finishedExecuted == finishedExecutedExpected) {
+ finishedExpectedExecutions.resolve();
+ }
+ assertEquals(err?.code, "ERR_STREAM_PREMATURE_CLOSE");
+ });
+
+ const finishedTimeout = setTimeout(
+ () => finishedExpectedExecutions.reject(),
+ 1000,
+ );
+ await finishedExpectedExecutions;
+ clearTimeout(finishedTimeout);
+ assertEquals(finishedExecuted, finishedExecutedExpected);
+});
+
+Deno.test("Writable stream finishes correctly", async () => {
+ let finishedExecuted = 0;
+ const finishedExecutedExpected = 1;
+ const finishedExpectedExecutions = deferred();
+
+ const w = new Writable({
+ write(_chunk, _encoding, cb) {
+ cb();
+ },
+ autoDestroy: false,
+ });
+
+ w.end("asd");
+
+ queueMicrotask(() => {
+ finished(w, () => {
+ finishedExecuted++;
+ if (finishedExecuted == finishedExecutedExpected) {
+ finishedExpectedExecutions.resolve();
+ }
+ });
+ });
+
+ const finishedTimeout = setTimeout(
+ () => finishedExpectedExecutions.reject(),
+ 1000,
+ );
+ await finishedExpectedExecutions;
+ clearTimeout(finishedTimeout);
+ assertEquals(finishedExecuted, finishedExecutedExpected);
+});
+
+Deno.test("Writable stream finishes correctly after error", async () => {
+ let errorExecuted = 0;
+ const errorExecutedExpected = 1;
+ const errorExpectedExecutions = deferred();
+
+ let finishedExecuted = 0;
+ const finishedExecutedExpected = 1;
+ const finishedExpectedExecutions = deferred();
+
+ const w = new Writable({
+ write(_chunk, _encoding, cb) {
+ cb(new Error());
+ },
+ autoDestroy: false,
+ });
+ w.write("asd");
+ w.on("error", () => {
+ errorExecuted++;
+ if (errorExecuted == errorExecutedExpected) {
+ errorExpectedExecutions.resolve();
+ }
+ finished(w, () => {
+ finishedExecuted++;
+ if (finishedExecuted == finishedExecutedExpected) {
+ finishedExpectedExecutions.resolve();
+ }
+ });
+ });
+
+ const errorTimeout = setTimeout(
+ () => errorExpectedExecutions.reject(),
+ 1000,
+ );
+ const finishedTimeout = setTimeout(
+ () => finishedExpectedExecutions.reject(),
+ 1000,
+ );
+ await finishedExpectedExecutions;
+ await errorExpectedExecutions;
+ clearTimeout(finishedTimeout);
+ clearTimeout(errorTimeout);
+ assertEquals(finishedExecuted, finishedExecutedExpected);
+ assertEquals(errorExecuted, errorExecutedExpected);
+});
+
+Deno.test("Writable stream fails on 'write' null value", () => {
+ const writable = new Writable();
+ assertThrows(() => writable.write(null));
+});