From 375ce63c6390cf7710210ce22f14a2b5a02cbfc3 Mon Sep 17 00:00:00 2001 From: Aaron O'Mullan Date: Tue, 9 Nov 2021 19:26:17 +0100 Subject: feat(core): streams (#12596) This allows resources to be "streams" by implementing read/write/shutdown. These streams are implicit since their nature (read/write/duplex) isn't known until called, but we could easily add another method to explicitly tag resources as streams. `op_read/op_write/op_shutdown` are now builtin ops provided by `deno_core` Note: this current implementation is simple & straightforward but it results in an additional alloc per read/write call Closes #12556 --- ext/fetch/26_fetch.js | 23 +++---------------- ext/fetch/lib.rs | 62 ++++++++++++++++++++------------------------------- 2 files changed, 27 insertions(+), 58 deletions(-) (limited to 'ext/fetch') diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index 22baaf5c2..f15e7f6b9 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -73,24 +73,6 @@ return core.opAsync("op_fetch_send", rid); } - /** - * @param {number} rid - * @param {Uint8Array} body - * @returns {Promise} - */ - function opFetchRequestWrite(rid, body) { - return core.opAsync("op_fetch_request_write", rid, body); - } - - /** - * @param {number} rid - * @param {Uint8Array} body - * @returns {Promise} - */ - function opFetchResponseRead(rid, body) { - return core.opAsync("op_fetch_response_read", rid, body); - } - // A finalization registry to clean up underlying fetch resources that are GC'ed. const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => { core.tryClose(rid); @@ -120,7 +102,8 @@ // This is the largest possible size for a single packet on a TLS // stream. const chunk = new Uint8Array(16 * 1024 + 256); - const read = await opFetchResponseRead( + // TODO(@AaronO): switch to handle nulls if that's moved to core + const read = await core.read( responseBodyRid, chunk, ); @@ -260,7 +243,7 @@ } try { await PromisePrototypeCatch( - opFetchRequestWrite(requestBodyRid, value), + core.write(requestBodyRid, value), (err) => { if (terminator.aborted) return; throw err; diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 4bd62cd7c..5bae92c8e 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -13,6 +13,7 @@ use deno_core::op_async; use deno_core::op_sync; use deno_core::url::Url; use deno_core::AsyncRefCell; +use deno_core::AsyncResult; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; @@ -84,8 +85,6 @@ where .ops(vec![ ("op_fetch", op_sync(op_fetch::)), ("op_fetch_send", op_async(op_fetch_send)), - ("op_fetch_request_write", op_async(op_fetch_request_write)), - ("op_fetch_response_read", op_async(op_fetch_response_read)), ( "op_fetch_custom_client", op_sync(op_fetch_custom_client::), @@ -420,42 +419,6 @@ pub async fn op_fetch_send( }) } -pub async fn op_fetch_request_write( - state: Rc>, - rid: ResourceId, - data: ZeroCopyBuf, -) -> Result<(), AnyError> { - let buf = data.to_vec(); - - let resource = state - .borrow() - .resource_table - .get::(rid)?; - let body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; - let cancel = RcRef::map(resource, |r| &r.cancel); - body.send(Ok(buf)).or_cancel(cancel).await?.map_err(|_| { - type_error("request body receiver not connected (request closed)") - })?; - - Ok(()) -} - -pub async fn op_fetch_response_read( - state: Rc>, - rid: ResourceId, - data: ZeroCopyBuf, -) -> Result { - let resource = state - .borrow() - .resource_table - .get::(rid)?; - let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await; - let cancel = RcRef::map(resource, |r| &r.cancel); - let mut buf = data.clone(); - let read = reader.read(&mut buf).try_or_cancel(cancel).await?; - Ok(read) -} - type CancelableResponseResult = Result, Canceled>; struct FetchRequestResource( @@ -490,6 +453,20 @@ impl Resource for FetchRequestBodyResource { "fetchRequestBody".into() } + fn write(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + Box::pin(async move { + let data = buf.to_vec(); + let len = data.len(); + let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; + let cancel = RcRef::map(self, |r| &r.cancel); + body.send(Ok(data)).or_cancel(cancel).await?.map_err(|_| { + type_error("request body receiver not connected (request closed)") + })?; + + Ok(len) + }) + } + fn close(self: Rc) { self.cancel.cancel() } @@ -508,6 +485,15 @@ impl Resource for FetchResponseBodyResource { "fetchResponseBody".into() } + fn read(self: Rc, mut buf: ZeroCopyBuf) -> AsyncResult { + Box::pin(async move { + let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await; + let cancel = RcRef::map(self, |r| &r.cancel); + let read = reader.read(&mut buf).try_or_cancel(cancel).await?; + Ok(read) + }) + } + fn close(self: Rc) { self.cancel.cancel() } -- cgit v1.2.3