diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2023-09-15 21:51:25 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-15 21:51:25 +0200 |
commit | 5a1505db67d0326bf37b765c8a566584b44a2c1a (patch) | |
tree | 4f1cadc6bc85f8538cc0c21ebc7354e404d48bcf /ext/node/ops/http2.rs | |
parent | 11f0ccf8052065e37f92d8ba43e9624fcd3b9814 (diff) |
feat(ext/node): http2.connect() API (#19671)
This commit improves compatibility of "node:http2" module by polyfilling
"connect" method and "ClientHttp2Session" class. Basic operations like
streaming, header and trailer handling are working correctly.
Refing/unrefing is still a TODO and "npm:grpc-js/grpc" is not yet working
correctly.
---------
Co-authored-by: Matt Mastracci <matthew@mastracci.com>
Diffstat (limited to 'ext/node/ops/http2.rs')
-rw-r--r-- | ext/node/ops/http2.rs | 550 |
1 files changed, 550 insertions, 0 deletions
diff --git a/ext/node/ops/http2.rs b/ext/node/ops/http2.rs new file mode 100644 index 000000000..3e9fb5586 --- /dev/null +++ b/ext/node/ops/http2.rs @@ -0,0 +1,550 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use std::borrow::Cow; +use std::cell::RefCell; +use std::collections::HashMap; +use std::rc::Rc; +use std::task::Poll; + +use bytes::Bytes; +use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; +use deno_core::op; +use deno_core::serde::Serialize; +use deno_core::AsyncRefCell; +use deno_core::ByteString; +use deno_core::CancelFuture; +use deno_core::CancelHandle; +use deno_core::JsBuffer; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_net::raw::take_network_stream_resource; +use deno_net::raw::NetworkStream; +use h2; +use h2::RecvStream; +use http; +use http::request::Parts; +use http::HeaderMap; +use http::Response; +use http::StatusCode; +use reqwest::header::HeaderName; +use reqwest::header::HeaderValue; +use url::Url; + +pub struct Http2Client { + pub client: AsyncRefCell<h2::client::SendRequest<Bytes>>, + pub url: Url, +} + +impl Resource for Http2Client { + fn name(&self) -> Cow<str> { + "http2Client".into() + } +} + +#[derive(Debug)] +pub struct Http2ClientConn { + pub conn: AsyncRefCell<h2::client::Connection<NetworkStream>>, + cancel_handle: CancelHandle, +} + +impl Resource for Http2ClientConn { + fn name(&self) -> Cow<str> { + "http2ClientConnection".into() + } + + fn close(self: Rc<Self>) { + self.cancel_handle.cancel() + } +} + +#[derive(Debug)] +pub struct Http2ClientStream { + pub response: AsyncRefCell<h2::client::ResponseFuture>, + pub stream: AsyncRefCell<h2::SendStream<Bytes>>, +} + +impl Resource for Http2ClientStream { + fn name(&self) -> Cow<str> { + "http2ClientStream".into() + } +} + +#[derive(Debug)] +pub struct Http2ClientResponseBody { + pub body: AsyncRefCell<h2::RecvStream>, + pub trailers_rx: + AsyncRefCell<Option<tokio::sync::oneshot::Receiver<Option<HeaderMap>>>>, + pub trailers_tx: + AsyncRefCell<Option<tokio::sync::oneshot::Sender<Option<HeaderMap>>>>, +} + +impl Resource for Http2ClientResponseBody { + fn name(&self) -> Cow<str> { + "http2ClientResponseBody".into() + } +} + +#[derive(Debug)] +pub struct Http2ServerConnection { + pub conn: AsyncRefCell<h2::server::Connection<NetworkStream, Bytes>>, +} + +impl Resource for Http2ServerConnection { + fn name(&self) -> Cow<str> { + "http2ServerConnection".into() + } +} + +pub struct Http2ServerSendResponse { + pub send_response: AsyncRefCell<h2::server::SendResponse<Bytes>>, +} + +impl Resource for Http2ServerSendResponse { + fn name(&self) -> Cow<str> { + "http2ServerSendResponse".into() + } +} + +#[op] +pub async fn op_http2_connect( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + url: String, +) -> Result<(ResourceId, ResourceId), AnyError> { + // No permission check necessary because we're using an existing connection + let network_stream = { + let mut state = state.borrow_mut(); + take_network_stream_resource(&mut state.resource_table, rid)? + }; + + let url = Url::parse(&url)?; + + let (client, conn) = h2::client::handshake(network_stream).await?; + let mut state = state.borrow_mut(); + let client_rid = state.resource_table.add(Http2Client { + client: AsyncRefCell::new(client), + url, + }); + let conn_rid = state.resource_table.add(Http2ClientConn { + conn: AsyncRefCell::new(conn), + cancel_handle: CancelHandle::new(), + }); + Ok((client_rid, conn_rid)) +} + +#[op] +pub async fn op_http2_listen( + state: Rc<RefCell<OpState>>, + rid: ResourceId, +) -> Result<ResourceId, AnyError> { + let stream = + take_network_stream_resource(&mut state.borrow_mut().resource_table, rid)?; + + let conn = h2::server::handshake(stream).await?; + Ok( + state + .borrow_mut() + .resource_table + .add(Http2ServerConnection { + conn: AsyncRefCell::new(conn), + }), + ) +} + +#[op] +pub async fn op_http2_accept( + state: Rc<RefCell<OpState>>, + rid: ResourceId, +) -> Result< + Option<(Vec<(ByteString, ByteString)>, ResourceId, ResourceId)>, + AnyError, +> { + let resource = state + .borrow() + .resource_table + .get::<Http2ServerConnection>(rid)?; + let mut conn = RcRef::map(&resource, |r| &r.conn).borrow_mut().await; + if let Some(res) = conn.accept().await { + let (req, resp) = res?; + let (parts, body) = req.into_parts(); + let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel(); + let stm = state + .borrow_mut() + .resource_table + .add(Http2ClientResponseBody { + body: AsyncRefCell::new(body), + trailers_rx: AsyncRefCell::new(Some(trailers_rx)), + trailers_tx: AsyncRefCell::new(Some(trailers_tx)), + }); + + let Parts { + uri, + method, + headers, + .. + } = parts; + let mut req_headers = Vec::with_capacity(headers.len() + 4); + req_headers.push(( + ByteString::from(":method"), + ByteString::from(method.as_str()), + )); + req_headers.push(( + ByteString::from(":scheme"), + ByteString::from(uri.scheme().map(|s| s.as_str()).unwrap_or("http")), + )); + req_headers.push(( + ByteString::from(":path"), + ByteString::from(uri.path_and_query().map(|p| p.as_str()).unwrap_or("")), + )); + req_headers.push(( + ByteString::from(":authority"), + ByteString::from(uri.authority().map(|a| a.as_str()).unwrap_or("")), + )); + for (key, val) in headers.iter() { + req_headers.push((key.as_str().into(), val.as_bytes().into())); + } + + let resp = state + .borrow_mut() + .resource_table + .add(Http2ServerSendResponse { + send_response: AsyncRefCell::new(resp), + }); + + Ok(Some((req_headers, stm, resp))) + } else { + Ok(None) + } +} + +#[op] +pub async fn op_http2_send_response( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + status: u16, + headers: Vec<(ByteString, ByteString)>, +) -> Result<(ResourceId, u32), AnyError> { + let resource = state + .borrow() + .resource_table + .get::<Http2ServerSendResponse>(rid)?; + let mut send_response = RcRef::map(resource, |r| &r.send_response) + .borrow_mut() + .await; + let mut response = Response::new(()); + if let Ok(status) = StatusCode::from_u16(status) { + *response.status_mut() = status; + } + for (name, value) in headers { + response.headers_mut().append( + HeaderName::from_lowercase(&name).unwrap(), + HeaderValue::from_bytes(&value).unwrap(), + ); + } + + let stream = send_response.send_response(response, false)?; + let stream_id = stream.stream_id(); + + Ok((rid, stream_id.into())) +} + +#[op] +pub async fn op_http2_poll_client_connection( + state: Rc<RefCell<OpState>>, + rid: ResourceId, +) -> Result<(), AnyError> { + let resource = state.borrow().resource_table.get::<Http2ClientConn>(rid)?; + + let cancel_handle = RcRef::map(resource.clone(), |this| &this.cancel_handle); + let mut conn = RcRef::map(resource, |this| &this.conn).borrow_mut().await; + + match (&mut *conn).or_cancel(cancel_handle).await { + Ok(result) => result?, + Err(_) => { + // TODO(bartlomieju): probably need a better mechanism for closing the connection + + // cancelled + } + } + + Ok(()) +} + +#[op] +pub async fn op_http2_client_request( + state: Rc<RefCell<OpState>>, + client_rid: ResourceId, + // TODO(bartlomieju): maybe use a vector with fixed layout to save sending + // 4 strings of keys? + mut pseudo_headers: HashMap<String, String>, + headers: Vec<(ByteString, ByteString)>, +) -> Result<(ResourceId, u32), AnyError> { + let resource = state + .borrow() + .resource_table + .get::<Http2Client>(client_rid)?; + + let url = resource.url.clone(); + + let pseudo_path = pseudo_headers.remove(":path").unwrap_or("/".to_string()); + let pseudo_method = pseudo_headers + .remove(":method") + .unwrap_or("GET".to_string()); + // TODO(bartlomieju): handle all pseudo-headers (:authority, :scheme) + let _pseudo_authority = pseudo_headers + .remove(":authority") + .unwrap_or("/".to_string()); + let _pseudo_scheme = pseudo_headers + .remove(":scheme") + .unwrap_or("http".to_string()); + + let url = url.join(&pseudo_path)?; + + let mut req = http::Request::builder() + .uri(url.as_str()) + .method(pseudo_method.as_str()); + + for (name, value) in headers { + req.headers_mut().unwrap().append( + HeaderName::from_lowercase(&name).unwrap(), + HeaderValue::from_bytes(&value).unwrap(), + ); + } + + let request = req.body(()).unwrap(); + + let resource = { + let state = state.borrow(); + state.resource_table.get::<Http2Client>(client_rid)? + }; + let mut client = RcRef::map(&resource, |r| &r.client).borrow_mut().await; + poll_fn(|cx| client.poll_ready(cx)).await?; + let (response, stream) = client.send_request(request, false).unwrap(); + let stream_id = stream.stream_id(); + let stream_rid = state.borrow_mut().resource_table.add(Http2ClientStream { + response: AsyncRefCell::new(response), + stream: AsyncRefCell::new(stream), + }); + Ok((stream_rid, stream_id.into())) +} + +#[op] +pub async fn op_http2_client_send_data( + state: Rc<RefCell<OpState>>, + stream_rid: ResourceId, + data: JsBuffer, +) -> Result<(), AnyError> { + let resource = state + .borrow() + .resource_table + .get::<Http2ClientStream>(stream_rid)?; + let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await; + + // TODO(bartlomieju): handle end of stream + stream.send_data(bytes::Bytes::from(data), false)?; + Ok(()) +} + +#[op] +pub async fn op_http2_client_end_stream( + state: Rc<RefCell<OpState>>, + stream_rid: ResourceId, +) -> Result<(), AnyError> { + let resource = state + .borrow() + .resource_table + .get::<Http2ClientStream>(stream_rid)?; + let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await; + + // TODO(bartlomieju): handle end of stream + stream.send_data(bytes::Bytes::from(vec![]), true)?; + Ok(()) +} + +#[op] +pub async fn op_http2_client_reset_stream( + state: Rc<RefCell<OpState>>, + stream_rid: ResourceId, + code: u32, +) -> Result<(), AnyError> { + let resource = state + .borrow() + .resource_table + .get::<Http2ClientStream>(stream_rid)?; + let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await; + stream.send_reset(h2::Reason::from(code)); + Ok(()) +} + +#[op] +pub async fn op_http2_client_send_trailers( + state: Rc<RefCell<OpState>>, + stream_rid: ResourceId, + trailers: Vec<(ByteString, ByteString)>, +) -> Result<(), AnyError> { + let resource = state + .borrow() + .resource_table + .get::<Http2ClientStream>(stream_rid)?; + let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await; + + let mut trailers_map = http::HeaderMap::new(); + for (name, value) in trailers { + trailers_map.insert( + HeaderName::from_bytes(&name).unwrap(), + HeaderValue::from_bytes(&value).unwrap(), + ); + } + + stream.send_trailers(trailers_map)?; + Ok(()) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Http2ClientResponse { + headers: Vec<(ByteString, ByteString)>, + body_rid: ResourceId, + status_code: u16, +} + +#[op] +pub async fn op_http2_client_get_response( + state: Rc<RefCell<OpState>>, + stream_rid: ResourceId, +) -> Result<Http2ClientResponse, AnyError> { + let resource = state + .borrow() + .resource_table + .get::<Http2ClientStream>(stream_rid)?; + let mut response_future = + RcRef::map(&resource, |r| &r.response).borrow_mut().await; + + let response = (&mut *response_future).await?; + + let (parts, body) = response.into_parts(); + let status = parts.status; + let mut res_headers = Vec::new(); + for (key, val) in parts.headers.iter() { + res_headers.push((key.as_str().into(), val.as_bytes().into())); + } + + let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel(); + let body_rid = + state + .borrow_mut() + .resource_table + .add(Http2ClientResponseBody { + body: AsyncRefCell::new(body), + trailers_rx: AsyncRefCell::new(Some(trailers_rx)), + trailers_tx: AsyncRefCell::new(Some(trailers_tx)), + }); + Ok(Http2ClientResponse { + headers: res_headers, + body_rid, + status_code: status.into(), + }) +} + +enum DataOrTrailers { + Data(Bytes), + Trailers(HeaderMap), + Eof, +} + +fn poll_data_or_trailers( + cx: &mut std::task::Context, + body: &mut RecvStream, +) -> Poll<Result<DataOrTrailers, h2::Error>> { + loop { + if let Poll::Ready(trailers) = body.poll_trailers(cx) { + if let Some(trailers) = trailers? { + return Poll::Ready(Ok(DataOrTrailers::Trailers(trailers))); + } else { + return Poll::Ready(Ok(DataOrTrailers::Eof)); + } + } + if let Poll::Ready(data) = body.poll_data(cx) { + if let Some(data) = data { + return Poll::Ready(Ok(DataOrTrailers::Data(data?))); + } + // If data is None, loop one more time to check for trailers + continue; + } + // Return pending here as poll_data will keep the waker + return Poll::Pending; + } +} + +#[op] +pub async fn op_http2_client_get_response_body_chunk( + state: Rc<RefCell<OpState>>, + body_rid: ResourceId, +) -> Result<(Option<Vec<u8>>, bool), AnyError> { + let resource = state + .borrow() + .resource_table + .get::<Http2ClientResponseBody>(body_rid)?; + let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; + + loop { + match poll_fn(|cx| poll_data_or_trailers(cx, &mut body)).await? { + DataOrTrailers::Data(data) => { + return Ok((Some(data.to_vec()), false)); + } + DataOrTrailers::Trailers(trailers) => { + println!("{trailers:?}"); + if let Some(trailers_tx) = RcRef::map(&resource, |r| &r.trailers_tx) + .borrow_mut() + .await + .take() + { + _ = trailers_tx.send(Some(trailers)); + }; + + continue; + } + DataOrTrailers::Eof => { + RcRef::map(&resource, |r| &r.trailers_tx) + .borrow_mut() + .await + .take(); + return Ok((None, true)); + } + }; + } +} + +#[op] +pub async fn op_http2_client_get_response_trailers( + state: Rc<RefCell<OpState>>, + body_rid: ResourceId, +) -> Result<Option<Vec<(ByteString, ByteString)>>, AnyError> { + let resource = state + .borrow() + .resource_table + .get::<Http2ClientResponseBody>(body_rid)?; + let trailers = RcRef::map(&resource, |r| &r.trailers_rx) + .borrow_mut() + .await + .take(); + if let Some(trailers) = trailers { + if let Ok(Some(trailers)) = trailers.await { + let mut v = Vec::with_capacity(trailers.len()); + for (key, value) in trailers.iter() { + v.push(( + ByteString::from(key.as_str()), + ByteString::from(value.as_bytes()), + )); + } + Ok(Some(v)) + } else { + Ok(None) + } + } else { + Ok(None) + } +} |