diff options
author | Luca Casonato <hello@lcas.dev> | 2022-09-30 07:54:12 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-30 07:54:12 +0200 |
commit | 20c7300412bdb487fc758577d6256bbcf96efd12 (patch) | |
tree | 2dcd218a6095a2ad143fb27e304391b5fe64cf27 /ext/http/lib.rs | |
parent | 38f544538b337074cbce317e67859a69bb23684c (diff) |
refactor(ext/http): remove op_http_read (#16096)
We can use Resource::read_return & op_read instead. This allows HTTP
request bodies to participate in FastStream.
To make this work, `readableStreamForRid` required a change to allow non
auto-closing resources to be handled. This required some minor changes
in our FastStream paths in ext/http and ext/flash.
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r-- | ext/http/lib.rs | 102 |
1 files changed, 52 insertions, 50 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs index d1b38fb42..bffe3c3d5 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -78,7 +78,6 @@ pub fn init() -> Extension { )) .ops(vec![ op_http_accept::decl(), - op_http_read::decl(), op_http_write_headers::decl(), op_http_headers::decl(), op_http_write::decl(), @@ -329,11 +328,63 @@ impl HttpStreamResource { } } +impl HttpStreamResource { + async fn read( + self: Rc<Self>, + mut buf: ZeroCopyBuf, + ) -> Result<(usize, ZeroCopyBuf), AnyError> { + let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; + + let body = loop { + match &mut *rd { + HttpRequestReader::Headers(_) => {} + HttpRequestReader::Body(_, body) => break body, + HttpRequestReader::Closed => return Ok((0, buf)), + } + match take(&mut *rd) { + HttpRequestReader::Headers(request) => { + let (parts, body) = request.into_parts(); + *rd = HttpRequestReader::Body(parts.headers, body.peekable()); + } + _ => unreachable!(), + }; + }; + + let fut = async { + let mut body = Pin::new(body); + loop { + match body.as_mut().peek_mut().await { + Some(Ok(chunk)) if !chunk.is_empty() => { + let len = min(buf.len(), chunk.len()); + buf[..len].copy_from_slice(&chunk.split_to(len)); + break Ok((len, buf)); + } + Some(_) => match body.as_mut().next().await.unwrap() { + Ok(chunk) => assert!(chunk.is_empty()), + Err(err) => break Err(AnyError::from(err)), + }, + None => break Ok((0, buf)), + } + } + }; + + let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); + fut.try_or_cancel(cancel_handle).await + } +} + impl Resource for HttpStreamResource { fn name(&self) -> Cow<str> { "httpStream".into() } + fn read_return( + self: Rc<Self>, + _buf: ZeroCopyBuf, + ) -> deno_core::AsyncResult<(usize, ZeroCopyBuf)> { + Box::pin(self.read(_buf)) + } + fn close(self: Rc<Self>) { self.cancel_handle.cancel(); } @@ -817,55 +868,6 @@ async fn op_http_shutdown( } #[op] -async fn op_http_read( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - mut buf: ZeroCopyBuf, -) -> Result<usize, AnyError> { - let stream = state - .borrow_mut() - .resource_table - .get::<HttpStreamResource>(rid)?; - let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; - - let body = loop { - match &mut *rd { - HttpRequestReader::Headers(_) => {} - HttpRequestReader::Body(_, body) => break body, - HttpRequestReader::Closed => return Ok(0), - } - match take(&mut *rd) { - HttpRequestReader::Headers(request) => { - let (parts, body) = request.into_parts(); - *rd = HttpRequestReader::Body(parts.headers, body.peekable()); - } - _ => unreachable!(), - }; - }; - - let fut = async { - let mut body = Pin::new(body); - loop { - match body.as_mut().peek_mut().await { - Some(Ok(chunk)) if !chunk.is_empty() => { - let len = min(buf.len(), chunk.len()); - buf[..len].copy_from_slice(&chunk.split_to(len)); - break Ok(len); - } - Some(_) => match body.as_mut().next().await.unwrap() { - Ok(chunk) => assert!(chunk.is_empty()), - Err(err) => break Err(AnyError::from(err)), - }, - None => break Ok(0), - } - } - }; - - let cancel_handle = RcRef::map(&stream, |r| &r.cancel_handle); - fut.try_or_cancel(cancel_handle).await -} - -#[op] fn op_http_websocket_accept_header(key: String) -> Result<String, AnyError> { let digest = ring::digest::digest( &ring::digest::SHA1_FOR_LEGACY_USE_ONLY, |