author	Heyang Zhou <zhy20000919@hotmail.com>	2023-03-22 12:13:24 +0800
committer	GitHub <noreply@github.com>	2023-03-22 12:13:24 +0800
commit	92ebf4afe5d55135b3ba39616bcb77106c07c597 (patch)
tree	f79fe65811c7449f5b50c093852eceaad228d39f /ext
parent	8bcffff9dc517aa93dea2816b2a854f65d24eccc (diff)
feat(ext/kv): key-value store (#18232)
This commit adds the unstable "Deno.openKv()" API that allows opening a key-value database at a specified path.

---------

Co-authored-by: Luca Casonato <hello@lcas.dev>
Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
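A minimal usage sketch of the API added here (illustrative only; it assumes the --unstable flag and a writable path, and the key and value are made up):

    const kv = await Deno.openKv("./kv.sqlite3");
    await kv.set(["users", "alice"], { balance: 100 });
    const entry = await kv.get(["users", "alice"]);
    console.log(entry.value, entry.versionstamp);
    kv.close();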
Diffstat (limited to 'ext')
-rw-r--r--  ext/kv/01_db.ts      469
-rw-r--r--  ext/kv/Cargo.toml     24
-rw-r--r--  ext/kv/codec.rs      559
-rw-r--r--  ext/kv/interface.rs  294
-rw-r--r--  ext/kv/lib.rs        541
-rw-r--r--  ext/kv/sqlite.rs     348
-rw-r--r--  ext/node/Cargo.toml    2
7 files changed, 2236 insertions(+), 1 deletion(-)
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts
new file mode 100644
index 000000000..571a1b3cd
--- /dev/null
+++ b/ext/kv/01_db.ts
@@ -0,0 +1,469 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+// @ts-ignore internal api
+const {
+ ObjectGetPrototypeOf,
+ AsyncGeneratorPrototype,
+} = globalThis.__bootstrap.primordials;
+const core = Deno.core;
+const ops = core.ops;
+
+const encodeCursor: (
+ selector: [Deno.KvKey | null, Deno.KvKey | null, Deno.KvKey | null],
+ boundaryKey: Deno.KvKey,
+) => string = (selector, boundaryKey) =>
+ ops.op_kv_encode_cursor(selector, boundaryKey);
+
+async function openKv(path: string) {
+ const rid = await core.opAsync("op_kv_database_open", path);
+ return new Kv(rid);
+}
+
+interface RawKvEntry {
+ key: Deno.KvKey;
+ value: RawValue;
+ versionstamp: string;
+}
+
+type RawValue = {
+ kind: "v8";
+ value: Uint8Array;
+} | {
+ kind: "bytes";
+ value: Uint8Array;
+} | {
+ kind: "u64";
+ value: bigint;
+};
+
+class Kv {
+ #rid: number;
+
+ constructor(rid: number) {
+ this.#rid = rid;
+ }
+
+ atomic() {
+ return new AtomicOperation(this.#rid);
+ }
+
+ async get(key: Deno.KvKey, opts?: { consistency?: Deno.KvConsistencyLevel }) {
+ key = convertKey(key);
+ const [entries]: [RawKvEntry[]] = await core.opAsync(
+ "op_kv_snapshot_read",
+ this.#rid,
+ [[
+ null,
+ key,
+ null,
+ 1,
+ false,
+ null,
+ ]],
+ opts?.consistency ?? "strong",
+ );
+ if (!entries.length) {
+ return {
+ key,
+ value: null,
+ versionstamp: null,
+ };
+ }
+ return deserializeValue(entries[0]);
+ }
+
+ async set(key: Deno.KvKey, value: unknown) {
+ key = convertKey(key);
+ value = serializeValue(value);
+
+ const checks: Deno.AtomicCheck[] = [];
+ const mutations = [
+ [key, "set", value],
+ ];
+
+ const result = await core.opAsync(
+ "op_kv_atomic_write",
+ this.#rid,
+ checks,
+ mutations,
+ [],
+ );
+ if (!result) throw new TypeError("Failed to set value");
+ }
+
+ async delete(key: Deno.KvKey) {
+ key = convertKey(key);
+
+ const checks: Deno.AtomicCheck[] = [];
+ const mutations = [
+ [key, "delete", null],
+ ];
+
+ const result = await core.opAsync(
+ "op_kv_atomic_write",
+ this.#rid,
+ checks,
+ mutations,
+ [],
+ );
+    if (!result) throw new TypeError("Failed to delete value");
+ }
+
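+  // Usage sketch for the list() method below (illustrative only):
+  //
+  //   for await (const entry of kv.list({ prefix: ["users"] }, { limit: 10 })) {
+  //     console.log(entry.key, entry.value);
+  //   }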
+ list(
+ selector: Deno.KvListSelector,
+ options: {
+ limit?: number;
+ batchSize?: number;
+ cursor?: string;
+ reverse?: boolean;
+ consistency?: Deno.KvConsistencyLevel;
+ } = {},
+ ): KvListIterator {
+ if (options.limit !== undefined && options.limit <= 0) {
+ throw new Error("limit must be positive");
+ }
+
+ let batchSize = options.batchSize ?? (options.limit ?? 100);
+ if (batchSize <= 0) throw new Error("batchSize must be positive");
+ if (batchSize > 500) batchSize = 500;
+
+ return new KvListIterator({
+ limit: options.limit,
+ selector,
+ cursor: options.cursor,
+ reverse: options.reverse ?? false,
+ consistency: options.consistency ?? "strong",
+ batchSize,
+ pullBatch: this.#pullBatch(batchSize),
+ });
+ }
+
+ #pullBatch(batchSize: number): (
+ selector: Deno.KvListSelector,
+ cursor: string | undefined,
+ reverse: boolean,
+ consistency: Deno.KvConsistencyLevel,
+ ) => Promise<Deno.KvEntry[]> {
+ return async (selector, cursor, reverse, consistency) => {
+ const [entries]: [RawKvEntry[]] = await core.opAsync(
+ "op_kv_snapshot_read",
+ this.#rid,
+ [[
+ "prefix" in selector ? selector.prefix : null,
+ "start" in selector ? selector.start : null,
+ "end" in selector ? selector.end : null,
+ batchSize,
+ reverse,
+ cursor,
+ ]],
+ consistency,
+ );
+
+ return entries.map(deserializeValue);
+ };
+ }
+
+ close() {
+ core.close(this.#rid);
+ }
+}
+
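+// Sketch of the optimistic check-and-mutate flow provided by AtomicOperation
+// (illustrative only):
+//
+//   const entry = await kv.get(["counter"]);
+//   const ok = await kv.atomic()
+//     .check({ key: ["counter"], versionstamp: entry.versionstamp })
+//     .set(["counter"], 1)
+//     .commit();
+//   // `ok` is false if ["counter"] was modified after the read above.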
+class AtomicOperation {
+ #rid: number;
+
+ #checks: [Deno.KvKey, string | null][] = [];
+ #mutations: [Deno.KvKey, string, RawValue | null][] = [];
+
+ constructor(rid: number) {
+ this.#rid = rid;
+ }
+
+ check(...checks: Deno.AtomicCheck[]): this {
+ for (const check of checks) {
+ this.#checks.push([convertKey(check.key), check.versionstamp]);
+ }
+ return this;
+ }
+
+ mutate(...mutations: Deno.KvMutation[]): this {
+ for (const mutation of mutations) {
+ const key = convertKey(mutation.key);
+ let type: string;
+ let value: RawValue | null;
+ switch (mutation.type) {
+ case "delete":
+ type = "delete";
+ if (mutation.value) {
+ throw new TypeError("invalid mutation 'delete' with value");
+ }
+          value = null;
+          break;
+ case "set":
+ case "sum":
+ case "min":
+ case "max":
+ type = mutation.type;
+ if (!("value" in mutation)) {
+ throw new TypeError(`invalid mutation '${type}' without value`);
+ }
+ value = serializeValue(mutation.value);
+ break;
+ default:
+ throw new TypeError("Invalid mutation type");
+ }
+ this.#mutations.push([key, type, value]);
+ }
+ return this;
+ }
+
+ set(key: Deno.KvKey, value: unknown): this {
+ this.#mutations.push([convertKey(key), "set", serializeValue(value)]);
+ return this;
+ }
+
+ delete(key: Deno.KvKey): this {
+ this.#mutations.push([convertKey(key), "delete", null]);
+ return this;
+ }
+
+ async commit(): Promise<boolean> {
+ const result = await core.opAsync(
+ "op_kv_atomic_write",
+ this.#rid,
+ this.#checks,
+ this.#mutations,
+ [], // TODO(@losfair): enqueue
+ );
+ return result;
+ }
+
+ then() {
+ throw new TypeError(
+ "`Deno.AtomicOperation` is not a promise. Did you forget to call `commit()`?",
+ );
+ }
+}
+
+const MIN_U64 = 0n;
+const MAX_U64 = 0xffffffffffffffffn;
+
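+// Wraps a 64-bit unsigned integer, e.g. `new KvU64(42n)`, so that it is
+// stored with the specialized "u64" encoding and can be used with the "sum",
+// "min" and "max" mutations (illustrative note).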
+class KvU64 {
+ readonly value: bigint;
+
+ constructor(value: bigint) {
+ if (typeof value !== "bigint") {
+ throw new TypeError("value must be a bigint");
+ }
+ if (value < MIN_U64) {
+ throw new RangeError("value must be a positive bigint");
+ }
+ if (value > MAX_U64) {
+ throw new RangeError("value must be a 64-bit unsigned integer");
+ }
+ this.value = value;
+ Object.freeze(this);
+ }
+}
+
+function convertKey(key: Deno.KvKey | Deno.KvKeyPart): Deno.KvKey {
+ if (Array.isArray(key)) {
+ return key;
+ } else {
+ return [key as Deno.KvKeyPart];
+ }
+}
+
+function deserializeValue(entry: RawKvEntry): Deno.KvEntry {
+ const { kind, value } = entry.value;
+ switch (kind) {
+ case "v8":
+ return {
+ ...entry,
+ value: core.deserialize(value),
+ };
+ case "bytes":
+ return {
+ ...entry,
+ value,
+ };
+ case "u64":
+ return {
+ ...entry,
+ value: new KvU64(value),
+ };
+ default:
+ throw new TypeError("Invalid value type");
+ }
+}
+
+function serializeValue(value: unknown): RawValue {
+ if (value instanceof Uint8Array) {
+ return {
+ kind: "bytes",
+ value,
+ };
+ } else if (value instanceof KvU64) {
+ return {
+ kind: "u64",
+ value: value.value,
+ };
+ } else {
+ return {
+ kind: "v8",
+ value: core.serialize(value),
+ };
+ }
+}
+
+// This gets the %AsyncIteratorPrototype% object (which exists but is not a
+// global). We extend KvListIterator from it, so that we immediately
+// support async iterator helpers once they land. The %AsyncIterator% does not
+// yet actually exist however, so right now the AsyncIterator binding refers to
+// %Object%. I know.
+// Once AsyncIterator is a global, we can just use it (from primordials), rather
+// than doing this here.
+const AsyncIteratorPrototype = ObjectGetPrototypeOf(AsyncGeneratorPrototype);
+const AsyncIterator = AsyncIteratorPrototype.constructor;
+
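+// Paging sketch using the iterator's `cursor` getter (illustrative only):
+//
+//   const page1 = kv.list({ prefix: ["logs"] }, { limit: 10 });
+//   for await (const entry of page1) { /* process entry */ }
+//   // Resume after the last entry of the first page:
+//   const page2 = kv.list({ prefix: ["logs"] }, { cursor: page1.cursor });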
+class KvListIterator extends AsyncIterator
+ implements AsyncIterator<Deno.KvEntry> {
+ #selector: Deno.KvListSelector;
+ #entries: Deno.KvEntry[] | null = null;
+ #cursorGen: (() => string) | null = null;
+ #done = false;
+ #lastBatch = false;
+ #pullBatch: (
+ selector: Deno.KvListSelector,
+ cursor: string | undefined,
+ reverse: boolean,
+ consistency: Deno.KvConsistencyLevel,
+ ) => Promise<Deno.KvEntry[]>;
+ #limit: number | undefined;
+ #count = 0;
+ #reverse: boolean;
+ #batchSize: number;
+ #consistency: Deno.KvConsistencyLevel;
+
+ constructor(
+ { limit, selector, cursor, reverse, consistency, batchSize, pullBatch }: {
+ limit?: number;
+ selector: Deno.KvListSelector;
+ cursor?: string;
+ reverse: boolean;
+ batchSize: number;
+ consistency: Deno.KvConsistencyLevel;
+ pullBatch: (
+ selector: Deno.KvListSelector,
+ cursor: string | undefined,
+ reverse: boolean,
+ consistency: Deno.KvConsistencyLevel,
+ ) => Promise<Deno.KvEntry[]>;
+ },
+ ) {
+ super();
+ let prefix: Deno.KvKey | undefined;
+ let start: Deno.KvKey | undefined;
+ let end: Deno.KvKey | undefined;
+ if ("prefix" in selector && selector.prefix !== undefined) {
+ prefix = Object.freeze([...selector.prefix]);
+ }
+ if ("start" in selector && selector.start !== undefined) {
+ start = Object.freeze([...selector.start]);
+ }
+ if ("end" in selector && selector.end !== undefined) {
+ end = Object.freeze([...selector.end]);
+ }
+ if (prefix) {
+ if (start && end) {
+ throw new TypeError(
+ "Selector can not specify both 'start' and 'end' key when specifying 'prefix'.",
+ );
+ }
+ if (start) {
+ this.#selector = { prefix, start };
+ } else if (end) {
+ this.#selector = { prefix, end };
+ } else {
+ this.#selector = { prefix };
+ }
+ } else {
+ if (start && end) {
+ this.#selector = { start, end };
+ } else {
+ throw new TypeError(
+ "Selector must specify either 'prefix' or both 'start' and 'end' key.",
+ );
+ }
+ }
+ Object.freeze(this.#selector);
+ this.#pullBatch = pullBatch;
+ this.#limit = limit;
+ this.#reverse = reverse;
+ this.#consistency = consistency;
+ this.#batchSize = batchSize;
+ this.#cursorGen = cursor ? () => cursor : null;
+ }
+
+ get cursor(): string {
+ if (this.#cursorGen === null) {
+ throw new Error("Cannot get cursor before first iteration");
+ }
+
+ return this.#cursorGen();
+ }
+
+ async next(): Promise<IteratorResult<Deno.KvEntry>> {
+ // Fused or limit exceeded
+ if (
+ this.#done ||
+ (this.#limit !== undefined && this.#count >= this.#limit)
+ ) {
+ return { done: true, value: undefined };
+ }
+
+ // Attempt to fill the buffer
+ if (!this.#entries?.length && !this.#lastBatch) {
+ const batch = await this.#pullBatch(
+ this.#selector,
+ this.#cursorGen ? this.#cursorGen() : undefined,
+ this.#reverse,
+ this.#consistency,
+ );
+
+ // Reverse the batch so we can pop from the end
+ batch.reverse();
+ this.#entries = batch;
+
+ // Last batch, do not attempt to pull more
+ if (batch.length < this.#batchSize) {
+ this.#lastBatch = true;
+ }
+ }
+
+ const entry = this.#entries?.pop();
+ if (!entry) {
+ this.#done = true;
+ this.#cursorGen = () => "";
+ return { done: true, value: undefined };
+ }
+
+ this.#cursorGen = () => {
+ const selector = this.#selector;
+ return encodeCursor([
+ "prefix" in selector ? selector.prefix : null,
+ "start" in selector ? selector.start : null,
+ "end" in selector ? selector.end : null,
+ ], entry.key);
+ };
+ this.#count++;
+ return {
+ done: false,
+ value: entry,
+ };
+ }
+
+ [Symbol.asyncIterator](): AsyncIterator<Deno.KvEntry> {
+ return this;
+ }
+}
+
+export { Kv, KvListIterator, KvU64, openKv };
diff --git a/ext/kv/Cargo.toml b/ext/kv/Cargo.toml
new file mode 100644
index 000000000..cd18adc6c
--- /dev/null
+++ b/ext/kv/Cargo.toml
@@ -0,0 +1,24 @@
+# Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+[package]
+name = "deno_kv"
+version = "0.1.0"
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+readme = "README.md"
+repository.workspace = true
+description = "Implementation of the Deno database API"
+
+[lib]
+path = "lib.rs"
+
+[dependencies]
+anyhow.workspace = true
+async-trait.workspace = true
+base64.workspace = true
+deno_core.workspace = true
+hex.workspace = true
+num-bigint.workspace = true
+rusqlite.workspace = true
+serde.workspace = true
diff --git a/ext/kv/codec.rs b/ext/kv/codec.rs
new file mode 100644
index 000000000..b2acfdbc2
--- /dev/null
+++ b/ext/kv/codec.rs
@@ -0,0 +1,559 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Ported from https://github.com/foundationdb-rs/foundationdb-rs/blob/main/foundationdb/src/tuple/pack.rs
+
+use crate::Key;
+use crate::KeyPart;
+
+//const NIL: u8 = 0x00;
+const BYTES: u8 = 0x01;
+const STRING: u8 = 0x02;
+//const NESTED: u8 = 0x05;
+const NEGINTSTART: u8 = 0x0b;
+const INTZERO: u8 = 0x14;
+const POSINTEND: u8 = 0x1d;
+//const FLOAT: u8 = 0x20;
+const DOUBLE: u8 = 0x21;
+const FALSE: u8 = 0x26;
+const TRUE: u8 = 0x27;
+
+const ESCAPE: u8 = 0xff;
+
+const CANONICAL_NAN_POS: u64 = 0x7ff8000000000000u64;
+const CANONICAL_NAN_NEG: u64 = 0xfff8000000000000u64;
+
+pub fn canonicalize_f64(n: f64) -> f64 {
+ if n.is_nan() {
+ if n.is_sign_negative() {
+ f64::from_bits(CANONICAL_NAN_NEG)
+ } else {
+ f64::from_bits(CANONICAL_NAN_POS)
+ }
+ } else {
+ n
+ }
+}
+
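+// Keys are encoded as the concatenation of their tagged parts, so that the
+// byte-wise ordering of encoded keys matches the logical ordering of keys.
+// For example (sketch), Key([String("foo"), Int(1)]) encodes to
+// [0x02, b'f', b'o', b'o', 0x00, 0x15, 0x01].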
+pub fn encode_key(key: &Key) -> std::io::Result<Vec<u8>> {
+ // Disallow empty key
+ if key.0.is_empty() {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::InvalidInput,
+ "key should not be empty",
+ ));
+ }
+
+ let mut output: Vec<u8> = vec![];
+ for part in &key.0 {
+ match part {
+ KeyPart::String(key) => {
+ output.push(STRING);
+ escape_raw_bytes_into(&mut output, key.as_bytes());
+ output.push(0);
+ }
+ KeyPart::Int(key) => {
+ bigint::encode_into(&mut output, key)?;
+ }
+ KeyPart::Float(key) => {
+ double::encode_into(&mut output, *key);
+ }
+ KeyPart::Bytes(key) => {
+ output.push(BYTES);
+ escape_raw_bytes_into(&mut output, key);
+ output.push(0);
+ }
+ KeyPart::False => {
+ output.push(FALSE);
+ }
+ KeyPart::True => {
+ output.push(TRUE);
+ }
+ }
+ }
+ Ok(output)
+}
+
+pub fn decode_key(mut bytes: &[u8]) -> std::io::Result<Key> {
+ // Disallow empty key
+ if bytes.is_empty() {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::InvalidInput,
+ "key should not be empty",
+ ));
+ }
+
+ let mut key = Key(vec![]);
+ while !bytes.is_empty() {
+ let tag = bytes[0];
+ bytes = &bytes[1..];
+
+ let next_bytes = match tag {
+ self::STRING => {
+ let (next_bytes, data) = parse_slice(bytes)?;
+ let data = String::from_utf8(data).map_err(|_| {
+ std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid utf8")
+ })?;
+ key.0.push(KeyPart::String(data));
+ next_bytes
+ }
+ self::NEGINTSTART..=self::POSINTEND => {
+ let (next_bytes, data) = bigint::decode_from(bytes, tag)?;
+ key.0.push(KeyPart::Int(data));
+ next_bytes
+ }
+ self::DOUBLE => {
+ let (next_bytes, data) = double::decode_from(bytes)?;
+ key.0.push(KeyPart::Float(data));
+ next_bytes
+ }
+ self::BYTES => {
+ let (next_bytes, data) = parse_slice(bytes)?;
+ key.0.push(KeyPart::Bytes(data));
+ next_bytes
+ }
+ self::FALSE => {
+ key.0.push(KeyPart::False);
+ bytes
+ }
+ self::TRUE => {
+ key.0.push(KeyPart::True);
+ bytes
+ }
+ _ => {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ "invalid tag",
+ ))
+ }
+ };
+
+ bytes = next_bytes;
+ }
+ Ok(key)
+}
+
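+// Escapes embedded zero bytes so they survive the 0x00 terminator used for
+// strings and byte arrays, e.g. [0x01, 0x00, 0x02] becomes
+// [0x01, 0x00, 0xff, 0x02]; `parse_slice` reverses this.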
+fn escape_raw_bytes_into(out: &mut Vec<u8>, x: &[u8]) {
+ for &b in x {
+ out.push(b);
+ if b == 0 {
+ out.push(ESCAPE);
+ }
+ }
+}
+
+mod bigint {
+ use num_bigint::BigInt;
+ use num_bigint::Sign;
+
+ use super::parse_byte;
+ use super::parse_bytes;
+ const MAX_SZ: usize = 8;
+
+ // Ported from https://github.com/foundationdb-rs/foundationdb-rs/blob/7415e116d5d96c2630976058de28e439eed7e809/foundationdb/src/tuple/pack.rs#L575
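+  // Small integers get a one-byte length tag relative to INTZERO, e.g.
+  // (sketch) 1 encodes as [0x15, 0x01], -1 as [0x13, 0xfe] (ones' complement
+  // of the magnitude bytes), and 0 as just [0x14].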
+ pub fn encode_into(out: &mut Vec<u8>, key: &BigInt) -> std::io::Result<()> {
+ if key.sign() == Sign::NoSign {
+ out.push(super::INTZERO);
+ return Ok(());
+ }
+ let (sign, mut bytes) = key.to_bytes_be();
+ let n = bytes.len();
+ match sign {
+ Sign::Minus => {
+ if n <= MAX_SZ {
+ out.push(super::INTZERO - n as u8);
+ } else {
+ out.extend_from_slice(&[super::NEGINTSTART, bigint_n(n)? ^ 0xff]);
+ }
+ invert(&mut bytes);
+ out.extend_from_slice(&bytes);
+ }
+ Sign::NoSign => unreachable!(),
+ Sign::Plus => {
+ if n <= MAX_SZ {
+ out.push(super::INTZERO + n as u8);
+ } else {
+ out.extend_from_slice(&[super::POSINTEND, bigint_n(n)?]);
+ }
+ out.extend_from_slice(&bytes);
+ }
+ }
+ Ok(())
+ }
+
+ pub fn decode_from(
+ input: &[u8],
+ tag: u8,
+ ) -> std::io::Result<(&[u8], BigInt)> {
+ if super::INTZERO <= tag && tag <= super::INTZERO + MAX_SZ as u8 {
+ let n = (tag - super::INTZERO) as usize;
+ let (input, bytes) = parse_bytes(input, n)?;
+ Ok((input, BigInt::from_bytes_be(Sign::Plus, bytes)))
+ } else if super::INTZERO - MAX_SZ as u8 <= tag && tag < super::INTZERO {
+ let n = (super::INTZERO - tag) as usize;
+ let (input, bytes) = parse_bytes(input, n)?;
+ Ok((input, BigInt::from_bytes_be(Sign::Minus, &inverted(bytes))))
+ } else if tag == super::NEGINTSTART {
+ let (input, raw_length) = parse_byte(input)?;
+ let n = usize::from(raw_length ^ 0xff);
+ let (input, bytes) = parse_bytes(input, n)?;
+ Ok((input, BigInt::from_bytes_be(Sign::Minus, &inverted(bytes))))
+ } else if tag == super::POSINTEND {
+ let (input, raw_length) = parse_byte(input)?;
+ let n: usize = usize::from(raw_length);
+ let (input, bytes) = parse_bytes(input, n)?;
+ Ok((input, BigInt::from_bytes_be(Sign::Plus, bytes)))
+ } else {
+ Err(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ format!("unknown bigint tag: {}", tag),
+ ))
+ }
+ }
+
+ fn invert(bytes: &mut [u8]) {
+ // The ones' complement of a binary number is defined as the value
+ // obtained by inverting all the bits in the binary representation
+ // of the number (swapping 0s for 1s and vice versa).
+ for byte in bytes.iter_mut() {
+ *byte = !*byte;
+ }
+ }
+
+ fn inverted(bytes: &[u8]) -> Vec<u8> {
+ // The ones' complement of a binary number is defined as the value
+ // obtained by inverting all the bits in the binary representation
+ // of the number (swapping 0s for 1s and vice versa).
+ bytes.iter().map(|byte| !*byte).collect()
+ }
+
+ fn bigint_n(n: usize) -> std::io::Result<u8> {
+ u8::try_from(n).map_err(|_| {
+ std::io::Error::new(
+ std::io::ErrorKind::InvalidInput,
+ "BigUint requires more than 255 bytes to be represented",
+ )
+ })
+ }
+}
+
+mod double {
+ macro_rules! sign_bit {
+ ($type:ident) => {
+ (1 << (std::mem::size_of::<$type>() * 8 - 1))
+ };
+ }
+
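+  // Maps an f64 to a u64 whose big-endian bytes sort in the same order as the
+  // floats themselves: negative values have every bit flipped, non-negative
+  // values only the sign bit, so e.g. -0.0 orders before 0.0.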
+ fn f64_to_ux_be_bytes(f: f64) -> [u8; 8] {
+ let u = if f.is_sign_negative() {
+ f.to_bits() ^ ::std::u64::MAX
+ } else {
+ f.to_bits() ^ sign_bit!(u64)
+ };
+ u.to_be_bytes()
+ }
+
+ pub fn encode_into(out: &mut Vec<u8>, x: f64) {
+ out.push(super::DOUBLE);
+ out.extend_from_slice(&f64_to_ux_be_bytes(super::canonicalize_f64(x)));
+ }
+
+ pub fn decode_from(input: &[u8]) -> std::io::Result<(&[u8], f64)> {
+ let (input, bytes) = super::parse_bytes(input, 8)?;
+ let mut arr = [0u8; 8];
+ arr.copy_from_slice(bytes);
+ let u = u64::from_be_bytes(arr);
+ Ok((
+ input,
+ f64::from_bits(if (u & sign_bit!(u64)) == 0 {
+ u ^ ::std::u64::MAX
+ } else {
+ u ^ sign_bit!(u64)
+ }),
+ ))
+ }
+}
+
+#[inline]
+fn parse_bytes(input: &[u8], num: usize) -> std::io::Result<(&[u8], &[u8])> {
+ if input.len() < num {
+ Err(std::io::ErrorKind::UnexpectedEof.into())
+ } else {
+ Ok((&input[num..], &input[..num]))
+ }
+}
+
+#[inline]
+fn parse_byte(input: &[u8]) -> std::io::Result<(&[u8], u8)> {
+ if input.is_empty() {
+ Err(std::io::ErrorKind::UnexpectedEof.into())
+ } else {
+ Ok((&input[1..], input[0]))
+ }
+}
+
+fn parse_slice(input: &[u8]) -> std::io::Result<(&[u8], Vec<u8>)> {
+ let mut output: Vec<u8> = Vec::new();
+ let mut i = 0usize;
+
+ while i < input.len() {
+ let byte = input[i];
+ i += 1;
+
+ if byte == 0 {
+ if input.get(i).copied() == Some(ESCAPE) {
+ output.push(0);
+ i += 1;
+ continue;
+ } else {
+ return Ok((&input[i..], output));
+ }
+ }
+
+ output.push(byte);
+ }
+
+ Err(std::io::ErrorKind::UnexpectedEof.into())
+}
+
+#[cfg(test)]
+mod tests {
+ use num_bigint::BigInt;
+ use std::cmp::Ordering;
+
+ use crate::Key;
+ use crate::KeyPart;
+
+ use super::decode_key;
+ use super::encode_key;
+
+ fn roundtrip(key: Key) {
+ let bytes = encode_key(&key).unwrap();
+ let decoded = decode_key(&bytes).unwrap();
+ assert_eq!(&key, &decoded);
+ assert_eq!(format!("{:?}", key), format!("{:?}", decoded));
+ }
+
+ fn check_order(a: Key, b: Key, expected: Ordering) {
+ let a_bytes = encode_key(&a).unwrap();
+ let b_bytes = encode_key(&b).unwrap();
+
+ assert_eq!(a.cmp(&b), expected);
+ assert_eq!(a_bytes.cmp(&b_bytes), expected);
+ }
+
+ fn check_bijection(key: Key, serialized: &[u8]) {
+ let bytes = encode_key(&key).unwrap();
+ assert_eq!(&bytes[..], serialized);
+ let decoded = decode_key(serialized).unwrap();
+ assert_eq!(&key, &decoded);
+ }
+
+ #[test]
+ fn simple_roundtrip() {
+ roundtrip(Key(vec![
+ KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00]),
+ KeyPart::String("foo".to_string()),
+ KeyPart::Float(-f64::NAN),
+ KeyPart::Float(-f64::INFINITY),
+ KeyPart::Float(-42.1),
+ KeyPart::Float(-0.0),
+ KeyPart::Float(0.0),
+ KeyPart::Float(42.1),
+ KeyPart::Float(f64::INFINITY),
+ KeyPart::Float(f64::NAN),
+ KeyPart::Int(BigInt::from(-10000)),
+ KeyPart::Int(BigInt::from(-1)),
+ KeyPart::Int(BigInt::from(0)),
+ KeyPart::Int(BigInt::from(1)),
+ KeyPart::Int(BigInt::from(10000)),
+ KeyPart::False,
+ KeyPart::True,
+ ]));
+ }
+
+ #[test]
+ #[rustfmt::skip]
+ fn order_bytes() {
+ check_order(
+ Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
+ Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
+ Ordering::Equal,
+ );
+
+ check_order(
+ Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
+ Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x01])]),
+ Ordering::Less,
+ );
+
+ check_order(
+ Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x01])]),
+ Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
+ Ordering::Greater,
+ );
+
+ check_order(
+ Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
+ Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00, 0x00])]),
+ Ordering::Less,
+ );
+
+ check_order(
+ Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00, 0x00])]),
+ Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
+ Ordering::Greater,
+ );
+ }
+
+ #[test]
+ #[rustfmt::skip]
+ fn order_tags() {
+ check_order(
+ Key(vec![KeyPart::Bytes(vec![])]),
+ Key(vec![KeyPart::String("".into())]),
+ Ordering::Less,
+ );
+
+ check_order(
+ Key(vec![KeyPart::String("".into())]),
+ Key(vec![KeyPart::Int(BigInt::from(0))]),
+ Ordering::Less,
+ );
+
+ check_order(
+ Key(vec![KeyPart::Int(BigInt::from(0))]),
+ Key(vec![KeyPart::Float(0.0)]),
+ Ordering::Less,
+ );
+
+ check_order(
+ Key(vec![KeyPart::Float(0.0)]),
+ Key(vec![KeyPart::False]),
+ Ordering::Less,
+ );
+
+ check_order(
+ Key(vec![KeyPart::False]),
+ Key(vec![KeyPart::True]),
+ Ordering::Less,
+ );
+
+ check_order(
+ Key(vec![KeyPart::True]),
+ Key(vec![KeyPart::Bytes(vec![])]),
+ Ordering::Greater,
+ );
+ }
+
+ #[test]
+ #[rustfmt::skip]
+ fn order_floats() {
+ check_order(
+ Key(vec![KeyPart::Float(-f64::NAN)]),
+ Key(vec![KeyPart::Float(-f64::INFINITY)]),
+ Ordering::Less,
+ );
+ check_order(
+ Key(vec![KeyPart::Float(-f64::INFINITY)]),
+ Key(vec![KeyPart::Float(-10.0)]),
+ Ordering::Less,
+ );
+ check_order(
+ Key(vec![KeyPart::Float(-10.0)]),
+ Key(vec![KeyPart::Float(-0.0)]),
+ Ordering::Less,
+ );
+ check_order(
+ Key(vec![KeyPart::Float(-0.0)]),
+ Key(vec![KeyPart::Float(0.0)]),
+ Ordering::Less,
+ );
+ check_order(
+ Key(vec![KeyPart::Float(0.0)]),
+ Key(vec![KeyPart::Float(10.0)]),
+ Ordering::Less,
+ );
+ check_order(
+ Key(vec![KeyPart::Float(10.0)]),
+ Key(vec![KeyPart::Float(f64::INFINITY)]),
+ Ordering::Less,
+ );
+ check_order(
+ Key(vec![KeyPart::Float(f64::INFINITY)]),
+ Key(vec![KeyPart::Float(f64::NAN)]),
+ Ordering::Less,
+ );
+ }
+
+ #[test]
+ #[rustfmt::skip]
+ fn order_ints() {
+ check_order(
+ Key(vec![KeyPart::Int(BigInt::from(-10000))]),
+ Key(vec![KeyPart::Int(BigInt::from(-100))]),
+ Ordering::Less,
+ );
+ check_order(
+ Key(vec![KeyPart::Int(BigInt::from(-100))]),
+ Key(vec![KeyPart::Int(BigInt::from(-1))]),
+ Ordering::Less,
+ );
+ check_order(
+ Key(vec![KeyPart::Int(BigInt::from(-1))]),
+ Key(vec![KeyPart::Int(BigInt::from(0))]),
+ Ordering::Less,
+ );
+ check_order(
+ Key(vec![KeyPart::Int(BigInt::from(0))]),
+ Key(vec![KeyPart::Int(BigInt::from(1))]),
+ Ordering::Less,
+ );
+ check_order(
+ Key(vec![KeyPart::Int(BigInt::from(1))]),
+ Key(vec![KeyPart::Int(BigInt::from(100))]),
+ Ordering::Less,
+ );
+ check_order(
+ Key(vec![KeyPart::Int(BigInt::from(100))]),
+ Key(vec![KeyPart::Int(BigInt::from(10000))]),
+ Ordering::Less,
+ );
+ }
+
+ #[test]
+ #[rustfmt::skip]
+ fn float_canonicalization() {
+ let key1 = Key(vec![KeyPart::Float(f64::from_bits(0x7ff8000000000001))]);
+ let key2 = Key(vec![KeyPart::Float(f64::from_bits(0x7ff8000000000002))]);
+
+ assert_eq!(key1, key2);
+ assert_eq!(encode_key(&key1).unwrap(), encode_key(&key2).unwrap());
+ }
+
+ #[test]
+ #[rustfmt::skip]
+ fn explicit_bijection() {
+ // string
+ check_bijection(
+ Key(vec![KeyPart::String("hello".into())]),
+ &[0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00],
+ );
+
+ // zero byte escape
+ check_bijection(
+ Key(vec![KeyPart::Bytes(vec![0x01, 0x02, 0x00, 0x07, 0x08])]),
+ &[0x01, 0x01, 0x02, 0x00, 0xff, 0x07, 0x08, 0x00],
+ );
+
+ // array
+ check_bijection(
+ Key(vec![
+ KeyPart::String("hello".into()),
+ KeyPart::Bytes(vec![0x01, 0x02, 0x00, 0x07, 0x08]),
+ ]),
+ &[
+ 0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, /* string */
+ 0x01, 0x01, 0x02, 0x00, 0xff, 0x07, 0x08, 0x00, /* bytes */
+ ],
+ );
+ }
+}
diff --git a/ext/kv/interface.rs b/ext/kv/interface.rs
new file mode 100644
index 000000000..ee27522d1
--- /dev/null
+++ b/ext/kv/interface.rs
@@ -0,0 +1,294 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use std::cell::RefCell;
+use std::cmp::Ordering;
+use std::num::NonZeroU32;
+use std::rc::Rc;
+
+use async_trait::async_trait;
+use deno_core::error::AnyError;
+use deno_core::OpState;
+use num_bigint::BigInt;
+
+use crate::codec::canonicalize_f64;
+
+#[async_trait(?Send)]
+pub trait DatabaseHandler {
+ type DB: Database + 'static;
+
+ async fn open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Self::DB, AnyError>;
+}
+
+#[async_trait(?Send)]
+pub trait Database {
+ async fn snapshot_read(
+ &self,
+ requests: Vec<ReadRange>,
+ options: SnapshotReadOptions,
+ ) -> Result<Vec<ReadRangeOutput>, AnyError>;
+
+ async fn atomic_write(&self, write: AtomicWrite) -> Result<bool, AnyError>;
+}
+
+/// Options for a snapshot read.
+pub struct SnapshotReadOptions {
+ pub consistency: Consistency,
+}
+
+/// The consistency of a read.
+#[derive(Eq, PartialEq, Copy, Clone, Debug)]
+pub enum Consistency {
+ Strong,
+ Eventual,
+}
+
+/// A key identifies a KV pair. It is a vector of KeyParts.
+///
+/// The ordering of the keys is defined by the ordering of the KeyParts. The
+/// first KeyPart is the most significant, and the last KeyPart is the least
+/// significant.
+#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug)]
+pub struct Key(pub Vec<KeyPart>);
+
+/// A key part is a single item in a key. It can be a boolean, a double float, a
+/// variable precision signed integer, a UTF-8 string, or an arbitrary byte
+/// array.
+///
+/// The ordering of a KeyPart is dependent on the type of the KeyPart.
+///
+/// Between different types, the ordering is as follows: arbitrary byte array <
+/// UTF-8 string < variable precision signed integer < double float < false < true.
+///
+/// Within a type, the ordering is as follows:
+/// - For a **boolean**, false is less than true.
+/// - For a **double float**, the ordering must follow -NaN < -Infinity < -100.0 < -1.0 < -0.5 < -0.0 < 0.0 < 0.5 < 1.0 < 100.0 < Infinity < NaN.
+/// - For a **variable precision signed integer**, the ordering must follow mathematical ordering.
+/// - For a **UTF-8 string**, the ordering must follow the UTF-8 byte ordering.
+/// - For an **arbitrary byte array**, the ordering must follow the byte ordering.
+///
+/// This means that the key part `1.0` is less than the key part `2.0`, but is
+/// greater than the key part `0n`, because `1.0` is a double float and `0n`
+/// is a variable precision signed integer, and the ordering between types
+/// takes precedence over the ordering within a type.
+#[derive(Clone, Debug)]
+pub enum KeyPart {
+ Bytes(Vec<u8>),
+ String(String),
+ Int(BigInt),
+ Float(f64),
+ False,
+ True,
+}
+
+impl KeyPart {
+ fn tag_ordering(&self) -> u8 {
+ match self {
+ KeyPart::Bytes(_) => 0,
+ KeyPart::String(_) => 1,
+ KeyPart::Int(_) => 2,
+ KeyPart::Float(_) => 3,
+ KeyPart::False => 4,
+ KeyPart::True => 5,
+ }
+ }
+}
+
+impl Eq for KeyPart {}
+
+impl PartialEq for KeyPart {
+ fn eq(&self, other: &Self) -> bool {
+ self.cmp(other) == Ordering::Equal
+ }
+}
+
+impl Ord for KeyPart {
+ fn cmp(&self, other: &Self) -> Ordering {
+ match (self, other) {
+ (KeyPart::Bytes(b1), KeyPart::Bytes(b2)) => b1.cmp(b2),
+ (KeyPart::String(s1), KeyPart::String(s2)) => {
+ s1.as_bytes().cmp(s2.as_bytes())
+ }
+ (KeyPart::Int(i1), KeyPart::Int(i2)) => i1.cmp(i2),
+ (KeyPart::Float(f1), KeyPart::Float(f2)) => {
+ canonicalize_f64(*f1).total_cmp(&canonicalize_f64(*f2))
+ }
+ _ => self.tag_ordering().cmp(&other.tag_ordering()),
+ }
+ }
+}
+
+impl PartialOrd for KeyPart {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+/// A request to read a range of keys from the database, from `start` to
+/// `end`.
+///
+/// The range is inclusive of the start and exclusive of the end. The start may
+/// not be greater than the end.
+///
+/// The range is limited to `limit` number of entries.
+pub struct ReadRange {
+ pub start: Vec<u8>,
+ pub end: Vec<u8>,
+ pub limit: NonZeroU32,
+ pub reverse: bool,
+}
+
+/// A response to a `ReadRange` request.
+pub struct ReadRangeOutput {
+ pub entries: Vec<KvEntry>,
+}
+
+/// A versionstamp is a 10 byte array that is used to represent the version of
+/// a key in the database.
+type Versionstamp = [u8; 10];
+
+/// A key-value entry with a versionstamp.
+pub struct KvEntry {
+ pub key: Vec<u8>,
+ pub value: Value,
+ pub versionstamp: Versionstamp,
+}
+
+/// A serialized value for a KV pair as stored in the database. All values
+/// **can** be serialized into the V8 representation, but not all values are.
+///
+/// The V8 representation is an opaque byte array that is only meaningful to
+/// the V8 engine. It is guaranteed to be backwards compatible. Because this
+/// representation is opaque, it is not possible to inspect or modify the value
+/// without deserializing it.
+///
+/// The inability to inspect or modify the value without deserializing it means
+/// that these values can not be quickly modified when performing atomic
+/// read-modify-write operations on the database (because the database may not
+/// have the ability to deserialize the V8 value into a modifiable value).
+///
+/// Because of this constraint, there are more specialized representations for
+/// certain types of values that can be used in atomic read-modify-write
+/// operations. These specialized representations are:
+///
+/// - **Bytes**: an arbitrary byte array.
+/// - **U64**: a 64-bit unsigned integer.
+pub enum Value {
+ V8(Vec<u8>),
+ Bytes(Vec<u8>),
+ U64(u64),
+}
+
+/// A request to perform an atomic check-modify-write operation on the database.
+///
+/// The operation is performed atomically, meaning that the operation will
+/// either succeed or fail. If the operation fails, then the database will be
+/// left in the same state as before the operation was attempted. If the
+/// operation succeeds, then the database will be left in a new state.
+///
+/// The operation is performed by first checking the database for the current
+/// state of the keys, defined by the `checks` field. If the current state of
+/// the keys does not match the expected state, then the operation fails. If
+/// the current state of the keys matches the expected state, then the
+/// mutations are applied to the database.
+///
+/// All checks and mutations are performed atomically.
+///
+/// The mutations are performed in the order that they are specified in the
+/// `mutations` field. The order of checks is not specified, and is also not
+/// important because this ordering is unobservable.
+pub struct AtomicWrite {
+ pub checks: Vec<KvCheck>,
+ pub mutations: Vec<KvMutation>,
+ pub enqueues: Vec<Enqueue>,
+}
+
+/// A request to perform a check on a key in the database. The check is not
+/// performed on the value of the key, but rather on the versionstamp of the
+/// key.
+pub struct KvCheck {
+ pub key: Vec<u8>,
+ pub versionstamp: Option<Versionstamp>,
+}
+
+/// A request to perform a mutation on a key in the database. The mutation is
+/// performed on the value of the key.
+///
+/// The type of mutation is specified by the `kind` field. The action performed
+/// by each mutation kind is specified in the docs for [MutationKind].
+pub struct KvMutation {
+ pub key: Vec<u8>,
+ pub kind: MutationKind,
+}
+
+/// A request to enqueue a message to the database. This message is delivered
+/// to a listener of the queue at least once.
+///
+/// ## Retry
+///
+/// When the delivery of a message fails, it is retried for a finite number
+/// of times. Each retry happens after a backoff period. The backoff periods
+/// are specified by the `backoff_schedule` field in milliseconds. If
+/// unspecified, the default backoff schedule of the platform (CLI or Deploy)
+/// is used.
+///
+/// If all retry attempts failed, the message is written to the KV under all
+/// keys specified in `keys_if_undelivered`.
+pub struct Enqueue {
+ pub payload: Vec<u8>,
+ pub deadline_ms: u64,
+ pub keys_if_undelivered: Vec<Vec<u8>>,
+ pub backoff_schedule: Option<Vec<u32>>,
+}
+
+/// The type of mutation to perform on a key in the database.
+///
+/// ## Set
+///
+/// The set mutation sets the value of the key to the specified value. It
+/// discards the previous value of the key, if any.
+///
+/// This operand supports all [Value] types.
+///
+/// ## Delete
+///
+/// The delete mutation deletes the value of the key.
+///
+/// ## Sum
+///
+/// The sum mutation adds the specified value to the existing value of the key.
+///
+/// This operand supports only values of type [Value::U64]. The existing value in
+/// the database must match the type of the value specified in the mutation. If
+/// the key does not exist in the database, then the value specified in the
+/// mutation is used as the new value of the key.
+///
+/// ## Min
+///
+/// The min mutation sets the value of the key to the minimum of the existing
+/// value of the key and the specified value.
+///
+/// This operand supports only values of type [Value::U64]. The existing value in
+/// the database must match the type of the value specified in the mutation. If
+/// the key does not exist in the database, then the value specified in the
+/// mutation is used as the new value of the key.
+///
+/// ## Max
+///
+/// The max mutation sets the value of the key to the maximum of the existing
+/// value of the key and the specified value.
+///
+/// This operand supports only values of type [Value::U64]. The existing value in
+/// the database must match the type of the value specified in the mutation. If
+/// the key does not exist in the database, then the value specified in the
+/// mutation is used as the new value of the key.
+pub enum MutationKind {
+ Set(Value),
+ Delete,
+ Sum(Value),
+ Min(Value),
+ Max(Value),
+}
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
new file mode 100644
index 000000000..49a59af74
--- /dev/null
+++ b/ext/kv/lib.rs
@@ -0,0 +1,541 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+pub mod codec;
+mod interface;
+pub mod sqlite;
+
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::num::NonZeroU32;
+use std::rc::Rc;
+
+use codec::decode_key;
+use codec::encode_key;
+use deno_core::anyhow::Context;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::op;
+use deno_core::serde_v8::AnyValue;
+use deno_core::serde_v8::BigInt;
+use deno_core::ByteString;
+use deno_core::OpState;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
+use serde::Serialize;
+
+pub use crate::interface::*;
+
+struct UnstableChecker {
+ pub unstable: bool,
+}
+
+impl UnstableChecker {
+ // NOTE(bartlomieju): keep in sync with `cli/program_state.rs`
+ pub fn check_unstable(&self, api_name: &str) {
+ if !self.unstable {
+ eprintln!(
+ "Unstable API '{api_name}'. The --unstable flag must be provided."
+ );
+ std::process::exit(70);
+ }
+ }
+}
+
+deno_core::extension!(deno_kv,
+ // TODO(bartlomieju): specify deps
+ deps = [ ],
+ parameters = [ DBH: DatabaseHandler ],
+ ops = [
+ op_kv_database_open<DBH>,
+ op_kv_snapshot_read<DBH>,
+ op_kv_atomic_write<DBH>,
+ op_kv_encode_cursor,
+ ],
+ esm = [ "01_db.ts" ],
+ options = {
+ handler: DBH,
+ unstable: bool,
+ },
+ state = |state, options| {
+ state.put(Rc::new(options.handler));
+ state.put(UnstableChecker { unstable: options.unstable })
+ }
+);
+
+struct DatabaseResource<DB: Database + 'static> {
+ db: Rc<DB>,
+}
+
+impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
+ fn name(&self) -> Cow<str> {
+ "database".into()
+ }
+}
+
+#[op]
+async fn op_kv_database_open<DBH>(
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+) -> Result<ResourceId, AnyError>
+where
+ DBH: DatabaseHandler + 'static,
+{
+ let handler = {
+ let state = state.borrow();
+ state
+ .borrow::<UnstableChecker>()
+ .check_unstable("Deno.openKv");
+ state.borrow::<Rc<DBH>>().clone()
+ };
+ let db = handler.open(state.clone(), path).await?;
+ let rid = state
+ .borrow_mut()
+ .resource_table
+ .add(DatabaseResource { db: Rc::new(db) });
+ Ok(rid)
+}
+
+type KvKey = Vec<AnyValue>;
+
+impl From<AnyValue> for KeyPart {
+ fn from(value: AnyValue) -> Self {
+ match value {
+      AnyValue::Bool(false) => KeyPart::False,
+      AnyValue::Bool(true) => KeyPart::True,
+      AnyValue::Number(n) => KeyPart::Float(n),
+      AnyValue::BigInt(n) => KeyPart::Int(n),
+      AnyValue::String(s) => KeyPart::String(s),
+      AnyValue::Buffer(buf) => KeyPart::Bytes(buf.to_vec()),
+    }
+  }
+}
+
+impl From<KeyPart> for AnyValue {
+  fn from(value: KeyPart) -> Self {
+    match value {
+      KeyPart::True => AnyValue::Bool(true),
+      KeyPart::False => AnyValue::Bool(false),
+ KeyPart::Float(n) => AnyValue::Number(n),
+ KeyPart::Int(n) => AnyValue::BigInt(n),
+ KeyPart::String(s) => AnyValue::String(s),
+ KeyPart::Bytes(buf) => AnyValue::Buffer(buf.into()),
+ }
+ }
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
+enum V8Value {
+ V8(ZeroCopyBuf),
+ Bytes(ZeroCopyBuf),
+ U64(BigInt),
+}
+
+impl TryFrom<V8Value> for Value {
+ type Error = AnyError;
+ fn try_from(value: V8Value) -> Result<Self, AnyError> {
+ Ok(match value {
+ V8Value::V8(buf) => Value::V8(buf.to_vec()),
+ V8Value::Bytes(buf) => Value::Bytes(buf.to_vec()),
+ V8Value::U64(n) => Value::U64(num_bigint::BigInt::from(n).try_into()?),
+ })
+ }
+}
+
+impl From<Value> for V8Value {
+ fn from(value: Value) -> Self {
+ match value {
+ Value::V8(buf) => V8Value::V8(buf.into()),
+ Value::Bytes(buf) => V8Value::Bytes(buf.into()),
+ Value::U64(n) => V8Value::U64(num_bigint::BigInt::from(n).into()),
+ }
+ }
+}
+
+#[derive(Deserialize, Serialize)]
+struct V8KvEntry {
+ key: KvKey,
+ value: V8Value,
+ versionstamp: ByteString,
+}
+
+impl TryFrom<KvEntry> for V8KvEntry {
+ type Error = AnyError;
+ fn try_from(entry: KvEntry) -> Result<Self, AnyError> {
+ Ok(V8KvEntry {
+ key: decode_key(&entry.key)?
+ .0
+ .into_iter()
+ .map(Into::into)
+ .collect(),
+ value: entry.value.into(),
+ versionstamp: hex::encode(entry.versionstamp).into(),
+ })
+ }
+}
+
+#[derive(Deserialize, Serialize)]
+#[serde(rename_all = "camelCase")]
+enum V8Consistency {
+ Strong,
+ Eventual,
+}
+
+impl From<V8Consistency> for Consistency {
+ fn from(value: V8Consistency) -> Self {
+ match value {
+ V8Consistency::Strong => Consistency::Strong,
+ V8Consistency::Eventual => Consistency::Eventual,
+ }
+ }
+}
+
+// (prefix, start, end, limit, reverse, cursor)
+type SnapshotReadRange = (
+ Option<KvKey>,
+ Option<KvKey>,
+ Option<KvKey>,
+ u32,
+ bool,
+ Option<ByteString>,
+);
+
+#[op]
+async fn op_kv_snapshot_read<DBH>(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ ranges: Vec<SnapshotReadRange>,
+ consistency: V8Consistency,
+) -> Result<Vec<Vec<V8KvEntry>>, AnyError>
+where
+ DBH: DatabaseHandler + 'static,
+{
+ let db = {
+ let state = state.borrow();
+ let resource =
+ state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
+ resource.db.clone()
+ };
+ let read_ranges = ranges
+ .into_iter()
+ .map(|(prefix, start, end, limit, reverse, cursor)| {
+ let selector = RawSelector::from_tuple(prefix, start, end)?;
+
+ let (start, end) =
+ decode_selector_and_cursor(&selector, reverse, cursor.as_ref())?;
+ Ok(ReadRange {
+ start,
+ end,
+ limit: NonZeroU32::new(limit)
+ .with_context(|| "limit must be greater than 0")?,
+ reverse,
+ })
+ })
+ .collect::<Result<Vec<_>, AnyError>>()?;
+ let opts = SnapshotReadOptions {
+ consistency: consistency.into(),
+ };
+ let output_ranges = db.snapshot_read(read_ranges, opts).await?;
+ let output_ranges = output_ranges
+ .into_iter()
+ .map(|x| {
+ x.entries
+ .into_iter()
+ .map(TryInto::try_into)
+ .collect::<Result<Vec<_>, AnyError>>()
+ })
+ .collect::<Result<Vec<_>, AnyError>>()?;
+ Ok(output_ranges)
+}
+
+type V8KvCheck = (KvKey, Option<ByteString>);
+
+impl TryFrom<V8KvCheck> for KvCheck {
+ type Error = AnyError;
+ fn try_from(value: V8KvCheck) -> Result<Self, AnyError> {
+ let versionstamp = match value.1 {
+ Some(data) => {
+ let mut out = [0u8; 10];
+ hex::decode_to_slice(data, &mut out)
+ .map_err(|_| type_error("invalid versionstamp"))?;
+ Some(out)
+ }
+ None => None,
+ };
+ Ok(KvCheck {
+ key: encode_v8_key(value.0)?,
+ versionstamp,
+ })
+ }
+}
+
+type V8KvMutation = (KvKey, String, Option<V8Value>);
+
+impl TryFrom<V8KvMutation> for KvMutation {
+ type Error = AnyError;
+ fn try_from(value: V8KvMutation) -> Result<Self, AnyError> {
+ let key = encode_v8_key(value.0)?;
+ let kind = match (value.1.as_str(), value.2) {
+ ("set", Some(value)) => MutationKind::Set(value.try_into()?),
+ ("delete", None) => MutationKind::Delete,
+ ("sum", Some(value)) => MutationKind::Sum(value.try_into()?),
+ ("min", Some(value)) => MutationKind::Min(value.try_into()?),
+ ("max", Some(value)) => MutationKind::Max(value.try_into()?),
+ (op, Some(_)) => {
+ return Err(type_error(format!("invalid mutation '{op}' with value")))
+ }
+ (op, None) => {
+ return Err(type_error(format!(
+ "invalid mutation '{op}' without value"
+ )))
+ }
+ };
+ Ok(KvMutation { key, kind })
+ }
+}
+
+type V8Enqueue = (ZeroCopyBuf, u64, Vec<KvKey>, Option<Vec<u32>>);
+
+impl TryFrom<V8Enqueue> for Enqueue {
+ type Error = AnyError;
+ fn try_from(value: V8Enqueue) -> Result<Self, AnyError> {
+ Ok(Enqueue {
+ payload: value.0.to_vec(),
+ deadline_ms: value.1,
+ keys_if_undelivered: value
+ .2
+ .into_iter()
+ .map(encode_v8_key)
+ .collect::<std::io::Result<_>>()?,
+ backoff_schedule: value.3,
+ })
+ }
+}
+
+fn encode_v8_key(key: KvKey) -> Result<Vec<u8>, std::io::Error> {
+ encode_key(&Key(key.into_iter().map(From::from).collect()))
+}
+
+enum RawSelector {
+ Prefixed {
+ prefix: Vec<u8>,
+ start: Option<Vec<u8>>,
+ end: Option<Vec<u8>>,
+ },
+ Range {
+ start: Vec<u8>,
+ end: Vec<u8>,
+ },
+}
+
+impl RawSelector {
+ fn from_tuple(
+ prefix: Option<KvKey>,
+ start: Option<KvKey>,
+ end: Option<KvKey>,
+ ) -> Result<Self, AnyError> {
+ let prefix = prefix.map(encode_v8_key).transpose()?;
+ let start = start.map(encode_v8_key).transpose()?;
+ let end = end.map(encode_v8_key).transpose()?;
+
+ match (prefix, start, end) {
+ (Some(prefix), None, None) => Ok(Self::Prefixed {
+ prefix,
+ start: None,
+ end: None,
+ }),
+ (Some(prefix), Some(start), None) => Ok(Self::Prefixed {
+ prefix,
+ start: Some(start),
+ end: None,
+ }),
+ (Some(prefix), None, Some(end)) => Ok(Self::Prefixed {
+ prefix,
+ start: None,
+ end: Some(end),
+ }),
+ (None, Some(start), Some(end)) => Ok(Self::Range { start, end }),
+ (None, Some(start), None) => {
+ let end = start.iter().copied().chain(Some(0)).collect();
+ Ok(Self::Range { start, end })
+ }
+ _ => Err(type_error("invalid range")),
+ }
+ }
+
+ fn start(&self) -> Option<&[u8]> {
+ match self {
+ Self::Prefixed { start, .. } => start.as_deref(),
+ Self::Range { start, .. } => Some(start),
+ }
+ }
+
+ fn end(&self) -> Option<&[u8]> {
+ match self {
+ Self::Prefixed { end, .. } => end.as_deref(),
+ Self::Range { end, .. } => Some(end),
+ }
+ }
+
+ fn common_prefix(&self) -> &[u8] {
+ match self {
+ Self::Prefixed { prefix, .. } => prefix,
+ Self::Range { start, end } => common_prefix_for_bytes(start, end),
+ }
+ }
+
+ fn range_start_key(&self) -> Vec<u8> {
+ match self {
+ Self::Prefixed {
+ start: Some(start), ..
+ } => start.clone(),
+ Self::Range { start, .. } => start.clone(),
+ Self::Prefixed { prefix, .. } => {
+ prefix.iter().copied().chain(Some(0)).collect()
+ }
+ }
+ }
+
+ fn range_end_key(&self) -> Vec<u8> {
+ match self {
+ Self::Prefixed { end: Some(end), .. } => end.clone(),
+ Self::Range { end, .. } => end.clone(),
+ Self::Prefixed { prefix, .. } => {
+ prefix.iter().copied().chain(Some(0xff)).collect()
+ }
+ }
+ }
+}
+
+fn common_prefix_for_bytes<'a>(a: &'a [u8], b: &'a [u8]) -> &'a [u8] {
+ let mut i = 0;
+ while i < a.len() && i < b.len() && a[i] == b[i] {
+ i += 1;
+ }
+ &a[..i]
+}
+
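+/// Encodes a cursor for the given selector: the URL-safe base64 encoding of
+/// the boundary key with the selector's common prefix stripped. For example
+/// (sketch), prefix [0x01, 0x02] and boundary key [0x01, 0x02, 0x03] yield
+/// base64_url([0x03]).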
+fn encode_cursor(
+ selector: &RawSelector,
+ boundary_key: &[u8],
+) -> Result<String, AnyError> {
+ let common_prefix = selector.common_prefix();
+ if !boundary_key.starts_with(common_prefix) {
+ return Err(type_error("invalid boundary key"));
+ }
+
+ Ok(base64::encode_config(
+ &boundary_key[common_prefix.len()..],
+ base64::URL_SAFE,
+ ))
+}
+
+fn decode_selector_and_cursor(
+ selector: &RawSelector,
+ reverse: bool,
+ cursor: Option<&ByteString>,
+) -> Result<(Vec<u8>, Vec<u8>), AnyError> {
+ let Some(cursor) = cursor else {
+ return Ok((selector.range_start_key(), selector.range_end_key()));
+ };
+
+ let common_prefix = selector.common_prefix();
+ let cursor = base64::decode_config(cursor, base64::URL_SAFE)
+ .map_err(|_| type_error("invalid cursor"))?;
+
+ let first_key: Vec<u8>;
+ let last_key: Vec<u8>;
+
+ if reverse {
+ first_key = selector.range_start_key();
+ last_key = common_prefix
+ .iter()
+ .copied()
+ .chain(cursor.iter().copied())
+ .collect();
+ } else {
+ first_key = common_prefix
+ .iter()
+ .copied()
+ .chain(cursor.iter().copied())
+ .chain(Some(0))
+ .collect();
+ last_key = selector.range_end_key();
+ }
+
+ // Defend against out-of-bounds reading
+ if let Some(start) = selector.start() {
+ if &first_key[..] < start {
+ return Err(type_error("cursor out of bounds"));
+ }
+ }
+
+ if let Some(end) = selector.end() {
+ if &last_key[..] > end {
+ return Err(type_error("cursor out of bounds"));
+ }
+ }
+
+ Ok((first_key, last_key))
+}
+
+#[op]
+async fn op_kv_atomic_write<DBH>(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ checks: Vec<V8KvCheck>,
+ mutations: Vec<V8KvMutation>,
+ enqueues: Vec<V8Enqueue>,
+) -> Result<bool, AnyError>
+where
+ DBH: DatabaseHandler + 'static,
+{
+ let db = {
+ let state = state.borrow();
+ let resource =
+ state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
+ resource.db.clone()
+ };
+
+ let checks = checks
+ .into_iter()
+ .map(TryInto::try_into)
+ .collect::<Result<_, AnyError>>()
+ .with_context(|| "invalid check")?;
+ let mutations = mutations
+ .into_iter()
+ .map(TryInto::try_into)
+ .collect::<Result<_, AnyError>>()
+ .with_context(|| "invalid mutation")?;
+ let enqueues = enqueues
+ .into_iter()
+ .map(TryInto::try_into)
+ .collect::<Result<_, AnyError>>()
+ .with_context(|| "invalid enqueue")?;
+
+ let atomic_write = AtomicWrite {
+ checks,
+ mutations,
+ enqueues,
+ };
+
+ let result = db.atomic_write(atomic_write).await?;
+
+ Ok(result)
+}
+
+// (prefix, start, end)
+type EncodeCursorRangeSelector = (Option<KvKey>, Option<KvKey>, Option<KvKey>);
+
+#[op]
+fn op_kv_encode_cursor(
+ (prefix, start, end): EncodeCursorRangeSelector,
+ boundary_key: KvKey,
+) -> Result<String, AnyError> {
+ let selector = RawSelector::from_tuple(prefix, start, end)?;
+ let boundary_key = encode_v8_key(boundary_key)?;
+ let cursor = encode_cursor(&selector, &boundary_key)?;
+ Ok(cursor)
+}
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
new file mode 100644
index 000000000..82ff8f8e2
--- /dev/null
+++ b/ext/kv/sqlite.rs
@@ -0,0 +1,348 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::marker::PhantomData;
+use std::path::Path;
+use std::path::PathBuf;
+use std::rc::Rc;
+
+use async_trait::async_trait;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::OpState;
+use rusqlite::params;
+use rusqlite::OptionalExtension;
+use rusqlite::Transaction;
+
+use crate::AtomicWrite;
+use crate::Database;
+use crate::DatabaseHandler;
+use crate::KvEntry;
+use crate::MutationKind;
+use crate::ReadRange;
+use crate::ReadRangeOutput;
+use crate::SnapshotReadOptions;
+use crate::Value;
+
+const STATEMENT_INC_AND_GET_DATA_VERSION: &str =
+ "update data_version set version = version + 1 where k = 0 returning version";
+const STATEMENT_KV_RANGE_SCAN: &str =
+ "select k, v, v_encoding, version from kv where k >= ? and k < ? order by k asc limit ?";
+const STATEMENT_KV_RANGE_SCAN_REVERSE: &str =
+ "select k, v, v_encoding, version from kv where k >= ? and k < ? order by k desc limit ?";
+const STATEMENT_KV_POINT_GET_VALUE_ONLY: &str =
+ "select v, v_encoding from kv where k = ?";
+const STATEMENT_KV_POINT_GET_VERSION_ONLY: &str =
+ "select version from kv where k = ?";
+const STATEMENT_KV_POINT_SET: &str =
+ "insert into kv (k, v, v_encoding, version) values (:k, :v, :v_encoding, :version) on conflict(k) do update set v = :v, v_encoding = :v_encoding, version = :version";
+const STATEMENT_KV_POINT_DELETE: &str = "delete from kv where k = ?";
+
+const STATEMENT_CREATE_MIGRATION_TABLE: &str = "
+create table if not exists migration_state(
+ k integer not null primary key,
+ version integer not null
+)
+";
+
+const MIGRATIONS: [&str; 2] = [
+ "
+create table data_version (
+ k integer primary key,
+ version integer not null
+);
+insert into data_version (k, version) values (0, 0);
+create table kv (
+ k blob primary key,
+ v blob not null,
+ v_encoding integer not null,
+ version integer not null
+) without rowid;
+",
+ "
+create table queue (
+ ts integer not null,
+ id text not null,
+ data blob not null,
+ backoff_schedule text not null,
+ keys_if_undelivered blob not null,
+
+ primary key (ts, id)
+);
+create table queue_running(
+ deadline integer not null,
+ id text not null,
+ data blob not null,
+ backoff_schedule text not null,
+ keys_if_undelivered blob not null,
+
+ primary key (deadline, id)
+);
+",
+];
+
+pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> {
+ pub default_storage_dir: Option<PathBuf>,
+ _permissions: PhantomData<P>,
+}
+
+pub trait SqliteDbHandlerPermissions {
+ fn check_read(&mut self, p: &Path, api_name: &str) -> Result<(), AnyError>;
+ fn check_write(&mut self, p: &Path, api_name: &str) -> Result<(), AnyError>;
+}
+
+impl<P: SqliteDbHandlerPermissions> SqliteDbHandler<P> {
+ pub fn new(default_storage_dir: Option<PathBuf>) -> Self {
+ Self {
+ default_storage_dir,
+ _permissions: PhantomData,
+ }
+ }
+}
+
+#[async_trait(?Send)]
+impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
+ type DB = SqliteDb;
+
+ async fn open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Self::DB, AnyError> {
+ let conn = match (path.as_deref(), &self.default_storage_dir) {
+ (Some(":memory:") | None, None) => {
+ rusqlite::Connection::open_in_memory()?
+ }
+ (Some(path), _) => {
+ let path = Path::new(path);
+ {
+ let mut state = state.borrow_mut();
+ let permissions = state.borrow_mut::<P>();
+ permissions.check_read(path, "Deno.openKv")?;
+ permissions.check_write(path, "Deno.openKv")?;
+ }
+ rusqlite::Connection::open(path)?
+ }
+ (None, Some(path)) => {
+ std::fs::create_dir_all(path)?;
+ let path = path.join("kv.sqlite3");
+ rusqlite::Connection::open(&path)?
+ }
+ };
+
+ conn.pragma_update(None, "journal_mode", "wal")?;
+ conn.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?;
+
+ let current_version: usize = conn
+ .query_row(
+ "select version from migration_state where k = 0",
+ [],
+ |row| row.get(0),
+ )
+ .optional()?
+ .unwrap_or(0);
+
+ for (i, migration) in MIGRATIONS.iter().enumerate() {
+ let version = i + 1;
+ if version > current_version {
+ conn.execute_batch(migration)?;
+ conn.execute(
+ "replace into migration_state (k, version) values(?, ?)",
+ [&0, &version],
+ )?;
+ }
+ }
+
+ Ok(SqliteDb(RefCell::new(conn)))
+ }
+}
+
+pub struct SqliteDb(RefCell<rusqlite::Connection>);
+
+#[async_trait(?Send)]
+impl Database for SqliteDb {
+ async fn snapshot_read(
+ &self,
+ requests: Vec<ReadRange>,
+ _options: SnapshotReadOptions,
+ ) -> Result<Vec<ReadRangeOutput>, AnyError> {
+ let mut responses = Vec::with_capacity(requests.len());
+ let mut db = self.0.borrow_mut();
+ let tx = db.transaction()?;
+
+ for request in requests {
+ let mut stmt = tx.prepare_cached(if request.reverse {
+ STATEMENT_KV_RANGE_SCAN_REVERSE
+ } else {
+ STATEMENT_KV_RANGE_SCAN
+ })?;
+ let entries = stmt
+ .query_map(
+ (
+ request.start.as_slice(),
+ request.end.as_slice(),
+ request.limit.get(),
+ ),
+ |row| {
+ let key: Vec<u8> = row.get(0)?;
+ let value: Vec<u8> = row.get(1)?;
+ let encoding: i64 = row.get(2)?;
+
+ let value = decode_value(value, encoding);
+
+ let version: i64 = row.get(3)?;
+ Ok(KvEntry {
+ key,
+ value,
+ versionstamp: version_to_versionstamp(version),
+ })
+ },
+ )?
+ .collect::<Result<Vec<_>, rusqlite::Error>>()?;
+ responses.push(ReadRangeOutput { entries });
+ }
+
+ Ok(responses)
+ }
+
+ async fn atomic_write(&self, write: AtomicWrite) -> Result<bool, AnyError> {
+ let mut db = self.0.borrow_mut();
+
+ let tx = db.transaction()?;
+
+ for check in write.checks {
+ let real_versionstamp = tx
+ .prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)?
+ .query_row([check.key.as_slice()], |row| row.get(0))
+ .optional()?
+ .map(version_to_versionstamp);
+ if real_versionstamp != check.versionstamp {
+ return Ok(false);
+ }
+ }
+
+ let version: i64 = tx
+ .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)?
+ .query_row([], |row| row.get(0))?;
+
+ for mutation in write.mutations {
+ match mutation.kind {
+ MutationKind::Set(value) => {
+ let (value, encoding) = encode_value(&value);
+ let changed = tx
+ .prepare_cached(STATEMENT_KV_POINT_SET)?
+ .execute(params![mutation.key, &value, &encoding, &version])?;
+ assert_eq!(changed, 1)
+ }
+ MutationKind::Delete => {
+ let changed = tx
+ .prepare_cached(STATEMENT_KV_POINT_DELETE)?
+ .execute(params![mutation.key])?;
+ assert!(changed == 0 || changed == 1)
+ }
+ MutationKind::Sum(operand) => {
+ mutate_le64(&tx, &mutation.key, "sum", &operand, version, |a, b| {
+ a.wrapping_add(b)
+ })?;
+ }
+ MutationKind::Min(operand) => {
+ mutate_le64(&tx, &mutation.key, "min", &operand, version, |a, b| {
+ a.min(b)
+ })?;
+ }
+ MutationKind::Max(operand) => {
+ mutate_le64(&tx, &mutation.key, "max", &operand, version, |a, b| {
+ a.max(b)
+ })?;
+ }
+ }
+ }
+
+ // TODO(@losfair): enqueues
+
+ tx.commit()?;
+
+ Ok(true)
+ }
+}
+
+/// Mutates a LE64 value in the database, defaulting to setting it to the
+/// operand if it doesn't exist.
+fn mutate_le64(
+ tx: &Transaction,
+ key: &[u8],
+ op_name: &str,
+ operand: &Value,
+ new_version: i64,
+ mutate: impl FnOnce(u64, u64) -> u64,
+) -> Result<(), AnyError> {
+ let Value::U64(operand) = *operand else {
+ return Err(type_error(format!("Failed to perform '{op_name}' mutation on a non-U64 operand")));
+ };
+
+ let old_value = tx
+ .prepare_cached(STATEMENT_KV_POINT_GET_VALUE_ONLY)?
+ .query_row([key], |row| {
+ let value: Vec<u8> = row.get(0)?;
+ let encoding: i64 = row.get(1)?;
+
+ let value = decode_value(value, encoding);
+ Ok(value)
+ })
+ .optional()?;
+
+ let new_value = match old_value {
+ Some(Value::U64(old_value) ) => mutate(old_value, operand),
+ Some(_) => return Err(type_error(format!("Failed to perform '{op_name}' mutation on a non-U64 value in the database"))),
+ None => operand,
+ };
+
+ let new_value = Value::U64(new_value);
+ let (new_value, encoding) = encode_value(&new_value);
+
+ let changed = tx.prepare_cached(STATEMENT_KV_POINT_SET)?.execute(params![
+ key,
+ &new_value[..],
+ encoding,
+ new_version
+ ])?;
+ assert_eq!(changed, 1);
+
+ Ok(())
+}
+
+fn version_to_versionstamp(version: i64) -> [u8; 10] {
+ let mut versionstamp = [0; 10];
+ versionstamp[..8].copy_from_slice(&version.to_be_bytes());
+ versionstamp
+}
+
+const VALUE_ENCODING_V8: i64 = 1;
+const VALUE_ENCODING_LE64: i64 = 2;
+const VALUE_ENCODING_BYTES: i64 = 3;
+
+fn decode_value(value: Vec<u8>, encoding: i64) -> crate::Value {
+ match encoding {
+ VALUE_ENCODING_V8 => crate::Value::V8(value),
+ VALUE_ENCODING_BYTES => crate::Value::Bytes(value),
+ VALUE_ENCODING_LE64 => {
+ let mut buf = [0; 8];
+ buf.copy_from_slice(&value);
+ crate::Value::U64(u64::from_le_bytes(buf))
+ }
+ _ => todo!(),
+ }
+}
+
+fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) {
+ match value {
+ crate::Value::V8(value) => (Cow::Borrowed(value), VALUE_ENCODING_V8),
+ crate::Value::Bytes(value) => (Cow::Borrowed(value), VALUE_ENCODING_BYTES),
+ crate::Value::U64(value) => {
+ let mut buf = [0; 8];
+ buf.copy_from_slice(&value.to_le_bytes());
+ (Cow::Owned(buf.to_vec()), VALUE_ENCODING_LE64)
+ }
+ }
+}
diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml
index 9ed990a23..c91d6a056 100644
--- a/ext/node/Cargo.toml
+++ b/ext/node/Cargo.toml
@@ -18,7 +18,7 @@ aes.workspace = true
cbc.workspace = true
deno_core.workspace = true
digest = { version = "0.10.5", features = ["core-api", "std"] }
-hex = "0.4.3"
+hex.workspace = true
idna = "0.3.0"
indexmap.workspace = true
md-5 = "0.10.5"