diff options
| author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2024-04-08 22:02:49 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-04-08 15:02:49 -0600 |
| commit | d3b63bb315c7573974d8bd79dcfb6849cb29cf4e (patch) | |
| tree | 6f216dc558642f06bd9439e1b696118eab4622b3 /ext | |
| parent | cb12a9350332860971387e3a1fb40dc77fa992d3 (diff) | |
fix(ext/http): Make `Deno.serveHttp()` work when proxying (#23269)
Closes https://github.com/denoland/deno/issues/21900
Diffstat (limited to 'ext')
| -rw-r--r-- | ext/http/01_http.js | 38 | ||||
| -rw-r--r-- | ext/http/lib.rs | 99 |
2 files changed, 88 insertions, 49 deletions
diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 8bed444ca..a312cf60e 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -137,8 +137,10 @@ class HttpConn { return null; } - const { 0: streamRid, 1: method, 2: url } = nextRequest; - SetPrototypeAdd(this.#managedResources, streamRid); + const { 0: readStreamRid, 1: writeStreamRid, 2: method, 3: url } = + nextRequest; + SetPrototypeAdd(this.#managedResources, readStreamRid); + SetPrototypeAdd(this.#managedResources, writeStreamRid); /** @type {ReadableStream<Uint8Array> | undefined} */ let body = null; @@ -146,17 +148,16 @@ class HttpConn { // It will be closed automatically once the request has been handled and // the response has been sent. if (method !== "GET" && method !== "HEAD") { - body = readableStreamForRid(streamRid, false); + body = readableStreamForRid(readStreamRid, false); } const innerRequest = newInnerRequest( method, url, - () => op_http_headers(streamRid), + () => op_http_headers(readStreamRid), body !== null ? new InnerBody(body) : null, false, ); - innerRequest[streamRid] = streamRid; const abortController = new AbortController(); const request = fromInnerRequest( innerRequest, @@ -167,7 +168,8 @@ class HttpConn { const respondWith = createRespondWith( this, - streamRid, + readStreamRid, + writeStreamRid, abortController, ); @@ -178,10 +180,10 @@ class HttpConn { close() { if (!this.#closed) { this.#closed = true; - core.close(this.#rid); + core.tryClose(this.#rid); for (const rid of new SafeSetIterator(this.#managedResources)) { SetPrototypeDelete(this.#managedResources, rid); - core.close(rid); + core.tryClose(rid); } } } @@ -209,7 +211,8 @@ class HttpConn { function createRespondWith( httpConn, - streamRid, + readStreamRid, + writeStreamRid, abortController, ) { return async function respondWith(resp) { @@ -270,7 +273,7 @@ function createRespondWith( ); try { await op_http_write_headers( - streamRid, + writeStreamRid, innerResp.status ?? 200, innerResp.headerList, isStreamingResponseBody ? null : respBody, @@ -310,7 +313,7 @@ function createRespondWith( reader = respBody.getReader(); // Acquire JS lock. try { await op_http_write_resource( - streamRid, + writeStreamRid, resourceBacking.rid, ); if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid); @@ -340,7 +343,7 @@ function createRespondWith( break; } try { - await op_http_write(streamRid, value); + await op_http_write(writeStreamRid, value); } catch (error) { const connError = httpConn[connErrorSymbol]; if ( @@ -359,7 +362,7 @@ function createRespondWith( if (success) { try { - await op_http_shutdown(streamRid); + await op_http_shutdown(writeStreamRid); } catch (error) { await reader.cancel(error); throw error; @@ -370,7 +373,7 @@ function createRespondWith( const ws = resp[_ws]; if (ws) { const wsRid = await op_http_upgrade_websocket( - streamRid, + readStreamRid, ); ws[_rid] = wsRid; ws[_protocol] = resp.headers.get("sec-websocket-protocol"); @@ -395,8 +398,11 @@ function createRespondWith( abortController.abort(error); throw error; } finally { - if (deleteManagedResource(httpConn, streamRid)) { - core.close(streamRid); + if (deleteManagedResource(httpConn, readStreamRid)) { + core.tryClose(readStreamRid); + } + if (deleteManagedResource(httpConn, writeStreamRid)) { + core.tryClose(writeStreamRid); } } }; diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 129aaac47..df31b9c44 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -209,7 +209,15 @@ impl HttpConnResource { // Accepts a new incoming HTTP request. async fn accept( self: &Rc<Self>, - ) -> Result<Option<(HttpStreamResource, String, String)>, AnyError> { + ) -> Result< + Option<( + HttpStreamReadResource, + HttpStreamWriteResource, + String, + String, + )>, + AnyError, + > { let fut = async { let (request_tx, request_rx) = oneshot::channel(); let (response_tx, response_rx) = oneshot::channel(); @@ -218,7 +226,6 @@ impl HttpConnResource { self.acceptors_tx.unbounded_send(acceptor).ok()?; let request = request_rx.await.ok()?; - let accept_encoding = { let encodings = fly_accept_encoding::encodings_iter_http_02(request.headers()) @@ -234,9 +241,10 @@ impl HttpConnResource { let method = request.method().to_string(); let url = req_url(&request, self.scheme, &self.addr); - let stream = - HttpStreamResource::new(self, request, response_tx, accept_encoding); - Some((stream, method, url)) + let read_stream = HttpStreamReadResource::new(self, request); + let write_stream = + HttpStreamWriteResource::new(self, response_tx, accept_encoding); + Some((read_stream, write_stream, method, url)) }; async { @@ -348,38 +356,34 @@ impl HttpAcceptor { } } -/// A resource representing a single HTTP request/response stream. -pub struct HttpStreamResource { - conn: Rc<HttpConnResource>, +pub struct HttpStreamReadResource { + _conn: Rc<HttpConnResource>, pub rd: AsyncRefCell<HttpRequestReader>, - wr: AsyncRefCell<HttpResponseWriter>, - accept_encoding: Encoding, cancel_handle: CancelHandle, size: SizeHint, } -impl HttpStreamResource { - fn new( - conn: &Rc<HttpConnResource>, - request: Request<Body>, - response_tx: oneshot::Sender<Response<Body>>, - accept_encoding: Encoding, - ) -> Self { +pub struct HttpStreamWriteResource { + conn: Rc<HttpConnResource>, + wr: AsyncRefCell<HttpResponseWriter>, + accept_encoding: Encoding, +} + +impl HttpStreamReadResource { + fn new(conn: &Rc<HttpConnResource>, request: Request<Body>) -> Self { let size = request.body().size_hint(); Self { - conn: conn.clone(), + _conn: conn.clone(), rd: HttpRequestReader::Headers(request).into(), - wr: HttpResponseWriter::Headers(response_tx).into(), - accept_encoding, size, cancel_handle: CancelHandle::new(), } } } -impl Resource for HttpStreamResource { +impl Resource for HttpStreamReadResource { fn name(&self) -> Cow<str> { - "httpStream".into() + "httpReadStream".into() } fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> { @@ -440,6 +444,26 @@ impl Resource for HttpStreamResource { } } +impl HttpStreamWriteResource { + fn new( + conn: &Rc<HttpConnResource>, + response_tx: oneshot::Sender<Response<Body>>, + accept_encoding: Encoding, + ) -> Self { + Self { + conn: conn.clone(), + wr: HttpResponseWriter::Headers(response_tx).into(), + accept_encoding, + } + } +} + +impl Resource for HttpStreamWriteResource { + fn name(&self) -> Cow<str> { + "httpWriteStream".into() + } +} + /// The read half of an HTTP stream. pub enum HttpRequestReader { Headers(Request<Body>), @@ -504,7 +528,9 @@ impl Drop for BodyUncompressedSender { #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct NextRequestResponse( - // stream_rid: + // read_stream_rid: + ResourceId, + // write_stream_rid: ResourceId, // method: // This is a String rather than a ByteString because reqwest will only return @@ -523,10 +549,17 @@ async fn op_http_accept( let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?; match conn.accept().await { - Ok(Some((stream, method, url))) => { - let stream_rid = - state.borrow_mut().resource_table.add_rc(Rc::new(stream)); - let r = NextRequestResponse(stream_rid, method, url); + Ok(Some((read_stream, write_stream, method, url))) => { + let read_stream_rid = state + .borrow_mut() + .resource_table + .add_rc(Rc::new(read_stream)); + let write_stream_rid = state + .borrow_mut() + .resource_table + .add_rc(Rc::new(write_stream)); + let r = + NextRequestResponse(read_stream_rid, write_stream_rid, method, url); Ok(Some(r)) } Ok(None) => Ok(None), @@ -628,7 +661,7 @@ async fn op_http_write_headers( let stream = state .borrow_mut() .resource_table - .get::<HttpStreamResource>(rid)?; + .get::<HttpStreamWriteResource>(rid)?; // Track supported encoding let encoding = stream.accept_encoding; @@ -693,7 +726,7 @@ fn op_http_headers( state: &mut OpState, #[smi] rid: u32, ) -> Result<Vec<(ByteString, ByteString)>, AnyError> { - let stream = state.resource_table.get::<HttpStreamResource>(rid)?; + let stream = state.resource_table.get::<HttpStreamReadResource>(rid)?; let rd = RcRef::map(&stream, |r| &r.rd) .try_borrow() .ok_or_else(|| http_error("already in use"))?; @@ -849,7 +882,7 @@ async fn op_http_write_resource( let http_stream = state .borrow() .resource_table - .get::<HttpStreamResource>(rid)?; + .get::<HttpStreamWriteResource>(rid)?; let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await; let resource = state.borrow().resource_table.get_any(stream)?; loop { @@ -908,7 +941,7 @@ async fn op_http_write( let stream = state .borrow() .resource_table - .get::<HttpStreamResource>(rid)?; + .get::<HttpStreamWriteResource>(rid)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; match &mut *wr { @@ -960,7 +993,7 @@ async fn op_http_shutdown( let stream = state .borrow() .resource_table - .get::<HttpStreamResource>(rid)?; + .get::<HttpStreamWriteResource>(rid)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let wr = take(&mut *wr); match wr { @@ -1008,7 +1041,7 @@ async fn op_http_upgrade_websocket( let stream = state .borrow_mut() .resource_table - .get::<HttpStreamResource>(rid)?; + .get::<HttpStreamReadResource>(rid)?; let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; let request = match &mut *rd { |
