author | Luca Casonato <lucacasonato@yahoo.com> | 2021-01-10 20:54:29 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-10 20:54:29 +0100 |
commit | 1a6ce29f3d11e5f0d0d022914e3f9fbcfa487294 (patch) | |
tree | 9e1940a9a7a7392e6818341eea67becfa26ebbfa /op_crates | |
parent | 2c1f74402c00a2975cdaf9199b6487e5fd8175ba (diff) | |
feat(fetch): req streaming + 0-copy resp streaming (#9036)
* feat(fetch): req streaming + 0-copy resp streaming
* lint
* lint
* fix test
* rm test.js
* explicitly use CancelHandle::default()
* Apply review suggestions
Co-authored-by: Ben Noordhuis <info@bnoordhuis.nl>
* fix test
* Merge remote-tracking branch 'origin/master' into fetch_real_streaming
* fix test
* retrigger ci
Co-authored-by: Ben Noordhuis <info@bnoordhuis.nl>
Diffstat (limited to 'op_crates')
-rw-r--r-- | op_crates/fetch/26_fetch.js | 88 |
-rw-r--r-- | op_crates/fetch/Cargo.toml | 3 |
-rw-r--r-- | op_crates/fetch/lib.rs | 199 |
3 files changed, 225 insertions, 65 deletions
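For context, a minimal usage sketch of the behavior this patch enables (a sketch only: the URL is a placeholder, error handling is omitted, and it assumes a `ReadableStream` request body, which the new `init.body instanceof ReadableStream` branch in `26_fetch.js` below accepts):

```js
// Stream a request body into fetch and consume the response incrementally.
const requestBody = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode("hello "));
    controller.enqueue(new TextEncoder().encode("world"));
    controller.close();
  },
});

const res = await fetch("https://example.com/upload", {
  method: "POST",
  body: requestBody, // streamed chunk by chunk instead of buffered up front
});

// The response body is a ReadableStream; after this patch each pull reads
// directly into a JS-supplied buffer (the "0-copy" part) in ~16 KiB chunks.
const reader = res.body.getReader();
let received = 0;
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  received += value.byteLength;
}
console.log(`received ${received} bytes`);
```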
diff --git a/op_crates/fetch/26_fetch.js b/op_crates/fetch/26_fetch.js
index 0d405d4ec..2d50f1142 100644
--- a/op_crates/fetch/26_fetch.js
+++ b/op_crates/fetch/26_fetch.js
@@ -897,8 +897,20 @@
     if (body != null) {
       zeroCopy = new Uint8Array(body.buffer, body.byteOffset, body.byteLength);
     }
+    return core.jsonOpSync("op_fetch", args, ...(zeroCopy ? [zeroCopy] : []));
+  }

-    return core.jsonOpAsync("op_fetch", args, ...(zeroCopy ? [zeroCopy] : []));
+  function opFetchSend(args) {
+    return core.jsonOpAsync("op_fetch_send", args);
+  }
+
+  function opFetchRequestWrite(args, body) {
+    const zeroCopy = new Uint8Array(
+      body.buffer,
+      body.byteOffset,
+      body.byteLength,
+    );
+    return core.jsonOpAsync("op_fetch_request_write", args, zeroCopy);
   }

   const NULL_BODY_STATUS = [101, 204, 205, 304];
@@ -1184,19 +1196,41 @@
     baseUrl = href;
   }

-  function sendFetchReq(url, method, headers, body, clientRid) {
+  async function sendFetchReq(url, method, headers, body, clientRid) {
     let headerArray = [];
     if (headers) {
       headerArray = Array.from(headers.entries());
     }

-    return opFetch({
-      method,
-      url,
-      baseUrl,
-      headers: headerArray,
-      clientRid,
-    }, body);
+    const { requestRid, requestBodyRid } = opFetch(
+      {
+        method,
+        url,
+        baseUrl,
+        headers: headerArray,
+        clientRid,
+        hasBody: !!body,
+      },
+      body instanceof Uint8Array ? body : undefined,
+    );
+    if (requestBodyRid) {
+      const writer = new WritableStream({
+        async write(chunk, controller) {
+          try {
+            await opFetchRequestWrite({ rid: requestBodyRid }, chunk);
+          } catch (err) {
+            controller.error(err);
+            controller.close();
+          }
+        },
+        close() {
+          core.close(requestBodyRid);
+        },
+      });
+      body.pipeTo(writer);
+    }
+
+    return await opFetchSend({ rid: requestRid });
   }

   async function fetch(input, init) {
@@ -1253,13 +1287,8 @@
         );
         body = multipartBuilder.getBody();
         contentType = multipartBuilder.getContentType();
-      } else {
-        // TODO(lucacasonato): do this in a streaming fashion once we support it
-        const buf = new Buffer();
-        for await (const chunk of init.body) {
-          buf.write(chunk);
-        }
-        body = buf.bytes();
+      } else if (init.body instanceof ReadableStream) {
+        body = init.body;
       }
       if (contentType && !headers.has("content-type")) {
         headers.set("content-type", contentType);
       }
@@ -1275,8 +1304,8 @@
       method = input.method;
       headers = input.headers;

-      if (input._bodySource) {
-        body = new DataView(await input.arrayBuffer());
+      if (input.body) {
+        body = input.body;
       }
     }

@@ -1290,7 +1319,7 @@
       body,
       clientRid,
     );
-    const rid = fetchResponse.bodyRid;
+    const rid = fetchResponse.responseRid;

     if (
       NULL_BODY_STATUS.includes(fetchResponse.status) ||
@@ -1298,21 +1327,28 @@
     ) {
       // We won't use body of received response, so close it now
       // otherwise it will be kept in resource table.
-      core.close(fetchResponse.bodyRid);
+      core.close(rid);
       responseBody = null;
     } else {
       responseBody = new ReadableStream({
         type: "bytes",
         async pull(controller) {
           try {
-            const result = await core.jsonOpAsync("op_fetch_read", { rid });
-            if (!result || !result.chunk) {
+            const chunk = new Uint8Array(16 * 1024 + 256);
+            const { read } = await core.jsonOpAsync(
+              "op_fetch_response_read",
+              { rid },
+              chunk,
+            );
+            if (read != 0) {
+              if (chunk.length == read) {
+                controller.enqueue(chunk);
+              } else {
+                controller.enqueue(chunk.subarray(0, read));
+              }
+            } else {
               controller.close();
               core.close(rid);
-            } else {
-              // TODO(ry) This is terribly inefficient. Make this zero-copy.
-              const chunk = new Uint8Array(result.chunk);
-              controller.enqueue(chunk);
             }
           } catch (e) {
             controller.error(e);
diff --git a/op_crates/fetch/Cargo.toml b/op_crates/fetch/Cargo.toml
index 2890bb349..855695475 100644
--- a/op_crates/fetch/Cargo.toml
+++ b/op_crates/fetch/Cargo.toml
@@ -15,5 +15,8 @@ path = "lib.rs"

 [dependencies]
 deno_core = { version = "0.75.0", path = "../../core" }
+
+bytes = "0.5.6"
 reqwest = { version = "0.10.8", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli"] }
 serde = { version = "1.0.116", features = ["derive"] }
+tokio = { version = "0.2.22", features = ["full"] }
\ No newline at end of file
diff --git a/op_crates/fetch/lib.rs b/op_crates/fetch/lib.rs
index c2e458d89..0a2623c34 100644
--- a/op_crates/fetch/lib.rs
+++ b/op_crates/fetch/lib.rs
@@ -5,6 +5,9 @@
 use deno_core::error::bad_resource_id;
 use deno_core::error::type_error;
 use deno_core::error::AnyError;
+use deno_core::futures::Future;
+use deno_core::futures::Stream;
+use deno_core::futures::StreamExt;
 use deno_core::serde_json;
 use deno_core::serde_json::json;
 use deno_core::serde_json::Value;
@@ -22,6 +25,7 @@ use deno_core::ZeroCopyBuf;
 use reqwest::header::HeaderName;
 use reqwest::header::HeaderValue;
 use reqwest::redirect::Policy;
+use reqwest::Body;
 use reqwest::Client;
 use reqwest::Method;
 use reqwest::Response;
@@ -32,7 +36,12 @@
 use std::convert::From;
 use std::fs::File;
 use std::io::Read;
 use std::path::PathBuf;
+use std::pin::Pin;
 use std::rc::Rc;
+use tokio::io::stream_reader;
+use tokio::io::AsyncReadExt;
+use tokio::io::StreamReader;
+use tokio::sync::mpsc;

 pub use reqwest; // Re-export reqwest
@@ -87,10 +96,10 @@ pub fn get_declaration() -> PathBuf {
   PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_fetch.d.ts")
 }

-pub async fn op_fetch<FP>(
-  state: Rc<RefCell<OpState>>,
+pub fn op_fetch<FP>(
+  state: &mut OpState,
   args: Value,
-  data: BufVec,
+  data: &mut [ZeroCopyBuf],
 ) -> Result<Value, AnyError>
 where
   FP: FetchPermissions + 'static,
@@ -103,20 +112,19 @@ where
     base_url: Option<String>,
     headers: Vec<(String, String)>,
     client_rid: Option<u32>,
+    has_body: bool,
   }

   let args: FetchArgs = serde_json::from_value(args)?;

   let client = if let Some(rid) = args.client_rid {
-    let state_ = state.borrow();
-    let r = state_
+    let r = state
       .resource_table
       .get::<HttpClientResource>(rid)
       .ok_or_else(bad_resource_id)?;
     r.client.clone()
   } else {
-    let state_ = state.borrow();
-    let client = state_.borrow::<reqwest::Client>();
+    let client = state.borrow::<reqwest::Client>();
     client.clone()
   };

@@ -139,28 +147,79 @@ where
     return Err(type_error(format!("scheme '{}' not supported", scheme)));
   }

-  {
-    let state_ = state.borrow();
-    let permissions = state_.borrow::<FP>();
-    permissions.check_net_url(&url)?;
-  }
+  let permissions = state.borrow::<FP>();
+  permissions.check_net_url(&url)?;

   let mut request = client.request(method, url);

-  match data.len() {
-    0 => {}
-    1 => request = request.body(Vec::from(&*data[0])),
-    _ => panic!("Invalid number of arguments"),
-  }
+  let maybe_request_body_rid = if args.has_body {
+    match data.len() {
+      0 => {
+        // If no body is passed, we return a writer for streaming the body.
+        let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1);
+        request = request.body(Body::wrap_stream(rx));
+
+        let request_body_rid =
+          state.resource_table.add(FetchRequestBodyResource {
+            body: AsyncRefCell::new(tx),
+            cancel: CancelHandle::default(),
+          });
+
+        Some(request_body_rid)
+      }
+      1 => {
+        // If a body is passed, we use it, and don't return a body for streaming.
+        request = request.body(Vec::from(&*data[0]));
+        None
+      }
+      _ => panic!("Invalid number of arguments"),
+    }
+  } else {
+    None
+  };

   for (key, value) in args.headers {
     let name = HeaderName::from_bytes(key.as_bytes()).unwrap();
     let v = HeaderValue::from_str(&value).unwrap();
     request = request.header(name, v);
   }

-  //debug!("Before fetch {}", url);
-  let res = match request.send().await {
+  let fut = request.send();
+
+  let request_rid = state
+    .resource_table
+    .add(FetchRequestResource(Box::pin(fut)));
+
+  Ok(json!({
+    "requestRid": request_rid,
+    "requestBodyRid": maybe_request_body_rid
+  }))
+}
+
+pub async fn op_fetch_send(
+  state: Rc<RefCell<OpState>>,
+  args: Value,
+  _data: BufVec,
+) -> Result<Value, AnyError> {
+  #[derive(Deserialize)]
+  #[serde(rename_all = "camelCase")]
+  struct Args {
+    rid: u32,
+  }
+
+  let args: Args = serde_json::from_value(args)?;
+
+  let request = state
+    .borrow_mut()
+    .resource_table
+    .take::<FetchRequestResource>(args.rid)
+    .ok_or_else(bad_resource_id)?;
+
+  let request = Rc::try_unwrap(request)
+    .ok()
+    .expect("multiple op_fetch_send ongoing");
+
+  let res = match request.0.await {
     Ok(res) => res,
     Err(e) => return Err(type_error(e.to_string())),
   };
@@ -185,23 +244,61 @@ where
     }
   }

-  let rid = state.borrow_mut().resource_table.add(HttpBodyResource {
-    response: AsyncRefCell::new(res),
-    cancel: Default::default(),
-  });
+  let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| {
+    r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
+  }));
+  let stream_reader = stream_reader(stream);
+  let rid = state
+    .borrow_mut()
+    .resource_table
+    .add(FetchResponseBodyResource {
+      reader: AsyncRefCell::new(stream_reader),
+      cancel: CancelHandle::default(),
+    });

   Ok(json!({
-    "bodyRid": rid,
     "status": status.as_u16(),
     "statusText": status.canonical_reason().unwrap_or(""),
-    "headers": res_headers
+    "headers": res_headers,
+    "responseRid": rid,
   }))
 }

-pub async fn op_fetch_read(
+pub async fn op_fetch_request_write(
   state: Rc<RefCell<OpState>>,
   args: Value,
-  _data: BufVec,
+  data: BufVec,
+) -> Result<Value, AnyError> {
+  #[derive(Deserialize)]
+  #[serde(rename_all = "camelCase")]
+  struct Args {
+    rid: u32,
+  }
+
+  let args: Args = serde_json::from_value(args)?;
+  let rid = args.rid;
+
+  let buf = match data.len() {
+    1 => Vec::from(&*data[0]),
+    _ => panic!("Invalid number of arguments"),
+  };
+
+  let resource = state
+    .borrow()
+    .resource_table
+    .get::<FetchRequestBodyResource>(rid as u32)
+    .ok_or_else(bad_resource_id)?;
+  let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
+  let cancel = RcRef::map(resource, |r| &r.cancel);
+  body.send(Ok(buf)).or_cancel(cancel).await??;
+
+  Ok(json!({}))
+}
+
+pub async fn op_fetch_response_read(
+  state: Rc<RefCell<OpState>>,
+  args: Value,
+  data: BufVec,
 ) -> Result<Value, AnyError> {
   #[derive(Deserialize)]
   #[serde(rename_all = "camelCase")]
@@ -212,30 +309,54 @@ pub async fn op_fetch_read(
   let args: Args = serde_json::from_value(args)?;
   let rid = args.rid;

+  if data.len() != 1 {
+    panic!("Invalid number of arguments");
+  }
+
   let resource = state
     .borrow()
     .resource_table
-    .get::<HttpBodyResource>(rid as u32)
+    .get::<FetchResponseBodyResource>(rid as u32)
     .ok_or_else(bad_resource_id)?;
-  let mut response = RcRef::map(&resource, |r| &r.response).borrow_mut().await;
+  let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await;
   let cancel = RcRef::map(resource, |r| &r.cancel);
-  let maybe_chunk = response.chunk().or_cancel(cancel).await??;
-  if let Some(chunk) = maybe_chunk {
-    // TODO(ry) This is terribly inefficient. Make this zero-copy.
-    Ok(json!({ "chunk": &*chunk }))
-  } else {
-    Ok(json!({ "chunk": null }))
+  let mut buf = data[0].clone();
+  let read = reader.read(&mut buf).or_cancel(cancel).await??;
+  Ok(json!({ "read": read }))
+}
+
+struct FetchRequestResource(
+  Pin<Box<dyn Future<Output = Result<Response, reqwest::Error>>>>,
+);
+
+impl Resource for FetchRequestResource {
+  fn name(&self) -> Cow<str> {
+    "fetchRequest".into()
   }
 }

-struct HttpBodyResource {
-  response: AsyncRefCell<Response>,
+struct FetchRequestBodyResource {
+  body: AsyncRefCell<mpsc::Sender<std::io::Result<Vec<u8>>>>,
+  cancel: CancelHandle,
+}
+
+impl Resource for FetchRequestBodyResource {
+  fn name(&self) -> Cow<str> {
+    "fetchRequestBody".into()
+  }
+}
+
+type BytesStream =
+  Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
+
+struct FetchResponseBodyResource {
+  reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>,
   cancel: CancelHandle,
 }

-impl Resource for HttpBodyResource {
+impl Resource for FetchResponseBodyResource {
   fn name(&self) -> Cow<str> {
-    "httpBody".into()
+    "fetchResponseBody".into()
   }
 }
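To summarize the control flow the diff introduces, here is a hedged sketch of the op sequence that `sendFetchReq` in `26_fetch.js` drives (these are internal `deno_core` ops, not a public API; the URL and the `chunks` array are placeholders):

```js
// 1. op_fetch (sync) builds the reqwest request. With hasBody set and no
//    buffer passed, it returns a requestBodyRid backed by an mpsc channel
//    that feeds reqwest's Body::wrap_stream.
const { requestRid, requestBodyRid } = core.jsonOpSync("op_fetch", {
  method: "POST",
  url: "https://example.com/upload", // placeholder
  baseUrl: null,
  headers: [],
  clientRid: null,
  hasBody: true,
});

// 2. Stream the request body chunk by chunk, then close the writer resource.
const chunks = [new TextEncoder().encode("hello")]; // placeholder payload
for (const chunk of chunks) {
  await core.jsonOpAsync("op_fetch_request_write", { rid: requestBodyRid }, chunk);
}
core.close(requestBodyRid);

// 3. op_fetch_send awaits the pending reqwest future and returns the status,
//    headers, and a resource id for the response body.
const { status, responseRid } = await core.jsonOpAsync("op_fetch_send", {
  rid: requestRid,
});

// 4. op_fetch_response_read fills the buffer passed in from JS (the zero-copy
//    path); a read of 0 signals end of body.
const buf = new Uint8Array(16 * 1024 + 256);
while (true) {
  const { read } = await core.jsonOpAsync(
    "op_fetch_response_read",
    { rid: responseRid },
    buf,
  );
  if (read === 0) break;
  // use buf.subarray(0, read)
}
core.close(responseRid);
```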