summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYoshiya Hinosawa <stibium121@gmail.com>2022-11-09 17:20:05 +0900
committerGitHub <noreply@github.com>2022-11-09 17:20:05 +0900
commit9edcab524fef558abce824731e78f83f7aac28dd (patch)
treebb78f8e648500d954dcde8d1a9f9bb153d63da98
parentc08fcd96c1d2e903101718c4792d2b5faec94b24 (diff)
fix(ext/flash): revert #16284 and add test case (#16576)
-rw-r--r--cli/tests/unit/flash_test.ts34
-rw-r--r--ext/flash/01_http.js40
-rw-r--r--ext/flash/lib.rs52
3 files changed, 51 insertions, 75 deletions
diff --git a/cli/tests/unit/flash_test.ts b/cli/tests/unit/flash_test.ts
index e2e64dfe3..024069455 100644
--- a/cli/tests/unit/flash_test.ts
+++ b/cli/tests/unit/flash_test.ts
@@ -2282,6 +2282,40 @@ Deno.test(
},
);
+// Checks large streaming response
+// https://github.com/denoland/deno/issues/16567
+Deno.test(
+ { permissions: { net: true } },
+ async function testIssue16567() {
+ const ac = new AbortController();
+ const promise = deferred();
+ const server = Deno.serve(() =>
+ new Response(
+ new ReadableStream({
+ start(c) {
+ // 2MB "a...a" response with 40 chunks
+ for (const _ of Array(40)) {
+ c.enqueue(new Uint8Array(50_000).fill(97));
+ }
+ c.close();
+ },
+ }),
+ ), {
+ async onListen() {
+ const res1 = await fetch("http://localhost:9000/");
+ assertEquals((await res1.text()).length, 40 * 50_000);
+
+ promise.resolve();
+ ac.abort();
+ },
+ signal: ac.signal,
+ });
+
+ await promise;
+ await server;
+ },
+);
+
function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(r);
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js
index df013ce65..4435860ff 100644
--- a/ext/flash/01_http.js
+++ b/ext/flash/01_http.js
@@ -371,8 +371,6 @@
}
} else {
const reader = respBody.getReader();
- const { value, done } = await reader.read();
- // Best case: sends headers + first chunk in a single go.
writeFixedResponse(
serverId,
i,
@@ -387,21 +385,14 @@
false,
respondFast,
);
- await respondChunked(
- i,
- value,
- done,
- );
- if (!done) {
- while (true) {
- const chunk = await reader.read();
- await respondChunked(
- i,
- chunk.value,
- chunk.done,
- );
- if (chunk.done) break;
- }
+ while (true) {
+ const { value, done } = await reader.read();
+ await respondChunked(
+ i,
+ value,
+ done,
+ );
+ if (done) break;
}
}
}
@@ -632,22 +623,13 @@
});
function respondChunked(token, chunk, shutdown) {
- const nwritten = core.ops.op_try_flash_respond_chuncked(
+ return core.opAsync(
+ "op_flash_respond_chuncked",
serverId,
token,
- chunk ?? new Uint8Array(),
+ chunk,
shutdown,
);
- if (nwritten > 0) {
- return core.opAsync(
- "op_flash_respond_chuncked",
- serverId,
- token,
- chunk,
- shutdown,
- nwritten,
- );
- }
}
const fastOp = prepareFastCalls();
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs
index 9cff7d2fc..1f3686760 100644
--- a/ext/flash/lib.rs
+++ b/ext/flash/lib.rs
@@ -150,34 +150,6 @@ async fn op_flash_respond_async(
Ok(())
}
-#[op(fast)]
-fn op_try_flash_respond_chuncked(
- op_state: &mut OpState,
- server_id: u32,
- token: u32,
- response: &[u8],
- shutdown: bool,
-) -> u32 {
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
- let tx = ctx.requests.get(&token).unwrap();
- let sock = tx.socket();
-
- // TODO(@littledivy): Use writev when `UnixIoSlice` lands.
- // https://github.com/denoland/deno/pull/15629
- let h = format!("{:x}\r\n", response.len());
- let concat = [h.as_bytes(), response, b"\r\n"].concat();
- let expected = sock.try_write(&concat);
- if expected != concat.len() {
- return expected as u32;
- }
- if shutdown {
- // Best case: We've written everything and the stream is done too.
- let _ = ctx.requests.remove(&token).unwrap();
- }
- 0
-}
-
#[op]
async fn op_flash_respond_chuncked(
op_state: Rc<RefCell<OpState>>,
@@ -185,7 +157,6 @@ async fn op_flash_respond_chuncked(
token: u32,
response: Option<ZeroCopyBuf>,
shutdown: bool,
- nwritten: u32,
) -> Result<(), AnyError> {
let mut op_state = op_state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>();
@@ -207,27 +178,17 @@ async fn op_flash_respond_chuncked(
.with_async_stream(|stream| {
Box::pin(async move {
use tokio::io::AsyncWriteExt;
- // TODO(@littledivy): Use writev when `UnixIoSlice` lands.
- // https://github.com/denoland/deno/pull/15629
- macro_rules! write_whats_not_written {
- ($e:expr) => {
- let e = $e;
- let n = nwritten as usize;
- if n < e.len() {
- stream.write_all(&e[n..]).await?;
- }
- };
- }
if let Some(response) = response {
- let h = format!("{:x}\r\n", response.len());
- write_whats_not_written!(h.as_bytes());
- write_whats_not_written!(&response);
- write_whats_not_written!(b"\r\n");
+ stream
+ .write_all(format!("{:x}\r\n", response.len()).as_bytes())
+ .await?;
+ stream.write_all(&response).await?;
+ stream.write_all(b"\r\n").await?;
}
// The last chunk
if shutdown {
- write_whats_not_written!(b"0\r\n\r\n");
+ stream.write_all(b"0\r\n\r\n").await?;
}
Ok(())
@@ -1487,7 +1448,6 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
op_flash_respond::decl(),
op_flash_respond_async::decl(),
op_flash_respond_chuncked::decl(),
- op_try_flash_respond_chuncked::decl(),
op_flash_method::decl(),
op_flash_path::decl(),
op_flash_headers::decl(),