diff options
author | Heyang Zhou <zhy20000919@hotmail.com> | 2023-08-17 18:53:55 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-17 18:53:55 +0800 |
commit | 0960e895da1275792c1f38999f6a185c864edb3f (patch) | |
tree | 0b12abf31b0177c6c30d12ddc9f2aab1c8052d5c /ext/kv/sqlite.rs | |
parent | ec63b36994527d0969d8083a6c4be8cd325c7473 (diff) |
fix(ext/kv): retry transaction on `SQLITE_BUSY` errors (#20189)
Properly handle the `SQLITE_BUSY` error code by retrying the
transaction.
Also wraps database initialization logic in a transaction to protect
against incomplete/concurrent initializations.
Fixes https://github.com/denoland/deno/issues/20116.
Diffstat (limited to 'ext/kv/sqlite.rs')
-rw-r--r-- | ext/kv/sqlite.rs | 128 |
1 files changed, 92 insertions, 36 deletions
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs index 808bf9b4f..aea438d2d 100644 --- a/ext/kv/sqlite.rs +++ b/ext/kv/sqlite.rs @@ -3,6 +3,7 @@ use std::borrow::Cow; use std::cell::Cell; use std::cell::RefCell; +use std::future::Future; use std::marker::PhantomData; use std::path::Path; use std::path::PathBuf; @@ -21,6 +22,7 @@ use deno_core::task::spawn; use deno_core::task::spawn_blocking; use deno_core::AsyncRefCell; use deno_core::OpState; +use rand::Rng; use rusqlite::params; use rusqlite::OpenFlags; use rusqlite::OptionalExtension; @@ -165,28 +167,41 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> { } } - let default_storage_dir = self.default_storage_dir.clone(); - let conn = spawn_blocking(move || { - let conn = match (path.as_deref(), &default_storage_dir) { - (Some(":memory:"), _) | (None, None) => { - rusqlite::Connection::open_in_memory()? - } - (Some(path), _) => { - let flags = - OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); - rusqlite::Connection::open_with_flags(path, flags)? - } - (None, Some(path)) => { - std::fs::create_dir_all(path)?; - let path = path.join("kv.sqlite3"); - rusqlite::Connection::open(&path)? - } - }; + let conn = sqlite_retry_loop(|| { + let path = path.clone(); + let default_storage_dir = self.default_storage_dir.clone(); + async move { + spawn_blocking(move || { + let conn = match (path.as_deref(), &default_storage_dir) { + (Some(":memory:"), _) | (None, None) => { + rusqlite::Connection::open_in_memory()? + } + (Some(path), _) => { + let flags = + OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); + rusqlite::Connection::open_with_flags(path, flags)? + } + (None, Some(path)) => { + std::fs::create_dir_all(path)?; + let path = path.join("kv.sqlite3"); + rusqlite::Connection::open(path)? + } + }; - conn.pragma_update(None, "journal_mode", "wal")?; - conn.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?; + conn.pragma_update(None, "journal_mode", "wal")?; + + Ok::<_, AnyError>(conn) + }) + .await + .unwrap() + } + }) + .await?; + let conn = Rc::new(AsyncRefCell::new(Cell::new(Some(conn)))); + SqliteDb::run_tx(conn.clone(), |tx| { + tx.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?; - let current_version: usize = conn + let current_version: usize = tx .query_row( "select version from migration_state where k = 0", [], @@ -198,21 +213,22 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> { for (i, migration) in MIGRATIONS.iter().enumerate() { let version = i + 1; if version > current_version { - conn.execute_batch(migration)?; - conn.execute( + tx.execute_batch(migration)?; + tx.execute( "replace into migration_state (k, version) values(?, ?)", [&0, &version], )?; } } - Ok::<_, AnyError>(conn) + tx.commit()?; + + Ok(()) }) - .await - .unwrap()?; + .await?; Ok(SqliteDb { - conn: Rc::new(AsyncRefCell::new(Cell::new(Some(conn)))), + conn, queue: OnceCell::new(), }) } @@ -223,6 +239,29 @@ pub struct SqliteDb { queue: OnceCell<SqliteQueue>, } +async fn sqlite_retry_loop<R, Fut: Future<Output = Result<R, AnyError>>>( + mut f: impl FnMut() -> Fut, +) -> Result<R, AnyError> { + loop { + match f().await { + Ok(x) => return Ok(x), + Err(e) => { + if let Some(x) = e.downcast_ref::<rusqlite::Error>() { + if x.sqlite_error_code() == Some(rusqlite::ErrorCode::DatabaseBusy) { + log::debug!("kv: Database is busy, retrying"); + tokio::time::sleep(Duration::from_millis( + rand::thread_rng().gen_range(5..20), + )) + .await; + continue; + } + } + return Err(e); + } + } + } +} + impl SqliteDb { async fn run_tx<F, R>( conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, @@ -230,6 +269,20 @@ impl SqliteDb { ) -> Result<R, AnyError> where F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>) + + Clone + + Send + + 'static, + R: Send + 'static, + { + sqlite_retry_loop(|| Self::run_tx_inner(conn.clone(), f.clone())).await + } + + async fn run_tx_inner<F, R>( + conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, + f: F, + ) -> Result<R, AnyError> + where + F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>) + Send + 'static, R: Send + 'static, @@ -579,9 +632,10 @@ impl Database for SqliteDb { requests: Vec<ReadRange>, _options: SnapshotReadOptions, ) -> Result<Vec<ReadRangeOutput>, AnyError> { + let requests = Arc::new(requests); Self::run_tx(self.conn.clone(), move |tx| { let mut responses = Vec::with_capacity(requests.len()); - for request in requests { + for request in &*requests { let mut stmt = tx.prepare_cached(if request.reverse { STATEMENT_KV_RANGE_SCAN_REVERSE } else { @@ -622,9 +676,10 @@ impl Database for SqliteDb { &self, write: AtomicWrite, ) -> Result<Option<CommitResult>, AnyError> { + let write = Arc::new(write); let (has_enqueues, commit_result) = Self::run_tx(self.conn.clone(), move |tx| { - for check in write.checks { + for check in &write.checks { let real_versionstamp = tx .prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)? .query_row([check.key.as_slice()], |row| row.get(0)) @@ -639,10 +694,10 @@ impl Database for SqliteDb { .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)? .query_row([], |row| row.get(0))?; - for mutation in write.mutations { - match mutation.kind { + for mutation in &write.mutations { + match &mutation.kind { MutationKind::Set(value) => { - let (value, encoding) = encode_value(&value); + let (value, encoding) = encode_value(value); let changed = tx .prepare_cached(STATEMENT_KV_POINT_SET)? .execute(params![mutation.key, &value, &encoding, &version])?; @@ -659,7 +714,7 @@ impl Database for SqliteDb { &tx, &mutation.key, "sum", - &operand, + operand, version, |a, b| a.wrapping_add(b), )?; @@ -669,7 +724,7 @@ impl Database for SqliteDb { &tx, &mutation.key, "min", - &operand, + operand, version, |a, b| a.min(b), )?; @@ -679,7 +734,7 @@ impl Database for SqliteDb { &tx, &mutation.key, "max", - &operand, + operand, version, |a, b| a.max(b), )?; @@ -693,12 +748,13 @@ impl Database for SqliteDb { .as_millis() as u64; let has_enqueues = !write.enqueues.is_empty(); - for enqueue in write.enqueues { + for enqueue in &write.enqueues { let id = Uuid::new_v4().to_string(); let backoff_schedule = serde_json::to_string( &enqueue .backoff_schedule - .or_else(|| Some(DEFAULT_BACKOFF_SCHEDULE.to_vec())), + .as_deref() + .or_else(|| Some(&DEFAULT_BACKOFF_SCHEDULE[..])), )?; let keys_if_undelivered = serde_json::to_string(&enqueue.keys_if_undelivered)?; |