diff options
-rw-r--r-- | cli/tests/unit/cache_api_test.ts | 16 | ||||
-rw-r--r-- | ext/cache/01_cache.js | 42 | ||||
-rw-r--r-- | ext/cache/lib.rs | 49 | ||||
-rw-r--r-- | ext/cache/sqlite.rs | 150 |
4 files changed, 113 insertions, 144 deletions
diff --git a/cli/tests/unit/cache_api_test.ts b/cli/tests/unit/cache_api_test.ts index d2c112f77..792929870 100644 --- a/cli/tests/unit/cache_api_test.ts +++ b/cli/tests/unit/cache_api_test.ts @@ -189,3 +189,19 @@ Deno.test(async function cachePutOverwrite() { const res_ = await cache.match(request); assertEquals(await res_?.text(), "res2"); }); + +// Ensure that we can successfully put a response backed by a resource +Deno.test(async function cachePutResource() { + const tempFile = Deno.makeTempFileSync({ prefix: "deno-", suffix: ".txt" }); + Deno.writeTextFileSync(tempFile, "Contents".repeat(1024)); + + const file = Deno.openSync(tempFile); + + const cacheName = "cache-v1"; + const cache = await caches.open(cacheName); + + const request = new Request("https://example.com/file"); + await cache.put(request, new Response(file.readable)); + const res = await cache.match(request); + assertEquals(await res?.text(), "Contents".repeat(1024)); +}); diff --git a/ext/cache/01_cache.js b/ext/cache/01_cache.js index 541feb5c1..7d6e9e0ec 100644 --- a/ext/cache/01_cache.js +++ b/ext/cache/01_cache.js @@ -4,7 +4,6 @@ const { op_cache_delete, op_cache_match, op_cache_put, - op_cache_put_finish, op_cache_storage_delete, op_cache_storage_has, op_cache_storage_open, @@ -28,7 +27,11 @@ import { import { toInnerResponse } from "ext:deno_fetch/23_response.js"; import { URLPrototype } from "ext:deno_url/00_url.js"; import { getHeader } from "ext:deno_fetch/20_headers.js"; -import { readableStreamForRid } from "ext:deno_web/06_streams.js"; +import { + getReadableStreamResourceBacking, + readableStreamForRid, + resourceForReadableStream, +} from "ext:deno_web/06_streams.js"; class CacheStorage { constructor() { @@ -130,40 +133,37 @@ class Cache { if (innerResponse.body !== null && innerResponse.body.unusable()) { throw new TypeError("Response body is already used"); } - // acquire lock before async op - const reader = innerResponse.body?.stream.getReader(); + + const stream = innerResponse.body?.stream; + let rid = null; + if (stream) { + const resourceBacking = getReadableStreamResourceBacking( + innerResponse.body?.stream, + ); + if (resourceBacking) { + rid = resourceBacking.rid; + } else { + rid = resourceForReadableStream(stream, innerResponse.body?.length); + } + } // Remove fragment from request URL before put. reqUrl.hash = ""; // Step 9-11. - const rid = await op_cache_put( + // Step 12-19: TODO(@satyarohith): do the insertion in background. + await op_cache_put( { cacheId: this[_id], // deno-lint-ignore prefer-primordials requestUrl: reqUrl.toString(), responseHeaders: innerResponse.headerList, requestHeaders: innerRequest.headerList, - responseHasBody: innerResponse.body !== null, responseStatus: innerResponse.status, responseStatusText: innerResponse.statusMessage, + responseRid: rid, }, ); - if (reader) { - try { - while (true) { - const { value, done } = await reader.read(); - if (done) { - await op_cache_put_finish(rid); - break; - } - await core.writeAll(rid, value); - } - } finally { - core.close(rid); - } - } - // Step 12-19: TODO(@satyarohith): do the insertion in background. } /** See https://w3c.github.io/ServiceWorker/#cache-match */ diff --git a/ext/cache/lib.rs b/ext/cache/lib.rs index 845a6ad86..cee5b7e56 100644 --- a/ext/cache/lib.rs +++ b/ext/cache/lib.rs @@ -29,7 +29,6 @@ deno_core::extension!(deno_cache, op_cache_storage_has<CA>, op_cache_storage_delete<CA>, op_cache_put<CA>, - op_cache_put_finish<CA>, op_cache_match<CA>, op_cache_delete<CA>, ], @@ -55,9 +54,9 @@ pub struct CachePutRequest { pub request_url: String, pub request_headers: Vec<(ByteString, ByteString)>, pub response_headers: Vec<(ByteString, ByteString)>, - pub response_has_body: bool, pub response_status: u16, pub response_status_text: String, + pub response_rid: Option<ResourceId>, } #[derive(Deserialize, Serialize, Debug)] @@ -90,27 +89,24 @@ pub struct CacheDeleteRequest { #[async_trait(?Send)] pub trait Cache: Clone + 'static { - type CachePutResourceType: Resource; + type CacheMatchResourceType: Resource; async fn storage_open(&self, cache_name: String) -> Result<i64, AnyError>; async fn storage_has(&self, cache_name: String) -> Result<bool, AnyError>; async fn storage_delete(&self, cache_name: String) -> Result<bool, AnyError>; - /// Create a put request. - async fn put_create( + /// Put a resource into the cache. + async fn put( &self, request_response: CachePutRequest, - ) -> Result<Option<Rc<Self::CachePutResourceType>>, AnyError>; - /// Complete a put request. - async fn put_finish( - &self, - resource: Rc<Self::CachePutResourceType>, + resource: Option<Rc<dyn Resource>>, ) -> Result<(), AnyError>; + async fn r#match( &self, request: CacheMatchRequest, ) -> Result< - Option<(CacheMatchResponseMeta, Option<Rc<dyn Resource>>)>, + Option<(CacheMatchResponseMeta, Option<Self::CacheMatchResourceType>)>, AnyError, >; async fn delete(&self, request: CacheDeleteRequest) @@ -155,38 +151,19 @@ where } #[op2(async)] -#[smi] pub async fn op_cache_put<CA>( state: Rc<RefCell<OpState>>, #[serde] request_response: CachePutRequest, -) -> Result<Option<ResourceId>, AnyError> -where - CA: Cache, -{ - let cache = get_cache::<CA>(&state)?; - match cache.put_create(request_response).await? { - Some(resource) => { - let rid = state.borrow_mut().resource_table.add_rc_dyn(resource); - Ok(Some(rid)) - } - None => Ok(None), - } -} - -#[op2(async)] -pub async fn op_cache_put_finish<CA>( - state: Rc<RefCell<OpState>>, - #[smi] rid: ResourceId, ) -> Result<(), AnyError> where CA: Cache, { let cache = get_cache::<CA>(&state)?; - let resource = state - .borrow_mut() - .resource_table - .get::<CA::CachePutResourceType>(rid)?; - cache.put_finish(resource).await + let resource = match request_response.response_rid { + Some(rid) => Some(state.borrow_mut().resource_table.take_any(rid)?), + None => None, + }; + cache.put(request_response, resource).await } #[op2(async)] @@ -202,7 +179,7 @@ where match cache.r#match(request).await? { Some((meta, None)) => Ok(Some(CacheMatchResponse(meta, None))), Some((meta, Some(resource))) => { - let rid = state.borrow_mut().resource_table.add_rc_dyn(resource); + let rid = state.borrow_mut().resource_table.add(resource); Ok(Some(CacheMatchResponse(meta, Some(rid)))) } None => Ok(None), diff --git a/ext/cache/sqlite.rs b/ext/cache/sqlite.rs index 47ac18b05..c3c55dd5e 100644 --- a/ext/cache/sqlite.rs +++ b/ext/cache/sqlite.rs @@ -1,7 +1,7 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. - use std::borrow::Cow; use std::path::PathBuf; +use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; use std::time::SystemTime; @@ -9,16 +9,19 @@ use std::time::UNIX_EPOCH; use async_trait::async_trait; use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; use deno_core::parking_lot::Mutex; use deno_core::unsync::spawn_blocking; use deno_core::AsyncRefCell; use deno_core::AsyncResult; +use deno_core::BufMutView; use deno_core::ByteString; use deno_core::Resource; use rusqlite::params; use rusqlite::Connection; use rusqlite::OptionalExtension; use tokio::io::AsyncReadExt; +use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use crate::deserialize_headers; @@ -94,7 +97,7 @@ impl SqliteBackedCache { #[async_trait(?Send)] impl Cache for SqliteBackedCache { - type CachePutResourceType = CachePutResource; + type CacheMatchResourceType = CacheResponseResource; /// Open a cache storage. Internally, this creates a row in the /// sqlite db if the cache doesn't exist and returns the internal id @@ -169,58 +172,63 @@ impl Cache for SqliteBackedCache { .await? } - async fn put_create( + async fn put( &self, request_response: CachePutRequest, - ) -> Result<Option<Rc<CachePutResource>>, AnyError> { + resource: Option<Rc<dyn Resource>>, + ) -> Result<(), AnyError> { let db = self.connection.clone(); let cache_storage_dir = self.cache_storage_dir.clone(); let now = SystemTime::now().duration_since(UNIX_EPOCH)?; - let response_body_key = if request_response.response_has_body { - Some(hash(&format!( + + if let Some(resource) = resource { + let body_key = hash(&format!( "{}_{}", &request_response.request_url, now.as_nanos() - ))) - } else { - None - }; - - if let Some(body_key) = response_body_key { + )); let responses_dir = get_responses_dir(cache_storage_dir, request_response.cache_id); let response_path = responses_dir.join(&body_key); - let file = tokio::fs::File::create(response_path).await?; - Ok(Some(Rc::new(CachePutResource { - file: AsyncRefCell::new(file), - db, - put_request: request_response, - response_body_key: body_key, - start_time: now.as_secs(), - }))) + let mut file = tokio::fs::File::create(response_path).await?; + let mut buf = BufMutView::new(64 * 1024); + loop { + let (size, buf2) = resource.clone().read_byob(buf).await?; + if size == 0 { + break; + } + buf = buf2; + + // Use poll_write to avoid holding a slice across await points + poll_fn(|cx| Pin::new(&mut file).poll_write(cx, &buf[..size])).await?; + } + + file.flush().await?; + file.sync_all().await?; + + assert_eq!( + insert_cache_asset(db, request_response, Some(body_key.clone()),) + .await?, + Some(body_key) + ); } else { - insert_cache_asset(db, request_response, None).await?; - Ok(None) + assert!(insert_cache_asset(db, request_response, None) + .await? + .is_none()); } - } - - async fn put_finish( - &self, - resource: Rc<CachePutResource>, - ) -> Result<(), AnyError> { - resource.write_to_cache().await + Ok(()) } async fn r#match( &self, request: CacheMatchRequest, ) -> Result< - Option<(CacheMatchResponseMeta, Option<Rc<dyn Resource>>)>, + Option<(CacheMatchResponseMeta, Option<CacheResponseResource>)>, AnyError, > { let db = self.connection.clone(); let cache_storage_dir = self.cache_storage_dir.clone(); - let query_result = spawn_blocking(move || { + let (query_result, request) = spawn_blocking(move || { let db = db.lock(); let result = db.query_row( "SELECT response_body_key, response_headers, response_status, response_status_text, request_headers @@ -235,10 +243,17 @@ impl Cache for SqliteBackedCache { let request_headers: Vec<u8> = row.get(4)?; let response_headers: Vec<(ByteString, ByteString)> = deserialize_headers(&response_headers); let request_headers: Vec<(ByteString, ByteString)> = deserialize_headers(&request_headers); - Ok((CacheMatchResponseMeta {request_headers, response_headers,response_status,response_status_text}, response_body_key)) + Ok((CacheMatchResponseMeta { + request_headers, + response_headers, + response_status, + response_status_text}, + response_body_key + )) }, ); - result.optional() + // Return ownership of request to the caller + result.optional().map(|x| (x, request)) }) .await??; @@ -261,11 +276,21 @@ impl Cache for SqliteBackedCache { let response_path = get_responses_dir(cache_storage_dir, request.cache_id) .join(response_body_key); - let file = tokio::fs::File::open(response_path).await?; - return Ok(Some(( - cache_meta, - Some(Rc::new(CacheResponseResource::new(file))), - ))); + let file = match tokio::fs::File::open(response_path).await { + Ok(file) => file, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + // Best efforts to delete the old cache item + _ = self + .delete(CacheDeleteRequest { + cache_id: request.cache_id, + request_url: request.request_url, + }) + .await; + return Ok(None); + } + Err(err) => return Err(err.into()), + }; + return Ok(Some((cache_meta, Some(CacheResponseResource::new(file))))); } Some((cache_meta, None)) => { return Ok(Some((cache_meta, None))); @@ -339,55 +364,6 @@ impl deno_core::Resource for SqliteBackedCache { } } -pub struct CachePutResource { - pub db: Arc<Mutex<rusqlite::Connection>>, - pub put_request: CachePutRequest, - pub response_body_key: String, - pub file: AsyncRefCell<tokio::fs::File>, - pub start_time: u64, -} - -impl CachePutResource { - async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> { - let resource = deno_core::RcRef::map(&self, |r| &r.file); - let mut file = resource.borrow_mut().await; - file.write_all(data).await?; - Ok(data.len()) - } - - async fn write_to_cache(self: Rc<Self>) -> Result<(), AnyError> { - let resource = deno_core::RcRef::map(&self, |r| &r.file); - let mut file = resource.borrow_mut().await; - file.flush().await?; - file.sync_all().await?; - let maybe_body_key = insert_cache_asset( - self.db.clone(), - self.put_request.clone(), - Some(self.response_body_key.clone()), - ) - .await?; - match maybe_body_key { - Some(key) => { - assert_eq!(key, self.response_body_key); - Ok(()) - } - // This should never happen because we will always have - // body key associated with CachePutResource - None => Err(deno_core::anyhow::anyhow!( - "unexpected: response body key is None" - )), - } - } -} - -impl Resource for CachePutResource { - fn name(&self) -> Cow<str> { - "CachePutResource".into() - } - - deno_core::impl_writable!(); -} - pub struct CacheResponseResource { file: AsyncRefCell<tokio::fs::File>, } |