summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/kv/01_db.ts37
-rw-r--r--ext/kv/interface.rs1
-rw-r--r--ext/kv/lib.rs8
-rw-r--r--ext/kv/sqlite.rs192
4 files changed, 181 insertions, 57 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts
index 8fd1f5997..4cd8744ba 100644
--- a/ext/kv/01_db.ts
+++ b/ext/kv/01_db.ts
@@ -130,12 +130,15 @@ class Kv {
});
}
- async set(key: Deno.KvKey, value: unknown) {
+ async set(key: Deno.KvKey, value: unknown, options?: { expireIn?: number }) {
value = serializeValue(value);
const checks: Deno.AtomicCheck[] = [];
+ const expireAt = typeof options?.expireIn === "number"
+ ? Date.now() + options.expireIn
+ : undefined;
const mutations = [
- [key, "set", value],
+ [key, "set", value, expireAt],
];
const versionstamp = await core.opAsync(
@@ -152,7 +155,7 @@ class Kv {
async delete(key: Deno.KvKey) {
const checks: Deno.AtomicCheck[] = [];
const mutations = [
- [key, "delete", null],
+ [key, "delete", null, undefined],
];
const result = await core.opAsync(
@@ -318,7 +321,7 @@ class AtomicOperation {
#rid: number;
#checks: [Deno.KvKey, string | null][] = [];
- #mutations: [Deno.KvKey, string, RawValue | null][] = [];
+ #mutations: [Deno.KvKey, string, RawValue | null, number | undefined][] = [];
#enqueues: [Uint8Array, number, Deno.KvKey[], number[] | null][] = [];
constructor(rid: number) {
@@ -337,6 +340,7 @@ class AtomicOperation {
const key = mutation.key;
let type: string;
let value: RawValue | null;
+ let expireAt: number | undefined = undefined;
switch (mutation.type) {
case "delete":
type = "delete";
@@ -345,6 +349,10 @@ class AtomicOperation {
}
break;
case "set":
+ if (typeof mutation.expireIn === "number") {
+ expireAt = Date.now() + mutation.expireIn;
+ }
+ /* falls through */
case "sum":
case "min":
case "max":
@@ -357,33 +365,40 @@ class AtomicOperation {
default:
throw new TypeError("Invalid mutation type");
}
- this.#mutations.push([key, type, value]);
+ this.#mutations.push([key, type, value, expireAt]);
}
return this;
}
sum(key: Deno.KvKey, n: bigint): this {
- this.#mutations.push([key, "sum", serializeValue(new KvU64(n))]);
+ this.#mutations.push([key, "sum", serializeValue(new KvU64(n)), undefined]);
return this;
}
min(key: Deno.KvKey, n: bigint): this {
- this.#mutations.push([key, "min", serializeValue(new KvU64(n))]);
+ this.#mutations.push([key, "min", serializeValue(new KvU64(n)), undefined]);
return this;
}
max(key: Deno.KvKey, n: bigint): this {
- this.#mutations.push([key, "max", serializeValue(new KvU64(n))]);
+ this.#mutations.push([key, "max", serializeValue(new KvU64(n)), undefined]);
return this;
}
- set(key: Deno.KvKey, value: unknown): this {
- this.#mutations.push([key, "set", serializeValue(value)]);
+ set(
+ key: Deno.KvKey,
+ value: unknown,
+ options?: { expireIn?: number },
+ ): this {
+ const expireAt = typeof options?.expireIn === "number"
+ ? Date.now() + options.expireIn
+ : undefined;
+ this.#mutations.push([key, "set", serializeValue(value), expireAt]);
return this;
}
delete(key: Deno.KvKey): this {
- this.#mutations.push([key, "delete", null]);
+ this.#mutations.push([key, "delete", null, undefined]);
return this;
}
diff --git a/ext/kv/interface.rs b/ext/kv/interface.rs
index b67ee1243..28b43f8d7 100644
--- a/ext/kv/interface.rs
+++ b/ext/kv/interface.rs
@@ -237,6 +237,7 @@ pub struct KvCheck {
pub struct KvMutation {
pub key: Vec<u8>,
pub kind: MutationKind,
+ pub expire_at: Option<u64>,
}
/// A request to enqueue a message to the database. This message is delivered
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index a781f4579..7164a700b 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -375,7 +375,7 @@ impl TryFrom<V8KvCheck> for KvCheck {
}
}
-type V8KvMutation = (KvKey, String, Option<FromV8Value>);
+type V8KvMutation = (KvKey, String, Option<FromV8Value>, Option<u64>);
impl TryFrom<V8KvMutation> for KvMutation {
type Error = AnyError;
@@ -396,7 +396,11 @@ impl TryFrom<V8KvMutation> for KvMutation {
)))
}
};
- Ok(KvMutation { key, kind })
+ Ok(KvMutation {
+ key,
+ kind,
+ expire_at: value.3,
+ })
}
}
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
index aea438d2d..8e37d2c87 100644
--- a/ext/kv/sqlite.rs
+++ b/ext/kv/sqlite.rs
@@ -1,7 +1,6 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::borrow::Cow;
-use std::cell::Cell;
use std::cell::RefCell;
use std::future::Future;
use std::marker::PhantomData;
@@ -10,10 +9,12 @@ use std::path::PathBuf;
use std::rc::Rc;
use std::rc::Weak;
use std::sync::Arc;
+use std::sync::Mutex;
use std::time::Duration;
use std::time::SystemTime;
use async_trait::async_trait;
+use deno_core::error::get_custom_error_class;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures;
@@ -57,7 +58,7 @@ const STATEMENT_KV_POINT_GET_VALUE_ONLY: &str =
const STATEMENT_KV_POINT_GET_VERSION_ONLY: &str =
"select version from kv where k = ?";
const STATEMENT_KV_POINT_SET: &str =
- "insert into kv (k, v, v_encoding, version) values (:k, :v, :v_encoding, :version) on conflict(k) do update set v = :v, v_encoding = :v_encoding, version = :version";
+ "insert into kv (k, v, v_encoding, version, expiration_ms) values (:k, :v, :v_encoding, :version, :expiration_ms) on conflict(k) do update set v = :v, v_encoding = :v_encoding, version = :version, expiration_ms = :expiration_ms";
const STATEMENT_KV_POINT_DELETE: &str = "delete from kv where k = ?";
const STATEMENT_QUEUE_ADD_READY: &str = "insert into queue (ts, id, data, backoff_schedule, keys_if_undelivered) values(?, ?, ?, ?, ?)";
@@ -79,7 +80,7 @@ create table if not exists migration_state(
)
";
-const MIGRATIONS: [&str; 2] = [
+const MIGRATIONS: [&str; 3] = [
"
create table data_version (
k integer primary key,
@@ -113,11 +114,55 @@ create table queue_running(
primary key (deadline, id)
);
",
+ "
+alter table kv add column seq integer not null default 0;
+alter table data_version add column seq integer not null default 0;
+alter table kv add column expiration_ms integer not null default -1;
+create index kv_expiration_ms_idx on kv (expiration_ms);
+",
];
const DISPATCH_CONCURRENCY_LIMIT: usize = 100;
const DEFAULT_BACKOFF_SCHEDULE: [u32; 5] = [100, 1000, 5000, 30000, 60000];
+const ERROR_USING_CLOSED_DATABASE: &str = "Attempted to use a closed database";
+
+#[derive(Clone)]
+struct ProtectedConn {
+ guard: Rc<AsyncRefCell<()>>,
+ conn: Arc<Mutex<Option<rusqlite::Connection>>>,
+}
+
+#[derive(Clone)]
+struct WeakProtectedConn {
+ guard: Weak<AsyncRefCell<()>>,
+ conn: std::sync::Weak<Mutex<Option<rusqlite::Connection>>>,
+}
+
+impl ProtectedConn {
+ fn new(conn: rusqlite::Connection) -> Self {
+ Self {
+ guard: Rc::new(AsyncRefCell::new(())),
+ conn: Arc::new(Mutex::new(Some(conn))),
+ }
+ }
+
+ fn downgrade(&self) -> WeakProtectedConn {
+ WeakProtectedConn {
+ guard: Rc::downgrade(&self.guard),
+ conn: Arc::downgrade(&self.conn),
+ }
+ }
+}
+
+impl WeakProtectedConn {
+ fn upgrade(&self) -> Option<ProtectedConn> {
+ let guard = self.guard.upgrade()?;
+ let conn = self.conn.upgrade()?;
+ Some(ProtectedConn { guard, conn })
+ }
+}
+
pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> {
pub default_storage_dir: Option<PathBuf>,
_permissions: PhantomData<P>,
@@ -197,7 +242,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
}
})
.await?;
- let conn = Rc::new(AsyncRefCell::new(Cell::new(Some(conn))));
+ let conn = ProtectedConn::new(conn);
SqliteDb::run_tx(conn.clone(), |tx| {
tx.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?;
@@ -227,16 +272,35 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
})
.await?;
+ let expiration_watcher = spawn(watch_expiration(conn.clone()));
+
Ok(SqliteDb {
conn,
queue: OnceCell::new(),
+ expiration_watcher,
})
}
}
pub struct SqliteDb {
- conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
+ conn: ProtectedConn,
queue: OnceCell<SqliteQueue>,
+ expiration_watcher: deno_core::task::JoinHandle<()>,
+}
+
+impl Drop for SqliteDb {
+ fn drop(&mut self) {
+ self.expiration_watcher.abort();
+
+ // The above `abort()` operation is asynchronous. It's not
+ // guaranteed that the sqlite connection will be closed immediately.
+ // So here we synchronously take the conn mutex and drop the connection.
+ //
+ // This blocks the event loop if the connection is still being used,
+ // but ensures correctness - deleting the database file after calling
+ // the `close` method will always work.
+ self.conn.conn.lock().unwrap().take();
+ }
}
async fn sqlite_retry_loop<R, Fut: Future<Output = Result<R, AnyError>>>(
@@ -263,10 +327,7 @@ async fn sqlite_retry_loop<R, Fut: Future<Output = Result<R, AnyError>>>(
}
impl SqliteDb {
- async fn run_tx<F, R>(
- conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
- f: F,
- ) -> Result<R, AnyError>
+ async fn run_tx<F, R>(conn: ProtectedConn, f: F) -> Result<R, AnyError>
where
F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>)
+ Clone
@@ -277,42 +338,38 @@ impl SqliteDb {
sqlite_retry_loop(|| Self::run_tx_inner(conn.clone(), f.clone())).await
}
- async fn run_tx_inner<F, R>(
- conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
- f: F,
- ) -> Result<R, AnyError>
+ async fn run_tx_inner<F, R>(conn: ProtectedConn, f: F) -> Result<R, AnyError>
where
F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>)
+ Send
+ 'static,
R: Send + 'static,
{
- // Transactions need exclusive access to the connection. Wait until
- // we can borrow_mut the connection.
- let cell = conn.borrow_mut().await;
-
- // Take the db out of the cell and run the transaction via spawn_blocking.
- let mut db = cell.take().unwrap();
- let (result, db) = spawn_blocking(move || {
- let result = {
- match db.transaction() {
- Ok(tx) => f(tx),
- Err(e) => Err(e.into()),
- }
+ // `run_tx` runs in an asynchronous context. First acquire the async lock to
+ // coordinate with other async invocations.
+ let _guard_holder = conn.guard.borrow_mut().await;
+
+ // Then, take the synchronous lock. This operation is guaranteed to success without waiting,
+ // unless the database is being closed.
+ let db = conn.conn.clone();
+ spawn_blocking(move || {
+ let mut db = db.try_lock().ok();
+ let Some(db) = db.as_mut().and_then(|x| x.as_mut()) else {
+ return Err(type_error(ERROR_USING_CLOSED_DATABASE))
};
- (result, db)
+ let result = match db.transaction() {
+ Ok(tx) => f(tx),
+ Err(e) => Err(e.into()),
+ };
+ result
})
.await
- .unwrap();
-
- // Put the db back into the cell.
- cell.set(Some(db));
- result
+ .unwrap()
}
}
pub struct DequeuedMessage {
- conn: Weak<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
+ conn: WeakProtectedConn,
id: String,
payload: Option<Vec<u8>>,
waker_tx: mpsc::Sender<()>,
@@ -341,7 +398,20 @@ impl QueueMessageHandle for DequeuedMessage {
tx.commit()?;
Ok(requeued)
})
- .await?;
+ .await;
+ let requeued = match requeued {
+ Ok(x) => x,
+ Err(e) => {
+ // Silently ignore the error if the database has been closed
+ // This message will be delivered on the next run
+ if get_custom_error_class(&e) == Some("TypeError")
+ && e.to_string() == ERROR_USING_CLOSED_DATABASE
+ {
+ return Ok(());
+ }
+ return Err(e);
+ }
+ };
if requeued {
// If the message was requeued, wake up the dequeue loop.
self.waker_tx.send(()).await?;
@@ -360,7 +430,7 @@ impl QueueMessageHandle for DequeuedMessage {
type DequeueReceiver = mpsc::Receiver<(Vec<u8>, String)>;
struct SqliteQueue {
- conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
+ conn: ProtectedConn,
dequeue_rx: Rc<AsyncRefCell<DequeueReceiver>>,
concurrency_limiter: Arc<Semaphore>,
waker_tx: mpsc::Sender<()>,
@@ -368,7 +438,7 @@ struct SqliteQueue {
}
impl SqliteQueue {
- fn new(conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>) -> Self {
+ fn new(conn: ProtectedConn) -> Self {
let conn_clone = conn.clone();
let (shutdown_tx, shutdown_rx) = watch::channel::<()>(());
let (waker_tx, waker_rx) = mpsc::channel::<()>(1);
@@ -406,7 +476,7 @@ impl SqliteQueue {
let permit = self.concurrency_limiter.clone().acquire_owned().await?;
Ok(DequeuedMessage {
- conn: Rc::downgrade(&self.conn),
+ conn: self.conn.downgrade(),
id,
payload: Some(payload),
waker_tx: self.waker_tx.clone(),
@@ -424,7 +494,7 @@ impl SqliteQueue {
}
async fn dequeue_loop(
- conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
+ conn: ProtectedConn,
dequeue_tx: mpsc::Sender<(Vec<u8>, String)>,
mut shutdown_rx: watch::Receiver<()>,
mut waker_rx: mpsc::Receiver<()>,
@@ -511,7 +581,7 @@ impl SqliteQueue {
}
async fn get_earliest_ready_ts(
- conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
+ conn: ProtectedConn,
) -> Result<Option<u64>, AnyError> {
SqliteDb::run_tx(conn.clone(), move |tx| {
let ts = tx
@@ -527,7 +597,7 @@ impl SqliteQueue {
}
async fn requeue_inflight_messages(
- conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>,
+ conn: ProtectedConn,
) -> Result<(), AnyError> {
loop {
let done = SqliteDb::run_tx(conn.clone(), move |tx| {
@@ -608,7 +678,7 @@ impl SqliteQueue {
for key in keys_if_undelivered {
let changed = tx
.prepare_cached(STATEMENT_KV_POINT_SET)?
- .execute(params![key, &data, &VALUE_ENCODING_V8, &version])?;
+ .execute(params![key, &data, &VALUE_ENCODING_V8, &version, -1i64])?;
assert_eq!(changed, 1);
}
}
@@ -623,6 +693,31 @@ impl SqliteQueue {
}
}
+async fn watch_expiration(db: ProtectedConn) {
+ loop {
+ // Scan for expired keys
+ let res = SqliteDb::run_tx(db.clone(), move |tx| {
+ let now = SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .unwrap()
+ .as_millis() as u64;
+ tx.prepare_cached(
+ "delete from kv where expiration_ms >= 0 and expiration_ms <= ?",
+ )?
+ .execute(params![now])?;
+ tx.commit()?;
+ Ok(())
+ })
+ .await;
+ if let Err(e) = res {
+ eprintln!("kv: Error in expiration watcher: {}", e);
+ }
+ let sleep_duration =
+ Duration::from_secs_f64(60.0 + rand::thread_rng().gen_range(0.0..30.0));
+ tokio::time::sleep(sleep_duration).await;
+ }
+}
+
#[async_trait(?Send)]
impl Database for SqliteDb {
type QMH = DequeuedMessage;
@@ -698,9 +793,17 @@ impl Database for SqliteDb {
match &mutation.kind {
MutationKind::Set(value) => {
let (value, encoding) = encode_value(value);
- let changed = tx
- .prepare_cached(STATEMENT_KV_POINT_SET)?
- .execute(params![mutation.key, &value, &encoding, &version])?;
+ let changed =
+ tx.prepare_cached(STATEMENT_KV_POINT_SET)?.execute(params![
+ mutation.key,
+ value,
+ &encoding,
+ &version,
+ mutation
+ .expire_at
+ .and_then(|x| i64::try_from(x).ok())
+ .unwrap_or(-1i64)
+ ])?;
assert_eq!(changed, 1)
}
MutationKind::Delete => {
@@ -845,7 +948,8 @@ fn mutate_le64(
key,
&new_value[..],
encoding,
- new_version
+ new_version,
+ -1i64,
])?;
assert_eq!(changed, 1);