diff options
Diffstat (limited to 'ext/fetch')
-rw-r--r-- | ext/fetch/26_fetch.js | 23 | ||||
-rw-r--r-- | ext/fetch/lib.rs | 62 |
2 files changed, 27 insertions, 58 deletions
diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index 22baaf5c2..f15e7f6b9 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -73,24 +73,6 @@ return core.opAsync("op_fetch_send", rid); } - /** - * @param {number} rid - * @param {Uint8Array} body - * @returns {Promise<void>} - */ - function opFetchRequestWrite(rid, body) { - return core.opAsync("op_fetch_request_write", rid, body); - } - - /** - * @param {number} rid - * @param {Uint8Array} body - * @returns {Promise<number>} - */ - function opFetchResponseRead(rid, body) { - return core.opAsync("op_fetch_response_read", rid, body); - } - // A finalization registry to clean up underlying fetch resources that are GC'ed. const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => { core.tryClose(rid); @@ -120,7 +102,8 @@ // This is the largest possible size for a single packet on a TLS // stream. const chunk = new Uint8Array(16 * 1024 + 256); - const read = await opFetchResponseRead( + // TODO(@AaronO): switch to handle nulls if that's moved to core + const read = await core.read( responseBodyRid, chunk, ); @@ -260,7 +243,7 @@ } try { await PromisePrototypeCatch( - opFetchRequestWrite(requestBodyRid, value), + core.write(requestBodyRid, value), (err) => { if (terminator.aborted) return; throw err; diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 4bd62cd7c..5bae92c8e 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -13,6 +13,7 @@ use deno_core::op_async; use deno_core::op_sync; use deno_core::url::Url; use deno_core::AsyncRefCell; +use deno_core::AsyncResult; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; @@ -84,8 +85,6 @@ where .ops(vec![ ("op_fetch", op_sync(op_fetch::<FP, FH>)), ("op_fetch_send", op_async(op_fetch_send)), - ("op_fetch_request_write", op_async(op_fetch_request_write)), - ("op_fetch_response_read", op_async(op_fetch_response_read)), ( "op_fetch_custom_client", op_sync(op_fetch_custom_client::<FP>), @@ -420,42 +419,6 @@ pub async fn op_fetch_send( }) } -pub async fn op_fetch_request_write( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - data: ZeroCopyBuf, -) -> Result<(), AnyError> { - let buf = data.to_vec(); - - let resource = state - .borrow() - .resource_table - .get::<FetchRequestBodyResource>(rid)?; - let body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; - let cancel = RcRef::map(resource, |r| &r.cancel); - body.send(Ok(buf)).or_cancel(cancel).await?.map_err(|_| { - type_error("request body receiver not connected (request closed)") - })?; - - Ok(()) -} - -pub async fn op_fetch_response_read( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - data: ZeroCopyBuf, -) -> Result<usize, AnyError> { - let resource = state - .borrow() - .resource_table - .get::<FetchResponseBodyResource>(rid)?; - let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await; - let cancel = RcRef::map(resource, |r| &r.cancel); - let mut buf = data.clone(); - let read = reader.read(&mut buf).try_or_cancel(cancel).await?; - Ok(read) -} - type CancelableResponseResult = Result<Result<Response, AnyError>, Canceled>; struct FetchRequestResource( @@ -490,6 +453,20 @@ impl Resource for FetchRequestBodyResource { "fetchRequestBody".into() } + fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { + Box::pin(async move { + let data = buf.to_vec(); + let len = data.len(); + let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; + let cancel = RcRef::map(self, |r| &r.cancel); + body.send(Ok(data)).or_cancel(cancel).await?.map_err(|_| { + type_error("request body receiver not connected (request closed)") + })?; + + Ok(len) + }) + } + fn close(self: Rc<Self>) { self.cancel.cancel() } @@ -508,6 +485,15 @@ impl Resource for FetchResponseBodyResource { "fetchResponseBody".into() } + fn read(self: Rc<Self>, mut buf: ZeroCopyBuf) -> AsyncResult<usize> { + Box::pin(async move { + let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await; + let cancel = RcRef::map(self, |r| &r.cancel); + let read = reader.read(&mut buf).try_or_cancel(cancel).await?; + Ok(read) + }) + } + fn close(self: Rc<Self>) { self.cancel.cancel() } |