diff options
author | Heyang Zhou <zhy20000919@hotmail.com> | 2023-08-18 17:34:16 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-18 17:34:16 +0800 |
commit | c77c836a238ce683b21c12f88cd1101b930ce042 (patch) | |
tree | b69b2318f618d1b384dc168ab6652496d63e2f2c | |
parent | b5839eefcf02e62e9e77e8095f372ac06a523cba (diff) |
feat(ext/kv): key expiration (#20091)
Co-authored-by: Luca Casonato <hello@lcas.dev>
-rw-r--r-- | cli/tests/unit/kv_test.ts | 126 | ||||
-rw-r--r-- | cli/tsc/dts/lib.deno.unstable.d.ts | 32 | ||||
-rw-r--r-- | ext/kv/01_db.ts | 37 | ||||
-rw-r--r-- | ext/kv/interface.rs | 1 | ||||
-rw-r--r-- | ext/kv/lib.rs | 8 | ||||
-rw-r--r-- | ext/kv/sqlite.rs | 192 |
6 files changed, 335 insertions, 61 deletions
diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts index 74a8ed6b3..438ebd7ee 100644 --- a/cli/tests/unit/kv_test.ts +++ b/cli/tests/unit/kv_test.ts @@ -1806,3 +1806,129 @@ Deno.test({ } }, }); + +Deno.test({ + name: "kv expiration", + async fn() { + const filename = await Deno.makeTempFile({ prefix: "kv_expiration_db" }); + try { + await Deno.remove(filename); + } catch { + // pass + } + let db: Deno.Kv | null = null; + + try { + db = await Deno.openKv(filename); + + await db.set(["a"], 1, { expireIn: 1000 }); + await db.set(["b"], 2, { expireIn: 1000 }); + assertEquals((await db.get(["a"])).value, 1); + assertEquals((await db.get(["b"])).value, 2); + + // Value overwrite should also reset expiration + await db.set(["b"], 2, { expireIn: 3600 * 1000 }); + + // Wait for expiration + await sleep(1000); + + // Re-open to trigger immediate cleanup + db.close(); + db = null; + db = await Deno.openKv(filename); + + let ok = false; + for (let i = 0; i < 50; i++) { + await sleep(100); + if ( + JSON.stringify( + (await db.getMany([["a"], ["b"]])).map((x) => x.value), + ) === "[null,2]" + ) { + ok = true; + break; + } + } + + if (!ok) { + throw new Error("Values did not expire"); + } + } finally { + if (db) { + try { + db.close(); + } catch { + // pass + } + } + try { + await Deno.remove(filename); + } catch { + // pass + } + } + }, +}); + +Deno.test({ + name: "kv expiration with atomic", + async fn() { + const filename = await Deno.makeTempFile({ prefix: "kv_expiration_db" }); + try { + await Deno.remove(filename); + } catch { + // pass + } + let db: Deno.Kv | null = null; + + try { + db = await Deno.openKv(filename); + + await db.atomic().set(["a"], 1, { expireIn: 1000 }).set(["b"], 2, { + expireIn: 1000, + }).commit(); + assertEquals((await db.getMany([["a"], ["b"]])).map((x) => x.value), [ + 1, + 2, + ]); + + // Wait for expiration + await sleep(1000); + + // Re-open to trigger immediate cleanup + db.close(); + db = null; + db = await Deno.openKv(filename); + + let ok = false; + for (let i = 0; i < 50; i++) { + await sleep(100); + if ( + JSON.stringify( + (await db.getMany([["a"], ["b"]])).map((x) => x.value), + ) === "[null,null]" + ) { + ok = true; + break; + } + } + + if (!ok) { + throw new Error("Values did not expire"); + } + } finally { + if (db) { + try { + db.close(); + } catch { + // pass + } + } + try { + await Deno.remove(filename); + } catch { + // pass + } + } + }, +}); diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index eb612a655..70b346a2a 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -1357,7 +1357,13 @@ declare namespace Deno { * mutation is applied to the key. * * - `set` - Sets the value of the key to the given value, overwriting any - * existing value. + * existing value. Optionally an `expireIn` option can be specified to + * set a time-to-live (TTL) for the key. The TTL is specified in + * milliseconds, and the key will be deleted from the database at earliest + * after the specified number of milliseconds have elapsed. Once the + * specified duration has passed, the key may still be visible for some + * additional time. If the `expireIn` option is not specified, the key will + * not expire. * - `delete` - Deletes the key from the database. The mutation is a no-op if * the key does not exist. * - `sum` - Adds the given value to the existing value of the key. Both the @@ -1379,7 +1385,7 @@ declare namespace Deno { export type KvMutation = & { key: KvKey } & ( - | { type: "set"; value: unknown } + | { type: "set"; value: unknown; expireIn?: number } | { type: "delete" } | { type: "sum"; value: KvU64 } | { type: "max"; value: KvU64 } @@ -1591,8 +1597,15 @@ declare namespace Deno { /** * Add to the operation a mutation that sets the value of the specified key * to the specified value if all checks pass during the commit. + * + * Optionally an `expireIn` option can be specified to set a time-to-live + * (TTL) for the key. The TTL is specified in milliseconds, and the key will + * be deleted from the database at earliest after the specified number of + * milliseconds have elapsed. Once the specified duration has passed, the + * key may still be visible for some additional time. If the `expireIn` + * option is not specified, the key will not expire. */ - set(key: KvKey, value: unknown): this; + set(key: KvKey, value: unknown, options?: { expireIn?: number }): this; /** * Add to the operation a mutation that deletes the specified key if all * checks pass during the commit. @@ -1721,8 +1734,19 @@ declare namespace Deno { * const db = await Deno.openKv(); * await db.set(["foo"], "bar"); * ``` + * + * Optionally an `expireIn` option can be specified to set a time-to-live + * (TTL) for the key. The TTL is specified in milliseconds, and the key will + * be deleted from the database at earliest after the specified number of + * milliseconds have elapsed. Once the specified duration has passed, the + * key may still be visible for some additional time. If the `expireIn` + * option is not specified, the key will not expire. */ - set(key: KvKey, value: unknown): Promise<KvCommitResult>; + set( + key: KvKey, + value: unknown, + options?: { expireIn?: number }, + ): Promise<KvCommitResult>; /** * Delete the value for the given key from the database. If no value exists diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index 8fd1f5997..4cd8744ba 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -130,12 +130,15 @@ class Kv { }); } - async set(key: Deno.KvKey, value: unknown) { + async set(key: Deno.KvKey, value: unknown, options?: { expireIn?: number }) { value = serializeValue(value); const checks: Deno.AtomicCheck[] = []; + const expireAt = typeof options?.expireIn === "number" + ? Date.now() + options.expireIn + : undefined; const mutations = [ - [key, "set", value], + [key, "set", value, expireAt], ]; const versionstamp = await core.opAsync( @@ -152,7 +155,7 @@ class Kv { async delete(key: Deno.KvKey) { const checks: Deno.AtomicCheck[] = []; const mutations = [ - [key, "delete", null], + [key, "delete", null, undefined], ]; const result = await core.opAsync( @@ -318,7 +321,7 @@ class AtomicOperation { #rid: number; #checks: [Deno.KvKey, string | null][] = []; - #mutations: [Deno.KvKey, string, RawValue | null][] = []; + #mutations: [Deno.KvKey, string, RawValue | null, number | undefined][] = []; #enqueues: [Uint8Array, number, Deno.KvKey[], number[] | null][] = []; constructor(rid: number) { @@ -337,6 +340,7 @@ class AtomicOperation { const key = mutation.key; let type: string; let value: RawValue | null; + let expireAt: number | undefined = undefined; switch (mutation.type) { case "delete": type = "delete"; @@ -345,6 +349,10 @@ class AtomicOperation { } break; case "set": + if (typeof mutation.expireIn === "number") { + expireAt = Date.now() + mutation.expireIn; + } + /* falls through */ case "sum": case "min": case "max": @@ -357,33 +365,40 @@ class AtomicOperation { default: throw new TypeError("Invalid mutation type"); } - this.#mutations.push([key, type, value]); + this.#mutations.push([key, type, value, expireAt]); } return this; } sum(key: Deno.KvKey, n: bigint): this { - this.#mutations.push([key, "sum", serializeValue(new KvU64(n))]); + this.#mutations.push([key, "sum", serializeValue(new KvU64(n)), undefined]); return this; } min(key: Deno.KvKey, n: bigint): this { - this.#mutations.push([key, "min", serializeValue(new KvU64(n))]); + this.#mutations.push([key, "min", serializeValue(new KvU64(n)), undefined]); return this; } max(key: Deno.KvKey, n: bigint): this { - this.#mutations.push([key, "max", serializeValue(new KvU64(n))]); + this.#mutations.push([key, "max", serializeValue(new KvU64(n)), undefined]); return this; } - set(key: Deno.KvKey, value: unknown): this { - this.#mutations.push([key, "set", serializeValue(value)]); + set( + key: Deno.KvKey, + value: unknown, + options?: { expireIn?: number }, + ): this { + const expireAt = typeof options?.expireIn === "number" + ? Date.now() + options.expireIn + : undefined; + this.#mutations.push([key, "set", serializeValue(value), expireAt]); return this; } delete(key: Deno.KvKey): this { - this.#mutations.push([key, "delete", null]); + this.#mutations.push([key, "delete", null, undefined]); return this; } diff --git a/ext/kv/interface.rs b/ext/kv/interface.rs index b67ee1243..28b43f8d7 100644 --- a/ext/kv/interface.rs +++ b/ext/kv/interface.rs @@ -237,6 +237,7 @@ pub struct KvCheck { pub struct KvMutation { pub key: Vec<u8>, pub kind: MutationKind, + pub expire_at: Option<u64>, } /// A request to enqueue a message to the database. This message is delivered diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index a781f4579..7164a700b 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -375,7 +375,7 @@ impl TryFrom<V8KvCheck> for KvCheck { } } -type V8KvMutation = (KvKey, String, Option<FromV8Value>); +type V8KvMutation = (KvKey, String, Option<FromV8Value>, Option<u64>); impl TryFrom<V8KvMutation> for KvMutation { type Error = AnyError; @@ -396,7 +396,11 @@ impl TryFrom<V8KvMutation> for KvMutation { ))) } }; - Ok(KvMutation { key, kind }) + Ok(KvMutation { + key, + kind, + expire_at: value.3, + }) } } diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs index aea438d2d..8e37d2c87 100644 --- a/ext/kv/sqlite.rs +++ b/ext/kv/sqlite.rs @@ -1,7 +1,6 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use std::borrow::Cow; -use std::cell::Cell; use std::cell::RefCell; use std::future::Future; use std::marker::PhantomData; @@ -10,10 +9,12 @@ use std::path::PathBuf; use std::rc::Rc; use std::rc::Weak; use std::sync::Arc; +use std::sync::Mutex; use std::time::Duration; use std::time::SystemTime; use async_trait::async_trait; +use deno_core::error::get_custom_error_class; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::futures; @@ -57,7 +58,7 @@ const STATEMENT_KV_POINT_GET_VALUE_ONLY: &str = const STATEMENT_KV_POINT_GET_VERSION_ONLY: &str = "select version from kv where k = ?"; const STATEMENT_KV_POINT_SET: &str = - "insert into kv (k, v, v_encoding, version) values (:k, :v, :v_encoding, :version) on conflict(k) do update set v = :v, v_encoding = :v_encoding, version = :version"; + "insert into kv (k, v, v_encoding, version, expiration_ms) values (:k, :v, :v_encoding, :version, :expiration_ms) on conflict(k) do update set v = :v, v_encoding = :v_encoding, version = :version, expiration_ms = :expiration_ms"; const STATEMENT_KV_POINT_DELETE: &str = "delete from kv where k = ?"; const STATEMENT_QUEUE_ADD_READY: &str = "insert into queue (ts, id, data, backoff_schedule, keys_if_undelivered) values(?, ?, ?, ?, ?)"; @@ -79,7 +80,7 @@ create table if not exists migration_state( ) "; -const MIGRATIONS: [&str; 2] = [ +const MIGRATIONS: [&str; 3] = [ " create table data_version ( k integer primary key, @@ -113,11 +114,55 @@ create table queue_running( primary key (deadline, id) ); ", + " +alter table kv add column seq integer not null default 0; +alter table data_version add column seq integer not null default 0; +alter table kv add column expiration_ms integer not null default -1; +create index kv_expiration_ms_idx on kv (expiration_ms); +", ]; const DISPATCH_CONCURRENCY_LIMIT: usize = 100; const DEFAULT_BACKOFF_SCHEDULE: [u32; 5] = [100, 1000, 5000, 30000, 60000]; +const ERROR_USING_CLOSED_DATABASE: &str = "Attempted to use a closed database"; + +#[derive(Clone)] +struct ProtectedConn { + guard: Rc<AsyncRefCell<()>>, + conn: Arc<Mutex<Option<rusqlite::Connection>>>, +} + +#[derive(Clone)] +struct WeakProtectedConn { + guard: Weak<AsyncRefCell<()>>, + conn: std::sync::Weak<Mutex<Option<rusqlite::Connection>>>, +} + +impl ProtectedConn { + fn new(conn: rusqlite::Connection) -> Self { + Self { + guard: Rc::new(AsyncRefCell::new(())), + conn: Arc::new(Mutex::new(Some(conn))), + } + } + + fn downgrade(&self) -> WeakProtectedConn { + WeakProtectedConn { + guard: Rc::downgrade(&self.guard), + conn: Arc::downgrade(&self.conn), + } + } +} + +impl WeakProtectedConn { + fn upgrade(&self) -> Option<ProtectedConn> { + let guard = self.guard.upgrade()?; + let conn = self.conn.upgrade()?; + Some(ProtectedConn { guard, conn }) + } +} + pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> { pub default_storage_dir: Option<PathBuf>, _permissions: PhantomData<P>, @@ -197,7 +242,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> { } }) .await?; - let conn = Rc::new(AsyncRefCell::new(Cell::new(Some(conn)))); + let conn = ProtectedConn::new(conn); SqliteDb::run_tx(conn.clone(), |tx| { tx.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?; @@ -227,16 +272,35 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> { }) .await?; + let expiration_watcher = spawn(watch_expiration(conn.clone())); + Ok(SqliteDb { conn, queue: OnceCell::new(), + expiration_watcher, }) } } pub struct SqliteDb { - conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, + conn: ProtectedConn, queue: OnceCell<SqliteQueue>, + expiration_watcher: deno_core::task::JoinHandle<()>, +} + +impl Drop for SqliteDb { + fn drop(&mut self) { + self.expiration_watcher.abort(); + + // The above `abort()` operation is asynchronous. It's not + // guaranteed that the sqlite connection will be closed immediately. + // So here we synchronously take the conn mutex and drop the connection. + // + // This blocks the event loop if the connection is still being used, + // but ensures correctness - deleting the database file after calling + // the `close` method will always work. + self.conn.conn.lock().unwrap().take(); + } } async fn sqlite_retry_loop<R, Fut: Future<Output = Result<R, AnyError>>>( @@ -263,10 +327,7 @@ async fn sqlite_retry_loop<R, Fut: Future<Output = Result<R, AnyError>>>( } impl SqliteDb { - async fn run_tx<F, R>( - conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, - f: F, - ) -> Result<R, AnyError> + async fn run_tx<F, R>(conn: ProtectedConn, f: F) -> Result<R, AnyError> where F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>) + Clone @@ -277,42 +338,38 @@ impl SqliteDb { sqlite_retry_loop(|| Self::run_tx_inner(conn.clone(), f.clone())).await } - async fn run_tx_inner<F, R>( - conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, - f: F, - ) -> Result<R, AnyError> + async fn run_tx_inner<F, R>(conn: ProtectedConn, f: F) -> Result<R, AnyError> where F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>) + Send + 'static, R: Send + 'static, { - // Transactions need exclusive access to the connection. Wait until - // we can borrow_mut the connection. - let cell = conn.borrow_mut().await; - - // Take the db out of the cell and run the transaction via spawn_blocking. - let mut db = cell.take().unwrap(); - let (result, db) = spawn_blocking(move || { - let result = { - match db.transaction() { - Ok(tx) => f(tx), - Err(e) => Err(e.into()), - } + // `run_tx` runs in an asynchronous context. First acquire the async lock to + // coordinate with other async invocations. + let _guard_holder = conn.guard.borrow_mut().await; + + // Then, take the synchronous lock. This operation is guaranteed to success without waiting, + // unless the database is being closed. + let db = conn.conn.clone(); + spawn_blocking(move || { + let mut db = db.try_lock().ok(); + let Some(db) = db.as_mut().and_then(|x| x.as_mut()) else { + return Err(type_error(ERROR_USING_CLOSED_DATABASE)) }; - (result, db) + let result = match db.transaction() { + Ok(tx) => f(tx), + Err(e) => Err(e.into()), + }; + result }) .await - .unwrap(); - - // Put the db back into the cell. - cell.set(Some(db)); - result + .unwrap() } } pub struct DequeuedMessage { - conn: Weak<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, + conn: WeakProtectedConn, id: String, payload: Option<Vec<u8>>, waker_tx: mpsc::Sender<()>, @@ -341,7 +398,20 @@ impl QueueMessageHandle for DequeuedMessage { tx.commit()?; Ok(requeued) }) - .await?; + .await; + let requeued = match requeued { + Ok(x) => x, + Err(e) => { + // Silently ignore the error if the database has been closed + // This message will be delivered on the next run + if get_custom_error_class(&e) == Some("TypeError") + && e.to_string() == ERROR_USING_CLOSED_DATABASE + { + return Ok(()); + } + return Err(e); + } + }; if requeued { // If the message was requeued, wake up the dequeue loop. self.waker_tx.send(()).await?; @@ -360,7 +430,7 @@ impl QueueMessageHandle for DequeuedMessage { type DequeueReceiver = mpsc::Receiver<(Vec<u8>, String)>; struct SqliteQueue { - conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, + conn: ProtectedConn, dequeue_rx: Rc<AsyncRefCell<DequeueReceiver>>, concurrency_limiter: Arc<Semaphore>, waker_tx: mpsc::Sender<()>, @@ -368,7 +438,7 @@ struct SqliteQueue { } impl SqliteQueue { - fn new(conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>) -> Self { + fn new(conn: ProtectedConn) -> Self { let conn_clone = conn.clone(); let (shutdown_tx, shutdown_rx) = watch::channel::<()>(()); let (waker_tx, waker_rx) = mpsc::channel::<()>(1); @@ -406,7 +476,7 @@ impl SqliteQueue { let permit = self.concurrency_limiter.clone().acquire_owned().await?; Ok(DequeuedMessage { - conn: Rc::downgrade(&self.conn), + conn: self.conn.downgrade(), id, payload: Some(payload), waker_tx: self.waker_tx.clone(), @@ -424,7 +494,7 @@ impl SqliteQueue { } async fn dequeue_loop( - conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, + conn: ProtectedConn, dequeue_tx: mpsc::Sender<(Vec<u8>, String)>, mut shutdown_rx: watch::Receiver<()>, mut waker_rx: mpsc::Receiver<()>, @@ -511,7 +581,7 @@ impl SqliteQueue { } async fn get_earliest_ready_ts( - conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, + conn: ProtectedConn, ) -> Result<Option<u64>, AnyError> { SqliteDb::run_tx(conn.clone(), move |tx| { let ts = tx @@ -527,7 +597,7 @@ impl SqliteQueue { } async fn requeue_inflight_messages( - conn: Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>, + conn: ProtectedConn, ) -> Result<(), AnyError> { loop { let done = SqliteDb::run_tx(conn.clone(), move |tx| { @@ -608,7 +678,7 @@ impl SqliteQueue { for key in keys_if_undelivered { let changed = tx .prepare_cached(STATEMENT_KV_POINT_SET)? - .execute(params![key, &data, &VALUE_ENCODING_V8, &version])?; + .execute(params![key, &data, &VALUE_ENCODING_V8, &version, -1i64])?; assert_eq!(changed, 1); } } @@ -623,6 +693,31 @@ impl SqliteQueue { } } +async fn watch_expiration(db: ProtectedConn) { + loop { + // Scan for expired keys + let res = SqliteDb::run_tx(db.clone(), move |tx| { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + tx.prepare_cached( + "delete from kv where expiration_ms >= 0 and expiration_ms <= ?", + )? + .execute(params![now])?; + tx.commit()?; + Ok(()) + }) + .await; + if let Err(e) = res { + eprintln!("kv: Error in expiration watcher: {}", e); + } + let sleep_duration = + Duration::from_secs_f64(60.0 + rand::thread_rng().gen_range(0.0..30.0)); + tokio::time::sleep(sleep_duration).await; + } +} + #[async_trait(?Send)] impl Database for SqliteDb { type QMH = DequeuedMessage; @@ -698,9 +793,17 @@ impl Database for SqliteDb { match &mutation.kind { MutationKind::Set(value) => { let (value, encoding) = encode_value(value); - let changed = tx - .prepare_cached(STATEMENT_KV_POINT_SET)? - .execute(params![mutation.key, &value, &encoding, &version])?; + let changed = + tx.prepare_cached(STATEMENT_KV_POINT_SET)?.execute(params![ + mutation.key, + value, + &encoding, + &version, + mutation + .expire_at + .and_then(|x| i64::try_from(x).ok()) + .unwrap_or(-1i64) + ])?; assert_eq!(changed, 1) } MutationKind::Delete => { @@ -845,7 +948,8 @@ fn mutate_le64( key, &new_value[..], encoding, - new_version + new_version, + -1i64, ])?; assert_eq!(changed, 1); |