summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-12-23 08:58:20 -0700
committerGitHub <noreply@github.com>2023-12-23 08:58:20 -0700
commit1297c9a8f379d89691522c5cc0c6071c479e95a1 (patch)
tree7462b173b6b8106ea6255da67c265fcfd3a20c0d
parent36536c784ca981ae01d258d4239b2a362017d533 (diff)
chore(ext/node): use BufView natively in http2 (#21688)
Node HTTP/2 was using the default h2 `Bytes` datatype when we can be making using of `BufView` like we do in `Deno.serve`. `fetch` and `Deno.serverHttp` can't make use of `BufView` because they are using `reqwest` which is stuck on hyper 0.x at this time.
-rw-r--r--cli/ops/jupyter.rs2
-rw-r--r--ext/fetch/lib.rs7
-rw-r--r--ext/http/lib.rs6
-rw-r--r--ext/node/ops/http2.rs20
4 files changed, 19 insertions, 16 deletions
diff --git a/cli/ops/jupyter.rs b/cli/ops/jupyter.rs
index f63edebe8..7d12cb159 100644
--- a/cli/ops/jupyter.rs
+++ b/cli/ops/jupyter.rs
@@ -55,7 +55,7 @@ pub async fn op_jupyter_broadcast(
.new_message(&message_type)
.with_content(content)
.with_metadata(metadata)
- .with_buffers(buffers.into_iter().map(|b| b.into()).collect())
+ .with_buffers(buffers.into_iter().map(|b| b.to_vec().into()).collect())
.send(&mut *iopub_socket.lock().await)
.await?;
}
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index 6e1ecb5e4..737bc45c1 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -14,6 +14,7 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
+use bytes::Bytes;
use deno_core::anyhow::Error;
use deno_core::error::type_error;
use deno_core::error::AnyError;
@@ -233,7 +234,7 @@ unsafe impl Send for ResourceToBodyAdapter {}
unsafe impl Sync for ResourceToBodyAdapter {}
impl Stream for ResourceToBodyAdapter {
- type Item = Result<BufView, Error>;
+ type Item = Result<Bytes, Error>;
fn poll_next(
self: Pin<&mut Self>,
@@ -250,9 +251,9 @@ impl Stream for ResourceToBodyAdapter {
Ok(buf) if buf.is_empty() => Poll::Ready(None),
Ok(_) => {
this.1 = Some(this.0.clone().read(64 * 1024));
- Poll::Ready(Some(res))
+ Poll::Ready(Some(res.map(|b| b.to_vec().into())))
}
- _ => Poll::Ready(Some(res)),
+ _ => Poll::Ready(Some(res.map(|b| b.to_vec().into()))),
},
}
} else {
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 6a71c74e3..cae2fcfcc 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -739,7 +739,7 @@ fn http_response(
Some(data) => {
// If a buffer was passed, but isn't compressible, we use it to
// construct a response body.
- Ok((HttpResponseWriter::Closed, Bytes::from(data).into()))
+ Ok((HttpResponseWriter::Closed, data.to_vec().into()))
}
None if compressing => {
// Create a one way pipe that implements tokio's async io traits. To do
@@ -881,7 +881,7 @@ async fn op_http_write_resource(
}
}
HttpResponseWriter::BodyUncompressed(body) => {
- let bytes = Bytes::from(view);
+ let bytes = view.to_vec().into();
if let Err(err) = body.sender().send_data(bytes).await {
assert!(err.is_closed());
// Pull up the failure associated with the transport connection instead.
@@ -930,7 +930,7 @@ async fn op_http_write(
}
}
HttpResponseWriter::BodyUncompressed(body) => {
- let bytes = Bytes::from(buf);
+ let bytes = Bytes::from(buf.to_vec());
match body.sender().send_data(bytes).await {
Ok(_) => Ok(()),
Err(err) => {
diff --git a/ext/node/ops/http2.rs b/ext/node/ops/http2.rs
index 676ef7e6e..353a42e8b 100644
--- a/ext/node/ops/http2.rs
+++ b/ext/node/ops/http2.rs
@@ -12,6 +12,7 @@ use deno_core::futures::future::poll_fn;
use deno_core::op2;
use deno_core::serde::Serialize;
use deno_core::AsyncRefCell;
+use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
@@ -34,7 +35,7 @@ use reqwest::header::HeaderValue;
use url::Url;
pub struct Http2Client {
- pub client: AsyncRefCell<h2::client::SendRequest<Bytes>>,
+ pub client: AsyncRefCell<h2::client::SendRequest<BufView>>,
pub url: Url,
}
@@ -46,7 +47,7 @@ impl Resource for Http2Client {
#[derive(Debug)]
pub struct Http2ClientConn {
- pub conn: AsyncRefCell<h2::client::Connection<NetworkStream>>,
+ pub conn: AsyncRefCell<h2::client::Connection<NetworkStream, BufView>>,
cancel_handle: CancelHandle,
}
@@ -63,7 +64,7 @@ impl Resource for Http2ClientConn {
#[derive(Debug)]
pub struct Http2ClientStream {
pub response: AsyncRefCell<h2::client::ResponseFuture>,
- pub stream: AsyncRefCell<h2::SendStream<Bytes>>,
+ pub stream: AsyncRefCell<h2::SendStream<BufView>>,
}
impl Resource for Http2ClientStream {
@@ -89,7 +90,7 @@ impl Resource for Http2ClientResponseBody {
#[derive(Debug)]
pub struct Http2ServerConnection {
- pub conn: AsyncRefCell<h2::server::Connection<NetworkStream, Bytes>>,
+ pub conn: AsyncRefCell<h2::server::Connection<NetworkStream, BufView>>,
}
impl Resource for Http2ServerConnection {
@@ -99,7 +100,7 @@ impl Resource for Http2ServerConnection {
}
pub struct Http2ServerSendResponse {
- pub send_response: AsyncRefCell<h2::server::SendResponse<Bytes>>,
+ pub send_response: AsyncRefCell<h2::server::SendResponse<BufView>>,
}
impl Resource for Http2ServerSendResponse {
@@ -123,7 +124,8 @@ pub async fn op_http2_connect(
let url = Url::parse(&url)?;
- let (client, conn) = h2::client::handshake(network_stream).await?;
+ let (client, conn) =
+ h2::client::Builder::new().handshake(network_stream).await?;
let mut state = state.borrow_mut();
let client_rid = state.resource_table.add(Http2Client {
client: AsyncRefCell::new(client),
@@ -145,7 +147,7 @@ pub async fn op_http2_listen(
let stream =
take_network_stream_resource(&mut state.borrow_mut().resource_table, rid)?;
- let conn = h2::server::handshake(stream).await?;
+ let conn = h2::server::Builder::new().handshake(stream).await?;
Ok(
state
.borrow_mut()
@@ -349,7 +351,7 @@ pub async fn op_http2_client_send_data(
let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
// TODO(bartlomieju): handle end of stream
- stream.send_data(bytes::Bytes::from(data), false)?;
+ stream.send_data(data.to_vec().into(), false)?;
Ok(())
}
@@ -365,7 +367,7 @@ pub async fn op_http2_client_end_stream(
let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
// TODO(bartlomieju): handle end of stream
- stream.send_data(bytes::Bytes::from(vec![]), true)?;
+ stream.send_data(BufView::empty(), true)?;
Ok(())
}