summaryrefslogtreecommitdiff
path: root/ext/kv/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/kv/lib.rs')
-rw-r--r--ext/kv/lib.rs541
1 files changed, 541 insertions, 0 deletions
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
new file mode 100644
index 000000000..49a59af74
--- /dev/null
+++ b/ext/kv/lib.rs
@@ -0,0 +1,541 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+pub mod codec;
+mod interface;
+pub mod sqlite;
+
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::num::NonZeroU32;
+use std::rc::Rc;
+
+use codec::decode_key;
+use codec::encode_key;
+use deno_core::anyhow::Context;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::op;
+use deno_core::serde_v8::AnyValue;
+use deno_core::serde_v8::BigInt;
+use deno_core::ByteString;
+use deno_core::OpState;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
+use serde::Serialize;
+
+pub use crate::interface::*;
+
+struct UnstableChecker {
+ pub unstable: bool,
+}
+
+impl UnstableChecker {
+ // NOTE(bartlomieju): keep in sync with `cli/program_state.rs`
+ pub fn check_unstable(&self, api_name: &str) {
+ if !self.unstable {
+ eprintln!(
+ "Unstable API '{api_name}'. The --unstable flag must be provided."
+ );
+ std::process::exit(70);
+ }
+ }
+}
+
+deno_core::extension!(deno_kv,
+ // TODO(bartlomieju): specify deps
+ deps = [ ],
+ parameters = [ DBH: DatabaseHandler ],
+ ops = [
+ op_kv_database_open<DBH>,
+ op_kv_snapshot_read<DBH>,
+ op_kv_atomic_write<DBH>,
+ op_kv_encode_cursor,
+ ],
+ esm = [ "01_db.ts" ],
+ options = {
+ handler: DBH,
+ unstable: bool,
+ },
+ state = |state, options| {
+ state.put(Rc::new(options.handler));
+ state.put(UnstableChecker { unstable: options.unstable })
+ }
+);
+
+struct DatabaseResource<DB: Database + 'static> {
+ db: Rc<DB>,
+}
+
+impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
+ fn name(&self) -> Cow<str> {
+ "database".into()
+ }
+}
+
+#[op]
+async fn op_kv_database_open<DBH>(
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+) -> Result<ResourceId, AnyError>
+where
+ DBH: DatabaseHandler + 'static,
+{
+ let handler = {
+ let state = state.borrow();
+ state
+ .borrow::<UnstableChecker>()
+ .check_unstable("Deno.openKv");
+ state.borrow::<Rc<DBH>>().clone()
+ };
+ let db = handler.open(state.clone(), path).await?;
+ let rid = state
+ .borrow_mut()
+ .resource_table
+ .add(DatabaseResource { db: Rc::new(db) });
+ Ok(rid)
+}
+
+type KvKey = Vec<AnyValue>;
+
+impl From<AnyValue> for KeyPart {
+ fn from(value: AnyValue) -> Self {
+ match value {
+ AnyValue::Bool(false) => KeyPart::True,
+ AnyValue::Bool(true) => KeyPart::False,
+ AnyValue::Number(n) => KeyPart::Float(n),
+ AnyValue::BigInt(n) => KeyPart::Int(n),
+ AnyValue::String(s) => KeyPart::String(s),
+ AnyValue::Buffer(buf) => KeyPart::Bytes(buf.to_vec()),
+ }
+ }
+}
+
+impl From<KeyPart> for AnyValue {
+ fn from(value: KeyPart) -> Self {
+ match value {
+ KeyPart::True => AnyValue::Bool(false),
+ KeyPart::False => AnyValue::Bool(true),
+ KeyPart::Float(n) => AnyValue::Number(n),
+ KeyPart::Int(n) => AnyValue::BigInt(n),
+ KeyPart::String(s) => AnyValue::String(s),
+ KeyPart::Bytes(buf) => AnyValue::Buffer(buf.into()),
+ }
+ }
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
+enum V8Value {
+ V8(ZeroCopyBuf),
+ Bytes(ZeroCopyBuf),
+ U64(BigInt),
+}
+
+impl TryFrom<V8Value> for Value {
+ type Error = AnyError;
+ fn try_from(value: V8Value) -> Result<Self, AnyError> {
+ Ok(match value {
+ V8Value::V8(buf) => Value::V8(buf.to_vec()),
+ V8Value::Bytes(buf) => Value::Bytes(buf.to_vec()),
+ V8Value::U64(n) => Value::U64(num_bigint::BigInt::from(n).try_into()?),
+ })
+ }
+}
+
+impl From<Value> for V8Value {
+ fn from(value: Value) -> Self {
+ match value {
+ Value::V8(buf) => V8Value::V8(buf.into()),
+ Value::Bytes(buf) => V8Value::Bytes(buf.into()),
+ Value::U64(n) => V8Value::U64(num_bigint::BigInt::from(n).into()),
+ }
+ }
+}
+
+#[derive(Deserialize, Serialize)]
+struct V8KvEntry {
+ key: KvKey,
+ value: V8Value,
+ versionstamp: ByteString,
+}
+
+impl TryFrom<KvEntry> for V8KvEntry {
+ type Error = AnyError;
+ fn try_from(entry: KvEntry) -> Result<Self, AnyError> {
+ Ok(V8KvEntry {
+ key: decode_key(&entry.key)?
+ .0
+ .into_iter()
+ .map(Into::into)
+ .collect(),
+ value: entry.value.into(),
+ versionstamp: hex::encode(entry.versionstamp).into(),
+ })
+ }
+}
+
+#[derive(Deserialize, Serialize)]
+#[serde(rename_all = "camelCase")]
+enum V8Consistency {
+ Strong,
+ Eventual,
+}
+
+impl From<V8Consistency> for Consistency {
+ fn from(value: V8Consistency) -> Self {
+ match value {
+ V8Consistency::Strong => Consistency::Strong,
+ V8Consistency::Eventual => Consistency::Eventual,
+ }
+ }
+}
+
+// (prefix, start, end, limit, reverse, cursor)
+type SnapshotReadRange = (
+ Option<KvKey>,
+ Option<KvKey>,
+ Option<KvKey>,
+ u32,
+ bool,
+ Option<ByteString>,
+);
+
+#[op]
+async fn op_kv_snapshot_read<DBH>(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ ranges: Vec<SnapshotReadRange>,
+ consistency: V8Consistency,
+) -> Result<Vec<Vec<V8KvEntry>>, AnyError>
+where
+ DBH: DatabaseHandler + 'static,
+{
+ let db = {
+ let state = state.borrow();
+ let resource =
+ state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
+ resource.db.clone()
+ };
+ let read_ranges = ranges
+ .into_iter()
+ .map(|(prefix, start, end, limit, reverse, cursor)| {
+ let selector = RawSelector::from_tuple(prefix, start, end)?;
+
+ let (start, end) =
+ decode_selector_and_cursor(&selector, reverse, cursor.as_ref())?;
+ Ok(ReadRange {
+ start,
+ end,
+ limit: NonZeroU32::new(limit)
+ .with_context(|| "limit must be greater than 0")?,
+ reverse,
+ })
+ })
+ .collect::<Result<Vec<_>, AnyError>>()?;
+ let opts = SnapshotReadOptions {
+ consistency: consistency.into(),
+ };
+ let output_ranges = db.snapshot_read(read_ranges, opts).await?;
+ let output_ranges = output_ranges
+ .into_iter()
+ .map(|x| {
+ x.entries
+ .into_iter()
+ .map(TryInto::try_into)
+ .collect::<Result<Vec<_>, AnyError>>()
+ })
+ .collect::<Result<Vec<_>, AnyError>>()?;
+ Ok(output_ranges)
+}
+
+type V8KvCheck = (KvKey, Option<ByteString>);
+
+impl TryFrom<V8KvCheck> for KvCheck {
+ type Error = AnyError;
+ fn try_from(value: V8KvCheck) -> Result<Self, AnyError> {
+ let versionstamp = match value.1 {
+ Some(data) => {
+ let mut out = [0u8; 10];
+ hex::decode_to_slice(data, &mut out)
+ .map_err(|_| type_error("invalid versionstamp"))?;
+ Some(out)
+ }
+ None => None,
+ };
+ Ok(KvCheck {
+ key: encode_v8_key(value.0)?,
+ versionstamp,
+ })
+ }
+}
+
+type V8KvMutation = (KvKey, String, Option<V8Value>);
+
+impl TryFrom<V8KvMutation> for KvMutation {
+ type Error = AnyError;
+ fn try_from(value: V8KvMutation) -> Result<Self, AnyError> {
+ let key = encode_v8_key(value.0)?;
+ let kind = match (value.1.as_str(), value.2) {
+ ("set", Some(value)) => MutationKind::Set(value.try_into()?),
+ ("delete", None) => MutationKind::Delete,
+ ("sum", Some(value)) => MutationKind::Sum(value.try_into()?),
+ ("min", Some(value)) => MutationKind::Min(value.try_into()?),
+ ("max", Some(value)) => MutationKind::Max(value.try_into()?),
+ (op, Some(_)) => {
+ return Err(type_error(format!("invalid mutation '{op}' with value")))
+ }
+ (op, None) => {
+ return Err(type_error(format!(
+ "invalid mutation '{op}' without value"
+ )))
+ }
+ };
+ Ok(KvMutation { key, kind })
+ }
+}
+
+type V8Enqueue = (ZeroCopyBuf, u64, Vec<KvKey>, Option<Vec<u32>>);
+
+impl TryFrom<V8Enqueue> for Enqueue {
+ type Error = AnyError;
+ fn try_from(value: V8Enqueue) -> Result<Self, AnyError> {
+ Ok(Enqueue {
+ payload: value.0.to_vec(),
+ deadline_ms: value.1,
+ keys_if_undelivered: value
+ .2
+ .into_iter()
+ .map(encode_v8_key)
+ .collect::<std::io::Result<_>>()?,
+ backoff_schedule: value.3,
+ })
+ }
+}
+
+fn encode_v8_key(key: KvKey) -> Result<Vec<u8>, std::io::Error> {
+ encode_key(&Key(key.into_iter().map(From::from).collect()))
+}
+
+enum RawSelector {
+ Prefixed {
+ prefix: Vec<u8>,
+ start: Option<Vec<u8>>,
+ end: Option<Vec<u8>>,
+ },
+ Range {
+ start: Vec<u8>,
+ end: Vec<u8>,
+ },
+}
+
+impl RawSelector {
+ fn from_tuple(
+ prefix: Option<KvKey>,
+ start: Option<KvKey>,
+ end: Option<KvKey>,
+ ) -> Result<Self, AnyError> {
+ let prefix = prefix.map(encode_v8_key).transpose()?;
+ let start = start.map(encode_v8_key).transpose()?;
+ let end = end.map(encode_v8_key).transpose()?;
+
+ match (prefix, start, end) {
+ (Some(prefix), None, None) => Ok(Self::Prefixed {
+ prefix,
+ start: None,
+ end: None,
+ }),
+ (Some(prefix), Some(start), None) => Ok(Self::Prefixed {
+ prefix,
+ start: Some(start),
+ end: None,
+ }),
+ (Some(prefix), None, Some(end)) => Ok(Self::Prefixed {
+ prefix,
+ start: None,
+ end: Some(end),
+ }),
+ (None, Some(start), Some(end)) => Ok(Self::Range { start, end }),
+ (None, Some(start), None) => {
+ let end = start.iter().copied().chain(Some(0)).collect();
+ Ok(Self::Range { start, end })
+ }
+ _ => Err(type_error("invalid range")),
+ }
+ }
+
+ fn start(&self) -> Option<&[u8]> {
+ match self {
+ Self::Prefixed { start, .. } => start.as_deref(),
+ Self::Range { start, .. } => Some(start),
+ }
+ }
+
+ fn end(&self) -> Option<&[u8]> {
+ match self {
+ Self::Prefixed { end, .. } => end.as_deref(),
+ Self::Range { end, .. } => Some(end),
+ }
+ }
+
+ fn common_prefix(&self) -> &[u8] {
+ match self {
+ Self::Prefixed { prefix, .. } => prefix,
+ Self::Range { start, end } => common_prefix_for_bytes(start, end),
+ }
+ }
+
+ fn range_start_key(&self) -> Vec<u8> {
+ match self {
+ Self::Prefixed {
+ start: Some(start), ..
+ } => start.clone(),
+ Self::Range { start, .. } => start.clone(),
+ Self::Prefixed { prefix, .. } => {
+ prefix.iter().copied().chain(Some(0)).collect()
+ }
+ }
+ }
+
+ fn range_end_key(&self) -> Vec<u8> {
+ match self {
+ Self::Prefixed { end: Some(end), .. } => end.clone(),
+ Self::Range { end, .. } => end.clone(),
+ Self::Prefixed { prefix, .. } => {
+ prefix.iter().copied().chain(Some(0xff)).collect()
+ }
+ }
+ }
+}
+
+fn common_prefix_for_bytes<'a>(a: &'a [u8], b: &'a [u8]) -> &'a [u8] {
+ let mut i = 0;
+ while i < a.len() && i < b.len() && a[i] == b[i] {
+ i += 1;
+ }
+ &a[..i]
+}
+
+fn encode_cursor(
+ selector: &RawSelector,
+ boundary_key: &[u8],
+) -> Result<String, AnyError> {
+ let common_prefix = selector.common_prefix();
+ if !boundary_key.starts_with(common_prefix) {
+ return Err(type_error("invalid boundary key"));
+ }
+
+ Ok(base64::encode_config(
+ &boundary_key[common_prefix.len()..],
+ base64::URL_SAFE,
+ ))
+}
+
+fn decode_selector_and_cursor(
+ selector: &RawSelector,
+ reverse: bool,
+ cursor: Option<&ByteString>,
+) -> Result<(Vec<u8>, Vec<u8>), AnyError> {
+ let Some(cursor) = cursor else {
+ return Ok((selector.range_start_key(), selector.range_end_key()));
+ };
+
+ let common_prefix = selector.common_prefix();
+ let cursor = base64::decode_config(cursor, base64::URL_SAFE)
+ .map_err(|_| type_error("invalid cursor"))?;
+
+ let first_key: Vec<u8>;
+ let last_key: Vec<u8>;
+
+ if reverse {
+ first_key = selector.range_start_key();
+ last_key = common_prefix
+ .iter()
+ .copied()
+ .chain(cursor.iter().copied())
+ .collect();
+ } else {
+ first_key = common_prefix
+ .iter()
+ .copied()
+ .chain(cursor.iter().copied())
+ .chain(Some(0))
+ .collect();
+ last_key = selector.range_end_key();
+ }
+
+ // Defend against out-of-bounds reading
+ if let Some(start) = selector.start() {
+ if &first_key[..] < start {
+ return Err(type_error("cursor out of bounds"));
+ }
+ }
+
+ if let Some(end) = selector.end() {
+ if &last_key[..] > end {
+ return Err(type_error("cursor out of bounds"));
+ }
+ }
+
+ Ok((first_key, last_key))
+}
+
+#[op]
+async fn op_kv_atomic_write<DBH>(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ checks: Vec<V8KvCheck>,
+ mutations: Vec<V8KvMutation>,
+ enqueues: Vec<V8Enqueue>,
+) -> Result<bool, AnyError>
+where
+ DBH: DatabaseHandler + 'static,
+{
+ let db = {
+ let state = state.borrow();
+ let resource =
+ state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
+ resource.db.clone()
+ };
+
+ let checks = checks
+ .into_iter()
+ .map(TryInto::try_into)
+ .collect::<Result<_, AnyError>>()
+ .with_context(|| "invalid check")?;
+ let mutations = mutations
+ .into_iter()
+ .map(TryInto::try_into)
+ .collect::<Result<_, AnyError>>()
+ .with_context(|| "invalid mutation")?;
+ let enqueues = enqueues
+ .into_iter()
+ .map(TryInto::try_into)
+ .collect::<Result<_, AnyError>>()
+ .with_context(|| "invalid enqueue")?;
+
+ let atomic_write = AtomicWrite {
+ checks,
+ mutations,
+ enqueues,
+ };
+
+ let result = db.atomic_write(atomic_write).await?;
+
+ Ok(result)
+}
+
+// (prefix, start, end)
+type EncodeCursorRangeSelector = (Option<KvKey>, Option<KvKey>, Option<KvKey>);
+
+#[op]
+fn op_kv_encode_cursor(
+ (prefix, start, end): EncodeCursorRangeSelector,
+ boundary_key: KvKey,
+) -> Result<String, AnyError> {
+ let selector = RawSelector::from_tuple(prefix, start, end)?;
+ let boundary_key = encode_v8_key(boundary_key)?;
+ let cursor = encode_cursor(&selector, &boundary_key)?;
+ Ok(cursor)
+}