summary | refs | log | tree | commit | diff
path: root/ext/cache
diff options
context:
space:
mode:
author: Matt Mastracci <matthew@mastracci.com>, 2024-01-15 13:14:54 -0700
committer: GitHub <noreply@github.com>, 2024-01-15 13:14:54 -0700
commit: 3ff80eb1521c49a43e0fae53840e5a636571ebfe (patch)
tree: c2054112d6fea2f102300f5d2c5743dd23f8a9f8 /ext/cache
parent: 72ecfe04198c5e912826663033a8963fbdea4521 (diff)
chore(ext/cache): remove CachePutResource in preparation for resource rewrite (#21949)
We can use `resourceForReadableStream` to ensure that cached resources are implemented more efficiently and remove one more resource special case.
Diffstat (limited to 'ext/cache')
-rw-r--r-- ext/cache/01_cache.js 42
-rw-r--r-- ext/cache/lib.rs 49
-rw-r--r-- ext/cache/sqlite.rs 150
3 files changed, 97 insertions, 144 deletions
diff --git a/ext/cache/01_cache.js b/ext/cache/01_cache.js
index 541feb5c1..7d6e9e0ec 100644
--- a/ext/cache/01_cache.js
+++ b/ext/cache/01_cache.js
@@ -4,7 +4,6 @@ const {
op_cache_delete,
op_cache_match,
op_cache_put,
- op_cache_put_finish,
op_cache_storage_delete,
op_cache_storage_has,
op_cache_storage_open,
@@ -28,7 +27,11 @@ import {
import { toInnerResponse } from "ext:deno_fetch/23_response.js";
import { URLPrototype } from "ext:deno_url/00_url.js";
import { getHeader } from "ext:deno_fetch/20_headers.js";
-import { readableStreamForRid } from "ext:deno_web/06_streams.js";
+import {
+ getReadableStreamResourceBacking,
+ readableStreamForRid,
+ resourceForReadableStream,
+} from "ext:deno_web/06_streams.js";
class CacheStorage {
constructor() {
@@ -130,40 +133,37 @@ class Cache {
if (innerResponse.body !== null && innerResponse.body.unusable()) {
throw new TypeError("Response body is already used");
}
- // acquire lock before async op
- const reader = innerResponse.body?.stream.getReader();
+
+ const stream = innerResponse.body?.stream;
+ let rid = null;
+ if (stream) {
+ const resourceBacking = getReadableStreamResourceBacking(
+ innerResponse.body?.stream,
+ );
+ if (resourceBacking) {
+ rid = resourceBacking.rid;
+ } else {
+ rid = resourceForReadableStream(stream, innerResponse.body?.length);
+ }
+ }
// Remove fragment from request URL before put.
reqUrl.hash = "";
// Step 9-11.
- const rid = await op_cache_put(
+ // Step 12-19: TODO(@satyarohith): do the insertion in background.
+ await op_cache_put(
{
cacheId: this[_id],
// deno-lint-ignore prefer-primordials
requestUrl: reqUrl.toString(),
responseHeaders: innerResponse.headerList,
requestHeaders: innerRequest.headerList,
- responseHasBody: innerResponse.body !== null,
responseStatus: innerResponse.status,
responseStatusText: innerResponse.statusMessage,
+ responseRid: rid,
},
);
- if (reader) {
- try {
- while (true) {
- const { value, done } = await reader.read();
- if (done) {
- await op_cache_put_finish(rid);
- break;
- }
- await core.writeAll(rid, value);
- }
- } finally {
- core.close(rid);
- }
- }
- // Step 12-19: TODO(@satyarohith): do the insertion in background.
}
/** See https://w3c.github.io/ServiceWorker/#cache-match */
diff --git a/ext/cache/lib.rs b/ext/cache/lib.rs
index 845a6ad86..cee5b7e56 100644
--- a/ext/cache/lib.rs
+++ b/ext/cache/lib.rs
@@ -29,7 +29,6 @@ deno_core::extension!(deno_cache,
op_cache_storage_has<CA>,
op_cache_storage_delete<CA>,
op_cache_put<CA>,
- op_cache_put_finish<CA>,
op_cache_match<CA>,
op_cache_delete<CA>,
],
@@ -55,9 +54,9 @@ pub struct CachePutRequest {
pub request_url: String,
pub request_headers: Vec<(ByteString, ByteString)>,
pub response_headers: Vec<(ByteString, ByteString)>,
- pub response_has_body: bool,
pub response_status: u16,
pub response_status_text: String,
+ pub response_rid: Option<ResourceId>,
}
#[derive(Deserialize, Serialize, Debug)]
@@ -90,27 +89,24 @@ pub struct CacheDeleteRequest {
#[async_trait(?Send)]
pub trait Cache: Clone + 'static {
- type CachePutResourceType: Resource;
+ type CacheMatchResourceType: Resource;
async fn storage_open(&self, cache_name: String) -> Result<i64, AnyError>;
async fn storage_has(&self, cache_name: String) -> Result<bool, AnyError>;
async fn storage_delete(&self, cache_name: String) -> Result<bool, AnyError>;
- /// Create a put request.
- async fn put_create(
+ /// Put a resource into the cache.
+ async fn put(
&self,
request_response: CachePutRequest,
- ) -> Result<Option<Rc<Self::CachePutResourceType>>, AnyError>;
- /// Complete a put request.
- async fn put_finish(
- &self,
- resource: Rc<Self::CachePutResourceType>,
+ resource: Option<Rc<dyn Resource>>,
) -> Result<(), AnyError>;
+
async fn r#match(
&self,
request: CacheMatchRequest,
) -> Result<
- Option<(CacheMatchResponseMeta, Option<Rc<dyn Resource>>)>,
+ Option<(CacheMatchResponseMeta, Option<Self::CacheMatchResourceType>)>,
AnyError,
>;
async fn delete(&self, request: CacheDeleteRequest)
@@ -155,38 +151,19 @@ where
}
#[op2(async)]
-#[smi]
pub async fn op_cache_put<CA>(
state: Rc<RefCell<OpState>>,
#[serde] request_response: CachePutRequest,
-) -> Result<Option<ResourceId>, AnyError>
-where
- CA: Cache,
-{
- let cache = get_cache::<CA>(&state)?;
- match cache.put_create(request_response).await? {
- Some(resource) => {
- let rid = state.borrow_mut().resource_table.add_rc_dyn(resource);
- Ok(Some(rid))
- }
- None => Ok(None),
- }
-}
-
-#[op2(async)]
-pub async fn op_cache_put_finish<CA>(
- state: Rc<RefCell<OpState>>,
- #[smi] rid: ResourceId,
) -> Result<(), AnyError>
where
CA: Cache,
{
let cache = get_cache::<CA>(&state)?;
- let resource = state
- .borrow_mut()
- .resource_table
- .get::<CA::CachePutResourceType>(rid)?;
- cache.put_finish(resource).await
+ let resource = match request_response.response_rid {
+ Some(rid) => Some(state.borrow_mut().resource_table.take_any(rid)?),
+ None => None,
+ };
+ cache.put(request_response, resource).await
}
#[op2(async)]
@@ -202,7 +179,7 @@ where
match cache.r#match(request).await? {
Some((meta, None)) => Ok(Some(CacheMatchResponse(meta, None))),
Some((meta, Some(resource))) => {
- let rid = state.borrow_mut().resource_table.add_rc_dyn(resource);
+ let rid = state.borrow_mut().resource_table.add(resource);
Ok(Some(CacheMatchResponse(meta, Some(rid))))
}
None => Ok(None),
diff --git a/ext/cache/sqlite.rs b/ext/cache/sqlite.rs
index 47ac18b05..c3c55dd5e 100644
--- a/ext/cache/sqlite.rs
+++ b/ext/cache/sqlite.rs
@@ -1,7 +1,7 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
-
use std::borrow::Cow;
use std::path::PathBuf;
+use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::time::SystemTime;
@@ -9,16 +9,19 @@ use std::time::UNIX_EPOCH;
use async_trait::async_trait;
use deno_core::error::AnyError;
+use deno_core::futures::future::poll_fn;
use deno_core::parking_lot::Mutex;
use deno_core::unsync::spawn_blocking;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
+use deno_core::BufMutView;
use deno_core::ByteString;
use deno_core::Resource;
use rusqlite::params;
use rusqlite::Connection;
use rusqlite::OptionalExtension;
use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use crate::deserialize_headers;
@@ -94,7 +97,7 @@ impl SqliteBackedCache {
#[async_trait(?Send)]
impl Cache for SqliteBackedCache {
- type CachePutResourceType = CachePutResource;
+ type CacheMatchResourceType = CacheResponseResource;
/// Open a cache storage. Internally, this creates a row in the
/// sqlite db if the cache doesn't exist and returns the internal id
@@ -169,58 +172,63 @@ impl Cache for SqliteBackedCache {
.await?
}
- async fn put_create(
+ async fn put(
&self,
request_response: CachePutRequest,
- ) -> Result<Option<Rc<CachePutResource>>, AnyError> {
+ resource: Option<Rc<dyn Resource>>,
+ ) -> Result<(), AnyError> {
let db = self.connection.clone();
let cache_storage_dir = self.cache_storage_dir.clone();
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
- let response_body_key = if request_response.response_has_body {
- Some(hash(&format!(
+
+ if let Some(resource) = resource {
+ let body_key = hash(&format!(
"{}_{}",
&request_response.request_url,
now.as_nanos()
- )))
- } else {
- None
- };
-
- if let Some(body_key) = response_body_key {
+ ));
let responses_dir =
get_responses_dir(cache_storage_dir, request_response.cache_id);
let response_path = responses_dir.join(&body_key);
- let file = tokio::fs::File::create(response_path).await?;
- Ok(Some(Rc::new(CachePutResource {
- file: AsyncRefCell::new(file),
- db,
- put_request: request_response,
- response_body_key: body_key,
- start_time: now.as_secs(),
- })))
+ let mut file = tokio::fs::File::create(response_path).await?;
+ let mut buf = BufMutView::new(64 * 1024);
+ loop {
+ let (size, buf2) = resource.clone().read_byob(buf).await?;
+ if size == 0 {
+ break;
+ }
+ buf = buf2;
+
+ // Use poll_write to avoid holding a slice across await points
+ poll_fn(|cx| Pin::new(&mut file).poll_write(cx, &buf[..size])).await?;
+ }
+
+ file.flush().await?;
+ file.sync_all().await?;
+
+ assert_eq!(
+ insert_cache_asset(db, request_response, Some(body_key.clone()),)
+ .await?,
+ Some(body_key)
+ );
} else {
- insert_cache_asset(db, request_response, None).await?;
- Ok(None)
+ assert!(insert_cache_asset(db, request_response, None)
+ .await?
+ .is_none());
}
- }
-
- async fn put_finish(
- &self,
- resource: Rc<CachePutResource>,
- ) -> Result<(), AnyError> {
- resource.write_to_cache().await
+ Ok(())
}
async fn r#match(
&self,
request: CacheMatchRequest,
) -> Result<
- Option<(CacheMatchResponseMeta, Option<Rc<dyn Resource>>)>,
+ Option<(CacheMatchResponseMeta, Option<CacheResponseResource>)>,
AnyError,
> {
let db = self.connection.clone();
let cache_storage_dir = self.cache_storage_dir.clone();
- let query_result = spawn_blocking(move || {
+ let (query_result, request) = spawn_blocking(move || {
let db = db.lock();
let result = db.query_row(
"SELECT response_body_key, response_headers, response_status, response_status_text, request_headers
@@ -235,10 +243,17 @@ impl Cache for SqliteBackedCache {
let request_headers: Vec<u8> = row.get(4)?;
let response_headers: Vec<(ByteString, ByteString)> = deserialize_headers(&response_headers);
let request_headers: Vec<(ByteString, ByteString)> = deserialize_headers(&request_headers);
- Ok((CacheMatchResponseMeta {request_headers, response_headers,response_status,response_status_text}, response_body_key))
+ Ok((CacheMatchResponseMeta {
+ request_headers,
+ response_headers,
+ response_status,
+ response_status_text},
+ response_body_key
+ ))
},
);
- result.optional()
+ // Return ownership of request to the caller
+ result.optional().map(|x| (x, request))
})
.await??;
@@ -261,11 +276,21 @@ impl Cache for SqliteBackedCache {
let response_path =
get_responses_dir(cache_storage_dir, request.cache_id)
.join(response_body_key);
- let file = tokio::fs::File::open(response_path).await?;
- return Ok(Some((
- cache_meta,
- Some(Rc::new(CacheResponseResource::new(file))),
- )));
+ let file = match tokio::fs::File::open(response_path).await {
+ Ok(file) => file,
+ Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
+ // Best efforts to delete the old cache item
+ _ = self
+ .delete(CacheDeleteRequest {
+ cache_id: request.cache_id,
+ request_url: request.request_url,
+ })
+ .await;
+ return Ok(None);
+ }
+ Err(err) => return Err(err.into()),
+ };
+ return Ok(Some((cache_meta, Some(CacheResponseResource::new(file)))));
}
Some((cache_meta, None)) => {
return Ok(Some((cache_meta, None)));
@@ -339,55 +364,6 @@ impl deno_core::Resource for SqliteBackedCache {
}
}
-pub struct CachePutResource {
- pub db: Arc<Mutex<rusqlite::Connection>>,
- pub put_request: CachePutRequest,
- pub response_body_key: String,
- pub file: AsyncRefCell<tokio::fs::File>,
- pub start_time: u64,
-}
-
-impl CachePutResource {
- async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
- let resource = deno_core::RcRef::map(&self, |r| &r.file);
- let mut file = resource.borrow_mut().await;
- file.write_all(data).await?;
- Ok(data.len())
- }
-
- async fn write_to_cache(self: Rc<Self>) -> Result<(), AnyError> {
- let resource = deno_core::RcRef::map(&self, |r| &r.file);
- let mut file = resource.borrow_mut().await;
- file.flush().await?;
- file.sync_all().await?;
- let maybe_body_key = insert_cache_asset(
- self.db.clone(),
- self.put_request.clone(),
- Some(self.response_body_key.clone()),
- )
- .await?;
- match maybe_body_key {
- Some(key) => {
- assert_eq!(key, self.response_body_key);
- Ok(())
- }
- // This should never happen because we will always have
- // body key associated with CachePutResource
- None => Err(deno_core::anyhow::anyhow!(
- "unexpected: response body key is None"
- )),
- }
- }
-}
-
-impl Resource for CachePutResource {
- fn name(&self) -> Cow<str> {
- "CachePutResource".into()
- }
-
- deno_core::impl_writable!();
-}
-
pub struct CacheResponseResource {
file: AsyncRefCell<tokio::fs::File>,
}