summaryrefslogtreecommitdiff
path: root/cli
diff options
context:
space:
mode:
Diffstat (limited to 'cli')
-rw-r--r--cli/ops/fetch.rs62
-rw-r--r--cli/rt/11_streams.js10
-rw-r--r--cli/rt/26_fetch.js20
3 files changed, 64 insertions, 28 deletions
diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs
index 2b58ad217..be11955cf 100644
--- a/cli/ops/fetch.rs
+++ b/cli/ops/fetch.rs
@@ -1,9 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use super::io::StreamResource;
-use super::io::StreamResourceHolder;
use crate::http_util::create_http_client;
-use crate::http_util::HttpBody;
use deno_core::error::bad_resource_id;
use deno_core::error::type_error;
use deno_core::error::AnyError;
@@ -15,6 +12,7 @@ use http::header::HeaderName;
use http::header::HeaderValue;
use http::Method;
use reqwest::Client;
+use reqwest::Response;
use serde::Deserialize;
use serde_json::Value;
use std::cell::RefCell;
@@ -24,6 +22,7 @@ use std::rc::Rc;
pub fn init(rt: &mut deno_core::JsRuntime) {
super::reg_json_async(rt, "op_fetch", op_fetch);
+ super::reg_json_async(rt, "op_fetch_read", op_fetch_read);
super::reg_json_sync(rt, "op_create_http_client", op_create_http_client);
}
@@ -96,13 +95,10 @@ async fn op_fetch(
res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
}
- let body = HttpBody::from(res);
- let rid = state.borrow_mut().resource_table.add(
- "httpBody",
- Box::new(StreamResourceHolder::new(StreamResource::HttpBody(
- Box::new(body),
- ))),
- );
+ let rid = state
+ .borrow_mut()
+ .resource_table
+ .add("httpBody", Box::new(res));
Ok(json!({
"bodyRid": rid,
@@ -112,6 +108,52 @@ async fn op_fetch(
}))
}
+async fn op_fetch_read(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _data: BufVec,
+) -> Result<Value, AnyError> {
+ #[derive(Deserialize)]
+ #[serde(rename_all = "camelCase")]
+ struct Args {
+ rid: u32,
+ }
+
+ let args: Args = serde_json::from_value(args)?;
+ let rid = args.rid;
+
+ use futures::future::poll_fn;
+ use futures::ready;
+ use futures::FutureExt;
+ let f = poll_fn(move |cx| {
+ let mut state = state.borrow_mut();
+ let response = state
+ .resource_table
+ .get_mut::<Response>(rid as u32)
+ .ok_or_else(bad_resource_id)?;
+
+ let mut chunk_fut = response.chunk().boxed_local();
+ let r = ready!(chunk_fut.poll_unpin(cx))?;
+ if let Some(chunk) = r {
+ Ok(json!({ "chunk": &*chunk })).into()
+ } else {
+ Ok(json!({ "chunk": null })).into()
+ }
+ });
+ f.await
+ /*
+ // I'm programming this as I want it to be programmed, even though it might be
+ // incorrect, normally we would use poll_fn here. We need to make this await pattern work.
+ let chunk = response.chunk().await?;
+ if let Some(chunk) = chunk {
+ // TODO(ry) This is terribly inefficient. Make this zero-copy.
+ Ok(json!({ "chunk": &*chunk }))
+ } else {
+ Ok(json!({ "chunk": null }))
+ }
+ */
+}
+
struct HttpClientResource {
client: Client,
}
diff --git a/cli/rt/11_streams.js b/cli/rt/11_streams.js
index e5a5732e5..7f8af19e2 100644
--- a/cli/rt/11_streams.js
+++ b/cli/rt/11_streams.js
@@ -388,7 +388,7 @@
let { highWaterMark } = strategy;
const { type } = underlyingSource;
- if (isUnderlyingByteSource(underlyingSource)) {
+ if (underlyingSource.type == "bytes") {
if (size !== undefined) {
throw new RangeError(
`When underlying source is "bytes", strategy.size must be undefined.`,
@@ -1226,14 +1226,6 @@
);
}
- function isUnderlyingByteSource(
- underlyingSource,
- ) {
- const { type } = underlyingSource;
- const typeString = String(type);
- return typeString === "bytes";
- }
-
function isWritableStream(x) {
return !(
typeof x !== "object" ||
diff --git a/cli/rt/26_fetch.js b/cli/rt/26_fetch.js
index 1c1d8c78f..ca36d7ba4 100644
--- a/cli/rt/26_fetch.js
+++ b/cli/rt/26_fetch.js
@@ -5,7 +5,6 @@
const { notImplemented } = window.__bootstrap.util;
const { getHeaderValueParams, isTypedArray } = window.__bootstrap.webUtil;
const { Blob, bytesSymbol: blobBytesSymbol } = window.__bootstrap.blob;
- const { read } = window.__bootstrap.io;
const { close } = window.__bootstrap.resources;
const Body = window.__bootstrap.body;
const { ReadableStream } = window.__bootstrap.streams;
@@ -283,6 +282,7 @@
body,
clientRid,
);
+ const rid = fetchResponse.bodyRid;
if (
NULL_BODY_STATUS.includes(fetchResponse.status) ||
@@ -294,25 +294,27 @@
responseBody = null;
} else {
responseBody = new ReadableStream({
+ type: "bytes",
async pull(controller) {
try {
- const b = new Uint8Array(1024 * 32);
- const result = await read(fetchResponse.bodyRid, b);
- if (result === null) {
+ const result = await core.jsonOpAsync("op_fetch_read", { rid });
+ if (!result || !result.chunk) {
controller.close();
- return close(fetchResponse.bodyRid);
+ close(rid);
+ } else {
+ // TODO(ry) This is terribly inefficient. Make this zero-copy.
+ const chunk = new Uint8Array(result.chunk);
+ controller.enqueue(chunk);
}
-
- controller.enqueue(b.subarray(0, result));
} catch (e) {
controller.error(e);
controller.close();
- close(fetchResponse.bodyRid);
+ close(rid);
}
},
cancel() {
// When reader.cancel() is called
- close(fetchResponse.bodyRid);
+ close(rid);
},
});
}