summaryrefslogtreecommitdiff
path: root/cli
diff options
context:
space:
mode:
Diffstat (limited to 'cli')
-rw-r--r--cli/tests/integration/js_unit_tests.rs1
-rw-r--r--cli/tests/unit/kv_queue_undelivered_test.ts56
-rw-r--r--cli/tests/unit/kv_test.ts445
-rw-r--r--cli/tsc/dts/lib.deno.unstable.d.ts59
4 files changed, 561 insertions, 0 deletions
diff --git a/cli/tests/integration/js_unit_tests.rs b/cli/tests/integration/js_unit_tests.rs
index 4d6ef3675..c8039f89c 100644
--- a/cli/tests/integration/js_unit_tests.rs
+++ b/cli/tests/integration/js_unit_tests.rs
@@ -46,6 +46,7 @@ util::unit_test_factory!(
intl_test,
io_test,
kv_test,
+ kv_queue_undelivered_test,
link_test,
make_temp_test,
message_channel_test,
diff --git a/cli/tests/unit/kv_queue_undelivered_test.ts b/cli/tests/unit/kv_queue_undelivered_test.ts
new file mode 100644
index 000000000..c2c02905c
--- /dev/null
+++ b/cli/tests/unit/kv_queue_undelivered_test.ts
@@ -0,0 +1,56 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+import { assertEquals } from "./test_util.ts";
+
+const sleep = (time: number) => new Promise((r) => setTimeout(r, time));
+
+let isCI: boolean;
+try {
+ isCI = Deno.env.get("CI") !== undefined;
+} catch {
+ isCI = true;
+}
+
+function queueTest(name: string, fn: (db: Deno.Kv) => Promise<void>) {
+ Deno.test({
+ name,
+ // https://github.com/denoland/deno/issues/18363
+ ignore: Deno.build.os === "darwin" && isCI,
+ async fn() {
+ const db: Deno.Kv = await Deno.openKv(
+ ":memory:",
+ );
+ await fn(db);
+ },
+ });
+}
+
+async function collect<T>(
+ iter: Deno.KvListIterator<T>,
+): Promise<Deno.KvEntry<T>[]> {
+ const entries: Deno.KvEntry<T>[] = [];
+ for await (const entry of iter) {
+ entries.push(entry);
+ }
+ return entries;
+}
+
+queueTest("queue with undelivered", async (db) => {
+ const listener = db.listenQueue((_msg) => {
+ throw new TypeError("dequeue error");
+ });
+ try {
+ await db.enqueue("test", {
+ keysIfUndelivered: [["queue_failed", "a"], ["queue_failed", "b"]],
+ });
+ await sleep(100000);
+ const undelivered = await collect(db.list({ prefix: ["queue_failed"] }));
+ assertEquals(undelivered.length, 2);
+ assertEquals(undelivered[0].key, ["queue_failed", "a"]);
+ assertEquals(undelivered[0].value, "test");
+ assertEquals(undelivered[1].key, ["queue_failed", "b"]);
+ assertEquals(undelivered[1].value, "test");
+ } finally {
+ db.close();
+ await listener;
+ }
+});
diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts
index 3c5efa588..edb269e6e 100644
--- a/cli/tests/unit/kv_test.ts
+++ b/cli/tests/unit/kv_test.ts
@@ -3,11 +3,16 @@ import {
assert,
assertEquals,
AssertionError,
+ assertNotEquals,
assertRejects,
assertThrows,
+ Deferred,
+ deferred,
} from "./test_util.ts";
import { assertType, IsExact } from "../../../test_util/std/testing/types.ts";
+const sleep = (time: number) => new Promise((r) => setTimeout(r, time));
+
let isCI: boolean;
try {
isCI = Deno.env.get("CI") !== undefined;
@@ -59,6 +64,20 @@ function dbTest(name: string, fn: (db: Deno.Kv) => Promise<void>) {
});
}
+function queueTest(name: string, fn: (db: Deno.Kv) => Promise<void>) {
+ Deno.test({
+ name,
+ // https://github.com/denoland/deno/issues/18363
+ ignore: Deno.build.os === "darwin" && isCI,
+ async fn() {
+ const db: Deno.Kv = await Deno.openKv(
+ ":memory:",
+ );
+ await fn(db);
+ },
+ });
+}
+
dbTest("basic read-write-delete and versionstamps", async (db) => {
const result1 = await db.get(["a"]);
assertEquals(result1.key, ["a"]);
@@ -1304,3 +1323,429 @@ async function _typeCheckingTests() {
assert(!j.done);
assertType<IsExact<typeof j.value, Deno.KvEntry<string>>>(true);
}
+
+queueTest("basic listenQueue and enqueue", async (db) => {
+ const promise = deferred();
+ let dequeuedMessage: unknown = null;
+ const listener = db.listenQueue((msg) => {
+ dequeuedMessage = msg;
+ promise.resolve();
+ });
+ try {
+ const res = await db.enqueue("test");
+ assert(res.ok);
+ assertNotEquals(res.versionstamp, null);
+ await promise;
+ assertEquals(dequeuedMessage, "test");
+ } finally {
+ db.close();
+ await listener;
+ }
+});
+
+for (const { name, value } of VALUE_CASES) {
+ queueTest(`listenQueue and enqueue ${name}`, async (db) => {
+ const numEnqueues = 10;
+ let count = 0;
+ const promises: Deferred<void>[] = [];
+ const dequeuedMessages: unknown[] = [];
+ const listeners: Promise<void>[] = [];
+ listeners.push(db.listenQueue((msg) => {
+ dequeuedMessages.push(msg);
+ promises[count++].resolve();
+ }));
+ try {
+ for (let i = 0; i < numEnqueues; i++) {
+ promises.push(deferred());
+ await db.enqueue(value);
+ }
+ for (let i = 0; i < numEnqueues; i++) {
+ await promises[i];
+ }
+ for (let i = 0; i < numEnqueues; i++) {
+ assertEquals(dequeuedMessages[i], value);
+ }
+ } finally {
+ db.close();
+ for (const listener of listeners) {
+ await listener;
+ }
+ }
+ });
+}
+
+queueTest("queue mixed types", async (db) => {
+ let promise: Deferred<void>;
+ let dequeuedMessage: unknown = null;
+ const listener = db.listenQueue((msg) => {
+ dequeuedMessage = msg;
+ promise.resolve();
+ });
+ try {
+ for (const item of VALUE_CASES) {
+ promise = deferred();
+ await db.enqueue(item.value);
+ await promise;
+ assertEquals(dequeuedMessage, item.value);
+ }
+ } finally {
+ db.close();
+ await listener;
+ }
+});
+
+queueTest("queue delay", async (db) => {
+ let dequeueTime: number | undefined;
+ const promise = deferred();
+ let dequeuedMessage: unknown = null;
+ const listener = db.listenQueue((msg) => {
+ dequeueTime = Date.now();
+ dequeuedMessage = msg;
+ promise.resolve();
+ });
+ try {
+ const enqueueTime = Date.now();
+ await db.enqueue("test", { delay: 1000 });
+ await promise;
+ assertEquals(dequeuedMessage, "test");
+ assert(dequeueTime !== undefined);
+ assert(dequeueTime - enqueueTime >= 1000);
+ } finally {
+ db.close();
+ await listener;
+ }
+});
+
+queueTest("queue delay with atomic", async (db) => {
+ let dequeueTime: number | undefined;
+ const promise = deferred();
+ let dequeuedMessage: unknown = null;
+ const listener = db.listenQueue((msg) => {
+ dequeueTime = Date.now();
+ dequeuedMessage = msg;
+ promise.resolve();
+ });
+ try {
+ const enqueueTime = Date.now();
+ const res = await db.atomic()
+ .enqueue("test", { delay: 1000 })
+ .commit();
+ assert(res.ok);
+
+ await promise;
+ assertEquals(dequeuedMessage, "test");
+ assert(dequeueTime !== undefined);
+ assert(dequeueTime - enqueueTime >= 1000);
+ } finally {
+ db.close();
+ await listener;
+ }
+});
+
+queueTest("queue delay and now", async (db) => {
+ let count = 0;
+ let dequeueTime: number | undefined;
+ const promise = deferred();
+ let dequeuedMessage: unknown = null;
+ const listener = db.listenQueue((msg) => {
+ count += 1;
+ if (count == 2) {
+ dequeueTime = Date.now();
+ dequeuedMessage = msg;
+ promise.resolve();
+ }
+ });
+ try {
+ const enqueueTime = Date.now();
+ await db.enqueue("test-1000", { delay: 1000 });
+ await db.enqueue("test");
+ await promise;
+ assertEquals(dequeuedMessage, "test-1000");
+ assert(dequeueTime !== undefined);
+ assert(dequeueTime - enqueueTime >= 1000);
+ } finally {
+ db.close();
+ await listener;
+ }
+});
+
+dbTest("queue negative delay", async (db) => {
+ await assertRejects(async () => {
+ await db.enqueue("test", { delay: -100 });
+ }, TypeError);
+});
+
+dbTest("queue nan delay", async (db) => {
+ await assertRejects(async () => {
+ await db.enqueue("test", { delay: Number.NaN });
+ }, TypeError);
+});
+
+dbTest("queue large delay", async (db) => {
+ await db.enqueue("test", { delay: 7 * 24 * 60 * 60 * 1000 });
+ await assertRejects(async () => {
+ await db.enqueue("test", { delay: 7 * 24 * 60 * 60 * 1000 + 1 });
+ }, TypeError);
+});
+
+queueTest("listenQueue with async callback", async (db) => {
+ const promise = deferred();
+ let dequeuedMessage: unknown = null;
+ const listener = db.listenQueue(async (msg) => {
+ dequeuedMessage = msg;
+ await sleep(100);
+ promise.resolve();
+ });
+ try {
+ await db.enqueue("test");
+ await promise;
+ assertEquals(dequeuedMessage, "test");
+ } finally {
+ db.close();
+ await listener;
+ }
+});
+
+queueTest("queue retries", async (db) => {
+ let count = 0;
+ const listener = db.listenQueue(async (_msg) => {
+ count += 1;
+ await sleep(10);
+ throw new TypeError("dequeue error");
+ });
+ try {
+ await db.enqueue("test");
+ await sleep(10000);
+ } finally {
+ db.close();
+ await listener;
+ }
+
+ // There should have been 1 attempt + 3 retries in the 10 seconds
+ assertEquals(4, count);
+});
+
+queueTest("multiple listenQueues", async (db) => {
+ const numListens = 10;
+ let count = 0;
+ const promises: Deferred<void>[] = [];
+ const dequeuedMessages: unknown[] = [];
+ const listeners: Promise<void>[] = [];
+ for (let i = 0; i < numListens; i++) {
+ listeners.push(db.listenQueue((msg) => {
+ dequeuedMessages.push(msg);
+ promises[count++].resolve();
+ }));
+ }
+ try {
+ for (let i = 0; i < numListens; i++) {
+ promises.push(deferred());
+ await db.enqueue("msg_" + i);
+ await promises[i];
+ const msg = dequeuedMessages[i];
+ assertEquals("msg_" + i, msg);
+ }
+ } finally {
+ db.close();
+ for (let i = 0; i < numListens; i++) {
+ await listeners[i];
+ }
+ }
+});
+
+queueTest("enqueue with atomic", async (db) => {
+ const promise = deferred();
+ let dequeuedMessage: unknown = null;
+ const listener = db.listenQueue((msg) => {
+ dequeuedMessage = msg;
+ promise.resolve();
+ });
+
+ try {
+ await db.set(["t"], "1");
+
+ let currentValue = await db.get(["t"]);
+ assertEquals("1", currentValue.value);
+
+ const res = await db.atomic()
+ .check(currentValue)
+ .set(currentValue.key, "2")
+ .enqueue("test")
+ .commit();
+ assert(res.ok);
+
+ await promise;
+ assertEquals("test", dequeuedMessage);
+
+ currentValue = await db.get(["t"]);
+ assertEquals("2", currentValue.value);
+ } finally {
+ db.close();
+ await listener;
+ }
+});
+
+queueTest("enqueue with atomic nonce", async (db) => {
+ const promise = deferred();
+ let dequeuedMessage: unknown = null;
+
+ const nonce = crypto.randomUUID();
+
+ const listener = db.listenQueue(async (val) => {
+ const message = val as { msg: string; nonce: string };
+ const nonce = message.nonce;
+ const nonceValue = await db.get(["nonces", nonce]);
+ if (nonceValue.versionstamp === null) {
+ dequeuedMessage = message.msg;
+ promise.resolve();
+ return;
+ }
+
+ assertNotEquals(nonceValue.versionstamp, null);
+ const res = await db.atomic()
+ .check(nonceValue)
+ .delete(["nonces", nonce])
+ .set(["a", "b"], message.msg)
+ .commit();
+ if (res.ok) {
+ // Simulate an error so that the message has to be redelivered
+ throw new Error("injected error");
+ }
+ });
+
+ try {
+ const res = await db.atomic()
+ .check({ key: ["nonces", nonce], versionstamp: null })
+ .set(["nonces", nonce], true)
+ .enqueue({ msg: "test", nonce })
+ .commit();
+ assert(res.ok);
+
+ await promise;
+ assertEquals("test", dequeuedMessage);
+
+ const currentValue = await db.get(["a", "b"]);
+ assertEquals("test", currentValue.value);
+
+ const nonceValue = await db.get(["nonces", nonce]);
+ assertEquals(nonceValue.versionstamp, null);
+ } finally {
+ db.close();
+ await listener;
+ }
+});
+
+Deno.test({
+ name: "queue persistence with inflight messages",
+ sanitizeOps: false,
+ sanitizeResources: false,
+ async fn() {
+ const filename = "cli/tests/testdata/queue.db";
+ try {
+ await Deno.remove(filename);
+ } catch {
+ // pass
+ }
+ try {
+ let db: Deno.Kv = await Deno.openKv(filename);
+
+ let count = 0;
+ let promise = deferred();
+
+ // Register long-running handler.
+ let listener = db.listenQueue(async (_msg) => {
+ count += 1;
+ if (count == 3) {
+ promise.resolve();
+ }
+ await sleep(60000);
+ });
+
+ // Enqueue 3 messages.
+ await db.enqueue("msg0");
+ await db.enqueue("msg1");
+ await db.enqueue("msg2");
+ await promise;
+
+ // Close the database and wait for the listerner to finish.
+ db.close();
+ await listener;
+
+ // Now reopen the database.
+ db = await Deno.openKv(filename);
+
+ count = 0;
+ promise = deferred();
+
+ // Register a handler that will complete quickly.
+ listener = db.listenQueue((_msg) => {
+ count += 1;
+ if (count == 3) {
+ promise.resolve();
+ }
+ });
+
+ // Wait for the handlers to finish.
+ await promise;
+ assertEquals(3, count);
+ db.close();
+ await listener;
+ } finally {
+ await Deno.remove(filename);
+ }
+ },
+});
+
+Deno.test({
+ name: "queue persistence with delay messages",
+ sanitizeOps: false,
+ sanitizeResources: false,
+ async fn() {
+ const filename = "cli/tests/testdata/queue.db";
+ try {
+ await Deno.remove(filename);
+ } catch {
+ // pass
+ }
+ try {
+ let db: Deno.Kv = await Deno.openKv(filename);
+
+ let count = 0;
+ let promise = deferred();
+
+ // Register long-running handler.
+ let listener = db.listenQueue((_msg) => {});
+
+ // Enqueue 3 messages into the future.
+ await db.enqueue("msg0", { delay: 10000 });
+ await db.enqueue("msg1", { delay: 10000 });
+ await db.enqueue("msg2", { delay: 10000 });
+
+ // Close the database and wait for the listerner to finish.
+ db.close();
+ await listener;
+
+ // Now reopen the database.
+ db = await Deno.openKv(filename);
+
+ count = 0;
+ promise = deferred();
+
+ // Register a handler that will complete quickly.
+ listener = db.listenQueue((_msg) => {
+ count += 1;
+ if (count == 3) {
+ promise.resolve();
+ }
+ });
+
+ // Wait for the handlers to finish.
+ await promise;
+ assertEquals(3, count);
+ db.close();
+ await listener;
+ } finally {
+ await Deno.remove(filename);
+ }
+ },
+});
diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts
index 27d3af4cd..8f11adfff 100644
--- a/cli/tsc/dts/lib.deno.unstable.d.ts
+++ b/cli/tsc/dts/lib.deno.unstable.d.ts
@@ -1915,6 +1915,14 @@ declare namespace Deno {
*/
delete(key: KvKey): this;
/**
+ * Add to the operation a mutation that enqueues a value into the queue
+ * if all checks pass during the commit.
+ */
+ enqueue(
+ value: unknown,
+ options?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] },
+ ): this;
+ /**
* Commit the operation to the KV store. Returns a value indicating whether
* checks passed and mutations were performed. If the operation failed
* because of a failed check, the return value will be a {@linkcode
@@ -2088,6 +2096,57 @@ declare namespace Deno {
): KvListIterator<T>;
/**
+ * Add a value into the database queue to be delivered to the queue
+ * listener via {@linkcode Deno.Kv.listenQueue}.
+ *
+ * ```ts
+ * const db = await Deno.openKv();
+ * await db.enqueue("bar");
+ * ```
+ *
+ * The `delay` option can be used to specify the delay (in milliseconds)
+ * of the value delivery. The default delay is 0, which means immediate
+ * delivery.
+ *
+ * ```ts
+ * const db = await Deno.openKv();
+ * await db.enqueue("bar", { delay: 60000 });
+ * ```
+ *
+ * The `keysIfUndelivered` option can be used to specify the keys to
+ * be set if the value is not successfully delivered to the queue
+ * listener after several attempts. The values are set to the value of
+ * the queued message.
+ *
+ * ```ts
+ * const db = await Deno.openKv();
+ * await db.enqueue("bar", { keysIfUndelivered: [["foo", "bar"]] });
+ * ```
+ */
+ enqueue(
+ value: unknown,
+ options?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] },
+ ): Promise<KvCommitResult>;
+
+ /**
+ * Listen for queue values to be delivered from the database queue, which
+ * were enqueued with {@linkcode Deno.Kv.enqueue}. The provided handler
+ * callback is invoked on every dequeued value. A failed callback
+ * invocation is automatically retried multiple times until it succeeds
+ * or until the maximum number of retries is reached.
+ *
+ * ```ts
+ * const db = await Deno.openKv();
+ * db.listenQueue(async (msg: unknown) => {
+ * await db.set(["foo"], msg);
+ * });
+ * ```
+ */
+ listenQueue(
+ handler: (value: unknown) => Promise<void> | void,
+ ): Promise<void>;
+
+ /**
* Create a new {@linkcode Deno.AtomicOperation} object which can be used to
* perform an atomic transaction on the database. This does not perform any
* operations on the database - the atomic transaction must be committed