summaryrefslogtreecommitdiff
path: root/std/node/_stream/promises_test.ts
diff options
context:
space:
mode:
Diffstat (limited to 'std/node/_stream/promises_test.ts')
-rw-r--r--std/node/_stream/promises_test.ts84
1 files changed, 84 insertions, 0 deletions
diff --git a/std/node/_stream/promises_test.ts b/std/node/_stream/promises_test.ts
new file mode 100644
index 000000000..90803b4af
--- /dev/null
+++ b/std/node/_stream/promises_test.ts
@@ -0,0 +1,84 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import Readable from "./readable.ts";
+import Writable from "./writable.ts";
+import { pipeline } from "./promises.ts";
+import { deferred } from "../../async/mod.ts";
+import {
+ assert,
+ assertEquals,
+ assertThrowsAsync,
+} from "../../testing/asserts.ts";
+
+Deno.test("Promise pipeline works correctly", async () => {
+ let pipelineExecuted = 0;
+ const pipelineExecutedExpected = 1;
+ const pipelineExpectedExecutions = deferred();
+
+ let finished = false;
+ // deno-lint-ignore no-explicit-any
+ const processed: any[] = [];
+ const expected = [
+ Buffer.from("a"),
+ Buffer.from("b"),
+ Buffer.from("c"),
+ ];
+
+ const read = new Readable({
+ read() {},
+ });
+
+ const write = new Writable({
+ write(data, _enc, cb) {
+ processed.push(data);
+ cb();
+ },
+ });
+
+ write.on("finish", () => {
+ finished = true;
+ });
+
+ for (let i = 0; i < expected.length; i++) {
+ read.push(expected[i]);
+ }
+ read.push(null);
+
+ pipeline(read, write).then(() => {
+ pipelineExecuted++;
+ if (pipelineExecuted == pipelineExecutedExpected) {
+ pipelineExpectedExecutions.resolve();
+ }
+ assert(finished);
+ assertEquals(processed, expected);
+ });
+
+ const pipelineTimeout = setTimeout(
+ () => pipelineExpectedExecutions.reject(),
+ 1000,
+ );
+ await pipelineExpectedExecutions;
+ clearTimeout(pipelineTimeout);
+ assertEquals(pipelineExecuted, pipelineExecutedExpected);
+});
+
+Deno.test("Promise pipeline throws on readable destroyed", async () => {
+ const read = new Readable({
+ read() {},
+ });
+
+ const write = new Writable({
+ write(_data, _enc, cb) {
+ cb();
+ },
+ });
+
+ read.push("data");
+ read.destroy();
+
+ await assertThrowsAsync(
+ () => pipeline(read, write),
+ Error,
+ "Premature close",
+ );
+});