-rw-r--r-- | Cargo.lock | 4
-rw-r--r-- | cli/tests/integration/js_unit_tests.rs | 1
-rw-r--r-- | cli/tests/unit/kv_queue_undelivered_test.ts | 56
-rw-r--r-- | cli/tests/unit/kv_test.ts | 445
-rw-r--r-- | cli/tsc/dts/lib.deno.unstable.d.ts | 59
-rw-r--r-- | ext/kv/01_db.ts | 109
-rw-r--r-- | ext/kv/Cargo.toml | 4
-rw-r--r-- | ext/kv/interface.rs | 14
-rw-r--r-- | ext/kv/lib.rs | 65
-rw-r--r-- | ext/kv/sqlite.rs | 501
10 files changed, 1205 insertions, 53 deletions
diff --git a/Cargo.lock b/Cargo.lock index edb0a6e12..ecd31131c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1170,8 +1170,12 @@ dependencies = [ "deno_core", "hex", "num-bigint", + "rand", "rusqlite", "serde", + "serde_json", + "tokio", + "uuid", ] [[package]] diff --git a/cli/tests/integration/js_unit_tests.rs b/cli/tests/integration/js_unit_tests.rs index 4d6ef3675..c8039f89c 100644 --- a/cli/tests/integration/js_unit_tests.rs +++ b/cli/tests/integration/js_unit_tests.rs @@ -46,6 +46,7 @@ util::unit_test_factory!( intl_test, io_test, kv_test, + kv_queue_undelivered_test, link_test, make_temp_test, message_channel_test, diff --git a/cli/tests/unit/kv_queue_undelivered_test.ts b/cli/tests/unit/kv_queue_undelivered_test.ts new file mode 100644 index 000000000..c2c02905c --- /dev/null +++ b/cli/tests/unit/kv_queue_undelivered_test.ts @@ -0,0 +1,56 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +import { assertEquals } from "./test_util.ts"; + +const sleep = (time: number) => new Promise((r) => setTimeout(r, time)); + +let isCI: boolean; +try { + isCI = Deno.env.get("CI") !== undefined; +} catch { + isCI = true; +} + +function queueTest(name: string, fn: (db: Deno.Kv) => Promise<void>) { + Deno.test({ + name, + // https://github.com/denoland/deno/issues/18363 + ignore: Deno.build.os === "darwin" && isCI, + async fn() { + const db: Deno.Kv = await Deno.openKv( + ":memory:", + ); + await fn(db); + }, + }); +} + +async function collect<T>( + iter: Deno.KvListIterator<T>, +): Promise<Deno.KvEntry<T>[]> { + const entries: Deno.KvEntry<T>[] = []; + for await (const entry of iter) { + entries.push(entry); + } + return entries; +} + +queueTest("queue with undelivered", async (db) => { + const listener = db.listenQueue((_msg) => { + throw new TypeError("dequeue error"); + }); + try { + await db.enqueue("test", { + keysIfUndelivered: [["queue_failed", "a"], ["queue_failed", "b"]], + }); + await sleep(100000); + const undelivered = await collect(db.list({ prefix: ["queue_failed"] })); + assertEquals(undelivered.length, 2); + assertEquals(undelivered[0].key, ["queue_failed", "a"]); + assertEquals(undelivered[0].value, "test"); + assertEquals(undelivered[1].key, ["queue_failed", "b"]); + assertEquals(undelivered[1].value, "test"); + } finally { + db.close(); + await listener; + } +}); diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts index 3c5efa588..edb269e6e 100644 --- a/cli/tests/unit/kv_test.ts +++ b/cli/tests/unit/kv_test.ts @@ -3,11 +3,16 @@ import { assert, assertEquals, AssertionError, + assertNotEquals, assertRejects, assertThrows, + Deferred, + deferred, } from "./test_util.ts"; import { assertType, IsExact } from "../../../test_util/std/testing/types.ts"; +const sleep = (time: number) => new Promise((r) => setTimeout(r, time)); + let isCI: boolean; try { isCI = Deno.env.get("CI") !== undefined; @@ -59,6 +64,20 @@ function dbTest(name: string, fn: (db: Deno.Kv) => Promise<void>) { }); } +function queueTest(name: string, fn: (db: Deno.Kv) => Promise<void>) { + Deno.test({ + name, + // https://github.com/denoland/deno/issues/18363 + ignore: Deno.build.os === "darwin" && isCI, + async fn() { + const db: Deno.Kv = await Deno.openKv( + ":memory:", + ); + await fn(db); + }, + }); +} + dbTest("basic read-write-delete and versionstamps", async (db) => { const result1 = await db.get(["a"]); assertEquals(result1.key, ["a"]); @@ -1304,3 +1323,429 @@ async function _typeCheckingTests() { assert(!j.done); assertType<IsExact<typeof j.value, 
Deno.KvEntry<string>>>(true); } + +queueTest("basic listenQueue and enqueue", async (db) => { + const promise = deferred(); + let dequeuedMessage: unknown = null; + const listener = db.listenQueue((msg) => { + dequeuedMessage = msg; + promise.resolve(); + }); + try { + const res = await db.enqueue("test"); + assert(res.ok); + assertNotEquals(res.versionstamp, null); + await promise; + assertEquals(dequeuedMessage, "test"); + } finally { + db.close(); + await listener; + } +}); + +for (const { name, value } of VALUE_CASES) { + queueTest(`listenQueue and enqueue ${name}`, async (db) => { + const numEnqueues = 10; + let count = 0; + const promises: Deferred<void>[] = []; + const dequeuedMessages: unknown[] = []; + const listeners: Promise<void>[] = []; + listeners.push(db.listenQueue((msg) => { + dequeuedMessages.push(msg); + promises[count++].resolve(); + })); + try { + for (let i = 0; i < numEnqueues; i++) { + promises.push(deferred()); + await db.enqueue(value); + } + for (let i = 0; i < numEnqueues; i++) { + await promises[i]; + } + for (let i = 0; i < numEnqueues; i++) { + assertEquals(dequeuedMessages[i], value); + } + } finally { + db.close(); + for (const listener of listeners) { + await listener; + } + } + }); +} + +queueTest("queue mixed types", async (db) => { + let promise: Deferred<void>; + let dequeuedMessage: unknown = null; + const listener = db.listenQueue((msg) => { + dequeuedMessage = msg; + promise.resolve(); + }); + try { + for (const item of VALUE_CASES) { + promise = deferred(); + await db.enqueue(item.value); + await promise; + assertEquals(dequeuedMessage, item.value); + } + } finally { + db.close(); + await listener; + } +}); + +queueTest("queue delay", async (db) => { + let dequeueTime: number | undefined; + const promise = deferred(); + let dequeuedMessage: unknown = null; + const listener = db.listenQueue((msg) => { + dequeueTime = Date.now(); + dequeuedMessage = msg; + promise.resolve(); + }); + try { + const enqueueTime = Date.now(); + await db.enqueue("test", { delay: 1000 }); + await promise; + assertEquals(dequeuedMessage, "test"); + assert(dequeueTime !== undefined); + assert(dequeueTime - enqueueTime >= 1000); + } finally { + db.close(); + await listener; + } +}); + +queueTest("queue delay with atomic", async (db) => { + let dequeueTime: number | undefined; + const promise = deferred(); + let dequeuedMessage: unknown = null; + const listener = db.listenQueue((msg) => { + dequeueTime = Date.now(); + dequeuedMessage = msg; + promise.resolve(); + }); + try { + const enqueueTime = Date.now(); + const res = await db.atomic() + .enqueue("test", { delay: 1000 }) + .commit(); + assert(res.ok); + + await promise; + assertEquals(dequeuedMessage, "test"); + assert(dequeueTime !== undefined); + assert(dequeueTime - enqueueTime >= 1000); + } finally { + db.close(); + await listener; + } +}); + +queueTest("queue delay and now", async (db) => { + let count = 0; + let dequeueTime: number | undefined; + const promise = deferred(); + let dequeuedMessage: unknown = null; + const listener = db.listenQueue((msg) => { + count += 1; + if (count == 2) { + dequeueTime = Date.now(); + dequeuedMessage = msg; + promise.resolve(); + } + }); + try { + const enqueueTime = Date.now(); + await db.enqueue("test-1000", { delay: 1000 }); + await db.enqueue("test"); + await promise; + assertEquals(dequeuedMessage, "test-1000"); + assert(dequeueTime !== undefined); + assert(dequeueTime - enqueueTime >= 1000); + } finally { + db.close(); + await listener; + } +}); + +dbTest("queue negative 
delay", async (db) => { + await assertRejects(async () => { + await db.enqueue("test", { delay: -100 }); + }, TypeError); +}); + +dbTest("queue nan delay", async (db) => { + await assertRejects(async () => { + await db.enqueue("test", { delay: Number.NaN }); + }, TypeError); +}); + +dbTest("queue large delay", async (db) => { + await db.enqueue("test", { delay: 7 * 24 * 60 * 60 * 1000 }); + await assertRejects(async () => { + await db.enqueue("test", { delay: 7 * 24 * 60 * 60 * 1000 + 1 }); + }, TypeError); +}); + +queueTest("listenQueue with async callback", async (db) => { + const promise = deferred(); + let dequeuedMessage: unknown = null; + const listener = db.listenQueue(async (msg) => { + dequeuedMessage = msg; + await sleep(100); + promise.resolve(); + }); + try { + await db.enqueue("test"); + await promise; + assertEquals(dequeuedMessage, "test"); + } finally { + db.close(); + await listener; + } +}); + +queueTest("queue retries", async (db) => { + let count = 0; + const listener = db.listenQueue(async (_msg) => { + count += 1; + await sleep(10); + throw new TypeError("dequeue error"); + }); + try { + await db.enqueue("test"); + await sleep(10000); + } finally { + db.close(); + await listener; + } + + // There should have been 1 attempt + 3 retries in the 10 seconds + assertEquals(4, count); +}); + +queueTest("multiple listenQueues", async (db) => { + const numListens = 10; + let count = 0; + const promises: Deferred<void>[] = []; + const dequeuedMessages: unknown[] = []; + const listeners: Promise<void>[] = []; + for (let i = 0; i < numListens; i++) { + listeners.push(db.listenQueue((msg) => { + dequeuedMessages.push(msg); + promises[count++].resolve(); + })); + } + try { + for (let i = 0; i < numListens; i++) { + promises.push(deferred()); + await db.enqueue("msg_" + i); + await promises[i]; + const msg = dequeuedMessages[i]; + assertEquals("msg_" + i, msg); + } + } finally { + db.close(); + for (let i = 0; i < numListens; i++) { + await listeners[i]; + } + } +}); + +queueTest("enqueue with atomic", async (db) => { + const promise = deferred(); + let dequeuedMessage: unknown = null; + const listener = db.listenQueue((msg) => { + dequeuedMessage = msg; + promise.resolve(); + }); + + try { + await db.set(["t"], "1"); + + let currentValue = await db.get(["t"]); + assertEquals("1", currentValue.value); + + const res = await db.atomic() + .check(currentValue) + .set(currentValue.key, "2") + .enqueue("test") + .commit(); + assert(res.ok); + + await promise; + assertEquals("test", dequeuedMessage); + + currentValue = await db.get(["t"]); + assertEquals("2", currentValue.value); + } finally { + db.close(); + await listener; + } +}); + +queueTest("enqueue with atomic nonce", async (db) => { + const promise = deferred(); + let dequeuedMessage: unknown = null; + + const nonce = crypto.randomUUID(); + + const listener = db.listenQueue(async (val) => { + const message = val as { msg: string; nonce: string }; + const nonce = message.nonce; + const nonceValue = await db.get(["nonces", nonce]); + if (nonceValue.versionstamp === null) { + dequeuedMessage = message.msg; + promise.resolve(); + return; + } + + assertNotEquals(nonceValue.versionstamp, null); + const res = await db.atomic() + .check(nonceValue) + .delete(["nonces", nonce]) + .set(["a", "b"], message.msg) + .commit(); + if (res.ok) { + // Simulate an error so that the message has to be redelivered + throw new Error("injected error"); + } + }); + + try { + const res = await db.atomic() + .check({ key: ["nonces", nonce], versionstamp: 
null }) + .set(["nonces", nonce], true) + .enqueue({ msg: "test", nonce }) + .commit(); + assert(res.ok); + + await promise; + assertEquals("test", dequeuedMessage); + + const currentValue = await db.get(["a", "b"]); + assertEquals("test", currentValue.value); + + const nonceValue = await db.get(["nonces", nonce]); + assertEquals(nonceValue.versionstamp, null); + } finally { + db.close(); + await listener; + } +}); + +Deno.test({ + name: "queue persistence with inflight messages", + sanitizeOps: false, + sanitizeResources: false, + async fn() { + const filename = "cli/tests/testdata/queue.db"; + try { + await Deno.remove(filename); + } catch { + // pass + } + try { + let db: Deno.Kv = await Deno.openKv(filename); + + let count = 0; + let promise = deferred(); + + // Register long-running handler. + let listener = db.listenQueue(async (_msg) => { + count += 1; + if (count == 3) { + promise.resolve(); + } + await sleep(60000); + }); + + // Enqueue 3 messages. + await db.enqueue("msg0"); + await db.enqueue("msg1"); + await db.enqueue("msg2"); + await promise; + + // Close the database and wait for the listener to finish. + db.close(); + await listener; + + // Now reopen the database. + db = await Deno.openKv(filename); + + count = 0; + promise = deferred(); + + // Register a handler that will complete quickly. + listener = db.listenQueue((_msg) => { + count += 1; + if (count == 3) { + promise.resolve(); + } + }); + + // Wait for the handlers to finish. + await promise; + assertEquals(3, count); + db.close(); + await listener; + } finally { + await Deno.remove(filename); + } + }, +}); + +Deno.test({ + name: "queue persistence with delay messages", + sanitizeOps: false, + sanitizeResources: false, + async fn() { + const filename = "cli/tests/testdata/queue.db"; + try { + await Deno.remove(filename); + } catch { + // pass + } + try { + let db: Deno.Kv = await Deno.openKv(filename); + + let count = 0; + let promise = deferred(); + + // Register a no-op handler. + let listener = db.listenQueue((_msg) => {}); + + // Enqueue 3 messages into the future. + await db.enqueue("msg0", { delay: 10000 }); + await db.enqueue("msg1", { delay: 10000 }); + await db.enqueue("msg2", { delay: 10000 }); + + // Close the database and wait for the listener to finish. + db.close(); + await listener; + + // Now reopen the database. + db = await Deno.openKv(filename); + + count = 0; + promise = deferred(); + + // Register a handler that will complete quickly. + listener = db.listenQueue((_msg) => { + count += 1; + if (count == 3) { + promise.resolve(); + } + }); + + // Wait for the handlers to finish. + await promise; + assertEquals(3, count); + db.close(); + await listener; + } finally { + await Deno.remove(filename); + } + }, +}); diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index 27d3af4cd..8f11adfff 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -1915,6 +1915,14 @@ declare namespace Deno { */ delete(key: KvKey): this; /** + * Add to the operation a mutation that enqueues a value into the queue + * if all checks pass during the commit. + */ + enqueue( + value: unknown, + options?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + ): this; + /** * Commit the operation to the KV store. Returns a value indicating whether * checks passed and mutations were performed. 
If the operation failed * because of a failed check, the return value will be a {@linkcode @@ -2088,6 +2096,57 @@ declare namespace Deno { ): KvListIterator<T>; /** + * Add a value into the database queue to be delivered to the queue + * listener via {@linkcode Deno.Kv.listenQueue}. + * + * ```ts + * const db = await Deno.openKv(); + * await db.enqueue("bar"); + * ``` + * + * The `delay` option can be used to specify the delay (in milliseconds) + * of the value delivery. The default delay is 0, which means immediate + * delivery. + * + * ```ts + * const db = await Deno.openKv(); + * await db.enqueue("bar", { delay: 60000 }); + * ``` + * + * The `keysIfUndelivered` option can be used to specify the keys to + * be set if the value is not successfully delivered to the queue + * listener after several attempts. The values are set to the value of + * the queued message. + * + * ```ts + * const db = await Deno.openKv(); + * await db.enqueue("bar", { keysIfUndelivered: [["foo", "bar"]] }); + * ``` + */ + enqueue( + value: unknown, + options?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + ): Promise<KvCommitResult>; + + /** + * Listen for queue values to be delivered from the database queue, which + * were enqueued with {@linkcode Deno.Kv.enqueue}. The provided handler + * callback is invoked on every dequeued value. A failed callback + * invocation is automatically retried multiple times until it succeeds + * or until the maximum number of retries is reached. + * + * ```ts + * const db = await Deno.openKv(); + * db.listenQueue(async (msg: unknown) => { + * await db.set(["foo"], msg); + * }); + * ``` + */ + listenQueue( + handler: (value: unknown) => Promise<void> | void, + ): Promise<void>; + + /** * Create a new {@linkcode Deno.AtomicOperation} object which can be used to * perform an atomic transaction on the database. This does not perform any * operations on the database - the atomic transaction must be committed diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index f8181cc2e..eb103ae0c 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -26,6 +26,20 @@ async function openKv(path: string) { return new Kv(rid, kvSymbol); } +const millisecondsInOneWeek = 7 * 24 * 60 * 60 * 1000; + +function validateQueueDelay(delay: number) { + if (delay < 0) { + throw new TypeError("delay cannot be negative"); + } + if (delay > millisecondsInOneWeek) { + throw new TypeError("delay cannot be greater than one week"); + } + if (isNaN(delay)) { + throw new TypeError("delay cannot be NaN"); + } +} + interface RawKvEntry { key: Deno.KvKey; value: RawValue; @@ -47,6 +61,7 @@ const kvSymbol = Symbol("KvRid"); class Kv { #rid: number; + #closed: boolean; constructor(rid: number = undefined, symbol: symbol = undefined) { if (kvSymbol !== symbol) { @@ -55,6 +70,7 @@ class Kv { ); } this.#rid = rid; + this.#closed = false; } atomic() { @@ -203,8 +219,82 @@ class Kv { }; } + async enqueue( + message: unknown, + opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + ) { + if (opts?.delay !== undefined) { + validateQueueDelay(opts?.delay); + } + + const enqueues = [ + [ + core.serialize(message, { forStorage: true }), + opts?.delay ?? 0, + opts?.keysIfUndelivered ?? 
[], + null, + ], + ]; + + const versionstamp = await core.opAsync( + "op_kv_atomic_write", + this.#rid, + [], + [], + enqueues, + ); + if (versionstamp === null) throw new TypeError("Failed to enqueue value"); + return { ok: true, versionstamp }; + } + + async listenQueue( + handler: (message: unknown) => Promise<void> | void, + ): Promise<void> { + while (!this.#closed) { + // Wait for the next message. + let next: { 0: Uint8Array; 1: number }; + try { + next = await core.opAsync( + "op_kv_dequeue_next_message", + this.#rid, + ); + } catch (error) { + if (this.#closed) { + break; + } else { + throw error; + } + } + + // Deserialize the payload. + const { 0: payload, 1: handleId } = next; + const deserializedPayload = core.deserialize(payload, { + forStorage: true, + }); + + // Dispatch the payload. + (async () => { + let success = false; + try { + const result = handler(deserializedPayload); + const _res = result instanceof Promise ? (await result) : result; + success = true; + } catch (error) { + console.error("Exception in queue handler", error); + } finally { + await core.opAsync( + "op_kv_finish_dequeued_message", + handleId, + success, + ); + } + })(); + } + } + close() { core.close(this.#rid); + this.#closed = true; } } @@ -213,6 +303,7 @@ class AtomicOperation { #checks: [Deno.KvKey, string | null][] = []; #mutations: [Deno.KvKey, string, RawValue | null][] = []; + #enqueues: [Uint8Array, number, Deno.KvKey[], number[] | null][] = []; constructor(rid: number) { this.#rid = rid; @@ -280,13 +371,29 @@ class AtomicOperation { return this; } + enqueue( + message: unknown, + opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + ): this { + if (opts?.delay !== undefined) { + validateQueueDelay(opts?.delay); + } + this.#enqueues.push([ + core.serialize(message, { forStorage: true }), + opts?.delay ?? 0, + opts?.keysIfUndelivered ?? [], + null, + ]); + return this; + } + async commit(): Promise<Deno.KvCommitResult | Deno.KvCommitError> { const versionstamp = await core.opAsync( "op_kv_atomic_write", this.#rid, this.#checks, this.#mutations, - [], // TODO(@losfair): enqueue + this.#enqueues, ); if (versionstamp === null) return { ok: false }; return { ok: true, versionstamp }; diff --git a/ext/kv/Cargo.toml b/ext/kv/Cargo.toml index 1cb64c099..b25837143 100644 --- a/ext/kv/Cargo.toml +++ b/ext/kv/Cargo.toml @@ -20,5 +20,9 @@ base64.workspace = true deno_core.workspace = true hex.workspace = true num-bigint.workspace = true +rand.workspace = true rusqlite.workspace = true serde.workspace = true +serde_json.workspace = true +tokio.workspace = true +uuid.workspace = true diff --git a/ext/kv/interface.rs b/ext/kv/interface.rs index 31b7638b4..b67ee1243 100644 --- a/ext/kv/interface.rs +++ b/ext/kv/interface.rs @@ -25,6 +25,8 @@ pub trait DatabaseHandler { #[async_trait(?Send)] pub trait Database { + type QMH: QueueMessageHandle + 'static; + async fn snapshot_read( &self, requests: Vec<ReadRange>, @@ -35,6 +37,16 @@ pub trait Database { &self, write: AtomicWrite, ) -> Result<Option<CommitResult>, AnyError>; + + async fn dequeue_next_message(&self) -> Result<Self::QMH, AnyError>; + + fn close(&self); +} + +#[async_trait(?Send)] +pub trait QueueMessageHandle { + async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError>; + async fn finish(&self, success: bool) -> Result<(), AnyError>; } /// Options for a snapshot read. @@ -242,7 +254,7 @@ pub struct KvMutation { /// keys specified in `keys_if_undelivered`. 
pub struct Enqueue { pub payload: Vec<u8>, - pub deadline_ms: u64, + pub delay_ms: u64, pub keys_if_undelivered: Vec<Vec<u8>>, pub backoff_schedule: Option<Vec<u32>>, } diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index dbc626225..2763fcf50 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -8,6 +8,7 @@ use std::borrow::Cow; use std::cell::RefCell; use std::num::NonZeroU32; use std::rc::Rc; +use std::vec; use codec::decode_key; use codec::encode_key; @@ -60,6 +61,8 @@ deno_core::extension!(deno_kv, op_kv_snapshot_read<DBH>, op_kv_atomic_write<DBH>, op_kv_encode_cursor, + op_kv_dequeue_next_message<DBH>, + op_kv_finish_dequeued_message<DBH>, ], esm = [ "01_db.ts" ], options = { @@ -80,6 +83,10 @@ impl<DB: Database + 'static> Resource for DatabaseResource<DB> { fn name(&self) -> Cow<str> { "database".into() } + + fn close(self: Rc<Self>) { + self.db.close(); + } } #[op] @@ -280,6 +287,62 @@ where Ok(output_ranges) } +struct QueueMessageResource<QPH: QueueMessageHandle + 'static> { + handle: QPH, +} + +impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> { + fn name(&self) -> Cow<str> { + "queue_message".into() + } +} + +#[op] +async fn op_kv_dequeue_next_message<DBH>( + state: Rc<RefCell<OpState>>, + rid: ResourceId, +) -> Result<(ZeroCopyBuf, ResourceId), AnyError> +where + DBH: DatabaseHandler + 'static, +{ + let db = { + let state = state.borrow(); + let resource = + state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?; + resource.db.clone() + }; + + let mut handle = db.dequeue_next_message().await?; + let payload = handle.take_payload().await?.into(); + let handle_rid = { + let mut state = state.borrow_mut(); + state.resource_table.add(QueueMessageResource { handle }) + }; + Ok((payload, handle_rid)) +} + +#[op] +async fn op_kv_finish_dequeued_message<DBH>( + state: Rc<RefCell<OpState>>, + handle_rid: ResourceId, + success: bool, +) -> Result<(), AnyError> +where + DBH: DatabaseHandler + 'static, +{ + let handle = { + let mut state = state.borrow_mut(); + let handle = state + .resource_table + .take::<QueueMessageResource<<<DBH>::DB as Database>::QMH>>(handle_rid) + .map_err(|_| type_error("Queue message not found"))?; + Rc::try_unwrap(handle) + .map_err(|_| type_error("Queue message not found"))? 
+ .handle + }; + handle.finish(success).await +} + type V8KvCheck = (KvKey, Option<ByteString>); impl TryFrom<V8KvCheck> for KvCheck { @@ -333,7 +396,7 @@ impl TryFrom<V8Enqueue> for Enqueue { fn try_from(value: V8Enqueue) -> Result<Self, AnyError> { Ok(Enqueue { payload: value.0.to_vec(), - deadline_ms: value.1, + delay_ms: value.1, keys_if_undelivered: value .2 .into_iter() diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs index 80d230ab1..6cff3145d 100644 --- a/ext/kv/sqlite.rs +++ b/ext/kv/sqlite.rs @@ -7,10 +7,17 @@ use std::marker::PhantomData; use std::path::Path; use std::path::PathBuf; use std::rc::Rc; +use std::rc::Weak; +use std::sync::Arc; +use std::time::Duration; +use std::time::SystemTime; use async_trait::async_trait; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::futures; +use deno_core::futures::FutureExt; +use deno_core::task::spawn; use deno_core::task::spawn_blocking; use deno_core::AsyncRefCell; use deno_core::OpState; @@ -18,6 +25,12 @@ use rusqlite::params; use rusqlite::OpenFlags; use rusqlite::OptionalExtension; use rusqlite::Transaction; +use tokio::sync::mpsc; +use tokio::sync::watch; +use tokio::sync::OnceCell; +use tokio::sync::OwnedSemaphorePermit; +use tokio::sync::Semaphore; +use uuid::Uuid; use crate::AtomicWrite; use crate::CommitResult; @@ -25,6 +38,7 @@ use crate::Database; use crate::DatabaseHandler; use crate::KvEntry; use crate::MutationKind; +use crate::QueueMessageHandle; use crate::ReadRange; use crate::ReadRangeOutput; use crate::SnapshotReadOptions; @@ -44,6 +58,18 @@ const STATEMENT_KV_POINT_SET: &str = "insert into kv (k, v, v_encoding, version) values (:k, :v, :v_encoding, :version) on conflict(k) do update set v = :v, v_encoding = :v_encoding, version = :version"; const STATEMENT_KV_POINT_DELETE: &str = "delete from kv where k = ?"; +const STATEMENT_QUEUE_ADD_READY: &str = "insert into queue (ts, id, data, backoff_schedule, keys_if_undelivered) values(?, ?, ?, ?, ?)"; +const STATEMENT_QUEUE_GET_NEXT_READY: &str = "select ts, id, data, backoff_schedule, keys_if_undelivered from queue where ts <= ? 
order by ts limit 100"; +const STATEMENT_QUEUE_GET_EARLIEST_READY: &str = + "select ts from queue order by ts limit 1"; +const STATEMENT_QUEUE_REMOVE_READY: &str = "delete from queue where id = ?"; +const STATEMENT_QUEUE_ADD_RUNNING: &str = "insert into queue_running (deadline, id, data, backoff_schedule, keys_if_undelivered) values(?, ?, ?, ?, ?)"; +const STATEMENT_QUEUE_REMOVE_RUNNING: &str = + "delete from queue_running where id = ?"; +const STATEMENT_QUEUE_GET_RUNNING_BY_ID: &str = "select deadline, id, data, backoff_schedule, keys_if_undelivered from queue_running where id = ?"; +const STATEMENT_QUEUE_GET_RUNNING: &str = + "select id from queue_running order by deadline limit 100"; + const STATEMENT_CREATE_MIGRATION_TABLE: &str = " create table if not exists migration_state( k integer not null primary key, @@ -87,6 +113,9 @@ create table queue_running( ", ]; +const DISPATCH_CONCURRENCY_LIMIT: usize = 100; +const DEFAULT_BACKOFF_SCHEDULE: [u32; 5] = [100, 1000, 5000, 30000, 60000]; + pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> { pub default_storage_dir: Option<PathBuf>, _permissions: PhantomData<P>, @@ -182,14 +211,23 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> { .await .unwrap()?; - Ok(SqliteDb(Rc::new(AsyncRefCell::new(Cell::new(Some(conn)))))) + Ok(SqliteDb { + conn: Rc::new(AsyncRefCell::new(Cell::new(Some(conn)))), + queue: OnceCell::new(), + }) } } -pub struct SqliteDb(Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>); +pub struct SqliteDb { + conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, + queue: OnceCell<SqliteQueue>, +} impl SqliteDb { - async fn run_tx<F, R>(&self, f: F) -> Result<R, AnyError> + async fn run_tx<F, R>( + conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, + f: F, + ) -> Result<R, AnyError> where F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>) + Send @@ -198,7 +236,7 @@ impl SqliteDb { { // Transactions need exclusive access to the connection. Wait until // we can borrow_mut the connection. - let cell = self.0.borrow_mut().await; + let cell = conn.borrow_mut().await; // Take the db out of the cell and run the transaction via spawn_blocking. let mut db = cell.take().unwrap(); @@ -220,59 +258,372 @@ impl SqliteDb { } } +pub struct DequeuedMessage { + conn: Weak<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, + id: String, + payload: Option<Vec<u8>>, + waker_tx: mpsc::Sender<()>, + _permit: OwnedSemaphorePermit, +} + +#[async_trait(?Send)] +impl QueueMessageHandle for DequeuedMessage { + async fn finish(&self, success: bool) -> Result<(), AnyError> { + let Some(conn) = self.conn.upgrade() else { + return Ok(()); + }; + let id = self.id.clone(); + let requeued = SqliteDb::run_tx(conn, move |tx| { + let requeued = { + if success { + let changed = tx + .prepare_cached(STATEMENT_QUEUE_REMOVE_RUNNING)? + .execute([&id])?; + assert!(changed <= 1); + false + } else { + SqliteQueue::requeue_message(&id, &tx)? + } + }; + tx.commit()?; + Ok(requeued) + }) + .await?; + if requeued { + // If the message was requeued, wake up the dequeue loop. 
+ self.waker_tx.send(()).await?; + } + Ok(()) + } + + async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> { + self + .payload + .take() + .ok_or_else(|| type_error("Payload already consumed")) + } +} + +type DequeueReceiver = mpsc::Receiver<(Vec<u8>, String)>; + +struct SqliteQueue { + conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, + dequeue_rx: Rc<AsyncRefCell<DequeueReceiver>>, + concurrency_limiter: Arc<Semaphore>, + waker_tx: mpsc::Sender<()>, + shutdown_tx: watch::Sender<()>, +} + +impl SqliteQueue { + fn new(conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>) -> Self { + let conn_clone = conn.clone(); + let (shutdown_tx, shutdown_rx) = watch::channel::<()>(()); + let (waker_tx, waker_rx) = mpsc::channel::<()>(1); + let (dequeue_tx, dequeue_rx) = mpsc::channel::<(Vec<u8>, String)>(64); + + spawn(async move { + // Oneshot requeue of all inflight messages. + Self::requeue_inflight_messages(conn.clone()).await.unwrap(); + + // Continuous dequeue loop. + Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx) + .await + .unwrap(); + }); + + Self { + conn: conn_clone, + dequeue_rx: Rc::new(AsyncRefCell::new(dequeue_rx)), + waker_tx, + shutdown_tx, + concurrency_limiter: Arc::new(Semaphore::new(DISPATCH_CONCURRENCY_LIMIT)), + } + } + + async fn dequeue(&self) -> Result<DequeuedMessage, AnyError> { + // Wait for the next message to be available from dequeue_rx. + let (payload, id) = { + let mut queue_rx = self.dequeue_rx.borrow_mut().await; + let Some(msg) = queue_rx.recv().await else { + return Err(type_error("Database closed")); + }; + msg + }; + + let permit = self.concurrency_limiter.clone().acquire_owned().await?; + + Ok(DequeuedMessage { + conn: Rc::downgrade(&self.conn), + id, + payload: Some(payload), + waker_tx: self.waker_tx.clone(), + _permit: permit, + }) + } + + async fn wake(&self) -> Result<(), AnyError> { + self.waker_tx.send(()).await?; + Ok(()) + } + + fn shutdown(&self) { + self.shutdown_tx.send(()).unwrap(); + } + + async fn dequeue_loop( + conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, + dequeue_tx: mpsc::Sender<(Vec<u8>, String)>, + mut shutdown_rx: watch::Receiver<()>, + mut waker_rx: mpsc::Receiver<()>, + ) -> Result<(), AnyError> { + loop { + let messages = SqliteDb::run_tx(conn.clone(), move |tx| { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + let messages = tx + .prepare_cached(STATEMENT_QUEUE_GET_NEXT_READY)? + .query_map([now], |row| { + let ts: u64 = row.get(0)?; + let id: String = row.get(1)?; + let data: Vec<u8> = row.get(2)?; + let backoff_schedule: String = row.get(3)?; + let keys_if_undelivered: String = row.get(4)?; + Ok((ts, id, data, backoff_schedule, keys_if_undelivered)) + })? + .collect::<Result<Vec<_>, rusqlite::Error>>()?; + + for (ts, id, data, backoff_schedule, keys_if_undelivered) in &messages { + let changed = tx + .prepare_cached(STATEMENT_QUEUE_REMOVE_READY)? + .execute(params![id])?; + assert_eq!(changed, 1); + + let changed = + tx.prepare_cached(STATEMENT_QUEUE_ADD_RUNNING)?.execute( + params![ts, id, &data, &backoff_schedule, &keys_if_undelivered], + )?; + assert_eq!(changed, 1); + } + tx.commit()?; + + Ok( + messages + .into_iter() + .map(|(_, id, data, _, _)| (id, data)) + .collect::<Vec<_>>(), + ) + }) + .await?; + + let busy = !messages.is_empty(); + + for (id, data) in messages { + if dequeue_tx.send((data, id)).await.is_err() { + // Queue receiver was dropped. Stop the dequeue loop. 
+ return Ok(()); + } + } + + if !busy { + // There's nothing to dequeue right now; sleep until one of the + // following happens: + // - It's time to dequeue the next message based on its timestamp + // - A new message is added to the queue + // - The database is closed + let sleep_fut = { + match Self::get_earliest_ready_ts(conn.clone()).await? { + Some(ts) => { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + if ts <= now { + continue; + } + tokio::time::sleep(Duration::from_millis(ts - now)).boxed() + } + None => futures::future::pending().boxed(), + } + }; + tokio::select! { + _ = sleep_fut => {} + _ = waker_rx.recv() => {} + _ = shutdown_rx.changed() => return Ok(()) + } + } + } + } + + async fn get_earliest_ready_ts( + conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, + ) -> Result<Option<u64>, AnyError> { + SqliteDb::run_tx(conn.clone(), move |tx| { + let ts = tx + .prepare_cached(STATEMENT_QUEUE_GET_EARLIEST_READY)? + .query_row([], |row| { + let ts: u64 = row.get(0)?; + Ok(ts) + }) + .optional()?; + Ok(ts) + }) + .await + } + + async fn requeue_inflight_messages( + conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, + ) -> Result<(), AnyError> { + loop { + let done = SqliteDb::run_tx(conn.clone(), move |tx| { + let entries = tx + .prepare_cached(STATEMENT_QUEUE_GET_RUNNING)? + .query_map([], |row| { + let id: String = row.get(0)?; + Ok(id) + })? + .collect::<Result<Vec<_>, rusqlite::Error>>()?; + for id in &entries { + Self::requeue_message(id, &tx)?; + } + tx.commit()?; + Ok(entries.is_empty()) + }) + .await?; + if done { + return Ok(()); + } + } + } + + fn requeue_message( + id: &str, + tx: &rusqlite::Transaction<'_>, + ) -> Result<bool, AnyError> { + let Some((_, id, data, backoff_schedule, keys_if_undelivered)) = tx + .prepare_cached(STATEMENT_QUEUE_GET_RUNNING_BY_ID)? + .query_row([id], |row| { + let deadline: u64 = row.get(0)?; + let id: String = row.get(1)?; + let data: Vec<u8> = row.get(2)?; + let backoff_schedule: String = row.get(3)?; + let keys_if_undelivered: String = row.get(4)?; + Ok((deadline, id, data, backoff_schedule, keys_if_undelivered)) + }) + .optional()? else { + return Ok(false); + }; + + let backoff_schedule = { + let backoff_schedule = + serde_json::from_str::<Option<Vec<u64>>>(&backoff_schedule)?; + backoff_schedule.unwrap_or_default() + }; + + let mut requeued = false; + if !backoff_schedule.is_empty() { + // Requeue based on backoff schedule + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + let new_ts = now + backoff_schedule[0]; + let new_backoff_schedule = serde_json::to_string(&backoff_schedule[1..])?; + let changed = tx + .prepare_cached(STATEMENT_QUEUE_ADD_READY)? + .execute(params![ + new_ts, + id, + &data, + &new_backoff_schedule, + &keys_if_undelivered + ]) + .unwrap(); + assert_eq!(changed, 1); + requeued = true; + } else if !keys_if_undelivered.is_empty() { + // No more requeues. Insert the message into the undelivered queue. + let keys_if_undelivered = + serde_json::from_str::<Vec<Vec<u8>>>(&keys_if_undelivered)?; + + let version: i64 = tx + .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)? + .query_row([], |row| row.get(0))?; + + for key in keys_if_undelivered { + let changed = tx + .prepare_cached(STATEMENT_KV_POINT_SET)? 
+ .execute(params![key, &data, &VALUE_ENCODING_V8, &version])?; + assert_eq!(changed, 1); + } + } + + // Remove from running + let changed = tx + .prepare_cached(STATEMENT_QUEUE_REMOVE_RUNNING)? + .execute(params![id])?; + assert_eq!(changed, 1); + + Ok(requeued) + } +} + #[async_trait(?Send)] impl Database for SqliteDb { + type QMH = DequeuedMessage; + async fn snapshot_read( &self, requests: Vec<ReadRange>, _options: SnapshotReadOptions, ) -> Result<Vec<ReadRangeOutput>, AnyError> { - self - .run_tx(move |tx| { - let mut responses = Vec::with_capacity(requests.len()); - for request in requests { - let mut stmt = tx.prepare_cached(if request.reverse { - STATEMENT_KV_RANGE_SCAN_REVERSE - } else { - STATEMENT_KV_RANGE_SCAN - })?; - let entries = stmt - .query_map( - ( - request.start.as_slice(), - request.end.as_slice(), - request.limit.get(), - ), - |row| { - let key: Vec<u8> = row.get(0)?; - let value: Vec<u8> = row.get(1)?; - let encoding: i64 = row.get(2)?; - - let value = decode_value(value, encoding); - - let version: i64 = row.get(3)?; - Ok(KvEntry { - key, - value, - versionstamp: version_to_versionstamp(version), - }) - }, - )? - .collect::<Result<Vec<_>, rusqlite::Error>>()?; - responses.push(ReadRangeOutput { entries }); - } + Self::run_tx(self.conn.clone(), move |tx| { + let mut responses = Vec::with_capacity(requests.len()); + for request in requests { + let mut stmt = tx.prepare_cached(if request.reverse { + STATEMENT_KV_RANGE_SCAN_REVERSE + } else { + STATEMENT_KV_RANGE_SCAN + })?; + let entries = stmt + .query_map( + ( + request.start.as_slice(), + request.end.as_slice(), + request.limit.get(), + ), + |row| { + let key: Vec<u8> = row.get(0)?; + let value: Vec<u8> = row.get(1)?; + let encoding: i64 = row.get(2)?; + + let value = decode_value(value, encoding); + + let version: i64 = row.get(3)?; + Ok(KvEntry { + key, + value, + versionstamp: version_to_versionstamp(version), + }) + }, + )? + .collect::<Result<Vec<_>, rusqlite::Error>>()?; + responses.push(ReadRangeOutput { entries }); + } - Ok(responses) - }) - .await + Ok(responses) + }) + .await } async fn atomic_write( &self, write: AtomicWrite, ) -> Result<Option<CommitResult>, AnyError> { - self - .run_tx(move |tx| { + let (has_enqueues, commit_result) = + Self::run_tx(self.conn.clone(), move |tx| { for check in write.checks { let real_versionstamp = tx .prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)? @@ -280,7 +631,7 @@ impl Database for SqliteDb { .optional()? .map(version_to_versionstamp); if real_versionstamp != check.versionstamp { - return Ok(None); + return Ok((false, None)); } } @@ -336,17 +687,67 @@ impl Database for SqliteDb { } } - // TODO(@losfair): enqueues + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + let has_enqueues = !write.enqueues.is_empty(); + for enqueue in write.enqueues { + let id = Uuid::new_v4().to_string(); + let backoff_schedule = serde_json::to_string( + &enqueue + .backoff_schedule + .or_else(|| Some(DEFAULT_BACKOFF_SCHEDULE.to_vec())), + )?; + let keys_if_undelivered = + serde_json::to_string(&enqueue.keys_if_undelivered)?; + + let changed = + tx.prepare_cached(STATEMENT_QUEUE_ADD_READY)? 
+ .execute(params![ + now + enqueue.delay_ms, + id, + &enqueue.payload, + &backoff_schedule, + &keys_if_undelivered + ])?; + assert_eq!(changed, 1) + } tx.commit()?; - let new_vesionstamp = version_to_versionstamp(version); - Ok(Some(CommitResult { - versionstamp: new_vesionstamp, - })) + Ok(( + has_enqueues, + Some(CommitResult { + versionstamp: new_vesionstamp, + }), + )) }) - .await + .await?; + + if has_enqueues { + if let Some(queue) = self.queue.get() { + queue.wake().await?; + } + } + Ok(commit_result) + } + + async fn dequeue_next_message(&self) -> Result<Self::QMH, AnyError> { + let queue = self + .queue + .get_or_init(|| async move { SqliteQueue::new(self.conn.clone()) }) + .await; + let handle = queue.dequeue().await?; + Ok(handle) + } + + fn close(&self) { + if let Some(queue) = self.queue.get() { + queue.shutdown(); + } } } |
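
For reference, here is a minimal usage sketch of the queue API this diff introduces, based on the `Deno.Kv.enqueue` and `Deno.Kv.listenQueue` declarations added to `cli/tsc/dts/lib.deno.unstable.d.ts` above. The key names and messages are illustrative only, not part of the change:

```ts
// Sketch only (unstable API at the time of this change): run with `deno run --unstable`.
const db = await Deno.openKv();

// The handler is retried if it throws; once retries are exhausted, the queued
// value is written to the keys named in `keysIfUndelivered`.
db.listenQueue(async (msg) => {
  await db.set(["last_message"], msg);
});

// Immediate delivery.
await db.enqueue("hello");

// Delivery after one minute, with a dead-letter key as a fallback.
await db.enqueue("later", {
  delay: 60_000,
  keysIfUndelivered: [["queue_failed", "later"]],
});
```

With the SQLite backend added in `ext/kv/sqlite.rs`, a handler that keeps failing is retried per `DEFAULT_BACKOFF_SCHEDULE` (roughly 100 ms, 1 s, 5 s, 30 s, and 60 s after each attempt); only after the schedule is exhausted is the message value written to the `keysIfUndelivered` keys.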