summaryrefslogtreecommitdiff
path: root/std/node/_stream/async_iterator_test.ts
diff options
context:
space:
mode:
Diffstat (limited to 'std/node/_stream/async_iterator_test.ts')
-rw-r--r--std/node/_stream/async_iterator_test.ts249
1 files changed, 249 insertions, 0 deletions
diff --git a/std/node/_stream/async_iterator_test.ts b/std/node/_stream/async_iterator_test.ts
new file mode 100644
index 000000000..17698e0fd
--- /dev/null
+++ b/std/node/_stream/async_iterator_test.ts
@@ -0,0 +1,249 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import Readable from "./readable.ts";
+import Stream from "./stream.ts";
+import toReadableAsyncIterator from "./async_iterator.ts";
+import { deferred } from "../../async/mod.ts";
+import { assertEquals, assertThrowsAsync } from "../../testing/asserts.ts";
+
+Deno.test("Stream to async iterator", async () => {
+ let destroyExecuted = 0;
+ const destroyExecutedExpected = 1;
+ const destroyExpectedExecutions = deferred();
+
+ class AsyncIteratorStream extends Stream {
+ constructor() {
+ super();
+ }
+
+ destroy() {
+ destroyExecuted++;
+ if (destroyExecuted == destroyExecutedExpected) {
+ destroyExpectedExecutions.resolve();
+ }
+ }
+
+ [Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator];
+ }
+
+ const stream = new AsyncIteratorStream();
+
+ queueMicrotask(() => {
+ stream.emit("data", "hello");
+ stream.emit("data", "world");
+ stream.emit("end");
+ });
+
+ let res = "";
+
+ for await (const d of stream) {
+ res += d;
+ }
+ assertEquals(res, "helloworld");
+
+ const destroyTimeout = setTimeout(
+ () => destroyExpectedExecutions.reject(),
+ 1000,
+ );
+ await destroyExpectedExecutions;
+ clearTimeout(destroyTimeout);
+ assertEquals(destroyExecuted, destroyExecutedExpected);
+});
+
+Deno.test("Stream to async iterator throws on 'error' emitted", async () => {
+ let closeExecuted = 0;
+ const closeExecutedExpected = 1;
+ const closeExpectedExecutions = deferred();
+
+ let errorExecuted = 0;
+ const errorExecutedExpected = 1;
+ const errorExpectedExecutions = deferred();
+
+ class StreamImplementation extends Stream {
+ close() {
+ closeExecuted++;
+ if (closeExecuted == closeExecutedExpected) {
+ closeExpectedExecutions.resolve();
+ }
+ }
+ }
+
+ const stream = new StreamImplementation();
+ queueMicrotask(() => {
+ stream.emit("data", 0);
+ stream.emit("data", 1);
+ stream.emit("error", new Error("asd"));
+ });
+
+ toReadableAsyncIterator(stream)
+ .next()
+ .catch((err) => {
+ errorExecuted++;
+ if (errorExecuted == errorExecutedExpected) {
+ errorExpectedExecutions.resolve();
+ }
+ assertEquals(err.message, "asd");
+ });
+
+ const closeTimeout = setTimeout(
+ () => closeExpectedExecutions.reject(),
+ 1000,
+ );
+ const errorTimeout = setTimeout(
+ () => errorExpectedExecutions.reject(),
+ 1000,
+ );
+ await closeExpectedExecutions;
+ await errorExpectedExecutions;
+ clearTimeout(closeTimeout);
+ clearTimeout(errorTimeout);
+ assertEquals(closeExecuted, closeExecutedExpected);
+ assertEquals(errorExecuted, errorExecutedExpected);
+});
+
+Deno.test("Async iterator matches values of Readable", async () => {
+ const readable = new Readable({
+ objectMode: true,
+ read() {},
+ });
+ readable.push(0);
+ readable.push(1);
+ readable.push(null);
+
+ const iter = readable[Symbol.asyncIterator]();
+
+ assertEquals(
+ await iter.next().then(({ value }) => value),
+ 0,
+ );
+ for await (const d of iter) {
+ assertEquals(d, 1);
+ }
+});
+
+Deno.test("Async iterator throws on Readable destroyed sync", async () => {
+ const message = "kaboom from read";
+
+ const readable = new Readable({
+ objectMode: true,
+ read() {
+ this.destroy(new Error(message));
+ },
+ });
+
+ await assertThrowsAsync(
+ async () => {
+ // deno-lint-ignore no-empty
+ for await (const k of readable) {}
+ },
+ Error,
+ message,
+ );
+});
+
+Deno.test("Async iterator throws on Readable destroyed async", async () => {
+ const message = "kaboom";
+ const readable = new Readable({
+ read() {},
+ });
+ const iterator = readable[Symbol.asyncIterator]();
+
+ readable.destroy(new Error(message));
+
+ await assertThrowsAsync(
+ iterator.next.bind(iterator),
+ Error,
+ message,
+ );
+});
+
+Deno.test("Async iterator finishes the iterator when Readable destroyed", async () => {
+ const readable = new Readable({
+ read() {},
+ });
+
+ readable.destroy();
+
+ const { done } = await readable[Symbol.asyncIterator]().next();
+ assertEquals(done, true);
+});
+
+Deno.test("Async iterator finishes all item promises when Readable destroyed", async () => {
+ const r = new Readable({
+ objectMode: true,
+ read() {
+ },
+ });
+
+ const b = r[Symbol.asyncIterator]();
+ const c = b.next();
+ const d = b.next();
+ r.destroy();
+ assertEquals(await c, { done: true, value: undefined });
+ assertEquals(await d, { done: true, value: undefined });
+});
+
+Deno.test("Async iterator: 'next' is triggered by Readable push", async () => {
+ const max = 42;
+ let readed = 0;
+ let received = 0;
+ const readable = new Readable({
+ objectMode: true,
+ read() {
+ this.push("hello");
+ if (++readed === max) {
+ this.push(null);
+ }
+ },
+ });
+
+ for await (const k of readable) {
+ received++;
+ assertEquals(k, "hello");
+ }
+
+ assertEquals(readed, received);
+});
+
+Deno.test("Async iterator: 'close' called on forced iteration end", async () => {
+ let closeExecuted = 0;
+ const closeExecutedExpected = 1;
+ const closeExpectedExecutions = deferred();
+
+ class IndestructibleReadable extends Readable {
+ constructor() {
+ super({
+ autoDestroy: false,
+ read() {},
+ });
+ }
+
+ close() {
+ closeExecuted++;
+ if (closeExecuted == closeExecutedExpected) {
+ closeExpectedExecutions.resolve();
+ }
+ readable.emit("close");
+ }
+
+ // deno-lint-ignore ban-ts-comment
+ //@ts-ignore
+ destroy = null;
+ }
+
+ const readable = new IndestructibleReadable();
+ readable.push("asd");
+ readable.push("asd");
+
+ // eslint-disable-next-line @typescript-eslint/no-unused-vars
+ for await (const d of readable) {
+ break;
+ }
+
+ const closeTimeout = setTimeout(
+ () => closeExpectedExecutions.reject(),
+ 1000,
+ );
+ await closeExpectedExecutions;
+ clearTimeout(closeTimeout);
+ assertEquals(closeExecuted, closeExecutedExpected);
+});