summaryrefslogtreecommitdiff
path: root/std
diff options
context:
space:
mode:
Diffstat (limited to 'std')
-rw-r--r--std/node/events.ts118
-rw-r--r--std/node/events_test.ts182
2 files changed, 299 insertions, 1 deletions
diff --git a/std/node/events.ts b/std/node/events.ts
index 92fe7a704..f4b3e0bc5 100644
--- a/std/node/events.ts
+++ b/std/node/events.ts
@@ -393,3 +393,121 @@ export function once(
}
});
}
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+function createIterResult(value: any, done: boolean): IteratorResult<any> {
+ return { value, done };
+}
+
+interface AsyncInterable {
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ next(): Promise<IteratorResult<any, any>>;
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ return(): Promise<IteratorResult<any, any>>;
+ throw(err: Error): void;
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ [Symbol.asyncIterator](): any;
+}
+
+/**
+ * Returns an AsyncIterator that iterates eventName events. It will throw if
+ * the EventEmitter emits 'error'. It removes all listeners when exiting the
+ * loop. The value returned by each iteration is an array composed of the
+ * emitted event arguments.
+ */
+export function on(
+ emitter: EventEmitter,
+ event: string | symbol
+): AsyncInterable {
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ const unconsumedEventValues: any[] = [];
+ const unconsumedPromises = [];
+ let error = null;
+ let finished = false;
+
+ const iterator = {
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ next(): Promise<IteratorResult<any>> {
+ // First, we consume all unread events
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ const value: any = unconsumedEventValues.shift();
+ if (value) {
+ return Promise.resolve(createIterResult(value, false));
+ }
+
+ // Then we error, if an error happened
+ // This happens one time if at all, because after 'error'
+ // we stop listening
+ if (error) {
+ const p: Promise<never> = Promise.reject(error);
+ // Only the first element errors
+ error = null;
+ return p;
+ }
+
+ // If the iterator is finished, resolve to done
+ if (finished) {
+ return Promise.resolve(createIterResult(undefined, true));
+ }
+
+ // Wait until an event happens
+ return new Promise(function(resolve, reject) {
+ unconsumedPromises.push({ resolve, reject });
+ });
+ },
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ return(): Promise<IteratorResult<any>> {
+ emitter.removeListener(event, eventHandler);
+ emitter.removeListener("error", errorHandler);
+ finished = true;
+
+ for (const promise of unconsumedPromises) {
+ promise.resolve(createIterResult(undefined, true));
+ }
+
+ return Promise.resolve(createIterResult(undefined, true));
+ },
+
+ throw(err: Error): void {
+ error = err;
+ emitter.removeListener(event, eventHandler);
+ emitter.removeListener("error", errorHandler);
+ },
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ [Symbol.asyncIterator](): AsyncIterable<any> {
+ return this;
+ }
+ };
+
+ emitter.on(event, eventHandler);
+ emitter.on("error", errorHandler);
+
+ return iterator;
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ function eventHandler(...args: any[]): void {
+ const promise = unconsumedPromises.shift();
+ if (promise) {
+ promise.resolve(createIterResult(args, false));
+ } else {
+ unconsumedEventValues.push(args);
+ }
+ }
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ function errorHandler(err: any): void {
+ finished = true;
+
+ const toError = unconsumedPromises.shift();
+ if (toError) {
+ toError.reject(err);
+ } else {
+ // The next time we call next()
+ error = err;
+ }
+
+ iterator.return();
+ }
+}
diff --git a/std/node/events_test.ts b/std/node/events_test.ts
index f86265d72..c89df298a 100644
--- a/std/node/events_test.ts
+++ b/std/node/events_test.ts
@@ -5,7 +5,7 @@ import {
fail,
assertThrows
} from "../testing/asserts.ts";
-import EventEmitter, { WrappedFunction, once } from "./events.ts";
+import EventEmitter, { WrappedFunction, once, on } from "./events.ts";
const shouldNeverBeEmitted: Function = () => {
fail("Should never be called");
@@ -439,3 +439,183 @@ test({
assertEquals(events, ["errorMonitor event", "error"]);
}
});
+
+test({
+ name: "asyncronous iteration of events are handled as expected",
+ async fn() {
+ const ee = new EventEmitter();
+ setTimeout(() => {
+ ee.emit("foo", "bar");
+ ee.emit("bar", 24);
+ ee.emit("foo", 42);
+ }, 0);
+
+ const iterable = on(ee, "foo");
+
+ const expected = [["bar"], [42]];
+
+ for await (const event of iterable) {
+ const current = expected.shift();
+
+ assertEquals(current, event);
+
+ if (expected.length === 0) {
+ break;
+ }
+ }
+ assertEquals(ee.listenerCount("foo"), 0);
+ assertEquals(ee.listenerCount("error"), 0);
+ }
+});
+
+test({
+ name: "asyncronous error handling of emitted events works as expected",
+ async fn() {
+ const ee = new EventEmitter();
+ const _err = new Error("kaboom");
+ setTimeout(() => {
+ ee.emit("error", _err);
+ }, 0);
+
+ const iterable = on(ee, "foo");
+ let thrown = false;
+
+ try {
+ // eslint-disable-next-line @typescript-eslint/no-unused-vars
+ for await (const event of iterable) {
+ fail("no events should be processed due to the error thrown");
+ }
+ } catch (err) {
+ thrown = true;
+ assertEquals(err, _err);
+ }
+ assertEquals(thrown, true);
+ }
+});
+
+test({
+ name: "error thrown during asyncronous processing of events is handled",
+ async fn() {
+ const ee = new EventEmitter();
+ const _err = new Error("kaboom");
+ setTimeout(() => {
+ ee.emit("foo", 42);
+ ee.emit("error", _err);
+ }, 0);
+
+ const iterable = on(ee, "foo");
+ const expected = [[42]];
+ let thrown = false;
+
+ try {
+ for await (const event of iterable) {
+ const current = expected.shift();
+ assertEquals(current, event);
+ }
+ } catch (err) {
+ thrown = true;
+ assertEquals(err, _err);
+ }
+ assertEquals(thrown, true);
+ assertEquals(ee.listenerCount("foo"), 0);
+ assertEquals(ee.listenerCount("error"), 0);
+ }
+});
+
+test({
+ name:
+ "error thrown in processing loop of asyncronous event prevents processing of additional events",
+ async fn() {
+ const ee = new EventEmitter();
+ const _err = new Error("kaboom");
+
+ setTimeout(() => {
+ ee.emit("foo", 42);
+ ee.emit("foo", 999);
+ }, 0);
+
+ try {
+ for await (const event of on(ee, "foo")) {
+ assertEquals(event, [42]);
+ throw _err;
+ }
+ } catch (err) {
+ assertEquals(err, _err);
+ }
+
+ assertEquals(ee.listenerCount("foo"), 0);
+ assertEquals(ee.listenerCount("error"), 0);
+ }
+});
+
+test({
+ name: "asyncronous iterator next() works as expected",
+ async fn() {
+ const ee = new EventEmitter();
+ const iterable = on(ee, "foo");
+
+ setTimeout(function() {
+ ee.emit("foo", "bar");
+ ee.emit("foo", 42);
+ iterable.return();
+ }, 0);
+
+ const results = await Promise.all([
+ iterable.next(),
+ iterable.next(),
+ iterable.next()
+ ]);
+
+ assertEquals(results, [
+ {
+ value: ["bar"],
+ done: false
+ },
+ {
+ value: [42],
+ done: false
+ },
+ {
+ value: undefined,
+ done: true
+ }
+ ]);
+
+ assertEquals(await iterable.next(), {
+ value: undefined,
+ done: true
+ });
+ }
+});
+
+test({
+ name: "async iterable throw handles various scenarios",
+ async fn() {
+ const ee = new EventEmitter();
+ const iterable = on(ee, "foo");
+
+ setTimeout(() => {
+ ee.emit("foo", "bar");
+ ee.emit("foo", 42); // lost in the queue
+ iterable.throw(_err);
+ }, 0);
+
+ const _err = new Error("kaboom");
+ let thrown = false;
+
+ const expected = [["bar"], [42]];
+
+ try {
+ for await (const event of iterable) {
+ assertEquals(event, expected.shift());
+ }
+ } catch (err) {
+ thrown = true;
+ assertEquals(err, _err);
+ }
+ assertEquals(thrown, true);
+ assertEquals(expected.length, 0);
+ assertEquals(ee.listenerCount("foo"), 0);
+ assertEquals(ee.listenerCount("error"), 0);
+ }
+});