diff options
Diffstat (limited to 'ext/kv/remote.rs')
-rw-r--r-- | ext/kv/remote.rs | 44 |
1 files changed, 42 insertions, 2 deletions
diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs index a1273e78b..7541b5a06 100644 --- a/ext/kv/remote.rs +++ b/ext/kv/remote.rs @@ -8,10 +8,14 @@ use std::sync::Arc; use crate::DatabaseHandler; use anyhow::Context; use async_trait::async_trait; +use bytes::Bytes; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::futures::Stream; +use deno_core::futures::TryStreamExt as _; use deno_core::OpState; use deno_fetch::create_http_client; +use deno_fetch::reqwest; use deno_fetch::CreateHttpClientOptions; use deno_tls::rustls::RootCertStore; use deno_tls::Proxy; @@ -19,6 +23,8 @@ use deno_tls::RootCertStoreProvider; use deno_tls::TlsKeys; use denokv_remote::MetadataEndpoint; use denokv_remote::Remote; +use denokv_remote::RemoteResponse; +use denokv_remote::RemoteTransport; use url::Url; #[derive(Clone)] @@ -102,11 +108,44 @@ impl<P: RemoteDbHandlerPermissions + 'static> denokv_remote::RemotePermissions } } +#[derive(Clone)] +pub struct ReqwestClient(reqwest::Client); +pub struct ReqwestResponse(reqwest::Response); + +impl RemoteTransport for ReqwestClient { + type Response = ReqwestResponse; + async fn post( + &self, + url: Url, + headers: http::HeaderMap, + body: Bytes, + ) -> Result<(Url, http::StatusCode, Self::Response), anyhow::Error> { + let res = self.0.post(url).headers(headers).body(body).send().await?; + let url = res.url().clone(); + let status = res.status(); + Ok((url, status, ReqwestResponse(res))) + } +} + +impl RemoteResponse for ReqwestResponse { + async fn bytes(self) -> Result<Bytes, anyhow::Error> { + Ok(self.0.bytes().await?) + } + fn stream( + self, + ) -> impl Stream<Item = Result<Bytes, anyhow::Error>> + Send + Sync { + self.0.bytes_stream().map_err(|e| e.into()) + } + async fn text(self) -> Result<String, anyhow::Error> { + Ok(self.0.text().await?) + } +} + #[async_trait(?Send)] impl<P: RemoteDbHandlerPermissions + 'static> DatabaseHandler for RemoteDbHandler<P> { - type DB = Remote<PermissionChecker<P>>; + type DB = Remote<PermissionChecker<P>, ReqwestClient>; async fn open( &self, @@ -162,13 +201,14 @@ impl<P: RemoteDbHandlerPermissions + 'static> DatabaseHandler http2: true, }, )?; + let reqwest_client = ReqwestClient(client); let permissions = PermissionChecker { state: state.clone(), _permissions: PhantomData, }; - let remote = Remote::new(client, permissions, metadata_endpoint); + let remote = Remote::new(reqwest_client, permissions, metadata_endpoint); Ok(remote) } |