summaryrefslogtreecommitdiff
path: root/std/node/_stream/pipeline_test.ts
diff options
context:
space:
mode:
authorSteven Guerrero <stephenguerrero43@gmail.com>2020-11-26 07:50:08 -0500
committerGitHub <noreply@github.com>2020-11-26 13:50:08 +0100
commit9042fcc12e7774cdd0ca3a5d08918a07dae8102b (patch)
tree8b5ff11412aae9bb714e0bb0b9b0358db64a8657 /std/node/_stream/pipeline_test.ts
parent60e980c78180ee3b0a14d692307be275dc181c8d (diff)
feat(std/node/stream): Add Duplex, Transform, Passthrough, pipeline, finished and promises (#7940)
Diffstat (limited to 'std/node/_stream/pipeline_test.ts')
-rw-r--r--std/node/_stream/pipeline_test.ts387
1 files changed, 387 insertions, 0 deletions
diff --git a/std/node/_stream/pipeline_test.ts b/std/node/_stream/pipeline_test.ts
new file mode 100644
index 000000000..aa1869416
--- /dev/null
+++ b/std/node/_stream/pipeline_test.ts
@@ -0,0 +1,387 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import PassThrough from "./passthrough.ts";
+import pipeline from "./pipeline.ts";
+import Readable from "./readable.ts";
+import Transform from "./transform.ts";
+import Writable from "./writable.ts";
+import { mustCall } from "../_utils.ts";
+import {
+ assert,
+ assertEquals,
+ assertStrictEquals,
+} from "../../testing/asserts.ts";
+import type { NodeErrorAbstraction } from "../_errors.ts";
+
+Deno.test("Pipeline ends on stream finished", async () => {
+ let finished = false;
+
+ // deno-lint-ignore no-explicit-any
+ const processed: any[] = [];
+ const expected = [
+ Buffer.from("a"),
+ Buffer.from("b"),
+ Buffer.from("c"),
+ ];
+
+ const read = new Readable({
+ read() {},
+ });
+
+ const write = new Writable({
+ write(data, _enc, cb) {
+ processed.push(data);
+ cb();
+ },
+ });
+
+ write.on("finish", () => {
+ finished = true;
+ });
+
+ for (let i = 0; i < expected.length; i++) {
+ read.push(expected[i]);
+ }
+ read.push(null);
+
+ const [finishedCompleted, finishedCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assert(!err);
+ assert(finished);
+ assertEquals(processed, expected);
+ },
+ 1,
+ );
+
+ pipeline(read, write, finishedCb);
+
+ await finishedCompleted;
+});
+
+Deno.test("Pipeline fails on stream destroyed", async () => {
+ const read = new Readable({
+ read() {},
+ });
+
+ const write = new Writable({
+ write(_data, _enc, cb) {
+ cb();
+ },
+ });
+
+ read.push("data");
+ queueMicrotask(() => read.destroy());
+
+ const [pipelineExecuted, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assert(err);
+ },
+ 1,
+ );
+ pipeline(read, write, pipelineCb);
+
+ await pipelineExecuted;
+});
+
+Deno.test("Pipeline exits on stream error", async () => {
+ const read = new Readable({
+ read() {},
+ });
+
+ const transform = new Transform({
+ transform(_data, _enc, cb) {
+ cb(new Error("kaboom"));
+ },
+ });
+
+ const write = new Writable({
+ write(_data, _enc, cb) {
+ cb();
+ },
+ });
+
+ const [readExecution, readCb] = mustCall();
+ read.on("close", readCb);
+ const [closeExecution, closeCb] = mustCall();
+ transform.on("close", closeCb);
+ const [writeExecution, writeCb] = mustCall();
+ write.on("close", writeCb);
+
+ const errorExecutions = [read, transform, write]
+ .map((stream) => {
+ const [execution, cb] = mustCall((err?: NodeErrorAbstraction | null) => {
+ assertEquals(err, new Error("kaboom"));
+ });
+
+ stream.on("error", cb);
+ return execution;
+ });
+
+ const [pipelineExecution, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assertEquals(err, new Error("kaboom"));
+ },
+ );
+ const dst = pipeline(read, transform, write, pipelineCb);
+
+ assertStrictEquals(dst, write);
+
+ read.push("hello");
+
+ await readExecution;
+ await closeExecution;
+ await writeExecution;
+ await Promise.all(errorExecutions);
+ await pipelineExecution;
+});
+
+Deno.test("Pipeline processes iterators correctly", async () => {
+ let res = "";
+ const w = new Writable({
+ write(chunk, _encoding, callback) {
+ res += chunk;
+ callback();
+ },
+ });
+
+ const [pipelineExecution, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assert(!err);
+ assertEquals(res, "helloworld");
+ },
+ );
+ pipeline(
+ function* () {
+ yield "hello";
+ yield "world";
+ }(),
+ w,
+ pipelineCb,
+ );
+
+ await pipelineExecution;
+});
+
+Deno.test("Pipeline processes async iterators correctly", async () => {
+ let res = "";
+ const w = new Writable({
+ write(chunk, _encoding, callback) {
+ res += chunk;
+ callback();
+ },
+ });
+
+ const [pipelineExecution, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assert(!err);
+ assertEquals(res, "helloworld");
+ },
+ );
+ pipeline(
+ async function* () {
+ await Promise.resolve();
+ yield "hello";
+ yield "world";
+ }(),
+ w,
+ pipelineCb,
+ );
+
+ await pipelineExecution;
+});
+
+Deno.test("Pipeline processes generators correctly", async () => {
+ let res = "";
+ const w = new Writable({
+ write(chunk, _encoding, callback) {
+ res += chunk;
+ callback();
+ },
+ });
+
+ const [pipelineExecution, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assert(!err);
+ assertEquals(res, "helloworld");
+ },
+ );
+ pipeline(
+ function* () {
+ yield "hello";
+ yield "world";
+ },
+ w,
+ pipelineCb,
+ );
+
+ await pipelineExecution;
+});
+
+Deno.test("Pipeline processes async generators correctly", async () => {
+ let res = "";
+ const w = new Writable({
+ write(chunk, _encoding, callback) {
+ res += chunk;
+ callback();
+ },
+ });
+
+ const [pipelineExecution, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assert(!err);
+ assertEquals(res, "helloworld");
+ },
+ );
+ pipeline(
+ async function* () {
+ await Promise.resolve();
+ yield "hello";
+ yield "world";
+ },
+ w,
+ pipelineCb,
+ );
+
+ await pipelineExecution;
+});
+
+Deno.test("Pipeline handles generator transforms", async () => {
+ let res = "";
+
+ const [pipelineExecuted, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assert(!err);
+ assertEquals(res, "HELLOWORLD");
+ },
+ );
+ pipeline(
+ async function* () {
+ await Promise.resolve();
+ yield "hello";
+ yield "world";
+ },
+ async function* (source: string[]) {
+ for await (const chunk of source) {
+ yield chunk.toUpperCase();
+ }
+ },
+ async function (source: string[]) {
+ for await (const chunk of source) {
+ res += chunk;
+ }
+ },
+ pipelineCb,
+ );
+
+ await pipelineExecuted;
+});
+
+Deno.test("Pipeline passes result to final callback", async () => {
+ const [pipelineExecuted, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null, val?: unknown) => {
+ assert(!err);
+ assertEquals(val, "HELLOWORLD");
+ },
+ );
+ pipeline(
+ async function* () {
+ await Promise.resolve();
+ yield "hello";
+ yield "world";
+ },
+ async function* (source: string[]) {
+ for await (const chunk of source) {
+ yield chunk.toUpperCase();
+ }
+ },
+ async function (source: string[]) {
+ let ret = "";
+ for await (const chunk of source) {
+ ret += chunk;
+ }
+ return ret;
+ },
+ pipelineCb,
+ );
+
+ await pipelineExecuted;
+});
+
+Deno.test("Pipeline returns a stream after ending", async () => {
+ const [pipelineExecuted, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assertEquals(err, undefined);
+ },
+ );
+ const ret = pipeline(
+ async function* () {
+ await Promise.resolve();
+ yield "hello";
+ },
+ // deno-lint-ignore require-yield
+ async function* (source: string[]) {
+ for await (const chunk of source) {
+ chunk;
+ }
+ },
+ pipelineCb,
+ );
+
+ ret.resume();
+
+ assertEquals(typeof ret.pipe, "function");
+
+ await pipelineExecuted;
+});
+
+Deno.test("Pipeline returns a stream after erroring", async () => {
+ const errorText = "kaboom";
+
+ const [pipelineExecuted, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assertEquals(err?.message, errorText);
+ },
+ );
+ const ret = pipeline(
+ // deno-lint-ignore require-yield
+ async function* () {
+ await Promise.resolve();
+ throw new Error(errorText);
+ },
+ // deno-lint-ignore require-yield
+ async function* (source: string[]) {
+ for await (const chunk of source) {
+ chunk;
+ }
+ },
+ pipelineCb,
+ );
+
+ ret.resume();
+
+ assertEquals(typeof ret.pipe, "function");
+
+ await pipelineExecuted;
+});
+
+Deno.test("Pipeline destination gets destroyed on error", async () => {
+ const errorText = "kaboom";
+ const s = new PassThrough();
+
+ const [pipelineExecution, pipelineCb] = mustCall(
+ (err?: NodeErrorAbstraction | null) => {
+ assertEquals(err?.message, errorText);
+ assertEquals(s.destroyed, true);
+ },
+ );
+ pipeline(
+ // deno-lint-ignore require-yield
+ async function* () {
+ throw new Error(errorText);
+ },
+ s,
+ pipelineCb,
+ );
+
+ await pipelineExecution;
+});