Diffstat (limited to 'extensions')
-rw-r--r--  extensions/fetch/26_fetch.js             |  14
-rw-r--r--  extensions/fetch/lib.rs                  |  66
-rw-r--r--  extensions/net/03_http.js                |   5
-rw-r--r--  extensions/timers/benches/timers_ops.rs  |   4
-rw-r--r--  extensions/web/09_file.js                | 258
-rw-r--r--  extensions/web/11_blob_url.js            |  11
-rw-r--r--  extensions/web/13_message_port.js        |   4
-rw-r--r--  extensions/web/Cargo.toml                |   1
-rw-r--r--  extensions/web/blob.rs                   | 265
-rw-r--r--  extensions/web/internal.d.ts             |  10
-rw-r--r--  extensions/web/lib.rs                    | 113
11 files changed, 557 insertions, 194 deletions
diff --git a/extensions/fetch/26_fetch.js b/extensions/fetch/26_fetch.js
index 47b07be0b..a33187344 100644
--- a/extensions/fetch/26_fetch.js
+++ b/extensions/fetch/26_fetch.js
@@ -152,15 +152,18 @@
     if (req.body !== null) {
       if (req.body.streamOrStatic instanceof ReadableStream) {
-        if (req.body.length === null) {
+        if (req.body.length === null || req.body.source instanceof Blob) {
           reqBody = req.body.stream;
         } else {
           const reader = req.body.stream.getReader();
           const r1 = await reader.read();
-          if (r1.done) throw new TypeError("Unreachable");
-          reqBody = r1.value;
-          const r2 = await reader.read();
-          if (!r2.done) throw new TypeError("Unreachable");
+          if (r1.done) {
+            reqBody = new Uint8Array(0);
+          } else {
+            reqBody = r1.value;
+            const r2 = await reader.read();
+            if (!r2.done) throw new TypeError("Unreachable");
+          }
         }
       } else {
         req.body.streamOrStatic.consumed = true;
@@ -174,6 +177,7 @@
       headers: req.headerList,
       clientRid: req.clientRid,
       hasBody: reqBody !== null,
+      bodyLength: req.body?.length,
     }, reqBody instanceof Uint8Array ? reqBody : null);
 
     function onAbort() {
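Not part of the diff — a usage sketch of what the 26_fetch.js change buys: a Blob request body is now handed to Rust as a stream instead of being buffered into a single Uint8Array, while `bodyLength` still travels along so the backend can set an explicit Content-Length. The upload endpoint below is a placeholder.

    // Hypothetical endpoint; any server accepting POST bodies would do.
    const body = new Blob([new Uint8Array(1024 * 1024)], {
      type: "application/octet-stream",
    });
    // Streamed chunk by chunk, yet the request can still carry
    // "Content-Length: 1048576" because the Blob's size is known up front.
    const resp = await fetch("https://example.com/upload", {
      method: "POST",
      body,
    });
    console.log(resp.status);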
diff --git a/extensions/fetch/lib.rs b/extensions/fetch/lib.rs
index 68fe7a20b..f870c58dc 100644
--- a/extensions/fetch/lib.rs
+++ b/extensions/fetch/lib.rs
@@ -26,7 +26,8 @@ use deno_core::ResourceId;
 use deno_core::ZeroCopyBuf;
 
 use data_url::DataUrl;
-use deno_web::BlobUrlStore;
+use deno_web::BlobStore;
+use http::header::CONTENT_LENGTH;
 use reqwest::header::HeaderMap;
 use reqwest::header::HeaderName;
 use reqwest::header::HeaderValue;
@@ -130,6 +131,7 @@ pub struct FetchArgs {
   headers: Vec<(ByteString, ByteString)>,
   client_rid: Option<u32>,
   has_body: bool,
+  body_length: Option<u64>,
 }
 
 #[derive(Serialize)]
@@ -176,6 +178,14 @@ where
       None => {
         // If no body is passed, we return a writer for streaming the body.
         let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1);
+
+        // If the size of the body is known, we include a content-length
+        // header explicitly.
+        if let Some(body_size) = args.body_length {
+          request =
+            request.header(CONTENT_LENGTH, HeaderValue::from(body_size))
+        }
+
        request = request.body(Body::wrap_stream(ReceiverStream::new(rx)));
 
         let request_body_rid =
@@ -207,7 +217,13 @@
       let cancel_handle = CancelHandle::new_rc();
       let cancel_handle_ = cancel_handle.clone();
 
-      let fut = async move { request.send().or_cancel(cancel_handle_).await };
+      let fut = async move {
+        request
+          .send()
+          .or_cancel(cancel_handle_)
+          .await
+          .map(|res| res.map_err(|err| type_error(err.to_string())))
+      };
 
       let request_rid = state
         .resource_table
@@ -240,32 +256,49 @@
       (request_rid, None, None)
     }
     "blob" => {
-      let blob_url_storage =
-        state.try_borrow::<BlobUrlStore>().ok_or_else(|| {
-          type_error("Blob URLs are not supported in this context.")
-        })?;
+      let blob_store = state.try_borrow::<BlobStore>().ok_or_else(|| {
+        type_error("Blob URLs are not supported in this context.")
+      })?;
 
-      let blob = blob_url_storage
-        .get(url)?
+      let blob = blob_store
+        .get_object_url(url)?
         .ok_or_else(|| type_error("Blob for the given URL not found."))?;
 
       if method != "GET" {
         return Err(type_error("Blob URL fetch only supports GET method."));
       }
 
-      let response = http::Response::builder()
-        .status(http::StatusCode::OK)
-        .header(http::header::CONTENT_LENGTH, blob.data.len())
-        .header(http::header::CONTENT_TYPE, blob.media_type)
-        .body(reqwest::Body::from(blob.data))?;
+      let cancel_handle = CancelHandle::new_rc();
+      let cancel_handle_ = cancel_handle.clone();
 
-      let fut = async move { Ok(Ok(Response::from(response))) };
+      let fut = async move {
+        // TODO(lucacsonato): this should be a stream!
+        let chunk = match blob.read_all().or_cancel(cancel_handle_).await? {
+          Ok(chunk) => chunk,
+          Err(err) => return Ok(Err(err)),
+        };
+
+        let res = http::Response::builder()
+          .status(http::StatusCode::OK)
+          .header(http::header::CONTENT_LENGTH, chunk.len())
+          .header(http::header::CONTENT_TYPE, blob.media_type.clone())
+          .body(reqwest::Body::from(chunk))
+          .map_err(|err| type_error(err.to_string()));
+
+        match res {
+          Ok(response) => Ok(Ok(Response::from(response))),
+          Err(err) => Ok(Err(err)),
+        }
+      };
 
       let request_rid = state
         .resource_table
         .add(FetchRequestResource(Box::pin(fut)));
 
-      (request_rid, None, None)
+      let cancel_handle_rid =
+        state.resource_table.add(FetchCancelHandle(cancel_handle));
+
+      (request_rid, None, Some(cancel_handle_rid))
     }
     _ => return Err(type_error(format!("scheme '{}' not supported", scheme))),
   };
@@ -382,8 +415,7 @@ pub async fn op_fetch_response_read(
   Ok(read)
 }
 
-type CancelableResponseResult =
-  Result<Result<Response, reqwest::Error>, Canceled>;
+type CancelableResponseResult = Result<Result<Response, AnyError>, Canceled>;
 
 struct FetchRequestResource(
   Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
diff --git a/extensions/net/03_http.js b/extensions/net/03_http.js
index c83936cfe..343b305a6 100644
--- a/extensions/net/03_http.js
+++ b/extensions/net/03_http.js
@@ -140,7 +140,10 @@
     if (innerResp.body !== null) {
       if (innerResp.body.unusable()) throw new TypeError("Body is unusable.");
       if (innerResp.body.streamOrStatic instanceof ReadableStream) {
-        if (innerResp.body.length === null) {
+        if (
+          innerResp.body.length === null ||
+          innerResp.body.source instanceof Blob
+        ) {
           respBody = innerResp.body.stream;
         } else {
           const reader = innerResp.body.stream.getReader();
diff --git a/extensions/timers/benches/timers_ops.rs b/extensions/timers/benches/timers_ops.rs
index c463efbfe..269d9627d 100644
--- a/extensions/timers/benches/timers_ops.rs
+++ b/extensions/timers/benches/timers_ops.rs
@@ -3,13 +3,13 @@ use deno_core::Extension;
 use deno_bench_util::bench_or_profile;
 use deno_bench_util::bencher::{benchmark_group, Bencher};
 use deno_bench_util::{bench_js_async, bench_js_sync};
-use deno_web::BlobUrlStore;
+use deno_web::BlobStore;
 
 fn setup() -> Vec<Extension> {
   vec![
     deno_webidl::init(),
     deno_url::init(),
-    deno_web::init(BlobUrlStore::default(), None),
+    deno_web::init(BlobStore::default(), None),
     deno_timers::init::<deno_timers::NoTimersPermission>(),
     Extension::builder()
     .js(vec![
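Not part of the diff — a sketch of the reworked "blob" scheme arm above. The blob is looked up in the BlobStore via get_object_url, only GET is allowed, and because the op now registers a cancel handle the request is abortable like any network fetch:

    const blob = new Blob(["hello world"], { type: "text/plain" });
    const url = URL.createObjectURL(blob);

    const resp = await fetch(url); // non-GET methods reject with a TypeError
    console.log(resp.headers.get("content-type")); // "text/plain"
    console.log(await resp.text()); // "hello world"

    URL.revokeObjectURL(url);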
diff --git a/extensions/web/09_file.js b/extensions/web/09_file.js
index 403bbee35..5f335f0e1 100644
--- a/extensions/web/09_file.js
+++ b/extensions/web/09_file.js
@@ -67,58 +67,51 @@
     return result;
   }
 
-  /**
-   * @param {...Uint8Array} bytesArrays
-   * @returns {Uint8Array}
-   */
-  function concatUint8Arrays(...bytesArrays) {
-    let byteLength = 0;
-    for (const bytes of bytesArrays) {
-      byteLength += bytes.byteLength;
+  /** @param {(BlobReference | Blob)[]} parts */
+  async function* toIterator(parts) {
+    for (const part of parts) {
+      yield* part.stream();
     }
-    const finalBytes = new Uint8Array(byteLength);
-    let current = 0;
-    for (const bytes of bytesArrays) {
-      finalBytes.set(bytes, current);
-      current += bytes.byteLength;
-    }
-    return finalBytes;
   }
 
   /** @typedef {BufferSource | Blob | string} BlobPart */
 
   /**
-   * @param {BlobPart[]} parts
-   * @param {string} endings
-   * @returns {Uint8Array}
-   */
+    * @param {BlobPart[]} parts
+    * @param {string} endings
+    * @returns {{ parts: (BlobReference|Blob)[], size: number }}
+    */
   function processBlobParts(parts, endings) {
-    /** @type {Uint8Array[]} */
-    const bytesArrays = [];
+    /** @type {(BlobReference|Blob)[]} */
+    const processedParts = [];
+    let size = 0;
     for (const element of parts) {
       if (element instanceof ArrayBuffer) {
-        bytesArrays.push(new Uint8Array(element.slice(0)));
+        const chunk = new Uint8Array(element.slice(0));
+        processedParts.push(BlobReference.fromUint8Array(chunk));
+        size += element.byteLength;
       } else if (ArrayBuffer.isView(element)) {
-        const buffer = element.buffer.slice(
+        const chunk = new Uint8Array(
+          element.buffer,
           element.byteOffset,
-          element.byteOffset + element.byteLength,
+          element.byteLength,
         );
-        bytesArrays.push(new Uint8Array(buffer));
+        size += element.byteLength;
+        processedParts.push(BlobReference.fromUint8Array(chunk));
       } else if (element instanceof Blob) {
-        bytesArrays.push(
-          new Uint8Array(element[_byteSequence].buffer.slice(0)),
-        );
+        processedParts.push(element);
+        size += element.size;
       } else if (typeof element === "string") {
-        let s = element;
-        if (endings == "native") {
-          s = convertLineEndingsToNative(s);
-        }
-        bytesArrays.push(core.encode(s));
+        const chunk = core.encode(
+          endings == "native" ? convertLineEndingsToNative(element) : element,
+        );
+        size += chunk.byteLength;
+        processedParts.push(BlobReference.fromUint8Array(chunk));
       } else {
-        throw new TypeError("Unreachable code (invalild element type)");
+        throw new TypeError("Unreachable code (invalid element type)");
       }
     }
-    return concatUint8Arrays(...bytesArrays);
+    return { parts: processedParts, size };
   }
 
   /**
@@ -133,18 +126,30 @@
     return normalizedType.toLowerCase();
   }
 
-  const _byteSequence = Symbol("[[ByteSequence]]");
-
-  class Blob {
-    get [Symbol.toStringTag]() {
-      return "Blob";
+  /**
+   * Get all Parts as a flat array containing all references
+   * @param {Blob} blob
+   * @param {string[]} bag
+   * @returns {string[]}
+   */
+  function getParts(blob, bag = []) {
+    for (const part of blob[_parts]) {
+      if (part instanceof Blob) {
+        getParts(part, bag);
+      } else {
+        bag.push(part._id);
+      }
     }
+    return bag;
+  }
 
-    /** @type {string} */
-    #type;
+  const _size = Symbol("Size");
+  const _parts = Symbol("Parts");
 
-    /** @type {Uint8Array} */
-    [_byteSequence];
+  class Blob {
+    #type = "";
+    [_size] = 0;
+    [_parts];
 
     /**
      * @param {BlobPart[]} blobParts
@@ -163,18 +168,20 @@
 
       this[webidl.brand] = webidl.brand;
 
-      /** @type {Uint8Array} */
-      this[_byteSequence] = processBlobParts(
+      const { parts, size } = processBlobParts(
         blobParts,
         options.endings,
       );
+
+      this[_parts] = parts;
+      this[_size] = size;
       this.#type = normalizeType(options.type);
     }
 
     /** @returns {number} */
     get size() {
       webidl.assertBranded(this, Blob);
-      return this[_byteSequence].byteLength;
+      return this[_size];
     }
 
     /** @returns {string} */
@@ -237,6 +244,36 @@
           relativeEnd = Math.min(end, O.size);
         }
       }
+
+      const span = Math.max(relativeEnd - relativeStart, 0);
+      const blobParts = [];
+      let added = 0;
+
+      for (const part of this[_parts]) {
+        // don't add the overflow to new blobParts
+        if (added >= span) {
+          // Could maybe be possible to remove variable `added`
+          // and only use relativeEnd?
+          break;
+        }
+        const size = part.size;
+        if (relativeStart && size <= relativeStart) {
+          // Skip the beginning and change the relative
+          // start & end position as we skip the unwanted parts
+          relativeStart -= size;
+          relativeEnd -= size;
+        } else {
+          const chunk = part.slice(
+            relativeStart,
+            Math.min(part.size, relativeEnd),
+          );
+          added += chunk.size;
+          relativeEnd -= part.size;
+          blobParts.push(chunk);
+          relativeStart = 0; // All next sequential parts should start at 0
+        }
+      }
+
       /** @type {string} */
       let relativeContentType;
       if (contentType === undefined) {
@@ -244,9 +281,11 @@
       } else {
         relativeContentType = normalizeType(contentType);
       }
-      return new Blob([
-        O[_byteSequence].buffer.slice(relativeStart, relativeEnd),
-      ], { type: relativeContentType });
+
+      const blob = new Blob([], { type: relativeContentType });
+      blob[_parts] = blobParts;
+      blob[_size] = span;
+      return blob;
     }
 
     /**
@@ -254,14 +293,18 @@
      */
     stream() {
       webidl.assertBranded(this, Blob);
-      const bytes = this[_byteSequence];
+      const partIterator = toIterator(this[_parts]);
       const stream = new ReadableStream({
         type: "bytes",
         /** @param {ReadableByteStreamController} controller */
-        start(controller) {
-          const chunk = new Uint8Array(bytes.buffer.slice(0));
-          if (chunk.byteLength > 0) controller.enqueue(chunk);
-          controller.close();
+        async pull(controller) {
+          while (true) {
+            const { value, done } = await partIterator.next();
+            if (done) return controller.close();
+            if (value.byteLength > 0) {
+              return controller.enqueue(value);
+            }
+          }
         },
       });
       return stream;
@@ -282,12 +325,22 @@
     async arrayBuffer() {
       webidl.assertBranded(this, Blob);
       const stream = this.stream();
-      let bytes = new Uint8Array();
+      const bytes = new Uint8Array(this.size);
+      let offset = 0;
       for await (const chunk of stream) {
-        bytes = concatUint8Arrays(bytes, chunk);
+        bytes.set(chunk, offset);
+        offset += chunk.byteLength;
       }
       return bytes.buffer;
     }
+
+    get [Symbol.toStringTag]() {
+      return "Blob";
+    }
+
+    [Symbol.for("Deno.customInspect")](inspect) {
+      return `Blob ${inspect({ size: this.size, type: this.#type })}`;
+    }
   }
 
   webidl.configurePrototype(Blob);
@@ -333,17 +386,13 @@
   );
 
   const _Name = Symbol("[[Name]]");
-  const _LastModfied = Symbol("[[LastModified]]");
+  const _LastModified = Symbol("[[LastModified]]");
 
   class File extends Blob {
-    get [Symbol.toStringTag]() {
-      return "File";
-    }
-
     /** @type {string} */
     [_Name];
     /** @type {number} */
-    [_LastModfied];
+    [_LastModified];
 
     /**
      * @param {BlobPart[]} fileBits
@@ -373,10 +422,10 @@
       this[_Name] = fileName;
       if (options.lastModified === undefined) {
         /** @type {number} */
-        this[_LastModfied] = new Date().getTime();
+        this[_LastModified] = new Date().getTime();
       } else {
         /** @type {number} */
-        this[_LastModfied] = options.lastModified;
+        this[_LastModified] = options.lastModified;
       }
     }
 
@@ -389,7 +438,11 @@
     /** @returns {number} */
     get lastModified() {
       webidl.assertBranded(this, File);
-      return this[_LastModfied];
+      return this[_LastModified];
+    }
+
+    get [Symbol.toStringTag]() {
+      return "File";
+    }
   }
 
@@ -406,9 +459,80 @@
     ],
   );
 
+  // A finalization registry to deallocate a blob part when its JS reference is
+  // garbage collected.
+  const registry = new FinalizationRegistry((uuid) => {
+    core.opSync("op_blob_remove_part", uuid);
+  });
+
+  // TODO(lucacasonato): get a better stream from Rust in BlobReference#stream
+
+  /**
+   * An opaque reference to a blob part in Rust. This could be backed by a file,
+   * in memory storage, or something else.
+   */
+  class BlobReference {
+    /**
+     * Don't use directly. Use `BlobReference.fromUint8Array`.
+     * @param {string} id
+     * @param {number} size
+     */
+    constructor(id, size) {
+      this._id = id;
+      this.size = size;
+      registry.register(this, id);
+    }
+
+    /**
+     * Create a new blob part from a Uint8Array.
+     *
+     * @param {Uint8Array} data
+     * @returns {BlobReference}
+     */
+    static fromUint8Array(data) {
+      const id = core.opSync("op_blob_create_part", data);
+      return new BlobReference(id, data.byteLength);
+    }
+
+    /**
+     * Create a new BlobReference by slicing this BlobReference. This is a copy
+     * free operation - the sliced reference will still reference the original
+     * underlying bytes.
+     *
+     * @param {number} start
+     * @param {number} end
+     * @returns {BlobReference}
+     */
+    slice(start, end) {
+      const size = end - start;
+      const id = core.opSync("op_blob_slice_part", this._id, {
+        start,
+        len: size,
+      });
+      return new BlobReference(id, size);
+    }
+
+    /**
+     * Read the entire contents of the reference blob.
+     * @returns {AsyncGenerator<Uint8Array>}
+     */
+    async *stream() {
+      yield core.opAsync("op_blob_read_part", this._id);
+
+      // let position = 0;
+      // const end = this.size;
+      // while (position !== end) {
+      //   const size = Math.min(end - position, 65536);
+      //   const chunk = this.slice(position, position + size);
+      //   position += chunk.size;
+      //   yield core.opAsync("op_blob_read_part", chunk._id);
+      // }
+    }
+  }
+
   window.__bootstrap.file = {
+    getParts,
     Blob,
-    _byteSequence,
     File,
   };
 })(this);
diff --git a/extensions/web/11_blob_url.js b/extensions/web/11_blob_url.js
index d030d79bd..fa0ea041c 100644
--- a/extensions/web/11_blob_url.js
+++ b/extensions/web/11_blob_url.js
@@ -15,7 +15,7 @@
 ((window) => {
   const core = Deno.core;
   const webidl = window.__bootstrap.webidl;
-  const { _byteSequence } = window.__bootstrap.file;
+  const { getParts } = window.__bootstrap.file;
   const { URL } = window.__bootstrap.url;
 
   /**
@@ -31,9 +31,9 @@
     });
 
     const url = core.opSync(
-      "op_file_create_object_url",
+      "op_blob_create_object_url",
       blob.type,
-      blob[_byteSequence],
+      getParts(blob),
     );
 
     return url;
@@ -51,10 +51,7 @@
       prefix,
     });
 
-    core.opSync(
-      "op_file_revoke_object_url",
-      url,
-    );
+    core.opSync("op_blob_revoke_object_url", url);
   }
 
   URL.createObjectURL = createObjectURL;
diff --git a/extensions/web/13_message_port.js b/extensions/web/13_message_port.js
index ae8e148f4..3bd7c692b 100644
--- a/extensions/web/13_message_port.js
+++ b/extensions/web/13_message_port.js
@@ -154,6 +154,10 @@
         this[_id] = null;
       }
     }
+
+    get [Symbol.toStringTag]() {
+      return "MessagePort";
+    }
   }
 
   defineEventHandler(MessagePort.prototype, "message", function (self) {
diff --git a/extensions/web/Cargo.toml b/extensions/web/Cargo.toml
index eeec91036..b056baeea 100644
--- a/extensions/web/Cargo.toml
+++ b/extensions/web/Cargo.toml
@@ -14,6 +14,7 @@ repository = "https://github.com/denoland/deno"
 path = "lib.rs"
 
 [dependencies]
+async-trait = "0.1.50"
 base64 = "0.13.0"
 deno_core = { version = "0.92.0", path = "../../core" }
 encoding_rs = "0.8.28"
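Not part of the diff — a sketch of the part-based semantics introduced in 09_file.js. Each constructor argument becomes one part (a BlobReference into Rust-held memory, or a nested Blob), slice() only re-references byte ranges instead of copying, and stream() yields data part by part:

    const parent = new Blob([new Uint8Array(8), "abcdefgh"]); // two 8-byte parts
    const child = parent.slice(4, 12); // tail of part 1 + head of part 2

    console.log(parent.size, child.size); // 16 8

    // One chunk per sliced part; no bytes were copied to build `child`.
    for await (const chunk of child.stream()) {
      console.log(chunk.byteLength); // 4, then 4
    }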
diff --git a/extensions/web/blob.rs b/extensions/web/blob.rs
new file mode 100644
index 000000000..96a982677
--- /dev/null
+++ b/extensions/web/blob.rs
@@ -0,0 +1,265 @@
+use async_trait::async_trait;
+use deno_core::error::type_error;
+use deno_core::url::Url;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::sync::Mutex;
+
+use deno_core::error::AnyError;
+use uuid::Uuid;
+
+use crate::Location;
+
+pub type PartMap = HashMap<Uuid, Arc<Box<dyn BlobPart + Send + Sync>>>;
+
+#[derive(Clone, Default, Debug)]
+pub struct BlobStore {
+  parts: Arc<Mutex<PartMap>>,
+  object_urls: Arc<Mutex<HashMap<Url, Arc<Blob>>>>,
+}
+
+impl BlobStore {
+  pub fn insert_part(&self, part: Box<dyn BlobPart + Send + Sync>) -> Uuid {
+    let id = Uuid::new_v4();
+    let mut parts = self.parts.lock().unwrap();
+    parts.insert(id, Arc::new(part));
+    id
+  }
+
+  pub fn get_part(
+    &self,
+    id: &Uuid,
+  ) -> Option<Arc<Box<dyn BlobPart + Send + Sync>>> {
+    let parts = self.parts.lock().unwrap();
+    let part = parts.get(&id);
+    part.cloned()
+  }
+
+  pub fn remove_part(
+    &self,
+    id: &Uuid,
+  ) -> Option<Arc<Box<dyn BlobPart + Send + Sync>>> {
+    let mut parts = self.parts.lock().unwrap();
+    parts.remove(&id)
+  }
+
+  pub fn get_object_url(
+    &self,
+    mut url: Url,
+  ) -> Result<Option<Arc<Blob>>, AnyError> {
+    let blob_store = self.object_urls.lock().unwrap();
+    url.set_fragment(None);
+    Ok(blob_store.get(&url).cloned())
+  }
+
+  pub fn insert_object_url(
+    &self,
+    blob: Blob,
+    maybe_location: Option<Url>,
+  ) -> Url {
+    let origin = if let Some(location) = maybe_location {
+      location.origin().ascii_serialization()
+    } else {
+      "null".to_string()
+    };
+    let id = Uuid::new_v4();
+    let url = Url::parse(&format!("blob:{}/{}", origin, id)).unwrap();
+
+    let mut blob_store = self.object_urls.lock().unwrap();
+    blob_store.insert(url.clone(), Arc::new(blob));
+
+    url
+  }
+
+  pub fn remove_object_url(&self, url: &Url) {
+    let mut blob_store = self.object_urls.lock().unwrap();
+    blob_store.remove(&url);
+  }
+}
+
+#[derive(Debug)]
+pub struct Blob {
+  pub media_type: String,
+
+  pub parts: Vec<Arc<Box<dyn BlobPart + Send + Sync>>>,
+}
+
+impl Blob {
+  // TODO(lucacsonato): this should be a stream!
+  pub async fn read_all(&self) -> Result<Vec<u8>, AnyError> {
+    let size = self.size();
+    let mut bytes = Vec::with_capacity(size);
+
+    for part in &self.parts {
+      let chunk = part.read().await?;
+      bytes.extend_from_slice(chunk);
+    }
+
+    assert_eq!(bytes.len(), size);
+
+    Ok(bytes)
+  }
+
+  fn size(&self) -> usize {
+    let mut total = 0;
+    for part in &self.parts {
+      total += part.size()
+    }
+    total
+  }
+}
+
+#[async_trait]
+pub trait BlobPart: Debug {
+  // TODO(lucacsonato): this should be a stream!
+  async fn read(&self) -> Result<&[u8], AnyError>;
+  fn size(&self) -> usize;
+}
+
+#[derive(Debug)]
+pub struct InMemoryBlobPart(Vec<u8>);
+
+impl From<Vec<u8>> for InMemoryBlobPart {
+  fn from(vec: Vec<u8>) -> Self {
+    Self(vec)
+  }
+}
+
+#[async_trait]
+impl BlobPart for InMemoryBlobPart {
+  async fn read(&self) -> Result<&[u8], AnyError> {
+    Ok(&self.0)
+  }
+
+  fn size(&self) -> usize {
+    self.0.len()
+  }
+}
+
+#[derive(Debug)]
+pub struct SlicedBlobPart {
+  part: Arc<Box<dyn BlobPart + Send + Sync>>,
+  start: usize,
+  len: usize,
+}
+
+#[async_trait]
+impl BlobPart for SlicedBlobPart {
+  async fn read(&self) -> Result<&[u8], AnyError> {
+    let original = self.part.read().await?;
+    Ok(&original[self.start..self.start + self.len])
+  }
+
+  fn size(&self) -> usize {
+    self.len
+  }
+}
+
+pub fn op_blob_create_part(
+  state: &mut deno_core::OpState,
+  data: ZeroCopyBuf,
+  _: (),
+) -> Result<Uuid, AnyError> {
+  let blob_store = state.borrow::<BlobStore>();
+  let part = InMemoryBlobPart(data.to_vec());
+  let id = blob_store.insert_part(Box::new(part));
+  Ok(id)
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SliceOptions {
+  start: usize,
+  len: usize,
+}
+
+pub fn op_blob_slice_part(
+  state: &mut deno_core::OpState,
+  id: Uuid,
+  options: SliceOptions,
+) -> Result<Uuid, AnyError> {
+  let blob_store = state.borrow::<BlobStore>();
+  let part = blob_store
+    .get_part(&id)
+    .ok_or_else(|| type_error("Blob part not found"))?;
+
+  let SliceOptions { start, len } = options;
+
+  let size = part.size();
+  if start + len > size {
+    return Err(type_error(
+      "start + len can not be larger than blob part size",
+    ));
+  }
+
+  let sliced_part = SlicedBlobPart { part, start, len };
+  let id = blob_store.insert_part(Box::new(sliced_part));
+
+  Ok(id)
+}
+
+pub async fn op_blob_read_part(
+  state: Rc<RefCell<deno_core::OpState>>,
+  id: Uuid,
+  _: (),
+) -> Result<ZeroCopyBuf, AnyError> {
+  let part = {
+    let state = state.borrow();
+    let blob_store = state.borrow::<BlobStore>();
+    blob_store.get_part(&id)
+  }
+  .ok_or_else(|| type_error("Blob part not found"))?;
+  let buf = part.read().await?;
+  Ok(ZeroCopyBuf::from(buf.to_vec()))
+}
+
+pub fn op_blob_remove_part(
+  state: &mut deno_core::OpState,
+  id: Uuid,
+  _: (),
+) -> Result<(), AnyError> {
+  let blob_store = state.borrow::<BlobStore>();
+  blob_store.remove_part(&id);
+  Ok(())
+}
+
+pub fn op_blob_create_object_url(
+  state: &mut deno_core::OpState,
+  media_type: String,
+  part_ids: Vec<Uuid>,
+) -> Result<String, AnyError> {
+  let mut parts = Vec::with_capacity(part_ids.len());
+  let blob_store = state.borrow::<BlobStore>();
+  for part_id in part_ids {
+    let part = blob_store
+      .get_part(&part_id)
+      .ok_or_else(|| type_error("Blob part not found"))?;
+    parts.push(part);
+  }
+
+  let blob = Blob { media_type, parts };
+
+  let maybe_location = state.try_borrow::<Location>();
+  let blob_store = state.borrow::<BlobStore>();
+
+  let url = blob_store
+    .insert_object_url(blob, maybe_location.map(|location| location.0.clone()));
+
+  Ok(url.to_string())
+}
+
+pub fn op_blob_revoke_object_url(
+  state: &mut deno_core::OpState,
+  url: String,
+  _: (),
+) -> Result<(), AnyError> {
+  let url = Url::parse(&url)?;
+  let blob_store = state.borrow::<BlobStore>();
+  blob_store.remove_object_url(&url);
+  Ok(())
+}
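Not part of the diff — on the interplay of the two layers above: op_blob_slice_part rejects any range where start + len exceeds the part's size, but ordinary user input never reaches that error because Blob.prototype.slice clamps its arguments per the WHATWG File API spec first:

    const b = new Blob(["0123456789"]); // size 10

    console.log(b.slice(-4).size); // 4 — negative start counts from the end
    console.log(b.slice(2, 100).size); // 8 — end is clamped to the blob size
    console.log(b.slice(7, 3).size); // 0 — an inverted range yields an empty blob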
diff --git a/extensions/web/internal.d.ts b/extensions/web/internal.d.ts
index bc3982a88..3a2a0c1be 100644
--- a/extensions/web/internal.d.ts
+++ b/extensions/web/internal.d.ts
@@ -73,13 +73,9 @@ declare namespace globalThis {
   };
 
   declare var file: {
-    Blob: typeof Blob & {
-      [globalThis.__bootstrap.file._byteSequence]: Uint8Array;
-    };
-    readonly _byteSequence: unique symbol;
-    File: typeof File & {
-      [globalThis.__bootstrap.file._byteSequence]: Uint8Array;
-    };
+    getParts(blob: Blob): string[];
+    Blob: typeof Blob;
+    File: typeof File;
   };
 
   declare var streams: {
diff --git a/extensions/web/lib.rs b/extensions/web/lib.rs
index 67022c7ea..634004ac9 100644
--- a/extensions/web/lib.rs
+++ b/extensions/web/lib.rs
@@ -1,13 +1,9 @@
 // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
 
+mod blob;
 mod message_port;
 
-pub use crate::message_port::create_entangled_message_port;
-pub use crate::message_port::JsMessageData;
-pub use crate::message_port::MessagePort;
-
 use deno_core::error::bad_resource_id;
-use deno_core::error::null_opbuf;
 use deno_core::error::range_error;
 use deno_core::error::type_error;
 use deno_core::error::AnyError;
@@ -16,7 +12,6 @@ use deno_core::op_async;
 use deno_core::op_sync;
 use deno_core::url::Url;
 use deno_core::Extension;
-use deno_core::ModuleSpecifier;
 use deno_core::OpState;
 use deno_core::Resource;
 use deno_core::ResourceId;
@@ -29,23 +24,30 @@ use serde::Deserialize;
 use serde::Serialize;
 use std::borrow::Cow;
 use std::cell::RefCell;
-use std::collections::HashMap;
 use std::fmt;
 use std::path::PathBuf;
-use std::sync::Arc;
-use std::sync::Mutex;
 use std::usize;
-use uuid::Uuid;
 
+use crate::blob::op_blob_create_object_url;
+use crate::blob::op_blob_create_part;
+use crate::blob::op_blob_read_part;
+use crate::blob::op_blob_remove_part;
+use crate::blob::op_blob_revoke_object_url;
+use crate::blob::op_blob_slice_part;
+pub use crate::blob::Blob;
+pub use crate::blob::BlobPart;
+pub use crate::blob::BlobStore;
+pub use crate::blob::InMemoryBlobPart;
+
+pub use crate::message_port::create_entangled_message_port;
 use crate::message_port::op_message_port_create_entangled;
 use crate::message_port::op_message_port_post_message;
 use crate::message_port::op_message_port_recv_message;
+pub use crate::message_port::JsMessageData;
+pub use crate::message_port::MessagePort;
 
 /// Load and execute the javascript code.
-pub fn init(
-  blob_url_store: BlobUrlStore,
-  maybe_location: Option<Url>,
-) -> Extension {
+pub fn init(blob_store: BlobStore, maybe_location: Option<Url>) -> Extension {
   Extension::builder()
     .js(include_js_files!(
       prefix "deno:extensions/web",
@@ -75,13 +77,17 @@ pub fn init(
       ("op_encoding_new_decoder", op_sync(op_encoding_new_decoder)),
       ("op_encoding_decode", op_sync(op_encoding_decode)),
       ("op_encoding_encode_into", op_sync(op_encoding_encode_into)),
+      ("op_blob_create_part", op_sync(op_blob_create_part)),
+      ("op_blob_slice_part", op_sync(op_blob_slice_part)),
+      ("op_blob_read_part", op_async(op_blob_read_part)),
+      ("op_blob_remove_part", op_sync(op_blob_remove_part)),
       (
-        "op_file_create_object_url",
-        op_sync(op_file_create_object_url),
+        "op_blob_create_object_url",
+        op_sync(op_blob_create_object_url),
       ),
       (
-        "op_file_revoke_object_url",
-        op_sync(op_file_revoke_object_url),
+        "op_blob_revoke_object_url",
+        op_sync(op_blob_revoke_object_url),
       ),
       (
         "op_message_port_create_entangled",
@@ -97,7 +103,7 @@ pub fn init(
       ),
     ])
     .state(move |state| {
-      state.put(blob_url_store.clone());
+      state.put(blob_store.clone());
       if let Some(location) = maybe_location.clone() {
         state.put(Location(location));
       }
@@ -381,73 +387,4 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
       .map(|_| "DOMExceptionInvalidCharacterError")
   })
 }
-
-#[derive(Debug, Clone)]
-pub struct Blob {
-  pub data: Vec<u8>,
-  pub media_type: String,
-}
-
 pub struct Location(pub Url);
-
-#[derive(Debug, Default, Clone)]
-pub struct BlobUrlStore(Arc<Mutex<HashMap<Url, Blob>>>);
-
-impl BlobUrlStore {
-  pub fn get(&self, mut url: Url) -> Result<Option<Blob>, AnyError> {
-    let blob_store = self.0.lock().unwrap();
-    url.set_fragment(None);
-    Ok(blob_store.get(&url).cloned())
-  }
-
-  pub fn insert(&self, blob: Blob, maybe_location: Option<Url>) -> Url {
-    let origin = if let Some(location) = maybe_location {
-      location.origin().ascii_serialization()
-    } else {
-      "null".to_string()
-    };
-    let id = Uuid::new_v4();
-    let url = Url::parse(&format!("blob:{}/{}", origin, id)).unwrap();
-
-    let mut blob_store = self.0.lock().unwrap();
-    blob_store.insert(url.clone(), blob);
-
-    url
-  }
-
-  pub fn remove(&self, url: &ModuleSpecifier) {
-    let mut blob_store = self.0.lock().unwrap();
-    blob_store.remove(&url);
-  }
-}
-
-pub fn op_file_create_object_url(
-  state: &mut deno_core::OpState,
-  media_type: String,
-  zero_copy: Option<ZeroCopyBuf>,
-) -> Result<String, AnyError> {
-  let data = zero_copy.ok_or_else(null_opbuf)?;
-  let blob = Blob {
-    data: data.to_vec(),
-    media_type,
-  };
-
-  let maybe_location = state.try_borrow::<Location>();
-  let blob_store = state.borrow::<BlobUrlStore>();
-
-  let url =
-    blob_store.insert(blob, maybe_location.map(|location| location.0.clone()));
-
-  Ok(url.to_string())
-}
-
-pub fn op_file_revoke_object_url(
-  state: &mut deno_core::OpState,
-  url: String,
-  _: (),
-) -> Result<(), AnyError> {
-  let url = Url::parse(&url)?;
-  let blob_store = state.borrow::<BlobUrlStore>();
-  blob_store.remove(&url);
-  Ok(())
-}
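Not part of the diff — a sketch of the resulting ownership model, with the caveat that GC timing is nondeterministic. A JS Blob only holds part ids; when it is collected, the FinalizationRegistry issues op_blob_remove_part. An object URL, by contrast, pins its Blob (and thus its parts) in the BlobStore until it is explicitly revoked:

    let blob: Blob | null = new Blob([new Uint8Array(1_000_000)]);
    const url = URL.createObjectURL(blob);

    blob = null;
    // Even if the original Blob is collected now, the bytes stay reachable:
    // the store's object-url map still references the parts behind `url`.
    const buf = await (await fetch(url)).arrayBuffer();
    console.log(buf.byteLength); // 1000000

    URL.revokeObjectURL(url); // drops the store's reference; parts can be freed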