diff options
Diffstat (limited to 'cli')
-rw-r--r-- | cli/tests/integration/js_unit_tests.rs | 1 | ||||
-rw-r--r-- | cli/tests/unit/kv_queue_undelivered_test.ts | 56 | ||||
-rw-r--r-- | cli/tests/unit/kv_test.ts | 445 | ||||
-rw-r--r-- | cli/tsc/dts/lib.deno.unstable.d.ts | 59 |
4 files changed, 561 insertions, 0 deletions
diff --git a/cli/tests/integration/js_unit_tests.rs b/cli/tests/integration/js_unit_tests.rs index 4d6ef3675..c8039f89c 100644 --- a/cli/tests/integration/js_unit_tests.rs +++ b/cli/tests/integration/js_unit_tests.rs @@ -46,6 +46,7 @@ util::unit_test_factory!( intl_test, io_test, kv_test, + kv_queue_undelivered_test, link_test, make_temp_test, message_channel_test, diff --git a/cli/tests/unit/kv_queue_undelivered_test.ts b/cli/tests/unit/kv_queue_undelivered_test.ts new file mode 100644 index 000000000..c2c02905c --- /dev/null +++ b/cli/tests/unit/kv_queue_undelivered_test.ts @@ -0,0 +1,56 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +import { assertEquals } from "./test_util.ts"; + +const sleep = (time: number) => new Promise((r) => setTimeout(r, time)); + +let isCI: boolean; +try { + isCI = Deno.env.get("CI") !== undefined; +} catch { + isCI = true; +} + +function queueTest(name: string, fn: (db: Deno.Kv) => Promise<void>) { + Deno.test({ + name, + // https://github.com/denoland/deno/issues/18363 + ignore: Deno.build.os === "darwin" && isCI, + async fn() { + const db: Deno.Kv = await Deno.openKv( + ":memory:", + ); + await fn(db); + }, + }); +} + +async function collect<T>( + iter: Deno.KvListIterator<T>, +): Promise<Deno.KvEntry<T>[]> { + const entries: Deno.KvEntry<T>[] = []; + for await (const entry of iter) { + entries.push(entry); + } + return entries; +} + +queueTest("queue with undelivered", async (db) => { + const listener = db.listenQueue((_msg) => { + throw new TypeError("dequeue error"); + }); + try { + await db.enqueue("test", { + keysIfUndelivered: [["queue_failed", "a"], ["queue_failed", "b"]], + }); + await sleep(100000); + const undelivered = await collect(db.list({ prefix: ["queue_failed"] })); + assertEquals(undelivered.length, 2); + assertEquals(undelivered[0].key, ["queue_failed", "a"]); + assertEquals(undelivered[0].value, "test"); + assertEquals(undelivered[1].key, ["queue_failed", "b"]); + assertEquals(undelivered[1].value, "test"); + } finally { + db.close(); + await listener; + } +}); diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts index 3c5efa588..edb269e6e 100644 --- a/cli/tests/unit/kv_test.ts +++ b/cli/tests/unit/kv_test.ts @@ -3,11 +3,16 @@ import { assert, assertEquals, AssertionError, + assertNotEquals, assertRejects, assertThrows, + Deferred, + deferred, } from "./test_util.ts"; import { assertType, IsExact } from "../../../test_util/std/testing/types.ts"; +const sleep = (time: number) => new Promise((r) => setTimeout(r, time)); + let isCI: boolean; try { isCI = Deno.env.get("CI") !== undefined; @@ -59,6 +64,20 @@ function dbTest(name: string, fn: (db: Deno.Kv) => Promise<void>) { }); } +function queueTest(name: string, fn: (db: Deno.Kv) => Promise<void>) { + Deno.test({ + name, + // https://github.com/denoland/deno/issues/18363 + ignore: Deno.build.os === "darwin" && isCI, + async fn() { + const db: Deno.Kv = await Deno.openKv( + ":memory:", + ); + await fn(db); + }, + }); +} + dbTest("basic read-write-delete and versionstamps", async (db) => { const result1 = await db.get(["a"]); assertEquals(result1.key, ["a"]); @@ -1304,3 +1323,429 @@ async function _typeCheckingTests() { assert(!j.done); assertType<IsExact<typeof j.value, Deno.KvEntry<string>>>(true); } + +queueTest("basic listenQueue and enqueue", async (db) => { + const promise = deferred(); + let dequeuedMessage: unknown = null; + const listener = db.listenQueue((msg) => { + dequeuedMessage = msg; + promise.resolve(); + }); + try { + const res = await db.enqueue("test"); + assert(res.ok); + assertNotEquals(res.versionstamp, null); + await promise; + assertEquals(dequeuedMessage, "test"); + } finally { + db.close(); + await listener; + } +}); + +for (const { name, value } of VALUE_CASES) { + queueTest(`listenQueue and enqueue ${name}`, async (db) => { + const numEnqueues = 10; + let count = 0; + const promises: Deferred<void>[] = []; + const dequeuedMessages: unknown[] = []; + const listeners: Promise<void>[] = []; + listeners.push(db.listenQueue((msg) => { + dequeuedMessages.push(msg); + promises[count++].resolve(); + })); + try { + for (let i = 0; i < numEnqueues; i++) { + promises.push(deferred()); + await db.enqueue(value); + } + for (let i = 0; i < numEnqueues; i++) { + await promises[i]; + } + for (let i = 0; i < numEnqueues; i++) { + assertEquals(dequeuedMessages[i], value); + } + } finally { + db.close(); + for (const listener of listeners) { + await listener; + } + } + }); +} + +queueTest("queue mixed types", async (db) => { + let promise: Deferred<void>; + let dequeuedMessage: unknown = null; + const listener = db.listenQueue((msg) => { + dequeuedMessage = msg; + promise.resolve(); + }); + try { + for (const item of VALUE_CASES) { + promise = deferred(); + await db.enqueue(item.value); + await promise; + assertEquals(dequeuedMessage, item.value); + } + } finally { + db.close(); + await listener; + } +}); + +queueTest("queue delay", async (db) => { + let dequeueTime: number | undefined; + const promise = deferred(); + let dequeuedMessage: unknown = null; + const listener = db.listenQueue((msg) => { + dequeueTime = Date.now(); + dequeuedMessage = msg; + promise.resolve(); + }); + try { + const enqueueTime = Date.now(); + await db.enqueue("test", { delay: 1000 }); + await promise; + assertEquals(dequeuedMessage, "test"); + assert(dequeueTime !== undefined); + assert(dequeueTime - enqueueTime >= 1000); + } finally { + db.close(); + await listener; + } +}); + +queueTest("queue delay with atomic", async (db) => { + let dequeueTime: number | undefined; + const promise = deferred(); + let dequeuedMessage: unknown = null; + const listener = db.listenQueue((msg) => { + dequeueTime = Date.now(); + dequeuedMessage = msg; + promise.resolve(); + }); + try { + const enqueueTime = Date.now(); + const res = await db.atomic() + .enqueue("test", { delay: 1000 }) + .commit(); + assert(res.ok); + + await promise; + assertEquals(dequeuedMessage, "test"); + assert(dequeueTime !== undefined); + assert(dequeueTime - enqueueTime >= 1000); + } finally { + db.close(); + await listener; + } +}); + +queueTest("queue delay and now", async (db) => { + let count = 0; + let dequeueTime: number | undefined; + const promise = deferred(); + let dequeuedMessage: unknown = null; + const listener = db.listenQueue((msg) => { + count += 1; + if (count == 2) { + dequeueTime = Date.now(); + dequeuedMessage = msg; + promise.resolve(); + } + }); + try { + const enqueueTime = Date.now(); + await db.enqueue("test-1000", { delay: 1000 }); + await db.enqueue("test"); + await promise; + assertEquals(dequeuedMessage, "test-1000"); + assert(dequeueTime !== undefined); + assert(dequeueTime - enqueueTime >= 1000); + } finally { + db.close(); + await listener; + } +}); + +dbTest("queue negative delay", async (db) => { + await assertRejects(async () => { + await db.enqueue("test", { delay: -100 }); + }, TypeError); +}); + +dbTest("queue nan delay", async (db) => { + await assertRejects(async () => { + await db.enqueue("test", { delay: Number.NaN }); + }, TypeError); +}); + +dbTest("queue large delay", async (db) => { + await db.enqueue("test", { delay: 7 * 24 * 60 * 60 * 1000 }); + await assertRejects(async () => { + await db.enqueue("test", { delay: 7 * 24 * 60 * 60 * 1000 + 1 }); + }, TypeError); +}); + +queueTest("listenQueue with async callback", async (db) => { + const promise = deferred(); + let dequeuedMessage: unknown = null; + const listener = db.listenQueue(async (msg) => { + dequeuedMessage = msg; + await sleep(100); + promise.resolve(); + }); + try { + await db.enqueue("test"); + await promise; + assertEquals(dequeuedMessage, "test"); + } finally { + db.close(); + await listener; + } +}); + +queueTest("queue retries", async (db) => { + let count = 0; + const listener = db.listenQueue(async (_msg) => { + count += 1; + await sleep(10); + throw new TypeError("dequeue error"); + }); + try { + await db.enqueue("test"); + await sleep(10000); + } finally { + db.close(); + await listener; + } + + // There should have been 1 attempt + 3 retries in the 10 seconds + assertEquals(4, count); +}); + +queueTest("multiple listenQueues", async (db) => { + const numListens = 10; + let count = 0; + const promises: Deferred<void>[] = []; + const dequeuedMessages: unknown[] = []; + const listeners: Promise<void>[] = []; + for (let i = 0; i < numListens; i++) { + listeners.push(db.listenQueue((msg) => { + dequeuedMessages.push(msg); + promises[count++].resolve(); + })); + } + try { + for (let i = 0; i < numListens; i++) { + promises.push(deferred()); + await db.enqueue("msg_" + i); + await promises[i]; + const msg = dequeuedMessages[i]; + assertEquals("msg_" + i, msg); + } + } finally { + db.close(); + for (let i = 0; i < numListens; i++) { + await listeners[i]; + } + } +}); + +queueTest("enqueue with atomic", async (db) => { + const promise = deferred(); + let dequeuedMessage: unknown = null; + const listener = db.listenQueue((msg) => { + dequeuedMessage = msg; + promise.resolve(); + }); + + try { + await db.set(["t"], "1"); + + let currentValue = await db.get(["t"]); + assertEquals("1", currentValue.value); + + const res = await db.atomic() + .check(currentValue) + .set(currentValue.key, "2") + .enqueue("test") + .commit(); + assert(res.ok); + + await promise; + assertEquals("test", dequeuedMessage); + + currentValue = await db.get(["t"]); + assertEquals("2", currentValue.value); + } finally { + db.close(); + await listener; + } +}); + +queueTest("enqueue with atomic nonce", async (db) => { + const promise = deferred(); + let dequeuedMessage: unknown = null; + + const nonce = crypto.randomUUID(); + + const listener = db.listenQueue(async (val) => { + const message = val as { msg: string; nonce: string }; + const nonce = message.nonce; + const nonceValue = await db.get(["nonces", nonce]); + if (nonceValue.versionstamp === null) { + dequeuedMessage = message.msg; + promise.resolve(); + return; + } + + assertNotEquals(nonceValue.versionstamp, null); + const res = await db.atomic() + .check(nonceValue) + .delete(["nonces", nonce]) + .set(["a", "b"], message.msg) + .commit(); + if (res.ok) { + // Simulate an error so that the message has to be redelivered + throw new Error("injected error"); + } + }); + + try { + const res = await db.atomic() + .check({ key: ["nonces", nonce], versionstamp: null }) + .set(["nonces", nonce], true) + .enqueue({ msg: "test", nonce }) + .commit(); + assert(res.ok); + + await promise; + assertEquals("test", dequeuedMessage); + + const currentValue = await db.get(["a", "b"]); + assertEquals("test", currentValue.value); + + const nonceValue = await db.get(["nonces", nonce]); + assertEquals(nonceValue.versionstamp, null); + } finally { + db.close(); + await listener; + } +}); + +Deno.test({ + name: "queue persistence with inflight messages", + sanitizeOps: false, + sanitizeResources: false, + async fn() { + const filename = "cli/tests/testdata/queue.db"; + try { + await Deno.remove(filename); + } catch { + // pass + } + try { + let db: Deno.Kv = await Deno.openKv(filename); + + let count = 0; + let promise = deferred(); + + // Register long-running handler. + let listener = db.listenQueue(async (_msg) => { + count += 1; + if (count == 3) { + promise.resolve(); + } + await sleep(60000); + }); + + // Enqueue 3 messages. + await db.enqueue("msg0"); + await db.enqueue("msg1"); + await db.enqueue("msg2"); + await promise; + + // Close the database and wait for the listerner to finish. + db.close(); + await listener; + + // Now reopen the database. + db = await Deno.openKv(filename); + + count = 0; + promise = deferred(); + + // Register a handler that will complete quickly. + listener = db.listenQueue((_msg) => { + count += 1; + if (count == 3) { + promise.resolve(); + } + }); + + // Wait for the handlers to finish. + await promise; + assertEquals(3, count); + db.close(); + await listener; + } finally { + await Deno.remove(filename); + } + }, +}); + +Deno.test({ + name: "queue persistence with delay messages", + sanitizeOps: false, + sanitizeResources: false, + async fn() { + const filename = "cli/tests/testdata/queue.db"; + try { + await Deno.remove(filename); + } catch { + // pass + } + try { + let db: Deno.Kv = await Deno.openKv(filename); + + let count = 0; + let promise = deferred(); + + // Register long-running handler. + let listener = db.listenQueue((_msg) => {}); + + // Enqueue 3 messages into the future. + await db.enqueue("msg0", { delay: 10000 }); + await db.enqueue("msg1", { delay: 10000 }); + await db.enqueue("msg2", { delay: 10000 }); + + // Close the database and wait for the listerner to finish. + db.close(); + await listener; + + // Now reopen the database. + db = await Deno.openKv(filename); + + count = 0; + promise = deferred(); + + // Register a handler that will complete quickly. + listener = db.listenQueue((_msg) => { + count += 1; + if (count == 3) { + promise.resolve(); + } + }); + + // Wait for the handlers to finish. + await promise; + assertEquals(3, count); + db.close(); + await listener; + } finally { + await Deno.remove(filename); + } + }, +}); diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index 27d3af4cd..8f11adfff 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -1915,6 +1915,14 @@ declare namespace Deno { */ delete(key: KvKey): this; /** + * Add to the operation a mutation that enqueues a value into the queue + * if all checks pass during the commit. + */ + enqueue( + value: unknown, + options?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + ): this; + /** * Commit the operation to the KV store. Returns a value indicating whether * checks passed and mutations were performed. If the operation failed * because of a failed check, the return value will be a {@linkcode @@ -2088,6 +2096,57 @@ declare namespace Deno { ): KvListIterator<T>; /** + * Add a value into the database queue to be delivered to the queue + * listener via {@linkcode Deno.Kv.listenQueue}. + * + * ```ts + * const db = await Deno.openKv(); + * await db.enqueue("bar"); + * ``` + * + * The `delay` option can be used to specify the delay (in milliseconds) + * of the value delivery. The default delay is 0, which means immediate + * delivery. + * + * ```ts + * const db = await Deno.openKv(); + * await db.enqueue("bar", { delay: 60000 }); + * ``` + * + * The `keysIfUndelivered` option can be used to specify the keys to + * be set if the value is not successfully delivered to the queue + * listener after several attempts. The values are set to the value of + * the queued message. + * + * ```ts + * const db = await Deno.openKv(); + * await db.enqueue("bar", { keysIfUndelivered: [["foo", "bar"]] }); + * ``` + */ + enqueue( + value: unknown, + options?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + ): Promise<KvCommitResult>; + + /** + * Listen for queue values to be delivered from the database queue, which + * were enqueued with {@linkcode Deno.Kv.enqueue}. The provided handler + * callback is invoked on every dequeued value. A failed callback + * invocation is automatically retried multiple times until it succeeds + * or until the maximum number of retries is reached. + * + * ```ts + * const db = await Deno.openKv(); + * db.listenQueue(async (msg: unknown) => { + * await db.set(["foo"], msg); + * }); + * ``` + */ + listenQueue( + handler: (value: unknown) => Promise<void> | void, + ): Promise<void>; + + /** * Create a new {@linkcode Deno.AtomicOperation} object which can be used to * perform an atomic transaction on the database. This does not perform any * operations on the database - the atomic transaction must be committed |