diff options
author | Yoshiya Hinosawa <stibium121@gmail.com> | 2022-11-09 17:20:05 +0900 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-09 17:20:05 +0900 |
commit | 9edcab524fef558abce824731e78f83f7aac28dd (patch) | |
tree | bb78f8e648500d954dcde8d1a9f9bb153d63da98 | |
parent | c08fcd96c1d2e903101718c4792d2b5faec94b24 (diff) |
fix(ext/flash): revert #16284 and add test case (#16576)
-rw-r--r-- | cli/tests/unit/flash_test.ts | 34 | ||||
-rw-r--r-- | ext/flash/01_http.js | 40 | ||||
-rw-r--r-- | ext/flash/lib.rs | 52 |
3 files changed, 51 insertions, 75 deletions
diff --git a/cli/tests/unit/flash_test.ts b/cli/tests/unit/flash_test.ts index e2e64dfe3..024069455 100644 --- a/cli/tests/unit/flash_test.ts +++ b/cli/tests/unit/flash_test.ts @@ -2282,6 +2282,40 @@ Deno.test( }, ); +// Checks large streaming response +// https://github.com/denoland/deno/issues/16567 +Deno.test( + { permissions: { net: true } }, + async function testIssue16567() { + const ac = new AbortController(); + const promise = deferred(); + const server = Deno.serve(() => + new Response( + new ReadableStream({ + start(c) { + // 2MB "a...a" response with 40 chunks + for (const _ of Array(40)) { + c.enqueue(new Uint8Array(50_000).fill(97)); + } + c.close(); + }, + }), + ), { + async onListen() { + const res1 = await fetch("http://localhost:9000/"); + assertEquals((await res1.text()).length, 40 * 50_000); + + promise.resolve(); + ac.abort(); + }, + signal: ac.signal, + }); + + await promise; + await server; + }, +); + function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader { // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 const tp = new TextProtoReader(r); diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js index df013ce65..4435860ff 100644 --- a/ext/flash/01_http.js +++ b/ext/flash/01_http.js @@ -371,8 +371,6 @@ } } else { const reader = respBody.getReader(); - const { value, done } = await reader.read(); - // Best case: sends headers + first chunk in a single go. writeFixedResponse( serverId, i, @@ -387,21 +385,14 @@ false, respondFast, ); - await respondChunked( - i, - value, - done, - ); - if (!done) { - while (true) { - const chunk = await reader.read(); - await respondChunked( - i, - chunk.value, - chunk.done, - ); - if (chunk.done) break; - } + while (true) { + const { value, done } = await reader.read(); + await respondChunked( + i, + value, + done, + ); + if (done) break; } } } @@ -632,22 +623,13 @@ }); function respondChunked(token, chunk, shutdown) { - const nwritten = core.ops.op_try_flash_respond_chuncked( + return core.opAsync( + "op_flash_respond_chuncked", serverId, token, - chunk ?? new Uint8Array(), + chunk, shutdown, ); - if (nwritten > 0) { - return core.opAsync( - "op_flash_respond_chuncked", - serverId, - token, - chunk, - shutdown, - nwritten, - ); - } } const fastOp = prepareFastCalls(); diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs index 9cff7d2fc..1f3686760 100644 --- a/ext/flash/lib.rs +++ b/ext/flash/lib.rs @@ -150,34 +150,6 @@ async fn op_flash_respond_async( Ok(()) } -#[op(fast)] -fn op_try_flash_respond_chuncked( - op_state: &mut OpState, - server_id: u32, - token: u32, - response: &[u8], - shutdown: bool, -) -> u32 { - let flash_ctx = op_state.borrow_mut::<FlashContext>(); - let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); - let tx = ctx.requests.get(&token).unwrap(); - let sock = tx.socket(); - - // TODO(@littledivy): Use writev when `UnixIoSlice` lands. - // https://github.com/denoland/deno/pull/15629 - let h = format!("{:x}\r\n", response.len()); - let concat = [h.as_bytes(), response, b"\r\n"].concat(); - let expected = sock.try_write(&concat); - if expected != concat.len() { - return expected as u32; - } - if shutdown { - // Best case: We've written everything and the stream is done too. - let _ = ctx.requests.remove(&token).unwrap(); - } - 0 -} - #[op] async fn op_flash_respond_chuncked( op_state: Rc<RefCell<OpState>>, @@ -185,7 +157,6 @@ async fn op_flash_respond_chuncked( token: u32, response: Option<ZeroCopyBuf>, shutdown: bool, - nwritten: u32, ) -> Result<(), AnyError> { let mut op_state = op_state.borrow_mut(); let flash_ctx = op_state.borrow_mut::<FlashContext>(); @@ -207,27 +178,17 @@ async fn op_flash_respond_chuncked( .with_async_stream(|stream| { Box::pin(async move { use tokio::io::AsyncWriteExt; - // TODO(@littledivy): Use writev when `UnixIoSlice` lands. - // https://github.com/denoland/deno/pull/15629 - macro_rules! write_whats_not_written { - ($e:expr) => { - let e = $e; - let n = nwritten as usize; - if n < e.len() { - stream.write_all(&e[n..]).await?; - } - }; - } if let Some(response) = response { - let h = format!("{:x}\r\n", response.len()); - write_whats_not_written!(h.as_bytes()); - write_whats_not_written!(&response); - write_whats_not_written!(b"\r\n"); + stream + .write_all(format!("{:x}\r\n", response.len()).as_bytes()) + .await?; + stream.write_all(&response).await?; + stream.write_all(b"\r\n").await?; } // The last chunk if shutdown { - write_whats_not_written!(b"0\r\n\r\n"); + stream.write_all(b"0\r\n\r\n").await?; } Ok(()) @@ -1487,7 +1448,6 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension { op_flash_respond::decl(), op_flash_respond_async::decl(), op_flash_respond_chuncked::decl(), - op_try_flash_respond_chuncked::decl(), op_flash_method::decl(), op_flash_path::decl(), op_flash_headers::decl(), |