diff options
author | Steven Guerrero <stephenguerrero43@gmail.com> | 2020-11-26 07:50:08 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-26 13:50:08 +0100 |
commit | 9042fcc12e7774cdd0ca3a5d08918a07dae8102b (patch) | |
tree | 8b5ff11412aae9bb714e0bb0b9b0358db64a8657 /std/node/_stream/pipeline_test.ts | |
parent | 60e980c78180ee3b0a14d692307be275dc181c8d (diff) |
feat(std/node/stream): Add Duplex, Transform, Passthrough, pipeline, finished and promises (#7940)
Diffstat (limited to 'std/node/_stream/pipeline_test.ts')
-rw-r--r-- | std/node/_stream/pipeline_test.ts | 387 |
1 files changed, 387 insertions, 0 deletions
diff --git a/std/node/_stream/pipeline_test.ts b/std/node/_stream/pipeline_test.ts new file mode 100644 index 000000000..aa1869416 --- /dev/null +++ b/std/node/_stream/pipeline_test.ts @@ -0,0 +1,387 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { Buffer } from "../buffer.ts"; +import PassThrough from "./passthrough.ts"; +import pipeline from "./pipeline.ts"; +import Readable from "./readable.ts"; +import Transform from "./transform.ts"; +import Writable from "./writable.ts"; +import { mustCall } from "../_utils.ts"; +import { + assert, + assertEquals, + assertStrictEquals, +} from "../../testing/asserts.ts"; +import type { NodeErrorAbstraction } from "../_errors.ts"; + +Deno.test("Pipeline ends on stream finished", async () => { + let finished = false; + + // deno-lint-ignore no-explicit-any + const processed: any[] = []; + const expected = [ + Buffer.from("a"), + Buffer.from("b"), + Buffer.from("c"), + ]; + + const read = new Readable({ + read() {}, + }); + + const write = new Writable({ + write(data, _enc, cb) { + processed.push(data); + cb(); + }, + }); + + write.on("finish", () => { + finished = true; + }); + + for (let i = 0; i < expected.length; i++) { + read.push(expected[i]); + } + read.push(null); + + const [finishedCompleted, finishedCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assert(!err); + assert(finished); + assertEquals(processed, expected); + }, + 1, + ); + + pipeline(read, write, finishedCb); + + await finishedCompleted; +}); + +Deno.test("Pipeline fails on stream destroyed", async () => { + const read = new Readable({ + read() {}, + }); + + const write = new Writable({ + write(_data, _enc, cb) { + cb(); + }, + }); + + read.push("data"); + queueMicrotask(() => read.destroy()); + + const [pipelineExecuted, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assert(err); + }, + 1, + ); + pipeline(read, write, pipelineCb); + + await pipelineExecuted; +}); + +Deno.test("Pipeline exits on stream error", async () => { + const read = new Readable({ + read() {}, + }); + + const transform = new Transform({ + transform(_data, _enc, cb) { + cb(new Error("kaboom")); + }, + }); + + const write = new Writable({ + write(_data, _enc, cb) { + cb(); + }, + }); + + const [readExecution, readCb] = mustCall(); + read.on("close", readCb); + const [closeExecution, closeCb] = mustCall(); + transform.on("close", closeCb); + const [writeExecution, writeCb] = mustCall(); + write.on("close", writeCb); + + const errorExecutions = [read, transform, write] + .map((stream) => { + const [execution, cb] = mustCall((err?: NodeErrorAbstraction | null) => { + assertEquals(err, new Error("kaboom")); + }); + + stream.on("error", cb); + return execution; + }); + + const [pipelineExecution, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assertEquals(err, new Error("kaboom")); + }, + ); + const dst = pipeline(read, transform, write, pipelineCb); + + assertStrictEquals(dst, write); + + read.push("hello"); + + await readExecution; + await closeExecution; + await writeExecution; + await Promise.all(errorExecutions); + await pipelineExecution; +}); + +Deno.test("Pipeline processes iterators correctly", async () => { + let res = ""; + const w = new Writable({ + write(chunk, _encoding, callback) { + res += chunk; + callback(); + }, + }); + + const [pipelineExecution, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assert(!err); + assertEquals(res, "helloworld"); + }, + ); + pipeline( + function* () { + yield "hello"; + yield "world"; + }(), + w, + pipelineCb, + ); + + await pipelineExecution; +}); + +Deno.test("Pipeline processes async iterators correctly", async () => { + let res = ""; + const w = new Writable({ + write(chunk, _encoding, callback) { + res += chunk; + callback(); + }, + }); + + const [pipelineExecution, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assert(!err); + assertEquals(res, "helloworld"); + }, + ); + pipeline( + async function* () { + await Promise.resolve(); + yield "hello"; + yield "world"; + }(), + w, + pipelineCb, + ); + + await pipelineExecution; +}); + +Deno.test("Pipeline processes generators correctly", async () => { + let res = ""; + const w = new Writable({ + write(chunk, _encoding, callback) { + res += chunk; + callback(); + }, + }); + + const [pipelineExecution, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assert(!err); + assertEquals(res, "helloworld"); + }, + ); + pipeline( + function* () { + yield "hello"; + yield "world"; + }, + w, + pipelineCb, + ); + + await pipelineExecution; +}); + +Deno.test("Pipeline processes async generators correctly", async () => { + let res = ""; + const w = new Writable({ + write(chunk, _encoding, callback) { + res += chunk; + callback(); + }, + }); + + const [pipelineExecution, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assert(!err); + assertEquals(res, "helloworld"); + }, + ); + pipeline( + async function* () { + await Promise.resolve(); + yield "hello"; + yield "world"; + }, + w, + pipelineCb, + ); + + await pipelineExecution; +}); + +Deno.test("Pipeline handles generator transforms", async () => { + let res = ""; + + const [pipelineExecuted, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assert(!err); + assertEquals(res, "HELLOWORLD"); + }, + ); + pipeline( + async function* () { + await Promise.resolve(); + yield "hello"; + yield "world"; + }, + async function* (source: string[]) { + for await (const chunk of source) { + yield chunk.toUpperCase(); + } + }, + async function (source: string[]) { + for await (const chunk of source) { + res += chunk; + } + }, + pipelineCb, + ); + + await pipelineExecuted; +}); + +Deno.test("Pipeline passes result to final callback", async () => { + const [pipelineExecuted, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null, val?: unknown) => { + assert(!err); + assertEquals(val, "HELLOWORLD"); + }, + ); + pipeline( + async function* () { + await Promise.resolve(); + yield "hello"; + yield "world"; + }, + async function* (source: string[]) { + for await (const chunk of source) { + yield chunk.toUpperCase(); + } + }, + async function (source: string[]) { + let ret = ""; + for await (const chunk of source) { + ret += chunk; + } + return ret; + }, + pipelineCb, + ); + + await pipelineExecuted; +}); + +Deno.test("Pipeline returns a stream after ending", async () => { + const [pipelineExecuted, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assertEquals(err, undefined); + }, + ); + const ret = pipeline( + async function* () { + await Promise.resolve(); + yield "hello"; + }, + // deno-lint-ignore require-yield + async function* (source: string[]) { + for await (const chunk of source) { + chunk; + } + }, + pipelineCb, + ); + + ret.resume(); + + assertEquals(typeof ret.pipe, "function"); + + await pipelineExecuted; +}); + +Deno.test("Pipeline returns a stream after erroring", async () => { + const errorText = "kaboom"; + + const [pipelineExecuted, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assertEquals(err?.message, errorText); + }, + ); + const ret = pipeline( + // deno-lint-ignore require-yield + async function* () { + await Promise.resolve(); + throw new Error(errorText); + }, + // deno-lint-ignore require-yield + async function* (source: string[]) { + for await (const chunk of source) { + chunk; + } + }, + pipelineCb, + ); + + ret.resume(); + + assertEquals(typeof ret.pipe, "function"); + + await pipelineExecuted; +}); + +Deno.test("Pipeline destination gets destroyed on error", async () => { + const errorText = "kaboom"; + const s = new PassThrough(); + + const [pipelineExecution, pipelineCb] = mustCall( + (err?: NodeErrorAbstraction | null) => { + assertEquals(err?.message, errorText); + assertEquals(s.destroyed, true); + }, + ); + pipeline( + // deno-lint-ignore require-yield + async function* () { + throw new Error(errorText); + }, + s, + pipelineCb, + ); + + await pipelineExecution; +}); |