diff options
Diffstat (limited to 'ext/kv/lib.rs')
-rw-r--r-- | ext/kv/lib.rs | 100 |
1 files changed, 60 insertions, 40 deletions
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index a4ccfe3d6..5392b9721 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -14,6 +14,7 @@ use std::time::Duration; use base64::prelude::BASE64_URL_SAFE; use base64::Engine; +use boxed_error::Boxed; use chrono::DateTime; use chrono::Utc; use deno_core::error::get_custom_error_class; @@ -114,8 +115,11 @@ impl Resource for DatabaseWatcherResource { } } +#[derive(Debug, Boxed)] +pub struct KvError(pub Box<KvErrorKind>); + #[derive(Debug, thiserror::Error)] -pub enum KvError { +pub enum KvErrorKind { #[error(transparent)] DatabaseHandler(deno_core::error::AnyError), #[error(transparent)] @@ -193,7 +197,7 @@ where let db = handler .open(state.clone(), path) .await - .map_err(KvError::DatabaseHandler)?; + .map_err(KvErrorKind::DatabaseHandler)?; let rid = state.borrow_mut().resource_table.add(DatabaseResource { db, cancel_handle: CancelHandle::new_rc(), @@ -329,7 +333,7 @@ where let resource = state .resource_table .get::<DatabaseResource<DBH::DB>>(rid) - .map_err(KvError::Resource)?; + .map_err(KvErrorKind::Resource)?; resource.db.clone() }; @@ -339,7 +343,7 @@ where }; if ranges.len() > config.max_read_ranges { - return Err(KvError::TooManyRanges(config.max_read_ranges)); + return Err(KvErrorKind::TooManyRanges(config.max_read_ranges).into_box()); } let mut total_entries = 0usize; @@ -358,14 +362,16 @@ where Ok(ReadRange { start, end, - limit: NonZeroU32::new(limit).ok_or(KvError::InvalidLimit)?, + limit: NonZeroU32::new(limit).ok_or(KvErrorKind::InvalidLimit)?, reverse, }) }) .collect::<Result<Vec<_>, KvError>>()?; if total_entries > config.max_read_entries { - return Err(KvError::TooManyEntries(config.max_read_entries)); + return Err( + KvErrorKind::TooManyEntries(config.max_read_entries).into_box(), + ); } let opts = SnapshotReadOptions { @@ -374,7 +380,7 @@ where let output_ranges = db .snapshot_read(read_ranges, opts) .await - .map_err(KvError::Kv)?; + .map_err(KvErrorKind::Kv)?; let output_ranges = output_ranges .into_iter() .map(|x| { @@ -415,7 +421,7 @@ where if get_custom_error_class(&err) == Some("BadResource") { return Ok(None); } else { - return Err(KvError::Resource(err)); + return Err(KvErrorKind::Resource(err).into_box()); } } }; @@ -423,11 +429,11 @@ where }; let Some(mut handle) = - db.dequeue_next_message().await.map_err(KvError::Kv)? + db.dequeue_next_message().await.map_err(KvErrorKind::Kv)? else { return Ok(None); }; - let payload = handle.take_payload().await.map_err(KvError::Kv)?.into(); + let payload = handle.take_payload().await.map_err(KvErrorKind::Kv)?.into(); let handle_rid = { let mut state = state.borrow_mut(); state.resource_table.add(QueueMessageResource { handle }) @@ -448,11 +454,11 @@ where let resource = state .resource_table .get::<DatabaseResource<DBH::DB>>(rid) - .map_err(KvError::Resource)?; + .map_err(KvErrorKind::Resource)?; let config = state.borrow::<Rc<KvConfig>>().clone(); if keys.len() > config.max_watched_keys { - return Err(KvError::TooManyKeys(config.max_watched_keys)); + return Err(KvErrorKind::TooManyKeys(config.max_watched_keys).into_box()); } let keys: Vec<Vec<u8>> = keys @@ -493,7 +499,7 @@ async fn op_kv_watch_next( let resource = state .resource_table .get::<DatabaseWatcherResource>(rid) - .map_err(KvError::Resource)?; + .map_err(KvErrorKind::Resource)?; resource.clone() }; @@ -519,7 +525,7 @@ async fn op_kv_watch_next( return Ok(None); }; - let entries = res.map_err(KvError::Kv)?; + let entries = res.map_err(KvErrorKind::Kv)?; let entries = entries .into_iter() .map(|entry| { @@ -549,9 +555,9 @@ where let handle = state .resource_table .take::<QueueMessageResource<<<DBH>::DB as Database>::QMH>>(handle_rid) - .map_err(|_| KvError::QueueMessageNotFound)?; + .map_err(|_| KvErrorKind::QueueMessageNotFound)?; Rc::try_unwrap(handle) - .map_err(|_| KvError::QueueMessageNotFound)? + .map_err(|_| KvErrorKind::QueueMessageNotFound)? .handle }; // if we fail to finish the message, there is not much we can do and the @@ -692,7 +698,7 @@ impl RawSelector { }), (Some(prefix), Some(start), None) => { if !start.starts_with(&prefix) || start.len() == prefix.len() { - return Err(KvError::StartKeyNotInKeyspace); + return Err(KvErrorKind::StartKeyNotInKeyspace.into_box()); } Ok(Self::Prefixed { prefix, @@ -702,7 +708,7 @@ impl RawSelector { } (Some(prefix), None, Some(end)) => { if !end.starts_with(&prefix) || end.len() == prefix.len() { - return Err(KvError::EndKeyNotInKeyspace); + return Err(KvErrorKind::EndKeyNotInKeyspace.into_box()); } Ok(Self::Prefixed { prefix, @@ -712,7 +718,7 @@ impl RawSelector { } (None, Some(start), Some(end)) => { if start > end { - return Err(KvError::StartKeyGreaterThanEndKey); + return Err(KvErrorKind::StartKeyGreaterThanEndKey.into_box()); } Ok(Self::Range { start, end }) } @@ -720,7 +726,7 @@ impl RawSelector { let end = start.iter().copied().chain(Some(0)).collect(); Ok(Self::Range { start, end }) } - _ => Err(KvError::InvalidRange), + _ => Err(KvErrorKind::InvalidRange.into_box()), } } @@ -782,7 +788,7 @@ fn encode_cursor( ) -> Result<String, KvError> { let common_prefix = selector.common_prefix(); if !boundary_key.starts_with(common_prefix) { - return Err(KvError::InvalidBoundaryKey); + return Err(KvErrorKind::InvalidBoundaryKey.into_box()); } Ok(BASE64_URL_SAFE.encode(&boundary_key[common_prefix.len()..])) } @@ -799,7 +805,7 @@ fn decode_selector_and_cursor( let common_prefix = selector.common_prefix(); let cursor = BASE64_URL_SAFE .decode(cursor) - .map_err(|_| KvError::InvalidCursor)?; + .map_err(|_| KvErrorKind::InvalidCursor)?; let first_key: Vec<u8>; let last_key: Vec<u8>; @@ -824,13 +830,13 @@ fn decode_selector_and_cursor( // Defend against out-of-bounds reading if let Some(start) = selector.start() { if &first_key[..] < start { - return Err(KvError::CursorOutOfBounds); + return Err(KvErrorKind::CursorOutOfBounds.into_box()); } } if let Some(end) = selector.end() { if &last_key[..] > end { - return Err(KvError::CursorOutOfBounds); + return Err(KvErrorKind::CursorOutOfBounds.into_box()); } } @@ -855,7 +861,7 @@ where let resource = state .resource_table .get::<DatabaseResource<DBH::DB>>(rid) - .map_err(KvError::Resource)?; + .map_err(KvErrorKind::Resource)?; resource.db.clone() }; @@ -865,28 +871,28 @@ where }; if checks.len() > config.max_checks { - return Err(KvError::TooManyChecks(config.max_checks)); + return Err(KvErrorKind::TooManyChecks(config.max_checks).into_box()); } if mutations.len() + enqueues.len() > config.max_mutations { - return Err(KvError::TooManyMutations(config.max_mutations)); + return Err(KvErrorKind::TooManyMutations(config.max_mutations).into_box()); } let checks = checks .into_iter() .map(check_from_v8) .collect::<Result<Vec<Check>, KvCheckError>>() - .map_err(KvError::InvalidCheck)?; + .map_err(KvErrorKind::InvalidCheck)?; let mutations = mutations .into_iter() .map(|mutation| mutation_from_v8((mutation, current_timestamp))) .collect::<Result<Vec<Mutation>, KvMutationError>>() - .map_err(KvError::InvalidMutation)?; + .map_err(KvErrorKind::InvalidMutation)?; let enqueues = enqueues .into_iter() .map(|e| enqueue_from_v8(e, current_timestamp)) .collect::<Result<Vec<Enqueue>, std::io::Error>>() - .map_err(KvError::InvalidEnqueue)?; + .map_err(KvErrorKind::InvalidEnqueue)?; let mut total_payload_size = 0usize; let mut total_key_size = 0usize; @@ -897,7 +903,7 @@ where .chain(mutations.iter().map(|m| &m.key)) { if key.is_empty() { - return Err(KvError::EmptyKey); + return Err(KvErrorKind::EmptyKey.into_box()); } total_payload_size += check_write_key_size(key, &config)?; @@ -921,13 +927,16 @@ where } if total_payload_size > config.max_total_mutation_size_bytes { - return Err(KvError::TotalMutationTooLarge( - config.max_total_mutation_size_bytes, - )); + return Err( + KvErrorKind::TotalMutationTooLarge(config.max_total_mutation_size_bytes) + .into_box(), + ); } if total_key_size > config.max_total_key_size_bytes { - return Err(KvError::TotalKeyTooLarge(config.max_total_key_size_bytes)); + return Err( + KvErrorKind::TotalKeyTooLarge(config.max_total_key_size_bytes).into_box(), + ); } let atomic_write = AtomicWrite { @@ -936,7 +945,10 @@ where enqueues, }; - let result = db.atomic_write(atomic_write).await.map_err(KvError::Kv)?; + let result = db + .atomic_write(atomic_write) + .await + .map_err(KvErrorKind::Kv)?; Ok(result.map(|res| faster_hex::hex_string(&res.versionstamp))) } @@ -958,7 +970,9 @@ fn op_kv_encode_cursor( fn check_read_key_size(key: &[u8], config: &KvConfig) -> Result<(), KvError> { if key.len() > config.max_read_key_size_bytes { - Err(KvError::KeyTooLargeToRead(config.max_read_key_size_bytes)) + Err( + KvErrorKind::KeyTooLargeToRead(config.max_read_key_size_bytes).into_box(), + ) } else { Ok(()) } @@ -969,7 +983,10 @@ fn check_write_key_size( config: &KvConfig, ) -> Result<usize, KvError> { if key.len() > config.max_write_key_size_bytes { - Err(KvError::KeyTooLargeToWrite(config.max_write_key_size_bytes)) + Err( + KvErrorKind::KeyTooLargeToWrite(config.max_write_key_size_bytes) + .into_box(), + ) } else { Ok(key.len()) } @@ -986,7 +1003,7 @@ fn check_value_size( }; if payload.len() > config.max_value_size_bytes { - Err(KvError::ValueTooLarge(config.max_value_size_bytes)) + Err(KvErrorKind::ValueTooLarge(config.max_value_size_bytes).into_box()) } else { Ok(payload.len()) } @@ -997,7 +1014,10 @@ fn check_enqueue_payload_size( config: &KvConfig, ) -> Result<usize, KvError> { if payload.len() > config.max_value_size_bytes { - Err(KvError::EnqueuePayloadTooLarge(config.max_value_size_bytes)) + Err( + KvErrorKind::EnqueuePayloadTooLarge(config.max_value_size_bytes) + .into_box(), + ) } else { Ok(payload.len()) } |