diff options
| author | Luca Casonato <hello@lcas.dev> | 2022-09-30 07:54:12 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-09-30 07:54:12 +0200 |
| commit | 20c7300412bdb487fc758577d6256bbcf96efd12 (patch) | |
| tree | 2dcd218a6095a2ad143fb27e304391b5fe64cf27 /ext/http | |
| parent | 38f544538b337074cbce317e67859a69bb23684c (diff) | |
refactor(ext/http): remove op_http_read (#16096)
We can use Resource::read_return & op_read instead. This allows HTTP
request bodies to participate in FastStream.
To make this work, `readableStreamForRid` required a change to allow non
auto-closing resources to be handled. This required some minor changes
in our FastStream paths in ext/http and ext/flash.
Diffstat (limited to 'ext/http')
| -rw-r--r-- | ext/http/01_http.js | 52 | ||||
| -rw-r--r-- | ext/http/lib.rs | 102 |
2 files changed, 64 insertions, 90 deletions
diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 63023a296..588a7da57 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -17,8 +17,7 @@ } = window.__bootstrap.fetch; const core = window.Deno.core; const { BadResourcePrototype, InterruptedPrototype, ops } = core; - const { ReadableStream, ReadableStreamPrototype } = - window.__bootstrap.streams; + const { ReadableStreamPrototype } = window.__bootstrap.streams; const abortSignal = window.__bootstrap.abortSignal; const { WebSocket, @@ -33,8 +32,12 @@ } = window.__bootstrap.webSocket; const { TcpConn, UnixConn } = window.__bootstrap.net; const { TlsConn } = window.__bootstrap.tls; - const { Deferred, getReadableStreamRid, readableStreamClose } = - window.__bootstrap.streams; + const { + Deferred, + getReadableStreamResourceBacking, + readableStreamForRid, + readableStreamClose, + } = window.__bootstrap.streams; const { ArrayPrototypeIncludes, ArrayPrototypePush, @@ -50,7 +53,6 @@ StringPrototypeSplit, Symbol, SymbolAsyncIterator, - TypedArrayPrototypeSubarray, TypeError, Uint8Array, Uint8ArrayPrototype, @@ -121,7 +123,7 @@ // It will be closed automatically once the request has been handled and // the response has been sent. if (method !== "GET" && method !== "HEAD") { - body = createRequestBodyStream(streamRid); + body = readableStreamForRid(streamRid, false); } const innerRequest = newInnerRequest( @@ -170,10 +172,6 @@ } } - function readRequest(streamRid, buf) { - return core.opAsync("op_http_read", streamRid, buf); - } - function createRespondWith( httpConn, streamRid, @@ -270,9 +268,9 @@ ) { throw new TypeError("Unreachable"); } - const resourceRid = getReadableStreamRid(respBody); + const resourceBacking = getReadableStreamResourceBacking(respBody); let reader; - if (resourceRid) { + if (resourceBacking) { if (respBody.locked) { throw new TypeError("ReadableStream is locked."); } @@ -281,9 +279,9 @@ await core.opAsync( "op_http_write_resource", streamRid, - resourceRid, + resourceBacking.rid, ); - core.tryClose(resourceRid); + if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid); readableStreamClose(respBody); // Release JS lock. } catch (error) { const connError = httpConn[connErrorSymbol]; @@ -379,32 +377,6 @@ }; } - function createRequestBodyStream(streamRid) { - return new ReadableStream({ - type: "bytes", - async pull(controller) { - try { - // This is the largest possible size for a single packet on a TLS - // stream. - const chunk = new Uint8Array(16 * 1024 + 256); - const read = await readRequest(streamRid, chunk); - if (read > 0) { - // We read some data. Enqueue it onto the stream. - controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read)); - } else { - // We have reached the end of the body, so we close the stream. - controller.close(); - } - } catch (err) { - // There was an error while reading a chunk of the body, so we - // error. - controller.error(err); - controller.close(); - } - }, - }); - } - const _ws = Symbol("[[associated_ws]]"); function upgradeWebSocket(request, options = {}) { diff --git a/ext/http/lib.rs b/ext/http/lib.rs index d1b38fb42..bffe3c3d5 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -78,7 +78,6 @@ pub fn init() -> Extension { )) .ops(vec![ op_http_accept::decl(), - op_http_read::decl(), op_http_write_headers::decl(), op_http_headers::decl(), op_http_write::decl(), @@ -329,11 +328,63 @@ impl HttpStreamResource { } } +impl HttpStreamResource { + async fn read( + self: Rc<Self>, + mut buf: ZeroCopyBuf, + ) -> Result<(usize, ZeroCopyBuf), AnyError> { + let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; + + let body = loop { + match &mut *rd { + HttpRequestReader::Headers(_) => {} + HttpRequestReader::Body(_, body) => break body, + HttpRequestReader::Closed => return Ok((0, buf)), + } + match take(&mut *rd) { + HttpRequestReader::Headers(request) => { + let (parts, body) = request.into_parts(); + *rd = HttpRequestReader::Body(parts.headers, body.peekable()); + } + _ => unreachable!(), + }; + }; + + let fut = async { + let mut body = Pin::new(body); + loop { + match body.as_mut().peek_mut().await { + Some(Ok(chunk)) if !chunk.is_empty() => { + let len = min(buf.len(), chunk.len()); + buf[..len].copy_from_slice(&chunk.split_to(len)); + break Ok((len, buf)); + } + Some(_) => match body.as_mut().next().await.unwrap() { + Ok(chunk) => assert!(chunk.is_empty()), + Err(err) => break Err(AnyError::from(err)), + }, + None => break Ok((0, buf)), + } + } + }; + + let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); + fut.try_or_cancel(cancel_handle).await + } +} + impl Resource for HttpStreamResource { fn name(&self) -> Cow<str> { "httpStream".into() } + fn read_return( + self: Rc<Self>, + _buf: ZeroCopyBuf, + ) -> deno_core::AsyncResult<(usize, ZeroCopyBuf)> { + Box::pin(self.read(_buf)) + } + fn close(self: Rc<Self>) { self.cancel_handle.cancel(); } @@ -817,55 +868,6 @@ async fn op_http_shutdown( } #[op] -async fn op_http_read( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - mut buf: ZeroCopyBuf, -) -> Result<usize, AnyError> { - let stream = state - .borrow_mut() - .resource_table - .get::<HttpStreamResource>(rid)?; - let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; - - let body = loop { - match &mut *rd { - HttpRequestReader::Headers(_) => {} - HttpRequestReader::Body(_, body) => break body, - HttpRequestReader::Closed => return Ok(0), - } - match take(&mut *rd) { - HttpRequestReader::Headers(request) => { - let (parts, body) = request.into_parts(); - *rd = HttpRequestReader::Body(parts.headers, body.peekable()); - } - _ => unreachable!(), - }; - }; - - let fut = async { - let mut body = Pin::new(body); - loop { - match body.as_mut().peek_mut().await { - Some(Ok(chunk)) if !chunk.is_empty() => { - let len = min(buf.len(), chunk.len()); - buf[..len].copy_from_slice(&chunk.split_to(len)); - break Ok(len); - } - Some(_) => match body.as_mut().next().await.unwrap() { - Ok(chunk) => assert!(chunk.is_empty()), - Err(err) => break Err(AnyError::from(err)), - }, - None => break Ok(0), - } - } - }; - - let cancel_handle = RcRef::map(&stream, |r| &r.cancel_handle); - fut.try_or_cancel(cancel_handle).await -} - -#[op] fn op_http_websocket_accept_header(key: String) -> Result<String, AnyError> { let digest = ring::digest::digest( &ring::digest::SHA1_FOR_LEGACY_USE_ONLY, |
