summaryrefslogtreecommitdiff
path: root/extensions/web/blob.rs
diff options
context:
space:
mode:
authorJimmy Wärting <jimmy@warting.se>2021-07-05 15:34:37 +0200
committerGitHub <noreply@github.com>2021-07-05 15:34:37 +0200
commit2c0b0e45b72ef1b5d7fa95e1e110d07ddbc720f7 (patch)
tree3b54a6f1f156f8d105cf41ac290035c8b5f8f1c9 /extensions/web/blob.rs
parentea87d860beda7cd40eb6857199a00e5ba8700fd2 (diff)
refactor: asynchronous blob backing store (#10969)
Co-authored-by: Luca Casonato <hello@lcas.dev>
Diffstat (limited to 'extensions/web/blob.rs')
-rw-r--r--extensions/web/blob.rs265
1 files changed, 265 insertions, 0 deletions
diff --git a/extensions/web/blob.rs b/extensions/web/blob.rs
new file mode 100644
index 000000000..96a982677
--- /dev/null
+++ b/extensions/web/blob.rs
@@ -0,0 +1,265 @@
+use async_trait::async_trait;
+use deno_core::error::type_error;
+use deno_core::url::Url;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::sync::Mutex;
+
+use deno_core::error::AnyError;
+use uuid::Uuid;
+
+use crate::Location;
+
+pub type PartMap = HashMap<Uuid, Arc<Box<dyn BlobPart + Send + Sync>>>;
+
+#[derive(Clone, Default, Debug)]
+pub struct BlobStore {
+ parts: Arc<Mutex<PartMap>>,
+ object_urls: Arc<Mutex<HashMap<Url, Arc<Blob>>>>,
+}
+
+impl BlobStore {
+ pub fn insert_part(&self, part: Box<dyn BlobPart + Send + Sync>) -> Uuid {
+ let id = Uuid::new_v4();
+ let mut parts = self.parts.lock().unwrap();
+ parts.insert(id, Arc::new(part));
+ id
+ }
+
+ pub fn get_part(
+ &self,
+ id: &Uuid,
+ ) -> Option<Arc<Box<dyn BlobPart + Send + Sync>>> {
+ let parts = self.parts.lock().unwrap();
+ let part = parts.get(&id);
+ part.cloned()
+ }
+
+ pub fn remove_part(
+ &self,
+ id: &Uuid,
+ ) -> Option<Arc<Box<dyn BlobPart + Send + Sync>>> {
+ let mut parts = self.parts.lock().unwrap();
+ parts.remove(&id)
+ }
+
+ pub fn get_object_url(
+ &self,
+ mut url: Url,
+ ) -> Result<Option<Arc<Blob>>, AnyError> {
+ let blob_store = self.object_urls.lock().unwrap();
+ url.set_fragment(None);
+ Ok(blob_store.get(&url).cloned())
+ }
+
+ pub fn insert_object_url(
+ &self,
+ blob: Blob,
+ maybe_location: Option<Url>,
+ ) -> Url {
+ let origin = if let Some(location) = maybe_location {
+ location.origin().ascii_serialization()
+ } else {
+ "null".to_string()
+ };
+ let id = Uuid::new_v4();
+ let url = Url::parse(&format!("blob:{}/{}", origin, id)).unwrap();
+
+ let mut blob_store = self.object_urls.lock().unwrap();
+ blob_store.insert(url.clone(), Arc::new(blob));
+
+ url
+ }
+
+ pub fn remove_object_url(&self, url: &Url) {
+ let mut blob_store = self.object_urls.lock().unwrap();
+ blob_store.remove(&url);
+ }
+}
+
+#[derive(Debug)]
+pub struct Blob {
+ pub media_type: String,
+
+ pub parts: Vec<Arc<Box<dyn BlobPart + Send + Sync>>>,
+}
+
+impl Blob {
+ // TODO(lucacsonato): this should be a stream!
+ pub async fn read_all(&self) -> Result<Vec<u8>, AnyError> {
+ let size = self.size();
+ let mut bytes = Vec::with_capacity(size);
+
+ for part in &self.parts {
+ let chunk = part.read().await?;
+ bytes.extend_from_slice(chunk);
+ }
+
+ assert_eq!(bytes.len(), size);
+
+ Ok(bytes)
+ }
+
+ fn size(&self) -> usize {
+ let mut total = 0;
+ for part in &self.parts {
+ total += part.size()
+ }
+ total
+ }
+}
+
+#[async_trait]
+pub trait BlobPart: Debug {
+ // TODO(lucacsonato): this should be a stream!
+ async fn read(&self) -> Result<&[u8], AnyError>;
+ fn size(&self) -> usize;
+}
+
+#[derive(Debug)]
+pub struct InMemoryBlobPart(Vec<u8>);
+
+impl From<Vec<u8>> for InMemoryBlobPart {
+ fn from(vec: Vec<u8>) -> Self {
+ Self(vec)
+ }
+}
+
+#[async_trait]
+impl BlobPart for InMemoryBlobPart {
+ async fn read(&self) -> Result<&[u8], AnyError> {
+ Ok(&self.0)
+ }
+
+ fn size(&self) -> usize {
+ self.0.len()
+ }
+}
+
+#[derive(Debug)]
+pub struct SlicedBlobPart {
+ part: Arc<Box<dyn BlobPart + Send + Sync>>,
+ start: usize,
+ len: usize,
+}
+
+#[async_trait]
+impl BlobPart for SlicedBlobPart {
+ async fn read(&self) -> Result<&[u8], AnyError> {
+ let original = self.part.read().await?;
+ Ok(&original[self.start..self.start + self.len])
+ }
+
+ fn size(&self) -> usize {
+ self.len
+ }
+}
+
+pub fn op_blob_create_part(
+ state: &mut deno_core::OpState,
+ data: ZeroCopyBuf,
+ _: (),
+) -> Result<Uuid, AnyError> {
+ let blob_store = state.borrow::<BlobStore>();
+ let part = InMemoryBlobPart(data.to_vec());
+ let id = blob_store.insert_part(Box::new(part));
+ Ok(id)
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SliceOptions {
+ start: usize,
+ len: usize,
+}
+
+pub fn op_blob_slice_part(
+ state: &mut deno_core::OpState,
+ id: Uuid,
+ options: SliceOptions,
+) -> Result<Uuid, AnyError> {
+ let blob_store = state.borrow::<BlobStore>();
+ let part = blob_store
+ .get_part(&id)
+ .ok_or_else(|| type_error("Blob part not found"))?;
+
+ let SliceOptions { start, len } = options;
+
+ let size = part.size();
+ if start + len > size {
+ return Err(type_error(
+ "start + len can not be larger than blob part size",
+ ));
+ }
+
+ let sliced_part = SlicedBlobPart { part, start, len };
+ let id = blob_store.insert_part(Box::new(sliced_part));
+
+ Ok(id)
+}
+
+pub async fn op_blob_read_part(
+ state: Rc<RefCell<deno_core::OpState>>,
+ id: Uuid,
+ _: (),
+) -> Result<ZeroCopyBuf, AnyError> {
+ let part = {
+ let state = state.borrow();
+ let blob_store = state.borrow::<BlobStore>();
+ blob_store.get_part(&id)
+ }
+ .ok_or_else(|| type_error("Blob part not found"))?;
+ let buf = part.read().await?;
+ Ok(ZeroCopyBuf::from(buf.to_vec()))
+}
+
+pub fn op_blob_remove_part(
+ state: &mut deno_core::OpState,
+ id: Uuid,
+ _: (),
+) -> Result<(), AnyError> {
+ let blob_store = state.borrow::<BlobStore>();
+ blob_store.remove_part(&id);
+ Ok(())
+}
+
+pub fn op_blob_create_object_url(
+ state: &mut deno_core::OpState,
+ media_type: String,
+ part_ids: Vec<Uuid>,
+) -> Result<String, AnyError> {
+ let mut parts = Vec::with_capacity(part_ids.len());
+ let blob_store = state.borrow::<BlobStore>();
+ for part_id in part_ids {
+ let part = blob_store
+ .get_part(&part_id)
+ .ok_or_else(|| type_error("Blob part not found"))?;
+ parts.push(part);
+ }
+
+ let blob = Blob { media_type, parts };
+
+ let maybe_location = state.try_borrow::<Location>();
+ let blob_store = state.borrow::<BlobStore>();
+
+ let url = blob_store
+ .insert_object_url(blob, maybe_location.map(|location| location.0.clone()));
+
+ Ok(url.to_string())
+}
+
+pub fn op_blob_revoke_object_url(
+ state: &mut deno_core::OpState,
+ url: String,
+ _: (),
+) -> Result<(), AnyError> {
+ let url = Url::parse(&url)?;
+ let blob_store = state.borrow::<BlobStore>();
+ blob_store.remove_object_url(&url);
+ Ok(())
+}