summaryrefslogtreecommitdiff
path: root/ext/kv/sqlite.rs
diff options
context:
space:
mode:
authorIgor Zinkovsky <igor@deno.com>2023-06-02 11:12:26 -0700
committerGitHub <noreply@github.com>2023-06-02 11:12:26 -0700
commitce5bf9fb2a52fa337afb5f54ec2553eb4d411fd2 (patch)
tree7a5863e0612707ca451e443ead5696194245158a /ext/kv/sqlite.rs
parent98320ff1f88e6b6ee1d85d64e99519986f6a7239 (diff)
fix(kv) run sqlite transactions via spawn_blocking (#19350)
`rusqlite` does not support async operations; with this PR SQLite operations will run through `spawn_blocking` to ensure that the event loop does not get blocked. There is still only a single SQLite connection. So all operations will do an async wait on the connection. In the future we can add a connection pool if needed.
Diffstat (limited to 'ext/kv/sqlite.rs')
-rw-r--r--ext/kv/sqlite.rs321
1 files changed, 193 insertions, 128 deletions
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
index 63be1281b..80d230ab1 100644
--- a/ext/kv/sqlite.rs
+++ b/ext/kv/sqlite.rs
@@ -1,6 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::borrow::Cow;
+use std::cell::Cell;
use std::cell::RefCell;
use std::marker::PhantomData;
use std::path::Path;
@@ -10,6 +11,8 @@ use std::rc::Rc;
use async_trait::async_trait;
use deno_core::error::type_error;
use deno_core::error::AnyError;
+use deno_core::task::spawn_blocking;
+use deno_core::AsyncRefCell;
use deno_core::OpState;
use rusqlite::params;
use rusqlite::OpenFlags;
@@ -112,11 +115,9 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
state: Rc<RefCell<OpState>>,
path: Option<String>,
) -> Result<Self::DB, AnyError> {
- let conn = match (path.as_deref(), &self.default_storage_dir) {
- (Some(":memory:"), _) | (None, None) => {
- rusqlite::Connection::open_in_memory()?
- }
- (Some(path), _) => {
+ // Validate path
+ if let Some(path) = &path {
+ if path != ":memory:" {
if path.is_empty() {
return Err(type_error("Filename cannot be empty"));
}
@@ -132,44 +133,92 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
permissions.check_read(path, "Deno.openKv")?;
permissions.check_write(path, "Deno.openKv")?;
}
- let flags = OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
- rusqlite::Connection::open_with_flags(path, flags)?
- }
- (None, Some(path)) => {
- std::fs::create_dir_all(path)?;
- let path = path.join("kv.sqlite3");
- rusqlite::Connection::open(&path)?
- }
- };
-
- conn.pragma_update(None, "journal_mode", "wal")?;
- conn.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?;
-
- let current_version: usize = conn
- .query_row(
- "select version from migration_state where k = 0",
- [],
- |row| row.get(0),
- )
- .optional()?
- .unwrap_or(0);
-
- for (i, migration) in MIGRATIONS.iter().enumerate() {
- let version = i + 1;
- if version > current_version {
- conn.execute_batch(migration)?;
- conn.execute(
- "replace into migration_state (k, version) values(?, ?)",
- [&0, &version],
- )?;
}
}
- Ok(SqliteDb(RefCell::new(conn)))
+ let default_storage_dir = self.default_storage_dir.clone();
+ let conn = spawn_blocking(move || {
+ let conn = match (path.as_deref(), &default_storage_dir) {
+ (Some(":memory:"), _) | (None, None) => {
+ rusqlite::Connection::open_in_memory()?
+ }
+ (Some(path), _) => {
+ let flags =
+ OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
+ rusqlite::Connection::open_with_flags(path, flags)?
+ }
+ (None, Some(path)) => {
+ std::fs::create_dir_all(path)?;
+ let path = path.join("kv.sqlite3");
+ rusqlite::Connection::open(&path)?
+ }
+ };
+
+ conn.pragma_update(None, "journal_mode", "wal")?;
+ conn.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?;
+
+ let current_version: usize = conn
+ .query_row(
+ "select version from migration_state where k = 0",
+ [],
+ |row| row.get(0),
+ )
+ .optional()?
+ .unwrap_or(0);
+
+ for (i, migration) in MIGRATIONS.iter().enumerate() {
+ let version = i + 1;
+ if version > current_version {
+ conn.execute_batch(migration)?;
+ conn.execute(
+ "replace into migration_state (k, version) values(?, ?)",
+ [&0, &version],
+ )?;
+ }
+ }
+
+ Ok::<_, AnyError>(conn)
+ })
+ .await
+ .unwrap()?;
+
+ Ok(SqliteDb(Rc::new(AsyncRefCell::new(Cell::new(Some(conn))))))
}
}
-pub struct SqliteDb(RefCell<rusqlite::Connection>);
+pub struct SqliteDb(Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>);
+
+impl SqliteDb {
+ async fn run_tx<F, R>(&self, f: F) -> Result<R, AnyError>
+ where
+ F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>)
+ + Send
+ + 'static,
+ R: Send + 'static,
+ {
+ // Transactions need exclusive access to the connection. Wait until
+ // we can borrow_mut the connection.
+ let cell = self.0.borrow_mut().await;
+
+ // Take the db out of the cell and run the transaction via spawn_blocking.
+ let mut db = cell.take().unwrap();
+ let (result, db) = spawn_blocking(move || {
+ let result = {
+ match db.transaction() {
+ Ok(tx) => f(tx),
+ Err(e) => Err(e.into()),
+ }
+ };
+ (result, db)
+ })
+ .await
+ .unwrap();
+
+ // Put the db back into the cell.
+ cell.set(Some(db));
+ result
+ }
+}
#[async_trait(?Send)]
impl Database for SqliteDb {
@@ -178,110 +227,126 @@ impl Database for SqliteDb {
requests: Vec<ReadRange>,
_options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
- let mut responses = Vec::with_capacity(requests.len());
- let mut db = self.0.borrow_mut();
- let tx = db.transaction()?;
-
- for request in requests {
- let mut stmt = tx.prepare_cached(if request.reverse {
- STATEMENT_KV_RANGE_SCAN_REVERSE
- } else {
- STATEMENT_KV_RANGE_SCAN
- })?;
- let entries = stmt
- .query_map(
- (
- request.start.as_slice(),
- request.end.as_slice(),
- request.limit.get(),
- ),
- |row| {
- let key: Vec<u8> = row.get(0)?;
- let value: Vec<u8> = row.get(1)?;
- let encoding: i64 = row.get(2)?;
-
- let value = decode_value(value, encoding);
-
- let version: i64 = row.get(3)?;
- Ok(KvEntry {
- key,
- value,
- versionstamp: version_to_versionstamp(version),
- })
- },
- )?
- .collect::<Result<Vec<_>, rusqlite::Error>>()?;
- responses.push(ReadRangeOutput { entries });
- }
+ self
+ .run_tx(move |tx| {
+ let mut responses = Vec::with_capacity(requests.len());
+ for request in requests {
+ let mut stmt = tx.prepare_cached(if request.reverse {
+ STATEMENT_KV_RANGE_SCAN_REVERSE
+ } else {
+ STATEMENT_KV_RANGE_SCAN
+ })?;
+ let entries = stmt
+ .query_map(
+ (
+ request.start.as_slice(),
+ request.end.as_slice(),
+ request.limit.get(),
+ ),
+ |row| {
+ let key: Vec<u8> = row.get(0)?;
+ let value: Vec<u8> = row.get(1)?;
+ let encoding: i64 = row.get(2)?;
+
+ let value = decode_value(value, encoding);
+
+ let version: i64 = row.get(3)?;
+ Ok(KvEntry {
+ key,
+ value,
+ versionstamp: version_to_versionstamp(version),
+ })
+ },
+ )?
+ .collect::<Result<Vec<_>, rusqlite::Error>>()?;
+ responses.push(ReadRangeOutput { entries });
+ }
- Ok(responses)
+ Ok(responses)
+ })
+ .await
}
async fn atomic_write(
&self,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
- let mut db = self.0.borrow_mut();
-
- let tx = db.transaction()?;
-
- for check in write.checks {
- let real_versionstamp = tx
- .prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)?
- .query_row([check.key.as_slice()], |row| row.get(0))
- .optional()?
- .map(version_to_versionstamp);
- if real_versionstamp != check.versionstamp {
- return Ok(None);
- }
- }
-
- let version: i64 = tx
- .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)?
- .query_row([], |row| row.get(0))?;
-
- for mutation in write.mutations {
- match mutation.kind {
- MutationKind::Set(value) => {
- let (value, encoding) = encode_value(&value);
- let changed = tx
- .prepare_cached(STATEMENT_KV_POINT_SET)?
- .execute(params![mutation.key, &value, &encoding, &version])?;
- assert_eq!(changed, 1)
+ self
+ .run_tx(move |tx| {
+ for check in write.checks {
+ let real_versionstamp = tx
+ .prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)?
+ .query_row([check.key.as_slice()], |row| row.get(0))
+ .optional()?
+ .map(version_to_versionstamp);
+ if real_versionstamp != check.versionstamp {
+ return Ok(None);
+ }
}
- MutationKind::Delete => {
- let changed = tx
- .prepare_cached(STATEMENT_KV_POINT_DELETE)?
- .execute(params![mutation.key])?;
- assert!(changed == 0 || changed == 1)
- }
- MutationKind::Sum(operand) => {
- mutate_le64(&tx, &mutation.key, "sum", &operand, version, |a, b| {
- a.wrapping_add(b)
- })?;
- }
- MutationKind::Min(operand) => {
- mutate_le64(&tx, &mutation.key, "min", &operand, version, |a, b| {
- a.min(b)
- })?;
- }
- MutationKind::Max(operand) => {
- mutate_le64(&tx, &mutation.key, "max", &operand, version, |a, b| {
- a.max(b)
- })?;
+
+ let version: i64 = tx
+ .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)?
+ .query_row([], |row| row.get(0))?;
+
+ for mutation in write.mutations {
+ match mutation.kind {
+ MutationKind::Set(value) => {
+ let (value, encoding) = encode_value(&value);
+ let changed = tx
+ .prepare_cached(STATEMENT_KV_POINT_SET)?
+ .execute(params![mutation.key, &value, &encoding, &version])?;
+ assert_eq!(changed, 1)
+ }
+ MutationKind::Delete => {
+ let changed = tx
+ .prepare_cached(STATEMENT_KV_POINT_DELETE)?
+ .execute(params![mutation.key])?;
+ assert!(changed == 0 || changed == 1)
+ }
+ MutationKind::Sum(operand) => {
+ mutate_le64(
+ &tx,
+ &mutation.key,
+ "sum",
+ &operand,
+ version,
+ |a, b| a.wrapping_add(b),
+ )?;
+ }
+ MutationKind::Min(operand) => {
+ mutate_le64(
+ &tx,
+ &mutation.key,
+ "min",
+ &operand,
+ version,
+ |a, b| a.min(b),
+ )?;
+ }
+ MutationKind::Max(operand) => {
+ mutate_le64(
+ &tx,
+ &mutation.key,
+ "max",
+ &operand,
+ version,
+ |a, b| a.max(b),
+ )?;
+ }
+ }
}
- }
- }
- // TODO(@losfair): enqueues
+ // TODO(@losfair): enqueues
- tx.commit()?;
+ tx.commit()?;
- let new_vesionstamp = version_to_versionstamp(version);
+ let new_vesionstamp = version_to_versionstamp(version);
- Ok(Some(CommitResult {
- versionstamp: new_vesionstamp,
- }))
+ Ok(Some(CommitResult {
+ versionstamp: new_vesionstamp,
+ }))
+ })
+ .await
}
}