summaryrefslogtreecommitdiff
path: root/extensions
diff options
context:
space:
mode:
Diffstat (limited to 'extensions')
-rw-r--r--extensions/fetch/26_fetch.js14
-rw-r--r--extensions/fetch/lib.rs66
-rw-r--r--extensions/net/03_http.js5
-rw-r--r--extensions/timers/benches/timers_ops.rs4
-rw-r--r--extensions/web/09_file.js258
-rw-r--r--extensions/web/11_blob_url.js11
-rw-r--r--extensions/web/13_message_port.js4
-rw-r--r--extensions/web/Cargo.toml1
-rw-r--r--extensions/web/blob.rs265
-rw-r--r--extensions/web/internal.d.ts10
-rw-r--r--extensions/web/lib.rs113
11 files changed, 557 insertions, 194 deletions
diff --git a/extensions/fetch/26_fetch.js b/extensions/fetch/26_fetch.js
index 47b07be0b..a33187344 100644
--- a/extensions/fetch/26_fetch.js
+++ b/extensions/fetch/26_fetch.js
@@ -152,15 +152,18 @@
if (req.body !== null) {
if (req.body.streamOrStatic instanceof ReadableStream) {
- if (req.body.length === null) {
+ if (req.body.length === null || req.body.source instanceof Blob) {
reqBody = req.body.stream;
} else {
const reader = req.body.stream.getReader();
const r1 = await reader.read();
- if (r1.done) throw new TypeError("Unreachable");
- reqBody = r1.value;
- const r2 = await reader.read();
- if (!r2.done) throw new TypeError("Unreachable");
+ if (r1.done) {
+ reqBody = new Uint8Array(0);
+ } else {
+ reqBody = r1.value;
+ const r2 = await reader.read();
+ if (!r2.done) throw new TypeError("Unreachable");
+ }
}
} else {
req.body.streamOrStatic.consumed = true;
@@ -174,6 +177,7 @@
headers: req.headerList,
clientRid: req.clientRid,
hasBody: reqBody !== null,
+ bodyLength: req.body?.length,
}, reqBody instanceof Uint8Array ? reqBody : null);
function onAbort() {
diff --git a/extensions/fetch/lib.rs b/extensions/fetch/lib.rs
index 68fe7a20b..f870c58dc 100644
--- a/extensions/fetch/lib.rs
+++ b/extensions/fetch/lib.rs
@@ -26,7 +26,8 @@ use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use data_url::DataUrl;
-use deno_web::BlobUrlStore;
+use deno_web::BlobStore;
+use http::header::CONTENT_LENGTH;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
@@ -130,6 +131,7 @@ pub struct FetchArgs {
headers: Vec<(ByteString, ByteString)>,
client_rid: Option<u32>,
has_body: bool,
+ body_length: Option<u64>,
}
#[derive(Serialize)]
@@ -176,6 +178,14 @@ where
None => {
// If no body is passed, we return a writer for streaming the body.
let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1);
+
+ // If the size of the body is known, we include a content-length
+ // header explicitly.
+ if let Some(body_size) = args.body_length {
+ request =
+ request.header(CONTENT_LENGTH, HeaderValue::from(body_size))
+ }
+
request = request.body(Body::wrap_stream(ReceiverStream::new(rx)));
let request_body_rid =
@@ -207,7 +217,13 @@ where
let cancel_handle = CancelHandle::new_rc();
let cancel_handle_ = cancel_handle.clone();
- let fut = async move { request.send().or_cancel(cancel_handle_).await };
+ let fut = async move {
+ request
+ .send()
+ .or_cancel(cancel_handle_)
+ .await
+ .map(|res| res.map_err(|err| type_error(err.to_string())))
+ };
let request_rid = state
.resource_table
@@ -240,32 +256,49 @@ where
(request_rid, None, None)
}
"blob" => {
- let blob_url_storage =
- state.try_borrow::<BlobUrlStore>().ok_or_else(|| {
- type_error("Blob URLs are not supported in this context.")
- })?;
+ let blob_store = state.try_borrow::<BlobStore>().ok_or_else(|| {
+ type_error("Blob URLs are not supported in this context.")
+ })?;
- let blob = blob_url_storage
- .get(url)?
+ let blob = blob_store
+ .get_object_url(url)?
.ok_or_else(|| type_error("Blob for the given URL not found."))?;
if method != "GET" {
return Err(type_error("Blob URL fetch only supports GET method."));
}
- let response = http::Response::builder()
- .status(http::StatusCode::OK)
- .header(http::header::CONTENT_LENGTH, blob.data.len())
- .header(http::header::CONTENT_TYPE, blob.media_type)
- .body(reqwest::Body::from(blob.data))?;
+ let cancel_handle = CancelHandle::new_rc();
+ let cancel_handle_ = cancel_handle.clone();
- let fut = async move { Ok(Ok(Response::from(response))) };
+ let fut = async move {
+ // TODO(lucacsonato): this should be a stream!
+ let chunk = match blob.read_all().or_cancel(cancel_handle_).await? {
+ Ok(chunk) => chunk,
+ Err(err) => return Ok(Err(err)),
+ };
+
+ let res = http::Response::builder()
+ .status(http::StatusCode::OK)
+ .header(http::header::CONTENT_LENGTH, chunk.len())
+ .header(http::header::CONTENT_TYPE, blob.media_type.clone())
+ .body(reqwest::Body::from(chunk))
+ .map_err(|err| type_error(err.to_string()));
+
+ match res {
+ Ok(response) => Ok(Ok(Response::from(response))),
+ Err(err) => Ok(Err(err)),
+ }
+ };
let request_rid = state
.resource_table
.add(FetchRequestResource(Box::pin(fut)));
- (request_rid, None, None)
+ let cancel_handle_rid =
+ state.resource_table.add(FetchCancelHandle(cancel_handle));
+
+ (request_rid, None, Some(cancel_handle_rid))
}
_ => return Err(type_error(format!("scheme '{}' not supported", scheme))),
};
@@ -382,8 +415,7 @@ pub async fn op_fetch_response_read(
Ok(read)
}
-type CancelableResponseResult =
- Result<Result<Response, reqwest::Error>, Canceled>;
+type CancelableResponseResult = Result<Result<Response, AnyError>, Canceled>;
struct FetchRequestResource(
Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
diff --git a/extensions/net/03_http.js b/extensions/net/03_http.js
index c83936cfe..343b305a6 100644
--- a/extensions/net/03_http.js
+++ b/extensions/net/03_http.js
@@ -140,7 +140,10 @@
if (innerResp.body !== null) {
if (innerResp.body.unusable()) throw new TypeError("Body is unusable.");
if (innerResp.body.streamOrStatic instanceof ReadableStream) {
- if (innerResp.body.length === null) {
+ if (
+ innerResp.body.length === null ||
+ innerResp.body.source instanceof Blob
+ ) {
respBody = innerResp.body.stream;
} else {
const reader = innerResp.body.stream.getReader();
diff --git a/extensions/timers/benches/timers_ops.rs b/extensions/timers/benches/timers_ops.rs
index c463efbfe..269d9627d 100644
--- a/extensions/timers/benches/timers_ops.rs
+++ b/extensions/timers/benches/timers_ops.rs
@@ -3,13 +3,13 @@ use deno_core::Extension;
use deno_bench_util::bench_or_profile;
use deno_bench_util::bencher::{benchmark_group, Bencher};
use deno_bench_util::{bench_js_async, bench_js_sync};
-use deno_web::BlobUrlStore;
+use deno_web::BlobStore;
fn setup() -> Vec<Extension> {
vec![
deno_webidl::init(),
deno_url::init(),
- deno_web::init(BlobUrlStore::default(), None),
+ deno_web::init(BlobStore::default(), None),
deno_timers::init::<deno_timers::NoTimersPermission>(),
Extension::builder()
.js(vec![
diff --git a/extensions/web/09_file.js b/extensions/web/09_file.js
index 403bbee35..5f335f0e1 100644
--- a/extensions/web/09_file.js
+++ b/extensions/web/09_file.js
@@ -67,58 +67,51 @@
return result;
}
- /**
- * @param {...Uint8Array} bytesArrays
- * @returns {Uint8Array}
- */
- function concatUint8Arrays(...bytesArrays) {
- let byteLength = 0;
- for (const bytes of bytesArrays) {
- byteLength += bytes.byteLength;
+ /** @param {(BlobReference | Blob)[]} parts */
+ async function* toIterator(parts) {
+ for (const part of parts) {
+ yield* part.stream();
}
- const finalBytes = new Uint8Array(byteLength);
- let current = 0;
- for (const bytes of bytesArrays) {
- finalBytes.set(bytes, current);
- current += bytes.byteLength;
- }
- return finalBytes;
}
/** @typedef {BufferSource | Blob | string} BlobPart */
/**
- * @param {BlobPart[]} parts
- * @param {string} endings
- * @returns {Uint8Array}
- */
+ * @param {BlobPart[]} parts
+ * @param {string} endings
+ * @returns {{ parts: (BlobReference|Blob)[], size: number }}
+ */
function processBlobParts(parts, endings) {
- /** @type {Uint8Array[]} */
- const bytesArrays = [];
+ /** @type {(BlobReference|Blob)[]} */
+ const processedParts = [];
+ let size = 0;
for (const element of parts) {
if (element instanceof ArrayBuffer) {
- bytesArrays.push(new Uint8Array(element.slice(0)));
+ const chunk = new Uint8Array(element.slice(0));
+ processedParts.push(BlobReference.fromUint8Array(chunk));
+ size += element.byteLength;
} else if (ArrayBuffer.isView(element)) {
- const buffer = element.buffer.slice(
+ const chunk = new Uint8Array(
+ element.buffer,
element.byteOffset,
- element.byteOffset + element.byteLength,
+ element.byteLength,
);
- bytesArrays.push(new Uint8Array(buffer));
+ size += element.byteLength;
+ processedParts.push(BlobReference.fromUint8Array(chunk));
} else if (element instanceof Blob) {
- bytesArrays.push(
- new Uint8Array(element[_byteSequence].buffer.slice(0)),
- );
+ processedParts.push(element);
+ size += element.size;
} else if (typeof element === "string") {
- let s = element;
- if (endings == "native") {
- s = convertLineEndingsToNative(s);
- }
- bytesArrays.push(core.encode(s));
+ const chunk = core.encode(
+ endings == "native" ? convertLineEndingsToNative(element) : element,
+ );
+ size += chunk.byteLength;
+ processedParts.push(BlobReference.fromUint8Array(chunk));
} else {
- throw new TypeError("Unreachable code (invalild element type)");
+ throw new TypeError("Unreachable code (invalid element type)");
}
}
- return concatUint8Arrays(...bytesArrays);
+ return { parts: processedParts, size };
}
/**
@@ -133,18 +126,30 @@
return normalizedType.toLowerCase();
}
- const _byteSequence = Symbol("[[ByteSequence]]");
-
- class Blob {
- get [Symbol.toStringTag]() {
- return "Blob";
+ /**
+ * Get all Parts as a flat array containing all references
+ * @param {Blob} blob
+ * @param {string[]} bag
+ * @returns {string[]}
+ */
+ function getParts(blob, bag = []) {
+ for (const part of blob[_parts]) {
+ if (part instanceof Blob) {
+ getParts(part, bag);
+ } else {
+ bag.push(part._id);
+ }
}
+ return bag;
+ }
- /** @type {string} */
- #type;
+ const _size = Symbol("Size");
+ const _parts = Symbol("Parts");
- /** @type {Uint8Array} */
- [_byteSequence];
+ class Blob {
+ #type = "";
+ [_size] = 0;
+ [_parts];
/**
* @param {BlobPart[]} blobParts
@@ -163,18 +168,20 @@
this[webidl.brand] = webidl.brand;
- /** @type {Uint8Array} */
- this[_byteSequence] = processBlobParts(
+ const { parts, size } = processBlobParts(
blobParts,
options.endings,
);
+
+ this[_parts] = parts;
+ this[_size] = size;
this.#type = normalizeType(options.type);
}
/** @returns {number} */
get size() {
webidl.assertBranded(this, Blob);
- return this[_byteSequence].byteLength;
+ return this[_size];
}
/** @returns {string} */
@@ -237,6 +244,36 @@
relativeEnd = Math.min(end, O.size);
}
}
+
+ const span = Math.max(relativeEnd - relativeStart, 0);
+ const blobParts = [];
+ let added = 0;
+
+ for (const part of this[_parts]) {
+ // don't add the overflow to new blobParts
+ if (added >= span) {
+ // Could maybe be possible to remove variable `added`
+ // and only use relativeEnd?
+ break;
+ }
+ const size = part.size;
+ if (relativeStart && size <= relativeStart) {
+ // Skip the beginning and change the relative
+ // start & end position as we skip the unwanted parts
+ relativeStart -= size;
+ relativeEnd -= size;
+ } else {
+ const chunk = part.slice(
+ relativeStart,
+ Math.min(part.size, relativeEnd),
+ );
+ added += chunk.size;
+ relativeEnd -= part.size;
+ blobParts.push(chunk);
+ relativeStart = 0; // All next sequential parts should start at 0
+ }
+ }
+
/** @type {string} */
let relativeContentType;
if (contentType === undefined) {
@@ -244,9 +281,11 @@
} else {
relativeContentType = normalizeType(contentType);
}
- return new Blob([
- O[_byteSequence].buffer.slice(relativeStart, relativeEnd),
- ], { type: relativeContentType });
+
+ const blob = new Blob([], { type: relativeContentType });
+ blob[_parts] = blobParts;
+ blob[_size] = span;
+ return blob;
}
/**
@@ -254,14 +293,18 @@
*/
stream() {
webidl.assertBranded(this, Blob);
- const bytes = this[_byteSequence];
+ const partIterator = toIterator(this[_parts]);
const stream = new ReadableStream({
type: "bytes",
/** @param {ReadableByteStreamController} controller */
- start(controller) {
- const chunk = new Uint8Array(bytes.buffer.slice(0));
- if (chunk.byteLength > 0) controller.enqueue(chunk);
- controller.close();
+ async pull(controller) {
+ while (true) {
+ const { value, done } = await partIterator.next();
+ if (done) return controller.close();
+ if (value.byteLength > 0) {
+ return controller.enqueue(value);
+ }
+ }
},
});
return stream;
@@ -282,12 +325,22 @@
async arrayBuffer() {
webidl.assertBranded(this, Blob);
const stream = this.stream();
- let bytes = new Uint8Array();
+ const bytes = new Uint8Array(this.size);
+ let offset = 0;
for await (const chunk of stream) {
- bytes = concatUint8Arrays(bytes, chunk);
+ bytes.set(chunk, offset);
+ offset += chunk.byteLength;
}
return bytes.buffer;
}
+
+ get [Symbol.toStringTag]() {
+ return "Blob";
+ }
+
+ [Symbol.for("Deno.customInspect")](inspect) {
+ return `Blob ${inspect({ size: this.size, type: this.#type })}`;
+ }
}
webidl.configurePrototype(Blob);
@@ -333,17 +386,13 @@
);
const _Name = Symbol("[[Name]]");
- const _LastModfied = Symbol("[[LastModified]]");
+ const _LastModified = Symbol("[[LastModified]]");
class File extends Blob {
- get [Symbol.toStringTag]() {
- return "File";
- }
-
/** @type {string} */
[_Name];
/** @type {number} */
- [_LastModfied];
+ [_LastModified];
/**
* @param {BlobPart[]} fileBits
@@ -373,10 +422,10 @@
this[_Name] = fileName;
if (options.lastModified === undefined) {
/** @type {number} */
- this[_LastModfied] = new Date().getTime();
+ this[_LastModified] = new Date().getTime();
} else {
/** @type {number} */
- this[_LastModfied] = options.lastModified;
+ this[_LastModified] = options.lastModified;
}
}
@@ -389,7 +438,11 @@
/** @returns {number} */
get lastModified() {
webidl.assertBranded(this, File);
- return this[_LastModfied];
+ return this[_LastModified];
+ }
+
+ get [Symbol.toStringTag]() {
+ return "File";
}
}
@@ -406,9 +459,80 @@
],
);
+ // A finalization registry to deallocate a blob part when its JS reference is
+ // garbage collected.
+ const registry = new FinalizationRegistry((uuid) => {
+ core.opSync("op_blob_remove_part", uuid);
+ });
+
+ // TODO(lucacasonato): get a better stream from Rust in BlobReference#stream
+
+ /**
+ * An opaque reference to a blob part in Rust. This could be backed by a file,
+ * in memory storage, or something else.
+ */
+ class BlobReference {
+ /**
+ * Don't use directly. Use `BlobReference.fromUint8Array`.
+ * @param {string} id
+ * @param {number} size
+ */
+ constructor(id, size) {
+ this._id = id;
+ this.size = size;
+ registry.register(this, id);
+ }
+
+ /**
+ * Create a new blob part from a Uint8Array.
+ *
+ * @param {Uint8Array} data
+ * @returns {BlobReference}
+ */
+ static fromUint8Array(data) {
+ const id = core.opSync("op_blob_create_part", data);
+ return new BlobReference(id, data.byteLength);
+ }
+
+ /**
+ * Create a new BlobReference by slicing this BlobReference. This is a copy
+ * free operation - the sliced reference will still reference the original
+ * underlying bytes.
+ *
+ * @param {number} start
+ * @param {number} end
+ * @returns {BlobReference}
+ */
+ slice(start, end) {
+ const size = end - start;
+ const id = core.opSync("op_blob_slice_part", this._id, {
+ start,
+ len: size,
+ });
+ return new BlobReference(id, size);
+ }
+
+ /**
+ * Read the entire contents of the reference blob.
+ * @returns {AsyncGenerator<Uint8Array>}
+ */
+ async *stream() {
+ yield core.opAsync("op_blob_read_part", this._id);
+
+ // let position = 0;
+ // const end = this.size;
+ // while (position !== end) {
+ // const size = Math.min(end - position, 65536);
+ // const chunk = this.slice(position, position + size);
+ // position += chunk.size;
+ // yield core.opAsync("op_blob_read_part", chunk._id);
+ // }
+ }
+ }
+
window.__bootstrap.file = {
+ getParts,
Blob,
- _byteSequence,
File,
};
})(this);
diff --git a/extensions/web/11_blob_url.js b/extensions/web/11_blob_url.js
index d030d79bd..fa0ea041c 100644
--- a/extensions/web/11_blob_url.js
+++ b/extensions/web/11_blob_url.js
@@ -15,7 +15,7 @@
((window) => {
const core = Deno.core;
const webidl = window.__bootstrap.webidl;
- const { _byteSequence } = window.__bootstrap.file;
+ const { getParts } = window.__bootstrap.file;
const { URL } = window.__bootstrap.url;
/**
@@ -31,9 +31,9 @@
});
const url = core.opSync(
- "op_file_create_object_url",
+ "op_blob_create_object_url",
blob.type,
- blob[_byteSequence],
+ getParts(blob),
);
return url;
@@ -51,10 +51,7 @@
prefix,
});
- core.opSync(
- "op_file_revoke_object_url",
- url,
- );
+ core.opSync("op_blob_revoke_object_url", url);
}
URL.createObjectURL = createObjectURL;
diff --git a/extensions/web/13_message_port.js b/extensions/web/13_message_port.js
index ae8e148f4..3bd7c692b 100644
--- a/extensions/web/13_message_port.js
+++ b/extensions/web/13_message_port.js
@@ -154,6 +154,10 @@
this[_id] = null;
}
}
+
+ get [Symbol.toStringTag]() {
+ return "MessagePort";
+ }
}
defineEventHandler(MessagePort.prototype, "message", function (self) {
diff --git a/extensions/web/Cargo.toml b/extensions/web/Cargo.toml
index eeec91036..b056baeea 100644
--- a/extensions/web/Cargo.toml
+++ b/extensions/web/Cargo.toml
@@ -14,6 +14,7 @@ repository = "https://github.com/denoland/deno"
path = "lib.rs"
[dependencies]
+async-trait = "0.1.50"
base64 = "0.13.0"
deno_core = { version = "0.92.0", path = "../../core" }
encoding_rs = "0.8.28"
diff --git a/extensions/web/blob.rs b/extensions/web/blob.rs
new file mode 100644
index 000000000..96a982677
--- /dev/null
+++ b/extensions/web/blob.rs
@@ -0,0 +1,265 @@
+use async_trait::async_trait;
+use deno_core::error::type_error;
+use deno_core::url::Url;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::sync::Mutex;
+
+use deno_core::error::AnyError;
+use uuid::Uuid;
+
+use crate::Location;
+
+pub type PartMap = HashMap<Uuid, Arc<Box<dyn BlobPart + Send + Sync>>>;
+
+#[derive(Clone, Default, Debug)]
+pub struct BlobStore {
+ parts: Arc<Mutex<PartMap>>,
+ object_urls: Arc<Mutex<HashMap<Url, Arc<Blob>>>>,
+}
+
+impl BlobStore {
+ pub fn insert_part(&self, part: Box<dyn BlobPart + Send + Sync>) -> Uuid {
+ let id = Uuid::new_v4();
+ let mut parts = self.parts.lock().unwrap();
+ parts.insert(id, Arc::new(part));
+ id
+ }
+
+ pub fn get_part(
+ &self,
+ id: &Uuid,
+ ) -> Option<Arc<Box<dyn BlobPart + Send + Sync>>> {
+ let parts = self.parts.lock().unwrap();
+ let part = parts.get(&id);
+ part.cloned()
+ }
+
+ pub fn remove_part(
+ &self,
+ id: &Uuid,
+ ) -> Option<Arc<Box<dyn BlobPart + Send + Sync>>> {
+ let mut parts = self.parts.lock().unwrap();
+ parts.remove(&id)
+ }
+
+ pub fn get_object_url(
+ &self,
+ mut url: Url,
+ ) -> Result<Option<Arc<Blob>>, AnyError> {
+ let blob_store = self.object_urls.lock().unwrap();
+ url.set_fragment(None);
+ Ok(blob_store.get(&url).cloned())
+ }
+
+ pub fn insert_object_url(
+ &self,
+ blob: Blob,
+ maybe_location: Option<Url>,
+ ) -> Url {
+ let origin = if let Some(location) = maybe_location {
+ location.origin().ascii_serialization()
+ } else {
+ "null".to_string()
+ };
+ let id = Uuid::new_v4();
+ let url = Url::parse(&format!("blob:{}/{}", origin, id)).unwrap();
+
+ let mut blob_store = self.object_urls.lock().unwrap();
+ blob_store.insert(url.clone(), Arc::new(blob));
+
+ url
+ }
+
+ pub fn remove_object_url(&self, url: &Url) {
+ let mut blob_store = self.object_urls.lock().unwrap();
+ blob_store.remove(&url);
+ }
+}
+
+#[derive(Debug)]
+pub struct Blob {
+ pub media_type: String,
+
+ pub parts: Vec<Arc<Box<dyn BlobPart + Send + Sync>>>,
+}
+
+impl Blob {
+ // TODO(lucacsonato): this should be a stream!
+ pub async fn read_all(&self) -> Result<Vec<u8>, AnyError> {
+ let size = self.size();
+ let mut bytes = Vec::with_capacity(size);
+
+ for part in &self.parts {
+ let chunk = part.read().await?;
+ bytes.extend_from_slice(chunk);
+ }
+
+ assert_eq!(bytes.len(), size);
+
+ Ok(bytes)
+ }
+
+ fn size(&self) -> usize {
+ let mut total = 0;
+ for part in &self.parts {
+ total += part.size()
+ }
+ total
+ }
+}
+
+#[async_trait]
+pub trait BlobPart: Debug {
+ // TODO(lucacsonato): this should be a stream!
+ async fn read(&self) -> Result<&[u8], AnyError>;
+ fn size(&self) -> usize;
+}
+
+#[derive(Debug)]
+pub struct InMemoryBlobPart(Vec<u8>);
+
+impl From<Vec<u8>> for InMemoryBlobPart {
+ fn from(vec: Vec<u8>) -> Self {
+ Self(vec)
+ }
+}
+
+#[async_trait]
+impl BlobPart for InMemoryBlobPart {
+ async fn read(&self) -> Result<&[u8], AnyError> {
+ Ok(&self.0)
+ }
+
+ fn size(&self) -> usize {
+ self.0.len()
+ }
+}
+
+#[derive(Debug)]
+pub struct SlicedBlobPart {
+ part: Arc<Box<dyn BlobPart + Send + Sync>>,
+ start: usize,
+ len: usize,
+}
+
+#[async_trait]
+impl BlobPart for SlicedBlobPart {
+ async fn read(&self) -> Result<&[u8], AnyError> {
+ let original = self.part.read().await?;
+ Ok(&original[self.start..self.start + self.len])
+ }
+
+ fn size(&self) -> usize {
+ self.len
+ }
+}
+
+pub fn op_blob_create_part(
+ state: &mut deno_core::OpState,
+ data: ZeroCopyBuf,
+ _: (),
+) -> Result<Uuid, AnyError> {
+ let blob_store = state.borrow::<BlobStore>();
+ let part = InMemoryBlobPart(data.to_vec());
+ let id = blob_store.insert_part(Box::new(part));
+ Ok(id)
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SliceOptions {
+ start: usize,
+ len: usize,
+}
+
+pub fn op_blob_slice_part(
+ state: &mut deno_core::OpState,
+ id: Uuid,
+ options: SliceOptions,
+) -> Result<Uuid, AnyError> {
+ let blob_store = state.borrow::<BlobStore>();
+ let part = blob_store
+ .get_part(&id)
+ .ok_or_else(|| type_error("Blob part not found"))?;
+
+ let SliceOptions { start, len } = options;
+
+ let size = part.size();
+ if start + len > size {
+ return Err(type_error(
+ "start + len can not be larger than blob part size",
+ ));
+ }
+
+ let sliced_part = SlicedBlobPart { part, start, len };
+ let id = blob_store.insert_part(Box::new(sliced_part));
+
+ Ok(id)
+}
+
+pub async fn op_blob_read_part(
+ state: Rc<RefCell<deno_core::OpState>>,
+ id: Uuid,
+ _: (),
+) -> Result<ZeroCopyBuf, AnyError> {
+ let part = {
+ let state = state.borrow();
+ let blob_store = state.borrow::<BlobStore>();
+ blob_store.get_part(&id)
+ }
+ .ok_or_else(|| type_error("Blob part not found"))?;
+ let buf = part.read().await?;
+ Ok(ZeroCopyBuf::from(buf.to_vec()))
+}
+
+pub fn op_blob_remove_part(
+ state: &mut deno_core::OpState,
+ id: Uuid,
+ _: (),
+) -> Result<(), AnyError> {
+ let blob_store = state.borrow::<BlobStore>();
+ blob_store.remove_part(&id);
+ Ok(())
+}
+
+pub fn op_blob_create_object_url(
+ state: &mut deno_core::OpState,
+ media_type: String,
+ part_ids: Vec<Uuid>,
+) -> Result<String, AnyError> {
+ let mut parts = Vec::with_capacity(part_ids.len());
+ let blob_store = state.borrow::<BlobStore>();
+ for part_id in part_ids {
+ let part = blob_store
+ .get_part(&part_id)
+ .ok_or_else(|| type_error("Blob part not found"))?;
+ parts.push(part);
+ }
+
+ let blob = Blob { media_type, parts };
+
+ let maybe_location = state.try_borrow::<Location>();
+ let blob_store = state.borrow::<BlobStore>();
+
+ let url = blob_store
+ .insert_object_url(blob, maybe_location.map(|location| location.0.clone()));
+
+ Ok(url.to_string())
+}
+
+pub fn op_blob_revoke_object_url(
+ state: &mut deno_core::OpState,
+ url: String,
+ _: (),
+) -> Result<(), AnyError> {
+ let url = Url::parse(&url)?;
+ let blob_store = state.borrow::<BlobStore>();
+ blob_store.remove_object_url(&url);
+ Ok(())
+}
diff --git a/extensions/web/internal.d.ts b/extensions/web/internal.d.ts
index bc3982a88..3a2a0c1be 100644
--- a/extensions/web/internal.d.ts
+++ b/extensions/web/internal.d.ts
@@ -73,13 +73,9 @@ declare namespace globalThis {
};
declare var file: {
- Blob: typeof Blob & {
- [globalThis.__bootstrap.file._byteSequence]: Uint8Array;
- };
- readonly _byteSequence: unique symbol;
- File: typeof File & {
- [globalThis.__bootstrap.file._byteSequence]: Uint8Array;
- };
+ getParts(blob: Blob): string[];
+ Blob: typeof Blob;
+ File: typeof File;
};
declare var streams: {
diff --git a/extensions/web/lib.rs b/extensions/web/lib.rs
index 67022c7ea..634004ac9 100644
--- a/extensions/web/lib.rs
+++ b/extensions/web/lib.rs
@@ -1,13 +1,9 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+mod blob;
mod message_port;
-pub use crate::message_port::create_entangled_message_port;
-pub use crate::message_port::JsMessageData;
-pub use crate::message_port::MessagePort;
-
use deno_core::error::bad_resource_id;
-use deno_core::error::null_opbuf;
use deno_core::error::range_error;
use deno_core::error::type_error;
use deno_core::error::AnyError;
@@ -16,7 +12,6 @@ use deno_core::op_async;
use deno_core::op_sync;
use deno_core::url::Url;
use deno_core::Extension;
-use deno_core::ModuleSpecifier;
use deno_core::OpState;
use deno_core::Resource;
use deno_core::ResourceId;
@@ -29,23 +24,30 @@ use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::RefCell;
-use std::collections::HashMap;
use std::fmt;
use std::path::PathBuf;
-use std::sync::Arc;
-use std::sync::Mutex;
use std::usize;
-use uuid::Uuid;
+use crate::blob::op_blob_create_object_url;
+use crate::blob::op_blob_create_part;
+use crate::blob::op_blob_read_part;
+use crate::blob::op_blob_remove_part;
+use crate::blob::op_blob_revoke_object_url;
+use crate::blob::op_blob_slice_part;
+pub use crate::blob::Blob;
+pub use crate::blob::BlobPart;
+pub use crate::blob::BlobStore;
+pub use crate::blob::InMemoryBlobPart;
+
+pub use crate::message_port::create_entangled_message_port;
use crate::message_port::op_message_port_create_entangled;
use crate::message_port::op_message_port_post_message;
use crate::message_port::op_message_port_recv_message;
+pub use crate::message_port::JsMessageData;
+pub use crate::message_port::MessagePort;
/// Load and execute the javascript code.
-pub fn init(
- blob_url_store: BlobUrlStore,
- maybe_location: Option<Url>,
-) -> Extension {
+pub fn init(blob_store: BlobStore, maybe_location: Option<Url>) -> Extension {
Extension::builder()
.js(include_js_files!(
prefix "deno:extensions/web",
@@ -75,13 +77,17 @@ pub fn init(
("op_encoding_new_decoder", op_sync(op_encoding_new_decoder)),
("op_encoding_decode", op_sync(op_encoding_decode)),
("op_encoding_encode_into", op_sync(op_encoding_encode_into)),
+ ("op_blob_create_part", op_sync(op_blob_create_part)),
+ ("op_blob_slice_part", op_sync(op_blob_slice_part)),
+ ("op_blob_read_part", op_async(op_blob_read_part)),
+ ("op_blob_remove_part", op_sync(op_blob_remove_part)),
(
- "op_file_create_object_url",
- op_sync(op_file_create_object_url),
+ "op_blob_create_object_url",
+ op_sync(op_blob_create_object_url),
),
(
- "op_file_revoke_object_url",
- op_sync(op_file_revoke_object_url),
+ "op_blob_revoke_object_url",
+ op_sync(op_blob_revoke_object_url),
),
(
"op_message_port_create_entangled",
@@ -97,7 +103,7 @@ pub fn init(
),
])
.state(move |state| {
- state.put(blob_url_store.clone());
+ state.put(blob_store.clone());
if let Some(location) = maybe_location.clone() {
state.put(Location(location));
}
@@ -381,73 +387,4 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
.map(|_| "DOMExceptionInvalidCharacterError")
})
}
-
-#[derive(Debug, Clone)]
-pub struct Blob {
- pub data: Vec<u8>,
- pub media_type: String,
-}
-
pub struct Location(pub Url);
-
-#[derive(Debug, Default, Clone)]
-pub struct BlobUrlStore(Arc<Mutex<HashMap<Url, Blob>>>);
-
-impl BlobUrlStore {
- pub fn get(&self, mut url: Url) -> Result<Option<Blob>, AnyError> {
- let blob_store = self.0.lock().unwrap();
- url.set_fragment(None);
- Ok(blob_store.get(&url).cloned())
- }
-
- pub fn insert(&self, blob: Blob, maybe_location: Option<Url>) -> Url {
- let origin = if let Some(location) = maybe_location {
- location.origin().ascii_serialization()
- } else {
- "null".to_string()
- };
- let id = Uuid::new_v4();
- let url = Url::parse(&format!("blob:{}/{}", origin, id)).unwrap();
-
- let mut blob_store = self.0.lock().unwrap();
- blob_store.insert(url.clone(), blob);
-
- url
- }
-
- pub fn remove(&self, url: &ModuleSpecifier) {
- let mut blob_store = self.0.lock().unwrap();
- blob_store.remove(&url);
- }
-}
-
-pub fn op_file_create_object_url(
- state: &mut deno_core::OpState,
- media_type: String,
- zero_copy: Option<ZeroCopyBuf>,
-) -> Result<String, AnyError> {
- let data = zero_copy.ok_or_else(null_opbuf)?;
- let blob = Blob {
- data: data.to_vec(),
- media_type,
- };
-
- let maybe_location = state.try_borrow::<Location>();
- let blob_store = state.borrow::<BlobUrlStore>();
-
- let url =
- blob_store.insert(blob, maybe_location.map(|location| location.0.clone()));
-
- Ok(url.to_string())
-}
-
-pub fn op_file_revoke_object_url(
- state: &mut deno_core::OpState,
- url: String,
- _: (),
-) -> Result<(), AnyError> {
- let url = Url::parse(&url)?;
- let blob_store = state.borrow::<BlobUrlStore>();
- blob_store.remove(&url);
- Ok(())
-}