summaryrefslogtreecommitdiff
path: root/ext/kv/lib.rs
diff options
context:
space:
mode:
authorhaturau <135221985+haturatu@users.noreply.github.com>2024-11-20 01:20:47 +0900
committerGitHub <noreply@github.com>2024-11-20 01:20:47 +0900
commit85719a67e59c7aa45bead26e4942d7df8b1b42d4 (patch)
treeface0aecaac53e93ce2f23b53c48859bcf1a36ec /ext/kv/lib.rs
parent67697bc2e4a62a9670699fd18ad0dd8efc5bd955 (diff)
parent186b52731c6bb326c4d32905c5e732d082e83465 (diff)
Merge branch 'denoland:main' into main
Diffstat (limited to 'ext/kv/lib.rs')
-rw-r--r--ext/kv/lib.rs308
1 files changed, 193 insertions, 115 deletions
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index 13e4f1662..5392b9721 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -12,15 +12,12 @@ use std::num::NonZeroU32;
use std::rc::Rc;
use std::time::Duration;
-use anyhow::bail;
use base64::prelude::BASE64_URL_SAFE;
use base64::Engine;
+use boxed_error::Boxed;
use chrono::DateTime;
use chrono::Utc;
-use deno_core::anyhow::Context;
use deno_core::error::get_custom_error_class;
-use deno_core::error::type_error;
-use deno_core::error::AnyError;
use deno_core::futures::StreamExt;
use deno_core::op2;
use deno_core::serde_v8::AnyValue;
@@ -118,12 +115,75 @@ impl Resource for DatabaseWatcherResource {
}
}
+#[derive(Debug, Boxed)]
+pub struct KvError(pub Box<KvErrorKind>);
+
+#[derive(Debug, thiserror::Error)]
+pub enum KvErrorKind {
+ #[error(transparent)]
+ DatabaseHandler(deno_core::error::AnyError),
+ #[error(transparent)]
+ Resource(deno_core::error::AnyError),
+ #[error("Too many ranges (max {0})")]
+ TooManyRanges(usize),
+ #[error("Too many entries (max {0})")]
+ TooManyEntries(usize),
+ #[error("Too many checks (max {0})")]
+ TooManyChecks(usize),
+ #[error("Too many mutations (max {0})")]
+ TooManyMutations(usize),
+ #[error("Too many keys (max {0})")]
+ TooManyKeys(usize),
+ #[error("limit must be greater than 0")]
+ InvalidLimit,
+ #[error("Invalid boundary key")]
+ InvalidBoundaryKey,
+ #[error("Key too large for read (max {0} bytes)")]
+ KeyTooLargeToRead(usize),
+ #[error("Key too large for write (max {0} bytes)")]
+ KeyTooLargeToWrite(usize),
+ #[error("Total mutation size too large (max {0} bytes)")]
+ TotalMutationTooLarge(usize),
+ #[error("Total key size too large (max {0} bytes)")]
+ TotalKeyTooLarge(usize),
+ #[error(transparent)]
+ Kv(deno_core::error::AnyError),
+ #[error(transparent)]
+ Io(#[from] std::io::Error),
+ #[error("Queue message not found")]
+ QueueMessageNotFound,
+ #[error("Start key is not in the keyspace defined by prefix")]
+ StartKeyNotInKeyspace,
+ #[error("End key is not in the keyspace defined by prefix")]
+ EndKeyNotInKeyspace,
+ #[error("Start key is greater than end key")]
+ StartKeyGreaterThanEndKey,
+ #[error("Invalid check")]
+ InvalidCheck(#[source] KvCheckError),
+ #[error("Invalid mutation")]
+ InvalidMutation(#[source] KvMutationError),
+ #[error("Invalid enqueue")]
+ InvalidEnqueue(#[source] std::io::Error),
+ #[error("key cannot be empty")]
+ EmptyKey, // TypeError
+ #[error("Value too large (max {0} bytes)")]
+ ValueTooLarge(usize), // TypeError
+ #[error("enqueue payload too large (max {0} bytes)")]
+ EnqueuePayloadTooLarge(usize), // TypeError
+ #[error("invalid cursor")]
+ InvalidCursor,
+ #[error("cursor out of bounds")]
+ CursorOutOfBounds,
+ #[error("Invalid range")]
+ InvalidRange,
+}
+
#[op2(async)]
#[smi]
async fn op_kv_database_open<DBH>(
state: Rc<RefCell<OpState>>,
#[string] path: Option<String>,
-) -> Result<ResourceId, AnyError>
+) -> Result<ResourceId, KvError>
where
DBH: DatabaseHandler + 'static,
{
@@ -134,7 +194,10 @@ where
.check_or_exit(UNSTABLE_FEATURE_NAME, "Deno.openKv");
state.borrow::<Rc<DBH>>().clone()
};
- let db = handler.open(state.clone(), path).await?;
+ let db = handler
+ .open(state.clone(), path)
+ .await
+ .map_err(KvErrorKind::DatabaseHandler)?;
let rid = state.borrow_mut().resource_table.add(DatabaseResource {
db,
cancel_handle: CancelHandle::new_rc(),
@@ -184,8 +247,8 @@ enum ToV8Value {
}
impl TryFrom<FromV8Value> for KvValue {
- type Error = AnyError;
- fn try_from(value: FromV8Value) -> Result<Self, AnyError> {
+ type Error = num_bigint::TryFromBigIntError<num_bigint::BigInt>;
+ fn try_from(value: FromV8Value) -> Result<Self, Self::Error> {
Ok(match value {
FromV8Value::V8(buf) => KvValue::V8(buf.to_vec()),
FromV8Value::Bytes(buf) => KvValue::Bytes(buf.to_vec()),
@@ -214,8 +277,8 @@ struct ToV8KvEntry {
}
impl TryFrom<KvEntry> for ToV8KvEntry {
- type Error = AnyError;
- fn try_from(entry: KvEntry) -> Result<Self, AnyError> {
+ type Error = std::io::Error;
+ fn try_from(entry: KvEntry) -> Result<Self, Self::Error> {
Ok(ToV8KvEntry {
key: decode_key(&entry.key)?
.0
@@ -261,14 +324,16 @@ async fn op_kv_snapshot_read<DBH>(
#[smi] rid: ResourceId,
#[serde] ranges: Vec<SnapshotReadRange>,
#[serde] consistency: V8Consistency,
-) -> Result<Vec<Vec<ToV8KvEntry>>, AnyError>
+) -> Result<Vec<Vec<ToV8KvEntry>>, KvError>
where
DBH: DatabaseHandler + 'static,
{
let db = {
let state = state.borrow();
- let resource =
- state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
+ let resource = state
+ .resource_table
+ .get::<DatabaseResource<DBH::DB>>(rid)
+ .map_err(KvErrorKind::Resource)?;
resource.db.clone()
};
@@ -278,10 +343,7 @@ where
};
if ranges.len() > config.max_read_ranges {
- return Err(type_error(format!(
- "Too many ranges (max {})",
- config.max_read_ranges
- )));
+ return Err(KvErrorKind::TooManyRanges(config.max_read_ranges).into_box());
}
let mut total_entries = 0usize;
@@ -300,33 +362,34 @@ where
Ok(ReadRange {
start,
end,
- limit: NonZeroU32::new(limit)
- .with_context(|| "limit must be greater than 0")?,
+ limit: NonZeroU32::new(limit).ok_or(KvErrorKind::InvalidLimit)?,
reverse,
})
})
- .collect::<Result<Vec<_>, AnyError>>()?;
+ .collect::<Result<Vec<_>, KvError>>()?;
if total_entries > config.max_read_entries {
- return Err(type_error(format!(
- "Too many entries (max {})",
- config.max_read_entries
- )));
+ return Err(
+ KvErrorKind::TooManyEntries(config.max_read_entries).into_box(),
+ );
}
let opts = SnapshotReadOptions {
consistency: consistency.into(),
};
- let output_ranges = db.snapshot_read(read_ranges, opts).await?;
+ let output_ranges = db
+ .snapshot_read(read_ranges, opts)
+ .await
+ .map_err(KvErrorKind::Kv)?;
let output_ranges = output_ranges
.into_iter()
.map(|x| {
x.entries
.into_iter()
.map(TryInto::try_into)
- .collect::<Result<Vec<_>, AnyError>>()
+ .collect::<Result<Vec<_>, std::io::Error>>()
})
- .collect::<Result<Vec<_>, AnyError>>()?;
+ .collect::<Result<Vec<_>, std::io::Error>>()?;
Ok(output_ranges)
}
@@ -345,7 +408,7 @@ impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> {
async fn op_kv_dequeue_next_message<DBH>(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<Option<(ToJsBuffer, ResourceId)>, AnyError>
+) -> Result<Option<(ToJsBuffer, ResourceId)>, KvError>
where
DBH: DatabaseHandler + 'static,
{
@@ -358,17 +421,19 @@ where
if get_custom_error_class(&err) == Some("BadResource") {
return Ok(None);
} else {
- return Err(err);
+ return Err(KvErrorKind::Resource(err).into_box());
}
}
};
resource.db.clone()
};
- let Some(mut handle) = db.dequeue_next_message().await? else {
+ let Some(mut handle) =
+ db.dequeue_next_message().await.map_err(KvErrorKind::Kv)?
+ else {
return Ok(None);
};
- let payload = handle.take_payload().await?.into();
+ let payload = handle.take_payload().await.map_err(KvErrorKind::Kv)?.into();
let handle_rid = {
let mut state = state.borrow_mut();
state.resource_table.add(QueueMessageResource { handle })
@@ -382,18 +447,18 @@ fn op_kv_watch<DBH>(
state: &mut OpState,
#[smi] rid: ResourceId,
#[serde] keys: Vec<KvKey>,
-) -> Result<ResourceId, AnyError>
+) -> Result<ResourceId, KvError>
where
DBH: DatabaseHandler + 'static,
{
- let resource = state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
+ let resource = state
+ .resource_table
+ .get::<DatabaseResource<DBH::DB>>(rid)
+ .map_err(KvErrorKind::Resource)?;
let config = state.borrow::<Rc<KvConfig>>().clone();
if keys.len() > config.max_watched_keys {
- return Err(type_error(format!(
- "Too many keys (max {})",
- config.max_watched_keys
- )));
+ return Err(KvErrorKind::TooManyKeys(config.max_watched_keys).into_box());
}
let keys: Vec<Vec<u8>> = keys
@@ -428,10 +493,13 @@ enum WatchEntry {
async fn op_kv_watch_next(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<Option<Vec<WatchEntry>>, AnyError> {
+) -> Result<Option<Vec<WatchEntry>>, KvError> {
let resource = {
let state = state.borrow();
- let resource = state.resource_table.get::<DatabaseWatcherResource>(rid)?;
+ let resource = state
+ .resource_table
+ .get::<DatabaseWatcherResource>(rid)
+ .map_err(KvErrorKind::Resource)?;
resource.clone()
};
@@ -457,7 +525,7 @@ async fn op_kv_watch_next(
return Ok(None);
};
- let entries = res?;
+ let entries = res.map_err(KvErrorKind::Kv)?;
let entries = entries
.into_iter()
.map(|entry| {
@@ -468,7 +536,7 @@ async fn op_kv_watch_next(
WatchKeyOutput::Unchanged => WatchEntry::Unchanged,
})
})
- .collect::<Result<_, anyhow::Error>>()?;
+ .collect::<Result<_, KvError>>()?;
Ok(Some(entries))
}
@@ -478,7 +546,7 @@ async fn op_kv_finish_dequeued_message<DBH>(
state: Rc<RefCell<OpState>>,
#[smi] handle_rid: ResourceId,
success: bool,
-) -> Result<(), AnyError>
+) -> Result<(), KvError>
where
DBH: DatabaseHandler + 'static,
{
@@ -487,9 +555,9 @@ where
let handle = state
.resource_table
.take::<QueueMessageResource<<<DBH>::DB as Database>::QMH>>(handle_rid)
- .map_err(|_| type_error("Queue message not found"))?;
+ .map_err(|_| KvErrorKind::QueueMessageNotFound)?;
Rc::try_unwrap(handle)
- .map_err(|_| type_error("Queue message not found"))?
+ .map_err(|_| KvErrorKind::QueueMessageNotFound)?
.handle
};
// if we fail to finish the message, there is not much we can do and the
@@ -500,32 +568,52 @@ where
Ok(())
}
+#[derive(Debug, thiserror::Error)]
+pub enum KvCheckError {
+ #[error("invalid versionstamp")]
+ InvalidVersionstamp,
+ #[error(transparent)]
+ Io(std::io::Error),
+}
+
type V8KvCheck = (KvKey, Option<ByteString>);
-fn check_from_v8(value: V8KvCheck) -> Result<Check, AnyError> {
+fn check_from_v8(value: V8KvCheck) -> Result<Check, KvCheckError> {
let versionstamp = match value.1 {
Some(data) => {
let mut out = [0u8; 10];
if data.len() != out.len() * 2 {
- bail!(type_error("invalid versionstamp"));
+ return Err(KvCheckError::InvalidVersionstamp);
}
faster_hex::hex_decode(&data, &mut out)
- .map_err(|_| type_error("invalid versionstamp"))?;
+ .map_err(|_| KvCheckError::InvalidVersionstamp)?;
Some(out)
}
None => None,
};
Ok(Check {
- key: encode_v8_key(value.0)?,
+ key: encode_v8_key(value.0).map_err(KvCheckError::Io)?,
versionstamp,
})
}
+#[derive(Debug, thiserror::Error)]
+pub enum KvMutationError {
+ #[error(transparent)]
+ BigInt(#[from] num_bigint::TryFromBigIntError<num_bigint::BigInt>),
+ #[error(transparent)]
+ Io(#[from] std::io::Error),
+ #[error("Invalid mutation '{0}' with value")]
+ InvalidMutationWithValue(String),
+ #[error("Invalid mutation '{0}' without value")]
+ InvalidMutationWithoutValue(String),
+}
+
type V8KvMutation = (KvKey, String, Option<FromV8Value>, Option<u64>);
fn mutation_from_v8(
(value, current_timstamp): (V8KvMutation, DateTime<Utc>),
-) -> Result<Mutation, AnyError> {
+) -> Result<Mutation, KvMutationError> {
let key = encode_v8_key(value.0)?;
let kind = match (value.1.as_str(), value.2) {
("set", Some(value)) => MutationKind::Set(value.try_into()?),
@@ -542,10 +630,10 @@ fn mutation_from_v8(
MutationKind::SetSuffixVersionstampedKey(value.try_into()?)
}
(op, Some(_)) => {
- return Err(type_error(format!("Invalid mutation '{op}' with value")))
+ return Err(KvMutationError::InvalidMutationWithValue(op.to_string()))
}
(op, None) => {
- return Err(type_error(format!("Invalid mutation '{op}' without value")))
+ return Err(KvMutationError::InvalidMutationWithoutValue(op.to_string()))
}
};
Ok(Mutation {
@@ -562,7 +650,7 @@ type V8Enqueue = (JsBuffer, u64, Vec<KvKey>, Option<Vec<u32>>);
fn enqueue_from_v8(
value: V8Enqueue,
current_timestamp: DateTime<Utc>,
-) -> Result<Enqueue, AnyError> {
+) -> Result<Enqueue, std::io::Error> {
Ok(Enqueue {
payload: value.0.to_vec(),
deadline: current_timestamp
@@ -597,7 +685,7 @@ impl RawSelector {
prefix: Option<KvKey>,
start: Option<KvKey>,
end: Option<KvKey>,
- ) -> Result<Self, AnyError> {
+ ) -> Result<Self, KvError> {
let prefix = prefix.map(encode_v8_key).transpose()?;
let start = start.map(encode_v8_key).transpose()?;
let end = end.map(encode_v8_key).transpose()?;
@@ -610,9 +698,7 @@ impl RawSelector {
}),
(Some(prefix), Some(start), None) => {
if !start.starts_with(&prefix) || start.len() == prefix.len() {
- return Err(type_error(
- "Start key is not in the keyspace defined by prefix",
- ));
+ return Err(KvErrorKind::StartKeyNotInKeyspace.into_box());
}
Ok(Self::Prefixed {
prefix,
@@ -622,9 +708,7 @@ impl RawSelector {
}
(Some(prefix), None, Some(end)) => {
if !end.starts_with(&prefix) || end.len() == prefix.len() {
- return Err(type_error(
- "End key is not in the keyspace defined by prefix",
- ));
+ return Err(KvErrorKind::EndKeyNotInKeyspace.into_box());
}
Ok(Self::Prefixed {
prefix,
@@ -634,7 +718,7 @@ impl RawSelector {
}
(None, Some(start), Some(end)) => {
if start > end {
- return Err(type_error("Start key is greater than end key"));
+ return Err(KvErrorKind::StartKeyGreaterThanEndKey.into_box());
}
Ok(Self::Range { start, end })
}
@@ -642,7 +726,7 @@ impl RawSelector {
let end = start.iter().copied().chain(Some(0)).collect();
Ok(Self::Range { start, end })
}
- _ => Err(type_error("Invalid range")),
+ _ => Err(KvErrorKind::InvalidRange.into_box()),
}
}
@@ -701,10 +785,10 @@ fn common_prefix_for_bytes<'a>(a: &'a [u8], b: &'a [u8]) -> &'a [u8] {
fn encode_cursor(
selector: &RawSelector,
boundary_key: &[u8],
-) -> Result<String, AnyError> {
+) -> Result<String, KvError> {
let common_prefix = selector.common_prefix();
if !boundary_key.starts_with(common_prefix) {
- return Err(type_error("Invalid boundary key"));
+ return Err(KvErrorKind::InvalidBoundaryKey.into_box());
}
Ok(BASE64_URL_SAFE.encode(&boundary_key[common_prefix.len()..]))
}
@@ -713,7 +797,7 @@ fn decode_selector_and_cursor(
selector: &RawSelector,
reverse: bool,
cursor: Option<&ByteString>,
-) -> Result<(Vec<u8>, Vec<u8>), AnyError> {
+) -> Result<(Vec<u8>, Vec<u8>), KvError> {
let Some(cursor) = cursor else {
return Ok((selector.range_start_key(), selector.range_end_key()));
};
@@ -721,7 +805,7 @@ fn decode_selector_and_cursor(
let common_prefix = selector.common_prefix();
let cursor = BASE64_URL_SAFE
.decode(cursor)
- .map_err(|_| type_error("invalid cursor"))?;
+ .map_err(|_| KvErrorKind::InvalidCursor)?;
let first_key: Vec<u8>;
let last_key: Vec<u8>;
@@ -746,13 +830,13 @@ fn decode_selector_and_cursor(
// Defend against out-of-bounds reading
if let Some(start) = selector.start() {
if &first_key[..] < start {
- return Err(type_error("cursor out of bounds"));
+ return Err(KvErrorKind::CursorOutOfBounds.into_box());
}
}
if let Some(end) = selector.end() {
if &last_key[..] > end {
- return Err(type_error("cursor out of bounds"));
+ return Err(KvErrorKind::CursorOutOfBounds.into_box());
}
}
@@ -767,15 +851,17 @@ async fn op_kv_atomic_write<DBH>(
#[serde] checks: Vec<V8KvCheck>,
#[serde] mutations: Vec<V8KvMutation>,
#[serde] enqueues: Vec<V8Enqueue>,
-) -> Result<Option<String>, AnyError>
+) -> Result<Option<String>, KvError>
where
DBH: DatabaseHandler + 'static,
{
let current_timestamp = chrono::Utc::now();
let db = {
let state = state.borrow();
- let resource =
- state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
+ let resource = state
+ .resource_table
+ .get::<DatabaseResource<DBH::DB>>(rid)
+ .map_err(KvErrorKind::Resource)?;
resource.db.clone()
};
@@ -785,34 +871,28 @@ where
};
if checks.len() > config.max_checks {
- return Err(type_error(format!(
- "Too many checks (max {})",
- config.max_checks
- )));
+ return Err(KvErrorKind::TooManyChecks(config.max_checks).into_box());
}
if mutations.len() + enqueues.len() > config.max_mutations {
- return Err(type_error(format!(
- "Too many mutations (max {})",
- config.max_mutations
- )));
+ return Err(KvErrorKind::TooManyMutations(config.max_mutations).into_box());
}
let checks = checks
.into_iter()
.map(check_from_v8)
- .collect::<Result<Vec<Check>, AnyError>>()
- .with_context(|| "invalid check")?;
+ .collect::<Result<Vec<Check>, KvCheckError>>()
+ .map_err(KvErrorKind::InvalidCheck)?;
let mutations = mutations
.into_iter()
.map(|mutation| mutation_from_v8((mutation, current_timestamp)))
- .collect::<Result<Vec<Mutation>, AnyError>>()
- .with_context(|| "Invalid mutation")?;
+ .collect::<Result<Vec<Mutation>, KvMutationError>>()
+ .map_err(KvErrorKind::InvalidMutation)?;
let enqueues = enqueues
.into_iter()
.map(|e| enqueue_from_v8(e, current_timestamp))
- .collect::<Result<Vec<Enqueue>, AnyError>>()
- .with_context(|| "invalid enqueue")?;
+ .collect::<Result<Vec<Enqueue>, std::io::Error>>()
+ .map_err(KvErrorKind::InvalidEnqueue)?;
let mut total_payload_size = 0usize;
let mut total_key_size = 0usize;
@@ -823,7 +903,7 @@ where
.chain(mutations.iter().map(|m| &m.key))
{
if key.is_empty() {
- return Err(type_error("key cannot be empty"));
+ return Err(KvErrorKind::EmptyKey.into_box());
}
total_payload_size += check_write_key_size(key, &config)?;
@@ -847,17 +927,16 @@ where
}
if total_payload_size > config.max_total_mutation_size_bytes {
- return Err(type_error(format!(
- "Total mutation size too large (max {} bytes)",
- config.max_total_mutation_size_bytes
- )));
+ return Err(
+ KvErrorKind::TotalMutationTooLarge(config.max_total_mutation_size_bytes)
+ .into_box(),
+ );
}
if total_key_size > config.max_total_key_size_bytes {
- return Err(type_error(format!(
- "Total key size too large (max {} bytes)",
- config.max_total_key_size_bytes
- )));
+ return Err(
+ KvErrorKind::TotalKeyTooLarge(config.max_total_key_size_bytes).into_box(),
+ );
}
let atomic_write = AtomicWrite {
@@ -866,7 +945,10 @@ where
enqueues,
};
- let result = db.atomic_write(atomic_write).await?;
+ let result = db
+ .atomic_write(atomic_write)
+ .await
+ .map_err(KvErrorKind::Kv)?;
Ok(result.map(|res| faster_hex::hex_string(&res.versionstamp)))
}
@@ -879,19 +961,18 @@ type EncodeCursorRangeSelector = (Option<KvKey>, Option<KvKey>, Option<KvKey>);
fn op_kv_encode_cursor(
#[serde] (prefix, start, end): EncodeCursorRangeSelector,
#[serde] boundary_key: KvKey,
-) -> Result<String, AnyError> {
+) -> Result<String, KvError> {
let selector = RawSelector::from_tuple(prefix, start, end)?;
let boundary_key = encode_v8_key(boundary_key)?;
let cursor = encode_cursor(&selector, &boundary_key)?;
Ok(cursor)
}
-fn check_read_key_size(key: &[u8], config: &KvConfig) -> Result<(), AnyError> {
+fn check_read_key_size(key: &[u8], config: &KvConfig) -> Result<(), KvError> {
if key.len() > config.max_read_key_size_bytes {
- Err(type_error(format!(
- "Key too large for read (max {} bytes)",
- config.max_read_key_size_bytes
- )))
+ Err(
+ KvErrorKind::KeyTooLargeToRead(config.max_read_key_size_bytes).into_box(),
+ )
} else {
Ok(())
}
@@ -900,12 +981,12 @@ fn check_read_key_size(key: &[u8], config: &KvConfig) -> Result<(), AnyError> {
fn check_write_key_size(
key: &[u8],
config: &KvConfig,
-) -> Result<usize, AnyError> {
+) -> Result<usize, KvError> {
if key.len() > config.max_write_key_size_bytes {
- Err(type_error(format!(
- "Key too large for write (max {} bytes)",
- config.max_write_key_size_bytes
- )))
+ Err(
+ KvErrorKind::KeyTooLargeToWrite(config.max_write_key_size_bytes)
+ .into_box(),
+ )
} else {
Ok(key.len())
}
@@ -914,7 +995,7 @@ fn check_write_key_size(
fn check_value_size(
value: &KvValue,
config: &KvConfig,
-) -> Result<usize, AnyError> {
+) -> Result<usize, KvError> {
let payload = match value {
KvValue::Bytes(x) => x,
KvValue::V8(x) => x,
@@ -922,10 +1003,7 @@ fn check_value_size(
};
if payload.len() > config.max_value_size_bytes {
- Err(type_error(format!(
- "Value too large (max {} bytes)",
- config.max_value_size_bytes
- )))
+ Err(KvErrorKind::ValueTooLarge(config.max_value_size_bytes).into_box())
} else {
Ok(payload.len())
}
@@ -934,12 +1012,12 @@ fn check_value_size(
fn check_enqueue_payload_size(
payload: &[u8],
config: &KvConfig,
-) -> Result<usize, AnyError> {
+) -> Result<usize, KvError> {
if payload.len() > config.max_value_size_bytes {
- Err(type_error(format!(
- "enqueue payload too large (max {} bytes)",
- config.max_value_size_bytes
- )))
+ Err(
+ KvErrorKind::EnqueuePayloadTooLarge(config.max_value_size_bytes)
+ .into_box(),
+ )
} else {
Ok(payload.len())
}