diff options
author | Luca Casonato <hello@lcas.dev> | 2023-12-05 14:21:46 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-05 21:21:46 +0800 |
commit | 74e39a927c63e789fec1c8f1817812920079229d (patch) | |
tree | fa38e32c700865b25710f491d551086733d58d5f /ext/kv/01_db.ts | |
parent | a24d3e8763bc48b69936db9231efb76766914303 (diff) |
feat(unstable): kv.watch() (#21147)
This commit adds support for a new `kv.watch()` method that allows
watching for changes to a key-value pair. This is useful for cases
where you want to be notified when a key-value pair changes, but
don't want to have to poll for changes.
---------
Co-authored-by: losfair <zhy20000919@hotmail.com>
Diffstat (limited to 'ext/kv/01_db.ts')
-rw-r--r-- | ext/kv/01_db.ts | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index 34678261a..73deee27f 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -11,8 +11,10 @@ const { SymbolFor, SymbolToStringTag, Uint8ArrayPrototype, + Error, } = globalThis.__bootstrap.primordials; import { SymbolDispose } from "ext:deno_web/00_infra.js"; +import { ReadableStream } from "ext:deno_web/06_streams.js"; const core = Deno.core; const ops = core.ops; @@ -297,6 +299,71 @@ class Kv { finishMessageOps.clear(); } + watch(keys: Deno.KvKey[], options = {}) { + const raw = options.raw ?? false; + const rid = ops.op_kv_watch(this.#rid, keys); + const lastEntries: (Deno.KvEntryMaybe<unknown> | undefined)[] = Array.from( + { length: keys.length }, + () => undefined, + ); + return new ReadableStream({ + async pull(controller) { + while (true) { + let updates; + try { + updates = await core.opAsync("op_kv_watch_next", rid); + } catch (err) { + core.tryClose(rid); + controller.error(err); + return; + } + if (updates === null) { + core.tryClose(rid); + controller.close(); + return; + } + let changed = false; + for (let i = 0; i < keys.length; i++) { + if (updates[i] === "unchanged") { + if (lastEntries[i] === undefined) { + throw new Error( + "watch: invalid unchanged update (internal error)", + ); + } + continue; + } + if ( + lastEntries[i] !== undefined && + (updates[i]?.versionstamp ?? null) === + lastEntries[i]?.versionstamp + ) { + continue; + } + changed = true; + if (updates[i] === null) { + lastEntries[i] = { + key: [...keys[i]], + value: null, + versionstamp: null, + }; + } else { + lastEntries[i] = updates[i]; + } + } + if (!changed && !raw) continue; // no change + const entries = lastEntries.map((entry) => + entry.versionstamp === null ? { ...entry } : deserializeValue(entry) + ); + controller.enqueue(entries); + return; + } + }, + cancel() { + core.tryClose(rid); + }, + }); + } + close() { core.close(this.#rid); } |