summaryrefslogtreecommitdiff
path: root/ext/node/ops/http.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node/ops/http.rs')
-rw-r--r--ext/node/ops/http.rs91
1 files changed, 52 insertions, 39 deletions
diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs
index 773902ded..69571078f 100644
--- a/ext/node/ops/http.rs
+++ b/ext/node/ops/http.rs
@@ -8,14 +8,12 @@ use std::task::Context;
use std::task::Poll;
use bytes::Bytes;
-use deno_core::anyhow;
-use deno_core::error::type_error;
-use deno_core::error::AnyError;
use deno_core::futures::stream::Peekable;
use deno_core::futures::Future;
use deno_core::futures::FutureExt;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
+use deno_core::futures::TryFutureExt;
use deno_core::op2;
use deno_core::serde::Serialize;
use deno_core::unsync::spawn;
@@ -33,6 +31,7 @@ use deno_core::Resource;
use deno_core::ResourceId;
use deno_fetch::get_or_create_client_from_state;
use deno_fetch::FetchCancelHandle;
+use deno_fetch::FetchError;
use deno_fetch::FetchRequestResource;
use deno_fetch::FetchReturn;
use deno_fetch::HttpClientResource;
@@ -59,12 +58,15 @@ pub fn op_node_http_request<P>(
#[serde] headers: Vec<(ByteString, ByteString)>,
#[smi] client_rid: Option<u32>,
#[smi] body: Option<ResourceId>,
-) -> Result<FetchReturn, AnyError>
+) -> Result<FetchReturn, FetchError>
where
P: crate::NodePermissions + 'static,
{
let client = if let Some(rid) = client_rid {
- let r = state.resource_table.get::<HttpClientResource>(rid)?;
+ let r = state
+ .resource_table
+ .get::<HttpClientResource>(rid)
+ .map_err(FetchError::Resource)?;
r.client.clone()
} else {
get_or_create_client_from_state(state)?
@@ -81,10 +83,8 @@ where
let mut header_map = HeaderMap::new();
for (key, value) in headers {
- let name = HeaderName::from_bytes(&key)
- .map_err(|err| type_error(err.to_string()))?;
- let v = HeaderValue::from_bytes(&value)
- .map_err(|err| type_error(err.to_string()))?;
+ let name = HeaderName::from_bytes(&key)?;
+ let v = HeaderValue::from_bytes(&value)?;
header_map.append(name, v);
}
@@ -92,7 +92,10 @@ where
let (body, con_len) = if let Some(body) = body {
(
BodyExt::boxed(NodeHttpResourceToBodyAdapter::new(
- state.resource_table.take_any(body)?,
+ state
+ .resource_table
+ .take_any(body)
+ .map_err(FetchError::Resource)?,
)),
None,
)
@@ -117,7 +120,7 @@ where
*request.uri_mut() = url
.as_str()
.parse()
- .map_err(|_| type_error("Invalid URL"))?;
+ .map_err(|_| FetchError::InvalidUrl(url.clone()))?;
*request.headers_mut() = header_map;
if let Some((username, password)) = maybe_authority {
@@ -136,9 +139,9 @@ where
let fut = async move {
client
.send(request)
+ .map_err(Into::into)
.or_cancel(cancel_handle_)
.await
- .map(|res| res.map_err(|err| type_error(err.to_string())))
};
let request_rid = state.resource_table.add(FetchRequestResource {
@@ -174,11 +177,12 @@ pub struct NodeHttpFetchResponse {
pub async fn op_node_http_fetch_send(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<NodeHttpFetchResponse, AnyError> {
+) -> Result<NodeHttpFetchResponse, FetchError> {
let request = state
.borrow_mut()
.resource_table
- .take::<FetchRequestResource>(rid)?;
+ .take::<FetchRequestResource>(rid)
+ .map_err(FetchError::Resource)?;
let request = Rc::try_unwrap(request)
.ok()
@@ -191,22 +195,23 @@ pub async fn op_node_http_fetch_send(
// If any error in the chain is a hyper body error, return that as a special result we can use to
// reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`).
// TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead
- let mut err_ref: &dyn std::error::Error = err.as_ref();
- while let Some(err) = std::error::Error::source(err_ref) {
- if let Some(err) = err.downcast_ref::<hyper::Error>() {
- if let Some(err) = std::error::Error::source(err) {
- return Ok(NodeHttpFetchResponse {
- error: Some(err.to_string()),
- ..Default::default()
- });
+
+ if let FetchError::ClientSend(err_src) = &err {
+ if let Some(client_err) = std::error::Error::source(&err_src.source) {
+ if let Some(err_src) = client_err.downcast_ref::<hyper::Error>() {
+ if let Some(err_src) = std::error::Error::source(err_src) {
+ return Ok(NodeHttpFetchResponse {
+ error: Some(err_src.to_string()),
+ ..Default::default()
+ });
+ }
}
}
- err_ref = err;
}
- return Err(type_error(err.to_string()));
+ return Err(err);
}
- Err(_) => return Err(type_error("request was cancelled")),
+ Err(_) => return Err(FetchError::RequestCanceled),
};
let status = res.status();
@@ -250,11 +255,12 @@ pub async fn op_node_http_fetch_send(
pub async fn op_node_http_fetch_response_upgrade(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<ResourceId, AnyError> {
+) -> Result<ResourceId, FetchError> {
let raw_response = state
.borrow_mut()
.resource_table
- .take::<NodeHttpFetchResponseResource>(rid)?;
+ .take::<NodeHttpFetchResponseResource>(rid)
+ .map_err(FetchError::Resource)?;
let raw_response = Rc::try_unwrap(raw_response)
.expect("Someone is holding onto NodeHttpFetchResponseResource");
@@ -277,7 +283,7 @@ pub async fn op_node_http_fetch_response_upgrade(
}
read_tx.write_all(&buf[..read]).await?;
}
- Ok::<_, AnyError>(())
+ Ok::<_, FetchError>(())
});
spawn(async move {
let mut buf = [0; 1024];
@@ -288,7 +294,7 @@ pub async fn op_node_http_fetch_response_upgrade(
}
upgraded_tx.write_all(&buf[..read]).await?;
}
- Ok::<_, AnyError>(())
+ Ok::<_, FetchError>(())
});
}
@@ -318,23 +324,26 @@ impl UpgradeStream {
}
}
- async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
+ async fn read(
+ self: Rc<Self>,
+ buf: &mut [u8],
+ ) -> Result<usize, std::io::Error> {
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
async {
let read = RcRef::map(self, |this| &this.read);
let mut read = read.borrow_mut().await;
- Ok(Pin::new(&mut *read).read(buf).await?)
+ Pin::new(&mut *read).read(buf).await
}
.try_or_cancel(cancel_handle)
.await
}
- async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
+ async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, std::io::Error> {
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
async {
let write = RcRef::map(self, |this| &this.write);
let mut write = write.borrow_mut().await;
- Ok(Pin::new(&mut *write).write(buf).await?)
+ Pin::new(&mut *write).write(buf).await
}
.try_or_cancel(cancel_handle)
.await
@@ -387,7 +396,7 @@ impl NodeHttpFetchResponseResource {
}
}
- pub async fn upgrade(self) -> Result<hyper::upgrade::Upgraded, AnyError> {
+ pub async fn upgrade(self) -> Result<hyper::upgrade::Upgraded, hyper::Error> {
let reader = self.response_reader.into_inner();
match reader {
NodeHttpFetchResponseReader::Start(resp) => {
@@ -445,7 +454,9 @@ impl Resource for NodeHttpFetchResponseResource {
// safely call `await` on it without creating a race condition.
Some(_) => match reader.as_mut().next().await.unwrap() {
Ok(chunk) => assert!(chunk.is_empty()),
- Err(err) => break Err(type_error(err.to_string())),
+ Err(err) => {
+ break Err(deno_core::error::type_error(err.to_string()))
+ }
},
None => break Ok(BufView::empty()),
}
@@ -453,7 +464,7 @@ impl Resource for NodeHttpFetchResponseResource {
};
let cancel_handle = RcRef::map(self, |r| &r.cancel);
- fut.try_or_cancel(cancel_handle).await
+ fut.try_or_cancel(cancel_handle).await.map_err(Into::into)
})
}
@@ -469,7 +480,9 @@ impl Resource for NodeHttpFetchResponseResource {
#[allow(clippy::type_complexity)]
pub struct NodeHttpResourceToBodyAdapter(
Rc<dyn Resource>,
- Option<Pin<Box<dyn Future<Output = Result<BufView, anyhow::Error>>>>>,
+ Option<
+ Pin<Box<dyn Future<Output = Result<BufView, deno_core::anyhow::Error>>>>,
+ >,
);
impl NodeHttpResourceToBodyAdapter {
@@ -485,7 +498,7 @@ unsafe impl Send for NodeHttpResourceToBodyAdapter {}
unsafe impl Sync for NodeHttpResourceToBodyAdapter {}
impl Stream for NodeHttpResourceToBodyAdapter {
- type Item = Result<Bytes, anyhow::Error>;
+ type Item = Result<Bytes, deno_core::anyhow::Error>;
fn poll_next(
self: Pin<&mut Self>,
@@ -515,7 +528,7 @@ impl Stream for NodeHttpResourceToBodyAdapter {
impl hyper::body::Body for NodeHttpResourceToBodyAdapter {
type Data = Bytes;
- type Error = anyhow::Error;
+ type Error = deno_core::anyhow::Error;
fn poll_frame(
self: Pin<&mut Self>,