diff options
Diffstat (limited to 'ext/kv')
-rw-r--r-- | ext/kv/Cargo.toml | 1 | ||||
-rw-r--r-- | ext/kv/remote.rs | 39 |
2 files changed, 16 insertions, 24 deletions
diff --git a/ext/kv/Cargo.toml b/ext/kv/Cargo.toml index 4556eb23c..e4249afea 100644 --- a/ext/kv/Cargo.toml +++ b/ext/kv/Cargo.toml @@ -29,7 +29,6 @@ denokv_remote.workspace = true denokv_sqlite.workspace = true faster-hex.workspace = true http.workspace = true -http-body-util.workspace = true log.workspace = true num-bigint.workspace = true prost.workspace = true diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs index 922853588..7541b5a06 100644 --- a/ext/kv/remote.rs +++ b/ext/kv/remote.rs @@ -12,8 +12,10 @@ use bytes::Bytes; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::futures::Stream; +use deno_core::futures::TryStreamExt as _; use deno_core::OpState; use deno_fetch::create_http_client; +use deno_fetch::reqwest; use deno_fetch::CreateHttpClientOptions; use deno_tls::rustls::RootCertStore; use deno_tls::Proxy; @@ -23,7 +25,6 @@ use denokv_remote::MetadataEndpoint; use denokv_remote::Remote; use denokv_remote::RemoteResponse; use denokv_remote::RemoteTransport; -use http_body_util::BodyExt; use url::Url; #[derive(Clone)] @@ -108,43 +109,35 @@ impl<P: RemoteDbHandlerPermissions + 'static> denokv_remote::RemotePermissions } #[derive(Clone)] -pub struct FetchClient(deno_fetch::Client); -pub struct FetchResponse(http::Response<deno_fetch::ResBody>); +pub struct ReqwestClient(reqwest::Client); +pub struct ReqwestResponse(reqwest::Response); -impl RemoteTransport for FetchClient { - type Response = FetchResponse; +impl RemoteTransport for ReqwestClient { + type Response = ReqwestResponse; async fn post( &self, url: Url, headers: http::HeaderMap, body: Bytes, ) -> Result<(Url, http::StatusCode, Self::Response), anyhow::Error> { - let body = http_body_util::Full::new(body) - .map_err(|never| match never {}) - .boxed(); - let mut req = http::Request::new(body); - *req.method_mut() = http::Method::POST; - *req.uri_mut() = url.as_str().parse()?; - *req.headers_mut() = headers; - - let res = self.0.clone().send(req).await?; + let res = self.0.post(url).headers(headers).body(body).send().await?; + let url = res.url().clone(); let status = res.status(); - Ok((url, status, FetchResponse(res))) + Ok((url, status, ReqwestResponse(res))) } } -impl RemoteResponse for FetchResponse { +impl RemoteResponse for ReqwestResponse { async fn bytes(self) -> Result<Bytes, anyhow::Error> { - Ok(self.0.collect().await?.to_bytes()) + Ok(self.0.bytes().await?) } fn stream( self, ) -> impl Stream<Item = Result<Bytes, anyhow::Error>> + Send + Sync { - self.0.into_body().into_data_stream() + self.0.bytes_stream().map_err(|e| e.into()) } async fn text(self) -> Result<String, anyhow::Error> { - let bytes = self.bytes().await?; - Ok(std::str::from_utf8(&bytes)?.into()) + Ok(self.0.text().await?) } } @@ -152,7 +145,7 @@ impl RemoteResponse for FetchResponse { impl<P: RemoteDbHandlerPermissions + 'static> DatabaseHandler for RemoteDbHandler<P> { - type DB = Remote<PermissionChecker<P>, FetchClient>; + type DB = Remote<PermissionChecker<P>, ReqwestClient>; async fn open( &self, @@ -208,14 +201,14 @@ impl<P: RemoteDbHandlerPermissions + 'static> DatabaseHandler http2: true, }, )?; - let fetch_client = FetchClient(client); + let reqwest_client = ReqwestClient(client); let permissions = PermissionChecker { state: state.clone(), _permissions: PhantomData, }; - let remote = Remote::new(fetch_client, permissions, metadata_endpoint); + let remote = Remote::new(reqwest_client, permissions, metadata_endpoint); Ok(remote) } |