summaryrefslogtreecommitdiff
path: root/std/node/_stream/duplex_test.ts
diff options
context:
space:
mode:
Diffstat (limited to 'std/node/_stream/duplex_test.ts')
-rw-r--r--std/node/_stream/duplex_test.ts698
1 files changed, 0 insertions, 698 deletions
diff --git a/std/node/_stream/duplex_test.ts b/std/node/_stream/duplex_test.ts
deleted file mode 100644
index 1596ec218..000000000
--- a/std/node/_stream/duplex_test.ts
+++ /dev/null
@@ -1,698 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import { Buffer } from "../buffer.ts";
-import Duplex from "./duplex.ts";
-import finished from "./end_of_stream.ts";
-import {
- assert,
- assertEquals,
- assertStrictEquals,
- assertThrows,
-} from "../../testing/asserts.ts";
-import { deferred, delay } from "../../async/mod.ts";
-
-Deno.test("Duplex stream works normally", () => {
- const stream = new Duplex({ objectMode: true });
-
- assert(stream._readableState.objectMode);
- assert(stream._writableState.objectMode);
- assert(stream.allowHalfOpen);
- assertEquals(stream.listenerCount("end"), 0);
-
- let written: { val: number };
- let read: { val: number };
-
- stream._write = (obj, _, cb) => {
- written = obj;
- cb();
- };
-
- stream._read = () => {};
-
- stream.on("data", (obj) => {
- read = obj;
- });
-
- stream.push({ val: 1 });
- stream.end({ val: 2 });
-
- stream.on("finish", () => {
- assertEquals(read.val, 1);
- assertEquals(written.val, 2);
- });
-});
-
-Deno.test("Duplex stream gets constructed correctly", () => {
- const d1 = new Duplex({
- objectMode: true,
- highWaterMark: 100,
- });
-
- assertEquals(d1.readableObjectMode, true);
- assertEquals(d1.readableHighWaterMark, 100);
- assertEquals(d1.writableObjectMode, true);
- assertEquals(d1.writableHighWaterMark, 100);
-
- const d2 = new Duplex({
- readableObjectMode: false,
- readableHighWaterMark: 10,
- writableObjectMode: true,
- writableHighWaterMark: 100,
- });
-
- assertEquals(d2.writableObjectMode, true);
- assertEquals(d2.writableHighWaterMark, 100);
- assertEquals(d2.readableObjectMode, false);
- assertEquals(d2.readableHighWaterMark, 10);
-});
-
-Deno.test("Duplex stream can be paused", () => {
- const readable = new Duplex();
-
- // _read is a noop, here.
- readable._read = () => {};
-
- // Default state of a stream is not "paused"
- assert(!readable.isPaused());
-
- // Make the stream start flowing...
- readable.on("data", () => {});
-
- // still not paused.
- assert(!readable.isPaused());
-
- readable.pause();
- assert(readable.isPaused());
- readable.resume();
- assert(!readable.isPaused());
-});
-
-Deno.test("Duplex stream sets enconding correctly", () => {
- const readable = new Duplex({
- read() {},
- });
-
- readable.setEncoding("utf8");
-
- readable.push(new TextEncoder().encode("DEF"));
- readable.unshift(new TextEncoder().encode("ABC"));
-
- assertStrictEquals(readable.read(), "ABCDEF");
-});
-
-Deno.test("Duplex stream sets encoding correctly", () => {
- const readable = new Duplex({
- read() {},
- });
-
- readable.setEncoding("utf8");
-
- readable.push(new TextEncoder().encode("DEF"));
- readable.unshift(new TextEncoder().encode("ABC"));
-
- assertStrictEquals(readable.read(), "ABCDEF");
-});
-
-Deno.test("Duplex stream holds up a big push", async () => {
- let readExecuted = 0;
- const readExecutedExpected = 3;
- const readExpectedExecutions = deferred();
-
- let endExecuted = 0;
- const endExecutedExpected = 1;
- const endExpectedExecutions = deferred();
-
- const str = "asdfasdfasdfasdfasdf";
-
- const r = new Duplex({
- highWaterMark: 5,
- encoding: "utf8",
- });
-
- let reads = 0;
-
- function _read() {
- if (reads === 0) {
- setTimeout(() => {
- r.push(str);
- }, 1);
- reads++;
- } else if (reads === 1) {
- const ret = r.push(str);
- assertEquals(ret, false);
- reads++;
- } else {
- r.push(null);
- }
- }
-
- r._read = () => {
- readExecuted++;
- if (readExecuted == readExecutedExpected) {
- readExpectedExecutions.resolve();
- }
- _read();
- };
-
- r.on("end", () => {
- endExecuted++;
- if (endExecuted == endExecutedExpected) {
- endExpectedExecutions.resolve();
- }
- });
-
- // Push some data in to start.
- // We've never gotten any read event at this point.
- const ret = r.push(str);
- assert(!ret);
- let chunk = r.read();
- assertEquals(chunk, str);
- chunk = r.read();
- assertEquals(chunk, null);
-
- r.once("readable", () => {
- // This time, we'll get *all* the remaining data, because
- // it's been added synchronously, as the read WOULD take
- // us below the hwm, and so it triggered a _read() again,
- // which synchronously added more, which we then return.
- chunk = r.read();
- assertEquals(chunk, str + str);
-
- chunk = r.read();
- assertEquals(chunk, null);
- });
-
- const readTimeout = setTimeout(
- () => readExpectedExecutions.reject(),
- 1000,
- );
- const endTimeout = setTimeout(
- () => endExpectedExecutions.reject(),
- 1000,
- );
- await readExpectedExecutions;
- await endExpectedExecutions;
- clearTimeout(readTimeout);
- clearTimeout(endTimeout);
- assertEquals(readExecuted, readExecutedExpected);
- assertEquals(endExecuted, endExecutedExpected);
-});
-
-Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length exceeded", async () => {
- let readableExecuted = 0;
- const readableExecutedExpected = 1;
- const readableExpectedExecutions = deferred();
-
- const r = new Duplex({
- highWaterMark: 3,
- });
-
- r._read = () => {
- throw new Error("_read must not be called");
- };
- r.push(Buffer.from("blerg"));
-
- setTimeout(function () {
- assert(!r._readableState.reading);
- r.on("readable", () => {
- readableExecuted++;
- if (readableExecuted == readableExecutedExpected) {
- readableExpectedExecutions.resolve();
- }
- });
- }, 1);
-
- const readableTimeout = setTimeout(
- () => readableExpectedExecutions.reject(),
- 1000,
- );
- await readableExpectedExecutions;
- clearTimeout(readableTimeout);
- assertEquals(readableExecuted, readableExecutedExpected);
-});
-
-Deno.test("Duplex stream: 'readable' and 'read' events are emitted on highWaterMark length not reached", async () => {
- let readableExecuted = 0;
- const readableExecutedExpected = 1;
- const readableExpectedExecutions = deferred();
-
- let readExecuted = 0;
- const readExecutedExpected = 1;
- const readExpectedExecutions = deferred();
-
- const r = new Duplex({
- highWaterMark: 3,
- });
-
- r._read = () => {
- readExecuted++;
- if (readExecuted == readExecutedExpected) {
- readExpectedExecutions.resolve();
- }
- };
-
- r.push(Buffer.from("bl"));
-
- setTimeout(function () {
- assert(r._readableState.reading);
- r.on("readable", () => {
- readableExecuted++;
- if (readableExecuted == readableExecutedExpected) {
- readableExpectedExecutions.resolve();
- }
- });
- }, 1);
-
- const readableTimeout = setTimeout(
- () => readableExpectedExecutions.reject(),
- 1000,
- );
- const readTimeout = setTimeout(
- () => readExpectedExecutions.reject(),
- 1000,
- );
- await readableExpectedExecutions;
- await readExpectedExecutions;
- clearTimeout(readableTimeout);
- clearTimeout(readTimeout);
- assertEquals(readableExecuted, readableExecutedExpected);
- assertEquals(readExecuted, readExecutedExpected);
-});
-
-Deno.test("Duplex stream: 'readable' event is emitted but 'read' is not on highWaterMark length not reached and stream ended", async () => {
- let readableExecuted = 0;
- const readableExecutedExpected = 1;
- const readableExpectedExecutions = deferred();
-
- const r = new Duplex({
- highWaterMark: 30,
- });
-
- r._read = () => {
- throw new Error("Must not be executed");
- };
-
- r.push(Buffer.from("blerg"));
- //This ends the stream and triggers end
- r.push(null);
-
- setTimeout(function () {
- // Assert we're testing what we think we are
- assert(!r._readableState.reading);
- r.on("readable", () => {
- readableExecuted++;
- if (readableExecuted == readableExecutedExpected) {
- readableExpectedExecutions.resolve();
- }
- });
- }, 1);
-
- const readableTimeout = setTimeout(
- () => readableExpectedExecutions.reject(),
- 1000,
- );
- await readableExpectedExecutions;
- clearTimeout(readableTimeout);
- assertEquals(readableExecuted, readableExecutedExpected);
-});
-
-Deno.test("Duplex stream: 'read' is emitted on empty string pushed in non-object mode", async () => {
- let endExecuted = 0;
- const endExecutedExpected = 1;
- const endExpectedExecutions = deferred();
-
- const underlyingData = ["", "x", "y", "", "z"];
- const expected = underlyingData.filter((data) => data);
- const result: unknown[] = [];
-
- const r = new Duplex({
- encoding: "utf8",
- });
- r._read = function () {
- queueMicrotask(() => {
- if (!underlyingData.length) {
- this.push(null);
- } else {
- this.push(underlyingData.shift());
- }
- });
- };
-
- r.on("readable", () => {
- const data = r.read();
- if (data !== null) result.push(data);
- });
-
- r.on("end", () => {
- endExecuted++;
- if (endExecuted == endExecutedExpected) {
- endExpectedExecutions.resolve();
- }
- assertEquals(result, expected);
- });
-
- const endTimeout = setTimeout(
- () => endExpectedExecutions.reject(),
- 1000,
- );
- await endExpectedExecutions;
- clearTimeout(endTimeout);
- assertEquals(endExecuted, endExecutedExpected);
-});
-
-Deno.test("Duplex stream: listeners can be removed", () => {
- const r = new Duplex();
- r._read = () => {};
- r.on("data", () => {});
-
- r.removeAllListeners("data");
-
- assertEquals(r.eventNames().length, 0);
-});
-
-Deno.test("Duplex stream writes correctly", async () => {
- let callback: undefined | ((error?: Error | null | undefined) => void);
-
- let writeExecuted = 0;
- const writeExecutedExpected = 1;
- const writeExpectedExecutions = deferred();
-
- let writevExecuted = 0;
- const writevExecutedExpected = 1;
- const writevExpectedExecutions = deferred();
-
- const writable = new Duplex({
- write: (chunk, encoding, cb) => {
- writeExecuted++;
- if (writeExecuted == writeExecutedExpected) {
- writeExpectedExecutions.resolve();
- }
- assert(chunk instanceof Buffer);
- assertStrictEquals(encoding, "buffer");
- assertStrictEquals(String(chunk), "ABC");
- callback = cb;
- },
- writev: (chunks) => {
- writevExecuted++;
- if (writevExecuted == writevExecutedExpected) {
- writevExpectedExecutions.resolve();
- }
- assertStrictEquals(chunks.length, 2);
- assertStrictEquals(chunks[0].encoding, "buffer");
- assertStrictEquals(chunks[1].encoding, "buffer");
- assertStrictEquals(chunks[0].chunk + chunks[1].chunk, "DEFGHI");
- },
- });
-
- writable.write(new TextEncoder().encode("ABC"));
- writable.write(new TextEncoder().encode("DEF"));
- writable.end(new TextEncoder().encode("GHI"));
- callback?.();
-
- const writeTimeout = setTimeout(
- () => writeExpectedExecutions.reject(),
- 1000,
- );
- const writevTimeout = setTimeout(
- () => writevExpectedExecutions.reject(),
- 1000,
- );
- await writeExpectedExecutions;
- await writevExpectedExecutions;
- clearTimeout(writeTimeout);
- clearTimeout(writevTimeout);
- assertEquals(writeExecuted, writeExecutedExpected);
- assertEquals(writevExecuted, writevExecutedExpected);
-});
-
-Deno.test("Duplex stream writes Uint8Array in object mode", async () => {
- let writeExecuted = 0;
- const writeExecutedExpected = 1;
- const writeExpectedExecutions = deferred();
-
- const ABC = new TextEncoder().encode("ABC");
-
- const writable = new Duplex({
- objectMode: true,
- write: (chunk, encoding, cb) => {
- writeExecuted++;
- if (writeExecuted == writeExecutedExpected) {
- writeExpectedExecutions.resolve();
- }
- assert(!(chunk instanceof Buffer));
- assert(chunk instanceof Uint8Array);
- assertEquals(chunk, ABC);
- assertEquals(encoding, "utf8");
- cb();
- },
- });
-
- writable.end(ABC);
-
- const writeTimeout = setTimeout(
- () => writeExpectedExecutions.reject(),
- 1000,
- );
- await writeExpectedExecutions;
- clearTimeout(writeTimeout);
- assertEquals(writeExecuted, writeExecutedExpected);
-});
-
-Deno.test("Duplex stream throws on unexpected close", async () => {
- let finishedExecuted = 0;
- const finishedExecutedExpected = 1;
- const finishedExpectedExecutions = deferred();
-
- const writable = new Duplex({
- write: () => {},
- });
- writable.writable = false;
- writable.destroy();
-
- finished(writable, (err) => {
- finishedExecuted++;
- if (finishedExecuted == finishedExecutedExpected) {
- finishedExpectedExecutions.resolve();
- }
- assertEquals(err?.code, "ERR_STREAM_PREMATURE_CLOSE");
- });
-
- const finishedTimeout = setTimeout(
- () => finishedExpectedExecutions.reject(),
- 1000,
- );
- await finishedExpectedExecutions;
- clearTimeout(finishedTimeout);
- assertEquals(finishedExecuted, finishedExecutedExpected);
-});
-
-Deno.test("Duplex stream finishes correctly after error", async () => {
- let errorExecuted = 0;
- const errorExecutedExpected = 1;
- const errorExpectedExecutions = deferred();
-
- let finishedExecuted = 0;
- const finishedExecutedExpected = 1;
- const finishedExpectedExecutions = deferred();
-
- const w = new Duplex({
- write(_chunk, _encoding, cb) {
- cb(new Error());
- },
- autoDestroy: false,
- });
- w.write("asd");
- w.on("error", () => {
- errorExecuted++;
- if (errorExecuted == errorExecutedExpected) {
- errorExpectedExecutions.resolve();
- }
- finished(w, () => {
- finishedExecuted++;
- if (finishedExecuted == finishedExecutedExpected) {
- finishedExpectedExecutions.resolve();
- }
- });
- });
-
- const errorTimeout = setTimeout(
- () => errorExpectedExecutions.reject(),
- 1000,
- );
- const finishedTimeout = setTimeout(
- () => finishedExpectedExecutions.reject(),
- 1000,
- );
- await finishedExpectedExecutions;
- await errorExpectedExecutions;
- clearTimeout(finishedTimeout);
- clearTimeout(errorTimeout);
- assertEquals(finishedExecuted, finishedExecutedExpected);
- assertEquals(errorExecuted, errorExecutedExpected);
-});
-
-Deno.test("Duplex stream fails on 'write' null value", () => {
- const writable = new Duplex();
- assertThrows(() => writable.write(null));
-});
-
-Deno.test("Duplex stream is destroyed correctly", async () => {
- let closeExecuted = 0;
- const closeExecutedExpected = 1;
- const closeExpectedExecutions = deferred();
-
- const unexpectedExecution = deferred();
-
- const duplex = new Duplex({
- write(_chunk, _enc, cb) {
- cb();
- },
- read() {},
- });
-
- duplex.resume();
-
- function never() {
- unexpectedExecution.reject();
- }
-
- duplex.on("end", never);
- duplex.on("finish", never);
- duplex.on("close", () => {
- closeExecuted++;
- if (closeExecuted == closeExecutedExpected) {
- closeExpectedExecutions.resolve();
- }
- });
-
- duplex.destroy();
- assertEquals(duplex.destroyed, true);
-
- const closeTimeout = setTimeout(
- () => closeExpectedExecutions.reject(),
- 1000,
- );
- await Promise.race([
- unexpectedExecution,
- delay(100),
- ]);
- await closeExpectedExecutions;
- clearTimeout(closeTimeout);
- assertEquals(closeExecuted, closeExecutedExpected);
-});
-
-Deno.test("Duplex stream errors correctly on destroy", async () => {
- let errorExecuted = 0;
- const errorExecutedExpected = 1;
- const errorExpectedExecutions = deferred();
-
- const unexpectedExecution = deferred();
-
- const duplex = new Duplex({
- write(_chunk, _enc, cb) {
- cb();
- },
- read() {},
- });
- duplex.resume();
-
- const expected = new Error("kaboom");
-
- function never() {
- unexpectedExecution.reject();
- }
-
- duplex.on("end", never);
- duplex.on("finish", never);
- duplex.on("error", (err) => {
- errorExecuted++;
- if (errorExecuted == errorExecutedExpected) {
- errorExpectedExecutions.resolve();
- }
- assertStrictEquals(err, expected);
- });
-
- duplex.destroy(expected);
- assertEquals(duplex.destroyed, true);
-
- const errorTimeout = setTimeout(
- () => errorExpectedExecutions.reject(),
- 1000,
- );
- await Promise.race([
- unexpectedExecution,
- delay(100),
- ]);
- await errorExpectedExecutions;
- clearTimeout(errorTimeout);
- assertEquals(errorExecuted, errorExecutedExpected);
-});
-
-Deno.test("Duplex stream doesn't finish on allowHalfOpen", async () => {
- const unexpectedExecution = deferred();
-
- const duplex = new Duplex({
- read() {},
- });
-
- assertEquals(duplex.allowHalfOpen, true);
- duplex.on("finish", () => unexpectedExecution.reject());
- assertEquals(duplex.listenerCount("end"), 0);
- duplex.resume();
- duplex.push(null);
-
- await Promise.race([
- unexpectedExecution,
- delay(100),
- ]);
-});
-
-Deno.test("Duplex stream finishes when allowHalfOpen is disabled", async () => {
- let finishExecuted = 0;
- const finishExecutedExpected = 1;
- const finishExpectedExecutions = deferred();
-
- const duplex = new Duplex({
- read() {},
- allowHalfOpen: false,
- });
-
- assertEquals(duplex.allowHalfOpen, false);
- duplex.on("finish", () => {
- finishExecuted++;
- if (finishExecuted == finishExecutedExpected) {
- finishExpectedExecutions.resolve();
- }
- });
- assertEquals(duplex.listenerCount("end"), 0);
- duplex.resume();
- duplex.push(null);
-
- const finishTimeout = setTimeout(
- () => finishExpectedExecutions.reject(),
- 1000,
- );
- await finishExpectedExecutions;
- clearTimeout(finishTimeout);
- assertEquals(finishExecuted, finishExecutedExpected);
-});
-
-Deno.test("Duplex stream doesn't finish when allowHalfOpen is disabled but stream ended", async () => {
- const unexpectedExecution = deferred();
-
- const duplex = new Duplex({
- read() {},
- allowHalfOpen: false,
- });
-
- assertEquals(duplex.allowHalfOpen, false);
- duplex._writableState.ended = true;
- duplex.on("finish", () => unexpectedExecution.reject());
- assertEquals(duplex.listenerCount("end"), 0);
- duplex.resume();
- duplex.push(null);
-
- await Promise.race([
- unexpectedExecution,
- delay(100),
- ]);
-});