diff options
Diffstat (limited to 'ext/kv/lib.rs')
-rw-r--r-- | ext/kv/lib.rs | 232 |
1 files changed, 122 insertions, 110 deletions
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index fb68596fa..9e2273108 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -1,9 +1,7 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -pub mod codec; pub mod dynamic; mod interface; -mod proto; pub mod remote; pub mod sqlite; mod time; @@ -12,11 +10,12 @@ use std::borrow::Cow; use std::cell::RefCell; use std::num::NonZeroU32; use std::rc::Rc; +use std::time::Duration; use base64::prelude::BASE64_URL_SAFE; use base64::Engine; -use codec::decode_key; -use codec::encode_key; +use chrono::DateTime; +use chrono::Utc; use deno_core::anyhow::Context; use deno_core::error::get_custom_error_class; use deno_core::error::type_error; @@ -30,8 +29,26 @@ use deno_core::OpState; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ToJsBuffer; +use denokv_proto::decode_key; +use denokv_proto::encode_key; +use denokv_proto::AtomicWrite; +use denokv_proto::Check; +use denokv_proto::Consistency; +use denokv_proto::Database; +use denokv_proto::Enqueue; +use denokv_proto::Key; +use denokv_proto::KeyPart; +use denokv_proto::KvEntry; +use denokv_proto::KvValue; +use denokv_proto::Mutation; +use denokv_proto::MutationKind; +use denokv_proto::QueueMessageHandle; +use denokv_proto::ReadRange; +use denokv_proto::SnapshotReadOptions; +use log::debug; use serde::Deserialize; use serde::Serialize; +use time::utc_now; pub use crate::interface::*; @@ -110,30 +127,26 @@ where type KvKey = Vec<AnyValue>; -impl From<AnyValue> for KeyPart { - fn from(value: AnyValue) -> Self { - match value { - AnyValue::Bool(false) => KeyPart::False, - AnyValue::Bool(true) => KeyPart::True, - AnyValue::Number(n) => KeyPart::Float(n), - AnyValue::BigInt(n) => KeyPart::Int(n), - AnyValue::String(s) => KeyPart::String(s), - AnyValue::V8Buffer(buf) => KeyPart::Bytes(buf.to_vec()), - AnyValue::RustBuffer(_) => unreachable!(), - } +fn key_part_from_v8(value: AnyValue) -> KeyPart { + match value { + AnyValue::Bool(false) => KeyPart::False, + AnyValue::Bool(true) => KeyPart::True, + AnyValue::Number(n) => KeyPart::Float(n), + AnyValue::BigInt(n) => KeyPart::Int(n), + AnyValue::String(s) => KeyPart::String(s), + AnyValue::V8Buffer(buf) => KeyPart::Bytes(buf.to_vec()), + AnyValue::RustBuffer(_) => unreachable!(), } } -impl From<KeyPart> for AnyValue { - fn from(value: KeyPart) -> Self { - match value { - KeyPart::False => AnyValue::Bool(false), - KeyPart::True => AnyValue::Bool(true), - KeyPart::Float(n) => AnyValue::Number(n), - KeyPart::Int(n) => AnyValue::BigInt(n), - KeyPart::String(s) => AnyValue::String(s), - KeyPart::Bytes(buf) => AnyValue::RustBuffer(buf.into()), - } +fn key_part_to_v8(value: KeyPart) -> AnyValue { + match value { + KeyPart::False => AnyValue::Bool(false), + KeyPart::True => AnyValue::Bool(true), + KeyPart::Float(n) => AnyValue::Number(n), + KeyPart::Int(n) => AnyValue::BigInt(n), + KeyPart::String(s) => AnyValue::String(s), + KeyPart::Bytes(buf) => AnyValue::RustBuffer(buf.into()), } } @@ -153,25 +166,25 @@ enum ToV8Value { U64(BigInt), } -impl TryFrom<FromV8Value> for Value { +impl TryFrom<FromV8Value> for KvValue { type Error = AnyError; fn try_from(value: FromV8Value) -> Result<Self, AnyError> { Ok(match value { - FromV8Value::V8(buf) => Value::V8(buf.to_vec()), - FromV8Value::Bytes(buf) => Value::Bytes(buf.to_vec()), + FromV8Value::V8(buf) => KvValue::V8(buf.to_vec()), + FromV8Value::Bytes(buf) => KvValue::Bytes(buf.to_vec()), FromV8Value::U64(n) => { - Value::U64(num_bigint::BigInt::from(n).try_into()?) + KvValue::U64(num_bigint::BigInt::from(n).try_into()?) } }) } } -impl From<Value> for ToV8Value { - fn from(value: Value) -> Self { +impl From<KvValue> for ToV8Value { + fn from(value: KvValue) -> Self { match value { - Value::V8(buf) => ToV8Value::V8(buf.into()), - Value::Bytes(buf) => ToV8Value::Bytes(buf.into()), - Value::U64(n) => ToV8Value::U64(num_bigint::BigInt::from(n).into()), + KvValue::V8(buf) => ToV8Value::V8(buf.into()), + KvValue::Bytes(buf) => ToV8Value::Bytes(buf.into()), + KvValue::U64(n) => ToV8Value::U64(num_bigint::BigInt::from(n).into()), } } } @@ -190,7 +203,7 @@ impl TryFrom<KvEntry> for ToV8KvEntry { key: decode_key(&entry.key)? .0 .into_iter() - .map(Into::into) + .map(key_part_to_v8) .collect(), value: entry.value.into(), versionstamp: hex::encode(entry.versionstamp).into(), @@ -282,8 +295,7 @@ where let opts = SnapshotReadOptions { consistency: consistency.into(), }; - let output_ranges = - db.snapshot_read(state.clone(), read_ranges, opts).await?; + let output_ranges = db.snapshot_read(read_ranges, opts).await?; let output_ranges = output_ranges .into_iter() .map(|x| { @@ -302,7 +314,7 @@ struct QueueMessageResource<QPH: QueueMessageHandle + 'static> { impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> { fn name(&self) -> Cow<str> { - "queue_message".into() + "queueMessage".into() } } @@ -331,7 +343,7 @@ where resource.db.clone() }; - let Some(mut handle) = db.dequeue_next_message(state.clone()).await? else { + let Some(mut handle) = db.dequeue_next_message().await? else { return Ok(None); }; let payload = handle.take_payload().await?.into(); @@ -361,81 +373,81 @@ where .map_err(|_| type_error("Queue message not found"))? .handle }; - handle.finish(success).await + // if we fail to finish the message, there is not much we can do and the + // message will be retried anyway, so we just ignore the error + if let Err(err) = handle.finish(success).await { + debug!("Failed to finish dequeued message: {}", err); + }; + Ok(()) } type V8KvCheck = (KvKey, Option<ByteString>); -impl TryFrom<V8KvCheck> for KvCheck { - type Error = AnyError; - fn try_from(value: V8KvCheck) -> Result<Self, AnyError> { - let versionstamp = match value.1 { - Some(data) => { - let mut out = [0u8; 10]; - hex::decode_to_slice(data, &mut out) - .map_err(|_| type_error("invalid versionstamp"))?; - Some(out) - } - None => None, - }; - Ok(KvCheck { - key: encode_v8_key(value.0)?, - versionstamp, - }) - } +fn check_from_v8(value: V8KvCheck) -> Result<Check, AnyError> { + let versionstamp = match value.1 { + Some(data) => { + let mut out = [0u8; 10]; + hex::decode_to_slice(data, &mut out) + .map_err(|_| type_error("invalid versionstamp"))?; + Some(out) + } + None => None, + }; + Ok(Check { + key: encode_v8_key(value.0)?, + versionstamp, + }) } type V8KvMutation = (KvKey, String, Option<FromV8Value>, Option<u64>); -impl TryFrom<(V8KvMutation, u64)> for KvMutation { - type Error = AnyError; - fn try_from( - (value, current_timstamp): (V8KvMutation, u64), - ) -> Result<Self, AnyError> { - let key = encode_v8_key(value.0)?; - let kind = match (value.1.as_str(), value.2) { - ("set", Some(value)) => MutationKind::Set(value.try_into()?), - ("delete", None) => MutationKind::Delete, - ("sum", Some(value)) => MutationKind::Sum(value.try_into()?), - ("min", Some(value)) => MutationKind::Min(value.try_into()?), - ("max", Some(value)) => MutationKind::Max(value.try_into()?), - (op, Some(_)) => { - return Err(type_error(format!("invalid mutation '{op}' with value"))) - } - (op, None) => { - return Err(type_error(format!( - "invalid mutation '{op}' without value" - ))) - } - }; - Ok(KvMutation { - key, - kind, - expire_at: value.3.map(|expire_in| current_timstamp + expire_in), - }) - } +fn mutation_from_v8( + (value, current_timstamp): (V8KvMutation, DateTime<Utc>), +) -> Result<Mutation, AnyError> { + let key = encode_v8_key(value.0)?; + let kind = match (value.1.as_str(), value.2) { + ("set", Some(value)) => MutationKind::Set(value.try_into()?), + ("delete", None) => MutationKind::Delete, + ("sum", Some(value)) => MutationKind::Sum(value.try_into()?), + ("min", Some(value)) => MutationKind::Min(value.try_into()?), + ("max", Some(value)) => MutationKind::Max(value.try_into()?), + (op, Some(_)) => { + return Err(type_error(format!("invalid mutation '{op}' with value"))) + } + (op, None) => { + return Err(type_error(format!("invalid mutation '{op}' without value"))) + } + }; + Ok(Mutation { + key, + kind, + expire_at: value + .3 + .map(|expire_in| current_timstamp + Duration::from_millis(expire_in)), + }) } type V8Enqueue = (JsBuffer, u64, Vec<KvKey>, Option<Vec<u32>>); -impl TryFrom<V8Enqueue> for Enqueue { - type Error = AnyError; - fn try_from(value: V8Enqueue) -> Result<Self, AnyError> { - Ok(Enqueue { - payload: value.0.to_vec(), - delay_ms: value.1, - keys_if_undelivered: value - .2 - .into_iter() - .map(encode_v8_key) - .collect::<std::io::Result<_>>()?, - backoff_schedule: value.3, - }) - } +fn enqueue_from_v8( + value: V8Enqueue, + current_timestamp: DateTime<Utc>, +) -> Result<Enqueue, AnyError> { + Ok(Enqueue { + payload: value.0.to_vec(), + deadline: current_timestamp + + chrono::Duration::milliseconds(value.1 as i64), + keys_if_undelivered: value + .2 + .into_iter() + .map(encode_v8_key) + .collect::<std::io::Result<_>>()?, + backoff_schedule: value.3, + }) } fn encode_v8_key(key: KvKey) -> Result<Vec<u8>, std::io::Error> { - encode_key(&Key(key.into_iter().map(From::from).collect())) + encode_key(&Key(key.into_iter().map(key_part_from_v8).collect())) } enum RawSelector { @@ -610,7 +622,7 @@ async fn op_kv_atomic_write<DBH>( where DBH: DatabaseHandler + 'static, { - let current_timestamp = time::utc_now().timestamp_millis() as u64; + let current_timestamp = utc_now(); let db = { let state = state.borrow(); let resource = @@ -631,17 +643,17 @@ where let checks = checks .into_iter() - .map(TryInto::try_into) - .collect::<Result<Vec<KvCheck>, AnyError>>() + .map(check_from_v8) + .collect::<Result<Vec<Check>, AnyError>>() .with_context(|| "invalid check")?; let mutations = mutations .into_iter() - .map(|mutation| TryFrom::try_from((mutation, current_timestamp))) - .collect::<Result<Vec<KvMutation>, AnyError>>() + .map(|mutation| mutation_from_v8((mutation, current_timestamp))) + .collect::<Result<Vec<Mutation>, AnyError>>() .with_context(|| "invalid mutation")?; let enqueues = enqueues .into_iter() - .map(TryInto::try_into) + .map(|e| enqueue_from_v8(e, current_timestamp)) .collect::<Result<Vec<Enqueue>, AnyError>>() .with_context(|| "invalid enqueue")?; @@ -690,7 +702,7 @@ where enqueues, }; - let result = db.atomic_write(state.clone(), atomic_write).await?; + let result = db.atomic_write(atomic_write).await?; Ok(result.map(|res| hex::encode(res.versionstamp))) } @@ -732,11 +744,11 @@ fn check_write_key_size(key: &[u8]) -> Result<usize, AnyError> { } } -fn check_value_size(value: &Value) -> Result<usize, AnyError> { +fn check_value_size(value: &KvValue) -> Result<usize, AnyError> { let payload = match value { - Value::Bytes(x) => x, - Value::V8(x) => x, - Value::U64(_) => return Ok(8), + KvValue::Bytes(x) => x, + KvValue::V8(x) => x, + KvValue::U64(_) => return Ok(8), }; if payload.len() > MAX_VALUE_SIZE_BYTES { |