summaryrefslogtreecommitdiff
path: root/ext/kv/sqlite.rs
diff options
context:
space:
mode:
authorHeyang Zhou <zhy20000919@hotmail.com>2023-03-22 12:13:24 +0800
committerGitHub <noreply@github.com>2023-03-22 12:13:24 +0800
commit92ebf4afe5d55135b3ba39616bcb77106c07c597 (patch)
treef79fe65811c7449f5b50c093852eceaad228d39f /ext/kv/sqlite.rs
parent8bcffff9dc517aa93dea2816b2a854f65d24eccc (diff)
feat(ext/kv): key-value store (#18232)
This commit adds unstable "Deno.openKv()" API that allows to open a key-value database at a specified path. --------- Co-authored-by: Luca Casonato <hello@lcas.dev> Co-authored-by: Bartek IwaƄczuk <biwanczuk@gmail.com>
Diffstat (limited to 'ext/kv/sqlite.rs')
-rw-r--r--ext/kv/sqlite.rs348
1 files changed, 348 insertions, 0 deletions
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
new file mode 100644
index 000000000..82ff8f8e2
--- /dev/null
+++ b/ext/kv/sqlite.rs
@@ -0,0 +1,348 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::marker::PhantomData;
+use std::path::Path;
+use std::path::PathBuf;
+use std::rc::Rc;
+
+use async_trait::async_trait;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::OpState;
+use rusqlite::params;
+use rusqlite::OptionalExtension;
+use rusqlite::Transaction;
+
+use crate::AtomicWrite;
+use crate::Database;
+use crate::DatabaseHandler;
+use crate::KvEntry;
+use crate::MutationKind;
+use crate::ReadRange;
+use crate::ReadRangeOutput;
+use crate::SnapshotReadOptions;
+use crate::Value;
+
+const STATEMENT_INC_AND_GET_DATA_VERSION: &str =
+ "update data_version set version = version + 1 where k = 0 returning version";
+const STATEMENT_KV_RANGE_SCAN: &str =
+ "select k, v, v_encoding, version from kv where k >= ? and k < ? order by k asc limit ?";
+const STATEMENT_KV_RANGE_SCAN_REVERSE: &str =
+ "select k, v, v_encoding, version from kv where k >= ? and k < ? order by k desc limit ?";
+const STATEMENT_KV_POINT_GET_VALUE_ONLY: &str =
+ "select v, v_encoding from kv where k = ?";
+const STATEMENT_KV_POINT_GET_VERSION_ONLY: &str =
+ "select version from kv where k = ?";
+const STATEMENT_KV_POINT_SET: &str =
+ "insert into kv (k, v, v_encoding, version) values (:k, :v, :v_encoding, :version) on conflict(k) do update set v = :v, v_encoding = :v_encoding, version = :version";
+const STATEMENT_KV_POINT_DELETE: &str = "delete from kv where k = ?";
+
+const STATEMENT_CREATE_MIGRATION_TABLE: &str = "
+create table if not exists migration_state(
+ k integer not null primary key,
+ version integer not null
+)
+";
+
+const MIGRATIONS: [&str; 2] = [
+ "
+create table data_version (
+ k integer primary key,
+ version integer not null
+);
+insert into data_version (k, version) values (0, 0);
+create table kv (
+ k blob primary key,
+ v blob not null,
+ v_encoding integer not null,
+ version integer not null
+) without rowid;
+",
+ "
+create table queue (
+ ts integer not null,
+ id text not null,
+ data blob not null,
+ backoff_schedule text not null,
+ keys_if_undelivered blob not null,
+
+ primary key (ts, id)
+);
+create table queue_running(
+ deadline integer not null,
+ id text not null,
+ data blob not null,
+ backoff_schedule text not null,
+ keys_if_undelivered blob not null,
+
+ primary key (deadline, id)
+);
+",
+];
+
+pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> {
+ pub default_storage_dir: Option<PathBuf>,
+ _permissions: PhantomData<P>,
+}
+
+pub trait SqliteDbHandlerPermissions {
+ fn check_read(&mut self, p: &Path, api_name: &str) -> Result<(), AnyError>;
+ fn check_write(&mut self, p: &Path, api_name: &str) -> Result<(), AnyError>;
+}
+
+impl<P: SqliteDbHandlerPermissions> SqliteDbHandler<P> {
+ pub fn new(default_storage_dir: Option<PathBuf>) -> Self {
+ Self {
+ default_storage_dir,
+ _permissions: PhantomData,
+ }
+ }
+}
+
+#[async_trait(?Send)]
+impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
+ type DB = SqliteDb;
+
+ async fn open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Self::DB, AnyError> {
+ let conn = match (path.as_deref(), &self.default_storage_dir) {
+ (Some(":memory:") | None, None) => {
+ rusqlite::Connection::open_in_memory()?
+ }
+ (Some(path), _) => {
+ let path = Path::new(path);
+ {
+ let mut state = state.borrow_mut();
+ let permissions = state.borrow_mut::<P>();
+ permissions.check_read(path, "Deno.openKv")?;
+ permissions.check_write(path, "Deno.openKv")?;
+ }
+ rusqlite::Connection::open(path)?
+ }
+ (None, Some(path)) => {
+ std::fs::create_dir_all(path)?;
+ let path = path.join("kv.sqlite3");
+ rusqlite::Connection::open(&path)?
+ }
+ };
+
+ conn.pragma_update(None, "journal_mode", "wal")?;
+ conn.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?;
+
+ let current_version: usize = conn
+ .query_row(
+ "select version from migration_state where k = 0",
+ [],
+ |row| row.get(0),
+ )
+ .optional()?
+ .unwrap_or(0);
+
+ for (i, migration) in MIGRATIONS.iter().enumerate() {
+ let version = i + 1;
+ if version > current_version {
+ conn.execute_batch(migration)?;
+ conn.execute(
+ "replace into migration_state (k, version) values(?, ?)",
+ [&0, &version],
+ )?;
+ }
+ }
+
+ Ok(SqliteDb(RefCell::new(conn)))
+ }
+}
+
+pub struct SqliteDb(RefCell<rusqlite::Connection>);
+
+#[async_trait(?Send)]
+impl Database for SqliteDb {
+ async fn snapshot_read(
+ &self,
+ requests: Vec<ReadRange>,
+ _options: SnapshotReadOptions,
+ ) -> Result<Vec<ReadRangeOutput>, AnyError> {
+ let mut responses = Vec::with_capacity(requests.len());
+ let mut db = self.0.borrow_mut();
+ let tx = db.transaction()?;
+
+ for request in requests {
+ let mut stmt = tx.prepare_cached(if request.reverse {
+ STATEMENT_KV_RANGE_SCAN_REVERSE
+ } else {
+ STATEMENT_KV_RANGE_SCAN
+ })?;
+ let entries = stmt
+ .query_map(
+ (
+ request.start.as_slice(),
+ request.end.as_slice(),
+ request.limit.get(),
+ ),
+ |row| {
+ let key: Vec<u8> = row.get(0)?;
+ let value: Vec<u8> = row.get(1)?;
+ let encoding: i64 = row.get(2)?;
+
+ let value = decode_value(value, encoding);
+
+ let version: i64 = row.get(3)?;
+ Ok(KvEntry {
+ key,
+ value,
+ versionstamp: version_to_versionstamp(version),
+ })
+ },
+ )?
+ .collect::<Result<Vec<_>, rusqlite::Error>>()?;
+ responses.push(ReadRangeOutput { entries });
+ }
+
+ Ok(responses)
+ }
+
+ async fn atomic_write(&self, write: AtomicWrite) -> Result<bool, AnyError> {
+ let mut db = self.0.borrow_mut();
+
+ let tx = db.transaction()?;
+
+ for check in write.checks {
+ let real_versionstamp = tx
+ .prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)?
+ .query_row([check.key.as_slice()], |row| row.get(0))
+ .optional()?
+ .map(version_to_versionstamp);
+ if real_versionstamp != check.versionstamp {
+ return Ok(false);
+ }
+ }
+
+ let version: i64 = tx
+ .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)?
+ .query_row([], |row| row.get(0))?;
+
+ for mutation in write.mutations {
+ match mutation.kind {
+ MutationKind::Set(value) => {
+ let (value, encoding) = encode_value(&value);
+ let changed = tx
+ .prepare_cached(STATEMENT_KV_POINT_SET)?
+ .execute(params![mutation.key, &value, &encoding, &version])?;
+ assert_eq!(changed, 1)
+ }
+ MutationKind::Delete => {
+ let changed = tx
+ .prepare_cached(STATEMENT_KV_POINT_DELETE)?
+ .execute(params![mutation.key])?;
+ assert!(changed == 0 || changed == 1)
+ }
+ MutationKind::Sum(operand) => {
+ mutate_le64(&tx, &mutation.key, "sum", &operand, version, |a, b| {
+ a.wrapping_add(b)
+ })?;
+ }
+ MutationKind::Min(operand) => {
+ mutate_le64(&tx, &mutation.key, "min", &operand, version, |a, b| {
+ a.min(b)
+ })?;
+ }
+ MutationKind::Max(operand) => {
+ mutate_le64(&tx, &mutation.key, "max", &operand, version, |a, b| {
+ a.max(b)
+ })?;
+ }
+ }
+ }
+
+ // TODO(@losfair): enqueues
+
+ tx.commit()?;
+
+ Ok(true)
+ }
+}
+
+/// Mutates a LE64 value in the database, defaulting to setting it to the
+/// operand if it doesn't exist.
+fn mutate_le64(
+ tx: &Transaction,
+ key: &[u8],
+ op_name: &str,
+ operand: &Value,
+ new_version: i64,
+ mutate: impl FnOnce(u64, u64) -> u64,
+) -> Result<(), AnyError> {
+ let Value::U64(operand) = *operand else {
+ return Err(type_error(format!("Failed to perform '{op_name}' mutation on a non-U64 operand")));
+ };
+
+ let old_value = tx
+ .prepare_cached(STATEMENT_KV_POINT_GET_VALUE_ONLY)?
+ .query_row([key], |row| {
+ let value: Vec<u8> = row.get(0)?;
+ let encoding: i64 = row.get(1)?;
+
+ let value = decode_value(value, encoding);
+ Ok(value)
+ })
+ .optional()?;
+
+ let new_value = match old_value {
+ Some(Value::U64(old_value) ) => mutate(old_value, operand),
+ Some(_) => return Err(type_error(format!("Failed to perform '{op_name}' mutation on a non-U64 value in the database"))),
+ None => operand,
+ };
+
+ let new_value = Value::U64(new_value);
+ let (new_value, encoding) = encode_value(&new_value);
+
+ let changed = tx.prepare_cached(STATEMENT_KV_POINT_SET)?.execute(params![
+ key,
+ &new_value[..],
+ encoding,
+ new_version
+ ])?;
+ assert_eq!(changed, 1);
+
+ Ok(())
+}
+
+fn version_to_versionstamp(version: i64) -> [u8; 10] {
+ let mut versionstamp = [0; 10];
+ versionstamp[..8].copy_from_slice(&version.to_be_bytes());
+ versionstamp
+}
+
+const VALUE_ENCODING_V8: i64 = 1;
+const VALUE_ENCODING_LE64: i64 = 2;
+const VALUE_ENCODING_BYTES: i64 = 3;
+
+fn decode_value(value: Vec<u8>, encoding: i64) -> crate::Value {
+ match encoding {
+ VALUE_ENCODING_V8 => crate::Value::V8(value),
+ VALUE_ENCODING_BYTES => crate::Value::Bytes(value),
+ VALUE_ENCODING_LE64 => {
+ let mut buf = [0; 8];
+ buf.copy_from_slice(&value);
+ crate::Value::U64(u64::from_le_bytes(buf))
+ }
+ _ => todo!(),
+ }
+}
+
+fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) {
+ match value {
+ crate::Value::V8(value) => (Cow::Borrowed(value), VALUE_ENCODING_V8),
+ crate::Value::Bytes(value) => (Cow::Borrowed(value), VALUE_ENCODING_BYTES),
+ crate::Value::U64(value) => {
+ let mut buf = [0; 8];
+ buf.copy_from_slice(&value.to_le_bytes());
+ (Cow::Owned(buf.to_vec()), VALUE_ENCODING_LE64)
+ }
+ }
+}