summaryrefslogtreecommitdiff
path: root/std/node/_stream/readable_test.ts
diff options
context:
space:
mode:
Diffstat (limited to 'std/node/_stream/readable_test.ts')
-rw-r--r--std/node/_stream/readable_test.ts489
1 files changed, 489 insertions, 0 deletions
diff --git a/std/node/_stream/readable_test.ts b/std/node/_stream/readable_test.ts
new file mode 100644
index 000000000..72767e28f
--- /dev/null
+++ b/std/node/_stream/readable_test.ts
@@ -0,0 +1,489 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import Readable from "../_stream/readable.ts";
+import { once } from "../events.ts";
+import { deferred } from "../../async/mod.ts";
+import {
+ assert,
+ assertEquals,
+ assertStrictEquals,
+} from "../../testing/asserts.ts";
+
+Deno.test("Readable stream from iterator", async () => {
+ function* generate() {
+ yield "a";
+ yield "b";
+ yield "c";
+ }
+
+ const stream = Readable.from(generate());
+
+ const expected = ["a", "b", "c"];
+
+ for await (const chunk of stream) {
+ assertStrictEquals(chunk, expected.shift());
+ }
+});
+
+Deno.test("Readable stream from async iterator", async () => {
+ async function* generate() {
+ yield "a";
+ yield "b";
+ yield "c";
+ }
+
+ const stream = Readable.from(generate());
+
+ const expected = ["a", "b", "c"];
+
+ for await (const chunk of stream) {
+ assertStrictEquals(chunk, expected.shift());
+ }
+});
+
+Deno.test("Readable stream from promise", async () => {
+ const promises = [
+ Promise.resolve("a"),
+ Promise.resolve("b"),
+ Promise.resolve("c"),
+ ];
+
+ const stream = Readable.from(promises);
+
+ const expected = ["a", "b", "c"];
+
+ for await (const chunk of stream) {
+ assertStrictEquals(chunk, expected.shift());
+ }
+});
+
+Deno.test("Readable stream from string", async () => {
+ const string = "abc";
+ const stream = Readable.from(string);
+
+ for await (const chunk of stream) {
+ assertStrictEquals(chunk, string);
+ }
+});
+
+Deno.test("Readable stream from Buffer", async () => {
+ const string = "abc";
+ const stream = Readable.from(Buffer.from(string));
+
+ for await (const chunk of stream) {
+ assertStrictEquals((chunk as Buffer).toString(), string);
+ }
+});
+
+Deno.test("Readable stream gets destroyed on error", async () => {
+ // deno-lint-ignore require-yield
+ async function* generate() {
+ throw new Error("kaboom");
+ }
+
+ const stream = Readable.from(generate());
+
+ stream.read();
+
+ const [err] = await once(stream, "error");
+ assertStrictEquals(err.message, "kaboom");
+ assertStrictEquals(stream.destroyed, true);
+});
+
+Deno.test("Readable stream works as Transform stream", async () => {
+ async function* generate(stream: Readable) {
+ for await (const chunk of stream) {
+ yield (chunk as string).toUpperCase();
+ }
+ }
+
+ const source = new Readable({
+ objectMode: true,
+ read() {
+ this.push("a");
+ this.push("b");
+ this.push("c");
+ this.push(null);
+ },
+ });
+
+ const stream = Readable.from(generate(source));
+
+ const expected = ["A", "B", "C"];
+
+ for await (const chunk of stream) {
+ assertStrictEquals(chunk, expected.shift());
+ }
+});
+
+Deno.test("Readable stream can be paused", () => {
+ const readable = new Readable();
+
+ // _read is a noop, here.
+ readable._read = () => {};
+
+ // Default state of a stream is not "paused"
+ assert(!readable.isPaused());
+
+ // Make the stream start flowing...
+ readable.on("data", () => {});
+
+ // still not paused.
+ assert(!readable.isPaused());
+
+ readable.pause();
+ assert(readable.isPaused());
+ readable.resume();
+ assert(!readable.isPaused());
+});
+
+Deno.test("Readable stream sets enconding correctly", () => {
+ const readable = new Readable({
+ read() {},
+ });
+
+ readable.setEncoding("utf8");
+
+ readable.push(new TextEncoder().encode("DEF"));
+ readable.unshift(new TextEncoder().encode("ABC"));
+
+ assertStrictEquals(readable.read(), "ABCDEF");
+});
+
+Deno.test("Readable stream sets encoding correctly", () => {
+ const readable = new Readable({
+ read() {},
+ });
+
+ readable.setEncoding("utf8");
+
+ readable.push(new TextEncoder().encode("DEF"));
+ readable.unshift(new TextEncoder().encode("ABC"));
+
+ assertStrictEquals(readable.read(), "ABCDEF");
+});
+
+Deno.test("Readable stream holds up a big push", async () => {
+ let readExecuted = 0;
+ const readExecutedExpected = 3;
+ const readExpectedExecutions = deferred();
+
+ let endExecuted = 0;
+ const endExecutedExpected = 1;
+ const endExpectedExecutions = deferred();
+
+ const str = "asdfasdfasdfasdfasdf";
+
+ const r = new Readable({
+ highWaterMark: 5,
+ encoding: "utf8",
+ });
+
+ let reads = 0;
+
+ function _read() {
+ if (reads === 0) {
+ setTimeout(() => {
+ r.push(str);
+ }, 1);
+ reads++;
+ } else if (reads === 1) {
+ const ret = r.push(str);
+ assertEquals(ret, false);
+ reads++;
+ } else {
+ r.push(null);
+ }
+ }
+
+ r._read = () => {
+ readExecuted++;
+ if (readExecuted == readExecutedExpected) {
+ readExpectedExecutions.resolve();
+ }
+ _read();
+ };
+
+ r.on("end", () => {
+ endExecuted++;
+ if (endExecuted == endExecutedExpected) {
+ endExpectedExecutions.resolve();
+ }
+ });
+
+ // Push some data in to start.
+ // We've never gotten any read event at this point.
+ const ret = r.push(str);
+ assert(!ret);
+ let chunk = r.read();
+ assertEquals(chunk, str);
+ chunk = r.read();
+ assertEquals(chunk, null);
+
+ r.once("readable", () => {
+ // This time, we'll get *all* the remaining data, because
+ // it's been added synchronously, as the read WOULD take
+ // us below the hwm, and so it triggered a _read() again,
+ // which synchronously added more, which we then return.
+ chunk = r.read();
+ assertEquals(chunk, str + str);
+
+ chunk = r.read();
+ assertEquals(chunk, null);
+ });
+
+ const readTimeout = setTimeout(
+ () => readExpectedExecutions.reject(),
+ 1000,
+ );
+ const endTimeout = setTimeout(
+ () => endExpectedExecutions.reject(),
+ 1000,
+ );
+ await readExpectedExecutions;
+ await endExpectedExecutions;
+ clearTimeout(readTimeout);
+ clearTimeout(endTimeout);
+ assertEquals(readExecuted, readExecutedExpected);
+ assertEquals(endExecuted, endExecutedExpected);
+});
+
+Deno.test("Readable stream: 'on' event", async () => {
+ async function* generate() {
+ yield "a";
+ yield "b";
+ yield "c";
+ }
+
+ const stream = Readable.from(generate());
+
+ let iterations = 0;
+ const expected = ["a", "b", "c"];
+
+ stream.on("data", (chunk) => {
+ iterations++;
+ assertStrictEquals(chunk, expected.shift());
+ });
+
+ await once(stream, "end");
+
+ assertStrictEquals(iterations, 3);
+});
+
+Deno.test("Readable stream: 'data' event", async () => {
+ async function* generate() {
+ yield "a";
+ yield "b";
+ yield "c";
+ }
+
+ const stream = Readable.from(generate(), { objectMode: false });
+
+ let iterations = 0;
+ const expected = ["a", "b", "c"];
+
+ stream.on("data", (chunk) => {
+ iterations++;
+ assertStrictEquals(chunk instanceof Buffer, true);
+ assertStrictEquals(chunk.toString(), expected.shift());
+ });
+
+ await once(stream, "end");
+
+ assertStrictEquals(iterations, 3);
+});
+
+Deno.test("Readable stream: 'data' event on non-object", async () => {
+ async function* generate() {
+ yield "a";
+ yield "b";
+ yield "c";
+ }
+
+ const stream = Readable.from(generate(), { objectMode: false });
+
+ let iterations = 0;
+ const expected = ["a", "b", "c"];
+
+ stream.on("data", (chunk) => {
+ iterations++;
+ assertStrictEquals(chunk instanceof Buffer, true);
+ assertStrictEquals(chunk.toString(), expected.shift());
+ });
+
+ await once(stream, "end");
+
+ assertStrictEquals(iterations, 3);
+});
+
+Deno.test("Readable stream: 'readable' event is emitted but 'read' is not on highWaterMark length exceeded", async () => {
+ let readableExecuted = 0;
+ const readableExecutedExpected = 1;
+ const readableExpectedExecutions = deferred();
+
+ const r = new Readable({
+ highWaterMark: 3,
+ });
+
+ r._read = () => {
+ throw new Error("_read must not be called");
+ };
+ r.push(Buffer.from("blerg"));
+
+ setTimeout(function () {
+ assert(!r._readableState.reading);
+ r.on("readable", () => {
+ readableExecuted++;
+ if (readableExecuted == readableExecutedExpected) {
+ readableExpectedExecutions.resolve();
+ }
+ });
+ }, 1);
+
+ const readableTimeout = setTimeout(
+ () => readableExpectedExecutions.reject(),
+ 1000,
+ );
+ await readableExpectedExecutions;
+ clearTimeout(readableTimeout);
+ assertEquals(readableExecuted, readableExecutedExpected);
+});
+
+Deno.test("Readable stream: 'readable' and 'read' events are emitted on highWaterMark length not reached", async () => {
+ let readableExecuted = 0;
+ const readableExecutedExpected = 1;
+ const readableExpectedExecutions = deferred();
+
+ let readExecuted = 0;
+ const readExecutedExpected = 1;
+ const readExpectedExecutions = deferred();
+
+ const r = new Readable({
+ highWaterMark: 3,
+ });
+
+ r._read = () => {
+ readExecuted++;
+ if (readExecuted == readExecutedExpected) {
+ readExpectedExecutions.resolve();
+ }
+ };
+
+ r.push(Buffer.from("bl"));
+
+ setTimeout(function () {
+ assert(r._readableState.reading);
+ r.on("readable", () => {
+ readableExecuted++;
+ if (readableExecuted == readableExecutedExpected) {
+ readableExpectedExecutions.resolve();
+ }
+ });
+ }, 1);
+
+ const readableTimeout = setTimeout(
+ () => readableExpectedExecutions.reject(),
+ 1000,
+ );
+ const readTimeout = setTimeout(
+ () => readExpectedExecutions.reject(),
+ 1000,
+ );
+ await readableExpectedExecutions;
+ await readExpectedExecutions;
+ clearTimeout(readableTimeout);
+ clearTimeout(readTimeout);
+ assertEquals(readableExecuted, readableExecutedExpected);
+ assertEquals(readExecuted, readExecutedExpected);
+});
+
+Deno.test("Readable stream: 'readable' event is emitted but 'read' is not on highWaterMark length not reached and stream ended", async () => {
+ let readableExecuted = 0;
+ const readableExecutedExpected = 1;
+ const readableExpectedExecutions = deferred();
+
+ const r = new Readable({
+ highWaterMark: 30,
+ });
+
+ r._read = () => {
+ throw new Error("Must not be executed");
+ };
+
+ r.push(Buffer.from("blerg"));
+ //This ends the stream and triggers end
+ r.push(null);
+
+ setTimeout(function () {
+ // Assert we're testing what we think we are
+ assert(!r._readableState.reading);
+ r.on("readable", () => {
+ readableExecuted++;
+ if (readableExecuted == readableExecutedExpected) {
+ readableExpectedExecutions.resolve();
+ }
+ });
+ }, 1);
+
+ const readableTimeout = setTimeout(
+ () => readableExpectedExecutions.reject(),
+ 1000,
+ );
+ await readableExpectedExecutions;
+ clearTimeout(readableTimeout);
+ assertEquals(readableExecuted, readableExecutedExpected);
+});
+
+Deno.test("Readable stream: 'read' is emitted on empty string pushed in non-object mode", async () => {
+ let endExecuted = 0;
+ const endExecutedExpected = 1;
+ const endExpectedExecutions = deferred();
+
+ const underlyingData = ["", "x", "y", "", "z"];
+ const expected = underlyingData.filter((data) => data);
+ const result: unknown[] = [];
+
+ const r = new Readable({
+ encoding: "utf8",
+ });
+ r._read = function () {
+ queueMicrotask(() => {
+ if (!underlyingData.length) {
+ this.push(null);
+ } else {
+ this.push(underlyingData.shift());
+ }
+ });
+ };
+
+ r.on("readable", () => {
+ const data = r.read();
+ if (data !== null) result.push(data);
+ });
+
+ r.on("end", () => {
+ endExecuted++;
+ if (endExecuted == endExecutedExpected) {
+ endExpectedExecutions.resolve();
+ }
+ assertEquals(result, expected);
+ });
+
+ const endTimeout = setTimeout(
+ () => endExpectedExecutions.reject(),
+ 1000,
+ );
+ await endExpectedExecutions;
+ clearTimeout(endTimeout);
+ assertEquals(endExecuted, endExecutedExpected);
+});
+
+Deno.test("Readable stream: listeners can be removed", () => {
+ const r = new Readable();
+ r._read = () => {};
+ r.on("data", () => {});
+
+ r.removeAllListeners("data");
+
+ assertEquals(r.eventNames().length, 0);
+});