summaryrefslogtreecommitdiff
path: root/ext/kv/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/kv/lib.rs')
-rw-r--r--ext/kv/lib.rs232
1 files changed, 122 insertions, 110 deletions
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index fb68596fa..9e2273108 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -1,9 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-pub mod codec;
pub mod dynamic;
mod interface;
-mod proto;
pub mod remote;
pub mod sqlite;
mod time;
@@ -12,11 +10,12 @@ use std::borrow::Cow;
use std::cell::RefCell;
use std::num::NonZeroU32;
use std::rc::Rc;
+use std::time::Duration;
use base64::prelude::BASE64_URL_SAFE;
use base64::Engine;
-use codec::decode_key;
-use codec::encode_key;
+use chrono::DateTime;
+use chrono::Utc;
use deno_core::anyhow::Context;
use deno_core::error::get_custom_error_class;
use deno_core::error::type_error;
@@ -30,8 +29,26 @@ use deno_core::OpState;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ToJsBuffer;
+use denokv_proto::decode_key;
+use denokv_proto::encode_key;
+use denokv_proto::AtomicWrite;
+use denokv_proto::Check;
+use denokv_proto::Consistency;
+use denokv_proto::Database;
+use denokv_proto::Enqueue;
+use denokv_proto::Key;
+use denokv_proto::KeyPart;
+use denokv_proto::KvEntry;
+use denokv_proto::KvValue;
+use denokv_proto::Mutation;
+use denokv_proto::MutationKind;
+use denokv_proto::QueueMessageHandle;
+use denokv_proto::ReadRange;
+use denokv_proto::SnapshotReadOptions;
+use log::debug;
use serde::Deserialize;
use serde::Serialize;
+use time::utc_now;
pub use crate::interface::*;
@@ -110,30 +127,26 @@ where
type KvKey = Vec<AnyValue>;
-impl From<AnyValue> for KeyPart {
- fn from(value: AnyValue) -> Self {
- match value {
- AnyValue::Bool(false) => KeyPart::False,
- AnyValue::Bool(true) => KeyPart::True,
- AnyValue::Number(n) => KeyPart::Float(n),
- AnyValue::BigInt(n) => KeyPart::Int(n),
- AnyValue::String(s) => KeyPart::String(s),
- AnyValue::V8Buffer(buf) => KeyPart::Bytes(buf.to_vec()),
- AnyValue::RustBuffer(_) => unreachable!(),
- }
+fn key_part_from_v8(value: AnyValue) -> KeyPart {
+ match value {
+ AnyValue::Bool(false) => KeyPart::False,
+ AnyValue::Bool(true) => KeyPart::True,
+ AnyValue::Number(n) => KeyPart::Float(n),
+ AnyValue::BigInt(n) => KeyPart::Int(n),
+ AnyValue::String(s) => KeyPart::String(s),
+ AnyValue::V8Buffer(buf) => KeyPart::Bytes(buf.to_vec()),
+ AnyValue::RustBuffer(_) => unreachable!(),
}
}
-impl From<KeyPart> for AnyValue {
- fn from(value: KeyPart) -> Self {
- match value {
- KeyPart::False => AnyValue::Bool(false),
- KeyPart::True => AnyValue::Bool(true),
- KeyPart::Float(n) => AnyValue::Number(n),
- KeyPart::Int(n) => AnyValue::BigInt(n),
- KeyPart::String(s) => AnyValue::String(s),
- KeyPart::Bytes(buf) => AnyValue::RustBuffer(buf.into()),
- }
+fn key_part_to_v8(value: KeyPart) -> AnyValue {
+ match value {
+ KeyPart::False => AnyValue::Bool(false),
+ KeyPart::True => AnyValue::Bool(true),
+ KeyPart::Float(n) => AnyValue::Number(n),
+ KeyPart::Int(n) => AnyValue::BigInt(n),
+ KeyPart::String(s) => AnyValue::String(s),
+ KeyPart::Bytes(buf) => AnyValue::RustBuffer(buf.into()),
}
}
@@ -153,25 +166,25 @@ enum ToV8Value {
U64(BigInt),
}
-impl TryFrom<FromV8Value> for Value {
+impl TryFrom<FromV8Value> for KvValue {
type Error = AnyError;
fn try_from(value: FromV8Value) -> Result<Self, AnyError> {
Ok(match value {
- FromV8Value::V8(buf) => Value::V8(buf.to_vec()),
- FromV8Value::Bytes(buf) => Value::Bytes(buf.to_vec()),
+ FromV8Value::V8(buf) => KvValue::V8(buf.to_vec()),
+ FromV8Value::Bytes(buf) => KvValue::Bytes(buf.to_vec()),
FromV8Value::U64(n) => {
- Value::U64(num_bigint::BigInt::from(n).try_into()?)
+ KvValue::U64(num_bigint::BigInt::from(n).try_into()?)
}
})
}
}
-impl From<Value> for ToV8Value {
- fn from(value: Value) -> Self {
+impl From<KvValue> for ToV8Value {
+ fn from(value: KvValue) -> Self {
match value {
- Value::V8(buf) => ToV8Value::V8(buf.into()),
- Value::Bytes(buf) => ToV8Value::Bytes(buf.into()),
- Value::U64(n) => ToV8Value::U64(num_bigint::BigInt::from(n).into()),
+ KvValue::V8(buf) => ToV8Value::V8(buf.into()),
+ KvValue::Bytes(buf) => ToV8Value::Bytes(buf.into()),
+ KvValue::U64(n) => ToV8Value::U64(num_bigint::BigInt::from(n).into()),
}
}
}
@@ -190,7 +203,7 @@ impl TryFrom<KvEntry> for ToV8KvEntry {
key: decode_key(&entry.key)?
.0
.into_iter()
- .map(Into::into)
+ .map(key_part_to_v8)
.collect(),
value: entry.value.into(),
versionstamp: hex::encode(entry.versionstamp).into(),
@@ -282,8 +295,7 @@ where
let opts = SnapshotReadOptions {
consistency: consistency.into(),
};
- let output_ranges =
- db.snapshot_read(state.clone(), read_ranges, opts).await?;
+ let output_ranges = db.snapshot_read(read_ranges, opts).await?;
let output_ranges = output_ranges
.into_iter()
.map(|x| {
@@ -302,7 +314,7 @@ struct QueueMessageResource<QPH: QueueMessageHandle + 'static> {
impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> {
fn name(&self) -> Cow<str> {
- "queue_message".into()
+ "queueMessage".into()
}
}
@@ -331,7 +343,7 @@ where
resource.db.clone()
};
- let Some(mut handle) = db.dequeue_next_message(state.clone()).await? else {
+ let Some(mut handle) = db.dequeue_next_message().await? else {
return Ok(None);
};
let payload = handle.take_payload().await?.into();
@@ -361,81 +373,81 @@ where
.map_err(|_| type_error("Queue message not found"))?
.handle
};
- handle.finish(success).await
+ // if we fail to finish the message, there is not much we can do and the
+ // message will be retried anyway, so we just ignore the error
+ if let Err(err) = handle.finish(success).await {
+ debug!("Failed to finish dequeued message: {}", err);
+ };
+ Ok(())
}
type V8KvCheck = (KvKey, Option<ByteString>);
-impl TryFrom<V8KvCheck> for KvCheck {
- type Error = AnyError;
- fn try_from(value: V8KvCheck) -> Result<Self, AnyError> {
- let versionstamp = match value.1 {
- Some(data) => {
- let mut out = [0u8; 10];
- hex::decode_to_slice(data, &mut out)
- .map_err(|_| type_error("invalid versionstamp"))?;
- Some(out)
- }
- None => None,
- };
- Ok(KvCheck {
- key: encode_v8_key(value.0)?,
- versionstamp,
- })
- }
+fn check_from_v8(value: V8KvCheck) -> Result<Check, AnyError> {
+ let versionstamp = match value.1 {
+ Some(data) => {
+ let mut out = [0u8; 10];
+ hex::decode_to_slice(data, &mut out)
+ .map_err(|_| type_error("invalid versionstamp"))?;
+ Some(out)
+ }
+ None => None,
+ };
+ Ok(Check {
+ key: encode_v8_key(value.0)?,
+ versionstamp,
+ })
}
type V8KvMutation = (KvKey, String, Option<FromV8Value>, Option<u64>);
-impl TryFrom<(V8KvMutation, u64)> for KvMutation {
- type Error = AnyError;
- fn try_from(
- (value, current_timstamp): (V8KvMutation, u64),
- ) -> Result<Self, AnyError> {
- let key = encode_v8_key(value.0)?;
- let kind = match (value.1.as_str(), value.2) {
- ("set", Some(value)) => MutationKind::Set(value.try_into()?),
- ("delete", None) => MutationKind::Delete,
- ("sum", Some(value)) => MutationKind::Sum(value.try_into()?),
- ("min", Some(value)) => MutationKind::Min(value.try_into()?),
- ("max", Some(value)) => MutationKind::Max(value.try_into()?),
- (op, Some(_)) => {
- return Err(type_error(format!("invalid mutation '{op}' with value")))
- }
- (op, None) => {
- return Err(type_error(format!(
- "invalid mutation '{op}' without value"
- )))
- }
- };
- Ok(KvMutation {
- key,
- kind,
- expire_at: value.3.map(|expire_in| current_timstamp + expire_in),
- })
- }
+fn mutation_from_v8(
+ (value, current_timstamp): (V8KvMutation, DateTime<Utc>),
+) -> Result<Mutation, AnyError> {
+ let key = encode_v8_key(value.0)?;
+ let kind = match (value.1.as_str(), value.2) {
+ ("set", Some(value)) => MutationKind::Set(value.try_into()?),
+ ("delete", None) => MutationKind::Delete,
+ ("sum", Some(value)) => MutationKind::Sum(value.try_into()?),
+ ("min", Some(value)) => MutationKind::Min(value.try_into()?),
+ ("max", Some(value)) => MutationKind::Max(value.try_into()?),
+ (op, Some(_)) => {
+ return Err(type_error(format!("invalid mutation '{op}' with value")))
+ }
+ (op, None) => {
+ return Err(type_error(format!("invalid mutation '{op}' without value")))
+ }
+ };
+ Ok(Mutation {
+ key,
+ kind,
+ expire_at: value
+ .3
+ .map(|expire_in| current_timstamp + Duration::from_millis(expire_in)),
+ })
}
type V8Enqueue = (JsBuffer, u64, Vec<KvKey>, Option<Vec<u32>>);
-impl TryFrom<V8Enqueue> for Enqueue {
- type Error = AnyError;
- fn try_from(value: V8Enqueue) -> Result<Self, AnyError> {
- Ok(Enqueue {
- payload: value.0.to_vec(),
- delay_ms: value.1,
- keys_if_undelivered: value
- .2
- .into_iter()
- .map(encode_v8_key)
- .collect::<std::io::Result<_>>()?,
- backoff_schedule: value.3,
- })
- }
+fn enqueue_from_v8(
+ value: V8Enqueue,
+ current_timestamp: DateTime<Utc>,
+) -> Result<Enqueue, AnyError> {
+ Ok(Enqueue {
+ payload: value.0.to_vec(),
+ deadline: current_timestamp
+ + chrono::Duration::milliseconds(value.1 as i64),
+ keys_if_undelivered: value
+ .2
+ .into_iter()
+ .map(encode_v8_key)
+ .collect::<std::io::Result<_>>()?,
+ backoff_schedule: value.3,
+ })
}
fn encode_v8_key(key: KvKey) -> Result<Vec<u8>, std::io::Error> {
- encode_key(&Key(key.into_iter().map(From::from).collect()))
+ encode_key(&Key(key.into_iter().map(key_part_from_v8).collect()))
}
enum RawSelector {
@@ -610,7 +622,7 @@ async fn op_kv_atomic_write<DBH>(
where
DBH: DatabaseHandler + 'static,
{
- let current_timestamp = time::utc_now().timestamp_millis() as u64;
+ let current_timestamp = utc_now();
let db = {
let state = state.borrow();
let resource =
@@ -631,17 +643,17 @@ where
let checks = checks
.into_iter()
- .map(TryInto::try_into)
- .collect::<Result<Vec<KvCheck>, AnyError>>()
+ .map(check_from_v8)
+ .collect::<Result<Vec<Check>, AnyError>>()
.with_context(|| "invalid check")?;
let mutations = mutations
.into_iter()
- .map(|mutation| TryFrom::try_from((mutation, current_timestamp)))
- .collect::<Result<Vec<KvMutation>, AnyError>>()
+ .map(|mutation| mutation_from_v8((mutation, current_timestamp)))
+ .collect::<Result<Vec<Mutation>, AnyError>>()
.with_context(|| "invalid mutation")?;
let enqueues = enqueues
.into_iter()
- .map(TryInto::try_into)
+ .map(|e| enqueue_from_v8(e, current_timestamp))
.collect::<Result<Vec<Enqueue>, AnyError>>()
.with_context(|| "invalid enqueue")?;
@@ -690,7 +702,7 @@ where
enqueues,
};
- let result = db.atomic_write(state.clone(), atomic_write).await?;
+ let result = db.atomic_write(atomic_write).await?;
Ok(result.map(|res| hex::encode(res.versionstamp)))
}
@@ -732,11 +744,11 @@ fn check_write_key_size(key: &[u8]) -> Result<usize, AnyError> {
}
}
-fn check_value_size(value: &Value) -> Result<usize, AnyError> {
+fn check_value_size(value: &KvValue) -> Result<usize, AnyError> {
let payload = match value {
- Value::Bytes(x) => x,
- Value::V8(x) => x,
- Value::U64(_) => return Ok(8),
+ KvValue::Bytes(x) => x,
+ KvValue::V8(x) => x,
+ KvValue::U64(_) => return Ok(8),
};
if payload.len() > MAX_VALUE_SIZE_BYTES {