summaryrefslogtreecommitdiff
path: root/ext/cache/sqlite.rs
diff options
context:
space:
mode:
authorSatya Rohith <me@satyarohith.com>2022-09-28 17:41:12 +0530
committerGitHub <noreply@github.com>2022-09-28 17:41:12 +0530
commitb312279e58e51520a38e51cca317a09cdadd7cb4 (patch)
treea0c6f432042ba25b569c151bbe59f1e721788d0c /ext/cache/sqlite.rs
parent1156f726a92d3d3985e591327c7526cd3e2b0473 (diff)
feat: implement Web Cache API (#15829)
Diffstat (limited to 'ext/cache/sqlite.rs')
-rw-r--r--ext/cache/sqlite.rs503
1 file changed, 503 insertions, 0 deletions
diff --git a/ext/cache/sqlite.rs b/ext/cache/sqlite.rs
new file mode 100644
index 000000000..1e5591839
--- /dev/null
+++ b/ext/cache/sqlite.rs
@@ -0,0 +1,503 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+use async_trait::async_trait;
+use deno_core::error::AnyError;
+use deno_core::parking_lot::Mutex;
+use deno_core::AsyncRefCell;
+use deno_core::AsyncResult;
+use deno_core::ByteString;
+use deno_core::Resource;
+use deno_core::ZeroCopyBuf;
+use rusqlite::params;
+use rusqlite::Connection;
+use rusqlite::OptionalExtension;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWriteExt;
+
+use std::borrow::Cow;
+use std::path::PathBuf;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::time::SystemTime;
+use std::time::UNIX_EPOCH;
+
+use crate::Cache;
+use crate::CacheDeleteRequest;
+use crate::CacheMatchRequest;
+use crate::CacheMatchResponseMeta;
+use crate::CachePutRequest;
+
/// A cache backend that stores cache metadata in a sqlite database and
/// response bodies as files under `cache_storage_dir`.
#[derive(Clone)]
pub struct SqliteBackedCache {
  // Shared connection to the sqlite db holding the cache names and
  // request/response metadata rows.
  pub connection: Arc<Mutex<Connection>>,
  // Root directory under which per-cache response bodies are stored
  // (see `get_responses_dir`).
  pub cache_storage_dir: PathBuf,
}
+
+impl SqliteBackedCache {
+ pub fn new(cache_storage_dir: PathBuf) -> Self {
+ {
+ std::fs::create_dir_all(&cache_storage_dir)
+ .expect("failed to create cache dir");
+ let path = cache_storage_dir.join("cache_metadata.db");
+ let connection = rusqlite::Connection::open(&path).unwrap_or_else(|_| {
+ panic!("failed to open cache db at {}", path.display())
+ });
+ connection
+ .execute(
+ "CREATE TABLE IF NOT EXISTS cache_storage (
+ id INTEGER PRIMARY KEY,
+ cache_name TEXT NOT NULL UNIQUE
+ )",
+ (),
+ )
+ .expect("failed to create cache_storage table");
+ connection
+ .execute(
+ "CREATE TABLE IF NOT EXISTS request_response_list (
+ id INTEGER PRIMARY KEY,
+ cache_id INTEGER NOT NULL,
+ request_url TEXT NOT NULL,
+ request_headers BLOB NOT NULL,
+ response_headers BLOB NOT NULL,
+ response_status INTEGER NOT NULL,
+ response_status_text TEXT,
+ response_body_key TEXT,
+ last_inserted_at INTEGER UNSIGNED NOT NULL,
+ FOREIGN KEY (cache_id) REFERENCES cache_storage(id) ON DELETE CASCADE,
+
+ UNIQUE (cache_id, request_url)
+ )",
+ (),
+ )
+ .expect("failed to create request_response_list table");
+ SqliteBackedCache {
+ connection: Arc::new(Mutex::new(connection)),
+ cache_storage_dir,
+ }
+ }
+ }
+}
+
#[async_trait]
impl Cache for SqliteBackedCache {
  /// Open a cache storage. Internally, this creates a row in the
  /// sqlite db if the cache doesn't exist and returns the internal id
  /// of the cache.
  async fn storage_open(&self, cache_name: String) -> Result<i64, AnyError> {
    let db = self.connection.clone();
    let cache_storage_dir = self.cache_storage_dir.clone();
    // sqlite calls are blocking, so run them on tokio's blocking pool
    // rather than on the async executor thread.
    tokio::task::spawn_blocking(move || {
      let db = db.lock();
      // INSERT OR IGNORE keeps this idempotent: opening an existing
      // cache leaves its row (and id) untouched.
      db.execute(
        "INSERT OR IGNORE INTO cache_storage (cache_name) VALUES (?1)",
        params![cache_name],
      )?;
      let cache_id = db.query_row(
        "SELECT id FROM cache_storage WHERE cache_name = ?1",
        params![cache_name],
        |row| {
          let id: i64 = row.get(0)?;
          Ok(id)
        },
      )?;
      // Eagerly create the directory that will hold this cache's
      // response bodies so later puts can just create files in it.
      let responses_dir = get_responses_dir(cache_storage_dir, cache_id);
      std::fs::create_dir_all(&responses_dir)?;
      Ok::<i64, AnyError>(cache_id)
    })
    .await?
  }

  /// Check if a cache with the provided name exists.
  /// Note: this doesn't check the disk, it only checks the sqlite db.
  async fn storage_has(&self, cache_name: String) -> Result<bool, AnyError> {
    let db = self.connection.clone();
    tokio::task::spawn_blocking(move || {
      let db = db.lock();
      let cache_exists = db.query_row(
        "SELECT count(cache_name) FROM cache_storage WHERE cache_name = ?1",
        params![cache_name],
        |row| {
          let count: i64 = row.get(0)?;
          Ok(count > 0)
        },
      )?;
      Ok::<bool, AnyError>(cache_exists)
    })
    .await?
  }

  /// Delete a cache storage. Internally, this deletes the row in the sqlite db.
  /// Returns `true` if a cache with that name existed. Response body files
  /// for the cache are removed from disk as well; the request/response rows
  /// are removed by the `ON DELETE CASCADE` foreign key on `cache_id`.
  async fn storage_delete(&self, cache_name: String) -> Result<bool, AnyError> {
    let db = self.connection.clone();
    let cache_storage_dir = self.cache_storage_dir.clone();
    tokio::task::spawn_blocking(move || {
      let db = db.lock();
      // DELETE ... RETURNING yields the deleted cache's id (or no row
      // at all when the cache did not exist).
      let maybe_cache_id = db
        .query_row(
          "DELETE FROM cache_storage WHERE cache_name = ?1 RETURNING id",
          params![cache_name],
          |row| {
            let id: i64 = row.get(0)?;
            Ok(id)
          },
        )
        .optional()?;
      if let Some(cache_id) = maybe_cache_id {
        // Remove this cache's on-disk response bodies, if any.
        let cache_dir = cache_storage_dir.join(cache_id.to_string());
        if cache_dir.exists() {
          std::fs::remove_dir_all(cache_dir)?;
        }
      }
      Ok::<bool, AnyError>(maybe_cache_id.is_some())
    })
    .await?
  }

  /// Store a request/response pair.
  ///
  /// When the response has a body, returns a `CachePutResource` the caller
  /// streams the body into; the metadata row is only inserted when that
  /// resource is shut down. When there is no body, the row is inserted
  /// immediately and `None` is returned.
  async fn put(
    &self,
    request_response: CachePutRequest,
  ) -> Result<Option<Rc<dyn Resource>>, AnyError> {
    let db = self.connection.clone();
    let cache_storage_dir = self.cache_storage_dir.clone();
    let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
    // Derive a (practically) unique file name for the response body
    // from the request url and the current time in nanoseconds.
    let response_body_key = if request_response.response_has_body {
      Some(hash(&format!(
        "{}_{}",
        &request_response.request_url,
        now.as_nanos()
      )))
    } else {
      None
    };

    if let Some(body_key) = response_body_key {
      let responses_dir =
        get_responses_dir(cache_storage_dir, request_response.cache_id);
      let response_path = responses_dir.join(&body_key);
      let file = tokio::fs::File::create(response_path).await?;
      Ok(Some(Rc::new(CachePutResource {
        file: AsyncRefCell::new(file),
        db,
        put_request: request_response,
        response_body_key: body_key,
        start_time: now.as_secs(),
      })))
    } else {
      insert_cache_asset(db, request_response, None).await?;
      Ok(None)
    }
  }

  /// Look up a cached response for `request`.
  ///
  /// Returns the response metadata plus, when the cached response has a
  /// body, a `CacheResponseResource` streaming it from disk. Returns
  /// `Ok(None)` when there is no entry or the `Vary` check fails.
  async fn r#match(
    &self,
    request: CacheMatchRequest,
  ) -> Result<
    Option<(CacheMatchResponseMeta, Option<Rc<dyn Resource>>)>,
    AnyError,
  > {
    let db = self.connection.clone();
    let cache_storage_dir = self.cache_storage_dir.clone();
    let query_result = tokio::task::spawn_blocking(move || {
      let db = db.lock();
      let result = db.query_row(
        "SELECT response_body_key, response_headers, response_status, response_status_text, request_headers
          FROM request_response_list
          WHERE cache_id = ?1 AND request_url = ?2",
        (request.cache_id, &request.request_url),
        |row| {
          let response_body_key: Option<String> = row.get(0)?;
          let response_headers: Vec<u8> = row.get(1)?;
          let response_status: u16 = row.get(2)?;
          let response_status_text: String = row.get(3)?;
          let request_headers: Vec<u8> = row.get(4)?;
          // Headers are stored as CRLF-delimited blobs; see
          // serialize_headers/deserialize_headers.
          let response_headers: Vec<(ByteString, ByteString)> = deserialize_headers(&response_headers);
          let request_headers: Vec<(ByteString, ByteString)> = deserialize_headers(&request_headers);
          Ok((CacheMatchResponseMeta {request_headers, response_headers,response_status,response_status_text}, response_body_key))
        },
      );
      result.optional()
    })
    .await??;

    match query_result {
      Some((cache_meta, Some(response_body_key))) => {
        // From https://w3c.github.io/ServiceWorker/#request-matches-cached-item-algorithm
        // If there's Vary header in the response, ensure all the
        // headers of the cached request match the query request.
        if let Some(vary_header) =
          get_header("vary", &cache_meta.response_headers)
        {
          if !vary_header_matches(
            &vary_header,
            &request.request_headers,
            &cache_meta.request_headers,
          ) {
            return Ok(None);
          }
        }
        let response_path =
          get_responses_dir(cache_storage_dir, request.cache_id)
            .join(response_body_key);
        let file = tokio::fs::File::open(response_path).await?;
        return Ok(Some((
          cache_meta,
          Some(Rc::new(CacheResponseResource::new(file))),
        )));
      }
      Some((cache_meta, None)) => {
        // Entry exists but has no body (e.g. a bodiless response).
        return Ok(Some((cache_meta, None)));
      }
      None => return Ok(None),
    }
  }

  /// Delete the entry for `request` from the metadata db.
  /// Returns `true` if a row was removed.
  async fn delete(
    &self,
    request: CacheDeleteRequest,
  ) -> Result<bool, AnyError> {
    let db = self.connection.clone();
    tokio::task::spawn_blocking(move || {
      // TODO(@satyarohith): remove the response body from disk if one exists
      let db = db.lock();
      let rows_effected = db.execute(
        "DELETE FROM request_response_list WHERE cache_id = ?1 AND request_url = ?2",
        (request.cache_id, &request.request_url),
      )?;
      Ok::<bool, AnyError>(rows_effected > 0)
    })
    .await?
  }
}
+
/// Insert (or replace) a request/response entry in the metadata db.
///
/// `body_key_start_time` is `Some((body_key, start_time))` when the response
/// has a body that was streamed to disk under the file name `body_key`;
/// `start_time` is the time (seconds since epoch) at which the put began.
/// Returns the stored `response_body_key`, or `None` when the insert was
/// skipped because of a race with another worker.
async fn insert_cache_asset(
  db: Arc<Mutex<rusqlite::Connection>>,
  put: CachePutRequest,
  body_key_start_time: Option<(String, u64)>,
) -> Result<Option<String>, deno_core::anyhow::Error> {
  // sqlite calls are blocking; run the whole insert on the blocking pool.
  tokio::task::spawn_blocking(move || {
    let maybe_response_body = {
      let db = db.lock();
      let mut response_body_key = None;
      if let Some((body_key, start_time)) = body_key_start_time {
        response_body_key = Some(body_key);
        // Look up when (if ever) this URL was last inserted in this cache.
        let last_inserted_at = db.query_row("
          SELECT last_inserted_at FROM request_response_list
          WHERE cache_id = ?1 AND request_url = ?2",
          (put.cache_id, &put.request_url), |row| {
            let last_inserted_at: i64 = row.get(0)?;
            Ok(last_inserted_at)
          }).optional()?;
        if let Some(last_inserted) = last_inserted_at {
          // Some other worker has already inserted this resource into the cache;
          // skip our insert and report `None` so the caller knows it lost the race.
          // NOTE(review): the comparison direction — skipping when our put
          // *started after* the existing insert completed — looks inverted
          // for a "last write wins" policy; confirm the intended semantics.
          if start_time > (last_inserted as u64) {
            return Ok(None);
          }
        }
      }
      // INSERT OR REPLACE relies on the UNIQUE (cache_id, request_url)
      // constraint to overwrite any existing entry for this URL.
      db.query_row(
        "INSERT OR REPLACE INTO request_response_list
             (cache_id, request_url, request_headers, response_headers,
              response_body_key, response_status, response_status_text, last_inserted_at)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
             RETURNING response_body_key",
        (
          put.cache_id,
          put.request_url,
          serialize_headers(&put.request_headers),
          serialize_headers(&put.response_headers),
          response_body_key,
          put.response_status,
          put.response_status_text,
          SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(),
        ),
        |row| {
          let response_body_key: Option<String> = row.get(0)?;
          Ok(response_body_key)
        },
      )?
    };
    Ok::<Option<String>, AnyError>(maybe_response_body)
  }).await?
}
+
/// Directory that holds response bodies for the cache with the given id:
/// `<cache_storage_dir>/<cache_id>/responses`.
#[inline]
fn get_responses_dir(cache_storage_dir: PathBuf, cache_id: i64) -> PathBuf {
  let mut dir = cache_storage_dir;
  dir.push(cache_id.to_string());
  dir.push("responses");
  dir
}
+
+/// Check if the headers provided in the vary_header match
+/// the query request headers and the cached request headers.
+fn vary_header_matches(
+ vary_header: &ByteString,
+ query_request_headers: &[(ByteString, ByteString)],
+ cached_request_headers: &[(ByteString, ByteString)],
+) -> bool {
+ let vary_header = match std::str::from_utf8(vary_header) {
+ Ok(vary_header) => vary_header,
+ Err(_) => return false,
+ };
+ let headers = get_headers_from_vary_header(vary_header);
+ for header in headers {
+ let query_header = get_header(&header, query_request_headers);
+ let cached_header = get_header(&header, cached_request_headers);
+ if query_header != cached_header {
+ return false;
+ }
+ }
+ true
+}
+
/// Split a `Vary` header value on commas into trimmed, lowercased
/// header names.
fn get_headers_from_vary_header(vary_header: &str) -> Vec<String> {
  let mut names = Vec::new();
  for part in vary_header.split(',') {
    names.push(part.trim().to_lowercase());
  }
  names
}
+
+fn get_header(
+ name: &str,
+ headers: &[(ByteString, ByteString)],
+) -> Option<ByteString> {
+ headers
+ .iter()
+ .find(|(k, _)| {
+ if let Ok(k) = std::str::from_utf8(k) {
+ k.eq_ignore_ascii_case(name)
+ } else {
+ false
+ }
+ })
+ .map(|(_, v)| v.to_owned())
+}
+
+impl deno_core::Resource for SqliteBackedCache {
+ fn name(&self) -> std::borrow::Cow<str> {
+ "SqliteBackedCache".into()
+ }
+}
+
/// Resource backing an in-progress `Cache.put`: the response body is
/// streamed into a file on disk and, on shutdown, the entry is recorded
/// in the sqlite metadata db.
pub struct CachePutResource {
  // Shared handle to the cache metadata db, used on shutdown.
  pub db: Arc<Mutex<rusqlite::Connection>>,
  // The put request whose metadata row is inserted once the body is written.
  pub put_request: CachePutRequest,
  // File name (a hash) under the cache's responses dir for this body.
  pub response_body_key: String,
  // Destination file the response body bytes are written to.
  pub file: AsyncRefCell<tokio::fs::File>,
  // Seconds since epoch when the put started; compared against
  // `last_inserted_at` in `insert_cache_asset` to detect racing workers.
  pub start_time: u64,
}
+
+impl CachePutResource {
+ async fn write(self: Rc<Self>, data: ZeroCopyBuf) -> Result<usize, AnyError> {
+ let resource = deno_core::RcRef::map(&self, |r| &r.file);
+ let mut file = resource.borrow_mut().await;
+ file.write_all(&data).await?;
+ Ok(data.len())
+ }
+
+ async fn shutdown(self: Rc<Self>) -> Result<(), AnyError> {
+ let resource = deno_core::RcRef::map(&self, |r| &r.file);
+ let mut file = resource.borrow_mut().await;
+ file.flush().await?;
+ file.sync_all().await?;
+ insert_cache_asset(
+ self.db.clone(),
+ self.put_request.clone(),
+ Some((self.response_body_key.clone(), self.start_time)),
+ )
+ .await?;
+ Ok(())
+ }
+}
+
+impl Resource for CachePutResource {
+ fn name(&self) -> Cow<str> {
+ "CachePutResource".into()
+ }
+
+ fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ Box::pin(self.write(buf))
+ }
+
+ fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
+ Box::pin(self.shutdown())
+ }
+}
+
/// Resource that streams a cached response body back to the caller of a
/// cache match.
pub struct CacheResponseResource {
  // Response body file opened from the cache's responses directory.
  file: AsyncRefCell<tokio::fs::File>,
}
+
+impl CacheResponseResource {
+ fn new(file: tokio::fs::File) -> Self {
+ Self {
+ file: AsyncRefCell::new(file),
+ }
+ }
+
+ async fn read(
+ self: Rc<Self>,
+ mut buf: ZeroCopyBuf,
+ ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ let resource = deno_core::RcRef::map(&self, |r| &r.file);
+ let mut file = resource.borrow_mut().await;
+ let nread = file.read(&mut buf).await?;
+ Ok((nread, buf))
+ }
+}
+
+impl Resource for CacheResponseResource {
+ fn name(&self) -> Cow<str> {
+ "CacheResponseResource".into()
+ }
+
+ fn read_return(
+ self: Rc<Self>,
+ buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
+ Box::pin(self.read(buf))
+ }
+}
+
+pub fn hash(token: &str) -> String {
+ use sha2::Digest;
+ format!("{:x}", sha2::Sha256::digest(token.as_bytes()))
+}
+
+fn serialize_headers(headers: &[(ByteString, ByteString)]) -> Vec<u8> {
+ let mut serialized_headers = Vec::new();
+ for (name, value) in headers {
+ serialized_headers.extend_from_slice(name);
+ serialized_headers.extend_from_slice(b"\r\n");
+ serialized_headers.extend_from_slice(value);
+ serialized_headers.extend_from_slice(b"\r\n");
+ }
+ serialized_headers
+}
+
/// Parse a header blob previously produced by `serialize_headers`: a flat
/// sequence of `name\r\nvalue\r\n` pairs.
///
/// Panics (via the trailing asserts) when the input is not a well-formed
/// sequence of complete name/value pairs.
///
/// NOTE(review): serialization does not escape CRLF inside header values,
/// so a value containing "\r\n" would be mis-parsed here — confirm header
/// values are CRLF-free upstream.
fn deserialize_headers(
  serialized_headers: &[u8],
) -> Vec<(ByteString, ByteString)> {
  let mut headers = Vec::new();
  // `piece` holds the byte range of a header name while we scan for the
  // CRLF that terminates the following value; `start` is the beginning of
  // the current name or value.
  let mut piece = None;
  let mut start = 0;
  for (i, byte) in serialized_headers.iter().enumerate() {
    if byte == &b'\r' && serialized_headers.get(i + 1) == Some(&b'\n') {
      if piece.is_none() {
        // First CRLF of a pair: the bytes scanned so far are the name.
        piece = Some(start..i);
      } else {
        // Second CRLF: the bytes since the name are the value.
        let name = piece.unwrap();
        let value = start..i;
        headers.push((
          ByteString::from(&serialized_headers[name]),
          ByteString::from(&serialized_headers[value]),
        ));
        piece = None;
      }
      // Skip past the CRLF itself.
      start = i + 2;
    }
  }
  // The input must end exactly on a completed name/value pair.
  assert!(piece.is_none());
  assert_eq!(start, serialized_headers.len());
  headers
}