summaryrefslogtreecommitdiff
path: root/ext/kv/01_db.ts
diff options
context:
space:
mode:
authorIgor Zinkovsky <igor@deno.com>2023-06-13 17:49:57 -0700
committerGitHub <noreply@github.com>2023-06-13 17:49:57 -0700
commitfd9d6baea311d9b227b130749647a86838ba2ccc (patch)
treef0207f62199d5db463fd7f208d45c9bf09c8db14 /ext/kv/01_db.ts
parentd451abfc9154e02f39c08d10c185d1618b2ef6d3 (diff)
feat(kv) queue implementation (#19459)
Extend the unstable `Deno.Kv` API to support queues.
Diffstat (limited to 'ext/kv/01_db.ts')
-rw-r--r--ext/kv/01_db.ts109
1 files changed, 108 insertions, 1 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts
index f8181cc2e..eb103ae0c 100644
--- a/ext/kv/01_db.ts
+++ b/ext/kv/01_db.ts
@@ -26,6 +26,20 @@ async function openKv(path: string) {
return new Kv(rid, kvSymbol);
}
+const millisecondsInOneWeek = 7 * 24 * 60 * 60 * 1000;
+
+function validateQueueDelay(delay: number) {
+ if (delay < 0) {
+ throw new TypeError("delay cannot be negative");
+ }
+ if (delay > millisecondsInOneWeek) {
+ throw new TypeError("delay cannot be greater than one week");
+ }
+ if (isNaN(delay)) {
+ throw new TypeError("delay cannot be NaN");
+ }
+}
+
interface RawKvEntry {
key: Deno.KvKey;
value: RawValue;
@@ -47,6 +61,7 @@ const kvSymbol = Symbol("KvRid");
class Kv {
#rid: number;
+ #closed: boolean;
constructor(rid: number = undefined, symbol: symbol = undefined) {
if (kvSymbol !== symbol) {
@@ -55,6 +70,7 @@ class Kv {
);
}
this.#rid = rid;
+ this.#closed = false;
}
atomic() {
@@ -203,8 +219,82 @@ class Kv {
};
}
+ async enqueue(
+ message: unknown,
+ opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] },
+ ) {
+ if (opts?.delay !== undefined) {
+ validateQueueDelay(opts?.delay);
+ }
+
+ const enqueues = [
+ [
+ core.serialize(message, { forStorage: true }),
+ opts?.delay ?? 0,
+ opts?.keysIfUndelivered ?? [],
+ null,
+ ],
+ ];
+
+ const versionstamp = await core.opAsync(
+ "op_kv_atomic_write",
+ this.#rid,
+ [],
+ [],
+ enqueues,
+ );
+ if (versionstamp === null) throw new TypeError("Failed to enqueue value");
+ return { ok: true, versionstamp };
+ }
+
+ async listenQueue(
+ handler: (message: unknown) => Promise<void> | void,
+ ): Promise<void> {
+ while (!this.#closed) {
+ // Wait for the next message.
+ let next: { 0: Uint8Array; 1: number };
+ try {
+ next = await core.opAsync(
+ "op_kv_dequeue_next_message",
+ this.#rid,
+ );
+ } catch (error) {
+ if (this.#closed) {
+ break;
+ } else {
+ throw error;
+ }
+ }
+
+ // Deserialize the payload.
+ const { 0: payload, 1: handleId } = next;
+ const deserializedPayload = core.deserialize(payload, {
+ forStorage: true,
+ });
+
+ // Dispatch the payload.
+ (async () => {
+ let success = false;
+ try {
+ const result = handler(deserializedPayload);
+ const _res = result instanceof Promise ? (await result) : result;
+ success = true;
+ } catch (error) {
+ console.error("Exception in queue handler", error);
+ } finally {
+ await core.opAsync(
+ "op_kv_finish_dequeued_message",
+ handleId,
+ success,
+ );
+ }
+ })();
+ }
+ }
+
close() {
core.close(this.#rid);
+ this.#closed = true;
}
}
@@ -213,6 +303,7 @@ class AtomicOperation {
#checks: [Deno.KvKey, string | null][] = [];
#mutations: [Deno.KvKey, string, RawValue | null][] = [];
+ #enqueues: [Uint8Array, number, Deno.KvKey[], number[] | null][] = [];
constructor(rid: number) {
this.#rid = rid;
@@ -280,13 +371,29 @@ class AtomicOperation {
return this;
}
+ enqueue(
+ message: unknown,
+ opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] },
+ ): this {
+ if (opts?.delay !== undefined) {
+ validateQueueDelay(opts?.delay);
+ }
+ this.#enqueues.push([
+ core.serialize(message, { forStorage: true }),
+ opts?.delay ?? 0,
+ opts?.keysIfUndelivered ?? [],
+ null,
+ ]);
+ return this;
+ }
+
async commit(): Promise<Deno.KvCommitResult | Deno.KvCommitError> {
const versionstamp = await core.opAsync(
"op_kv_atomic_write",
this.#rid,
this.#checks,
this.#mutations,
- [], // TODO(@losfair): enqueue
+ this.#enqueues,
);
if (versionstamp === null) return { ok: false };
return { ok: true, versionstamp };