diff options
author | Luca Casonato <hello@lcas.dev> | 2023-10-31 12:13:57 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-31 11:13:57 +0000 |
commit | 2d9298f5f5550c21ba218ff7095aa9afe80c7e02 (patch) | |
tree | 6daee1cc6007745097e176c7367b697e58632baf /ext/kv/sqlite.rs | |
parent | 092555c611ebab87ad570b4dcb73d54288dccdd9 (diff) |
chore: update ext/kv to use denokv_* crates (#20986)
This commit updates the ext/kv module to use the denokv_* crates for
the protocol and the sqlite backend. It also fixes a couple of bugs in
the sqlite backend, and changes versionstamp generation so that
versionstamps increase less linearly.
Diffstat (limited to 'ext/kv/sqlite.rs')
-rw-r--r-- | ext/kv/sqlite.rs | 1057 |
1 files changed, 56 insertions, 1001 deletions
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs index 327091f05..b4e251f96 100644 --- a/ext/kv/sqlite.rs +++ b/ext/kv/sqlite.rs @@ -1,176 +1,37 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use std::borrow::Cow; use std::cell::RefCell; use std::collections::HashMap; use std::env::current_dir; -use std::future::Future; use std::io::ErrorKind; use std::marker::PhantomData; use std::path::Path; use std::path::PathBuf; use std::rc::Rc; -use std::rc::Weak; use std::sync::Arc; use std::sync::Mutex; -use std::time::Duration; -use std::time::SystemTime; +use std::sync::OnceLock; use async_trait::async_trait; -use deno_core::error::get_custom_error_class; use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures; -use deno_core::futures::FutureExt; -use deno_core::unsync::spawn; use deno_core::unsync::spawn_blocking; -use deno_core::AsyncRefCell; use deno_core::OpState; use deno_node::PathClean; -use rand::Rng; -use rusqlite::params; +pub use denokv_sqlite::TypeError; +use rand::RngCore; +use rand::SeedableRng; use rusqlite::OpenFlags; -use rusqlite::OptionalExtension; -use rusqlite::Transaction; -use tokio::sync::broadcast; -use tokio::sync::broadcast::error::RecvError; -use tokio::sync::mpsc; -use tokio::sync::watch; -use tokio::sync::OnceCell; -use tokio::sync::OwnedSemaphorePermit; -use tokio::sync::Semaphore; -use uuid::Uuid; +use tokio::sync::Notify; -use crate::AtomicWrite; -use crate::CommitResult; -use crate::Database; use crate::DatabaseHandler; -use crate::KvEntry; -use crate::MutationKind; -use crate::QueueMessageHandle; -use crate::ReadRange; -use crate::ReadRangeOutput; -use crate::SnapshotReadOptions; -use crate::Value; -const STATEMENT_INC_AND_GET_DATA_VERSION: &str = - "update data_version set version = version + 1 where k = 0 returning version"; -const STATEMENT_KV_RANGE_SCAN: &str = - "select k, v, v_encoding, version from kv where k >= ? and k < ? 
order by k asc limit ?"; -const STATEMENT_KV_RANGE_SCAN_REVERSE: &str = - "select k, v, v_encoding, version from kv where k >= ? and k < ? order by k desc limit ?"; -const STATEMENT_KV_POINT_GET_VALUE_ONLY: &str = - "select v, v_encoding from kv where k = ?"; -const STATEMENT_KV_POINT_GET_VERSION_ONLY: &str = - "select version from kv where k = ?"; -const STATEMENT_KV_POINT_SET: &str = - "insert into kv (k, v, v_encoding, version, expiration_ms) values (:k, :v, :v_encoding, :version, :expiration_ms) on conflict(k) do update set v = :v, v_encoding = :v_encoding, version = :version, expiration_ms = :expiration_ms"; -const STATEMENT_KV_POINT_DELETE: &str = "delete from kv where k = ?"; - -const STATEMENT_QUEUE_ADD_READY: &str = "insert into queue (ts, id, data, backoff_schedule, keys_if_undelivered) values(?, ?, ?, ?, ?)"; -const STATEMENT_QUEUE_GET_NEXT_READY: &str = "select ts, id, data, backoff_schedule, keys_if_undelivered from queue where ts <= ? order by ts limit 100"; -const STATEMENT_QUEUE_GET_EARLIEST_READY: &str = - "select ts from queue order by ts limit 1"; -const STATEMENT_QUEUE_REMOVE_READY: &str = "delete from queue where id = ?"; -const STATEMENT_QUEUE_ADD_RUNNING: &str = "insert into queue_running (deadline, id, data, backoff_schedule, keys_if_undelivered) values(?, ?, ?, ?, ?)"; -const STATEMENT_QUEUE_REMOVE_RUNNING: &str = - "delete from queue_running where id = ?"; -const STATEMENT_QUEUE_GET_RUNNING_BY_ID: &str = "select deadline, id, data, backoff_schedule, keys_if_undelivered from queue_running where id = ?"; -const STATEMENT_QUEUE_GET_RUNNING: &str = - "select id from queue_running order by deadline limit 100"; - -const STATEMENT_CREATE_MIGRATION_TABLE: &str = " -create table if not exists migration_state( - k integer not null primary key, - version integer not null -) -"; - -const MIGRATIONS: [&str; 3] = [ - " -create table data_version ( - k integer primary key, - version integer not null -); -insert into data_version (k, version) values (0, 
0); -create table kv ( - k blob primary key, - v blob not null, - v_encoding integer not null, - version integer not null -) without rowid; -", - " -create table queue ( - ts integer not null, - id text not null, - data blob not null, - backoff_schedule text not null, - keys_if_undelivered blob not null, - - primary key (ts, id) -); -create table queue_running( - deadline integer not null, - id text not null, - data blob not null, - backoff_schedule text not null, - keys_if_undelivered blob not null, - - primary key (deadline, id) -); -", - " -alter table kv add column seq integer not null default 0; -alter table data_version add column seq integer not null default 0; -alter table kv add column expiration_ms integer not null default -1; -create index kv_expiration_ms_idx on kv (expiration_ms); -", -]; - -const DISPATCH_CONCURRENCY_LIMIT: usize = 100; -const DEFAULT_BACKOFF_SCHEDULE: [u32; 5] = [100, 1000, 5000, 30000, 60000]; - -const ERROR_USING_CLOSED_DATABASE: &str = "Attempted to use a closed database"; - -#[derive(Clone)] -struct ProtectedConn { - guard: Rc<AsyncRefCell<()>>, - conn: Arc<Mutex<Option<rusqlite::Connection>>>, -} - -#[derive(Clone)] -struct WeakProtectedConn { - guard: Weak<AsyncRefCell<()>>, - conn: std::sync::Weak<Mutex<Option<rusqlite::Connection>>>, -} - -impl ProtectedConn { - fn new(conn: rusqlite::Connection) -> Self { - Self { - guard: Rc::new(AsyncRefCell::new(())), - conn: Arc::new(Mutex::new(Some(conn))), - } - } - - fn downgrade(&self) -> WeakProtectedConn { - WeakProtectedConn { - guard: Rc::downgrade(&self.guard), - conn: Arc::downgrade(&self.conn), - } - } -} - -impl WeakProtectedConn { - fn upgrade(&self) -> Option<ProtectedConn> { - let guard = self.guard.upgrade()?; - let conn = self.conn.upgrade()?; - Some(ProtectedConn { guard, conn }) - } -} +static QUEUE_WAKER_MAP: OnceLock<Mutex<HashMap<PathBuf, Arc<Notify>>>> = + OnceLock::new(); pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> { pub 
default_storage_dir: Option<PathBuf>, + versionstamp_rng_seed: Option<u64>, _permissions: PhantomData<P>, } @@ -180,9 +41,13 @@ pub trait SqliteDbHandlerPermissions { } impl<P: SqliteDbHandlerPermissions> SqliteDbHandler<P> { - pub fn new(default_storage_dir: Option<PathBuf>) -> Self { + pub fn new( + default_storage_dir: Option<PathBuf>, + versionstamp_rng_seed: Option<u64>, + ) -> Self { Self { default_storage_dir, + versionstamp_rng_seed, _permissions: PhantomData, } } @@ -190,7 +55,7 @@ impl<P: SqliteDbHandlerPermissions> SqliteDbHandler<P> { #[async_trait(?Send)] impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> { - type DB = SqliteDb; + type DB = denokv_sqlite::Sqlite; async fn open( &self, @@ -218,866 +83,61 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> { } } - let (conn, queue_waker_key) = sqlite_retry_loop(|| { - let path = path.clone(); - let default_storage_dir = self.default_storage_dir.clone(); - async move { - spawn_blocking(move || { - let (conn, queue_waker_key) = - match (path.as_deref(), &default_storage_dir) { - (Some(":memory:"), _) | (None, None) => { - (rusqlite::Connection::open_in_memory()?, None) - } - (Some(path), _) => { - let flags = - OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); - let resolved_path = canonicalize_path(&PathBuf::from(path))?; - ( - rusqlite::Connection::open_with_flags(path, flags)?, - Some(resolved_path), - ) - } - (None, Some(path)) => { - std::fs::create_dir_all(path)?; - let path = path.join("kv.sqlite3"); - (rusqlite::Connection::open(path.clone())?, Some(path)) - } - }; - - conn.pragma_update(None, "journal_mode", "wal")?; - - Ok::<_, AnyError>((conn, queue_waker_key)) - }) - .await - .unwrap() - } - }) - .await?; - let conn = ProtectedConn::new(conn); - SqliteDb::run_tx(conn.clone(), |tx| { - tx.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?; - - let current_version: usize = tx - .query_row( - "select version from migration_state where k = 
0", - [], - |row| row.get(0), - ) - .optional()? - .unwrap_or(0); - - for (i, migration) in MIGRATIONS.iter().enumerate() { - let version = i + 1; - if version > current_version { - tx.execute_batch(migration)?; - tx.execute( - "replace into migration_state (k, version) values(?, ?)", - [&0, &version], - )?; - } - } - - tx.commit()?; - - Ok(()) - }) - .await?; - - let expiration_watcher = spawn(watch_expiration(conn.clone())); - - Ok(SqliteDb { - conn, - queue: OnceCell::new(), - queue_waker_key, - expiration_watcher, - }) - } -} - -pub struct SqliteDb { - conn: ProtectedConn, - queue: OnceCell<SqliteQueue>, - queue_waker_key: Option<PathBuf>, - expiration_watcher: deno_core::unsync::JoinHandle<()>, -} - -impl Drop for SqliteDb { - fn drop(&mut self) { - self.close(); - } -} - -async fn sqlite_retry_loop<R, Fut: Future<Output = Result<R, AnyError>>>( - mut f: impl FnMut() -> Fut, -) -> Result<R, AnyError> { - loop { - match f().await { - Ok(x) => return Ok(x), - Err(e) => { - if let Some(x) = e.downcast_ref::<rusqlite::Error>() { - if x.sqlite_error_code() == Some(rusqlite::ErrorCode::DatabaseBusy) { - log::debug!("kv: Database is busy, retrying"); - tokio::time::sleep(Duration::from_millis( - rand::thread_rng().gen_range(5..20), - )) - .await; - continue; - } - } - return Err(e); - } - } - } -} - -impl SqliteDb { - async fn run_tx<F, R>(conn: ProtectedConn, f: F) -> Result<R, AnyError> - where - F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>) - + Clone - + Send - + 'static, - R: Send + 'static, - { - sqlite_retry_loop(|| Self::run_tx_inner(conn.clone(), f.clone())).await - } - - async fn run_tx_inner<F, R>(conn: ProtectedConn, f: F) -> Result<R, AnyError> - where - F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>) - + Send - + 'static, - R: Send + 'static, - { - // `run_tx` runs in an asynchronous context. First acquire the async lock to - // coordinate with other async invocations. 
- let _guard_holder = conn.guard.borrow_mut().await; - - // Then, take the synchronous lock. This operation is guaranteed to success without waiting, - // unless the database is being closed. - let db = conn.conn.clone(); - spawn_blocking(move || { - let mut db = db.try_lock().ok(); - let Some(db) = db.as_mut().and_then(|x| x.as_mut()) else { - return Err(type_error(ERROR_USING_CLOSED_DATABASE)); - }; - let result = match db.transaction() { - Ok(tx) => f(tx), - Err(e) => Err(e.into()), - }; - result - }) - .await - .unwrap() - } -} - -pub struct DequeuedMessage { - conn: WeakProtectedConn, - id: String, - payload: Option<Vec<u8>>, - waker_tx: broadcast::Sender<()>, - _permit: OwnedSemaphorePermit, -} - -#[async_trait(?Send)] -impl QueueMessageHandle for DequeuedMessage { - async fn finish(&self, success: bool) -> Result<(), AnyError> { - let Some(conn) = self.conn.upgrade() else { - return Ok(()); - }; - let id = self.id.clone(); - let requeued = SqliteDb::run_tx(conn, move |tx| { - let requeued = { - if success { - let changed = tx - .prepare_cached(STATEMENT_QUEUE_REMOVE_RUNNING)? - .execute([&id])?; - assert!(changed <= 1); - false - } else { - SqliteQueue::requeue_message(&id, &tx)? - } - }; - tx.commit()?; - Ok(requeued) - }) - .await; - let requeued = match requeued { - Ok(x) => x, - Err(e) => { - // Silently ignore the error if the database has been closed - // This message will be delivered on the next run - if is_conn_closed_error(&e) { - return Ok(()); - } - return Err(e); - } - }; - if requeued { - // If the message was requeued, wake up the dequeue loop. 
- let _ = self.waker_tx.send(()); - } - Ok(()) - } - - async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> { - self - .payload - .take() - .ok_or_else(|| type_error("Payload already consumed")) - } -} - -type DequeueReceiver = mpsc::Receiver<(Vec<u8>, String)>; - -struct SqliteQueue { - conn: ProtectedConn, - dequeue_rx: Rc<AsyncRefCell<DequeueReceiver>>, - concurrency_limiter: Arc<Semaphore>, - waker_tx: broadcast::Sender<()>, - shutdown_tx: watch::Sender<()>, -} - -impl SqliteQueue { - fn new( - conn: ProtectedConn, - waker_tx: broadcast::Sender<()>, - waker_rx: broadcast::Receiver<()>, - ) -> Self { - let conn_clone = conn.clone(); - let (shutdown_tx, shutdown_rx) = watch::channel::<()>(()); - let (dequeue_tx, dequeue_rx) = mpsc::channel::<(Vec<u8>, String)>(64); - - spawn(async move { - // Oneshot requeue of all inflight messages. - if let Err(e) = Self::requeue_inflight_messages(conn.clone()).await { - // Exit the dequeue loop cleanly if the database has been closed. - if is_conn_closed_error(&e) { - return; - } - panic!("kv: Error in requeue_inflight_messages: {}", e); - } - - // Continuous dequeue loop. - if let Err(e) = - Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx) - .await - { - // Exit the dequeue loop cleanly if the database has been closed. - if is_conn_closed_error(&e) { - return; - } - panic!("kv: Error in dequeue_loop: {}", e); - } - }); - - Self { - conn: conn_clone, - dequeue_rx: Rc::new(AsyncRefCell::new(dequeue_rx)), - waker_tx, - shutdown_tx, - concurrency_limiter: Arc::new(Semaphore::new(DISPATCH_CONCURRENCY_LIMIT)), - } - } - - async fn dequeue(&self) -> Result<Option<DequeuedMessage>, AnyError> { - // Wait for the next message to be available from dequeue_rx. 
- let (payload, id) = { - let mut queue_rx = self.dequeue_rx.borrow_mut().await; - let Some(msg) = queue_rx.recv().await else { - return Ok(None); - }; - msg - }; - - let permit = self.concurrency_limiter.clone().acquire_owned().await?; - - Ok(Some(DequeuedMessage { - conn: self.conn.downgrade(), - id, - payload: Some(payload), - waker_tx: self.waker_tx.clone(), - _permit: permit, - })) - } - - fn shutdown(&self) { - let _ = self.shutdown_tx.send(()); - } - - async fn dequeue_loop( - conn: ProtectedConn, - dequeue_tx: mpsc::Sender<(Vec<u8>, String)>, - mut shutdown_rx: watch::Receiver<()>, - mut waker_rx: broadcast::Receiver<()>, - ) -> Result<(), AnyError> { - loop { - let messages = SqliteDb::run_tx(conn.clone(), move |tx| { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - - let messages = tx - .prepare_cached(STATEMENT_QUEUE_GET_NEXT_READY)? - .query_map([now], |row| { - let ts: u64 = row.get(0)?; - let id: String = row.get(1)?; - let data: Vec<u8> = row.get(2)?; - let backoff_schedule: String = row.get(3)?; - let keys_if_undelivered: String = row.get(4)?; - Ok((ts, id, data, backoff_schedule, keys_if_undelivered)) - })? - .collect::<Result<Vec<_>, rusqlite::Error>>()?; - - for (ts, id, data, backoff_schedule, keys_if_undelivered) in &messages { - let changed = tx - .prepare_cached(STATEMENT_QUEUE_REMOVE_READY)? - .execute(params![id])?; - assert_eq!(changed, 1); - - let changed = - tx.prepare_cached(STATEMENT_QUEUE_ADD_RUNNING)?.execute( - params![ts, id, &data, &backoff_schedule, &keys_if_undelivered], - )?; - assert_eq!(changed, 1); - } - tx.commit()?; - - Ok( - messages - .into_iter() - .map(|(_, id, data, _, _)| (id, data)) - .collect::<Vec<_>>(), - ) - }) - .await?; - - let busy = !messages.is_empty(); - - for (id, data) in messages { - if dequeue_tx.send((data, id)).await.is_err() { - // Queue receiver was dropped. Stop the dequeue loop. 
- return Ok(()); - } - } - - if !busy { - // There's nothing to dequeue right now; sleep until one of the - // following happens: - // - It's time to dequeue the next message based on its timestamp - // - A new message is added to the queue - // - The database is closed - let sleep_fut = { - match Self::get_earliest_ready_ts(conn.clone()).await? { - Some(ts) => { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - if ts <= now { - continue; - } - tokio::time::sleep(Duration::from_millis(ts - now)).boxed() - } - None => futures::future::pending().boxed(), - } - }; - tokio::select! { - _ = sleep_fut => {} - x = waker_rx.recv() => { - if let Err(RecvError::Closed) = x {return Ok(());} - }, - _ = shutdown_rx.changed() => return Ok(()) - } - } - } - } - - async fn get_earliest_ready_ts( - conn: ProtectedConn, - ) -> Result<Option<u64>, AnyError> { - SqliteDb::run_tx(conn.clone(), move |tx| { - let ts = tx - .prepare_cached(STATEMENT_QUEUE_GET_EARLIEST_READY)? - .query_row([], |row| { - let ts: u64 = row.get(0)?; - Ok(ts) - }) - .optional()?; - Ok(ts) - }) - .await - } - - async fn requeue_inflight_messages( - conn: ProtectedConn, - ) -> Result<(), AnyError> { - loop { - let done = SqliteDb::run_tx(conn.clone(), move |tx| { - let entries = tx - .prepare_cached(STATEMENT_QUEUE_GET_RUNNING)? - .query_map([], |row| { - let id: String = row.get(0)?; - Ok(id) - })? - .collect::<Result<Vec<_>, rusqlite::Error>>()?; - for id in &entries { - Self::requeue_message(id, &tx)?; - } - tx.commit()?; - Ok(entries.is_empty()) - }) - .await?; - if done { - return Ok(()); - } - } - } - - fn requeue_message( - id: &str, - tx: &rusqlite::Transaction<'_>, - ) -> Result<bool, AnyError> { - let Some((_, id, data, backoff_schedule, keys_if_undelivered)) = tx - .prepare_cached(STATEMENT_QUEUE_GET_RUNNING_BY_ID)? 
- .query_row([id], |row| { - let deadline: u64 = row.get(0)?; - let id: String = row.get(1)?; - let data: Vec<u8> = row.get(2)?; - let backoff_schedule: String = row.get(3)?; - let keys_if_undelivered: String = row.get(4)?; - Ok((deadline, id, data, backoff_schedule, keys_if_undelivered)) - }) - .optional()? - else { - return Ok(false); - }; - - let backoff_schedule = { - let backoff_schedule = - serde_json::from_str::<Option<Vec<u64>>>(&backoff_schedule)?; - backoff_schedule.unwrap_or_default() - }; - - let mut requeued = false; - if !backoff_schedule.is_empty() { - // Requeue based on backoff schedule - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - let new_ts = now + backoff_schedule[0]; - let new_backoff_schedule = serde_json::to_string(&backoff_schedule[1..])?; - let changed = tx - .prepare_cached(STATEMENT_QUEUE_ADD_READY)? - .execute(params![ - new_ts, - id, - &data, - &new_backoff_schedule, - &keys_if_undelivered - ]) - .unwrap(); - assert_eq!(changed, 1); - requeued = true; - } else if !keys_if_undelivered.is_empty() { - // No more requeues. Insert the message into the undelivered queue. - let keys_if_undelivered = - serde_json::from_str::<Vec<Vec<u8>>>(&keys_if_undelivered)?; - - let version: i64 = tx - .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)? - .query_row([], |row| row.get(0))?; - - for key in keys_if_undelivered { - let changed = tx - .prepare_cached(STATEMENT_KV_POINT_SET)? - .execute(params![key, &data, &VALUE_ENCODING_V8, &version, -1i64])?; - assert_eq!(changed, 1); - } - } - - // Remove from running - let changed = tx - .prepare_cached(STATEMENT_QUEUE_REMOVE_RUNNING)? 
- .execute(params![id])?; - assert_eq!(changed, 1); - - Ok(requeued) - } -} - -async fn watch_expiration(db: ProtectedConn) { - loop { - // Scan for expired keys - let res = SqliteDb::run_tx(db.clone(), move |tx| { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - tx.prepare_cached( - "delete from kv where expiration_ms >= 0 and expiration_ms <= ?", - )? - .execute(params![now])?; - tx.commit()?; - Ok(()) - }) - .await; - if let Err(e) = res { - eprintln!("kv: Error in expiration watcher: {}", e); - } - let sleep_duration = - Duration::from_secs_f64(60.0 + rand::thread_rng().gen_range(0.0..30.0)); - tokio::time::sleep(sleep_duration).await; - } -} - -#[async_trait(?Send)] -impl Database for SqliteDb { - type QMH = DequeuedMessage; - - async fn snapshot_read( - &self, - _state: Rc<RefCell<OpState>>, - requests: Vec<ReadRange>, - _options: SnapshotReadOptions, - ) -> Result<Vec<ReadRangeOutput>, AnyError> { - let requests = Arc::new(requests); - Self::run_tx(self.conn.clone(), move |tx| { - let mut responses = Vec::with_capacity(requests.len()); - for request in &*requests { - let mut stmt = tx.prepare_cached(if request.reverse { - STATEMENT_KV_RANGE_SCAN_REVERSE - } else { - STATEMENT_KV_RANGE_SCAN - })?; - let entries = stmt - .query_map( - ( - request.start.as_slice(), - request.end.as_slice(), - request.limit.get(), - ), - |row| { - let key: Vec<u8> = row.get(0)?; - let value: Vec<u8> = row.get(1)?; - let encoding: i64 = row.get(2)?; - - let value = decode_value(value, encoding); - - let version: i64 = row.get(3)?; - Ok(KvEntry { - key, - value, - versionstamp: version_to_versionstamp(version), - }) - }, - )? 
- .collect::<Result<Vec<_>, rusqlite::Error>>()?; - responses.push(ReadRangeOutput { entries }); - } - - Ok(responses) - }) - .await - } - - async fn atomic_write( - &self, - state: Rc<RefCell<OpState>>, - write: AtomicWrite, - ) -> Result<Option<CommitResult>, AnyError> { - let write = Arc::new(write); - let (has_enqueues, commit_result) = - Self::run_tx(self.conn.clone(), move |tx| { - for check in &write.checks { - let real_versionstamp = tx - .prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)? - .query_row([check.key.as_slice()], |row| row.get(0)) - .optional()? - .map(version_to_versionstamp); - if real_versionstamp != check.versionstamp { - return Ok((false, None)); - } - } - - let version: i64 = tx - .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)? - .query_row([], |row| row.get(0))?; - - for mutation in &write.mutations { - match &mutation.kind { - MutationKind::Set(value) => { - let (value, encoding) = encode_value(value); - let changed = - tx.prepare_cached(STATEMENT_KV_POINT_SET)?.execute(params![ - mutation.key, - value, - &encoding, - &version, - mutation - .expire_at - .and_then(|x| i64::try_from(x).ok()) - .unwrap_or(-1i64) - ])?; - assert_eq!(changed, 1) - } - MutationKind::Delete => { - let changed = tx - .prepare_cached(STATEMENT_KV_POINT_DELETE)? 
- .execute(params![mutation.key])?; - assert!(changed == 0 || changed == 1) - } - MutationKind::Sum(operand) => { - mutate_le64( - &tx, - &mutation.key, - "sum", - operand, - version, - |a, b| a.wrapping_add(b), - )?; + let path = path.clone(); + let default_storage_dir = self.default_storage_dir.clone(); + let (conn, queue_waker_key) = spawn_blocking(move || { + denokv_sqlite::sqlite_retry_loop(|| { + let (conn, queue_waker_key) = + match (path.as_deref(), &default_storage_dir) { + (Some(":memory:"), _) | (None, None) => { + (rusqlite::Connection::open_in_memory()?, None) } - MutationKind::Min(operand) => { - mutate_le64( - &tx, - &mutation.key, - "min", - operand, - version, - |a, b| a.min(b), - )?; + (Some(path), _) => { + let flags = + OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); + let resolved_path = canonicalize_path(&PathBuf::from(path))?; + ( + rusqlite::Connection::open_with_flags(path, flags)?, + Some(resolved_path), + ) } - MutationKind::Max(operand) => { - mutate_le64( - &tx, - &mutation.key, - "max", - operand, - version, - |a, b| a.max(b), - )?; + (None, Some(path)) => { + std::fs::create_dir_all(path)?; + let path = path.join("kv.sqlite3"); + (rusqlite::Connection::open(path.clone())?, Some(path)) } - } - } - - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - - let has_enqueues = !write.enqueues.is_empty(); - for enqueue in &write.enqueues { - let id = Uuid::new_v4().to_string(); - let backoff_schedule = serde_json::to_string( - &enqueue - .backoff_schedule - .as_deref() - .or_else(|| Some(&DEFAULT_BACKOFF_SCHEDULE[..])), - )?; - let keys_if_undelivered = - serde_json::to_string(&enqueue.keys_if_undelivered)?; + }; - let changed = - tx.prepare_cached(STATEMENT_QUEUE_ADD_READY)? 
- .execute(params![ - now + enqueue.delay_ms, - id, - &enqueue.payload, - &backoff_schedule, - &keys_if_undelivered - ])?; - assert_eq!(changed, 1) - } - - tx.commit()?; - let new_versionstamp = version_to_versionstamp(version); - - Ok(( - has_enqueues, - Some(CommitResult { - versionstamp: new_versionstamp, - }), - )) - }) - .await?; - - if has_enqueues { - match self.queue.get() { - Some(queue) => { - let _ = queue.waker_tx.send(()); - } - None => { - if let Some(waker_key) = &self.queue_waker_key { - let (waker_tx, _) = - shared_queue_waker_channel(waker_key, state.clone()); - let _ = waker_tx.send(()); - } - } - } - } - Ok(commit_result) - } + conn.pragma_update(None, "journal_mode", "wal")?; - async fn dequeue_next_message( - &self, - state: Rc<RefCell<OpState>>, - ) -> Result<Option<Self::QMH>, AnyError> { - let queue = self - .queue - .get_or_init(|| async move { - let (waker_tx, waker_rx) = { - match &self.queue_waker_key { - Some(waker_key) => { - shared_queue_waker_channel(waker_key, state.clone()) - } - None => broadcast::channel(1), - } - }; - SqliteQueue::new(self.conn.clone(), waker_tx, waker_rx) + Ok::<_, AnyError>((conn, queue_waker_key)) }) - .await; - let handle = queue.dequeue().await?; - Ok(handle) - } - - fn close(&self) { - if let Some(queue) = self.queue.get() { - queue.shutdown(); - } - - self.expiration_watcher.abort(); - - // The above `abort()` operation is asynchronous. It's not - // guaranteed that the sqlite connection will be closed immediately. - // So here we synchronously take the conn mutex and drop the connection. - // - // This blocks the event loop if the connection is still being used, - // but ensures correctness - deleting the database file after calling - // the `close` method will always work. - self.conn.conn.lock().unwrap().take(); - } -} - -/// Mutates a LE64 value in the database, defaulting to setting it to the -/// operand if it doesn't exist. 
-fn mutate_le64( - tx: &Transaction, - key: &[u8], - op_name: &str, - operand: &Value, - new_version: i64, - mutate: impl FnOnce(u64, u64) -> u64, -) -> Result<(), AnyError> { - let Value::U64(operand) = *operand else { - return Err(type_error(format!( - "Failed to perform '{op_name}' mutation on a non-U64 operand" - ))); - }; - - let old_value = tx - .prepare_cached(STATEMENT_KV_POINT_GET_VALUE_ONLY)? - .query_row([key], |row| { - let value: Vec<u8> = row.get(0)?; - let encoding: i64 = row.get(1)?; - - let value = decode_value(value, encoding); - Ok(value) }) - .optional()?; - - let new_value = match old_value { - Some(Value::U64(old_value) ) => mutate(old_value, operand), - Some(_) => return Err(type_error(format!("Failed to perform '{op_name}' mutation on a non-U64 value in the database"))), - None => operand, - }; - - let new_value = Value::U64(new_value); - let (new_value, encoding) = encode_value(&new_value); - - let changed = tx.prepare_cached(STATEMENT_KV_POINT_SET)?.execute(params![ - key, - &new_value[..], - encoding, - new_version, - -1i64, - ])?; - assert_eq!(changed, 1); - - Ok(()) -} - -fn version_to_versionstamp(version: i64) -> [u8; 10] { - let mut versionstamp = [0; 10]; - versionstamp[..8].copy_from_slice(&version.to_be_bytes()); - versionstamp -} + .await + .unwrap()?; -const VALUE_ENCODING_V8: i64 = 1; -const VALUE_ENCODING_LE64: i64 = 2; -const VALUE_ENCODING_BYTES: i64 = 3; + let dequeue_notify = if let Some(queue_waker_key) = queue_waker_key { + QUEUE_WAKER_MAP + .get_or_init(Default::default) + .lock() + .unwrap() + .entry(queue_waker_key) + .or_default() + .clone() + } else { + Arc::new(Notify::new()) + }; -fn decode_value(value: Vec<u8>, encoding: i64) -> crate::Value { - match encoding { - VALUE_ENCODING_V8 => crate::Value::V8(value), - VALUE_ENCODING_BYTES => crate::Value::Bytes(value), - VALUE_ENCODING_LE64 => { - let mut buf = [0; 8]; - buf.copy_from_slice(&value); - crate::Value::U64(u64::from_le_bytes(buf)) - } - _ => todo!(), - } -} 
+ let versionstamp_rng: Box<dyn RngCore + Send> = + match &self.versionstamp_rng_seed { + Some(seed) => Box::new(rand::rngs::StdRng::seed_from_u64(*seed)), + None => Box::new(rand::rngs::StdRng::from_entropy()), + }; -fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) { - match value { - crate::Value::V8(value) => (Cow::Borrowed(value), VALUE_ENCODING_V8), - crate::Value::Bytes(value) => (Cow::Borrowed(value), VALUE_ENCODING_BYTES), - crate::Value::U64(value) => { - let mut buf = [0; 8]; - buf.copy_from_slice(&value.to_le_bytes()); - (Cow::Owned(buf.to_vec()), VALUE_ENCODING_LE64) - } + denokv_sqlite::Sqlite::new(conn, dequeue_notify, versionstamp_rng) } } -pub struct QueueWaker { - wakers_tx: HashMap<PathBuf, broadcast::Sender<()>>, -} - -fn shared_queue_waker_channel( - waker_key: &Path, - state: Rc<RefCell<OpState>>, -) -> (broadcast::Sender<()>, broadcast::Receiver<()>) { - let mut state = state.borrow_mut(); - let waker = { - let waker = state.try_borrow_mut::<QueueWaker>(); - match waker { - Some(waker) => waker, - None => { - let waker = QueueWaker { - wakers_tx: HashMap::new(), - }; - state.put::<QueueWaker>(waker); - state.borrow_mut::<QueueWaker>() - } - } - }; - - let waker_tx = waker - .wakers_tx - .entry(waker_key.to_path_buf()) - .or_insert_with(|| { - let (waker_tx, _) = broadcast::channel(1); - waker_tx - }); - - (waker_tx.clone(), waker_tx.subscribe()) -} - /// Same as Path::canonicalize, but also handles non-existing paths. fn canonicalize_path(path: &Path) -> Result<PathBuf, AnyError> { let path = path.to_path_buf().clean(); @@ -1106,8 +166,3 @@ fn canonicalize_path(path: &Path) -> Result<PathBuf, AnyError> { } } } - -fn is_conn_closed_error(e: &AnyError) -> bool { - get_custom_error_class(e) == Some("TypeError") - && e.to_string() == ERROR_USING_CLOSED_DATABASE -} |