summaryrefslogtreecommitdiff
path: root/ext/kv/01_db.ts
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2023-12-05 14:21:46 +0100
committerGitHub <noreply@github.com>2023-12-05 21:21:46 +0800
commit74e39a927c63e789fec1c8f1817812920079229d (patch)
treefa38e32c700865b25710f491d551086733d58d5f /ext/kv/01_db.ts
parenta24d3e8763bc48b69936db9231efb76766914303 (diff)
feat(unstable): kv.watch() (#21147)
This commit adds support for a new `kv.watch()` method that allows watching for changes to a key-value pair. This is useful for cases where you want to be notified when a key-value pair changes, but don't want to have to poll for changes. --------- Co-authored-by: losfair <zhy20000919@hotmail.com>
Diffstat (limited to 'ext/kv/01_db.ts')
-rw-r--r--ext/kv/01_db.ts67
1 files changed, 67 insertions, 0 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts
index 34678261a..73deee27f 100644
--- a/ext/kv/01_db.ts
+++ b/ext/kv/01_db.ts
@@ -11,8 +11,10 @@ const {
SymbolFor,
SymbolToStringTag,
Uint8ArrayPrototype,
+ Error,
} = globalThis.__bootstrap.primordials;
import { SymbolDispose } from "ext:deno_web/00_infra.js";
+import { ReadableStream } from "ext:deno_web/06_streams.js";
const core = Deno.core;
const ops = core.ops;
@@ -297,6 +299,71 @@ class Kv {
finishMessageOps.clear();
}
+ watch(keys: Deno.KvKey[], options = {}) {
+ const raw = options.raw ?? false;
+ const rid = ops.op_kv_watch(this.#rid, keys);
+ const lastEntries: (Deno.KvEntryMaybe<unknown> | undefined)[] = Array.from(
+ { length: keys.length },
+ () => undefined,
+ );
+ return new ReadableStream({
+ async pull(controller) {
+ while (true) {
+ let updates;
+ try {
+ updates = await core.opAsync("op_kv_watch_next", rid);
+ } catch (err) {
+ core.tryClose(rid);
+ controller.error(err);
+ return;
+ }
+ if (updates === null) {
+ core.tryClose(rid);
+ controller.close();
+ return;
+ }
+ let changed = false;
+ for (let i = 0; i < keys.length; i++) {
+ if (updates[i] === "unchanged") {
+ if (lastEntries[i] === undefined) {
+ throw new Error(
+ "watch: invalid unchanged update (internal error)",
+ );
+ }
+ continue;
+ }
+ if (
+ lastEntries[i] !== undefined &&
+ (updates[i]?.versionstamp ?? null) ===
+ lastEntries[i]?.versionstamp
+ ) {
+ continue;
+ }
+ changed = true;
+ if (updates[i] === null) {
+ lastEntries[i] = {
+ key: [...keys[i]],
+ value: null,
+ versionstamp: null,
+ };
+ } else {
+ lastEntries[i] = updates[i];
+ }
+ }
+ if (!changed && !raw) continue; // no change
+ const entries = lastEntries.map((entry) =>
+ entry.versionstamp === null ? { ...entry } : deserializeValue(entry)
+ );
+ controller.enqueue(entries);
+ return;
+ }
+ },
+ cancel() {
+ core.tryClose(rid);
+ },
+ });
+ }
+
close() {
core.close(this.#rid);
}