summaryrefslogtreecommitdiff
path: root/ext/http/lib.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2024-04-08 22:02:49 +0100
committerGitHub <noreply@github.com>2024-04-08 15:02:49 -0600
commitd3b63bb315c7573974d8bd79dcfb6849cb29cf4e (patch)
tree6f216dc558642f06bd9439e1b696118eab4622b3 /ext/http/lib.rs
parentcb12a9350332860971387e3a1fb40dc77fa992d3 (diff)
fix(ext/http): Make `Deno.serveHttp()` work when proxying (#23269)
Closes https://github.com/denoland/deno/issues/21900
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r--ext/http/lib.rs99
1 files changed, 66 insertions, 33 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 129aaac47..df31b9c44 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -209,7 +209,15 @@ impl HttpConnResource {
// Accepts a new incoming HTTP request.
async fn accept(
self: &Rc<Self>,
- ) -> Result<Option<(HttpStreamResource, String, String)>, AnyError> {
+ ) -> Result<
+ Option<(
+ HttpStreamReadResource,
+ HttpStreamWriteResource,
+ String,
+ String,
+ )>,
+ AnyError,
+ > {
let fut = async {
let (request_tx, request_rx) = oneshot::channel();
let (response_tx, response_rx) = oneshot::channel();
@@ -218,7 +226,6 @@ impl HttpConnResource {
self.acceptors_tx.unbounded_send(acceptor).ok()?;
let request = request_rx.await.ok()?;
-
let accept_encoding = {
let encodings =
fly_accept_encoding::encodings_iter_http_02(request.headers())
@@ -234,9 +241,10 @@ impl HttpConnResource {
let method = request.method().to_string();
let url = req_url(&request, self.scheme, &self.addr);
- let stream =
- HttpStreamResource::new(self, request, response_tx, accept_encoding);
- Some((stream, method, url))
+ let read_stream = HttpStreamReadResource::new(self, request);
+ let write_stream =
+ HttpStreamWriteResource::new(self, response_tx, accept_encoding);
+ Some((read_stream, write_stream, method, url))
};
async {
@@ -348,38 +356,34 @@ impl HttpAcceptor {
}
}
-/// A resource representing a single HTTP request/response stream.
-pub struct HttpStreamResource {
- conn: Rc<HttpConnResource>,
+pub struct HttpStreamReadResource {
+ _conn: Rc<HttpConnResource>,
pub rd: AsyncRefCell<HttpRequestReader>,
- wr: AsyncRefCell<HttpResponseWriter>,
- accept_encoding: Encoding,
cancel_handle: CancelHandle,
size: SizeHint,
}
-impl HttpStreamResource {
- fn new(
- conn: &Rc<HttpConnResource>,
- request: Request<Body>,
- response_tx: oneshot::Sender<Response<Body>>,
- accept_encoding: Encoding,
- ) -> Self {
+pub struct HttpStreamWriteResource {
+ conn: Rc<HttpConnResource>,
+ wr: AsyncRefCell<HttpResponseWriter>,
+ accept_encoding: Encoding,
+}
+
+impl HttpStreamReadResource {
+ fn new(conn: &Rc<HttpConnResource>, request: Request<Body>) -> Self {
let size = request.body().size_hint();
Self {
- conn: conn.clone(),
+ _conn: conn.clone(),
rd: HttpRequestReader::Headers(request).into(),
- wr: HttpResponseWriter::Headers(response_tx).into(),
- accept_encoding,
size,
cancel_handle: CancelHandle::new(),
}
}
}
-impl Resource for HttpStreamResource {
+impl Resource for HttpStreamReadResource {
fn name(&self) -> Cow<str> {
- "httpStream".into()
+ "httpReadStream".into()
}
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
@@ -440,6 +444,26 @@ impl Resource for HttpStreamResource {
}
}
+impl HttpStreamWriteResource {
+ fn new(
+ conn: &Rc<HttpConnResource>,
+ response_tx: oneshot::Sender<Response<Body>>,
+ accept_encoding: Encoding,
+ ) -> Self {
+ Self {
+ conn: conn.clone(),
+ wr: HttpResponseWriter::Headers(response_tx).into(),
+ accept_encoding,
+ }
+ }
+}
+
+impl Resource for HttpStreamWriteResource {
+ fn name(&self) -> Cow<str> {
+ "httpWriteStream".into()
+ }
+}
+
/// The read half of an HTTP stream.
pub enum HttpRequestReader {
Headers(Request<Body>),
@@ -504,7 +528,9 @@ impl Drop for BodyUncompressedSender {
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct NextRequestResponse(
- // stream_rid:
+ // read_stream_rid:
+ ResourceId,
+ // write_stream_rid:
ResourceId,
// method:
// This is a String rather than a ByteString because reqwest will only return
@@ -523,10 +549,17 @@ async fn op_http_accept(
let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?;
match conn.accept().await {
- Ok(Some((stream, method, url))) => {
- let stream_rid =
- state.borrow_mut().resource_table.add_rc(Rc::new(stream));
- let r = NextRequestResponse(stream_rid, method, url);
+ Ok(Some((read_stream, write_stream, method, url))) => {
+ let read_stream_rid = state
+ .borrow_mut()
+ .resource_table
+ .add_rc(Rc::new(read_stream));
+ let write_stream_rid = state
+ .borrow_mut()
+ .resource_table
+ .add_rc(Rc::new(write_stream));
+ let r =
+ NextRequestResponse(read_stream_rid, write_stream_rid, method, url);
Ok(Some(r))
}
Ok(None) => Ok(None),
@@ -628,7 +661,7 @@ async fn op_http_write_headers(
let stream = state
.borrow_mut()
.resource_table
- .get::<HttpStreamResource>(rid)?;
+ .get::<HttpStreamWriteResource>(rid)?;
// Track supported encoding
let encoding = stream.accept_encoding;
@@ -693,7 +726,7 @@ fn op_http_headers(
state: &mut OpState,
#[smi] rid: u32,
) -> Result<Vec<(ByteString, ByteString)>, AnyError> {
- let stream = state.resource_table.get::<HttpStreamResource>(rid)?;
+ let stream = state.resource_table.get::<HttpStreamReadResource>(rid)?;
let rd = RcRef::map(&stream, |r| &r.rd)
.try_borrow()
.ok_or_else(|| http_error("already in use"))?;
@@ -849,7 +882,7 @@ async fn op_http_write_resource(
let http_stream = state
.borrow()
.resource_table
- .get::<HttpStreamResource>(rid)?;
+ .get::<HttpStreamWriteResource>(rid)?;
let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
let resource = state.borrow().resource_table.get_any(stream)?;
loop {
@@ -908,7 +941,7 @@ async fn op_http_write(
let stream = state
.borrow()
.resource_table
- .get::<HttpStreamResource>(rid)?;
+ .get::<HttpStreamWriteResource>(rid)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
match &mut *wr {
@@ -960,7 +993,7 @@ async fn op_http_shutdown(
let stream = state
.borrow()
.resource_table
- .get::<HttpStreamResource>(rid)?;
+ .get::<HttpStreamWriteResource>(rid)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
let wr = take(&mut *wr);
match wr {
@@ -1008,7 +1041,7 @@ async fn op_http_upgrade_websocket(
let stream = state
.borrow_mut()
.resource_table
- .get::<HttpStreamResource>(rid)?;
+ .get::<HttpStreamReadResource>(rid)?;
let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
let request = match &mut *rd {