summaryrefslogtreecommitdiff
path: root/ext/http/lib.rs
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2022-09-30 07:54:12 +0200
committerGitHub <noreply@github.com>2022-09-30 07:54:12 +0200
commit20c7300412bdb487fc758577d6256bbcf96efd12 (patch)
tree2dcd218a6095a2ad143fb27e304391b5fe64cf27 /ext/http/lib.rs
parent38f544538b337074cbce317e67859a69bb23684c (diff)
refactor(ext/http): remove op_http_read (#16096)
We can use Resource::read_return & op_read instead. This allows HTTP request bodies to participate in FastStream. To make this work, `readableStreamForRid` required a change to allow non auto-closing resources to be handled. This required some minor changes in our FastStream paths in ext/http and ext/flash.
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r--ext/http/lib.rs102
1 files changed, 52 insertions, 50 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index d1b38fb42..bffe3c3d5 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -78,7 +78,6 @@ pub fn init() -> Extension {
))
.ops(vec![
op_http_accept::decl(),
- op_http_read::decl(),
op_http_write_headers::decl(),
op_http_headers::decl(),
op_http_write::decl(),
@@ -329,11 +328,63 @@ impl HttpStreamResource {
}
}
+impl HttpStreamResource {
+ async fn read(
+ self: Rc<Self>,
+ mut buf: ZeroCopyBuf,
+ ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
+
+ let body = loop {
+ match &mut *rd {
+ HttpRequestReader::Headers(_) => {}
+ HttpRequestReader::Body(_, body) => break body,
+ HttpRequestReader::Closed => return Ok((0, buf)),
+ }
+ match take(&mut *rd) {
+ HttpRequestReader::Headers(request) => {
+ let (parts, body) = request.into_parts();
+ *rd = HttpRequestReader::Body(parts.headers, body.peekable());
+ }
+ _ => unreachable!(),
+ };
+ };
+
+ let fut = async {
+ let mut body = Pin::new(body);
+ loop {
+ match body.as_mut().peek_mut().await {
+ Some(Ok(chunk)) if !chunk.is_empty() => {
+ let len = min(buf.len(), chunk.len());
+ buf[..len].copy_from_slice(&chunk.split_to(len));
+ break Ok((len, buf));
+ }
+ Some(_) => match body.as_mut().next().await.unwrap() {
+ Ok(chunk) => assert!(chunk.is_empty()),
+ Err(err) => break Err(AnyError::from(err)),
+ },
+ None => break Ok((0, buf)),
+ }
+ }
+ };
+
+ let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
+ fut.try_or_cancel(cancel_handle).await
+ }
+}
+
impl Resource for HttpStreamResource {
fn name(&self) -> Cow<str> {
"httpStream".into()
}
+ fn read_return(
+ self: Rc<Self>,
+ _buf: ZeroCopyBuf,
+ ) -> deno_core::AsyncResult<(usize, ZeroCopyBuf)> {
+ Box::pin(self.read(_buf))
+ }
+
fn close(self: Rc<Self>) {
self.cancel_handle.cancel();
}
@@ -817,55 +868,6 @@ async fn op_http_shutdown(
}
#[op]
-async fn op_http_read(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- mut buf: ZeroCopyBuf,
-) -> Result<usize, AnyError> {
- let stream = state
- .borrow_mut()
- .resource_table
- .get::<HttpStreamResource>(rid)?;
- let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
-
- let body = loop {
- match &mut *rd {
- HttpRequestReader::Headers(_) => {}
- HttpRequestReader::Body(_, body) => break body,
- HttpRequestReader::Closed => return Ok(0),
- }
- match take(&mut *rd) {
- HttpRequestReader::Headers(request) => {
- let (parts, body) = request.into_parts();
- *rd = HttpRequestReader::Body(parts.headers, body.peekable());
- }
- _ => unreachable!(),
- };
- };
-
- let fut = async {
- let mut body = Pin::new(body);
- loop {
- match body.as_mut().peek_mut().await {
- Some(Ok(chunk)) if !chunk.is_empty() => {
- let len = min(buf.len(), chunk.len());
- buf[..len].copy_from_slice(&chunk.split_to(len));
- break Ok(len);
- }
- Some(_) => match body.as_mut().next().await.unwrap() {
- Ok(chunk) => assert!(chunk.is_empty()),
- Err(err) => break Err(AnyError::from(err)),
- },
- None => break Ok(0),
- }
- }
- };
-
- let cancel_handle = RcRef::map(&stream, |r| &r.cancel_handle);
- fut.try_or_cancel(cancel_handle).await
-}
-
-#[op]
fn op_http_websocket_accept_header(key: String) -> Result<String, AnyError> {
let digest = ring::digest::digest(
&ring::digest::SHA1_FOR_LEGACY_USE_ONLY,