summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
authorIgor Zinkovsky <igor@deno.com>2023-09-26 20:06:57 -0700
committerGitHub <noreply@github.com>2023-09-26 20:06:57 -0700
commitf0a022bed4c8ca46c472c6361f1793cb29f73480 (patch)
tree4f88ffcd77e94292a8696111ba6828a197eb153e /ext
parentb433133a1ff1c329b05ce024d8db5fefb8f7b431 (diff)
fix(kv_queues): graceful shutdown (#20627)
This fixes the `TypeError: Database closed` error during shutdown.
Diffstat (limited to 'ext')
-rw-r--r--ext/kv/01_db.ts47
-rw-r--r--ext/kv/dynamic.rs13
-rw-r--r--ext/kv/interface.rs2
-rw-r--r--ext/kv/lib.rs20
-rw-r--r--ext/kv/remote.rs2
-rw-r--r--ext/kv/sqlite.rs47
6 files changed, 69 insertions, 62 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts
index e934a3b6d..6e8a571f0 100644
--- a/ext/kv/01_db.ts
+++ b/ext/kv/01_db.ts
@@ -61,7 +61,6 @@ const kvSymbol = Symbol("KvRid");
class Kv {
#rid: number;
- #closed: boolean;
constructor(rid: number = undefined, symbol: symbol = undefined) {
if (kvSymbol !== symbol) {
@@ -70,7 +69,6 @@ class Kv {
);
}
this.#rid = rid;
- this.#closed = false;
}
atomic() {
@@ -251,20 +249,14 @@ class Kv {
handler: (message: unknown) => Promise<void> | void,
): Promise<void> {
const finishMessageOps = new Map<number, Promise<void>>();
- while (!this.#closed) {
+ while (true) {
// Wait for the next message.
- let next: { 0: Uint8Array; 1: number };
- try {
- next = await core.opAsync(
- "op_kv_dequeue_next_message",
- this.#rid,
- );
- } catch (error) {
- if (this.#closed) {
- break;
- } else {
- throw error;
- }
+ const next: { 0: Uint8Array; 1: number } = await core.opAsync(
+ "op_kv_dequeue_next_message",
+ this.#rid,
+ );
+ if (next === null) {
+ break;
}
// Deserialize the payload.
@@ -283,20 +275,16 @@ class Kv {
} catch (error) {
console.error("Exception in queue handler", error);
} finally {
- if (this.#closed) {
- core.close(handleId);
- } else {
- const promise: Promise<void> = core.opAsync(
- "op_kv_finish_dequeued_message",
- handleId,
- success,
- );
- finishMessageOps.set(handleId, promise);
- try {
- await promise;
- } finally {
- finishMessageOps.delete(handleId);
- }
+ const promise: Promise<void> = core.opAsync(
+ "op_kv_finish_dequeued_message",
+ handleId,
+ success,
+ );
+ finishMessageOps.set(handleId, promise);
+ try {
+ await promise;
+ } finally {
+ finishMessageOps.delete(handleId);
}
}
})();
@@ -310,7 +298,6 @@ class Kv {
close() {
core.close(this.#rid);
- this.#closed = true;
}
}
diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs
index f79c10f55..9084cc1bf 100644
--- a/ext/kv/dynamic.rs
+++ b/ext/kv/dynamic.rs
@@ -132,7 +132,7 @@ pub trait DynamicDb {
async fn dyn_dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
- ) -> Result<Box<dyn QueueMessageHandle>, AnyError>;
+ ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>;
fn dyn_close(&self);
}
@@ -161,7 +161,7 @@ impl Database for Box<dyn DynamicDb> {
async fn dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
- ) -> Result<Box<dyn QueueMessageHandle>, AnyError> {
+ ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
(**self).dyn_dequeue_next_message(state).await
}
@@ -196,8 +196,13 @@ where
async fn dyn_dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
- ) -> Result<Box<dyn QueueMessageHandle>, AnyError> {
- Ok(Box::new(self.dequeue_next_message(state).await?))
+ ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
+ Ok(
+ self
+ .dequeue_next_message(state)
+ .await?
+ .map(|x| Box::new(x) as Box<dyn QueueMessageHandle>),
+ )
}
fn dyn_close(&self) {
diff --git a/ext/kv/interface.rs b/ext/kv/interface.rs
index abeaf8dd5..1acf3ce16 100644
--- a/ext/kv/interface.rs
+++ b/ext/kv/interface.rs
@@ -43,7 +43,7 @@ pub trait Database {
async fn dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
- ) -> Result<Self::QMH, AnyError>;
+ ) -> Result<Option<Self::QMH>, AnyError>;
fn close(&self);
}
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index 72d5e862b..762009d2a 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -16,6 +16,7 @@ use chrono::Utc;
use codec::decode_key;
use codec::encode_key;
use deno_core::anyhow::Context;
+use deno_core::error::get_custom_error_class;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op2;
@@ -322,24 +323,35 @@ impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> {
async fn op_kv_dequeue_next_message<DBH>(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<(ToJsBuffer, ResourceId), AnyError>
+) -> Result<Option<(ToJsBuffer, ResourceId)>, AnyError>
where
DBH: DatabaseHandler + 'static,
{
let db = {
let state = state.borrow();
let resource =
- state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
+ match state.resource_table.get::<DatabaseResource<DBH::DB>>(rid) {
+ Ok(resource) => resource,
+ Err(err) => {
+ if get_custom_error_class(&err) == Some("BadResource") {
+ return Ok(None);
+ } else {
+ return Err(err);
+ }
+ }
+ };
resource.db.clone()
};
- let mut handle = db.dequeue_next_message(state.clone()).await?;
+ let Some(mut handle) = db.dequeue_next_message(state.clone()).await? else {
+ return Ok(None);
+ };
let payload = handle.take_payload().await?.into();
let handle_rid = {
let mut state = state.borrow_mut();
state.resource_table.add(QueueMessageResource { handle })
};
- Ok((payload, handle_rid))
+ Ok(Some((payload, handle_rid)))
}
#[op2(async)]
diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs
index fc18e4615..36c4d3af2 100644
--- a/ext/kv/remote.rs
+++ b/ext/kv/remote.rs
@@ -277,7 +277,7 @@ impl<P: RemoteDbHandlerPermissions> Database for RemoteDb<P> {
async fn dequeue_next_message(
&self,
_state: Rc<RefCell<OpState>>,
- ) -> Result<Self::QMH, AnyError> {
+ ) -> Result<Option<Self::QMH>, AnyError> {
let msg = "Deno.Kv.listenQueue is not supported for remote KV databases";
eprintln!("{}", yellow(msg));
deno_core::futures::future::pending().await
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
index ceeb98c25..192141e27 100644
--- a/ext/kv/sqlite.rs
+++ b/ext/kv/sqlite.rs
@@ -395,9 +395,7 @@ impl QueueMessageHandle for DequeuedMessage {
Err(e) => {
// Silently ignore the error if the database has been closed
// This message will be delivered on the next run
- if get_custom_error_class(&e) == Some("TypeError")
- && e.to_string() == ERROR_USING_CLOSED_DATABASE
- {
+ if is_conn_closed_error(&e) {
return Ok(());
}
return Err(e);
@@ -437,25 +435,25 @@ impl SqliteQueue {
spawn(async move {
// Oneshot requeue of all inflight messages.
- Self::requeue_inflight_messages(conn.clone()).await.unwrap();
+ if let Err(e) = Self::requeue_inflight_messages(conn.clone()).await {
+ // Exit the dequeue loop cleanly if the database has been closed.
+ if is_conn_closed_error(&e) {
+ return;
+ }
+ panic!("kv: Error in requeue_inflight_messages: {}", e);
+ }
// Continuous dequeue loop.
- match Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx)
- .await
+ if let Err(e) =
+ Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx)
+ .await
{
- Ok(_) => Ok(()),
- Err(e) => {
- // Exit the dequeue loop cleanly if the database has been closed.
- if get_custom_error_class(&e) == Some("TypeError")
- && e.to_string() == ERROR_USING_CLOSED_DATABASE
- {
- Ok(())
- } else {
- Err(e)
- }
+ // Exit the dequeue loop cleanly if the database has been closed.
+ if is_conn_closed_error(&e) {
+ return;
}
+ panic!("kv: Error in dequeue_loop: {}", e);
}
- .unwrap();
});
Self {
@@ -467,25 +465,25 @@ impl SqliteQueue {
}
}
- async fn dequeue(&self) -> Result<DequeuedMessage, AnyError> {
+ async fn dequeue(&self) -> Result<Option<DequeuedMessage>, AnyError> {
// Wait for the next message to be available from dequeue_rx.
let (payload, id) = {
let mut queue_rx = self.dequeue_rx.borrow_mut().await;
let Some(msg) = queue_rx.recv().await else {
- return Err(type_error("Database closed"));
+ return Ok(None);
};
msg
};
let permit = self.concurrency_limiter.clone().acquire_owned().await?;
- Ok(DequeuedMessage {
+ Ok(Some(DequeuedMessage {
conn: self.conn.downgrade(),
id,
payload: Some(payload),
waker_tx: self.waker_tx.clone(),
_permit: permit,
- })
+ }))
}
async fn wake(&self) -> Result<(), AnyError> {
@@ -904,7 +902,7 @@ impl Database for SqliteDb {
async fn dequeue_next_message(
&self,
_state: Rc<RefCell<OpState>>,
- ) -> Result<Self::QMH, AnyError> {
+ ) -> Result<Option<Self::QMH>, AnyError> {
let queue = self
.queue
.get_or_init(|| async move { SqliteQueue::new(self.conn.clone()) })
@@ -1013,3 +1011,8 @@ fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) {
}
}
}
+
+fn is_conn_closed_error(e: &AnyError) -> bool {
+ get_custom_error_class(e) == Some("TypeError")
+ && e.to_string() == ERROR_USING_CLOSED_DATABASE
+}