summaryrefslogtreecommitdiff
path: root/ext/fetch
diff options
context:
space:
mode:
Diffstat (limited to 'ext/fetch')
-rw-r--r--ext/fetch/26_fetch.js23
-rw-r--r--ext/fetch/lib.rs62
2 files changed, 27 insertions, 58 deletions
diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js
index 22baaf5c2..f15e7f6b9 100644
--- a/ext/fetch/26_fetch.js
+++ b/ext/fetch/26_fetch.js
@@ -73,24 +73,6 @@
return core.opAsync("op_fetch_send", rid);
}
- /**
- * @param {number} rid
- * @param {Uint8Array} body
- * @returns {Promise<void>}
- */
- function opFetchRequestWrite(rid, body) {
- return core.opAsync("op_fetch_request_write", rid, body);
- }
-
- /**
- * @param {number} rid
- * @param {Uint8Array} body
- * @returns {Promise<number>}
- */
- function opFetchResponseRead(rid, body) {
- return core.opAsync("op_fetch_response_read", rid, body);
- }
-
// A finalization registry to clean up underlying fetch resources that are GC'ed.
const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => {
core.tryClose(rid);
@@ -120,7 +102,8 @@
// This is the largest possible size for a single packet on a TLS
// stream.
const chunk = new Uint8Array(16 * 1024 + 256);
- const read = await opFetchResponseRead(
+ // TODO(@AaronO): switch to handle nulls if that's moved to core
+ const read = await core.read(
responseBodyRid,
chunk,
);
@@ -260,7 +243,7 @@
}
try {
await PromisePrototypeCatch(
- opFetchRequestWrite(requestBodyRid, value),
+ core.write(requestBodyRid, value),
(err) => {
if (terminator.aborted) return;
throw err;
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index 4bd62cd7c..5bae92c8e 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -13,6 +13,7 @@ use deno_core::op_async;
use deno_core::op_sync;
use deno_core::url::Url;
use deno_core::AsyncRefCell;
+use deno_core::AsyncResult;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
@@ -84,8 +85,6 @@ where
.ops(vec![
("op_fetch", op_sync(op_fetch::<FP, FH>)),
("op_fetch_send", op_async(op_fetch_send)),
- ("op_fetch_request_write", op_async(op_fetch_request_write)),
- ("op_fetch_response_read", op_async(op_fetch_response_read)),
(
"op_fetch_custom_client",
op_sync(op_fetch_custom_client::<FP>),
@@ -420,42 +419,6 @@ pub async fn op_fetch_send(
})
}
-pub async fn op_fetch_request_write(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- data: ZeroCopyBuf,
-) -> Result<(), AnyError> {
- let buf = data.to_vec();
-
- let resource = state
- .borrow()
- .resource_table
- .get::<FetchRequestBodyResource>(rid)?;
- let body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
- let cancel = RcRef::map(resource, |r| &r.cancel);
- body.send(Ok(buf)).or_cancel(cancel).await?.map_err(|_| {
- type_error("request body receiver not connected (request closed)")
- })?;
-
- Ok(())
-}
-
-pub async fn op_fetch_response_read(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- data: ZeroCopyBuf,
-) -> Result<usize, AnyError> {
- let resource = state
- .borrow()
- .resource_table
- .get::<FetchResponseBodyResource>(rid)?;
- let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await;
- let cancel = RcRef::map(resource, |r| &r.cancel);
- let mut buf = data.clone();
- let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
- Ok(read)
-}
-
type CancelableResponseResult = Result<Result<Response, AnyError>, Canceled>;
struct FetchRequestResource(
@@ -490,6 +453,20 @@ impl Resource for FetchRequestBodyResource {
"fetchRequestBody".into()
}
+ fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ Box::pin(async move {
+ let data = buf.to_vec();
+ let len = data.len();
+ let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ body.send(Ok(data)).or_cancel(cancel).await?.map_err(|_| {
+ type_error("request body receiver not connected (request closed)")
+ })?;
+
+ Ok(len)
+ })
+ }
+
fn close(self: Rc<Self>) {
self.cancel.cancel()
}
@@ -508,6 +485,15 @@ impl Resource for FetchResponseBodyResource {
"fetchResponseBody".into()
}
+ fn read(self: Rc<Self>, mut buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ Box::pin(async move {
+ let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
+ Ok(read)
+ })
+ }
+
fn close(self: Rc<Self>) {
self.cancel.cancel()
}