diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2024-04-08 22:02:49 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-08 15:02:49 -0600 |
commit | d3b63bb315c7573974d8bd79dcfb6849cb29cf4e (patch) | |
tree | 6f216dc558642f06bd9439e1b696118eab4622b3 /ext/http/lib.rs | |
parent | cb12a9350332860971387e3a1fb40dc77fa992d3 (diff) |
fix(ext/http): Make `Deno.serveHttp()` work when proxying (#23269)
Closes https://github.com/denoland/deno/issues/21900
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r-- | ext/http/lib.rs | 99 |
1 files changed, 66 insertions, 33 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 129aaac47..df31b9c44 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -209,7 +209,15 @@ impl HttpConnResource { // Accepts a new incoming HTTP request. async fn accept( self: &Rc<Self>, - ) -> Result<Option<(HttpStreamResource, String, String)>, AnyError> { + ) -> Result< + Option<( + HttpStreamReadResource, + HttpStreamWriteResource, + String, + String, + )>, + AnyError, + > { let fut = async { let (request_tx, request_rx) = oneshot::channel(); let (response_tx, response_rx) = oneshot::channel(); @@ -218,7 +226,6 @@ impl HttpConnResource { self.acceptors_tx.unbounded_send(acceptor).ok()?; let request = request_rx.await.ok()?; - let accept_encoding = { let encodings = fly_accept_encoding::encodings_iter_http_02(request.headers()) @@ -234,9 +241,10 @@ impl HttpConnResource { let method = request.method().to_string(); let url = req_url(&request, self.scheme, &self.addr); - let stream = - HttpStreamResource::new(self, request, response_tx, accept_encoding); - Some((stream, method, url)) + let read_stream = HttpStreamReadResource::new(self, request); + let write_stream = + HttpStreamWriteResource::new(self, response_tx, accept_encoding); + Some((read_stream, write_stream, method, url)) }; async { @@ -348,38 +356,34 @@ impl HttpAcceptor { } } -/// A resource representing a single HTTP request/response stream. -pub struct HttpStreamResource { - conn: Rc<HttpConnResource>, +pub struct HttpStreamReadResource { + _conn: Rc<HttpConnResource>, pub rd: AsyncRefCell<HttpRequestReader>, - wr: AsyncRefCell<HttpResponseWriter>, - accept_encoding: Encoding, cancel_handle: CancelHandle, size: SizeHint, } -impl HttpStreamResource { - fn new( - conn: &Rc<HttpConnResource>, - request: Request<Body>, - response_tx: oneshot::Sender<Response<Body>>, - accept_encoding: Encoding, - ) -> Self { +pub struct HttpStreamWriteResource { + conn: Rc<HttpConnResource>, + wr: AsyncRefCell<HttpResponseWriter>, + accept_encoding: Encoding, +} + +impl HttpStreamReadResource { + fn new(conn: &Rc<HttpConnResource>, request: Request<Body>) -> Self { let size = request.body().size_hint(); Self { - conn: conn.clone(), + _conn: conn.clone(), rd: HttpRequestReader::Headers(request).into(), - wr: HttpResponseWriter::Headers(response_tx).into(), - accept_encoding, size, cancel_handle: CancelHandle::new(), } } } -impl Resource for HttpStreamResource { +impl Resource for HttpStreamReadResource { fn name(&self) -> Cow<str> { - "httpStream".into() + "httpReadStream".into() } fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> { @@ -440,6 +444,26 @@ impl Resource for HttpStreamResource { } } +impl HttpStreamWriteResource { + fn new( + conn: &Rc<HttpConnResource>, + response_tx: oneshot::Sender<Response<Body>>, + accept_encoding: Encoding, + ) -> Self { + Self { + conn: conn.clone(), + wr: HttpResponseWriter::Headers(response_tx).into(), + accept_encoding, + } + } +} + +impl Resource for HttpStreamWriteResource { + fn name(&self) -> Cow<str> { + "httpWriteStream".into() + } +} + /// The read half of an HTTP stream. pub enum HttpRequestReader { Headers(Request<Body>), @@ -504,7 +528,9 @@ impl Drop for BodyUncompressedSender { #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct NextRequestResponse( - // stream_rid: + // read_stream_rid: + ResourceId, + // write_stream_rid: ResourceId, // method: // This is a String rather than a ByteString because reqwest will only return @@ -523,10 +549,17 @@ async fn op_http_accept( let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?; match conn.accept().await { - Ok(Some((stream, method, url))) => { - let stream_rid = - state.borrow_mut().resource_table.add_rc(Rc::new(stream)); - let r = NextRequestResponse(stream_rid, method, url); + Ok(Some((read_stream, write_stream, method, url))) => { + let read_stream_rid = state + .borrow_mut() + .resource_table + .add_rc(Rc::new(read_stream)); + let write_stream_rid = state + .borrow_mut() + .resource_table + .add_rc(Rc::new(write_stream)); + let r = + NextRequestResponse(read_stream_rid, write_stream_rid, method, url); Ok(Some(r)) } Ok(None) => Ok(None), @@ -628,7 +661,7 @@ async fn op_http_write_headers( let stream = state .borrow_mut() .resource_table - .get::<HttpStreamResource>(rid)?; + .get::<HttpStreamWriteResource>(rid)?; // Track supported encoding let encoding = stream.accept_encoding; @@ -693,7 +726,7 @@ fn op_http_headers( state: &mut OpState, #[smi] rid: u32, ) -> Result<Vec<(ByteString, ByteString)>, AnyError> { - let stream = state.resource_table.get::<HttpStreamResource>(rid)?; + let stream = state.resource_table.get::<HttpStreamReadResource>(rid)?; let rd = RcRef::map(&stream, |r| &r.rd) .try_borrow() .ok_or_else(|| http_error("already in use"))?; @@ -849,7 +882,7 @@ async fn op_http_write_resource( let http_stream = state .borrow() .resource_table - .get::<HttpStreamResource>(rid)?; + .get::<HttpStreamWriteResource>(rid)?; let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await; let resource = state.borrow().resource_table.get_any(stream)?; loop { @@ -908,7 +941,7 @@ async fn op_http_write( let stream = state .borrow() .resource_table - .get::<HttpStreamResource>(rid)?; + .get::<HttpStreamWriteResource>(rid)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; match &mut *wr { @@ -960,7 +993,7 @@ async fn op_http_shutdown( let stream = state .borrow() .resource_table - .get::<HttpStreamResource>(rid)?; + .get::<HttpStreamWriteResource>(rid)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let wr = take(&mut *wr); match wr { @@ -1008,7 +1041,7 @@ async fn op_http_upgrade_websocket( let stream = state .borrow_mut() .resource_table - .get::<HttpStreamResource>(rid)?; + .get::<HttpStreamReadResource>(rid)?; let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; let request = match &mut *rd { |