summaryrefslogtreecommitdiff
path: root/ext/kv/sqlite.rs
diff options
context:
space:
mode:
authorHeyang Zhou <zhy20000919@hotmail.com>2023-08-17 18:53:55 +0800
committerGitHub <noreply@github.com>2023-08-17 18:53:55 +0800
commit0960e895da1275792c1f38999f6a185c864edb3f (patch)
tree0b12abf31b0177c6c30d12ddc9f2aab1c8052d5c /ext/kv/sqlite.rs
parentec63b36994527d0969d8083a6c4be8cd325c7473 (diff)
fix(ext/kv): retry transaction on `SQLITE_BUSY` errors (#20189)
Properly handle the `SQLITE_BUSY` error code by retrying the transaction. Also wraps database initialization logic in a transaction to protect against incomplete/concurrent initializations. Fixes https://github.com/denoland/deno/issues/20116.
Diffstat (limited to 'ext/kv/sqlite.rs')
-rw-r--r--ext/kv/sqlite.rs128
1 files changed, 92 insertions, 36 deletions
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
index 808bf9b4f..aea438d2d 100644
--- a/ext/kv/sqlite.rs
+++ b/ext/kv/sqlite.rs
@@ -3,6 +3,7 @@
use std::borrow::Cow;
use std::cell::Cell;
use std::cell::RefCell;
+use std::future::Future;
use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
@@ -21,6 +22,7 @@ use deno_core::task::spawn;
use deno_core::task::spawn_blocking;
use deno_core::AsyncRefCell;
use deno_core::OpState;
+use rand::Rng;
use rusqlite::params;
use rusqlite::OpenFlags;
use rusqlite::OptionalExtension;
@@ -165,28 +167,41 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
}
}
- let default_storage_dir = self.default_storage_dir.clone();
- let conn = spawn_blocking(move || {
- let conn = match (path.as_deref(), &default_storage_dir) {
- (Some(":memory:"), _) | (None, None) => {
- rusqlite::Connection::open_in_memory()?
- }
- (Some(path), _) => {
- let flags =
- OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
- rusqlite::Connection::open_with_flags(path, flags)?
- }
- (None, Some(path)) => {
- std::fs::create_dir_all(path)?;
- let path = path.join("kv.sqlite3");
- rusqlite::Connection::open(&path)?
- }
- };
+ let conn = sqlite_retry_loop(|| {
+ let path = path.clone();
+ let default_storage_dir = self.default_storage_dir.clone();
+ async move {
+ spawn_blocking(move || {
+ let conn = match (path.as_deref(), &default_storage_dir) {
+ (Some(":memory:"), _) | (None, None) => {
+ rusqlite::Connection::open_in_memory()?
+ }
+ (Some(path), _) => {
+ let flags =
+ OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
+ rusqlite::Connection::open_with_flags(path, flags)?
+ }
+ (None, Some(path)) => {
+ std::fs::create_dir_all(path)?;
+ let path = path.join("kv.sqlite3");
+ rusqlite::Connection::open(path)?
+ }
+ };
- conn.pragma_update(None, "journal_mode", "wal")?;
- conn.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?;
+ conn.pragma_update(None, "journal_mode", "wal")?;
+
+ Ok::<_, AnyError>(conn)
+ })
+ .await
+ .unwrap()
+ }
+ })
+ .await?;
+ let conn = Rc::new(AsyncRefCell::new(Cell::new(Some(conn))));
+ SqliteDb::run_tx(conn.clone(), |tx| {
+ tx.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?;
- let current_version: usize = conn
+ let current_version: usize = tx
.query_row(
"select version from migration_state where k = 0",
[],
@@ -198,21 +213,22 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
for (i, migration) in MIGRATIONS.iter().enumerate() {
let version = i + 1;
if version > current_version {
- conn.execute_batch(migration)?;
- conn.execute(
+ tx.execute_batch(migration)?;
+ tx.execute(
"replace into migration_state (k, version) values(?, ?)",
[&0, &version],
)?;
}
}
- Ok::<_, AnyError>(conn)
+ tx.commit()?;
+
+ Ok(())
})
- .await
- .unwrap()?;
+ .await?;
Ok(SqliteDb {
- conn: Rc::new(AsyncRefCell::new(Cell::new(Some(conn)))),
+ conn,
queue: OnceCell::new(),
})
}
@@ -223,6 +239,29 @@ pub struct SqliteDb {
queue: OnceCell<SqliteQueue>,
}
+async fn sqlite_retry_loop<R, Fut: Future<Output = Result<R, AnyError>>>(
+ mut f: impl FnMut() -> Fut,
+) -> Result<R, AnyError> {
+ loop {
+ match f().await {
+ Ok(x) => return Ok(x),
+ Err(e) => {
+ if let Some(x) = e.downcast_ref::<rusqlite::Error>() {
+ if x.sqlite_error_code() == Some(rusqlite::ErrorCode::DatabaseBusy) {
+ log::debug!("kv: Database is busy, retrying");
+ tokio::time::sleep(Duration::from_millis(
+ rand::thread_rng().gen_range(5..20),
+ ))
+ .await;
+ continue;
+ }
+ }
+ return Err(e);
+ }
+ }
+ }
+}
+
impl SqliteDb {
async fn run_tx<F, R>(
conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
@@ -230,6 +269,20 @@ impl SqliteDb {
) -> Result<R, AnyError>
where
F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>)
+ + Clone
+ + Send
+ + 'static,
+ R: Send + 'static,
+ {
+ sqlite_retry_loop(|| Self::run_tx_inner(conn.clone(), f.clone())).await
+ }
+
+ async fn run_tx_inner<F, R>(
+ conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
+ f: F,
+ ) -> Result<R, AnyError>
+ where
+ F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>)
+ Send
+ 'static,
R: Send + 'static,
@@ -579,9 +632,10 @@ impl Database for SqliteDb {
requests: Vec<ReadRange>,
_options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
+ let requests = Arc::new(requests);
Self::run_tx(self.conn.clone(), move |tx| {
let mut responses = Vec::with_capacity(requests.len());
- for request in requests {
+ for request in &*requests {
let mut stmt = tx.prepare_cached(if request.reverse {
STATEMENT_KV_RANGE_SCAN_REVERSE
} else {
@@ -622,9 +676,10 @@ impl Database for SqliteDb {
&self,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
+ let write = Arc::new(write);
let (has_enqueues, commit_result) =
Self::run_tx(self.conn.clone(), move |tx| {
- for check in write.checks {
+ for check in &write.checks {
let real_versionstamp = tx
.prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)?
.query_row([check.key.as_slice()], |row| row.get(0))
@@ -639,10 +694,10 @@ impl Database for SqliteDb {
.prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)?
.query_row([], |row| row.get(0))?;
- for mutation in write.mutations {
- match mutation.kind {
+ for mutation in &write.mutations {
+ match &mutation.kind {
MutationKind::Set(value) => {
- let (value, encoding) = encode_value(&value);
+ let (value, encoding) = encode_value(value);
let changed = tx
.prepare_cached(STATEMENT_KV_POINT_SET)?
.execute(params![mutation.key, &value, &encoding, &version])?;
@@ -659,7 +714,7 @@ impl Database for SqliteDb {
&tx,
&mutation.key,
"sum",
- &operand,
+ operand,
version,
|a, b| a.wrapping_add(b),
)?;
@@ -669,7 +724,7 @@ impl Database for SqliteDb {
&tx,
&mutation.key,
"min",
- &operand,
+ operand,
version,
|a, b| a.min(b),
)?;
@@ -679,7 +734,7 @@ impl Database for SqliteDb {
&tx,
&mutation.key,
"max",
- &operand,
+ operand,
version,
|a, b| a.max(b),
)?;
@@ -693,12 +748,13 @@ impl Database for SqliteDb {
.as_millis() as u64;
let has_enqueues = !write.enqueues.is_empty();
- for enqueue in write.enqueues {
+ for enqueue in &write.enqueues {
let id = Uuid::new_v4().to_string();
let backoff_schedule = serde_json::to_string(
&enqueue
.backoff_schedule
- .or_else(|| Some(DEFAULT_BACKOFF_SCHEDULE.to_vec())),
+ .as_deref()
+ .or_else(|| Some(&DEFAULT_BACKOFF_SCHEDULE[..])),
)?;
let keys_if_undelivered =
serde_json::to_string(&enqueue.keys_if_undelivered)?;