diff options
author | Igor Zinkovsky <igor@deno.com> | 2023-06-13 17:49:57 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-13 17:49:57 -0700 |
commit | fd9d6baea311d9b227b130749647a86838ba2ccc (patch) | |
tree | f0207f62199d5db463fd7f208d45c9bf09c8db14 /ext/kv/01_db.ts | |
parent | d451abfc9154e02f39c08d10c185d1618b2ef6d3 (diff) |
feat(kv) queue implementation (#19459)
Extend the unstable `Deno.Kv` API to support queues.
Diffstat (limited to 'ext/kv/01_db.ts')
-rw-r--r-- | ext/kv/01_db.ts | 109 |
1 files changed, 108 insertions, 1 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index f8181cc2e..eb103ae0c 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -26,6 +26,20 @@ async function openKv(path: string) { return new Kv(rid, kvSymbol); } +const millisecondsInOneWeek = 7 * 24 * 60 * 60 * 1000; + +function validateQueueDelay(delay: number) { + if (delay < 0) { + throw new TypeError("delay cannot be negative"); + } + if (delay > millisecondsInOneWeek) { + throw new TypeError("delay cannot be greater than one week"); + } + if (isNaN(delay)) { + throw new TypeError("delay cannot be NaN"); + } +} + interface RawKvEntry { key: Deno.KvKey; value: RawValue; @@ -47,6 +61,7 @@ const kvSymbol = Symbol("KvRid"); class Kv { #rid: number; + #closed: boolean; constructor(rid: number = undefined, symbol: symbol = undefined) { if (kvSymbol !== symbol) { @@ -55,6 +70,7 @@ class Kv { ); } this.#rid = rid; + this.#closed = false; } atomic() { @@ -203,8 +219,82 @@ class Kv { }; } + async enqueue( + message: unknown, + opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + ) { + if (opts?.delay !== undefined) { + validateQueueDelay(opts?.delay); + } + + const enqueues = [ + [ + core.serialize(message, { forStorage: true }), + opts?.delay ?? 0, + opts?.keysIfUndelivered ?? [], + null, + ], + ]; + + const versionstamp = await core.opAsync( + "op_kv_atomic_write", + this.#rid, + [], + [], + enqueues, + ); + if (versionstamp === null) throw new TypeError("Failed to enqueue value"); + return { ok: true, versionstamp }; + } + + async listenQueue( + handler: (message: unknown) => Promise<void> | void, + ): Promise<void> { + while (!this.#closed) { + // Wait for the next message. + let next: { 0: Uint8Array; 1: number }; + try { + next = await core.opAsync( + "op_kv_dequeue_next_message", + this.#rid, + ); + } catch (error) { + if (this.#closed) { + break; + } else { + throw error; + } + } + + // Deserialize the payload. + const { 0: payload, 1: handleId } = next; + const deserializedPayload = core.deserialize(payload, { + forStorage: true, + }); + + // Dispatch the payload. + (async () => { + let success = false; + try { + const result = handler(deserializedPayload); + const _res = result instanceof Promise ? (await result) : result; + success = true; + } catch (error) { + console.error("Exception in queue handler", error); + } finally { + await core.opAsync( + "op_kv_finish_dequeued_message", + handleId, + success, + ); + } + })(); + } + } + close() { core.close(this.#rid); + this.#closed = true; } } @@ -213,6 +303,7 @@ class AtomicOperation { #checks: [Deno.KvKey, string | null][] = []; #mutations: [Deno.KvKey, string, RawValue | null][] = []; + #enqueues: [Uint8Array, number, Deno.KvKey[], number[] | null][] = []; constructor(rid: number) { this.#rid = rid; @@ -280,13 +371,29 @@ class AtomicOperation { return this; } + enqueue( + message: unknown, + opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + ): this { + if (opts?.delay !== undefined) { + validateQueueDelay(opts?.delay); + } + this.#enqueues.push([ + core.serialize(message, { forStorage: true }), + opts?.delay ?? 0, + opts?.keysIfUndelivered ?? [], + null, + ]); + return this; + } + async commit(): Promise<Deno.KvCommitResult | Deno.KvCommitError> { const versionstamp = await core.opAsync( "op_kv_atomic_write", this.#rid, this.#checks, this.#mutations, - [], // TODO(@losfair): enqueue + this.#enqueues, ); if (versionstamp === null) return { ok: false }; return { ok: true, versionstamp }; |