diff options
author | Luca Casonato <lucacasonato@yahoo.com> | 2021-01-10 20:54:29 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-10 20:54:29 +0100 |
commit | 1a6ce29f3d11e5f0d0d022914e3f9fbcfa487294 (patch) | |
tree | 9e1940a9a7a7392e6818341eea67becfa26ebbfa /op_crates/fetch/lib.rs | |
parent | 2c1f74402c00a2975cdaf9199b6487e5fd8175ba (diff) |
feat(fetch): req streaming + 0-copy resp streaming (#9036)
* feat(fetch): req streaming + 0-copy resp streaming
* lint
* lint
* fix test
* rm test.js
* explicitly use CancelHandle::default()
* Apply review suggestions
Co-authored-by: Ben Noordhuis <info@bnoordhuis.nl>
* fix test
* Merge remote-tracking branch 'origin/master' into fetch_real_streaming
* fix test
* retrigger ci
Co-authored-by: Ben Noordhuis <info@bnoordhuis.nl>
Diffstat (limited to 'op_crates/fetch/lib.rs')
-rw-r--r-- | op_crates/fetch/lib.rs | 199 |
1 files changed, 160 insertions, 39 deletions
diff --git a/op_crates/fetch/lib.rs b/op_crates/fetch/lib.rs index c2e458d89..0a2623c34 100644 --- a/op_crates/fetch/lib.rs +++ b/op_crates/fetch/lib.rs @@ -5,6 +5,9 @@ use deno_core::error::bad_resource_id; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::futures::Future; +use deno_core::futures::Stream; +use deno_core::futures::StreamExt; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; @@ -22,6 +25,7 @@ use deno_core::ZeroCopyBuf; use reqwest::header::HeaderName; use reqwest::header::HeaderValue; use reqwest::redirect::Policy; +use reqwest::Body; use reqwest::Client; use reqwest::Method; use reqwest::Response; @@ -32,7 +36,12 @@ use std::convert::From; use std::fs::File; use std::io::Read; use std::path::PathBuf; +use std::pin::Pin; use std::rc::Rc; +use tokio::io::stream_reader; +use tokio::io::AsyncReadExt; +use tokio::io::StreamReader; +use tokio::sync::mpsc; pub use reqwest; // Re-export reqwest @@ -87,10 +96,10 @@ pub fn get_declaration() -> PathBuf { PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_fetch.d.ts") } -pub async fn op_fetch<FP>( - state: Rc<RefCell<OpState>>, +pub fn op_fetch<FP>( + state: &mut OpState, args: Value, - data: BufVec, + data: &mut [ZeroCopyBuf], ) -> Result<Value, AnyError> where FP: FetchPermissions + 'static, @@ -103,20 +112,19 @@ where base_url: Option<String>, headers: Vec<(String, String)>, client_rid: Option<u32>, + has_body: bool, } let args: FetchArgs = serde_json::from_value(args)?; let client = if let Some(rid) = args.client_rid { - let state_ = state.borrow(); - let r = state_ + let r = state .resource_table .get::<HttpClientResource>(rid) .ok_or_else(bad_resource_id)?; r.client.clone() } else { - let state_ = state.borrow(); - let client = state_.borrow::<reqwest::Client>(); + let client = state.borrow::<reqwest::Client>(); client.clone() }; @@ -139,28 +147,79 @@ where return Err(type_error(format!("scheme '{}' not supported", scheme))); } - { - let state_ = state.borrow(); - let permissions = state_.borrow::<FP>(); - permissions.check_net_url(&url)?; - } + let permissions = state.borrow::<FP>(); + permissions.check_net_url(&url)?; let mut request = client.request(method, url); - match data.len() { - 0 => {} - 1 => request = request.body(Vec::from(&*data[0])), - _ => panic!("Invalid number of arguments"), - } + let maybe_request_body_rid = if args.has_body { + match data.len() { + 0 => { + // If no body is passed, we return a writer for streaming the body. + let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1); + request = request.body(Body::wrap_stream(rx)); + + let request_body_rid = + state.resource_table.add(FetchRequestBodyResource { + body: AsyncRefCell::new(tx), + cancel: CancelHandle::default(), + }); + + Some(request_body_rid) + } + 1 => { + // If a body is passed, we use it, and don't return a body for streaming. + request = request.body(Vec::from(&*data[0])); + None + } + _ => panic!("Invalid number of arguments"), + } + } else { + None + }; for (key, value) in args.headers { let name = HeaderName::from_bytes(key.as_bytes()).unwrap(); let v = HeaderValue::from_str(&value).unwrap(); request = request.header(name, v); } - //debug!("Before fetch {}", url); - let res = match request.send().await { + let fut = request.send(); + + let request_rid = state + .resource_table + .add(FetchRequestResource(Box::pin(fut))); + + Ok(json!({ + "requestRid": request_rid, + "requestBodyRid": maybe_request_body_rid + })) +} + +pub async fn op_fetch_send( + state: Rc<RefCell<OpState>>, + args: Value, + _data: BufVec, +) -> Result<Value, AnyError> { + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct Args { + rid: u32, + } + + let args: Args = serde_json::from_value(args)?; + + let request = state + .borrow_mut() + .resource_table + .take::<FetchRequestResource>(args.rid) + .ok_or_else(bad_resource_id)?; + + let request = Rc::try_unwrap(request) + .ok() + .expect("multiple op_fetch_send ongoing"); + + let res = match request.0.await { Ok(res) => res, Err(e) => return Err(type_error(e.to_string())), }; @@ -185,23 +244,61 @@ where } } - let rid = state.borrow_mut().resource_table.add(HttpBodyResource { - response: AsyncRefCell::new(res), - cancel: Default::default(), - }); + let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + let stream_reader = stream_reader(stream); + let rid = state + .borrow_mut() + .resource_table + .add(FetchResponseBodyResource { + reader: AsyncRefCell::new(stream_reader), + cancel: CancelHandle::default(), + }); Ok(json!({ - "bodyRid": rid, "status": status.as_u16(), "statusText": status.canonical_reason().unwrap_or(""), - "headers": res_headers + "headers": res_headers, + "responseRid": rid, })) } -pub async fn op_fetch_read( +pub async fn op_fetch_request_write( state: Rc<RefCell<OpState>>, args: Value, - _data: BufVec, + data: BufVec, +) -> Result<Value, AnyError> { + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct Args { + rid: u32, + } + + let args: Args = serde_json::from_value(args)?; + let rid = args.rid; + + let buf = match data.len() { + 1 => Vec::from(&*data[0]), + _ => panic!("Invalid number of arguments"), + }; + + let resource = state + .borrow() + .resource_table + .get::<FetchRequestBodyResource>(rid as u32) + .ok_or_else(bad_resource_id)?; + let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; + let cancel = RcRef::map(resource, |r| &r.cancel); + body.send(Ok(buf)).or_cancel(cancel).await??; + + Ok(json!({})) +} + +pub async fn op_fetch_response_read( + state: Rc<RefCell<OpState>>, + args: Value, + data: BufVec, ) -> Result<Value, AnyError> { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] @@ -212,30 +309,54 @@ pub async fn op_fetch_read( let args: Args = serde_json::from_value(args)?; let rid = args.rid; + if data.len() != 1 { + panic!("Invalid number of arguments"); + } + let resource = state .borrow() .resource_table - .get::<HttpBodyResource>(rid as u32) + .get::<FetchResponseBodyResource>(rid as u32) .ok_or_else(bad_resource_id)?; - let mut response = RcRef::map(&resource, |r| &r.response).borrow_mut().await; + let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await; let cancel = RcRef::map(resource, |r| &r.cancel); - let maybe_chunk = response.chunk().or_cancel(cancel).await??; - if let Some(chunk) = maybe_chunk { - // TODO(ry) This is terribly inefficient. Make this zero-copy. - Ok(json!({ "chunk": &*chunk })) - } else { - Ok(json!({ "chunk": null })) + let mut buf = data[0].clone(); + let read = reader.read(&mut buf).or_cancel(cancel).await??; + Ok(json!({ "read": read })) +} + +struct FetchRequestResource( + Pin<Box<dyn Future<Output = Result<Response, reqwest::Error>>>>, +); + +impl Resource for FetchRequestResource { + fn name(&self) -> Cow<str> { + "fetchRequest".into() } } -struct HttpBodyResource { - response: AsyncRefCell<Response>, +struct FetchRequestBodyResource { + body: AsyncRefCell<mpsc::Sender<std::io::Result<Vec<u8>>>>, + cancel: CancelHandle, +} + +impl Resource for FetchRequestBodyResource { + fn name(&self) -> Cow<str> { + "fetchRequestBody".into() + } +} + +type BytesStream = + Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>; + +struct FetchResponseBodyResource { + reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>, cancel: CancelHandle, } -impl Resource for HttpBodyResource { +impl Resource for FetchResponseBodyResource { fn name(&self) -> Cow<str> { - "httpBody".into() + "fetchResponseBody".into() } } |