diff options
Diffstat (limited to 'ext/kv/01_db.ts')
-rw-r--r-- | ext/kv/01_db.ts | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index 34678261a..73deee27f 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -11,8 +11,10 @@ const { SymbolFor, SymbolToStringTag, Uint8ArrayPrototype, + Error, } = globalThis.__bootstrap.primordials; import { SymbolDispose } from "ext:deno_web/00_infra.js"; +import { ReadableStream } from "ext:deno_web/06_streams.js"; const core = Deno.core; const ops = core.ops; @@ -297,6 +299,71 @@ class Kv { finishMessageOps.clear(); } + watch(keys: Deno.KvKey[], options = {}) { + const raw = options.raw ?? false; + const rid = ops.op_kv_watch(this.#rid, keys); + const lastEntries: (Deno.KvEntryMaybe<unknown> | undefined)[] = Array.from( + { length: keys.length }, + () => undefined, + ); + return new ReadableStream({ + async pull(controller) { + while (true) { + let updates; + try { + updates = await core.opAsync("op_kv_watch_next", rid); + } catch (err) { + core.tryClose(rid); + controller.error(err); + return; + } + if (updates === null) { + core.tryClose(rid); + controller.close(); + return; + } + let changed = false; + for (let i = 0; i < keys.length; i++) { + if (updates[i] === "unchanged") { + if (lastEntries[i] === undefined) { + throw new Error( + "watch: invalid unchanged update (internal error)", + ); + } + continue; + } + if ( + lastEntries[i] !== undefined && + (updates[i]?.versionstamp ?? null) === + lastEntries[i]?.versionstamp + ) { + continue; + } + changed = true; + if (updates[i] === null) { + lastEntries[i] = { + key: [...keys[i]], + value: null, + versionstamp: null, + }; + } else { + lastEntries[i] = updates[i]; + } + } + if (!changed && !raw) continue; // no change + const entries = lastEntries.map((entry) => + entry.versionstamp === null ? { ...entry } : deserializeValue(entry) + ); + controller.enqueue(entries); + return; + } + }, + cancel() { + core.tryClose(rid); + }, + }); + } + close() { core.close(this.#rid); } |