author     Matt Mastracci <matthew@mastracci.com>  2024-01-15 13:14:54 -0700
committer  GitHub <noreply@github.com>             2024-01-15 13:14:54 -0700
commit     3ff80eb1521c49a43e0fae53840e5a636571ebfe (patch)
tree       c2054112d6fea2f102300f5d2c5743dd23f8a9f8 /ext/cache/sqlite.rs
parent     72ecfe04198c5e912826663033a8963fbdea4521 (diff)
chore(ext/cache): remove CachePutResource in preparation for resource rewrite (#21949)
We can use `resourceForReadableStream` to implement cached resources more efficiently and remove one more resource special case.
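
With this change the two-step `put_create`/`put_finish` flow collapses into a single `put` that takes the response body as an optional, generic `Rc<dyn Resource>`, and only the match side keeps a concrete resource type. The `Cache` trait itself lives elsewhere in the cache extension and is not part of this diff; the shape below is a rough sketch inferred from the `SqliteBackedCache` impl in the hunks that follow, so surrounding items and trait bounds may differ from the real definition.

use std::rc::Rc;

use async_trait::async_trait;
use deno_core::error::AnyError;
use deno_core::Resource;

// Existing request/response types from the cache extension crate.
use crate::CacheMatchRequest;
use crate::CacheMatchResponseMeta;
use crate::CachePutRequest;

// Rough sketch of the trait surface touched by this commit; inferred from the
// impl below, not quoted from the source.
#[async_trait(?Send)]
pub trait Cache {
  // Only the match path still names a concrete resource type.
  type CacheMatchResourceType;

  // Single-step put: the response body, if any, arrives as a plain Resource
  // (e.g. one backed by resourceForReadableStream on the JS side).
  async fn put(
    &self,
    request_response: CachePutRequest,
    resource: Option<Rc<dyn Resource>>,
  ) -> Result<(), AnyError>;

  async fn r#match(
    &self,
    request: CacheMatchRequest,
  ) -> Result<
    Option<(CacheMatchResponseMeta, Option<Self::CacheMatchResourceType>)>,
    AnyError,
  >;
}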
Diffstat (limited to 'ext/cache/sqlite.rs')
-rw-r--r--  ext/cache/sqlite.rs  150
1 file changed, 63 insertions, 87 deletions
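
The interesting part of the new `put` body is how it streams the body resource into the cache file: it reads into a reusable `BufMutView` via `read_byob` and writes through `poll_fn`/`poll_write`, so no borrowed slice is held across an await point. Below is a minimal, self-contained sketch of that pattern; the helper name `copy_resource_to_file` is illustrative rather than taken from the commit, and the loop mirrors the hunk that follows.

use std::pin::Pin;
use std::rc::Rc;

use deno_core::error::AnyError;
use deno_core::futures::future::poll_fn;
use deno_core::BufMutView;
use deno_core::Resource;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;

// Illustrative helper (not in the commit): drain a deno_core Resource into an
// already-open tokio file, the same way the new `put` implementation does.
async fn copy_resource_to_file(
  resource: Rc<dyn Resource>,
  mut file: tokio::fs::File,
) -> Result<(), AnyError> {
  // 64 KiB scratch buffer, handed back and forth through read_byob.
  let mut buf = BufMutView::new(64 * 1024);
  loop {
    let (size, buf2) = resource.clone().read_byob(buf).await?;
    if size == 0 {
      break;
    }
    buf = buf2;
    // poll_write takes a plain slice, so nothing borrowed lives across .await.
    poll_fn(|cx| Pin::new(&mut file).poll_write(cx, &buf[..size])).await?;
  }
  file.flush().await?;
  file.sync_all().await?;
  Ok(())
}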
diff --git a/ext/cache/sqlite.rs b/ext/cache/sqlite.rs
index 47ac18b05..c3c55dd5e 100644
--- a/ext/cache/sqlite.rs
+++ b/ext/cache/sqlite.rs
@@ -1,7 +1,7 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
-
use std::borrow::Cow;
use std::path::PathBuf;
+use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::time::SystemTime;
@@ -9,16 +9,19 @@ use std::time::UNIX_EPOCH;
use async_trait::async_trait;
use deno_core::error::AnyError;
+use deno_core::futures::future::poll_fn;
use deno_core::parking_lot::Mutex;
use deno_core::unsync::spawn_blocking;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
+use deno_core::BufMutView;
use deno_core::ByteString;
use deno_core::Resource;
use rusqlite::params;
use rusqlite::Connection;
use rusqlite::OptionalExtension;
use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use crate::deserialize_headers;
@@ -94,7 +97,7 @@ impl SqliteBackedCache {
#[async_trait(?Send)]
impl Cache for SqliteBackedCache {
- type CachePutResourceType = CachePutResource;
+ type CacheMatchResourceType = CacheResponseResource;
/// Open a cache storage. Internally, this creates a row in the
/// sqlite db if the cache doesn't exist and returns the internal id
@@ -169,58 +172,63 @@ impl Cache for SqliteBackedCache {
.await?
}
- async fn put_create(
+ async fn put(
&self,
request_response: CachePutRequest,
- ) -> Result<Option<Rc<CachePutResource>>, AnyError> {
+ resource: Option<Rc<dyn Resource>>,
+ ) -> Result<(), AnyError> {
let db = self.connection.clone();
let cache_storage_dir = self.cache_storage_dir.clone();
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
- let response_body_key = if request_response.response_has_body {
- Some(hash(&format!(
+
+ if let Some(resource) = resource {
+ let body_key = hash(&format!(
"{}_{}",
&request_response.request_url,
now.as_nanos()
- )))
- } else {
- None
- };
-
- if let Some(body_key) = response_body_key {
+ ));
let responses_dir =
get_responses_dir(cache_storage_dir, request_response.cache_id);
let response_path = responses_dir.join(&body_key);
- let file = tokio::fs::File::create(response_path).await?;
- Ok(Some(Rc::new(CachePutResource {
- file: AsyncRefCell::new(file),
- db,
- put_request: request_response,
- response_body_key: body_key,
- start_time: now.as_secs(),
- })))
+ let mut file = tokio::fs::File::create(response_path).await?;
+ let mut buf = BufMutView::new(64 * 1024);
+ loop {
+ let (size, buf2) = resource.clone().read_byob(buf).await?;
+ if size == 0 {
+ break;
+ }
+ buf = buf2;
+
+ // Use poll_write to avoid holding a slice across await points
+ poll_fn(|cx| Pin::new(&mut file).poll_write(cx, &buf[..size])).await?;
+ }
+
+ file.flush().await?;
+ file.sync_all().await?;
+
+ assert_eq!(
+ insert_cache_asset(db, request_response, Some(body_key.clone()),)
+ .await?,
+ Some(body_key)
+ );
} else {
- insert_cache_asset(db, request_response, None).await?;
- Ok(None)
+ assert!(insert_cache_asset(db, request_response, None)
+ .await?
+ .is_none());
}
- }
-
- async fn put_finish(
- &self,
- resource: Rc<CachePutResource>,
- ) -> Result<(), AnyError> {
- resource.write_to_cache().await
+ Ok(())
}
async fn r#match(
&self,
request: CacheMatchRequest,
) -> Result<
- Option<(CacheMatchResponseMeta, Option<Rc<dyn Resource>>)>,
+ Option<(CacheMatchResponseMeta, Option<CacheResponseResource>)>,
AnyError,
> {
let db = self.connection.clone();
let cache_storage_dir = self.cache_storage_dir.clone();
- let query_result = spawn_blocking(move || {
+ let (query_result, request) = spawn_blocking(move || {
let db = db.lock();
let result = db.query_row(
"SELECT response_body_key, response_headers, response_status, response_status_text, request_headers
@@ -235,10 +243,17 @@ impl Cache for SqliteBackedCache {
let request_headers: Vec<u8> = row.get(4)?;
let response_headers: Vec<(ByteString, ByteString)> = deserialize_headers(&response_headers);
let request_headers: Vec<(ByteString, ByteString)> = deserialize_headers(&request_headers);
- Ok((CacheMatchResponseMeta {request_headers, response_headers,response_status,response_status_text}, response_body_key))
+ Ok((CacheMatchResponseMeta {
+ request_headers,
+ response_headers,
+ response_status,
+ response_status_text},
+ response_body_key
+ ))
},
);
- result.optional()
+ // Return ownership of request to the caller
+ result.optional().map(|x| (x, request))
})
.await??;
@@ -261,11 +276,21 @@ impl Cache for SqliteBackedCache {
let response_path =
get_responses_dir(cache_storage_dir, request.cache_id)
.join(response_body_key);
- let file = tokio::fs::File::open(response_path).await?;
- return Ok(Some((
- cache_meta,
- Some(Rc::new(CacheResponseResource::new(file))),
- )));
+ let file = match tokio::fs::File::open(response_path).await {
+ Ok(file) => file,
+ Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
+ // Best efforts to delete the old cache item
+ _ = self
+ .delete(CacheDeleteRequest {
+ cache_id: request.cache_id,
+ request_url: request.request_url,
+ })
+ .await;
+ return Ok(None);
+ }
+ Err(err) => return Err(err.into()),
+ };
+ return Ok(Some((cache_meta, Some(CacheResponseResource::new(file)))));
}
Some((cache_meta, None)) => {
return Ok(Some((cache_meta, None)));
@@ -339,55 +364,6 @@ impl deno_core::Resource for SqliteBackedCache {
}
}
-pub struct CachePutResource {
- pub db: Arc<Mutex<rusqlite::Connection>>,
- pub put_request: CachePutRequest,
- pub response_body_key: String,
- pub file: AsyncRefCell<tokio::fs::File>,
- pub start_time: u64,
-}
-
-impl CachePutResource {
- async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
- let resource = deno_core::RcRef::map(&self, |r| &r.file);
- let mut file = resource.borrow_mut().await;
- file.write_all(data).await?;
- Ok(data.len())
- }
-
- async fn write_to_cache(self: Rc<Self>) -> Result<(), AnyError> {
- let resource = deno_core::RcRef::map(&self, |r| &r.file);
- let mut file = resource.borrow_mut().await;
- file.flush().await?;
- file.sync_all().await?;
- let maybe_body_key = insert_cache_asset(
- self.db.clone(),
- self.put_request.clone(),
- Some(self.response_body_key.clone()),
- )
- .await?;
- match maybe_body_key {
- Some(key) => {
- assert_eq!(key, self.response_body_key);
- Ok(())
- }
- // This should never happen because we will always have
- // body key associated with CachePutResource
- None => Err(deno_core::anyhow::anyhow!(
- "unexpected: response body key is None"
- )),
- }
- }
-}
-
-impl Resource for CachePutResource {
- fn name(&self) -> Cow<str> {
- "CachePutResource".into()
- }
-
- deno_core::impl_writable!();
-}
-
pub struct CacheResponseResource {
file: AsyncRefCell<tokio::fs::File>,
}