summaryrefslogtreecommitdiff
path: root/ext/kv/sqlite.rs
diff options
context:
space:
mode:
authorIgor Zinkovsky <igor@deno.com>2023-09-29 11:40:36 -0700
committerGitHub <noreply@github.com>2023-09-29 11:40:36 -0700
commit61b91e10ad41e6d207d60113a2f6f2b63a706940 (patch)
tree89e448051b6f4939c1bfa9bb047d94da2970bcbe /ext/kv/sqlite.rs
parent5edd102f3f912a53c7bcad3b0fa4feb672ada323 (diff)
fix(ext/kv): send queue wake messages accross different kv instances (#20465)
fixes #20454 Current KV queues implementation assumes that `enqueue` and `listenQueue` are called on the same instance of `Deno.Kv`. It's possible that the same Deno process opens multiple KV instances pointing to the same fs path, and in that case `listenQueue` should still get notified of messages enqueued through a different KV instance.
Diffstat (limited to 'ext/kv/sqlite.rs')
-rw-r--r--ext/kv/sqlite.rs163
1 files changed, 129 insertions, 34 deletions
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
index 192141e27..327091f05 100644
--- a/ext/kv/sqlite.rs
+++ b/ext/kv/sqlite.rs
@@ -2,7 +2,10 @@
use std::borrow::Cow;
use std::cell::RefCell;
+use std::collections::HashMap;
+use std::env::current_dir;
use std::future::Future;
+use std::io::ErrorKind;
use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
@@ -23,11 +26,14 @@ use deno_core::unsync::spawn;
use deno_core::unsync::spawn_blocking;
use deno_core::AsyncRefCell;
use deno_core::OpState;
+use deno_node::PathClean;
use rand::Rng;
use rusqlite::params;
use rusqlite::OpenFlags;
use rusqlite::OptionalExtension;
use rusqlite::Transaction;
+use tokio::sync::broadcast;
+use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::sync::OnceCell;
@@ -212,30 +218,35 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
}
}
- let conn = sqlite_retry_loop(|| {
+ let (conn, queue_waker_key) = sqlite_retry_loop(|| {
let path = path.clone();
let default_storage_dir = self.default_storage_dir.clone();
async move {
spawn_blocking(move || {
- let conn = match (path.as_deref(), &default_storage_dir) {
- (Some(":memory:"), _) | (None, None) => {
- rusqlite::Connection::open_in_memory()?
- }
- (Some(path), _) => {
- let flags =
- OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
- rusqlite::Connection::open_with_flags(path, flags)?
- }
- (None, Some(path)) => {
- std::fs::create_dir_all(path)?;
- let path = path.join("kv.sqlite3");
- rusqlite::Connection::open(path)?
- }
- };
+ let (conn, queue_waker_key) =
+ match (path.as_deref(), &default_storage_dir) {
+ (Some(":memory:"), _) | (None, None) => {
+ (rusqlite::Connection::open_in_memory()?, None)
+ }
+ (Some(path), _) => {
+ let flags =
+ OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
+ let resolved_path = canonicalize_path(&PathBuf::from(path))?;
+ (
+ rusqlite::Connection::open_with_flags(path, flags)?,
+ Some(resolved_path),
+ )
+ }
+ (None, Some(path)) => {
+ std::fs::create_dir_all(path)?;
+ let path = path.join("kv.sqlite3");
+ (rusqlite::Connection::open(path.clone())?, Some(path))
+ }
+ };
conn.pragma_update(None, "journal_mode", "wal")?;
- Ok::<_, AnyError>(conn)
+ Ok::<_, AnyError>((conn, queue_waker_key))
})
.await
.unwrap()
@@ -277,6 +288,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
Ok(SqliteDb {
conn,
queue: OnceCell::new(),
+ queue_waker_key,
expiration_watcher,
})
}
@@ -285,6 +297,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
pub struct SqliteDb {
conn: ProtectedConn,
queue: OnceCell<SqliteQueue>,
+ queue_waker_key: Option<PathBuf>,
expiration_watcher: deno_core::unsync::JoinHandle<()>,
}
@@ -363,7 +376,7 @@ pub struct DequeuedMessage {
conn: WeakProtectedConn,
id: String,
payload: Option<Vec<u8>>,
- waker_tx: mpsc::Sender<()>,
+ waker_tx: broadcast::Sender<()>,
_permit: OwnedSemaphorePermit,
}
@@ -403,7 +416,7 @@ impl QueueMessageHandle for DequeuedMessage {
};
if requeued {
// If the message was requeued, wake up the dequeue loop.
- self.waker_tx.send(()).await?;
+ let _ = self.waker_tx.send(());
}
Ok(())
}
@@ -422,15 +435,18 @@ struct SqliteQueue {
conn: ProtectedConn,
dequeue_rx: Rc<AsyncRefCell<DequeueReceiver>>,
concurrency_limiter: Arc<Semaphore>,
- waker_tx: mpsc::Sender<()>,
+ waker_tx: broadcast::Sender<()>,
shutdown_tx: watch::Sender<()>,
}
impl SqliteQueue {
- fn new(conn: ProtectedConn) -> Self {
+ fn new(
+ conn: ProtectedConn,
+ waker_tx: broadcast::Sender<()>,
+ waker_rx: broadcast::Receiver<()>,
+ ) -> Self {
let conn_clone = conn.clone();
let (shutdown_tx, shutdown_rx) = watch::channel::<()>(());
- let (waker_tx, waker_rx) = mpsc::channel::<()>(1);
let (dequeue_tx, dequeue_rx) = mpsc::channel::<(Vec<u8>, String)>(64);
spawn(async move {
@@ -486,11 +502,6 @@ impl SqliteQueue {
}))
}
- async fn wake(&self) -> Result<(), AnyError> {
- self.waker_tx.send(()).await?;
- Ok(())
- }
-
fn shutdown(&self) {
let _ = self.shutdown_tx.send(());
}
@@ -499,7 +510,7 @@ impl SqliteQueue {
conn: ProtectedConn,
dequeue_tx: mpsc::Sender<(Vec<u8>, String)>,
mut shutdown_rx: watch::Receiver<()>,
- mut waker_rx: mpsc::Receiver<()>,
+ mut waker_rx: broadcast::Receiver<()>,
) -> Result<(), AnyError> {
loop {
let messages = SqliteDb::run_tx(conn.clone(), move |tx| {
@@ -575,7 +586,9 @@ impl SqliteQueue {
};
tokio::select! {
_ = sleep_fut => {}
- x = waker_rx.recv() => if x.is_none() {return Ok(());},
+ x = waker_rx.recv() => {
+ if let Err(RecvError::Closed) = x {return Ok(());}
+ },
_ = shutdown_rx.changed() => return Ok(())
}
}
@@ -773,7 +786,7 @@ impl Database for SqliteDb {
async fn atomic_write(
&self,
- _state: Rc<RefCell<OpState>>,
+ state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
let write = Arc::new(write);
@@ -892,8 +905,17 @@ impl Database for SqliteDb {
.await?;
if has_enqueues {
- if let Some(queue) = self.queue.get() {
- queue.wake().await?;
+ match self.queue.get() {
+ Some(queue) => {
+ let _ = queue.waker_tx.send(());
+ }
+ None => {
+ if let Some(waker_key) = &self.queue_waker_key {
+ let (waker_tx, _) =
+ shared_queue_waker_channel(waker_key, state.clone());
+ let _ = waker_tx.send(());
+ }
+ }
}
}
Ok(commit_result)
@@ -901,11 +923,21 @@ impl Database for SqliteDb {
async fn dequeue_next_message(
&self,
- _state: Rc<RefCell<OpState>>,
+ state: Rc<RefCell<OpState>>,
) -> Result<Option<Self::QMH>, AnyError> {
let queue = self
.queue
- .get_or_init(|| async move { SqliteQueue::new(self.conn.clone()) })
+ .get_or_init(|| async move {
+ let (waker_tx, waker_rx) = {
+ match &self.queue_waker_key {
+ Some(waker_key) => {
+ shared_queue_waker_channel(waker_key, state.clone())
+ }
+ None => broadcast::channel(1),
+ }
+ };
+ SqliteQueue::new(self.conn.clone(), waker_tx, waker_rx)
+ })
.await;
let handle = queue.dequeue().await?;
Ok(handle)
@@ -1012,6 +1044,69 @@ fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) {
}
}
+pub struct QueueWaker {
+ wakers_tx: HashMap<PathBuf, broadcast::Sender<()>>,
+}
+
+fn shared_queue_waker_channel(
+ waker_key: &Path,
+ state: Rc<RefCell<OpState>>,
+) -> (broadcast::Sender<()>, broadcast::Receiver<()>) {
+ let mut state = state.borrow_mut();
+ let waker = {
+ let waker = state.try_borrow_mut::<QueueWaker>();
+ match waker {
+ Some(waker) => waker,
+ None => {
+ let waker = QueueWaker {
+ wakers_tx: HashMap::new(),
+ };
+ state.put::<QueueWaker>(waker);
+ state.borrow_mut::<QueueWaker>()
+ }
+ }
+ };
+
+ let waker_tx = waker
+ .wakers_tx
+ .entry(waker_key.to_path_buf())
+ .or_insert_with(|| {
+ let (waker_tx, _) = broadcast::channel(1);
+ waker_tx
+ });
+
+ (waker_tx.clone(), waker_tx.subscribe())
+}
+
+/// Same as Path::canonicalize, but also handles non-existing paths.
+fn canonicalize_path(path: &Path) -> Result<PathBuf, AnyError> {
+ let path = path.to_path_buf().clean();
+ let mut path = path;
+ let mut names_stack = Vec::new();
+ loop {
+ match path.canonicalize() {
+ Ok(mut canonicalized_path) => {
+ for name in names_stack.into_iter().rev() {
+ canonicalized_path = canonicalized_path.join(name);
+ }
+ return Ok(canonicalized_path);
+ }
+ Err(err) if err.kind() == ErrorKind::NotFound => {
+ let file_name = path.file_name().map(|os_str| os_str.to_os_string());
+ if let Some(file_name) = file_name {
+ names_stack.push(file_name.to_str().unwrap().to_string());
+ path = path.parent().unwrap().to_path_buf();
+ } else {
+ names_stack.push(path.to_str().unwrap().to_string());
+ let current_dir = current_dir()?;
+ path = current_dir.clone();
+ }
+ }
+ Err(err) => return Err(err.into()),
+ }
+ }
+}
+
fn is_conn_closed_error(e: &AnyError) -> bool {
get_custom_error_class(e) == Some("TypeError")
&& e.to_string() == ERROR_USING_CLOSED_DATABASE